package org.mystudy;
import static org.MyConf.s;
import java.io.Serializable;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import com.google.common.base.Optional;
import org.MyConf;
import org.apache.commons.lang3.StringUtils;
import org.apache.spark.api.java.JavaDoubleRDD;
import org.apache.spark.api.java.JavaPairRDD;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.junit.Test;
import scala.Tuple2;
public class Ch4_example2 implements Serializable {
private static final long serialVersionUID = 43443432321L;
public Ch4_example2() {
MyConf.setLog4j();
}
public static void main(String... strings) {
/**
* Samples of functions not covered in the book
*/
Ch4_example2 ex = new Ch4_example2();
s("JavaRDD---------------------------");
// ex.proc31(); //mapPartitions, mapPartitionsToDouble, mapPartitionsToPair
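// ex.proc31b(); //mapPartitions with per-partition setup (sketch, not in the book)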
// ex.proc32(); //keyBy
// ex.proc33(); //groupBy
// ex.proc34(); //flatMapToPair
// s("JavaPairRDD---------------------------");
// ex.proc35(); //cartesian
// ex.proc36(); //foldByKey
// ex.proc37(); //fullOuterJoin
// ex.proc38(); //groupBy
// ex.proc39(); //mapToDouble
// ex.proc40(); //flatMapToDouble
// ex.proc41(); //mapToPair
// ex.proc42(); //flatMapToPair
// ex.proc43(); //intersection
// ex.proc44(); //subtract
// ex.proc45(); //map
ex.proc46(); //misc: reduce, fold, aggregate, distinct
}
@Test
public void proc31() {
s("mapPartitions, mapPartitionsToDouble, mapPartitionsToPair 은 왜 하는지 모르겠다.");
s("mapPartitions() - 결과는 그대로...");
JavaSparkContext sc = MyConf.getJavaSparkContext();
JavaRDD<Integer> rdd = sc.parallelize(Arrays.asList(1,2,3,4,5,6,7,8,8,8));
JavaRDD<Integer> r0 = rdd.mapPartitions(a->{
List<Integer> list = new ArrayList<Integer>();
while(a.hasNext()){
list.add(a.next());
}
return list;
});
s(r0.collect()); //[1, 2, 3, 4, 5, 6, 7, 8, 8, 8]
s("mapPartitionsToDouble()");
JavaDoubleRDD r1 = rdd.mapPartitionsToDouble(a->{
List<Double> list = new ArrayList<Double>();
while(a.hasNext()){
list.add((double)a.next());
}
return list;
});
s(r1.collect()); //[1.0, 2.0, 3.0, 4.0, 5.0, 6.0, 7.0, 8.0, 8.0, 8.0]
s("mapPartitionsToPair()");
JavaPairRDD<String, Integer> r3 = rdd.mapPartitionsToPair(a->{
List<Tuple2<String, Integer>> list = new ArrayList<Tuple2<String, Integer>>();
while(a.hasNext()){
list.add(new Tuple2<String, Integer>("Test", a.next()));
}
return list;
});
s(r3.collect()); //[(Test,1), (Test,2), (Test,3), (Test,4), (Test,5), (Test,6), (Test,7), (Test,8), (Test,8), (Test,8)]
}
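//A sketch (not in the book) of the usual motivation for mapPartitions():
//setup runs once per partition rather than once per element. The
//DecimalFormat below is a stand-in for something genuinely expensive,
//such as opening a DB connection.
public void proc31b() {
JavaSparkContext sc = MyConf.getJavaSparkContext();
JavaRDD<Integer> rdd = sc.parallelize(Arrays.asList(1,2,3,4,5,6,7,8,8,8));
JavaRDD<String> r = rdd.mapPartitions(a->{
//one setup per partition; with plain map() this would run once per element
java.text.DecimalFormat fmt = new java.text.DecimalFormat("000");
List<String> list = new ArrayList<String>();
while(a.hasNext()){
list.add(fmt.format(a.next()));
}
return list;
});
s(r.collect()); //[001, 002, 003, 004, 005, 006, 007, 008, 008, 008]
}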
public void proc32(){
s("keyBy() - 없던 key를 만들어줌. 유용하네...");
JavaSparkContext sc = MyConf.getJavaSparkContext();
JavaRDD<Integer> rdd = sc.parallelize(Arrays.asList(1,2,3,4,5,6,7,8,8,8));
JavaPairRDD<String, Integer> r4 = rdd.keyBy(a -> {
if(a > 5){
return "A";
}else{
return "B";
}
});
s(r4.collect()); //[(B,1), (B,2), (B,3), (B,4), (B,5), (A,6), (A,7), (A,8), (A,8), (A,8)]
}
public void proc33(){
s("groupBy() - 없던 key를 만들어줌. 그리고 key로 그룹화");
JavaSparkContext sc = MyConf.getJavaSparkContext();
JavaRDD<Integer> rdd = sc.parallelize(Arrays.asList(1,2,3,4,5,6,7,8,8,8));
JavaPairRDD<String, Iterable<Integer>> r5 = rdd.groupBy(a -> "A");
s(r5.collect()); //[(A,[1, 2, 3, 4, 5, 6, 7, 8, 8, 8])]
JavaPairRDD<String, Iterable<Integer>> r6 = rdd.groupBy(a -> {
if(a > 5){
return "A";
}else{
return "B";
}
});
s(r6.collect()); //[(B,[1, 2, 3, 4, 5]), (A,[6, 7, 8, 8, 8])]
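//groupBy(f) behaves like keyBy(f) followed by groupByKey(); a quick check:
JavaPairRDD<String, Iterable<Integer>> r7 = rdd.keyBy(a -> a > 5 ? "A" : "B").groupByKey();
s(r7.collect()); //[(B,[1, 2, 3, 4, 5]), (A,[6, 7, 8, 8, 8])] - same grouping as r6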
}
public void proc34() {
s("flatMapToPair");
JavaSparkContext sc = MyConf.getJavaSparkContext();
JavaRDD<Integer> rdd = sc.parallelize(Arrays.asList(1,2,3,4,5,6,7,8,8,8));
JavaPairRDD<String, Integer> r7 = rdd.flatMapToPair(a -> {
List<Tuple2<String, Integer>> list = new ArrayList<Tuple2<String, Integer>>();
list.add(new Tuple2<String, Integer>("A", a));
return list;
});
s(r7.collect()); //[(A,1), (A,2), (A,3), (A,4), (A,5), (A,6), (A,7), (A,8), (A,8), (A,8)]
}
public void proc35() {
s("cartesian()");
JavaSparkContext sc = MyConf.getJavaSparkContext();
List<Tuple2<String, String>> data0 = new ArrayList<Tuple2<String, String>>();
data0.add(new Tuple2<String, String>("K1", "ABC"));
data0.add(new Tuple2<String, String>("K2", "DE"));
data0.add(new Tuple2<String, String>("K5", "GHI"));
data0.add(new Tuple2<String, String>("K5", "JKL"));
JavaPairRDD<String, String> pairs = sc.parallelizePairs(data0);
JavaRDD<Integer> rdd2 = sc.parallelize(Arrays.asList(1,2));
JavaPairRDD<Tuple2<String, String>, Integer> result1 = pairs.cartesian(rdd2);
s(result1.collect()); //[((K1,ABC),1), ((K2,DE),1), ((K1,ABC),2), ((K2,DE),2), ((K5,GHI),1), ((K5,JKL),1), ((K5,GHI),2), ((K5,JKL),2)]
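//Size check: cartesian() pairs every element with every element, so 4 * 2 = 8 results.
s("count = " + result1.count()); //count = 8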
}
public void proc36() {
s("foldByKey");
JavaSparkContext sc = MyConf.getJavaSparkContext();
List<Tuple2<String, Integer>> data1 = new ArrayList<Tuple2<String, Integer>>();
data1.add(new Tuple2<String, Integer>("K1", 123));
data1.add(new Tuple2<String, Integer>("K2", 456));
data1.add(new Tuple2<String, Integer>("K2", 789));
data1.add(new Tuple2<String, Integer>("K7", 0));
JavaPairRDD<String, Integer> other1 = sc.parallelizePairs(data1);
JavaPairRDD<String, Integer> result4 = other1.foldByKey(0, (a,b) -> a+b);
s(result4.collect()); //[(K7,0), (K1,123), (K2,1245)]
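//Note: the zero value is applied once per partition, so it should be the
//identity of the operator (0 for +). A sketch with a non-identity zero;
//the exact sums depend on how the data happens to be partitioned.
JavaPairRDD<String, Integer> result4b = other1.foldByKey(10, (a,b) -> a+b);
s(result4b.collect()); //e.g. [(K7,10), (K1,133), (K2,1255)] if each key's data sits in one partition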
}
public void proc37() {
s("fullOuterJoin - leftOuterJoin, rightOuterJoin 둘다 적용한 듯");
JavaSparkContext sc = MyConf.getJavaSparkContext();
List<Tuple2<String, String>> data0 = new ArrayList<Tuple2<String, String>>();
data0.add(new Tuple2<String, String>("K1", "ABC"));
data0.add(new Tuple2<String, String>("K2", "DE"));
data0.add(new Tuple2<String, String>("K5", "GHI"));
data0.add(new Tuple2<String, String>("K5", "JKL"));
JavaPairRDD<String, String> pairs = sc.parallelizePairs(data0);
List<Tuple2<String, Integer>> data1 = new ArrayList<Tuple2<String, Integer>>();
data1.add(new Tuple2<String, Integer>("K1", 123));
data1.add(new Tuple2<String, Integer>("K2", 456));
data1.add(new Tuple2<String, Integer>("K2", 789));
data1.add(new Tuple2<String, Integer>("K7", 0));
JavaPairRDD<String, Integer> other1 = sc.parallelizePairs(data1);
JavaPairRDD<String, Tuple2<Optional<String>, Optional<Integer>>> result5 = pairs.fullOuterJoin(other1);
s(result5.collect());
//[(K7,(Optional.absent(),Optional.of(0))), (K1,(Optional.of(ABC),Optional.of(123))), (K5,(Optional.of(GHI),Optional.absent())),
//(K5,(Optional.of(JKL),Optional.absent())), (K2,(Optional.of(DE),Optional.of(456))), (K2,(Optional.of(DE),Optional.of(789)))]
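//Missing sides come back as Guava Optionals; a minimal sketch of unpacking
//them with Optional.or(), which substitutes a default when absent.
JavaPairRDD<String, String> flat = result5.mapValues(t -> t._1.or("-") + "/" + t._2.or(0));
s(flat.collect()); //e.g. [(K7,-/0), (K1,ABC/123), (K5,GHI/0), (K5,JKL/0), (K2,DE/456), (K2,DE/789)] - order may vary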
}
public void proc38() {
s("groupBy - key를 새로 만들어서 그룹을 지정할 수 있다.");
JavaSparkContext sc = MyConf.getJavaSparkContext();
List<Tuple2<String, String>> data0 = new ArrayList<Tuple2<String, String>>();
data0.add(new Tuple2<String, String>("K1", "ABC"));
data0.add(new Tuple2<String, String>("K2", "DE"));
data0.add(new Tuple2<String, String>("K5", "GHI"));
data0.add(new Tuple2<String, String>("K5", "JKL"));
JavaPairRDD<String, String> pairs = sc.parallelizePairs(data0);
JavaPairRDD<String, Iterable<Tuple2<String,String>>> result6 = pairs.groupBy(tuple->{
if(tuple._2.length() > 2){
return "length3";
}else{
return "length2";
}
});
s(result6.collect()); //[(length2,[(K2,DE)]), (length3,[(K1,ABC), (K5,GHI), (K5,JKL)])]
}
public void proc39() {
s("mapToDouble");
JavaSparkContext sc = MyConf.getJavaSparkContext();
List<Tuple2<String, String>> data0 = new ArrayList<Tuple2<String, String>>();
data0.add(new Tuple2<String, String>("K1", "ABC"));
data0.add(new Tuple2<String, String>("K2", "DE"));
data0.add(new Tuple2<String, String>("K5", "GHI"));
data0.add(new Tuple2<String, String>("K5", "JKL"));
JavaPairRDD<String, String> pairs = sc.parallelizePairs(data0);
JavaDoubleRDD result7 = pairs.mapToDouble(tuple->{
return tuple._2.length();
});
s(result7.collect()); //[3.0, 2.0, 3.0, 3.0]
}
public void proc40() {
s("flatMapToDouble()");
JavaSparkContext sc = MyConf.getJavaSparkContext();
List<Tuple2<String, String>> data2 = new ArrayList<Tuple2<String, String>>();
data2.add(new Tuple2<String, String>("K1", "1 2 3"));
data2.add(new Tuple2<String, String>("K2", "4 5 6"));
data2.add(new Tuple2<String, String>("K7", "0"));
JavaPairRDD<String, String> other2 = sc.parallelizePairs(data2);
JavaDoubleRDD result2 = other2.flatMapToDouble(tuple -> {
String[] arr = tuple._2.split(" ");
List<Double> list = new ArrayList<Double>();
for(String s : arr){
list.add(Double.parseDouble(s));
}
return list;
});
s(result2.collect()); //[1.0, 2.0, 3.0, 4.0, 5.0, 6.0, 0.0]
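//The payoff of landing in a JavaDoubleRDD: numeric summaries come built in.
s("sum=" + result2.sum() + ", mean=" + result2.mean()); //sum=21.0, mean=3.0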
}
public void proc41() {
s("mapToPair");
JavaSparkContext sc = MyConf.getJavaSparkContext();
List<Tuple2<String, Integer>> data1 = new ArrayList<Tuple2<String, Integer>>();
data1.add(new Tuple2<String, Integer>("K1", 123));
data1.add(new Tuple2<String, Integer>("K2", 456));
data1.add(new Tuple2<String, Integer>("K2", 789));
data1.add(new Tuple2<String, Integer>("K7", 0));
JavaPairRDD<String, Integer> other1 = sc.parallelizePairs(data1);
JavaPairRDD<String, Integer> result8 = other1.mapToPair(tuple->new Tuple2<String, Integer>(tuple._1 + "X", tuple._2 + 1));
s(result8.collect()); //[(K1X,124), (K2X,457), (K2X,790), (K7X,1)]
}
public void proc42() {
s("flatMapToPair - value를 쪼개서 key에 각각 맵핑, flatMapValues와 비슷하지만 key를 조작할 수 있다는 것이 차이.");
JavaSparkContext sc = MyConf.getJavaSparkContext();
List<Tuple2<String, String>> data3 = new ArrayList<Tuple2<String, String>>();
data3.add(new Tuple2<String, String>("K1", "appleKorea"));
data3.add(new Tuple2<String, String>("K2", "japanTokyo"));
data3.add(new Tuple2<String, String>("K5", "seoulSeoul"));
JavaPairRDD<String, String> pairs3 = sc.parallelizePairs(data3);
JavaPairRDD<String, String> result3 = pairs3.flatMapToPair(tuple -> {
List<Tuple2<String, String>> list = new ArrayList<Tuple2<String, String>>();
String[] arr = StringUtils.splitByCharacterTypeCamelCase(tuple._2);
for(String s : arr){
list.add(new Tuple2<String, String>(tuple._1, tuple._1 + "-" + s));
}
return list;
});
s(result3.collect()); //[(K1,K1-apple), (K1,K1-Korea), (K2,K2-japan), (K2,K2-Tokyo), (K5,K5-seoul), (K5,K5-Seoul)]
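//For comparison, flatMapValues() does the same splitting, but the key passes through untouched and cannot be changed.
JavaPairRDD<String, String> result3b = pairs3.flatMapValues(v -> Arrays.asList(StringUtils.splitByCharacterTypeCamelCase(v)));
s(result3b.collect()); //[(K1,apple), (K1,Korea), (K2,japan), (K2,Tokyo), (K5,seoul), (K5,Seoul)]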
}
public void proc43() {
s("intersection - key,value 모두 비교해서 교집합인 경우만");
JavaSparkContext sc = MyConf.getJavaSparkContext();
List<Tuple2<String, String>> data0 = new ArrayList<Tuple2<String, String>>();
data0.add(new Tuple2<String, String>("K1", "ABC"));
data0.add(new Tuple2<String, String>("K2", "DE"));
data0.add(new Tuple2<String, String>("K5", "GHI"));
data0.add(new Tuple2<String, String>("K5", "JKL"));
JavaPairRDD<String, String> pairs = sc.parallelizePairs(data0);
List<Tuple2<String, String>> data1 = new ArrayList<Tuple2<String, String>>();
data1.add(new Tuple2<String, String>("K1", "123"));
data1.add(new Tuple2<String, String>("K2", "DE"));
data1.add(new Tuple2<String, String>("K2", "GHI"));
data1.add(new Tuple2<String, String>("K7", "0"));
JavaPairRDD<String, String> other1 = sc.parallelizePairs(data1);
JavaPairRDD<String, String> result = pairs.intersection(other1);
s(result.collect()); //[(K2,DE)]
}
public void proc44() {
s("subtract - key,value모두 비교해서 잘라내기 - 차집합 A-B");
JavaSparkContext sc = MyConf.getJavaSparkContext();
List<Tuple2<String, String>> data0 = new ArrayList<Tuple2<String, String>>();
data0.add(new Tuple2<String, String>("K1", "ABC"));
data0.add(new Tuple2<String, String>("K2", "DE"));
data0.add(new Tuple2<String, String>("K5", "GHI"));
data0.add(new Tuple2<String, String>("K5", "JKL"));
JavaPairRDD<String, String> pairs = sc.parallelizePairs(data0);
List<Tuple2<String, String>> data1 = new ArrayList<Tuple2<String, String>>();
data1.add(new Tuple2<String, String>("K1", "123"));
data1.add(new Tuple2<String, String>("K2", "DE"));
data1.add(new Tuple2<String, String>("K2", "GHI"));
data1.add(new Tuple2<String, String>("K7", "0"));
JavaPairRDD<String, String> other1 = sc.parallelizePairs(data1);
JavaPairRDD<String, String> result = pairs.subtract(other1);
s(result.collect()); //[(K1,ABC), (K5,JKL), (K5,GHI)]
//union(), too, treats two pairs as the same element only when both key and value match.
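//A quick check: union() itself just concatenates (duplicates survive); distinct() afterwards dedupes on whole (key,value) pairs.
JavaPairRDD<String, String> unioned = pairs.union(other1);
s(unioned.count() + " -> " + unioned.distinct().count()); //8 -> 7, only the shared (K2,DE) collapses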
}
public void proc45(){
s("map()");
JavaSparkContext sc = MyConf.getJavaSparkContext();
List<Tuple2<String, String>> data0 = new ArrayList<Tuple2<String, String>>();
data0.add(new Tuple2<String, String>("K1", "A-B-C"));
data0.add(new Tuple2<String, String>("K2", "D-E"));
data0.add(new Tuple2<String, String>("K5", "G-H-I"));
data0.add(new Tuple2<String, String>("K5", "JKL"));
JavaPairRDD<String, String> pairs = sc.parallelizePairs(data0);
JavaRDD<Integer> result = pairs.map(tuple->tuple._1.length());
s(result.collect()); //[2, 2, 2, 2]
JavaRDD<List<String>> result2 = pairs.map(tuple->Arrays.asList(tuple._2.split("-")));
s(result2.collect()); //[[A, B, C], [D, E], [G, H, I], [JKL]]
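//Contrast with mapValues(), which keeps the pair structure and the keys intact.
JavaPairRDD<String, Integer> result3 = pairs.mapValues(v -> v.length());
s(result3.collect()); //[(K1,5), (K2,3), (K5,5), (K5,3)]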
}
public void proc46(){
//Miscellaneous - mostly signature demos
JavaSparkContext sc = MyConf.getJavaSparkContext();
List<Tuple2<String, String>> data0 = new ArrayList<Tuple2<String, String>>();
data0.add(new Tuple2<String, String>("K1", "A-B-C"));
data0.add(new Tuple2<String, String>("K2", "D-E"));
data0.add(new Tuple2<String, String>("K5", "G-H-I"));
data0.add(new Tuple2<String, String>("K5", "JKL"));
data0.add(new Tuple2<String, String>("K5", "JKL"));
JavaPairRDD<String, String> pairs = sc.parallelizePairs(data0);
//reduce, fold and aggregate are actions, so these three run immediately.
pairs.reduce((tuple1, tuple2) -> new Tuple2<String, String>("", ""));
pairs.fold(new Tuple2<String, String>("",""), (tuple1, tuple2) -> new Tuple2<String, String>("", ""));
pairs.aggregate("", (a,b)->"", (a,b)->"");
//The rest are lazy transformations; since no action follows, the
//null-returning placeholder lambdas below never actually execute.
pairs.keyBy(a -> "");
pairs.flatMap(a->null);
pairs.mapPartitions(a->null);
pairs.mapPartitionsToDouble(a->null);
pairs.mapPartitionsToPair(a->null);
JavaRDD<Integer> rdd = sc.parallelize(Arrays.asList(1,2,3,4,5,6,7,8,8,8));
rdd.flatMapToDouble(a->null);
pairs.flatMapToDouble(a->null);
s(pairs.distinct().collect()); //the duplicated (K5,JKL) collapses, leaving 4 pairs
}
}