본문 바로가기
Spark/Spark와 머신 러닝

Spark 시작하기03 - [Spark와 머신 러닝] 1장 스파크의 시작과 구동

by java개발자 2016. 3. 27.

무비스트림

47p 자바로 개발하는 스파크 프로그램의 첫 단계... 인데... 시작부터 에러 발생이다.-_-;;

에러를 고치기 위해 구글 자료를 찾아가면서, Spark를 배워나간다.;;;;;한번에 속시원하게 풀리지 않고, 찾아봐야 하는... 이런 방법... 결국 도움은 되지만,,, 정말 힘들다...

또한 JAVA8 버전으로 변형!


파이썬이든, 스칼라든, 자바들,,,

결국은 성능을 위해 스칼라로 작성해야 한다.

파이썬이 다양한 라이브러리가 많아 편할지 모르지만,,, 결국은 스칼라 이므로,,,

일단은 JAVA8 함수형으로 구현한다.


JAVA8의 스트림기능처럼 .().().()........ 엮어서 표현하는 것이 간단하지만,,, 

공부할때는 모두 풀어서 작성하자, 어떤 형태로 리턴되는지 알 수 있어서 명확히 개념을 알 수 있다.


일단 테스트는

이클립스, JAVA8, 메이븐, class 하나 만들어서 main으로 실행


pom.xml

        <dependency>

            <groupid>org.apache.spark</groupid>

            <artifactid>spark-core_2.10</artifactid>

            <version>1.6.0</version>

        </dependency>


package org.test.sparkNmachineLearning;
import java.io.BufferedReader;
import java.io.FileReader;
import java.io.IOException;
import java.io.Serializable;
import java.util.ArrayList;
import java.util.List;

import org.apache.log4j.PropertyConfigurator;
import org.apache.spark.Accumulator;
import org.apache.spark.api.java.JavaDoubleRDD;
import org.apache.spark.api.java.JavaPairRDD;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;

import scala.Tuple2;

/**
 * A simple Spark app in Java
 */
public class Ch1_JavaApp2 implements Serializable{
	
	private static final long serialVersionUID = 1L;
	List<String> currentFile = null;
	
	private boolean isDebug = true;
	public Ch1_JavaApp2(){
		//log4j 설정(로그레벨 WARN)... 일단 경로는 하드코딩-_-;;
		PropertyConfigurator.configure("D:\\workspace\\spark\\sparkNmachineLearning\\src\\resources\\log4j.properties");
		
		if(currentFile == null){
			currentFile = new ArrayList<String>();
			BufferedReader br = null;
			try {
				br = new BufferedReader(new FileReader("D:\\workspace\\spark\\sparkNmachineLearning\\src\\main\\java\\org\\test\\sparkNmachineLearning\\Ch1_JavaApp2.java"));
				String temp = null;
				currentFile.add("");	//0번째 자리 빈값으로...
				while((temp = br.readLine()) != null){
					int index = temp.indexOf("=");
					if(index != -1){
						currentFile.add(temp.substring(0, index+1).trim() + " ");
					}else{
						currentFile.add("");
					}
				}
			} catch (IOException e) {
				e.printStackTrace();
			} finally{
				try {
					br.close();
				} catch (IOException e) {
					e.printStackTrace();
				}
			}
		}
	}
	/**
	 * 변수에 값을 할당할때, 사용하면, 일단 화면에 한번 출력해주고, 할당한다.
	 * StackTraceElement를 사용하면, 어느 라인에서 할당했는지 알 수 있음.
	 * 디버그용으로 좋음!!
	 */
	public <T> T s(T o){	
		if(isDebug){
			Thread th = Thread.currentThread();
			StackTraceElement[] lists = th.getStackTrace();
			int lineNumber = lists[2].getLineNumber();
			String variable = currentFile.get(lineNumber);
			System.out.print("\n" +  + lineNumber + ": " + variable + o);
		}
		return o;
	}
	
	public static void main(String[] args) {
		new Ch1_JavaApp2().proc();
    }
    public void proc(){
//    	John,iPhone Cover,9.99
//    	John,Headphones,5.49
//    	Jack,iPhone Cover,9.99
//    	Jill,Samsung Galaxy Cover,8.95
//    	Bob,iPad Cover,5.49

    	/**
    	 * 1.파일 읽기
    	 */
        JavaSparkContext sc = new JavaSparkContext("local[2]", "First Spark App");
        JavaRDD<String> fileData = sc.textFile("D:\\workspace\\spark\\sparkNmachineLearning\\src\\main\\java\\org\\test\\sparkNmachineLearning\\data\\UserPurchaseHistory.csv");
        JavaRDD<String[]> data = fileData.map(b -> b.split(",")); 
        s(data.count());

        /**
         * 파일 컬럼별 데이터 출력
         */
        JavaRDD<String> re1 = data.map(a -> a[0]);
        re1.foreach(a -> s(a));
        JavaRDD<String> re2 = re1.distinct();
        re2.foreach(a -> s(a));
        long re2_count = s(re2.count());	//4

        /**
         * 세번째 컬럼 sum
         * 헉!! sum 함수가 없다.
         * sum함수 대신 차선책..
         */
//        JavaRDD<Double> re3 = data.map(a -> Double.parseDouble(a[2]));
//        s(re3.count());		//5
//        re3.foreach(a -> s(a));
//        //http://codrspace.com/NareshKosgi/a-total-sum-spark-job/
//        Accumulator<Double> accum = sc.accumulator(0.0);	//누산기
//        re3.foreach(c -> accum.add(c));
//        s(accum.value());			//39.91	
//        Double re3_1 = re3.fold(0.0, (x,y)->x+y);	//fold???
//        s(re3_1);					//39.91
        
        /**
         * 이렇게 하면 된다. 
         */
        JavaDoubleRDD re3_2 = data.mapToDouble(a -> Double.parseDouble(a[2]));
        s("합:" + re3_2.sum());
        
        JavaPairRDD<String, Integer> pairs = data.mapToPair(a -> new Tuple2(a[1],1));	//Pair가 Tuple2 인가??
        pairs = pairs.reduceByKey((a,b)->a+b);	//첫번째 컬럼이 key가 되어서 중복을 제거하는 듯..
        pairs.sortByKey();
        
        List<Tuple2<String, Integer>> list4 = s(pairs.collect());
        
        String mostPopular = s(list4.get(0)._1());
        int purchases = s(list4.get(0)._2());
        
        sc.stop();
    }
}




콘솔 결과값

92: 5
98: John
98: John
98: Jack
98: Jill
98: Bob
100: Jill
100: John
100: Bob
100: Jack
101: long re2_count = 4
122: 합:39.91
128: List<Tuple2<String, Integer>> list4 = [(iPhone Cover,2), (iPad Cover,1), (Samsung Galaxy Cover,1), (Headphones,1)]
130: String mostPopular = iPhone Cover
131: int purchases = 2




처음엔 map을 사용하고, JavaRDD를 사용하니,,, sum 함수를 사용할 수가 없어서, sum을 구현하는 방법을 찾다가,,, 
map이 아니라 mapToDouble, mapToPair 였다. 
JavaRDD가 아니라 JavaDoubleRDD, JavaPairRDD 였다...ㅜ

또한
log4j 출력에 대한 파일 설정을 추가했고,
s 함수를 만들어서, System.out.println 을 단축시켰다.
s 함수를 이용하면, System.out.println 출력도 하면서, 그대로 그 값을 리턴하므로, 변수 할당시 사용하면 유용하다.
아예... AspectJ 처럼,,, class 단에서 AOP 하고 싶지만,, 너무 일이 커질 것 같아. 이정도로 마무리...