Getting Started with Spark 13 - [Learning Spark] Chapter 4: Working with Key/Value Pairs, Part 2
java개발자
2016. 4. 22. 14:23
package org.mystudy;

import static org.MyConf.s;

import java.io.Serializable;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

import org.MyConf;
import org.apache.commons.lang3.StringUtils;
import org.apache.spark.api.java.JavaDoubleRDD;
import org.apache.spark.api.java.JavaPairRDD;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.junit.Test;

import com.google.common.base.Optional;

import scala.Tuple2;

public class Ch4_example2 implements Serializable {

    private static final long serialVersionUID = 43443432321L;

    public Ch4_example2() {
        MyConf.setLog4j();
    }

    public static void main(String... strings) {
        /**
         * Samples of functions that are not in the book
         */
        Ch4_example2 ex = new Ch4_example2();
        s("JavaRDD---------------------------");
        // ex.proc31(); // mapPartitions, mapPartitionsToDouble, mapPartitionsToPair
        // ex.proc32(); // keyBy
        // ex.proc33(); // groupBy
        // ex.proc34(); // flatMapToPair
        // s("JavaPairRDD---------------------------");
        // ex.proc35(); // cartesian
        // ex.proc36(); // foldByKey
        // ex.proc37(); // fullOuterJoin
        // ex.proc38(); // groupBy
        // ex.proc39(); // mapToDouble
        // ex.proc40(); // flatMapToDouble
        // ex.proc41(); // mapToPair
        // ex.proc42(); // flatMapToPair
        // ex.proc43(); // intersection
        // ex.proc44(); // subtract
        // ex.proc45(); // map
        ex.proc46();
    }

    @Test
    public void proc31() {
        s("Not sure what mapPartitions, mapPartitionsToDouble and mapPartitionsToPair are actually for.");
        // (a sketch of what mapPartitions is good for follows after proc32)
        s("mapPartitions() - the result comes out unchanged...");
        JavaSparkContext sc = MyConf.getJavaSparkContext();
        JavaRDD<Integer> rdd = sc.parallelize(Arrays.asList(1, 2, 3, 4, 5, 6, 7, 8, 8, 8));

        JavaRDD<Integer> r0 = rdd.mapPartitions(a -> {
            List<Integer> list = new ArrayList<Integer>();
            while (a.hasNext()) {
                list.add(a.next());
            }
            return list;
        });
        s(r0.collect()); // [1, 2, 3, 4, 5, 6, 7, 8, 8, 8]

        s("mapPartitionsToDouble()");
        JavaDoubleRDD r1 = rdd.mapPartitionsToDouble(a -> {
            List<Double> list = new ArrayList<Double>();
            while (a.hasNext()) {
                list.add((double) a.next());
            }
            return list;
        });
        s(r1.collect()); // [1.0, 2.0, 3.0, 4.0, 5.0, 6.0, 7.0, 8.0, 8.0, 8.0]

        s("mapPartitionsToPair()");
        JavaPairRDD<String, Integer> r3 = rdd.mapPartitionsToPair(a -> {
            List<Tuple2<String, Integer>> list = new ArrayList<Tuple2<String, Integer>>();
            while (a.hasNext()) {
                list.add(new Tuple2<String, Integer>("Test", a.next()));
            }
            return list;
        });
        s(r3.collect());
        // [(Test,1), (Test,2), (Test,3), (Test,4), (Test,5), (Test,6), (Test,7), (Test,8), (Test,8), (Test,8)]
    }

    public void proc32() {
        s("keyBy() - creates a key that was not there before. Useful...");
        JavaSparkContext sc = MyConf.getJavaSparkContext();
        JavaRDD<Integer> rdd = sc.parallelize(Arrays.asList(1, 2, 3, 4, 5, 6, 7, 8, 8, 8));

        JavaPairRDD<String, Integer> r4 = rdd.keyBy(a -> {
            if (a > 5) {
                return "A";
            } else {
                return "B";
            }
        });
        s(r4.collect()); // [(B,1), (B,2), (B,3), (B,4), (B,5), (A,6), (A,7), (A,8), (A,8), (A,8)]
    }
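    // A minimal sketch of what mapPartitions buys you: any per-partition setup cost
    // (a DB connection, a parser, ...) is paid once per partition instead of once per
    // element. The "resource" string below is a hypothetical stand-in for such a
    // resource; with 2 partitions, exactly two distinct prefixes show up in the output.
    public void proc31b() {
        s("mapPartitions - one setup per partition instead of one per element");
        JavaSparkContext sc = MyConf.getJavaSparkContext();
        JavaRDD<Integer> rdd = sc.parallelize(Arrays.asList(1, 2, 3, 4, 5, 6, 7, 8, 8, 8), 2);

        JavaRDD<String> r = rdd.mapPartitions(it -> {
            // Created once per partition, not once per element.
            String resource = "conn-" + System.nanoTime();
            List<String> out = new ArrayList<String>();
            while (it.hasNext()) {
                out.add(resource + ":" + it.next());
            }
            return out;
        });
        s(r.collect()); // two distinct "conn-..." prefixes, one per partition
    }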
    public void proc33() {
        s("groupBy() - creates a key that was not there before, then groups by that key");
        JavaSparkContext sc = MyConf.getJavaSparkContext();
        JavaRDD<Integer> rdd = sc.parallelize(Arrays.asList(1, 2, 3, 4, 5, 6, 7, 8, 8, 8));

        JavaPairRDD<String, Iterable<Integer>> r5 = rdd.groupBy(a -> "A");
        s(r5.collect()); // [(A,[1, 2, 3, 4, 5, 6, 7, 8, 8, 8])]

        JavaPairRDD<String, Iterable<Integer>> r6 = rdd.groupBy(a -> {
            if (a > 5) {
                return "A";
            } else {
                return "B";
            }
        });
        s(r6.collect()); // [(B,[1, 2, 3, 4, 5]), (A,[6, 7, 8, 8, 8])]
    }

    public void proc34() {
        s("flatMapToPair");
        JavaSparkContext sc = MyConf.getJavaSparkContext();
        JavaRDD<Integer> rdd = sc.parallelize(Arrays.asList(1, 2, 3, 4, 5, 6, 7, 8, 8, 8));

        JavaPairRDD<String, Integer> r7 = rdd.flatMapToPair(a -> {
            List<Tuple2<String, Integer>> list = new ArrayList<Tuple2<String, Integer>>();
            list.add(new Tuple2<String, Integer>("A", a));
            return list;
        });
        s(r7.collect()); // [(A,1), (A,2), (A,3), (A,4), (A,5), (A,6), (A,7), (A,8), (A,8), (A,8)]
    }

    public void proc35() {
        s("cartesian()");
        JavaSparkContext sc = MyConf.getJavaSparkContext();
        List<Tuple2<String, String>> data0 = new ArrayList<Tuple2<String, String>>();
        data0.add(new Tuple2<String, String>("K1", "ABC"));
        data0.add(new Tuple2<String, String>("K2", "DE"));
        data0.add(new Tuple2<String, String>("K5", "GHI"));
        data0.add(new Tuple2<String, String>("K5", "JKL"));
        JavaPairRDD<String, String> pairs = sc.parallelizePairs(data0);

        JavaRDD<Integer> rdd2 = sc.parallelize(Arrays.asList(1, 2));
        JavaPairRDD<Tuple2<String, String>, Integer> result1 = pairs.cartesian(rdd2);
        s(result1.collect());
        // [((K1,ABC),1), ((K2,DE),1), ((K1,ABC),2), ((K2,DE),2), ((K5,GHI),1), ((K5,JKL),1), ((K5,GHI),2), ((K5,JKL),2)]
    }

    public void proc36() {
        s("foldByKey");
        JavaSparkContext sc = MyConf.getJavaSparkContext();
        List<Tuple2<String, Integer>> data1 = new ArrayList<Tuple2<String, Integer>>();
        data1.add(new Tuple2<String, Integer>("K1", 123));
        data1.add(new Tuple2<String, Integer>("K2", 456));
        data1.add(new Tuple2<String, Integer>("K2", 789));
        data1.add(new Tuple2<String, Integer>("K7", 0));
        JavaPairRDD<String, Integer> other1 = sc.parallelizePairs(data1);

        JavaPairRDD<String, Integer> result4 = other1.foldByKey(0, (a, b) -> a + b);
        s(result4.collect()); // [(K7,0), (K1,123), (K2,1245)]
    }

    public void proc37() {
        s("fullOuterJoin - like applying leftOuterJoin and rightOuterJoin at the same time");
        JavaSparkContext sc = MyConf.getJavaSparkContext();
        List<Tuple2<String, String>> data0 = new ArrayList<Tuple2<String, String>>();
        data0.add(new Tuple2<String, String>("K1", "ABC"));
        data0.add(new Tuple2<String, String>("K2", "DE"));
        data0.add(new Tuple2<String, String>("K5", "GHI"));
        data0.add(new Tuple2<String, String>("K5", "JKL"));
        JavaPairRDD<String, String> pairs = sc.parallelizePairs(data0);

        List<Tuple2<String, Integer>> data1 = new ArrayList<Tuple2<String, Integer>>();
        data1.add(new Tuple2<String, Integer>("K1", 123));
        data1.add(new Tuple2<String, Integer>("K2", 456));
        data1.add(new Tuple2<String, Integer>("K2", 789));
        data1.add(new Tuple2<String, Integer>("K7", 0));
        JavaPairRDD<String, Integer> other1 = sc.parallelizePairs(data1);

        JavaPairRDD<String, Tuple2<Optional<String>, Optional<Integer>>> result5 = pairs.fullOuterJoin(other1);
        s(result5.collect());
        // [(K7,(Optional.absent(),Optional.of(0))), (K1,(Optional.of(ABC),Optional.of(123))),
        //  (K5,(Optional.of(GHI),Optional.absent())), (K5,(Optional.of(JKL),Optional.absent())),
        //  (K2,(Optional.of(DE),Optional.of(456))), (K2,(Optional.of(DE),Optional.of(789)))]
    }
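    // A minimal follow-up sketch for proc37: the Spark 1.x Java API returns Guava
    // Optionals from fullOuterJoin, and Optional.or() is a compact way to fill the
    // absent sides with defaults. Smaller input data here, purely for brevity.
    public void proc37b() {
        s("fullOuterJoin + mapValues - filling absent sides with defaults");
        JavaSparkContext sc = MyConf.getJavaSparkContext();
        List<Tuple2<String, String>> data0 = new ArrayList<Tuple2<String, String>>();
        data0.add(new Tuple2<String, String>("K1", "ABC"));
        data0.add(new Tuple2<String, String>("K5", "GHI"));
        JavaPairRDD<String, String> pairs = sc.parallelizePairs(data0);

        List<Tuple2<String, Integer>> data1 = new ArrayList<Tuple2<String, Integer>>();
        data1.add(new Tuple2<String, Integer>("K1", 123));
        data1.add(new Tuple2<String, Integer>("K7", 0));
        JavaPairRDD<String, Integer> other1 = sc.parallelizePairs(data1);

        JavaPairRDD<String, String> filled = pairs.fullOuterJoin(other1)
                .mapValues(t -> t._1.or("-") + "/" + t._2.or(-1));
        s(filled.collect()); // e.g. [(K7,-/0), (K1,ABC/123), (K5,GHI/-1)] (order may vary)
    }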
    public void proc38() {
        s("groupBy - you can build a new key and assign groups by it.");
        JavaSparkContext sc = MyConf.getJavaSparkContext();
        List<Tuple2<String, String>> data0 = new ArrayList<Tuple2<String, String>>();
        data0.add(new Tuple2<String, String>("K1", "ABC"));
        data0.add(new Tuple2<String, String>("K2", "DE"));
        data0.add(new Tuple2<String, String>("K5", "GHI"));
        data0.add(new Tuple2<String, String>("K5", "JKL"));
        JavaPairRDD<String, String> pairs = sc.parallelizePairs(data0);

        JavaPairRDD<String, Iterable<Tuple2<String, String>>> result6 = pairs.groupBy(tuple -> {
            if (tuple._2.length() > 2) {
                return "length3";
            } else {
                return "length2";
            }
        });
        s(result6.collect()); // [(length2,[(K2,DE)]), (length3,[(K1,ABC), (K5,GHI), (K5,JKL)])]
    }

    public void proc39() {
        s("mapToDouble");
        JavaSparkContext sc = MyConf.getJavaSparkContext();
        List<Tuple2<String, String>> data0 = new ArrayList<Tuple2<String, String>>();
        data0.add(new Tuple2<String, String>("K1", "ABC"));
        data0.add(new Tuple2<String, String>("K2", "DE"));
        data0.add(new Tuple2<String, String>("K5", "GHI"));
        data0.add(new Tuple2<String, String>("K5", "JKL"));
        JavaPairRDD<String, String> pairs = sc.parallelizePairs(data0);

        JavaDoubleRDD result7 = pairs.mapToDouble(tuple -> {
            return tuple._2.length();
        });
        s(result7.collect()); // [3.0, 2.0, 3.0, 3.0]
    }

    public void proc40() {
        s("flatMapToDouble()");
        JavaSparkContext sc = MyConf.getJavaSparkContext();
        List<Tuple2<String, String>> data2 = new ArrayList<Tuple2<String, String>>();
        data2.add(new Tuple2<String, String>("K1", "1 2 3"));
        data2.add(new Tuple2<String, String>("K2", "4 5 6"));
        data2.add(new Tuple2<String, String>("K7", "0"));
        JavaPairRDD<String, String> other2 = sc.parallelizePairs(data2);

        JavaDoubleRDD result2 = other2.flatMapToDouble(tuple -> {
            String[] arr = tuple._2.split(" ");
            List<Double> list = new ArrayList<Double>();
            for (String s : arr) {
                list.add(Double.parseDouble(s));
            }
            return list;
        });
        s(result2.collect()); // [1.0, 2.0, 3.0, 4.0, 5.0, 6.0, 0.0]
    }

    public void proc41() {
        s("mapToPair");
        JavaSparkContext sc = MyConf.getJavaSparkContext();
        List<Tuple2<String, Integer>> data1 = new ArrayList<Tuple2<String, Integer>>();
        data1.add(new Tuple2<String, Integer>("K1", 123));
        data1.add(new Tuple2<String, Integer>("K2", 456));
        data1.add(new Tuple2<String, Integer>("K2", 789));
        data1.add(new Tuple2<String, Integer>("K7", 0));
        JavaPairRDD<String, Integer> other1 = sc.parallelizePairs(data1);

        JavaPairRDD<String, Integer> result8 =
                other1.mapToPair(tuple -> new Tuple2<String, Integer>(tuple._1 + "X", tuple._2 + 1));
        s(result8.collect()); // [(K1X,124), (K2X,457), (K2X,790), (K7X,1)]
    }

    public void proc42() {
        s("flatMapToPair - splits the value and maps each piece to a key; similar to flatMapValues, "
                + "but the difference is that here you can also manipulate the key.");
        JavaSparkContext sc = MyConf.getJavaSparkContext();
        List<Tuple2<String, String>> data3 = new ArrayList<Tuple2<String, String>>();
        data3.add(new Tuple2<String, String>("K1", "appleKorea"));
        data3.add(new Tuple2<String, String>("K2", "japanTokyo"));
        data3.add(new Tuple2<String, String>("K5", "seoulSeoul"));
        JavaPairRDD<String, String> pairs3 = sc.parallelizePairs(data3);

        JavaPairRDD<String, String> result3 = pairs3.flatMapToPair(tuple -> {
            List<Tuple2<String, String>> list = new ArrayList<Tuple2<String, String>>();
            String[] arr = StringUtils.splitByCharacterTypeCamelCase(tuple._2);
            for (String s : arr) {
                list.add(new Tuple2<String, String>(tuple._1, tuple._1 + "-" + s));
            }
            return list;
        });
        s(result3.collect());
        // [(K1,K1-apple), (K1,K1-Korea), (K2,K2-japan), (K2,K2-Tokyo), (K5,K5-seoul), (K5,K5-Seoul)]
    }
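    // For contrast with proc42, the flatMapValues version mentioned there: the key is
    // carried through automatically, so only the value-splitting logic is written.
    // A minimal sketch using the same camel-case split.
    public void proc42b() {
        s("flatMapValues - same split as proc42, but the key passes through untouched");
        JavaSparkContext sc = MyConf.getJavaSparkContext();
        List<Tuple2<String, String>> data3 = new ArrayList<Tuple2<String, String>>();
        data3.add(new Tuple2<String, String>("K1", "appleKorea"));
        data3.add(new Tuple2<String, String>("K2", "japanTokyo"));
        JavaPairRDD<String, String> pairs3 = sc.parallelizePairs(data3);

        JavaPairRDD<String, String> r = pairs3.flatMapValues(v ->
                Arrays.asList(StringUtils.splitByCharacterTypeCamelCase(v)));
        s(r.collect()); // e.g. [(K1,apple), (K1,Korea), (K2,japan), (K2,Tokyo)]
    }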
    public void proc43() {
        s("intersection - compares both key and value; keeps only the pairs present in both RDDs");
        JavaSparkContext sc = MyConf.getJavaSparkContext();
        List<Tuple2<String, String>> data0 = new ArrayList<Tuple2<String, String>>();
        data0.add(new Tuple2<String, String>("K1", "ABC"));
        data0.add(new Tuple2<String, String>("K2", "DE"));
        data0.add(new Tuple2<String, String>("K5", "GHI"));
        data0.add(new Tuple2<String, String>("K5", "JKL"));
        JavaPairRDD<String, String> pairs = sc.parallelizePairs(data0);

        List<Tuple2<String, String>> data1 = new ArrayList<Tuple2<String, String>>();
        data1.add(new Tuple2<String, String>("K1", "123"));
        data1.add(new Tuple2<String, String>("K2", "DE"));
        data1.add(new Tuple2<String, String>("K2", "GHI"));
        data1.add(new Tuple2<String, String>("K7", "0"));
        JavaPairRDD<String, String> other1 = sc.parallelizePairs(data1);

        JavaPairRDD<String, String> result = pairs.intersection(other1);
        s(result.collect()); // [(K2,DE)]
    }

    public void proc44() {
        s("subtract - compares both key and value and removes matches - set difference A-B");
        JavaSparkContext sc = MyConf.getJavaSparkContext();
        List<Tuple2<String, String>> data0 = new ArrayList<Tuple2<String, String>>();
        data0.add(new Tuple2<String, String>("K1", "ABC"));
        data0.add(new Tuple2<String, String>("K2", "DE"));
        data0.add(new Tuple2<String, String>("K5", "GHI"));
        data0.add(new Tuple2<String, String>("K5", "JKL"));
        JavaPairRDD<String, String> pairs = sc.parallelizePairs(data0);

        List<Tuple2<String, String>> data1 = new ArrayList<Tuple2<String, String>>();
        data1.add(new Tuple2<String, String>("K1", "123"));
        data1.add(new Tuple2<String, String>("K2", "DE"));
        data1.add(new Tuple2<String, String>("K2", "GHI"));
        data1.add(new Tuple2<String, String>("K7", "0"));
        JavaPairRDD<String, String> other1 = sc.parallelizePairs(data1);

        JavaPairRDD<String, String> result = pairs.subtract(other1);
        s(result.collect()); // [(K1,ABC), (K5,JKL), (K5,GHI)]
        // union likewise requires the key and value types of both RDDs to match.
    }

    public void proc45() {
        s("map()");
        // (see the mapValues note after this class)
        JavaSparkContext sc = MyConf.getJavaSparkContext();
        List<Tuple2<String, String>> data0 = new ArrayList<Tuple2<String, String>>();
        data0.add(new Tuple2<String, String>("K1", "A-B-C"));
        data0.add(new Tuple2<String, String>("K2", "D-E"));
        data0.add(new Tuple2<String, String>("K5", "G-H-I"));
        data0.add(new Tuple2<String, String>("K5", "JKL"));
        JavaPairRDD<String, String> pairs = sc.parallelizePairs(data0);

        JavaRDD<Integer> result = pairs.map(tuple -> tuple._1.length());
        s(result.collect()); // [2, 2, 2, 2]

        JavaRDD<List<String>> result2 = pairs.map(tuple -> Arrays.asList(tuple._2.split("-")));
        s(result2.collect()); // [[A, B, C], [D, E], [G, H, I], [JKL]]
    }

    public void proc46() {
        // Miscellaneous: these mostly just show which signatures compile on a JavaPairRDD.
        JavaSparkContext sc = MyConf.getJavaSparkContext();
        List<Tuple2<String, String>> data0 = new ArrayList<Tuple2<String, String>>();
        data0.add(new Tuple2<String, String>("K1", "A-B-C"));
        data0.add(new Tuple2<String, String>("K2", "D-E"));
        data0.add(new Tuple2<String, String>("K5", "G-H-I"));
        data0.add(new Tuple2<String, String>("K5", "JKL"));
        data0.add(new Tuple2<String, String>("K5", "JKL"));
        JavaPairRDD<String, String> pairs = sc.parallelizePairs(data0);

        // Actions: these run immediately (the dummy functions just return empty tuples).
        pairs.reduce((tuple1, tuple2) -> new Tuple2<String, String>("", ""));
        pairs.fold(new Tuple2<String, String>("", ""), (tuple1, tuple2) -> new Tuple2<String, String>("", ""));
        pairs.aggregate("", (a, b) -> "", (a, b) -> "");

        // Transformations: declared lazily and never acted on, so the null-returning
        // lambdas below never actually execute.
        pairs.keyBy(a -> "");
        pairs.flatMap(a -> null);
        pairs.mapPartitions(a -> null);
        pairs.mapPartitionsToDouble(a -> null);
        pairs.mapPartitionsToPair(a -> null);

        JavaRDD<Integer> rdd = sc.parallelize(Arrays.asList(1, 2, 3, 4, 5, 6, 7, 8, 8, 8));
        rdd.flatMapToDouble(a -> null);
        pairs.flatMapToDouble(a -> null);

        // distinct() removes the duplicate (K5,JKL) entry.
        s(pairs.distinct().collect());
    }
}
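A closing note on proc45: calling map() on a JavaPairRDD returns a plain JavaRDD, so the result is no longer keyed. When only the value changes, mapValues() keeps the pair structure and lets Spark retain the existing partitioner. A minimal method-style sketch that would sit alongside the procs above:

    public void proc45b() {
        JavaSparkContext sc = MyConf.getJavaSparkContext();
        List<Tuple2<String, String>> data0 = new ArrayList<Tuple2<String, String>>();
        data0.add(new Tuple2<String, String>("K1", "A-B-C"));
        data0.add(new Tuple2<String, String>("K2", "D-E"));
        JavaPairRDD<String, String> pairs = sc.parallelizePairs(data0);

        // Keys untouched, values transformed; the result is still a JavaPairRDD.
        JavaPairRDD<String, Integer> lengths = pairs.mapValues(v -> v.split("-").length);
        s(lengths.collect()); // e.g. [(K1,3), (K2,2)]
    }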