[零]java8 函数式编程入门官方文档中文版 java.util.stream 中文版 流处理的相关概念
前言
本文为java.util.stream 包文档的译文 极其个别部分可能为了更好理解,陈述略有改动,与原文几乎一致 原文可参考在线API文档 https://docs.oracle.com/javase/8/docs/api/Package java.util.stream Description
一些用于支持流上函数式操作的类 ,例如在集合上的map-reduce转换。例如int sum = widgets.stream() .filter(b -> b.getColor() == RED) .mapToInt(b -> b.getWeight()) .sum();此处,我们使用widgets, 他是一个 Collection
- 不存储数据 流不是存储元素的数据结构;相反,它通过一个哥哥计算操作组合而成的管道,从一个数据源,如数据结构、数组、生成器函数或i/o通道 来传递元素
- 函数特性 一个流上的操作产生一个结果,但是不会修改它的源。例如,过滤集合 获得的流会产生一个没有被过滤元素的新流,而不是从源集合中删除元素
- 延迟搜索 许多流操作,如过滤、映射或重复删除,都可以延迟实现,从而提供出优化的机会。
- 例如,“找到带有三个连续元音的第一个字符串”不需要检查所有的输入字符串。
- 流操作分为中间(流生成)操作和终端(值或副作用生成)操作。许多的中间操作, 如filter,map等,都是延迟执行。
- 中间操作总是惰性的的。
- Stream可能是无限的 虽然集合的大小是有限的,但流不需要。诸如limit(n)或findFirst()这样的短路操作可以允许在有限时间内完成无限流的计算。
- 消耗的 流的元素只在流的生命周期中访问一次。就像迭代器一样,必须生成一个新的流来重新访问源的相同元素
- Collection 提供的stream parallelStream
- 从数组 Arrays.stream(Object[]) 静态方法
- Stream类的静态工厂方法 比如 Stream.of(Object[]), IntStream.range(int, int), Stream.iterate(Object, UnaryOperator) Stream.generate
- BufferedReader.lines(); 文件行
- 获取文件路径的流: Files类的find(), lines(), list(), walk();
- Random.ints() 随机数流
- JDK中的许多其他流载方法,包括BitSet.stream(), Pattern.splitAsStream(java.lang.CharSequence), and JarFile.stream().
这个例子的串行和并行版本的唯一区别是初始时创建流,使用parallelStream()而不是stream()
当启动终端操作时,流管道是按顺序或并行执行的,这取决于它被调用的流的策略模式。
一个流是否可以串行或并行执行,可以用isParallel()方法来获得,
可以用BaseStream.sequential() 和 BaseStream.parallel() 操作修改。
当启动终端操作时,流管道是按顺序或并行执行的,这取决于它被调用的流的模式。
除了被确定为显式非确定性的操作之外,如findAny(),无论是顺序执行还是并行执行,都不应该改变计算的结果。
大多数流操作接受描述用户指定行为的参数,这些参数通常是lambda表达式。
为了保持正确的行为,这些行为参数必须是不干涉non-interfering的,并且在大多数情况下必须是无状态的。
这些参数始终是函数式接口的实例,例如Function,通常是lambda表达式或方法引用
首先创建一个列表,由两个字符串组成:“one”;和“two”。
然后,从该列表中创建一条stream。接下来,通过添加第三个字符串:“three”来修改列表。
最后,流的元素被collect 以及joining在一起。由于该列表在终端收集操作开始之前被修改,结果将是一串“one two three”。
从JDK集合返回的所有流,以及大多数其他JDK类,都像这样表现良好;
对于其他库生成的流,请参阅 Low-level stream construction,以满足构建行为良好的流的需求。
在这里,如果映射操作是并行执行的,那么相同输入的结果可能因线程调度差异而变化,而对于无状态lambda表达式,结果总是相同的
还要注意的是,试图从行为参数访问可变状态时,在安全性和性能方面是您一个错误的选择;
如果你不同步访问那个状态,你就有了数据竞争,因此你的代码可能出现问题,
但是如果你对那个状态进行同步访问,你就有可能会破坏你想要从并行性中得到的受益。
最好的方法是在流操作中完全地避免有状态的行为参数; 通常总会有种方法可以重构流以避免状态性
Side-effects副作用
一般来说,对流操作的行为参数的副作用是不鼓励的,因为它们通常会导致不知情的违反无状态要求的行为,以及其他线程安全隐患
如果行为参数确实有副作用,除非显式地声明,否则就无法保证这些副作用对其他线程的可见性,也不能保证在同一条管道内的“相同”元素上的不同操作在相同的线程中执行。此外,这些影响的排序可能出乎意料。即使管道被限制生成一个与stream源的处理顺序一致的结果(例如,IntStream.range(0,5).parallel().map(x -> x*2).toArray() 必须生成0、2、4、6、8),对于将mapper函数应用于个别元素的顺序,或者对于给定元素执行任何行为参数的顺序,都没有保证
对许多可能会被尝试使用于副作用的计算中,可以替换为无副作用的,更安全更有效的表达,比如使用归约而不是可变的累积器。
然而,使用println()来进行调试的副作用通常是无害的。少部分的流操作,如forEach()和peek(),用的就是他们的副作用;这些应该小心使用。
下面的例子演示,如何从一个使用副作用的计算转变为不适用副作用
下面的代码搜索一个字符串流,以匹配给定的正则表达式,并将匹配放在列表中
这段代码不必要地使用了副作用。如果并行执行,ArrayList的非线程安全将导致不正确的结果,并且添加所需的同步将导致竞争,从而破坏并行性的好处。
此外,在这里使用副作用是完全没有必要的;forEach()可以简单地被替换为更安全、更高效、更适合并行化的reduce操作。
然而,我们有充分的理由倾向于reduce操作,而不是像上面这样的迭代累计运算。
它不仅是一个“更抽象的”——它在流上把流作为一个整体运行而不是作用于单独的元素——但是一个适当构造的reduce操作本质上是可并行的,只要用于处理元素的函数(s)是结合的associative和无状态stateless的。举个例子,给定一个数字流,我们想要找到和,我们可以写:
几乎不需要怎么修改,就可以以并行的方式运行
之所以归约操作可以很好地并行,是因为实现可以并行地操作数据的子集,然后将中间结果组合在一起,得到最终的正确答案。(即使该语言有一个“"parallel for-each"”构造,迭代累计运算方法仍然需要开发人员提供对共享累积变量sum的线程安全更新以及所需的同步,这可能会消除并行性带来的任何性能收益。)
使用reduce()代替了归约操作的并行化的所有负担,并且库可以提供一个高效的并行实现,不需要额外的同步
前面展示的“widgets”示例展示了如何与其他操作相结合,以替换for循环。
如果widgets 是Widget 对象的集合,它有一个getWeight方法,我们可以找到最重的widget:
??
在更通用的形式中 对类型为T的元素,并且返回结果类型为U的reduce操作 需要三个参数:
在这里,identity不仅仅是归约的初始化结果值或者如果没有任何元素时的一个默认的返回值
迭代累计运算器接受部分结果和下一个元素,并产生一个新的中间结果。
组合函数结合了两个部分结果,产生了一个新的中间结果。
(在并行减少的情况下,组合是必要的,在这个过程中,输入被分区,每个分区都计算出部分的累积,然后将部分结果组合起来产生最终的结果。)
更准确地说,identity必须是组合函数的恒等式。这意味着对所有的u,combiner.apply(identity, u)等于u,
另外,组合函数必须是结合的,必须与累加器函数兼容:
对所有u和t,
combiner.apply(identity, u) 必须等于accumulator.apply(u, t).
三参数形式是双参数形式的泛化,将映射步骤合并到累加步骤中。
我们可以用更一般的形式重新改写这个简单的widgets重量的例子
尽管显式的map-reduce的形式更易于阅读,因此通常应该优先考虑。
通用的形式是为了 通过将映射和减少到单个函数,以重要的工作进行优化 这种场景
我们会得到想要的结果,它甚至可以并行工作,然而,但是我们可能对性能不满意
这样的实现将会进行大量的字符串复制 时间复杂度O(n^2)
一种更有效的方法是将结果累积到StringBuilder中,这是一个用于累积字符串的可变容器
就如同我们对普通的归约操作处理一样,我们可以使用相同的技术来处理可变的归约
可变归约操作称为collect()当它将期望的结果收集到一个结果容器中,例如一个集合
收集操作需要三个功能:
一个supplier 功能来构造结果容器的新实例,
一个累计运算器函数将一个输入元素合并到一个结果容器中,
一个组合函数将一个结果容器的内容合并到另一个结果容器中。
它的形式与普通归约的一般形式非常相似
与reduce()相比,以这种抽象的方式表示收集的好处是它直接适合并行化:
我们可以并行地累计运算部分结果,然后将它们组合起来,只要积累和组合功能满足适当的需求。
例如,为了收集流中的元素的字符串表示到ArrayList,我们可以编写显式的for循环
或者我们可以使用一个可并行的collect形式
或者,从累加器函数中提取出来map操作,我们可以更简洁地表达它:
在这里,我们的supplier只是ArrayList的构造器,累加器将string element元素添加到ArrayList中,组合器简单地使用addAll将字符串从一个容器复制到另一个容器中
collect的三个部分——supplier, accumulator, 和combiner ——是紧密耦合的。
我们可以使用Collector来抽象的表达描述这三部分。
上面的例子可以将字符串collect到列表中,可以使用一个标准收集器来重写:
将可变的归约打包成收集器有另一个优点:可组合性。
类Collectors包含许多用于收集器的预定义工厂,包括将一个收集器转换为另一个收集器的组合器。
例如,假设我们有一个Collector,它计算员工流的薪水之和,如下所列
(对于第二个类型的参数 ? ,仅仅表明我们不关心收集器所使用的中间类型。 )如果我们想要创建一个收集器来按部门计算工资的总和,我们可以使用groupingBy来重用summingSalaries 薪水:
就像常规的reduce操作一样,只有满足适当的条件collect() 操作才能够并行化
对于任何部分累计运算的结果,将其与空结果容器相结合combiner 必须产生一个等效的结果
也就是说,对于任意一个部分累计运算的结果p,累计运算或者组合调用的结果,p必须等于 combiner.apply(p, supplier.get()).
而且,无论计算是否分割,它必须产生一个等价的结果。对于任何输入元素t1和t2,下面计算的结果r1和r2必须是等价的
在这里,等价通常指的是Object.equals(Object).。但在某些情况下,等价性的要求可能会降低
并行执行操作可能实际上会产生反效果。这是因为组合步骤(通过键将一个Map合并到另一个Map)对于某些Map实现来说可能代价很大
然而,假设在这个reduce中使用的结果容器是一个可修改的集合——例如ConcurrentHashMap。在这种情况下,对迭代累计运算器的并行调用实际上可以将它们的结果并发地放到相同的共享结果容器中,从而将不再需要组合器合并不同的结果容器。这可能会促进并行执行性能的提升。我们称之为并行reduce
支持并发reduce的收集器以Collector.Characteristics.CONCURRENT characteristic特性为标志。并发特性。然而,并发集合也有缺点。
如果多个线程将结果并发地存入一个共享容器,那么产生结果的顺序是不确定的。
因此,只有在排序对正在处理的流不重要的情况下,才可能执行并发的reduce
下面这些条件下 Stream.collect(Collector) 的实现会并发reduce(归约)
- 流是并行的;
- 收集器有Collector.Characteristics.CONCURRENT 特性
- 要么是无序的流,要么收集器拥有Collector.Characteristics.UNORDERED 特性
您可以通过使用BaseStream.unordered()方法来确保流是无序的。例如:
(Collectors.groupingByConcurrent(java.util.function.Function<? super T, ? extends K>) 等同于 groupingBy).
如果我们把这个问题扩大到四项,就可以看到这种结合性对于并行的重要性
这样我们就可以把(a op b) 和 (c op d) 进行并行计算 最后在对他们进行 op 运算
结合性操作的例子包括数字加法、min、max和字符串串联
Low-level stream construction 低级流构造器
到目前为止,所有的流示例都使用了Collection.stream()或Arrays.stream(Object)等方法来获得一个stream。这些处理流的方法是如何实现的?
类StreamSupport提供了许多用于创建流的低级方法,所有这些方法都使用某种形式的Spliterator。
一个Spliterator是迭代器的一个并行版本;
它描述了一个(可能是无限的)元素集合,支持顺序前进、批量遍历,并将一部分输入分割成另一个可并行处理的Spliterator。
在最低层,所有的流都由一个Spliterator驱动构造
在实现Spliterator时,有许多实现选择,几乎所有的实现都是在简单的实现和运行时性能之间进行权衡。
创建Spliterator的最简单、但最不高性能的方法是,使用 Spliterators.spliteratorUnknownSize(java.util.Iterator, int)从一个iterator中创建spliterator 。
虽然这样的spliterator 可以工作,但它可能会提供糟糕的并行性能,因为我们已经丢失了容量信息(底层数据集有多大),以及被限制为一个简单的分割算法。
一个高质量的spliterator 将提供平衡的和已知大小的分割,精确的容量信息,以及一些可用于实现优化执行的spliterator 或数据的其他特征 (特征见spliterator characteristics)
可变数据源的Spliterators 有一个额外的挑战;绑定到数据的时间,因为数据可能在创建Spliterators 后和开始执行流管道的期间,发生变化。
理想情况下,一个流的spliterator将报告一个IMMUTABLE or CONCURRENT;如果不是,应该是后期绑定(late-binding)。
如果一个源不能直接提供一个推荐的spliterator,它可能会通过Supplier 间接地提供一个spliterator,并通过接收Supplier作为参数的stream()版本构造一个stream。只有在流管道的终端操作之后,才从Supplier处获得spliterator
这些要求极大地减少了流源的变化和流管道的执行之间的潜在的干扰。
基于具有所需特性的spliterators ,或者使用 Supplier-based 的工厂的形式的流,在终端操作开始之前对数据源的修改是不受影响的(如果流操作的行为参数满足不干涉和无状态的要求标准)。参见不干涉 Non-Interference的细节。
在这里,如果映射操作是并行执行的,那么相同输入的结果可能因线程调度差异而变化,而对于无状态lambda表达式,结果总是相同的
还要注意的是,试图从行为参数访问可变状态时,在安全性和性能方面是您一个错误的选择;
如果你不同步访问那个状态,你就有了数据竞争,因此你的代码可能出现问题,
但是如果你对那个状态进行同步访问,你就有可能会破坏你想要从并行性中得到的受益。
最好的方法是在流操作中完全地避免有状态的行为参数; 通常总会有种方法可以重构流以避免状态性
Side-effects副作用
一般来说,对流操作的行为参数的副作用是不鼓励的,因为它们通常会导致不知情的违反无状态要求的行为,以及其他线程安全隐患
如果行为参数确实有副作用,除非显式地声明,否则就无法保证这些副作用对其他线程的可见性,也不能保证在同一条管道内的“相同”元素上的不同操作在相同的线程中执行。此外,这些影响的排序可能出乎意料。即使管道被限制生成一个与stream源的处理顺序一致的结果(例如,IntStream.range(0,5).parallel().map(x -> x*2).toArray() 必须生成0、2、4、6、8),对于将mapper函数应用于个别元素的顺序,或者对于给定元素执行任何行为参数的顺序,都没有保证
对许多可能会被尝试使用于副作用的计算中,可以替换为无副作用的,更安全更有效的表达,比如使用归约而不是可变的累积器。
然而,使用println()来进行调试的副作用通常是无害的。少部分的流操作,如forEach()和peek(),用的就是他们的副作用;这些应该小心使用。
下面的例子演示,如何从一个使用副作用的计算转变为不适用副作用
下面的代码搜索一个字符串流,以匹配给定的正则表达式,并将匹配放在列表中
这段代码不必要地使用了副作用。如果并行执行,ArrayList的非线程安全将导致不正确的结果,并且添加所需的同步将导致竞争,从而破坏并行性的好处。
此外,在这里使用副作用是完全没有必要的;forEach()可以简单地被替换为更安全、更高效、更适合并行化的reduce操作。
然而,我们有充分的理由倾向于reduce操作,而不是像上面这样的迭代累计运算。
它不仅是一个“更抽象的”——它在流上把流作为一个整体运行而不是作用于单独的元素——但是一个适当构造的reduce操作本质上是可并行的,只要用于处理元素的函数(s)是结合的associative和无状态stateless的。举个例子,给定一个数字流,我们想要找到和,我们可以写:
几乎不需要怎么修改,就可以以并行的方式运行
之所以归约操作可以很好地并行,是因为实现可以并行地操作数据的子集,然后将中间结果组合在一起,得到最终的正确答案。(即使该语言有一个“"parallel for-each"”构造,迭代累计运算方法仍然需要开发人员提供对共享累积变量sum的线程安全更新以及所需的同步,这可能会消除并行性带来的任何性能收益。)
使用reduce()代替了归约操作的并行化的所有负担,并且库可以提供一个高效的并行实现,不需要额外的同步
前面展示的“widgets”示例展示了如何与其他操作相结合,以替换for循环。
如果widgets 是Widget 对象的集合,它有一个getWeight方法,我们可以找到最重的widget:
??
在更通用的形式中 对类型为T的元素,并且返回结果类型为U的reduce操作 需要三个参数:
在这里,identity不仅仅是归约的初始化结果值或者如果没有任何元素时的一个默认的返回值
迭代累计运算器接受部分结果和下一个元素,并产生一个新的中间结果。
组合函数结合了两个部分结果,产生了一个新的中间结果。
(在并行减少的情况下,组合是必要的,在这个过程中,输入被分区,每个分区都计算出部分的累积,然后将部分结果组合起来产生最终的结果。)
更准确地说,identity必须是组合函数的恒等式。这意味着对所有的u,combiner.apply(identity, u)等于u,
另外,组合函数必须是结合的,必须与累加器函数兼容:
对所有u和t,
combiner.apply(identity, u) 必须等于accumulator.apply(u, t).
三参数形式是双参数形式的泛化,将映射步骤合并到累加步骤中。
我们可以用更一般的形式重新改写这个简单的widgets重量的例子
尽管显式的map-reduce的形式更易于阅读,因此通常应该优先考虑。
通用的形式是为了 通过将映射和减少到单个函数,以重要的工作进行优化 这种场景
我们会得到想要的结果,它甚至可以并行工作,然而,但是我们可能对性能不满意
这样的实现将会进行大量的字符串复制 时间复杂度O(n^2)
一种更有效的方法是将结果累积到StringBuilder中,这是一个用于累积字符串的可变容器
就如同我们对普通的归约操作处理一样,我们可以使用相同的技术来处理可变的归约
可变归约操作称为collect()当它将期望的结果收集到一个结果容器中,例如一个集合
收集操作需要三个功能:
一个supplier 功能来构造结果容器的新实例,
一个累计运算器函数将一个输入元素合并到一个结果容器中,
一个组合函数将一个结果容器的内容合并到另一个结果容器中。
它的形式与普通归约的一般形式非常相似
与reduce()相比,以这种抽象的方式表示收集的好处是它直接适合并行化:
我们可以并行地累计运算部分结果,然后将它们组合起来,只要积累和组合功能满足适当的需求。
例如,为了收集流中的元素的字符串表示到ArrayList,我们可以编写显式的for循环
或者我们可以使用一个可并行的collect形式
或者,从累加器函数中提取出来map操作,我们可以更简洁地表达它:
在这里,我们的supplier只是ArrayList的构造器,累加器将string element元素添加到ArrayList中,组合器简单地使用addAll将字符串从一个容器复制到另一个容器中
collect的三个部分——supplier, accumulator, 和combiner ——是紧密耦合的。
我们可以使用Collector来抽象的表达描述这三部分。
上面的例子可以将字符串collect到列表中,可以使用一个标准收集器来重写:
将可变的归约打包成收集器有另一个优点:可组合性。
类Collectors包含许多用于收集器的预定义工厂,包括将一个收集器转换为另一个收集器的组合器。
例如,假设我们有一个Collector,它计算员工流的薪水之和,如下所列
(对于第二个类型的参数 ? ,仅仅表明我们不关心收集器所使用的中间类型。 )如果我们想要创建一个收集器来按部门计算工资的总和,我们可以使用groupingBy来重用summingSalaries 薪水:
就像常规的reduce操作一样,只有满足适当的条件collect() 操作才能够并行化
对于任何部分累计运算的结果,将其与空结果容器相结合combiner 必须产生一个等效的结果
也就是说,对于任意一个部分累计运算的结果p,累计运算或者组合调用的结果,p必须等于 combiner.apply(p, supplier.get()).
而且,无论计算是否分割,它必须产生一个等价的结果。对于任何输入元素t1和t2,下面计算的结果r1和r2必须是等价的
在这里,等价通常指的是Object.equals(Object).。但在某些情况下,等价性的要求可能会降低
并行执行操作可能实际上会产生反效果。这是因为组合步骤(通过键将一个Map合并到另一个Map)对于某些Map实现来说可能代价很大
然而,假设在这个reduce中使用的结果容器是一个可修改的集合——例如ConcurrentHashMap。在这种情况下,对迭代累计运算器的并行调用实际上可以将它们的结果并发地放到相同的共享结果容器中,从而将不再需要组合器合并不同的结果容器。这可能会促进并行执行性能的提升。我们称之为并行reduce
支持并发reduce的收集器以Collector.Characteristics.CONCURRENT characteristic特性为标志。并发特性。然而,并发集合也有缺点。
如果多个线程将结果并发地存入一个共享容器,那么产生结果的顺序是不确定的。
因此,只有在排序对正在处理的流不重要的情况下,才可能执行并发的reduce
下面这些条件下 Stream.collect(Collector) 的实现会并发reduce(归约)
- 流是并行的;
- 收集器有Collector.Characteristics.CONCURRENT 特性
- 要么是无序的流,要么收集器拥有Collector.Characteristics.UNORDERED 特性
您可以通过使用BaseStream.unordered()方法来确保流是无序的。例如:
(Collectors.groupingByConcurrent(java.util.function.Function<? super T, ? extends K>) 等同于 groupingBy).
如果我们把这个问题扩大到四项,就可以看到这种结合性对于并行的重要性
这样我们就可以把(a op b) 和 (c op d) 进行并行计算 最后在对他们进行 op 运算
结合性操作的例子包括数字加法、min、max和字符串串联
Low-level stream construction 低级流构造器
到目前为止,所有的流示例都使用了Collection.stream()或Arrays.stream(Object)等方法来获得一个stream。这些处理流的方法是如何实现的?
类StreamSupport提供了许多用于创建流的低级方法,所有这些方法都使用某种形式的Spliterator。
一个Spliterator是迭代器的一个并行版本;
它描述了一个(可能是无限的)元素集合,支持顺序前进、批量遍历,并将一部分输入分割成另一个可并行处理的Spliterator。
在最低层,所有的流都由一个Spliterator驱动构造
在实现Spliterator时,有许多实现选择,几乎所有的实现都是在简单的实现和运行时性能之间进行权衡。
创建Spliterator的最简单、但最不高性能的方法是,使用 Spliterators.spliteratorUnknownSize(java.util.Iterator, int)从一个iterator中创建spliterator 。
虽然这样的spliterator 可以工作,但它可能会提供糟糕的并行性能,因为我们已经丢失了容量信息(底层数据集有多大),以及被限制为一个简单的分割算法。
一个高质量的spliterator 将提供平衡的和已知大小的分割,精确的容量信息,以及一些可用于实现优化执行的spliterator 或数据的其他特征 (特征见spliterator characteristics)
可变数据源的Spliterators 有一个额外的挑战;绑定到数据的时间,因为数据可能在创建Spliterators 后和开始执行流管道的期间,发生变化。
理想情况下,一个流的spliterator将报告一个IMMUTABLE or CONCURRENT;如果不是,应该是后期绑定(late-binding)。
如果一个源不能直接提供一个推荐的spliterator,它可能会通过Supplier 间接地提供一个spliterator,并通过接收Supplier作为参数的stream()版本构造一个stream。只有在流管道的终端操作之后,才从Supplier处获得spliterator
这些要求极大地减少了流源的变化和流管道的执行之间的潜在的干扰。
基于具有所需特性的spliterators ,或者使用 Supplier-based 的工厂的形式的流,在终端操作开始之前对数据源的修改是不受影响的(如果流操作的行为参数满足不干涉和无状态的要求标准)。参见不干涉 Non-Interference的细节。
我们会得到想要的结果,它甚至可以并行工作,然而,但是我们可能对性能不满意
这样的实现将会进行大量的字符串复制 时间复杂度O(n^2)
一种更有效的方法是将结果累积到StringBuilder中,这是一个用于累积字符串的可变容器
就如同我们对普通的归约操作处理一样,我们可以使用相同的技术来处理可变的归约
可变归约操作称为collect()当它将期望的结果收集到一个结果容器中,例如一个集合
收集操作需要三个功能:
一个supplier 功能来构造结果容器的新实例,
一个累计运算器函数将一个输入元素合并到一个结果容器中,
一个组合函数将一个结果容器的内容合并到另一个结果容器中。
它的形式与普通归约的一般形式非常相似
与reduce()相比,以这种抽象的方式表示收集的好处是它直接适合并行化:
我们可以并行地累计运算部分结果,然后将它们组合起来,只要积累和组合功能满足适当的需求。
例如,为了收集流中的元素的字符串表示到ArrayList,我们可以编写显式的for循环
或者我们可以使用一个可并行的collect形式
或者,从累加器函数中提取出来map操作,我们可以更简洁地表达它:
在这里,我们的supplier只是ArrayList的构造器,累加器将string element元素添加到ArrayList中,组合器简单地使用addAll将字符串从一个容器复制到另一个容器中
collect的三个部分——supplier, accumulator, 和combiner ——是紧密耦合的。
我们可以使用Collector来抽象的表达描述这三部分。
上面的例子可以将字符串collect到列表中,可以使用一个标准收集器来重写:
将可变的归约打包成收集器有另一个优点:可组合性。
类Collectors包含许多用于收集器的预定义工厂,包括将一个收集器转换为另一个收集器的组合器。
例如,假设我们有一个Collector,它计算员工流的薪水之和,如下所列
(对于第二个类型的参数 ? ,仅仅表明我们不关心收集器所使用的中间类型。 )如果我们想要创建一个收集器来按部门计算工资的总和,我们可以使用groupingBy来重用summingSalaries 薪水:
就像常规的reduce操作一样,只有满足适当的条件collect() 操作才能够并行化
对于任何部分累计运算的结果,将其与空结果容器相结合combiner 必须产生一个等效的结果
也就是说,对于任意一个部分累计运算的结果p,累计运算或者组合调用的结果,p必须等于 combiner.apply(p, supplier.get()).
而且,无论计算是否分割,它必须产生一个等价的结果。对于任何输入元素t1和t2,下面计算的结果r1和r2必须是等价的
在这里,等价通常指的是Object.equals(Object).。但在某些情况下,等价性的要求可能会降低
并行执行操作可能实际上会产生反效果。这是因为组合步骤(通过键将一个Map合并到另一个Map)对于某些Map实现来说可能代价很大
然而,假设在这个reduce中使用的结果容器是一个可修改的集合——例如ConcurrentHashMap。在这种情况下,对迭代累计运算器的并行调用实际上可以将它们的结果并发地放到相同的共享结果容器中,从而将不再需要组合器合并不同的结果容器。这可能会促进并行执行性能的提升。我们称之为并行reduce
支持并发reduce的收集器以Collector.Characteristics.CONCURRENT characteristic特性为标志。并发特性。然而,并发集合也有缺点。
如果多个线程将结果并发地存入一个共享容器,那么产生结果的顺序是不确定的。
因此,只有在排序对正在处理的流不重要的情况下,才可能执行并发的reduce
下面这些条件下 Stream.collect(Collector) 的实现会并发reduce(归约)
- 流是并行的;
- 收集器有Collector.Characteristics.CONCURRENT 特性
- 要么是无序的流,要么收集器拥有Collector.Characteristics.UNORDERED 特性
您可以通过使用BaseStream.unordered()方法来确保流是无序的。例如:
(Collectors.groupingByConcurrent(java.util.function.Function<? super T, ? extends K>) 等同于 groupingBy).
如果我们把这个问题扩大到四项,就可以看到这种结合性对于并行的重要性
这样我们就可以把(a op b) 和 (c op d) 进行并行计算 最后在对他们进行 op 运算
结合性操作的例子包括数字加法、min、max和字符串串联
Low-level stream construction 低级流构造器
到目前为止,所有的流示例都使用了Collection.stream()或Arrays.stream(Object)等方法来获得一个stream。这些处理流的方法是如何实现的?
类StreamSupport提供了许多用于创建流的低级方法,所有这些方法都使用某种形式的Spliterator。
一个Spliterator是迭代器的一个并行版本;
它描述了一个(可能是无限的)元素集合,支持顺序前进、批量遍历,并将一部分输入分割成另一个可并行处理的Spliterator。
在最低层,所有的流都由一个Spliterator驱动构造
在实现Spliterator时,有许多实现选择,几乎所有的实现都是在简单的实现和运行时性能之间进行权衡。
创建Spliterator的最简单、但最不高性能的方法是,使用 Spliterators.spliteratorUnknownSize(java.util.Iterator, int)从一个iterator中创建spliterator 。
虽然这样的spliterator 可以工作,但它可能会提供糟糕的并行性能,因为我们已经丢失了容量信息(底层数据集有多大),以及被限制为一个简单的分割算法。
一个高质量的spliterator 将提供平衡的和已知大小的分割,精确的容量信息,以及一些可用于实现优化执行的spliterator 或数据的其他特征 (特征见spliterator characteristics)
可变数据源的Spliterators 有一个额外的挑战;绑定到数据的时间,因为数据可能在创建Spliterators 后和开始执行流管道的期间,发生变化。
理想情况下,一个流的spliterator将报告一个IMMUTABLE or CONCURRENT;如果不是,应该是后期绑定(late-binding)。
如果一个源不能直接提供一个推荐的spliterator,它可能会通过Supplier 间接地提供一个spliterator,并通过接收Supplier作为参数的stream()版本构造一个stream。只有在流管道的终端操作之后,才从Supplier处获得spliterator
这些要求极大地减少了流源的变化和流管道的执行之间的潜在的干扰。
基于具有所需特性的spliterators ,或者使用 Supplier-based 的工厂的形式的流,在终端操作开始之前对数据源的修改是不受影响的(如果流操作的行为参数满足不干涉和无状态的要求标准)。参见不干涉 Non-Interference的细节。
- 流是并行的;
- 收集器有Collector.Characteristics.CONCURRENT 特性
- 要么是无序的流,要么收集器拥有Collector.Characteristics.UNORDERED 特性