python으로 짜여진 소스코드를 java8 로 작성하였다.
package org.test.ch3;
import java.text.SimpleDateFormat;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.Date;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.regex.Matcher;
import java.util.regex.Pattern;
import org.apache.log4j.PropertyConfigurator;
import org.apache.spark.api.java.JavaDoubleRDD;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.mllib.feature.Normalizer;
import org.apache.spark.mllib.linalg.Vector;
import org.apache.spark.mllib.linalg.Vectors;
public class Ch3_JavaApp2 {
private static final long serialVersionUID = 154356342432L;
public static void main(String...strings){
PropertyConfigurator.configure(System.getProperty("user.dir") + "/src/resources/log4j.properties");
Long start = System.currentTimeMillis();
Ch3_JavaApp2 c = new Ch3_JavaApp2();
JavaSparkContext sc = new JavaSparkContext("local[2]", "First Spark App");
// c.proc4(sc); //mean 과 median
// c.proc5(sc);
// c.proc6(sc);
// c.proc7(sc);
c.proc8(sc);
c.s("\n" + (System.currentTimeMillis() - start) + " ms spend...");
sc.stop();
}
public static <T> T s(T o){
System.out.println(o);
return o;
}
public void proc4(JavaSparkContext sc){
/**
* 103p 부적절하거나 소실된 데이터를 대체하기
*/
/**
* mean과 median
* mean : 모든 값 / 표본 수
* median : 최소값 최대값에서 하나하나 세어서 중간에 있는 값을 의미
*/
// JavaRDD<String> rdd = sc.parallelize(Arrays.asList("31","42","13","24","35"));
// rdd = rdd.sortBy(a -> a, false, 3);
// s(rdd.collect());
// 순서|영화제목|연도||등등
// 1|Toy Story (1995)|01-Jan-1995||http://us.imdb.com/M/title-exact?Toy%20Story%20(1995)|0|0|0|1|1|1|0|0|0|0|0|0|0|0|0|0|0|0|0
// 2|GoldenEye (1995)|01-Jan-1995||http://us.imdb.com/M/title-exact?GoldenEye%20(1995)|0|1|1|0|0|0|0|0|0|0|0|0|0|0|0|0|1|0|0
// 3|Four Rooms (1995)|01-Jan-1995||http://us.imdb.com/M/title-exact?Four%20Rooms%20(1995)|0|0|0|0|0|0|0|0|0|0|0|0|0|0|0|0|1|0|0
// 4|Get Shorty (1995)|01-Jan-1995||http://us.imdb.com/M/title-exact?Get%20Shorty%20(1995)|0|1|0|0|0|1|0|0|1|0|0|0|0|0|0|0|0|0|0
// 5|Copycat (1995)|01-Jan-1995||http://us.imdb.com/M/title-exact?Copycat%20(1995)|0|0|0|0|0|0|1|0|1|0|0|0|0|0|0|0|1|0|0
JavaRDD<String> movie_data = sc.textFile(getClass().getResource("data/ml-100k/u.item").getFile());
JavaRDD<String[]> movie_fields = movie_data.map(a -> a.split("\\|"));
JavaDoubleRDD years = movie_fields.mapToDouble(a -> {
try{
return Double.parseDouble(a[2].substring(a[2].length() - 4));
}catch(Exception e){
System.out.println("에러발생::" + Arrays.toString(a)); //267번째 라인 <-------어떤값으로 채울것인가? mean과 median 값을 구해서 참고!
return 1900.0;
}
});
JavaDoubleRDD years_filtered = years.filter(str -> str != 1900.0);
List<Double> list1 = years_filtered.collect();
// list1.add(1.0); //add 할 수 없음.
// Collections.sort(list1); //add나 remove를 할 수 없어서 sort도 안된다.
/*
JavaDoubleRDD 내에서는 sort가 안되서 List로 넘어 온 후 정렬을 하려고 했지만,,, UnsupportedOperationException 에러발생
원인:
Arrays.asList: Returns a fixed-size list backed by the specified array.
You can't add to it; you can't remove from it. You can't structurally modify the List.
해결:
Create a LinkedList, which supports faster remove.
List<String> list = new LinkedList<String>(Arrays.asList(split));
출처:http://stackoverflow.com/questions/2965747/why-i-get-unsupportedoperationexception-when-trying-to-remove-from-the-list
*/
//다시 JavaDoubleRDD 에서 sort를 할 수 있게 하자..어떻게든.. -> ArrayList로 변환!
List<Double> list2 = new ArrayList<Double>(list1);
Collections.sort(list2);
//median(중간값)
double median = s(getMedianD(list2)); //1995.0
//mean(평균값)
double mean = s(years_filtered.stats().mean()); //1989.3860797144548
}
/**
* Python의 Numby 대체
*/
// 중앙값 구하기 메소드
// 크기 순으로 이미 정렬된 배열을 입력해야만 함
// 코드의 범용성을 위해서 이 함수 자체에는 정렬 기능이 없음
private double getMedianD(List<Double> list1) {
if (list1.size() == 0)
return Double.NaN;
int center = list1.size() / 2;
if (list1.size() % 2 == 1) {
return list1.get(center);
} else {
return (list1.get(center-1) + list1.get(center)) / 2.0;
}
}
private double getMedianS(List<String> list1) {
if (list1.size() == 0)
return Double.NaN;
int center = list1.size() / 2;
if (list1.size() % 2 == 1) {
return Double.parseDouble(list1.get(center));
} else {
return (Double.parseDouble(list1.get(center-1)) + Double.parseDouble(list1.get(center))) / 2.0;
}
}
private double getMedianA(double[] array) {
if (array.length == 0)
return Double.NaN; // 빈 배열은 에러 반환(NaN은 숫자가 아니라는 뜻)
int center = array.length / 2; // 요소 개수의 절반값 구하기
if (array.length % 2 == 1) { // 요소 개수가 홀수면
return array[center]; // 홀수 개수인 배열에서는 중간 요소를 그대로 반환
} else {
return (array[center - 1] + array[center]) / 2.0; // 짝수 개 요소는, 중간 두 수의 평균 반환
}
}
public void proc5(JavaSparkContext sc){
/**
* 106p 카테고리 특징 (사용자의 직업)
*/
// 순서|나이|성별|직업|등등
// 1|24|M|technician|85711
// 2|53|F|other|94043
// 3|23|M|writer|32067
// 4|24|M|technician|43537
// 5|33|F|other|15213
//사용자
JavaRDD<String> user_data = sc.textFile(getClass().getResource("data/ml-100k/u.user").getFile());
JavaRDD<String[]> user_fields = user_data.map(a -> a.split("\\|")); //단순히 |를 쓰면, character 1개씩 쪼개진다.
List<String> all_occupatioins = user_fields.map(arr -> arr[3]).distinct().collect();
ArrayList<String> list2 = new ArrayList<String>(all_occupatioins);
Collections.sort(list2);
//딕셔너리 만들기
int i = 0;
LinkedHashMap<String, Integer> map = new LinkedHashMap<>();
for(String s : list2){
map.put(s, i++);
}
s("Encoding of 'doctor' : " + map.get("doctor")); //2
s("Encoding of 'programmer' : " + map.get("programmer")); //14
//zero 데이터 만들기1
Integer[] integers = new Integer[list2.size()];
Arrays.fill(integers, 0);
List<Integer> zeroList = Arrays.asList(integers);
s(zeroList); //[0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0]
//zero 데이터 만들기2
List<Integer> zeroList2 = Collections.nCopies(list2.size(), 0);
s(zeroList2); //[0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0]
// zeroList2.set(map.get("programmer"), 1); //할수 없다. Collections.nCopie 도 add, remove 할 수 없는 List를 리턴한다.
// s("Binary feature vector : " + zeroList2);
List<Integer> zeroList3 = new ArrayList<Integer>(zeroList2);
zeroList3.set(map.get("programmer"), 1);
s("Binary feature vector : " + zeroList3); //[0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 1, 0, 0, 0, 0, 0, 0]
s("Length of binary vector : " + zeroList3.size()); //21
}
public void proc6(JavaSparkContext sc){
/**
* 108p 파생특징 - 타임스탬프를 카테고리 특징으로 변환
*/
// 사람 영화 점수 시간
// 196 242 3 881250949
// 186 302 3 891717742
// 22 377 1 878887116
// 244 51 2 880606923
// 166 346 1 886397596
JavaRDD<String> rating_data_raw = sc.textFile(getClass().getResource("data/ml-100k/u.data").getFile());
JavaRDD<String[]> rating_data = rating_data_raw.map(s -> s.split("\t"));
JavaRDD<String> timestamps = rating_data.map(arr->arr[3]);
s(timestamps.take(5)); //[881250949, 891717742, 878887116, 880606923, 886397596]
JavaRDD<Integer> hour_of_day = timestamps.map(str -> {
// SimpleDateFormat df = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss SSS");
SimpleDateFormat df = new SimpleDateFormat("HH");
// return df.format(str); //에러 : java.lang.IllegalArgumentException: Cannot format given Object as a Date
//아래와 같이 한다.
Date date = df.parse(str);
return Integer.parseInt(df.format(date));
});
s(hour_of_day.take(5)); //[13, 22, 12, 03, 04]
JavaRDD<String> time_of_day = hour_of_day.map(num -> {
if(7 <= num && num < 12){
return "morning";
}else if(12 <= num && num < 14){
return "lunch";
}else if(14 <= num && num < 18){
return "afternoon";
}else if(8 <= num && num < 23){
return "evening";
}else if(23 <= num || num < 7){
return "night";
}else{
return "ELSE num:" + num;
}
});
s(time_of_day.take(5)); //[lunch, evening, lunch, ELSE num:3, ELSE num:4]
}
public void proc7(JavaSparkContext sc){
/**
* 112p 단순 텍스트 특징 추출
*/
// 순서|영화제목|연도||등등
// 1|Toy Story (1995)|01-Jan-1995||http://us.imdb.com/M/title-exact?Toy%20Story%20(1995)|0|0|0|1|1|1|0|0|0|0|0|0|0|0|0|0|0|0|0
// 2|GoldenEye (1995)|01-Jan-1995||http://us.imdb.com/M/title-exact?GoldenEye%20(1995)|0|1|1|0|0|0|0|0|0|0|0|0|0|0|0|0|1|0|0
// 3|Four Rooms (1995)|01-Jan-1995||http://us.imdb.com/M/title-exact?Four%20Rooms%20(1995)|0|0|0|0|0|0|0|0|0|0|0|0|0|0|0|0|1|0|0
// 4|Get Shorty (1995)|01-Jan-1995||http://us.imdb.com/M/title-exact?Get%20Shorty%20(1995)|0|1|0|0|0|1|0|0|1|0|0|0|0|0|0|0|0|0|0
// 5|Copycat (1995)|01-Jan-1995||http://us.imdb.com/M/title-exact?Copycat%20(1995)|0|0|0|0|0|0|1|0|1|0|0|0|0|0|0|0|1|0|0
JavaRDD<String> movie_data = sc.textFile(getClass().getResource("data/ml-100k/u.item").getFile());
JavaRDD<String[]> movie_fields = movie_data.map(a -> a.split("\\|"));
JavaRDD<String> raw_titles = movie_fields.map(arr -> arr[1]);
s(raw_titles.take(5)); //[Toy Story (1995), GoldenEye (1995), Four Rooms (1995), Get Shorty (1995), Copycat (1995)]
JavaRDD<String> movie_titles = raw_titles.flatMap(str -> {
Pattern p = Pattern.compile("\\((\\w+)\\)");
Matcher m = p.matcher(str);
if(m.find()){
return Arrays.asList(str.substring(0, m.start()).trim().split(" ")); //flat을 이용해서 각 단어로 쪼갠다.
}
return Arrays.asList(str.split(" "));
});
s(movie_titles.take(5)); //[Toy, Story, GoldenEye, Four, Rooms]
List<String> all_terms = movie_titles.distinct().collect(); //이미 위에서 flat을 했다.
//딕셔너리 만들기1
int i = 0;
LinkedHashMap<String, Integer> all_terms_dict = new LinkedHashMap<>();
for(String s : all_terms){
all_terms_dict.put(s, i++);
}
s("Total number of terms : " + all_terms_dict.size()); //2645
s("Index of term 'Dead' : " + all_terms_dict.get("Dead")); //600
s("Index of term 'Rooms' : " + all_terms_dict.get("Rooms")); //1080
//딕셔너리 만들기2(스파크 이용)
Map<String, Long> all_terms_dict2 = movie_titles.zipWithIndex().collectAsMap();
s("Total number of terms : " + all_terms_dict2.size()); //2645
s("Index of term 'Dead' : " + all_terms_dict2.get("Dead")); //4597
s("Index of term 'Rooms' : " + all_terms_dict2.get("Rooms")); //3584
}
public void proc8(JavaSparkContext sc){
/**
* 116p 희박벡터??
*/
/**
* 117p 특징 정규화 - Numpy
*/
/**
* 118p 특징 정규화를 위한 MLlib 라이브러리 이용
*/
Normalizer normalizer = new Normalizer();
JavaRDD<Vector> vector = sc.parallelize(Arrays.asList(new Vector[]{Vectors.dense(-2.0, 5.0, 1.0)}));
Vector normalized_x_mllib = normalizer.transform(vector).first();
s(Arrays.toString(normalized_x_mllib.toArray())); //[-0.3651483716701107, 0.9128709291752769, 0.18257418583505536]
}
}
'Spark > Spark와 머신 러닝' 카테고리의 다른 글
| Spark 시작하기20 - [Spark와 머신 러닝] 5장 스파크를 이용한 분류 모델 구현 (2) | 2016.05.07 |
|---|---|
| Spark 시작하기19 - [Spark와 머신 러닝] 4장 스파크를 이용한 추천 엔진 구현 (0) | 2016.05.04 |
| Spark 시작하기05 - Exception (0) | 2016.04.03 |
| Spark 시작하기04 - [Spark와 머신 러닝] 3장 스파크를 이용한 데이터 수집, 프로세싱, 준비 (0) | 2016.04.03 |
| Spark 시작하기03 - [Spark와 머신 러닝] 1장 스파크의 시작과 구동 (0) | 2016.03.27 |