The earlier posts on Spark programming principles and on RDD characteristics and basic operations introduced the fact that Spark RDD operations fall into two categories: transformations and actions. Each category consists of a number of operators (operator functions). A transformation creates a new RDD from an existing one, mainly mapping, transforming, aggregating, or filtering the data. An action performs the final execution step on the data: traversing, aggregating, saving, and so on. Below are the concrete implementations of these operators.
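One point worth keeping in mind while reading the examples: transformations are lazily evaluated and only describe the lineage of the new RDD; the actual computation runs when an action is invoked. Here is a minimal sketch of that behavior (with made-up data, not one of the examples below):

```java
import java.util.Arrays;

import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;

public class LazyEvaluationSketch {
    public static void main(String[] args) {
        SparkConf conf = new SparkConf().setAppName("lazyEvaluationSketch").setMaster("local");
        JavaSparkContext jsc = new JavaSparkContext(conf);

        JavaRDD<Integer> numRDD = jsc.parallelize(Arrays.asList(1, 2, 3, 4, 5));

        // Transformation: only records how to compute the new RDD, nothing runs yet
        JavaRDD<Integer> doubledRDD = numRDD.map(num -> num * 2);

        // Action: triggers the actual execution of the whole chain
        long count = doubledRDD.count();
        System.out.println("count = " + count); // prints "count = 5"

        jsc.close();
    }
}
```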
Transformations
map(func): applies the function func to every element of the RDD and returns a new RDD of the results.
```java
/**
 * map operator: multiply every element of the collection by 2
 */
private static void mapTest() {
    SparkConf conf = new SparkConf().setAppName("mapTest").setMaster("local");
    JavaSparkContext jsc = new JavaSparkContext(conf);
    List<Integer> list = Arrays.asList(1, 2, 3, 4, 5, 6, 7, 8);
    JavaRDD<Integer> numRDD = jsc.parallelize(list);
    JavaRDD<Integer> num2RDD = numRDD.map(num -> num * 2);
    num2RDD.foreach(num2 -> System.out.print(num2 + " "));
    jsc.close();
}
```

```scala
/**
 * map operator: multiply every element of the collection by 2
 */
def mapTest(): Unit = {
    val conf = new SparkConf().setAppName("mapTest").setMaster("local")
    val sc = new SparkContext(conf)
    val list = Array(1, 2, 3, 4, 5, 6, 7, 8)
    val numRDD = sc.parallelize(list)
    val num2RDD = numRDD.map(num => num * 2)
    num2RDD.foreach(num2 => print(num2 + " "))
}
```

map result: 2 4 6 8 10 12 14 16
filter(func): keeps only the elements that satisfy the predicate func.
```java
/**
 * filter operator: keep the elements that satisfy the predicate
 */
private static void filterTest() {
    SparkConf conf = new SparkConf().setAppName("filterTest").setMaster("local");
    JavaSparkContext jsc = new JavaSparkContext(conf);
    List<Integer> list = Arrays.asList(1, 2, 3, 4, 5, 6, 7, 8, 9, 10);
    JavaRDD<Integer> numRDD = jsc.parallelize(list);
    // keep only the even numbers
    JavaRDD<Integer> num2RDD = numRDD.filter(num -> num % 2 == 0);
    num2RDD.foreach(num2 -> System.out.print(num2 + " "));
    jsc.close();
}
```

```scala
/**
 * filter operator: keep the elements that satisfy the predicate
 */
def filterTest(): Unit = {
    val conf = new SparkConf().setAppName("filterTest").setMaster("local")
    val sc = new SparkContext(conf)
    val list = Array(1, 2, 3, 4, 5, 6, 7, 8, 9, 10)
    val numRDD = sc.parallelize(list)
    // keep only the even numbers
    val evenNumRDD = numRDD.filter(num => num % 2 == 0)
    evenNumRDD.foreach(evenNum => print(evenNum + " "))
}
```

filter result: 2 4 6 8 10
flatMap(func): maps each input element to zero or more output elements.
```java
/**
 * flatMap operator: transforms each element of the RDD into zero or more elements;
 * here each line is split into words
 */
private static void flatMapTest() {
    SparkConf conf = new SparkConf().setAppName("flatMapTest").setMaster("local");
    JavaSparkContext jsc = new JavaSparkContext(conf);
    List<String> list = Arrays.asList("hello world", "hello you", "good morning");
    JavaRDD<String> wordsRDD = jsc.parallelize(list);
    // split each line on spaces into multiple records
    JavaRDD<String> wordRDD = wordsRDD.flatMap(
        words -> Arrays.asList(words.split(" ")).listIterator()
    );
    wordRDD.foreach(word -> System.out.print(word + " "));
    jsc.close();
}
```

```scala
/**
 * flatMap operator: transforms each element of the RDD into zero or more elements;
 * here each line is split into words
 */
def flatMapTest(): Unit = {
    val conf = new SparkConf().setMaster("local").setAppName("flatMapTest")
    val sc = new SparkContext(conf)
    val list = Array("hello world", "hello you", "good morning")
    val wordsRDD = sc.parallelize(list)
    val wordRDD = wordsRDD.flatMap(words => words.split(" "))
    wordRDD.foreach(word => print(word + " "))
}
```

flatMap result: hello world hello you good morning
groupByKey: groups (k,v) pairs by key, e.g. (k,v),(k,v2),(k2,v),(k3,v2) => (k,(v,v2)),(k2,(v)),(k3,(v2)). sortByKey: sorts the RDD by key.
```java
/**
 * groupByKey operator: group values by key
 * sortByKey: sort by key; ascending with no argument, descending with the argument false
 */
private static void groupByKeyTest() {
    SparkConf conf = new SparkConf().setAppName("groupByKeyTest").setMaster("local");
    JavaSparkContext jsc = new JavaSparkContext(conf);
    List<Tuple2<String, Integer>> list = Arrays.asList(
        new Tuple2<>("A", 88),
        new Tuple2<>("B", 78),
        new Tuple2<>("C", 55),
        new Tuple2<>("A", 95),
        new Tuple2<>("C", 34)
    );
    JavaPairRDD<String, Integer> scoreRDD = jsc.parallelizePairs(list);
    // without sortByKey() the result order is arbitrary
    // JavaPairRDD<String, Iterable<Integer>> scoreGroupRDD = scoreRDD.groupByKey();
    JavaPairRDD<String, Iterable<Integer>> scoreGroupRDD = scoreRDD.groupByKey().sortByKey();
    scoreGroupRDD.foreach(scoreGroup -> {
        System.out.print(scoreGroup._1 + ": ");
        scoreGroup._2.forEach(score -> System.out.print(score + " "));
        System.out.println();
    });
    jsc.close();
}
```

```scala
/**
 * groupByKey operator: group values by key
 * sortByKey: sort by key; ascending with no argument, descending with the argument false
 */
def groupByKeyTest(): Unit = {
    val conf = new SparkConf().setAppName("groupByKeyTest").setMaster("local")
    val sc = new SparkContext(conf)
    val list = Array(
        Tuple2("A", 89),
        Tuple2("c", 59),
        Tuple2("B", 74),
        Tuple2("c", 50),
        Tuple2("A", 98))
    val scoreRDD = sc.parallelize(list)
    // without sortByKey the result order is arbitrary
    // val groupRDD = scoreRDD.groupByKey()
    val groupRDD = scoreRDD.groupByKey().sortByKey()
    groupRDD.foreach(scoreTP => {
        print(scoreTP._1 + ": ")
        scoreTP._2.foreach(score => print(score + " "))
        println()
    })
}
```

groupByKey result:

```
A: 88 95
B: 78
C: 55 34
```
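The comment above mentions that sortByKey(false) sorts in descending key order. A small sketch of that variant, reusing the scoreRDD built in the Java example (not a standalone method):

```java
// Sketch: descending order by key, based on the scoreRDD from groupByKeyTest() above
JavaPairRDD<String, Iterable<Integer>> descGroupRDD = scoreRDD.groupByKey().sortByKey(false);
descGroupRDD.foreach(scoreGroup -> {
    System.out.print(scoreGroup._1 + ": ");
    scoreGroup._2.forEach(score -> System.out.print(score + " "));
    System.out.println();
});
// prints the C, B, A groups in that order
```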
reduceByKey(func): merges the values that share the same key with the function func, e.g. (k1,2),(k1,3),(k2,5) => (k1,5),(k2,5).
```java
/**
 * reduceByKey operator: group by key and aggregate the values with a user-defined function;
 * here the values are summed, i.e. values with the same key are added:
 * (k1,2),(k1,3),(k2,5) => (k1,5),(k2,5)
 */
private static void reduceByKeyTest() {
    SparkConf conf = new SparkConf().setAppName("reduceByKeyTest").setMaster("local");
    JavaSparkContext jsc = new JavaSparkContext(conf);
    List<Tuple2<String, Integer>> list = Arrays.asList(
        new Tuple2<>("A", 88),
        new Tuple2<>("B", 78),
        new Tuple2<>("C", 55),
        new Tuple2<>("A", 95),
        new Tuple2<>("C", 34)
    );
    JavaPairRDD<String, Integer> scoreRDD = jsc.parallelizePairs(list);
    // the aggregation function here adds the two values
    JavaPairRDD<String, Integer> scoreGroupRDD = scoreRDD.reduceByKey((v1, v2) -> v1 + v2);
    scoreGroupRDD.foreach(score -> System.out.println(score._1 + ": " + score._2));
    jsc.close();
}
```

```scala
/**
 * reduceByKey operator: group by key and aggregate the values with a user-defined function;
 * here the values are summed, i.e. values with the same key are added:
 * (k1,2),(k1,3),(k2,5) => (k1,5),(k2,5)
 */
def reduceByKeyTest(): Unit = {
    val conf = new SparkConf().setAppName("reduceByKeyTest").setMaster("local")
    val sc = new SparkContext(conf)
    val list = Array(
        Tuple2("A", 89),
        Tuple2("c", 59),
        Tuple2("B", 74),
        Tuple2("c", 50),
        Tuple2("A", 98))
    val scoreRDD = sc.parallelize(list)
    val totalRDD = scoreRDD.reduceByKey(_ + _).sortByKey()
    totalRDD.foreach(total => println(total._1 + ": " + total._2))
}
```

reduceByKey result: (B,78) (A,183) (C,89)
join and cogroup: join two pair RDDs by key. join keeps only the keys present in both RDDs (an inner join); cogroup keeps every key that appears in either RDD.
```java
/**
 * join operator: joins two RDDs by key, (K,V).join(K,W) => (K,(V,W));
 * only keys present in both RDDs are kept, the rest are dropped (intersection of keys).
 * cogroup operator: keeps the union of keys; there is one record for every key that appears in either RDD.
 */
private static void joinAndCoGroupTest() {
    SparkConf conf = new SparkConf().setMaster("local").setAppName("joinAndCoGroupTest");
    JavaSparkContext jsc = new JavaSparkContext(conf);
    List<Tuple2<Integer, Integer>> scoreList = Arrays.asList(
        new Tuple2<>(1, 88),
        new Tuple2<>(2, 98),
        new Tuple2<>(3, 75),
        new Tuple2<>(4, 84),
        new Tuple2<>(6, 77)
    );
    List<Tuple2<Integer, String>> stuList = Arrays.asList(
        new Tuple2<>(1, "A"),
        new Tuple2<>(2, "B"),
        new Tuple2<>(3, "C"),
        new Tuple2<>(4, "D"),
        new Tuple2<>(5, "E")
    );
    JavaPairRDD<Integer, Integer> scoreRDD = jsc.parallelizePairs(scoreList);
    JavaPairRDD<Integer, String> stuRDD = jsc.parallelizePairs(stuList);

    // join: keeps only the keys that exist in both RDDs
    JavaPairRDD<Integer, Tuple2<Integer, String>> scoreStuRDD = scoreRDD.join(stuRDD).sortByKey();
    scoreStuRDD.foreach(score -> System.out.println(score.toString()));
    JavaPairRDD<Integer, Tuple2<String, Integer>> stuScoreRDD = stuRDD.join(scoreRDD).sortByKey();
    stuScoreRDD.foreach(stuScore -> System.out.println(stuScore.toString()));

    // cogroup: keeps all keys from both RDDs
    JavaPairRDD<Integer, Tuple2<Iterable<Integer>, Iterable<String>>> scoreStuCGRDD = scoreRDD.cogroup(stuRDD).sortByKey();
    scoreStuCGRDD.foreach(score -> System.out.println(score.toString()));
    JavaPairRDD<Integer, Tuple2<Iterable<String>, Iterable<Integer>>> stuScoreCGRDD = stuRDD.cogroup(scoreRDD).sortByKey();
    stuScoreCGRDD.foreach(stuScore -> System.out.println(stuScore.toString()));

    jsc.close();
}
```

```scala
/**
 * join operator: joins two RDDs by key, (K,V).join(K,W) => (K,(V,W));
 * only keys present in both RDDs are kept, the rest are dropped (intersection of keys).
 * cogroup operator: keeps the union of keys; there is one record for every key that appears in either RDD.
 */
def joinAndCoGroupTest(): Unit = {
    val conf = new SparkConf().setAppName("joinAndCoGroupTest").setMaster("local")
    val sc = new SparkContext(conf)
    val scoreList = Array(
        Tuple2(1, 89),
        Tuple2(2, 59),
        Tuple2(3, 74),
        Tuple2(4, 50),
        Tuple2(5, 98)
    )
    val stuList = Array(
        Tuple2(1, "A"),
        Tuple2(2, "B"),
        Tuple2(3, "C"),
        Tuple2(4, "D"),
        Tuple2(6, "E")
    )
    val scoreRDD = sc.parallelize(scoreList)
    val stuRDD = sc.parallelize(stuList)

    // join the two RDDs by key
    val joinRDD = scoreRDD.join(stuRDD).sortByKey()
    joinRDD.foreach(join => println(join.toString()))

    // cogroup the two RDDs by key
    val cogroupRDD = scoreRDD.cogroup(stuRDD).sortByKey()
    cogroupRDD.foreach(co => println(co.toString()))
}
```

join result:

```
(1,(88,A))
(2,(98,B))
(3,(75,C))
(4,(84,D))
```

cogroup result:

```
(1,([88],[A]))
(2,([98],[B]))
(3,([75],[C]))
(4,([84],[D]))
(5,([],[E]))
(6,([77],[]))
```
Actions
reduce(func): aggregates all elements of the RDD pairwise with the function func.
```java
/**
 * reduce operator: aggregates 1 and 2, then that result with 3, then with 4, and so on (pairwise folding)
 */
private static void reduceTest() {
    SparkConf conf = new SparkConf().setAppName("reduceTest").setMaster("local");
    JavaSparkContext jsc = new JavaSparkContext(conf);
    List<Integer> numList = Arrays.asList(1, 2, 3, 4, 5, 6, 7, 8, 9);
    JavaRDD<Integer> numRDD = jsc.parallelize(numList);
    Integer sum = numRDD.reduce((x1, x2) -> x1 + x2);
    System.out.println(sum);
    jsc.close();
}
```

```scala
/**
 * reduce operator: aggregates 1 and 2, then that result with 3, then with 4, and so on (pairwise folding)
 */
def reduceTest(): Unit = {
    val conf = new SparkConf().setAppName("reduceTest").setMaster("local")
    val sc = new SparkContext(conf)
    val numList = Array(1, 2, 3, 4, 5, 6, 7, 8, 9)
    val parRDD = sc.parallelize(numList)
    val sum = parRDD.reduce(_ + _)
    println(sum)
}
```

reduce result: 45
count and collect: count returns the number of elements in the RDD; collect pulls the RDD's data back to the driver.
```java
/**
 * count: returns the number of elements in the RDD
 * collect operator: pulls the RDD's data back to the driver; with large data sets this performs poorly
 * (heavy IO) and can cause an OOM (out of memory) error
 */
private static void collectTest() {
    SparkConf conf = new SparkConf().setAppName("collectTest").setMaster("local");
    JavaSparkContext jsc = new JavaSparkContext(conf);
    List<Integer> numList = Arrays.asList(1, 2, 3, 4, 5, 6, 7, 8, 9);
    JavaRDD<Integer> numRDD = jsc.parallelize(numList);
    // count
    long count = numRDD.count();
    System.out.println("number of elements: " + count);
    // map: double every element
    JavaRDD<Integer> doubleNumRDD = numRDD.map(x -> x * 2);
    // collect: pull the RDD's data back to the driver as a Java List
    List<Integer> listNum = doubleNumRDD.collect();
    listNum.forEach(num -> System.out.print(num + " "));
    jsc.close();
}
```

```scala
/**
 * count: returns the number of elements in the RDD
 * collect operator: pulls the RDD's data back to the driver; with large data sets this performs poorly
 * (heavy IO) and can cause an OOM (out of memory) error
 */
def collectTest(): Unit = {
    val conf = new SparkConf().setAppName("collectTest").setMaster("local")
    val sc = new SparkContext(conf)
    val numList = Array(1, 2, 3, 4, 5, 6, 7, 8, 9)
    val numRDD = sc.parallelize(numList)
    // count
    val count = numRDD.count()
    println("number of elements: " + count)
    val doubleRDD = numRDD.map(x => x * 2)
    // collect
    val listNum = doubleRDD.collect()
    listNum.foreach(num => print(num + " "))
}
```

count result: number of elements: 9

collect result: 2 4 6 8 10 12 14 16 18
take(N): returns the first N records of the RDD.
```java
/**
 * take(N): like collect, pulls data back to the driver, but only the first N records
 */
private static void takeTest() {
    SparkConf conf = new SparkConf().setAppName("takeTest").setMaster("local");
    JavaSparkContext jsc = new JavaSparkContext(conf);
    List<Integer> numList = Arrays.asList(1, 2, 3, 4, 5, 6, 7, 8, 9);
    JavaRDD<Integer> numRDD = jsc.parallelize(numList);
    // take(N)
    List<Integer> list = numRDD.take(8);
    list.forEach(num -> System.out.print(num + " "));
    jsc.close();
}
```

```scala
/**
 * take(N): like collect, pulls data back to the driver, but only the first N records
 */
def takeTest(): Unit = {
    val conf = new SparkConf().setAppName("takeTest").setMaster("local")
    val sc = new SparkContext(conf)
    val numList = Array(1, 2, 3, 4, 5, 6, 7, 8, 9)
    val numRDD = sc.parallelize(numList)
    val num5 = numRDD.take(5)
    for (n <- num5) {
        println(n)
    }
    // num5.foreach(n => print(n + " "))
}
```

take(8) result: 1 2 3 4 5 6 7 8
saveAsTextFile: saves the RDD as text files at the given path.
```java
/**
 * saveAsTextFile: save the RDD to text files
 */
private static void saveAsTextFileTest() {
    SparkConf conf = new SparkConf().setAppName("saveAsTextFile").setMaster("local");
    JavaSparkContext jsc = new JavaSparkContext(conf);
    List<String> list = Arrays.asList("hello world", "how are you", "nice to");
    JavaRDD<String> strRDD = jsc.parallelize(list);
    // saveAsTextFile
    strRDD.saveAsTextFile("input/str");
    jsc.close();
}
```

```scala
/**
 * saveAsTextFile: save the RDD to text files
 */
def saveAsTextFileTest(): Unit = {
    val conf = new SparkConf().setAppName("saveAsTextFileTest").setMaster("local")
    val sc = new SparkContext(conf)
    val numList = Array("TaskSchedulerImpl", "DAGScheduler")
    val numRDD = sc.parallelize(numList)
    numRDD.saveAsTextFile("input/scalasaveAsTextFile")
}
```
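Note that saveAsTextFile writes a directory at the given path containing one part file per partition rather than a single file. As a small sketch (reusing the "input/str" path from the Java example above, not part of the original examples), the saved output can be read back into an RDD of lines with textFile:

```java
/**
 * Sketch: read back the directory written by saveAsTextFileTest() above
 */
private static void readSavedTextFileTest() {
    SparkConf conf = new SparkConf().setAppName("readSavedTextFileTest").setMaster("local");
    JavaSparkContext jsc = new JavaSparkContext(conf);
    // textFile accepts a directory and reads every part file inside it
    JavaRDD<String> lines = jsc.textFile("input/str");
    lines.foreach(line -> System.out.println(line));
    jsc.close();
}
```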
countByKey: counts the records that share the same key (effectively a group-by-key followed by a count).
```java
/**
 * countByKey: count the number of records for each key
 */
private static void countByKeyTest() {
    SparkConf conf = new SparkConf().setAppName("countByKeyTest").setMaster("local");
    JavaSparkContext jsc = new JavaSparkContext(conf);
    List<Tuple2<Integer, Integer>> numList = Arrays.asList(
        new Tuple2<>(1, 100),
        new Tuple2<>(2, 50),
        new Tuple2<>(1, 100),
        new Tuple2<>(4, 80),
        new Tuple2<>(5, 50),
        new Tuple2<>(6, 80)
    );
    // Tuple data must be parallelized with parallelizePairs and received as a JavaPairRDD, not a JavaRDD
    JavaPairRDD<Integer, Integer> numRDD = jsc.parallelizePairs(numList);
    Map<Integer, Long> count = numRDD.countByKey();
    count.forEach((k, v) -> System.out.println(k + "--" + v));
    jsc.close();
}
```

```scala
/**
 * countByValue: counts how many times each element occurs
 * (for pair RDDs, countByKey counts the records per key, as in the Java example above)
 */
def countByKeyTest(): Unit = {
    val conf = new SparkConf().setAppName("countByKeyTest").setMaster("local")
    val sc = new SparkContext(conf)
    val numList = Array("TaskSchedulerImpl", "DAGScheduler", "TaskSchedulerImpl")
    val numRDD = sc.parallelize(numList)
    numRDD.countByValue().foreach(num => println(num._1 + "--" + num._2))
}
```

countByKey result:

```
5--1
1--2
6--1
2--1
4--1
```