package org.mystudy.example.ch5;
import java.io.StringReader;
import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.ResultSet;
import java.sql.SQLException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import org.apache.log4j.PropertyConfigurator;
import org.apache.spark.api.java.JavaPairRDD;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.rdd.JdbcRDD;
import org.junit.Test;
import com.fasterxml.jackson.databind.ObjectMapper;
import au.com.bytecode.opencsv.CSVReader;
import scala.Tuple2;
import scala.runtime.AbstractFunction0;
import scala.runtime.AbstractFunction1;
public class Ch5_example {
public static void main(String... strings) {
Ch5_example ex = new Ch5_example();
// ex.proc1();
// ex.proc2(); //json
// ex.proc3(); //csv
ex.proc4(); //jdbc
}
public Ch5_example() {
PropertyConfigurator.configure(System.getProperty("user.dir") + "/src/resources/log4j.properties");
}
public static void s(Object o){ //static 으로 사용하면 class가 Serializable 하지 않아도 된다. 왜????
System.out.println(o);
}
@Test
public void proc1(){
s("예제5-4 파일별 평균값 구하기");
JavaSparkContext sc = new JavaSparkContext("local[2]", "First Spark App");
//같은 패키지 내의 폴더
JavaPairRDD<String, String> files = sc.wholeTextFiles(getClass().getResource("files").getFile()); //폴더명
/**
* 방법1 - 람다식 1개에서 모두 처리하기 - Spark 답지 않다.
*/
JavaPairRDD<String, Double> result1 = files.mapValues(contents -> {
String[] arr = contents.split(" ");
if(arr.length < 1) return 0.0;
double sum = 0.0;
for(String s : arr){
try{
sum += Double.parseDouble(s);
}catch(Exception e){
System.out.println(e.getMessage());
}
}
return sum / arr.length;
});
s(result1.collect());
/**
* 방법2 - RDD 단계별로 처리하기 - 결과는 같지만 좀더 복잡하다.-_-;;
*/
//텍스트 분할
JavaPairRDD<String, String> result2 = files.flatMapValues(contents -> {
return Arrays.asList(contents.split(" "));
});
//숫자로 변형
JavaPairRDD<String, Double> result2_2 = result2.mapValues(str -> {
try{
return Double.parseDouble(str);
}catch(Exception e){
return 0.0;
}
});
//합계
JavaPairRDD<String, Tuple2<Double, Double>> result2_3 = result2_2.aggregateByKey(new Tuple2<Double, Double>(0.0, 0.0), (tuple, num)->new Tuple2<Double, Double>(tuple._1 + num, tuple._2 + 1), (tuple1,tuple2)->new Tuple2<Double, Double>(tuple1._1 + tuple2._1, tuple1._2 + tuple2._2));
//평균내기
JavaPairRDD<String, Double> result2_4 = result2_3.mapValues(tuple -> tuple._1 / tuple._2);
s(result2_4.collect());
}
public static class Person implements java.io.Serializable {
public String id;
public String pwd;
public Boolean likesPandas;
@Override
public String toString() {
return "Person [id=" + id + ", pwd=" + pwd + ", likesPandas=" + likesPandas + "]";
}
}
@Test
public void proc2(){
s("예제5-8 자바에서 json 불러오기 - 엄밀히 말하면 json 데이터가 아니다. line별로 map{}이 있을 뿐이다.");
JavaSparkContext sc = new JavaSparkContext("local[2]", "First Spark App");
JavaRDD<String> input = sc.textFile(getClass().getResource("json/file_map.txt").getFile());
/**
* json/file_map.txt 파일 내용:
{"id":"electro85","pwd":"1111", "likesPandas":false}
{"id":"me","pwd":"1111", "likesPandas":true}
{"id":"you","pwd":"1111", "likesPandas":true}
*/
JavaRDD<Person> result = input.mapPartitions(iter -> {
List<Person> list = new ArrayList<>();
ObjectMapper mapper = new ObjectMapper();
while(iter.hasNext()){
String line = iter.next();
try{
list.add(mapper.readValue(line, Person.class));
}catch(Exception e){
System.out.println(e.getMessage()); //에러 발생시 레코드 무시
}
}
return list;
});
s(result.collect()); //[Person [id=electro85, pwd=1111, likesPandas=false], Person [id=me, pwd=1111, likesPandas=true], Person [id=you, pwd=1111, likesPandas=true]]
s("예제5-11 자바에서 json 저장하기");
JavaRDD<Person> result2 = result.filter(person -> person.likesPandas);
JavaRDD<String> result3 = result2.mapPartitions(iter -> {
List<String> list = new ArrayList<>();
ObjectMapper mapper = new ObjectMapper();
while(iter.hasNext()){
Person person = iter.next();
try{
list.add(mapper.writeValueAsString(person));
}catch(Exception e){
System.out.println(e.getMessage()); //에러 발생시 레코드 무시
}
}
return list;
});
result3.saveAsTextFile("output5_11"); //폴더명(프로젝트 ROOR 에 폴더 생성된다.)
// 두번 실행하면 디렉토리 중복으로 Exception 발생한다.
// InvocationTargetException: Output directory file:/D:/ysh/workspace_kepler/learning.spark/output5_11 already exists -> [Help 1]
}
@Test
public void proc3(){
s("예제5-14 자바에서 textFile()로 csv 불러오기");
JavaSparkContext sc = new JavaSparkContext("local[2]", "First Spark App");
JavaRDD<String> csvFile1 = sc.textFile(getClass().getResource("csv/favourite_animals.csv").getFile());
JavaRDD<String> csvData = csvFile1.map(str -> {
au.com.bytecode.opencsv.CSVReader reader = new au.com.bytecode.opencsv.CSVReader(new StringReader(str));
return Arrays.toString(reader.readNext());
});
s(csvData.collect()); //[[holden, panda], [notholden, notpanda], [spark, bear]]
s("예제5-17 자바에서 전체적으로 CSV 불러오기");
JavaPairRDD<String, String> wholeCsvData = sc.wholeTextFiles(getClass().getResource("csv").getFile()); //폴더명
JavaRDD<String[]> keyedRDD = wholeCsvData.flatMap(tuple ->{
CSVReader reader = new CSVReader(new StringReader(tuple._2));
return reader.readAll();
});
keyedRDD.foreach(arr -> s(Arrays.toString(arr)));
// [holden, panda]
// [notholden, notpanda]
// [spark, bear]
// [holden, panda]
// [notholden, notpanda]
// [spark, bear]
s("예제5-19 자바에서 CSV 쓰기");
//csv 파일의 데이터를 string으로 만든다음. 그저... file에 쓰기 하는 보편적인 작업이다.
}
@Test
public void proc4(){
s("예제5-37 자바에서 JdbcRDD -- 테스트 실패;;;;");
/**
* 주의!!! - scala 소스이므로, java에서는
* JavaSparkContext 대신 SparkContext 를 사용해야 한다.
* org.apache.spark.api.java.function.Function0 가 아니라
* scala.Function0, scala.Function1 을 사용해야 한다.
*/
// try {
// Class.forName("com.mysql.jdbc.Driver");
// Connection c = DriverManager.getConnection("jdbc:mysql://", "", "");
// } catch (Exception e1) {
// e1.printStackTrace();
// }
JavaSparkContext sc = new JavaSparkContext("local[2]", "First Spark App");
JdbcRDD<Tuple2<String, String>> data = new JdbcRDD<Tuple2<String, String>>(JavaSparkContext.toSparkContext(sc), new AbstractFunction0<Connection>(){
@Override
public Connection apply() {
try {
Class.forName("com.mysql.jdbc.Driver");
return DriverManager.getConnection("jdbc:mysql://", "", "");
}catch(Exception e){
e.printStackTrace();
}
return null;
}
}, "Select user_id, name from Users", 1,1,1, new AbstractFunction1<ResultSet, Tuple2<String, String>>(){
@Override
public Tuple2<String, String> apply(ResultSet rs) {
try {
return new Tuple2<String, String>(rs.getString(1), rs.getString(2));
} catch (SQLException e) {
e.printStackTrace();
}
return null;
}
}, null);
s(data.collect());
//에러나네...
}
}
일단...
proc4() 는 런타임 에러난다;;;; 좀더 살펴봐야...
'Spark > 러닝 스파크' 카테고리의 다른 글
| Spark 시작하기17 - [러닝 스파크] 7장 클러스터에서 운영하기 (0) | 2016.04.26 |
|---|---|
| Spark 시작하기16 - [러닝 스파크] 6장 고급 스파크 프로그래밍 (0) | 2016.04.26 |
| Spark 시작하기13 - [러닝 스파크] 4장 키/값 페어로 작업하기2 (0) | 2016.04.22 |
| Spark 시작하기12 - [러닝 스파크] 4장 키/값 페어로 작업하기 (0) | 2016.04.20 |
| Spark 시작하기11 - [러닝 스파크] 3장 RDD로 프로그래밍하기2 (0) | 2016.04.19 |