package org.mystudy.example.ch5; import java.io.StringReader; import java.sql.Connection; import java.sql.DriverManager; import java.sql.ResultSet; import java.sql.SQLException; import java.util.ArrayList; import java.util.Arrays; import java.util.List; import org.apache.log4j.PropertyConfigurator; import org.apache.spark.api.java.JavaPairRDD; import org.apache.spark.api.java.JavaRDD; import org.apache.spark.api.java.JavaSparkContext; import org.apache.spark.rdd.JdbcRDD; import org.junit.Test; import com.fasterxml.jackson.databind.ObjectMapper; import au.com.bytecode.opencsv.CSVReader; import scala.Tuple2; import scala.runtime.AbstractFunction0; import scala.runtime.AbstractFunction1; public class Ch5_example { public static void main(String... strings) { Ch5_example ex = new Ch5_example(); // ex.proc1(); // ex.proc2(); //json // ex.proc3(); //csv ex.proc4(); //jdbc } public Ch5_example() { PropertyConfigurator.configure(System.getProperty("user.dir") + "/src/resources/log4j.properties"); } public static void s(Object o){ //static 으로 사용하면 class가 Serializable 하지 않아도 된다. 왜???? System.out.println(o); } @Test public void proc1(){ s("예제5-4 파일별 평균값 구하기"); JavaSparkContext sc = new JavaSparkContext("local[2]", "First Spark App"); //같은 패키지 내의 폴더 JavaPairRDD<String, String> files = sc.wholeTextFiles(getClass().getResource("files").getFile()); //폴더명 /** * 방법1 - 람다식 1개에서 모두 처리하기 - Spark 답지 않다. */ JavaPairRDD<String, Double> result1 = files.mapValues(contents -> { String[] arr = contents.split(" "); if(arr.length < 1) return 0.0; double sum = 0.0; for(String s : arr){ try{ sum += Double.parseDouble(s); }catch(Exception e){ System.out.println(e.getMessage()); } } return sum / arr.length; }); s(result1.collect()); /** * 방법2 - RDD 단계별로 처리하기 - 결과는 같지만 좀더 복잡하다.-_-;; */ //텍스트 분할 JavaPairRDD<String, String> result2 = files.flatMapValues(contents -> { return Arrays.asList(contents.split(" ")); }); //숫자로 변형 JavaPairRDD<String, Double> result2_2 = result2.mapValues(str -> { try{ return Double.parseDouble(str); }catch(Exception e){ return 0.0; } }); //합계 JavaPairRDD<String, Tuple2<Double, Double>> result2_3 = result2_2.aggregateByKey(new Tuple2<Double, Double>(0.0, 0.0), (tuple, num)->new Tuple2<Double, Double>(tuple._1 + num, tuple._2 + 1), (tuple1,tuple2)->new Tuple2<Double, Double>(tuple1._1 + tuple2._1, tuple1._2 + tuple2._2)); //평균내기 JavaPairRDD<String, Double> result2_4 = result2_3.mapValues(tuple -> tuple._1 / tuple._2); s(result2_4.collect()); } public static class Person implements java.io.Serializable { public String id; public String pwd; public Boolean likesPandas; @Override public String toString() { return "Person [id=" + id + ", pwd=" + pwd + ", likesPandas=" + likesPandas + "]"; } } @Test public void proc2(){ s("예제5-8 자바에서 json 불러오기 - 엄밀히 말하면 json 데이터가 아니다. line별로 map{}이 있을 뿐이다."); JavaSparkContext sc = new JavaSparkContext("local[2]", "First Spark App"); JavaRDD<String> input = sc.textFile(getClass().getResource("json/file_map.txt").getFile()); /** * json/file_map.txt 파일 내용: {"id":"electro85","pwd":"1111", "likesPandas":false} {"id":"me","pwd":"1111", "likesPandas":true} {"id":"you","pwd":"1111", "likesPandas":true} */ JavaRDD<Person> result = input.mapPartitions(iter -> { List<Person> list = new ArrayList<>(); ObjectMapper mapper = new ObjectMapper(); while(iter.hasNext()){ String line = iter.next(); try{ list.add(mapper.readValue(line, Person.class)); }catch(Exception e){ System.out.println(e.getMessage()); //에러 발생시 레코드 무시 } } return list; }); s(result.collect()); //[Person [id=electro85, pwd=1111, likesPandas=false], Person [id=me, pwd=1111, likesPandas=true], Person [id=you, pwd=1111, likesPandas=true]] s("예제5-11 자바에서 json 저장하기"); JavaRDD<Person> result2 = result.filter(person -> person.likesPandas); JavaRDD<String> result3 = result2.mapPartitions(iter -> { List<String> list = new ArrayList<>(); ObjectMapper mapper = new ObjectMapper(); while(iter.hasNext()){ Person person = iter.next(); try{ list.add(mapper.writeValueAsString(person)); }catch(Exception e){ System.out.println(e.getMessage()); //에러 발생시 레코드 무시 } } return list; }); result3.saveAsTextFile("output5_11"); //폴더명(프로젝트 ROOR 에 폴더 생성된다.) // 두번 실행하면 디렉토리 중복으로 Exception 발생한다. // InvocationTargetException: Output directory file:/D:/ysh/workspace_kepler/learning.spark/output5_11 already exists -> [Help 1] } @Test public void proc3(){ s("예제5-14 자바에서 textFile()로 csv 불러오기"); JavaSparkContext sc = new JavaSparkContext("local[2]", "First Spark App"); JavaRDD<String> csvFile1 = sc.textFile(getClass().getResource("csv/favourite_animals.csv").getFile()); JavaRDD<String> csvData = csvFile1.map(str -> { au.com.bytecode.opencsv.CSVReader reader = new au.com.bytecode.opencsv.CSVReader(new StringReader(str)); return Arrays.toString(reader.readNext()); }); s(csvData.collect()); //[[holden, panda], [notholden, notpanda], [spark, bear]] s("예제5-17 자바에서 전체적으로 CSV 불러오기"); JavaPairRDD<String, String> wholeCsvData = sc.wholeTextFiles(getClass().getResource("csv").getFile()); //폴더명 JavaRDD<String[]> keyedRDD = wholeCsvData.flatMap(tuple ->{ CSVReader reader = new CSVReader(new StringReader(tuple._2)); return reader.readAll(); }); keyedRDD.foreach(arr -> s(Arrays.toString(arr))); // [holden, panda] // [notholden, notpanda] // [spark, bear] // [holden, panda] // [notholden, notpanda] // [spark, bear] s("예제5-19 자바에서 CSV 쓰기"); //csv 파일의 데이터를 string으로 만든다음. 그저... file에 쓰기 하는 보편적인 작업이다. } @Test public void proc4(){ s("예제5-37 자바에서 JdbcRDD -- 테스트 실패;;;;"); /** * 주의!!! - scala 소스이므로, java에서는 * JavaSparkContext 대신 SparkContext 를 사용해야 한다. * org.apache.spark.api.java.function.Function0 가 아니라 * scala.Function0, scala.Function1 을 사용해야 한다. */ // try { // Class.forName("com.mysql.jdbc.Driver"); // Connection c = DriverManager.getConnection("jdbc:mysql://", "", ""); // } catch (Exception e1) { // e1.printStackTrace(); // } JavaSparkContext sc = new JavaSparkContext("local[2]", "First Spark App"); JdbcRDD<Tuple2<String, String>> data = new JdbcRDD<Tuple2<String, String>>(JavaSparkContext.toSparkContext(sc), new AbstractFunction0<Connection>(){ @Override public Connection apply() { try { Class.forName("com.mysql.jdbc.Driver"); return DriverManager.getConnection("jdbc:mysql://", "", ""); }catch(Exception e){ e.printStackTrace(); } return null; } }, "Select user_id, name from Users", 1,1,1, new AbstractFunction1<ResultSet, Tuple2<String, String>>(){ @Override public Tuple2<String, String> apply(ResultSet rs) { try { return new Tuple2<String, String>(rs.getString(1), rs.getString(2)); } catch (SQLException e) { e.printStackTrace(); } return null; } }, null); s(data.collect()); //에러나네... } }
일단...
proc4() 는 런타임 에러난다;;;; 좀더 살펴봐야...
'Spark > 러닝 스파크' 카테고리의 다른 글
Spark 시작하기17 - [러닝 스파크] 7장 클러스터에서 운영하기 (0) | 2016.04.26 |
---|---|
Spark 시작하기16 - [러닝 스파크] 6장 고급 스파크 프로그래밍 (0) | 2016.04.26 |
Spark 시작하기13 - [러닝 스파크] 4장 키/값 페어로 작업하기2 (0) | 2016.04.22 |
Spark 시작하기12 - [러닝 스파크] 4장 키/값 페어로 작업하기 (0) | 2016.04.20 |
Spark 시작하기11 - [러닝 스파크] 3장 RDD로 프로그래밍하기2 (0) | 2016.04.19 |