I rewrote the Scala source code in Java 8.
package org.test.ch5;
import java.io.Serializable;
import java.util.Arrays;
import java.util.Map;
import org.apache.log4j.PropertyConfigurator;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.mllib.classification.LogisticRegressionModel;
import org.apache.spark.mllib.classification.LogisticRegressionWithSGD;
import org.apache.spark.mllib.classification.NaiveBayes;
import org.apache.spark.mllib.classification.NaiveBayesModel;
import org.apache.spark.mllib.classification.SVMModel;
import org.apache.spark.mllib.classification.SVMWithSGD;
import org.apache.spark.mllib.evaluation.BinaryClassificationMetrics;
import org.apache.spark.mllib.feature.StandardScaler;
import org.apache.spark.mllib.feature.StandardScalerModel;
import org.apache.spark.mllib.linalg.Vector;
import org.apache.spark.mllib.linalg.Vectors;
import org.apache.spark.mllib.linalg.distributed.RowMatrix;
import org.apache.spark.mllib.optimization.L1Updater;
import org.apache.spark.mllib.optimization.SimpleUpdater;
import org.apache.spark.mllib.optimization.SquaredL2Updater;
import org.apache.spark.mllib.optimization.Updater;
import org.apache.spark.mllib.regression.GeneralizedLinearModel;
import org.apache.spark.mllib.regression.LabeledPoint;
import org.apache.spark.mllib.stat.MultivariateStatisticalSummary;
import org.apache.spark.mllib.tree.DecisionTree;
import org.apache.spark.mllib.tree.configuration.Algo;
import org.apache.spark.mllib.tree.impurity.Entropy;
import org.apache.spark.mllib.tree.impurity.Impurity;
import org.apache.spark.mllib.tree.model.DecisionTreeModel;
import scala.Tuple2;
/**
 * Chapter 5: Building a Classification Model with Spark
 * Download the data file (train.tsv) from
 * https://www.kaggle.com/c/stumbleupon/data
 */
/**
 Chapter 5 outline
 1.   Parse the data (drop the first 4 columns, which are text)
 2.   Train the models (logistic regression, SVM, naive Bayes, decision tree)
 3-1. Measure model accuracy
 3-2. Measure model performance with PR and ROC
 3-3. Conclusion: the results are far too low...
 4-1. Parse the data -> standardize the features: (value - mean) / sqrt(variance) = standardized feature value
 4-2. Logistic regression: train the model and measure performance
 5-1. Parse the data -> enumerate the categories in the text column (column 3) and expand them into extra (1-of-k) dimensions + the remaining columns (+ feature standardization)
 5-2. Logistic regression: train the model and measure performance
 6-1. Parse the data -> enumerate the categories in column 3 and expand them into extra dimensions -> use only the category columns
 6-2. Naive Bayes: train the model and measure performance
 7-1. Parse the data -> enumerate the categories in column 3 and expand them into extra dimensions + the remaining columns (+ feature standardization)
 7-2. Tune the training parameters (numIterations, stepSize, regParam (SquaredL2Updater / L1Updater))
 7-3. Logistic regression: train the model and measure performance
 8-1. Tune the training parameter maxDepth
 8-2. Decision tree: train the model and measure performance
 9-1. Tune the training parameter lambda
 9-2. Naive Bayes: train the model and measure performance
 */
public class Ch5_JavaApp implements Serializable {
private static final long serialVersionUID = 154356342432L;
private static void s(){s("");}
private static <T> T s(T o){System.out.println(o);return o;}
final int maxTreeDepth = 5;
final int numIterations = 10;
public static void main(String...strings){
PropertyConfigurator.configure(System.getProperty("user.dir") + "/src/resources/log4j.properties");
Long start = System.currentTimeMillis();
Ch5_JavaApp c = new Ch5_JavaApp();
JavaSparkContext sc = new JavaSparkContext("local[2]", "First Spark App");
c.proc1(sc);
c.proc1_1(sc);
c.proc2(sc);
c.proc3(sc);
c.proc4(sc);
c.proc5(sc);
c.proc6(sc);
c.proc7(sc);
s("\n" + (System.currentTimeMillis() - start) + " ms spend...");
sc.stop();
}
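/**
 * Loads train_noheader.tsv from the classpath, strips the quote characters,
 * and splits each tab-separated line into a String[].
 */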
private JavaRDD<String[]> getRecords(JavaSparkContext sc){
// "url" "urlid" "boilerplate" "alchemy_category" "alchemy_category_score" "avglinksize" "commonlinkratio_1" "commonlinkratio_2" "commonlinkratio_3" "commonlinkratio_4" "compression_ratio" "embed_ratio" "framebased" "frameTagRatio" "hasDomainLink" "html_ratio" "image_ratio" "is_news" "lengthyLinkDomain" "linkwordscore" "news_front_page" "non_markup_alphanum_characters" "numberOfLinks" "numwords_in_url" "parametrizedLinkRatio" "spelling_errors_ratio" "label"
JavaRDD<String> rawData = sc.textFile(getClass().getResource("data/train_noheader.tsv").getFile());
JavaRDD<String[]> records = rawData.map(str -> {
return str.replaceAll("\"", "").split("\\t");
});
return records;
}
private JavaRDD<LabeledPoint> getData(JavaSparkContext sc){
JavaRDD<String[]> records = getRecords(sc);
// s(Arrays.toString(records.first()));
/**
 * Extract the right features from the data set (this matters!). I initially split the data
 * incorrectly here, and the decision tree "accuracy" came out at 100%. -_-;;
 */
JavaRDD<LabeledPoint> data = records.map(arr->{
// If there were 8 columns, columns 4, 5 and 6 would be the features
// and column 7 would be the label. Here the first 4 columns are dropped,
// the last column is the label, and everything in between is a feature:
// 0 1 2 3 4 ... 20 21
// X X X X O O O X
// X X X X O O O X
// X X X X O O O X
int label = Integer.parseInt(arr[arr.length - 1]);
double[] features = new double[arr.length - 5];
for(int i = 0 ; i < arr.length - 5 ; i++){
//impute missing values ("?" becomes 0.0)
features[i] = "?".equals(arr[4+i]) ? 0.0 : Double.parseDouble(arr[4+i]);
}
return new LabeledPoint(label, Vectors.dense(features));
});
return data;
}
private JavaRDD<LabeledPoint> getNBData(JavaSparkContext sc){
JavaRDD<String[]> records = getRecords(sc);
//For the naive Bayes model (negative feature values are not supported, so they are replaced below)
JavaRDD<LabeledPoint> nbData = records.map(arr->{
int label = Integer.parseInt(arr[arr.length - 1]);
double[] features = new double[arr.length - 5];
for(int i = 0 ; i < arr.length - 5 ; i++){
if("?".equals(arr[4+i])){ //유실된 데이터 보정
features[i] = 0.0;
}else{
double d = Double.parseDouble(arr[4+i]);
features[i] = d < 0.0 ? 1.0 : d;
}
}
return new LabeledPoint(label, Vectors.dense(features));
});
return nbData;
}
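/**
 * Prints the area under the precision-recall curve and the area under the ROC curve
 * (both as percentages) for a set of (score, label) pairs.
 */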
private void printAUC(String subject, JavaRDD<Tuple2<Object, Object>> scoreAndLabels){
BinaryClassificationMetrics lrMetrics = new BinaryClassificationMetrics(JavaRDD.toRDD(scoreAndLabels));
s(subject + (lrMetrics.areaUnderPR()*100.0) + ", " + (lrMetrics.areaUnderROC()*100.0));
}
private void proc1(JavaSparkContext sc){
JavaRDD<LabeledPoint> data = getData(sc);
data.cache();
s("length:"+data.first().features().toArray().length); //22
// data.foreach(d->s(d)); //dumped the whole data set once to look it over in Excel
JavaRDD<LabeledPoint> nbData = getNBData(sc);
nbData.cache();
s(data.count()); //7395
/**
 * Train the classification models
 */
s("Logistic regression");
LogisticRegressionModel lrModel = s(LogisticRegressionWithSGD.train(JavaRDD.toRDD(data), numIterations));
// org.apache.spark.mllib.classification.LogisticRegressionModel: intercept = 0.0, numFeatures = 22, numClasses = 2, threshold = 0.5
s("서포트 벡터 머신 모델");
SVMModel svmModel = s(SVMWithSGD.train(JavaRDD.toRDD(data), numIterations));
// org.apache.spark.mllib.classification.SVMModel: intercept = 0.0, numFeatures = 22, numClasses = 2, threshold = 0.0
s("나이브 베이즈 모델");
NaiveBayesModel nbModel = s(NaiveBayes.train(JavaRDD.toRDD(nbData)));
// org.apache.spark.mllib.classification.NaiveBayesModel@3c25cfe1
s("의사결정 트리 모델");
DecisionTreeModel dtModel = s(DecisionTree.train(JavaRDD.toRDD(data), Algo.Classification(), Entropy.instance(), maxTreeDepth)); //or Gini.instance()
// DecisionTreeModel classifier of depth 5 with 61 nodes
/**
 * Use the classification models
 */
//logistic regression
LabeledPoint dataPoint = data.first();
double prediction = s(lrModel.predict(dataPoint.features())); //prediction: 1.0
s("dataPoint.features()" + dataPoint.features());
s("dataPoint.label()" + dataPoint.label()); //0.0 <- 실제 데이터 0.0, 잘못된 예측(1.0)
JavaRDD<Double> predictions = lrModel.predict(data.map(lp->lp.features()));
s(predictions.take(10)); //[1.0, 1.0, 1.0, 1.0, 1.0, 1.0, 1.0, 1.0, 1.0, 1.0]
s(svmModel.predict(dataPoint.features())); //prediction: 1.0
s(nbModel.predict(nbData.first().features())); //prediction: 1.0
s(dtModel.predict(dataPoint.features())); //prediction: 0.0
/**
 * Evaluate the accuracy of the classification models
 */
//logistic regression accuracy
double lrTotalCorrect = data.mapToDouble(lp->{
return lrModel.predict(lp.features()) == lp.label() ? 1.0 : 0.0;
}).sum();
//SVM accuracy
double svmTotalCorrect = data.mapToDouble(lp->{
return svmModel.predict(lp.features()) == lp.label() ? 1.0 : 0.0;
}).sum();
//naive Bayes accuracy
double nbTotalCorrect = nbData.mapToDouble(lp->{
return nbModel.predict(lp.features()) == lp.label() ? 1.0 : 0.0;
}).sum();
//decision tree accuracy
double dtTotalCorrect = data.mapToDouble(lp->{
double score = dtModel.predict(lp.features());
double predicted = score > 0.5 ? 1.0 : 0.0; //the score is already either 1.0 or 0.0, so thresholding at 0.5 is not strictly necessary
return predicted == lp.label() ? 1.0 : 0.0;
}).sum();
double numData = data.count();
s("로지스틱 회귀 성능:" + lrTotalCorrect / numData); //0.5146720757268425 <- 분류 정확도 50% -_-;;
s("svm 성능:" + svmTotalCorrect / numData); //0.5146720757268425
s("나이브베이즈 성능:" + nbTotalCorrect / numData); //0.5803921568627451
s("의사결정트리 성능:" + dtTotalCorrect / numData); //0.6482758620689655
/**
 * Other evaluation metrics:
 * area under the PR curve and area under the ROC curve
 */
//logistic regression
JavaRDD<Tuple2<Object, Object>> lrScoreAndLabels = data.map(lp -> new Tuple2<Object, Object>(lrModel.predict(lp.features()), lp.label()));
printAUC("로지스틱 회귀 PR,ROC: ", lrScoreAndLabels);
//SVM
JavaRDD<Tuple2<Object, Object>> svmScoreAndLabels = data.map(lp -> new Tuple2<Object, Object>(svmModel.predict(lp.features()), lp.label()));
printAUC("SVM PR,ROC: ", svmScoreAndLabels);
//naive Bayes
JavaRDD<Tuple2<Object, Object>> nbScoreAndLabels = nbData.map(lp -> {
return new Tuple2<Object, Object>(nbModel.predict(lp.features()) > 0.5 ? 1.0 : 0.0, lp.label());
});
printAUC("나이브베이즈 PR,ROC: ", nbScoreAndLabels);
//decision tree
JavaRDD<Tuple2<Object, Object>> dtScoreAndLabels = data.map(lp -> {
return new Tuple2<Object, Object>(dtModel.predict(lp.features()) > 0.5 ? 1.0 : 0.0, lp.label());
});
printAUC("의사결정트리 PR,ROC: ", dtScoreAndLabels);
// Results
// Logistic regression PR,ROC: 75.67586293858841, 50.14181143280931
// SVM PR,ROC: 75.67586293858841, 50.14181143280931
// Naive Bayes PR,ROC: 68.12849819263486, 58.39392440412702
// Decision tree PR,ROC: 74.30805993331198, 64.88371887050936
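// The untuned LR and SVM models predict 1.0 for almost every example (see predictions.take(10) above),
// so their ROC AUC is barely above 50%, i.e. no better than random guessing.
// (Aside: BinaryClassificationMetrics is normally fed raw scores; calling clearThreshold() on the
// linear models would make predict() return raw scores instead of 0/1 labels.)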
data.unpersist();
nbData.unpersist();
}
private void proc1_1(JavaSparkContext sc){
/**
 * Improving model performance!!!
 * Feature standardization - here it only helps the logistic regression model.
 */
JavaRDD<LabeledPoint> data = getData(sc);
JavaRDD<Vector> vectors = data.map(lp->lp.features());
RowMatrix matrix = new RowMatrix(JavaRDD.toRDD(vectors));
MultivariateStatisticalSummary matrixSummary = matrix.computeColumnSummaryStatistics();
Vector mean = s(matrixSummary.mean()); //column means
Vector min = s(matrixSummary.min()); //column minimums
Vector max = s(matrixSummary.max()); //column maximums
Vector variance = s(matrixSummary.variance()); //column variances
Vector numNonzeros = s(matrixSummary.numNonzeros());//number of non-zero values in each column
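// StandardScaler(withMean=true, withStd=true): subtract the column mean and divide by the column standard deviation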
StandardScalerModel scaler = new StandardScaler(true, true).fit(JavaRDD.toRDD(vectors));
JavaRDD<LabeledPoint> scaledData = data.map(lp->new LabeledPoint(lp.label(), scaler.transform(lp.features())));
s(data.first().features());
s(scaledData.first().features());
// standardized value = (x - mean) / sqrt(variance)
s((data.first().features().toArray()[0] - mean.toArray()[0])/Math.sqrt(variance.toArray()[0])); //1.137647336497678
s((0.789131 - 0.4122)/Math.sqrt(0.1097)); //1.138042662130702 //effectively the same value
// logistic regression - retrain the model
LogisticRegressionModel lrModel2 = s(LogisticRegressionWithSGD.train(JavaRDD.toRDD(scaledData), numIterations));
//re-evaluate logistic regression accuracy
double lrTotalCorrect2 = scaledData.mapToDouble(lp->{
return lrModel2.predict(lp.features()) == lp.label() ? 1.0 : 0.0;
}).sum();
s("로지스틱 회귀 성능2:" + lrTotalCorrect2 / data.count()); //0.6204192021636241 <- 10% 향상!!!
JavaRDD<Tuple2<Object, Object>> lrScoreAndLabels2 = scaledData.map(lp -> new Tuple2<Object, Object>(lrModel2.predict(lp.features()), lp.label()));
printAUC("로지스틱 회귀2 PR,ROC: ", lrScoreAndLabels2);
// 로지스틱 회귀2 PR,ROC: 72.72540762713375, 61.96629669112512 <-ROC 10% 향상
}
private Map<String, Long> getCategories(JavaRDD<String[]> records){
// take column 3, the alchemy_category column
Map<String, Long> categories = records.map(arr->arr[3]).distinct().zipWithIndex().collectAsMap();
return categories;
}
private JavaRDD<LabeledPoint> getScaledDataCats(JavaSparkContext sc){
JavaRDD<String[]> records = getRecords(sc);
Map<String, Long> categories = getCategories(records);
int numCategories = s(categories.size()); //14
// s(categories); //{religion=13, culture_politics=4, science_technology=8, arts_entertainment=10, weather=2, health=0, recreation=3, gaming=7, ?=11, unknown=5, sports=12, computer_internet=6, business=1, law_crime=9}
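// 1-of-k encoding: with the mapping above, for example, a row whose alchemy_category is
// "business" (index 1) gets categoryFeatures = [0,1,0,...,0] of length 14.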
JavaRDD<LabeledPoint> dataCetegories = records.map(arr->{
int label = Integer.parseInt(arr[arr.length - 1]);
int categoryIdx = categories.get(arr[3]).intValue(); //safe as long as the Long index fits in an int...
double[] categoryFeatures = new double[numCategories];
categoryFeatures[categoryIdx] = 1.0;
double[] otherFeatures = new double[arr.length - 5];
for(int i = 0 ; i < arr.length - 5 ; i++){
//impute missing values
otherFeatures[i] = "?".equals(arr[4+i]) ? 0.0 : Double.parseDouble(arr[4+i]);
}
double[] features = new double[categoryFeatures.length + otherFeatures.length];
System.arraycopy(categoryFeatures, 0, features, 0, categoryFeatures.length);
System.arraycopy(otherFeatures, 0, features, categoryFeatures.length, otherFeatures.length);
return new LabeledPoint(label, Vectors.dense(features));
});
s(dataCetegories.first()); //(0.0,[0.0,1.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.789131,2.055555556,0.676470588,0.205882353,0.047058824,0.023529412,0.443783175,0.0,0.0,0.09077381,0.0,0.245831182,0.003883495,1.0,1.0,24.0,0.0,5424.0,170.0,8.0,0.152941176,0.079129575])
//standardize the features again
JavaRDD<Vector> vectors = dataCetegories.map(lp->lp.features());
StandardScalerModel scalerCats = new StandardScaler(true, true).fit(JavaRDD.toRDD(vectors));
JavaRDD<LabeledPoint> scaledDataCats = dataCetegories.map(lp->new LabeledPoint(lp.label(), scalerCats.transform(lp.features())));
//compare before and after scaling
// s(dataCetegories.first().features()); //[0.0,1.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.789131,2.055555556,0.676470588,0.205882353,0.047058824,0.023529412,0.443783175,0.0,0.0,0.09077381,0.0,0.245831182,0.003883495,1.0,1.0,24.0,0.0,5424.0,170.0,8.0,0.152941176,0.079129575]
// s(scaledDataCats.first().features()); //[-0.2709990696925828,2.7207366564548514,-0.02326210589837061,-0.4464212047941535,-0.22052688457880879,-0.028494000387023734,-0.20418221057887365,-0.10189469097220732,-0.2016540523193296,-0.06487757239262681,-0.38181322324318134,-0.6807527904251456,-0.23272797709480803,-0.09914991930875496,1.137647336497678,-0.08193557169294771,1.0251398128933331,-0.05586356442541689,-0.4688932531289357,-0.3543053263079386,-0.3175352172363148,0.3384507982396541,0.0,0.828822173315322,-0.14726894334628504,0.22963982357813484,-0.14162596909880876,0.7902380499177364,0.7171947294529865,-0.29799681649642257,-0.2034625779299476,-0.03296720969690391,-0.04878112975579913,0.9400699751165439,-0.10869848852526258,-0.2788207823137022]
return scaledDataCats;
}
private void proc2(JavaSparkContext sc){
/**
 * Additional features (1-of-k encoded categories + the numeric columns, standardized)
 */
JavaRDD<LabeledPoint> scaledDataCats = getScaledDataCats(sc);
// logistic regression - retrain the model
LogisticRegressionModel lrModel3 = s(LogisticRegressionWithSGD.train(JavaRDD.toRDD(scaledDataCats), numIterations));
//re-evaluate logistic regression accuracy
double lrTotalCorrect2 = scaledDataCats.mapToDouble(lp->{
return lrModel3.predict(lp.features()) == lp.label() ? 1.0 : 0.0;
}).sum();
s("로지스틱 회귀 성능3:" + lrTotalCorrect2 / scaledDataCats.count()); //0.6657200811359026 //성능 향상
JavaRDD<Tuple2<Object, Object>> lrScoreAndLabels2 = scaledDataCats.map(lp -> new Tuple2<Object, Object>(lrModel3.predict(lp.features()), lp.label()));
printAUC("로지스틱 회귀3 PR,ROC: " , lrScoreAndLabels2);
// 로지스틱 회귀2 PR,ROC: 75.79640787676577, 66.54826844243996 <--------66까지 올라감.
/**
 * Entries on the Kaggle private leaderboard reached AUC 0.88906:
 * www.kaggle.com/c/stumbleupon/leaderboard/private
 */
//TODO
}
private void proc3(JavaSparkContext sc){
/**
 * Using data in the form the model expects
 * - naive Bayes - use only the column-3 category features (values are just 0.0 and 1.0)
 */
JavaRDD<String[]> records = getRecords(sc);
Map<String, Long> categories = getCategories(records);
int numCategories = s(categories.size()); //14
//-> identical to proc2 up to this point
JavaRDD<LabeledPoint> dataNB = records.map(arr->{
int label = Integer.parseInt(arr[arr.length - 1]);
int categoryIdx = categories.get(arr[3]).intValue(); //safe as long as the Long index fits in an int...
double[] categoryFeatures = new double[numCategories];
categoryFeatures[categoryIdx] = 1.0;
// use only categoryFeatures as the feature vector
return new LabeledPoint(label, Vectors.dense(categoryFeatures));
});
//train the model
s("Naive Bayes model 2");
NaiveBayesModel nbModelCats = s(NaiveBayes.train(JavaRDD.toRDD(dataNB)));
//accuracy
double nbTotalCorrectCats = dataNB.mapToDouble(lp->{
return nbModelCats.predict(lp.features()) == lp.label() ? 1.0 : 0.0;
}).sum();
s("나이브베이즈 성능2: " + nbTotalCorrectCats / dataNB.count()); //0.6096010818120352
//나이브 베이즈
JavaRDD<Tuple2<Object, Object>> nbScoreAndLabels2 = dataNB.map(lp -> {
double score = nbModelCats.predict(lp.features());
score = score > 0.5 ? 1.0 : 0.0;
return new Tuple2<Object, Object>(score, lp.label());
});
printAUC("나이브베이즈2 PR,ROC: ", nbScoreAndLabels2);
//74.05222106704076, 60.513849415494455 <- 58%에 60%으로 올라갔다.
}
private void proc4(JavaSparkContext sc){
JavaRDD<LabeledPoint> scaledDataCats = getScaledDataCats(sc);
//-> identical to proc2 up to this point
scaledDataCats.cache();
/**
 * Number of iterations - numIterations
 */
// JavaRDD.toRDD cannot be called from inside another RDD transformation.
// Exception msg : org.apache.spark.SparkException: RDD transformations and actions can only be invoked by the driver, not inside of other transformations; for example, rdd1.map(x => rdd2.values.count() * x) is invalid because the values transformation and count action cannot be performed inside of the rdd1.map transformation. For more information, see SPARK-5063.
// JavaRDD<Tuple2<String, Double>> iterResults = sc.parallelize(Arrays.asList(1,5,10,50)).map(numIterations->{
// LogisticRegressionModel model = trainWithParams(scaledDataCats, 0.0, numIterations, new SimpleUpdater(), 1.0);
// return createMetrics("numIterations:"+numIterations, scaledDataCats, model);
// });
// iterResults.foreach(tuple->s(tuple._1() + ", AUC: " + (tuple._2()*100)));
// Another approach!
(Arrays.asList(1,5,10,50)).parallelStream().forEach(numIterations->{
LogisticRegressionModel model = trainWithParams(scaledDataCats, 0.0, numIterations, new SimpleUpdater(), 1.0);
Tuple2<String, Double> tuple = createMetrics("numIterations:"+numIterations, scaledDataCats, model);
s(tuple._1() + ", AUC: " + (tuple._2()*100));
});
// Results
// numIterations:1, AUC: 64.95198950299684
// numIterations:5, AUC: 66.6160962344358
// numIterations:10, AUC: 66.54826844243996 <--- got worse
// numIterations:50, AUC: 66.81425454500737
/**
 * Step size - stepSize
 */
final int numIterations = 10;
(Arrays.asList(0.001, 0.01, 0.1, 1.0, 10.0)).parallelStream().forEach(stepSize->{
LogisticRegressionModel model = trainWithParams(scaledDataCats, 0.0, numIterations, new SimpleUpdater(), stepSize);
Tuple2<String, Double> tuple = createMetrics("stepSize:"+stepSize, scaledDataCats, model);
s(tuple._1() + ", AUC: " + (tuple._2()*100));
});
// Results
// stepSize:0.001, AUC: 64.96588225098238
// stepSize:0.01, AUC: 64.96444027450548
// stepSize:0.1, AUC: 65.521065153621
// stepSize:1.0, AUC: 66.54826844243996
// stepSize:10.0, AUC: 61.922788527781535 <--- got worse
/**
 * Regularization with SquaredL2Updater - regParam
 */
(Arrays.asList(0.001, 0.01, 0.1, 1.0, 10.0)).parallelStream().forEach(regParam->{
LogisticRegressionModel model = trainWithParams(scaledDataCats, regParam, numIterations, new SquaredL2Updater(), 1.0);
Tuple2<String, Double> tuple = createMetrics("L2 regParam:"+regParam, scaledDataCats, model);
s(tuple._1() + ", AUC: " + (tuple._2()*100));
});
// Results (order is shuffled because of parallelStream().forEach())
// L2 regParam:1.0, AUC: 66.03745376525676
// L2 regParam:10.0, AUC: 35.32533843993077 <----- got much worse
// L2 regParam:0.1, AUC: 66.63378789506862
// L2 regParam:0.01, AUC: 66.5475474542015
// L2 regParam:0.001, AUC: 66.54826844243996
/**
 * Regularization with L1Updater - regParam
 */
(Arrays.asList(0.001, 0.01, 0.1, 1.0, 10.0)).parallelStream().forEach(regParam->{
LogisticRegressionModel model = trainWithParams(scaledDataCats, regParam, numIterations, new L1Updater(), 1.0);
Tuple2<String, Double> tuple = createMetrics("L1 regParam:"+regParam, scaledDataCats, model);
s(tuple._1() + ", AUC: " + (tuple._2()*100));
});
// Results (order is shuffled because of parallelStream().forEach())
// L1 regParam:10.0, AUC: 50.0 <----- got worse
// L1 regParam:0.1, AUC: 50.0 <----- got worse
// L1 regParam:1.0, AUC: 50.0 <----- got worse
// L1 regParam:0.001, AUC: 66.53221272973906
// L1 regParam:0.01, AUC: 66.46962582686739
//
scaledDataCats.unpersist();
}
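/**
 * Trains a logistic regression model with SGD using the given regularization parameter,
 * number of iterations, updater (regularization type) and step size.
 */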
private LogisticRegressionModel trainWithParams(JavaRDD<LabeledPoint> input, Double regParam, Integer numIterations, Updater updater, Double stepSize){
LogisticRegressionWithSGD lr = new LogisticRegressionWithSGD();
lr.optimizer().setNumIterations(numIterations).setUpdater(updater).setRegParam(regParam).setStepSize(stepSize);
return lr.run(JavaRDD.toRDD(input)); //calling this from inside another RDD transformation throws SparkException (SPARK-5063) - see proc4
}
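/**
 * Evaluates the given model on the given data and returns (label, area under the ROC curve).
 */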
private Tuple2<String, Double> createMetrics(String label, JavaRDD<LabeledPoint> data, GeneralizedLinearModel model){
JavaRDD<Tuple2<Object, Object>> lrScoreAndLabels = data.map(lp->{
return new Tuple2<Object, Object>(model.predict(lp.features()), lp.label());
});
BinaryClassificationMetrics lrMetrics = new BinaryClassificationMetrics(JavaRDD.toRDD(lrScoreAndLabels));
return new Tuple2<String, Double>(label, lrMetrics.areaUnderROC());
}
private void proc5(JavaSparkContext sc){
/**
 * Decision tree - varying maxDepth
 */
JavaRDD<LabeledPoint> data = getData(sc);
//-> identical to proc1() up to this point
Arrays.asList(1,2,3,4,5,10,20).parallelStream().forEach(maxDepth->{
DecisionTreeModel model = trainDTWithParams(data, maxDepth, Entropy.instance()); //or Gini.instance()
JavaRDD<Tuple2<Object, Object>> scoreAndLabels = data.map(lp->{
double score = model.predict(lp.features());
return new Tuple2<Object, Object>(score > 0.5 ? 1.0 : 0.0, lp.label());
});
BinaryClassificationMetrics metrics = new BinaryClassificationMetrics(JavaRDD.toRDD(scoreAndLabels));
s("tree depth:" + maxDepth + ", AUC:" + metrics.areaUnderROC());
});
// Results
// tree depth:1, AUC:0.5932683560677638
// tree depth:2, AUC:0.6168392183052838
// tree depth:3, AUC:0.6260699538655363
// tree depth:4, AUC:0.6363331299438932
// tree depth:5, AUC:0.6488371887050935
// tree depth:10, AUC:0.7625521856410764
// tree depth:20, AUC:0.9845371811804648 <---- dramatically higher (the deeper the tree, the more it can overfit the training data)
// With Gini.instance() as the impurity measure (hardly any difference):
// tree depth:1, AUC:0.5932683560677638
// tree depth:2, AUC:0.6168392183052838
// tree depth:3, AUC:0.6260699538655363
// tree depth:4, AUC:0.6363331299438932
// tree depth:5, AUC:0.6489164974113228
// tree depth:10, AUC:0.7837090914201374
// tree depth:20, AUC:0.9887069452906806
}
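/**
 * Trains a decision tree classifier with the given maximum depth and impurity measure.
 */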
private DecisionTreeModel trainDTWithParams(JavaRDD<LabeledPoint> input, Integer maxDepth, Impurity impurity){
return DecisionTree.train(JavaRDD.toRDD(input), Algo.Classification(), impurity, maxDepth);
}
private void proc6(JavaSparkContext sc){
/**
 * Naive Bayes model - varying lambda
 */
JavaRDD<LabeledPoint> nbData = getNBData(sc);
//-> identical to proc1() up to this point
Arrays.asList(0.001, 0.01, 0.1, 1.0, 10.0).parallelStream().forEach(lambda->{
NaiveBayesModel model = trainNBWithParams(nbData, lambda);
JavaRDD<Tuple2<Object, Object>> scoreAndLabels = nbData.map(lp->new Tuple2<Object, Object>(model.predict(lp.features()), lp.label()));
BinaryClassificationMetrics metrics = new BinaryClassificationMetrics(JavaRDD.toRDD(scoreAndLabels));
s("lambda " + lambda + ", AUC:" + metrics.areaUnderROC());
});
// Results (hardly any difference)
// lambda 0.001, AUC:0.5839392440412702
// lambda 0.01, AUC:0.5839392440412702
// lambda 10.0, AUC:0.5843343968336832
// lambda 0.1, AUC:0.5839392440412702
}
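/**
 * Trains a naive Bayes model with the given additive (Laplace) smoothing parameter lambda.
 */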
private NaiveBayesModel trainNBWithParams(JavaRDD<LabeledPoint> input, Double lambda){
NaiveBayes nb = new NaiveBayes();
nb.setLambda(lambda);
return nb.run(JavaRDD.toRDD(input));
}
private void proc7(JavaSparkContext sc){
/**
 * Cross-validation (a 60/40 train/test split)
 */
JavaRDD<LabeledPoint> scaledDataCats = getScaledDataCats(sc);
//-> identical to proc2() up to this point
JavaRDD<LabeledPoint>[] trainTestSplit = scaledDataCats.randomSplit(new double[]{0.6, 0.4}, 123);
JavaRDD<LabeledPoint> train = trainTestSplit[0];
JavaRDD<LabeledPoint> test = trainTestSplit[1];
train.cache();
test.cache();
final int numIterations = 10;
Arrays.asList(0.0, 0.001, 0.0025, 0.005, 0.01).parallelStream().forEach(regParam->{
LogisticRegressionModel model = trainWithParams(train, regParam, numIterations, new SquaredL2Updater(), 1.0);
Tuple2<String, Double> createMetrics = createMetrics("L2 regParam:"+regParam, test, model);
s(createMetrics._1() + ", AUC:" + createMetrics._2());
});
// Results
// L2 regParam:0.0, AUC:0.6612684244321665
// L2 regParam:0.001, AUC:0.6612684244321665
// L2 regParam:0.0025, AUC:0.6612684244321665
// L2 regParam:0.005, AUC:0.6612684244321665
// L2 regParam:0.01, AUC:0.6609319506771195
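// Training on 60% of the data and evaluating on the held-out 40% gives a slightly lower
// (and more realistic) AUC than the earlier runs that evaluated on the training data itself.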
//
train.unpersist();
test.unpersist();
}
}
'Spark > Spark와 머신 러닝' 카테고리의 다른 글
| Spark 시작하기19 - [Spark와 머신 러닝] 4장 스파크를 이용한 추천 엔진 구현 (0) | 2016.05.04 |
|---|---|
| Spark 시작하기18 - [Spark와 머신 러닝] 3장 스파크를 이용한 데이터 수집, 프로세싱, 준비2 (0) | 2016.05.04 |
| Spark 시작하기05 - Exception (0) | 2016.04.03 |
| Spark 시작하기04 - [Spark와 머신 러닝] 3장 스파크를 이용한 데이터 수집, 프로세싱, 준비 (0) | 2016.04.03 |
| Spark 시작하기03 - [Spark와 머신 러닝] 1장 스파크의 시작과 구동 (0) | 2016.03.27 |