
Getting Started with Spark 11 - [Learning Spark] Chapter 3: Programming with RDDs (2)

by java개발자 2016. 4. 19.
package org;

import java.io.Serializable;

import org.apache.log4j.PropertyConfigurator;
import org.apache.spark.api.java.JavaSparkContext;

public class MyConf implements Serializable{
	
	private static final long serialVersionUID = 9809021L;
	
	public static String PROJECT_PATH = System.getProperty("user.dir");
	public static void setLog4j(){
		PropertyConfigurator.configure(PROJECT_PATH + "\\src\\resources\\log4j.properties");
	}
	public static JavaSparkContext getJavaSparkContext(){
		// An important difference between local[1] and local[2]: with 2 or more threads, the ordering of the data can change.
		// Adjust local[1]/local[2] to suit the machine used for testing.
		return new JavaSparkContext("local[2]", "First Spark App");	
	}
}
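For reference, the same context can also be created through a SparkConf object, which makes it easier to add more settings later. A minimal sketch, using the same master and app name as MyConf above:

import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaSparkContext;

// equivalent to new JavaSparkContext("local[2]", "First Spark App")
SparkConf conf = new SparkConf().setMaster("local[2]").setAppName("First Spark App");
JavaSparkContext sc = new JavaSparkContext(conf);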
package org.mystudy;

import java.io.Serializable;
import java.util.Comparator;

// Serializable so that Spark can ship the comparator to executor tasks (used by top/takeOrdered below)
public class MyComparator implements Comparator<Integer>, Serializable {

	private static final long serialVersionUID = 1L;

	@Override
	public int compare(Integer o1, Integer o2) {
		// descending order; Integer.compare avoids the overflow risk of o2 - o1
		return Integer.compare(o2, o1);
	}
}

package org.mystudy;

import java.io.Serializable;
import java.util.Arrays;
import java.util.List;
import java.util.Map;

import org.MyConf;
import org.apache.spark.api.java.JavaDoubleRDD;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;

import scala.Tuple2;

public class Ch3_example2 implements Serializable {

	private static final long serialVersionUID = 43443432321L;

	public Ch3_example2() {
		MyConf.setLog4j();
	}

	public static void main(String... strings) {
		Ch3_example2 ex = new Ch3_example2();
		System.out.println("표3-4 {1,2,3,3}을 갖고 있는 RDD에 대한 기본 액션");
		ex.proc4();		//collect
//		ex.proc5();		//count
//		ex.proc6();		//countByValue
//		ex.proc7();		//take
//		ex.proc8();		//top
//		ex.proc9();		//takeOrdered
//		ex.proc10();	//takeSample
//		ex.proc11();	//reduce
//		ex.proc12();	//fold
//		ex.proc13();	//aggregate
//		ex.proc14();	//foreach
		
		System.out.println("예제 3-38 자바에서 DoubleRDD 만들기");
		ex.proc21();
	}

	public void proc4() {
		JavaSparkContext sc = MyConf.getJavaSparkContext();
		JavaRDD<Integer> rdd = sc.parallelize(Arrays.asList(1,2,3,4));
		List<Integer> result1 = rdd.collect();
		System.out.println(result1);	//[1, 2, 3, 4]
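		// collect() returns the entire RDD to the driver, so it is only appropriate when the data fits in driver memory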
	}
	public void proc5() {
		JavaSparkContext sc = MyConf.getJavaSparkContext();
		JavaRDD<Integer> rdd = sc.parallelize(Arrays.asList(1,2,3,4));
		long count = rdd.count();
		System.out.println(count);	//4
	}
	public void proc6() {
		JavaSparkContext sc = MyConf.getJavaSparkContext();
		JavaRDD<Integer> rdd = sc.parallelize(Arrays.asList(1,2,3,3));
		Map<Integer, Long> map = rdd.countByValue();
		System.out.println(map);	//{1=1, 3=2, 2=1}
		
		JavaRDD<String> rdd2 = sc.parallelize(Arrays.asList("A","B","A","C"));
		Map<String, Long> map2 = rdd2.countByValue();
		System.out.println(map2);	//{B=1, A=2, C=1}
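		// countByValue() also brings its result map back to the driver, so it suits data with few distinct values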
	}
	public void proc7() {
		JavaSparkContext sc = MyConf.getJavaSparkContext();
		JavaRDD<Integer> rdd = sc.parallelize(Arrays.asList(1,2,3,3));
		rdd = rdd.repartition(4);		// illustrates what can happen when the number of partitions grows
		List<Integer> result = rdd.take(2);	// take() reads from the first partitions scanned, so the original order may not be preserved
		System.out.println(result);		//[2, 3]
	}
	public void proc8() {	// default ordering: descending
		JavaSparkContext sc = MyConf.getJavaSparkContext();
		JavaRDD<Integer> rdd = sc.parallelize(Arrays.asList(1,2,3,4,5,6,7,8));
		rdd = rdd.repartition(4);			// illustrates what can happen when the number of partitions grows
		List<Integer> result = rdd.top(2);	// descending by default
		System.out.println(result);			//[8, 7]
		
		List<Integer> result2 = rdd.top(2, new MyComparator());	// custom comparator (reversed: ascending)
		System.out.println(result2);			//[1, 2]
	}
	public void proc9() {	// default ordering: ascending
		JavaSparkContext sc = MyConf.getJavaSparkContext();
		JavaRDD<Integer> rdd = sc.parallelize(Arrays.asList(1,2,3,4,5,6,7,8));
		List<Integer> result = rdd.takeOrdered(2);	//[1, 2]
		System.out.println(result);
		List<Integer> result2 = rdd.takeOrdered(2, new MyComparator());	// reversed: descending
		System.out.println(result2);				//[8, 7]
	}
	public void proc10() {
		JavaSparkContext sc = MyConf.getJavaSparkContext();
		JavaRDD<Integer> rdd = sc.parallelize(Arrays.asList(1,2,3,4,5,6,7,8));
		
		List<Integer> result1 = rdd.takeSample(false, 3);	//[2, 7, 6]
		System.out.println(result1);
		List<Integer> result2 = rdd.takeSample(false, 3);	//[2, 7, 1]
		System.out.println(result2);
		// boolean withReplacement: true allows the same element to be sampled more than once
		List<Integer> result3 = rdd.takeSample(true, 3);	//[8, 5, 5]
		System.out.println(result3);
		List<Integer> result4 = rdd.takeSample(true, 3);	//[1, 2, 1]
		System.out.println(result4);
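		// the overload takeSample(withReplacement, num, seed) fixes the random seed for reproducible samples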
	}
	public void proc11() {	// merges the elements in parallel
		JavaSparkContext sc = MyConf.getJavaSparkContext();
		JavaRDD<Integer> rdd = sc.parallelize(Arrays.asList(1,2,3,4,5,6,7,8));
		Integer result = rdd.reduce((a,b)->a+b);
		System.out.println(result);		//36
	}
	public void proc12() {	// a reduce operation plus a zero value
		// the result depends on the number of partitions, because the zero value is applied once per partition (and once more in the final merge)
		JavaSparkContext sc = MyConf.getJavaSparkContext();
		JavaRDD<Integer> rdd = sc.parallelize(Arrays.asList(1,2,3,4,5,6,7,8));
		System.out.println(rdd.getNumPartitions());	//2
		Integer result = rdd.fold(1, (a,b)->{
			System.out.println(a+", "+b);
			return a+b;
		});
//		1, 5
//		6, 6
//		12, 7
//		19, 8
//		1, 1
//		2, 2
//		4, 3
//		7, 4
//		1, 27
//		28, 11
		
		//default(1) + 5,6,7,8  = partition 1
		//default(1) + 1,2,3,4  = partition 2
		//default(1) + partition 1 + partition 2 = final result
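		// i.e. 36 (the sum of 1..8) + 1 * (2 partitions + 1 final merge) = 39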
		
		System.out.println(result);		//39
		
		JavaRDD<Integer> rdd2 = rdd.repartition(5);
		Integer result2 = rdd2.fold(1, (a,b)->a+b);
		System.out.println(result2);		//42
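		// with 5 partitions the zero value is applied 6 times in total: 36 + 1 * (5 + 1) = 42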
	}
	public void proc13() {
		JavaSparkContext sc = MyConf.getJavaSparkContext();
		JavaRDD<Integer> rdd = sc.parallelize(Arrays.asList(1,2,3,4,5,6,7,8));
		System.out.println(rdd.getNumPartitions());	//2
		
		// computing the sum
		Integer result = rdd.aggregate(1, (a,b)->a+b, (a,b)->a+b);
		System.out.println(result);			//39
		JavaRDD<Integer> rdd2 = rdd.repartition(5);
		Integer result2 = rdd2.aggregate(1, (a,b)->a+b, (a,b)->a+b);
		System.out.println(result2);		//42
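		// as with fold, the zero value is applied once per partition and once more on the driver: 36 + 1*(2+1) = 39 and 36 + 1*(5+1) = 42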
		
		// computing the average
		JavaRDD<Integer> rdd3 = rdd.repartition(2);
		//Tuple2(sum, count)
		Tuple2<Integer, Integer> result3 = rdd3.aggregate(
					new Tuple2<Integer, Integer>(0, 0)
					, (tuple0, data) -> new Tuple2<Integer, Integer>(tuple0._1 + data, tuple0._2 + 1)
					, (tuple1, tuple2)-> new Tuple2<Integer, Integer>(tuple1._1 + tuple2._1, tuple1._2 + tuple2._2)
				);
		System.out.println(result3._1());	//36
		System.out.println(result3._2());	//8
		System.out.println(result3._1() / result3._2());	//4 	// average (integer division truncates the true mean of 4.5)
	}
	public void proc14() {
		JavaSparkContext sc = MyConf.getJavaSparkContext();
		JavaRDD<Integer> rdd = sc.parallelize(Arrays.asList(1,2,3,4,5,6,7,8));
		rdd.foreach(a->System.out.println(a));
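		// printed by the executor threads, so the order below depends on partitioning and is not deterministic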
		//1
		//5
		//6
		//7
		//8
		//2
		//3
		//4
	}
	
	public void proc21(){
		JavaSparkContext sc = MyConf.getJavaSparkContext();
		JavaRDD<Integer> rdd = sc.parallelize(Arrays.asList(1,2,3,4,5,6,7,8));
		
		JavaDoubleRDD d = rdd.mapToDouble(a -> a);
		System.out.println(d.count());		//8
		System.out.println(d.mean());		//4.5
		System.out.println(d.variance());	//5.25	
		System.out.println(d.sum());		//36.0
		System.out.println(d.min());		//1.0
		System.out.println(d.stdev());		//2.29128784747792
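		// variance() and stdev() are population statistics; sampleVariance() and sampleStdev() divide by n - 1 instead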
	}
}
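One caveat: each procN opens its own JavaSparkContext and never stops it, and by default Spark allows only one active context per JVM, so running several of these methods in a single run can fail. A minimal sketch of a safer variant (a hypothetical proc4Safe; in Spark versions where JavaSparkContext implements Closeable, try-with-resources stops the context automatically):

	public void proc4Safe() {
		// the context is stopped automatically when the try block exits
		try (JavaSparkContext sc = MyConf.getJavaSparkContext()) {
			JavaRDD<Integer> rdd = sc.parallelize(Arrays.asList(1, 2, 3, 4));
			System.out.println(rdd.collect());	//[1, 2, 3, 4]
		}
	}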