SparkRDD 的 Transformations/Actions 操作实战

本贴最后更新于 2708 天前,其中的信息可能已经时移世易

在前面的《Spark 编程原理及 RDD 的特性与基本操作》中介绍过,SparkRDD 的操作分为两个部分:Transformation 和 Action。这两种操作各自包含多个算子(即操作函数)。Transformation 针对已有的 RDD 创建一个新的 RDD,主要是对数据进行映射、变换、统计、过滤等;Transformation 是惰性的,只有遇到 Action 时才会真正触发计算。Action 主要是对数据执行最终的操作,如遍历、聚合、保存等。下面来看这些操作的具体实现。

Transformations

map(func):对 RDD 中的每个元素通过函数 func 进行映射。

/**
 * Demonstrates the {@code map} transformation: every element of the RDD is
 * doubled and the results are printed on one line, separated by spaces.
 */
private static void mapTest() {
    JavaSparkContext sparkCtx = new JavaSparkContext(
            new SparkConf().setAppName("mapTest").setMaster("local"));

    JavaRDD<Integer> source = sparkCtx.parallelize(Arrays.asList(1, 2, 3, 4, 5, 6, 7, 8));
    JavaRDD<Integer> doubled = source.map(value -> value * 2);

    // foreach is an action: this is the point where the job actually runs.
    doubled.foreach(value -> System.out.printf(value.toString() + " "));

    sparkCtx.close();
}
/**
 * map operator demo: doubles every element of the collection and prints the
 * results on one line.
 *
 * The SparkContext is stopped in `finally`: only one SparkContext may be active
 * per JVM, so leaving it running would make any later demo in the same process
 * fail when it creates its own context.
 */
def mapTest(): Unit = {
  val conf = new SparkConf().setAppName("mapTest").setMaster("local")
  val sc = new SparkContext(conf)
  try {
    val list = Array(1, 2, 3, 4, 5, 6, 7, 8)
    val numRDD = sc.parallelize(list)
    val num2RDD = numRDD.map(num => num * 2)
    num2RDD.foreach(num2 => print(num2 + " "))
  } finally {
    sc.stop()
  }
}
map结果: 2 4 6 8 10 12 14 16 

filter:过滤或者选择满足条件的数据

/**
 * Demonstrates the {@code filter} transformation: only elements that satisfy
 * the predicate are kept. Here the even numbers among 1..10 are selected and
 * printed on one line.
 */
private static void filterTest() {
    SparkConf sparkConf = new SparkConf().setAppName("filterTest").setMaster("local");
    JavaSparkContext ctx = new JavaSparkContext(sparkConf);

    JavaRDD<Integer> evens = ctx
            .parallelize(Arrays.asList(1, 2, 3, 4, 5, 6, 7, 8, 9, 10))
            // keep the even numbers only
            .filter(n -> n % 2 == 0);
    evens.foreach(n -> System.out.print(n + " "));

    ctx.close();
}
/**
 * filter operator demo: keeps only the elements that satisfy the predicate,
 * here the even numbers among 1..10.
 *
 * The SparkContext is stopped in `finally` because only one SparkContext may be
 * active per JVM; leaving it running breaks any later demo in the same process.
 */
def filterTest(): Unit = {
  val conf = new SparkConf().setAppName("filterTest").setMaster("local")
  val sc = new SparkContext(conf)
  try {
    val list = Array(1, 2, 3, 4, 5, 6, 7, 8, 9, 10)
    val numRDD = sc.parallelize(list)
    // keep the even numbers
    val evenNumRDD = numRDD.filter(num => num % 2 == 0)
    evenNumRDD.foreach(evenNum => print(evenNum + " "))
  } finally {
    sc.stop()
  }
}
filter结果: 2 4 6 8 10 

flatMap:将一个输入元素映射成 0 到多个输出元素

/**
 * Demonstrates the {@code flatMap} transformation: each input element may
 * produce zero or more output elements. Here every line is split on a single
 * space into its words, which are printed on one line.
 *
 * <p>Words are printed with {@code print}, not {@code printf}. Passing data as
 * a format string would throw if a word ever contained a {@code %} character.
 */
private static void flatMapTest() {
    SparkConf conf = new SparkConf().setAppName("flatMapTest").setMaster("local");
    // try-with-resources closes the context even if a job throws.
    try (JavaSparkContext jsc = new JavaSparkContext(conf)) {
        List<String> list = Arrays.asList("hello world", "hello you", "good morning");
        JavaRDD<String> wordsRDD = jsc.parallelize(list);
        // Split each line on a space into multiple words.
        JavaRDD<String> wordRDD = wordsRDD.flatMap(
                words -> Arrays.asList(words.split(" ")).iterator()
        );

        wordRDD.foreach(word -> System.out.print(word + " "));
    }
}
/**
 * flatMap operator demo: each input element may produce several output
 * elements. Here every line is split on a space into its words, and each word
 * is printed on its own line.
 *
 * The SparkContext is stopped in `finally` because only one SparkContext may be
 * active per JVM; leaving it running breaks any later demo in the same process.
 */
def flatMapTest(): Unit = {
  val conf = new SparkConf().setMaster("local").setAppName("flatMapTest")
  val sc = new SparkContext(conf)
  try {
    val list = Array("hello world", "hello you", "good morning")
    val wordsRDD = sc.parallelize(list)
    val wordRDD = wordsRDD.flatMap(words => words.split(" "))
    // println already returns Unit; the original `.toString` on it was a no-op.
    wordRDD.foreach(word => println(word))
  } finally {
    sc.stop()
  }
}
flatMap结果: 
  hello 
  world
  hello
  you
  good
  morning
			  

groupByKey:按 key 分组,(k,v),(k,v2),(k2,v),(k3,v2) => (k,(v,v2)),(k2,(v)),(k3,(v2));sortByKey:按 key 排序

/**
 * Demonstrates {@code groupByKey} and {@code sortByKey}.
 *
 * <p>{@code groupByKey} collects all values that share a key into one
 * {@code Iterable}. {@code sortByKey()} then orders the groups by key
 * ascending; {@code sortByKey(false)} would order them descending. Without
 * {@code sortByKey} the group order is not defined.
 *
 * <p>Output uses {@code print} rather than {@code printf}. Using a data value
 * (the key) as a format string would throw if it contained a {@code %}.
 */
private static void groupByKeyTest() {
    SparkConf conf = new SparkConf().setAppName("groupByKeyTest").setMaster("local");
    // try-with-resources closes the context even if the job throws.
    try (JavaSparkContext jsc = new JavaSparkContext(conf)) {
        List<Tuple2<String, Integer>> list = Arrays.asList(
                new Tuple2<>("A", 88),
                new Tuple2<>("B", 78),
                new Tuple2<>("C", 55),
                new Tuple2<>("A", 95),
                new Tuple2<>("C", 34)
        );
        JavaPairRDD<String, Integer> scoreRDD = jsc.parallelizePairs(list);

        // Without sortByKey() the groups come out in no particular order:
        // JavaPairRDD<String, Iterable<Integer>> scoreGroupRDD = scoreRDD.groupByKey();
        JavaPairRDD<String, Iterable<Integer>> scoreGroupRDD = scoreRDD.groupByKey().sortByKey();

        // foreach runs on the executors. The printed order follows the sort here
        // because the "local" master uses one worker thread.
        // NOTE(review): this is not guaranteed on a cluster.
        scoreGroupRDD.foreach(scoreGroup -> {
            System.out.print(scoreGroup._1 + ": ");
            scoreGroup._2.forEach(score -> System.out.print(score + " "));
            System.out.println();
        });
    }
}
/**
 * groupByKey operator demo: groups the values of identical keys. sortByKey()
 * orders the groups by key ascending; sortByKey(false) would order them
 * descending. Each group is printed as "key: v1 v2 ...".
 *
 * The SparkContext is stopped in `finally` because only one SparkContext may be
 * active per JVM; leaving it running breaks any later demo in the same process.
 * (The original definition was also missing its closing brace.)
 */
def groupByKeyTest(): Unit = {
  val conf = new SparkConf().setAppName("groupByKeyTest").setMaster("local")
  val sc = new SparkContext(conf)
  try {
    val list = Array(
      ("A", 89),
      ("c", 59),
      ("B", 74),
      ("c", 50),
      ("A", 98))

    val scoreRDD = sc.parallelize(list)
    // Without sortByKey the groups come out in no particular order:
    // val groupRDD = scoreRDD.groupByKey()
    val groupRDD = scoreRDD.groupByKey().sortByKey()
    groupRDD.foreach { case (key, scores) =>
      print(key + ": ")
      scores.foreach(score => print(score + " "))
      println()
    }
  } finally {
    sc.stop()
  }
}

groupByKey结果
A: 88 95 
B: 78 
C: 55 34 

reduceByKey(func):对 key 相同的记录,用 func 函数聚合它们的 V

/**
 * Demonstrates {@code reduceByKey}: values that share a key are merged with a
 * user-supplied function. Here the function is addition, e.g.
 * (k1,2),(k1,3),(k2,5) => (k1,5),(k2,5). Each total is printed as "key: total".
 */
private static void reduceByKeyTest() {
    JavaSparkContext ctx = new JavaSparkContext(
            new SparkConf().setAppName("reduceByKeyTest").setMaster("local"));

    List<Tuple2<String, Integer>> pairs = Arrays.asList(
            new Tuple2<>("A", 88),
            new Tuple2<>("B", 78),
            new Tuple2<>("C", 55),
            new Tuple2<>("A", 95),
            new Tuple2<>("C", 34)
    );
    JavaPairRDD<String, Integer> scores = ctx.parallelizePairs(pairs);

    // The merge function: add the values of the same key.
    JavaPairRDD<String, Integer> totals = scores.reduceByKey(Integer::sum);
    totals.foreach(entry -> System.out.println(entry._1 + ": " + entry._2));

    ctx.close();
}
/**
 * reduceByKey operator demo: values sharing a key are merged with the given
 * function. Here it is addition, e.g. (k1,2),(k1,3),(k2,5) => (k1,5),(k2,5).
 * The totals are sorted by key and printed as "key: total".
 *
 * The SparkContext is stopped in `finally` because only one SparkContext may be
 * active per JVM; leaving it running breaks any later demo in the same process.
 */
def reduceByKeyTest(): Unit = {
  val conf = new SparkConf().setAppName("reduceByKeyTest").setMaster("local")
  val sc = new SparkContext(conf)
  try {
    val list = Array(
      ("A", 89),
      ("c", 59),
      ("B", 74),
      ("c", 50),
      ("A", 98))

    val scoreRDD = sc.parallelize(list)
    val totalRDD = scoreRDD.reduceByKey(_ + _).sortByKey()
    totalRDD.foreach { case (key, total) => println(key + ": " + total) }
  } finally {
    sc.stop()
  }
}
reduceByKey结果:
B: 78
A: 183
C: 89

join 和 cogroup:对 RDD 进行连接

/**
 * Demonstrates {@code join} and {@code cogroup} on two pair RDDs keyed by
 * student id.
 *
 * <ul>
 *   <li>{@code join}: (K,V).join(K,W) => (K,(V,W)). Only keys present in both
 *       RDDs are kept (an inner join, like an intersection of the key sets).</li>
 *   <li>{@code cogroup}: every key present in either RDD yields one record
 *       (K,(Iterable of V, Iterable of W)). The side lacking the key gets an
 *       empty Iterable (like a union of the key sets).</li>
 * </ul>
 *
 * <p>Each operation is run and printed in both directions: scores with
 * students, and students with scores.
 */
private static void joinAndCoGroupTest(){
    SparkConf conf = new SparkConf().setMaster("local").setAppName("joinAndCoGroupTest");
    // try-with-resources: the context is always closed (the original never closed it).
    try (JavaSparkContext jsc = new JavaSparkContext(conf)) {
        List<Tuple2<Integer, Integer>> scoreList = Arrays.asList(
                new Tuple2<>(1, 88),
                new Tuple2<>(2, 98),
                new Tuple2<>(3, 75),
                new Tuple2<>(4, 84),
                new Tuple2<>(6, 77)
        );
        List<Tuple2<Integer, String>> stuList = Arrays.asList(
                new Tuple2<>(1, "A"),
                new Tuple2<>(2, "B"),
                new Tuple2<>(3, "C"),
                new Tuple2<>(4, "D"),
                new Tuple2<>(5, "E")
        );
        JavaPairRDD<Integer, Integer> scoreRDD = jsc.parallelizePairs(scoreList);
        JavaPairRDD<Integer, String> stuRDD = jsc.parallelizePairs(stuList);

        // join: keep only keys present on both sides.
        JavaPairRDD<Integer, Tuple2<Integer, String>> scoreStuRDD =
                scoreRDD.join(stuRDD).sortByKey();
        scoreStuRDD.foreach(score -> System.out.println(score.toString()));

        JavaPairRDD<Integer, Tuple2<String, Integer>> stuScoreRDD =
                stuRDD.join(scoreRDD).sortByKey();
        // Print the reversed join (the original printed scoreStuRDD a second time).
        stuScoreRDD.foreach(score -> System.out.println(score.toString()));

        // cogroup: keep every key from either side.
        JavaPairRDD<Integer, Tuple2<Iterable<Integer>, Iterable<String>>> scoreStuCGRDD =
                scoreRDD.cogroup(stuRDD).sortByKey();
        scoreStuCGRDD.foreach(score -> System.out.println(score.toString()));

        JavaPairRDD<Integer, Tuple2<Iterable<String>, Iterable<Integer>>> stuScoreCGRDD =
                stuRDD.cogroup(scoreRDD).sortByKey();
        // Print the reversed cogroup (the original printed scoreStuCGRDD a second time).
        stuScoreCGRDD.foreach(score -> System.out.println(score.toString()));
    }
}
/**
 * join / cogroup operator demo on two pair RDDs keyed by student id.
 *
 *  - join: (K,V).join(K,W) => (K,(V,W)). Only keys present on both sides are
 *    kept (an inner join, like an intersection of the key sets).
 *  - cogroup: every key present on either side yields one record
 *    (K,(Iterable[V], Iterable[W])), with an empty Iterable on the side lacking
 *    the key (like a union of the key sets).
 *
 * The SparkContext is stopped in `finally` because only one SparkContext may be
 * active per JVM; leaving it running breaks any later demo in the same process.
 */
def joinAndCoGroupTest(): Unit = {
  val conf = new SparkConf().setAppName("joinAndCoGroupTest").setMaster("local")
  val sc = new SparkContext(conf)
  try {
    val scoreList = Array(
      (1, 89),
      (2, 59),
      (3, 74),
      (4, 50),
      (5, 98)
    )
    val stuList = Array(
      (1, "A"),
      (2, "B"),
      (3, "C"),
      (4, "D"),
      (6, "E")
    )
    val scoreRDD = sc.parallelize(scoreList)
    val stuRDD = sc.parallelize(stuList)
    // join: only keys 1..4 exist on both sides.
    val joinRDD = scoreRDD.join(stuRDD).sortByKey()
    joinRDD.foreach(join => println(join.toString()))
    // cogroup: keys 1..6. Key 5 has no name and key 6 has no score.
    val cogroupRDD = scoreRDD.cogroup(stuRDD).sortByKey()
    cogroupRDD.foreach(co => println(co.toString()))
  } finally {
    sc.stop()
  }
}
join结果:
  (1,(88,A))
  (2,(98,B))
  (3,(75,C))
  (4,(84,D))
cogroup结果:
  (1,([88],[A]))
  (2,([98],[B]))
  (3,([75],[C]))
  (4,([84],[D]))
  (5,([],[E]))
  (6,([77],[]))

Actions 算子

reduce 算子:

/**
 * Demonstrates the {@code reduce} action: elements are combined pairwise with
 * the given function until a single value remains. Here that is the sum of
 * 1..9, which is printed without a trailing newline.
 */
private static void reduceTest(){
    JavaSparkContext context = new JavaSparkContext(
            new SparkConf().setAppName("reduceTest").setMaster("local"));

    Integer total = context
            .parallelize(Arrays.asList(1, 2, 3, 4, 5, 6, 7, 8, 9))
            .reduce((left, right) -> left + right);
    System.out.printf(total.toString());

    context.close();
}
/**
 * reduce operator demo: combines the elements pairwise with the function and
 * prints the result (the sum of 1..9).
 *
 * Spark reduces within each partition and then across partitions, so the
 * function should be associative and commutative (`_ + _` is both).
 *
 * The SparkContext is stopped in `finally` because only one SparkContext may be
 * active per JVM; leaving it running breaks any later demo in the same process.
 */
def reduceTest(): Unit = {
  val conf = new SparkConf().setAppName("reduceTest").setMaster("local")
  val sc = new SparkContext(conf)
  try {
    val numList = Array(1, 2, 3, 4, 5, 6, 7, 8, 9)
    val parRDD = sc.parallelize(numList)
    val sum = parRDD.reduce(_ + _)
    println(sum)
  } finally {
    sc.stop()
  }
}
reduce结果: 45 

count 和 collect:count 统计 RDD 元素个数,collect 将 RDD 数据拉取到本地

/**
 * Demonstrates the {@code count} and {@code collect} actions.
 *
 * <p>{@code count} returns the number of elements in the RDD. {@code collect}
 * pulls every element back to the driver as a {@code java.util.List}. On large
 * data that means heavy I/O and a risk of running the driver out of memory.
 */
private static void collectTest(){
    JavaSparkContext ctx = new JavaSparkContext(
            new SparkConf().setAppName("collectTest").setMaster("local"));
    JavaRDD<Integer> numbers = ctx.parallelize(Arrays.asList(1, 2, 3, 4, 5, 6, 7, 8, 9));

    // count: number of elements
    long total = numbers.count();
    System.out.printf("元素个数:" + total);

    // collect: bring the doubled values back to the driver
    List<Integer> doubledOnDriver = numbers.map(x -> x * 2).collect();
    for (Integer value : doubledOnDriver) {
        System.out.printf(value.toString() + " ");
    }

    ctx.close();
}
/**
 * count / collect operator demo.
 *
 *  - count: returns the number of elements in the RDD.
 *  - collect: pulls every element back to the driver as a local Array. On
 *    large data this means heavy I/O and a risk of driver out-of-memory errors.
 *
 * The SparkContext is stopped in `finally` because only one SparkContext may be
 * active per JVM; leaving it running breaks any later demo in the same process.
 */
def collectTest(): Unit = {
  val conf = new SparkConf().setAppName("collectTest").setMaster("local")
  val sc = new SparkContext(conf)
  try {
    val numList = Array(1, 2, 3, 4, 5, 6, 7, 8, 9)
    val numRDD = sc.parallelize(numList)
    // count
    val count = numRDD.count()
    println("元素个数:" + count)
    val doubleRDD = numRDD.map(x => x * 2)
    // collect: the result is a driver-side Array[Int]
    val listNum = doubleRDD.collect()
    // Print on one line, matching the documented output "2 4 6 ...".
    listNum.foreach(num => print(num + " "))
  } finally {
    sc.stop()
  }
}

count结果: 元素个数:9 
collect结果: 2 4 6 8 10 12 14 16 18 

take(N) 算子:取 RDD 的前 N 条记录

/**
 * Demonstrates the {@code take(N)} action. Like {@code collect}, it returns
 * data to the driver as a {@code List}, but only the first N elements. Here
 * N = 8, and the elements are printed on one line.
 */
private static void takeTest(){
    JavaSparkContext ctx = new JavaSparkContext(
            new SparkConf().setAppName("takeTest").setMaster("local"));

    List<Integer> firstEight = ctx
            .parallelize(Arrays.asList(1, 2, 3, 4, 5, 6, 7, 8, 9))
            .take(8);
    for (Integer n : firstEight) {
        System.out.printf(n + " ");
    }

    ctx.close();
}
/**
 * take(N) operator demo. Like collect, it returns data to the driver, but only
 * the first N elements rather than the whole RDD. Here N = 5, and each element
 * is printed on its own line.
 *
 * The SparkContext is stopped in `finally` because only one SparkContext may be
 * active per JVM; leaving it running breaks any later demo in the same process.
 */
def takeTest(): Unit = {
  val conf = new SparkConf().setAppName("takeTest").setMaster("local")
  val sc = new SparkContext(conf)
  try {
    val numList = Array(1, 2, 3, 4, 5, 6, 7, 8, 9)
    val numRDD = sc.parallelize(numList)
    val num5 = numRDD.take(5)
    for (n <- num5) {
      println(n)
    }
    // num5.foreach(n => print(n + " "))
  } finally {
    sc.stop()
  }
}
take(8)结果: 1 2 3 4 5 6 7 8 

saveAsTextFile 算子:将 RDD 保存为文本文件(输出路径是一个目录,每个分区对应其中一个 part 文件)

/**
 * Demonstrates the {@code saveAsTextFile} action: each element of the RDD is
 * written as one line of text under the output directory {@code input/str}.
 */
private static void saveAsTextFileTest(){
    SparkConf sparkConf = new SparkConf()
            .setAppName("saveAsTextFile")
            .setMaster("local");
    JavaSparkContext ctx = new JavaSparkContext(sparkConf);

    ctx.parallelize(Arrays.asList("hello world", "how are you", "nice to"))
            .saveAsTextFile("input/str");

    ctx.close();
}
/**
 * saveAsTextFile operator demo: writes each element as one line of text under
 * the output directory "input/scalasaveAsTextFile".
 *
 * NOTE: Hadoop's output-spec check makes this fail if that directory already
 * exists, so delete it before re-running.
 *
 * The SparkContext is stopped in `finally` because only one SparkContext may be
 * active per JVM; leaving it running breaks any later demo in the same process.
 */
def saveAsTextFileTest(): Unit = {
  // App name now matches this demo (it was copy-pasted as "takeTest").
  val conf = new SparkConf().setAppName("saveAsTextFileTest").setMaster("local")
  val sc = new SparkContext(conf)
  try {
    val lines = Array("TaskSchedulerImpl", "DAGScheduler")
    val lineRDD = sc.parallelize(lines)
    lineRDD.saveAsTextFile("input/scalasaveAsTextFile")
  } finally {
    sc.stop()
  }
}

countByKey:统计每个 key 出现的次数(相当于按 K 分组后计数)

/**
 * Demonstrates the {@code countByKey} action: it returns, on the driver, a map
 * from each key to the number of records carrying that key. Each entry is
 * printed as "key--count".
 */
private static void countByKeyTest(){
    JavaSparkContext ctx = new JavaSparkContext(
            new SparkConf().setAppName("countByKeyTest").setMaster("local"));

    List<Tuple2<Integer, Integer>> pairs = Arrays.asList(
            new Tuple2<>(1, 100),
            new Tuple2<>(2, 50),
            new Tuple2<>(1, 100),
            new Tuple2<>(4, 80),
            new Tuple2<>(5, 50),
            new Tuple2<>(6, 80)
    );
    // Tuple data must go through parallelizePairs so the result is a
    // JavaPairRDD (which offers countByKey) rather than a plain JavaRDD.
    Map<Integer, Long> perKey = ctx.parallelizePairs(pairs).countByKey();
    for (Map.Entry<Integer, Long> e : perKey.entrySet()) {
        System.out.println(e.getKey() + "--" + e.getValue());
    }

    ctx.close();
}
/**
 * countByKey operator demo: counts how many records carry each key and prints
 * each entry as "key--count". It uses the same pair data as the Java version,
 * so the output matches the documented result. (The original called
 * countByValue on plain strings, which does not demonstrate countByKey.)
 *
 * countByKey is an action that returns a driver-side Map[K, Long].
 *
 * The SparkContext is stopped in `finally` because only one SparkContext may be
 * active per JVM; leaving it running breaks any later demo in the same process.
 */
def countByKeyTest(): Unit = {
  val conf = new SparkConf().setAppName("countByKeyTest").setMaster("local")
  val sc = new SparkContext(conf)
  try {
    val pairs = Array((1, 100), (2, 50), (1, 100), (4, 80), (5, 50), (6, 80))
    val pairRDD = sc.parallelize(pairs)
    pairRDD.countByKey().foreach { case (key, count) => println(key + "--" + count) }
  } finally {
    sc.stop()
  }
}
countByKey结果:
5--1
1--2
6--1
2--1
4--1
  • Spark

    Spark 是 UC Berkeley AMP lab 所开源的类 Hadoop MapReduce 的通用并行框架。Spark 拥有 Hadoop MapReduce 所具有的优点;但不同于 MapReduce 的是 Job 中间输出结果可以保存在内存中,从而不再需要读写 HDFS,因此 Spark 能更好地适用于数据挖掘与机器学习等需要迭代的 MapReduce 的算法。

    74 引用 • 46 回帖 • 560 关注
  • rdd
    5 引用 • 2 回帖

相关帖子

欢迎来到这里!

我们正在构建一个小众社区,大家在这里相互信任,以平等 • 自由 • 奔放的价值观进行分享交流。最终,希望大家能够找到与自己志同道合的伙伴,共同成长。

注册 关于
请输入回帖内容 ...