본문 바로가기
Spark/Spark와 머신 러닝

Spark 시작하기18 - [Spark와 머신 러닝] 3장 스파크를 이용한 데이터 수집, 프로세싱, 준비2

by java개발자 2016. 5. 4.

python으로 짜여진 소스코드를 java8 로 작성하였다.

package org.test.ch3;

import java.text.SimpleDateFormat;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.Date;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.regex.Matcher;
import java.util.regex.Pattern;

import org.apache.log4j.PropertyConfigurator;
import org.apache.spark.api.java.JavaDoubleRDD;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.mllib.feature.Normalizer;
import org.apache.spark.mllib.linalg.Vector;
import org.apache.spark.mllib.linalg.Vectors;

public class Ch3_JavaApp2 {

	private static final long serialVersionUID = 154356342432L;

	public static void main(String...strings){
		PropertyConfigurator.configure(System.getProperty("user.dir") + "/src/resources/log4j.properties");
		
		Long start = System.currentTimeMillis();
		Ch3_JavaApp2 c = new Ch3_JavaApp2();
		JavaSparkContext sc = new JavaSparkContext("local[2]", "First Spark App");
//		c.proc4(sc);	//mean 과 median
//		c.proc5(sc);
//		c.proc6(sc);
//		c.proc7(sc);
		c.proc8(sc);
		
		c.s("\n" + (System.currentTimeMillis() - start) + " ms spend...");
		
		sc.stop();
	}
	public static <T> T s(T o){
		System.out.println(o);
		return o;
	}

	public void proc4(JavaSparkContext sc){
		/**
		 * 103p 부적절하거나 소실된 데이터를 대체하기
		 */
		
		/**
		 * mean과 median
		 * mean : 모든 값 / 표본 수
		 * median : 최소값 최대값에서 하나하나 세어서 중간에 있는 값을 의미
		 */
		
//		JavaRDD<String> rdd = sc.parallelize(Arrays.asList("31","42","13","24","35"));
//		rdd = rdd.sortBy(a -> a, false, 3);
//		s(rdd.collect());
		
//		순서|영화제목|연도||등등
//		1|Toy Story (1995)|01-Jan-1995||http://us.imdb.com/M/title-exact?Toy%20Story%20(1995)|0|0|0|1|1|1|0|0|0|0|0|0|0|0|0|0|0|0|0
//		2|GoldenEye (1995)|01-Jan-1995||http://us.imdb.com/M/title-exact?GoldenEye%20(1995)|0|1|1|0|0|0|0|0|0|0|0|0|0|0|0|0|1|0|0
//		3|Four Rooms (1995)|01-Jan-1995||http://us.imdb.com/M/title-exact?Four%20Rooms%20(1995)|0|0|0|0|0|0|0|0|0|0|0|0|0|0|0|0|1|0|0
//		4|Get Shorty (1995)|01-Jan-1995||http://us.imdb.com/M/title-exact?Get%20Shorty%20(1995)|0|1|0|0|0|1|0|0|1|0|0|0|0|0|0|0|0|0|0
//		5|Copycat (1995)|01-Jan-1995||http://us.imdb.com/M/title-exact?Copycat%20(1995)|0|0|0|0|0|0|1|0|1|0|0|0|0|0|0|0|1|0|0		
        JavaRDD<String> movie_data = sc.textFile(getClass().getResource("data/ml-100k/u.item").getFile());
        JavaRDD<String[]> movie_fields = movie_data.map(a -> a.split("\\|"));
        JavaDoubleRDD years = movie_fields.mapToDouble(a -> {
        	try{
        		return Double.parseDouble(a[2].substring(a[2].length() - 4));
        	}catch(Exception e){
        		System.out.println("에러발생::" + Arrays.toString(a));	//267번째 라인		<-------어떤값으로 채울것인가? mean과 median 값을 구해서 참고!
        		return 1900.0;
        	}        	
        });
        JavaDoubleRDD years_filtered = years.filter(str -> str != 1900.0);
        List<Double> list1 = years_filtered.collect();
//        list1.add(1.0);			//add 할 수 없음.
//        Collections.sort(list1);	//add나 remove를 할 수 없어서 sort도 안된다.
        /*
			JavaDoubleRDD 내에서는 sort가 안되서 List로 넘어 온 후 정렬을 하려고 했지만,,, UnsupportedOperationException 에러발생
			원인:
	        Arrays.asList: Returns a fixed-size list backed by the specified array.
	        You can't add to it; you can't remove from it. You can't structurally modify the List.
	        
	            해결:
	        Create a LinkedList, which supports faster remove.
	        List<String> list = new LinkedList<String>(Arrays.asList(split));
	        
	            출처:http://stackoverflow.com/questions/2965747/why-i-get-unsupportedoperationexception-when-trying-to-remove-from-the-list
		*/
        
        //다시 JavaDoubleRDD 에서 sort를 할 수 있게 하자..어떻게든.. -> ArrayList로 변환!
        List<Double> list2 = new ArrayList<Double>(list1);
        Collections.sort(list2);
        
        //median(중간값)
        double median = s(getMedianD(list2));				//1995.0
        //mean(평균값)
        double mean = s(years_filtered.stats().mean());		//1989.3860797144548
        
	}
	
	
	/**
	 * Python의 Numby 대체 
	 */
	
	// 중앙값 구하기 메소드
	// 크기 순으로 이미 정렬된 배열을 입력해야만 함
	// 코드의 범용성을 위해서 이 함수 자체에는 정렬 기능이 없음
	private double getMedianD(List<Double> list1) {
		if (list1.size() == 0)
			return Double.NaN;
		int center = list1.size() / 2;

		if (list1.size() % 2 == 1) {
			return list1.get(center);
		} else {
			return (list1.get(center-1) + list1.get(center)) / 2.0;
		}
	}
	private double getMedianS(List<String> list1) {
		if (list1.size() == 0)
			return Double.NaN;
		int center = list1.size() / 2;

		if (list1.size() % 2 == 1) {
			return Double.parseDouble(list1.get(center));
		} else {
			return (Double.parseDouble(list1.get(center-1)) + Double.parseDouble(list1.get(center))) / 2.0;
		}
	}
	private double getMedianA(double[] array) {
		if (array.length == 0)
			return Double.NaN; // 빈 배열은 에러 반환(NaN은 숫자가 아니라는 뜻)
		int center = array.length / 2; // 요소 개수의 절반값 구하기

		if (array.length % 2 == 1) { // 요소 개수가 홀수면
			return array[center]; // 홀수 개수인 배열에서는 중간 요소를 그대로 반환
		} else {
			return (array[center - 1] + array[center]) / 2.0; // 짝수 개 요소는, 중간 두 수의 평균 반환
		}
	}	
	
	
	public void proc5(JavaSparkContext sc){
		/**
		 * 106p 카테고리 특징 (사용자의 직업)
		 */
//		순서|나이|성별|직업|등등
//		1|24|M|technician|85711
//		2|53|F|other|94043
//		3|23|M|writer|32067
//		4|24|M|technician|43537
//		5|33|F|other|15213		
		//사용자
        JavaRDD<String> user_data = sc.textFile(getClass().getResource("data/ml-100k/u.user").getFile());
        JavaRDD<String[]> user_fields = user_data.map(a -> a.split("\\|"));	//단순히 |를 쓰면, character 1개씩 쪼개진다.
        List<String> all_occupatioins = user_fields.map(arr -> arr[3]).distinct().collect();
        ArrayList<String> list2 = new ArrayList<String>(all_occupatioins);
        Collections.sort(list2);

        //딕셔너리 만들기
        int i = 0;
        LinkedHashMap<String, Integer> map = new LinkedHashMap<>();
        for(String s : list2){
        	map.put(s, i++);
        }
        s("Encoding of 'doctor' : " + map.get("doctor"));			//2
        s("Encoding of 'programmer' : " + map.get("programmer"));	//14

        //zero 데이터 만들기1
        Integer[] integers = new Integer[list2.size()];
        Arrays.fill(integers, 0);
        List<Integer> zeroList = Arrays.asList(integers);
        s(zeroList);	//[0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0]
        
        //zero 데이터 만들기2
        List<Integer> zeroList2 = Collections.nCopies(list2.size(), 0);
        s(zeroList2);	//[0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0]
        
//        zeroList2.set(map.get("programmer"), 1);	//할수 없다. Collections.nCopie 도 add, remove 할 수 없는 List를 리턴한다.
//        s("Binary feature vector : " + zeroList2);
        
        List<Integer> zeroList3 = new ArrayList<Integer>(zeroList2);
        zeroList3.set(map.get("programmer"), 1);
        s("Binary feature vector : " + zeroList3);			//[0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 1, 0, 0, 0, 0, 0, 0]
        s("Length of binary vector : " + zeroList3.size());	//21
	}
	
	public void proc6(JavaSparkContext sc){
		/**
		 * 108p 파생특징 - 타임스탬프를 카테고리 특징으로 변환
		 */
//		사람 영화 점수 시간
//		196	242	3	881250949
//		186	302	3	891717742
//		22	377	1	878887116
//		244	51	2	880606923
//		166	346	1	886397596		
		JavaRDD<String> rating_data_raw = sc.textFile(getClass().getResource("data/ml-100k/u.data").getFile());
		JavaRDD<String[]> rating_data = rating_data_raw.map(s -> s.split("\t"));
		
		JavaRDD<String> timestamps = rating_data.map(arr->arr[3]);
		s(timestamps.take(5));	//[881250949, 891717742, 878887116, 880606923, 886397596]
		
		JavaRDD<Integer> hour_of_day = timestamps.map(str -> {
//			SimpleDateFormat df = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss SSS");
			SimpleDateFormat df = new SimpleDateFormat("HH");
//			return df.format(str);		//에러 : java.lang.IllegalArgumentException: Cannot format given Object as a Date
				
			//아래와 같이 한다.
			Date date = df.parse(str);
			return Integer.parseInt(df.format(date));					
		});
		s(hour_of_day.take(5));		//[13, 22, 12, 03, 04]
		
		JavaRDD<String> time_of_day = hour_of_day.map(num -> {
			if(7 <= num && num < 12){
				return "morning";	
			}else if(12 <= num && num < 14){
				return "lunch";
			}else if(14 <= num && num < 18){
				return "afternoon";
			}else if(8 <= num && num < 23){
				return "evening";
			}else if(23 <= num || num < 7){
				return "night";
			}else{
				return "ELSE num:" + num;
			}
		});
		s(time_of_day.take(5));	//[lunch, evening, lunch, ELSE num:3, ELSE num:4]
	}
	public void proc7(JavaSparkContext sc){
		/**
		 * 112p 단순 텍스트 특징 추출
		 */
//		순서|영화제목|연도||등등
//		1|Toy Story (1995)|01-Jan-1995||http://us.imdb.com/M/title-exact?Toy%20Story%20(1995)|0|0|0|1|1|1|0|0|0|0|0|0|0|0|0|0|0|0|0
//		2|GoldenEye (1995)|01-Jan-1995||http://us.imdb.com/M/title-exact?GoldenEye%20(1995)|0|1|1|0|0|0|0|0|0|0|0|0|0|0|0|0|1|0|0
//		3|Four Rooms (1995)|01-Jan-1995||http://us.imdb.com/M/title-exact?Four%20Rooms%20(1995)|0|0|0|0|0|0|0|0|0|0|0|0|0|0|0|0|1|0|0
//		4|Get Shorty (1995)|01-Jan-1995||http://us.imdb.com/M/title-exact?Get%20Shorty%20(1995)|0|1|0|0|0|1|0|0|1|0|0|0|0|0|0|0|0|0|0
//		5|Copycat (1995)|01-Jan-1995||http://us.imdb.com/M/title-exact?Copycat%20(1995)|0|0|0|0|0|0|1|0|1|0|0|0|0|0|0|0|1|0|0		
        JavaRDD<String> movie_data = sc.textFile(getClass().getResource("data/ml-100k/u.item").getFile());
        JavaRDD<String[]> movie_fields = movie_data.map(a -> a.split("\\|"));
        JavaRDD<String> raw_titles = movie_fields.map(arr -> arr[1]);
        s(raw_titles.take(5));		//[Toy Story (1995), GoldenEye (1995), Four Rooms (1995), Get Shorty (1995), Copycat (1995)]
        
        JavaRDD<String> movie_titles = raw_titles.flatMap(str -> {
        	Pattern p = Pattern.compile("\\((\\w+)\\)");
        	Matcher m = p.matcher(str);
        	if(m.find()){
        		return Arrays.asList(str.substring(0, m.start()).trim().split(" "));	//flat을 이용해서 각 단어로 쪼갠다.
        	}
        	return Arrays.asList(str.split(" "));
        });
        s(movie_titles.take(5));	//[Toy, Story, GoldenEye, Four, Rooms]
        
        List<String> all_terms = movie_titles.distinct().collect();		//이미 위에서 flat을 했다.
        
        //딕셔너리 만들기1
        int i = 0;
        LinkedHashMap<String, Integer> all_terms_dict = new LinkedHashMap<>();
        for(String s : all_terms){
        	all_terms_dict.put(s, i++);
        }
        
        s("Total number of terms : " + all_terms_dict.size());			//2645
        s("Index of term 'Dead' : " + all_terms_dict.get("Dead"));		//600
        s("Index of term 'Rooms' : " + all_terms_dict.get("Rooms"));	//1080
        
        //딕셔너리 만들기2(스파크 이용)
        Map<String, Long> all_terms_dict2 = movie_titles.zipWithIndex().collectAsMap();
        s("Total number of terms : " + all_terms_dict2.size());			//2645
        s("Index of term 'Dead' : " + all_terms_dict2.get("Dead"));		//4597
        s("Index of term 'Rooms' : " + all_terms_dict2.get("Rooms"));	//3584
        
	}
	public void proc8(JavaSparkContext sc){
		/**
		 * 116p 희박벡터??
		 */
		
		/**
		 * 117p 특징 정규화 - Numpy
		 */
		
		/**
		 * 118p 특징 정규화를 위한 MLlib 라이브러리 이용
		 */
		Normalizer normalizer = new Normalizer();
		JavaRDD<Vector> vector = sc.parallelize(Arrays.asList(new Vector[]{Vectors.dense(-2.0, 5.0, 1.0)}));
		Vector normalized_x_mllib = normalizer.transform(vector).first();
		s(Arrays.toString(normalized_x_mllib.toArray()));		//[-0.3651483716701107, 0.9128709291752769, 0.18257418583505536]
	}
}