Getting Started with Spark 11 - [Learning Spark] Chapter 3: Programming with RDDs, Part 2
java개발자
2016. 4. 19. 16:56
package org;

import java.io.Serializable;

import org.apache.log4j.PropertyConfigurator;
import org.apache.spark.api.java.JavaSparkContext;

public class MyConf implements Serializable {

    private static final long serialVersionUID = 9809021L;

    public static String PROJECT_PATH = System.getProperty("user.dir");

    public static void setLog4j() {
        PropertyConfigurator.configure(PROJECT_PATH + "\\src\\resources\\log4j.properties");
    }

    public static JavaSparkContext getJavaSparkContext() {
        // An important difference between local[1] and local[2]: with 2 or more
        // threads, the ordering of the data can change.
        // Switch to local[1] depending on the machine you are testing on.
        return new JavaSparkContext("local[2]", "First Spark App");
    }
}
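To actually see the local[2] behavior the comment above warns about, one option is glom(), which turns each partition into a list so the split becomes visible. A minimal sketch under the same setup (the class name PartitionOrderCheck is made up for illustration):

package org;

import java.util.Arrays;

import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;

public class PartitionOrderCheck {
    public static void main(String[] args) {
        JavaSparkContext sc = MyConf.getJavaSparkContext(); //local[2]
        JavaRDD<Integer> rdd = sc.parallelize(Arrays.asList(1, 2, 3, 4));
        // glom() collects each partition into a List; with local[2] this
        // prints something like [[1, 2], [3, 4]], showing why actions such
        // as take() or foreach() can observe a different order than local[1].
        System.out.println(rdd.glom().collect());
        sc.stop();
    }
}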
package org.mystudy;

import java.io.Serializable;
import java.util.Comparator;

public class MyComparator implements Comparator<Integer>, Serializable {

    private static final long serialVersionUID = 1L;

    @Override
    public int compare(Integer o1, Integer o2) {
        // Reverse the natural (ascending) order; Integer.compare avoids
        // the integer-overflow risk of writing o2 - o1.
        return Integer.compare(o2, o1);
    }
}
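MyComparator is a named class because anything Spark ships to executors must be Serializable. As a side note, the same reversed ordering can be written inline with an intersection cast; a minimal sketch, assuming Java 8 (the class and field names here are made up):

package org.mystudy;

import java.io.Serializable;
import java.util.Comparator;

public class InlineComparatorSketch {
    // The intersection cast keeps the lambda Serializable, so Spark can
    // ship it to executors just like the MyComparator class above.
    public static final Comparator<Integer> REVERSED =
            (Comparator<Integer> & Serializable) (o1, o2) -> Integer.compare(o2, o1);

    public static void main(String[] args) {
        System.out.println(REVERSED.compare(1, 2)); //1 (reversed order)
    }
}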
package org.mystudy;

import java.io.Serializable;
import java.util.Arrays;
import java.util.List;
import java.util.Map;

import org.MyConf;
import org.apache.spark.api.java.JavaDoubleRDD;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;

import scala.Tuple2;

public class Ch3_example2 implements Serializable {

    private static final long serialVersionUID = 43443432321L;

    public Ch3_example2() {
        MyConf.setLog4j();
    }

    public static void main(String... strings) {
        Ch3_example2 ex = new Ch3_example2();

        System.out.println("Table 3-4: Basic actions on an RDD containing {1, 2, 3, 3}");
        ex.proc4();  //collect
//        ex.proc5();  //count
//        ex.proc6();  //countByValue
//        ex.proc7();  //take
//        ex.proc8();  //top
//        ex.proc9();  //takeOrdered
//        ex.proc10(); //takeSample
//        ex.proc11(); //reduce
//        ex.proc12(); //fold
//        ex.proc13(); //aggregate
//        ex.proc14(); //foreach

        System.out.println("Example 3-38: Creating a DoubleRDD in Java");
        ex.proc21();
    }

    public void proc4() {
        JavaSparkContext sc = MyConf.getJavaSparkContext();
        JavaRDD<Integer> rdd = sc.parallelize(Arrays.asList(1, 2, 3, 4));
        List<Integer> result1 = rdd.collect();
        System.out.println(result1); //[1, 2, 3, 4]
    }

    public void proc5() {
        JavaSparkContext sc = MyConf.getJavaSparkContext();
        JavaRDD<Integer> rdd = sc.parallelize(Arrays.asList(1, 2, 3, 4));
        long count = rdd.count();
        System.out.println(count); //4
    }

    public void proc6() {
        JavaSparkContext sc = MyConf.getJavaSparkContext();
        JavaRDD<Integer> rdd = sc.parallelize(Arrays.asList(1, 2, 3, 3));
        Map<Integer, Long> map = rdd.countByValue();
        System.out.println(map); //{1=1, 3=2, 2=1}

        JavaRDD<String> rdd2 = sc.parallelize(Arrays.asList("A", "B", "A", "C"));
        Map<String, Long> map2 = rdd2.countByValue();
        System.out.println(map2); //{B=1, A=2, C=1}
    }

    public void proc7() {
        JavaSparkContext sc = MyConf.getJavaSparkContext();
        JavaRDD<Integer> rdd = sc.parallelize(Arrays.asList(1, 2, 3, 3));
        rdd = rdd.repartition(4); //consider what happens as partitions increase
        //take(n) returns whichever values arrive first from some partitions,
        //so any ordering may be ignored.
        List<Integer> result = rdd.take(2);
        System.out.println(result); //[2, 3]
    }

    public void proc8() {
        //default ordering: descending
        JavaSparkContext sc = MyConf.getJavaSparkContext();
        JavaRDD<Integer> rdd = sc.parallelize(Arrays.asList(1, 2, 3, 4, 5, 6, 7, 8));
        rdd = rdd.repartition(4); //consider what happens as partitions increase
        List<Integer> result = rdd.top(2); //descending by default
        System.out.println(result); //[8, 7]
        List<Integer> result2 = rdd.top(2, new MyComparator()); //custom comparator (reversed: ascending)
        System.out.println(result2); //[1, 2]
    }

    public void proc9() {
        //default ordering: ascending
        JavaSparkContext sc = MyConf.getJavaSparkContext();
        JavaRDD<Integer> rdd = sc.parallelize(Arrays.asList(1, 2, 3, 4, 5, 6, 7, 8));
        List<Integer> result = rdd.takeOrdered(2);
        System.out.println(result); //[1, 2]
        List<Integer> result2 = rdd.takeOrdered(2, new MyComparator()); //reversed: descending
        System.out.println(result2); //[8, 7]
    }

    public void proc10() {
        JavaSparkContext sc = MyConf.getJavaSparkContext();
        JavaRDD<Integer> rdd = sc.parallelize(Arrays.asList(1, 2, 3, 4, 5, 6, 7, 8));
        List<Integer> result1 = rdd.takeSample(false, 3);
        System.out.println(result1); //[2, 7, 6]
        List<Integer> result2 = rdd.takeSample(false, 3);
        System.out.println(result2); //[2, 7, 1]
        //boolean withReplacement: true allows the same element to be drawn twice
        List<Integer> result3 = rdd.takeSample(true, 3);
        System.out.println(result3); //[8, 5, 5]
        List<Integer> result4 = rdd.takeSample(true, 3);
        System.out.println(result4); //[1, 2, 1]
    }

    public void proc11() {
        //merges the elements in parallel
        JavaSparkContext sc = MyConf.getJavaSparkContext();
        JavaRDD<Integer> rdd = sc.parallelize(Arrays.asList(1, 2, 3, 4, 5, 6, 7, 8));
        Integer result = rdd.reduce((a, b) -> a + b);
        System.out.println(result); //36
    }

    public void proc12() {
        //a reduce operation plus a zero value;
        //the zero value is applied once per partition (and once more for the
        //final merge), so the result depends on the number of partitions.
        JavaSparkContext sc = MyConf.getJavaSparkContext();
        JavaRDD<Integer> rdd = sc.parallelize(Arrays.asList(1, 2, 3, 4, 5, 6, 7, 8));
        System.out.println(rdd.getNumPartitions()); //2
        Integer result = rdd.fold(1, (a, b) -> {
            System.out.println(a + ", " + b);
            return a + b;
        });
        // 1, 5
        // 6, 6
        // 12, 7
        // 19, 8
        // 1, 1
        // 2, 2
        // 4, 3
        // 7, 4
        // 1, 27
        // 28, 11
        //zero(1) + 5,6,7,8 = partition 1 subtotal (27)
        //zero(1) + 1,2,3,4 = partition 2 subtotal (11)
        //zero(1) + partition 1 + partition 2 = final result
        System.out.println(result); //39

        JavaRDD<Integer> rdd2 = rdd.repartition(5);
        Integer result2 = rdd2.fold(1, (a, b) -> a + b);
        System.out.println(result2); //42
    }

    public void proc13() {
        JavaSparkContext sc = MyConf.getJavaSparkContext();
        JavaRDD<Integer> rdd = sc.parallelize(Arrays.asList(1, 2, 3, 4, 5, 6, 7, 8));
        System.out.println(rdd.getNumPartitions()); //2

        //computing a sum
        Integer result = rdd.aggregate(1, (a, b) -> a + b, (a, b) -> a + b);
        System.out.println(result); //39
        JavaRDD<Integer> rdd2 = rdd.repartition(5);
        Integer result2 = rdd2.aggregate(1, (a, b) -> a + b, (a, b) -> a + b);
        System.out.println(result2); //42

        //computing an average
        JavaRDD<Integer> rdd3 = rdd.repartition(2);
        //Tuple2(sum, count)
        Tuple2<Integer, Integer> result3 = rdd3.aggregate(
                new Tuple2<Integer, Integer>(0, 0)
                , (tuple0, data) -> new Tuple2<Integer, Integer>(tuple0._1 + data, tuple0._2 + 1)
                , (tuple1, tuple2) -> new Tuple2<Integer, Integer>(tuple1._1 + tuple2._1, tuple1._2 + tuple2._2)
        );
        System.out.println(result3._1()); //36
        System.out.println(result3._2()); //8
        System.out.println(result3._1() / result3._2()); //4 (average)
    }

    public void proc14() {
        JavaSparkContext sc = MyConf.getJavaSparkContext();
        JavaRDD<Integer> rdd = sc.parallelize(Arrays.asList(1, 2, 3, 4, 5, 6, 7, 8));
        //output order depends on how the two partitions are processed:
        rdd.foreach(a -> System.out.println(a));
        //1
        //5
        //6
        //7
        //8
        //2
        //3
        //4
    }

    public void proc21() {
        JavaSparkContext sc = MyConf.getJavaSparkContext();
        JavaRDD<Integer> rdd = sc.parallelize(Arrays.asList(1, 2, 3, 4, 5, 6, 7, 8));
        JavaDoubleRDD d = rdd.mapToDouble(a -> a);
        System.out.println(d.count());    //8
        System.out.println(d.mean());     //4.5
        System.out.println(d.variance()); //5.25
        System.out.println(d.sum());      //36.0
        System.out.println(d.min());      //1.0
        System.out.println(d.stdev());    //2.29128784747792
    }
}
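proc21 launches one Spark job per statistic. JavaDoubleRDD also offers stats(), which computes all of them in a single pass and returns a StatCounter; a minimal sketch reusing the same data (the class name DoubleRddStatsSketch is made up):

package org.mystudy;

import java.util.Arrays;

import org.MyConf;
import org.apache.spark.api.java.JavaDoubleRDD;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.util.StatCounter;

public class DoubleRddStatsSketch {
    public static void main(String[] args) {
        JavaSparkContext sc = MyConf.getJavaSparkContext();
        JavaRDD<Integer> rdd = sc.parallelize(Arrays.asList(1, 2, 3, 4, 5, 6, 7, 8));
        JavaDoubleRDD d = rdd.mapToDouble(a -> a);

        // stats() computes count/mean/variance/etc. in one pass over the
        // data, instead of one job per statistic as in proc21.
        StatCounter stats = d.stats();
        System.out.println(stats.count());    //8
        System.out.println(stats.mean());     //4.5
        System.out.println(stats.variance()); //5.25
        System.out.println(stats.sum());      //36.0
        System.out.println(stats.min());      //1.0
        System.out.println(stats.stdev());    //2.29128784747792
        sc.stop();
    }
}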