package org;
import java.io.Serializable;
import org.apache.log4j.PropertyConfigurator;
import org.apache.spark.api.java.JavaSparkContext;
public class MyConf implements Serializable{
private static final long serialVersionUID = 9809021L;
public static String PROJECT_PATH = System.getProperty("user.dir");
public static void setLog4j(){
PropertyConfigurator.configure(PROJECT_PATH + "\\src\\resources\\log4j.properties"); //Windows-style path separators
}
public static JavaSparkContext getJavaSparkContext(){
//Important difference between local[1] and local[2]: with two or more worker threads, the ordering of results can differ between runs.
//Switch to local[1] depending on the machine used for testing.
return new JavaSparkContext("local[2]", "First Spark App");
}
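//A minimal sketch (not part of the original class) of building the same context from a SparkConf;
//the master and app name below are assumptions and carry the same local[1]/local[2] caveat as above.
public static JavaSparkContext getJavaSparkContextFromConf(){
org.apache.spark.SparkConf conf = new org.apache.spark.SparkConf()
.setMaster("local[2]")
.setAppName("First Spark App");
return new JavaSparkContext(conf);
}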
}
package org.mystudy;
import java.io.Serializable;
import java.util.Comparator;
public class MyComparator implements Comparator<Integer>, Serializable {
private static final long serialVersionUID = 1L;
@Override
public int compare(Integer o1, Integer o2) {
return Integer.compare(o2, o1); //descending order; avoids int overflow of o2 - o1
}
}
package org.mystudy;
import java.io.Serializable;
import java.util.Arrays;
import java.util.List;
import java.util.Map;
import org.MyConf;
import org.apache.spark.api.java.JavaDoubleRDD;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import scala.Tuple2;
public class Ch3_example2 implements Serializable {
private static final long serialVersionUID = 43443432321L;
public Ch3_example2() {
MyConf.setLog4j();
}
public static void main(String... strings) {
Ch3_example2 ex = new Ch3_example2();
System.out.println("표3-4 {1,2,3,3}을 갖고 있는 RDD에 대한 기본 액션");
ex.proc4(); //collect
// ex.proc5(); //count
// ex.proc6(); //countByValue
// ex.proc7(); //take
// ex.proc8(); //top
// ex.proc9(); //takeOrdered
// ex.proc10(); //takeSample
// ex.proc11(); //reduce
// ex.proc12(); //fold
// ex.proc13(); //aggregate
// ex.proc14(); //foreach
System.out.println("예제 3-38 자바에서 DoubleRDD 만들기");
ex.proc21();
}
public void proc4() {
JavaSparkContext sc = MyConf.getJavaSparkContext();
JavaRDD<Integer> rdd = sc.parallelize(Arrays.asList(1,2,3,4));
List<Integer> result1 = rdd.collect();
System.out.println(result1); //[1, 2, 3, 4]
}
public void proc5() {
JavaSparkContext sc = MyConf.getJavaSparkContext();
JavaRDD<Integer> rdd = sc.parallelize(Arrays.asList(1,2,3,4));
long count = rdd.count();
System.out.println(count); //4
}
public void proc6() {
JavaSparkContext sc = MyConf.getJavaSparkContext();
JavaRDD<Integer> rdd = sc.parallelize(Arrays.asList(1,2,3,3));
Map<Integer, Long> map = rdd.countByValue();
System.out.println(map); //{1=1, 3=2, 2=1}
JavaRDD<String> rdd2 = sc.parallelize(Arrays.asList("A","B","A","C"));
Map<String, Long> map2 = rdd2.countByValue();
System.out.println(map2); //{B=1, A=2, C=1}
}
public void proc7() {
JavaSparkContext sc = MyConf.getJavaSparkContext();
JavaRDD<Integer> rdd = sc.parallelize(Arrays.asList(1,2,3,3));
rdd = rdd.repartition(4); //consider what happens as the number of partitions grows
List<Integer> result = rdd.take(2); //returns values from the first partitions scanned, so ordering may be ignored
System.out.println(result); //[2, 3]
}
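//A small added sketch (method name proc7b is hypothetical): sorting with sortBy before take()
//imposes a global ordering, so the result no longer depends on which partition is scanned first.
public void proc7b() {
JavaSparkContext sc = MyConf.getJavaSparkContext();
JavaRDD<Integer> rdd = sc.parallelize(Arrays.asList(1,2,3,3)).repartition(4);
List<Integer> result = rdd.sortBy(a -> a, true, 1).take(2); //sort ascending into one partition, then take
System.out.println(result); //[1, 2]
}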
public void proc8() { //default ordering: descending
JavaSparkContext sc = MyConf.getJavaSparkContext();
JavaRDD<Integer> rdd = sc.parallelize(Arrays.asList(1,2,3,4,5,6,7,8));
rdd = rdd.repartition(4); //consider what happens as the number of partitions grows
List<Integer> result = rdd.top(2); //descending by default
System.out.println(result); //[8, 7]
List<Integer> result2 = rdd.top(2, new MyComparator()); //custom comparator (reversed here: ascending)
System.out.println(result2); //[1, 2]
}
public void proc9() { //default ordering: ascending
JavaSparkContext sc = MyConf.getJavaSparkContext();
JavaRDD<Integer> rdd = sc.parallelize(Arrays.asList(1,2,3,4,5,6,7,8));
List<Integer> result = rdd.takeOrdered(2); //[1, 2]
System.out.println(result);
List<Integer> result2 = rdd.takeOrdered(2, new MyComparator()); //reversed here: descending
System.out.println(result2); //[8, 7]
}
public void proc10() {
JavaSparkContext sc = MyConf.getJavaSparkContext();
JavaRDD<Integer> rdd = sc.parallelize(Arrays.asList(1,2,3,4,5,6,7,8));
List<Integer> result1 = rdd.takeSample(false, 3); //e.g. [2, 7, 6] (random; varies per run)
System.out.println(result1);
List<Integer> result2 = rdd.takeSample(false, 3); //e.g. [2, 7, 1]
System.out.println(result2);
//withReplacement = true allows the same element to be sampled more than once
List<Integer> result3 = rdd.takeSample(true, 3); //e.g. [8, 5, 5]
System.out.println(result3);
List<Integer> result4 = rdd.takeSample(true, 3); //e.g. [1, 2, 1]
System.out.println(result4);
}
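//An added sketch (proc10b is a hypothetical helper): the three-argument takeSample overload
//fixes the random seed, so the sampled values are reproducible across calls.
public void proc10b() {
JavaSparkContext sc = MyConf.getJavaSparkContext();
JavaRDD<Integer> rdd = sc.parallelize(Arrays.asList(1,2,3,4,5,6,7,8));
List<Integer> sample1 = rdd.takeSample(false, 3, 42L); //seed 42 is an arbitrary choice
List<Integer> sample2 = rdd.takeSample(false, 3, 42L);
System.out.println(sample1.equals(sample2)); //true - same seed, same sample
}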
public void proc11() { //merges elements in parallel across partitions
JavaSparkContext sc = MyConf.getJavaSparkContext();
JavaRDD<Integer> rdd = sc.parallelize(Arrays.asList(1,2,3,4,5,6,7,8));
Integer result = rdd.reduce((a,b)->a+b);
System.out.println(result); //36
}
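//An added sketch (proc11b is a hypothetical helper): reduce accepts any commutative and
//associative function, not just addition; here it keeps the larger of each pair.
public void proc11b() {
JavaSparkContext sc = MyConf.getJavaSparkContext();
JavaRDD<Integer> rdd = sc.parallelize(Arrays.asList(1,2,3,4,5,6,7,8));
Integer max = rdd.reduce(Math::max);
System.out.println(max); //8
}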
public void proc12() { //reduce operation plus a zero value
//the zero value is applied once per partition and once more when merging, so the result depends on the partition count
JavaSparkContext sc = MyConf.getJavaSparkContext();
JavaRDD<Integer> rdd = sc.parallelize(Arrays.asList(1,2,3,4,5,6,7,8));
System.out.println(rdd.getNumPartitions()); //2
Integer result = rdd.fold(1, (a,b)->{
System.out.println(a+", "+b);
return a+b;
});
// 1, 5
// 6, 6
// 12, 7
// 19, 8
// 1, 1
// 2, 2
// 4, 3
// 7, 4
// 1, 27
// 28, 11
//zero value(1) + 5+6+7+8 = 27 (partition 1)
//zero value(1) + 1+2+3+4 = 11 (partition 2)
//zero value(1) + 27 + 11 = 39 (final merge of the partition results)
System.out.println(result); //39
JavaRDD<Integer> rdd2 = rdd.repartition(5);
Integer result2 = rdd2.fold(1, (a,b)->a+b);
System.out.println(result2); //42
}
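//An added check (proc12b is a hypothetical helper): for addition, fold applies the zero value
//once per partition and once more in the final merge, so the expected result is
//sum(elements) + zeroValue * (numPartitions + 1).
public void proc12b() {
JavaSparkContext sc = MyConf.getJavaSparkContext();
JavaRDD<Integer> rdd = sc.parallelize(Arrays.asList(1,2,3,4,5,6,7,8)).repartition(5);
int zero = 1;
int expected = 36 + zero * (rdd.getNumPartitions() + 1); //36 + 1 * 6 = 42
int actual = rdd.fold(zero, (a,b) -> a + b);
System.out.println(expected + " == " + actual); //42 == 42
}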
public void proc13() {
JavaSparkContext sc = MyConf.getJavaSparkContext();
JavaRDD<Integer> rdd = sc.parallelize(Arrays.asList(1,2,3,4,5,6,7,8));
System.out.println(rdd.getNumPartitions()); //2
//compute the sum
Integer result = rdd.aggregate(1, (a,b)->a+b, (a,b)->a+b);
System.out.println(result); //39
JavaRDD<Integer> rdd2 = rdd.repartition(5);
Integer result2 = rdd2.aggregate(1, (a,b)->a+b, (a,b)->a+b);
System.out.println(result2); //42
//compute the average
JavaRDD<Integer> rdd3 = rdd.repartition(2);
//Tuple2(sum, count)
Tuple2<Integer, Integer> result3 = rdd3.aggregate(
new Tuple2<Integer, Integer>(0, 0)
, (tuple0, data) -> new Tuple2<Integer, Integer>(tuple0._1 + data, tuple0._2 + 1)
, (tuple1, tuple2)-> new Tuple2<Integer, Integer>(tuple1._1 + tuple2._1, tuple1._2 + tuple2._2)
);
System.out.println(result3._1()); //36
System.out.println(result3._2()); //8
System.out.println(result3._1() / result3._2()); //4 //average (integer division of 36 / 8)
}
public void proc14() {
JavaSparkContext sc = MyConf.getJavaSparkContext();
JavaRDD<Integer> rdd = sc.parallelize(Arrays.asList(1,2,3,4,5,6,7,8));
rdd.foreach(a->System.out.println(a)); //runs on the executors; output order is not deterministic (one observed run below)
//1
//5
//6
//7
//8
//2
//3
//4
}
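//An added sketch (proc14b is a hypothetical helper): foreach runs on the executors, so print
//order is not deterministic; collecting to the driver first preserves the RDD's element order.
public void proc14b() {
JavaSparkContext sc = MyConf.getJavaSparkContext();
JavaRDD<Integer> rdd = sc.parallelize(Arrays.asList(1,2,3,4,5,6,7,8));
rdd.collect().forEach(a -> System.out.println(a)); //prints 1 through 8 in order on the driver
}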
public void proc21(){
JavaSparkContext sc = MyConf.getJavaSparkContext();
JavaRDD<Integer> rdd = sc.parallelize(Arrays.asList(1,2,3,4,5,6,7,8));
JavaDoubleRDD d = rdd.mapToDouble(a -> a);
System.out.println(d.count()); //8
System.out.println(d.mean()); //4.5
System.out.println(d.variance()); //5.25
System.out.println(d.sum()); //36.0
System.out.println(d.min()); //1.0
System.out.println(d.stdev()); //2.29128784747792
}
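//An added sketch (proc22 is a hypothetical helper): JavaDoubleRDD.stats() computes the same
//summary values in a single pass and returns them as a StatCounter.
public void proc22() {
JavaSparkContext sc = MyConf.getJavaSparkContext();
JavaDoubleRDD d = sc.parallelize(Arrays.asList(1,2,3,4,5,6,7,8)).mapToDouble(a -> a);
org.apache.spark.util.StatCounter stats = d.stats();
System.out.println(stats.count()); //8
System.out.println(stats.mean()); //4.5
System.out.println(stats.variance()); //5.25
System.out.println(stats.sum()); //36.0
System.out.println(stats.stdev()); //2.29128784747792
}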
}