下面记录一些常用的 Transformation 和 Action 的用法例子。
初始化
下面的例子都是依赖这个初始化的代码进行演示
static JavaSparkContext context;
static {
SparkSession session = SparkSession.builder().master("local[*]").appName("TestApp").getOrCreate();
context = JavaSparkContext.fromSparkContext(session.sparkContext());
}
Transformation
Map
map 是将源 JavaRDD 的一个一个元素的传入 call 方法,并经过算法后一个一个的返回从而生成一个新的 JavaRDD。
public static void map() {
String[] names = {"张无忌", "赵敏", "周芷若"};
List list = Arrays.asList(names);
JavaRDD listRDD = context.parallelize(list);
JavaRDD nameRDD = listRDD.map((Function, String>) v1 -> "Hello," + v1);
nameRDD.foreach(name -> System.out.println(name));
}
打印如下:
Hello,张无忌
Hello,周芷若
Hello,赵敏
可以看出,对于 map 算子,源 JavaRDD 的每个元素都会进行计算,由于是依次进行传参,所以他是有序的,新 RDD 的元素顺序与源 RDD 是相同的
flatMap
flatMap 与 map 一样,是将 RDD 中的元素依次的传入 call 方法,他比 map 多的功能是能在任何一个传入 call 方法的元素后面添加任意多元素,而能达到这一点,正是因为其进行传参是依次进行的。
public static void flatMap() {
List list = Arrays.asList("张无忌 赵敏", "宋青书 周芷若");
JavaRDD listRDD = context.parallelize(list);
JavaRDD nameRDD = listRDD.flatMap((FlatMapFunction, String>) s -> Arrays.asList(s.split(" ")).iterator());
nameRDD.foreach(name -> System.out.println(name));
}
打印如下:
张无忌
赵敏
宋青书
周芷若
flatMap 的特性决定了这个算子在对需要随时增加元素的时候十分好用,比如在对源 RDD 查漏补缺时。
mapPartitions
与 map 方法类似,map 是对 rdd 中的每一个元素进行操作,而 mapPartitions 则是对 rdd 中的每个分区的迭代器进行操作。如果在 map 过程中需要频繁创建额外的对象,map 需要为每个元素创建一个链接而 mapPartition 为每个 partition 创建一个链接),则 mapPartitions 效率比 map 高的多。mapPartitions 比较适合需要分批处理数据的情况。
public static void mapParations() {
List list = Arrays.asList(1, 2, 3, 4, 5, 6);
JavaRDD listRDD = context.parallelize(list, 2);
JavaRDD nRDD = listRDD.mapPartitions(iterator -> {
System.out.println("遍历");
ArrayList array = new ArrayList<>();
while (iterator.hasNext()) {
array.add("hello " + iterator.next());
}
return array.iterator();
});
nRDD.foreach((VoidFunction) s -> System.out.println(s));
}
打印如下:
遍历
遍历
hello 4
hello 1
hello 5
hello 6
hello 2
hello 3
mapPartitionsWithIndex
每次获取和处理的就是一个分区的数据,并且知道处理的分区的分区号是啥
public static void mapPartitionsWithIndex() {
List list = Arrays.asList(1, 2, 3, 4, 5, 6, 7, 8);
JavaRDD listRDD = context.parallelize(list, 2);
JavaRDD stringJavaRDD = listRDD.mapPartitionsWithIndex((Function2, Iterator, Iterator>) (v1, v2) -> {
ArrayList list1 = new ArrayList<>();
while (v2.hasNext()) {
list1.add(v1 + "_" + v2.next());
}
return list1.iterator();
}, true);
stringJavaRDD.foreach(new VoidFunction() {
@Override
public void call(String s) throws Exception {
System.out.println(s);
}
});
}
打印如下:
0_1
0_2
0_3
0_4
1_5
1_6
1_7
1_8
union
当要将两个 RDD 合并时,便要用到 union 和 join,其中 union 只是简单的将两个 RDD 累加起来,可以看做 List 的 addAll 方法。就像 List 中一样,当使用 union 及 join 时,必须保证两个 RDD 的泛型是一致的。
public static void union() {
final List list1 = Arrays.asList(1, 2, 3, 4);
final List list2 = Arrays.asList(3, 4, 5, 6);
final JavaRDD rdd1 = context.parallelize(list1);
final JavaRDD rdd2 = context.parallelize(list2);
JavaRDD unionRDD = rdd1.union(rdd2);
unionRDD.foreach((VoidFunction) integer -> System.out.println(integer));
}
打印如下:
1
2
4
3
4
3
6
5
groupByKey
groupByKey 是将 RDD 中的元素进行分组,组名是 call 方法中的返回值,groupByKey 是将 PairRDD 中拥有相同 key 值的元素归为一组。
public static void groupByKey() {
List, String>> list = Arrays.asList(
new Tuple2("美国", "特朗普"),
new Tuple2("中国", "大大"),
new Tuple2("美国", "希拉里"),
new Tuple2("中国", "小小")
);
JavaPairRDD, String> listRDD = context.parallelizePairs(list);
JavaPairRDD, Iterable> groupByKeyRDD = listRDD.groupByKey();
groupByKeyRDD.foreach(tuple -> {
String country = tuple._1;
Iterator iterator = tuple._2.iterator();
String people = "";
while (iterator.hasNext()) {
people = people + iterator.next() + " ";
}
System.out.println(country + "人员:" + people);
});
}
打印如下:
美国人员:特朗普 希拉里
中国人员:大大 小小
join
join 是将两个 PairRDD 合并,并将有相同 key 的元素分为一组,可以理解为 groupByKey 和 Union 的结合。
public static void join() {
final List, String>> names = Arrays.asList(
new Tuple2, String>(1, "东方不败"),
new Tuple2, String>(2, "令狐冲"),
new Tuple2, String>(3, "林平之")
);
final List, Integer>> scores = Arrays.asList(
new Tuple2, Integer>(1, 99),
new Tuple2, Integer>(2, 98),
new Tuple2, Integer>(3, 97)
);
final JavaPairRDD, String> nemesrdd = context.parallelizePairs(names);
final JavaPairRDD, Integer> scoresrdd = context.parallelizePairs(scores);
final JavaPairRDD, Tuple2, Integer>> joinRDD = nemesrdd.join(scoresrdd);
joinRDD.foreach(tuple -> System.out.println("学号:" + tuple._1 + " 姓名:" + tuple._2._1 + " 成绩:" + tuple._2._2));
}
打印如下:
学号:1 姓名:东方不败 成绩:99
学号:3 姓名:林平之 成绩:97
学号:2 姓名:令狐冲 成绩:98
sample
对一个数据集进行随机抽样,withReplacement 表示是否有放回抽样,fraction 表示返回数据集大小的百分比的数量,比如数据集有 20 条数据,fraction=10,则返回 50 条,参数 seed 指定生成随机数的种子。
public static void sample() {
ArrayList list = new ArrayList<>();
for (int i = 1; i <= 20; i++) {
list.add(i);
}
JavaRDD listRDD = context.parallelize(list);
JavaRDD sampleRDD = listRDD.sample(false, 0.5, 0);
sampleRDD.foreach(num -> System.out.print(num + " "));
}
打印如下:
16 18 13 6 7 8 9 1 2 4
cartesian
用于求笛卡尔积
public static void cartesian() {
List list1 = Arrays.asList("A", "B");
List list2 = Arrays.asList(1, 2, 3);
JavaRDD list1RDD = context.parallelize(list1);
JavaRDD list2RDD = context.parallelize(list2);
list1RDD.cartesian(list2RDD).foreach(tuple -> System.out.println(tuple._1 + "->" + tuple._2));
}
打印如下:
A->2
A->1
A->3
B->1
B->2
B->3
filter
过滤操作,满足 filter 内 function 函数为 true 的 RDD 内所有元素组成一个新的数据集
public static void filter() {
List list = Arrays.asList(1, 2, 3, 4, 5, 6, 7, 8, 9, 10);
JavaRDD listRDD = context.parallelize(list);
JavaRDD filterRDD = listRDD.filter(num -> num % 2 == 0);
filterRDD.foreach(num -> System.out.print(num + " "));
}
打印如下:
8 10 6 4 2
distinct
返回一个在源数据集去重之后的新数据集,即去重。
public static void distinct() {
List list = Arrays.asList(1, 1, 2, 2, 3, 3, 4, 5);
JavaRDD listRDD = (JavaRDD) context.parallelize(list);
listRDD.distinct().foreach(num -> System.out.println(num+","));
}
打印如下:
4,3,2,1,5
intersection
对于源数据集和其他数据集求交集,并去重。
public static void intersection() {
List list1 = Arrays.asList(1, 2, 3, 4);
List list2 = Arrays.asList(3, 4, 5, 6);
JavaRDD list1RDD = context.parallelize(list1);
JavaRDD list2RDD = context.parallelize(list2);
list1RDD.intersection(list2RDD).foreach(num -> System.out.print(num+","));
}
打印如下:
3,4
coalesce
重新分区,减少 RDD 中分区的数量到 numPartitions。
public static void coalesce() {
List list = Arrays.asList(1, 2, 3, 4, 5, 6, 7, 8, 9);
JavaRDD listRDD = context.parallelize(list, 3);
listRDD.coalesce(1).foreach(num -> System.out.println(num));
}
replication
进行重分区,解决的问题:本来分区数少-> 增加分区数
public static void replication() {
List list = Arrays.asList(1, 2, 3, 4);
JavaRDD listRDD = context.parallelize(list, 1);
listRDD.repartition(2).foreach(num -> System.out.println(num));
}
repartitionAndSortWithinPartitions
repartitionAndSortWithinPartitions 函数是 repartition 函数的变种,与 repartition 函数不同的是,repartitionAndSortWithinPartitions 在给定的 partitioner 内部进行排序,性能比 repartition 要高。
public static void repartitionAndSortWithinPartitions() {
List list = Arrays.asList(1, 4, 55, 66, 33, 48, 23);
JavaRDD listRDD = context.parallelize(list, 1);
JavaPairRDD, Integer> pairRDD = listRDD.mapToPair(num -> new Tuple2<>(num, num));
pairRDD.repartitionAndSortWithinPartitions(new HashPartitioner(2))
.mapPartitionsWithIndex((index, iterator) -> {
ArrayList list1 = new ArrayList<>();
while (iterator.hasNext()) {
list1.add(index + "_" + iterator.next());
}
return list1.iterator();
}, false)
.foreach(str -> System.out.println(str));
}
cogroup
cogroup 对两个 RDD 中的 KV 元素,每个 RDD 中相同 key 中的元素分别聚合成一个集合。与 reduceByKey 不同的是针对两个 RDD 中相同的 key 的元素进行合并。
public static void cogroup() {
List, String>> list1 = Arrays.asList(
new Tuple2, String>(1, "www"),
new Tuple2, String>(2, "bbs")
);
List, String>> list2 = Arrays.asList(
new Tuple2, String>(1, "cnblog"),
new Tuple2, String>(2, "cnblog"),
new Tuple2, String>(3, "very")
);
List, String>> list3 = Arrays.asList(
new Tuple2, String>(1, "com"),
new Tuple2, String>(2, "com"),
new Tuple2, String>(3, "good")
);
JavaPairRDD, String> list1RDD = context.parallelizePairs(list1);
JavaPairRDD, String> list2RDD = context.parallelizePairs(list2);
JavaPairRDD, String> list3RDD = context.parallelizePairs(list3);
list1RDD.cogroup(list2RDD, list3RDD).foreach(tuple ->
System.out.println(tuple._1 + " " + tuple._2._1() + " " + tuple._2._2() + " " + tuple._2._3()));
}
打印如下:
1 [www] [cnblog] [com]
2 [bbs] [cnblog] [com]
3 [] [very] [good]
sortByKey
sortByKey 函数作用于 Key-Value 形式的 RDD,并对 Key 进行排序。从函数的实现可以看出,它主要接受两个函数,含义和 sortBy 一样,这里就不进行解释了。该函数返回的 RDD 一定是 ShuffledRDD 类型的,因为对源 RDD 进行排序,必须进行 Shuffle 操作,而 Shuffle 操作的结果 RDD 就是 ShuffledRDD。其实这个函数的实现很优雅,里面用到了 RangePartitioner,它可以使得相应的范围 Key 数据分到同一个 partition 中,然后内部用到了 mapPartitions 对每个 partition 中的数据进行排序,而每个 partition 中数据的排序用到了标准的 sort 机制,避免了大量数据的 shuffle。
public static void sortByKey() {
List, String>> list = Arrays.asList(
new Tuple2<>(99, "张三丰"),
new Tuple2<>(96, "东方不败"),
new Tuple2<>(66, "林平之"),
new Tuple2<>(98, "聂风")
);
JavaPairRDD, String> listRDD = context.parallelizePairs(list);
listRDD.sortByKey(true).foreach(tuple -> System.out.println(tuple._2 + "->" + tuple._1));
}
打印如下:
张三丰->99
林平之->66
聂风->98
东方不败->96
aggregateByKey
aggregateByKey 函数对 PairRDD 中相同 Key 的值进行聚合操作,在聚合过程中同样使用了一个中立的初始值。和 aggregate 函数类似,aggregateByKey 返回值的类型不需要和 RDD 中 value 的类型一致。因为 aggregateByKey 是对相同 Key 中的值进行聚合操作,所以 aggregateByKey 函数最终返回的类型还是 Pair RDD,对应的结果是 Key 和聚合好的值;而 aggregate 函数直接是返回非 RDD 的结果,这点需要注意。在实现过程中,定义了三个 aggregateByKey 函数原型,但最终调用的 aggregateByKey 函数都一致。
public static void aggregateByKey() {
List list = Arrays.asList("you,jump", "i,jump");
JavaRDD listRDD = context.parallelize(list);
listRDD.flatMap(line -> Arrays.asList(line.split(",")).iterator())
.mapToPair(word -> new Tuple2<>(word, 1))
.aggregateByKey(0, (x, y) -> x + y, (m, n) -> m + n)
.foreach(tuple -> System.out.println(tuple._1 + "->" + tuple._2));
}
打印如下:
you->1
i->1
jump->2
Action
reduce
reduce 是将 RDD 中的所有元素进行合并,当运行 call 方法时,会传入两个参数,在 call 方法中将两个参数合并后返回,而这个返回值会返回一个新的 RDD 中的元素再次传入 call 方法中,继续合并,直到合并到只剩下一个元素时。
public static void reduce() {
List list = Arrays.asList(1, 2, 3, 4, 5, 6);
JavaRDD listRDD = context.parallelize(list);
Integer result = listRDD.reduce((x, y) -> x + y);
System.out.println(result);
}
打印如下:
21
reduceByKey
和 reduce 类似,但是 reduceByKey 仅将 RDD 中所有 K,V 对中 K 值相同的 V 进行合并。
public static void reduceByKey() {
List, Integer>> list = Arrays.asList(
new Tuple2, Integer>("中国", 99),
new Tuple2, Integer>("美国", 97),
new Tuple2, Integer>("泰国", 89),
new Tuple2, Integer>("中国", 77)
);
JavaPairRDD, Integer> listRDD = context.parallelizePairs(list);
JavaPairRDD, Integer> resultRDD = listRDD.reduceByKey((x, y) -> x + y);
resultRDD.foreach(tuple -> System.out.println("国家: " + tuple._1 + "->" + tuple._2));
}
打印如下:
门派: 美国->97
门派: 中国->176
门派: 泰国->89
collect
将一个 RDD 以一个 Array 数组形式返回其中的所有元素。
public static void collect(){
List list = Arrays.asList("you,jump", "i,jump");
JavaRDD listRDD = context.parallelize(list);
List collect = listRDD.collect();
}
count
返回数据集中元素个数,默认 Long 类型。
first
返回数据集的第一个元素
countByKey
用于统计 RDD[K,V]中每个 K 的数量,返回具有每个 key 的计数的(k,int)pairs 的 hashMap
foreach
对数据集中每一个元素运行函数 function
saveAsObjectFile
将数据集中元素以 ObjectFile 形式写入本地文件系统或者 HDFS 等
saveAsSequenceFile
将 dataSet 中元素以 Hadoop SequenceFile 的形式写入本地文件系统或者 HDFS 等。
saveAsTextFile
将 dataSet 中元素以文本文件的形式写入本地文件系统或者 HDFS 等。Spark 将对每个元素调用 toString 方法,将数据元素转换为文本文件中的一行记录。
takeOrdered
返回 RDD 中前 n 个元素,并按默认顺序排序(升序)或者按自定义比较器顺序排序。
take
返回一个包含数据集前 n 个元素的数组(从 0 下标到 n-1 下标的元素),不排序。
扫一扫有惊喜: [![imagepng](http://itechor.top/solo/upload/bb791a58c3a84193b7f643b6849482c5_image.png) ](http://ym0214.com)
欢迎来到这里!
我们正在构建一个小众社区,大家在这里相互信任,以平等 • 自由 • 奔放的价值观进行分享交流。最终,希望大家能够找到与自己志同道合的伙伴,共同成长。
注册 关于