람다식(클로저) 안에서 드라이버와 익스큐터 간에 값을 원활하게 공유하기 위해
어큐뮬레이터(집계용)와 브로드캐스트 변수(읽기 전용 값 배포용)를 사용한다.
package org.mystudy; import java.io.File; import java.io.FileNotFoundException; import java.io.Serializable; import java.util.ArrayList; import java.util.Arrays; import java.util.List; import java.util.Scanner; import java.util.regex.Pattern; import org.apache.log4j.PropertyConfigurator; import org.apache.spark.Accumulator; import org.apache.spark.api.java.JavaDoubleRDD; import org.apache.spark.api.java.JavaPairRDD; import org.apache.spark.api.java.JavaRDD; import org.apache.spark.api.java.JavaSparkContext; import org.apache.spark.broadcast.Broadcast; import org.apache.spark.util.StatCounter; import org.eclipse.jetty.client.ContentExchange; import org.eclipse.jetty.client.HttpClient; import org.junit.Test; import com.fasterxml.jackson.databind.DeserializationFeature; import com.fasterxml.jackson.databind.ObjectMapper; import scala.Tuple2; public class Ch6_example implements Serializable { private static final long serialVersionUID = 43443432321L; public Ch6_example() { PropertyConfigurator.configure(System.getProperty("user.dir") + "/src/resources/log4j.properties"); } public static void main(String... strings) { Ch6_example ex = new Ch6_example(); // ex.proc1(); ex.proc2(); } public static void s(Object o){ //static 으로 사용하면 class가 Serializable 하지 않아도 된다. 왜???? 
System.out.println(o); } @Test public void proc1() { s("예제 6-4 자바에서 어큐뮬레이터에 빈 라인 개수 세기"); JavaSparkContext sc = new JavaSparkContext("local[2]", "First Spark App"); JavaRDD<String> rdd = sc.textFile(getClass().getResource("callsigns").getFile()); /* callsigns > W8PAL KK6JKQ W6BB VE3UOW VE2CUA VE2UN OH2TI GB1MIR K2AMH UA1LO N7ICE */ final Accumulator<Integer> blankLines = sc.accumulator(0); JavaRDD<String> callSigns = rdd.flatMap(str -> { if(str.equals("")){ blankLines.add(1); } return Arrays.asList(str.split(" ")); }); // callSigns.saveAsTextFile("output.txt"); //같은 폴더가 존재하면, Exception 발생 s(callSigns.collect()); //[W8PAL, KK6JKQ, W6BB, VE3UOW, VE2CUA, VE2UN, OH2TI, GB1MIR, K2AMH, UA1LO, N7ICE] s("blank lines:" + blankLines.value()); //blank lines:2 s("예제 6-5 어큐뮬레이터로 에러 세기"); final Accumulator<Integer> validSignCount = sc.accumulator(0); final Accumulator<Integer> invalidSignCount = sc.accumulator(0); JavaRDD<String> validSigns = callSigns.filter(str -> { Pattern p = Pattern.compile("\\d?[a-zA-Z]{1,2}\\d{1,4}[a-zA-Z]{1,3}"); if(p.matcher(str).matches()){ validSignCount.add(1); return true; }else{ invalidSignCount.add(1); return false; } }); s(validSigns.collect()); //[W8PAL, KK6JKQ, W6BB, VE3UOW, VE2CUA, VE2UN, OH2TI, GB1MIR, K2AMH, UA1LO, N7ICE] JavaPairRDD<String, Integer> contactCount = validSigns.mapToPair(str -> new Tuple2<String, Integer>(str, 1)).reduceByKey((a,b)->a+b); s(contactCount.collect()); //[(W8PAL,1), (KK6JKQ,1), (OH2TI,1), (N7ICE,1), (GB1MIR,1), (VE2UN,1), (UA1LO,1), (VE2CUA,1), (W6BB,1), (K2AMH,1), (VE3UOW,1)] if(invalidSignCount.value() < (0.1 * validSignCount.value())){ s(invalidSignCount.value() + "< (0.1 * " + validSignCount.value()); }else{ s(String.format("Too many errors: %s in %s", invalidSignCount.value(), validSignCount.value())); //Too many errors: 4 in 22 } s("예제 6-9 자바에서 브로드캐스트 변수를 쓴 국가 검색"); final Broadcast<String[]> callSignInfo = sc.broadcast(loadCallSignTable()); JavaPairRDD<String, Integer> countryContactCounts = 
contactCount.mapToPair(tuple->{ String country = lookupCountry(tuple._1, callSignInfo.value()); return new Tuple2<String, Integer>(country, tuple._2); }); s(countryContactCounts.collect()); //[( United States of America,1), ( United States of America,1), ( Finland,1), ( United States of America,1), ( United Kingdom of Great Britain and Northern Ireland,1), ( Canada,1), ( Russian Federation,1), ( Canada,1), ( United States of America,1), ( United States of America,1), ( Canada,1)] countryContactCounts = countryContactCounts.reduceByKey((a,b) -> a+b); s(countryContactCounts.collect()); //[( Finland,1), ( Canada,3), ( Russian Federation,1), ( United States of America,5), ( United Kingdom of Great Britain and Northern Ireland,1)] s("예제 6-12 자바에서 접속 풀과 JSON 파서 공유 - 아직 잘 모르겠다. heroku도 null 로 return되고-_-;;"); JavaPairRDD<String, CallLog[]> contactsContactLists = validSigns.mapPartitionsToPair(iter -> { ArrayList<Tuple2<String, CallLog[]>> callsignLogs = new ArrayList<>(); ArrayList<Tuple2<String, ContentExchange>> requests = new ArrayList<>(); ObjectMapper mapper = createMapper(); HttpClient client = new HttpClient(); try{ client.start(); while(iter.hasNext()){ requests.add(createRequestForSign(iter.next(), client)); } for(Tuple2<String, ContentExchange> signExchange : requests){ callsignLogs.add(fetchResultFromRequest(mapper, signExchange)); } }catch(Exception e){ e.printStackTrace(); } return callsignLogs; }); s(contactsContactLists.collect()); sc.close(); } private String[] loadCallSignTable() { Scanner callSignTbl = null; try { callSignTbl = new Scanner(new File(getClass().getResource("callsign_tbl_sorted").getFile())); } catch (FileNotFoundException e) { e.printStackTrace(); } ArrayList<String> callSignList = new ArrayList<String>(); while (callSignTbl.hasNextLine()) { callSignList.add(callSignTbl.nextLine()); } return callSignList.toArray(new String[0]); } private String lookupCountry(String callSign, String[] table) { Integer pos = 
java.util.Arrays.binarySearch(table, callSign); if (pos < 0) { pos = -pos - 1; } return table[pos].split(",")[1]; } class CallLog implements Serializable { private static final long serialVersionUID = 1L; public String callsign; public Double contactlat; public Double contactlong; public Double mylat; public Double mylong; } private ObjectMapper createMapper() { ObjectMapper mapper = new ObjectMapper(); mapper.configure(DeserializationFeature.FAIL_ON_UNKNOWN_PROPERTIES, false); return mapper; } private Tuple2<String, ContentExchange> createRequestForSign(String sign, HttpClient client) throws Exception { ContentExchange exchange = new ContentExchange(true); String url = "http://new73s.herokuapp.com/qsos/" + sign + ".json"; s(url); exchange.setURL(url); client.send(exchange); return new Tuple2(sign, exchange); } private Tuple2<String, CallLog[]> fetchResultFromRequest(ObjectMapper mapper, Tuple2<String, ContentExchange> signExchange) { String sign = signExchange._1(); ContentExchange exchange = signExchange._2(); return new Tuple2(sign, readExchangeCallLog(mapper, exchange)); } private CallLog[] readExchangeCallLog(ObjectMapper mapper, ContentExchange exchange) { try { exchange.waitForDone(); String responseJson = exchange.getResponseContent(); return mapper.readValue(responseJson, CallLog[].class); } catch (Exception e) { return new CallLog[0]; } } public void proc2(){ s("예제6-13 mapPartitions() 없이 평균 계산"); JavaSparkContext sc = new JavaSparkContext("local[2]", "First Spark App"); JavaRDD<Integer> rdd = sc.parallelize(Arrays.asList(1,2,3,4,5,6,7,8,9,10)); JavaPairRDD<Integer, Integer> result = rdd.mapToPair(i -> new Tuple2<Integer, Integer>(i, 1)); Tuple2<Integer, Integer> result3 = result.reduce((tuple1, tuple2) -> new Tuple2<Integer, Integer>(tuple1._1 + tuple2._1, tuple1._2 + tuple2._2)); s(result3._1 / (double)result3._2); s("예제6-14 mapPartitions()으로 평균 계산 - 파티션당 Tuple을 하나만 만들수 있다고 한다. 
그런가?"); rdd = rdd.repartition(5); JavaPairRDD<Integer, Integer> result2 = rdd.mapPartitionsToPair(iter -> { System.out.println("파티션 합치기 파티션 개수만큼 동작?"); int i=0; int j=0; while(iter.hasNext()){ // sumCount._1 += iter.next(); //?? The final field Tuple2<Integer,Integer>._1 cannot be assigned??? // sumCount._2 += 1; i += iter.next(); j++; } List<Tuple2<Integer, Integer>> list = new ArrayList<>(); Tuple2<Integer, Integer> sumCount = new Tuple2<>(i, j); list.add(sumCount); return list; }); s(result2.collect()); //[(15,2), (7,2), (9,2), (11,2), (13,2)] Tuple2<Integer, Integer> result4 = result2.reduce((tuple1, tuple2) -> { System.out.println("파티션 합치기 파티션 개수만큼 동작? -1"); return new Tuple2<Integer, Integer>(tuple1._1 + tuple2._1, tuple1._2 + tuple2._2); }); s(result4._1 / (double)result4._2); s("단순히 합계만 구할 것 같으면, mapPartitons만 사용해도 될듯"); JavaRDD<Integer> result5 = rdd.mapPartitions(iter->{ System.out.println("파티션 합치기 파티션 개수만큼 동작A"); int i=0; while(iter.hasNext()){ i += iter.next(); } List<Integer> list = new ArrayList<>(); list.add(i); return list; }); s(result5.collect()); //[15, 7, 9, 11, 13] s("단순히 합계만 구할 것 같으면, mapPartitons만 사용해도 될듯 -> return은 Double로"); JavaDoubleRDD result6 = rdd.mapPartitionsToDouble(iter->{ System.out.println("파티션 합치기 파티션 개수만큼 동작B"); double d = 0.0; while(iter.hasNext()){ d += iter.next(); } List<Double> list = new ArrayList<>(); list.add(d); return list; }); s(result6.collect()); //[15.0, 7.0, 9.0, 11.0, 13.0] s(result6.sum()); //55.0 s("pipe()를 이용한 R 접속"); //TODO s("예제6-21 자바에서 먼 거리의 로그 걸러 내기"); JavaRDD<String> distances = sc.parallelize(Arrays.asList("-80.0","20.0","30.0","40.0","50.0","60.0","60.0","60.0","60.0","60.0","60.0","1000.0")); JavaDoubleRDD distanceDoubles = distances.mapToDouble(str->Double.parseDouble(str)); distanceDoubles.cache(); final StatCounter stats = distanceDoubles.stats(); s(stats); //(count: 7, mean: 41.428571, stdev: 67.913510, max: 170.000000, min: -80.000000) final Double stdev = stats.stdev(); //표준편차 final Double mean 
= stats.mean(); //데이터 값들의 평균 JavaDoubleRDD reasonableDistances = distanceDoubles.filter(d -> Math.abs(d - mean) < (3 * stdev)); distanceDoubles.unpersist(); s(reasonableDistances.collect()); //[-80.0, 20.0, 30.0, 40.0, 50.0, 60.0, 60.0, 60.0, 60.0, 60.0, 60.0] sc.close(); } }
'Spark > 러닝 스파크' 카테고리의 다른 글
Spark 시작하기17 - [러닝 스파크] 7장 클러스터에서 운영하기 (0) | 2016.04.26 |
---|---|
Spark 시작하기15 - [러닝 스파크] 5장 데이터 불러오기/저장하기 (0) | 2016.04.26 |
Spark 시작하기13 - [러닝 스파크] 4장 키/값 페어로 작업하기2 (0) | 2016.04.22 |
Spark 시작하기12 - [러닝 스파크] 4장 키/값 페어로 작업하기 (0) | 2016.04.20 |
Spark 시작하기11 - [러닝 스파크] 3장 RDD로 프로그래밍하기2 (0) | 2016.04.19 |