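Stream was first introduced in JDK 1.8, nearly 8 years ago at the time of writing (the JDK 1.8 GA release came out in March 2014). On the one hand, Stream greatly simplifies certain development scenarios; on the other hand, it may also hurt readability (quite a few people say Stream makes code harder to read, but in my view, once you are fluent with it, readability actually improves). This article devotes a great deal of space to a detailed analysis of how Stream is implemented under the hood. The source code referenced is JDK 11; the listings and examples here may not apply to other JDK versions.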
This article took a great deal of time and effort to organize and write, and I hope it helps its readers.
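Stream was introduced in JDK 1.8. If code written for JDK 1.7 or earlier is to keep running on JDK 1.8 and later, introducing Stream must not change the methods of interfaces that had already been released: adding new abstract methods to an existing interface would break every existing implementation (it would no longer compile, and an already-compiled class would fail with AbstractMethodError when the new method is called). My guess is that this is why interface default methods, i.e. the `default` keyword, were introduced. Looking at the source, you can see that ArrayList's supertypes Collection and Iterable each gained several default methods: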
```java
public interface Collection<E> extends Iterable<E> {

    // other methods omitted

    @Override
    default Spliterator<E> spliterator() {
        return Spliterators.spliterator(this, 0);
    }

    default Stream<E> stream() {
        return StreamSupport.stream(spliterator(), false);
    }

    default Stream<E> parallelStream() {
        return StreamSupport.stream(spliterator(), true);
    }
}

public interface Iterable<T> {

    // other methods omitted

    default void forEach(Consumer<? super T> action) {
        Objects.requireNonNull(action);
        for (T t : this) {
            action.accept(t);
        }
    }

    default Spliterator<T> spliterator() {
        return Spliterators.spliteratorUnknownSize(iterator(), 0);
    }
}
```
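Intuitively, these new methods should be the key methods of the Stream implementation (later we will confirm that this is not just intuition but what the source shows). A default method is called exactly like an instance method, but its body is written directly in the interface. In that sense it resembles a static method that lives with the interface itself, except that implementing classes can override it. Whether this design is a breakthrough or a compromise, it achieves backward compatibility: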
```java
// JDK 1.7: Iterable has a single abstract method
public interface Iterable<T> {

    Iterator<T> iterator();
}

// Defined and compiled under JDK 1.7
public class MyIterable implements Iterable<Long> {

    public Iterator<Long> iterator() {
        // ....
    }
}
```
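As shown above, MyIterable is defined under JDK 1.7. If this class runs on JDK 1.8, calling forEach() or spliterator() on its instances directly invokes the default methods forEach() and spliterator() of JDK 1.8's Iterable. Of course, given the JDK version constraint, this only guarantees that the code compiles and the old functionality keeps working; Stream features and the `default` keyword cannot be used under JDK 1.7 itself. The point of all this is to explain why code developed and compiled with JDK 7 can run in a JDK 8 environment.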
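The mechanism can be reduced to a minimal, self-contained sketch. `Greeter`, `LegacyGreeter` and `ModernGreeter` are hypothetical names invented for illustration: the interface gains a `default` method in its "second version", the class written against the first version keeps working unchanged, and a newer class is free to override the default body.

```java
// Hypothetical interface: "version 1" had only name().
interface Greeter {
    String name();

    // Added in a hypothetical "version 2" as a default method, so classes
    // written against version 1 still compile and run.
    default String greet() {
        return "Hello, " + name();
    }
}

// Written against "version 1": knows nothing about greet().
class LegacyGreeter implements Greeter {
    @Override
    public String name() {
        return "legacy";
    }
}

// A newer implementation may override the default body.
class ModernGreeter implements Greeter {
    @Override
    public String name() {
        return "modern";
    }

    @Override
    public String greet() {
        return "Hi, " + name() + "!";
    }
}

class DefaultMethodDemo {
    public static void main(String[] args) {
        System.out.println(new LegacyGreeter().greet()); // Hello, legacy
        System.out.println(new ModernGreeter().greet()); // Hi, modern!
    }
}
```

This mirrors what happens with MyIterable: the legacy class inherits the interface's default body without being recompiled against the new method.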
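The splittable iterator: Spliterator

The foundation of the Stream implementation is Spliterator. Spliterator is short for "splittable iterator". It traverses the elements of a given data source (an array, a collection, an IO channel, and so on), and its design fully accounts for both sequential and parallel scenarios. The previous section mentioned that Collection has the default method spliterator(), which creates a Spliterator<E> instance; this means every Collection implementation is able to create a Spliterator. In design, the Stream implementation is very similar to Netty's ChannelHandlerContext: it is essentially a linked list, and the Spliterator sits at the head of that list as the data source of the head node (this is expanded on later when the concrete source code is analyzed).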
Spliterator interface methods

Next, let's look at the methods defined by the Spliterator interface:
```java
public interface Spliterator<T> {

    boolean tryAdvance(Consumer<? super T> action);

    default void forEachRemaining(Consumer<? super T> action) {
        do { } while (tryAdvance(action));
    }

    Spliterator<T> trySplit();

    long estimateSize();

    default long getExactSizeIfKnown() {
        return (characteristics() & SIZED) == 0 ? -1L : estimateSize();
    }

    int characteristics();

    default boolean hasCharacteristics(int characteristics) {
        return (characteristics() & characteristics) == characteristics;
    }

    default Comparator<? super T> getComparator() {
        throw new IllegalStateException();
    }

    // characteristic constants and primitive specializations omitted
}
```
tryAdvance
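Method signature: boolean tryAdvance(Consumer<? super T> action)
Function: if the Spliterator has a remaining element, performs the given action on it and returns true; otherwise returns false. If the Spliterator has the ORDERED characteristic, the next element is processed in encounter order (think of it as the index into ArrayList's backing array: elements added to an ArrayList are naturally ordered, with indices increasing from zero).
Example: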
```java
public static void main(String[] args) throws Exception {
    List<Integer> list = new ArrayList<>();
    list.add(2);
    list.add(1);
    list.add(3);
    Spliterator<Integer> spliterator = list.stream().spliterator();
    final AtomicInteger round = new AtomicInteger(1);
    final AtomicInteger loop = new AtomicInteger(1);
    while (spliterator.tryAdvance(num -> System.out.printf("Action callback round %d, value: %d\n", round.getAndIncrement(), num))) {
        System.out.printf("Loop iteration %d\n", loop.getAndIncrement());
    }
}
// Output:
// Action callback round 1, value: 2
// Loop iteration 1
// Action callback round 2, value: 1
// Loop iteration 2
// Action callback round 3, value: 3
// Loop iteration 3
```
forEachRemaining
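Method signature: default void forEachRemaining(Consumer<? super T> action)
Function: if the Spliterator has remaining elements, performs the given action on every remaining element, sequentially, in the current thread. If the Spliterator has the ORDERED characteristic, the remaining elements are processed in encounter order. This is a default method with a blunt body: a do-while loop that keeps calling tryAdvance() until it returns false.
Example: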
```java
public static void main(String[] args) {
    List<Integer> list = new ArrayList<>();
    list.add(2);
    list.add(1);
    list.add(3);
    Spliterator<Integer> spliterator = list.stream().spliterator();
    final AtomicInteger round = new AtomicInteger(1);
    spliterator.forEachRemaining(num -> System.out.printf("Action callback round %d, value: %d\n", round.getAndIncrement(), num));
}
// Output:
// Action callback round 1, value: 2
// Action callback round 2, value: 1
// Action callback round 3, value: 3
```
trySplit
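Method signature: Spliterator<T> trySplit()
Function: if the current Spliterator can be partitioned (split), this method returns a brand-new Spliterator covering elements that, upon return, will no longer be covered by the current Spliterator. (That is a literal translation of the API comment. What it means: if the current Spliterator X can be split, trySplit() splits X and produces a new Spliterator Y, and the range of elements covered by X shrinks accordingly. For an ORDERED Spliterator the returned Y must cover a strict prefix, e.g. X = [a,b,c,d] => Y = [a,b], X = [c,d]. If X cannot be split, the method returns null.) The actual splitting algorithm is up to the implementation.
Example: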
```java
public static void main(String[] args) throws Exception {
    List<Integer> list = new ArrayList<>();
    list.add(2);
    list.add(3);
    list.add(4);
    list.add(1);
    Spliterator<Integer> first = list.stream().spliterator();
    Spliterator<Integer> second = first.trySplit();
    first.forEachRemaining(num -> {
        System.out.printf("first spliterator item: %d\n", num);
    });
    second.forEachRemaining(num -> {
        System.out.printf("second spliterator item: %d\n", num);
    });
}
// Output:
// first spliterator item: 4
// first spliterator item: 1
// second spliterator item: 2
// second spliterator item: 3
```
estimateSize
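Method signature: long estimateSize()
Function: returns an estimate of the number of elements that forEachRemaining() would traverse; returns Long.MAX_VALUE if that number is infinite, unknown, or too expensive to compute.
Example: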
```java
public static void main(String[] args) throws Exception {
    List<Integer> list = new ArrayList<>();
    list.add(2);
    list.add(3);
    list.add(4);
    list.add(1);
    Spliterator<Integer> spliterator = list.stream().spliterator();
    System.out.println(spliterator.estimateSize());
}
// Output:
// 4
```
getExactSizeIfKnown
Method signature: default long getExactSizeIfKnown()
Function: if the current Spliterator has the SIZED characteristic (characteristics are analyzed below), returns estimateSize(); otherwise returns -1.
Example:
```java
public static void main(String[] args) throws Exception {
    List<Integer> list = new ArrayList<>();
    list.add(2);
    list.add(3);
    list.add(4);
    list.add(1);
    Spliterator<Integer> spliterator = list.stream().spliterator();
    System.out.println(spliterator.getExactSizeIfKnown());
}
// Output:
// 4
```
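For contrast, a Spliterator that does not report SIZED returns -1 here. A small sketch using only standard JDK APIs: `Spliterators.spliteratorUnknownSize()` (the same factory the default `Iterable.spliterator()` uses) wraps an iterator, so it cannot know the size up front, and its `estimateSize()` falls back to `Long.MAX_VALUE`. The class name `UnknownSizeDemo` is made up for the example.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Spliterator;
import java.util.Spliterators;

class UnknownSizeDemo {
    // Wraps an Iterator, so the resulting spliterator cannot report SIZED.
    static Spliterator<Integer> unsized(List<Integer> list) {
        return Spliterators.spliteratorUnknownSize(list.iterator(), Spliterator.ORDERED);
    }

    public static void main(String[] args) {
        List<Integer> list = new ArrayList<>(List.of(2, 3, 4, 1));

        Spliterator<Integer> sized = list.spliterator(); // ArrayList: SIZED
        Spliterator<Integer> unknown = unsized(list);    // no SIZED

        System.out.println(sized.getExactSizeIfKnown());                    // 4
        System.out.println(unknown.hasCharacteristics(Spliterator.SIZED)); // false
        System.out.println(unknown.estimateSize());                         // 9223372036854775807 (Long.MAX_VALUE)
        System.out.println(unknown.getExactSizeIfKnown());                  // -1
    }
}
```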
int characteristics()
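Method signature: int characteristics()
Function: returns the set of characteristics of the current Spliterator, encoded as bits of a 32-bit int (characteristics are analyzed below).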
hasCharacteristics
Method signature: default boolean hasCharacteristics(int characteristics)
Function: returns whether the current Spliterator has all of the given characteristics.
getComparator
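Method signature: default Comparator<? super T> getComparator()
Function: if the Spliterator's source is SORTED by a Comparator, returns that Comparator; if the source is SORTED in natural order (e.g. the elements implement Comparable), returns null; if the source is not SORTED at all, throws IllegalStateException.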
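The three cases can be checked with standard collections. This sketch (the class name `ComparatorDemo` is hypothetical) relies on `TreeSet`'s spliterator reporting SORTED and returning the set's own comparator, while `ArrayList`'s spliterator does not report SORTED and keeps the throwing default.

```java
import java.util.ArrayList;
import java.util.Comparator;
import java.util.List;
import java.util.Spliterator;
import java.util.TreeSet;

class ComparatorDemo {
    public static void main(String[] args) {
        // Natural ordering: SORTED is reported and getComparator() returns null.
        TreeSet<Integer> natural = new TreeSet<>(List.of(3, 1, 2));
        Spliterator<Integer> s1 = natural.spliterator();
        System.out.println(s1.hasCharacteristics(Spliterator.SORTED)); // true
        System.out.println(s1.getComparator());                         // null

        // Explicit Comparator: getComparator() returns that Comparator.
        Comparator<Integer> reverse = Comparator.reverseOrder();
        TreeSet<Integer> custom = new TreeSet<>(reverse);
        custom.addAll(List.of(3, 1, 2));
        System.out.println(custom.spliterator().getComparator() == reverse); // true

        // Not SORTED (ArrayList): the default getComparator() throws.
        try {
            new ArrayList<>(List.of(3, 1, 2)).spliterator().getComparator();
        } catch (IllegalStateException e) {
            System.out.println("IllegalStateException");
        }
    }
}
```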
Spliterator self-splitting

Spliterator#trySplit() can split an existing Spliterator into two Spliterators. I call this Spliterator self-splitting; the diagram is as follows:
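There are two ways the split could be implemented:
Physical split: for ArrayList, copy the underlying array and split the copy. With the example above this amounts to X = [2,3,4,1] => X = [4,1], Y = [2,3]. Together with the ArrayList's own backing array, this stores the data twice, which is clearly not very reasonable.
Logical split: for ArrayList, since the backing array is naturally ordered, the split can be done on array indices. With the example above this amounts to X = indices [0,1,2,3] => X = indices [2,3], Y = indices [0,1]. The backing array is shared and only the index range is split, which is simple to implement and far more reasonable.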
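To make the logical split concrete, here is a sketch of an index-range Spliterator over a shared array. `ArraySliceSpliterator` is a hypothetical class, simplified from the idea behind `ArrayListSpliterator` (no lazy fence, no modCount checks): only `index` and `fence` are split, and the array itself is never copied.

```java
import java.util.Spliterator;
import java.util.function.Consumer;

// Hypothetical spliterator over a shared Integer[]; only the index range is split.
class ArraySliceSpliterator implements Spliterator<Integer> {
    private final Integer[] array; // shared by all splits, never copied
    private int index;             // lower bound (inclusive)
    private final int fence;       // upper bound (exclusive)

    ArraySliceSpliterator(Integer[] array, int origin, int fence) {
        this.array = array;
        this.index = origin;
        this.fence = fence;
    }

    @Override
    public Spliterator<Integer> trySplit() {
        int lo = index, mid = (lo + fence) >>> 1;
        if (lo >= mid) {
            return null; // too small to split
        }
        index = mid;                                        // this instance keeps [mid, fence)
        return new ArraySliceSpliterator(array, lo, mid);  // the new one takes the prefix [lo, mid)
    }

    @Override
    public boolean tryAdvance(Consumer<? super Integer> action) {
        if (index < fence) {
            action.accept(array[index++]);
            return true;
        }
        return false;
    }

    @Override
    public long estimateSize() {
        return fence - index; // exact, because the range is known
    }

    @Override
    public int characteristics() {
        return ORDERED | SIZED | SUBSIZED;
    }

    public static void main(String[] args) {
        Integer[] data = {2, 3, 4, 1};
        ArraySliceSpliterator x = new ArraySliceSpliterator(data, 0, data.length);
        Spliterator<Integer> y = x.trySplit();
        y.forEachRemaining(n -> System.out.println("Y: " + n)); // Y: 2, then Y: 3
        x.forEachRemaining(n -> System.out.println("X: " + n)); // X: 4, then X: 1
    }
}
```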
Looking at the source of ArrayListSpliterator, we can analyze how its split algorithm is implemented:
```java
// ArrayList#spliterator()
public Spliterator<E> spliterator() {
    return new ArrayListSpliterator(0, -1, 0);
}

// Inner class of ArrayList: elementData, size and modCount belong to the enclosing ArrayList
final class ArrayListSpliterator implements Spliterator<E> {

    private int index;            // current index, modified on advance/split
    private int fence;            // -1 until used; then one past last index
    private int expectedModCount; // initialized when fence set

    ArrayListSpliterator(int origin, int fence, int expectedModCount) {
        this.index = origin;
        this.fence = fence;
        this.expectedModCount = expectedModCount;
    }

    // Lazily initializes fence to size on first use
    private int getFence() {
        int hi;
        if ((hi = fence) < 0) {
            expectedModCount = modCount;
            hi = fence = size;
        }
        return hi;
    }

    public ArrayListSpliterator trySplit() {
        int hi = getFence(), lo = index, mid = (lo + hi) >>> 1;
        // Divide the range in half unless too small.
        // Highlighted spot 1: "index = mid" also shrinks the current instance
        return (lo >= mid) ? null :
            new ArrayListSpliterator(lo, index = mid, expectedModCount);
    }

    public boolean tryAdvance(Consumer<? super E> action) {
        if (action == null)
            throw new NullPointerException();
        int hi = getFence(), i = index;
        if (i < hi) {
            index = i + 1;
            @SuppressWarnings("unchecked") E e = (E) elementData[i];
            action.accept(e);
            if (modCount != expectedModCount)
                throw new ConcurrentModificationException();
            return true;
        }
        return false;
    }

    public void forEachRemaining(Consumer<? super E> action) {
        int i, hi, mc; // hoist accesses and checks from loop
        Object[] a;
        if (action == null)
            throw new NullPointerException();
        if ((a = elementData) != null) {
            if ((hi = fence) < 0) {
                mc = modCount;
                hi = size;
            }
            else
                mc = expectedModCount;
            // Highlighted spot 2: "index = hi" moves the lower bound to the upper bound
            if ((i = index) >= 0 && (index = hi) <= a.length) {
                for (; i < hi; ++i) {
                    @SuppressWarnings("unchecked") E e = (E) a[i];
                    action.accept(e);
                }
                if (modCount == mc)
                    return;
            }
        }
        throw new ConcurrentModificationException();
    }

    public long estimateSize() {
        return getFence() - index;
    }

    public int characteristics() {
        return Spliterator.ORDERED | Spliterator.SIZED | Spliterator.SUBSIZED;
    }
}
```
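When reading this source, pay close attention: programmers of the older generation sometimes use rather inconspicuous assignments, which I think deserve a closer look: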
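At the first highlighted spot (the first red circle), while the new ArrayListSpliterator is being constructed, the index field of the current ArrayListSpliterator is also modified (index = mid inside the constructor's argument list). The process is shown in the figure below: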
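At the second highlighted spot, forEachRemaining() validates its parameters and, inside the if condition, assigns hi (the upper bound) to index (the lower bound). As a result, the traversal in forEachRemaining() happens at most once per ArrayListSpliterator instance: any later call finds index == hi and visits nothing. This can be verified as follows: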
```java
public static void main(String[] args) {
    List<Integer> list = new ArrayList<>();
    list.add(2);
    list.add(1);
    list.add(3);
    Spliterator<Integer> spliterator = list.stream().spliterator();
    final AtomicInteger round = new AtomicInteger(1);
    spliterator.forEachRemaining(num -> System.out.printf("[1st forEachRemaining] action callback round %d, value: %d\n", round.getAndIncrement(), num));
    round.set(1);
    spliterator.forEachRemaining(num -> System.out.printf("[2nd forEachRemaining] action callback round %d, value: %d\n", round.getAndIncrement(), num));
}
// Output (the second traversal prints nothing):
// [1st forEachRemaining] action callback round 1, value: 2
// [1st forEachRemaining] action callback round 2, value: 1
// [1st forEachRemaining] action callback round 3, value: 3
```
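From the ArrayListSpliterator implementation, the following points can be confirmed:
forEachRemaining() of a given ArrayListSpliterator instance traverses elements only on its first call
forEachRemaining() of an ArrayListSpliterator traverses the element range [index, fence)
When an ArrayListSpliterator splits itself, the new ArrayListSpliterator takes the segment with the lower indices (like the left branch of a fork), while the original keeps the segment with the higher indices (like the right branch)
estimateSize() of an ArrayListSpliterator returns the exact number of remaining elements in its segment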
If the example above keeps splitting, the process looks like this:
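Spliterator self-splitting is the foundation of parallel streams. A parallel stream computation is essentially a fork-join process: the trySplit() implementation determines how finely the work is forked; each forked task computes safely because its data is confined to its own thread (thread-stack confinement); and once all forked tasks finish, a single thread joins their results to obtain the correct final result. The following example sums the integers 1 to 100: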
```java
import java.util.ArrayList;
import java.util.List;
import java.util.Spliterator;
import java.util.concurrent.CountDownLatch;

public class ConcurrentSplitCalculateSum {

    private static class ForkTask extends Thread {

        private int result = 0;

        private final Spliterator<Integer> spliterator;
        private final CountDownLatch latch;

        public ForkTask(Spliterator<Integer> spliterator, CountDownLatch latch) {
            this.spliterator = spliterator;
            this.latch = latch;
        }

        @Override
        public void run() {
            long start = System.currentTimeMillis();
            // Each task touches only its own segment and its own result field (thread confinement)
            spliterator.forEachRemaining(num -> result = result + num);
            long end = System.currentTimeMillis();
            System.out.printf("Thread [%s] finished its task, segment result: %d, time: %d ms\n",
                    Thread.currentThread().getName(), result, end - start);
            latch.countDown();
        }

        public int result() {
            return result;
        }
    }

    // "join": a single thread adds up the partial results
    private static int join(List<ForkTask> tasks) {
        int result = 0;
        for (ForkTask task : tasks) {
            result = result + task.result();
        }
        return result;
    }

    private static final int THREAD_NUM = 4;

    public static void main(String[] args) throws Exception {
        List<Integer> source = new ArrayList<>();
        for (int i = 1; i < 101; i++) {
            source.add(i);
        }
        Spliterator<Integer> root = source.stream().spliterator();
        List<Spliterator<Integer>> spliteratorList = new ArrayList<>();
        Spliterator<Integer> x = root.trySplit(); // x: 1..50,  root: 51..100
        Spliterator<Integer> y = x.trySplit();    // y: 1..25,  x: 26..50
        Spliterator<Integer> z = root.trySplit(); // z: 51..75, root: 76..100
        spliteratorList.add(root);
        spliteratorList.add(x);
        spliteratorList.add(y);
        spliteratorList.add(z);
        List<ForkTask> tasks = new ArrayList<>();
        CountDownLatch latch = new CountDownLatch(THREAD_NUM);
        for (int i = 0; i < THREAD_NUM; i++) {
            ForkTask task = new ForkTask(spliteratorList.get(i), latch);
            task.setName("fork-task-" + (i + 1));
            tasks.add(task);
        }
        tasks.forEach(Thread::start);
        // await() also makes the workers' writes to result visible to this thread
        latch.await();
        int result = join(tasks);
        System.out.println("Final result: " + result);
    }
}
// Output (thread completion order may vary between runs):
// Thread [fork-task-4] finished its task, segment result: 1575, time: 0 ms
// Thread [fork-task-2] finished its task, segment result: 950, time: 1 ms
// Thread [fork-task-3] finished its task, segment result: 325, time: 1 ms
// Thread [fork-task-1] finished its task, segment result: 2200, time: 1 ms
// Final result: 5050
```
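Of course, the real parallel stream computation uses ForkJoinPool rather than starting threads as crudely as this example does. The implementation of parallel streams is analyzed in detail later.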
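As a rough sketch of the ForkJoinPool approach (not the JDK's own task code, which is considerably more elaborate), a `RecursiveTask` can keep calling `trySplit()` until a segment is small enough, fork the prefix, compute the suffix in the current thread and join the two partial sums. `SpliteratorSumTask` and the leaf threshold of 10 are assumptions chosen for the example.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Spliterator;
import java.util.concurrent.ForkJoinPool;
import java.util.concurrent.RecursiveTask;

// Hypothetical task: split until a segment is small enough, then sum it.
class SpliteratorSumTask extends RecursiveTask<Long> {
    private static final long THRESHOLD = 10; // assumed leaf size
    private final Spliterator<Integer> spliterator;

    SpliteratorSumTask(Spliterator<Integer> spliterator) {
        this.spliterator = spliterator;
    }

    @Override
    protected Long compute() {
        if (spliterator.estimateSize() > THRESHOLD) {
            Spliterator<Integer> prefix = spliterator.trySplit();
            if (prefix != null) {
                SpliteratorSumTask left = new SpliteratorSumTask(prefix);
                left.fork();                                                 // fork: prefix runs asynchronously
                long right = new SpliteratorSumTask(spliterator).compute(); // current thread keeps the suffix
                return left.join() + right;                                  // join the partial results
            }
        }
        // Leaf: the task owns its segment, so the local accumulator is thread-confined.
        long[] sum = {0};
        spliterator.forEachRemaining(n -> sum[0] += n);
        return sum[0];
    }

    static long sum(List<Integer> source) {
        return ForkJoinPool.commonPool().invoke(new SpliteratorSumTask(source.spliterator()));
    }

    public static void main(String[] args) {
        List<Integer> source = new ArrayList<>();
        for (int i = 1; i <= 100; i++) {
            source.add(i);
        }
        System.out.println(sum(source)); // 5050
    }
}
```

Compared with the thread-per-segment example, the pool reuses worker threads and the split depth adapts to the data size instead of being hard-coded.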
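Characteristics supported by Spliterator

The characteristics a Spliterator instance supports are determined by its characteristics() method. It returns a 32-bit value that is effectively used as a bit array, with each characteristic assigned to a different bit; hasCharacteristics(int characteristics) uses bitwise operations on the given value to check whether those characteristics are present in characteristics(). Below, characteristics is simplified to a byte to illustrate the trick: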
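```text
Assume: byte characteristics() => at most 8 bits describe the set of characteristics;
        if each bit stands for one characteristic, 8 characteristics can be represented
Characteristic X: 0000 0001
Characteristic Y: 0000 0010
... and so on
Assume: characteristics = X | Y = 0000 0001 | 0000 0010 = 0000 0011
Then:   characteristics & X = 0000 0011 & 0000 0001 = 0000 0001
Check whether characteristics contains X: (characteristics & X) == X
```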
The reasoning above is exactly the logic of Spliterator's characteristic check:
```java
int characteristics();

default boolean hasCharacteristics(int characteristics) {
    return (characteristics() & characteristics) == characteristics;
}
```
Let's verify it:
```java
import java.util.Spliterator;

public class CharacteristicsCheck {

    public static void main(String[] args) {
        System.out.printf("Has ORDERED: %s\n", hasCharacteristics(Spliterator.ORDERED));
        System.out.printf("Has SIZED: %s\n", hasCharacteristics(Spliterator.SIZED));
        System.out.printf("Has DISTINCT: %s\n", hasCharacteristics(Spliterator.DISTINCT));
    }

    private static int characteristics() {
        return Spliterator.ORDERED | Spliterator.SIZED | Spliterator.SORTED;
    }

    private static boolean hasCharacteristics(int characteristics) {
        return (characteristics() & characteristics) == characteristics;
    }
}
// Output:
// Has ORDERED: true
// Has SIZED: true
// Has DISTINCT: false
```
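Spliterator currently supports 8 characteristics:

| Characteristic | Hex value | Binary value (low 16 bits) | Meaning |
|---|---|---|---|
| DISTINCT | 0x00000001 | 0000 0000 0000 0001 | Distinct elements: for each pair of encountered elements (x, y), !x.equals(y) holds; for example, spliterators over a Set report it |
| ORDERED | 0x00000010 | 0000 0000 0001 0000 | Ordered: an encounter order is defined; trySplit() splits off a strict prefix, tryAdvance() steps one element in prefix order, and forEachRemaining() processes elements in encounter order |
| SORTED | 0x00000004 | 0000 0000 0000 0100 | Sorted: encounter order follows the Comparator returned by getComparator() (natural order if it returns null); a Spliterator that reports SORTED must also report ORDERED |
| SIZED | 0x00000040 | 0000 0000 0100 0000 | Sized: before traversal or splitting, estimateSize() returns the exact number of elements (absent structural modification of the source) |
| NONNULL | 0x00000100 | 0000 0001 0000 0000 | Non-null: the source guarantees that encountered elements are never null; most common in concurrent collections, queues and maps |
| IMMUTABLE | 0x00000400 | 0000 0100 0000 0000 | Immutable: the source cannot be structurally modified, i.e. elements cannot be added, replaced or removed during processing (updating an element's own fields is not prohibited) |
| CONCURRENT | 0x00001000 | 0001 0000 0000 0000 | Concurrent: the source may be safely modified (elements added, replaced or removed) by multiple threads without additional synchronization |
| SUBSIZED | 0x00004000 | 0100 0000 0000 0000 | Sub-sized: every Spliterator produced by trySplit() (including the current one after splitting) is also SIZED |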
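Looking closely, all characteristics are stored in a 32-bit int using a skip-one-bit strategy: every characteristic occupies an even bit index. The mapping from bit index to characteristic is: (0 => DISTINCT), (2 => SORTED), (4 => ORDERED), (6 => SIZED), (8 => NONNULL), (10 => IMMUTABLE), (12 => CONCURRENT), (14 => SUBSIZED).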
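The even bit indices can be checked directly against the constants in `java.util.Spliterator` with `Integer.numberOfTrailingZeros`; `CharacteristicBits` is just a hypothetical holder class for the check.

```java
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.Spliterator;

class CharacteristicBits {
    // Maps each Spliterator characteristic to the index of its single set bit.
    static Map<String, Integer> bitIndices() {
        Map<String, Integer> m = new LinkedHashMap<>();
        m.put("DISTINCT", Integer.numberOfTrailingZeros(Spliterator.DISTINCT));
        m.put("SORTED", Integer.numberOfTrailingZeros(Spliterator.SORTED));
        m.put("ORDERED", Integer.numberOfTrailingZeros(Spliterator.ORDERED));
        m.put("SIZED", Integer.numberOfTrailingZeros(Spliterator.SIZED));
        m.put("NONNULL", Integer.numberOfTrailingZeros(Spliterator.NONNULL));
        m.put("IMMUTABLE", Integer.numberOfTrailingZeros(Spliterator.IMMUTABLE));
        m.put("CONCURRENT", Integer.numberOfTrailingZeros(Spliterator.CONCURRENT));
        m.put("SUBSIZED", Integer.numberOfTrailingZeros(Spliterator.SUBSIZED));
        return m;
    }

    public static void main(String[] args) {
        // {DISTINCT=0, SORTED=2, ORDERED=4, SIZED=6, NONNULL=8, IMMUTABLE=10, CONCURRENT=12, SUBSIZED=14}
        System.out.println(bitIndices());
    }
}
```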
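Only the core definition of each characteristic is summarized here; some fine print and special cases are omitted for space, so refer to the API comments in the source for details. These characteristics are eventually converted into StreamOpFlag values for Stream operations to use. Since StreamOpFlag is more complicated, it is analyzed in detail below.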
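How streams are implemented: source code analysis

Because the stream implementation is highly abstracted engineering code, reading the source is somewhat difficult. The whole system involves a large number of interfaces, classes and enums, as shown in the figure below: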
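The top-level class structure in the figure describes the inheritance hierarchy of the stream pipeline classes. IntStream, LongStream and DoubleStream are specializations for the primitive types int, long and double, while a pipeline built for any reference type is a ReferencePipeline instance. So, in my view, ReferencePipeline (the reference-type pipeline) is the core data structure of streams, and the in-depth analysis below is based on its implementation.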
StreamOpFlag source code analysis
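Note: this subsection is quite brain-burning (possibly also because I am not very fluent with bit operations); most of the time spent on this article went into it.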
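StreamOpFlag is an enum that stores the flags of streams and operations ("Flags corresponding to characteristics of streams and operations", hereafter Stream flags). The Stream framework uses these flags to control, customize and optimize computation. Stream flags can describe characteristics of several different entities associated with a stream: the stream source, the stream's intermediate operations (Op) and its terminal operation (Terminal Op). Not every Stream flag is meaningful for every kind of entity; the current mapping between entities and flags is: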
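| Type (Stream Entity Type) | DISTINCT | SORTED | ORDERED | SIZED | SHORT_CIRCUIT |
|---|---|---|---|---|---|
| SPLITERATOR | 01 | 01 | 01 | 01 | 00 |
| STREAM | 01 | 01 | 01 | 01 | 00 |
| OP | 11 | 11 | 11 | 10 | 01 |
| TERMINAL_OP | 00 | 00 | 10 | 00 | 01 |
| UPSTREAM_TERMINAL_OP | 00 | 00 | 10 | 00 | 00 |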
Where:
01: set/inject
10: clear
11: preserve
00: initial (default fill) value. This is a key point: 0 means the flag can never apply to that type
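The header comment of StreamOpFlag also contains the following table:

| - | DISTINCT | SORTED | ORDERED | SIZED | SHORT_CIRCUIT |
|---|---|---|---|---|---|
| Stream source | Y | Y | Y | Y | N |
| Intermediate operation | PCI | PCI | PCI | PC | PI |
| Terminal operation | N | N | PC | N | PI |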
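Marker -> meaning:
Y: allowed
N: invalid
P: preserve
C: clear
I: inject
PCI combined: may preserve, clear or inject
PC combined: may preserve or clear
PI combined: may preserve or inject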
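The two tables describe the same conclusion and can be read side by side, but the implementation follows the definitions in the first table. Note what 11 means: it is PRESERVE_BITS, which the source assigns through MaskBuilder.setAndClear(), and it corresponds to PCI in the second table, i.e. an entity of that type may preserve, set (inject) or clear the flag. For example, in the first table OP has 11 for DISTINCT, SORTED and ORDERED, meaning an intermediate operation may preserve, inject or clear the distinct, sorted and ordered characteristics. Back to the source: first the core fields and constructor of StreamOpFlag: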
```java
enum StreamOpFlag {

    // The following flags correspond to characteristics on Spliterator
    DISTINCT(0,
             set(Type.SPLITERATOR).set(Type.STREAM).setAndClear(Type.OP)),

    SORTED(1,
           set(Type.SPLITERATOR).set(Type.STREAM).setAndClear(Type.OP)),

    ORDERED(2,
            set(Type.SPLITERATOR).set(Type.STREAM).setAndClear(Type.OP).clear(Type.TERMINAL_OP)
                    .clear(Type.UPSTREAM_TERMINAL_OP)),

    SIZED(3,
          set(Type.SPLITERATOR).set(Type.STREAM).clear(Type.OP)),

    // The following flags are specific to streams and operations
    SHORT_CIRCUIT(12,
                  set(Type.OP).set(Type.TERMINAL_OP));

    // Type of Stream entity
    enum Type {
        SPLITERATOR,
        STREAM,
        OP,
        TERMINAL_OP,
        UPSTREAM_TERMINAL_OP
    }

    private static final int SET_BITS = 0b01;      // set / inject
    private static final int CLEAR_BITS = 0b10;    // clear
    private static final int PRESERVE_BITS = 0b11; // preserve

    private static MaskBuilder set(Type t) {
        return new MaskBuilder(new EnumMap<>(Type.class)).set(t);
    }

    private static class MaskBuilder {
        final Map<Type, Integer> map;

        MaskBuilder(Map<Type, Integer> map) {
            this.map = map;
        }

        MaskBuilder mask(Type t, Integer i) {
            map.put(t, i);
            return this;
        }

        MaskBuilder set(Type t) {
            return mask(t, SET_BITS);
        }

        MaskBuilder clear(Type t) {
            return mask(t, CLEAR_BITS);
        }

        MaskBuilder setAndClear(Type t) {
            return mask(t, PRESERVE_BITS);
        }

        // Types that were not configured default to 0b00
        Map<Type, Integer> build() {
            for (Type t : Type.values()) {
                map.putIfAbsent(t, 0b00);
            }
            return map;
        }
    }

    // 2-bit value of this flag for each Type (one column of the first table)
    private final Map<Type, Integer> maskTable;
    // Bit offset of this flag: position * 2
    private final int bitPosition;
    private final int set;
    private final int clear;
    private final int preserve;

    private StreamOpFlag(int position, MaskBuilder maskBuilder) {
        this.maskTable = maskBuilder.build();
        // Two bits per flag
        position *= 2;
        this.bitPosition = position;
        this.set = SET_BITS << position;
        this.clear = CLEAR_BITS << position;
        this.preserve = PRESERVE_BITS << position;
    }

    static final int IS_DISTINCT = DISTINCT.set;
    static final int NOT_DISTINCT = DISTINCT.clear;
    static final int IS_SORTED = SORTED.set;
    static final int NOT_SORTED = SORTED.clear;
    static final int IS_ORDERED = ORDERED.set;
    static final int NOT_ORDERED = ORDERED.clear;
    static final int IS_SIZED = SIZED.set;
    static final int NOT_SIZED = SIZED.clear;
    static final int IS_SHORT_CIRCUIT = SHORT_CIRCUIT.set;
}
```
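Since StreamOpFlag is an enum, each enum constant is a separate flag, and one flag applies to several Stream entity types; so each constant describes one column of the entity/flag mapping table above (read it vertically):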
```text
// Read vertically
DISTINCT Flag:
maskTable: { SPLITERATOR: 0000 0001, STREAM: 0000 0001, OP: 0000 0011, TERMINAL_OP: 0000 0000, UPSTREAM_TERMINAL_OP: 0000 0000 }
position(input): 0
bitPosition: 0
set:      1 => 0000 0000 0000 0000 0000 0000 0000 0001
clear:    2 => 0000 0000 0000 0000 0000 0000 0000 0010
preserve: 3 => 0000 0000 0000 0000 0000 0000 0000 0011

SORTED Flag:
maskTable: { SPLITERATOR: 0000 0001, STREAM: 0000 0001, OP: 0000 0011, TERMINAL_OP: 0000 0000, UPSTREAM_TERMINAL_OP: 0000 0000 }
position(input): 1
bitPosition: 2
set:       4 => 0000 0000 0000 0000 0000 0000 0000 0100
clear:     8 => 0000 0000 0000 0000 0000 0000 0000 1000
preserve: 12 => 0000 0000 0000 0000 0000 0000 0000 1100

ORDERED Flag:
maskTable: { SPLITERATOR: 0000 0001, STREAM: 0000 0001, OP: 0000 0011, TERMINAL_OP: 0000 0010, UPSTREAM_TERMINAL_OP: 0000 0010 }
position(input): 2
bitPosition: 4
set:      16 => 0000 0000 0000 0000 0000 0000 0001 0000
clear:    32 => 0000 0000 0000 0000 0000 0000 0010 0000
preserve: 48 => 0000 0000 0000 0000 0000 0000 0011 0000

SIZED Flag:
maskTable: { SPLITERATOR: 0000 0001, STREAM: 0000 0001, OP: 0000 0010, TERMINAL_OP: 0000 0000, UPSTREAM_TERMINAL_OP: 0000 0000 }
position(input): 3
bitPosition: 6
set:       64 => 0000 0000 0000 0000 0000 0000 0100 0000
clear:    128 => 0000 0000 0000 0000 0000 0000 1000 0000
preserve: 192 => 0000 0000 0000 0000 0000 0000 1100 0000

SHORT_CIRCUIT Flag:
maskTable: { SPLITERATOR: 0000 0000, STREAM: 0000 0000, OP: 0000 0001, TERMINAL_OP: 0000 0001, UPSTREAM_TERMINAL_OP: 0000 0000 }
position(input): 12
bitPosition: 24
set:      16777216 => 0000 0001 0000 0000 0000 0000 0000 0000
clear:    33554432 => 0000 0010 0000 0000 0000 0000 0000 0000
preserve: 50331648 => 0000 0011 0000 0000 0000 0000 0000 0000
```
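Next come bitwise AND (&) and bitwise OR (|). Suppose A = 0001, B = 0010, C = 1000; then:
A | B = 0001 | 0010 = 0011 (bitwise OR: 1|0=1, 0|1=1, 0|0=0, 1|1=1)
A & B = 0001 & 0010 = 0000 (bitwise AND: 1&0=0, 0&1=0, 0&0=0, 1&1=1)
MASK = A | B | C = 0001 | 0010 | 1000 = 1011
The condition for A|B to contain A is: A == ((A | B) & A)
The condition for MASK to contain A is: A == (MASK & A) (the parentheses matter in Java, where & binds tighter than | and == binds tighter than &)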
Applying this to the StreamOpFlag values:
```java
static int DISTINCT_SET = 0b0001;
static int SORTED_CLEAR = 0b1000;

public static void main(String[] args) throws Exception {
    // Combine the flags
    int flags = DISTINCT_SET | SORTED_CLEAR;
    System.out.println(Integer.toBinaryString(flags));
    System.out.printf("DISTINCT flag set: %s\n", DISTINCT_SET == (DISTINCT_SET & flags));
    System.out.printf("SORTED flag cleared: %s\n", SORTED_CLEAR == (SORTED_CLEAR & flags));
}
// Output:
// 1001
// DISTINCT flag set: true
// SORTED flag cleared: true
```
StreamOpFlag has default (package-private) access and cannot be used directly, so copy its source into your own package to verify its behaviour:
```java
public static void main(String[] args) {
    int flags = StreamOpFlag.DISTINCT.set | StreamOpFlag.SORTED.clear;
    System.out.println(StreamOpFlag.DISTINCT.set == (StreamOpFlag.DISTINCT.set & flags));
    System.out.println(StreamOpFlag.SORTED.clear == (StreamOpFlag.SORTED.clear & flags));
}
// Output:
// true
// true
```
The following methods are defined on top of these bitwise properties:
```java
enum StreamOpFlag {

    // other members omitted

    int set() {
        return set;
    }

    int clear() {
        return clear;
    }

    // Whether this flag applies to the STREAM type
    boolean isStreamFlag() {
        return maskTable.get(Type.STREAM) > 0;
    }

    // Whether this flag is set (01) in flags
    boolean isKnown(int flags) {
        return (flags & preserve) == set;
    }

    // Whether this flag is cleared (10) in flags
    boolean isCleared(int flags) {
        return (flags & preserve) == clear;
    }

    // Whether this flag is preserved (11) in flags
    boolean isPreserved(int flags) {
        return (flags & preserve) == preserve;
    }

    // Whether an entity of type t may set this flag
    boolean canSet(Type t) {
        return (maskTable.get(t) & SET_BITS) > 0;
    }
}
```
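There is a special trick here: the checks use (flags & preserve). The reason is that, for a given flag, its 2-bit field can hold only one of set/inject, clear or preserve at a time (a single flags value cannot contain both StreamOpFlag.SORTED.set and StreamOpFlag.SORTED.clear, which would be a semantic contradiction). The size (2 bits) and position of set/inject, clear and preserve in the bit map are fixed, and preserve was designed as 0b11, covering both bits of the field. Masking with preserve therefore extracts exactly that flag's 2-bit field, so the checks can be specialized as follows (this also makes them stricter: the naive (flags & set) == set would also be true when the field is 11):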
```text
(flags & set) == set            =>  (flags & preserve) == set
(flags & clear) == clear        =>  (flags & preserve) == clear
(flags & preserve) == preserve  =>  (flags & preserve) == preserve
```
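The difference between the naive test and the `preserve`-masked test shows up when a field holds 11. The sketch below re-creates the 2-bit encoding for a single flag; `TwoBitFlag` is a hypothetical stand-in for StreamOpFlag that keeps only the `set`/`clear`/`preserve` values and the three checks.

```java
// Hypothetical re-creation of StreamOpFlag's 2-bit encoding for one flag.
class TwoBitFlag {
    static final int SET_BITS = 0b01;
    static final int CLEAR_BITS = 0b10;
    static final int PRESERVE_BITS = 0b11;

    final int set, clear, preserve;

    TwoBitFlag(int position) {
        int bitPosition = position * 2; // each flag occupies two adjacent bits
        this.set = SET_BITS << bitPosition;
        this.clear = CLEAR_BITS << bitPosition;
        this.preserve = PRESERVE_BITS << bitPosition;
    }

    // "flags & preserve" isolates exactly this flag's 2-bit field.
    boolean isKnown(int flags)     { return (flags & preserve) == set; }
    boolean isCleared(int flags)   { return (flags & preserve) == clear; }
    boolean isPreserved(int flags) { return (flags & preserve) == preserve; }

    public static void main(String[] args) {
        TwoBitFlag sorted = new TwoBitFlag(1); // position 1 -> bits 2..3, like SORTED
        int flags = sorted.preserve;           // field holds 11
        // The naive test is also true for 11, so it cannot tell "set" from "preserve".
        System.out.println((flags & sorted.set) == sorted.set); // true
        System.out.println(sorted.isKnown(flags));              // false
        System.out.println(sorted.isPreserved(flags));          // true
    }
}
```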
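All in all, the idea is to use a single 32-bit int in which every 2 bits form a field: 01 (set), 10 (clear) and 11 (preserve) are the three meaningful states, and 00 means the flag does not apply. A complete Flags value (set of flags) can therefore hold 16 flags (position = [0,15]; according to the API comments, positions [4,11] and [13,15] are unused or reserved, i.e. gaps). Next, an example of how the masks are computed: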
```text
// Read horizontally (shift operators bind tighter than & and |, e.g. << has higher precedence than |)
SPLITERATOR_CHARACTERISTICS_MASK:
mask(init) = 0
mask(DISTINCT, SPLITERATOR[DISTINCT]=01, bitPosition=0) = 0000 0000 | 0000 0001 << 0 = 0000 0000 | 0000 0001 = 0000 0001
mask(SORTED, SPLITERATOR[SORTED]=01, bitPosition=2) = 0000 0001 | 0000 0001 << 2 = 0000 0001 | 0000 0100 = 0000 0101
mask(ORDERED, SPLITERATOR[ORDERED]=01, bitPosition=4) = 0000 0101 | 0000 0001 << 4 = 0000 0101 | 0001 0000 = 0001 0101
mask(SIZED, SPLITERATOR[SIZED]=01, bitPosition=6) = 0001 0101 | 0000 0001 << 6 = 0001 0101 | 0100 0000 = 0101 0101
mask(SHORT_CIRCUIT, SPLITERATOR[SHORT_CIRCUIT]=00, bitPosition=24) = 0101 0101 | 0000 0000 << 24 = 0101 0101 | 0000 0000 = 0101 0101
mask(final) = 0000 0000 0000 0000 0000 0000 0101 0101 (binary), 85 (decimal)

STREAM_MASK:
mask(init) = 0
mask(DISTINCT, STREAM[DISTINCT]=01, bitPosition=0) = 0000 0000 | 0000 0001 << 0 = 0000 0000 | 0000 0001 = 0000 0001
mask(SORTED, STREAM[SORTED]=01, bitPosition=2) = 0000 0001 | 0000 0001 << 2 = 0000 0001 | 0000 0100 = 0000 0101
mask(ORDERED, STREAM[ORDERED]=01, bitPosition=4) = 0000 0101 | 0000 0001 << 4 = 0000 0101 | 0001 0000 = 0001 0101
mask(SIZED, STREAM[SIZED]=01, bitPosition=6) = 0001 0101 | 0000 0001 << 6 = 0001 0101 | 0100 0000 = 0101 0101
mask(SHORT_CIRCUIT, STREAM[SHORT_CIRCUIT]=00, bitPosition=24) = 0101 0101 | 0000 0000 << 24 = 0101 0101 | 0000 0000 = 0101 0101
mask(final) = 0000 0000 0000 0000 0000 0000 0101 0101 (binary), 85 (decimal)

OP_MASK:
mask(init) = 0
mask(DISTINCT, OP[DISTINCT]=11, bitPosition=0) = 0000 0000 | 0000 0011 << 0 = 0000 0000 | 0000 0011 = 0000 0011
mask(SORTED, OP[SORTED]=11, bitPosition=2) = 0000 0011 | 0000 0011 << 2 = 0000 0011 | 0000 1100 = 0000 1111
mask(ORDERED, OP[ORDERED]=11, bitPosition=4) = 0000 1111 | 0000 0011 << 4 = 0000 1111 | 0011 0000 = 0011 1111
mask(SIZED, OP[SIZED]=10, bitPosition=6) = 0011 1111 | 0000 0010 << 6 = 0011 1111 | 1000 0000 = 1011 1111
mask(SHORT_CIRCUIT, OP[SHORT_CIRCUIT]=01, bitPosition=24) = 1011 1111 | 0000 0001 << 24
    = 1011 1111 | 0000 0001 0000 0000 0000 0000 0000 0000
    = 0000 0001 0000 0000 0000 0000 1011 1111
mask(final) = 0000 0001 0000 0000 0000 0000 1011 1111 (binary), 16777407 (decimal)

TERMINAL_OP_MASK:
mask(init) = 0
mask(DISTINCT, TERMINAL_OP[DISTINCT]=00, bitPosition=0) = 0000 0000 | 0000 0000 << 0 = 0000 0000 | 0000 0000 = 0000 0000
mask(SORTED, TERMINAL_OP[SORTED]=00, bitPosition=2) = 0000 0000 | 0000 0000 << 2 = 0000 0000 | 0000 0000 = 0000 0000
mask(ORDERED, TERMINAL_OP[ORDERED]=10, bitPosition=4) = 0000 0000 | 0000 0010 << 4 = 0000 0000 | 0010 0000 = 0010 0000
mask(SIZED, TERMINAL_OP[SIZED]=00, bitPosition=6) = 0010 0000 | 0000 0000 << 6 = 0010 0000 | 0000 0000 = 0010 0000
mask(SHORT_CIRCUIT, TERMINAL_OP[SHORT_CIRCUIT]=01, bitPosition=24) = 0010 0000 | 0000 0001 << 24
    = 0010 0000 | 0000 0001 0000 0000 0000 0000 0000 0000
    = 0000 0001 0000 0000 0000 0000 0010 0000
mask(final) = 0000 0001 0000 0000 0000 0000 0010 0000 (binary), 16777248 (decimal)

UPSTREAM_TERMINAL_OP_MASK:
mask(init) = 0
mask(DISTINCT, UPSTREAM_TERMINAL_OP[DISTINCT]=00, bitPosition=0) = 0000 0000 | 0000 0000 << 0 = 0000 0000 | 0000 0000 = 0000 0000
mask(SORTED, UPSTREAM_TERMINAL_OP[SORTED]=00, bitPosition=2) = 0000 0000 | 0000 0000 << 2 = 0000 0000 | 0000 0000 = 0000 0000
mask(ORDERED, UPSTREAM_TERMINAL_OP[ORDERED]=10, bitPosition=4) = 0000 0000 | 0000 0010 << 4 = 0000 0000 | 0010 0000 = 0010 0000
mask(SIZED, UPSTREAM_TERMINAL_OP[SIZED]=00, bitPosition=6) = 0010 0000 | 0000 0000 << 6 = 0010 0000 | 0000 0000 = 0010 0000
mask(SHORT_CIRCUIT, UPSTREAM_TERMINAL_OP[SHORT_CIRCUIT]=00, bitPosition=24) = 0010 0000 | 0000 0000 << 24 = 0010 0000 | 0000 0000 = 0010 0000
mask(final) = 0000 0000 0000 0000 0000 0000 0010 0000 (binary), 32 (decimal)
```
The related methods and fields are as follows:
```java
enum StreamOpFlag {

    // other members omitted

    // Precomputed bit map of all flags for each Stream entity type
    static final int SPLITERATOR_CHARACTERISTICS_MASK = createMask(Type.SPLITERATOR);
    static final int STREAM_MASK = createMask(Type.STREAM);
    static final int OP_MASK = createMask(Type.OP);
    static final int TERMINAL_OP_MASK = createMask(Type.TERMINAL_OP);
    static final int UPSTREAM_TERMINAL_OP_MASK = createMask(Type.UPSTREAM_TERMINAL_OP);

    private static int createMask(Type t) {
        int mask = 0;
        for (StreamOpFlag flag : StreamOpFlag.values()) {
            mask |= flag.maskTable.get(t) << flag.bitPosition;
        }
        return mask;
    }

    // OR of the preserve bits of all flags
    private static final int FLAG_MASK = createFlagMask();

    private static int createFlagMask() {
        int mask = 0;
        for (StreamOpFlag flag : StreamOpFlag.values()) {
            mask |= flag.preserve;
        }
        return mask;
    }

    // Flag mask for stream flags that are set (01 in every stream flag field)
    private static final int FLAG_MASK_IS = STREAM_MASK;
    // Flag mask for stream flags that are cleared (10 in every stream flag field)
    private static final int FLAG_MASK_NOT = STREAM_MASK << 1;
    // Initial value combined with the flags of the first stage (11 in every stream flag field)
    static final int INITIAL_OPS_VALUE = FLAG_MASK_IS | FLAG_MASK_NOT;
}
```
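The hand calculation of the masks can be reproduced by running the same `createMask()` loop outside the JDK. This is a hypothetical standalone copy: the nested `Flag` enum hard-codes the five columns of the first table (positions 0, 1, 2, 3 and 12) instead of using `MaskBuilder`.

```java
import java.util.EnumMap;
import java.util.Map;

class MaskCalc {
    enum Type { SPLITERATOR, STREAM, OP, TERMINAL_OP, UPSTREAM_TERMINAL_OP }

    // Hypothetical copy of the five flags: position, then one 2-bit value per Type (in Type order).
    enum Flag {
        DISTINCT(0, 0b01, 0b01, 0b11, 0b00, 0b00),
        SORTED(1, 0b01, 0b01, 0b11, 0b00, 0b00),
        ORDERED(2, 0b01, 0b01, 0b11, 0b10, 0b10),
        SIZED(3, 0b01, 0b01, 0b10, 0b00, 0b00),
        SHORT_CIRCUIT(12, 0b00, 0b00, 0b01, 0b01, 0b00);

        final int bitPosition;
        final Map<Type, Integer> maskTable = new EnumMap<>(Type.class);

        Flag(int position, int... bitsPerType) {
            this.bitPosition = position * 2;
            for (Type t : Type.values()) {
                maskTable.put(t, bitsPerType[t.ordinal()]);
            }
        }
    }

    // Same loop as StreamOpFlag.createMask: shift each 2-bit value into place and OR it in.
    static int createMask(Type t) {
        int mask = 0;
        for (Flag flag : Flag.values()) {
            mask |= flag.maskTable.get(t) << flag.bitPosition;
        }
        return mask;
    }

    public static void main(String[] args) {
        for (Type t : Type.values()) {
            System.out.println(t + " = " + createMask(t));
        }
        // SPLITERATOR = 85, STREAM = 85, OP = 16777407, TERMINAL_OP = 16777248, UPSTREAM_TERMINAL_OP = 32
    }
}
```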
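The five fields SPLITERATOR_CHARACTERISTICS_MASK and so on (see the mask calculation example above) are simply the precomputed bit maps of all StreamOpFlag flags for each Stream entity type, i.e. the "horizontal" view of the earlier table mapping Stream entity types to flags: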
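The analysis so far is fairly detailed and the process is quite involved, but the more complex uses of the masks come in the methods that follow. The masks are initialized to support combining flags and converting them (from a Spliterator's characteristics into flags), as the following methods show: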
```java
enum StreamOpFlag {

    // other members omitted

    private static int getMask(int flags) {
        return (flags == 0)
               ? FLAG_MASK
               : ~(flags | ((FLAG_MASK_IS & flags) << 1) | ((FLAG_MASK_NOT & flags) >> 1));
    }

    static int combineOpFlags(int newStreamOrOpFlags, int prevCombOpFlags) {
        // 01 or 10 fields of the new flags become 11 in the mask, 00 fields stay 00,
        // then all bits are flipped, so those fields are cleared in the previous flags
        // before the new flags are OR-ed in
        return (prevCombOpFlags & StreamOpFlag.getMask(newStreamOrOpFlags)) | newStreamOrOpFlags;
    }

    static int toStreamFlags(int combOpFlags) {
        // Keeps only the fields whose value is 01 (set)
        return ((~combOpFlags) >> 1) & FLAG_MASK_IS & combOpFlags;
    }

    static int toCharacteristics(int streamFlags) {
        return streamFlags & SPLITERATOR_CHARACTERISTICS_MASK;
    }

    static int fromCharacteristics(Spliterator<?> spliterator) {
        int characteristics = spliterator.characteristics();
        if ((characteristics & Spliterator.SORTED) != 0 && spliterator.getComparator() != null) {
            // Do not propagate SORTED if it does not correspond to a natural sort order
            return characteristics & SPLITERATOR_CHARACTERISTICS_MASK & ~Spliterator.SORTED;
        }
        else {
            return characteristics & SPLITERATOR_CHARACTERISTICS_MASK;
        }
    }

    static int fromCharacteristics(int characteristics) {
        return characteristics & SPLITERATOR_CHARACTERISTICS_MASK;
    }
}
```
The bit operations here are complicated, so only simple results and what each method does are shown:
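combineOpFlags(): merges the new flags with the previous combined flags. Because a Stream is a pipeline, a downstream node has to merge the flags of its upstream node. For example, if the upstream flags are ORDERED.set and the node newly added to the pipeline (the downstream node) brings SIZED.set, the downstream node merges the upstream flags, which you can roughly picture as SIZED.set | ORDERED.set. When the new flags mention a characteristic that the previous flags also contain, the new value wins, because getMask() first wipes that characteristic's 2-bit field out of the previous flags. For the head node, the flags are merged with INITIAL_OPS_VALUE at initialization. Here is an example (getMask() and combineOpFlags() are used from a copied StreamOpFlag, since the originals are not accessible):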
```java
int left = ORDERED.set | DISTINCT.set;
int right = SIZED.clear | SORTED.clear;
System.out.println("left:" + Integer.toBinaryString(left));
System.out.println("right:" + Integer.toBinaryString(right));
System.out.println("right mask:" + Integer.toBinaryString(getMask(right)));
System.out.println("combine:" + Integer.toBinaryString(combineOpFlags(right, left)));
// Output:
// left:10001
// right:10001000
// right mask:11111111111111111111111100110011
// combine:10011001
```
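A slightly longer run shows the override behaviour and what `toStreamFlags()` keeps. `CombineDemo` is a hypothetical standalone copy of `getMask()`, `combineOpFlags()` and `toStreamFlags()`, with the constants restricted to the five flags. The first stage merges assumed source flags ORDERED and SIZED with `INITIAL_OPS_VALUE`; the second is an assumed filter-like stage that clears SIZED.

```java
// Hypothetical standalone copy of the StreamOpFlag formulas, limited to five flags.
class CombineDemo {
    // preserve bits of DISTINCT, SORTED, ORDERED, SIZED and SHORT_CIRCUIT (bits 24..25)
    static final int FLAG_MASK = 0b11 | 0b11 << 2 | 0b11 << 4 | 0b11 << 6 | 0b11 << 24;
    static final int FLAG_MASK_IS = 0b0101_0101;                       // STREAM_MASK = 85
    static final int FLAG_MASK_NOT = FLAG_MASK_IS << 1;                 // 170
    static final int INITIAL_OPS_VALUE = FLAG_MASK_IS | FLAG_MASK_NOT;  // 255

    static final int IS_ORDERED = 0b01 << 4; // 16
    static final int IS_SIZED = 0b01 << 6;   // 64
    static final int NOT_SIZED = 0b10 << 6;  // 128

    // Mask that wipes, from the previous flags, every 2-bit field mentioned by the new flags.
    static int getMask(int flags) {
        return (flags == 0)
                ? FLAG_MASK
                : ~(flags | ((FLAG_MASK_IS & flags) << 1) | ((FLAG_MASK_NOT & flags) >> 1));
    }

    static int combineOpFlags(int newStreamOrOpFlags, int prevCombOpFlags) {
        return (prevCombOpFlags & getMask(newStreamOrOpFlags)) | newStreamOrOpFlags;
    }

    // Keeps only the fields whose value is exactly 01 (set).
    static int toStreamFlags(int combOpFlags) {
        return ((~combOpFlags) >> 1) & FLAG_MASK_IS & combOpFlags;
    }

    public static void main(String[] args) {
        int head = combineOpFlags(IS_ORDERED | IS_SIZED, INITIAL_OPS_VALUE);
        int next = combineOpFlags(NOT_SIZED, head);

        System.out.println(Integer.toBinaryString(head));                   // 1011111
        System.out.println(Integer.toBinaryString(next));                   // 10011111
        System.out.println(toStreamFlags(head) == (IS_ORDERED | IS_SIZED)); // true
        System.out.println(toStreamFlags(next) == IS_ORDERED);              // true
    }
}
```

After the second stage the SIZED field reads 10, so `toStreamFlags()` reports only ORDERED; the DISTINCT and SORTED fields still hold 11 (preserve) and are dropped as well, since only fields equal to 01 survive.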
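Converting characteristics: a Spliterator's characteristics can be turned into flags with a simple bitwise AND because Spliterator's characteristics were designed to match StreamOpFlag; more precisely, their bit layouts match. So ANDing with SPLITERATOR_CHARACTERISTICS_MASK is all it takes, as shown below: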
```text
// Only 8 bits shown here for simplicity
SPLITERATOR_CHARACTERISTICS_MASK: 0101 0101
Spliterator.ORDERED:              0001 0000
StreamOpFlag.ORDERED.set:         0001 0000
```
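The same conversion can be tried on a real `ArrayList` spliterator, whose characteristics are ORDERED | SIZED | SUBSIZED. `CharacteristicsToFlags` is a hypothetical holder; the value 85 is the SPLITERATOR_CHARACTERISTICS_MASK computed earlier, and SUBSIZED (bit 14) falls outside it, so only ORDERED.set and SIZED.set remain.

```java
import java.util.ArrayList;
import java.util.List;

class CharacteristicsToFlags {
    static final int SPLITERATOR_CHARACTERISTICS_MASK = 0b0101_0101; // 85

    // Same idea as StreamOpFlag.fromCharacteristics(int): keep only bits that double as stream flags.
    static int fromCharacteristics(int characteristics) {
        return characteristics & SPLITERATOR_CHARACTERISTICS_MASK;
    }

    public static void main(String[] args) {
        int c = new ArrayList<>(List.of(1, 2, 3)).spliterator().characteristics();
        System.out.println(Integer.toBinaryString(c));                      // 100000001010000 (ORDERED | SIZED | SUBSIZED)
        System.out.println(Integer.toBinaryString(fromCharacteristics(c))); // 1010000 (ORDERED.set | SIZED.set)
    }
}
```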
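This completes the analysis of StreamOpFlag; for space reasons the mask-related methods are not expanded further. Next we analyze the "pipeline" structure in Stream. Out of habit, the words "flag" and "characteristic" are used interchangeably below.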
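ReferencePipeline source code analysis

Since Stream behaves like a stream, it needs a chained data structure through which elements can "flow" from the Source down to every node of the chain. The usual data structure for this is a doubly linked list (a singly linked list is not a good fit because the chain needs to be walked back upstream); well-known implementations include AQS and Netty's ChannelHandlerContext. For example, Netty's ChannelPipeline is designed as follows: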
For this doubly linked list, the corresponding class in Stream is AbstractPipeline, and the core implementation lives in ReferencePipeline and its inner classes.
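A heavily simplified sketch of such a doubly linked stage list. `MiniStage` is hypothetical; only the field names `sourceStage`, `previousStage`, `nextStage` and `depth` are borrowed from AbstractPipeline, and nothing else about that class is reproduced.

```java
// Hypothetical stage node: each stage links back to its upstream neighbour and
// forward to its downstream neighbour, and every stage knows the head (source) stage.
class MiniStage {
    final MiniStage sourceStage;   // head of the chain
    final MiniStage previousStage; // upstream neighbour (null for the head)
    MiniStage nextStage;           // downstream neighbour, linked when a stage is appended
    final int depth;               // number of stages before this one
    final String name;

    // Head (source) stage.
    MiniStage(String name) {
        this.name = name;
        this.sourceStage = this;
        this.previousStage = null;
        this.depth = 0;
    }

    // Appended (intermediate) stage.
    private MiniStage(MiniStage previous, String name) {
        this.name = name;
        this.previousStage = previous;
        this.sourceStage = previous.sourceStage;
        this.depth = previous.depth + 1;
        previous.nextStage = this; // forward link
    }

    MiniStage then(String name) {
        return new MiniStage(this, name);
    }

    // Walks from the head to the tail via nextStage links.
    String describeForward() {
        StringBuilder sb = new StringBuilder();
        for (MiniStage s = sourceStage; s != null; s = s.nextStage) {
            sb.append(s.name).append('(').append(s.depth).append(')');
            if (s.nextStage != null) {
                sb.append(" -> ");
            }
        }
        return sb.toString();
    }

    public static void main(String[] args) {
        MiniStage tail = new MiniStage("source").then("filter").then("map");
        System.out.println(tail.describeForward());                // source(0) -> filter(1) -> map(2)
        System.out.println(tail.previousStage.previousStage.name); // source
    }
}
```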
Main interfaces

First, a brief look at the core method definitions of AbstractPipeline's supertypes. The main ones are Stream, BaseStream and PipelineHelper:
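Stream: represents a sequence of elements supporting sequential and parallel aggregate operations. This top-level interface defines the intermediate operations, the terminal operations and some static factory methods (there are too many methods to list them all here). For intermediate operations it is essentially a builder-style interface, so it can form a chain of multiple intermediate operations followed by a single terminal operation, for example: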
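```java
public interface Stream<T> extends BaseStream<T, Stream<T>> {

    // intermediate operation
    Stream<T> filter(Predicate<? super T> predicate);

    // intermediate operation
    <R> Stream<R> map(Function<? super T, ? extends R> mapper);

    // terminal operation
    void forEach(Consumer<? super T> action);

    // other methods omitted
}

// Usage: several intermediate operations followed by exactly one terminal operation
Stream<String> x = Stream.of("a", "", "ccc");
x.filter(s -> !s.isEmpty()).map(String::length).forEach(System.out::println);
```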
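Two consequences of this builder style are easy to observe: intermediate operations do nothing until the terminal operation runs, and a stream accepts only one terminal operation. A sketch using only public Stream APIs (`LazyChainDemo` and its `LOG` list are made up for the demonstration):

```java
import java.util.ArrayList;
import java.util.List;
import java.util.stream.Collectors;
import java.util.stream.Stream;

class LazyChainDemo {
    static final List<String> LOG = new ArrayList<>();

    // Builds the chain; the lambdas record when they actually run.
    static Stream<Integer> build() {
        return Stream.of("a", "", "ccc")
                .filter(s -> { LOG.add("filter " + s); return !s.isEmpty(); })
                .map(s -> { LOG.add("map " + s); return s.length(); });
    }

    public static void main(String[] args) {
        Stream<Integer> chained = build();
        System.out.println(LOG.isEmpty()); // true: only intermediate operations so far

        List<Integer> lengths = chained.collect(Collectors.toList()); // terminal operation
        System.out.println(lengths); // [1, 3]
        System.out.println(LOG);     // [filter a, map a, filter , filter ccc, map ccc]

        try {
            chained.count(); // a second terminal operation on the same stream
        } catch (IllegalStateException e) {
            System.out.println("IllegalStateException");
        }
    }
}
```

The log also shows that each element passes through filter and map before the next element is read, which is the element-by-element flow the Sink chain described later implements.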
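BaseStream: the base interface of Stream. It defines the stream's iterators, the equivalent variants of a stream (the concurrent variant, the sequential variant and the unordered variant), the check for concurrent versus sequential execution, and the close-related methods: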
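```java
public interface BaseStream<T, S extends BaseStream<T, S>> extends AutoCloseable {

    // iterator over the elements of this stream
    Iterator<T> iterator();

    // spliterator over the elements of this stream
    Spliterator<T> spliterator();

    // whether the stream would execute concurrently ("in parallel")
    boolean isParallel();

    // returns an equivalent sequential stream
    S sequential();

    // returns an equivalent concurrent ("parallel") stream
    S parallel();

    // returns an equivalent unordered stream
    S unordered();

    // returns an equivalent stream with an additional close handler
    S onClose(Runnable closeHandler);

    // closes the stream, running all close handlers
    @Override
    void close();
}
```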
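A few of these BaseStream methods in action. `BaseStreamDemo` is hypothetical; the sketch relies only on `parallel()`/`sequential()` switching the execution mode reported by `isParallel()`, and on `close()` (here via try-with-resources, possible because BaseStream extends AutoCloseable) running handlers registered with `onClose()`.

```java
import java.util.concurrent.atomic.AtomicInteger;
import java.util.stream.Stream;

class BaseStreamDemo {
    static boolean afterParallel() {
        return Stream.of(1, 2, 3).parallel().isParallel();
    }

    static boolean afterParallelThenSequential() {
        // The last call wins: the stream ends up in sequential mode.
        return Stream.of(1, 2, 3).parallel().sequential().isParallel();
    }

    static int closeHandlerRuns() {
        AtomicInteger runs = new AtomicInteger();
        // try-with-resources calls close(), which runs the registered handler.
        try (Stream<Integer> s = Stream.of(1, 2, 3).onClose(runs::incrementAndGet)) {
            s.forEach(x -> { });
        }
        return runs.get();
    }

    public static void main(String[] args) {
        System.out.println(afterParallel());               // true
        System.out.println(afterParallelThenSequential()); // false
        System.out.println(closeHandlerRuns());            // 1
    }
}
```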
PipelineHelper: an internal abstract helper class for executing a stream pipeline. It gathers the information about a pipeline (output shape, stream and operation flags, and so on) in one place and provides methods such as wrapSink() and copyInto() for running it:

```java
abstract class PipelineHelper<P_OUT> {

    abstract StreamShape getSourceShape();

    abstract int getStreamAndOpFlags();

    abstract <P_IN> long exactOutputSizeIfKnown(Spliterator<P_IN> spliterator);

    abstract <P_IN, S extends Sink<P_OUT>> S wrapAndCopyInto(S sink, Spliterator<P_IN> spliterator);

    abstract <P_IN> void copyInto(Sink<P_IN> wrappedSink, Spliterator<P_IN> spliterator);

    abstract <P_IN> boolean copyIntoWithCancel(Sink<P_IN> wrappedSink, Spliterator<P_IN> spliterator);

    abstract <P_IN> Sink<P_IN> wrapSink(Sink<P_OUT> sink);

    abstract <P_IN> Spliterator<P_OUT> wrapSpliterator(Spliterator<P_IN> spliterator);

    abstract Node.Builder<P_OUT> makeNodeBuilder(long exactSizeIfKnown, IntFunction<P_OUT[]> generator);

    abstract <P_IN> Node<P_OUT> evaluate(Spliterator<P_IN> spliterator, boolean flatten, IntFunction<P_OUT[]> generator);
}
```
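One important note on terminology (the author stresses it three times): in this article a sequential stream is called a stream with synchronous processing/execution, and a "parallel stream" is called a stream with concurrent processing/execution. "Parallel stream" is ambiguous, because in practice such a stream is executed concurrently rather than truly in parallel.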
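Sink and the reference chain

Several PipelineHelper methods take the Sink interface, which the previous section did not cover; this subsection goes into it in detail. Although a Stream is built as a doubly linked list, when the terminal operation is finally applied all operations are converted into a chain of references (ChainedReference). This is the multi-layer wrapper programming pattern mentioned earlier; a simplified model looks like this: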
```java
import java.util.concurrent.atomic.AtomicInteger;

public class WrapperApp {

    interface Wrapper {

        void doAction();
    }

    public static void main(String[] args) {
        AtomicInteger counter = new AtomicInteger(0);
        Wrapper first = () -> System.out.printf("wrapper [depth => %d] invoke\n", counter.incrementAndGet());
        // second wraps first: it calls the inner wrapper, then does its own work
        Wrapper second = () -> {
            first.doAction();
            System.out.printf("wrapper [depth => %d] invoke\n", counter.incrementAndGet());
        };
        second.doAction();
    }
}
// Output:
// wrapper [depth => 1] invoke
// wrapper [depth => 2] invoke
```
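The example above is a bit abstract. The point is that two different Sink implementations can be fused without either one knowing about the other. Here is another example: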
```java
import java.util.ArrayList;
import java.util.Collection;
import java.util.List;
import java.util.Spliterator;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.Consumer;
import java.util.function.Function;
import java.util.function.Predicate;
import java.util.function.Supplier;

// A simplified, self-defined Sink (not java.util.stream.Sink)
interface Sink<T> extends Consumer<T> {

    default void begin(long size) {

    }

    default void end() {

    }

    abstract class ChainedReference<T, OUT> implements Sink<T> {

        protected final Sink<OUT> downstream;

        public ChainedReference(Sink<OUT> downstream) {
            this.downstream = downstream;
        }
    }
}

@SuppressWarnings({"unchecked", "rawtypes"})
public class ReferenceChain<OUT, R> {

    // one Sink builder per intermediate operation
    private final List<Supplier<Sink<?>>> sinkBuilders = new ArrayList<>();

    // the Sink built most recently; it becomes the downstream of the next one
    private final AtomicReference<Sink> sinkReference = new AtomicReference<>();

    public ReferenceChain<OUT, R> filter(Predicate<OUT> predicate) {
        sinkBuilders.add(() -> {
            Sink<OUT> prevSink = (Sink<OUT>) sinkReference.get();
            Sink.ChainedReference<OUT, OUT> currentSink = new Sink.ChainedReference<>(prevSink) {

                @Override
                public void accept(OUT out) {
                    // only matching elements are passed downstream
                    if (predicate.test(out)) {
                        downstream.accept(out);
                    }
                }
            };
            sinkReference.set(currentSink);
            return currentSink;
        });
        return this;
    }

    public ReferenceChain<OUT, R> map(Function<OUT, R> function) {
        sinkBuilders.add(() -> {
            Sink<R> prevSink = (Sink<R>) sinkReference.get();
            Sink.ChainedReference<OUT, R> currentSink = new Sink.ChainedReference<>(prevSink) {

                @Override
                public void accept(OUT in) {
                    downstream.accept(function.apply(in));
                }
            };
            sinkReference.set(currentSink);
            return currentSink;
        });
        return this;
    }

    public void forEachPrint(Collection<OUT> collection) {
        forEachPrint(collection, false);
    }

    public void forEachPrint(Collection<OUT> collection, boolean reverse) {
        Spliterator<OUT> spliterator = collection.spliterator();
        // the terminal Sink simply prints each element
        Sink<OUT> sink = System.out::println;
        sinkReference.set(sink);
        Sink<OUT> stage = sink;
        if (reverse) {
            // build from the first operation to the last
            for (int i = 0; i <= sinkBuilders.size() - 1; i++) {
                Supplier<Sink<?>> supplier = sinkBuilders.get(i);
                stage = (Sink<OUT>) supplier.get();
            }
        } else {
            // build from the last operation to the first: each new Sink wraps the previous one
            for (int i = sinkBuilders.size() - 1; i >= 0; i--) {
                Supplier<Sink<?>> supplier = sinkBuilders.get(i);
                stage = (Sink<OUT>) supplier.get();
            }
        }
        Sink<OUT> finalStage = stage;
        spliterator.forEachRemaining(finalStage);
    }

    public static void main(String[] args) {
        List<Integer> list = new ArrayList<>();
        list.add(1);
        list.add(2);
        list.add(3);
        list.add(12);
        ReferenceChain<Integer, Integer> chain = new ReferenceChain<>();
        chain.filter(item -> item > 10)
                .map(item -> item * 2)
                .forEachPrint(list);
    }
}
// Output:
// 24
```
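The execution flow: the Spliterator feeds each element into the filter Sink first; only elements that pass the predicate are forwarded to the map Sink, which applies the mapping function and hands the result to the final Sink that prints it.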
The key points of the multi-layer wrapper pattern are:
Most operations can be turned into implementations of java.util.function.Consumer, i.e. each incoming element is processed by implementing accept(T t).
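A Sink that runs earlier always takes the Sink that runs later as its argument, and in its own processing method decides whether to call back the processing method of the Sink it was given. In other words, the reference chain has to be built from back to front; the real implementation of this idea is AbstractPipeline#wrapSink(). For example: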
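```java
// Pseudocode: "new Sink(downstream)" stands for a Sink that holds a reference to its downstream
Sink mapSink = new Sink(inputSink) {

    private Function mapper;

    public void accept(E ele) {
        // map the element, then hand the result to the downstream (inputSink)
        inputSink.accept(mapper.apply(ele));
    }
};

Sink filterSink = new Sink(mapSink) {

    private Predicate predicate;

    public void accept(E ele) {
        // only matching elements are passed on to mapSink
        if (predicate.test(ele)) {
            mapSink.accept(ele);
        }
    }
};
```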
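It follows from the previous point that, in general, the terminal operation is ultimately applied to the first Sink of the reference chain.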
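The Sink and ChainedReference used in the ReferenceChain example are not the author's invention; compare them with the source of java.util.stream.Sink: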
```java
interface Sink<T> extends Consumer<T> {

    // called before elements are pushed; size is the exact size, or -1 if unknown
    default void begin(long size) {}

    // called after all elements have been pushed
    default void end() {}

    // whether this Sink wants no more elements (used by short-circuiting operations)
    default boolean cancellationRequested() {
        return false;
    }

    default void accept(int value) {
        throw new IllegalStateException("called wrong accept method");
    }

    default void accept(long value) {
        throw new IllegalStateException("called wrong accept method");
    }

    default void accept(double value) {
        throw new IllegalStateException("called wrong accept method");
    }

    abstract static class ChainedReference<T, E_OUT> implements Sink<T> {
        protected final Sink<? super E_OUT> downstream;

        public ChainedReference(Sink<? super E_OUT> downstream) {
            this.downstream = Objects.requireNonNull(downstream);
        }

        @Override
        public void begin(long size) {
            downstream.begin(size);
        }

        @Override
        public void end() {
            downstream.end();
        }

        @Override
        public boolean cancellationRequested() {
            return downstream.cancellationRequested();
        }
    }
}
```
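Because java.util.stream.Sink is package-private, application code cannot use it directly. The sketch below copies its contract into a hypothetical MiniSink interface (MiniSink, Chained, limitSink and copyIntoWithCancel are illustrative names, not JDK API) to show how begin()/end() and cancellationRequested() propagate through a ChainedReference-style chain, and how a do/while loop over Spliterator.tryAdvance(), shaped like ReferencePipeline#forEachWithCancel, stops pulling elements once a stage requests cancellation.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.Spliterator;
import java.util.function.Consumer;

public class SinkProtocolDemo {

    // Hypothetical, simplified copy of the package-private java.util.stream.Sink contract.
    interface MiniSink<T> extends Consumer<T> {
        default void begin(long size) {}
        default void end() {}
        default boolean cancellationRequested() { return false; }
    }

    // Modeled on Sink.ChainedReference: forwards begin/end/cancellationRequested downstream.
    abstract static class Chained<T, OUT> implements MiniSink<T> {
        protected final MiniSink<? super OUT> downstream;

        Chained(MiniSink<? super OUT> downstream) {
            this.downstream = downstream;
        }

        @Override public void begin(long size) { downstream.begin(size); }
        @Override public void end() { downstream.end(); }
        @Override public boolean cancellationRequested() { return downstream.cancellationRequested(); }
    }

    // Hypothetical limit-like stage: passes at most n elements, then requests cancellation.
    static <T> MiniSink<T> limitSink(long n, MiniSink<? super T> next) {
        return new Chained<T, T>(next) {
            private long remaining = n;

            @Override
            public void accept(T t) {
                if (remaining > 0) {
                    remaining--;
                    downstream.accept(t);
                }
            }

            @Override
            public boolean cancellationRequested() {
                return remaining == 0 || downstream.cancellationRequested();
            }
        };
    }

    // Same loop shape as forEachWithCancel, wrapped in begin()/end() like copyIntoWithCancel.
    static <T> boolean copyIntoWithCancel(MiniSink<T> sink, Spliterator<T> spliterator) {
        sink.begin(spliterator.getExactSizeIfKnown());
        boolean cancelled;
        do { } while (!(cancelled = sink.cancellationRequested()) && spliterator.tryAdvance(sink));
        sink.end();
        return cancelled;
    }

    static List<String> run() {
        List<String> log = new ArrayList<>();
        MiniSink<Integer> terminal = new MiniSink<Integer>() {
            @Override public void begin(long size) { log.add("begin " + size); }
            @Override public void accept(Integer x) { log.add("accept " + x); }
            @Override public void end() { log.add("end"); }
        };
        MiniSink<Integer> chain = limitSink(2, terminal);
        boolean cancelled = copyIntoWithCancel(chain, Arrays.asList(1, 2, 3, 4, 5).spliterator());
        log.add("cancelled " + cancelled);
        return log;
    }

    public static void main(String[] args) {
        run().forEach(System.out::println);
    }
}
```

Elements 3 to 5 are never read from the Spliterator: the loop checks cancellationRequested() before every tryAdvance(), which is how short-circuiting stages avoid consuming the rest of the source.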
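If you have used RxJava or Project Reactor, a Sink is much like a Subscriber: several Subscribers form a ChainedReference (a Sink chain, which can be thought of as one composite Subscriber), and the terminal op plays the role of the Publisher. Data is processed only when the Subscriber subscribes to the Publisher, so in this sense the design follows a reactive programming pattern.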
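Implementation of AbstractPipeline and ReferencePipeline

AbstractPipeline and ReferencePipeline are both abstract classes. AbstractPipeline builds the pipeline data structure and declares some shape-related abstract methods for ReferencePipeline to implement, while ReferencePipeline is the base type of the pipelines behind Stream. In the source, both the head node and the operation nodes of a stream's chained (pipelined) structure are subclasses of ReferencePipeline. First, the fields and constructors of AbstractPipeline: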
```java
abstract class AbstractPipeline<E_IN, E_OUT, S extends BaseStream<E_OUT, S>>
        extends PipelineHelper<E_OUT> implements BaseStream<E_OUT, S> {

    // the head (source) stage
    @SuppressWarnings("rawtypes")
    private final AbstractPipeline sourceStage;

    // the upstream stage; null for the head
    @SuppressWarnings("rawtypes")
    private final AbstractPipeline previousStage;

    // op flags of this stage (source flags for the head)
    protected final int sourceOrOpFlags;

    // the downstream stage; null for the last stage
    @SuppressWarnings("rawtypes")
    private AbstractPipeline nextStage;

    // number of intermediate ops between this stage and the source
    // (in parallel mode: between this stage and the previous stateful stage)
    private int depth;

    // combined source and op flags of this stage and all upstream stages
    private int combinedFlags;

    // source spliterator (head only); set to null once consumed
    private Spliterator<?> sourceSpliterator;

    // source spliterator supplier (head only); set to null once consumed
    private Supplier<? extends Spliterator<?>> sourceSupplier;

    // true once this stage has been linked to a next stage or consumed
    private boolean linkedOrConsumed;

    // true if any stage is stateful (head only)
    private boolean sourceAnyStateful;

    private Runnable sourceCloseAction;

    // true if the pipeline is parallel (head only)
    private boolean parallel;

    // head stage, created from a Spliterator supplier
    AbstractPipeline(Supplier<? extends Spliterator<?>> source,
                     int sourceFlags, boolean parallel) {
        this.previousStage = null;
        this.sourceSupplier = source;
        this.sourceStage = this;
        this.sourceOrOpFlags = sourceFlags & StreamOpFlag.STREAM_MASK;
        this.combinedFlags = (~(sourceOrOpFlags << 1)) & StreamOpFlag.INITIAL_OPS_VALUE;
        this.depth = 0;
        this.parallel = parallel;
    }

    // head stage, created from a Spliterator
    AbstractPipeline(Spliterator<?> source,
                     int sourceFlags, boolean parallel) {
        this.previousStage = null;
        this.sourceSpliterator = source;
        this.sourceStage = this;
        this.sourceOrOpFlags = sourceFlags & StreamOpFlag.STREAM_MASK;
        this.combinedFlags = (~(sourceOrOpFlags << 1)) & StreamOpFlag.INITIAL_OPS_VALUE;
        this.depth = 0;
        this.parallel = parallel;
    }

    // intermediate stage appended to previousStage
    AbstractPipeline(AbstractPipeline<?, E_IN, ?> previousStage, int opFlags) {
        if (previousStage.linkedOrConsumed)
            throw new IllegalStateException(MSG_STREAM_LINKED);
        previousStage.linkedOrConsumed = true;
        previousStage.nextStage = this;

        this.previousStage = previousStage;
        this.sourceOrOpFlags = opFlags & StreamOpFlag.OP_MASK;
        this.combinedFlags = StreamOpFlag.combineOpFlags(opFlags, previousStage.combinedFlags);
        this.sourceStage = previousStage.sourceStage;
        if (opIsStateful())
            sourceStage.sourceAnyStateful = true;
        this.depth = previousStage.depth + 1;
    }
}
```
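From these fields and constructors the structure of a stream pipeline follows: every stage points to the head through sourceStage, adjacent stages are linked in both directions through previousStage and nextStage, and depth grows by one for each intermediate stage, starting from 0 at the head.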
The terminal op takes no part in building the pipeline's linked structure. Next, the terminal evaluation methods of AbstractPipeline:
```java
abstract class AbstractPipeline<E_IN, E_OUT, S extends BaseStream<E_OUT, S>>
        extends PipelineHelper<E_OUT> implements BaseStream<E_OUT, S> {

    // evaluates the pipeline with a terminal operation to produce a result
    final <R> R evaluate(TerminalOp<E_OUT, R> terminalOp) {
        assert getOutputShape() == terminalOp.inputShape();
        if (linkedOrConsumed)
            throw new IllegalStateException(MSG_STREAM_LINKED);
        linkedOrConsumed = true;

        return isParallel()
               ? terminalOp.evaluateParallel(this, sourceSpliterator(terminalOp.getOpFlags()))
               : terminalOp.evaluateSequential(this, sourceSpliterator(terminalOp.getOpFlags()));
    }

    // collects the output elements into a Node (used by toArray)
    @SuppressWarnings("unchecked")
    final Node<E_OUT> evaluateToArrayNode(IntFunction<E_OUT[]> generator) {
        if (linkedOrConsumed)
            throw new IllegalStateException(MSG_STREAM_LINKED);
        linkedOrConsumed = true;

        if (isParallel() && previousStage != null && opIsStateful()) {
            depth = 0;
            return opEvaluateParallel(previousStage, previousStage.sourceSpliterator(0), generator);
        }
        else {
            return evaluate(sourceSpliterator(0), true, generator);
        }
    }

    // returns the source spliterator; only valid on the head stage
    final Spliterator<E_OUT> sourceStageSpliterator() {
        if (this != sourceStage)
            throw new IllegalStateException();

        if (linkedOrConsumed)
            throw new IllegalStateException(MSG_STREAM_LINKED);
        linkedOrConsumed = true;

        if (sourceStage.sourceSpliterator != null) {
            @SuppressWarnings("unchecked")
            Spliterator<E_OUT> s = sourceStage.sourceSpliterator;
            sourceStage.sourceSpliterator = null;
            return s;
        }
        else if (sourceStage.sourceSupplier != null) {
            @SuppressWarnings("unchecked")
            Spliterator<E_OUT> s = (Spliterator<E_OUT>) sourceStage.sourceSupplier.get();
            sourceStage.sourceSupplier = null;
            return s;
        }
        else {
            throw new IllegalStateException(MSG_CONSUMED);
        }
    }
}
```
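The linkedOrConsumed checks in the intermediate-stage constructor and in evaluate() are what make a stream single-use. The sketch below (StreamReuseDemo is an illustrative name) triggers both paths through the public API; on JDK 11 both throw IllegalStateException carrying the MSG_STREAM_LINKED text.

```java
import java.util.stream.Stream;

public class StreamReuseDemo {

    // Linking a second stage to an already-linked stage hits the check in
    // the AbstractPipeline(previousStage, opFlags) constructor.
    static String linkTwice() {
        Stream<Integer> head = Stream.of(1, 2, 3);
        head.map(x -> x + 1);              // head.linkedOrConsumed becomes true
        try {
            head.filter(x -> x > 1);       // second link on the same stage
            return "no exception";
        } catch (IllegalStateException e) {
            return e.getMessage();
        }
    }

    // Running a second terminal operation hits the check at the top of evaluate().
    static String evaluateTwice() {
        Stream<Integer> mapped = Stream.of(1, 2, 3).map(x -> x * 2);
        mapped.count();                    // evaluate() marks the stage as consumed
        try {
            mapped.count();
            return "no exception";
        } catch (IllegalStateException e) {
            return e.getMessage();
        }
    }

    public static void main(String[] args) {
        System.out.println(linkTwice());
        System.out.println(evaluateTwice());
    }
}
```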
AbstractPipeline's implementations of the BaseStream methods:
```java
abstract class AbstractPipeline<E_IN, E_OUT, S extends BaseStream<E_OUT, S>>
        extends PipelineHelper<E_OUT> implements BaseStream<E_OUT, S> {

    // the flag lives on the head stage, so it applies to the whole pipeline
    @Override
    @SuppressWarnings("unchecked")
    public final S sequential() {
        sourceStage.parallel = false;
        return (S) this;
    }

    @Override
    @SuppressWarnings("unchecked")
    public final S parallel() {
        sourceStage.parallel = true;
        return (S) this;
    }

    @Override
    public void close() {
        linkedOrConsumed = true;
        sourceSupplier = null;
        sourceSpliterator = null;
        if (sourceStage.sourceCloseAction != null) {
            Runnable closeAction = sourceStage.sourceCloseAction;
            sourceStage.sourceCloseAction = null;
            closeAction.run();
        }
    }

    @Override
    @SuppressWarnings("unchecked")
    public S onClose(Runnable closeHandler) {
        if (linkedOrConsumed)
            throw new IllegalStateException(MSG_STREAM_LINKED);
        Objects.requireNonNull(closeHandler);
        Runnable existingHandler = sourceStage.sourceCloseAction;
        // compose with the existing handler; handlers run in registration order
        sourceStage.sourceCloseAction = (existingHandler == null)
                                        ? closeHandler
                                        : Streams.composeWithExceptions(existingHandler, closeHandler);
        return (S) this;
    }

    @Override
    @SuppressWarnings("unchecked")
    public Spliterator<E_OUT> spliterator() {
        if (linkedOrConsumed)
            throw new IllegalStateException(MSG_STREAM_LINKED);
        linkedOrConsumed = true;

        if (this == sourceStage) {
            if (sourceStage.sourceSpliterator != null) {
                @SuppressWarnings("unchecked")
                Spliterator<E_OUT> s = (Spliterator<E_OUT>) sourceStage.sourceSpliterator;
                sourceStage.sourceSpliterator = null;
                return s;
            }
            else if (sourceStage.sourceSupplier != null) {
                @SuppressWarnings("unchecked")
                Supplier<Spliterator<E_OUT>> s = (Supplier<Spliterator<E_OUT>>) sourceStage.sourceSupplier;
                sourceStage.sourceSupplier = null;
                return lazySpliterator(s);
            }
            else {
                throw new IllegalStateException(MSG_CONSUMED);
            }
        }
        else {
            return wrap(this, () -> sourceSpliterator(0), isParallel());
        }
    }

    @Override
    public final boolean isParallel() {
        return sourceStage.parallel;
    }

    final int getStreamFlags() {
        return StreamOpFlag.toStreamFlags(combinedFlags);
    }

    @SuppressWarnings("unchecked")
    private Spliterator<?> sourceSpliterator(int terminalFlags) {
        // take the source spliterator out of the head stage
        Spliterator<?> spliterator = null;
        if (sourceStage.sourceSpliterator != null) {
            spliterator = sourceStage.sourceSpliterator;
            sourceStage.sourceSpliterator = null;
        }
        else if (sourceStage.sourceSupplier != null) {
            spliterator = (Spliterator<?>) sourceStage.sourceSupplier.get();
            sourceStage.sourceSupplier = null;
        }
        else {
            throw new IllegalStateException(MSG_CONSUMED);
        }

        // parallel with stateful ops: evaluate each stateful op up front,
        // then recompute depth and flags of the following stages
        if (isParallel() && sourceStage.sourceAnyStateful) {
            int depth = 1;
            for (@SuppressWarnings("rawtypes") AbstractPipeline u = sourceStage, p = sourceStage.nextStage, e = this;
                 u != e;
                 u = p, p = p.nextStage) {

                int thisOpFlags = p.sourceOrOpFlags;
                if (p.opIsStateful()) {
                    depth = 0;

                    if (StreamOpFlag.SHORT_CIRCUIT.isKnown(thisOpFlags)) {
                        thisOpFlags = thisOpFlags & ~StreamOpFlag.IS_SHORT_CIRCUIT;
                    }

                    spliterator = p.opEvaluateParallelLazy(u, spliterator);

                    thisOpFlags = spliterator.hasCharacteristics(Spliterator.SIZED)
                            ? (thisOpFlags & ~StreamOpFlag.NOT_SIZED) | StreamOpFlag.IS_SIZED
                            : (thisOpFlags & ~StreamOpFlag.IS_SIZED) | StreamOpFlag.NOT_SIZED;
                }
                p.depth = depth++;
                p.combinedFlags = StreamOpFlag.combineOpFlags(thisOpFlags, u.combinedFlags);
            }
        }

        if (terminalFlags != 0) {
            // apply the terminal op's flags to the last stage
            combinedFlags = StreamOpFlag.combineOpFlags(terminalFlags, combinedFlags);
        }

        return spliterator;
    }
}
```
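Since sequential() and parallel() only write sourceStage.parallel, and isParallel() only reads it, the execution mode belongs to the whole pipeline rather than to individual stages. A small check of that behavior, assuming JDK 11 (ParallelFlagDemo is an illustrative name):

```java
import java.util.stream.Stream;

public class ParallelFlagDemo {

    // parallel() and sequential() both write sourceStage.parallel,
    // so the call made last decides the mode of the whole pipeline.
    static boolean lastCallWins() {
        return Stream.of(1, 2, 3)
                .parallel()
                .map(x -> x * 2)
                .sequential()
                .isParallel();
    }

    // Every stage reads the flag from the same head stage.
    static boolean headSeesFlag() {
        Stream<Integer> head = Stream.of(1, 2, 3);
        Stream<Integer> mapped = head.map(x -> x + 1);
        mapped.parallel();          // sets the flag on the source stage
        return head.isParallel();   // head is that source stage
    }

    public static void main(String[] args) {
        System.out.println(lastCallWins()); // false
        System.out.println(headSeesFlag()); // true
    }
}
```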
AbstractPipeline's implementations of the PipelineHelper methods:
```java
abstract class AbstractPipeline<E_IN, E_OUT, S extends BaseStream<E_OUT, S>>
        extends PipelineHelper<E_OUT> implements BaseStream<E_OUT, S> {

    @Override
    final StreamShape getSourceShape() {
        @SuppressWarnings("rawtypes")
        AbstractPipeline p = AbstractPipeline.this;
        while (p.depth > 0) {
            p = p.previousStage;
        }
        return p.getOutputShape();
    }

    @Override
    final <P_IN> long exactOutputSizeIfKnown(Spliterator<P_IN> spliterator) {
        return StreamOpFlag.SIZED.isKnown(getStreamAndOpFlags()) ? spliterator.getExactSizeIfKnown() : -1;
    }

    // wraps the sink into the Sink chain, pushes all elements, returns the original sink
    @Override
    final <P_IN, S extends Sink<E_OUT>> S wrapAndCopyInto(S sink, Spliterator<P_IN> spliterator) {
        copyInto(wrapSink(Objects.requireNonNull(sink)), spliterator);
        return sink;
    }

    @Override
    final <P_IN> void copyInto(Sink<P_IN> wrappedSink, Spliterator<P_IN> spliterator) {
        Objects.requireNonNull(wrappedSink);

        if (!StreamOpFlag.SHORT_CIRCUIT.isKnown(getStreamAndOpFlags())) {
            // no short-circuiting: begin -> forEachRemaining -> end
            wrappedSink.begin(spliterator.getExactSizeIfKnown());
            spliterator.forEachRemaining(wrappedSink);
            wrappedSink.end();
        }
        else {
            copyIntoWithCancel(wrappedSink, spliterator);
        }
    }

    @Override
    @SuppressWarnings("unchecked")
    final <P_IN> boolean copyIntoWithCancel(Sink<P_IN> wrappedSink, Spliterator<P_IN> spliterator) {
        @SuppressWarnings({"rawtypes","unchecked"})
        AbstractPipeline p = AbstractPipeline.this;
        while (p.depth > 0) {
            p = p.previousStage;
        }

        wrappedSink.begin(spliterator.getExactSizeIfKnown());
        boolean cancelled = p.forEachWithCancel(spliterator, wrappedSink);
        wrappedSink.end();
        return cancelled;
    }

    @Override
    final int getStreamAndOpFlags() {
        return combinedFlags;
    }

    final boolean isOrdered() {
        return StreamOpFlag.ORDERED.isKnown(combinedFlags);
    }

    @Override
    @SuppressWarnings("unchecked")
    final <P_IN> Sink<P_IN> wrapSink(Sink<E_OUT> sink) {
        Objects.requireNonNull(sink);

        for ( @SuppressWarnings("rawtypes") AbstractPipeline p=AbstractPipeline.this; p.depth > 0; p=p.previousStage) {
            sink = p.opWrapSink(p.previousStage.combinedFlags, sink);
        }
        return (Sink<P_IN>) sink;
    }

    @Override
    @SuppressWarnings("unchecked")
    final <P_IN> Spliterator<E_OUT> wrapSpliterator(Spliterator<P_IN> sourceSpliterator) {
        if (depth == 0) {
            return (Spliterator<E_OUT>) sourceSpliterator;
        }
        else {
            return wrap(this, () -> sourceSpliterator, isParallel());
        }
    }

    @Override
    @SuppressWarnings("unchecked")
    final <P_IN> Node<E_OUT> evaluate(Spliterator<P_IN> spliterator,
                                      boolean flatten,
                                      IntFunction<E_OUT[]> generator) {
        if (isParallel()) {
            return evaluateToNode(this, spliterator, flatten, generator);
        }
        else {
            Node.Builder<E_OUT> nb = makeNodeBuilder(
                    exactOutputSizeIfKnown(spliterator), generator);
            return wrapAndCopyInto(nb, spliterator).build();
        }
    }
}
```
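To isolate the loop in wrapSink(), here is a hypothetical model of a pipeline stage (Stage, its fields and the lambda-based opWrapSink are simplifications invented for this sketch, not JDK types). Walking from the last stage back towards the head, each stage wraps the Sink built so far, so the first operation ends up as the outermost layer.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;
import java.util.function.UnaryOperator;

public class WrapSinkSketch {

    // Hypothetical stand-in for an AbstractPipeline stage: only previousStage, depth and opWrapSink.
    static final class Stage {
        final Stage previousStage;
        final int depth;
        final UnaryOperator<Consumer<Object>> opWrapSink; // null for the head stage

        Stage(Stage previousStage, UnaryOperator<Consumer<Object>> opWrapSink) {
            this.previousStage = previousStage;
            this.depth = previousStage == null ? 0 : previousStage.depth + 1;
            this.opWrapSink = opWrapSink;
        }

        // Same loop as AbstractPipeline#wrapSink: from this stage back to depth 1.
        Consumer<Object> wrapSink(Consumer<Object> sink) {
            for (Stage p = this; p.depth > 0; p = p.previousStage) {
                sink = p.opWrapSink.apply(sink);
            }
            return sink;
        }
    }

    static List<Object> run() {
        List<Object> out = new ArrayList<>();
        Stage head = new Stage(null, null);
        // filter stage: forward only elements greater than 10
        Stage filter = new Stage(head, down -> x -> {
            if ((Integer) x > 10) {
                down.accept(x);
            }
        });
        // map stage: double each element
        Stage map = new Stage(filter, down -> x -> down.accept((Integer) x * 2));
        // The terminal sink (out::add) is wrapped by map first, then by filter.
        Consumer<Object> wrapped = map.wrapSink(out::add);
        List.of(1, 2, 3, 12).forEach(wrapped);
        return out;
    }

    public static void main(String[] args) {
        System.out.println(run()); // [24]
    }
}
```

Only 12 passes the filter and map doubles it, so the terminal sink receives 24, the same result as the earlier ReferenceChain example.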
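The remaining abstract methods of AbstractPipeline, left for the XxxPipeline subclasses (ReferencePipeline, IntPipeline, LongPipeline, DoublePipeline) to implement: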
```java
abstract class AbstractPipeline<E_IN, E_OUT, S extends BaseStream<E_OUT, S>>
        extends PipelineHelper<E_OUT> implements BaseStream<E_OUT, S> {

    abstract StreamShape getOutputShape();

    abstract <P_IN> Node<E_OUT> evaluateToNode(PipelineHelper<E_OUT> helper,
                                               Spliterator<P_IN> spliterator,
                                               boolean flattenTree,
                                               IntFunction<E_OUT[]> generator);

    abstract <P_IN> Spliterator<E_OUT> wrap(PipelineHelper<E_OUT> ph,
                                            Supplier<Spliterator<P_IN>> supplier,
                                            boolean isParallel);

    abstract Spliterator<E_OUT> lazySpliterator(Supplier<? extends Spliterator<E_OUT>> supplier);

    abstract boolean forEachWithCancel(Spliterator<E_OUT> spliterator, Sink<E_OUT> sink);

    abstract Node.Builder<E_OUT> makeNodeBuilder(long exactSizeIfKnown,
                                                 IntFunction<E_OUT[]> generator);

    abstract boolean opIsStateful();

    // puts this stage's Sink in front of the given (downstream) sink
    abstract Sink<E_IN> opWrapSink(int flags, Sink<E_OUT> sink);

    <P_IN> Node<E_OUT> opEvaluateParallel(PipelineHelper<E_OUT> helper,
                                          Spliterator<P_IN> spliterator,
                                          IntFunction<E_OUT[]> generator) {
        throw new UnsupportedOperationException("Parallel evaluation is not supported");
    }

    @SuppressWarnings("unchecked")
    <P_IN> Spliterator<E_OUT> opEvaluateParallelLazy(PipelineHelper<E_OUT> helper,
                                                     Spliterator<P_IN> spliterator) {
        return opEvaluateParallel(helper, spliterator, i -> (E_OUT[]) new Object[i]).spliterator();
    }
}
```
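The abstract method opWrapSink() is in fact the method that adds a node to the element reference chain. Its logic lives in the subclasses; here only part of the source of the non-specialized subclass ReferencePipeline is considered: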
```java
abstract class ReferencePipeline<P_IN, P_OUT>
        extends AbstractPipeline<P_IN, P_OUT, Stream<P_OUT>>
        implements Stream<P_OUT> {

    ReferencePipeline(Supplier<? extends Spliterator<?>> source,
                      int sourceFlags, boolean parallel) {
        super(source, sourceFlags, parallel);
    }

    ReferencePipeline(Spliterator<?> source,
                      int sourceFlags, boolean parallel) {
        super(source, sourceFlags, parallel);
    }

    ReferencePipeline(AbstractPipeline<?, P_IN, ?> upstream, int opFlags) {
        super(upstream, opFlags);
    }

    @Override
    final StreamShape getOutputShape() {
        return StreamShape.REFERENCE;
    }

    @Override
    final <P_IN> Node<P_OUT> evaluateToNode(PipelineHelper<P_OUT> helper,
                                            Spliterator<P_IN> spliterator,
                                            boolean flattenTree,
                                            IntFunction<P_OUT[]> generator) {
        return Nodes.collect(helper, spliterator, flattenTree, generator);
    }

    @Override
    final <P_IN> Spliterator<P_OUT> wrap(PipelineHelper<P_OUT> ph,
                                         Supplier<Spliterator<P_IN>> supplier,
                                         boolean isParallel) {
        return new StreamSpliterators.WrappingSpliterator<>(ph, supplier, isParallel);
    }

    @Override
    final Spliterator<P_OUT> lazySpliterator(Supplier<? extends Spliterator<P_OUT>> supplier) {
        return new StreamSpliterators.DelegatingSpliterator<>(supplier);
    }

    // pull one element at a time until a Sink requests cancellation
    @Override
    final boolean forEachWithCancel(Spliterator<P_OUT> spliterator, Sink<P_OUT> sink) {
        boolean cancelled;
        do { } while (!(cancelled = sink.cancellationRequested()) && spliterator.tryAdvance(sink));
        return cancelled;
    }

    @Override
    final Node.Builder<P_OUT> makeNodeBuilder(long exactSizeIfKnown, IntFunction<P_OUT[]> generator) {
        return Nodes.builder(exactSizeIfKnown, generator);
    }

    @Override
    public final Iterator<P_OUT> iterator() {
        return Spliterators.iterator(spliterator());
    }

    // the head (source) stage
    static class Head<E_IN, E_OUT> extends ReferencePipeline<E_IN, E_OUT> {

        Head(Supplier<? extends Spliterator<?>> source,
             int sourceFlags, boolean parallel) {
            super(source, sourceFlags, parallel);
        }

        Head(Spliterator<?> source,
             int sourceFlags, boolean parallel) {
            super(source, sourceFlags, parallel);
        }

        @Override
        final boolean opIsStateful() {
            throw new UnsupportedOperationException();
        }

        @Override
        final Sink<E_IN> opWrapSink(int flags, Sink<E_OUT> sink) {
            throw new UnsupportedOperationException();
        }

        // sequential forEach directly on the head: no Sink chain needed
        @Override
        public void forEach(Consumer<? super E_OUT> action) {
            if (!isParallel()) {
                sourceStageSpliterator().forEachRemaining(action);
            }
            else {
                super.forEach(action);
            }
        }

        @Override
        public void forEachOrdered(Consumer<? super E_OUT> action) {
            if (!isParallel()) {
                sourceStageSpliterator().forEachRemaining(action);
            }
            else {
                super.forEachOrdered(action);
            }
        }
    }

    // base class of stateless intermediate stages
    abstract static class StatelessOp<E_IN, E_OUT> extends ReferencePipeline<E_IN, E_OUT> {

        StatelessOp(AbstractPipeline<?, E_IN, ?> upstream,
                    StreamShape inputShape,
                    int opFlags) {
            super(upstream, opFlags);
            assert upstream.getOutputShape() == inputShape;
        }

        @Override
        final boolean opIsStateful() {
            return false;
        }
    }

    // base class of stateful intermediate stages
    abstract static class StatefulOp<E_IN, E_OUT> extends ReferencePipeline<E_IN, E_OUT> {

        StatefulOp(AbstractPipeline<?, E_IN, ?> upstream,
                   StreamShape inputShape,
                   int opFlags) {
            super(upstream, opFlags);
            assert upstream.getOutputShape() == inputShape;
        }

        @Override
        final boolean opIsStateful() {
            return true;
        }

        @Override
        abstract <P_IN> Node<E_OUT> opEvaluateParallel(PipelineHelper<E_OUT> helper,
                                                       Spliterator<P_IN> spliterator,
                                                       IntFunction<E_OUT[]> generator);
    }
}
```
Now for the most important part: the wrapSink() implementation, which is defined in AbstractPipeline and inherited by ReferencePipeline:
```java
final <P_IN> Sink<P_IN> wrapSink(Sink<E_OUT> sink) {
    Objects.requireNonNull(sink);
    // walk from the current stage back to the first stage after the head (depth > 0)
    for ( @SuppressWarnings("rawtypes") AbstractPipeline p=AbstractPipeline.this; p.depth > 0; p=p.previousStage) {
        // the Sink built so far becomes the downstream of p's Sink
        sink = p.opWrapSink(p.previousStage.combinedFlags, sink);
    }
    return (Sink<P_IN>) sink;
}
```
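The parameter is a Sink and so is the return value. The for loop walks backwards from the current AbstractPipeline stage and exits at the stage whose depth is 0; since only the head has depth 0, the loop visits every stage from the current one back to the head's successor. Sinks are therefore wrapped from back to front: the Sink of a later stage is always passed as the argument to its predecessor's opWrapSink(). During sequential evaluation, once a predecessor's Sink has processed an element, it calls accept() on the successor's Sink through its downstream reference, passing on the element (or the processed result) and thereby activating the next Sink, and so on. In addition, the three inner classes of ReferencePipeline, Head, StatelessOp and StatefulOp, are the stream's node classes. Only Head is non-abstract; it represents the head node of the pipeline (the doubly linked list), while subclasses of StatelessOp (stateless operations) and StatefulOp (stateful operations) form the operation nodes of the pipeline. Setting statefulness aside, ReferencePipeline is just the carrier of the stream's data structure: during evaluation the visible doubly linked list is not traversed node by node. It is first converted into a multi-layer wrapped Sink, i.e. the element reference chain described earlier, in which Sinks process elements and pass them on as explained above. Strictly speaking it is a Sink stack, or Sink wrapper, which can be compared to an onion in everyday life or to the AOP pattern in programming. A more visual description: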
```text
Head(Spliterator) -> Op(filter) -> Op(map) -> Op(sorted) -> Terminal Op(forEach)

↓↓↓↓↓↓↓↓↓↓↓↓↓↓↓↓↓↓↓↓↓↓↓↓↓↓↓↓↓↓↓↓↓↓↓↓↓↓↓↓↓↓↓↓↓↓↓↓↓↓↓↓↓↓↓↓↓

forEach ele in Spliterator:
    Sink[filter](ele){
        if filter process == true:
            Sink[map](ele){
                ele = mapper(ele)
                Sink[sorted](ele){
                    var array
                    begin:
                    accept(ele):
                        add ele to array
                    end:
                        sort ele in array
                }
            }
    }
```
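The onion picture can be observed with the public API. The sketch below (SinkOrderDemo is an illustrative name) logs each stage of a sequential filter -> map -> sorted -> forEach pipeline: filter and map run element by element, while sorted holds everything back until its end() is called, so forEach only starts after every element has passed map.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.stream.Stream;

public class SinkOrderDemo {

    // Logs when each stage sees an element in a sequential filter -> map -> sorted -> forEach pipeline.
    static List<String> run() {
        List<String> log = new ArrayList<>();
        Stream.of(3, 1, 2)
                .filter(x -> {
                    log.add("filter " + x);
                    return true;
                })
                .map(x -> {
                    log.add("map " + x);
                    return x;
                })
                .sorted() // buffers in accept(), sorts and pushes downstream in end()
                .forEach(x -> log.add("forEach " + x));
        return log;
    }

    public static void main(String[] args) {
        run().forEach(System.out::println);
    }
}
```

The expected trace is `filter 3, map 3, filter 1, map 1, filter 2, map 2` followed by `forEach 1, forEach 2, forEach 3`.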
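The forEach terminal operation is the simplest implementation in the source analyzed so far; the implementation details of several terminal operations are analyzed below.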
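Source implementation of intermediate stream operations

Due to space, only some intermediate ops can be analyzed. The intermediate operations are declared in the Stream interface and implemented in ReferencePipeline; the commonly used filter, map and sorted are chosen here. First, filter: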
```java
abstract class ReferencePipeline<P_IN, P_OUT>
        extends AbstractPipeline<P_IN, P_OUT, Stream<P_OUT>>
        implements Stream<P_OUT> {

    @Override
    public final Stream<P_OUT> filter(Predicate<? super P_OUT> predicate) {
        Objects.requireNonNull(predicate);
        // NOT_SIZED: after filtering, the number of elements is no longer known
        return new StatelessOp<P_OUT, P_OUT>(this, StreamShape.REFERENCE,
                                             StreamOpFlag.NOT_SIZED) {
            @Override
            Sink<P_OUT> opWrapSink(int flags, Sink<P_OUT> sink) {
                return new Sink.ChainedReference<P_OUT, P_OUT>(sink) {
                    @Override
                    public void begin(long size) {
                        downstream.begin(-1);
                    }

                    @Override
                    public void accept(P_OUT u) {
                        if (predicate.test(u))
                            downstream.accept(u);
                    }
                };
            }
        };
    }
}
```
Next, map:
```java
abstract class ReferencePipeline<P_IN, P_OUT>
        extends AbstractPipeline<P_IN, P_OUT, Stream<P_OUT>>
        implements Stream<P_OUT> {

    @Override
    @SuppressWarnings("unchecked")
    public final <R> Stream<R> map(Function<? super P_OUT, ? extends R> mapper) {
        Objects.requireNonNull(mapper);
        // mapping may break sortedness and distinctness, but keeps the size
        return new StatelessOp<P_OUT, R>(this, StreamShape.REFERENCE,
                                         StreamOpFlag.NOT_SORTED | StreamOpFlag.NOT_DISTINCT) {
            @Override
            Sink<P_OUT> opWrapSink(int flags, Sink<R> sink) {
                return new Sink.ChainedReference<P_OUT, R>(sink) {
                    @Override
                    public void accept(P_OUT u) {
                        downstream.accept(mapper.apply(u));
                    }
                };
            }
        };
    }
}
```
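The op flags passed to the StatelessOp constructors have visible effects. The sketch below (OpFlagsDemo is an illustrative name; the expected values assume JDK 11 and a SIZED source such as Stream.of) asks the pipeline's spliterator for its exact size after filter and after map:

```java
import java.util.Spliterator;
import java.util.stream.Stream;

public class OpFlagsDemo {

    // filter() is created with StreamOpFlag.NOT_SIZED, so the pipeline no longer reports a size.
    static long sizeAfterFilter() {
        return Stream.of(1, 2, 3).filter(x -> true).spliterator().getExactSizeIfKnown();
    }

    // map() only clears SORTED and DISTINCT, so the source's SIZED flag survives.
    static long sizeAfterMap() {
        return Stream.of(1, 2, 3).map(x -> x * 2).spliterator().getExactSizeIfKnown();
    }

    // The same information viewed as Spliterator characteristics.
    static boolean sizedAfterFilter() {
        return Stream.of(1, 2, 3).filter(x -> true).spliterator().hasCharacteristics(Spliterator.SIZED);
    }

    public static void main(String[] args) {
        System.out.println(sizeAfterFilter());  // -1
        System.out.println(sizeAfterMap());     // 3
        System.out.println(sizedAfterFilter()); // false
    }
}
```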
Then sorted, which is somewhat more complex:
```java
abstract class ReferencePipeline<P_IN, P_OUT>
        extends AbstractPipeline<P_IN, P_OUT, Stream<P_OUT>>
        implements Stream<P_OUT> {

    @Override
    public final Stream<P_OUT> sorted(Comparator<? super P_OUT> comparator) {
        return SortedOps.makeRef(this, comparator);
    }
}

final class SortedOps {

    static <T> Stream<T> makeRef(AbstractPipeline<?, T, ?> upstream,
                                 Comparator<? super T> comparator) {
        return new OfRef<>(upstream, comparator);
    }

    private static final class OfRef<T> extends ReferencePipeline.StatefulOp<T, T> {

        private final boolean isNaturalSort;
        private final Comparator<? super T> comparator;

        // natural-order sort
        OfRef(AbstractPipeline<?, T, ?> upstream) {
            super(upstream, StreamShape.REFERENCE,
                  StreamOpFlag.IS_ORDERED | StreamOpFlag.IS_SORTED);
            this.isNaturalSort = true;
            @SuppressWarnings("unchecked")
            Comparator<? super T> comp = (Comparator<? super T>) Comparator.naturalOrder();
            this.comparator = comp;
        }

        // sort with a custom comparator
        OfRef(AbstractPipeline<?, T, ?> upstream, Comparator<? super T> comparator) {
            super(upstream, StreamShape.REFERENCE,
                  StreamOpFlag.IS_ORDERED | StreamOpFlag.NOT_SORTED);
            this.isNaturalSort = false;
            this.comparator = Objects.requireNonNull(comparator);
        }

        @Override
        public Sink<T> opWrapSink(int flags, Sink<T> sink) {
            Objects.requireNonNull(sink);

            // already naturally sorted: no sorting Sink needed
            if (StreamOpFlag.SORTED.isKnown(flags) && isNaturalSort)
                return sink;
            else if (StreamOpFlag.SIZED.isKnown(flags))
                return new SizedRefSortingSink<>(sink, comparator);
            else
                return new RefSortingSink<>(sink, comparator);
        }

        @Override
        public <P_IN> Node<T> opEvaluateParallel(PipelineHelper<T> helper,
                                                 Spliterator<P_IN> spliterator,
                                                 IntFunction<T[]> generator) {
            if (StreamOpFlag.SORTED.isKnown(helper.getStreamAndOpFlags()) && isNaturalSort) {
                return helper.evaluate(spliterator, false, generator);
            }
            else {
                T[] flattenedData = helper.evaluate(spliterator, true, generator).asArray(generator);
                Arrays.parallelSort(flattenedData, comparator);
                return Nodes.node(flattenedData);
            }
        }
    }

    private static final class RefSortingSink<T> extends AbstractRefSortingSink<T> {
        private ArrayList<T> list;

        RefSortingSink(Sink<? super T> sink, Comparator<? super T> comparator) {
            super(sink, comparator);
        }

        @Override
        public void begin(long size) {
            if (size >= Nodes.MAX_ARRAY_SIZE)
                throw new IllegalArgumentException(Nodes.BAD_SIZE);
            list = (size >= 0) ? new ArrayList<>((int) size) : new ArrayList<>();
        }

        // sort everything that was buffered, then push it downstream
        @Override
        public void end() {
            list.sort(comparator);
            downstream.begin(list.size());
            if (!cancellationRequestedCalled) {
                list.forEach(downstream::accept);
            }
            else {
                for (T t : list) {
                    if (downstream.cancellationRequested()) break;
                    downstream.accept(t);
                }
            }
            downstream.end();
            list = null;
        }

        // only buffer; nothing is passed downstream yet
        @Override
        public void accept(T t) {
            list.add(t);
        }
    }
}
```
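The sorted operation has one notable feature. An ordinary Sink, after running its own logic, activates the next Sink from its accept() method, but the sorting Sink only accumulates elements in accept(). The actual sort happens in end(), which then pushes the elements to downstream in one of two ways that mirror Spliterator's two traversal methods: a plain forEach over the list when cancellationRequested() was never called on this Sink (no short-circuiting involved), or an element-by-element loop that checks downstream.cancellationRequested() before each accept(), in the style of tryAdvance(). Schematically: accept() buffers; end() sorts, then calls downstream.begin(), downstream.accept() for each element, and downstream.end().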
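A stripped-down version of this buffering pattern, using a hypothetical MiniSink interface because java.util.stream.Sink is package-private. SortingSink below is modeled on RefSortingSink but omits the cancellation branch and the size check; it is a sketch, not the JDK class:

```java
import java.util.ArrayList;
import java.util.Comparator;
import java.util.List;
import java.util.function.Consumer;

public class SortingSinkSketch {

    // Hypothetical minimal Sink contract; the real java.util.stream.Sink is package-private.
    interface MiniSink<T> extends Consumer<T> {
        default void begin(long size) {}
        default void end() {}
    }

    // Simplified after SortedOps.RefSortingSink (cancellation handling omitted).
    static final class SortingSink<T> implements MiniSink<T> {
        private final MiniSink<? super T> downstream;
        private final Comparator<? super T> comparator;
        private List<T> list;

        SortingSink(MiniSink<? super T> downstream, Comparator<? super T> comparator) {
            this.downstream = downstream;
            this.comparator = comparator;
        }

        @Override
        public void begin(long size) {
            // size < 0 means "unknown", e.g. after a filter
            list = size >= 0 ? new ArrayList<>((int) size) : new ArrayList<>();
        }

        @Override
        public void accept(T t) {
            list.add(t); // only buffer; nothing goes downstream yet
        }

        @Override
        public void end() {
            list.sort(comparator);
            downstream.begin(list.size()); // the size is known at this point
            for (T t : list) {
                downstream.accept(t);
            }
            downstream.end();
            list = null;
        }
    }

    static List<String> run() {
        List<String> log = new ArrayList<>();
        MiniSink<Integer> printer = new MiniSink<Integer>() {
            @Override public void begin(long size) { log.add("begin " + size); }
            @Override public void accept(Integer x) { log.add("accept " + x); }
            @Override public void end() { log.add("end"); }
        };
        MiniSink<Integer> sorting = new SortingSink<Integer>(printer, Comparator.naturalOrder());
        sorting.begin(-1);
        for (int x : new int[] {3, 1, 2}) {
            sorting.accept(x);
        }
        sorting.end();
        return log;
    }

    public static void main(String[] args) {
        run().forEach(System.out::println);
    }
}
```

Note how the downstream receives an exact size in begin() even though the sorting Sink itself was started with -1: buffering everything is what makes the size known.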
The other intermediate operations are implemented along broadly the same lines.
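Source implementation of terminal operations for sequentially executed streams

Due to space, only some terminal ops are analyzed here, and for simplicity only the sequential execution case. The chosen ones are the most typical and the most complex, forEach() and collect(), plus the rather distinctive toArray(). First, how forEach() is implemented: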
```java
abstract class ReferencePipeline<P_IN, P_OUT>
        extends AbstractPipeline<P_IN, P_OUT, Stream<P_OUT>>
        implements Stream<P_OUT> {

    @Override
    public void forEach(Consumer<? super P_OUT> action) {
        evaluate(ForEachOps.makeRef(action, false));
    }

    // inherited from AbstractPipeline (where E_OUT is declared); repeated here for convenience
    final <R> R evaluate(TerminalOp<E_OUT, R> terminalOp) {
        assert getOutputShape() == terminalOp.inputShape();
        if (linkedOrConsumed)
            throw new IllegalStateException(MSG_STREAM_LINKED);
        linkedOrConsumed = true;

        return isParallel()
               ? terminalOp.evaluateParallel(this, sourceSpliterator(terminalOp.getOpFlags()))
               : terminalOp.evaluateSequential(this, sourceSpliterator(terminalOp.getOpFlags()));
    }
}

final class ForEachOps {

    public static <T> TerminalOp<T, Void> makeRef(Consumer<? super T> action,
                                                  boolean ordered) {
        Objects.requireNonNull(action);
        return new ForEachOp.OfRef<>(action, ordered);
    }

    abstract static class ForEachOp<T>
            implements TerminalOp<T, Void>, TerminalSink<T, Void> {
        private final boolean ordered;

        protected ForEachOp(boolean ordered) {
            this.ordered = ordered;
        }

        @Override
        public int getOpFlags() {
            return ordered ? 0 : StreamOpFlag.NOT_ORDERED;
        }

        // sequential: append this terminal sink to the Sink chain and push all elements through
        @Override
        public <S> Void evaluateSequential(PipelineHelper<T> helper,
                                           Spliterator<S> spliterator) {
            return helper.wrapAndCopyInto(this, spliterator).get();
        }

        @Override
        public <S> Void evaluateParallel(PipelineHelper<T> helper,
                                         Spliterator<S> spliterator) {
            if (ordered)
                new ForEachOrderedTask<>(helper, spliterator, this).invoke();
            else
                new ForEachTask<>(helper, spliterator, helper.wrapSink(this)).invoke();
            return null;
        }

        @Override
        public Void get() {
            return null;
        }

        static final class OfRef<T> extends ForEachOp<T> {
            final Consumer<? super T> consumer;

            OfRef(Consumer<? super T> consumer, boolean ordered) {
                super(ordered);
                this.consumer = consumer;
            }

            @Override
            public void accept(T t) {
                consumer.accept(t);
            }
        }
    }
}
```
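The forEach terminal operation is not itself part of the stream's linked structure, i.e. it is not an instance of an AbstractPipeline subclass. Instead it builds a Sink (more precisely a TerminalSink) that calls back the Consumer; call it the forEach terminal sink. The wrapSink() method of the stream's last operation stage appends the forEach terminal sink to the tail of the Sink chain, and that stage's copyInto() method then traverses the elements. Following copyInto()'s pattern, as long as every wrapping Sink always activates its downstream when its methods are called back, the execution order is exactly the order of the operation stages defined by the stream's linked structure, and the Consumer added last by forEach is always the last one called back.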
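The "terminal sink that also carries a result" idea can be modeled in a few lines. In this sketch MiniTerminalSink, CountingSink and wrapAndCopyInto are hypothetical stand-ins for TerminalSink and PipelineHelper#wrapAndCopyInto: the wrapped chain receives the elements, but the method returns the original terminal sink, whose get() supplies the result, just as ForEachOp#evaluateSequential calls get() on the sink it passed in.

```java
import java.util.Arrays;
import java.util.Spliterator;
import java.util.function.Consumer;
import java.util.function.Function;
import java.util.function.Supplier;

public class TerminalSinkSketch {

    // Hypothetical analogue of java.util.stream.TerminalSink: a sink that also supplies a result.
    interface MiniTerminalSink<T, R> extends Consumer<T>, Supplier<R> {}

    // Hypothetical counting terminal sink; ForEachOp plays the same role, but its get() returns null.
    static final class CountingSink<T> implements MiniTerminalSink<T, Long> {
        private long count;

        @Override public void accept(T t) { count++; }
        @Override public Long get() { return count; }
    }

    // Same shape as PipelineHelper#wrapAndCopyInto: wrap the terminal sink,
    // push every element through the wrapped chain, then return the ORIGINAL sink.
    static <T, S extends Consumer<T>> S wrapAndCopyInto(S sink,
                                                         Function<Consumer<T>, Consumer<T>> wrapSink,
                                                         Spliterator<T> spliterator) {
        spliterator.forEachRemaining(wrapSink.apply(sink));
        return sink;
    }

    static long countEvens() {
        // a single filter stage, wrapped in front of the terminal sink
        Function<Consumer<Integer>, Consumer<Integer>> filterEven = down -> x -> {
            if (x % 2 == 0) {
                down.accept(x);
            }
        };
        // like evaluateSequential: the result is read back from the terminal sink via get()
        return wrapAndCopyInto(new CountingSink<Integer>(), filterEven,
                Arrays.asList(1, 2, 3, 4, 5, 6).spliterator()).get();
    }

    public static void main(String[] args) {
        System.out.println(countEvens()); // 3
    }
}
```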
Next, the implementation of collect(). First, the definition of the Collector interface:
```java
public interface Collector<T, A, R> {
    Supplier<A> supplier();
    BiConsumer<A, T> accumulator();
    BinaryOperator<A> combiner();
    Function<A, R> finisher();
    Set<Characteristics> characteristics();

    enum Characteristics {
        CONCURRENT,
        UNORDERED,
        IDENTITY_FINISH
    }
}

public final class Collectors {
    static final Set<Collector.Characteristics> CH_CONCURRENT_ID
            = Collections.unmodifiableSet(EnumSet.of(Collector.Characteristics.CONCURRENT,
                                                     Collector.Characteristics.UNORDERED,
                                                     Collector.Characteristics.IDENTITY_FINISH));
    static final Set<Collector.Characteristics> CH_CONCURRENT_NOID
            = Collections.unmodifiableSet(EnumSet.of(Collector.Characteristics.CONCURRENT,
                                                     Collector.Characteristics.UNORDERED));
    static final Set<Collector.Characteristics> CH_ID
            = Collections.unmodifiableSet(EnumSet.of(Collector.Characteristics.IDENTITY_FINISH));
    static final Set<Collector.Characteristics> CH_UNORDERED_ID
            = Collections.unmodifiableSet(EnumSet.of(Collector.Characteristics.UNORDERED,
                                                     Collector.Characteristics.IDENTITY_FINISH));
    static final Set<Collector.Characteristics> CH_NOID = Collections.emptySet();
    static final Set<Collector.Characteristics> CH_UNORDERED_NOID
            = Collections.unmodifiableSet(EnumSet.of(Collector.Characteristics.UNORDERED));

    private Collectors() { }

    static class CollectorImpl<T, A, R> implements Collector<T, A, R> {
        private final Supplier<A> supplier;
        private final BiConsumer<A, T> accumulator;
        private final BinaryOperator<A> combiner;
        private final Function<A, R> finisher;
        private final Set<Characteristics> characteristics;

        CollectorImpl(Supplier<A> supplier,
                      BiConsumer<A, T> accumulator,
                      BinaryOperator<A> combiner,
                      Function<A, R> finisher,
                      Set<Characteristics> characteristics) {
            this.supplier = supplier;
            this.accumulator = accumulator;
            this.combiner = combiner;
            this.finisher = finisher;
            this.characteristics = characteristics;
        }

        CollectorImpl(Supplier<A> supplier,
                      BiConsumer<A, T> accumulator,
                      BinaryOperator<A> combiner,
                      Set<Characteristics> characteristics) {
            this(supplier, accumulator, combiner, castingIdentity(), characteristics);
        }

        @Override
        public BiConsumer<A, T> accumulator() {
            return accumulator;
        }

        @Override
        public Supplier<A> supplier() {
            return supplier;
        }

        @Override
        public BinaryOperator<A> combiner() {
            return combiner;
        }

        @Override
        public Function<A, R> finisher() {
            return finisher;
        }

        @Override
        public Set<Characteristics> characteristics() {
            return characteristics;
        }
    }

    private static <I, R> Function<I, R> castingIdentity() {
        return i -> (R) i;
    }
}
```
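To make the four functions of Collector concrete, here is a small sketch built with the public factory Collector.of(...). The comma joiner itself (commaJoiner) is a hypothetical example, not a JDK collector: supplier creates the mutable container A, accumulator folds one element into it, combiner merges two partial containers (only needed when evaluation is parallel), and finisher turns the container into the result R. Because a finisher is supplied and no characteristics are passed, IDENTITY_FINISH is not set.

```java
import java.util.List;
import java.util.stream.Collector;

final class CollectorOfDemo {
    // Hypothetical collector joining strings with ',' to show the roles of the four functions.
    static Collector<String, StringBuilder, String> commaJoiner() {
        return Collector.of(
                StringBuilder::new,                 // supplier: creates the mutable container A
                (sb, s) -> {                        // accumulator: folds one element T into A
                    if (sb.length() > 0) sb.append(',');
                    sb.append(s);
                },
                (left, right) -> {                  // combiner: merges two partial containers
                    if (left.length() > 0 && right.length() > 0) left.append(',');
                    return left.append(right);
                },
                StringBuilder::toString);           // finisher: converts A into the result R
    }

    public static void main(String[] args) {
        System.out.println(List.of("a", "b", "c").stream().collect(commaJoiner()));
        System.out.println(List.of("a", "b", "c", "d").parallelStream().collect(commaJoiner()));
        System.out.println(commaJoiner().characteristics()); // no characteristics were requested
    }
}
```

The parallel call still produces "a,b,c,d" because the stream is ordered and the combiner merges the left partial result before the right one.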
The evaluation entry point of collect() is in ReferencePipeline:
```java
abstract class ReferencePipeline<P_IN, P_OUT>
        extends AbstractPipeline<P_IN, P_OUT, Stream<P_OUT>>
        implements Stream<P_OUT> {

    public final <R, A> R collect(Collector<? super P_OUT, A, R> collector) {
        A container;
        if (isParallel()
                && (collector.characteristics().contains(Collector.Characteristics.CONCURRENT))
                && (!isOrdered() || collector.characteristics().contains(Collector.Characteristics.UNORDERED))) {
            container = collector.supplier().get();
            BiConsumer<A, ? super P_OUT> accumulator = collector.accumulator();
            forEach(u -> accumulator.accept(container, u));
        }
        else {
            container = evaluate(ReduceOps.makeRef(collector));
        }
        return collector.characteristics().contains(Collector.Characteristics.IDENTITY_FINISH)
               ? (R) container
               : collector.finisher().apply(container);
    }
}

final class ReduceOps {

    private ReduceOps() { }

    public static <T, I> TerminalOp<T, I>
    makeRef(Collector<? super T, I, ?> collector) {
        Supplier<I> supplier = Objects.requireNonNull(collector).supplier();
        BiConsumer<I, ? super T> accumulator = collector.accumulator();
        BinaryOperator<I> combiner = collector.combiner();
        class ReducingSink extends Box<I>
                implements AccumulatingSink<T, I, ReducingSink> {
            @Override
            public void begin(long size) {
                state = supplier.get();
            }

            @Override
            public void accept(T t) {
                accumulator.accept(state, t);
            }

            @Override
            public void combine(ReducingSink other) {
                state = combiner.apply(state, other.state);
            }
        }
        return new ReduceOp<T, I, ReducingSink>(StreamShape.REFERENCE) {
            @Override
            public ReducingSink makeSink() {
                return new ReducingSink();
            }

            @Override
            public int getOpFlags() {
                return collector.characteristics().contains(Collector.Characteristics.UNORDERED)
                       ? StreamOpFlag.NOT_ORDERED
                       : 0;
            }
        };
    }

    private interface AccumulatingSink<T, R, K extends AccumulatingSink<T, R, K>>
            extends TerminalSink<T, R> {
        void combine(K other);
    }

    private abstract static class Box<U> {
        U state;

        Box() {}

        public U get() {
            return state;
        }
    }

    private abstract static class ReduceOp<T, R, S extends AccumulatingSink<T, R, S>>
            implements TerminalOp<T, R> {
        private final StreamShape inputShape;

        ReduceOp(StreamShape shape) {
            inputShape = shape;
        }

        public abstract S makeSink();

        @Override
        public StreamShape inputShape() {
            return inputShape;
        }

        @Override
        public <P_IN> R evaluateSequential(PipelineHelper<T> helper,
                                           Spliterator<P_IN> spliterator) {
            return helper.wrapAndCopyInto(makeSink(), spliterator).get();
        }

        @Override
        public <P_IN> R evaluateParallel(PipelineHelper<T> helper,
                                         Spliterator<P_IN> spliterator) {
            return new ReduceTask<>(this, helper, spliterator).invoke().get();
        }
    }
}
```
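The if condition in collect() decides between two strategies: a single shared container filled through forEach(), or evaluate(ReduceOps.makeRef(collector)), where each task gets its own ReducingSink and the partial states are merged with combiner(). The sketch below restates that condition in a hypothetical helper, usesSharedContainer (not a JDK method), and applies it to two real collectors: Collectors.toList() reports only IDENTITY_FINISH, while Collectors.toConcurrentMap(...) reports CONCURRENT, UNORDERED and IDENTITY_FINISH.

```java
import java.util.Set;
import java.util.function.Function;
import java.util.stream.Collector;
import java.util.stream.Collectors;

final class CollectPathDemo {
    // Hypothetical restatement of the branch in ReferencePipeline.collect():
    // true  -> one shared container filled via forEach (requires a CONCURRENT collector)
    // false -> evaluate(ReduceOps.makeRef(collector)): per-task containers merged by combiner()
    static boolean usesSharedContainer(boolean parallel, boolean ordered,
                                       Set<Collector.Characteristics> ch) {
        return parallel
                && ch.contains(Collector.Characteristics.CONCURRENT)
                && (!ordered || ch.contains(Collector.Characteristics.UNORDERED));
    }

    public static void main(String[] args) {
        Set<Collector.Characteristics> toList = Collectors.<String>toList().characteristics();
        Set<Collector.Characteristics> toConcurrentMap = Collectors
                .<String, String, Integer>toConcurrentMap(Function.identity(), String::length)
                .characteristics();
        System.out.println(toList + " -> " + usesSharedContainer(true, true, toList));
        System.out.println(toConcurrentMap + " -> " + usesSharedContainer(true, true, toConcurrentMap));
    }
}
```

Even on an ordered parallel stream, toConcurrentMap qualifies for the shared container because it declares UNORDERED, whereas toList always goes through the ReduceOp path.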
Next, look at the static factory methods of Collectors to see how some commonly used Collector instances are built, for example Collectors.toList():
```java
public static <T>
Collector<T, ?, List<T>> toList() {
    return new CollectorImpl<>((Supplier<List<T>>) ArrayList::new, List::add,
                               (left, right) -> { left.addAll(right); return left; },
                               CH_ID);
}
```
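One way to check this reading of ReducingSink is to drive a collector's functions by hand in the same order: supplier() at begin, accumulator() once per element, combiner() when two partial results meet, and finisher() only when IDENTITY_FINISH is absent. The helper collectByHand below is hypothetical and simulates two "sinks" (left and right) to imitate the parallel case.

```java
import java.util.List;
import java.util.function.BiConsumer;
import java.util.function.BinaryOperator;
import java.util.function.Supplier;
import java.util.stream.Collector;
import java.util.stream.Collectors;

final class ManualCollect {
    // Hypothetical helper mirroring ReducingSink: begin -> accept* -> combine -> finish.
    static <T, A, R> R collectByHand(Collector<T, A, R> collector,
                                     List<? extends T> left, List<? extends T> right) {
        Supplier<A> supplier = collector.supplier();
        BiConsumer<A, T> accumulator = collector.accumulator();
        BinaryOperator<A> combiner = collector.combiner();

        A leftState = supplier.get();                           // like begin(): state = supplier.get()
        for (T t : left) accumulator.accept(leftState, t);      // like accept(t)
        A rightState = supplier.get();                          // a second sink, as in a parallel split
        for (T t : right) accumulator.accept(rightState, t);
        A merged = combiner.apply(leftState, rightState);       // like combine(other)

        // With IDENTITY_FINISH, collect() simply casts the container instead of calling finisher()
        @SuppressWarnings("unchecked")
        R result = collector.characteristics().contains(Collector.Characteristics.IDENTITY_FINISH)
                ? (R) merged
                : collector.finisher().apply(merged);
        return result;
    }

    public static void main(String[] args) {
        List<Integer> list = collectByHand(Collectors.toList(), List.of(1, 2), List.of(3, 4));
        String joined = collectByHand(Collectors.joining(","), List.of("a", "b"), List.of("c"));
        System.out.println(list + " " + joined);
    }
}
```

toList() takes the cast branch, while joining(",") has no IDENTITY_FINISH and therefore needs its finisher to turn the internal container into a String.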
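The process can be drawn as the following flowchart:
The execution of a terminal op such as a Collector can be expressed even more plainly in pseudocode (again using Collectors.toList() as the example):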
```text
[begin]
Supplier<List<T>> supplier = () -> new ArrayList<T>();
List<T> container = supplier.get();
Box.state = container;

[accept]   (once per element)
Box.state.add(element);

[result]
return Box.get();   // => return Box.state

↓↓↓↓↓↓↓↓↓ even more plainly ↓↓↓↓↓↓↓↓↓

ArrayList<T> container = new ArrayList<T>();
loop:
    container.add(element);
return container;
```
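In other words, although the engineered code looks complex, what it finally does is simple: an ArrayList instance is created and held by the state field, each element is added to state as it is processed, and state is returned at the end. Finally, let's look at the implementation of toArray() (the code below is not shown in its actual source locations; I have gathered the scattered pieces together to make the analysis easier):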
```java
abstract class ReferencePipeline<P_IN, P_OUT>
        extends AbstractPipeline<P_IN, P_OUT, Stream<P_OUT>>
        implements Stream<P_OUT> {

    @Override
    @SuppressWarnings("unchecked")
    public final <A> A[] toArray(IntFunction<A[]> generator) {
        @SuppressWarnings("rawtypes")
        IntFunction rawGenerator = (IntFunction) generator;
        return (A[]) Nodes.flatten(evaluateToArrayNode(rawGenerator), rawGenerator)
                          .asArray(rawGenerator);
    }

    @Override
    public final Object[] toArray() {
        return toArray(Object[]::new);
    }

    // Declared in AbstractPipeline, whose output type parameter is named E_OUT
    final Node<E_OUT> evaluateToArrayNode(IntFunction<E_OUT[]> generator) {
        if (linkedOrConsumed)
            throw new IllegalStateException(MSG_STREAM_LINKED);
        linkedOrConsumed = true;
        // Parallel pipeline whose last intermediate op is stateful: evaluate that op directly
        if (isParallel() && previousStage != null && opIsStateful()) {
            depth = 0;
            return opEvaluateParallel(previousStage, previousStage.sourceSpliterator(0), generator);
        }
        else {
            return evaluate(sourceSpliterator(0), true, generator);
        }
    }

    // Also declared in AbstractPipeline
    final <P_IN> Node<E_OUT> evaluate(Spliterator<P_IN> spliterator,
                                      boolean flatten,
                                      IntFunction<E_OUT[]> generator) {
        if (isParallel()) {
            return evaluateToNode(this, spliterator, flatten, generator);
        }
        else {
            // Sequential: the exact size (or -1 if unknown) selects the Node.Builder implementation
            Node.Builder<E_OUT> nb = makeNodeBuilder(
                    exactOutputSizeIfKnown(spliterator), generator);
            return wrapAndCopyInto(nb, spliterator).build();
        }
    }

    final Node.Builder<P_OUT> makeNodeBuilder(long exactSizeIfKnown, IntFunction<P_OUT[]> generator) {
        return Nodes.builder(exactSizeIfKnown, generator);
    }
}

interface Node<T> {
    Spliterator<T> spliterator();

    void forEach(Consumer<? super T> consumer);

    default int getChildCount() {
        return 0;
    }

    default Node<T> getChild(int i) {
        throw new IndexOutOfBoundsException();
    }

    default Node<T> truncate(long from, long to, IntFunction<T[]> generator) {
        if (from == 0 && to == count())
            return this;
        Spliterator<T> spliterator = spliterator();
        long size = to - from;
        Node.Builder<T> nodeBuilder = Nodes.builder(size, generator);
        nodeBuilder.begin(size);
        for (int i = 0; i < from && spliterator.tryAdvance(e -> { }); i++) { }
        if (to == count()) {
            spliterator.forEachRemaining(nodeBuilder);
        } else {
            for (int i = 0; i < size && spliterator.tryAdvance(nodeBuilder); i++) { }
        }
        nodeBuilder.end();
        return nodeBuilder.build();
    }

    T[] asArray(IntFunction<T[]> generator);

    void copyInto(T[] array, int offset);

    default StreamShape getShape() {
        return StreamShape.REFERENCE;
    }

    long count();

    interface Builder<T> extends Sink<T> {
        Node<T> build();

        interface OfInt extends Node.Builder<Integer>, Sink.OfInt {
            @Override
            Node.OfInt build();
        }

        interface OfLong extends Node.Builder<Long>, Sink.OfLong {
            @Override
            Node.OfLong build();
        }

        interface OfDouble extends Node.Builder<Double>, Sink.OfDouble {
            @Override
            Node.OfDouble build();
        }
    }
}

final class Nodes {

    public static <T> Node<T> flatten(Node<T> node, IntFunction<T[]> generator) {
        if (node.getChildCount() > 0) {
            long size = node.count();
            if (size >= MAX_ARRAY_SIZE)
                throw new IllegalArgumentException(BAD_SIZE);
            T[] array = generator.apply((int) size);
            new ToArrayTask.OfRef<>(node, array, 0).invoke();
            return node(array);
        } else {
            return node;
        }
    }

    static <T> Node.Builder<T> builder(long exactSizeIfKnown, IntFunction<T[]> generator) {
        return (exactSizeIfKnown >= 0 && exactSizeIfKnown < MAX_ARRAY_SIZE)
               ? new FixedNodeBuilder<>(exactSizeIfKnown, generator)
               : builder();
    }

    static <T> Node.Builder<T> builder() {
        return new SpinedNodeBuilder<>();
    }

    private static final class FixedNodeBuilder<T>
            extends ArrayNode<T>
            implements Node.Builder<T> {

        FixedNodeBuilder(long size, IntFunction<T[]> generator) {
            super(size, generator);
            assert size < MAX_ARRAY_SIZE;
        }

        @Override
        public Node<T> build() {
            if (curSize < array.length)
                throw new IllegalStateException(String.format("Current size %d is less than fixed size %d",
                                                              curSize, array.length));
            return this;
        }

        @Override
        public void begin(long size) {
            if (size != array.length)
                throw new IllegalStateException(String.format("Begin size %d is not equal to fixed size %d",
                                                              size, array.length));
            curSize = 0;
        }

        @Override
        public void accept(T t) {
            if (curSize < array.length) {
                array[curSize++] = t;
            } else {
                throw new IllegalStateException(String.format("Accept exceeded fixed size of %d",
                                                              array.length));
            }
        }

        @Override
        public void end() {
            if (curSize < array.length)
                throw new IllegalStateException(String.format("End size %d is less than fixed size %d",
                                                              curSize, array.length));
        }

        @Override
        public String toString() {
            return String.format("FixedNodeBuilder[%d][%s]",
                                 array.length - curSize, Arrays.toString(array));
        }
    }

    private static class ArrayNode<T> implements Node<T> {
        final T[] array;
        int curSize;

        @SuppressWarnings("unchecked")
        ArrayNode(long size, IntFunction<T[]> generator) {
            if (size >= MAX_ARRAY_SIZE)
                throw new IllegalArgumentException(BAD_SIZE);
            this.array = generator.apply((int) size);
            this.curSize = 0;
        }

        ArrayNode(T[] array) {
            this.array = array;
            this.curSize = array.length;
        }

        @Override
        public Spliterator<T> spliterator() {
            return Arrays.spliterator(array, 0, curSize);
        }

        @Override
        public void copyInto(T[] dest, int destOffset) {
            System.arraycopy(array, 0, dest, destOffset, curSize);
        }

        @Override
        public T[] asArray(IntFunction<T[]> generator) {
            if (array.length == curSize) {
                return array;
            } else {
                throw new IllegalStateException();
            }
        }

        @Override
        public long count() {
            return curSize;
        }

        @Override
        public void forEach(Consumer<? super T> consumer) {
            for (int i = 0; i < curSize; i++) {
                consumer.accept(array[i]);
            }
        }

        @Override
        public String toString() {
            return String.format("ArrayNode[%d][%s]",
                                 array.length - curSize, Arrays.toString(array));
        }
    }
}
```
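Nodes.builder() is internal, but the value it branches on can be observed through the public Spliterator.getExactSizeIfKnown(), which returns -1 when the size is unknown. The sketch below (class and method names are hypothetical) shows that a source ArrayList is SIZED, that map() keeps the exact size, and that filter() loses it, which is the situation where the sequential path would fall back to SpinedNodeBuilder. It also records the sizes that the toArray() generator is asked for; the test only checks that the last request matches the final array length, since the number of requests is an implementation detail.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.stream.Stream;

final class ToArrayPathDemo {
    // Exact size as reported by the pipeline's Spliterator; -1 means "unknown" (not SIZED).
    static long exactSize(Stream<?> stream) {
        return stream.spliterator().getExactSizeIfKnown();
    }

    // Records every size that toArray() passes to the generator.
    static String[] toArrayRecording(Stream<String> stream, List<Integer> requestedSizes) {
        return stream.toArray(size -> {
            requestedSizes.add(size);
            return new String[size];
        });
    }

    public static void main(String[] args) {
        List<String> src = new ArrayList<>(List.of("x", "y", "z"));
        System.out.println(exactSize(src.stream()));                              // source is SIZED
        System.out.println(exactSize(src.stream().map(String::toUpperCase)));     // map keeps SIZED
        System.out.println(exactSize(src.stream().filter(s -> !s.equals("y"))));  // filter clears SIZED

        List<Integer> sizes = new ArrayList<>();
        String[] out = toArrayRecording(src.stream().filter(s -> !s.equals("y")), sizes);
        System.out.println(List.of(out) + " generator sizes=" + sizes);
    }
}
```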
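Not every source Spliterator reports the SIZED characteristic (the default Iterable.spliterator(), for example, is created with spliteratorUnknownSize()), and intermediate operations such as filter() clear the SIZED flag as well. Since Nodes.builder() returns a FixedNodeBuilder only when the exact output size is known, in many cases the Node that is actually built is Nodes.SpinedNodeBuilder. SpinedNodeBuilder implements array growth and array-based Spliterator splitting, and its source is relatively complex (especially the spliterator() method), so only part of it is analyzed here. Because almost all of SpinedNodeBuilder's methods reuse the implementation in its parent class SpinedBuffer, we can analyze SpinedBuffer directly: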
```java
class SpinedBuffer<E>
        extends AbstractSpinedBuffer
        implements Consumer<E>, Iterable<E> {

    protected E[] curChunk;

    protected E[][] spine;

    SpinedBuffer(int initialCapacity) {
        super(initialCapacity);
        curChunk = (E[]) new Object[1 << initialChunkPower];
    }

    @SuppressWarnings("unchecked")
    SpinedBuffer() {
        super();
        curChunk = (E[]) new Object[1 << initialChunkPower];
    }

    public void copyInto(E[] array, int offset) {
        long finalOffset = offset + count();
        if (finalOffset > array.length || finalOffset < offset) {
            throw new IndexOutOfBoundsException("does not fit");
        }

        if (spineIndex == 0)
            System.arraycopy(curChunk, 0, array, offset, elementIndex);
        else {
            for (int i = 0; i < spineIndex; i++) {
                System.arraycopy(spine[i], 0, array, offset, spine[i].length);
                offset += spine[i].length;
            }
            if (elementIndex > 0)
                System.arraycopy(curChunk, 0, array, offset, elementIndex);
        }
    }

    public E[] asArray(IntFunction<E[]> arrayFactory) {
        long size = count();
        if (size >= Nodes.MAX_ARRAY_SIZE)
            throw new IllegalArgumentException(Nodes.BAD_SIZE);
        E[] result = arrayFactory.apply((int) size);
        copyInto(result, 0);
        return result;
    }

    @Override
    public void clear() {
        if (spine != null) {
            curChunk = spine[0];
            for (int i = 0; i < curChunk.length; i++)
                curChunk[i] = null;
            spine = null;
            priorElementCount = null;
        }
        else {
            for (int i = 0; i < elementIndex; i++)
                curChunk[i] = null;
        }
        elementIndex = 0;
        spineIndex = 0;
    }

    @Override
    public void forEach(Consumer<? super E> consumer) {
        for (int j = 0; j < spineIndex; j++)
            for (E t : spine[j])
                consumer.accept(t);

        for (int i = 0; i < elementIndex; i++)
            consumer.accept(curChunk[i]);
    }

    @Override
    public void accept(E e) {
        if (elementIndex == curChunk.length) {
            inflateSpine();
            if (spineIndex + 1 >= spine.length || spine[spineIndex + 1] == null)
                increaseCapacity();
            elementIndex = 0;
            ++spineIndex;
            curChunk = spine[spineIndex];
        }
        curChunk[elementIndex++] = e;
    }
}
```
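The core idea of SpinedBuffer is a "spine" of chunks: when the current chunk is full, a new and larger chunk is opened instead of copying everything into a bigger array, and copyInto()/asArray() concatenate the full chunks followed by the partially filled current chunk. The class below is a hypothetical, much simplified sketch of that idea; its growth policy (first chunk of 16 elements, each new chunk twice as large) is an assumption for illustration and is not claimed to match AbstractSpinedBuffer exactly.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.function.IntFunction;

// Hypothetical simplified spined buffer: appending never copies elements that are already stored.
final class MiniSpinedBuffer<E> {
    private static final int FIRST_CHUNK_SIZE = 16;    // assumption for this sketch
    private final List<Object[]> spine = new ArrayList<>(); // full chunks, in order
    private Object[] curChunk = new Object[FIRST_CHUNK_SIZE];
    private int elementIndex;                          // next free slot in curChunk
    private long priorElementCount;                    // elements stored in the full chunks

    void accept(E e) {
        if (elementIndex == curChunk.length) {         // current chunk is full:
            spine.add(curChunk);                       // retire it to the spine
            priorElementCount += curChunk.length;
            curChunk = new Object[curChunk.length * 2]; // and open a chunk twice as large
            elementIndex = 0;
        }
        curChunk[elementIndex++] = e;
    }

    long count() {
        return priorElementCount + elementIndex;
    }

    int chunkCount() {
        return spine.size() + 1;
    }

    E[] asArray(IntFunction<E[]> arrayFactory) {
        E[] result = arrayFactory.apply((int) count());
        int offset = 0;
        for (Object[] chunk : spine) {                 // full chunks first
            System.arraycopy(chunk, 0, result, offset, chunk.length);
            offset += chunk.length;
        }
        System.arraycopy(curChunk, 0, result, offset, elementIndex); // then the partial tail
        return result;
    }

    public static void main(String[] args) {
        MiniSpinedBuffer<Integer> buf = new MiniSpinedBuffer<>();
        for (int i = 0; i < 100; i++) buf.accept(i);
        // 100 elements fit in chunks of 16 + 32 + 64, so three chunks are in use
        System.out.println(buf.count() + " elements in " + buf.chunkCount() + " chunks");
        System.out.println(Arrays.toString(Arrays.copyOf(buf.asArray(Integer[]::new), 5)));
    }
}
```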
The source analysis is now essentially complete; as before, here is an example turned into a flowchart:
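Source Implementation of Parallel Stream Execution
If a stream instance calls parallel(), the Javadoc says it returns an equivalent stream that is parallel. In fact no new variant is constructed: the method only sets sourceStage.parallel to true and returns the same instance. The basic process of parallel evaluation is as follows. The pipeline structure is built exactly as in sequential evaluation. After the Sink chain has been built, the Spliterator splits itself with trySplit(), using an algorithm chosen by the concrete implementation (ArrayList's Spliterator, for example, splits its range in half). After splitting, each Spliterator holds a small portion of all the elements, and each one is used as a sourceSpliterator to run the Sink chain in the fork-join pool; the partial results are then combined into the final result, which the calling thread receives. The techniques used here are thread confinement and fork-join. Because the parallel evaluation of the different terminal ops is broadly similar, only the parallel implementation of forEach is analyzed here. First, here is a simple example that uses the fork-join pool: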
```java
import java.util.Objects;
import java.util.concurrent.CountedCompleter;

public class MapReduceApp {

    public static void main(String[] args) {
        Integer result = new MapReducer<>(new Integer[]{1, 2, 3, 4}, x -> x * 2, Integer::sum).invoke();
        System.out.println(result); // 20
    }

    interface Mapper<S, T> {
        T apply(S source);
    }

    interface Reducer<S, T> {
        T apply(S first, S second);
    }

    public static class MapReducer<T> extends CountedCompleter<T> {

        final T[] array;
        final Mapper<T, T> mapper;
        final Reducer<T, T> reducer;
        final int lo, hi;
        MapReducer<T> sibling;
        T result;

        public MapReducer(T[] array, Mapper<T, T> mapper, Reducer<T, T> reducer) {
            this.array = array;
            this.mapper = mapper;
            this.reducer = reducer;
            this.lo = 0;
            this.hi = array.length;
        }

        public MapReducer(CountedCompleter<?> p, T[] array, Mapper<T, T> mapper,
                          Reducer<T, T> reducer, int lo, int hi) {
            super(p);
            this.array = array;
            this.mapper = mapper;
            this.reducer = reducer;
            this.lo = lo;
            this.hi = hi;
        }

        @Override
        public void compute() {
            if (hi - lo >= 2) {
                // Split the range in half: fork the right half, compute the left half in this thread
                int mid = (lo + hi) >> 1;
                MapReducer<T> left = new MapReducer<>(this, array, mapper, reducer, lo, mid);
                MapReducer<T> right = new MapReducer<>(this, array, mapper, reducer, mid, hi);
                left.sibling = right;
                right.sibling = left;
                setPendingCount(1); // only the forked right task is pending
                right.fork();
                left.compute();
            } else {
                // Leaf: map the single element (if any), then signal completion
                if (hi > lo) {
                    result = mapper.apply(array[lo]);
                }
                tryComplete();
            }
        }

        @Override
        public T getRawResult() {
            return result;
        }

        @SuppressWarnings("unchecked")
        @Override
        public void onCompletion(CountedCompleter<?> caller) {
            // Runs on the parent once both children are done: reduce the two child results
            if (caller != this) {
                MapReducer<T> child = (MapReducer<T>) caller;
                MapReducer<T> sib = child.sibling;
                if (Objects.isNull(sib) || Objects.isNull(sib.result)) {
                    result = child.result;
                } else {
                    result = reducer.apply(child.result, sib.result);
                }
            }
        }
    }
}
```
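Here fork-join is used to write a tiny MapReduce application. The main method maps every element of the array [1, 2, 3, 4] with i -> i * 2 and then reduces the mapped values by summing them, so it prints 20. The code also simply splits the original array in half; when a task finally contains only one element, that is, when lo < hi and hi - lo == 1, it applies the Mapper to that single element and then signals completion with tryComplete(). Task completion eventually triggers onCompletion(), and that is where the Reducer aggregates the results. Parallel stream evaluation follows a similar process: ForEachOp ends up invoking either ForEachOrderedTask or ForEachTask, and ForEachTask is the one analyzed here: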
```java
abstract static class ForEachOp<T>
        implements TerminalOp<T, Void>, TerminalSink<T, Void> {

    @Override
    public <S> Void evaluateParallel(PipelineHelper<T> helper,
                                     Spliterator<S> spliterator) {
        if (ordered)
            new ForEachOrderedTask<>(helper, spliterator, this).invoke();
        else
            new ForEachTask<>(helper, spliterator, helper.wrapSink(this)).invoke();
        return null;
    }
}

final class ForEachOps {

    private ForEachOps() { }

    static final class ForEachTask<S, T> extends CountedCompleter<Void> {
        private Spliterator<S> spliterator;
        private final Sink<S> sink;
        private final PipelineHelper<T> helper;
        private long targetSize;

        ForEachTask(PipelineHelper<T> helper,
                    Spliterator<S> spliterator,
                    Sink<S> sink) {
            super(null);
            this.sink = sink;
            this.helper = helper;
            this.spliterator = spliterator;
            this.targetSize = 0L;
        }

        ForEachTask(ForEachTask<S, T> parent, Spliterator<S> spliterator) {
            super(parent);
            this.spliterator = spliterator;
            this.sink = parent.sink;
            this.targetSize = parent.targetSize;
            this.helper = parent.helper;
        }

        public void compute() {
            Spliterator<S> rightSplit = spliterator, leftSplit;
            long sizeEstimate = rightSplit.estimateSize(), sizeThreshold;
            if ((sizeThreshold = targetSize) == 0L)
                targetSize = sizeThreshold = AbstractTask.suggestTargetSize(sizeEstimate);
            boolean isShortCircuit = StreamOpFlag.SHORT_CIRCUIT.isKnown(helper.getStreamAndOpFlags());
            boolean forkRight = false;
            Sink<S> taskSink = sink;
            ForEachTask<S, T> task = this;
            while (!isShortCircuit || !taskSink.cancellationRequested()) {
                if (sizeEstimate <= sizeThreshold ||
                    (leftSplit = rightSplit.trySplit()) == null) {
                    task.helper.copyInto(taskSink, rightSplit);
                    break;
                }
                ForEachTask<S, T> leftTask = new ForEachTask<>(task, leftSplit);
                task.addToPendingCount(1);
                ForEachTask<S, T> taskToFork;
                if (forkRight) {
                    forkRight = false;
                    rightSplit = leftSplit;
                    taskToFork = task;
                    task = leftTask;
                }
                else {
                    forkRight = true;
                    taskToFork = leftTask;
                }
                taskToFork.fork();
                sizeEstimate = rightSplit.estimateSize();
            }
            task.spliterator = null;
            task.propagateCompletion();
        }
    }
}
```
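The loop in ForEachTask.compute() relies on two Spliterator contracts: trySplit() hands back a new Spliterator covering a prefix of the elements while the original keeps the remaining suffix, and estimateSize() is compared against the threshold to decide when to stop splitting. The hypothetical helper leafSizes below applies the same stopping rule recursively to an ArrayList's Spliterator so the halving can be observed directly; the sketch also checks that parallel() returns the very same stream instance, as described earlier.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Spliterator;
import java.util.stream.Stream;

final class SplitDemo {
    // Hypothetical helper: split with trySplit() until each piece holds at most `threshold`
    // elements, and return the leaf sizes in encounter order.
    static <T> List<Long> leafSizes(Spliterator<T> spliterator, long threshold) {
        List<Long> sizes = new ArrayList<>();
        split(spliterator, threshold, sizes);
        return sizes;
    }

    private static <T> void split(Spliterator<T> s, long threshold, List<Long> sizes) {
        Spliterator<T> prefix;
        if (s.estimateSize() > threshold && (prefix = s.trySplit()) != null) {
            split(prefix, threshold, sizes); // trySplit() returns the prefix...
            split(s, threshold, sizes);      // ...and s itself keeps the remaining suffix
        } else {
            sizes.add(s.estimateSize());
        }
    }

    public static void main(String[] args) {
        List<Integer> list = new ArrayList<>(List.of(1, 2, 3, 4, 5, 6, 7, 8));
        System.out.println(leafSizes(list.spliterator(), 2)); // ArrayList halves: 8 -> 4 + 4 -> 2 + 2 + 2 + 2

        Stream<Integer> stream = list.stream();
        System.out.println(stream.parallel() == stream);      // same instance, only a flag changes
        System.out.println(stream.isParallel());
    }
}
```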
The source analysis above may be hard to follow, so here is a simple example:
```java
public static void main(String[] args) throws Exception {
    List<Integer> list = new ArrayList<>();
    list.add(1);
    list.add(2);
    list.add(3);
    list.add(4);
    list.stream().parallel().forEach(System.out::println);
}
```
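In this code, the targetSize evaluated in ForEachTask ends up as targetSize = sizeThreshold = 1 (suggestTargetSize() returns at least 1, and for only 4 elements it cannot return more). The calling thread takes part in the computation, and there are 3 forks, giving 4 processing instances in total: the original Spliterator is ultimately split into 3 new Spliterator instances, which together with itself makes 4. Each processing instance handles exactly 1 element. The corresponding flowchart is as follows:
The final result is obtained by calling CountedCompleter.invoke(), which blocks until all subtasks have finished. Since the forEach terminal operation does not need a return value, ForEachTask does not implement getRawResult(); invoke() is used here only to keep the calling thread blocked until every task has completed.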
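The splitting-and-forking pattern of ForEachTask can be reproduced with public APIs only. The sketch below is a hypothetical, simplified version (names MiniForEach and Task are illustrative): it always forks the prefix returned by trySplit() and keeps the suffix, instead of alternating sides with forkRight as the JDK does, and uses addToPendingCount()/propagateCompletion() so that invoke() on the root returns only after every leaf has run. The suggestTargetSize() body reflects my reading of the JDK 11 AbstractTask heuristic (about four leaves per common-pool worker, never fewer than one element per leaf); treat that as an assumption.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Spliterator;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.CountedCompleter;
import java.util.concurrent.ForkJoinPool;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Consumer;

final class MiniForEach {
    // Assumed to mirror AbstractTask.suggestTargetSize: ~4 leaf tasks per common-pool worker.
    static long suggestTargetSize(long sizeEstimate) {
        long leafTarget = ForkJoinPool.getCommonPoolParallelism() << 2;
        long est = sizeEstimate / leafTarget;
        return est > 0L ? est : 1L;
    }

    // Hypothetical simplified ForEachTask: fork split-off prefixes, process the remainder here.
    static final class Task<T> extends CountedCompleter<Void> {
        private final Spliterator<T> spliterator;
        private final Consumer<? super T> action;
        private final long threshold;
        private final AtomicInteger leaves;

        Task(CountedCompleter<?> parent, Spliterator<T> spliterator,
             Consumer<? super T> action, long threshold, AtomicInteger leaves) {
            super(parent);
            this.spliterator = spliterator;
            this.action = action;
            this.threshold = threshold;
            this.leaves = leaves;
        }

        @Override
        public void compute() {
            Spliterator<T> rest = spliterator;
            Spliterator<T> prefix;
            while (rest.estimateSize() > threshold && (prefix = rest.trySplit()) != null) {
                addToPendingCount(1); // one more child must finish before this task completes
                new Task<>(this, prefix, action, threshold, leaves).fork();
            }
            leaves.incrementAndGet(); // this task is the leaf for whatever remains in `rest`
            rest.forEachRemaining(action);
            propagateCompletion();    // decrement pending count, or complete upward at zero
        }
    }

    // Runs action over the list in parallel and returns how many leaf pieces were processed.
    static <T> int forEachParallel(List<T> list, Consumer<? super T> action) {
        AtomicInteger leaves = new AtomicInteger();
        long threshold = suggestTargetSize(list.size());
        new Task<>(null, list.spliterator(), action, threshold, leaves).invoke(); // root runs in caller
        return leaves.get();
    }

    public static void main(String[] args) {
        ConcurrentLinkedQueue<Integer> seen = new ConcurrentLinkedQueue<>();
        int leaves = forEachParallel(new ArrayList<>(List.of(1, 2, 3, 4)), seen::add);
        System.out.println(leaves + " leaves, elements seen: " + seen);
    }
}
```

With 4 elements the threshold is 1, so the ArrayList Spliterator is split into 4 single-element pieces (3 forks plus the root), matching the walkthrough above; the order in which elements are seen is not deterministic.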
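Stateful Operations and Short-Circuit Operations
Intermediate operations in Stream can be divided into stateless operations and stateful operations, depending on whether they keep state. Terminal operations can be divided into non-short-circuit operations and short-circuit operations, depending on whether they support short-circuiting. They can be understood as follows:
Stateless operation: once the current operation node has processed an element, it passes the result directly to the next node (provided the element meets the operation's condition); the operation neither has nor needs to keep internal state. Examples are filter and map.
Stateful operation: while processing elements, the node accumulates them in internal state, so when it handles a new element it can see the history of all elements processed so far. This "state" is really more like a buffer. Examples are sorted and limit. Taking sorted as an example, it generally adds all pending elements to a container such as an ArrayList, sorts them, and then pushes them to the next node one by one, as if it were a new source Spliterator.
Non-short-circuit (terminal) operation: the terminal operation cannot stop early and return based on a short-circuit condition; it must process every element, e.g. forEach.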
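Short-circuit (terminal) operation: the terminal operation is allowed to stop processing early and return once a short-circuit condition is met, e.g. anyMatch. In the JDK 11 source, anyMatch's MatchSink sets its stop flag (and records the result in value) when the condition is hit. Because the match op reports StreamOpFlag.IS_SHORT_CIRCUIT, sequential evaluation goes through copyIntoWithCancel(), which checks Sink.cancellationRequested() (backed by stop) before pulling each further element, so the remaining elements are not traversed. The !stop check inside Sink.accept() additionally skips the actual processing for any element that still reaches the sink after the flag is set.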
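The difference between stateless, stateful and short-circuit operations can be observed by logging, with peek(), when each element enters and leaves the pipeline. The sketch below uses sequential streams only; peek() is used purely to observe traversal, and the exact interleaving is an implementation detail of the JDK rather than a guarantee of the API. With map() every element runs through the whole Sink chain before the next one enters; with sorted() all elements are buffered first and only pushed downstream after sorting; with anyMatch() the source stops being pulled after the first match.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.stream.Stream;

final class StatefulAndShortCircuitDemo {
    // Stateless map(): each element travels through the whole chain before the next enters.
    static List<String> statelessTrace() {
        List<String> log = new ArrayList<>();
        Stream.of(3, 1, 2)
              .peek(x -> log.add("in" + x))
              .map(x -> x * 10)
              .forEach(x -> log.add("out" + x));
        return log;
    }

    // Stateful sorted(): all elements are buffered, sorted, then pushed downstream.
    static List<String> statefulTrace() {
        List<String> log = new ArrayList<>();
        Stream.of(3, 1, 2)
              .peek(x -> log.add("in" + x))
              .sorted()
              .forEach(x -> log.add("out" + x));
        return log;
    }

    // Short-circuit anyMatch(): elements after the first match are never pulled from the source.
    static List<Integer> anyMatchTrace() {
        List<Integer> seen = new ArrayList<>();
        Stream.of(1, 2, 3, 4).peek(seen::add).anyMatch(x -> x == 2);
        return seen;
    }

    public static void main(String[] args) {
        System.out.println(statelessTrace()); // [in3, out30, in1, out10, in2, out20]
        System.out.println(statefulTrace());  // [in3, in1, in2, out1, out2, out3]
        System.out.println(anyMatchTrace());  // [1, 2]
    }
}
```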
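The source is not analyzed in detail here; only a commonly seen summary table of Stream operations is shown below:
Two more points deserve attention: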
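From the source, some intermediate operations also support short-circuiting, for example the slice operation behind limit() and the while operation behind takeWhile().
From the source, the find terminal operations findFirst and findAny report StreamOpFlag.IS_SHORT_CIRCUIT and request cancellation once a value has been found, while the match terminal operations (which also report IS_SHORT_CIRCUIT) control short-circuiting through their internal temporary state stop and value.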
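Short-circuiting intermediate operations are what make infinite sources usable. The sketch below pulls from the infinite Stream.iterate(1, x -> x + 1) and uses peek() to record which elements were actually taken from the source (method names are hypothetical, sequential streams assumed). limit(3) requests cancellation as soon as three elements have passed, so only 1, 2 and 3 are pulled; takeWhile(x -> x < 4) has to pull and test 4 before it knows to stop.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.stream.Collectors;
import java.util.stream.Stream;

final class ShortCircuitIntermediateDemo {
    // limit(3): the slice op requests cancellation once three elements have passed.
    static List<Integer> limitTrace(List<Integer> seen) {
        return Stream.iterate(1, x -> x + 1)   // infinite source
                     .peek(seen::add)          // records every element pulled from the source
                     .limit(3)
                     .collect(Collectors.toList());
    }

    // takeWhile(x < 4): the first failing element is pulled and tested, then traversal stops.
    static List<Integer> takeWhileTrace(List<Integer> seen) {
        return Stream.iterate(1, x -> x + 1)
                     .peek(seen::add)
                     .takeWhile(x -> x < 4)
                     .collect(Collectors.toList());
    }

    public static void main(String[] args) {
        List<Integer> seenByLimit = new ArrayList<>();
        System.out.println(limitTrace(seenByLimit) + " pulled " + seenByLimit);
        List<Integer> seenByTakeWhile = new ArrayList<>();
        System.out.println(takeWhileTrace(seenByTakeWhile) + " pulled " + seenByTakeWhile);
    }
}
```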
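Summary
Over the course of this article I have written well over a hundred thousand characters, and even so it is only a fairly shallow introduction to the basic implementation of Stream. In my view, many of the intermediate and terminal operations not analyzed here, especially the parallel execution of terminal operations, are very complex: in a multithreaded environment it takes some imagination, plus debugging at many points to locate where code is running, to reason through the execution. A brief summary:
The Stream implementation in the JDK is concise, highly engineered code.
Although the carrier of a Stream is AbstractPipeline, a pipeline structure, it only borrows the pipeline's shape: before actual evaluation it is converted into a multi-layer wrapped Sink structure, the Sink chain referred to throughout this article. In terms of programming model, I see it as an application of the Reactor pattern.
The evaluation structure that Stream currently supports is always of the form Head(Source Spliterator) -> Op -> Op ... -> Terminal Op. This is a limitation: it cannot flexibly provide something like the memory-view functionality that LINQ offers.
Stream's current parallel evaluation scheme splits the Source Spliterator and computes concurrently with ForkJoinTasks that wrap the Terminal Op and the fixed Sink chain. Both the calling thread and the worker threads of the fork-join pool can take part in evaluation. In my view, apart from the bitwise operations on the flag sets, this is the most complex part of Stream.
The functionality Stream provides is a breakthrough, although some have called it a "premature baby". I hope the JDK will keep advancing and developing through its contradictions.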