본문 바로가기
Spark/Spark와 머신 러닝

Spark 시작하기19 - [Spark와 머신 러닝] 4장 스파크를 이용한 추천 엔진 구현

by java개발자 2016. 5. 4.

Scala로 작성된 소스 코드를 Java 8로 다시 작성하였다.

package org.test.ch4;

import java.io.Serializable;

public class MyTuple2<K, V extends Comparable<V>> implements Comparable<MyTuple2<K,V>>, Serializable{
	private static final long serialVersionUID = 1L;
	private K key;
	private V value;
	
	public MyTuple2(K key, V value){
		this.key = key;
		this.value = value;
	}
	public K getKey() {
		return key;
	}
	public void setKey(K key) {
		this.key = key;
	}
	public V getValue() {
		return value;
	}
	public void setValue(V value) {
		this.value = value;
	}
	@Override
	public String toString() {
		return "MyTuple2[" + key + ", " + value + "]";
	}
	@Override
	public int compareTo(MyTuple2<K,V> v) {
		return value.compareTo(v.getValue());
	}
}
package org.test.ch4;

import java.io.Serializable;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

import org.apache.log4j.PropertyConfigurator;
import org.apache.spark.api.java.JavaPairRDD;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.broadcast.Broadcast;
import org.apache.spark.mllib.evaluation.RankingMetrics;
import org.apache.spark.mllib.evaluation.RegressionMetrics;
import org.apache.spark.mllib.recommendation.ALS;
import org.apache.spark.mllib.recommendation.MatrixFactorizationModel;
import org.apache.spark.mllib.recommendation.Rating;
import org.jblas.DoubleMatrix;

import scala.Tuple2;
/**
 * Chapter 4, "Implementing a recommendation engine with Spark": the book's Scala code ported to Java 8.
 *
 * <p>Trains an ALS matrix-factorization model on the MovieLens 100k ratings, prints user
 * recommendations and item-to-item similarities, and evaluates the model with MSE/RMSE and
 * (mean) average precision at K, both by hand and with MLlib's built-in metrics.
 *
 * <p>The helper methods used inside Spark lambdas are {@code static}, so the closures do not
 * capture (and serialize) the enclosing instance.
 */
public class Ch4_JavaApp implements Serializable {

	private static final long serialVersionUID = 154356342432L;

	/** Prints an empty line to standard output. */
	private static void s(){s("");}

	/**
	 * Prints {@code o} to standard output and returns it, so a value can be logged inline.
	 *
	 * @param o the value to print
	 * @return {@code o}, unchanged
	 */
	private static <T> T s(T o){System.out.println(o);return o;}

	/**
	 * Configures log4j, runs the chapter's walkthrough on a local two-thread Spark context and
	 * prints the elapsed time.
	 *
	 * @param strings command-line arguments (unused)
	 */
	public static void main(String...strings){
		PropertyConfigurator.configure(System.getProperty("user.dir") + "/src/resources/log4j.properties");

		long start = System.currentTimeMillis();
		Ch4_JavaApp c = new Ch4_JavaApp();
		JavaSparkContext sc = new JavaSparkContext("local[2]", "First Spark App");
		try {
			c.proc1(sc);
			s("\n" + (System.currentTimeMillis() - start) + " ms spend...");
		} finally {
			// Always release the context, even when the walkthrough fails.
			sc.stop();
		}
	}

	/**
	 * Runs the whole chapter: feature extraction, model training, user recommendations,
	 * item similarity and model evaluation. Results are printed to standard output.
	 *
	 * @param sc the Spark context used to load data and run the jobs
	 * @throws IllegalStateException if a data file is missing from the classpath or the
	 *                               reference item has no factor vector in the model
	 */
	private void proc1(JavaSparkContext sc){
		String ratingsFileForUser = "data/ml-100k/u.data";		// tab-separated
		String itemsFile = "data/ml-100k/u.item";				// '|'-separated
		int userId = 789;
		int k = 10;

		/*
		 * (p.132) Extract features from the MovieLens 100k data set.
		 * u.data columns: user, movie, rating, timestamp, e.g.
		 *   196	242	3	881250949
		 *   186	302	3	891717742
		 */
		JavaRDD<String> rawData = sc.textFile(resourcePath(ratingsFileForUser));
		JavaRDD<String[]> rawRatings = rawData.map(str -> str.split("\\t"));
		JavaRDD<Rating> ratings = rawRatings.map(arr -> new Rating(Integer.parseInt(arr[0]), Integer.parseInt(arr[1]), Double.parseDouble(arr[2])));
		ratings.cache();
		s(ratings.first());			// e.g. Rating(196,242,3.0)

		// ALS.train arguments as passed here: ratings, 50, 10, 0.01.
		// (In MLlib's signature these are rank, iterations and lambda.)
		MatrixFactorizationModel model = ALS.train(JavaRDD.toRDD(ratings), 50, 10, 0.01);
		s(model.userFeatures());
		s(model.userFeatures().count());		// 943 in the original author's run
		s(model.productFeatures().count());		// 1682 in the original author's run

		/*
		 * Using the recommendation model.
		 */

		// Predicted score for one (user, product) combination.
		double predictedRating = model.predict(userId, 123);
		s("predictedRating: " + predictedRating);

		// Top-K movie recommendations for one user.
		// The original author noted that the output does not match the book; the two listings
		// recorded in the original comments also differ from each other, i.e. values change per run.
		Rating[] topKRecs = model.recommendProducts(userId, k);
		for(Rating r : topKRecs){s(r);}

		/*
		 * Inspecting the recommendations.
		 * u.item columns: id|title|release date||url|genre flags..., e.g.
		 *   1|Toy Story (1995)|01-Jan-1995||http://us.imdb.com/M/title-exact?Toy%20Story%20(1995)|0|0|0|1|1|1|...
		 */
		JavaRDD<String> movies = sc.textFile(resourcePath(itemsFile));
		Map<Integer, String> titles = movies.map(str -> str.split("\\|")).mapToPair(arr->new Tuple2<Integer, String>(Integer.parseInt(arr[0]), arr[1])).collectAsMap();
		s(titles.get(123));		// e.g. Frighteners, The (1996)

		// The book uses keyBy(user).lookup(userId), which leaves the RDD world;
		// filtering keeps the data as an RDD and is simpler.
		JavaRDD<Rating> moviesForUser = ratings.filter(rating->rating.user() == userId);

		// The K movies this user rated highest. sortBy yields a globally ordered RDD and take(n)
		// returns its first n elements, so this is the exact top K.
		List<Tuple2<String, Double>> result = moviesForUser.sortBy(rating -> rating.rating(), false, 3)
			.mapToPair(rating -> new Tuple2<String, Double>(titles.get(rating.product()), rating.rating())).take(k);
		for(Tuple2<String, Double> t : result){
			System.out.println(t);		// e.g. (Godfather, The (1972),5.0)
		}

		// Titles of the recommended movies, for a side-by-side sanity check.
		JavaRDD<Rating> topKRecsRDD = sc.parallelize(Arrays.asList(topKRecs));
		List<Tuple2<String, Double>> result2 = topKRecsRDD.mapToPair(rating -> new Tuple2<String, Double>(titles.get(rating.product()), rating.rating())).collect();
		for(Tuple2<String, Double> t : result2){
			s(t);				// e.g. (East of Eden (1955),5.667943142890133)
		}

		/*
		 * Item recommendations: find movies similar to a given movie (jblas is used for the vectors).
		 */
		int itemId = 567;
		JavaRDD<Tuple2<Object, double[]>> productFeatures = model.productFeatures().toJavaRDD();
		JavaPairRDD<Object, double[]> productFeaturesById = productFeatures.mapToPair(t->t);
		List<double[]> itemFactorMatches = productFeaturesById.lookup(itemId);
		if(itemFactorMatches.isEmpty()){
			throw new IllegalStateException("No factor vector for item " + itemId);
		}
		// lookup returns every value stored under the key; the first match is used (as in the book).
		DoubleMatrix itemVector = new DoubleMatrix(itemFactorMatches.get(0));
		s(cosineSimilarity(itemVector, itemVector));			// ~1.0

		/*
		 * Variant 1: scala.Tuple2.
		 * JavaRDD.top(n) cannot be used because scala.Tuple2 does not implement java.lang.Comparable
		 * ("scala.Tuple2 cannot be cast to java.lang.Comparable"), so sort explicitly and take K.
		 */
		JavaRDD<Tuple2<Integer, Double>> sims = productFeatures.map(tuple->{
			DoubleMatrix factorVector = new DoubleMatrix(tuple._2());
			double sim = cosineSimilarity(factorVector, itemVector);
			return new Tuple2<Integer, Double>((Integer) tuple._1(), sim);
		});
		List<Tuple2<Integer, Double>> sortedSims = sims.sortBy(tuple-> tuple._2(), false, 4).take(k);
		s();
		sortedSims.forEach(a->s(a));	// e.g. (567,0.9999999999999998), (1376,0.7538820816680819), ...

		/*
		 * Variant 2: a hand-written Comparable tuple, which makes top(n) usable.
		 * top(n) orders the elements itself, so no prior sortBy is needed.
		 */
		JavaRDD<MyTuple2<Integer, Double>> sims2 = productFeatures.map(tuple->{
			DoubleMatrix factorVector = new DoubleMatrix(tuple._2());
			double sim = cosineSimilarity(factorVector, itemVector);
			return new MyTuple2<Integer, Double>((Integer) tuple._1(), sim);
		});
		List<MyTuple2<Integer, Double>> sortedSims2 = sims2.top(k);
		s();
		for(MyTuple2<Integer, Double> sim : sortedSims2){
			s(sim);						// e.g. MyTuple2[567, 0.9999999999999998]
		}

		// Inspect the similar items by title.
		s(titles.get(itemId));
		s();
		for(MyTuple2<Integer, Double> sim : sortedSims2){
			s(titles.get(sim.getKey()) + ", " + sim.getValue());
		}

		// Squared error for a single rating, as a warm-up for MSE.
		List<Rating> actualRatingList = s(moviesForUser.take(1));
		if(!actualRatingList.isEmpty()){
			Rating actualRating = actualRatingList.get(0);
			double predictedRating2 = s(model.predict(userId, actualRating.product()));
			s(Math.pow(predictedRating2-actualRating.rating(), 2.0));
		}

		/*
		 * MSE / RMSE over the whole data set.
		 */
		JavaPairRDD<Integer, Integer> usersProducts = ratings.mapToPair(rating->new Tuple2<Integer, Integer>(rating.user(), rating.product()));
		JavaPairRDD<Tuple2<Integer, Integer>, Double> predictions = model.predict(usersProducts).mapToPair(rating->new Tuple2<Tuple2<Integer, Integer>, Double>(new Tuple2<Integer, Integer>(rating.user(), rating.product()), rating.rating()));
		JavaPairRDD<Tuple2<Integer, Integer>, Double> ratingsPair = ratings.mapToPair(rating->new Tuple2<Tuple2<Integer, Integer>, Double>(new Tuple2<Integer, Integer>(rating.user(), rating.product()), rating.rating()));
		// Value of each joined record: (actual rating, predicted rating).
		JavaPairRDD<Tuple2<Integer, Integer>, Tuple2<Double, Double>> ratingsAndPredictions = ratingsPair.join(predictions);
		// Cache before the first action: the join is evaluated several times below.
		ratingsAndPredictions.cache();

		double sumSquaredError = ratingsAndPredictions.map(tuple -> Math.pow(tuple._2()._1() - tuple._2()._2(), 2.0)).reduce((a,b)->a+b);
		double mse = sumSquaredError/ratingsAndPredictions.count();
		s("Mean Squared Error = " + mse);
		double rmse = Math.sqrt(mse);
		s("Root Mean Squared Error = " + rmse);

		/*
		 * Average precision at K for the single user.
		 */
		List<Integer> actualMovies = s(moviesForUser.map(rating->rating.product()).collect());
		List<Integer> predictedMovies = new ArrayList<>(topKRecs.length);
		for(Rating r : topKRecs){
			predictedMovies.add(r.product());
		}
		s(predictedMovies);
		s(avgPrecisionK(actualMovies, predictedMovies, k));

		/*
		 * Mean average precision at K over all users.
		 * Item ids are collected together with the factor rows, so that row i of the item matrix is
		 * mapped back to its real product id instead of assuming "id == row index + 1".
		 */
		List<Tuple2<Object, double[]>> itemFeatures = productFeatures.collect();
		int[] itemIds = new int[itemFeatures.size()];
		double[][] itemFactors = new double[itemFeatures.size()][];
		for(int i = 0 ; i<itemFeatures.size() ; i++){
			itemIds[i] = (Integer) itemFeatures.get(i)._1();
			itemFactors[i] = itemFeatures.get(i)._2();
		}

		DoubleMatrix itemMatrix = new DoubleMatrix(itemFactors);
		s(itemMatrix.rows + ", " + itemMatrix.columns);	// 1682, 50 in the original author's run

		Broadcast<DoubleMatrix> imBroadcast = sc.broadcast(itemMatrix);
		Broadcast<int[]> idsBroadcast = sc.broadcast(itemIds);
		JavaPairRDD<Integer, List<Integer>> allRecs = model.userFeatures().toJavaRDD().mapToPair(tuple->{
			DoubleMatrix userVector = new DoubleMatrix(tuple._2());
			// (items x rank) * (rank x 1) -> one score per item row.
			DoubleMatrix scores = imBroadcast.value().mmul(userVector);
			return new Tuple2<Integer, List<Integer>>((Integer) tuple._1(), rankItemsByScore(scores.data, idsBroadcast.value()));
		});
		s(allRecs.count());	// 943 in the original author's run

		JavaPairRDD<Integer, Iterable<Integer>> userMovies = usersProducts.groupByKey();

		// Per user: (ranked recommendations, movies actually rated).
		JavaPairRDD<Integer, Tuple2<List<Integer>, Iterable<Integer>>> recsAndActual = allRecs.join(userMovies);
		recsAndActual.cache();
		JavaRDD<Double> apkPerUser = recsAndActual.map(tuple->avgPrecisionK(toList(tuple._2()._2()), tuple._2()._1(), k));

		double mapk = apkPerUser.reduce((a,b)->a+b) / allRecs.count();
		s("Mean Average Precision at K = " + mapk);

		/*
		 * (p.160) MLlib's built-in evaluation metrics.
		 */

		// RegressionMetrics expects (prediction, observation); the join holds (actual, predicted).
		JavaRDD<Tuple2<Object, Object>> predictedAndTrue = ratingsAndPredictions.map(tuple->new Tuple2<Object, Object>(tuple._2()._2(), tuple._2()._1()));
		RegressionMetrics regressionMetrics = new RegressionMetrics(JavaRDD.toRDD(predictedAndTrue));

		s("Mean Squared Error: " + regressionMetrics.meanSquaredError());
		s("Root Mean Squared Error: " + regressionMetrics.rootMeanSquaredError());

		// RankingMetrics.of is the Java-friendly factory; it avoids passing a null ClassTag
		// to the Scala constructor.
		JavaRDD<Tuple2<List<Integer>, List<Integer>>> predictedAndTrueForRanking = recsAndActual.map(tuple->
			new Tuple2<List<Integer>, List<Integer>>(tuple._2()._1(), toList(tuple._2()._2())));
		RankingMetrics<?> rankingMetrics = RankingMetrics.of(predictedAndTrueForRanking);
		s("Mean Average Precision = " + rankingMetrics.meanAveragePrecision());

		// Release the cached RDDs.
		ratings.unpersist();
		ratingsAndPredictions.unpersist();
		recsAndActual.unpersist();
	}

	/**
	 * Resolves a classpath resource (relative to this class) to a file path.
	 *
	 * @param name resource name passed to {@link Class#getResource(String)}
	 * @return the file part of the resource URL
	 * @throws IllegalStateException if the resource cannot be found
	 */
	private String resourcePath(String name){
		java.net.URL url = getClass().getResource(name);
		if(url == null){
			throw new IllegalStateException("Resource not found on classpath: " + name);
		}
		return url.getFile();
	}

	/**
	 * Orders item ids by descending score.
	 *
	 * <p>Sorting indices (instead of using scores as map keys) keeps items with identical scores;
	 * the sort is stable, so such ties stay in their original row order.
	 *
	 * @param scores  one score per item row
	 * @param itemIds the product id of each row; must be at least as long as {@code scores}
	 * @return the product ids, highest score first
	 */
	private static List<Integer> rankItemsByScore(double[] scores, int[] itemIds){
		Integer[] order = new Integer[scores.length];
		for(int i = 0 ; i<order.length ; i++){
			order[i] = i;
		}
		Arrays.sort(order, (a, b) -> Double.compare(scores[b], scores[a]));
		List<Integer> ranked = new ArrayList<>(order.length);
		for(Integer row : order){
			ranked.add(itemIds[row]);
		}
		return ranked;
	}

	/** Copies an {@link Iterable} into a new {@link ArrayList}, preserving iteration order. */
	private static List<Integer> toList(Iterable<Integer> values){
		List<Integer> copy = new ArrayList<>();
		values.forEach(copy::add);
		return copy;
	}

	/**
	 * Cosine similarity of two vectors: dot product divided by the product of the Euclidean norms.
	 *
	 * @return the similarity; {@code NaN} if either vector has zero norm (division 0/0)
	 */
	private static double cosineSimilarity(DoubleMatrix vec1, DoubleMatrix vec2){
		return vec1.dot(vec2) / (vec1.norm2() * vec2.norm2());
	}

	/**
	 * Average precision at K of a ranked prediction list against the set of relevant items.
	 *
	 * @param actual    the relevant items
	 * @param predicted the predicted items, best first; may hold fewer than {@code k} entries
	 * @param k         how many leading predictions to evaluate; must be positive
	 * @return {@code 1.0} when {@code actual} is empty, otherwise the summed precision at each hit
	 *         divided by {@code min(actual.size(), k)}
	 * @throws IllegalArgumentException if {@code k} is not positive
	 */
	private static double avgPrecisionK(List<Integer> actual, List<Integer> predicted, int k){
		if(k <= 0){
			throw new IllegalArgumentException("k must be positive: " + k);
		}
		if(actual.isEmpty()){
			return 1.0;
		}
		// Clamp so that a short prediction list does not make subList throw.
		List<Integer> predK = predicted.subList(0, Math.min(k, predicted.size()));
		double score = 0.0;
		double numHits = 0.0;

		// Simplified relative to the book's version.
		for(int i = 0 ; i<predK.size() ; i++){
			Integer p = predK.get(i);
			if(actual.contains(p)){
				numHits += 1.0;
				score += numHits / (i + 1.0);
			}
		}
		return score / Math.min(actual.size(), k);
	}
}