람다식(클로저) 안에서 드라이버 쪽 값을 안전하게 다루기 위해
어큐뮬레이터(값 집계)와 브로드캐스트 변수(큰 읽기 전용 값 공유)를 사용한다.
package org.mystudy;
import java.io.File;
import java.io.FileNotFoundException;
import java.io.Serializable;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.Scanner;
import java.util.regex.Pattern;
import org.apache.log4j.PropertyConfigurator;
import org.apache.spark.Accumulator;
import org.apache.spark.api.java.JavaDoubleRDD;
import org.apache.spark.api.java.JavaPairRDD;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.broadcast.Broadcast;
import org.apache.spark.util.StatCounter;
import org.eclipse.jetty.client.ContentExchange;
import org.eclipse.jetty.client.HttpClient;
import org.junit.Test;
import com.fasterxml.jackson.databind.DeserializationFeature;
import com.fasterxml.jackson.databind.ObjectMapper;
import scala.Tuple2;
public class Ch6_example implements Serializable {
private static final long serialVersionUID = 43443432321L;
public Ch6_example() {
PropertyConfigurator.configure(System.getProperty("user.dir") + "/src/resources/log4j.properties");
}
public static void main(String... strings) {
Ch6_example ex = new Ch6_example();
// ex.proc1();
ex.proc2();
}
public static void s(Object o){ //static 으로 사용하면 class가 Serializable 하지 않아도 된다. 왜????
System.out.println(o);
}
@Test
public void proc1() {
s("예제 6-4 자바에서 어큐뮬레이터에 빈 라인 개수 세기");
JavaSparkContext sc = new JavaSparkContext("local[2]", "First Spark App");
JavaRDD<String> rdd = sc.textFile(getClass().getResource("callsigns").getFile());
/* callsigns >
W8PAL
KK6JKQ
W6BB
VE3UOW
VE2CUA
VE2UN
OH2TI
GB1MIR
K2AMH
UA1LO
N7ICE
*/
final Accumulator<Integer> blankLines = sc.accumulator(0);
JavaRDD<String> callSigns = rdd.flatMap(str -> {
if(str.equals("")){
blankLines.add(1);
}
return Arrays.asList(str.split(" "));
});
// callSigns.saveAsTextFile("output.txt"); //같은 폴더가 존재하면, Exception 발생
s(callSigns.collect()); //[W8PAL, KK6JKQ, W6BB, VE3UOW, VE2CUA, VE2UN, OH2TI, GB1MIR, K2AMH, UA1LO, N7ICE]
s("blank lines:" + blankLines.value()); //blank lines:2
s("예제 6-5 어큐뮬레이터로 에러 세기");
final Accumulator<Integer> validSignCount = sc.accumulator(0);
final Accumulator<Integer> invalidSignCount = sc.accumulator(0);
JavaRDD<String> validSigns = callSigns.filter(str -> {
Pattern p = Pattern.compile("\\d?[a-zA-Z]{1,2}\\d{1,4}[a-zA-Z]{1,3}");
if(p.matcher(str).matches()){
validSignCount.add(1);
return true;
}else{
invalidSignCount.add(1);
return false;
}
});
s(validSigns.collect()); //[W8PAL, KK6JKQ, W6BB, VE3UOW, VE2CUA, VE2UN, OH2TI, GB1MIR, K2AMH, UA1LO, N7ICE]
JavaPairRDD<String, Integer> contactCount = validSigns.mapToPair(str -> new Tuple2<String, Integer>(str, 1)).reduceByKey((a,b)->a+b);
s(contactCount.collect()); //[(W8PAL,1), (KK6JKQ,1), (OH2TI,1), (N7ICE,1), (GB1MIR,1), (VE2UN,1), (UA1LO,1), (VE2CUA,1), (W6BB,1), (K2AMH,1), (VE3UOW,1)]
if(invalidSignCount.value() < (0.1 * validSignCount.value())){
s(invalidSignCount.value() + "< (0.1 * " + validSignCount.value());
}else{
s(String.format("Too many errors: %s in %s", invalidSignCount.value(), validSignCount.value())); //Too many errors: 4 in 22
}
s("예제 6-9 자바에서 브로드캐스트 변수를 쓴 국가 검색");
final Broadcast<String[]> callSignInfo = sc.broadcast(loadCallSignTable());
JavaPairRDD<String, Integer> countryContactCounts = contactCount.mapToPair(tuple->{
String country = lookupCountry(tuple._1, callSignInfo.value());
return new Tuple2<String, Integer>(country, tuple._2);
});
s(countryContactCounts.collect()); //[( United States of America,1), ( United States of America,1), ( Finland,1), ( United States of America,1), ( United Kingdom of Great Britain and Northern Ireland,1), ( Canada,1), ( Russian Federation,1), ( Canada,1), ( United States of America,1), ( United States of America,1), ( Canada,1)]
countryContactCounts = countryContactCounts.reduceByKey((a,b) -> a+b);
s(countryContactCounts.collect()); //[( Finland,1), ( Canada,3), ( Russian Federation,1), ( United States of America,5), ( United Kingdom of Great Britain and Northern Ireland,1)]
s("예제 6-12 자바에서 접속 풀과 JSON 파서 공유 - 아직 잘 모르겠다. heroku도 null 로 return되고-_-;;");
JavaPairRDD<String, CallLog[]> contactsContactLists = validSigns.mapPartitionsToPair(iter -> {
ArrayList<Tuple2<String, CallLog[]>> callsignLogs = new ArrayList<>();
ArrayList<Tuple2<String, ContentExchange>> requests = new ArrayList<>();
ObjectMapper mapper = createMapper();
HttpClient client = new HttpClient();
try{
client.start();
while(iter.hasNext()){
requests.add(createRequestForSign(iter.next(), client));
}
for(Tuple2<String, ContentExchange> signExchange : requests){
callsignLogs.add(fetchResultFromRequest(mapper, signExchange));
}
}catch(Exception e){
e.printStackTrace();
}
return callsignLogs;
});
s(contactsContactLists.collect());
sc.close();
}
private String[] loadCallSignTable() {
Scanner callSignTbl = null;
try {
callSignTbl = new Scanner(new File(getClass().getResource("callsign_tbl_sorted").getFile()));
} catch (FileNotFoundException e) {
e.printStackTrace();
}
ArrayList<String> callSignList = new ArrayList<String>();
while (callSignTbl.hasNextLine()) {
callSignList.add(callSignTbl.nextLine());
}
return callSignList.toArray(new String[0]);
}
private String lookupCountry(String callSign, String[] table) {
Integer pos = java.util.Arrays.binarySearch(table, callSign);
if (pos < 0) {
pos = -pos - 1;
}
return table[pos].split(",")[1];
}
class CallLog implements Serializable {
private static final long serialVersionUID = 1L;
public String callsign;
public Double contactlat;
public Double contactlong;
public Double mylat;
public Double mylong;
}
private ObjectMapper createMapper() {
ObjectMapper mapper = new ObjectMapper();
mapper.configure(DeserializationFeature.FAIL_ON_UNKNOWN_PROPERTIES, false);
return mapper;
}
private Tuple2<String, ContentExchange> createRequestForSign(String sign, HttpClient client) throws Exception {
ContentExchange exchange = new ContentExchange(true);
String url = "http://new73s.herokuapp.com/qsos/" + sign + ".json";
s(url);
exchange.setURL(url);
client.send(exchange);
return new Tuple2(sign, exchange);
}
private Tuple2<String, CallLog[]> fetchResultFromRequest(ObjectMapper mapper, Tuple2<String, ContentExchange> signExchange) {
String sign = signExchange._1();
ContentExchange exchange = signExchange._2();
return new Tuple2(sign, readExchangeCallLog(mapper, exchange));
}
private CallLog[] readExchangeCallLog(ObjectMapper mapper, ContentExchange exchange) {
try {
exchange.waitForDone();
String responseJson = exchange.getResponseContent();
return mapper.readValue(responseJson, CallLog[].class);
} catch (Exception e) {
return new CallLog[0];
}
}
public void proc2(){
s("예제6-13 mapPartitions() 없이 평균 계산");
JavaSparkContext sc = new JavaSparkContext("local[2]", "First Spark App");
JavaRDD<Integer> rdd = sc.parallelize(Arrays.asList(1,2,3,4,5,6,7,8,9,10));
JavaPairRDD<Integer, Integer> result = rdd.mapToPair(i -> new Tuple2<Integer, Integer>(i, 1));
Tuple2<Integer, Integer> result3 = result.reduce((tuple1, tuple2) -> new Tuple2<Integer, Integer>(tuple1._1 + tuple2._1, tuple1._2 + tuple2._2));
s(result3._1 / (double)result3._2);
s("예제6-14 mapPartitions()으로 평균 계산 - 파티션당 Tuple을 하나만 만들수 있다고 한다. 그런가?");
rdd = rdd.repartition(5);
JavaPairRDD<Integer, Integer> result2 = rdd.mapPartitionsToPair(iter -> {
System.out.println("파티션 합치기 파티션 개수만큼 동작?");
int i=0;
int j=0;
while(iter.hasNext()){
// sumCount._1 += iter.next(); //?? The final field Tuple2<Integer,Integer>._1 cannot be assigned???
// sumCount._2 += 1;
i += iter.next();
j++;
}
List<Tuple2<Integer, Integer>> list = new ArrayList<>();
Tuple2<Integer, Integer> sumCount = new Tuple2<>(i, j);
list.add(sumCount);
return list;
});
s(result2.collect()); //[(15,2), (7,2), (9,2), (11,2), (13,2)]
Tuple2<Integer, Integer> result4 = result2.reduce((tuple1, tuple2) -> {
System.out.println("파티션 합치기 파티션 개수만큼 동작? -1");
return new Tuple2<Integer, Integer>(tuple1._1 + tuple2._1, tuple1._2 + tuple2._2);
});
s(result4._1 / (double)result4._2);
s("단순히 합계만 구할 것 같으면, mapPartitons만 사용해도 될듯");
JavaRDD<Integer> result5 = rdd.mapPartitions(iter->{
System.out.println("파티션 합치기 파티션 개수만큼 동작A");
int i=0;
while(iter.hasNext()){
i += iter.next();
}
List<Integer> list = new ArrayList<>();
list.add(i);
return list;
});
s(result5.collect()); //[15, 7, 9, 11, 13]
s("단순히 합계만 구할 것 같으면, mapPartitons만 사용해도 될듯 -> return은 Double로");
JavaDoubleRDD result6 = rdd.mapPartitionsToDouble(iter->{
System.out.println("파티션 합치기 파티션 개수만큼 동작B");
double d = 0.0;
while(iter.hasNext()){
d += iter.next();
}
List<Double> list = new ArrayList<>();
list.add(d);
return list;
});
s(result6.collect()); //[15.0, 7.0, 9.0, 11.0, 13.0]
s(result6.sum()); //55.0
s("pipe()를 이용한 R 접속");
//TODO
s("예제6-21 자바에서 먼 거리의 로그 걸러 내기");
JavaRDD<String> distances = sc.parallelize(Arrays.asList("-80.0","20.0","30.0","40.0","50.0","60.0","60.0","60.0","60.0","60.0","60.0","1000.0"));
JavaDoubleRDD distanceDoubles = distances.mapToDouble(str->Double.parseDouble(str));
distanceDoubles.cache();
final StatCounter stats = distanceDoubles.stats();
s(stats); //(count: 7, mean: 41.428571, stdev: 67.913510, max: 170.000000, min: -80.000000)
final Double stdev = stats.stdev(); //표준편차
final Double mean = stats.mean(); //데이터 값들의 평균
JavaDoubleRDD reasonableDistances = distanceDoubles.filter(d -> Math.abs(d - mean) < (3 * stdev));
distanceDoubles.unpersist();
s(reasonableDistances.collect()); //[-80.0, 20.0, 30.0, 40.0, 50.0, 60.0, 60.0, 60.0, 60.0, 60.0, 60.0]
sc.close();
}
}
'Spark > 러닝 스파크' 카테고리의 다른 글
| Spark 시작하기17 - [러닝 스파크] 7장 클러스터에서 운영하기 (0) | 2016.04.26 |
|---|---|
| Spark 시작하기15 - [러닝 스파크] 5장 데이터 불러오기/저장하기 (0) | 2016.04.26 |
| Spark 시작하기13 - [러닝 스파크] 4장 키/값 페어로 작업하기2 (0) | 2016.04.22 |
| Spark 시작하기12 - [러닝 스파크] 4장 키/값 페어로 작업하기 (0) | 2016.04.20 |
| Spark 시작하기11 - [러닝 스파크] 3장 RDD로 프로그래밍하기2 (0) | 2016.04.19 |