@@ -70,9 +70,9 @@ for(String str : strings){
70
70
71
71
<img src =" ./Figures/Java_stream_pipeline_classes.png " width =" 400px " align =" right " hspace =" 10px " alt =" Java_stream_pipeline_classes " />
72
72
73
- 注意这里使用的是“* 操作(operation)* ”一词,指的是“Stream中间操作”的操作,很多Stream操作会需要一个回调函数(Lambda表达式),因此一个完整的操作是* < 数据来源,操作,回调函数> * 构成的三元组。Stream中使用Stage的概念来描述一个完整的操作,并用某种实例化后的* PipelineHelper* 来代表Stage,将具有先后顺序的各个Stage连到一起,就构成了整个流水线。跟Stream相关类和接口的继承关系图示。
73
+ 注意这里使用的是“* 操作(operation)* ”一词,指的是“Stream中间操作”的操作,很多Stream操作会需要一个回调函数(Lambda表达式),因此一个完整的操作是< * 数据来源,操作,回调函数* > 构成的三元组。Stream中使用Stage的概念来描述一个完整的操作,并用某种实例化后的* PipelineHelper* 来代表Stage,将具有先后顺序的各个Stage连到一起,就构成了整个流水线。跟Stream相关类和接口的继承关系图示。
74
74
75
- 还有* IntPipeline* , * LongPipeline* , * DoublePipeline* 没在图中画出,这三个类专门为三种基本类型(不是包装类型)而定制的,跟* ReferencePipeline* 是并列关系。图中* Head* 用于表示第一个Stage,即调用调用诸如* Collection.stream()* 方法产生的Stage,很显然这个Stage里不包含任何操作;* StatelessOp* 和* StatefulOp* 分别表示无状态和有状态的Stage,对应于无状态和有状态的中间操作。
75
+ 还有* IntPipeline, LongPipeline, DoublePipeline* 没在图中画出,这三个类专门为三种基本类型(不是包装类型)而定制的,跟* ReferencePipeline* 是并列关系。图中* Head* 用于表示第一个Stage,即调用调用诸如* Collection.stream()* 方法产生的Stage,很显然这个Stage里不包含任何操作;* StatelessOp* 和* StatefulOp* 分别表示无状态和有状态的Stage,对应于无状态和有状态的中间操作。
76
76
77
77
一个可能的流水线示意图如下:
78
78
@@ -88,7 +88,7 @@ for(String str : strings){
88
88
89
89
<table ><tr ><td >方法名</td ><td >作用</td ></tr ><tr ><td >void begin(long size)</td ><td >开始遍历元素之前调用该方法,通知Sink做好准备。</td ></tr ><tr ><td >void end()</td ><td >所有元素遍历完成之后调用,通知Sink没有更多的元素了。</td ></tr ><tr ><td >boolean cancellationRequested()</td ><td >是否可以结束操作,可以让短路操作尽早结束。</td ></tr ><tr ><td >void accept(T t)</td ><td >遍历元素时调用,接受一个待处理元素,并对元素进行处理。Stage把自己包含的操作和回调方法封装到该方法里,前一个Stage只需要调用当前Stage.accept(T t)方法就行了。</td ></tr ></table >
90
90
91
- 有了上面的协议,相邻Stage之间调用就很方便了,每个Stage都会将自己的操作封装到一个Sink里,前一个Stage只需调用后一个Stage的* accept()* 方法即可,并不需要知道其内部是如何处理的。当然对于有状态的操作,Sink的* begin()* 和 * end()* 方法也是必须实现的。比如Stream.sorted()是一个有状态的中间操作,其对应的* Sink.begin()* 方法可能创建一个乘放结果的容器,而* accept()* 方法负责将元素添加到该容器,最后* end()* 负责对容器进行排序。对于短路操作,* Sink.cancellationRequested()* 也是必须实现的,比如* Stream.findFirst()* 是短路操作,只要找到一个元素,* cancellationRequested()* 就应该返回* true* ,以便调用者尽快结束查找。Sink的四个接口方法常常相互协作,共同完成计算任务。** 实际上Stream API内部实现的的本质,就是如何重载Sink的这四个接口方法** 。
91
+ 有了上面的协议,相邻Stage之间调用就很方便了,每个Stage都会将自己的操作封装到一个Sink里,前一个Stage只需调用后一个Stage的` accept() ` 方法即可,并不需要知道其内部是如何处理的。当然对于有状态的操作,Sink的` begin() ` 和 ` end() ` 方法也是必须实现的。比如Stream.sorted()是一个有状态的中间操作,其对应的* Sink.begin()* 方法可能创建一个乘放结果的容器,而* accept()* 方法负责将元素添加到该容器,最后* end()* 负责对容器进行排序。对于短路操作,` Sink.cancellationRequested() ` 也是必须实现的,比如* Stream.findFirst()* 是短路操作,只要找到一个元素,* cancellationRequested()* 就应该返回* true* ,以便调用者尽快结束查找。Sink的四个接口方法常常相互协作,共同完成计算任务。** 实际上Stream API内部实现的的本质,就是如何重载Sink的这四个接口方法** 。
92
92
93
93
有了Sink对操作的包装,Stage之间的调用问题就解决了,执行时只需要从流水线的head开始对数据源依次调用每个Stage对应的Sink.{begin(), accept(), cancellationRequested(), end()}方法就可以了。一种可能的* Sink.accept()* 方法流程是这样的:
94
94
@@ -121,9 +121,9 @@ public final <R> Stream<R> map(Function<? super P_OUT, ? extends R> mapper) {
121
121
}
122
122
```
123
123
124
- 上述代码看似复杂,其实逻辑很简单,就是将回调函数* mapper* 包装到一个Sink当中。由于 * Stream .map()* 是一个无状态的中间操作,所以map()方法返回了一个StatelessOp内部类对象(一个新的Stream),调用这个新Stream的opWripSink()方法将得到一个包装了当前回调函数的Sink。
124
+ 上述代码看似复杂,其实逻辑很简单,就是将回调函数* mapper* 包装到一个Sink当中。由于Stream .map()是一个无状态的中间操作,所以map()方法返回了一个StatelessOp内部类对象(一个新的Stream),调用这个新Stream的opWripSink()方法将得到一个包装了当前回调函数的Sink。
125
125
126
- 再来看一个复杂一点的例子。* Stream.sorted()* 方法将对Stream中的元素进行排序,显然这是一个有状态的中间操作,因为读取所有元素之前是没法得到最终顺序的。抛开模板代码直接进入问题本质,sorted()方法是如何将操作封装成Sink的呢?sorted()一种可能封装的Sink代码如下:
126
+ 再来看一个复杂一点的例子。Stream.sorted()方法将对Stream中的元素进行排序,显然这是一个有状态的中间操作,因为读取所有元素之前是没法得到最终顺序的。抛开模板代码直接进入问题本质,sorted()方法是如何将操作封装成Sink的呢?sorted()一种可能封装的Sink代码如下:
127
127
128
128
``` Java
129
129
// Stream.sort()方法用到的Sink实现
@@ -175,7 +175,7 @@ Sink完美封装了Stream每一步操作,并给出了[处理->转发]的模式
175
175
176
176
结束操作之后不能再有别的操作,所以结束操作不会创建新的流水线阶段(Stage),直观的说就是流水线的链表不会在往后延伸了。结束操作会创建一个包装了自己操作的Sink,这也是流水线中最后一个Sink,这个Sink只需要处理数据而不需要将结果传递给下游的Sink(因为没有下游)。对于Sink的[ 处理->转发] 模型,结束操作的Sink就是调用链的出口。
177
177
178
- 我们再来考察一下上游的Sink是如何找到下游Sink的。一种可选的方案是在* PipelineHelper* 中设置一个Sink字段,在流水线中找到下游Stage并访问Sink字段即可。但Stream类库的设计者没有这么做,而是设置了一个` Sink AbstractPipeline.opWrapSink(int flags, Sink downstream) ` 方法来得到Sink,该方法的作用是产生一个新的包含了当前Stage代表的操作以及能够将结果传递给downstream的Sink对象。为什么要产生一个新对象而不是返回一个Sink字段?这是因为使用 * opWrapSink() * 可以将当前操作与下游Sink(上文中的downstream参数)结合成新Sink。试想只要从流水线的最后一个Stage开始,不断调用上一个Stage的 * opWrapSink() * 方法直到最开始(不包括stage0,因为stage0代表数据源,不包含操作),就可以得到一个代表了流水线上所有操作的Sink,用代码表示就是这样:
178
+ 我们再来考察一下上游的Sink是如何找到下游Sink的。一种可选的方案是在* PipelineHelper* 中设置一个Sink字段,在流水线中找到下游Stage并访问Sink字段即可。但Stream类库的设计者没有这么做,而是设置了一个` Sink AbstractPipeline.opWrapSink(int flags, Sink downstream) ` 方法来得到Sink,该方法的作用是产生一个新的包含了当前Stage代表的操作以及能够将结果传递给downstream的Sink对象。为什么要产生一个新对象而不是返回一个Sink字段?这是因为使用opWrapSink() 可以将当前操作与下游Sink(上文中的downstream参数)结合成新Sink。试想只要从流水线的最后一个Stage开始,不断调用上一个Stage的opWrapSink() 方法直到最开始(不包括stage0,因为stage0代表数据源,不包含操作),就可以得到一个代表了流水线上所有操作的Sink,用代码表示就是这样:
179
179
180
180
``` Java
181
181
// 从下游向上游不断包装Sink。如果最初传入的sink是结束操作代表的,
@@ -204,7 +204,7 @@ final <P_IN> void copyInto(Sink<P_IN> wrappedSink, Spliterator<P_IN> spliterator
204
204
}
205
205
```
206
206
207
- 上述代码首先调用wrappedSink.begin()方法告诉Sink数据即将到来,然后调用spliterator.forEachRemaining()方法对数据进行迭代(Spliterator是容器的一种迭代器,参阅 [ ] ),最后调用wrappedSink.end()方法通知Sink数据处理结束。逻辑如此清晰。
207
+ 上述代码首先调用wrappedSink.begin()方法告诉Sink数据即将到来,然后调用spliterator.forEachRemaining()方法对数据进行迭代(Spliterator是容器的一种迭代器,[ 参阅 ] ( https://github.com/CarpenterLee/JavaLambdaInternals/blob/master/3-Lambda%20and%20Collections.md#spliterator ) ),最后调用wrappedSink.end()方法通知Sink数据处理结束。逻辑如此清晰。
208
208
209
209
210
210
0 commit comments