python으로 짜여진 소스코드를 java8 로 작성하였다.
콘솔 로그 출력하기에 유용한 기능을 유틸로 만들었다.
package org.test.sparkNmachineLearning3;
import java.io.BufferedReader;
import java.io.FileReader;
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
import org.apache.log4j.PropertyConfigurator;
public class LogUtil {
List<String> currentFile = null;
public boolean isDebug = true;
private int stackTraceDepth;
public void setDebug(boolean isDebug){
this.isDebug = isDebug;
}
public LogUtil(boolean isDebug, int stackTraceDepth, String filePath){
this.isDebug = isDebug;
this.stackTraceDepth = stackTraceDepth;
//log4j 설정(로그레벨 WARN)... 일단 경로는 하드코딩-_-;;
PropertyConfigurator.configure("D:\\workspace\\spark\\sparkNmachineLearning\\src\\resources\\log4j.properties");
if(currentFile == null){
currentFile = new ArrayList<String>();
BufferedReader br = null;
try {
br = new BufferedReader(new FileReader(filePath));
String temp = null;
currentFile.add(""); //0번째 자리 빈값으로...
while((temp = br.readLine()) != null){
int index = temp.indexOf("=");
if(index != -1){
currentFile.add(temp.substring(0, index+1).trim() + " ");
}else{
currentFile.add("");
}
}
} catch (IOException e) {
e.printStackTrace();
} finally{
try {
br.close();
} catch (IOException e) {
e.printStackTrace();
}
}
}
}
/**
* 변수에 값을 할당할때, 사용하면, 일단 화면에 한번 출력해주고, 할당한다.
* StackTraceElement를 사용하면, 어느 라인에서 할당했는지 알 수 있음.
* 디버그용으로 좋음!!
*/
public <T> T s(T o){
if(isDebug){
Thread th = Thread.currentThread();
StackTraceElement[] lists = th.getStackTrace();
// int lineNumber = lists[2].getLineNumber();
int lineNumber = lists[this.stackTraceDepth].getLineNumber();
String variable = currentFile.get(lineNumber);
System.out.print("\n" + + lineNumber + ": " + variable + o);
}
return o;
}
}
소스코드
package org.test.sparkNmachineLearning3;
import java.util.List;
import java.util.Map;
import org.apache.spark.api.java.JavaDoubleRDD;
import org.apache.spark.api.java.JavaPairRDD;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.test.JavaGraph.MyHistogramPlot;
import org.test.JavaGraph.MySimpleBarPlot;
import scala.Tuple2;
public class Ch3_JavaApp {
LogUtil lu = new LogUtil(true, 3, "D:\\workspace\\spark\\sparkNmachineLearning\\src\\main\\java\\org\\test\\sparkNmachineLearning3\\Ch3_JavaApp.java");
JavaSparkContext sc = new JavaSparkContext("local[2]", "First Spark App");
public static void main(String...strings){
Long start = System.currentTimeMillis();
Ch3_JavaApp c = new Ch3_JavaApp();
// c.proc1(); //사용자 데이터
// c.proc2(); //영화 데이터
c.proc3(); //평점 데이터
Long end = System.currentTimeMillis();
System.out.println(end - start + " ms spend...");
}
public <T> T s(T o){
return lu.s(o);
}
public void proc1(){
/**
* 사용자 데이터 집합 탐색
*/
// 1|24|M|technician|85711
// 2|53|F|other|94043
// 3|23|M|writer|32067
// 4|24|M|technician|43537
// 5|33|F|other|15213
JavaRDD<String> user_data = sc.textFile("D:\\workspace\\spark\\sparkNmachineLearning\\src\\main\\java\\org\\test\\sparkNmachineLearning3\\data\\ml-100k\\u.user");
s(user_data.first());
JavaRDD<String[]> user_fields = user_data.map(a -> a.split("\\|")); //단순히 |를 쓰면, character 1개씩 쪼개진다.
JavaRDD<String> users = user_fields.map(a -> a[0]);
s(users.count());
s(users.take(13));
JavaRDD<String> genders = user_fields.map(a -> a[2]).distinct();
s(genders.count());
s(genders.collect());
JavaRDD<String> occupations = user_fields.map(a ->a[3]).distinct();
s(occupations.count());
s(occupations.collect());
JavaRDD<String> zipcodes = user_fields.map(a -> a[4]).distinct();
s(zipcodes.count());
JavaDoubleRDD ages = user_fields.mapToDouble(a -> Double.parseDouble(a[1]));
List<Double> ages_list = s(ages.collect());
//사용자 나이의 분포 - 히스토그램 그래프
new MyHistogramPlot(2.0, 40, ages_list);
int method = 2; // or 2
if(method == 1){
//방법1
JavaPairRDD<String,Integer> count_by_occupation = user_fields.mapToPair(a -> new Tuple2<String, Integer>(a[3],1));
count_by_occupation = count_by_occupation.reduceByKey((a,b) -> a+b);
//숫자 기준으로 정렬위해(성능은???) - false:내림차순
count_by_occupation = count_by_occupation.mapToPair(x->x.swap()).sortByKey(false).mapToPair(x->x.swap());
List<Tuple2<String, Integer>> list_occupation = s(count_by_occupation.take(20)); //상위 20개
//Tuple2는 스칼라 클래스라서 그래프 유틸쪽에 넘길 수가 없다..
String[] xNameList = new String[list_occupation.size()];
Long[] yList = new Long[list_occupation.size()];
for(int i = 0 ; i<list_occupation.size() ; i++){
xNameList[i] = list_occupation.get(i)._1;
yList[i] = (long)list_occupation.get(i)._2;
}
//사용자 직업의 분포 xy 그래프
new MySimpleBarPlot().initArray2(xNameList, yList);
}else if(method == 2){
//방법2 - countByValue 사용하기
// countByValue를 사용하면, 더이상의 RDD action을 하지 못하고, java collection으로 빠져나온다-_-;
JavaRDD<String> count_by_occupation = user_fields.map(a -> a[3]);
Map<String, Long> map = count_by_occupation.countByValue(); //how to sort??
//사용자 직업의 분포 xy 그래프
new MySimpleBarPlot().initMap2(map);
}
}
public void proc2(){
/**
* 영화 데이터 집합 탐색
*/
// 1|Toy Story (1995)|01-Jan-1995||http://us.imdb.com/M/title-exact?Toy%20Story%20(1995)|0|0|0|1|1|1|0|0|0|0|0|0|0|0|0|0|0|0|0
// 2|GoldenEye (1995)|01-Jan-1995||http://us.imdb.com/M/title-exact?GoldenEye%20(1995)|0|1|1|0|0|0|0|0|0|0|0|0|0|0|0|0|1|0|0
// 3|Four Rooms (1995)|01-Jan-1995||http://us.imdb.com/M/title-exact?Four%20Rooms%20(1995)|0|0|0|0|0|0|0|0|0|0|0|0|0|0|0|0|1|0|0
// 4|Get Shorty (1995)|01-Jan-1995||http://us.imdb.com/M/title-exact?Get%20Shorty%20(1995)|0|1|0|0|0|1|0|0|1|0|0|0|0|0|0|0|0|0|0
// 5|Copycat (1995)|01-Jan-1995||http://us.imdb.com/M/title-exact?Copycat%20(1995)|0|0|0|0|0|0|1|0|1|0|0|0|0|0|0|0|1|0|0
JavaRDD<String> movie_data = sc.textFile("D:\\workspace\\spark\\sparkNmachineLearning\\src\\main\\java\\org\\test\\sparkNmachineLearning3\\data\\ml-100k\\u.item");
s(movie_data.first()); // 1|Toy Story (1995)|01-Jan-1995||http://us.imdb.com/M/title-exact?Toy%20Story%20(1995)|0|0|0|1|1|1|0|0|0|0|0|0|0|0|0|0|0|0|0
s(movie_data.count()); // 1682
JavaRDD<String[]> movie_fields = movie_data.map(a -> a.split("\\|"));
JavaRDD<String> years = s(movie_fields.map(a -> a[2]));
years = years.map(str -> {
try{
return str.substring(str.length()-4);
}catch(Exception e){
// s(e.getMessage()); //serializable exception 발생.
System.out.println(e.getMessage());
return "1900";
}
});
JavaRDD<String> years_filtered = years.filter(a -> !a.equals("1900"));
// Map<Integer, Long> movie_ages = s(years_filtered.map(a -> 1998 - Integer.parseInt(a)).countByValue()); //sort를 할 수 없다.-_-;
JavaRDD<Integer> movie_ages = years_filtered.map(a -> 1998 - Integer.parseInt(a));
JavaPairRDD<Integer, Integer> years_filtered2 = movie_ages.mapToPair(a -> new Tuple2<Integer, Integer>(a, 1));
years_filtered2 = years_filtered2.reduceByKey((a,b) -> a+b);
years_filtered2 = years_filtered2.mapToPair(x->x.swap()).sortByKey(true).mapToPair(x->x.swap()); //key오름차순
List<Tuple2<Integer, Integer>> list = years_filtered2.collect();
Integer[] xNameList = new Integer[list.size()];
Long[] yList = new Long[list.size()];
for(int i = 0 ; i<list.size() ; i++){
xNameList[i] = list.get(i)._1;
yList[i] = (long)list.get(i)._2;
}
//영화 나이의 분포 xy 그래프
new MySimpleBarPlot().initArray1(xNameList, yList);
}
public void proc3(){
/**
* 평점 데이터 집합 탐색
*/
// 196 242 3 881250949
// 186 302 3 891717742
// 22 377 1 878887116
// 244 51 2 880606923
// 166 346 1 886397596
JavaRDD<String> rating_data_raw = sc.textFile("D:\\workspace\\spark\\sparkNmachineLearning\\src\\main\\java\\org\\test\\sparkNmachineLearning3\\data\\ml-100k\\u.data");
s(rating_data_raw.first()); // 196 242 3 881250949
int num_ratings = s((int)rating_data_raw.count()); // 100000
JavaRDD<String[]> rating_data = rating_data_raw.map(s -> s.split("\t"));
JavaRDD<Integer> ratings = rating_data.map(a -> Integer.parseInt(a[2]));
int max_rating = s(ratings.reduce((a,b) -> a > b ? a : b));
int min_rating = s(ratings.reduce((a,b) -> a < b ? a : b));
float mean_rating = s(ratings.reduce((a,b) -> a+b) / (float)num_ratings);
//TODO
//spark 통계 함수 Statistics... 그런데 Vector 데이터여야 한다.-_-;
// 평점의 분포
Map<Integer, Long> count_by_rating = s(ratings.countByValue());
new MySimpleBarPlot().initMap1(count_by_rating, true, true);
//사용자별 평점의 개수 - 이것만을 위한다면 굳이 책에서 나오는 것처럼 복잡하게 할 필요가 없다.(위와 같은 방법으로.. 가능)
JavaRDD<Integer> user_ratings = rating_data.map(a -> Integer.parseInt(a[0]));
Map<Integer, Long> count_by_user_ratings = s(user_ratings.countByValue());
new MySimpleBarPlot().initMap1(count_by_user_ratings, false, false);
}
}
그래프는 java의 그래프 라이브러리 중... GRAL 을 이용하였다. http://trac.erichseifert.de/gral/wiki/Comparison
<사용자 나이의 분포>
3장은 파이썬으로 작성되었기에..
파이썬을 JAVA8 소스로 작성하면서 파이썬의 라이브러리와 비슷한 것을 찾는 것이 쉽지는 않았다.
java의 GRAL 그래프 라이브러리가 꽤 이쁘다...
내부적으로 java의 swing을 이용한다. 속도도 나쁘지 않다.
라이브러리 API 도 꽤 직관적이라서 사용하는데 무리는 없다.
gral-examples 을 변형해서 갖가지 그래프를 만들어 낼 수 있다.
https://github.com/eseifert/gral
<사용자 직업의 분포>
<영화 나이의 분포>
<평점의 분포>
<사용자마다 준 평점 분포>
'Spark > Spark와 머신 러닝' 카테고리의 다른 글
| Spark 시작하기20 - [Spark와 머신 러닝] 5장 스파크를 이용한 분류 모델 구현 (2) | 2016.05.07 |
|---|---|
| Spark 시작하기19 - [Spark와 머신 러닝] 4장 스파크를 이용한 추천 엔진 구현 (0) | 2016.05.04 |
| Spark 시작하기18 - [Spark와 머신 러닝] 3장 스파크를 이용한 데이터 수집, 프로세싱, 준비2 (0) | 2016.05.04 |
| Spark 시작하기05 - Exception (0) | 2016.04.03 |
| Spark 시작하기03 - [Spark와 머신 러닝] 1장 스파크의 시작과 구동 (0) | 2016.03.27 |