
Getting Started with Spark 13 - [Learning Spark] Chapter 4: Working with Key/Value Pairs, Part 2

by java개발자 2016. 4. 22.
package org.mystudy;

import static org.MyConf.s;

import java.io.Serializable;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import com.google.common.base.Optional;

import org.MyConf;
import org.apache.commons.lang3.StringUtils;
import org.apache.spark.api.java.JavaDoubleRDD;
import org.apache.spark.api.java.JavaPairRDD;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.junit.Test;

import scala.Tuple2;

public class Ch4_example2 implements Serializable {

	private static final long serialVersionUID = 43443432321L;

	public Ch4_example2() {
		MyConf.setLog4j();
	}

	public static void main(String... strings) {
		/**
		 * Samples of functions not covered in the book
		 */
		Ch4_example2 ex = new Ch4_example2();
		s("JavaRDD---------------------------");
//		ex.proc31();	//mapPartitions, mapPartitionsToDouble, mapPartitionsToPair
//		ex.proc32();	//keyBy
//		ex.proc33();	//groupBy
//		ex.proc34();	//flatMapToPair
//		s("JavaPairRDD---------------------------");
//		ex.proc35();	//cartesian
//		ex.proc36();	//foldByKey
//		ex.proc37();	//fullOuterJoin
//		ex.proc38();	//groupBy
//		ex.proc39();	//mapToDouble
//		ex.proc40();	//flatMapToDouble
//		ex.proc41();	//mapToPair
//		ex.proc42();	//flatMapToPair
//		ex.proc43();	//intersection
//		ex.proc44();	//subtract
//		ex.proc45();	//map
		ex.proc46();	//misc - signature demos
	}

	@Test
	public void proc31() {
		s("mapPartitions, mapPartitionsToDouble, mapPartitionsToPair 은 왜 하는지 모르겠다.");
		s("mapPartitions() - 결과는 그대로...");
		JavaSparkContext sc = MyConf.getJavaSparkContext();
		JavaRDD<Integer> rdd = sc.parallelize(Arrays.asList(1,2,3,4,5,6,7,8,8,8));
		JavaRDD<Integer> r0 = rdd.mapPartitions(a->{
			List<Integer> list = new ArrayList<Integer>();
			while(a.hasNext()){
				list.add(a.next());
			}
			return list;
		});
		s(r0.collect());	//[1, 2, 3, 4, 5, 6, 7, 8, 8, 8]
		
		s("mapPartitionsToDouble()");
		JavaDoubleRDD r1 = rdd.mapPartitionsToDouble(a->{
			List<Double> list = new ArrayList<Double>();
			while(a.hasNext()){
				list.add((double)a.next());
			}
			return list;
		});
		s(r1.collect());	//[1.0, 2.0, 3.0, 4.0, 5.0, 6.0, 7.0, 8.0, 8.0, 8.0]
		
		s("mapPartitionsToPair()");
		JavaPairRDD<String, Integer> r3 = rdd.mapPartitionsToPair(a->{
			List<Tuple2<String, Integer>> list = new ArrayList<Tuple2<String, Integer>>();
			while(a.hasNext()){
				list.add(new Tuple2<String, Integer>("Test", a.next()));
			}
			return list;
		});
		s(r3.collect());	//[(Test,1), (Test,2), (Test,3), (Test,4), (Test,5), (Test,6), (Test,7), (Test,8), (Test,8), (Test,8)]
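		
		//Note (not from the book): for element-wise work like the above, map() gives
		//the same result. mapPartitions pays off when there is per-call setup cost
		//(e.g. opening a connection or building a parser), since the lambda runs
		//once per partition rather than once per element.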
	}
	public void proc32(){
		s("keyBy() - 없던 key를 만들어줌. 유용하네...");
		JavaSparkContext sc = MyConf.getJavaSparkContext();
		JavaRDD<Integer> rdd = sc.parallelize(Arrays.asList(1,2,3,4,5,6,7,8,8,8));
		
		JavaPairRDD<String, Integer> r4 = rdd.keyBy(a -> {
			if(a > 5){
				return "A";
			}else{
				return "B";
			}
		});
		s(r4.collect());	//[(B,1), (B,2), (B,3), (B,4), (B,5), (A,6), (A,7), (A,8), (A,8), (A,8)]
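		//keyBy(f) is shorthand for mapToPair(a -> new Tuple2<>(f(a), a)):
		//the element itself becomes the value.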
	}	
	public void proc33(){
		s("groupBy() - 없던 key를 만들어줌. 그리고 key로 그룹화");
		JavaSparkContext sc = MyConf.getJavaSparkContext();
		JavaRDD<Integer> rdd = sc.parallelize(Arrays.asList(1,2,3,4,5,6,7,8,8,8));
		
		JavaPairRDD<String, Iterable<Integer>> r5 = rdd.groupBy(a -> "A");
		s(r5.collect());	//[(A,[1, 2, 3, 4, 5, 6, 7, 8, 8, 8])]
		JavaPairRDD<String, Iterable<Integer>> r6 = rdd.groupBy(a -> {
			if(a > 5){
				return "A";
			}else{
				return "B";
			}
		});
		s(r6.collect());	//[(B,[1, 2, 3, 4, 5]), (A,[6, 7, 8, 8, 8])]
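		//Conceptually, groupBy(f) is keyBy(f) followed by groupByKey().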
	}
	public void proc34() {
		s("flatMapToPair");
		JavaSparkContext sc = MyConf.getJavaSparkContext();
		JavaRDD<Integer> rdd = sc.parallelize(Arrays.asList(1,2,3,4,5,6,7,8,8,8));
		
		JavaPairRDD<String, Integer> r7 = rdd.flatMapToPair(a -> {
			List<Tuple2<String, Integer>> list = new ArrayList<Tuple2<String, Integer>>();
			list.add(new Tuple2<String, Integer>("A", a));
			return list;
		});
		s(r7.collect());	//[(A,1), (A,2), (A,3), (A,4), (A,5), (A,6), (A,7), (A,8), (A,8), (A,8)]
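		
		//A sketch (not in the original post): unlike mapToPair, the returned list
		//may hold zero or many tuples - here evens are emitted twice and odds dropped.
		JavaPairRDD<String, Integer> r8 = rdd.flatMapToPair(a -> {
			List<Tuple2<String, Integer>> list = new ArrayList<Tuple2<String, Integer>>();
			if(a % 2 == 0){
				list.add(new Tuple2<String, Integer>("even", a));
				list.add(new Tuple2<String, Integer>("evenAgain", a));
			}
			return list;
		});
		s(r8.collect());	//[(even,2), (evenAgain,2), (even,4), (evenAgain,4), ...]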
	}
	public void proc35() {
		s("cartesian()");
		JavaSparkContext sc = MyConf.getJavaSparkContext();
		List<Tuple2<String, String>> data0 = new ArrayList<Tuple2<String, String>>();
		data0.add(new Tuple2<String, String>("K1", "ABC"));
		data0.add(new Tuple2<String, String>("K2", "DE"));
		data0.add(new Tuple2<String, String>("K5", "GHI"));
		data0.add(new Tuple2<String, String>("K5", "JKL"));
		JavaPairRDD<String, String> pairs = sc.parallelizePairs(data0);
		
		JavaRDD<Integer> rdd2 = sc.parallelize(Arrays.asList(1,2));
		JavaPairRDD<Tuple2<String, String>, Integer> result1 = pairs.cartesian(rdd2);
		s(result1.collect());	//[((K1,ABC),1), ((K2,DE),1), ((K1,ABC),2), ((K2,DE),2), ((K5,GHI),1), ((K5,JKL),1), ((K5,GHI),2), ((K5,JKL),2)]
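		//cartesian() emits every combination (4 x 2 = 8 pairs here), so its output
		//grows as |A| x |B| and gets expensive quickly on large RDDs.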
	}
	
	public void proc36() {
		s("foldByKey");
		JavaSparkContext sc = MyConf.getJavaSparkContext();
		List<Tuple2<String, Integer>> data1 = new ArrayList<Tuple2<String, Integer>>();
		data1.add(new Tuple2<String, Integer>("K1", 123));
		data1.add(new Tuple2<String, Integer>("K2", 456));
		data1.add(new Tuple2<String, Integer>("K2", 789));
		data1.add(new Tuple2<String, Integer>("K7", 0));
		JavaPairRDD<String, Integer> other1 = sc.parallelizePairs(data1);
		
		JavaPairRDD<String, Integer> result4 = other1.foldByKey(0, (a,b) -> a+b);
		s(result4.collect());	//[(K7,0), (K1,123), (K2,1245)]
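		
		//Note (not in the original post): the zero value may be folded in once per
		//partition, so it must be the identity of the operation - 0 for +,
		//Integer.MIN_VALUE for max, and so on. A max-by-key sketch:
		JavaPairRDD<String, Integer> result4b = other1.foldByKey(Integer.MIN_VALUE, Math::max);
		s(result4b.collect());	//[(K7,0), (K1,123), (K2,789)] - ordering may vary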
	}
	public void proc37() {
		s("fullOuterJoin - leftOuterJoin, rightOuterJoin 둘다 적용한 듯");
		JavaSparkContext sc = MyConf.getJavaSparkContext();
		List<Tuple2<String, String>> data0 = new ArrayList<Tuple2<String, String>>();
		data0.add(new Tuple2<String, String>("K1", "ABC"));
		data0.add(new Tuple2<String, String>("K2", "DE"));
		data0.add(new Tuple2<String, String>("K5", "GHI"));
		data0.add(new Tuple2<String, String>("K5", "JKL"));
		JavaPairRDD<String, String> pairs = sc.parallelizePairs(data0);
		
		List<Tuple2<String, Integer>> data1 = new ArrayList<Tuple2<String, Integer>>();
		data1.add(new Tuple2<String, Integer>("K1", 123));
		data1.add(new Tuple2<String, Integer>("K2", 456));
		data1.add(new Tuple2<String, Integer>("K2", 789));
		data1.add(new Tuple2<String, Integer>("K7", 0));
		JavaPairRDD<String, Integer> other1 = sc.parallelizePairs(data1);
		
		JavaPairRDD<String, Tuple2<Optional<String>, Optional<Integer>>> result5 = pairs.fullOuterJoin(other1);
		s(result5.collect());
		//[(K7,(Optional.absent(),Optional.of(0))), (K1,(Optional.of(ABC),Optional.of(123))), (K5,(Optional.of(GHI),Optional.absent())), 
		//(K5,(Optional.of(JKL),Optional.absent())), (K2,(Optional.of(DE),Optional.of(456))), (K2,(Optional.of(DE),Optional.of(789)))]
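		
		//For comparison (a sketch, not in the original post): leftOuterJoin keeps every
		//key from 'pairs' and wraps only the right-hand value in Optional, so K5 pairs
		//with Optional.absent() and K7 disappears.
		JavaPairRDD<String, Tuple2<String, Optional<Integer>>> result5b = pairs.leftOuterJoin(other1);
		s(result5b.collect());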
	}
	public void proc38() {
		s("groupBy - key를 새로 만들어서 그룹을 지정할 수 있다.");
		JavaSparkContext sc = MyConf.getJavaSparkContext();
		List<Tuple2<String, String>> data0 = new ArrayList<Tuple2<String, String>>();
		data0.add(new Tuple2<String, String>("K1", "ABC"));
		data0.add(new Tuple2<String, String>("K2", "DE"));
		data0.add(new Tuple2<String, String>("K5", "GHI"));
		data0.add(new Tuple2<String, String>("K5", "JKL"));
		JavaPairRDD<String, String> pairs = sc.parallelizePairs(data0);
		
		JavaPairRDD<String, Iterable<Tuple2<String,String>>> result6 = pairs.groupBy(tuple->{
			if(tuple._2.length() > 2){
				return "length3";
			}else{
				return "length2";
			}
		});
		s(result6.collect());	//[(length2,[(K2,DE)]), (length3,[(K1,ABC), (K5,GHI), (K5,JKL)])]
	}
	public void proc39() {
		s("mapToDouble");
		JavaSparkContext sc = MyConf.getJavaSparkContext();
		List<Tuple2<String, String>> data0 = new ArrayList<Tuple2<String, String>>();
		data0.add(new Tuple2<String, String>("K1", "ABC"));
		data0.add(new Tuple2<String, String>("K2", "DE"));
		data0.add(new Tuple2<String, String>("K5", "GHI"));
		data0.add(new Tuple2<String, String>("K5", "JKL"));
		JavaPairRDD<String, String> pairs = sc.parallelizePairs(data0);
		
		JavaDoubleRDD result7 = pairs.mapToDouble(tuple->{
			return tuple._2.length();
		});
		s(result7.collect());	//[3.0, 2.0, 3.0, 3.0]
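		//JavaDoubleRDD adds numeric helpers (not shown in the original post):
		s(result7.sum());	//11.0
		s(result7.mean());	//2.75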
	}
	public void proc40() {
		s("flatMapToDouble()");
		JavaSparkContext sc = MyConf.getJavaSparkContext();
		List<Tuple2<String, String>> data2 = new ArrayList<Tuple2<String, String>>();
		data2.add(new Tuple2<String, String>("K1", "1 2 3"));
		data2.add(new Tuple2<String, String>("K2", "4 5 6"));
		data2.add(new Tuple2<String, String>("K7", "0"));
		JavaPairRDD<String, String> other2 = sc.parallelizePairs(data2);
		
		JavaDoubleRDD result2 = other2.flatMapToDouble(tuple -> {
			String[] arr = tuple._2.split(" ");
			List<Double> list = new ArrayList<Double>();
			for(String s : arr){
				list.add(Double.parseDouble(s));
			}
			return list;
		});
		s(result2.collect());		//[1.0, 2.0, 3.0, 4.0, 5.0, 6.0, 0.0]
	}
	public void proc41() {
		s("mapToPair");
		JavaSparkContext sc = MyConf.getJavaSparkContext();
		List<Tuple2<String, Integer>> data1 = new ArrayList<Tuple2<String, Integer>>();
		data1.add(new Tuple2<String, Integer>("K1", 123));
		data1.add(new Tuple2<String, Integer>("K2", 456));
		data1.add(new Tuple2<String, Integer>("K2", 789));
		data1.add(new Tuple2<String, Integer>("K7", 0));
		JavaPairRDD<String, Integer> other1 = sc.parallelizePairs(data1);
		
		JavaPairRDD<String, Integer> result8 = other1.mapToPair(tuple->new Tuple2<String, Integer>(tuple._1 + "X", tuple._2 + 1));
		s(result8.collect());		//[(K1X,124), (K2X,457), (K2X,790), (K7X,1)]
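		
		//mapValues (a sketch, not in the original post) transforms only the value,
		//so the key - and any partitioner - stays untouched:
		JavaPairRDD<String, Integer> result8b = other1.mapValues(v -> v + 1);
		s(result8b.collect());	//[(K1,124), (K2,457), (K2,790), (K7,1)]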
	}
	public void proc42() {
		s("flatMapToPair - value를 쪼개서 key에 각각 맵핑, flatMapValues와 비슷하지만 key를 조작할 수 있다는 것이 차이.");
		JavaSparkContext sc = MyConf.getJavaSparkContext();
		List<Tuple2<String, String>> data3 = new ArrayList<Tuple2<String, String>>();
		data3.add(new Tuple2<String, String>("K1", "appleKorea"));
		data3.add(new Tuple2<String, String>("K2", "japanTokyo"));
		data3.add(new Tuple2<String, String>("K5", "seoulSeoul"));
		JavaPairRDD<String, String> pairs3 = sc.parallelizePairs(data3);
		
		JavaPairRDD<String, String> result3 = pairs3.flatMapToPair(tuple -> {
			List<Tuple2<String, String>> list = new ArrayList<Tuple2<String, String>>();
			String[] arr = StringUtils.splitByCharacterTypeCamelCase(tuple._2);
			for(String s : arr){
				list.add(new Tuple2<String, String>(tuple._1, tuple._1 + "-" + s));
			}
			return list;
		});
		s(result3.collect());		//[(K1,K1-apple), (K1,K1-Korea), (K2,K2-japan), (K2,K2-Tokyo), (K5,K5-seoul), (K5,K5-Seoul)]
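		
		//flatMapValues for comparison (a sketch, not in the original post) - the key
		//stays fixed and only the value is expanded:
		JavaPairRDD<String, String> result3b = pairs3.flatMapValues(v -> Arrays.asList(StringUtils.splitByCharacterTypeCamelCase(v)));
		s(result3b.collect());	//[(K1,apple), (K1,Korea), (K2,japan), (K2,Tokyo), (K5,seoul), (K5,Seoul)]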
	}
	public void proc43() {
		s("intersection - key,value 모두 비교해서 교집합인 경우만");
		JavaSparkContext sc = MyConf.getJavaSparkContext();
		List<Tuple2<String, String>> data0 = new ArrayList<Tuple2<String, String>>();
		data0.add(new Tuple2<String, String>("K1", "ABC"));
		data0.add(new Tuple2<String, String>("K2", "DE"));
		data0.add(new Tuple2<String, String>("K5", "GHI"));
		data0.add(new Tuple2<String, String>("K5", "JKL"));
		JavaPairRDD<String, String> pairs = sc.parallelizePairs(data0);
		
		List<Tuple2<String, String>> data1 = new ArrayList<Tuple2<String, String>>();
		data1.add(new Tuple2<String, String>("K1", "123"));
		data1.add(new Tuple2<String, String>("K2", "DE"));
		data1.add(new Tuple2<String, String>("K2", "GHI"));
		data1.add(new Tuple2<String, String>("K7", "0"));
		JavaPairRDD<String, String> other1 = sc.parallelizePairs(data1);
		
		JavaPairRDD<String, String> result = pairs.intersection(other1);
		s(result.collect());	//[(K2,DE)]
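		//Note: intersection() also removes duplicates, even within a single RDD.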
	}
	public void proc44() {
		s("subtract - key,value모두 비교해서 잘라내기  - 차집합 A-B");
		JavaSparkContext sc = MyConf.getJavaSparkContext();
		List<Tuple2<String, String>> data0 = new ArrayList<Tuple2<String, String>>();
		data0.add(new Tuple2<String, String>("K1", "ABC"));
		data0.add(new Tuple2<String, String>("K2", "DE"));
		data0.add(new Tuple2<String, String>("K5", "GHI"));
		data0.add(new Tuple2<String, String>("K5", "JKL"));
		JavaPairRDD<String, String> pairs = sc.parallelizePairs(data0);
		
		List<Tuple2<String, String>> data1 = new ArrayList<Tuple2<String, String>>();
		data1.add(new Tuple2<String, String>("K1", "123"));
		data1.add(new Tuple2<String, String>("K2", "DE"));
		data1.add(new Tuple2<String, String>("K2", "GHI"));
		data1.add(new Tuple2<String, String>("K7", "0"));
		JavaPairRDD<String, String> other1 = sc.parallelizePairs(data1);
		
		JavaPairRDD<String, String> result = pairs.subtract(other1);
		s(result.collect());		//[(K1,ABC), (K5,JKL), (K5,GHI)]
		
		//For union() too, elements are whole (key, value) pairs - though union itself just concatenates and keeps duplicates.
	}
	public void proc45(){
		s("map()");
		JavaSparkContext sc = MyConf.getJavaSparkContext();
		List<Tuple2<String, String>> data0 = new ArrayList<Tuple2<String, String>>();
		data0.add(new Tuple2<String, String>("K1", "A-B-C"));
		data0.add(new Tuple2<String, String>("K2", "D-E"));
		data0.add(new Tuple2<String, String>("K5", "G-H-I"));
		data0.add(new Tuple2<String, String>("K5", "JKL"));
		JavaPairRDD<String, String> pairs = sc.parallelizePairs(data0);
		
		JavaRDD<Integer> result = pairs.map(tuple->tuple._1.length());
		s(result.collect());	//[2, 2, 2, 2]
		JavaRDD<List<String>> result2 = pairs.map(tuple->Arrays.asList(tuple._2.split("-")));
		s(result2.collect());	//[[A, B, C], [D, E], [G, H, I], [JKL]]
	}
	
	public void proc46(){
		//Miscellaneous
		JavaSparkContext sc = MyConf.getJavaSparkContext();
		List<Tuple2<String, String>> data0 = new ArrayList<Tuple2<String, String>>();
		data0.add(new Tuple2<String, String>("K1", "A-B-C"));
		data0.add(new Tuple2<String, String>("K2", "D-E"));
		data0.add(new Tuple2<String, String>("K5", "G-H-I"));
		data0.add(new Tuple2<String, String>("K5", "JKL"));
		data0.add(new Tuple2<String, String>("K5", "JKL"));
		JavaPairRDD<String, String> pairs = sc.parallelizePairs(data0);
		
		pairs.reduce((tuple1, tuple2) -> new Tuple2<String, String>("", ""));
		pairs.fold(new Tuple2<String, String>("",""), (tuple1, tuple2) -> new Tuple2<String, String>("", ""));
		pairs.aggregate("", (a,b)->"", (a,b)->"");
		pairs.keyBy(a -> "");
		pairs.flatMap(a->null);
		
		pairs.mapPartitions(a->null);
		pairs.mapPartitionsToDouble(a->null);
		pairs.mapPartitionsToPair(a->null);

		JavaRDD<Integer> rdd = sc.parallelize(Arrays.asList(1,2,3,4,5,6,7,8,8,8));		
		rdd.flatMapToDouble(a->null);
		pairs.flatMapToDouble(a->null);

		s(pairs.distinct().collect());
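		//distinct() also compares whole (key, value) pairs, so one of the duplicate
		//(K5,JKL) entries above is dropped.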

	}
}