Spark 的见解 & 优化 (一)

本贴最后更新于 2111 天前,其中的信息可能已经事过境迁

 spark 是什么

 spark 是一个分布式的内存型的流式计算框架,支持 java,python,scala,数据源可以是流式的流,可以是文本,数据库,有 schema 的 json 或者 parquet 等

 概念&见解(附 java 示例代码)

 rdd

Spark revolves around the concept of a _resilient distributed dataset_ (RDD), which is a fault-tolerant collection of elements that can be
operated on in parallel. There are two ways to create RDDs: _parallelizing_ an existing collection in your driver program, or referencing a
dataset in an external storage system, such as a shared filesystem, HDFS, HBase, or any data source offering a Hadoop InputFormat.

 rdd 是分布式的的可容错的数据集合,2 种创建方式上面已由上面给出,在此就不做赘述。

 Transformations 算子(带示例代码)

Transformation 属于延迟计算,当一个 RDD 转换成另一个 RDD 时并没有立即进行转换,仅仅是记住了数据集的逻辑操作

 1)map(通过函数把 rdd 变换成为一个新的数据集)

 SparkConf conf = new SparkConf();
 // local[1]表示使用1个线程
 conf.setMaster("local[1]");
 conf.setAppName("test");
 List list = new ArrayList<>();
 for(int i=1;i<=10;i++){
    list.add(i);
 }
 JavaSparkContext jc = new JavaSparkContext(conf);
 JavaRDD<Integer> parallelize = jc.parallelize(list);
 // 每个值前面加上字符串str:
 JavaRDD<String> map = parallelize.map(x -> "str:"+x);
 map.foreach(x-> System.out.println(x));
 
 结果:
 str:1
 str:2
 str:3
 str:4
 str:5
 str:6
 str:7
 str:8
 str:9
 str:10

 2)filter(返回函数结果为 true 的数据集)

 SparkConf conf = new SparkConf();
 conf.setMaster("local[1]");
 conf.setAppName("test");
 List list = new ArrayList<>();
 for(int i=1;i<=10;i++){
    list.add(i);
 }
 JavaSparkContext jc = new JavaSparkContext(conf);
 JavaRDD parallelize = jc.parallelize(list);
 // 过滤偶数
 JavaRDD map = parallelize.filter(x->x%2==0?false:true);
 map.foreach(x-> System.out.println(x));
 
 结果:
 1
 3
 5
 7
 9

 3) flatMap(把一个结果集的每个元素变成为多个元素)

 SparkConf conf = new SparkConf();
 conf.setMaster("local[1]");
 conf.setAppName("test");
 JavaSparkContext jc = new JavaSparkContext(conf);
 // 内容为:1,2,3,4,5,6,7,8,9
 // 加载该文件并按逗号分隔
 JavaRDD stringJavaRDD = jc.textFile("/Users/yangjunwei/data/spark.txt")
        .flatMap(x -> Arrays.asList(x.split(",")).iterator());
 stringJavaRDD.foreach(x-> System.out.println(x));
 
 结果:
 1
 2
 3
 4
 5
 6
 7
 8
 9

 4) mapPartitions(以分区为单位,对每个 partition 的 rdd 做 map 操作)

 SparkConf conf = new SparkConf();
 conf.setMaster("local[1]");
 conf.setAppName("test");
 JavaSparkContext jc = new JavaSparkContext(conf);
 // 内容为:1,2,3,4,5,6,7,8,9
 // 加载该文件并按逗号分隔
 // 每行开头加str:
 JavaRDD stringJavaRDD = jc.textFile("/Users/yangjunwei/data/spark.txt")
        .flatMap(x -> Arrays.asList(x.split(",")).iterator())
        .mapPartitions(x->{
           List list = new ArrayList<>();
           x.forEachRemaining(x1->{
             list.add("str:"+x1);
           });
           return list.iterator();
  });
  stringJavaRDD.foreach(x-> System.out.println(x));
  结果:
  str:1
  str:2
  str:3
  str:4
  str:5
  str:6
  str:7
  str:8
  str:9

 5) union(返回 2 个数据集的并集)

 SparkConf conf = new SparkConf();
 conf.setMaster("local[1]");
 conf.setAppName("test");
 JavaSparkContext jc = new JavaSparkContext(conf);
 List list1 = new ArrayList<>();
 for(int i=1;i<=5;i++){
    list1.add(i);
 }
 List list2 = new ArrayList<>();
 for(int i=5;i<=10;i++){
    list2.add(i);
 }
 JavaRDD rdd1 = jc.parallelize(list1);
 JavaRDD rdd2 = jc.parallelize(list2);
 JavaRDD union = rdd1.union(rdd2);
 System.out.println("rdd1:");
 rdd1.foreach(x-> System.out.println(x));
 System.out.println("rdd2:");
 rdd2.foreach(x-> System.out.println(x));
 System.out.println("union:");
 union.foreach(x-> System.out.println(x));
 
 结果:
 rdd1:
 1
 2
 3
 4
 5
 rdd2:
 5
 6
 7
 8
 9
 10
 union:
 1
 2
 3
 4
 5
 5
 6
 7
 8
 9
 10

 6) distinct(数据集去重)
 7) sortBy(对数据集处理后的值做二次排序)

 接5)的代码
 // 按原值分1个partition进行升序
 union.distinct().sortBy(x->x,true,1).foreach(x-> System.out.println(x));
 
 结果:
 1
 2
 3
 4
 5
 6
 7
 8
 9
 10

 9) mapToPair(将数据集转化为 <K,V> 数据集)
 10) sortByKey(对 <K,V> 数据集(pairs)基于 key 进行排序)

 SparkConf conf = new SparkConf();
 conf.setMaster("local[1]");
 conf.setAppName("test");
 JavaSparkContext jc = new JavaSparkContext(conf);
 List list1 = new ArrayList<>();
 for(int i=5;i>=1;i--){
    list1.add(i);
 }
 JavaPairRDD, String> pairRDD = jc.parallelize(list1).mapToPair(x -> new Tuple2<>(x, ""));
 System.out.println("排序前:");
 pairRDD.foreach(x-> System.out.println(x));
 JavaPairRDD, String> sortPair = pairRDD.sortByKey();
 System.out.println("排序后:");
 sortPair.foreach(x-> System.out.println(x));
 
 结果:
 排序前:
 (5,)
 (4,)
 (3,)
 (2,)
 (1,)
 排序后:
 (1,)
 (2,)
 (3,)
 (4,)
 (5,)

 11) groupByKey(对 <K,V> 数据集(pairs)基于 key 进行分组)

 SparkConf conf = new SparkConf();
 conf.setMaster("local[1]");
 conf.setAppName("test");
 JavaSparkContext jc = new JavaSparkContext(conf);
 List<Map<Integer,Integer>> list = new ArrayList<>();
 Map<Integer,Integer> item = new HashMap<>();
 item.put(1,1);
 list.add(item);
 item = new HashMap<>();
 item.put(1,2);
 list.add(item);
 item = new HashMap<>();
 item.put(1,3);
 list.add(item);
 item = new HashMap<>();
 item.put(2,1);
 list.add(item);
 item = new HashMap<>();
 item.put(2,2);
 list.add(item);
 JavaPairRDD<Integer,Integer> javaPairRDD = jc.parallelize(list).flatMap(x -> x.entrySet().iterator()).mapToPair(x -> new Tuple2<>(x.getKey(),x.getValue()));
 System.out.println("分组前:");
 javaPairRDD.foreach(x-> System.out.println(x));
 System.out.println("分组后:");
 javaPairRDD.groupByKey().foreach(x-> System.out.println(x));
 
 结果:
 分组前:
 (1,1)
 (1,2)
 (1,3)
 (2,1)
 (2,2)
 分组后:
 (1,[1, 2, 3])
 (2,[1, 2])

 12) reduceByKey(对 <K,V> 数据集(pairs)基于 key 进行 reduce 操作)

 SparkConf conf = new SparkConf();
 conf.setMaster("local[1]");
 conf.setAppName("test");
 JavaSparkContext jc = new JavaSparkContext(conf);
 List list1 = new ArrayList<>();
 for(int i=1;i<=5;i++){
    list1.add(i);
 }
 List list2 = new ArrayList<>();
 for(int i=3;i<=5;i++){
    list2.add(i);
 }
 JavaPairRDD, String> pairRdd1 = jc.parallelize(list1).mapToPair(x -> new Tuple2<>(x, ""));
 JavaPairRDD, String> pairRdd2 = jc.parallelize(list2).mapToPair(x -> new Tuple2<>(x, ""));
 JavaPairRDD, String> union = pairRdd1.union(pairRdd2);
 System.out.println("key去重前:");
 union.sortByKey().map(x->x._1).foreach(x-> System.out.println(x));
 System.out.println("key去重后:");
 union.reduceByKey((var1, var2) -> var1).sortByKey().map(x->x._1).foreach(x-> System.out.println(x));

 action 算子(带示例代码)

下一篇

  • Spark

    Spark 是 UC Berkeley AMP lab 所开源的类 Hadoop MapReduce 的通用并行框架。Spark 拥有 Hadoop MapReduce 所具有的优点;但不同于 MapReduce 的是 Job 中间输出结果可以保存在内存中,从而不再需要读写 HDFS,因此 Spark 能更好地适用于数据挖掘与机器学习等需要迭代的 MapReduce 的算法。

    74 引用 • 46 回帖 • 561 关注

相关帖子

欢迎来到这里!

我们正在构建一个小众社区,大家在这里相互信任,以平等 • 自由 • 奔放的价值观进行分享交流。最终,希望大家能够找到与自己志同道合的伙伴,共同成长。

注册 关于
请输入回帖内容 ...