본문 바로가기
Spark/Spark와 머신 러닝

Spark 시작하기20 - [Spark와 머신 러닝] 5장 스파크를 이용한 분류 모델 구현

by java개발자 2016. 5. 7.

scala로 짜여진 소스코드를 java8 로 작성하였다.

package org.test.ch5;

import java.io.Serializable;
import java.util.Arrays;
import java.util.Map;

import org.apache.log4j.PropertyConfigurator;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.mllib.classification.LogisticRegressionModel;
import org.apache.spark.mllib.classification.LogisticRegressionWithSGD;
import org.apache.spark.mllib.classification.NaiveBayes;
import org.apache.spark.mllib.classification.NaiveBayesModel;
import org.apache.spark.mllib.classification.SVMModel;
import org.apache.spark.mllib.classification.SVMWithSGD;
import org.apache.spark.mllib.evaluation.BinaryClassificationMetrics;
import org.apache.spark.mllib.feature.StandardScaler;
import org.apache.spark.mllib.feature.StandardScalerModel;
import org.apache.spark.mllib.linalg.Vector;
import org.apache.spark.mllib.linalg.Vectors;
import org.apache.spark.mllib.linalg.distributed.RowMatrix;
import org.apache.spark.mllib.optimization.L1Updater;
import org.apache.spark.mllib.optimization.SimpleUpdater;
import org.apache.spark.mllib.optimization.SquaredL2Updater;
import org.apache.spark.mllib.optimization.Updater;
import org.apache.spark.mllib.regression.GeneralizedLinearModel;
import org.apache.spark.mllib.regression.LabeledPoint;
import org.apache.spark.mllib.stat.MultivariateStatisticalSummary;
import org.apache.spark.mllib.tree.DecisionTree;
import org.apache.spark.mllib.tree.configuration.Algo;
import org.apache.spark.mllib.tree.impurity.Entropy;
import org.apache.spark.mllib.tree.impurity.Impurity;
import org.apache.spark.mllib.tree.model.DecisionTreeModel;

import scala.Tuple2;
/**
 * 5장 스파크를 이용한 분류 모델 구현
 * https://www.kaggle.com/c/stumbleupon/data
 * 에서 관련파일 다운로드(train.tsv)
 */
/**
	5장 책 구성
	1.데이터 파싱(앞의 4개 열은 텍스트이므로 제외)
	2.모델 트레이닝(로지스틱 회귀, SVM, 나이브베이즈, 의사결정트리)
	3-1.모델 정확도 측정
	3-2.모델 성능 PR,ROC 측정
	3-3.결론:너무 낮다....
	
	4-1.데이터 파싱 -> 특징 표준화 : (값-평균)/sqrt(분산) = 특징 표준화된 값2
	4-2.로지스틱회귀 : 모델 트레이닝 및 성능 측정
	5-1.데이터 파싱 -> 3열의 텍스트 데이터(카테고리)의 종류를 파악해서, 차원 확대 + 나머지 열 데이터(+ 특징표준화)
	5-2.로지스틱회귀 :모델 트레이닝 및 성능 측정
	6-1.데이터 파싱 -> 3열의 텍스트 데이터(카테고리)의 종류를 파악해서, 차원 확대 -> 카테고리 열 하나만 사용
	6-2.나이브베이즈 : 모델 트레이닝 및 성능 측정
	7.1.데이터 파싱 -> 3열의 텍스트 데이터(카테고리)의 종류를 파악해서, 차원 확대 + 나머지 열 데이터(+ 특징표준화)
	7-2.모델 트레이닝 매개변수 튜닝(numIterations, stepSize, regParam(SquaredL2Updater / L1Updater)
	7-3.로지스틱회귀 : 모델 트레이닝 및 성능 측정
	8-1.모델 트레이닝 매개변수 튜닝(maxDepth)
	8-2.의사결정 트리 : 모델 트레이닝 및 성능 측정
	9-1.모델 트레이닝 매개변수 튜닝(lambda)
	9-2.나이브 베이즈 : 모델 트레이닝 및 성능 측정
	
 */

public class Ch5_JavaApp implements Serializable {

	private static final long serialVersionUID = 154356342432L;
	private static void s(){s("");}
	private static <T> T s(T o){System.out.println(o);return o;}

	final int maxTreeDepth = 5;
	final int numIterations = 10;
	
	public static void main(String...strings){
		PropertyConfigurator.configure(System.getProperty("user.dir") + "/src/resources/log4j.properties");
		
		Long start = System.currentTimeMillis();
		Ch5_JavaApp c = new Ch5_JavaApp();
		JavaSparkContext sc = new JavaSparkContext("local[2]", "First Spark App");
		c.proc1(sc);
		c.proc1_1(sc);
		c.proc2(sc);
		c.proc3(sc);
		c.proc4(sc);
		c.proc5(sc);
		c.proc6(sc);
		c.proc7(sc);
		
		s("\n" + (System.currentTimeMillis() - start) + " ms spend...");
		sc.stop();
	}
	private JavaRDD<String[]> getRecords(JavaSparkContext sc){
//		"url"	"urlid"	"boilerplate"	"alchemy_category"	"alchemy_category_score"	"avglinksize"	"commonlinkratio_1"	"commonlinkratio_2"	"commonlinkratio_3"	"commonlinkratio_4"	"compression_ratio"	"embed_ratio"	"framebased"	"frameTagRatio"	"hasDomainLink"	"html_ratio"	"image_ratio"	"is_news"	"lengthyLinkDomain"	"linkwordscore"	"news_front_page"	"non_markup_alphanum_characters"	"numberOfLinks"	"numwords_in_url"	"parametrizedLinkRatio"	"spelling_errors_ratio"	"label"
		JavaRDD<String> rawData = sc.textFile(getClass().getResource("data/train_noheader.tsv").getFile());
		JavaRDD<String[]> records = rawData.map(str -> {
			return str.replaceAll("\"", "").split("\\t");
		});
		return records;
	}
	private JavaRDD<LabeledPoint> getData(JavaSparkContext sc){
		JavaRDD<String[]> records = getRecords(sc);
//		s(Arrays.toString(records.first()));
		
		/**
		 * 데이터 집합에서 적절한 특징 추출 (중요함!!!!). 처음에 여기서 데이터를 잘못 쪼개서, 의사결정트리 성능이 100%가 나와버렸다.-_-;;
		 */
		JavaRDD<LabeledPoint> data = records.map(arr->{
//			8열이 있다면,
//			4,5,6열의 데이터를 features로 보고
//			7열의 데이터를 label로
//			0	1	2	3	4	...	20	21
//			X	X	X	X	O	O	O	X
//			X	X	X	X	O	O	O	X
//			X	X	X	X	O	O	O	X
			int label = Integer.parseInt(arr[arr.length - 1]);

			double[] features = new double[arr.length - 5];
			for(int i = 0 ; i < arr.length - 5 ; i++){
				//유실된 데이터 보정
				features[i] = "?".equals(arr[4+i]) ? 0.0 : Double.parseDouble(arr[4+i]); 
			}
			return new LabeledPoint(label, Vectors.dense(features));
		});
		return data;
	}
	private JavaRDD<LabeledPoint> getNBData(JavaSparkContext sc){
		JavaRDD<String[]> records = getRecords(sc);
		
		//나이브베이즈 모델을 위해(음수는 지원하지 않는다.)
		JavaRDD<LabeledPoint> nbData = records.map(arr->{
			int label = Integer.parseInt(arr[arr.length - 1]);
			
			double[] features = new double[arr.length - 5];
			for(int i = 0 ; i < arr.length - 5 ; i++){
				if("?".equals(arr[4+i])){	//유실된 데이터 보정
					features[i] = 0.0;
				}else{
					double d = Double.parseDouble(arr[4+i]);
					features[i] = d < 0.0 ? 1.0 : d;
				}
			}
			return new LabeledPoint(label, Vectors.dense(features));
		});
		return nbData;
	}
	private void printAUC(String subject, JavaRDD<Tuple2<Object, Object>> scoreAndLabels){
		BinaryClassificationMetrics lrMetrics = new BinaryClassificationMetrics(JavaRDD.toRDD(scoreAndLabels));
		s(subject + (lrMetrics.areaUnderPR()*100.0) + ", " + (lrMetrics.areaUnderROC()*100.0));
	}
	private void proc1(JavaSparkContext sc){
		JavaRDD<LabeledPoint> data = getData(sc);
		data.cache();
		s("length:"+data.first().features().toArray().length);	//22
//		data.foreach(d->s(d));		//엑셀로 정리하기 위해 전체 데이터 출력해봄.
		
		JavaRDD<LabeledPoint> nbData = getNBData(sc);
		nbData.cache();
		s(data.count());	//7395
		
		/**
		 * 분류 모델 트레이닝
		 */
		s("로지스틱 회귀");
		LogisticRegressionModel lrModel = s(LogisticRegressionWithSGD.train(JavaRDD.toRDD(data), numIterations));
//		org.apache.spark.mllib.classification.LogisticRegressionModel: intercept = 0.0, numFeatures = 22, numClasses = 2, threshold = 0.5
		s("서포트 벡터 머신 모델");
		SVMModel svmModel = s(SVMWithSGD.train(JavaRDD.toRDD(data), numIterations));
//		org.apache.spark.mllib.classification.SVMModel: intercept = 0.0, numFeatures = 22, numClasses = 2, threshold = 0.0
		s("나이브 베이즈 모델");
		NaiveBayesModel nbModel = s(NaiveBayes.train(JavaRDD.toRDD(nbData)));
//		org.apache.spark.mllib.classification.NaiveBayesModel@3c25cfe1
		s("의사결정 트리 모델");
		DecisionTreeModel dtModel = s(DecisionTree.train(JavaRDD.toRDD(data), Algo.Classification(), Entropy.instance(), maxTreeDepth));	//or Gini.instance()
//		DecisionTreeModel classifier of depth 5 with 61 nodes
		
		/**
		 * 분류 모델 사용
		 */
		//로지스틱
		LabeledPoint dataPoint = data.first();
		double prediction = s(lrModel.predict(dataPoint.features()));		//예측 : 1.0
		s("dataPoint.features()" + dataPoint.features());
		s("dataPoint.label()" + dataPoint.label());							//0.0		<- 실제 데이터 0.0, 잘못된 예측(1.0)
		
		JavaRDD<Double> predictions = lrModel.predict(data.map(lp->lp.features()));
		s(predictions.take(10));		//[1.0, 1.0, 1.0, 1.0, 1.0, 1.0, 1.0, 1.0, 1.0, 1.0]

		s(svmModel.predict(dataPoint.features()));		//예측 : 1.0
		s(nbModel.predict(nbData.first().features()));	//예측 : 1.0
		s(dtModel.predict(dataPoint.features()));		//예측 : 0.0
		
		/**
		 * 분류 모델의 성능 평가
		 */
		//로지스틱 회귀 성능 평가
		double lrTotalCorrect = data.mapToDouble(lp->{
			return lrModel.predict(lp.features()) == lp.label() ? 1.0 : 0.0;
		}).sum();
		//SVM 성능 평가
		double svmTotalCorrect = data.mapToDouble(lp->{
			return svmModel.predict(lp.features()) == lp.label() ? 1.0 : 0.0;
		}).sum();
		//나이브베이즈 성능 평가
		double nbTotalCorrect = nbData.mapToDouble(lp->{
			return nbModel.predict(lp.features()) == lp.label() ? 1.0 : 0.0;
		}).sum();
		//의사결정트리 성능 평가
		double dtTotalCorrect = data.mapToDouble(lp->{
			double score = dtModel.predict(lp.features());
			double predicted = score > 0.5 ? 1.0 : 0.0;	//score가 1.0 또는 0.0 이 나오므로 굳이 0.5로 구분할 필요는 없다.
			return predicted == lp.label() ? 1.0 : 0.0; 
		}).sum();
		double numData = data.count();
		s("로지스틱 회귀 성능:" + lrTotalCorrect / numData); 	//0.5146720757268425		<- 분류 정확도 50% -_-;;
		s("svm 성능:" + svmTotalCorrect / numData);		//0.5146720757268425
		s("나이브베이즈 성능:" + nbTotalCorrect / numData);	//0.5803921568627451
		s("의사결정트리 성능:" + dtTotalCorrect / numData);	//0.6482758620689655

		/**
		 * 다른 성능 평가 방법
		 * PR, ROC
		 */
		//로지스틱
		JavaRDD<Tuple2<Object, Object>> lrScoreAndLabels = data.map(lp -> new Tuple2<Object, Object>(lrModel.predict(lp.features()), lp.label()));
		printAUC("로지스틱 회귀 PR,ROC: ", lrScoreAndLabels);
		//SVM
		JavaRDD<Tuple2<Object, Object>> svmScoreAndLabels = data.map(lp -> new Tuple2<Object, Object>(svmModel.predict(lp.features()), lp.label()));
		printAUC("SVM PR,ROC: ", svmScoreAndLabels);
		//나이브 베이즈
		JavaRDD<Tuple2<Object, Object>> nbScoreAndLabels = nbData.map(lp -> {
			return new Tuple2<Object, Object>(nbModel.predict(lp.features()) > 0.5 ? 1.0 : 0.0, lp.label());
		});
		printAUC("나이브베이즈 PR,ROC: ", nbScoreAndLabels);
		//의사결정 트리
		JavaRDD<Tuple2<Object, Object>> dtScoreAndLabels = data.map(lp -> {
			return new Tuple2<Object, Object>(dtModel.predict(lp.features()) > 0.5 ? 1.0 : 0.0, lp.label());
		});
		printAUC("의사결정트리 PR,ROC: ", dtScoreAndLabels);
//		결과
//		로지스틱 회귀 PR,ROC: 75.67586293858841, 50.14181143280931
//		SVM PR,ROC: 75.67586293858841, 50.14181143280931
//		나이브베이즈 PR,ROC: 68.12849819263486, 58.39392440412702
//		의사결정트리 PR,ROC: 74.30805993331198, 64.88371887050936
		
		data.unpersist();
		nbData.unpersist();
	}
	private void proc1_1(JavaSparkContext sc){
		/**
		 * 모델의 성능 향상!!!
		 * 특징 표준화 - 로지스틱 회귀 모델에만 효과 있음.
		 */
		JavaRDD<LabeledPoint> data = getData(sc);
		
		JavaRDD<Vector> vectors = data.map(lp->lp.features());
		RowMatrix matrix = new RowMatrix(JavaRDD.toRDD(vectors));
		MultivariateStatisticalSummary matrixSummary = matrix.computeColumnSummaryStatistics();
		
		Vector mean = s(matrixSummary.mean());				//평균값
		Vector min = s(matrixSummary.min());				//최소값
		Vector max = s(matrixSummary.max());				//최대값
		Vector variance = s(matrixSummary.variance());		//분산
		Vector numNonzeros = s(matrixSummary.numNonzeros());//0이 아닌 값들의 개수?
		
		StandardScalerModel scaler = new StandardScaler(true, true).fit(JavaRDD.toRDD(vectors));
		JavaRDD<LabeledPoint> scaledData = data.map(lp->new LabeledPoint(lp.label(), scaler.transform(lp.features())));
		
		s(data.first().features());
		s(scaledData.first().features());
		
//		(x-u)/sqrt(variance)
//		(값-평균)/sqrt(분산) = 특징 표준화된 값2
		s((data.first().features().toArray()[0] - mean.toArray()[0])/Math.sqrt(variance.toArray()[0]));		//1.137647336497678
		s((0.789131 - 0.4122)/Math.sqrt(0.1097));															//1.138042662130702	//같다고 봐야지;;;;
		
		// 로지스틱 회귀 - 모델 다시 만들기
		LogisticRegressionModel lrModel2 = s(LogisticRegressionWithSGD.train(JavaRDD.toRDD(scaledData), numIterations));
		//로지스틱 회귀 성능 평가 다시 하기
		double lrTotalCorrect2 = scaledData.mapToDouble(lp->{
			return lrModel2.predict(lp.features()) == lp.label() ? 1.0 : 0.0;
		}).sum();
		s("로지스틱 회귀 성능2:" + lrTotalCorrect2 / data.count()); 	//0.6204192021636241		<- 10% 향상!!!
		
		JavaRDD<Tuple2<Object, Object>> lrScoreAndLabels2 = scaledData.map(lp -> new Tuple2<Object, Object>(lrModel2.predict(lp.features()), lp.label()));
		printAUC("로지스틱 회귀2 PR,ROC: ", lrScoreAndLabels2);
//		로지스틱 회귀2 PR,ROC: 72.72540762713375, 61.96629669112512							<-ROC 10% 향상
	}
	private Map<String, Long> getCategories(JavaRDD<String[]> records){
		// 3열인, alchemy_category_score 컬럼의 데이터를 가져온다.
		Map<String, Long> categories = records.map(arr->arr[3]).distinct().zipWithIndex().collectAsMap();
		return categories;
	}
	private JavaRDD<LabeledPoint> getScaledDataCats(JavaSparkContext sc){
		JavaRDD<String[]> records = getRecords(sc);
		Map<String, Long> categories = getCategories(records);
		int numCategories = s(categories.size());	//14
//		s(categories);			//{religion=13, culture_politics=4, science_technology=8, arts_entertainment=10, weather=2, health=0, recreation=3, gaming=7, ?=11, unknown=5, sports=12, computer_internet=6, business=1, law_crime=9}

		JavaRDD<LabeledPoint> dataCetegories = records.map(arr->{
			int label = Integer.parseInt(arr[arr.length - 1]);
			int categoryIdx = categories.get(arr[3]).intValue();	//Long의 숫자가 int범위를 넘지 않는 한에서...
			double[] categoryFeatures = new double[numCategories];
			categoryFeatures[categoryIdx] = 1.0;
			double[] otherFeatures = new double[arr.length - 5];
			for(int i = 0 ; i < arr.length - 5 ; i++){
				//유실된 데이터 보정
				otherFeatures[i] = "?".equals(arr[4+i]) ? 0.0 : Double.parseDouble(arr[4+i]); 
			}
			double[] features = new double[categoryFeatures.length + otherFeatures.length];
			System.arraycopy(categoryFeatures, 0, features, 0, categoryFeatures.length);
			System.arraycopy(otherFeatures, 0, features, categoryFeatures.length, otherFeatures.length);
			
			return new LabeledPoint(label, Vectors.dense(features));
		});
		s(dataCetegories.first());	//(0.0,[0.0,1.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.789131,2.055555556,0.676470588,0.205882353,0.047058824,0.023529412,0.443783175,0.0,0.0,0.09077381,0.0,0.245831182,0.003883495,1.0,1.0,24.0,0.0,5424.0,170.0,8.0,0.152941176,0.079129575])
		//특징 표준화 다시
		JavaRDD<Vector> vectors = dataCetegories.map(lp->lp.features());
		StandardScalerModel scalerCats = new StandardScaler(true, true).fit(JavaRDD.toRDD(vectors));
		JavaRDD<LabeledPoint> scaledDataCats = dataCetegories.map(lp->new LabeledPoint(lp.label(), scalerCats.transform(lp.features())));
		//비교
//		s(dataCetegories.first().features());	//[0.0,1.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.789131,2.055555556,0.676470588,0.205882353,0.047058824,0.023529412,0.443783175,0.0,0.0,0.09077381,0.0,0.245831182,0.003883495,1.0,1.0,24.0,0.0,5424.0,170.0,8.0,0.152941176,0.079129575]
//		s(scaledDataCats.first().features());	//[-0.2709990696925828,2.7207366564548514,-0.02326210589837061,-0.4464212047941535,-0.22052688457880879,-0.028494000387023734,-0.20418221057887365,-0.10189469097220732,-0.2016540523193296,-0.06487757239262681,-0.38181322324318134,-0.6807527904251456,-0.23272797709480803,-0.09914991930875496,1.137647336497678,-0.08193557169294771,1.0251398128933331,-0.05586356442541689,-0.4688932531289357,-0.3543053263079386,-0.3175352172363148,0.3384507982396541,0.0,0.828822173315322,-0.14726894334628504,0.22963982357813484,-0.14162596909880876,0.7902380499177364,0.7171947294529865,-0.29799681649642257,-0.2034625779299476,-0.03296720969690391,-0.04878112975579913,0.9400699751165439,-0.10869848852526258,-0.2788207823137022]
		
		return scaledDataCats;
	}
	private void proc2(JavaSparkContext sc){
		/**
		 * 추가적인 특징
		 */
		JavaRDD<LabeledPoint> scaledDataCats = getScaledDataCats(sc); 

		// 로지스틱 회귀 - 모델 다시 만들기
		LogisticRegressionModel lrModel3 = s(LogisticRegressionWithSGD.train(JavaRDD.toRDD(scaledDataCats), numIterations));
		//로지스틱 회귀 성능 평가 다시 하기
		double lrTotalCorrect2 = scaledDataCats.mapToDouble(lp->{
			return lrModel3.predict(lp.features()) == lp.label() ? 1.0 : 0.0;
		}).sum();
		s("로지스틱 회귀 성능3:" + lrTotalCorrect2 / scaledDataCats.count());		//0.6657200811359026		//성능 향상
		
		JavaRDD<Tuple2<Object, Object>> lrScoreAndLabels2 = scaledDataCats.map(lp -> new Tuple2<Object, Object>(lrModel3.predict(lp.features()), lp.label()));
		printAUC("로지스틱 회귀3 PR,ROC: " , lrScoreAndLabels2);
//		로지스틱 회귀2 PR,ROC: 75.79640787676577, 66.54826844243996			<--------66까지 올라감.
		
		/**
		 * AUC 0.88906 까지 올라간 경우
		 * www.kaggle.com/c/stumbleupon/leaderboard/private
		 */
		//TODO
	}
	
	private void proc3(JavaSparkContext sc){
		/**
		 * 정확한 형태의 데이터 사용
		 * - 나이브 베이즈 - 3열 카테고리 데이터만 사용한다(0.0과 1.0만)
		 */
		JavaRDD<String[]> records = getRecords(sc);
		Map<String, Long> categories = getCategories(records);
		int numCategories = s(categories.size());	//14
		
		//-> 여기까지는 proc2와 같다.
		
		JavaRDD<LabeledPoint> dataNB = records.map(arr->{
			int label = Integer.parseInt(arr[arr.length - 1]);
			int categoryIdx = categories.get(arr[3]).intValue();	//Long의 숫자가 int범위를 넘지 않는 한에서...
			double[] categoryFeatures = new double[numCategories];
			categoryFeatures[categoryIdx] = 1.0;
			// categoryFeatures만 피처로 사용한다.
			return new LabeledPoint(label, Vectors.dense(categoryFeatures));
		});
		
		//모델 학습
		s("나이브 베이즈 모델2");
		NaiveBayesModel nbModelCats = s(NaiveBayes.train(JavaRDD.toRDD(dataNB)));
		
		//성능평가
		double nbTotalCorrectCats = dataNB.mapToDouble(lp->{
			return nbModelCats.predict(lp.features()) == lp.label() ? 1.0 : 0.0;
		}).sum();
		s("나이브베이즈 성능2: " + nbTotalCorrectCats / dataNB.count());	//0.6096010818120352
		//나이브 베이즈
		JavaRDD<Tuple2<Object, Object>> nbScoreAndLabels2 = dataNB.map(lp -> {
			double score = nbModelCats.predict(lp.features());
			score = score > 0.5 ? 1.0 : 0.0;
			return new Tuple2<Object, Object>(score, lp.label());
		});
		printAUC("나이브베이즈2 PR,ROC: ", nbScoreAndLabels2);			
		//74.05222106704076, 60.513849415494455					<- 58%에 60%으로 올라갔다.
	}
	private void proc4(JavaSparkContext sc){
		JavaRDD<LabeledPoint> scaledDataCats = getScaledDataCats(sc);
		
		//->여기까지는 proc2와 같다.
		scaledDataCats.cache();
		
		/**
		 * 반복 - numIterations
		 */
//		JavaRDD 내에서 RavaRDD.toRDD 를 실행할 수가 없다.
//		Exception msg : org.apache.spark.SparkException: RDD transformations and actions can only be invoked by the driver, not inside of other transformations; for example, rdd1.map(x => rdd2.values.count() * x) is invalid because the values transformation and count action cannot be performed inside of the rdd1.map transformation. For more information, see SPARK-5063.
//		JavaRDD<Tuple2<String, Double>> iterResults = sc.parallelize(Arrays.asList(1,5,10,50)).map(numIterations->{
//			LogisticRegressionModel model = trainWithParams(scaledDataCats, 0.0, numIterations, new SimpleUpdater(), 1.0);
//			return createMetrics("numIterations:"+numIterations, scaledDataCats, model);
//		});
//		iterResults.foreach(tuple->s(tuple._1() + ", AUC: " + (tuple._2()*100)));
		// 다른 방법!
		(Arrays.asList(1,5,10,50)).parallelStream().forEach(numIterations->{
			LogisticRegressionModel model = trainWithParams(scaledDataCats, 0.0, numIterations, new SimpleUpdater(), 1.0);
			Tuple2<String, Double> tuple = createMetrics("numIterations:"+numIterations, scaledDataCats, model);
			s(tuple._1() + ", AUC: " + (tuple._2()*100));
		});
//		결과
//		numIterations:1, AUC: 64.95198950299684
//		numIterations:5, AUC: 66.6160962344358
//		numIterations:10, AUC: 66.54826844243996		<--- 더 안좋아졌다.
//		numIterations:50, AUC: 66.81425454500737
		
		/**
		 * 반복 - stepSize
		 */
		final int numIterations = 10;
		(Arrays.asList(0.001, 0.01, 0.1, 1.0, 10.0)).parallelStream().forEach(stepSize->{
			LogisticRegressionModel model = trainWithParams(scaledDataCats, 0.0, numIterations, new SimpleUpdater(), stepSize);
			Tuple2<String, Double> tuple = createMetrics("stepSize:"+stepSize, scaledDataCats, model);
			s(tuple._1() + ", AUC: " + (tuple._2()*100));
		});
//		결과
//		stepSize:0.001, AUC: 64.96588225098238
//		stepSize:0.01, AUC: 64.96444027450548
//		stepSize:0.1, AUC: 65.521065153621
//		stepSize:1.0, AUC: 66.54826844243996
//		stepSize:10.0, AUC: 61.922788527781535			<--- 더 안좋아졌다.
		
		/**
		 * 규칙화 SquaredL2Updater - regParam
		 */
		(Arrays.asList(0.001, 0.01, 0.1, 1.0, 10.0)).parallelStream().forEach(regParam->{
			LogisticRegressionModel model = trainWithParams(scaledDataCats, regParam, numIterations, new SquaredL2Updater(), 1.0);
			Tuple2<String, Double> tuple = createMetrics("L2 regParam:"+regParam, scaledDataCats, model);
			s(tuple._1() + ", AUC: " + (tuple._2()*100));
		});
//		결과(Stream, parallelStream, forEach 이라서 순서가 섞인다.)
//		L2 regParam:1.0, AUC: 66.03745376525676
//		L2 regParam:10.0, AUC: 35.32533843993077		<----- 더 안좋아졌다.
//		L2 regParam:0.1, AUC: 66.63378789506862
//		L2 regParam:0.01, AUC: 66.5475474542015
//		L2 regParam:0.001, AUC: 66.54826844243996
		
		/**
		 * 규칙화 L1Updater - regParam
		 */
		(Arrays.asList(0.001, 0.01, 0.1, 1.0, 10.0)).parallelStream().forEach(regParam->{
			LogisticRegressionModel model = trainWithParams(scaledDataCats, regParam, numIterations, new L1Updater(), 1.0);
			Tuple2<String, Double> tuple =  createMetrics("L1 regParam:"+regParam, scaledDataCats, model);
			s(tuple._1() + ", AUC: " + (tuple._2()*100));
		});
//		결과(Stream, parallelStream, forEach 이라서 순서가 섞인다.)
//		L1 regParam:10.0, AUC: 50.0							<----- 더 안좋아졌다.
//		L1 regParam:0.1, AUC: 50.0							<----- 더 안좋아졌다.
//		L1 regParam:1.0, AUC: 50.0							<----- 더 안좋아졌다.
//		L1 regParam:0.001, AUC: 66.53221272973906
//		L1 regParam:0.01, AUC: 66.46962582686739

		//
		scaledDataCats.unpersist();
	}
	private LogisticRegressionModel trainWithParams(JavaRDD<LabeledPoint> input, Double regParam, Integer numIterations, Updater updater, Double stepSize){
		LogisticRegressionWithSGD lr = new LogisticRegressionWithSGD();
		lr.optimizer().setNumIterations(numIterations).setUpdater(updater).setRegParam(regParam).setStepSize(stepSize);
		return lr.run(JavaRDD.toRDD(input));	//org.apache.spark.SparkException: RDD transformations and actions can only be invoked by the driver, not inside of other transformations; for example, rdd1.map(x => rdd2.values.count() * x) is invalid because the values transformation and count action cannot be performed inside of the rdd1.map transformation. For more information, see SPARK-5063.
	}
	private Tuple2<String, Double> createMetrics(String label, JavaRDD<LabeledPoint> data, GeneralizedLinearModel model){
		JavaRDD<Tuple2<Object, Object>> lrScoreAndLabels = data.map(lp->{
			return new Tuple2<Object, Object>(model.predict(lp.features()), lp.label());
		});
		BinaryClassificationMetrics lrMetrics = new BinaryClassificationMetrics(JavaRDD.toRDD(lrScoreAndLabels));
		return new Tuple2<String, Double>(label, lrMetrics.areaUnderROC());
	}
	
	private void proc5(JavaSparkContext sc){
		/**
		 * 의사결정 트리 - maxDepth 변경하기
		 */
		JavaRDD<LabeledPoint> data = getData(sc);
		
		//-> 여기까지는 proc1()과 같음.
		
		Arrays.asList(1,2,3,4,5,10,20).parallelStream().forEach(maxDepth->{
			DecisionTreeModel model = trainDTWithParams(data, maxDepth, Entropy.instance());		//or Gini.instance()
			JavaRDD<Tuple2<Object, Object>> scoreAndLabels = data.map(lp->{
				double score = model.predict(lp.features());
				return new Tuple2<Object, Object>(score > 0.5 ? 1.0 : 0.0, lp.label());
			});
			
			BinaryClassificationMetrics metrics = new BinaryClassificationMetrics(JavaRDD.toRDD(scoreAndLabels));
			s("tree depth:" + maxDepth + ", AUC:" + metrics.areaUnderROC());
		});
//		결과
//		tree depth:1, AUC:0.5932683560677638
//		tree depth:2, AUC:0.6168392183052838
//		tree depth:3, AUC:0.6260699538655363
//		tree depth:4, AUC:0.6363331299438932
//		tree depth:5, AUC:0.6488371887050935
//		tree depth:10, AUC:0.7625521856410764
//		tree depth:20, AUC:0.9845371811804648			<---- 월등히 올라간다.(트리의 깊이가 깊어질수록 모델은 데이터 집합을 상당히 과도하게 변형시킬 수 있다.)
		
//		불순물로 Gini.instance() 를 사용한 경우(별 차이는 없다.)
//		tree depth:1, AUC:0.5932683560677638
//		tree depth:2, AUC:0.6168392183052838
//		tree depth:3, AUC:0.6260699538655363
//		tree depth:4, AUC:0.6363331299438932
//		tree depth:5, AUC:0.6489164974113228
//		tree depth:10, AUC:0.7837090914201374
//		tree depth:20, AUC:0.9887069452906806
	}
	private DecisionTreeModel trainDTWithParams(JavaRDD<LabeledPoint> input, Integer maxDepth, Impurity impurity){
		return DecisionTree.train(JavaRDD.toRDD(input), Algo.Classification(), impurity, maxDepth);
	}
	
	private void proc6(JavaSparkContext sc){
		/**
		 * 나이브 베이즈 모델 - lambda
		 */
		JavaRDD<LabeledPoint> nbData = getNBData(sc);
		//-> 여기까지는 proc1() 과 같음.
		
		Arrays.asList(0.001, 0.01, 0.1, 1.0, 10.0).parallelStream().forEach(lambda->{
			NaiveBayesModel model = trainNBWithParams(nbData, lambda);
			JavaRDD<Tuple2<Object, Object>> scoreAndLabels = nbData.map(lp->new Tuple2<Object, Object>(model.predict(lp.features()), lp.label()));
			BinaryClassificationMetrics metrics = new BinaryClassificationMetrics(JavaRDD.toRDD(scoreAndLabels));
			s("lambda " + lambda + ", AUC:" + metrics.areaUnderROC());
		});
//		결과(별 차이 없음)
//		lambda 0.001, AUC:0.5839392440412702
//		lambda 0.01, AUC:0.5839392440412702
//		lambda 10.0, AUC:0.5843343968336832
//		lambda 0.1, AUC:0.5839392440412702
		
	}
	private NaiveBayesModel trainNBWithParams(JavaRDD<LabeledPoint> input, Double lambda){
		NaiveBayes nb = new NaiveBayes();
		nb.setLambda(lambda);
		return nb.run(JavaRDD.toRDD(input));
	}
	
	private void proc7(JavaSparkContext sc){
		/**
		 * 교차검증
		 */
		
		JavaRDD<LabeledPoint> scaledDataCats = getScaledDataCats(sc);
		//->여기까지는 proc2()와 같음.
		
		JavaRDD<LabeledPoint>[] trainTestSplit = scaledDataCats.randomSplit(new double[]{0.6, 0.4}, 123);
		JavaRDD<LabeledPoint> train = trainTestSplit[0];
		JavaRDD<LabeledPoint> test = trainTestSplit[1];
		train.cache();
		test.cache();
		
		final int numIterations = 10;
		Arrays.asList(0.0, 0.001, 0.0025, 0.005, 0.01).parallelStream().forEach(regParam->{
			LogisticRegressionModel model = trainWithParams(train, regParam, numIterations, new SquaredL2Updater(), 1.0);
			Tuple2<String, Double> createMetrics = createMetrics("L2 regParam:"+regParam, test, model);
			s(createMetrics._1() + ", AUC:" + createMetrics._2());
		});
//		결과
//		L2 regParam:0.0, AUC:0.6612684244321665
//		L2 regParam:0.001, AUC:0.6612684244321665
//		L2 regParam:0.0025, AUC:0.6612684244321665
//		L2 regParam:0.005, AUC:0.6612684244321665
//		L2 regParam:0.01, AUC:0.6609319506771195
		
		//
		train.unpersist();
		test.unpersist();
	}	
}