来源: Think And COde
Stream 是 Java 8 提供的高效操作集合类(Collection)数据的 API。
从 Iterator 到 Stream
有一个字符串的 list,要统计其中长度大于 7 的字符串的数量,用迭代来实现:
List<String> wordList = Arrays.asList("regular", "expression", "specified", "as", "a",
"string", "must");
int countByIterator = 0;
for (String word: wordList) {
if (word.length() > 7) {
countByIterator++;
}
}
用 Stream 实现:
long countByStream = wordList.stream().filter(w -> w.length() > 7).count();
显然,用 stream 实现更简洁,不仅如此,stream 很容易实现并发操作,比如:
long countByParallelStream = wordList.parallelStream().filter(w -> w.length() > 7).count();
stream 遵循的原则是:告诉我做什么,不用管我怎么做。比如上例:告诉 stream 通过多线程统计字符串长度,至于以什么顺序、在哪个线程中执行,由 stream 来负责;而在迭代实现中,由于计算的方式已确定,很难优化了。
Stream 和 Collection 的区别主要有:
* stream 本身并不存储数据,数据是存储在对应的 collection 里,或者在需要的时候才生成的;
* stream 不会修改数据源,总是返回新的 stream;
* stream 的操作是懒执行 (lazy) 的:仅当最终的结果需要的时候才会执行,比如上面的例子中,结果仅需要前3个长度大于7
的字符串,那么在找到前3个长度符合要求的字符串后,`filter()` 将停止执行;
使用 stream 的步骤如下:
* 创建stream;
* 通过一个或多个中间操作(intermediate operations)将初始stream转换为另一个stream;
* 通过中止操作(terminal operation)获取结果;该操作触发之前的懒操作的执行,中止操作后,该stream关闭,不能再
使用了;
在上面的例子中, wordList.stream()
和 wordList.parallelStream()
是创建 stream,filter()
是中间操作,过滤后生成一个新的 stream,count()
是中止操作,获取结果。
创建 Stream 的方式
- 从 array 或 list 创建 stream:
Stream<Integer> integerStream = Stream.of(10, 20, 30, 40);
String[] cityArr = {"Beijing", "Shanghai", "Chengdu"};
Stream<String> cityStream = Stream.of(cityArr);
Stream<String> nameStream = Arrays.asList("Daniel", "Peter", "Kevin").stream();
Stream<String> cityStream2 = Arrays.stream(cityArr, 0, 1);
Stream<String> emptyStream = Stream.empty();
- 通过 generate 和 iterate 创建无穷 stream:
Stream<String> echos = Stream.generate(() -> "echo");
Stream<Integer> integers = Stream.iterate(0, num -> num + 1);
- 通过其它 API 创建 stream:
Stream<String> lines = Files.lines(Paths.get("test.txt"))
String content = "AXDBDGXC";
Stream<String> contentStream = Pattern.compile("[ABC]{1,3}").splitAsStream(content);
Stream 转换
filter()
用于过滤,即使原 stream 中满足条件的元素构成新的 stream:
List<String> langList = Arrays.asList("Java", "Python", "Swift", "HTML");
Stream<String> filterStream = langList.stream().filter(lang -> lang.equalsIgnoreCase("java"));
- map()用于映射,遍历原 stream 中的元素,转换后构成新的 stream:
List<String> langList = Arrays.asList("Java", "Python", "Swift", "HTML");
Stream<String> mapStream = langList.stream().map(String::toUpperCase);
flatMap()
用于将[["ABC", "DEF"], ["FGH", "IJK"]]
的形式转换为["ABC", "DEF", "FGH", "IJK"]
:
Stream<String> cityStream = Stream.of("Beijing", "Shanghai", "Shenzhen");
// [['B', 'e', 'i', 'j', 'i', 'n', 'g'], ['S', 'h', 'a', 'n', 'g', 'h', 'a', 'i'], ...]
Stream<Stream<Character>> characterStream1 = cityStream.map(city -> characterStream(city));
Stream<String> cityStreamCopy = Stream.of("Beijing", "Shanghai", "Shenzhen");
// ['B', 'e', 'i', 'j', 'i', 'n', 'g', 'S', 'h', 'a', 'n', 'g', 'h', 'a', 'i', ...]
Stream<Character> characterStreamCopy = cityStreamCopy.flatMap(city -> characterStream(city));
其中, characterStream()
返回有参数字符串的字符构成的 Stream;
limit()
表示限制 stream 中元素的数量,skip()
表示跳过 stream 中前几个元素, concat 表示将多个 stream 连接起来,peek()
主要用于 debug 时查看 stream 中元素的值:
Stream<Integer> limitStream = Stream.of(18, 20, 12, 35, 89).sorted().limit(3);
Stream<Integer> skipStream = Stream.of(18, 20, 12, 35, 89).sorted(Comparator.reverseOrder())
.skip(1);
Stream<Integer> concatStream = Stream.concat(Stream.of(1, 2, 3), Stream.of(4, 5, 6));
concatStream.peek(i -> System.out.println(i)).count();
peek()
是 intermediate operation,所以后面需要一个 terminal operation,如 count()
才能在输出中看到结果;
- 有状态的 (stateful) 转换,即元素之间有依赖关系,如
distinct()
返回由唯一元素构成的 stream,sorted()
返回排序后的 stream:
Stream<String> distinctStream = Stream.of("Beijing", "Tianjin", "Beijing").distinct();
Stream<String> sortedStream = Stream.of("Beijing", "Shanghai", "Chengdu").sorted(Comparator.comparing(String::length).reversed());
Stream reduction
reduction 就是从 stream 中取出结果,是 terminal operation,因此经过 reduction 后的 stream 不能再使用了。
Optional
Optional 表示或者有一个 T 类型的对象,或者没有值;
- 创建 Optional 对象:
直接通过 Optional 的类方法:of()/empty()/ofNullable()
:
Optional<Integer> intOpt = Optional.of(10);
Optional<String> emptyOpt = Optional.empty();
Optional<Double> doubleOpt = Optional.ofNullable(5.5);
- 使用 Optional 对象:
你当然可以这么使用:
if (intOpt.isPresent()) {
intOpt.get();
}
但是,最好这么使用:
doubleOpt.orElse(0.0);
doubleOpt.orElseGet(() -> 1.0);
doubleOpt.orElseThrow(RuntimeException::new);
List<Double> doubleList = new ArrayList<>();
doubleOpt.ifPresent(doubleList::add);
map()方法与ifPresent()用法相同,就是多个返回值,flatMap()用于Optional的链式表达:
Optional<Boolean> addOk = doubleOpt.map(doubleList::add);
Optional.of(4.0).flatMap(num -> Optional.ofNullable(num * 100)).flatMap(num -> Optional.ofNullable(Math.sqrt(num)));
简单的 reduction
主要包含以下操作: findFirst()/findAny()/allMatch/anyMatch()/noneMatch
,比如:
Optional<String> firstWord = wordStream.filter(s -> s.startsWith("Y")).findFirst();
Optional<String> anyWord = wordStream.filter(s -> s.length() > 3).findAny();
wordStream.allMatch(s -> s.length() > 3);
wordStream.anyMatch(s -> s.length() > 3);
wordStream.noneMatch(s -> s.length() > 3);
reduce 方法
- reduce(accumulator):参数是一个执行双目运算的 Functional Interface,假如这个参数表示的操作为 op,stream 中的元素为 x, y, z, …,则
reduce()
执行的就是 x op y op z ...,所以要求 op 这个操作具有结合性 (associative),即满足:(x op y) op z = x op (y op z),满足这个要求的操作主要有:求和、求积、求最大值、求最小值、字符串连接、集合并集和交集等。另外,该函数的返回值是 Optional 的:
Optional<Integer> sum1 = numStream.reduce((x, y) -> x + y);
- reduce(identity, accumulator):可以认为第一个参数为默认值,但需要满足 identity op x = x,所以对于求和操作,identity 的值为 0,对于求积操作,identity 的值为 1。返回值类型是 stream 元素的类型:
Integer sum2 = numStream.reduce(0, Integer::sum);
collect 结果
- collect() 方法:
reduce()
和 collect()
的区别是:
* reduce()的结果是一个值;
* collect()可以对stream中的元素进行各种处理后,得到stream中元素的值;
Collectors
接口提供了很方便的创建 Collector
对象的工厂方法:
// collect to Collection
Stream.of("You", "may", "assume").collect(Collectors.toList());
Stream.of("You", "may", "assume").collect(Collectors.toSet());
Stream.of("You", "may", "assume").collect(Collectors.toCollection(TreeSet::new));
// join element
Stream.of("You", "may", "assume").collect(Collectors.joining());
Stream.of("You", "may", "assume").collect(Collectors.joining(", "));
// summarize element
IntSummaryStatistics summary = Stream.of("You", "may", "assume").collect(Collectors.summarizingInt(String::length));
summary.getMax();
- foreach() 方法:
foreach()
用于遍历 stream 中的元素,属于 terminal operation;
forEachOrdered()
是按照 stream 中元素的顺序遍历,也就无法利用并发的优势;
Stream.of("You", "may", "assume", "you", "can", "fly").parallel().forEach(w -> System.out.println(w));
Stream.of("You", "may", "assume", "you", "can", "fly").forEachOrdered(w -> System.out.println(w));
- toArray() 方法:
得到由 stream 中的元素得到的数组,默认是 Object[],可以通过参数设置需要结果的类型:
Object[] words1 = Stream.of("You", "may", "assume").toArray();
String[] words2 = Stream.of("You", "may", "assume").toArray(String[]::new);
- toMap() 方法:
toMap: 将 stream 中的元素映射为的形式,两个参数分别用于生成对应的 key 和 value 的值。比如有一个字符串 stream,将首字母作为 key,字符串值作为 value,得到一个 map:
Stream<String> introStream = Stream.of("Get started with UICollectionView and the photo library".split(" "));
Map<String, String> introMap = introStream.collect(Collectors.toMap(s -> s.substring(0, 1), s -> s));
如果一个 key 对应多个 value,则会抛出异常,需要使用第三个参数设置如何处理冲突,比如仅使用原来的 value、使用新的 value,或者合并:
Stream<String> introStream = Stream.of("Get started with UICollectionView and the photo library".split(" "));
Map<Integer, String> introMap2 = introStream.collect(Collectors.toMap(s -> s.length(), s -> s, (existingValue, newValue) -> existingValue));
如果 value 是一个集合,即将 key 对应的所有 value 放到一个集合中,则需要使用第三个参数,将多个 value 合并:
Stream<String> introStream3 = Stream.of("Get started with UICollectionView and the photo library".split(" "));
Map<Integer, Set<String>> introMap3 = introStream3.collect(Collectors.toMap(s -> s.length(), s -> Collections.singleton(s), (existingValue, newValue) -> {
HashSet<String> set = new HashSet<>(existingValue);
set.addAll(newValue);
return set;
}
));
introMap3.forEach((k, v) -> System.out.println(k + ": " + v));
如果 value 是对象自身,则使用 Function.identity(),如:
Map<Integer, Person> idToPerson = people.collect(Collectors.toMap(Person::getId, Function.identity()));
toMap() 默认返回的是 HashMap,如果需要其它类型的 map ,比如 TreeMap ,则可以在第四个参数指定构造方法:
Map<Integer, String> introMap2 = introStream.collect(Collectors.toMap(s -> s.length(), s -> s, (existingValue, newValue) -> existingValue, TreeMap::new));
Grouping 和 Partitioning
groupingBy()
表示根据某一个字段或条件进行分组,返回一个 Map,其中 key 为分组的字段或条件,value 默认为 list,groupingByConcurrent()
是其并发版本:
Map<String, List<Locale>> countryToLocaleList = Stream.of(Locale.getAvailableLocales()).collect(Collectors.groupingBy(l -> l.getDisplayCountry()));
- 如果
groupingBy()
分组的依据是一个 bool 条件,则 key 的值为 true/false ,此时与partitioningBy()
等价,且partitioningBy()
的效率更高:
// predicate
Map<Boolean, List<Locale>> englishAndOtherLocales = Stream.of(Locale.getAvailableLocales()).collect(Collectors.groupingBy(l -> l.getDisplayLanguage().equalsIgnoreCase("English")));
// partitioningBy
Map<Boolean, List<Locale>> englishAndOtherLocales2 = Stream.of(Locale.getAvailableLocales()).collect(Collectors.partitioningBy(l -> l.getDisplayLanguage().equalsIgnoreCase("English")));
groupingBy()
提供第二个参数,表示 downstream,即对分组后的 value 作进一步的处理:
返回 set,而不是 list:
Map<String, Set<Locale>> countryToLocaleSet = Stream.of(Locale.getAvailableLocales()).collect(Collectors.groupingBy(l -> l.getDisplayCountry(), Collectors.toSet()));
返回 value 集合中元素的数量:
Map<String, Long> countryToLocaleCounts = Stream.of(Locale.getAvailableLocales()).collect(Collectors.groupingBy(l -> l.getDisplayCountry(), Collectors.counting()));
对 value 集合中的元素求和:
Map<String, Integer> cityToPopulationSum = Stream.of(cities).collect(Collectors.groupingBy(City::getName, Collectors.summingInt(City::getPopulation)));
对 value 的某一个字段求最大值,注意 value 是 Optional 的:
Map<String, Optional<City>> cityToPopulationMax = Stream.of(cities).collect(Collectors.groupingBy(City::getName, Collectors.maxBy(Comparator.comparing(City::getPopulation))));
使用 mapping 对 value 的字段进行 map 处理:
Map<String, Optional<String>> stateToNameMax = Stream.of(cities).collect(Collectors.groupingBy(City::getState, Collectors.mapping(City::getName, Collectors.maxBy(Comparator.comparing(String::length)))));
Map<String, Set<String>> stateToNameSet = Stream.of(cities).collect(Collectors.groupingBy(City::getState, Collectors.mapping(City::getName, Collectors.toSet())));
通过 summarizingXXX 获取统计结果:
Map<String, IntSummaryStatistics> stateToPopulationSummary = Stream.of(cities).collect(Collectors.groupingBy(City::getState, Collectors.summarizingInt(City::getPopulation)));
reducing()
可以对结果作更复杂的处理,但是 reducing()
却并不常用:
Map<String, String> stateToNameJoining = Stream.of(cities).collect(Collectors.groupingBy(City::getState, Collectors.reducing("", City::getName, (s, t) -> s.length() == 0 ? t : s + ", " + t)));
比如上例可以通过 mapping 达到同样的效果:
Map<String, String> stateToNameJoining2 = Stream.of(cities).collect(Collectors.groupingBy(City::getState, Collectors.mapping(City::getName, Collectors.joining(", "))));
Primitive Stream
Stream 对应的 Primitive Stream 就是 IntStream,类似的还有 DoubleStream 和 LongStream。
- Primitive Stream 的构造:
of(), range(), rangeClosed(), Arrays.stream()
:
IntStream intStream = IntStream.of(10, 20, 30);
IntStream zeroToNintyNine = IntStream.range(0, 100);
IntStream zeroToHundred = IntStream.rangeClosed(0, 100);
double[] nums = {10.0, 20.0, 30.0};
DoubleStream doubleStream = Arrays.stream(nums, 0, 3);
- Object Stream 与 Primitive Stream 之间的相互转换,通过 mapToXXX() 和 boxed():
// map to
Stream<String> cityStream = Stream.of("Beijing", "Tianjin", "Chengdu");
IntStream lengthStream = cityStream.mapToInt(String::length);
// box
Stream<Integer> oneToNine = IntStream.range(0, 10).boxed();
- 与 Object Stream 相比,Primitive Stream 的特点:
toArray() 方法返回的是对应的 Primitive 类型:
int[] intArr = intStream.toArray();
自带统计类型的方法,如:max(), average(), summaryStatistics()
:
OptionalInt maxNum = intStream.max();
IntSummaryStatistics intSummary = intStream.summaryStatistics();
Parallel Stream
- Stream 支持并发操作,但需要满足以下几点:
构造一个 paralle stream,默认构造的 stream 是顺序执行的,调用 paralle()
构造并行的 stream:
IntStream scoreStream = IntStream.rangeClosed(10, 30).parallel();
要执行的操作必须是可并行执行的,即并行执行的结果和顺序执行的结果是一致的,而且必须保证 stream 中执行的操作是线程安全的:
int[] wordLength = new int[12];
Stream.of("It", "is", "your", "responsibility").parallel().forEach(s -> {
if (s.length() < 12) wordLength[s.length()]++;
});
这段程序的问题在于,多线程访问共享数组 wordLength,是非线程安全的。解决的思路有:
- 构造 AtomicInteger 数组
- 使用 groupingBy()根据 length 统计
- 可以通过并行提高效率的常见场景:
使 stream 无序:对于 distinct()
和 limit()
等方法,如果不关心顺序,则可以使用并行:
LongStream.rangeClosed(5, 10).unordered().parallel().limit(3);
IntStream.of(14, 15, 15, 14, 12, 81).unordered().parallel().distinct();
在 groupingBy()
的操作中,map 的合并操作是比较重的,可以通过 groupingByConcurrent()
来并行处理,不过前提是 parallel stream:
Stream.of(cities).parallel().collect(Collectors.groupingByConcurrent(City::getState));
在执行 stream 操作时不能修改 stream 对应的 collection ;
stream 本身是不存储数据的,数据保存在对应的 collection 中,所以在执行 stream 操作的同时修改对应的 collection ,结果 是未定义的:
// ok
Stream<String> wordStream = wordList.stream();
wordList.add("number");
wordStream.distinct().count();
// ConcurrentModificationException
Stream<String> wordStream = wordList.stream();
wordStream.forEach(s -> { if (s.length() >= 6) wordList.remove(s);});
Functional Interface
仅包含一个抽象方法的 interface 被成为 Functional Interface,比如:Predicate, Function, Consumer 等。
此时我们一般传入一个 lambda 表达式或 Method Reference。
常见的 Functional Interface 有:
Functional Interface Parameter Return Type Description Types
Supplier<T> None T Supplies a value of type T
Consumer<T> T void Consumes a value of type T
BiConsumer<T, U> T,U void Consumes values of types T and U
Predicate<T> T boolean A Boolean-valued function
ToIntFunction<T> T int An int-, long-, or double-valued function
ToLongFunction<T> T long
ToDoubleFunction<T> T double
IntFunction<R> int R A function with argument of type int, long, or double
LongFunction<R> long
DoubleFunction<R> double
Function<T, R> T R A function with argument of type T
BiFunction<T, U, R> T,U R A function with arguments of types T and U
UnaryOperator<T> T T A unary operator on the type T
BinaryOperator<T> T,T T A binary operator on the type T
欢迎来到这里!
我们正在构建一个小众社区,大家在这里相互信任,以平等 • 自由 • 奔放的价值观进行分享交流。最终,希望大家能够找到与自己志同道合的伙伴,共同成长。
注册 关于