[三]java8 函数式编程Stream 概念深入理解 Stream 运行原理 Stream设计思路


Stream的概念定义

  官方文档是永远的圣经~    
表格内容来自https://docs.oracle.com/javase/8/docs/api/   Package java.util.stream  一节部分原文内容的翻译
 
int sum = widgets.stream()
.filter(b -> b.getColor() == RED)
.mapToInt(b -> b.getWeight())
.sum();
  流操作被划分为中间和终端操作,并组合成流管道。 一条Stream管道由一个源(如一个集合、一个数组、一个生成器函数或一个i/o通道)组成; 然后是零个或更多的中间操作,例如Stream.filter或者 Stream.map 还有一个终端操作,Stream.forEach or Stream.reduce   中间操作返回一条新流,他们总是惰性的; 执行诸如filter()之类的中间操作实际上并不会立即执行任何过滤操作,而是创建了一个新流,当遍历时,它包含与给定谓词相匹配的初始流的元素。直到管道的终端操作被执行,管道源的遍历才会开始   终端操作,例如Stream.forEach 和 IntStream.sum,可以遍历流以产生结果或副作用。 在执行终端操作之后,流管道被认为是被消耗掉的,并且不能再被使用; 如果您需要再次遍历相同的数据源,您必须返回到数据源以获得一条新的stream。 在几乎所有情况下,终端操作都很迫切,在返回之前完成了数据源的遍历和管道的处理。只有终端操作iterator() 和 spliterator() 不是;这些都是作为一个“逃生舱口”提供的,以便在现有操作不足以完成任务的情况下启用任意客户控制的管道遍历(个人理解就是如果流不足以提供处理可以让你自行遍历处理)   延迟处理流可以显著提高效率; 在像上面的filer-map-sum例子这样的管道中,过滤、映射和求和可以被融合到数据的单个传递中,并且具有最小的中间状态。 惰性还允许在没有必要的情况下避免检查所有数据;对于诸如“查找第一个超过1000个字符的字符串”这样的操作,只需要检查足够的字符串,就可以找到具有所需特征的字符串,而不需要检查源的所有字符串。(当输入流是无限的而不仅仅是大的时候,这种行为就变得更加重要了。)   中间操作被进一步划分为无状态和有状态操作。 无状态操作,如filter和map,在处理新元素时不保留以前处理的元素的状态——每个元素都可以独立于其他元素的操作处理。有状态的操作,例如distinct和sorted,可以在处理新元素时从先前看到处理的元素中合并状态。   有状态操作可能需要在产生结果之前处理整个输入。 例如,一个人不能从排序流中产生任何结果,直到一个人看到了流的所有元素。 因此,在并行计算下,一些包含有状态中间操作的管道可能需要对数据进行多次传递,或者可能需要缓冲重要数据。包含完全无状态的中间操作的管道可以在单次传递过程中进行处理,无论是顺序的还是并行的,只有最少的数据缓冲   此外,一些操作被认为是短路操作。一个中间操作,如果在提供无限流输入时,它可能会产生一个有限的流,那么他就是短路的。如果在无限流作为输入时,它可能在有限的时间内终止,这个终端操作是短路的。 在管道中进行短路操作是处理无限流在有限时间内正常终止的必要条件,但不是充分条件
这些流的方法是如何实现的?
类StreamSupport提供了许多用于创建流的低级方法,所有这些方法都使用某种形式的Spliterator.
一个Spliterator.是迭代器Iterator的并行版本
它描述了一个(可能是无限的)元素集合,支持顺序前进、批量遍历,并将一部分输入分割成另一个可并行处理的Spliterator 在最低层,所有的流都由一个spliterator 构造(所以说流就是迭代器的一种高级形式)

在实现Spliterator时,有许多实现选择,几乎所有的实现都是在简单的实现和使用Spliterator流的运行时性能之间进行权衡。创建Spliterator的最简单、但最不高性能的方法是,使用Spliterators.spliteratorUnknownSize(java.util.Iterator, int).虽然这样的Spliterator可以工作,但它很可能提供糟糕的并行性能,因为我们已经丢失了尺寸信息(底层数据集有多大),并且被限制为一个简单的分割算法。

一个高质量的Spliterator将提供平衡的和知道大小的分割,精确的尺寸信息,以及一些可用于实现优化执行的spliterator 或数据的 characteristics (见Spliterator int characteristics()  )

可变数据源的Spliterators有一个额外的挑战;
绑定到数据的时间,因为数据可能在创建spliterator的时间和执行流管道的时间之间发生变化。理想情况下,一个流的spliterator 应该报告一个characteristic of IMMUTABLE or CONCURRENT;
如果不是,应该是后期绑定。如果一个源不能直接提供一个推荐的spliterator,它可能会间接地通过Supplier提供一个spliterator,通过接收Supplier作为参数的stream方法构建一个流
public static  Stream stream(Supplier<? extends Spliterator> supplier,
                                   int characteristics,
                                   boolean parallel)
只有在流管道的终端操作开始后,才从supplier处获  
    Stream  文档概要译文:
支持顺序和并行聚合操作的一组元素序列
除了Stream 还有专门为原始类型特殊化的IntStream、LongStream和double Stream  所有这些都被称为“流”
集合和流,虽然表面上有一些相似性,但有不同的设计目的 集合主要关注的是对其元素的有效管理和访问 相比之下,流并没有提供直接访问或操纵其元素的方法,而是关注于声明性地描述它们的源和计算操作,这些操作将在该源上进行聚合。 但是,如果所提供的流操作没有提供所需的功能,那么 BaseStream.iterator() 和 BaseStream.spliterator() 操作可以用来执行受控的遍历   示例:
widgets 是 Collection
int sum = widgets.stream()
.filter(w -> w.getColor() == RED)
.mapToInt(w -> w.getWeight())
.sum();
像上面的“widgets”示例一样,流管道可以看作是在流的数据源上进行的查询。   除非源代码是为并发修改而显式设计的(例如ConcurrentHashMap),否则在查询时 修改流的源 可能导致不可预测或错误的行为。   大多数流操作都接受描述用户指定行为的参数,比如在上面的例子中传递给mapToInt的lambda表达式w-w.getweight()。 为了保持正确的行为,这些行为参数:         必须是非干扰(也就是它们不修改流源);         在大多数情况下,必须是无状态的(它们的结果不应该依赖于任何在流水线执行过程中可能发生变化的状态) 这些参数通常是函数接口的实例,例如Function,一般是lambda表达式或方法引用。除非另有说明,这些参数必须是非空的。   一个流应该只运行一次(调用中间操作或结束操作)。这就排除了比如“forked”流,在这些流中,相同的源提供两个或更多的管道,或者同一流的多个遍历。 一个流实现可能会抛出IllegalStateException 异常,如果它检测到流正在被重用。 然而,由于某些流操作可能返回它们的接收者而不是一个新的stream对象,所以并不能在所有情况下都检测到重用。   Streams有一个BaseStream.close()方法并实现AutoCloseable,但是几乎所有的stream实例在使用后实际上并不需要关闭。 通常,只有源是IO通道的流(比Files.lines(Path,Charset))将需要关闭。 大多数流都是由集合、数组或生成函数支持的,这些功能不需要特殊的资源管理。(如果流确实需要关闭,它可以在try-with-resources语句中声明为资源。)   流管道可以按顺序或并行执行 ,这种执行模式是流的属性。 流的类型是创建初始时选择通过顺序或并行操作执行来决定的。(例如,Collections.stream()创建了一个顺序流,而Collection.parallelStream()创建了一个并行的流。) 这种执行模式的选择可以由BaseStream.sequential() 或BaseStream.parallel()方法修改,并且可以使用BaseStream.isParallel() 方法查询。
   

流的基本特点

 
集合是对一组特定类型的元素值序列提供的接口  是数据结构,提供了元素的存取 流也是对一组特定类型元素值序列提供的接口,在于计算,提供了对元素序列的操作计算方式 比如 filter map等
流只能运行一次
流由源 0个或者多个中间操作以及结束操作组成
流操作的方法基本上是函数式接口的实例
流的中间操作是惰性的并不会立即执行 这更有利于内部迭代的优化
流借助于它内部迭代特性提供了声明式的编程方式 更急简洁
中间操作本身会返回一个流,可以将多个操作复合叠加,形成一个更大的流水线
流分为顺序和并行两种方式
 

流与集合主要区别

  不存储数据 ????????流不是存储元素的数据结构;相反,它通过一个计算操作的管道,从一个数据源,如数据结构、数组、生成器函数或i/o通道中传递元素 函数特性 ????????一个流上的操作产生一个结果,但是不会修改它的源。例如,过滤集合 获得的流会产生一个没有被过滤元素的新流,而不是从源集合中删除元素 延迟搜索 ????????许多流操作,如过滤、映射或重复删除,都可以延迟实现,从而提供出优化的机会。 ????????例如,“找到带有三个连续元音的第一个字符串”不需要检查所有的输入字符串。 ????????流操作分为中间(流生成)操作和终端(值或副作用生成)操作。许多的中间操作, 如filter,map等,都是延迟执行。        中间操作总是lazy的。 Stream可能是无界的 ????????虽然集合的大小是有限的,但流不需要。诸如limit(n)或findFirst()这样的短路操作可以允许在有限时间内完成无限流的计算。 消耗的 ????????流的元素只在流的生命周期中访问一次。就像迭代器一样,必须生成一个新的流来重新访问源的相同元素    
可以把流当做一个高级的迭代器Iterator ,内部有它自身运行逻辑的迭代器 你只需要告诉他你想要做什么,他自己就会自动的去迭代筛选组织你想要的数据
     

Stream的生成

  目前在java中 集合框架与Stream的结合最多 因为Stream 是对数据项的计算,而集合恰恰是用来存储数据项的数据结构 你当然可以使用其他的数据项表示形式

方法列表

  • collection.stream   parallelStream  实例方法 转换
  • Arrays.stream(Object[]) 静态方法 转换
  • Stream类的静态工厂方法 比如  Stream.of(Object[]), IntStream.range(int, int), Stream.iterate(Object, UnaryOperator)   Stream.generate   
  • BufferedReader.lines(); 文件行
  • Files类的获取文件路径列表: find(), lines(), list(), walk();
  • Random.ints()  随机数流
  • JDK中的许多其他流载方法,包括BitSet.stream(), Pattern.splitAsStream(java.lang.CharSequence), and JarFile.stream().
 

常用创建方法

 
Stream.of( )      Stream类静态方法     
转换XXX为Stream
 
 
Collection stream   集合转换为Stream
特   别   注   意:这是一个default方法,也就意味着如果没有特别处理,所有Collection子类都具有
 
Arrays.stream();     数组转换为Stream
   
Stream.iterate   Stream类静态方法
迭代器的形式,创建一个数据流
    比如
Stream.generate 
  比如      

Stream类体系结构与流水线设计思路

BaseStream规定了流的基本接口 Stream中定义了map、filter、flatmap等用户关注的常用操作; Int~ Long~ Double~是针对于基本类型的特化 方法与Stream中大致对应,当然也有一些差别 BaseStream Stream IntStream LongStream DoubleStream 组建了Java的流体系根基 他们都只是接口
PipelineHelper主要用于Stream执行过程中相关结构的构建
ReferencePipeline  和 AbstractPipeline 完成了Stream的主要实现
AbstractPipeline类实现了所有的 Stream的中间操作和最终操作
[Int | Long | Double] pipeline 同理类似ReferencePipeline只不过是针对基本类型



       
Head、StatelessOp、StatefulOp为ReferencePipeline中的内部类
[Int | Long | Double]Pipeline 内部也都是定义了这三个内部类
  说到这我们已经可以清晰地知道Stream的实现类    

流的族谱

    回头看一下获取Stream的方式 比如Collection中调用StreamSupport.stream 返回的是ReferencePipline.Head   再比如 Stream的of方法 也还是调用了StreamSupport.stream方法 他返回的还是ReferencePipline.Head  
你会发现流的生成转换创建都是使用StreamSupport StreamSupport 是用于创建和操纵Stream的低级工具方法 除了构造方法,每个方法都是返回他们对应的Head ReferencePipeline.Head / DoublePipeline.Head / IntPipeline.Head /  LongPipeline.Head 
    Stream的操作一般都有三部分构成
数据源
操作( filter map.....)
回调方法(Lambda匿名函数 方法引用)
 

 Stream  使用stage来对操作进行抽象描述

    可以看下Head  StatefulOp  StatelessOp 的源码注释就可以知道了 .分别是 :
Source stage of a ReferencePipeline.  Base class for a stateful intermediate stage of a Stream. Base class for a stateless intermediate stage of a Stream.
原始类型特化出来的也是一样 我们知道Head  StatefulOp  StatelessOp是     ReferencePipeline类型的 ([Int | Long | Double]Pipeline).他们也都是Stream 所以说每个操作其实也都是Stream 现在也就可以明白为什么创建转换生成的流都是Head 了 因为它用来抽象描述 源阶段也就是初始阶段   再回头看下AbstractPipeline的概念 “管道”类的抽象基类,它们是流接口及其原始专门化的核心实现。管理管道的建设和评估审查 AbstractPipeline表示一个流管道的初始部分,封装了一个流源和零个或多个中间操作。 单独的AbstractPipeline对象通常用来表示阶段,其中每个阶段描述的是流源或中间操作。   一个具体的中间阶段通常是由AbstractPipeline 构建的--一个特定于操作的具体类,他继承于AbstractPipeline的子类的比如IntPipeline
说白了也就是上面说到过的Head  StatefulOp  StatelessOp  他们本身也是AbstractPipeline类型的
AbstractPipeline包含了评估管道的大部分机制,并实现了操作所使用的方法; 特定类型的类添加了助手方法,用于处理将结果收集到适当的特定类型的容器中。  

那么AbstractPipeline 是怎么构造管道的呢?

  先说Head  这是创建生成流的时候返回的对象 可以看得出来 Head的构造也还是调用的AbstractPipeline 仔细看下他的属性域
private final AbstractPipeline sourceStage;  //反向链接管道链的head,也就是说每个管道节点都有一个头 private final AbstractPipeline previousStage;//指向上一个 private AbstractPipeline nextStage;//指向下一个 赤裸裸的双向链表
    首先是构造,最终调用AbstractPipeline的构造方法 然后当前节点作为源 上一个为null  下一个为null   然后再看下filter和map方法
Stream中将操作抽象化为stage 每个stage 也就是一个AbstractPipeline 每个stage 相当于一个双向链表的节点  ,每个节点都保存Head然后保存着上一个和下一个 这个双向链表就构成了整个流水线 (上面的图看起来next一直是null 是在每个处理后的this里面的previousStage里面 上一个的next是当前)  
    每个操作向操作一样组合成为双向链表 链表将每个操作流水线化,但是每个操作具体的行为是什么? 那么,每个操作的具体细节又是什么样子的呢?  

Sink是什么?

sink就是每个操作具体的行为操作,也可以叫做回调   sink是Consumer的扩展,用于流管道中的多个操作阶段中进行数据的监管 通过额外的方法来管理大小信息 流控等 调用accept前需要调用begin通知数据到来,数据发送完成后需要调用end,再次调用accept前必须再次调用begin  一个sink有两种状态,初始/激活 开始时是初始状态,begin 激活 ,  end使之回到初始状态,可以重复利用  accept只能在激活状态使用   Sink用于协调相邻的stage之间的数据调用 通过begin end accept方法 以及cancellationRequested短路标志位来控制处理流程,对数据进行管控     再次回头看看filter的代码(理解这个过程需要了解闭包 回调的概念)       每个操作 statelessOp 或者statefulOp 都是一个AbstractPipeline  也都是stream    也就是流水线的一个抽象stage阶段 AbstractPipeline 他提供了一个opWrapSink方法 statelessOp 或者statefulOp   重写了这个方法 返回了一个Sink对象    (Chained[Reference | Int | Long | Double] 其实都是Sink) 这个对象的accept方法  调用 参数 predicate的test方法  其实也就是调用Lambda 行为化的参数   这一步相当于封装了当前stage的回调函数 并且他还调用了downstream.accept()方法  

downstream  是什么?

他就是你传递进去的那个参数 sink 也就是说 statelessOp 或者statefulOp 是流水线 stage的抽象 他有一个opWrapSink()方法,他其中一个参数是Sink 这个方法本身返回一个Sink  sink的accept方法封装了回调函数 也就是当前操作阶段的行为 然后他还会调用参数sink的accept方法 试想,如果传递过来的是下一个操作阶段的sink呢? 思考下上面的这个调用会有什么效果 AbstractPipeline中有一个方法 wrapSink来自于PipelineHelper 从最后一个开始,按照深度进行 现在流水线上从开始到结束的所有的操作都被包装到了一个Sink里    

Stream运行流程原理

 
Stream体系是一组接口家族,AbstractPipeline 是接口的实现,PipelineHelper 是管道的辅助类,StreamSupport是流的低级工具类   使用stage来抽象流水线上的每个操作 其实每个stage就是一个stream 也就是AbstractPipeline几个子类的  内部子类  Head StatelessOp statefulOp StreamSupport用于创建生成Stream 对应的是Head类 其他的中间操作分为有状态和无状态的,中间操作通过方法比如 filter map 等返回的是StatelessOp  或者 statefulOp  多个stage组合称为双向链表的形式 从而成了整个流水线   有了流水线,相邻两个操作阶段之间如何协调运算? 于是又有了sink的概念,又来协调相邻的stage之间计算运行 他的模式是begin  accept end 还有短路标记 他的accept就是封装了回调方法   所以说每个操作stage, StatelessOp  或者 statefulOp中又封装了Sink 通过AbstractPipeline提供的opWrapSink方法可以获取这个sink 调用这个sink的accept方法就可以调用当前操作的方法   那么如何串联起来呢?关键点在于opWrapSink方法 ,他接收一个Sink作为参数 在调用accept方法中  可以调用这个入参sink的accept方法 这样子从当前就能调用下一个,也就是说有了推动的动作 那么只需要找到开始,每个处理了之后都推动下一个,就顺序完成了所欲的操作了   注意上面说的操作都是中间操作,中间操作才会产生操作阶段  终端操作不会增加stage的个数了
     

Stream预置操作

 
中间操作  filter() flatMap() limit() map()  concat() distinct() peek() skip() sorted() parallel() sequential() unordered() 
flatMapTo[Double | Int | Long]  mapTo[ Double | Int | Long ]
结束操作 allMatch() anyMatch() collect() count() findAny() findFirst() forEach() forEachOrdered() max() min() noneMatch() reduce() toArray() 
  操作的参数基本上是函数式接口的实例---->也就是Lambda匿名函数   方法引用 所以说想要使用Stream预置的函数,只需要了解清楚对应的函数式接口即可    

Stream 主要有四类接口

 
流到流之间的转换
filter(过滤), map(映射转换), mapTo[Int|Long|Double] (到基本类型流的转换),
flatMap(流展开合并),flatMapTo[Int|Long|Double],
sorted(排序),distinct(不重复值),peek(执行某种操作,流不变,可用于调试),
limit(限制到指定元素数量), skip(跳过若干元素) 
流到终值的转换
toArray(转为数组),reduce(推导结果),collect(聚合结果),
min(最小值), max(最大值), count (元素个数),
anyMatch (任一匹配), allMatch(所有都匹配), noneMatch(一个都不匹配)
findFirst(选择首元素),findAny(任选一元素)
直接遍历
forEach (不保证顺序遍历,比如并行流), forEachOrdered(顺序遍历)
构造流
empty (构造空流),of (单个元素的流及多元素顺序流)
iterate (无限长度的有序顺序流),generate (将数据提供器转换成无限非有序的顺序流),
concat (流的连接), Builder (用于构造流的Builder对象)

   

Stream常用API

 
filter 条件筛选   boolean test(T t); 
   
map   数据转换    R apply(T t); 新的值替换Stream中的值
mapTo[Int | Long | Double] 类似
       
flatMap  R apply(T t); 同map但是是多个流转换为一个流 返回值限定为Stream
其他flatMapTo[Int | Long | Double] 类似
     
生成流 iterate()
public static Stream iterate(final T seed, final UnaryOperator f)
UnaryOperator  是 Function的一元运算形式 操作单个的操作数,产生的结果同操作数类型,是Function转换数据--针对操作数和结果类型一致的一种特殊类型 比如 x->x+2  就是如此,操作数为x   转换为x+2

Stream.iterate(0, i -> i + 2)

 
生成流 generate()
public static Stream generate(Supplier s)
   

reduce  归约  原理

一个归约操作(也称为折叠)接受一系列的输入元素,并通过重复应用组合操作将它们组合成一个简单的结果 例如查找一组数字的总和或最大值,或者将元素累积到一个列表中。 流的类中有多种形式的通用归约操作,称为reduce()和collect(),以及多个专门化的简化形式,如sum()、max()或count()。 当然,这样的操作可以很容易地实现为简单的顺序循环,如下所示:
int sum = 0;
for (int x : numbers) {
sum += x;
}
  然而,我们有充分的理由倾向于减少操作,而不是像上面这样的累加运算。 它不仅是一个“更抽象的”——它在流上作为一个整体而不是单独的元素来运行——而且一个适当构造的reduce操作本质上是可并行的,只要用于处理元素的函数(s)是结合的和无状态的。举个例子,给定一个数字流,我们想要找到和,我们可以写:     int sum = numbers.stream().reduce(0, (x,y) -> x+y); 或者     int sum = numbers.stream().reduce(0, Integer::sum); 这些归约操作几乎不需要修改就可以并行运行     int sum = numbers.parallelStream().reduce(0, Integer::sum);   如果一个操作符或函数 op  满足    (a op b) op c == a op (b op c) ,那么他是结合的 结合性对于并行结算非常重要 比如      a op b op c op d == (a op b) op (c op d)     就可以并行计算  (a op b)    (c op d)   然后再处理他们  
  他是一个结束操作   三个重载版本   是Function的双参数版本 接受 T  U 转换为R 是BiFunction的特殊化形式,两个输入一个输出,三个参数类型相同  
Optional reduce(BinaryOperator accumulator) 参数accumulator:  累计计算器——结合两个值的结合性、非干扰、无状态函数 因为没有初始值 所以返回值为 Optional 计算1+2+3+4+5   相当于
boolean foundAny = false;
T result = null;
for (T element : this stream) {
if (!foundAny) {
foundAny = true;
result = element;
}
else
result = accumulator.apply(result, element);
}
return foundAny ? Optional.of(result) : Optional.empty();

 

如果不进入循环,那么foundAny就是false 直接返回 否则第一次循环给result赋值,此后foundAny 为true  result作为一个操作数调用accumulator.apply()   BinaryOperator 意味着两个操作数一个结果数 类型一样 这可不仅仅用于累加 还可以用来合并字符串 多种形式    

T reduce(T identity, BinaryOperator accumulator);
identity 可以理解为初始值, accumulator 同上


你会发现对于上面的例子,可以使用Integer::sum 替代其中的lambda

相当于
T result = identity;
for (T element : this stream)
result = accumulator.apply(result, element)
return result;
 
    
 U reduce(U identity,
                 BiFunctionsuper T, U> accumulator,
                 BinaryOperator combiner);
相当于:
U result = identity;
for (T element : this stream)
result = accumulator.apply(result, element)
return result;
看得出来和T reduce(T identity, BinaryOperator accumulator);  几乎是一样的 只不过Stream是T 返回是U   可能一样,可能不一样了 如果类型相同的话就是完全一样的了 第三个参数用于在并行计算下 合并各个线程的计算结果 所以要求      combiner.apply(u, accumulator.apply(identity, t)) == accumulator.apply(u, t)     结果不同  是因为  ((((5+1)+2)+3)+4)+5   和   (5+1)+ (5+2)+ (5+3)+ (5+4)+ (5+5)  运算结果不相同     此处  挨个比较找到最大,和 使用8和每个数字比较然后在统一比较 的结果是相同的      
         

相关