Spark/러닝 스파크
Spark 시작하기12 - [러닝 스파크] 4장 키/값 페어로 작업하기
java개발자
2016. 4. 20. 17:29
package org.mystudy; import static org.MyConf.s; import java.io.Serializable; import java.net.MalformedURLException; import java.util.ArrayList; import java.util.Arrays; import java.util.Comparator; import java.util.List; import java.util.Map; import org.MyConf; import org.apache.spark.api.java.JavaPairRDD; import org.apache.spark.api.java.JavaRDD; import org.apache.spark.api.java.JavaSparkContext; import com.google.common.base.Optional; import scala.Tuple2; import scala.Tuple3; import scala.Tuple4; public class Ch4_example implements Serializable { private static final long serialVersionUID = 43443432321L; public Ch4_example() { MyConf.setLog4j(); } public static void main(String... strings) { Ch4_example ex = new Ch4_example(); /** * 표4-1 페어 RDD의 트랜스포메이션 * reduceByKey, groupByKey, combineBykey, mapValues, flatMapValues, keys, values, sortByKey, * 표4-2 두 페어 RDD에 대한 트랜스포메이션 * subtractByKey, join, rightOuterJoin, leftOuterJoin, cogroup, * 표4-3 페어 RDD의 액션 * countByKey, collectAsMap, lookup(key) */ ex.proc1(); //mapToPair ex.proc2(); //filter ex.proc3(); //mapValues, reduceByKey ex.proc4(); //flatMap, mapToPair, reduceByKey ex.proc4_1(); //countByValue ex.proc5(); //combineByKey ex.proc6(); //reduceByKey, 병렬화 수준 지정 ex.proc7(); //join ex.proc8(); //sortBy, sortByKey // 책에 없는 예제 ex.proc11(); //groupByKey ex.proc12(); //mapValues, flatMap -> flatMapValues ex.proc13(); //keys ex.proc14(); //values ex.proc15(); //cogroup ex.proc16(); //subtractByKey ex.proc17(); //countByKey, collectAsMap, lookup(key) ex.proc21(); //파티션 추적 /** * 파티셔닝을 해서 도움이 되는 연산들 * 스파크1.0 기준 : cogroup, groupWith, join, leftOuterJoin, rightOuterJoin, groupByKey, reduceByKey, combineByKey, lookup * * 결과 RDD에 파티셔너가 지정되는 모든 연산들 * cogroup, groupWith, join, leftOuterJoin, rightOuterJoin, groupByKey, reduceByKey, combineByKey, partitionBy, sort * , mapValue(부모RDD가 파티셔너를 가진 경우), flatMapValues(부모RDD가 파티셔너를 가진 경우), filter(부모RDD가 파티셔너를 가진 경우) * */ } public void proc1() { JavaSparkContext sc = MyConf.getJavaSparkContext(); s("예제 4-3 자바에서 첫 번째 단어를 키로 사용한 페어 RDD 생성"); JavaRDD<String> lines = sc.textFile("src/main/java/org/mystudy/sample.txt"); //sample.txt 내용 //apple 123 456 //kindle 567 abc //naver daum sk nate //empas yahoo google JavaPairRDD<String, String> pairs = lines.mapToPair(str -> new Tuple2<String, String>(str.split(" ")[0], str)); s(pairs.collect()); //[(apple,apple 123 456), (kindle,kindle 567 abc), (naver,naver daum sk nate), (empas,empas yahoo google)] //java 객체를 PairRDD로 만들기 List<Tuple2<String, String>> list2 = new ArrayList<Tuple2<String, String>>(); list2.add(new Tuple2<String, String>("name", "ysh")); list2.add(new Tuple2<String, String>("age", "30")); list2.add(new Tuple2<String, String>("addr", "seoul")); list2.add(new Tuple2<String, String>("country", "korea")); JavaPairRDD<String, String> pairs2 = sc.parallelizePairs(list2); s(pairs2.getNumPartitions()); //2 <-- 중요!! new JavaSparkContext 생성할때 local[1], local[2]에 따라 기본 파티션 개수가 달라진다...헉!!! s(pairs2.collect()); //[(name,ysh), (age,30), (addr,seoul), (country,korea)] /** * JavaSparkContext sc = new JavaSparkContext("local[1]", "First Spark App") -> 기본 파티션 개수 1개 * JavaSparkContext sc = new JavaSparkContext("local[3]", "First Spark App") -> 기본 파티션 개수 3개 */ JavaPairRDD<String, String> pairs3 = sc.parallelizePairs(list2, 3); //3은 파티션 개수이다. s(pairs3.getNumPartitions()); //3 s(pairs3.collect()); //[(name,ysh), (age,30), (addr,seoul), (country,korea)] } public void proc2() { JavaSparkContext sc = MyConf.getJavaSparkContext(); s("예제 4-6 자바에서 두 번째 요소에 대한 단순 필터 적용"); List<Tuple2<String, String>> list = new ArrayList<Tuple2<String, String>>(); list.add(new Tuple2<String, String>("name", "ysh")); list.add(new Tuple2<String, String>("age", "30000000000000000")); list.add(new Tuple2<String, String>("addr", "seoul")); list.add(new Tuple2<String, String>("country", "korea")); JavaPairRDD<String, String> pairs = sc.parallelizePairs(list); JavaPairRDD<String, String> result = pairs.filter(tuple->tuple._2.length() < 10); s(result.collect()); //[(name,ysh), (addr,seoul), (country,korea)] } public void proc3() { JavaSparkContext sc = MyConf.getJavaSparkContext(); s("예제 4-7, 4-8 mapValues() 와 reduceByKey()로 키별 평균 구하기"); List<Tuple2<String, Integer>> list = new ArrayList<Tuple2<String, Integer>>(); list.add(new Tuple2<String, Integer>("panda", 0)); list.add(new Tuple2<String, Integer>("pink", 3)); list.add(new Tuple2<String, Integer>("pirate", 3)); list.add(new Tuple2<String, Integer>("panda", 1)); list.add(new Tuple2<String, Integer>("pink", 4)); JavaPairRDD<String, Integer> pairs = sc.parallelizePairs(list); s(pairs.collect()); //[(panda,0), (pink,3), (pirate,3), (panda,1), (pink,4)] JavaPairRDD<String, Tuple2<Integer, Integer>> pairs2 = pairs.mapValues(value -> new Tuple2<Integer, Integer>(value, 1)); s(pairs2.collect()); //[(panda,(0,1)), (pink,(3,1)), (pirate,(3,1)), (panda,(1,1)), (pink,(4,1))] JavaPairRDD<String, Tuple2<Integer, Integer>> pairs3 = pairs2.reduceByKey((tuple1, tuple2) -> new Tuple2<Integer, Integer>(tuple1._1 + tuple2._1, tuple1._2 + tuple2._2)); // 람다식안에서 매번 Tuple2를 생성해서 보내는 것이 비효율인 것 같아, Tuple2의 필드를 변경시키고 기존 Tuple2를 return할려고 했더니,, Tuple2의 필드가 final이다-_-;;; 필드 수정 불가능 // JavaPairRDD<String, Tuple2<Integer, Integer>> pairs3 = pairs2.reduceByKey((tuple1, tuple2) -> { // tuple1._1 = tuple1._1 + tuple2._1; // tuple1._2 = tuple1._2 + tuple2._2; // return tuple1; // }); s(pairs3.collect()); //[(panda,(1,2)), (pirate,(3,1)), (pink,(7,2))] //(키, (총합, 개수)) JavaPairRDD<String, Double> pairs4 = pairs3.mapValues(tuple -> tuple._1 / (double)tuple._2); s(pairs4.collect()); //[(panda,0.5), (pirate,3.0), (pink,3.5)] //(키, 평균) } public void proc4() { JavaSparkContext sc = MyConf.getJavaSparkContext(); s("예제 4-11 자바에서의 단어 세기"); JavaRDD<String> input = sc.textFile("src/main/java/org/mystudy/sample2.txt"); //apple 123 456 456 //kindle 567 abc abc //naver daum sk nate nate //empas yahoo google google JavaRDD<String> words = input.flatMap(str -> Arrays.asList(str.split(" "))); JavaPairRDD<String, Integer> result = words.mapToPair(str -> new Tuple2<String, Integer>(str, 1)); result = result.reduceByKey((a,b) -> a+b); s(result.collect()); //[(567,1), (nate,2), (yahoo,1), (sk,1), (empas,1), (naver,1), (apple,1), (123,1), (abc,2), (456,2), (kindle,1), (google,2), (daum,1)] } public void proc4_1() { // 더 빠른 방법!! JavaSparkContext sc = MyConf.getJavaSparkContext(); s("예제 4-11 자바에서의 단어 세기"); JavaRDD<String> input = sc.textFile("src/main/java/org/mystudy/sample2.txt"); JavaRDD<String> words = input.flatMap(str -> Arrays.asList(str.split(" "))); Map<String, Long> result = words.countByValue(); s(result); //{abc=2, 567=1, empas=1, 123=1, google=2, naver=1, kindle=1, 456=2, sk=1, nate=2, apple=1, yahoo=1, daum=1} } public void proc5() { JavaSparkContext sc = MyConf.getJavaSparkContext(); s("예제 4-14 자바에서 combineByKey() 를 사용한 키별 평균"); List<Tuple2<String, Integer>> list = new ArrayList<Tuple2<String, Integer>>(); list.add(new Tuple2<String, Integer>("panda", 0)); list.add(new Tuple2<String, Integer>("pink", 3)); list.add(new Tuple2<String, Integer>("pirate", 3)); list.add(new Tuple2<String, Integer>("panda", 1)); list.add(new Tuple2<String, Integer>("pink", 4)); JavaPairRDD<String, Integer> pairs = sc.parallelizePairs(list); /** * 책에서는 AvgCount 클래스를 사용했지만, 2개의 인자를 사용하므로 굳이... 클래스를 새로 생성할 필요는 없을 것 같다. * 그냥. Tuple2를 사용해도 될 것 같다. * 찾아보니 scala.Tuple1~Tuple22 까지 지원해주는 듯. */ // Tuple2 를 사용할때, 제너릭을 명시하지 않으면, Integer + Integer을 의도했지만, Object + Integer 라고 인식하므로 컴파일 에러가 난다. 이래서 명시적 표현이 불필요한 scala를 쓰는건가;;; // JavaPairRDD<String, Tuple2> result2 = pairs.combineByKey( // num -> new Tuple2(num, 1) // , (tuple0, num) -> new Tuple2(tuple0._1() + num, tuple0._2 + 1) // , (tuple1, tuple2) -> new Tuple2(tuple1._1 + tuple2._1, tuple1._2 + tuple2._2) // ); JavaPairRDD<String, Tuple2<Integer, Integer>> result = pairs.combineByKey( num -> new Tuple2<Integer, Integer>(num, 1) , (tuple0, num) -> new Tuple2<Integer, Integer>(tuple0._1() + num, tuple0._2 + 1) , (tuple1, tuple2) -> new Tuple2<Integer, Integer>(tuple1._1 + tuple2._1, tuple1._2 + tuple2._2) ); s(result.collect()); //[(panda,(1,2)), (pink,(7,2)), (pirate,(3,1))] JavaPairRDD<String, Double> result2 = result.mapValues(tuple -> tuple._1 / (double)tuple._2); s(result2.collect()); //[(panda,0.5), (pink,3.5), (pirate,3.0)] } public void proc6() { JavaSparkContext sc = MyConf.getJavaSparkContext(); s("예제 4-15,16 병렬화 직접 지정을 사용한 reduceByKey()"); List<Tuple2<String, Integer>> list = new ArrayList<Tuple2<String, Integer>>(); list.add(new Tuple2<String, Integer>("a", 3)); list.add(new Tuple2<String, Integer>("b", 4)); list.add(new Tuple2<String, Integer>("a", 1)); JavaPairRDD<String, Integer> pairs = sc.parallelizePairs(list); JavaPairRDD<String, Integer> result1 = pairs.reduceByKey((a,b) -> a+b); //기본 병렬화 수준 사용 JavaPairRDD<String, Integer> result2 = pairs.reduceByKey((a,b) -> a+b, 10); //병렬화 수준 지정 s(result2.getNumPartitions()); //10 result2 = result2.coalesce(7); s(result2.getNumPartitions()); //7 result2 = result2.coalesce(4, false); //boolean shuffle s(result2.getNumPartitions()); //4 } public void proc7() { JavaSparkContext sc = MyConf.getJavaSparkContext(); s("예제 4-17 내부 조인"); List<Tuple2<String, String>> data1 = new ArrayList<Tuple2<String, String>>(); data1.add(new Tuple2<String, String>("K1", "A")); data1.add(new Tuple2<String, String>("K2", "B")); data1.add(new Tuple2<String, String>("K1", "C")); data1.add(new Tuple2<String, String>("K3", "D")); JavaPairRDD<String, String> pairs1 = sc.parallelizePairs(data1); List<Tuple2<String, Double>> data2 = new ArrayList<Tuple2<String, Double>>(); data2.add(new Tuple2<String, Double>("K1", 4.9)); data2.add(new Tuple2<String, Double>("K2", 4.8)); data2.add(new Tuple2<String, Double>("K4", 4.8)); JavaPairRDD<String, Double> pairs2 = sc.parallelizePairs(data2); JavaPairRDD<String, Tuple2<String, Double>> result1 = pairs1.join(pairs2); s(result1.collect()); //[(K1,(A,4.9)), (K1,(C,4.9)), (K2,(B,4.8))] s("예제 4-18 leftOuterJoin()과 rightOuterJoin()"); JavaPairRDD<String, Tuple2<String, Optional<Double>>> result2 = pairs1.leftOuterJoin(pairs2); s(result2.collect()); //[(K1,(A,Optional.of(4.9))), (K1,(C,Optional.of(4.9))), (K3,(D,Optional.absent())), (K2,(B,Optional.of(4.8)))] JavaPairRDD<String, Tuple2<Optional<String>, Double>> result3 = pairs1.rightOuterJoin(pairs2); s(result3.collect()); //[(K1,(Optional.of(A),4.9)), (K1,(Optional.of(C),4.9)), (K4,(Optional.absent(),4.8)), (K2,(Optional.of(B),4.8))] } //Serializable 안해주면 다음의 Exception 발생한다: Task not serializable: org.mystudy.Ch4_example$MyComparator2 class MyComparator2 implements Comparator<Integer>, Serializable { private static final long serialVersionUID = 1L; @Override public int compare(Integer o1, Integer o2) { return String.valueOf(o1).compareTo(String.valueOf(o2)); } } public void proc8() { JavaSparkContext sc = MyConf.getJavaSparkContext(); s("예제 4-21 자바에서의 사용자 지정 정렬, 정수를 문자열인 것처럼 정렬하기"); JavaRDD<Integer> rdd = sc.parallelize(Arrays.asList(3,4,1,2)); rdd = rdd.sortBy(a -> String.valueOf(a), false, 3); s(rdd.collect()); //[4, 3, 2, 1] List<Tuple2<Integer, Integer>> data1 = new ArrayList<Tuple2<Integer, Integer>>(); data1.add(new Tuple2<Integer, Integer>(5, 2)); data1.add(new Tuple2<Integer, Integer>(4, 2)); data1.add(new Tuple2<Integer, Integer>(8, 2)); data1.add(new Tuple2<Integer, Integer>(11, 2)); JavaPairRDD<Integer, Integer> pairs1 = sc.parallelizePairs(data1); s(pairs1.sortByKey().collect()); //[(4,2), (5,2), (8,2), (11,2)] s(pairs1.sortByKey(false).collect()); //[(11,2), (8,2), (5,2), (4,2)] s(pairs1.sortByKey(new MyComparator2()).collect()); //[(11,2), (4,2), (5,2), (8,2)] } class MyPartitioner extends org.apache.spark.Partitioner{ private static final long serialVersionUID = 1L; private int numParts; public MyPartitioner(int numParts) { this.numParts = numParts; } @Override public int getPartition(Object arg0) { int code = 0; try { String domain = new java.net.URL(arg0.toString()).getHost(); code = domain.hashCode() % this.numPartitions(); if(code < 0){ code += this.numPartitions(); } } catch (MalformedURLException e) { e.printStackTrace(); } return code; } @Override public int numPartitions() { return this.numParts; } } public void proc11() { JavaSparkContext sc = MyConf.getJavaSparkContext(); s("groupByKey() - 동일한 key가 존재하면, value를 같은 key로 묶는다."); List<Tuple2<String, String>> data1 = new ArrayList<Tuple2<String, String>>(); data1.add(new Tuple2<String, String>("K1", "A")); data1.add(new Tuple2<String, String>("K2", "B")); data1.add(new Tuple2<String, String>("K1", "C")); data1.add(new Tuple2<String, String>("K3", "D")); JavaPairRDD<String, String> pairs1 = sc.parallelizePairs(data1); JavaPairRDD<String, Iterable<String>> result1 = pairs1.groupByKey(); s(result1.collect()); //[(K1,[A, C]), (K3,[D]), (K2,[B])] JavaPairRDD<String, Iterable<String>> result2 = pairs1.groupByKey(3); //numPartitions JavaPairRDD<String, Iterable<String>> result3 = pairs1.groupByKey(new org.apache.spark.HashPartitioner(4)); //Partitioner s("예제4-26 사용자 지정 파티셔너"); JavaPairRDD<String, Iterable<String>> result4 = pairs1.groupByKey(new MyPartitioner(3)); } public void proc12() { JavaSparkContext sc = MyConf.getJavaSparkContext(); s("flatMapValues를 하기 전에, mapValues, flatMap 을 알아야 한다."); s("1.mapValues() - key는 변함없고, value를 수정할 수 있다."); List<Tuple2<String, Integer>> list = new ArrayList<Tuple2<String, Integer>>(); list.add(new Tuple2<String, Integer>("panda", 0)); list.add(new Tuple2<String, Integer>("pink", 3)); list.add(new Tuple2<String, Integer>("pirate", 3)); list.add(new Tuple2<String, Integer>("panda", 1)); list.add(new Tuple2<String, Integer>("pink", 4)); JavaPairRDD<String, Integer> pairs = sc.parallelizePairs(list); s(pairs.collect()); //[(panda,0), (pink,3), (pirate,3), (panda,1), (pink,4)] JavaPairRDD<String, Tuple2<Integer, Integer>> pairs2 = pairs.mapValues(value -> new Tuple2<Integer, Integer>(value, 1)); s(pairs2.collect()); //[(panda,(0,1)), (pink,(3,1)), (pirate,(3,1)), (panda,(1,1)), (pink,(4,1))] s("2.flatMap() - 개별 항목을 List로 수정할 수 있고(그냥 개별로 수정할 수도 있다.), List로 된 항목을 같은 레벨로 펼쳐놓는다는 느낌"); JavaRDD<String> lines31 = sc.parallelize(Arrays.asList("first line : hello world", "second line : hi")); JavaRDD<String> words = lines31.flatMap(line -> Arrays.asList(line.split(" "))); s(words.collect()); //[first, line, :, hello, world, second, line, :, hi] s("3.flatMapValues() - 개별 value값을 List(Iterable) 형태로 수정해야 한다. List 형태로 수정된 value를 key에 따로따로 분리독립 시킴. 이것이 카테시안 곱인가?"); List<Tuple2<String, String>> data1 = new ArrayList<Tuple2<String, String>>(); data1.add(new Tuple2<String, String>("K1", "ABC")); data1.add(new Tuple2<String, String>("K2", "DE")); data1.add(new Tuple2<String, String>("K1", "F")); data1.add(new Tuple2<String, String>("K3", "GHI")); JavaPairRDD<String, String> pairs1 = sc.parallelizePairs(data1); JavaPairRDD<String, String> result1 = pairs1.flatMapValues(str -> { List<String> temp = new ArrayList<String>(); for(int i = 0 ; i<str.length() ; i++){ temp.add(String.valueOf(str.charAt(i))); } return temp; }); s(result1.collect()); //[(K1,A), (K1,B), (K1,C), (K2,D), (K2,E), (K1,F), (K3,G), (K3,H), (K3,I)] } public void proc13() { JavaSparkContext sc = MyConf.getJavaSparkContext(); s("keys()"); List<Tuple2<String, Integer>> list = new ArrayList<Tuple2<String, Integer>>(); list.add(new Tuple2<String, Integer>("panda", 0)); list.add(new Tuple2<String, Integer>("pink", 3)); list.add(new Tuple2<String, Integer>("pirate", 3)); list.add(new Tuple2<String, Integer>("panda", 1)); list.add(new Tuple2<String, Integer>("pink", 4)); JavaPairRDD<String, Integer> pairs = sc.parallelizePairs(list); JavaRDD<String> result = pairs.keys(); s(result.collect()); //[panda, pink, pirate, panda, pink] } public void proc14() { JavaSparkContext sc = MyConf.getJavaSparkContext(); s("values()"); List<Tuple2<String, Integer>> list = new ArrayList<Tuple2<String, Integer>>(); list.add(new Tuple2<String, Integer>("panda", 0)); list.add(new Tuple2<String, Integer>("pink", 3)); list.add(new Tuple2<String, Integer>("pirate", 3)); list.add(new Tuple2<String, Integer>("panda", 1)); list.add(new Tuple2<String, Integer>("pink", 4)); JavaPairRDD<String, Integer> pairs = sc.parallelizePairs(list); JavaRDD<Integer> result = pairs.values(); s(result.collect()); //[0, 3, 3, 1, 4] } public void proc15() { JavaSparkContext sc = MyConf.getJavaSparkContext(); s("cogroup() - 두개이상의 Pair를 이용, key는 데이터 타입이 같아야 하고, value는 달라도 된다. 최대 4개의 Pair를 그룹화 가능"); List<Tuple2<String, String>> data0 = new ArrayList<Tuple2<String, String>>(); data0.add(new Tuple2<String, String>("K1", "ABC")); data0.add(new Tuple2<String, String>("K2", "DE")); data0.add(new Tuple2<String, String>("K5", "GHI")); data0.add(new Tuple2<String, String>("K5", "JKL")); JavaPairRDD<String, String> pairs = sc.parallelizePairs(data0); List<Tuple2<String, Integer>> data1 = new ArrayList<Tuple2<String, Integer>>(); data1.add(new Tuple2<String, Integer>("K1", 123)); data1.add(new Tuple2<String, Integer>("K2", 456)); data1.add(new Tuple2<String, Integer>("K7", 0)); JavaPairRDD<String, Integer> other1 = sc.parallelizePairs(data1); JavaPairRDD<String, Tuple2<Iterable<String>, Iterable<Integer>>> result1 = pairs.cogroup(other1); s(result1.collect()); //[(K7,([],[0])), (K1,([ABC],[123])), (K5,([GHI, JKL],[])), (K2,([DE],[456]))] List<Tuple2<String, Double>> data2 = new ArrayList<Tuple2<String, Double>>(); data2.add(new Tuple2<String, Double>("K1", 777.1)); data2.add(new Tuple2<String, Double>("K2", 999.1)); data2.add(new Tuple2<String, Double>("K8", 10.1)); JavaPairRDD<String, Double> other2 = sc.parallelizePairs(data2); JavaPairRDD<String, Tuple3<Iterable<String>, Iterable<Integer>, Iterable<Double>>> result2 = pairs.cogroup(other1, other2); s(result2.collect()); //[(K7,([],[0],[])), (K1,([ABC],[123],[777.1])), (K5,([GHI, JKL],[],[])), (K2,([DE],[456],[999.1])), (K8,([],[],[10.1]))] List<Tuple2<String, Float>> data3 = new ArrayList<Tuple2<String, Float>>(); data3.add(new Tuple2<String, Float>("K1", 1.1f)); data3.add(new Tuple2<String, Float>("K2", 2.1f)); data3.add(new Tuple2<String, Float>("K8", 3.1f)); JavaPairRDD<String, Float> other3 = sc.parallelizePairs(data3); JavaPairRDD<String, Tuple4<Iterable<String>, Iterable<Integer>, Iterable<Double>, Iterable<Float>>> result3 = pairs.cogroup(other1, other2, other3); s(result3.collect()); //[(K7,([],[0],[],[])), (K1,([ABC],[123],[777.1],[1.1])), (K5,([GHI, JKL],[],[],[])), (K2,([DE],[456],[999.1],[2.1])), (K8,([],[],[10.1],[3.1]))] } public void proc16() { JavaSparkContext sc = MyConf.getJavaSparkContext(); s("subtractByKey() - 상대편 Pair의 key로 잘라내기"); List<Tuple2<String, String>> data0 = new ArrayList<Tuple2<String, String>>(); data0.add(new Tuple2<String, String>("K1", "ABC")); data0.add(new Tuple2<String, String>("K2", "DE")); data0.add(new Tuple2<String, String>("K5", "GHI")); data0.add(new Tuple2<String, String>("K5", "JKL")); JavaPairRDD<String, String> pairs = sc.parallelizePairs(data0); List<Tuple2<String, Integer>> data1 = new ArrayList<Tuple2<String, Integer>>(); data1.add(new Tuple2<String, Integer>("K1", 123)); data1.add(new Tuple2<String, Integer>("K2", 456)); data1.add(new Tuple2<String, Integer>("K7", 0)); JavaPairRDD<String, Integer> other1 = sc.parallelizePairs(data1); JavaPairRDD<String, String> result = pairs.subtractByKey(other1); s(result.collect()); //[(K5,GHI), (K5,JKL)] //반대로 JavaPairRDD<String, Integer> result2 = other1.subtractByKey(pairs); s(result2.collect()); //[(K7,0)] } public void proc17(){ JavaSparkContext sc = MyConf.getJavaSparkContext(); s("countByKey, collectAsMap, lookup(key)"); List<Tuple2<String, String>> data0 = new ArrayList<Tuple2<String, String>>(); data0.add(new Tuple2<String, String>("K1", "ABC")); data0.add(new Tuple2<String, String>("K2", "DE")); data0.add(new Tuple2<String, String>("K5", "GHI")); data0.add(new Tuple2<String, String>("K5", "JKL")); JavaPairRDD<String, String> pairs = sc.parallelizePairs(data0); Map<String, Object> result0 = pairs.countByKey(); s(result0); //{K1=1, K5=2, K2=1} Map<String, String> result1 = pairs.collectAsMap(); s(result1); //{K1=ABC, K2=DE, K5=JKL} List<String> result2 = pairs.lookup("K1"); s(result2); //[ABC] List<String> result3 = pairs.lookup("K5"); s(result3); //[GHI, JKL] } public void proc21(){ JavaSparkContext sc = MyConf.getJavaSparkContext(); s("데이터는 RDD에 내부적으로 어떻게 파티션되어서 나눠질까??? 확인해볼 수 있을까?"); JavaRDD<Integer> rdd = sc.parallelize(Arrays.asList(1,2,3,4,5,6,7,8,9)); s(rdd.getNumPartitions()); //2 rdd.foreachPartition(iter -> { while(iter.hasNext()){ s(iter.hashCode() + ", " + iter.next()); } }); // 파티션2개 // 271428025, 1 // 271428025, 2 // 271428025, 3 // 271428025, 4 // -788345623, 5 // -788345623, 6 // -788345623, 7 // -788345623, 8 // -788345623, 9 rdd = rdd.repartition(4); rdd.foreachPartition(iter -> { while(iter.hasNext()){ s(iter.hashCode() + ", " + iter.next()); } }); // 파티션4개 // -1025106490, 3 // 287691785, 2 // -1025106490, 7 // 287691785, 6 // 1847534542, 1 // 951314831, 4 // 1847534542, 5 // 951314831, 8 // 1847534542, 9 } }