Scala로 짜여진 소스코드를 Java 8로 다시 작성하였다.
package org.test.ch4;
import java.io.Serializable;
public class MyTuple2<K, V extends Comparable<V>> implements Comparable<MyTuple2<K,V>>, Serializable{
private static final long serialVersionUID = 1L;
private K key;
private V value;
public MyTuple2(K key, V value){
this.key = key;
this.value = value;
}
public K getKey() {
return key;
}
public void setKey(K key) {
this.key = key;
}
public V getValue() {
return value;
}
public void setValue(V value) {
this.value = value;
}
@Override
public String toString() {
return "MyTuple2[" + key + ", " + value + "]";
}
@Override
public int compareTo(MyTuple2<K,V> v) {
return value.compareTo(v.getValue());
}
}
package org.test.ch4;
import java.io.Serializable;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;
import org.apache.log4j.PropertyConfigurator;
import org.apache.spark.api.java.JavaPairRDD;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.broadcast.Broadcast;
import org.apache.spark.mllib.evaluation.RankingMetrics;
import org.apache.spark.mllib.evaluation.RegressionMetrics;
import org.apache.spark.mllib.recommendation.ALS;
import org.apache.spark.mllib.recommendation.MatrixFactorizationModel;
import org.apache.spark.mllib.recommendation.Rating;
import org.jblas.DoubleMatrix;
import scala.Tuple2;
/**
 * Chapter 4: building a recommendation engine with Spark.
 * The book's Scala examples, rewritten in Java 8.
 */
public class Ch4_JavaApp implements Serializable {

    private static final long serialVersionUID = 154356342432L;

    // Console-log shorthands: s() prints a blank line; s(o) prints o and
    // returns it so it can be inlined into an expression.
    private static void s() { s(""); }
    private static <T> T s(T o) { System.out.println(o); return o; }

    /**
     * Entry point: configures log4j, runs the chapter-4 walkthrough on a
     * local[2] Spark context, and prints the elapsed time.
     */
    public static void main(String... strings) {
        PropertyConfigurator.configure(System.getProperty("user.dir") + "/src/resources/log4j.properties");
        Long start = System.currentTimeMillis();
        Ch4_JavaApp c = new Ch4_JavaApp();
        JavaSparkContext sc = new JavaSparkContext("local[2]", "First Spark App");
        c.proc1(sc);
        s("\n" + (System.currentTimeMillis() - start) + " ms spend...");
        sc.stop();
    }

    /**
     * Full chapter-4 walkthrough: trains an ALS model on MovieLens 100k,
     * makes user/product recommendations, computes item-item cosine
     * similarity, and evaluates with MSE/RMSE/MAPK plus MLlib's built-in
     * RegressionMetrics and RankingMetrics.
     *
     * @param sc the shared Spark context (not stopped here; caller stops it)
     */
    private void proc1(JavaSparkContext sc) {
        /**
         * p.132 — Extracting features from the MovieLens 100k data set.
         */
        String ratingsFileForUser = "data/ml-100k/u.data"; // tab-separated
        String itemsFile = "data/ml-100k/u.item";          // '|'-separated
        // Sample rows: user  movie  rating  timestamp
        // 196 242 3 881250949
        // 186 302 3 891717742
        // 22 377 1 878887116
        // 244 51 2 880606923
        // 166 346 1 886397596
        JavaRDD<String> rawData = sc.textFile(getClass().getResource(ratingsFileForUser).getFile());
        JavaRDD<String[]> rawRatings = rawData.map(str -> str.split("\\t"));
        JavaRDD<Rating> ratings = rawRatings.map(arr -> new Rating(Integer.parseInt(arr[0]), Integer.parseInt(arr[1]), Double.parseDouble(arr[2])));
        // Cached because the ratings RDD is reused many times below.
        ratings.cache();
        s(ratings.first()); // Rating(196,242,3.0)
        // ALS with rank=50, 10 iterations, lambda=0.01.
        MatrixFactorizationModel model = ALS.train(JavaRDD.toRDD(ratings), 50, 10, 0.01);
        s(model.userFeatures());            // users MapPartitionsRDD[209] at mapValues at ALS.scala:255
        s(model.userFeatures().count());    // 943
        s(model.productFeatures().count()); // 1682

        /**
         * Using the recommendation model.
         */
        // Predicted score for one specific (user, product) pair.
        double predictedRating = model.predict(789, 123); // 2.532021015757959
        s("predictedRating: " + predictedRating);
        // Recommend the top-K movies for one user.
        int userId = 789;
        int K = 10;
        Rating[] topKRecs = model.recommendProducts(userId, K);
        for (Rating r : topKRecs) { s(r); }
        // Output:
        // Rating(789,479,5.6086207457472375)
        // Rating(789,135,5.3675152031432924)
        // Rating(789,530,5.351079666468601)
        // Rating(789,603,5.344757197789459)
        // Rating(789,56,5.232041665817473)
        // Rating(789,32,5.157860310800221)
        // Rating(789,23,5.154813663424755)
        // Rating(789,616,5.142307969569071)
        // Rating(789,182,5.132978302760848)
        // Rating(789,7,5.122370250313715)
        // -> Are these really good recommendations? The results differ from
        //    the book's (ALS is randomly initialized, so runs vary).

        // Checking the recommendations against movie titles.
        // u.item layout: id|title|release date||IMDb URL|genre flags...
        // 1|Toy Story (1995)|01-Jan-1995||http://us.imdb.com/M/title-exact?Toy%20Story%20(1995)|0|0|0|1|1|1|0|0|0|0|0|0|0|0|0|0|0|0|0
        // 2|GoldenEye (1995)|01-Jan-1995||http://us.imdb.com/M/title-exact?GoldenEye%20(1995)|0|1|1|0|0|0|0|0|0|0|0|0|0|0|0|0|1|0|0
        JavaRDD<String> movies = sc.textFile(getClass().getResource(itemsFile).getFile());
        Map<Integer, String> titles = movies.map(str -> str.split("\\|")).mapToPair(arr -> new Tuple2<Integer, String>(Integer.parseInt(arr[0]), arr[1])).collectAsMap();
        s(titles.get(123)); // Frighteners, The (1996)

        // NOTE: the book's approach — ratings.keyBy(r -> r.user()).lookup(789)
        // followed by sorting the resulting List — leaves the RDD world and
        // needs a mutable copy before sorting. The filter/sortBy pipeline
        // below stays in the RDD API and is much simpler.
        JavaRDD<Rating> moviesForUser = ratings.filter(rating -> rating.user() == 789);
        // CAUTION: take(10) after sortBy may read a single partition first,
        // so it is not a reliable global top-N.
        List<Tuple2<String, Double>> result = moviesForUser.sortBy(rating -> rating.rating(), false, 3)
                .mapToPair(rating -> new Tuple2<String, Double>(titles.get(rating.product()), rating.rating())).take(10);
        for (Tuple2<String, Double> t : result) {
            System.out.println(t);
        }
        // Output:
        // (Godfather, The (1972),5.0)
        // (Trainspotting (1996),5.0)
        // (Dead Man Walking (1995),5.0)
        // (Star Wars (1977),5.0)
        // (Swingers (1996),5.0)
        // (Leaving Las Vegas (1995),5.0)
        // (Bound (1996),5.0)
        // (Fargo (1996),5.0)
        // (Last Supper, The (1995),5.0)
        // (Private Parts (1997),4.0)

        // Same for the model's top-K recommendations, resolved to titles.
        JavaRDD<Rating> topKRecsRDD = sc.parallelize(Arrays.asList(topKRecs));
        List<Tuple2<String, Double>> result2 = topKRecsRDD.mapToPair(rating -> new Tuple2<String, Double>(titles.get(rating.product()), rating.rating())).collect();
        for (Tuple2<String, Double> t : result2) {
            s(t);
        }
        // Output:
        // (East of Eden (1955),5.667943142890133)
        // (2001: A Space Odyssey (1968),5.618153701144573)
        // (Harold and Maude (1971),5.5870422410271345)
        // (Pulp Fiction (1994),5.543497847332227)
        // (Miller's Crossing (1990),5.536807619084026)
        // (Taxi Driver (1976),5.534494309386643)
        // (Shawshank Redemption, The (1994),5.309557575911523)
        // (Nosferatu (Nosferatu, eine Symphonie des Grauens) (1922),5.270318804944542)
        // (Blob, The (1958),5.220344419638212)
        // (Reservoir Dogs (1992),5.11340290625421)
        // How can we validate these results?

        /**
         * Item recommendations.
         */
        // Generating similar movies from MovieLens 100k with the jblas library.
        DoubleMatrix aMatrix = new DoubleMatrix(new double[]{1.0, 2.0, 3.0});
        // Following the book's approach:
        int itemId = 567;
        JavaRDD<Tuple2<Object, double[]>> d1 = model.productFeatures().toJavaRDD();
        JavaPairRDD<Object, double[]> d2 = d1.mapToPair(t -> t);
        List<double[]> d3 = d2.lookup(itemId);
        // lookup returns a list; the model holds one factor vector per item,
        // so taking the head (index 0) matches the book's `.head`.
        DoubleMatrix itemVector = new DoubleMatrix(d3.get(0));
        s(cosineSimilarity(itemVector, itemVector)); // 1.0000000000000002 (self-similarity ~ 1)

        /**
         * 1. Using scala.Tuple2.
         */
        // Cosine similarity of every product's factor vector against item 567.
        JavaRDD<Tuple2<Integer, Double>> sims = model.productFeatures().toJavaRDD().map(tuple -> {
            DoubleMatrix factorVector = new DoubleMatrix(tuple._2());
            double sim = cosineSimilarity(factorVector, itemVector);
            return new Tuple2<Integer, Double>((Integer) tuple._1(), sim);
        });
        // Top-10 most similar products.
        sims = sims.sortBy(tuple -> tuple._2(), false, 4);
        // sims.top(10) throws "scala.Tuple2 cannot be cast to
        // java.lang.Comparable": Tuple2 has no natural ordering, and top()
        // needs one. Fall back to take() after the sort — with the same
        // caveat as above that take() may be partition-biased.
        List<Tuple2<Integer, Double>> sortedSims = sims.mapToPair(t -> t).take(K);
        s();
        sortedSims.forEach(a -> s(a));
        // Output:
        // (567,0.9999999999999998)
        // (1376,0.7538820816680819)
        // (413,0.7380317817065267)
        // (940,0.7223113561822763)
        // (257,0.7029913601619708)
        // (563,0.7022151386323402)
        // (201,0.7004318504984995)
        // (230,0.6998985408873872)
        // (222,0.6971150972513658)
        // (636,0.6949170154071356)

        /**
         * 2. Use a hand-rolled comparable tuple (MyTuple2) so top() works.
         */
        JavaRDD<MyTuple2<Integer, Double>> sims2 = model.productFeatures().toJavaRDD().map(tuple -> {
            DoubleMatrix factorVector = new DoubleMatrix(tuple._2());
            double sim = cosineSimilarity(factorVector, itemVector);
            return new MyTuple2<Integer, Double>((Integer) tuple._1(), sim);
        });
        List<MyTuple2<Integer, Double>> sortedSims2 = sims2.sortBy(tuple -> tuple.getValue(), false, 4).top(K);
        s();
        for (int i = 0; i < 10; i++) {
            s(sortedSims2.get(i));
        }
        // Output (same ids/scores as above):
        // MyTuple2[567, 0.9999999999999998]
        // MyTuple2[1376, 0.7538820816680819]
        // MyTuple2[413, 0.7380317817065267]
        // MyTuple2[940, 0.7223113561822763]
        // MyTuple2[257, 0.7029913601619708]
        // MyTuple2[563, 0.7022151386323402]
        // MyTuple2[201, 0.7004318504984995]
        // MyTuple2[230, 0.6998985408873872]
        // MyTuple2[222, 0.6971150972513658]
        // MyTuple2[636, 0.6949170154071356]

        // Inspect the similar movies by title.
        s(titles.get(itemId));
        s();
        for (int i = 0; i < 10; i++) {
            s(titles.get(sortedSims2.get(i).getKey()) + ", " + sortedSims2.get(i).getValue());
        }
        // Output:
        // Wes Craven's New Nightmare (1994), 0.9999999999999998
        // Meet Wally Sparks (1997), 0.7538820816680819
        // Tales from the Crypt Presents: Bordello of Blood (1996), 0.7380317817065267
        // Airheads (1994), 0.7223113561822763
        // Men in Black (1997), 0.7029913601619708
        // Stephen King's The Langoliers (1995), 0.7022151386323402
        // Evil Dead II (1987), 0.7004318504984995
        // Star Trek IV: The Voyage Home (1986), 0.6998985408873872
        // Star Trek: First Contact (1996), 0.6971150972513658
        // Escape from New York (1981), 0.6949170154071356

        // Squared-error sample for a single known rating.
        List<Rating> actualRatingList = s(moviesForUser.take(1)); // [Rating(789,1012,4.0)]
        Rating actualRating = actualRatingList.get(0);
        double predictedRating2 = s(model.predict(789, actualRating.product())); // 3.9513950808655918
        double squaredError = s(Math.pow(predictedRating2 - actualRating.rating(), 2.0)); // 0.002362438164062363

        /**
         * Computing MSE / RMSE over the full ratings set: join predicted
         * ratings with actual ratings on the (user, product) key.
         */
        JavaPairRDD<Integer, Integer> usersProducts = ratings.mapToPair(rating -> new Tuple2<Integer, Integer>(rating.user(), rating.product()));
        JavaPairRDD<Tuple2<Integer, Integer>, Double> predictions = model.predict(usersProducts).mapToPair(rating -> new Tuple2<Tuple2<Integer, Integer>, Double>(new Tuple2<Integer, Integer>(rating.user(), rating.product()), rating.rating()));
        JavaPairRDD<Tuple2<Integer, Integer>, Double> ratingsPair = ratings.mapToPair(rating -> new Tuple2<Tuple2<Integer, Integer>, Double>(new Tuple2<Integer, Integer>(rating.user(), rating.product()), rating.rating()));
        JavaPairRDD<Tuple2<Integer, Integer>, Tuple2<Double, Double>> ratingsAndpredictions = ratingsPair.join(predictions);
        double reduce = ratingsAndpredictions.map(tuple -> Math.pow(tuple._2()._1() - tuple._2()._2(), 2.0)).reduce((a, b) -> a + b);
        double MSE = reduce / ratingsAndpredictions.count();
        s("Mean Squared Error = " + MSE); // 0.08434151380324563
        double RMSE = Math.sqrt(MSE);
        s("Root Mean Squared Error = " + RMSE); // 0.2893908476845658

        /**
         * Mean Average Precision at K.
         */
        JavaRDD<Integer> actualMovies = moviesForUser.map(rating -> rating.product());
        s(actualMovies.collect()); // [1012, 127, 475, 93, 1161, 286, 293, 9, 50, 294, 181, 1, 1008, 508, 284, 1017, 137, 111, 742, 248, 249, 1007, 591, 150,276, 151, 129, 100, 741, 288, 762, 628, 124]
        JavaRDD<Integer> predictedMovies = sc.parallelize(Arrays.asList(topKRecs)).map(rating -> rating.product());
        s(predictedMovies.collect()); // [185, 482, 182, 178, 447, 48, 693, 192, 514, 23]
        double apk10 = s(avgPrecisionK(actualMovies.collect(), predictedMovies.collect(), 10)); // 0.0

        // Build the full item-factor matrix (one row per product).
        List<double[]> itemFactorsTemp = model.productFeatures().toJavaRDD().map(tuple -> tuple._2()).collect();
        double[][] itemFactors = new double[itemFactorsTemp.size()][];
        for (int i = 0; i < itemFactorsTemp.size(); i++) {
            itemFactors[i] = itemFactorsTemp.get(i);
        }
        DoubleMatrix itemMatrix = new DoubleMatrix(itemFactors);
        s(itemMatrix.rows + ", " + itemMatrix.columns); // 1682, 50
        // Broadcast the (small) item matrix so every executor scores against
        // it without reshipping per task.
        Broadcast<DoubleMatrix> imBroadcast = sc.broadcast(itemMatrix);
        JavaPairRDD<Integer, List<Integer>> allRecs = model.userFeatures().toJavaRDD().mapToPair(tuple -> {
            DoubleMatrix userVector = new DoubleMatrix(tuple._2());
            DoubleMatrix scores = imBroadcast.value().mmul(userVector);
            double[] d = scores.data;
            // score -> row-index map. NOTE(review): if two products score
            // exactly equal, one entry is silently lost here — acceptable for
            // this demo but not for production ranking.
            HashMap<Double, Integer> zipWithIndex = new HashMap<>();
            for (int i = 0; i < d.length; i++) {
                zipWithIndex.put(d[i], i);
            }
            // TreeMap with reverseOrder gives scores in descending order
            // (the default natural order would be ascending).
            TreeMap<Double, Integer> sortedWithId = new TreeMap<Double, Integer>(Collections.reverseOrder());
            sortedWithId.putAll(zipWithIndex);
            // Shift each 0-based row index by +1 to get a product id —
            // assumes product ids are exactly 1..rows in row order; TODO
            // confirm against productFeatures ordering. (Replacing a value
            // for an existing key during keySet iteration is legal: it is
            // not a structural modification.)
            Iterator<Double> treeMapIter = sortedWithId.keySet().iterator();
            while (treeMapIter.hasNext()) {
                Double key = treeMapIter.next();
                Integer value = sortedWithId.get(key);
                sortedWithId.put(key, value + 1);
            }
            return new Tuple2<Integer, List<Integer>>((int) tuple._1(), Arrays.asList(sortedWithId.values().toArray(new Integer[sortedWithId.values().size()])));
        });
        s(allRecs.collect().size()); // 943

        JavaPairRDD<Integer, Integer> userMovies = ratings.mapToPair(rating -> new Tuple2<Integer, Integer>(rating.user(), rating.product()));
        JavaPairRDD<Integer, Iterable<Integer>> userMovies2 = userMovies.groupByKey();
        int K1 = 10;
        // Join each user's ranked recommendations with the movies they
        // actually rated, then average APK over all users.
        JavaPairRDD<Integer, Tuple2<List<Integer>, Iterable<Integer>>> MAPK = allRecs.join(userMovies2);
        JavaRDD<Double> MAPK2 = MAPK.map(tuple -> {
            List<Integer> predicted = tuple._2()._1();
            Iterable<Integer> actual = tuple._2()._2();
            List<Integer> target = new ArrayList<>();
            actual.forEach(target::add);
            return avgPrecisionK(target, predicted, K1);
        });
        double APK = MAPK2.reduce((a, b) -> a + b) / allRecs.count();
        s("Mean Average Precision at K = " + APK); // 0.034713006446834664

        /**
         * Using MLlib's built-in evaluation metrics (p.160).
         */
        ratingsAndpredictions.cache();
        JavaRDD<Tuple2<Object, Object>> predictedAndTrue = ratingsAndpredictions.map(tuple -> new Tuple2<Object, Object>(tuple._2()._1(), tuple._2()._2()));
        RegressionMetrics regressionMetrics = new RegressionMetrics(JavaRDD.toRDD(predictedAndTrue));
        s("Mean Squared Error: " + regressionMetrics.meanSquaredError());          // 0.08441911339839799
        s("Root Mean Squared Error: " + regressionMetrics.rootMeanSquaredError()); // 0.2905496745797489

        MAPK.cache();
        JavaRDD<Tuple2<Object, Object>> predictedAndTrueForRanking = MAPK.map(tuple -> {
            List<Integer> predicted = tuple._2()._1();
            Iterable<Integer> actual = tuple._2()._2();
            List<Integer> target = new ArrayList<>();
            actual.forEach(target::add);
            return new Tuple2<Object, Object>(predicted.toArray(), target.toArray());
        });
        // NOTE(review): the second constructor arg is a scala ClassTag;
        // passing null happens to work here, but
        // scala.reflect.ClassTag$.MODULE$.apply(Object.class) would be safer.
        RankingMetrics<Object> rankingMetrics = new RankingMetrics<Object>(JavaRDD.toRDD(predictedAndTrueForRanking), null);
        s("Mean Average Precision = " + rankingMetrics.meanAveragePrecision()); // 0.08316338166223147

        // Release cached RDDs.
        ratings.unpersist();
        ratingsAndpredictions.unpersist();
        MAPK.unpersist();
    }

    /**
     * Cosine similarity: dot(a, b) / (||a|| * ||b||).
     *
     * <p>Returns NaN if either vector has zero norm; callers here always
     * pass ALS factor vectors, which are non-zero in practice.
     */
    private double cosineSimilarity(DoubleMatrix vec1, DoubleMatrix vec2) {
        return vec1.dot(vec2) / (vec1.norm2() * vec2.norm2());
    }

    /**
     * Average precision at k (APK) for one user.
     *
     * @param actual    the items the user actually interacted with
     * @param predicted ranked predicted items, best first
     * @param k         cutoff rank
     * @return APK in [0, 1]; 1.0 when there is nothing to predict
     */
    private double avgPrecisionK(List<Integer> actual, List<Integer> predicted, int k) {
        // FIX: bound the cutoff by the prediction list's size —
        // predicted.subList(0, k) throws IndexOutOfBoundsException whenever
        // fewer than k predictions are available.
        List<Integer> predK = predicted.subList(0, Math.min(k, predicted.size()));
        double score = 0.0;
        double numHits = 0.0;
        for (int i = 0; i < predK.size(); i++) {
            int p = predK.get(i);
            if (actual.contains(p)) {
                numHits += 1.0;
                // Precision at this rank, accumulated only on hits.
                score += numHits / (i + 1.0);
            }
        }
        if (actual.isEmpty()) {
            // Convention: a user with no actual items scores a perfect 1.0.
            return 1.0;
        } else {
            return score / Math.min(actual.size(), k);
        }
    }
}
'Spark > Spark와 머신 러닝' 카테고리의 다른 글
| Spark 시작하기20 - [Spark와 머신 러닝] 5장 스파크를 이용한 분류 모델 구현 (2) | 2016.05.07 |
|---|---|
| Spark 시작하기18 - [Spark와 머신 러닝] 3장 스파크를 이용한 데이터 수집, 프로세싱, 준비2 (0) | 2016.05.04 |
| Spark 시작하기05 - Exception (0) | 2016.04.03 |
| Spark 시작하기04 - [Spark와 머신 러닝] 3장 스파크를 이용한 데이터 수집, 프로세싱, 준비 (0) | 2016.04.03 |
| Spark 시작하기03 - [Spark와 머신 러닝] 1장 스파크의 시작과 구동 (0) | 2016.03.27 |