package org.mystudy;
import static org.MyConf.s;
import java.io.Serializable;
import java.net.MalformedURLException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Comparator;
import java.util.List;
import java.util.Map;
import org.MyConf;
import org.apache.spark.api.java.JavaPairRDD;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import com.google.common.base.Optional;
import scala.Tuple2;
import scala.Tuple3;
import scala.Tuple4;
public class Ch4_example implements Serializable {
private static final long serialVersionUID = 43443432321L;
public Ch4_example() {
MyConf.setLog4j();
}
public static void main(String... strings) {
Ch4_example ex = new Ch4_example();
/**
 * Table 4-1: transformations on a pair RDD
 * reduceByKey, groupByKey, combineByKey, mapValues, flatMapValues, keys, values, sortByKey
 * Table 4-2: transformations on two pair RDDs
 * subtractByKey, join, rightOuterJoin, leftOuterJoin, cogroup
 * Table 4-3: actions on a pair RDD
 * countByKey, collectAsMap, lookup(key)
 */
ex.proc1(); //mapToPair
ex.proc2(); //filter
ex.proc3(); //mapValues, reduceByKey
ex.proc4(); //flatMap, mapToPair, reduceByKey
ex.proc4_1(); //countByValue
ex.proc5(); //combineByKey
ex.proc6(); //reduceByKey, specifying the level of parallelism
ex.proc7(); //join
ex.proc8(); //sortBy, sortByKey
// Examples not covered in the book
ex.proc11(); //groupByKey
ex.proc12(); //mapValues, flatMap -> flatMapValues
ex.proc13(); //keys
ex.proc14(); //values
ex.proc15(); //cogroup
ex.proc16(); //subtractByKey
ex.proc17(); //countByKey, collectAsMap, lookup(key)
ex.proc21(); //tracking which partition each element ends up in
/**
 * Operations that benefit from partitioning
 * As of Spark 1.0: cogroup, groupWith, join, leftOuterJoin, rightOuterJoin, groupByKey, reduceByKey, combineByKey, lookup
 *
 * Operations that set a partitioner on the resulting RDD
 * cogroup, groupWith, join, leftOuterJoin, rightOuterJoin, groupByKey, reduceByKey, combineByKey, partitionBy, sort,
 * mapValues (if the parent RDD has a partitioner), flatMapValues (if the parent RDD has a partitioner), filter (if the parent RDD has a partitioner)
 *
 * See procPartitioningSketch() below for a small illustration of why this matters.
 */
}
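/**
 * (Not in the book) A minimal sketch of why pre-partitioning helps, using made-up data:
 * partitionBy() fixes a HashPartitioner on one RDD and persist() keeps that layout around,
 * so a later join against it shuffles only the other side and the result inherits the
 * partitioner. This method is illustration only and is not invoked from main().
 */
public void procPartitioningSketch() {
JavaSparkContext sc = MyConf.getJavaSparkContext();
List<Tuple2<String, String>> users = new ArrayList<Tuple2<String, String>>();
users.add(new Tuple2<String, String>("u1", "seoul"));
users.add(new Tuple2<String, String>("u2", "busan"));
JavaPairRDD<String, String> userInfo = sc.parallelizePairs(users)
.partitionBy(new org.apache.spark.HashPartitioner(4))
.persist(org.apache.spark.storage.StorageLevel.MEMORY_ONLY());
List<Tuple2<String, String>> events = new ArrayList<Tuple2<String, String>>();
events.add(new Tuple2<String, String>("u1", "click"));
events.add(new Tuple2<String, String>("u2", "view"));
JavaPairRDD<String, String> eventLog = sc.parallelizePairs(events);
JavaPairRDD<String, Tuple2<String, String>> joined = userInfo.join(eventLog);
s(joined.getNumPartitions()); //4 -- inherited from userInfo's HashPartitioner
s(joined.collect()); //[(u1,(seoul,click)), (u2,(busan,view))] (order may vary)
}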
public void proc1() {
JavaSparkContext sc = MyConf.getJavaSparkContext();
s("예제 4-3 자바에서 첫 번째 단어를 키로 사용한 페어 RDD 생성");
JavaRDD<String> lines = sc.textFile("src/main/java/org/mystudy/sample.txt");
//contents of sample.txt:
//apple 123 456
//kindle 567 abc
//naver daum sk nate
//empas yahoo google
JavaPairRDD<String, String> pairs = lines.mapToPair(str -> new Tuple2<String, String>(str.split(" ")[0], str));
s(pairs.collect()); //[(apple,apple 123 456), (kindle,kindle 567 abc), (naver,naver daum sk nate), (empas,empas yahoo google)]
//building a pair RDD from in-memory Java objects
List<Tuple2<String, String>> list2 = new ArrayList<Tuple2<String, String>>();
list2.add(new Tuple2<String, String>("name", "ysh"));
list2.add(new Tuple2<String, String>("age", "30"));
list2.add(new Tuple2<String, String>("addr", "seoul"));
list2.add(new Tuple2<String, String>("country", "korea"));
JavaPairRDD<String, String> pairs2 = sc.parallelizePairs(list2);
s(pairs2.getNumPartitions()); //2 <-- important! The default number of partitions depends on the master passed to new JavaSparkContext (local[1], local[2], ...)
s(pairs2.collect()); //[(name,ysh), (age,30), (addr,seoul), (country,korea)]
/**
 * JavaSparkContext sc = new JavaSparkContext("local[1]", "First Spark App") -> default of 1 partition
 * JavaSparkContext sc = new JavaSparkContext("local[3]", "First Spark App") -> default of 3 partitions
 */
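// (Not in the book) The default can also be read straight off the context:
s(sc.defaultParallelism()); //e.g. 2 when the master is local[2]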
JavaPairRDD<String, String> pairs3 = sc.parallelizePairs(list2, 3); //3은 파티션 개수이다.
s(pairs3.getNumPartitions()); //3
s(pairs3.collect()); //[(name,ysh), (age,30), (addr,seoul), (country,korea)]
}
public void proc2() {
JavaSparkContext sc = MyConf.getJavaSparkContext();
s("예제 4-6 자바에서 두 번째 요소에 대한 단순 필터 적용");
List<Tuple2<String, String>> list = new ArrayList<Tuple2<String, String>>();
list.add(new Tuple2<String, String>("name", "ysh"));
list.add(new Tuple2<String, String>("age", "30000000000000000"));
list.add(new Tuple2<String, String>("addr", "seoul"));
list.add(new Tuple2<String, String>("country", "korea"));
JavaPairRDD<String, String> pairs = sc.parallelizePairs(list);
JavaPairRDD<String, String> result = pairs.filter(tuple->tuple._2.length() < 10);
s(result.collect()); //[(name,ysh), (addr,seoul), (country,korea)]
}
public void proc3() {
JavaSparkContext sc = MyConf.getJavaSparkContext();
s("예제 4-7, 4-8 mapValues() 와 reduceByKey()로 키별 평균 구하기");
List<Tuple2<String, Integer>> list = new ArrayList<Tuple2<String, Integer>>();
list.add(new Tuple2<String, Integer>("panda", 0));
list.add(new Tuple2<String, Integer>("pink", 3));
list.add(new Tuple2<String, Integer>("pirate", 3));
list.add(new Tuple2<String, Integer>("panda", 1));
list.add(new Tuple2<String, Integer>("pink", 4));
JavaPairRDD<String, Integer> pairs = sc.parallelizePairs(list);
s(pairs.collect()); //[(panda,0), (pink,3), (pirate,3), (panda,1), (pink,4)]
JavaPairRDD<String, Tuple2<Integer, Integer>> pairs2 = pairs.mapValues(value -> new Tuple2<Integer, Integer>(value, 1));
s(pairs2.collect()); //[(panda,(0,1)), (pink,(3,1)), (pirate,(3,1)), (panda,(1,1)), (pink,(4,1))]
JavaPairRDD<String, Tuple2<Integer, Integer>> pairs3 = pairs2.reduceByKey((tuple1, tuple2) -> new Tuple2<Integer, Integer>(tuple1._1 + tuple2._1, tuple1._2 + tuple2._2));
// Creating a new Tuple2 on every reduce call looked wasteful, so I tried mutating the incoming Tuple2 and returning it, but Tuple2's fields are final and cannot be modified:
// JavaPairRDD<String, Tuple2<Integer, Integer>> pairs3 = pairs2.reduceByKey((tuple1, tuple2) -> {
// tuple1._1 = tuple1._1 + tuple2._1;
// tuple1._2 = tuple1._2 + tuple2._2;
// return tuple1;
// });
s(pairs3.collect()); //[(panda,(1,2)), (pirate,(3,1)), (pink,(7,2))] //(key, (sum, count))
JavaPairRDD<String, Double> pairs4 = pairs3.mapValues(tuple -> tuple._1 / (double)tuple._2);
s(pairs4.collect()); //[(panda,0.5), (pirate,3.0), (pink,3.5)] //(key, average)
}
public void proc4() {
JavaSparkContext sc = MyConf.getJavaSparkContext();
s("예제 4-11 자바에서의 단어 세기");
JavaRDD<String> input = sc.textFile("src/main/java/org/mystudy/sample2.txt");
//apple 123 456 456
//kindle 567 abc abc
//naver daum sk nate nate
//empas yahoo google google
JavaRDD<String> words = input.flatMap(str -> Arrays.asList(str.split(" ")));
JavaPairRDD<String, Integer> result = words.mapToPair(str -> new Tuple2<String, Integer>(str, 1));
result = result.reduceByKey((a,b) -> a+b);
s(result.collect()); //[(567,1), (nate,2), (yahoo,1), (sk,1), (empas,1), (naver,1), (apple,1), (123,1), (abc,2), (456,2), (kindle,1), (google,2), (daum,1)]
}
public void proc4_1() {
// A faster way
JavaSparkContext sc = MyConf.getJavaSparkContext();
s("예제 4-11 자바에서의 단어 세기");
JavaRDD<String> input = sc.textFile("src/main/java/org/mystudy/sample2.txt");
JavaRDD<String> words = input.flatMap(str -> Arrays.asList(str.split(" ")));
Map<String, Long> result = words.countByValue();
s(result); //{abc=2, 567=1, empas=1, 123=1, google=2, naver=1, kindle=1, 456=2, sk=1, nate=2, apple=1, yahoo=1, daum=1}
}
public void proc5() {
JavaSparkContext sc = MyConf.getJavaSparkContext();
s("예제 4-14 자바에서 combineByKey() 를 사용한 키별 평균");
List<Tuple2<String, Integer>> list = new ArrayList<Tuple2<String, Integer>>();
list.add(new Tuple2<String, Integer>("panda", 0));
list.add(new Tuple2<String, Integer>("pink", 3));
list.add(new Tuple2<String, Integer>("pirate", 3));
list.add(new Tuple2<String, Integer>("panda", 1));
list.add(new Tuple2<String, Integer>("pink", 4));
JavaPairRDD<String, Integer> pairs = sc.parallelizePairs(list);
/**
 * The book defines an AvgCount class here, but since only two values are carried around
 * there is no real need for a new class; a Tuple2 works just as well.
 * Scala provides Tuple1 through Tuple22.
 * (A minimal AvgCount-style sketch is included after this method for comparison.)
 */
// If the generics on Tuple2 are omitted, the compiler sees Object + Integer where Integer + Integer
// was intended and the code fails to compile. Scala's type inference avoids this kind of boilerplate:
// JavaPairRDD<String, Tuple2> result2 = pairs.combineByKey(
// num -> new Tuple2(num, 1)
// , (tuple0, num) -> new Tuple2(tuple0._1() + num, tuple0._2 + 1)
// , (tuple1, tuple2) -> new Tuple2(tuple1._1 + tuple2._1, tuple1._2 + tuple2._2)
// );
JavaPairRDD<String, Tuple2<Integer, Integer>> result = pairs.combineByKey(
num -> new Tuple2<Integer, Integer>(num, 1)
, (tuple0, num) -> new Tuple2<Integer, Integer>(tuple0._1() + num, tuple0._2 + 1)
, (tuple1, tuple2) -> new Tuple2<Integer, Integer>(tuple1._1 + tuple2._1, tuple1._2 + tuple2._2)
);
s(result.collect()); //[(panda,(1,2)), (pink,(7,2)), (pirate,(3,1))]
JavaPairRDD<String, Double> result2 = result.mapValues(tuple -> tuple._1 / (double)tuple._2);
s(result2.collect()); //[(panda,0.5), (pink,3.5), (pirate,3.0)]
}
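/**
 * (Not the book's exact code) A minimal sketch of the AvgCount-style approach the book uses
 * instead of Tuple2: a small mutable Serializable holder. The class, field, and method names
 * here are my own, and this method is not invoked from main().
 */
static class AvgCount implements Serializable {
private static final long serialVersionUID = 1L;
public int total;
public int num;
public AvgCount(int total, int num) { this.total = total; this.num = num; }
public double avg() { return total / (double) num; }
}
public void proc5_1() {
JavaSparkContext sc = MyConf.getJavaSparkContext();
List<Tuple2<String, Integer>> list = Arrays.asList(
new Tuple2<String, Integer>("panda", 0)
, new Tuple2<String, Integer>("pink", 3)
, new Tuple2<String, Integer>("pirate", 3)
, new Tuple2<String, Integer>("panda", 1)
, new Tuple2<String, Integer>("pink", 4));
JavaPairRDD<String, Integer> pairs = sc.parallelizePairs(list);
JavaPairRDD<String, AvgCount> combined = pairs.combineByKey(
num -> new AvgCount(num, 1) //createCombiner: first value seen for a key
, (acc, num) -> { acc.total += num; acc.num += 1; return acc; } //mergeValue: fold another value into the accumulator
, (acc1, acc2) -> { acc1.total += acc2.total; acc1.num += acc2.num; return acc1; }); //mergeCombiners: combine per-partition accumulators
s(combined.mapValues(acc -> acc.avg()).collect()); //e.g. [(panda,0.5), (pink,3.5), (pirate,3.0)]
}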
public void proc6() {
JavaSparkContext sc = MyConf.getJavaSparkContext();
s("예제 4-15,16 병렬화 직접 지정을 사용한 reduceByKey()");
List<Tuple2<String, Integer>> list = new ArrayList<Tuple2<String, Integer>>();
list.add(new Tuple2<String, Integer>("a", 3));
list.add(new Tuple2<String, Integer>("b", 4));
list.add(new Tuple2<String, Integer>("a", 1));
JavaPairRDD<String, Integer> pairs = sc.parallelizePairs(list);
JavaPairRDD<String, Integer> result1 = pairs.reduceByKey((a,b) -> a+b); //기본 병렬화 수준 사용
JavaPairRDD<String, Integer> result2 = pairs.reduceByKey((a,b) -> a+b, 10); //병렬화 수준 지정
s(result2.getNumPartitions()); //10
result2 = result2.coalesce(7);
s(result2.getNumPartitions()); //7
result2 = result2.coalesce(4, false); //the second argument is the shuffle flag
s(result2.getNumPartitions()); //4
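// Note: coalesce() with shuffle=false can only reduce the number of partitions;
// to increase it, use repartition() or coalesce(n, true).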
}
public void proc7() {
JavaSparkContext sc = MyConf.getJavaSparkContext();
s("예제 4-17 내부 조인");
List<Tuple2<String, String>> data1 = new ArrayList<Tuple2<String, String>>();
data1.add(new Tuple2<String, String>("K1", "A"));
data1.add(new Tuple2<String, String>("K2", "B"));
data1.add(new Tuple2<String, String>("K1", "C"));
data1.add(new Tuple2<String, String>("K3", "D"));
JavaPairRDD<String, String> pairs1 = sc.parallelizePairs(data1);
List<Tuple2<String, Double>> data2 = new ArrayList<Tuple2<String, Double>>();
data2.add(new Tuple2<String, Double>("K1", 4.9));
data2.add(new Tuple2<String, Double>("K2", 4.8));
data2.add(new Tuple2<String, Double>("K4", 4.8));
JavaPairRDD<String, Double> pairs2 = sc.parallelizePairs(data2);
JavaPairRDD<String, Tuple2<String, Double>> result1 = pairs1.join(pairs2);
s(result1.collect()); //[(K1,(A,4.9)), (K1,(C,4.9)), (K2,(B,4.8))]
s("예제 4-18 leftOuterJoin()과 rightOuterJoin()");
JavaPairRDD<String, Tuple2<String, Optional<Double>>> result2 = pairs1.leftOuterJoin(pairs2);
s(result2.collect()); //[(K1,(A,Optional.of(4.9))), (K1,(C,Optional.of(4.9))), (K3,(D,Optional.absent())), (K2,(B,Optional.of(4.8)))]
JavaPairRDD<String, Tuple2<Optional<String>, Double>> result3 = pairs1.rightOuterJoin(pairs2);
s(result3.collect()); //[(K1,(Optional.of(A),4.9)), (K1,(Optional.of(C),4.9)), (K4,(Optional.absent(),4.8)), (K2,(Optional.of(B),4.8))]
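// (Not in the book) The Guava Optional in an outer-join result usually has to be unwrapped;
// a minimal sketch that substitutes 0.0 when the right-hand value is missing:
JavaPairRDD<String, Tuple2<String, Double>> result4 = result2.mapValues(tuple -> new Tuple2<String, Double>(tuple._1, tuple._2.or(0.0)));
s(result4.collect()); //e.g. [(K1,(A,4.9)), (K1,(C,4.9)), (K3,(D,0.0)), (K2,(B,4.8))]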
}
//Without implementing Serializable, the following exception is thrown: Task not serializable: org.mystudy.Ch4_example$MyComparator2
class MyComparator2 implements Comparator<Integer>, Serializable {
private static final long serialVersionUID = 1L;
@Override
public int compare(Integer o1, Integer o2) {
return String.valueOf(o1).compareTo(String.valueOf(o2));
}
}
public void proc8() {
JavaSparkContext sc = MyConf.getJavaSparkContext();
s("예제 4-21 자바에서의 사용자 지정 정렬, 정수를 문자열인 것처럼 정렬하기");
JavaRDD<Integer> rdd = sc.parallelize(Arrays.asList(3,4,1,2));
rdd = rdd.sortBy(a -> String.valueOf(a), false, 3);
s(rdd.collect()); //[4, 3, 2, 1]
List<Tuple2<Integer, Integer>> data1 = new ArrayList<Tuple2<Integer, Integer>>();
data1.add(new Tuple2<Integer, Integer>(5, 2));
data1.add(new Tuple2<Integer, Integer>(4, 2));
data1.add(new Tuple2<Integer, Integer>(8, 2));
data1.add(new Tuple2<Integer, Integer>(11, 2));
JavaPairRDD<Integer, Integer> pairs1 = sc.parallelizePairs(data1);
s(pairs1.sortByKey().collect()); //[(4,2), (5,2), (8,2), (11,2)]
s(pairs1.sortByKey(false).collect()); //[(11,2), (8,2), (5,2), (4,2)]
s(pairs1.sortByKey(new MyComparator2()).collect()); //[(11,2), (4,2), (5,2), (8,2)]
}
class MyPartitioner extends org.apache.spark.Partitioner{
private static final long serialVersionUID = 1L;
private int numParts;
public MyPartitioner(int numParts) {
this.numParts = numParts;
}
@Override
public int getPartition(Object arg0) {
int code = 0;
try {
String domain = new java.net.URL(arg0.toString()).getHost();
code = domain.hashCode() % this.numPartitions();
if(code < 0){
code += this.numPartitions();
}
} catch (MalformedURLException e) {
e.printStackTrace();
}
return code;
}
@Override
public int numPartitions() {
return this.numParts;
}
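// (Not in the book's exact form) Spark compares partitioners with equals() to decide whether
// two RDDs are partitioned the same way, so a custom Partitioner should normally override it;
// a minimal sketch:
@Override
public boolean equals(Object other) {
return other instanceof MyPartitioner && ((MyPartitioner) other).numParts == this.numParts;
}
@Override
public int hashCode() {
return this.numParts;
}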
}
public void proc11() {
JavaSparkContext sc = MyConf.getJavaSparkContext();
s("groupByKey() - 동일한 key가 존재하면, value를 같은 key로 묶는다.");
List<Tuple2<String, String>> data1 = new ArrayList<Tuple2<String, String>>();
data1.add(new Tuple2<String, String>("K1", "A"));
data1.add(new Tuple2<String, String>("K2", "B"));
data1.add(new Tuple2<String, String>("K1", "C"));
data1.add(new Tuple2<String, String>("K3", "D"));
JavaPairRDD<String, String> pairs1 = sc.parallelizePairs(data1);
JavaPairRDD<String, Iterable<String>> result1 = pairs1.groupByKey();
s(result1.collect()); //[(K1,[A, C]), (K3,[D]), (K2,[B])]
JavaPairRDD<String, Iterable<String>> result2 = pairs1.groupByKey(3); //numPartitions
JavaPairRDD<String, Iterable<String>> result3 = pairs1.groupByKey(new org.apache.spark.HashPartitioner(4)); //Partitioner
s("예제4-26 사용자 지정 파티셔너");
JavaPairRDD<String, Iterable<String>> result4 = pairs1.groupByKey(new MyPartitioner(3));
}
public void proc12() {
JavaSparkContext sc = MyConf.getJavaSparkContext();
s("flatMapValues를 하기 전에, mapValues, flatMap 을 알아야 한다.");
s("1.mapValues() - key는 변함없고, value를 수정할 수 있다.");
List<Tuple2<String, Integer>> list = new ArrayList<Tuple2<String, Integer>>();
list.add(new Tuple2<String, Integer>("panda", 0));
list.add(new Tuple2<String, Integer>("pink", 3));
list.add(new Tuple2<String, Integer>("pirate", 3));
list.add(new Tuple2<String, Integer>("panda", 1));
list.add(new Tuple2<String, Integer>("pink", 4));
JavaPairRDD<String, Integer> pairs = sc.parallelizePairs(list);
s(pairs.collect()); //[(panda,0), (pink,3), (pirate,3), (panda,1), (pink,4)]
JavaPairRDD<String, Tuple2<Integer, Integer>> pairs2 = pairs.mapValues(value -> new Tuple2<Integer, Integer>(value, 1));
s(pairs2.collect()); //[(panda,(0,1)), (pink,(3,1)), (pirate,(3,1)), (panda,(1,1)), (pink,(4,1))]
s("2.flatMap() - 개별 항목을 List로 수정할 수 있고(그냥 개별로 수정할 수도 있다.), List로 된 항목을 같은 레벨로 펼쳐놓는다는 느낌");
JavaRDD<String> lines31 = sc.parallelize(Arrays.asList("first line : hello world", "second line : hi"));
JavaRDD<String> words = lines31.flatMap(line -> Arrays.asList(line.split(" ")));
s(words.collect()); //[first, line, :, hello, world, second, line, :, hi]
s("3.flatMapValues() - 개별 value값을 List(Iterable) 형태로 수정해야 한다. List 형태로 수정된 value를 key에 따로따로 분리독립 시킴. 이것이 카테시안 곱인가?");
List<Tuple2<String, String>> data1 = new ArrayList<Tuple2<String, String>>();
data1.add(new Tuple2<String, String>("K1", "ABC"));
data1.add(new Tuple2<String, String>("K2", "DE"));
data1.add(new Tuple2<String, String>("K1", "F"));
data1.add(new Tuple2<String, String>("K3", "GHI"));
JavaPairRDD<String, String> pairs1 = sc.parallelizePairs(data1);
JavaPairRDD<String, String> result1 = pairs1.flatMapValues(str -> {
List<String> temp = new ArrayList<String>();
for(int i = 0 ; i<str.length() ; i++){
temp.add(String.valueOf(str.charAt(i)));
}
return temp;
});
s(result1.collect()); //[(K1,A), (K1,B), (K1,C), (K2,D), (K2,E), (K1,F), (K3,G), (K3,H), (K3,I)]
}
public void proc13() {
JavaSparkContext sc = MyConf.getJavaSparkContext();
s("keys()");
List<Tuple2<String, Integer>> list = new ArrayList<Tuple2<String, Integer>>();
list.add(new Tuple2<String, Integer>("panda", 0));
list.add(new Tuple2<String, Integer>("pink", 3));
list.add(new Tuple2<String, Integer>("pirate", 3));
list.add(new Tuple2<String, Integer>("panda", 1));
list.add(new Tuple2<String, Integer>("pink", 4));
JavaPairRDD<String, Integer> pairs = sc.parallelizePairs(list);
JavaRDD<String> result = pairs.keys();
s(result.collect()); //[panda, pink, pirate, panda, pink]
}
public void proc14() {
JavaSparkContext sc = MyConf.getJavaSparkContext();
s("values()");
List<Tuple2<String, Integer>> list = new ArrayList<Tuple2<String, Integer>>();
list.add(new Tuple2<String, Integer>("panda", 0));
list.add(new Tuple2<String, Integer>("pink", 3));
list.add(new Tuple2<String, Integer>("pirate", 3));
list.add(new Tuple2<String, Integer>("panda", 1));
list.add(new Tuple2<String, Integer>("pink", 4));
JavaPairRDD<String, Integer> pairs = sc.parallelizePairs(list);
JavaRDD<Integer> result = pairs.values();
s(result.collect()); //[0, 3, 3, 1, 4]
}
public void proc15() {
JavaSparkContext sc = MyConf.getJavaSparkContext();
s("cogroup() - 두개이상의 Pair를 이용, key는 데이터 타입이 같아야 하고, value는 달라도 된다. 최대 4개의 Pair를 그룹화 가능");
List<Tuple2<String, String>> data0 = new ArrayList<Tuple2<String, String>>();
data0.add(new Tuple2<String, String>("K1", "ABC"));
data0.add(new Tuple2<String, String>("K2", "DE"));
data0.add(new Tuple2<String, String>("K5", "GHI"));
data0.add(new Tuple2<String, String>("K5", "JKL"));
JavaPairRDD<String, String> pairs = sc.parallelizePairs(data0);
List<Tuple2<String, Integer>> data1 = new ArrayList<Tuple2<String, Integer>>();
data1.add(new Tuple2<String, Integer>("K1", 123));
data1.add(new Tuple2<String, Integer>("K2", 456));
data1.add(new Tuple2<String, Integer>("K7", 0));
JavaPairRDD<String, Integer> other1 = sc.parallelizePairs(data1);
JavaPairRDD<String, Tuple2<Iterable<String>, Iterable<Integer>>> result1 = pairs.cogroup(other1);
s(result1.collect()); //[(K7,([],[0])), (K1,([ABC],[123])), (K5,([GHI, JKL],[])), (K2,([DE],[456]))]
List<Tuple2<String, Double>> data2 = new ArrayList<Tuple2<String, Double>>();
data2.add(new Tuple2<String, Double>("K1", 777.1));
data2.add(new Tuple2<String, Double>("K2", 999.1));
data2.add(new Tuple2<String, Double>("K8", 10.1));
JavaPairRDD<String, Double> other2 = sc.parallelizePairs(data2);
JavaPairRDD<String, Tuple3<Iterable<String>, Iterable<Integer>, Iterable<Double>>> result2 = pairs.cogroup(other1, other2);
s(result2.collect()); //[(K7,([],[0],[])), (K1,([ABC],[123],[777.1])), (K5,([GHI, JKL],[],[])), (K2,([DE],[456],[999.1])), (K8,([],[],[10.1]))]
List<Tuple2<String, Float>> data3 = new ArrayList<Tuple2<String, Float>>();
data3.add(new Tuple2<String, Float>("K1", 1.1f));
data3.add(new Tuple2<String, Float>("K2", 2.1f));
data3.add(new Tuple2<String, Float>("K8", 3.1f));
JavaPairRDD<String, Float> other3 = sc.parallelizePairs(data3);
JavaPairRDD<String, Tuple4<Iterable<String>, Iterable<Integer>, Iterable<Double>, Iterable<Float>>> result3 = pairs.cogroup(other1, other2, other3);
s(result3.collect());
//[(K7,([],[0],[],[])), (K1,([ABC],[123],[777.1],[1.1])), (K5,([GHI, JKL],[],[],[])), (K2,([DE],[456],[999.1],[2.1])), (K8,([],[],[10.1],[3.1]))]
}
public void proc16() {
JavaSparkContext sc = MyConf.getJavaSparkContext();
s("subtractByKey() - 상대편 Pair의 key로 잘라내기");
List<Tuple2<String, String>> data0 = new ArrayList<Tuple2<String, String>>();
data0.add(new Tuple2<String, String>("K1", "ABC"));
data0.add(new Tuple2<String, String>("K2", "DE"));
data0.add(new Tuple2<String, String>("K5", "GHI"));
data0.add(new Tuple2<String, String>("K5", "JKL"));
JavaPairRDD<String, String> pairs = sc.parallelizePairs(data0);
List<Tuple2<String, Integer>> data1 = new ArrayList<Tuple2<String, Integer>>();
data1.add(new Tuple2<String, Integer>("K1", 123));
data1.add(new Tuple2<String, Integer>("K2", 456));
data1.add(new Tuple2<String, Integer>("K7", 0));
JavaPairRDD<String, Integer> other1 = sc.parallelizePairs(data1);
JavaPairRDD<String, String> result = pairs.subtractByKey(other1);
s(result.collect()); //[(K5,GHI), (K5,JKL)]
//and the other way around
JavaPairRDD<String, Integer> result2 = other1.subtractByKey(pairs);
s(result2.collect()); //[(K7,0)]
}
public void proc17(){
JavaSparkContext sc = MyConf.getJavaSparkContext();
s("countByKey, collectAsMap, lookup(key)");
List<Tuple2<String, String>> data0 = new ArrayList<Tuple2<String, String>>();
data0.add(new Tuple2<String, String>("K1", "ABC"));
data0.add(new Tuple2<String, String>("K2", "DE"));
data0.add(new Tuple2<String, String>("K5", "GHI"));
data0.add(new Tuple2<String, String>("K5", "JKL"));
JavaPairRDD<String, String> pairs = sc.parallelizePairs(data0);
Map<String, Object> result0 = pairs.countByKey();
s(result0); //{K1=1, K5=2, K2=1}
Map<String, String> result1 = pairs.collectAsMap();
s(result1); //{K1=ABC, K2=DE, K5=JKL}
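// Note: collectAsMap() keeps only one value per key, so one of the duplicate K5 values is dropped.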
List<String> result2 = pairs.lookup("K1");
s(result2); //[ABC]
List<String> result3 = pairs.lookup("K5");
s(result3); //[GHI, JKL]
}
public void proc21(){
JavaSparkContext sc = MyConf.getJavaSparkContext();
s("데이터는 RDD에 내부적으로 어떻게 파티션되어서 나눠질까??? 확인해볼 수 있을까?");
JavaRDD<Integer> rdd = sc.parallelize(Arrays.asList(1,2,3,4,5,6,7,8,9));
s(rdd.getNumPartitions()); //2
rdd.foreachPartition(iter -> {
while(iter.hasNext()){
s(iter.hashCode() + ", " + iter.next());
}
});
// with 2 partitions:
// 271428025, 1
// 271428025, 2
// 271428025, 3
// 271428025, 4
// -788345623, 5
// -788345623, 6
// -788345623, 7
// -788345623, 8
// -788345623, 9
rdd = rdd.repartition(4);
rdd.foreachPartition(iter -> {
while(iter.hasNext()){
s(iter.hashCode() + ", " + iter.next());
}
});
// with 4 partitions:
// -1025106490, 3
// 287691785, 2
// -1025106490, 7
// 287691785, 6
// 1847534542, 1
// 951314831, 4
// 1847534542, 5
// 951314831, 8
// 1847534542, 9
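// (Not in the book) A clearer way to see the layout is mapPartitionsWithIndex(), which hands
// each partition its index; a minimal sketch:
JavaRDD<String> byPartition = rdd.mapPartitionsWithIndex((idx, it) -> {
List<String> out = new ArrayList<String>();
while(it.hasNext()){
out.add("partition " + idx + " -> " + it.next());
}
return out.iterator();
}, true);
s(byPartition.collect()); //e.g. [partition 0 -> 3, partition 0 -> 7, partition 1 -> 2, ...]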
}
}