Spark 的见解 & 优化 (一)

本贴最后更新于 2300 天前,其中的信息可能已经事过境迁

 spark 是什么

 spark 是一个分布式的内存型的流式计算框架,支持 java,python,scala,数据源可以是流式的流,可以是文本,数据库,有 schema 的 json 或者 parquet 等

 概念&见解(附 java 示例代码)

 rdd

Spark revolves around the concept of a _resilient distributed dataset_ (RDD), which is a fault-tolerant collection of elements that can be operated on in parallel. There are two ways to create RDDs: _parallelizing_ an existing collection in your driver program, or referencing a dataset in an external storage system, such as a shared filesystem, HDFS, HBase, or any data source offering a Hadoop InputFormat.

 rdd 是分布式的的可容错的数据集合,2 种创建方式上面已由上面给出,在此就不做赘述。

 Transformations 算子(带示例代码)

Transformation 属于延迟计算,当一个 RDD 转换成另一个 RDD 时并没有立即进行转换,仅仅是记住了数据集的逻辑操作

 1)map(通过函数把 rdd 变换成为一个新的数据集)

SparkConf conf = new SparkConf(); // local[1]表示使用1个线程 conf.setMaster("local[1]"); conf.setAppName("test"); List list = new ArrayList<>(); for(int i=1;i<=10;i++){ list.add(i); } JavaSparkContext jc = new JavaSparkContext(conf); JavaRDD<Integer> parallelize = jc.parallelize(list); // 每个值前面加上字符串str: JavaRDD<String> map = parallelize.map(x -> "str:"+x); map.foreach(x-> System.out.println(x)); 结果: str:1 str:2 str:3 str:4 str:5 str:6 str:7 str:8 str:9 str:10

 2)filter(返回函数结果为 true 的数据集)

SparkConf conf = new SparkConf(); conf.setMaster("local[1]"); conf.setAppName("test"); List list = new ArrayList<>(); for(int i=1;i<=10;i++){ list.add(i); } JavaSparkContext jc = new JavaSparkContext(conf); JavaRDD parallelize = jc.parallelize(list); // 过滤偶数 JavaRDD map = parallelize.filter(x->x%2==0?false:true); map.foreach(x-> System.out.println(x)); 结果: 1 3 5 7 9

 3) flatMap(把一个结果集的每个元素变成为多个元素)

SparkConf conf = new SparkConf(); conf.setMaster("local[1]"); conf.setAppName("test"); JavaSparkContext jc = new JavaSparkContext(conf); // 内容为:1,2,3,4,5,6,7,8,9 // 加载该文件并按逗号分隔 JavaRDD stringJavaRDD = jc.textFile("/Users/yangjunwei/data/spark.txt") .flatMap(x -> Arrays.asList(x.split(",")).iterator()); stringJavaRDD.foreach(x-> System.out.println(x)); 结果: 1 2 3 4 5 6 7 8 9

 4) mapPartitions(以分区为单位,对每个 partition 的 rdd 做 map 操作)

SparkConf conf = new SparkConf(); conf.setMaster("local[1]"); conf.setAppName("test"); JavaSparkContext jc = new JavaSparkContext(conf); // 内容为:1,2,3,4,5,6,7,8,9 // 加载该文件并按逗号分隔 // 每行开头加str: JavaRDD stringJavaRDD = jc.textFile("/Users/yangjunwei/data/spark.txt") .flatMap(x -> Arrays.asList(x.split(",")).iterator()) .mapPartitions(x->{ List list = new ArrayList<>(); x.forEachRemaining(x1->{ list.add("str:"+x1); }); return list.iterator(); }); stringJavaRDD.foreach(x-> System.out.println(x)); 结果: str:1 str:2 str:3 str:4 str:5 str:6 str:7 str:8 str:9

 5) union(返回 2 个数据集的并集)

SparkConf conf = new SparkConf(); conf.setMaster("local[1]"); conf.setAppName("test"); JavaSparkContext jc = new JavaSparkContext(conf); List list1 = new ArrayList<>(); for(int i=1;i<=5;i++){ list1.add(i); } List list2 = new ArrayList<>(); for(int i=5;i<=10;i++){ list2.add(i); } JavaRDD rdd1 = jc.parallelize(list1); JavaRDD rdd2 = jc.parallelize(list2); JavaRDD union = rdd1.union(rdd2); System.out.println("rdd1:"); rdd1.foreach(x-> System.out.println(x)); System.out.println("rdd2:"); rdd2.foreach(x-> System.out.println(x)); System.out.println("union:"); union.foreach(x-> System.out.println(x)); 结果: rdd1: 1 2 3 4 5 rdd2: 5 6 7 8 9 10 union: 1 2 3 4 5 5 6 7 8 9 10

 6) distinct(数据集去重)
 7) sortBy(对数据集处理后的值做二次排序)

接5)的代码 // 按原值分1个partition进行升序 union.distinct().sortBy(x->x,true,1).foreach(x-> System.out.println(x)); 结果: 1 2 3 4 5 6 7 8 9 10

 9) mapToPair(将数据集转化为 <K,V> 数据集)
 10) sortByKey(对 <K,V> 数据集(pairs)基于 key 进行排序)

SparkConf conf = new SparkConf(); conf.setMaster("local[1]"); conf.setAppName("test"); JavaSparkContext jc = new JavaSparkContext(conf); List list1 = new ArrayList<>(); for(int i=5;i>=1;i--){ list1.add(i); } JavaPairRDD, String> pairRDD = jc.parallelize(list1).mapToPair(x -> new Tuple2<>(x, "")); System.out.println("排序前:"); pairRDD.foreach(x-> System.out.println(x)); JavaPairRDD, String> sortPair = pairRDD.sortByKey(); System.out.println("排序后:"); sortPair.foreach(x-> System.out.println(x)); 结果: 排序前: (5,) (4,) (3,) (2,) (1,) 排序后: (1,) (2,) (3,) (4,) (5,)

 11) groupByKey(对 <K,V> 数据集(pairs)基于 key 进行分组)

SparkConf conf = new SparkConf(); conf.setMaster("local[1]"); conf.setAppName("test"); JavaSparkContext jc = new JavaSparkContext(conf); List<Map<Integer,Integer>> list = new ArrayList<>(); Map<Integer,Integer> item = new HashMap<>(); item.put(1,1); list.add(item); item = new HashMap<>(); item.put(1,2); list.add(item); item = new HashMap<>(); item.put(1,3); list.add(item); item = new HashMap<>(); item.put(2,1); list.add(item); item = new HashMap<>(); item.put(2,2); list.add(item); JavaPairRDD<Integer,Integer> javaPairRDD = jc.parallelize(list).flatMap(x -> x.entrySet().iterator()).mapToPair(x -> new Tuple2<>(x.getKey(),x.getValue())); System.out.println("分组前:"); javaPairRDD.foreach(x-> System.out.println(x)); System.out.println("分组后:"); javaPairRDD.groupByKey().foreach(x-> System.out.println(x)); 结果: 分组前: (1,1) (1,2) (1,3) (2,1) (2,2) 分组后: (1,[1, 2, 3]) (2,[1, 2])

 12) reduceByKey(对 <K,V> 数据集(pairs)基于 key 进行 reduce 操作)

SparkConf conf = new SparkConf(); conf.setMaster("local[1]"); conf.setAppName("test"); JavaSparkContext jc = new JavaSparkContext(conf); List list1 = new ArrayList<>(); for(int i=1;i<=5;i++){ list1.add(i); } List list2 = new ArrayList<>(); for(int i=3;i<=5;i++){ list2.add(i); } JavaPairRDD, String> pairRdd1 = jc.parallelize(list1).mapToPair(x -> new Tuple2<>(x, "")); JavaPairRDD, String> pairRdd2 = jc.parallelize(list2).mapToPair(x -> new Tuple2<>(x, "")); JavaPairRDD, String> union = pairRdd1.union(pairRdd2); System.out.println("key去重前:"); union.sortByKey().map(x->x._1).foreach(x-> System.out.println(x)); System.out.println("key去重后:"); union.reduceByKey((var1, var2) -> var1).sortByKey().map(x->x._1).foreach(x-> System.out.println(x));

 action 算子(带示例代码)

下一篇

  • Spark

    Spark 是 UC Berkeley AMP lab 所开源的类 Hadoop MapReduce 的通用并行框架。Spark 拥有 Hadoop MapReduce 所具有的优点;但不同于 MapReduce 的是 Job 中间输出结果可以保存在内存中,从而不再需要读写 HDFS,因此 Spark 能更好地适用于数据挖掘与机器学习等需要迭代的 MapReduce 的算法。

    74 引用 • 46 回帖 • 563 关注

相关帖子

欢迎来到这里!

我们正在构建一个小众社区,大家在这里相互信任,以平等 • 自由 • 奔放的价值观进行分享交流。最终,希望大家能够找到与自己志同道合的伙伴,共同成长。

注册 关于
请输入回帖内容 ...