Scala로 짜여진 소스코드를 Java 8로 작성하였다.
package org.test.ch4; import java.io.Serializable; public class MyTuple2<K, V extends Comparable<V>> implements Comparable<MyTuple2<K,V>>, Serializable{ private static final long serialVersionUID = 1L; private K key; private V value; public MyTuple2(K key, V value){ this.key = key; this.value = value; } public K getKey() { return key; } public void setKey(K key) { this.key = key; } public V getValue() { return value; } public void setValue(V value) { this.value = value; } @Override public String toString() { return "MyTuple2[" + key + ", " + value + "]"; } @Override public int compareTo(MyTuple2<K,V> v) { return value.compareTo(v.getValue()); } }
package org.test.ch4;

import java.io.Serializable;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

import org.apache.log4j.PropertyConfigurator;
import org.apache.spark.api.java.JavaPairRDD;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.broadcast.Broadcast;
import org.apache.spark.mllib.evaluation.RankingMetrics;
import org.apache.spark.mllib.evaluation.RegressionMetrics;
import org.apache.spark.mllib.recommendation.ALS;
import org.apache.spark.mllib.recommendation.MatrixFactorizationModel;
import org.apache.spark.mllib.recommendation.Rating;
import org.jblas.DoubleMatrix;

import scala.Tuple2;

/**
 * Chapter 4: building a recommendation engine with Spark.
 * The book's Scala code, ported to Java 8.
 */
public class Ch4_JavaApp implements Serializable {

    private static final long serialVersionUID = 154356342432L;

    // Shorthand println helpers; s(o) prints and returns its argument
    // so it can be inserted into expressions for quick inspection.
    private static void s(){s("");}
    private static <T> T s(T o){System.out.println(o);return o;}

    /**
     * Entry point: configures log4j, runs the whole chapter-4 walkthrough
     * on a local[2] Spark context, and prints the elapsed time.
     */
    public static void main(String...strings){
        PropertyConfigurator.configure(System.getProperty("user.dir") + "/src/resources/log4j.properties");
        Long start = System.currentTimeMillis();
        Ch4_JavaApp c = new Ch4_JavaApp();
        JavaSparkContext sc = new JavaSparkContext("local[2]", "First Spark App");
        c.proc1(sc);
        s("\n" + (System.currentTimeMillis() - start) + " ms spend...");
        sc.stop();
    }

    /**
     * Full chapter-4 walkthrough: trains an ALS model on MovieLens 100k,
     * makes and inspects recommendations, computes item similarities with
     * jblas, then evaluates with hand-rolled MSE/APK and MLlib's built-in
     * RegressionMetrics/RankingMetrics.
     */
    private void proc1(JavaSparkContext sc){
        /**
         * p.132: feature extraction from the MovieLens 100k dataset.
         */
        String ratingsFileForUser = "data/ml-100k/u.data";  // tab-separated
        String itemsFile = "data/ml-100k/u.item";           // '|'-separated
        // u.data format: user  movie  rating  timestamp
        //   196  242  3  881250949
        //   186  302  3  891717742
        //   22   377  1  878887116
        //   244  51   2  880606923
        //   166  346  1  886397596
        JavaRDD<String> rawData = sc.textFile(getClass().getResource(ratingsFileForUser).getFile());
        JavaRDD<String[]> rawRatings = rawData.map(str -> str.split("\\t"));
        JavaRDD<Rating> ratings =
            rawRatings.map(arr -> new Rating(Integer.parseInt(arr[0]), Integer.parseInt(arr[1]), Double.parseDouble(arr[2])));
        ratings.cache();
        s(ratings.first()); // Rating(196,242,3.0)

        // Train ALS: rank=50, iterations=10, lambda=0.01.
        MatrixFactorizationModel model = ALS.train(JavaRDD.toRDD(ratings), 50, 10, 0.01);
        s(model.userFeatures());            // users MapPartitionsRDD[209] at mapValues at ALS.scala:255
        s(model.userFeatures().count());    // 943
        s(model.productFeatures().count()); // 1682

        /**
         * Using the recommendation model.
         */
        // Predicted score for one specific (user, product) pair.
        double predictedRating = model.predict(789, 123); // 2.532021015757959
        s("predictedRating: " + predictedRating);

        // Recommend the top-K movies for one user.
        int userId = 789;
        int K = 10;
        Rating[] topKRecs = model.recommendProducts(userId, K);
        for(Rating r : topKRecs){s(r);}
        // Output:
        //   Rating(789,479,5.6086207457472375)
        //   Rating(789,135,5.3675152031432924)
        //   Rating(789,530,5.351079666468601)
        //   Rating(789,603,5.344757197789459)
        //   Rating(789,56,5.232041665817473)
        //   Rating(789,32,5.157860310800221)
        //   Rating(789,23,5.154813663424755)
        //   Rating(789,616,5.142307969569071)
        //   Rating(789,182,5.132978302760848)
        //   Rating(789,7,5.122370250313715)
        // -> NOTE(review): are these good recommendations? The output differs
        //    from the book (ALS is randomly initialized, so runs vary).

        // Inspecting the recommendation list.
        // u.item format: id|title|release date||URL|genre flags...
        //   1|Toy Story (1995)|01-Jan-1995||http://us.imdb.com/M/title-exact?Toy%20Story%20(1995)|0|0|0|1|1|1|0|0|0|0|0|0|0|0|0|0|0|0|0
        //   2|GoldenEye (1995)|01-Jan-1995||http://us.imdb.com/M/title-exact?GoldenEye%20(1995)|0|1|1|0|0|0|0|0|0|0|0|0|0|0|0|0|1|0|0
        //   3|Four Rooms (1995)|01-Jan-1995||http://us.imdb.com/M/title-exact?Four%20Rooms%20(1995)|0|0|0|0|0|0|0|0|0|0|0|0|0|0|0|0|1|0|0
        //   4|Get Shorty (1995)|01-Jan-1995||http://us.imdb.com/M/title-exact?Get%20Shorty%20(1995)|0|1|0|0|0|1|0|0|1|0|0|0|0|0|0|0|0|0|0
        JavaRDD<String> movies = sc.textFile(getClass().getResource(itemsFile).getFile());
        Map<Integer, String> titles = movies.map(str -> str.split("\\|")).mapToPair(arr->new Tuple2<Integer, String>(Integer.parseInt(arr[0]), arr[1])).collectAsMap();
        s(titles.get(123)); // Frighteners, The (1996)

        // The book's approach does not translate directly to Java:
        // List<Rating> movieForUser = ratings.keyBy(rating -> rating.user()).lookup(789);
        // // lookup() leaves the RDD world... the book seemed to stay inside an RDD here.
        // s(movieForUser.size()); //33
        // movieForUser = new ArrayList<>(movieForUser);
        // movieForUser.sort((o1,o2)->(int)(o1.rating() - o2.rating()));
        // movieForUser.sort(new Comparator<Rating>() {
        //     public int compare(Rating o1, Rating o2) {
        //         return (int)(o1.rating() - o2.rating());
        //     }
        // });

        // Retry — this seems much simpler: filter, sort, then map to titles.
        JavaRDD<Rating> moviesForUser = ratings.filter(rating->rating.user() == 789);
        List<Tuple2<String, Double>> result = moviesForUser.sortBy(rating -> rating.rating(), false, 3)
            .mapToPair(rating -> new Tuple2<String, Double>(titles.get(rating.product()), rating.rating())).take(10);
        // NOTE(review): take() is likely to pull data from only one partition;
        // not recommended after a multi-partition sort.
        for(Tuple2<String, Double> t : result){
            System.out.println(t);
        }
        // Output:
        //   (Godfather, The (1972),5.0)
        //   (Trainspotting (1996),5.0)
        //   (Dead Man Walking (1995),5.0)
        //   (Star Wars (1977),5.0)
        //   (Swingers (1996),5.0)
        //   (Leaving Las Vegas (1995),5.0)
        //   (Bound (1996),5.0)
        //   (Fargo (1996),5.0)
        //   (Last Supper, The (1995),5.0)
        //   (Private Parts (1997),4.0)

        // Map the top-K recommendations to their titles.
        JavaRDD<Rating> topKRecsRDD = sc.parallelize(Arrays.asList(topKRecs));
        List<Tuple2<String, Double>> result2 = topKRecsRDD.mapToPair(rating -> new Tuple2<String, Double>(titles.get(rating.product()), rating.rating())).collect();
        for(Tuple2<String, Double> t : result2){
            s(t);
        }
        // Output:
        //   (East of Eden (1955),5.667943142890133)
        //   (2001: A Space Odyssey (1968),5.618153701144573)
        //   (Harold and Maude (1971),5.5870422410271345)
        //   (Pulp Fiction (1994),5.543497847332227)
        //   (Miller's Crossing (1990),5.536807619084026)
        //   (Taxi Driver (1976),5.534494309386643)
        //   (Shawshank Redemption, The (1994),5.309557575911523)
        //   (Nosferatu (Nosferatu, eine Symphonie des Grauens) (1922),5.270318804944542)
        //   (Blob, The (1958),5.220344419638212)
        //   (Reservoir Dogs (1992),5.11340290625421)
        // How can these results be validated??

        /**
         * Item recommendations: generating similar movies from the
         * MovieLens 100k dataset using the jblas library.
         */
        DoubleMatrix aMatrix = new DoubleMatrix(new double[]{1.0, 2.0, 3.0});

        // Following the book's approach: fetch the factor vector for one item.
        int itemId = 567;
        JavaRDD<Tuple2<Object, double[]>> d1 = model.productFeatures().toJavaRDD();
        JavaPairRDD<Object, double[]> d2 = d1.mapToPair(t->t);
        List<double[]> d3 = d2.lookup(itemId);
        DoubleMatrix itemVector = new DoubleMatrix(d3.get(0)); // head — why only the first array?
        // Sanity check: similarity of a vector with itself should be ~1.0.
        s(cosineSimilarity(itemVector, itemVector)); // 1.0000000000000002

        /**
         * 1. Using scala.Tuple2.
         */
        // Apply cosine similarity against every product's factor vector.
        JavaRDD<Tuple2<Integer, Double>> sims = model.productFeatures().toJavaRDD().map(tuple->{
            DoubleMatrix factorVector = new DoubleMatrix(tuple._2());
            double sim = cosineSimilarity(factorVector, itemVector);
            return new Tuple2<Integer, Double>((Integer) tuple._1(), sim);
        });
        // Top 10 products.
        sims = sims.sortBy(tuple-> tuple._2(), false, 4);
        /**
         * Calling top() throws: "scala.Tuple2 cannot be cast to java.lang.Comparable".
         * top() needs to compare elements, but scala.Tuple2 does not implement
         * java.lang.Comparable, so there is no compareTo to call.
         */
        // List<Tuple2<Integer, Double>> sortedSims = sims.top(10); //scala.Tuple2 cannot be cast to java.lang.Comparable
        List<Tuple2<Integer, Double>> sortedSims = sims.mapToPair(t->t).take(K); // may be inaccurate (take() can read from a single partition)
        s();
        sortedSims.forEach(a->s(a));
        // Output:
        //   (567,0.9999999999999998)
        //   (1376,0.7538820816680819)
        //   (413,0.7380317817065267)
        //   (940,0.7223113561822763)
        //   (257,0.7029913601619708)
        //   (563,0.7022151386323402)
        //   (201,0.7004318504984995)
        //   (230,0.6998985408873872)
        //   (222,0.6971150972513658)
        //   (636,0.6949170154071356)

        /**
         * 2. Use a hand-rolled comparable tuple (MyTuple2) so top() works.
         */
        JavaRDD<MyTuple2<Integer, Double>> sims2 = model.productFeatures().toJavaRDD().map(tuple->{
            DoubleMatrix factorVector = new DoubleMatrix(tuple._2());
            double sim = cosineSimilarity(factorVector, itemVector);
            return new MyTuple2<Integer, Double>((Integer) tuple._1(), sim);
        });
        List<MyTuple2<Integer, Double>> sortedSims2 = sims2.sortBy(tuple->tuple.getValue(), false, 4).top(K);
        s();
        for(int i = 0; i < 10 ; i++){
            s(sortedSims2.get(i));
        }
        // Output (same as above, for now):
        //   MyTuple2[567, 0.9999999999999998]
        //   MyTuple2[1376, 0.7538820816680819]
        //   MyTuple2[413, 0.7380317817065267]
        //   MyTuple2[940, 0.7223113561822763]
        //   MyTuple2[257, 0.7029913601619708]
        //   MyTuple2[563, 0.7022151386323402]
        //   MyTuple2[201, 0.7004318504984995]
        //   MyTuple2[230, 0.6998985408873872]
        //   MyTuple2[222, 0.6971150972513658]
        //   MyTuple2[636, 0.6949170154071356]

        // Inspecting the similar products by title.
        s(titles.get(itemId));
        s();
        for(int i = 0; i < 10 ; i++){
            s(titles.get(sortedSims2.get(i).getKey()) + ", " + sortedSims2.get(i).getValue());
        }
        // Output:
        //   Wes Craven's New Nightmare (1994), 0.9999999999999998
        //   Meet Wally Sparks (1997), 0.7538820816680819
        //   Tales from the Crypt Presents: Bordello of Blood (1996), 0.7380317817065267
        //   Airheads (1994), 0.7223113561822763
        //   Men in Black (1997), 0.7029913601619708
        //   Stephen King's The Langoliers (1995), 0.7022151386323402
        //   Evil Dead II (1987), 0.7004318504984995
        //   Star Trek IV: The Voyage Home (1986), 0.6998985408873872
        //   Star Trek: First Contact (1996), 0.6971150972513658
        //   Escape from New York (1981), 0.6949170154071356

        // Sample squared-error computation for a single rating.
        List<Rating> actualRatingList = s(moviesForUser.take(1)); // [Rating(789,1012,4.0)]
        Rating actualRating = actualRatingList.get(0);
        double predictedRating2 = s(model.predict(789, actualRating.product())); // 3.9513950808655918
        double squaredError = s(Math.pow(predictedRating2-actualRating.rating(), 2.0)); // 0.002362438164062363

        /**
         * Computing the full MSE over every (user, product) pair.
         * (Eclipse shortcut Ctrl+2, L: declare a local from the return type.)
         */
        JavaPairRDD<Integer, Integer> usersProducts =
            ratings.mapToPair(rating->new Tuple2<Integer, Integer>(rating.user(), rating.product()));
        JavaPairRDD<Tuple2<Integer, Integer>, Double> predictions = model.predict(usersProducts).mapToPair(rating->new Tuple2<Tuple2<Integer, Integer>, Double>(new Tuple2<Integer, Integer>(rating.user(), rating.product()), rating.rating()));
        JavaPairRDD<Tuple2<Integer, Integer>, Double> ratingsPair = ratings.mapToPair(rating->new Tuple2<Tuple2<Integer, Integer>, Double>(new Tuple2<Integer, Integer>(rating.user(), rating.product()), rating.rating()));
        // Join on (user, product): _1 = actual rating, _2 = predicted rating.
        JavaPairRDD<Tuple2<Integer, Integer>, Tuple2<Double, Double>> ratingsAndpredictions = ratingsPair.join(predictions);
        double reduce = ratingsAndpredictions.map(tuple -> Math.pow(tuple._2()._1() - tuple._2()._2(), 2.0)).reduce((a,b)->a+b);
        double MSE = reduce/ratingsAndpredictions.count();
        s("Mean Squared Error = " + MSE); // 0.08434151380324563
        double RMSE = Math.sqrt(MSE);
        s("Root Mean Squared Error = " + RMSE); // 0.2893908476845658

        /**
         * Average precision at K (APK) for one user.
         */
        JavaRDD<Integer> actualMovies = moviesForUser.map(rating->rating.product());
        s(actualMovies.collect()); // [1012, 127, 475, 93, 1161, 286, 293, 9, 50, 294, 181, 1, 1008, 508, 284, 1017, 137, 111, 742, 248, 249, 1007, 591, 150,276, 151, 129, 100, 741, 288, 762, 628, 124]
        JavaRDD<Integer> predictedMovies = sc.parallelize(Arrays.asList(topKRecs)).map(rating->rating.product());
        s(predictedMovies.collect()); // [185, 482, 182, 178, 447, 48, 693, 192, 514, 23]
        double apk10 = s(avgPrecisionK(actualMovies.collect(), predictedMovies.collect(), 10)); // 0.0

        // Build the full item-factor matrix (one row per product) and
        // broadcast it so every executor can score all items per user.
        List<double[]> itemFactorsTemp = model.productFeatures().toJavaRDD().map(tuple->tuple._2()).collect();
        double[][] itemFactors = new double[itemFactorsTemp.size()][];
        for(int i = 0 ; i<itemFactorsTemp.size() ; i++){
            itemFactors[i] = itemFactorsTemp.get(i);
        }
        DoubleMatrix itemMatrix = new DoubleMatrix(itemFactors);
        s(itemMatrix.rows + ", " + itemMatrix.columns); // 1682, 50
        Broadcast<DoubleMatrix> imBroadcast = sc.broadcast(itemMatrix);

        // For each user, score all items and emit the item ids sorted by
        // descending score (mimics Scala's zipWithIndex + sortBy).
        JavaPairRDD<Integer, List<Integer>> allRecs = model.userFeatures().toJavaRDD().mapToPair(tuple->{
            DoubleMatrix userVector = new DoubleMatrix(tuple._2());
            DoubleMatrix scores = imBroadcast.value().mmul(userVector);
            double[] d = scores.data;
            // NOTE(review): scores are the map keys here, so any two items
            // with exactly equal scores collide and one is silently dropped.
            HashMap<Double, Integer> zipWithIndex = new HashMap<>();
            for(int i = 0 ; i<d.length; i++){
                zipWithIndex.put(d[i], i);
            }
            // TreeMap<Double, Integer> sortedWithId = new TreeMap<Double, Integer>(zipWithIndex); // ascending
            TreeMap<Double, Integer> sortedWithId = new TreeMap<Double, Integer>(Collections.reverseOrder()); // descending
            sortedWithId.putAll(zipWithIndex);
            // Shift every stored index by +1 — presumably converting the
            // 0-based matrix row index to a 1-based movie id; TODO confirm
            // against u.item numbering.
            Iterator<Double> treeMapIter = sortedWithId.keySet().iterator();
            while( treeMapIter.hasNext()) {
                Double key = treeMapIter.next();
                Integer value = sortedWithId.get(key);
                // System.out.println(key + " : " + value);
                sortedWithId.put(key, value + 1);
            }
            return new Tuple2<Integer, List<Integer>>((int)tuple._1(), Arrays.asList(sortedWithId.values().toArray(new Integer[sortedWithId.values().size()])));
        });
        s(allRecs.collect().size()); // 943

        // Actual watched movies per user, for comparison against allRecs.
        JavaPairRDD<Integer, Integer> userMovies = ratings.mapToPair(rating->new Tuple2<Integer, Integer>(rating.user(), rating.product()));
        JavaPairRDD<Integer, Iterable<Integer>> userMovies2 = userMovies.groupByKey();
        int K1 = 10;
        JavaPairRDD<Integer, Tuple2<List<Integer>, Iterable<Integer>>> MAPK = allRecs.join(userMovies2);
        // Ugh... this is painful to express in Java...
        JavaRDD<Double> MAPK2 = MAPK.map(tuple->{
            List<Integer> predicted = tuple._2()._1();
            Iterable<Integer> actual = tuple._2()._2();
            List<Integer> target = new ArrayList<>();
            actual.forEach(target::add);
            return avgPrecisionK(target, predicted, K1);
        });
        double APK = MAPK2.reduce((a,b)->a+b) / allRecs.count();
        s("Mean Average Precision at K = " + APK); // 0.034713006446834664

        /**
         * Using MLlib's built-in evaluation methods (p.160).
         */
        ratingsAndpredictions.cache();
        // NOTE(review): _1 is the actual rating and _2 the prediction (see the
        // join above), while RegressionMetrics documents its input as
        // (prediction, observation) pairs — confirm the intended order.
        JavaRDD<Tuple2<Object, Object>> predictedAndTrue = ratingsAndpredictions.map(tuple->new Tuple2<Object, Object>(tuple._2()._1(), tuple._2()._2()));
        RegressionMetrics regressionMetrics = new RegressionMetrics(JavaRDD.toRDD(predictedAndTrue));
        s("Mean Squared Error: " + regressionMetrics.meanSquaredError());          // 0.08441911339839799
        s("Root Mean Squared Error: " + regressionMetrics.rootMeanSquaredError()); // 0.2905496745797489

        MAPK.cache();
        JavaRDD<Tuple2<Object, Object>> predictedAndTrueForRanking = MAPK.map(tuple->{
            List<Integer> predicted = tuple._2()._1();
            Iterable<Integer> actual = tuple._2()._2();
            List<Integer> target = new ArrayList<>();
            actual.forEach(target::add);
            return new Tuple2<Object, Object>(predicted.toArray(), target.toArray());
        });
        RankingMetrics<Object> rankingMetrics = new RankingMetrics<Object>(JavaRDD.toRDD(predictedAndTrueForRanking), null);
        s("Mean Average Precision = " + rankingMetrics.meanAveragePrecision()); // 0.08316338166223147

        // Release caches.
        ratings.unpersist();
        ratingsAndpredictions.unpersist();
        MAPK.unpersist();
    }

    /**
     * Cosine similarity of two vectors: dot(v1, v2) / (|v1| * |v2|).
     * NaN if either vector has zero norm (division by zero).
     */
    private double cosineSimilarity(DoubleMatrix vec1, DoubleMatrix vec2){
        return vec1.dot(vec2) / (vec1.norm2() * vec2.norm2());
    }

    /**
     * Average precision at k: for each of the first k predictions that appears
     * in {@code actual}, accumulate precision-at-that-rank, then normalize by
     * min(|actual|, k). Returns 1.0 for an empty actual list.
     *
     * No need to follow the book's implementation exactly.
     * NOTE(review): subList(0, k) throws IndexOutOfBoundsException when
     * predicted.size() < k — confirm callers always pass >= k predictions.
     */
    private double avgPrecisionK(List<Integer> actual, List<Integer> predicted, int k){
        List<Integer> predK = predicted.subList(0, k);
        double score = 0.0;
        double numHits = 0.0;
        for(int i = 0 ; i<predK.size() ; i++){
            int p = predK.get(i);
            if(actual.contains(p)){
                numHits += 1.0;
                score += numHits / (i + 1.0);
            }
        }
        if(actual.isEmpty()){
            return 1.0;
        }else{
            return score / Math.min(actual.size(), k);
        }
    }
}
'Spark > Spark와 머신 러닝' 카테고리의 다른 글
Spark 시작하기20 - [Spark와 머신 러닝] 5장 스파크를 이용한 분류 모델 구현 (2) | 2016.05.07 |
---|---|
Spark 시작하기18 - [Spark와 머신 러닝] 3장 스파크를 이용한 데이터 수집, 프로세싱, 준비2 (0) | 2016.05.04 |
Spark 시작하기05 - Exception (0) | 2016.04.03 |
Spark 시작하기04 - [Spark와 머신 러닝] 3장 스파크를 이용한 데이터 수집, 프로세싱, 준비 (0) | 2016.04.03 |
Spark 시작하기03 - [Spark와 머신 러닝] 1장 스파크의 시작과 구동 (0) | 2016.03.27 |