
Getting Started with Spark 12 - [Learning Spark] Chapter 4: Working with Key/Value Pairs

by java개발자 2016. 4. 20.
package org.mystudy;

import static org.MyConf.s;

import java.io.Serializable;
import java.net.MalformedURLException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Comparator;
import java.util.List;
import java.util.Map;

import org.MyConf;
import org.apache.spark.api.java.JavaPairRDD;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;

import com.google.common.base.Optional;

import scala.Tuple2;
import scala.Tuple3;
import scala.Tuple4;

public class Ch4_example implements Serializable {

	private static final long serialVersionUID = 43443432321L;

	public Ch4_example() {
		MyConf.setLog4j();
	}

	public static void main(String... strings) {
		Ch4_example ex = new Ch4_example();
		/**
		 * Table 4-1. Transformations on one pair RDD:
		 * reduceByKey, groupByKey, combineByKey, mapValues, flatMapValues, keys, values, sortByKey
		 * Table 4-2. Transformations on two pair RDDs:
		 * subtractByKey, join, rightOuterJoin, leftOuterJoin, cogroup
		 * Table 4-3. Actions on pair RDDs:
		 * countByKey, collectAsMap, lookup(key)
		 */
		ex.proc1();		//mapToPair
		ex.proc2();		//filter
		ex.proc3();		//mapValues, reduceByKey
		ex.proc4();		//flatMap, mapToPair, reduceByKey
		ex.proc4_1();	//countByValue
		ex.proc5();		//combineByKey
		ex.proc6();		//reduceByKey, specifying the level of parallelism
		ex.proc7();		//join
		ex.proc8();		//sortBy, sortByKey
//		examples not in the book
		ex.proc11();	//groupByKey
		ex.proc12();	//mapValues, flatMap -> flatMapValues
		ex.proc13();	//keys
		ex.proc14();	//values
		ex.proc15();	//cogroup
		ex.proc16();	//subtractByKey
		ex.proc17();	//countByKey, collectAsMap, lookup(key)
		ex.proc21();	//tracking partitions
		
		/**
		 * Operations that benefit from partitioning
		 * (as of Spark 1.0): cogroup, groupWith, join, leftOuterJoin, rightOuterJoin, groupByKey, reduceByKey, combineByKey, lookup
		 * 
		 * Operations that set a partitioner on the resulting RDD:
		 * cogroup, groupWith, join, leftOuterJoin, rightOuterJoin, groupByKey, reduceByKey, combineByKey, partitionBy, sort,
		 * mapValues (if the parent RDD has a partitioner), flatMapValues (if the parent RDD has a partitioner), filter (if the parent RDD has a partitioner)
		 * (see the sketch below)
		 */
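		// Not in the book: a minimal sketch of pre-partitioning a pair RDD so that the operations listed
		// above can reuse its partitioner instead of re-shuffling it every time (the RDD name and the
		// partition count of 4 are just placeholders):
		// JavaPairRDD<String, String> prePartitioned = somePairs
		// 		.partitionBy(new org.apache.spark.HashPartitioner(4))
		// 		.persist(org.apache.spark.storage.StorageLevel.MEMORY_ONLY());
		// Joins and lookups against prePartitioned can then take advantage of its partitioner.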
	}

	public void proc1() {
		JavaSparkContext sc = MyConf.getJavaSparkContext();
		s("Example 4-3: Creating a pair RDD using the first word as the key (Java)");
		JavaRDD<String> lines = sc.textFile("src/main/java/org/mystudy/sample.txt");
		//contents of sample.txt
		//apple 123 456
		//kindle 567 abc
		//naver daum sk nate
		//empas yahoo google
		JavaPairRDD<String, String> pairs = lines.mapToPair(str -> new Tuple2<String, String>(str.split(" ")[0], str));
		s(pairs.collect());		//[(apple,apple 123 456), (kindle,kindle 567 abc), (naver,naver daum sk nate), (empas,empas yahoo google)]
		
		//building a pair RDD from an in-memory Java collection
		List<Tuple2<String, String>> list2 = new ArrayList<Tuple2<String, String>>();
		list2.add(new Tuple2<String, String>("name", "ysh"));
		list2.add(new Tuple2<String, String>("age", "30"));
		list2.add(new Tuple2<String, String>("addr", "seoul"));
		list2.add(new Tuple2<String, String>("country", "korea"));
		JavaPairRDD<String, String> pairs2 = sc.parallelizePairs(list2);
		s(pairs2.getNumPartitions());	//2				<-- important! the default number of partitions depends on whether the JavaSparkContext was created with local[1], local[2], etc.
		s(pairs2.collect());	//[(name,ysh), (age,30), (addr,seoul), (country,korea)]

		/**
		 * JavaSparkContext sc = new JavaSparkContext("local[1]", "First Spark App") -> default partition count: 1
		 * JavaSparkContext sc = new JavaSparkContext("local[3]", "First Spark App") -> default partition count: 3
		 */
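		// Quick way to confirm this (not in the book): defaultParallelism() is the value that
		// parallelize()/parallelizePairs() fall back to when no partition count is given.
		s(sc.defaultParallelism());		//2 when the context was built with local[2] (assumption about MyConf)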
		
		JavaPairRDD<String, String> pairs3 = sc.parallelizePairs(list2, 3);	//the 3 is the number of partitions
		s(pairs3.getNumPartitions());	//3
		s(pairs3.collect());	//[(name,ysh), (age,30), (addr,seoul), (country,korea)]
	}
	
	public void proc2() {
		JavaSparkContext sc = MyConf.getJavaSparkContext();
		s("Example 4-6: Simple filter on the second element (Java)");
		List<Tuple2<String, String>> list = new ArrayList<Tuple2<String, String>>();
		list.add(new Tuple2<String, String>("name", "ysh"));
		list.add(new Tuple2<String, String>("age", "30000000000000000"));
		list.add(new Tuple2<String, String>("addr", "seoul"));
		list.add(new Tuple2<String, String>("country", "korea"));
		JavaPairRDD<String, String> pairs = sc.parallelizePairs(list);
		JavaPairRDD<String, String> result = pairs.filter(tuple->tuple._2.length() < 10);
		s(result.collect());	//[(name,ysh), (addr,seoul), (country,korea)]
	}
	public void proc3() {
		JavaSparkContext sc = MyConf.getJavaSparkContext();
		s("Examples 4-7, 4-8: Per-key average with mapValues() and reduceByKey()");
		List<Tuple2<String, Integer>> list = new ArrayList<Tuple2<String, Integer>>();
		list.add(new Tuple2<String, Integer>("panda", 0));
		list.add(new Tuple2<String, Integer>("pink", 3));
		list.add(new Tuple2<String, Integer>("pirate", 3));
		list.add(new Tuple2<String, Integer>("panda", 1));
		list.add(new Tuple2<String, Integer>("pink", 4));
		JavaPairRDD<String, Integer> pairs = sc.parallelizePairs(list);
		s(pairs.collect());		//[(panda,0), (pink,3), (pirate,3), (panda,1), (pink,4)]
		JavaPairRDD<String, Tuple2<Integer, Integer>> pairs2 = pairs.mapValues(value -> new Tuple2<Integer, Integer>(value, 1));
		s(pairs2.collect());	//[(panda,(0,1)), (pink,(3,1)), (pirate,(3,1)), (panda,(1,1)), (pink,(4,1))]
		JavaPairRDD<String, Tuple2<Integer, Integer>> pairs3 = pairs2.reduceByKey((tuple1, tuple2) -> new Tuple2<Integer, Integer>(tuple1._1 + tuple2._1, tuple1._2 + tuple2._2));
//	Creating a new Tuple2 on every lambda call looked inefficient, so I tried mutating the fields of the existing Tuple2 and returning it, but Tuple2's fields are final and cannot be modified.
//		JavaPairRDD<String, Tuple2<Integer, Integer>> pairs3 = pairs2.reduceByKey((tuple1, tuple2) -> {
//			tuple1._1 = tuple1._1 + tuple2._1;
//			tuple1._2 = tuple1._2 + tuple2._2;
//			return tuple1;
//		});
		s(pairs3.collect());	//[(panda,(1,2)), (pirate,(3,1)), (pink,(7,2))]		//(key, (sum, count))
		JavaPairRDD<String, Double> pairs4 = pairs3.mapValues(tuple -> tuple._1 / (double)tuple._2);
		s(pairs4.collect());	//[(panda,0.5), (pirate,3.0), (pink,3.5)]			//(key, average)
	}
	public void proc4() {
		JavaSparkContext sc = MyConf.getJavaSparkContext();
		s("Example 4-11: Word count in Java");
		JavaRDD<String> input = sc.textFile("src/main/java/org/mystudy/sample2.txt");
		//apple 123 456 456
		//kindle 567 abc abc
		//naver daum sk nate nate
		//empas yahoo google google
		JavaRDD<String> words = input.flatMap(str -> Arrays.asList(str.split(" ")));
		JavaPairRDD<String, Integer> result = words.mapToPair(str -> new Tuple2<String, Integer>(str, 1));
		result = result.reduceByKey((a,b) -> a+b);
		s(result.collect());	//[(567,1), (nate,2), (yahoo,1), (sk,1), (empas,1), (naver,1), (apple,1), (123,1), (abc,2), (456,2), (kindle,1), (google,2), (daum,1)]
		
	}
	public void proc4_1() {
		// A faster way!
		JavaSparkContext sc = MyConf.getJavaSparkContext();
		s("Example 4-11: Word count in Java");
		JavaRDD<String> input = sc.textFile("src/main/java/org/mystudy/sample2.txt");
		JavaRDD<String> words = input.flatMap(str -> Arrays.asList(str.split(" ")));
		Map<String, Long> result = words.countByValue();
		s(result);		//{abc=2, 567=1, empas=1, 123=1, google=2, naver=1, kindle=1, 456=2, sk=1, nate=2, apple=1, yahoo=1, daum=1}
	}
	public void proc5() {
		JavaSparkContext sc = MyConf.getJavaSparkContext();
		s("Example 4-14: Per-key average using combineByKey() in Java");
		List<Tuple2<String, Integer>> list = new ArrayList<Tuple2<String, Integer>>();
		list.add(new Tuple2<String, Integer>("panda", 0));
		list.add(new Tuple2<String, Integer>("pink", 3));
		list.add(new Tuple2<String, Integer>("pirate", 3));
		list.add(new Tuple2<String, Integer>("panda", 1));
		list.add(new Tuple2<String, Integer>("pink", 4));
		JavaPairRDD<String, Integer> pairs = sc.parallelizePairs(list);
		
		/**
		 * The book uses an AvgCount class, but since only two values are carried there is no real need
		 * to define a new class; a Tuple2 works just as well.
		 * (Scala provides Tuple1 through Tuple22.)
		 */
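		// For reference, a rough sketch of the book's AvgCount approach (field and method names here are
		// from memory and may differ from the book's exact code):
		// public static class AvgCount implements Serializable {
		// 	public int total;
		// 	public int num;
		// 	public AvgCount(int total, int num) { this.total = total; this.num = num; }
		// 	public double avg() { return total / (double) num; }
		// }
		// combineByKey() would then build an AvgCount per key (createCombiner), add each value into it
		// (mergeValue), and merge two AvgCounts from different partitions (mergeCombiners).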
//		If the generic type parameters on Tuple2 are omitted, what was meant as Integer + Integer is treated as Object + Integer and the code does not compile. Maybe this is why people use Scala, where the types don't have to be spelled out...
//		JavaPairRDD<String, Tuple2> result2 = pairs.combineByKey(
//				num -> new Tuple2(num, 1)
//				, (tuple0, num) -> new Tuple2(tuple0._1() + num, tuple0._2 + 1)  
//				, (tuple1, tuple2) -> new Tuple2(tuple1._1 + tuple2._1, tuple1._2 + tuple2._2) 
//				);
		JavaPairRDD<String, Tuple2<Integer, Integer>> result = pairs.combineByKey(
						num -> new Tuple2<Integer, Integer>(num, 1)
						, (tuple0, num) -> new Tuple2<Integer, Integer>(tuple0._1() + num, tuple0._2 + 1)  
						, (tuple1, tuple2) -> new Tuple2<Integer, Integer>(tuple1._1 + tuple2._1, tuple1._2 + tuple2._2) 
					);
		s(result.collect());	//[(panda,(1,2)), (pink,(7,2)), (pirate,(3,1))]
		JavaPairRDD<String, Double> result2 = result.mapValues(tuple -> tuple._1 / (double)tuple._2);
		s(result2.collect());	//[(panda,0.5), (pink,3.5), (pirate,3.0)]
	}
	public void proc6() {
		JavaSparkContext sc = MyConf.getJavaSparkContext();
		s("Examples 4-15, 4-16: reduceByKey() with the level of parallelism specified explicitly");
		List<Tuple2<String, Integer>> list = new ArrayList<Tuple2<String, Integer>>();
		list.add(new Tuple2<String, Integer>("a", 3));
		list.add(new Tuple2<String, Integer>("b", 4));
		list.add(new Tuple2<String, Integer>("a", 1));
		JavaPairRDD<String, Integer> pairs = sc.parallelizePairs(list);
		JavaPairRDD<String, Integer> result1 = pairs.reduceByKey((a,b) -> a+b);		//uses the default level of parallelism
		JavaPairRDD<String, Integer> result2 = pairs.reduceByKey((a,b) -> a+b, 10);	//level of parallelism specified explicitly
		s(result2.getNumPartitions());	//10
		result2 = result2.coalesce(7);
		s(result2.getNumPartitions());	//7
		result2 = result2.coalesce(4, false);	//boolean shuffle
		s(result2.getNumPartitions());	//4
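		// Note: with shuffle=false, coalesce() can only reduce the number of partitions; going back up
		// (e.g. from 4 to 10) would require repartition() or coalesce(n, true), which triggers a shuffle.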
		
	}
	public void proc7() {
		JavaSparkContext sc = MyConf.getJavaSparkContext();
		s("Example 4-17: Inner join");
		List<Tuple2<String, String>> data1 = new ArrayList<Tuple2<String, String>>();
		data1.add(new Tuple2<String, String>("K1", "A"));
		data1.add(new Tuple2<String, String>("K2", "B"));
		data1.add(new Tuple2<String, String>("K1", "C"));
		data1.add(new Tuple2<String, String>("K3", "D"));
		JavaPairRDD<String, String> pairs1 = sc.parallelizePairs(data1);
		
		List<Tuple2<String, Double>> data2 = new ArrayList<Tuple2<String, Double>>();
		data2.add(new Tuple2<String, Double>("K1", 4.9));
		data2.add(new Tuple2<String, Double>("K2", 4.8));
		data2.add(new Tuple2<String, Double>("K4", 4.8));
		JavaPairRDD<String, Double> pairs2 = sc.parallelizePairs(data2);
		
		JavaPairRDD<String, Tuple2<String, Double>> result1 = pairs1.join(pairs2);
		s(result1.collect());		//[(K1,(A,4.9)), (K1,(C,4.9)), (K2,(B,4.8))]
		
		s("Example 4-18: leftOuterJoin() and rightOuterJoin()");
		JavaPairRDD<String, Tuple2<String, Optional<Double>>> result2 = pairs1.leftOuterJoin(pairs2);
		s(result2.collect());		//[(K1,(A,Optional.of(4.9))), (K1,(C,Optional.of(4.9))), (K3,(D,Optional.absent())), (K2,(B,Optional.of(4.8)))]
		JavaPairRDD<String, Tuple2<Optional<String>, Double>> result3 = pairs1.rightOuterJoin(pairs2);
		s(result3.collect());		//[(K1,(Optional.of(A),4.9)), (K1,(Optional.of(C),4.9)), (K4,(Optional.absent(),4.8)), (K2,(Optional.of(B),4.8))]
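		// Note: the Optional here is Guava's com.google.common.base.Optional (see the imports), which is
		// what the Spark 1.x Java API returns for outer joins; Spark 2.0+ switched to org.apache.spark.api.java.Optional.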
	}
	//Without implementing Serializable, the following exception occurs: Task not serializable: org.mystudy.Ch4_example$MyComparator2
	class MyComparator2 implements Comparator<Integer>, Serializable {	

		private static final long serialVersionUID = 1L;

		@Override
		public int compare(Integer o1, Integer o2) {
			return String.valueOf(o1).compareTo(String.valueOf(o2));
		}
	}	
	public void proc8() {
		JavaSparkContext sc = MyConf.getJavaSparkContext();
		s("Example 4-21: Custom sort order in Java, sorting integers as if they were strings");
		JavaRDD<Integer> rdd = sc.parallelize(Arrays.asList(3,4,1,2));
		rdd = rdd.sortBy(a -> String.valueOf(a), false, 3);
		s(rdd.collect());		//[4, 3, 2, 1]
		
		List<Tuple2<Integer, Integer>> data1 = new ArrayList<Tuple2<Integer, Integer>>();
		data1.add(new Tuple2<Integer, Integer>(5, 2));
		data1.add(new Tuple2<Integer, Integer>(4, 2));
		data1.add(new Tuple2<Integer, Integer>(8, 2));
		data1.add(new Tuple2<Integer, Integer>(11, 2));
		JavaPairRDD<Integer, Integer> pairs1 = sc.parallelizePairs(data1);
		s(pairs1.sortByKey().collect());					//[(4,2), (5,2), (8,2), (11,2)]
		s(pairs1.sortByKey(false).collect());				//[(11,2), (8,2), (5,2), (4,2)]
		s(pairs1.sortByKey(new MyComparator2()).collect());	//[(11,2), (4,2), (5,2), (8,2)]
	}
	
	class MyPartitioner extends org.apache.spark.Partitioner{
		private static final long serialVersionUID = 1L;
		private int numParts;
		
		public MyPartitioner(int numParts) {
			this.numParts = numParts;
		}
		@Override
		public int getPartition(Object arg0) {
			int code = 0;
			try {
				String domain = new java.net.URL(arg0.toString()).getHost();
				code = domain.hashCode() % this.numPartitions();
				if(code < 0){
					code += this.numPartitions();
				}
			} catch (MalformedURLException e) {
				e.printStackTrace();
			}
			return code;
		}

		@Override
		public int numPartitions() {
			return this.numParts;
		}
	}
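	// Note: the book also recommends overriding equals() (and hashCode()) in a custom Partitioner so that
	// Spark can tell whether two RDDs were partitioned in the same way. A minimal version might look like:
	// @Override
	// public boolean equals(Object other) {
	// 	return other instanceof MyPartitioner && ((MyPartitioner) other).numParts == this.numParts;
	// }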
	public void proc11() {
		JavaSparkContext sc = MyConf.getJavaSparkContext();
		s("groupByKey() - values that share the same key are grouped together under that key.");
		List<Tuple2<String, String>> data1 = new ArrayList<Tuple2<String, String>>();
		data1.add(new Tuple2<String, String>("K1", "A"));
		data1.add(new Tuple2<String, String>("K2", "B"));
		data1.add(new Tuple2<String, String>("K1", "C"));
		data1.add(new Tuple2<String, String>("K3", "D"));
		JavaPairRDD<String, String> pairs1 = sc.parallelizePairs(data1);
		JavaPairRDD<String, Iterable<String>> result1 = pairs1.groupByKey();
		s(result1.collect());		//[(K1,[A, C]), (K3,[D]), (K2,[B])]
		JavaPairRDD<String, Iterable<String>> result2 = pairs1.groupByKey(3);	//numPartitions
		JavaPairRDD<String, Iterable<String>> result3 = pairs1.groupByKey(new org.apache.spark.HashPartitioner(4));		//Partitioner
		
		s("Example 4-26: Custom partitioner");
		JavaPairRDD<String, Iterable<String>> result4 = pairs1.groupByKey(new MyPartitioner(3));
	}
	public void proc12() {
		JavaSparkContext sc = MyConf.getJavaSparkContext();
		s("Before looking at flatMapValues, you need to understand mapValues and flatMap.");
		s("1. mapValues() - the key stays the same; only the value can be transformed.");
		List<Tuple2<String, Integer>> list = new ArrayList<Tuple2<String, Integer>>();
		list.add(new Tuple2<String, Integer>("panda", 0));
		list.add(new Tuple2<String, Integer>("pink", 3));
		list.add(new Tuple2<String, Integer>("pirate", 3));
		list.add(new Tuple2<String, Integer>("panda", 1));
		list.add(new Tuple2<String, Integer>("pink", 4));
		JavaPairRDD<String, Integer> pairs = sc.parallelizePairs(list);
		s(pairs.collect());		//[(panda,0), (pink,3), (pirate,3), (panda,1), (pink,4)]
		JavaPairRDD<String, Tuple2<Integer, Integer>> pairs2 = pairs.mapValues(value -> new Tuple2<Integer, Integer>(value, 1));
		s(pairs2.collect());	//[(panda,(0,1)), (pink,(3,1)), (pirate,(3,1)), (panda,(1,1)), (pink,(4,1))]
		
		s("2. flatMap() - each element can be turned into a List (or simply transformed one-to-one), and the resulting lists are flattened out onto a single level.");
		JavaRDD<String> lines31 = sc.parallelize(Arrays.asList("first line : hello world", "second line : hi"));
		JavaRDD<String> words = lines31.flatMap(line -> Arrays.asList(line.split(" ")));
		s(words.collect());	//[first, line, :, hello, world, second, line, :, hi]
		
		s("3. flatMapValues() - each value must be transformed into a List (Iterable); every element of that list is then emitted as a separate pair with the original key.");
		List<Tuple2<String, String>> data1 = new ArrayList<Tuple2<String, String>>();
		data1.add(new Tuple2<String, String>("K1", "ABC"));
		data1.add(new Tuple2<String, String>("K2", "DE"));
		data1.add(new Tuple2<String, String>("K1", "F"));
		data1.add(new Tuple2<String, String>("K3", "GHI"));
		JavaPairRDD<String, String> pairs1 = sc.parallelizePairs(data1);
		JavaPairRDD<String, String> result1 = pairs1.flatMapValues(str -> {
				List<String> temp = new ArrayList<String>();
				for(int i = 0 ; i<str.length() ; i++){
					temp.add(String.valueOf(str.charAt(i)));
				}
				return temp;
		});
		s(result1.collect());	//[(K1,A), (K1,B), (K1,C), (K2,D), (K2,E), (K1,F), (K3,G), (K3,H), (K3,I)]
	}
	
	public void proc13() {
		JavaSparkContext sc = MyConf.getJavaSparkContext();
		s("keys()");
		List<Tuple2<String, Integer>> list = new ArrayList<Tuple2<String, Integer>>();
		list.add(new Tuple2<String, Integer>("panda", 0));
		list.add(new Tuple2<String, Integer>("pink", 3));
		list.add(new Tuple2<String, Integer>("pirate", 3));
		list.add(new Tuple2<String, Integer>("panda", 1));
		list.add(new Tuple2<String, Integer>("pink", 4));
		JavaPairRDD<String, Integer> pairs = sc.parallelizePairs(list);	
		JavaRDD<String> result = pairs.keys();
		s(result.collect());		//[panda, pink, pirate, panda, pink]
	}
	
	public void proc14() {
		JavaSparkContext sc = MyConf.getJavaSparkContext();
		s("values()");
		List<Tuple2<String, Integer>> list = new ArrayList<Tuple2<String, Integer>>();
		list.add(new Tuple2<String, Integer>("panda", 0));
		list.add(new Tuple2<String, Integer>("pink", 3));
		list.add(new Tuple2<String, Integer>("pirate", 3));
		list.add(new Tuple2<String, Integer>("panda", 1));
		list.add(new Tuple2<String, Integer>("pink", 4));
		JavaPairRDD<String, Integer> pairs = sc.parallelizePairs(list);
		JavaRDD<Integer> result = pairs.values();
		s(result.collect());	//[0, 3, 3, 1, 4]
	}
	
	public void proc15() {
		JavaSparkContext sc = MyConf.getJavaSparkContext();
		s("cogroup() - works across two or more pair RDDs; the key types must match, the value types may differ. Up to four pair RDDs can be grouped at once.");
		List<Tuple2<String, String>> data0 = new ArrayList<Tuple2<String, String>>();
		data0.add(new Tuple2<String, String>("K1", "ABC"));
		data0.add(new Tuple2<String, String>("K2", "DE"));
		data0.add(new Tuple2<String, String>("K5", "GHI"));
		data0.add(new Tuple2<String, String>("K5", "JKL"));
		JavaPairRDD<String, String> pairs = sc.parallelizePairs(data0);
		
		List<Tuple2<String, Integer>> data1 = new ArrayList<Tuple2<String, Integer>>();
		data1.add(new Tuple2<String, Integer>("K1", 123));
		data1.add(new Tuple2<String, Integer>("K2", 456));
		data1.add(new Tuple2<String, Integer>("K7", 0));
		JavaPairRDD<String, Integer> other1 = sc.parallelizePairs(data1);
		
		JavaPairRDD<String, Tuple2<Iterable<String>, Iterable<Integer>>> result1 = pairs.cogroup(other1);
		s(result1.collect());	//[(K7,([],[0])), (K1,([ABC],[123])), (K5,([GHI, JKL],[])), (K2,([DE],[456]))]
		
		List<Tuple2<String, Double>> data2 = new ArrayList<Tuple2<String, Double>>();
		data2.add(new Tuple2<String, Double>("K1", 777.1));
		data2.add(new Tuple2<String, Double>("K2", 999.1));
		data2.add(new Tuple2<String, Double>("K8", 10.1));
		JavaPairRDD<String, Double> other2 = sc.parallelizePairs(data2);
		JavaPairRDD<String, Tuple3<Iterable<String>, Iterable<Integer>, Iterable<Double>>> result2 = pairs.cogroup(other1, other2);
		s(result2.collect());	//[(K7,([],[0],[])), (K1,([ABC],[123],[777.1])), (K5,([GHI, JKL],[],[])), (K2,([DE],[456],[999.1])), (K8,([],[],[10.1]))]
		
		List<Tuple2<String, Float>> data3 = new ArrayList<Tuple2<String, Float>>();
		data3.add(new Tuple2<String, Float>("K1", 1.1f));
		data3.add(new Tuple2<String, Float>("K2", 2.1f));
		data3.add(new Tuple2<String, Float>("K8", 3.1f));
		JavaPairRDD<String, Float> other3 = sc.parallelizePairs(data3);
		JavaPairRDD<String, Tuple4<Iterable<String>, Iterable<Integer>, Iterable<Double>, Iterable<Float>>> result3 = pairs.cogroup(other1, other2, other3);
		s(result3.collect());
		//[(K7,([],[0],[],[])), (K1,([ABC],[123],[777.1],[1.1])), (K5,([GHI, JKL],[],[],[])), (K2,([DE],[456],[999.1],[2.1])), (K8,([],[],[10.1],[3.1]))]
	}
	
	public void proc16() {
		JavaSparkContext sc = MyConf.getJavaSparkContext();
		s("subtractByKey() - removes entries whose key appears in the other pair RDD.");
		List<Tuple2<String, String>> data0 = new ArrayList<Tuple2<String, String>>();
		data0.add(new Tuple2<String, String>("K1", "ABC"));
		data0.add(new Tuple2<String, String>("K2", "DE"));
		data0.add(new Tuple2<String, String>("K5", "GHI"));
		data0.add(new Tuple2<String, String>("K5", "JKL"));
		JavaPairRDD<String, String> pairs = sc.parallelizePairs(data0);
		
		List<Tuple2<String, Integer>> data1 = new ArrayList<Tuple2<String, Integer>>();
		data1.add(new Tuple2<String, Integer>("K1", 123));
		data1.add(new Tuple2<String, Integer>("K2", 456));
		data1.add(new Tuple2<String, Integer>("K7", 0));
		JavaPairRDD<String, Integer> other1 = sc.parallelizePairs(data1);
		
		JavaPairRDD<String, String> result = pairs.subtractByKey(other1);
		s(result.collect());		//[(K5,GHI), (K5,JKL)]
		
		//and the other way around
		JavaPairRDD<String, Integer> result2 = other1.subtractByKey(pairs);
		s(result2.collect());		//[(K7,0)]
	}
	
	public void proc17(){
		JavaSparkContext sc = MyConf.getJavaSparkContext();
		s("countByKey, collectAsMap, lookup(key)");
		List<Tuple2<String, String>> data0 = new ArrayList<Tuple2<String, String>>();
		data0.add(new Tuple2<String, String>("K1", "ABC"));
		data0.add(new Tuple2<String, String>("K2", "DE"));
		data0.add(new Tuple2<String, String>("K5", "GHI"));
		data0.add(new Tuple2<String, String>("K5", "JKL"));
		JavaPairRDD<String, String> pairs = sc.parallelizePairs(data0);

		Map<String, Object> result0 = pairs.countByKey();
		s(result0);		//{K1=1, K5=2, K2=1}
		Map<String, String> result1 = pairs.collectAsMap();
		s(result1);		//{K1=ABC, K2=DE, K5=JKL}
		List<String> result2 = pairs.lookup("K1");
		s(result2);		//[ABC]
		List<String> result3 = pairs.lookup("K5");
		s(result3);		//[GHI, JKL]
	}
	
	public void proc21(){
		JavaSparkContext sc = MyConf.getJavaSparkContext();
		s("How is the data actually split across an RDD's partitions internally? Can we check?");
		JavaRDD<Integer> rdd = sc.parallelize(Arrays.asList(1,2,3,4,5,6,7,8,9));
		s(rdd.getNumPartitions());	//2
		rdd.foreachPartition(iter -> {
			while(iter.hasNext()){
				s(iter.hashCode() + ", " + iter.next());
			}
		});
//		2 partitions
//		271428025, 1		
//		271428025, 2
//		271428025, 3
//		271428025, 4
//		-788345623, 5
//		-788345623, 6
//		-788345623, 7
//		-788345623, 8
//		-788345623, 9
		
		rdd = rdd.repartition(4);
		rdd.foreachPartition(iter -> {
			while(iter.hasNext()){
				s(iter.hashCode() + ", " + iter.next());
			}
		});		
//		4 partitions
//		-1025106490, 3
//		287691785, 2
//		-1025106490, 7
//		287691785, 6
//		1847534542, 1
//		951314831, 4
//		1847534542, 5
//		951314831, 8
//		1847534542, 9
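		
		// A clearer way to see partition membership (not in the book): mapPartitionsWithIndex passes the
		// partition index to the function along with that partition's iterator.
		JavaRDD<String> tagged = rdd.mapPartitionsWithIndex((idx, it) -> {
			List<String> out = new ArrayList<String>();
			while (it.hasNext()) {
				out.add("partition " + idx + " -> " + it.next());
			}
			return out.iterator();
		}, false);
		s(tagged.collect());	//e.g. [partition 0 -> 2, partition 0 -> 6, partition 1 -> 3, ...] (exact placement may vary)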
	}
	
}