I rewrote the book's source code, originally written in Scala, in Java 8.
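Before the full listing, here is a minimal, self-contained sketch (mine, not from the book) of the conversion pattern used throughout: Scala RDD transformations become Java 8 lambdas on a JavaRDD, and JavaRDD.toRDD(...) bridges to the MLlib APIs that expect a Scala RDD. The class name PortingSketch and the two-row inline data set are made up purely for illustration; the real code below reads train_noheader.tsv instead.

import java.util.Arrays;

import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.mllib.classification.LogisticRegressionModel;
import org.apache.spark.mllib.classification.LogisticRegressionWithSGD;
import org.apache.spark.mllib.linalg.Vectors;
import org.apache.spark.mllib.regression.LabeledPoint;

public class PortingSketch {
    public static void main(String[] args) {
        JavaSparkContext sc = new JavaSparkContext("local[2]", "Porting Sketch");
        // Scala's rdd.map(line => ...) becomes a Java 8 lambda on a JavaRDD.
        // Two tab-separated toy rows: label first, then a single numeric feature.
        JavaRDD<LabeledPoint> data = sc.parallelize(Arrays.asList("1.0\t0.5", "0.0\t0.1"))
                .map(line -> line.split("\\t"))
                .map(arr -> new LabeledPoint(Double.parseDouble(arr[0]),
                        Vectors.dense(Double.parseDouble(arr[1]))));
        // MLlib's static train(...) methods take a Scala RDD, so convert with JavaRDD.toRDD(...).
        LogisticRegressionModel model = LogisticRegressionWithSGD.train(JavaRDD.toRDD(data), 10);
        System.out.println(model.predict(data.first().features()));
        sc.stop();
    }
}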
package org.test.ch5;

import java.io.Serializable;
import java.util.Arrays;
import java.util.Map;

import org.apache.log4j.PropertyConfigurator;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.mllib.classification.LogisticRegressionModel;
import org.apache.spark.mllib.classification.LogisticRegressionWithSGD;
import org.apache.spark.mllib.classification.NaiveBayes;
import org.apache.spark.mllib.classification.NaiveBayesModel;
import org.apache.spark.mllib.classification.SVMModel;
import org.apache.spark.mllib.classification.SVMWithSGD;
import org.apache.spark.mllib.evaluation.BinaryClassificationMetrics;
import org.apache.spark.mllib.feature.StandardScaler;
import org.apache.spark.mllib.feature.StandardScalerModel;
import org.apache.spark.mllib.linalg.Vector;
import org.apache.spark.mllib.linalg.Vectors;
import org.apache.spark.mllib.linalg.distributed.RowMatrix;
import org.apache.spark.mllib.optimization.L1Updater;
import org.apache.spark.mllib.optimization.SimpleUpdater;
import org.apache.spark.mllib.optimization.SquaredL2Updater;
import org.apache.spark.mllib.optimization.Updater;
import org.apache.spark.mllib.regression.GeneralizedLinearModel;
import org.apache.spark.mllib.regression.LabeledPoint;
import org.apache.spark.mllib.stat.MultivariateStatisticalSummary;
import org.apache.spark.mllib.tree.DecisionTree;
import org.apache.spark.mllib.tree.configuration.Algo;
import org.apache.spark.mllib.tree.impurity.Entropy;
import org.apache.spark.mllib.tree.impurity.Impurity;
import org.apache.spark.mllib.tree.model.DecisionTreeModel;

import scala.Tuple2;

/**
 * Chapter 5: Building a Classification Model with Spark
 * Download the data file (train.tsv) from
 * https://www.kaggle.com/c/stumbleupon/data
 */
/**
 Chapter 5 outline
 1.   Parse the data (drop the first four columns, which are text)
 2.   Train the models (logistic regression, SVM, naive Bayes, decision tree)
 3-1. Measure model accuracy
 3-2. Measure model performance with PR and ROC
 3-3. Conclusion: far too low...
 4-1. Parse the data -> feature standardization: (value - mean) / sqrt(variance) = standardized feature value
 4-2. Logistic regression: train the model and measure performance
 5-1. Parse the data -> enumerate the category values in column 3 (text) and expand them into extra dimensions + the remaining numeric columns (+ feature standardization)
 5-2. Logistic regression: train the model and measure performance
 6-1. Parse the data -> enumerate the category values in column 3 and expand them into extra dimensions -> use only the category column
 6-2. Naive Bayes: train the model and measure performance
 7-1. Parse the data -> enumerate the category values in column 3 and expand them into extra dimensions + the remaining numeric columns (+ feature standardization)
 7-2. Tune the training parameters (numIterations, stepSize, regParam with SquaredL2Updater / L1Updater)
 7-3. Logistic regression: train the model and measure performance
 8-1. Tune the training parameter maxDepth
 8-2. Decision tree: train the model and measure performance
 9-1. Tune the training parameter lambda
 9-2. Naive Bayes: train the model and measure performance
 */
public class Ch5_JavaApp implements Serializable {

    private static final long serialVersionUID = 154356342432L;

    private static void s() { s(""); }
    private static <T> T s(T o) { System.out.println(o); return o; }

    final int maxTreeDepth = 5;
    final int numIterations = 10;

    public static void main(String... strings) {
        PropertyConfigurator.configure(System.getProperty("user.dir") + "/src/resources/log4j.properties");
        Long start = System.currentTimeMillis();
        Ch5_JavaApp c = new Ch5_JavaApp();
        JavaSparkContext sc = new JavaSparkContext("local[2]", "First Spark App");
        c.proc1(sc);
        c.proc1_1(sc);
        c.proc2(sc);
        c.proc3(sc);
        c.proc4(sc);
        c.proc5(sc);
        c.proc6(sc);
        c.proc7(sc);
        s("\n" + (System.currentTimeMillis() - start) + " ms spend...");
        sc.stop();
    }

    private JavaRDD<String[]> getRecords(JavaSparkContext sc) {
        // "url" "urlid" "boilerplate" "alchemy_category" "alchemy_category_score" "avglinksize" "commonlinkratio_1" "commonlinkratio_2" "commonlinkratio_3" "commonlinkratio_4" "compression_ratio" "embed_ratio" "framebased" "frameTagRatio" "hasDomainLink" "html_ratio" "image_ratio" "is_news" "lengthyLinkDomain" "linkwordscore" "news_front_page" "non_markup_alphanum_characters" "numberOfLinks" "numwords_in_url" "parametrizedLinkRatio" "spelling_errors_ratio" "label"
        JavaRDD<String> rawData = sc.textFile(getClass().getResource("data/train_noheader.tsv").getFile());
        JavaRDD<String[]> records = rawData.map(str -> {
            return str.replaceAll("\"", "").split("\\t");
        });
        return records;
    }

    private JavaRDD<LabeledPoint> getData(JavaSparkContext sc) {
        JavaRDD<String[]> records = getRecords(sc);
        // s(Arrays.toString(records.first()));
        /**
         * Extract the right features from the data set (this matters!!!!).
         * The first time around I split the data incorrectly here, and the decision tree came out with 100% accuracy. -_-;;
         */
        JavaRDD<LabeledPoint> data = records.map(arr -> {
            // If there were 8 columns, columns 4, 5 and 6 would be the features
            // and column 7 the label.
            // 0 1 2 3 4 ... 20 21
            // X X X X O  O  O  X
            // X X X X O  O  O  X
            // X X X X O  O  O  X
            int label = Integer.parseInt(arr[arr.length - 1]);
            double[] features = new double[arr.length - 5];
            for (int i = 0; i < arr.length - 5; i++) {
                // fill in missing values
                features[i] = "?".equals(arr[4 + i]) ? 0.0 : Double.parseDouble(arr[4 + i]);
            }
            return new LabeledPoint(label, Vectors.dense(features));
        });
        return data;
    }

    private JavaRDD<LabeledPoint> getNBData(JavaSparkContext sc) {
        JavaRDD<String[]> records = getRecords(sc);
        // for the naive Bayes model (negative feature values are not supported)
        JavaRDD<LabeledPoint> nbData = records.map(arr -> {
            int label = Integer.parseInt(arr[arr.length - 1]);
            double[] features = new double[arr.length - 5];
            for (int i = 0; i < arr.length - 5; i++) {
                if ("?".equals(arr[4 + i])) {
                    // fill in missing values
                    features[i] = 0.0;
                } else {
                    double d = Double.parseDouble(arr[4 + i]);
                    features[i] = d < 0.0 ? 1.0 : d;
                }
            }
            return new LabeledPoint(label, Vectors.dense(features));
        });
        return nbData;
    }

    private void printAUC(String subject, JavaRDD<Tuple2<Object, Object>> scoreAndLabels) {
        BinaryClassificationMetrics lrMetrics = new BinaryClassificationMetrics(JavaRDD.toRDD(scoreAndLabels));
        s(subject + (lrMetrics.areaUnderPR() * 100.0) + ", " + (lrMetrics.areaUnderROC() * 100.0));
    }

    private void proc1(JavaSparkContext sc) {
        JavaRDD<LabeledPoint> data = getData(sc);
        data.cache();
        s("length:" + data.first().features().toArray().length); // 22
        // data.foreach(d -> s(d)); // printed the whole data set once to organize it in Excel
        JavaRDD<LabeledPoint> nbData = getNBData(sc);
        nbData.cache();
        s(data.count()); // 7395

        /**
         * Train the classification models
         */
        s("Logistic regression model");
        LogisticRegressionModel lrModel = s(LogisticRegressionWithSGD.train(JavaRDD.toRDD(data), numIterations));
        // org.apache.spark.mllib.classification.LogisticRegressionModel: intercept = 0.0, numFeatures = 22, numClasses = 2, threshold = 0.5
        s("Support vector machine model");
        SVMModel svmModel = s(SVMWithSGD.train(JavaRDD.toRDD(data), numIterations));
        // org.apache.spark.mllib.classification.SVMModel: intercept = 0.0, numFeatures = 22, numClasses = 2, threshold = 0.0
        s("Naive Bayes model");
        NaiveBayesModel nbModel = s(NaiveBayes.train(JavaRDD.toRDD(nbData)));
        // org.apache.spark.mllib.classification.NaiveBayesModel@3c25cfe1
        s("Decision tree model");
        DecisionTreeModel dtModel = s(DecisionTree.train(JavaRDD.toRDD(data), Algo.Classification(), Entropy.instance(), maxTreeDepth)); // or Gini.instance()
        // DecisionTreeModel classifier of depth 5 with 61 nodes

        /**
         * Use the classification models
         */
        // logistic regression
        LabeledPoint dataPoint = data.first();
        double prediction = s(lrModel.predict(dataPoint.features())); // prediction: 1.0
        s("dataPoint.features()" + dataPoint.features());
        s("dataPoint.label()" + dataPoint.label()); // 0.0 <- the actual label is 0.0, so the prediction (1.0) is wrong
        JavaRDD<Double> predictions = lrModel.predict(data.map(lp -> lp.features()));
        s(predictions.take(10)); // [1.0, 1.0, 1.0, 1.0, 1.0, 1.0, 1.0, 1.0, 1.0, 1.0]
        s(svmModel.predict(dataPoint.features()));     // prediction: 1.0
        s(nbModel.predict(nbData.first().features())); // prediction: 1.0
        s(dtModel.predict(dataPoint.features()));      // prediction: 0.0

        /**
         * Evaluate the performance of the classification models
         */
        // logistic regression accuracy
        double lrTotalCorrect = data.mapToDouble(lp -> {
            return lrModel.predict(lp.features()) == lp.label() ? 1.0 : 0.0;
        }).sum();
        // SVM accuracy
        double svmTotalCorrect = data.mapToDouble(lp -> {
            return svmModel.predict(lp.features()) == lp.label() ? 1.0 : 0.0;
        }).sum();
        // naive Bayes accuracy
        double nbTotalCorrect = nbData.mapToDouble(lp -> {
            return nbModel.predict(lp.features()) == lp.label() ? 1.0 : 0.0;
        }).sum();
        // decision tree accuracy
        double dtTotalCorrect = data.mapToDouble(lp -> {
            double score = dtModel.predict(lp.features());
            double predicted = score > 0.5 ? 1.0 : 0.0; // score is always 1.0 or 0.0 here, so the 0.5 threshold is not strictly necessary
            return predicted == lp.label() ? 1.0 : 0.0;
        }).sum();
        double numData = data.count();
        s("Logistic regression accuracy:" + lrTotalCorrect / numData); // 0.5146720757268425 <- only about 50% accurate -_-;;
        s("SVM accuracy:" + svmTotalCorrect / numData);                // 0.5146720757268425
        s("Naive Bayes accuracy:" + nbTotalCorrect / numData);         // 0.5803921568627451
        s("Decision tree accuracy:" + dtTotalCorrect / numData);       // 0.6482758620689655

        /**
         * Other evaluation metrics: PR and ROC
         */
        // logistic regression
        JavaRDD<Tuple2<Object, Object>> lrScoreAndLabels = data.map(lp -> new Tuple2<Object, Object>(lrModel.predict(lp.features()), lp.label()));
        printAUC("Logistic regression PR,ROC: ", lrScoreAndLabels);
        // SVM
        JavaRDD<Tuple2<Object, Object>> svmScoreAndLabels = data.map(lp -> new Tuple2<Object, Object>(svmModel.predict(lp.features()), lp.label()));
        printAUC("SVM PR,ROC: ", svmScoreAndLabels);
        // naive Bayes
        JavaRDD<Tuple2<Object, Object>> nbScoreAndLabels = nbData.map(lp -> {
            return new Tuple2<Object, Object>(nbModel.predict(lp.features()) > 0.5 ? 1.0 : 0.0, lp.label());
        });
        printAUC("Naive Bayes PR,ROC: ", nbScoreAndLabels);
        // decision tree
        JavaRDD<Tuple2<Object, Object>> dtScoreAndLabels = data.map(lp -> {
            return new Tuple2<Object, Object>(dtModel.predict(lp.features()) > 0.5 ? 1.0 : 0.0, lp.label());
        });
        printAUC("Decision tree PR,ROC: ", dtScoreAndLabels);
        // results
        // Logistic regression PR,ROC: 75.67586293858841, 50.14181143280931
        // SVM PR,ROC: 75.67586293858841, 50.14181143280931
        // Naive Bayes PR,ROC: 68.12849819263486, 58.39392440412702
        // Decision tree PR,ROC: 74.30805993331198, 64.88371887050936

        data.unpersist();
        nbData.unpersist();
    }

    private void proc1_1(JavaSparkContext sc) {
        /**
         * Improving model performance!!!
         * Feature standardization - only helps the logistic regression model.
         */
        JavaRDD<LabeledPoint> data = getData(sc);
        JavaRDD<Vector> vectors = data.map(lp -> lp.features());
        RowMatrix matrix = new RowMatrix(JavaRDD.toRDD(vectors));
        MultivariateStatisticalSummary matrixSummary = matrix.computeColumnSummaryStatistics();
        Vector mean = s(matrixSummary.mean());               // column means
        Vector min = s(matrixSummary.min());                 // column minimums
        Vector max = s(matrixSummary.max());                 // column maximums
        Vector variance = s(matrixSummary.variance());       // column variances
        Vector numNonzeros = s(matrixSummary.numNonzeros()); // number of non-zero values per column(?)

        StandardScalerModel scaler = new StandardScaler(true, true).fit(JavaRDD.toRDD(vectors));
        JavaRDD<LabeledPoint> scaledData = data.map(lp -> new LabeledPoint(lp.label(), scaler.transform(lp.features())));
        s(data.first().features());
        s(scaledData.first().features());
        // (x - u) / sqrt(variance)
        // (value - mean) / sqrt(variance) = standardized feature value
        s((data.first().features().toArray()[0] - mean.toArray()[0]) / Math.sqrt(variance.toArray()[0])); // 1.137647336497678
        s((0.789131 - 0.4122) / Math.sqrt(0.1097)); // 1.138042662130702 <- close enough to call them equal ;;;;

        // logistic regression - rebuild the model
        LogisticRegressionModel lrModel2 = s(LogisticRegressionWithSGD.train(JavaRDD.toRDD(scaledData), numIterations));
        // re-evaluate logistic regression accuracy
        double lrTotalCorrect2 = scaledData.mapToDouble(lp -> {
            return lrModel2.predict(lp.features()) == lp.label() ? 1.0 : 0.0;
        }).sum();
        s("Logistic regression accuracy 2:" + lrTotalCorrect2 / data.count()); // 0.6204192021636241 <- a 10% improvement!!!
        JavaRDD<Tuple2<Object, Object>> lrScoreAndLabels2 = scaledData.map(lp -> new Tuple2<Object, Object>(lrModel2.predict(lp.features()), lp.label()));
        printAUC("Logistic regression 2 PR,ROC: ", lrScoreAndLabels2);
        // Logistic regression 2 PR,ROC: 72.72540762713375, 61.96629669112512 <- ROC improved by 10%
    }

    private Map<String, Long> getCategories(JavaRDD<String[]> records) {
        // collect the values of column 3, the alchemy_category column
        Map<String, Long> categories = records.map(arr -> arr[3]).distinct().zipWithIndex().collectAsMap();
        return categories;
    }

    private JavaRDD<LabeledPoint> getScaledDataCats(JavaSparkContext sc) {
        JavaRDD<String[]> records = getRecords(sc);
        Map<String, Long> categories = getCategories(records);
        int numCategories = s(categories.size()); // 14
        // s(categories); // {religion=13, culture_politics=4, science_technology=8, arts_entertainment=10, weather=2, health=0, recreation=3, gaming=7, ?=11, unknown=5, sports=12, computer_internet=6, business=1, law_crime=9}
        JavaRDD<LabeledPoint> dataCetegories = records.map(arr -> {
            int label = Integer.parseInt(arr[arr.length - 1]);
            int categoryIdx = categories.get(arr[3]).intValue(); // fine as long as the Long index does not exceed the int range...
            double[] categoryFeatures = new double[numCategories];
            categoryFeatures[categoryIdx] = 1.0;
            double[] otherFeatures = new double[arr.length - 5];
            for (int i = 0; i < arr.length - 5; i++) {
                // fill in missing values
                otherFeatures[i] = "?".equals(arr[4 + i]) ? 0.0 : Double.parseDouble(arr[4 + i]);
            }
            double[] features = new double[categoryFeatures.length + otherFeatures.length];
            System.arraycopy(categoryFeatures, 0, features, 0, categoryFeatures.length);
            System.arraycopy(otherFeatures, 0, features, categoryFeatures.length, otherFeatures.length);
            return new LabeledPoint(label, Vectors.dense(features));
        });
        s(dataCetegories.first());
        // (0.0,[0.0,1.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.789131,2.055555556,0.676470588,0.205882353,0.047058824,0.023529412,0.443783175,0.0,0.0,0.09077381,0.0,0.245831182,0.003883495,1.0,1.0,24.0,0.0,5424.0,170.0,8.0,0.152941176,0.079129575])

        // feature standardization again
        JavaRDD<Vector> vectors = dataCetegories.map(lp -> lp.features());
        StandardScalerModel scalerCats = new StandardScaler(true, true).fit(JavaRDD.toRDD(vectors));
        JavaRDD<LabeledPoint> scaledDataCats = dataCetegories.map(lp -> new LabeledPoint(lp.label(), scalerCats.transform(lp.features())));
        // compare before and after
        // s(dataCetegories.first().features()); // [0.0,1.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.789131,2.055555556,0.676470588,0.205882353,0.047058824,0.023529412,0.443783175,0.0,0.0,0.09077381,0.0,0.245831182,0.003883495,1.0,1.0,24.0,0.0,5424.0,170.0,8.0,0.152941176,0.079129575]
        // s(scaledDataCats.first().features()); // [-0.2709990696925828,2.7207366564548514,-0.02326210589837061,-0.4464212047941535,-0.22052688457880879,-0.028494000387023734,-0.20418221057887365,-0.10189469097220732,-0.2016540523193296,-0.06487757239262681,-0.38181322324318134,-0.6807527904251456,-0.23272797709480803,-0.09914991930875496,1.137647336497678,-0.08193557169294771,1.0251398128933331,-0.05586356442541689,-0.4688932531289357,-0.3543053263079386,-0.3175352172363148,0.3384507982396541,0.0,0.828822173315322,-0.14726894334628504,0.22963982357813484,-0.14162596909880876,0.7902380499177364,0.7171947294529865,-0.29799681649642257,-0.2034625779299476,-0.03296720969690391,-0.04878112975579913,0.9400699751165439,-0.10869848852526258,-0.2788207823137022]
        return scaledDataCats;
    }

    private void proc2(JavaSparkContext sc) {
        /**
         * Additional features
         */
        JavaRDD<LabeledPoint> scaledDataCats = getScaledDataCats(sc);

        // logistic regression - rebuild the model
        LogisticRegressionModel lrModel3 = s(LogisticRegressionWithSGD.train(JavaRDD.toRDD(scaledDataCats), numIterations));
        // re-evaluate logistic regression accuracy
        double lrTotalCorrect2 = scaledDataCats.mapToDouble(lp -> {
            return lrModel3.predict(lp.features()) == lp.label() ? 1.0 : 0.0;
        }).sum();
        s("Logistic regression accuracy 3:" + lrTotalCorrect2 / scaledDataCats.count()); // 0.6657200811359026 <- improved again
        JavaRDD<Tuple2<Object, Object>> lrScoreAndLabels2 = scaledDataCats.map(lp -> new Tuple2<Object, Object>(lrModel3.predict(lp.features()), lp.label()));
        printAUC("Logistic regression 3 PR,ROC: ", lrScoreAndLabels2);
        // Logistic regression 3 PR,ROC: 75.79640787676577, 66.54826844243996 <-------- ROC is now up to 66.

        /**
         * The competition leaderboard reached an AUC of 0.88906.
         * www.kaggle.com/c/stumbleupon/leaderboard/private
         */
        // TODO
    }

    private void proc3(JavaSparkContext sc) {
        /**
         * Using data in the correct form
         * - naive Bayes: use only the column-3 category features (values of 0.0 and 1.0 only)
         */
        JavaRDD<String[]> records = getRecords(sc);
        Map<String, Long> categories = getCategories(records);
        int numCategories = s(categories.size()); // 14
        // -> same as proc2 up to this point
        JavaRDD<LabeledPoint> dataNB = records.map(arr -> {
            int label = Integer.parseInt(arr[arr.length - 1]);
            int categoryIdx = categories.get(arr[3]).intValue(); // fine as long as the Long index does not exceed the int range...
            double[] categoryFeatures = new double[numCategories];
            categoryFeatures[categoryIdx] = 1.0;
            // use only categoryFeatures as the feature vector
            return new LabeledPoint(label, Vectors.dense(categoryFeatures));
        });

        // train the model
        s("Naive Bayes model 2");
        NaiveBayesModel nbModelCats = s(NaiveBayes.train(JavaRDD.toRDD(dataNB)));
        // evaluate
        double nbTotalCorrectCats = dataNB.mapToDouble(lp -> {
            return nbModelCats.predict(lp.features()) == lp.label() ? 1.0 : 0.0;
        }).sum();
        s("Naive Bayes accuracy 2: " + nbTotalCorrectCats / dataNB.count()); // 0.6096010818120352
        // naive Bayes PR/ROC
        JavaRDD<Tuple2<Object, Object>> nbScoreAndLabels2 = dataNB.map(lp -> {
            double score = nbModelCats.predict(lp.features());
            score = score > 0.5 ? 1.0 : 0.0;
            return new Tuple2<Object, Object>(score, lp.label());
        });
        printAUC("Naive Bayes 2 PR,ROC: ", nbScoreAndLabels2);
        // 74.05222106704076, 60.513849415494455 <- ROC went up from 58% to 60%
    }

    private void proc4(JavaSparkContext sc) {
        JavaRDD<LabeledPoint> scaledDataCats = getScaledDataCats(sc); // -> same as proc2 up to this point
        scaledDataCats.cache();

        /**
         * Varying numIterations
         */
        // JavaRDD.toRDD cannot be called from inside another RDD transformation.
        // Exception msg : org.apache.spark.SparkException: RDD transformations and actions can only be invoked by the driver, not inside of other transformations; for example, rdd1.map(x => rdd2.values.count() * x) is invalid because the values transformation and count action cannot be performed inside of the rdd1.map transformation. For more information, see SPARK-5063.
        // JavaRDD<Tuple2<String, Double>> iterResults = sc.parallelize(Arrays.asList(1,5,10,50)).map(numIterations -> {
        //     LogisticRegressionModel model = trainWithParams(scaledDataCats, 0.0, numIterations, new SimpleUpdater(), 1.0);
        //     return createMetrics("numIterations:" + numIterations, scaledDataCats, model);
        // });
        // iterResults.foreach(tuple -> s(tuple._1() + ", AUC: " + (tuple._2() * 100)));

        // a different approach!
        (Arrays.asList(1, 5, 10, 50)).parallelStream().forEach(numIterations -> {
            LogisticRegressionModel model = trainWithParams(scaledDataCats, 0.0, numIterations, new SimpleUpdater(), 1.0);
            Tuple2<String, Double> tuple = createMetrics("numIterations:" + numIterations, scaledDataCats, model);
            s(tuple._1() + ", AUC: " + (tuple._2() * 100));
        });
        // results
        // numIterations:1, AUC: 64.95198950299684
        // numIterations:5, AUC: 66.6160962344358
        // numIterations:10, AUC: 66.54826844243996 <--- got worse
        // numIterations:50, AUC: 66.81425454500737

        /**
         * Varying stepSize
         */
        final int numIterations = 10;
        (Arrays.asList(0.001, 0.01, 0.1, 1.0, 10.0)).parallelStream().forEach(stepSize -> {
            LogisticRegressionModel model = trainWithParams(scaledDataCats, 0.0, numIterations, new SimpleUpdater(), stepSize);
            Tuple2<String, Double> tuple = createMetrics("stepSize:" + stepSize, scaledDataCats, model);
            s(tuple._1() + ", AUC: " + (tuple._2() * 100));
        });
        // results
        // stepSize:0.001, AUC: 64.96588225098238
        // stepSize:0.01, AUC: 64.96444027450548
        // stepSize:0.1, AUC: 65.521065153621
        // stepSize:1.0, AUC: 66.54826844243996
        // stepSize:10.0, AUC: 61.922788527781535 <--- got worse

        /**
         * Regularization with SquaredL2Updater - regParam
         */
        (Arrays.asList(0.001, 0.01, 0.1, 1.0, 10.0)).parallelStream().forEach(regParam -> {
            LogisticRegressionModel model = trainWithParams(scaledDataCats, regParam, numIterations, new SquaredL2Updater(), 1.0);
            Tuple2<String, Double> tuple = createMetrics("L2 regParam:" + regParam, scaledDataCats, model);
            s(tuple._1() + ", AUC: " + (tuple._2() * 100));
        });
        // results (out of order because of parallelStream().forEach())
        // L2 regParam:1.0, AUC: 66.03745376525676
        // L2 regParam:10.0, AUC: 35.32533843993077 <----- got worse
        // L2 regParam:0.1, AUC: 66.63378789506862
        // L2 regParam:0.01, AUC: 66.5475474542015
        // L2 regParam:0.001, AUC: 66.54826844243996

        /**
         * Regularization with L1Updater - regParam
         */
        (Arrays.asList(0.001, 0.01, 0.1, 1.0, 10.0)).parallelStream().forEach(regParam -> {
            LogisticRegressionModel model = trainWithParams(scaledDataCats, regParam, numIterations, new L1Updater(), 1.0);
            Tuple2<String, Double> tuple = createMetrics("L1 regParam:" + regParam, scaledDataCats, model);
            s(tuple._1() + ", AUC: " + (tuple._2() * 100));
        });
        // results (out of order because of parallelStream().forEach())
        // L1 regParam:10.0, AUC: 50.0 <----- got worse
        // L1 regParam:0.1, AUC: 50.0 <----- got worse
        // L1 regParam:1.0, AUC: 50.0 <----- got worse
        // L1 regParam:0.001, AUC: 66.53221272973906
        // L1 regParam:0.01, AUC: 66.46962582686739

        // scaledDataCats.unpersist();
    }

    private LogisticRegressionModel trainWithParams(JavaRDD<LabeledPoint> input, Double regParam, Integer numIterations, Updater updater, Double stepSize) {
        LogisticRegressionWithSGD lr = new LogisticRegressionWithSGD();
        lr.optimizer().setNumIterations(numIterations).setUpdater(updater).setRegParam(regParam).setStepSize(stepSize);
        return lr.run(JavaRDD.toRDD(input));
        // When called from inside an RDD transformation this throws:
        // org.apache.spark.SparkException: RDD transformations and actions can only be invoked by the driver, not inside of other transformations; for example, rdd1.map(x => rdd2.values.count() * x) is invalid because the values transformation and count action cannot be performed inside of the rdd1.map transformation. For more information, see SPARK-5063.
    }

    private Tuple2<String, Double> createMetrics(String label, JavaRDD<LabeledPoint> data, GeneralizedLinearModel model) {
        JavaRDD<Tuple2<Object, Object>> lrScoreAndLabels = data.map(lp -> {
            return new Tuple2<Object, Object>(model.predict(lp.features()), lp.label());
        });
        BinaryClassificationMetrics lrMetrics = new BinaryClassificationMetrics(JavaRDD.toRDD(lrScoreAndLabels));
        return new Tuple2<String, Double>(label, lrMetrics.areaUnderROC());
    }

    private void proc5(JavaSparkContext sc) {
        /**
         * Decision tree - varying maxDepth
         */
        JavaRDD<LabeledPoint> data = getData(sc); // -> same as proc1() up to this point
        Arrays.asList(1, 2, 3, 4, 5, 10, 20).parallelStream().forEach(maxDepth -> {
            DecisionTreeModel model = trainDTWithParams(data, maxDepth, Entropy.instance()); // or Gini.instance()
            JavaRDD<Tuple2<Object, Object>> scoreAndLabels = data.map(lp -> {
                double score = model.predict(lp.features());
                return new Tuple2<Object, Object>(score > 0.5 ? 1.0 : 0.0, lp.label());
            });
            BinaryClassificationMetrics metrics = new BinaryClassificationMetrics(JavaRDD.toRDD(scoreAndLabels));
            s("tree depth:" + maxDepth + ", AUC:" + metrics.areaUnderROC());
        });
        // results
        // tree depth:1, AUC:0.5932683560677638
        // tree depth:2, AUC:0.6168392183052838
        // tree depth:3, AUC:0.6260699538655363
        // tree depth:4, AUC:0.6363331299438932
        // tree depth:5, AUC:0.6488371887050935
        // tree depth:10, AUC:0.7625521856410764
        // tree depth:20, AUC:0.9845371811804648 <---- dramatically higher (the deeper the tree, the more severely the model can overfit the data set)
        // with Gini.instance() as the impurity measure (not much difference)
        // tree depth:1, AUC:0.5932683560677638
        // tree depth:2, AUC:0.6168392183052838
        // tree depth:3, AUC:0.6260699538655363
        // tree depth:4, AUC:0.6363331299438932
        // tree depth:5, AUC:0.6489164974113228
        // tree depth:10, AUC:0.7837090914201374
        // tree depth:20, AUC:0.9887069452906806
    }

    private DecisionTreeModel trainDTWithParams(JavaRDD<LabeledPoint> input, Integer maxDepth, Impurity impurity) {
        return DecisionTree.train(JavaRDD.toRDD(input), Algo.Classification(), impurity, maxDepth);
    }

    private void proc6(JavaSparkContext sc) {
        /**
         * Naive Bayes model - varying lambda
         */
        JavaRDD<LabeledPoint> nbData = getNBData(sc); // -> same as proc1() up to this point
        Arrays.asList(0.001, 0.01, 0.1, 1.0, 10.0).parallelStream().forEach(lambda -> {
            NaiveBayesModel model = trainNBWithParams(nbData, lambda);
            JavaRDD<Tuple2<Object, Object>> scoreAndLabels = nbData.map(lp -> new Tuple2<Object, Object>(model.predict(lp.features()), lp.label()));
            BinaryClassificationMetrics metrics = new BinaryClassificationMetrics(JavaRDD.toRDD(scoreAndLabels));
            s("lambda " + lambda + ", AUC:" + metrics.areaUnderROC());
        });
        // results (not much difference)
        // lambda 0.001, AUC:0.5839392440412702
        // lambda 0.01, AUC:0.5839392440412702
        // lambda 10.0, AUC:0.5843343968336832
        // lambda 0.1, AUC:0.5839392440412702
    }

    private NaiveBayesModel trainNBWithParams(JavaRDD<LabeledPoint> input, Double lambda) {
        NaiveBayes nb = new NaiveBayes();
        nb.setLambda(lambda);
        return nb.run(JavaRDD.toRDD(input));
    }

    private void proc7(JavaSparkContext sc) {
        /**
         * Cross-validation
         */
        JavaRDD<LabeledPoint> scaledDataCats = getScaledDataCats(sc); // -> same as proc2() up to this point
        JavaRDD<LabeledPoint>[] trainTestSplit = scaledDataCats.randomSplit(new double[]{0.6, 0.4}, 123);
        JavaRDD<LabeledPoint> train = trainTestSplit[0];
        JavaRDD<LabeledPoint> test = trainTestSplit[1];
        train.cache();
        test.cache();
        final int numIterations = 10;
        Arrays.asList(0.0, 0.001, 0.0025, 0.005, 0.01).parallelStream().forEach(regParam -> {
            LogisticRegressionModel model = trainWithParams(train, regParam, numIterations, new SquaredL2Updater(), 1.0);
            Tuple2<String, Double> createMetrics = createMetrics("L2 regParam:" + regParam, test, model);
            s(createMetrics._1() + ", AUC:" + createMetrics._2());
        });
        // results
        // L2 regParam:0.0, AUC:0.6612684244321665
        // L2 regParam:0.001, AUC:0.6612684244321665
        // L2 regParam:0.0025, AUC:0.6612684244321665
        // L2 regParam:0.005, AUC:0.6612684244321665
        // L2 regParam:0.01, AUC:0.6609319506771195

        // train.unpersist(); test.unpersist();
    }
}