본문 바로가기
Spark/러닝 스파크

Spark 시작하기15 - [러닝 스파크] 5장 데이터 불러오기/저장하기

by java개발자 2016. 4. 26.
package org.mystudy.example.ch5;

import java.io.StringReader;
import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.ResultSet;
import java.sql.SQLException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

import org.apache.log4j.PropertyConfigurator;
import org.apache.spark.api.java.JavaPairRDD;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.rdd.JdbcRDD;
import org.junit.Test;

import com.fasterxml.jackson.databind.ObjectMapper;

import au.com.bytecode.opencsv.CSVReader;
import scala.Tuple2;
import scala.runtime.AbstractFunction0;
import scala.runtime.AbstractFunction1;

public class Ch5_example {
	public static void main(String... strings) {
		Ch5_example ex = new Ch5_example();
//		ex.proc1();	
//		ex.proc2();		//json
//		ex.proc3();		//csv
		ex.proc4();		//jdbc
	}
	public Ch5_example() {
		PropertyConfigurator.configure(System.getProperty("user.dir") + "/src/resources/log4j.properties");
	}
	public static void s(Object o){	//static 으로 사용하면 class가 Serializable 하지 않아도 된다. 왜????
		System.out.println(o);
	}
	
	@Test
	public void proc1(){
		s("예제5-4 파일별 평균값 구하기");
		JavaSparkContext sc = new JavaSparkContext("local[2]", "First Spark App");
		//같은 패키지 내의 폴더
		JavaPairRDD<String, String> files = sc.wholeTextFiles(getClass().getResource("files").getFile());	//폴더명
		/**
		 * 방법1 - 람다식 1개에서 모두 처리하기 - Spark 답지 않다.
		 */
		JavaPairRDD<String, Double> result1 = files.mapValues(contents -> {
			String[] arr = contents.split(" ");
			if(arr.length < 1) return 0.0;
			
			double sum = 0.0;
			for(String s : arr){
				try{
					sum += Double.parseDouble(s);
				}catch(Exception e){
					System.out.println(e.getMessage());
				}
			}
			return sum / arr.length;
		});
		s(result1.collect());
		
		/**
		 * 방법2 - RDD 단계별로 처리하기 - 결과는 같지만 좀더 복잡하다.-_-;;
		 */
		//텍스트 분할
		JavaPairRDD<String, String> result2 = files.flatMapValues(contents -> {
			return Arrays.asList(contents.split(" "));
		});
		//숫자로 변형
		JavaPairRDD<String, Double> result2_2 = result2.mapValues(str -> {
			try{
				return Double.parseDouble(str);
			}catch(Exception e){
				return 0.0;
			}
		});
		//합계
		JavaPairRDD<String, Tuple2<Double, Double>> result2_3 = result2_2.aggregateByKey(new Tuple2<Double, Double>(0.0, 0.0), (tuple, num)->new Tuple2<Double, Double>(tuple._1 + num, tuple._2 + 1), (tuple1,tuple2)->new Tuple2<Double, Double>(tuple1._1 + tuple2._1, tuple1._2 + tuple2._2));	
		//평균내기
		JavaPairRDD<String, Double> result2_4 = result2_3.mapValues(tuple -> tuple._1 / tuple._2);	
		s(result2_4.collect());
	}

	public static class Person implements java.io.Serializable {
		public String id;
		public String pwd;
		public Boolean likesPandas;
		@Override
		public String toString() {
			return "Person [id=" + id + ", pwd=" + pwd + ", likesPandas=" + likesPandas + "]";
		}
	}

	@Test
	public void proc2(){
		s("예제5-8 자바에서 json 불러오기 - 엄밀히 말하면 json 데이터가 아니다. line별로 map{}이 있을 뿐이다.");
		JavaSparkContext sc = new JavaSparkContext("local[2]", "First Spark App");
		JavaRDD<String> input = sc.textFile(getClass().getResource("json/file_map.txt").getFile());
		/**
		 * json/file_map.txt 파일 내용:
			{"id":"electro85","pwd":"1111", "likesPandas":false}
			{"id":"me","pwd":"1111", "likesPandas":true}
			{"id":"you","pwd":"1111", "likesPandas":true}
		 */
		JavaRDD<Person> result = input.mapPartitions(iter -> {
			List<Person> list = new ArrayList<>();
			ObjectMapper mapper = new ObjectMapper();
			while(iter.hasNext()){
				String line = iter.next();
				try{
					list.add(mapper.readValue(line, Person.class));
				}catch(Exception e){
					System.out.println(e.getMessage());	//에러 발생시 레코드 무시
				}
			}
			return list;
		});
		s(result.collect());	//[Person [id=electro85, pwd=1111, likesPandas=false], Person [id=me, pwd=1111, likesPandas=true], Person [id=you, pwd=1111, likesPandas=true]]
		
		s("예제5-11 자바에서 json 저장하기");
		JavaRDD<Person> result2 = result.filter(person -> person.likesPandas);
		JavaRDD<String> result3 = result2.mapPartitions(iter -> {
			List<String> list = new ArrayList<>();
			ObjectMapper mapper = new ObjectMapper();
			while(iter.hasNext()){
				Person person = iter.next();
				try{
					list.add(mapper.writeValueAsString(person));
				}catch(Exception e){
					System.out.println(e.getMessage());	//에러 발생시 레코드 무시
				}
			}
			return list;
		});
		result3.saveAsTextFile("output5_11");	//폴더명(프로젝트 ROOR 에 폴더 생성된다.)
//		두번 실행하면 디렉토리 중복으로 Exception 발생한다.
//		InvocationTargetException: Output directory file:/D:/ysh/workspace_kepler/learning.spark/output5_11 already exists -> [Help 1]
	}
	@Test
	public void proc3(){
		s("예제5-14 자바에서 textFile()로 csv 불러오기");
		JavaSparkContext sc = new JavaSparkContext("local[2]", "First Spark App");
		JavaRDD<String> csvFile1 = sc.textFile(getClass().getResource("csv/favourite_animals.csv").getFile());
		JavaRDD<String> csvData = csvFile1.map(str -> {
			au.com.bytecode.opencsv.CSVReader reader = new au.com.bytecode.opencsv.CSVReader(new StringReader(str));
			return Arrays.toString(reader.readNext());
		});
		s(csvData.collect());	//[[holden, panda], [notholden, notpanda], [spark, bear]]
		
		s("예제5-17 자바에서 전체적으로 CSV 불러오기");
		JavaPairRDD<String, String> wholeCsvData = sc.wholeTextFiles(getClass().getResource("csv").getFile());	//폴더명
		JavaRDD<String[]> keyedRDD = wholeCsvData.flatMap(tuple ->{
			CSVReader reader = new CSVReader(new StringReader(tuple._2));
			return reader.readAll();
		});
		keyedRDD.foreach(arr -> s(Arrays.toString(arr)));
//		[holden, panda]
//		[notholden, notpanda]
//		[spark, bear]
//		[holden, panda]
//		[notholden, notpanda]
//		[spark, bear]
		
		s("예제5-19 자바에서 CSV 쓰기");
		//csv 파일의 데이터를 string으로 만든다음. 그저... file에 쓰기 하는 보편적인 작업이다.
	}
	@Test
	public void proc4(){
		s("예제5-37 자바에서 JdbcRDD -- 테스트 실패;;;;");
		/**
		 * 주의!!! - scala 소스이므로, java에서는
		 * JavaSparkContext 대신 SparkContext 를 사용해야 한다.
		 * org.apache.spark.api.java.function.Function0 가 아니라
		 * scala.Function0, scala.Function1 을 사용해야 한다.
		 */
//		try {
//			Class.forName("com.mysql.jdbc.Driver");
//			Connection c = DriverManager.getConnection("jdbc:mysql://", "", "");
//		} catch (Exception e1) {
//			e1.printStackTrace();
//		}
		
		JavaSparkContext sc = new JavaSparkContext("local[2]", "First Spark App");
		JdbcRDD<Tuple2<String, String>> data = new JdbcRDD<Tuple2<String, String>>(JavaSparkContext.toSparkContext(sc), new AbstractFunction0<Connection>(){
			@Override
			public Connection apply() {
				try {
					Class.forName("com.mysql.jdbc.Driver");
					return DriverManager.getConnection("jdbc:mysql://", "", "");
				}catch(Exception e){
					e.printStackTrace();
				}
				return null;
			}
			
		}, "Select user_id, name from Users", 1,1,1, new AbstractFunction1<ResultSet, Tuple2<String, String>>(){
			@Override
			public Tuple2<String, String> apply(ResultSet rs) {
				try {
					return new Tuple2<String, String>(rs.getString(1), rs.getString(2));
				} catch (SQLException e) {
					e.printStackTrace();
				}
				return null;
			}
			
		}, null);
		s(data.collect());
		
		//에러나네...
	}
}

일단...

proc4() 는 런타임 에러난다;;;; 좀더 살펴봐야...