본문 바로가기
Spark/러닝 스파크

Spark 시작하기16 - [러닝 스파크] 6장 고급 스파크 프로그래밍

by java개발자 2016. 4. 26.

람다식(클로저)은 직렬화되어 각 executor에 복사되어 실행되므로, 클로저 안에서 드라이버 쪽 변수를 수정해도 드라이버에는 반영되지 않는다.

이렇게 클로저 사이에서 값을 집계하거나 공유하기 위해 어큐뮬레이터와 브로드캐스트 변수를 사용한다.

package org.mystudy;

import java.io.File;
import java.io.FileNotFoundException;
import java.io.Serializable;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.Scanner;
import java.util.regex.Pattern;

import org.apache.log4j.PropertyConfigurator;
import org.apache.spark.Accumulator;
import org.apache.spark.api.java.JavaDoubleRDD;
import org.apache.spark.api.java.JavaPairRDD;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.broadcast.Broadcast;
import org.apache.spark.util.StatCounter;
import org.eclipse.jetty.client.ContentExchange;
import org.eclipse.jetty.client.HttpClient;
import org.junit.Test;

import com.fasterxml.jackson.databind.DeserializationFeature;
import com.fasterxml.jackson.databind.ObjectMapper;

import scala.Tuple2;

/**
 * Java 8 lambda ports of the chapter 6 ("Advanced Spark Programming") examples from Learning Spark:
 * accumulators (6-4, 6-5), broadcast variables (6-9), per-partition resources with
 * mapPartitionsToPair (6-12), averages with and without mapPartitions (6-13, 6-14) and
 * StatCounter-based outlier filtering (6-21).
 *
 * <p>Each example prints its (Korean) title followed by its results; trailing {@code //} comments show
 * output from a sample local run.
 *
 * <p>The class is {@link Serializable} so lambdas that capture {@code this} can be shipped to
 * executors. Helpers that need no instance state are {@code static}, so lambdas that call only them
 * capture nothing beyond their own variables.
 */
public class Ch6_example implements Serializable {

	private static final long serialVersionUID = 43443432321L;

	/** Call-sign format checked by example 6-5; compiled once instead of once per record. */
	private static final Pattern CALL_SIGN_PATTERN = Pattern.compile("\\d?[a-zA-Z]{1,2}\\d{1,4}[a-zA-Z]{1,3}");

	/** Configures log4j from {@code src/resources/log4j.properties} under the working directory. */
	public Ch6_example() {
		PropertyConfigurator.configure(System.getProperty("user.dir") + "/src/resources/log4j.properties");
	}

	/** Runs one example set; switch between them by (un)commenting the calls below. */
	public static void main(String... strings) {
		Ch6_example ex = new Ch6_example();
//		ex.proc1();
		ex.proc2();
	}

	/**
	 * Prints {@code o} to stdout.
	 *
	 * <p>This is {@code static} on purpose: a static call has no receiver, so a lambda that only calls
	 * static methods does not capture {@code this}, and Spark never needs to serialize the enclosing
	 * instance to ship that lambda.
	 */
	public static void s(Object o){
		System.out.println(o);
	}

	/**
	 * Examples 6-4 (blank-line accumulator), 6-5 (valid/invalid call-sign accumulators), 6-9 (country
	 * lookup through a broadcast table) and 6-12 (one HTTP client and JSON mapper per partition).
	 *
	 * <p>Reads the classpath resources {@code callsigns} and {@code callsign_tbl_sorted}; 6-12 calls the
	 * external service at new73s.herokuapp.com. The SparkContext is closed even if an example fails.
	 */
	@Test
	public void proc1() {
		s("예제 6-4 자바에서 어큐뮬레이터에 빈 라인 개수 세기");
		JavaSparkContext sc = new JavaSparkContext("local[2]", "First Spark App");
		try {
			JavaRDD<String> rdd = sc.textFile(getClass().getResource("callsigns").getFile());
			/*	callsigns >
				W8PAL
				KK6JKQ
				W6BB
				VE3UOW
				
				VE2CUA
				VE2UN
				OH2TI
				
				GB1MIR
				K2AMH
				UA1LO
				N7ICE
			*/

			final Accumulator<Integer> blankLines = sc.accumulator(0);
			// cache(): accumulator updates made inside a transformation are re-applied whenever the RDD is
			// recomputed, and callSigns feeds later jobs too.
			JavaRDD<String> callSigns = rdd.flatMap(str -> {
				if(str.equals("")){
					blankLines.add(1);
				}
				// "".split(" ") is [""], so each blank line also emits one "" token (rejected in 6-5).
				return Arrays.asList(str.split(" "));
			}).cache();
//			callSigns.saveAsTextFile("output.txt");	// throws if the output directory already exists
			s(callSigns.collect());						// the 11 call signs plus one "" per blank line
			s("blank lines:" + blankLines.value());		//blank lines:2

			s("예제 6-5 어큐뮬레이터로 에러 세기");
			final Accumulator<Integer> validSignCount = sc.accumulator(0);
			final Accumulator<Integer> invalidSignCount = sc.accumulator(0);

			// validSigns is consumed by several jobs (collect, reduceByKey, mapPartitionsToPair). Uncached,
			// each of them re-ran this filter and inflated both counters (the uncached run printed
			// "Too many errors: 4 in 22"). Spark only guarantees exactly-once accumulator updates inside
			// actions, so cache() is a mitigation: evicted partitions would still be counted again.
			JavaRDD<String> validSigns = callSigns.filter(str -> {
				if(CALL_SIGN_PATTERN.matcher(str).matches()){
					validSignCount.add(1);
					return true;
				}
				invalidSignCount.add(1);
				return false;
			}).cache();
			s(validSigns.collect());					//[W8PAL, KK6JKQ, W6BB, VE3UOW, VE2CUA, VE2UN, OH2TI, GB1MIR, K2AMH, UA1LO, N7ICE]
			JavaPairRDD<String, Integer> contactCount = validSigns.mapToPair(str -> new Tuple2<String, Integer>(str, 1)).reduceByKey((a,b)->a+b);

			s(contactCount.collect());					//[(W8PAL,1), (KK6JKQ,1), (OH2TI,1), (N7ICE,1), (GB1MIR,1), (VE2UN,1), (UA1LO,1), (VE2CUA,1), (W6BB,1), (K2AMH,1), (VE3UOW,1)]

			// Fewer invalid signs than 10% of the valid ones is acceptable.
			if(invalidSignCount.value() < (0.1 * validSignCount.value())){
				s(invalidSignCount.value() + "< (0.1 * " + validSignCount.value());
			}else{
				// Expected with caching: "Too many errors: 2 in 11" (the two "" tokens from blank lines).
				s(String.format("Too many errors: %s in %s", invalidSignCount.value(), validSignCount.value()));
			}

			s("예제 6-9 자바에서 브로드캐스트 변수를 쓴 국가 검색");
			// The broadcast table is shipped to each executor once rather than inside every task closure.
			final Broadcast<String[]> callSignInfo = sc.broadcast(loadCallSignTable());
			JavaPairRDD<String, Integer> countryContactCounts = contactCount.mapToPair(tuple->{
				String country = lookupCountry(tuple._1, callSignInfo.value());
				return new Tuple2<String, Integer>(country, tuple._2);
			});
			s(countryContactCounts.collect());	//[( United States of America,1), ( United States of America,1), ( Finland,1), ( United States of America,1), ( United Kingdom of Great Britain and Northern Ireland,1), ( Canada,1), ( Russian Federation,1), ( Canada,1), ( United States of America,1), ( United States of America,1), ( Canada,1)]
			countryContactCounts = countryContactCounts.reduceByKey((a,b) -> a+b);
			s(countryContactCounts.collect());	//[( Finland,1), ( Canada,3), ( Russian Federation,1), ( United States of America,5), ( United Kingdom of Great Britain and Northern Ireland,1)]

			s("예제 6-12 자바에서 접속 풀과 JSON 파서 공유 - 아직 잘 모르겠다. heroku도 null 로 return되고-_-;;");
			JavaPairRDD<String, CallLog[]> contactsContactLists = validSigns.mapPartitionsToPair(iter -> {
				// One JSON mapper and one HTTP client per partition, shared by every record in it.
				ArrayList<Tuple2<String, CallLog[]>> callsignLogs = new ArrayList<>();
				ArrayList<Tuple2<String, ContentExchange>> requests = new ArrayList<>();
				ObjectMapper mapper = createMapper();
				HttpClient client = new HttpClient();
				try{
					client.start();
					// Issue every request first; readExchangeCallLog waits for each response afterwards.
					while(iter.hasNext()){
						requests.add(createRequestForSign(iter.next(), client));
					}
					for(Tuple2<String, ContentExchange> signExchange : requests){
						callsignLogs.add(fetchResultFromRequest(mapper, signExchange));
					}
				}catch(Exception e){
					// Best effort: a failing partition returns whatever it fetched so far.
					e.printStackTrace();
				}finally{
					stopQuietly(client);
				}
				return callsignLogs;
			});

			s(contactsContactLists.collect());
		} finally {
			sc.close();
		}
	}

	/**
	 * Loads the classpath resource {@code callsign_tbl_sorted} (next to this class), one row per line.
	 *
	 * <p>Assumes the file is already sorted, as {@link #lookupCountry} binary-searches it — TODO confirm.
	 *
	 * @throws IllegalStateException if the resource is missing
	 */
	private String[] loadCallSignTable() {
		java.io.InputStream in = getClass().getResourceAsStream("callsign_tbl_sorted");
		if (in == null) {
			throw new IllegalStateException("classpath resource 'callsign_tbl_sorted' not found for " + getClass().getName());
		}
		List<String> callSignList = new ArrayList<>();
		// Reading the stream (not URL.getFile()) also works for paths with spaces or inside a jar.
		try (Scanner callSignTbl = new Scanner(in, "UTF-8")) {
			while (callSignTbl.hasNextLine()) {
				callSignList.add(callSignTbl.nextLine());
			}
		}
		return callSignList.toArray(new String[0]);
	}

	/**
	 * Maps a call sign to the country column of the sorted prefix table.
	 *
	 * <p>On an exact match that row is used; otherwise the row at the insertion point, i.e. the first
	 * entry sorting after {@code callSign}. Rows are assumed to look like {@code "prefix,country"}; the
	 * country text keeps its leading space. TODO confirm against callsign_tbl_sorted.
	 *
	 * @return the country text, or {@code "Unknown"} when {@code callSign} sorts after every entry
	 */
	private static String lookupCountry(String callSign, String[] table) {
		int pos = Arrays.binarySearch(table, callSign);
		if (pos < 0) {
			pos = -pos - 1;
		}
		if (pos >= table.length) {
			// Previously an ArrayIndexOutOfBoundsException that failed the whole Spark task.
			return "Unknown";
		}
		// Limit 2 keeps country names that themselves contain commas intact.
		return table[pos].split(",", 2)[1];
	}

	/**
	 * One QSO record returned by the call-log JSON service; JSON properties without a matching field
	 * are ignored (see createMapper()).
	 *
	 * <p>Must be a {@code static} nested class: an inner class's constructor needs an enclosing
	 * Ch6_example instance, which Jackson cannot supply when reading a top-level {@code CallLog[]}.
	 */
	static class CallLog implements Serializable {
		private static final long serialVersionUID = 1L;
		public String callsign;
		public Double contactlat;
		public Double contactlong;
		public Double mylat;
		public Double mylong;
	}

	/** Creates a JSON mapper that ignores unknown properties instead of failing on them. */
	private static ObjectMapper createMapper() {
		ObjectMapper mapper = new ObjectMapper();
		mapper.configure(DeserializationFeature.FAIL_ON_UNKNOWN_PROPERTIES, false);
		return mapper;
	}

	/**
	 * Sends the call-log request for {@code sign} on {@code client} and returns it paired with the sign;
	 * the response is awaited later by readExchangeCallLog.
	 */
	private static Tuple2<String, ContentExchange> createRequestForSign(String sign, HttpClient client) throws Exception {
		ContentExchange exchange = new ContentExchange(true);
		// sign is concatenated unescaped; callers pass only signs that matched CALL_SIGN_PATTERN.
		String url = "http://new73s.herokuapp.com/qsos/" + sign + ".json";
		s(url);
		exchange.setURL(url);
		client.send(exchange);
		return new Tuple2<>(sign, exchange);
	}

	/** Stops {@code client}, reporting rather than propagating a failure so cleanup never masks results. */
	private static void stopQuietly(HttpClient client) {
		try {
			client.stop();
		} catch (Exception e) {
			e.printStackTrace();
		}
	}

	/** Waits for the pending exchange and pairs its sign with the parsed call logs. */
	private static Tuple2<String, CallLog[]> fetchResultFromRequest(ObjectMapper mapper, Tuple2<String, ContentExchange> signExchange) {
		return new Tuple2<>(signExchange._1(), readExchangeCallLog(mapper, signExchange._2()));
	}

	/**
	 * Waits for {@code exchange} to finish and parses its body as a JSON array of {@link CallLog}.
	 *
	 * @return the parsed records, or an empty array if waiting, reading or parsing fails
	 */
	private static CallLog[] readExchangeCallLog(ObjectMapper mapper, ContentExchange exchange) {
		try {
			exchange.waitForDone();
			String responseJson = exchange.getResponseContent();
			return mapper.readValue(responseJson, CallLog[].class);
		} catch (Exception e) {
			if (e instanceof InterruptedException) {
				// Keep the interrupt visible to the caller (e.g. a cancelled Spark task).
				Thread.currentThread().interrupt();
			}
			return new CallLog[0];
		}
	}

	/**
	 * Examples 6-13 / 6-14 (average of 1..10 with and without mapPartitions, plus per-partition sums)
	 * and 6-21 (dropping values farther than 3 standard deviations from the mean).
	 * The SparkContext is closed even if an example fails.
	 */
	public void proc2(){
		s("예제6-13 mapPartitions() 없이 평균 계산");
		JavaSparkContext sc = new JavaSparkContext("local[2]", "First Spark App");
		try {
			JavaRDD<Integer> rdd = sc.parallelize(Arrays.asList(1,2,3,4,5,6,7,8,9,10));
			// Pair each value with a count of 1, then add pairs component-wise into (sum, count).
			JavaPairRDD<Integer, Integer> result = rdd.mapToPair(i -> new Tuple2<Integer, Integer>(i, 1));
			Tuple2<Integer, Integer> result3 = result.reduce((tuple1, tuple2) -> new Tuple2<Integer, Integer>(tuple1._1 + tuple2._1, tuple1._2 + tuple2._2));
			s(result3._1 / (double)result3._2);		// 5.5

			s("예제6-14 mapPartitions()으로 평균 계산 - 파티션당 Tuple을 하나만 만들수 있다고 한다. 그런가?");
			// mapPartitions may emit any number of elements per partition; this example deliberately
			// emits exactly one (sum, count) pair per partition, so only those pairs need combining.
			rdd = rdd.repartition(5);

			JavaPairRDD<Integer, Integer> result2 = rdd.mapPartitionsToPair(iter -> {
				System.out.println("파티션 합치기 파티션 개수만큼 동작?");
				// Tuple2 fields are Scala vals (final from Java), so accumulate in locals and build the
				// tuple once at the end.
				int i=0;
				int j=0;
				while(iter.hasNext()){
					i += iter.next();
					j++;
				}
				List<Tuple2<Integer, Integer>> list = new ArrayList<>();
				list.add(new Tuple2<>(i, j));
				return list;
			});
			s(result2.collect());		//[(15,2), (7,2), (9,2), (11,2), (13,2)]
			Tuple2<Integer, Integer> result4 = result2.reduce((tuple1, tuple2) -> {
				System.out.println("파티션 합치기 파티션 개수만큼 동작? -1");
				return new Tuple2<Integer, Integer>(tuple1._1 + tuple2._1, tuple1._2 + tuple2._2);
			});
			s(result4._1 / (double)result4._2);

			s("단순히 합계만 구할 것 같으면, mapPartitons만 사용해도 될듯");
			// One partial sum per partition.
			JavaRDD<Integer> result5 = rdd.mapPartitions(iter->{
				System.out.println("파티션 합치기 파티션 개수만큼 동작A");
				int i=0;
				while(iter.hasNext()){
					i += iter.next();
				}
				List<Integer> list = new ArrayList<>();
				list.add(i);
				return list;
			});
			s(result5.collect());	//[15, 7, 9, 11, 13]

			s("단순히 합계만 구할 것 같으면, mapPartitons만 사용해도 될듯 -> return은 Double로");
			// Same partial sums as a JavaDoubleRDD, which adds numeric helpers such as sum().
			JavaDoubleRDD result6 = rdd.mapPartitionsToDouble(iter->{
				System.out.println("파티션 합치기 파티션 개수만큼 동작B");
				double d = 0.0;
				while(iter.hasNext()){
					d += iter.next();
				}
				List<Double> list = new ArrayList<>();
				list.add(d);
				return list;
			});
			s(result6.collect());	//[15.0, 7.0, 9.0, 11.0, 13.0]
			s(result6.sum());		//55.0

			s("pipe()를 이용한 R  접속");
			//TODO

			s("예제6-21 자바에서 먼 거리의 로그 걸러 내기");
			JavaRDD<String> distances = sc.parallelize(Arrays.asList("-80.0","20.0","30.0","40.0","50.0","60.0","60.0","60.0","60.0","60.0","60.0","1000.0"));
			JavaDoubleRDD distanceDoubles = distances.mapToDouble(str->Double.parseDouble(str));
			// Cached because it is scanned twice: once by stats() and once by the filter below.
			distanceDoubles.cache();
			final StatCounter stats = distanceDoubles.stats();
			s(stats);		// for the 12 values above: count 12, mean ~118.33, stdev ~268.54, max 1000.0, min -80.0
			final double stdev = stats.stdev();		// standard deviation
			final double mean = stats.mean();		// mean of the values
			JavaDoubleRDD reasonableDistances = distanceDoubles.filter(d -> Math.abs(d - mean) < (3 * stdev));
			s(reasonableDistances.collect());		//[-80.0, 20.0, 30.0, 40.0, 50.0, 60.0, 60.0, 60.0, 60.0, 60.0, 60.0]
			// Unpersist only after the filter has run; unpersisting earlier made the cache useless.
			distanceDoubles.unpersist();
		} finally {
			sc.close();
		}
	}
}