Stream 源码分析(串行流)

Stream

支持顺序和并行聚合操作的一组元素序列。
    1)operations:支持在单个元素上执行的操作,流操作分为中间操作和终止操作
    1-1)中间操作:
        1-1-1)无状态:unordered()、filter()、map()、mapToInt()、mapToLong()、mapToDouble、
                      flatMap()、flatMapToInt()、flatMapToLong()、flatMapToDouble()、
                      peek()
        1-1-2)有状态:distinct()、sorted()、limit()、skip()
    1-2)终止操作:
        1-2-1)非短路操作:forEach()、forEachOrdered()、toArray()、min()、max()、count()、
                      collect()、reduce()
        1-2-2)短路操作: findFirst()、findAny()、anyMatch()、noneMatch()、allMatch()
    2)stream pipeline:将多个流操作串联的流管道
流是延迟处理的,直到遇到一个终止操作时,才会触发流管道计算。
已经执行终止操作的流不能再次触发计算。

流管道

  • 流管道的创建【以 ArrayList 为数据源】:.stream()、parallelStream()
Collection#
    /**
     *  返回一个顺序流,集合中的元素就是数据源
     */
    default Stream<E> stream() {
        return StreamSupport.stream(spliterator(), false);
    }

    /**
     *  返回一个并行流,集合中的元素就是数据源
     */
    default Stream<E> parallelStream() {
        return StreamSupport.stream(spliterator(), true);
    }

ArrayList#
    /**
     * 创建一个延时绑定和快速失败的分割迭代器
     */
    @Override
    public Spliterator<E> spliterator() {
        return new ArrayListSpliterator(0, -1, 0);
    }

    /** 基于索引实现二分、延迟初始化的 Spliterator */
    final class ArrayListSpliterator implements Spliterator<E> {
        /**
         * 当前索引,在调用 advance/split 时修改
         */
        private int index;
        /**
         * 初始状态是 -1,使用之后是索引的上界
         */
        private int fence;
        /**
         * 快速失败计数器
         */
        private int expectedModCount;

        /** 创建一个覆盖给定索引范围的新  ArrayListSpliterator*/
        ArrayListSpliterator(int origin, int fence, int expectedModCount) {
            this.index = origin;
            this.fence = fence;
            this.expectedModCount = expectedModCount;
        }

        private int getFence() {
            int hi;
            // 第一次使用时初始化为元素个数
            if ((hi = fence) < 0) {
                expectedModCount = modCount;
                hi = fence = size;
            }
            return hi;
        }

        /**
         *  对此 Spliterator 进行拆分,一分为二
         */
        @Override
        public ArrayListSpliterator trySplit() {
            /**
             * hi:high 索引上界,不包括
             * lo:low 索引下界,包括
             * mid:middle 二分索引
             */
            final int hi = getFence(), lo = index, mid = lo + hi >>> 1;
        // 将范围分成两半,直到无法分割为止【高低索引相邻】
        return lo >= mid ? null : // divide range in half unless too small
            new ArrayListSpliterator(lo, index = mid, expectedModCount);
        }

        /**
         * 如果此 Spliterator 中还有元素可用,则将低索引位的元素传递给 action 进行消费
         * 同时递增 index【一次消费一个元素】
         */
        @Override
        public boolean tryAdvance(Consumer<? super E> action) {
            if (action == null) {
                throw new NullPointerException();
            }
            final int hi = getFence(), i = index;
            if (i < hi) {
                index = i + 1;
                @SuppressWarnings("unchecked")
                // 读取元素
                final E e = (E)elementData[i];
                // 执行消费过程
                action.accept(e);
                if (modCount != expectedModCount) {
                    throw new ConcurrentModificationException();
                }
                return true;
            }
            return false;
        }

        /**
         *  一次性消费此 Spliterator 中的所有元素
         */
        @Override
        public void forEachRemaining(Consumer<? super E> action) {
            int i, hi, mc; // hoist accesses and checks from loop
            Object[] a;
            if (action == null) {
                throw new NullPointerException();
            }
            if ((a = elementData) != null) {
                if ((hi = fence) < 0) {
                    mc = modCount;
                    hi = size;
                } else {
                    mc = expectedModCount;
                }
                // 读取并更新 index
                if ((i = index) >= 0 && (index = hi) <= a.length) {
                    // 顺序消费 Spliterator 中的所有元素
                    for (; i < hi; ++i) {
                        @SuppressWarnings("unchecked")
                        final E e = (E) a[i];
                        action.accept(e);
                    }
                    if (modCount == mc) {
                        return;
                    }
                }
            }
            throw new ConcurrentModificationException();
        }

        /**
         * 获取此分割迭代器的估计可用元素数【ArrayListSpliterator 是精确的】
         */
        @Override
        public long estimateSize() {
            return getFence() - index;
        }

        /**
         *  此分割迭代器的特性
         */
        @Override
        public int characteristics() {
            return Spliterator.ORDERED | Spliterator.SIZED | Spliterator.SUBSIZED;
        }
    }

StreamSupport#
    /**
     *  基于一个 Spliterator【分割迭代器】创建一个顺序或并行的流
     */
    public static <T> Stream<T> stream(Spliterator<T> spliterator, boolean parallel) {
        Objects.requireNonNull(spliterator);
        return new ReferencePipeline.Head<>(spliterator,
                                            StreamOpFlag.fromCharacteristics(spliterator),
                                            parallel);
    }

StreamOpFlag#
    /**
     *  将 Spliterator 的特征值转换为流的标志位
     */
    static int fromCharacteristics(Spliterator<?> spliterator) {
        // 读取特征值
        final int characteristics = spliterator.characteristics();
        if ((characteristics & Spliterator.SORTED) != 0 && spliterator.getComparator() != null) {
            // Do not propagate the SORTED characteristic if it does not correspond to a natural sort order
            return characteristics & SPLITERATOR_CHARACTERISTICS_MASK & ~Spliterator.SORTED;
        }
        else {
            // 转换为流标识
            return characteristics & SPLITERATOR_CHARACTERISTICS_MASK;
        }
    }

ReferencePipeline#Head
    static class Head<E_IN, E_OUT> extends ReferencePipeline<E_IN, E_OUT> {
        /**
         * 创建流水线的管道头
         */
        Head(Spliterator<?> source,
                int sourceFlags, boolean parallel) {
            super(source, sourceFlags, parallel);
        }
    }

ReferencePipeline#
    /**
     *  创建流水线的管道头
     */
    ReferencePipeline(Spliterator<?> source,
            int sourceFlags, boolean parallel) {
        super(source, sourceFlags, parallel);
    }

AbstractPipeline#
abstract class AbstractPipeline<E_IN, E_OUT, S extends BaseStream<E_OUT, S>>
extends PipelineHelper<E_OUT> implements BaseStream<E_OUT, S> {
    private static final String MSG_STREAM_LINKED = "stream has already been operated upon or closed";
    private static final String MSG_CONSUMED = "source already consumed or closed";

    /**
     *  流水线的源阶段【即第一个流管道】
     */
    @SuppressWarnings("rawtypes")
    private final AbstractPipeline sourceStage;

    /**
     *  当前流管道的上一阶段,如果是源流,则为 null
     */
    @SuppressWarnings("rawtypes")
    private final AbstractPipeline previousStage;

    /**
     *  此阶段操作的操作标识
     */
    protected final int sourceOrOpFlags;

    /**
     *  此流管道的下一阶段
     */
    @SuppressWarnings("rawtypes")
    private AbstractPipeline nextStage;

    /**
     *  顺序流:当前阶段和源流之间存在的中间阶段的个数
     *  并行流:上一阶段的状态
     */
    private int depth;

    /**
     *  组合了源流和所有中间阶段的流标识和操作标识
     */
    private int combinedFlags;

    /**
     *  源流的分割迭代器,用于产生元素
     */
    private Spliterator<?> sourceSpliterator;

    /**
     *  源流的分割迭代器生成器,如果 sourceSpliterator == null
     */
    private Supplier<? extends Spliterator<?>> sourceSupplier;

    /**
     *  此流管道已经被链接或消费
     */
    private boolean linkedOrConsumed;

    /**
     *  流水线中存在有状态的流管道
     */
    private boolean sourceAnyStateful;

    /**
     *  此流管道关闭时的后置操作
     */
    private Runnable sourceCloseAction;

    /**
     *  此流管道是否是并行的
     */
    private boolean parallel;

    /**
     *  流水线头部管道的构造函数
     */
    AbstractPipeline(Spliterator<?> source,
            int sourceFlags, boolean parallel) {
        this.previousStage = null;
        this.sourceSpliterator = source;
        this.sourceStage = this;
        this.sourceOrOpFlags = sourceFlags & StreamOpFlag.STREAM_MASK;
        // The following is an optimization of:
        // StreamOpFlag.combineOpFlags(sourceOrOpFlags, StreamOpFlag.INITIAL_OPS_VALUE);
        this.combinedFlags = ~(sourceOrOpFlags << 1) & StreamOpFlag.INITIAL_OPS_VALUE;
        // 源阶段的 depth=0
        this.depth = 0;
        this.parallel = parallel;
    }
  • 无状态流管道的链接【以 map 为例】
ReferencePipeline#
    /**
     *  基于 mapper 创建一个无状态的流管道,并将其链接到此流管道之后
     */
    @Override
    @SuppressWarnings("unchecked")
    public final <R> Stream<R> map(Function<? super P_OUT, ? extends R> mapper) {
        Objects.requireNonNull(mapper);
        /**
         * 新流管道的操作为 mapper
         * 新流管道的操作标识为 NOT_SORTED、NOT_DISTINCT
         * 下一阶段的操作为 sink【反向链接】
         */
        return new StatelessOp<>(this, StreamShape.REFERENCE,
                StreamOpFlag.NOT_SORTED | StreamOpFlag.NOT_DISTINCT) {
            @Override
            Sink<P_OUT> opWrapSink(int flags, Sink<R> sink) {
                return new Sink.ChainedReference<>(sink) {
                    @Override
                    public void accept(P_OUT u) {
                        /**
                         * 接收上游阶段发送的数据 u,并进行当前阶段的处理,
                         * 并将结果发送给下游阶段处理
                         */
                        downstream.accept(mapper.apply(u));
                    }
                };
            }
        };
    }

    /**
     *  一个无状态的流管道
     */
    abstract static class StatelessOp<E_IN, E_OUT>
    extends ReferencePipeline<E_IN, E_OUT> {

        /**
         * 将此流管道追加到上游管道  upstream 之后
         */
        StatelessOp(AbstractPipeline<?, E_IN, ?> upstream,
                StreamShape inputShape,
                int opFlags) {
            super(upstream, opFlags);
            assert upstream.getOutputShape() == inputShape;
        }

        /**
         *  此管道的操作是无状态的
         */
        @Override
        final boolean opIsStateful() {
            return false;
        }
    }

    /**
     * 将此流管道追加到上游管道 upstream 之后
     */
    ReferencePipeline(AbstractPipeline<?, P_IN, ?> upstream, int opFlags) {
        super(upstream, opFlags);
    }

    /**
     *  将此流管道追加到 previousStage 之后
     */
    AbstractPipeline(AbstractPipeline<?, E_IN, ?> previousStage, int opFlags) {
        // 前一阶段已经被链接或消费,则不允许重复消费
        if (previousStage.linkedOrConsumed) {
            throw new IllegalStateException(MSG_STREAM_LINKED);
        }
        // 前一阶段已经被链接
        previousStage.linkedOrConsumed = true;
        // 设置前一阶段的后置阶段为当前阶段
        previousStage.nextStage = this;

        // 写入前置阶段
        this.previousStage = previousStage;
        // 写入此阶段的操作标识
        this.sourceOrOpFlags = opFlags & StreamOpFlag.OP_MASK;
        // 合并前一阶段和此阶段的流操作标识
        this.combinedFlags = StreamOpFlag.combineOpFlags(opFlags, previousStage.combinedFlags);
        // 写入管道头
        this.sourceStage = previousStage.sourceStage;
        // 此阶段是否是有状态操作
        if (opIsStateful()) {
            sourceStage.sourceAnyStateful = true;
        }
        // 写入中间操作计数值(上一阶段 + 1)
        this.depth = previousStage.depth + 1;
    }
  • 有状态流管道的链接【sorted()】
ReferencePipeline#
    /**
     *  将一个排序的流管道追加到此流管道之后
     */
    @Override
    public final Stream<P_OUT> sorted() {
        return SortedOps.makeRef(this);
    }

SortedOps#
    /**
     *  将一个排序管道追加到 upstream 之后
     */
    static <T> Stream<T> makeRef(AbstractPipeline<?, T, ?> upstream) {
        return new OfRef<>(upstream);
    }

ReferencePipeline#
    /**
     *  有状态的流管道
     */
    abstract static class StatefulOp<E_IN, E_OUT>
    extends ReferencePipeline<E_IN, E_OUT> {
        /**
         * 将一个有状态的流管道追加到 upstream 之后
         */
        StatefulOp(AbstractPipeline<?, E_IN, ?> upstream,
                StreamShape inputShape,
                int opFlags) {
            super(upstream, opFlags);
            assert upstream.getOutputShape() == inputShape;
        }

        /**
         *  此流管道是有状态的
         */
        @Override
        final boolean opIsStateful() {
            return true;
        }

        @Override
        abstract <P_IN> Node<E_OUT> opEvaluateParallel(PipelineHelper<E_OUT> helper,
                Spliterator<P_IN> spliterator,
                IntFunction<E_OUT[]> generator);
    }

    /**
     *  用于对引用流进行排序的管道
     */
    private static final class OfRef<T> extends ReferencePipeline.StatefulOp<T, T> {
        /**
         * 是否是自然顺序
         */
        private final boolean isNaturalSort;
        /**
         * 排序使用的比较器
         */
        private final Comparator<? super T> comparator;

        /**
         * 使用自然顺序排序
         */
        OfRef(AbstractPipeline<?, T, ?> upstream) {
            super(upstream, StreamShape.REFERENCE, StreamOpFlag.IS_ORDERED | StreamOpFlag.IS_SORTED);
            this.isNaturalSort = true;
            final Comparator<? super T> comp = (Comparator<? super T>) Comparator.naturalOrder();
            this.comparator = comp;
        }

        /**
         * 使用指定的比较器排序
         */
        OfRef(AbstractPipeline<?, T, ?> upstream, Comparator<? super T> comparator) {
            super(upstream, StreamShape.REFERENCE, StreamOpFlag.IS_ORDERED | StreamOpFlag.NOT_SORTED);
            this.isNaturalSort = false;
            this.comparator = Objects.requireNonNull(comparator);
        }

        @Override
        public Sink<T> opWrapSink(int flags, Sink<T> sink) {
            Objects.requireNonNull(sink);

            // 1)如果上游管道是已排序的,并且是按照自然顺序排序的,则此流管道可以忽略
            if (StreamOpFlag.SORTED.isKnown(flags) && isNaturalSort) {
                return sink;
            // 2)如果上游管道是已知大小的
            } else if (StreamOpFlag.SIZED.isKnown(flags)) {
                return new SizedRefSortingSink<>(sink, comparator);
            // 3)如果上游管道是未知大小的
            } else {
                return new RefSortingSink<>(sink, comparator);
            }
        }
    }

SortedOps#
    private abstract static class AbstractRefSortingSink<T> extends Sink.ChainedReference<T, T> {
        /**
         * 排序元素的比较器
         */
        protected final Comparator<? super T> comparator;
        // 是否取消接收上游的元素
        protected boolean cancellationRequestedCalled;

        AbstractRefSortingSink(Sink<? super T> downstream, Comparator<? super T> comparator) {
            super(downstream);
            this.comparator = comparator;
        }

        /**
         *  在排序元素被发送到下游时,能够保存短路行为【流水线中存在短路操作】
         */
        @Override
        public final boolean cancellationRequested() {
            cancellationRequestedCalled = true;
            return false;
        }
    }

    private static final class SizedRefSortingSink<T> extends AbstractRefSortingSink<T> {
        // 暂存元素的数组
        private T[] array;
        // 当前元素偏移
        private int offset;

        SizedRefSortingSink(Sink<? super T> sink, Comparator<? super T> comparator) {
            super(sink, comparator);
        }

        @Override
        public void begin(long size) {
            if (size >= Nodes.MAX_ARRAY_SIZE) {
                throw new IllegalArgumentException(Nodes.BAD_SIZE);
            }
            // 创建固定大小的对象数组,用于接收上游发送的元素
            array = (T[]) new Object[(int) size];
        }

        /**
         * 上游元素发送完毕,开始执行排序操作,并将排序后的元素发送到下游
         */
        @Override
        public void end() {
            // 执行元素排序
            Arrays.sort(array, 0, offset, comparator);
            // 发送通知给下游管道,准备接收数据
            downstream.begin(offset);
            // 1)当前管道的下游不存在短路操作
            if (!cancellationRequestedCalled) {
                // 顺序发送所有元素
                for (int i = 0; i < offset; i++) {
                    downstream.accept(array[i]);
                }
            // 2)当前管道的下游存在短路操作
            } else {
                // 先发送一个元素,之后每次发送前都询问下游是否继续接收,下游拒绝接收元素则退出循环
                for (int i = 0; i < offset && !downstream.cancellationRequested(); i++) {
                    downstream.accept(array[i]);
                }
            }
            // 通知下游管道,数据发送完毕
            downstream.end();
            // 回收数组
            array = null;
        }

        /**
         * 接收上游发送的单个元素
         */
        @Override
        public void accept(T t) {
            array[offset++] = t;
        }
    }

/**
 *  能够消费上游管道发送的元素,同时存储状态的 Sink
 */
interface Sink<T> extends Consumer<T> {
    /**
     *  通知下游管道,重置状态以接收新的数据集
     */
    default void begin(long size) {}

    /**
     *  通知下游管道,数据已经推送完毕,可以执行聚合处理
     */
    default void end() {}

    /**
     *  询问下游管道是否还需要继续推送数据,适用于短路操作
     */
    default boolean cancellationRequested() {
        return false;
    }
}

Sink#ChainedReference
    /**
     *  链式引用 sink
     */
    abstract static class ChainedReference<T, E_OUT> implements Sink<T> {
        // 下游 sink
        protected final Sink<? super E_OUT> downstream;

        public ChainedReference(Sink<? super E_OUT> downstream) {
            this.downstream = Objects.requireNonNull(downstream);
        }

        @Override
        public void begin(long size) {
            downstream.begin(size);
        }

        @Override
        public void end() {
            downstream.end();
        }

        @Override
        public boolean cancellationRequested() {
            return downstream.cancellationRequested();
        }
    }
  • 链接终端操作执行计算【forEach()】
ReferencePipeline#
    @Override
    public void forEach(Consumer<? super P_OUT> action) {
        evaluate(ForEachOps.makeRef(action, false));
    }

ForEachOps#
    /**
     *  创建一个 TerminalOp,遍历并处理流中的每个引用对象
     */
    public static <T> TerminalOp<T, Void> makeRef(Consumer<? super T> action,
            boolean ordered) {
        Objects.requireNonNull(action);
        return new ForEachOp.OfRef<>(action, ordered);
    }

    abstract static class ForEachOp<T>
    implements TerminalOp<T, Void>, TerminalSink<T, Void> {
        // 遍历是否是有序的
        private final boolean ordered;

        protected ForEachOp(boolean ordered) {
            this.ordered = ordered;
        }

        // 获取此操作的操作标识
        @Override
        public int getOpFlags() {
            return ordered ? 0 : StreamOpFlag.NOT_ORDERED;
        }

        /**
         *  将此终端操作链接到流管道尾部,并将 spliterator 中的每个元素都发送到 sink 中
         */
        @Override
        public <S> Void evaluateSequential(PipelineHelper<T> helper,
                Spliterator<S> spliterator) {
            return helper.wrapAndCopyInto(this, spliterator).get();
        }

        /**
         *  并行评估 spliterator 中的元素
         */
        @Override
        public <S> Void evaluateParallel(PipelineHelper<T> helper,
                Spliterator<S> spliterator) {
            if (ordered) {
                new ForEachOrderedTask<>(helper, spliterator, this).invoke();
            } else {
                new ForEachTask<>(helper, spliterator, helper.wrapSink(this)).invoke();
            }
            return null;
        }

        static final class OfRef<T> extends ForEachOp<T> {
            /**
             * 实际消费者
             */
            final Consumer<? super T> consumer;

            OfRef(Consumer<? super T> consumer, boolean ordered) {
                super(ordered);
                this.consumer = consumer;
            }

            /**
             *  处理上游发送的单个元素
             */
            @Override
            public void accept(T t) {
                consumer.accept(t);
            }
        }
    }

AbstractPipeline#
    /**
     *  使用终端操作 terminalOp 对此流管道进行处理,处理过程中会从后往前链接形成流水线
     */
    final <R> R evaluate(TerminalOp<E_OUT, R> terminalOp) {
        // 此阶段的输出类型==终端操作的输入类型
        assert getOutputShape() == terminalOp.inputShape();
        // 不允许重复消费
        if (linkedOrConsumed) {
            throw new IllegalStateException(MSG_STREAM_LINKED);
        }
        // 设置已消费标识
        linkedOrConsumed = true;

        // 使用终端操作并行或串行处理此流管道
        return isParallel()
                ? terminalOp.evaluateParallel(this, sourceSpliterator(terminalOp.getOpFlags()))
                        : terminalOp.evaluateSequential(this, sourceSpliterator(terminalOp.getOpFlags()));
    }

    /**
     *  获取此阶段的源分割迭代器【数据源】
     *
     * created by ZXD at 19 Dec 2018 T 22:32:09
     * @param terminalFlags 终端操作标识
     * @return
     */
    @SuppressWarnings("unchecked")
    private Spliterator<?> sourceSpliterator(int terminalFlags) {
        Spliterator<?> spliterator = null;
        // 1)源分割迭代器不为 null
        if (sourceStage.sourceSpliterator != null) {
            // 读取
            spliterator = sourceStage.sourceSpliterator;
            // 使用后置空
            sourceStage.sourceSpliterator = null;
        }
        // 2)分割迭代器通过 sourceSupplier 进行生成
        else if (sourceStage.sourceSupplier != null) {
            spliterator = (Spliterator<?>) sourceStage.sourceSupplier.get();
            sourceStage.sourceSupplier = null;
        }
        else {
            throw new IllegalStateException(MSG_CONSUMED);
        }

        // 此流是并行的 && 流管道中存在有状态操作
        if (isParallel() && sourceStage.sourceAnyStateful) {
            // Adapt the source spliterator, evaluating each stateful op in the pipeline up to and including this pipeline stage.
            // The depth and flags of each pipeline stage are adjusted accordingly.
            int depth = 1;
            /**
             * 从源阶段开始处理,一直处理到当前阶段为止
             */
            for (@SuppressWarnings("rawtypes") AbstractPipeline u = sourceStage, p = sourceStage.nextStage, e = this;
                    u != e;
                    u = p, p = p.nextStage) {

                int thisOpFlags = p.sourceOrOpFlags;
                // 当前处理阶段是有状态操作
                if (p.opIsStateful()) {
                    depth = 0;
                    // 当前操作是短路操作
                    if (StreamOpFlag.SHORT_CIRCUIT.isKnown(thisOpFlags)) {
                        thisOpFlags = thisOpFlags & ~StreamOpFlag.IS_SHORT_CIRCUIT;
                    }

                    spliterator = p.opEvaluateParallelLazy(u, spliterator);

                    // Inject or clear SIZED on the source pipeline stage based on the stage‘s spliterator
                    thisOpFlags = spliterator.hasCharacteristics(Spliterator.SIZED)
                            ? thisOpFlags & ~StreamOpFlag.NOT_SIZED | StreamOpFlag.IS_SIZED
                                    : thisOpFlags & ~StreamOpFlag.IS_SIZED | StreamOpFlag.NOT_SIZED;
                }
                p.depth = depth++;
                p.combinedFlags = StreamOpFlag.combineOpFlags(thisOpFlags, u.combinedFlags);
            }
        }

        // 终端操作带有标识位
        if (terminalFlags != 0)  {
            // 将终端操作的标志位合并到最后一阶段中
            combinedFlags = StreamOpFlag.combineOpFlags(terminalFlags, combinedFlags);
        }

        return spliterator;
    }

    /**
     * @param sink  下游管道操作,中间操作或终端操作
     * @param spliterator   分割迭代器
     * @return
     */
    @Override
    final <P_IN, S extends Sink<E_OUT>> S wrapAndCopyInto(S sink, Spliterator<P_IN> spliterator) {
        copyInto(wrapSink(Objects.requireNonNull(sink)), spliterator);
        return sink;
    }

    /**
     * 从当前流管道开始,向前构建流水线直到第一个流管道为止,流水线后置操作为 sink
     */
    @Override
    @SuppressWarnings("unchecked")
    final <P_IN> Sink<P_IN> wrapSink(Sink<E_OUT> sink) {
        Objects.requireNonNull(sink);
        /**
         * AbstractPipeline.this:当前流管道
         * p.depth:当前流管道距离管道头的距离
         * p.previousStage:前置流管道
         */
        for ( @SuppressWarnings("rawtypes") AbstractPipeline p=AbstractPipeline.this; p.depth > 0; p=p.previousStage) {
            sink = p.opWrapSink(p.previousStage.combinedFlags, sink);
        }
        // 返回链接后的 sink
        return (Sink<P_IN>) sink;
    }

    /**
     * 将分割迭代器中的元素顺序发送到流水线中处理
     *
     * @param wrappedSink   链接后的流水线
     * @param spliterator   数据源
     */
    @Override
    final <P_IN> void copyInto(Sink<P_IN> wrappedSink, Spliterator<P_IN> spliterator) {
        Objects.requireNonNull(wrappedSink);
        // 1)流水线中不存在短路操作
        if (!StreamOpFlag.SHORT_CIRCUIT.isKnown(getStreamAndOpFlags())) {
            // 通知 wrappedSink 处理元素的个数
            wrappedSink.begin(spliterator.getExactSizeIfKnown());
            // 使用 wrappedSink 依次处理 spliterator 中的所有元素
            spliterator.forEachRemaining(wrappedSink);
            // 通知 wrappedSink 元素发送完毕,可以执行后置操作
            wrappedSink.end();
        }
        // 2)流水线中存在短路操作
        else {
            copyIntoWithCancel(wrappedSink, spliterator);
        }
    }
  • Spliterator 及其特征值
/**
 *  用于划分和遍历数据源的对象,数据源可以是 array、Collection、IO channel、生成器等。
 */
public interface Spliterator<T> {
    /**
     *  尝试使用 action 处理 Spliterator 中的一个元素
     */
    boolean tryAdvance(Consumer<? super T> action);

    /**
     *  尝试使用 action 一次性处理 Spliterator 中的所有元素
     */
    default void forEachRemaining(Consumer<? super T> action) {
        do { } while (tryAdvance(action));
    }

    /**
     *  对此 Spliterator 进行拆分
     */
    Spliterator<T> trySplit();

    /**
     *  获取此 Spliterator 的估计元素数,如果数据源是无限的,则返回 -1
     */
    long estimateSize();

    /**
     *  尝试获取此 Spliterator 的精确元素个数
     */
    default long getExactSizeIfKnown() {
        return (characteristics() & SIZED) == 0 ? -1L : estimateSize();
    }

    /**
     *  读取此 Spliterator 的特征值
     */
    int characteristics();

    /**
     *  Spliterator 中的元素是顺序处理的
     */
    public static final int ORDERED    = 0x00000010;

    /**
     *  Spliterator 中的元素是唯一的
     */
    public static final int DISTINCT   = 0x00000001;

    /**
     *  Spliterator 中的元素根据自然顺序或比较器进行过排序
     */
    public static final int SORTED     = 0x00000004;

    /**
     *  Spliterator 中的元素个数是有限的
     */
    public static final int SIZED      = 0x00000040;

    /**
     *  Spliterator 中的元素是非 null 的
     */
    public static final int NONNULL    = 0x00000100;

    /**
     *  Spliterator 关联的数据源是不可变的,不支持增加、替换、删除等
     */
    public static final int IMMUTABLE  = 0x00000400;

    /**
     * Spliterator 关联的数据源支持并发修改
     */
    public static final int CONCURRENT = 0x00001000;

    /**
     *  此 Spliterator 通过 trySplit() 方法生成的子 Spliterator 是有限大小的
     */
    public static final int SUBSIZED = 0x00004000;
}
  • 流管道和操作标识
StreamOpFlag#
    /**
     * 流管道中的元素是唯一的
     */
    // 0, 0x00000001
    // Matches Spliterator.DISTINCT
    DISTINCT(0,
            set(Type.SPLITERATOR).set(Type.STREAM).setAndClear(Type.OP)),

    /**
     *  流管道中的元素是排过序的【有状态操作】
     */
    // 1, 0x00000004
    // Matches Spliterator.SORTED
    SORTED(1,
            set(Type.SPLITERATOR).set(Type.STREAM).setAndClear(Type.OP)),

    /**
     *  流管道中的元素是顺序处理的
     */
    // 2, 0x00000010
    // Matches Spliterator.ORDERED
    ORDERED(2,
            set(Type.SPLITERATOR).set(Type.STREAM).setAndClear(Type.OP).clear(Type.TERMINAL_OP)
            .clear(Type.UPSTREAM_TERMINAL_OP)),

    /**
     *  流管道的大小是有限的【非无限流】
     */
    // 3, 0x00000040
    // Matches Spliterator.SIZED
    SIZED(3,
            set(Type.SPLITERATOR).set(Type.STREAM).clear(Type.OP)),

    /**
     *  中间操作或终端操作是短路的
     */
    // 12, 0x01000000
    SHORT_CIRCUIT(12,
            set(Type.OP).set(Type.TERMINAL_OP));
  • 流管道中的元素类型
enum StreamShape {
    /**
     *  流元素是对象引用
     */
    REFERENCE,
    /**
     *  流元素是 int 值
     */
    INT_VALUE,
    /**
     *  流元素是 long 值
     */
    LONG_VALUE,
    /**
     *  流元素是 double 值
     */
    DOUBLE_VALUE
}

无状态中间操作

  • filter:使用指定的函数式断言过滤流中的元素
    @Override
    public final Stream<P_OUT> filter(Predicate<? super P_OUT> predicate) {
        Objects.requireNonNull(predicate);
        return new StatelessOp<>(this, StreamShape.REFERENCE,
                StreamOpFlag.NOT_SIZED) {
            @Override
            Sink<P_OUT> opWrapSink(int flags, Sink<P_OUT> sink) {
                return new Sink.ChainedReference<>(sink) {
                    @Override
                    public void begin(long size) {
                        downstream.begin(-1);
                    }

                    @Override
                    public void accept(P_OUT u) {
                        /**
                         * 根据目标 predicate 对上游管道发送的数据进行过滤,
                         * 只将满足断言的对象发送到下游
                         */
                        if (predicate.test(u)) {
                            downstream.accept(u);
                        }
                    }
                };
            }
        };
    }
  • map:将上游管道发送的数据进行映射处理后,再发送到下游
    /**
     *  基于 mapper 创建一个无状态的流管道,并将其链接到此流管道之后
     */
    @Override
    @SuppressWarnings("unchecked")
    public final <R> Stream<R> map(Function<? super P_OUT, ? extends R> mapper) {
        Objects.requireNonNull(mapper);
        /**
         * 新流管道的操作为 mapper
         * 新流管道的操作标识为 NOT_SORTED、NOT_DISTINCT
         * 下一阶段的操作为 sink【反向链接】
         */
        return new StatelessOp<>(this, StreamShape.REFERENCE,
                StreamOpFlag.NOT_SORTED | StreamOpFlag.NOT_DISTINCT) {
            @Override
            Sink<P_OUT> opWrapSink(int flags, Sink<R> sink) {
                return new Sink.ChainedReference<>(sink) {
                    @Override
                    public void accept(P_OUT u) {
                        /**
                         * 接收上游阶段发送的数据 u,并进行当前阶段的处理,
                         * 并将结果发送给下游阶段处理
                         */
                        downstream.accept(mapper.apply(u));
                    }
                };
            }
        };
    }
  • flatMap:流的扁平化
    @Override
    public final <R> Stream<R> flatMap(Function<? super P_OUT, ? extends Stream<? extends R>> mapper) {
        Objects.requireNonNull(mapper);
        return new StatelessOp<>(this, StreamShape.REFERENCE,
                StreamOpFlag.NOT_SORTED | StreamOpFlag.NOT_DISTINCT | StreamOpFlag.NOT_SIZED) {
            @Override
            Sink<P_OUT> opWrapSink(int flags, Sink<R> sink) {
                return new Sink.ChainedReference<>(sink) {
                    // true if cancellationRequested() has been called
                    boolean cancellationRequestedCalled;

                    @Override
                    public void begin(long size) {
                        downstream.begin(-1);
                    }

                    @Override
                    public void accept(P_OUT u) {
                        /**
                         * 通过 mapper 函数将上游元素映射成一个新的流,并将流中的元素顺序发送到下游
                         */
                        try (Stream<? extends R> result = mapper.apply(u)) {
                            // 映射结果不为 null 时,将新流中的元素发送到下游
                            if (result != null) {
                                // 1)下游操作是非短路的
                                if (!cancellationRequestedCalled) {
                                    result.sequential().forEach(downstream);
                                }
                                // 2)下游操作是短路操作,则每次发送元素前都先询问下游是否需要继续接收
                                else {
                                    final var s = result.sequential().spliterator();
                                    do { } while (!downstream.cancellationRequested() && s.tryAdvance(downstream));
                                }
                            }
                        }
                    }

                    @Override
                    public boolean cancellationRequested() {
                        // If this method is called then an operation within the stream pipeline is short-circuiting (see AbstractPipeline.copyInto).
                        // Note that we cannot differentiate between an upstream or downstream operation
                        cancellationRequestedCalled = true;
                        return downstream.cancellationRequested();
                    }
                };
            }
        };
    }
  • peek:查看上游发送的元素
    @Override
    public final Stream<P_OUT> peek(Consumer<? super P_OUT> action) {
        Objects.requireNonNull(action);
        return new StatelessOp<>(this, StreamShape.REFERENCE,
                0) {
            @Override
            Sink<P_OUT> opWrapSink(int flags, Sink<P_OUT> sink) {
                return new Sink.ChainedReference<>(sink) {
                    @Override
                    public void accept(P_OUT u) {
                        /**
                         * 先调用目标接口进行消费,之后再将该元素发送到下游,
                         * 可以查看流的具体处理过程,主要用于调试
                         */
                        action.accept(u);
                        downstream.accept(u);
                    }
                };
            }
        };
    }

有状态的中间操作

  • distinct:将流中的元素去重
    @Override
    public final Stream<P_OUT> distinct() {
        return DistinctOps.makeRef(this);
    }

DistinctOps#
    static <T> ReferencePipeline<T, T> makeRef(AbstractPipeline<?, T, ?> upstream) {
        // StreamOpFlag.IS_DISTINCT 新管道产生的元素是唯一的
        return new ReferencePipeline.StatefulOp<T, T>(upstream, StreamShape.REFERENCE,
                                                      StreamOpFlag.IS_DISTINCT | StreamOpFlag.NOT_SIZED) {

            @Override
            Sink<T> opWrapSink(int flags, Sink<T> sink) {
                Objects.requireNonNull(sink);

                // 1)如果上游管道已经是 distinct 则此阶段无序任何处理。
                if (StreamOpFlag.DISTINCT.isKnown(flags)) {
                    return sink;
                // 2)上游管道是已排序的
                } else if (StreamOpFlag.SORTED.isKnown(flags)) {
                    return new Sink.ChainedReference<T, T>(sink) {
                        boolean seenNull;
                        // 最近发送的元素
                        T lastSeen;

                        @Override
                        public void begin(long size) {
                            seenNull = false;
                            lastSeen = null;
                            downstream.begin(-1);
                        }

                        @Override
                        public void end() {
                            seenNull = false;
                            lastSeen = null;
                            downstream.end();
                        }

                        @Override
                        public void accept(T t) {
                            // 1)上游发送的元素为 null
                            if (t == null) {
                                if (!seenNull) {
                                    seenNull = true;
                                    downstream.accept(lastSeen = null);
                                }
                            /**
                             * 2)上游发送的元素不为 null
                             * lastSeen == null,当前元素是第一个元素
                             * !t.equals(lastSeen),上次发送的元素和当前元素不一致
                             */
                            } else if (lastSeen == null || !t.equals(lastSeen)) {
                                downstream.accept(lastSeen = t);
                            }
                        }
                    };
                // 3)上游管道是未排序的
                } else {
                    return new Sink.ChainedReference<T, T>(sink) {
                        // 存放上游发送的唯一元素
                        Set<T> seen;

                        @Override
                        public void begin(long size) {
                            seen = new HashSet<>();
                            downstream.begin(-1);
                        }

                        @Override
                        public void end() {
                            seen = null;
                            downstream.end();
                        }

                        @Override
                        public void accept(T t) {
                            // 已接受元素中不存在此元素 t
                            if (!seen.contains(t)) {
                                // 将其加入已发送唯一元素集合
                                seen.add(t);
                                // 将此元素发送到下游
                                downstream.accept(t);
                            }
                        }
                    };
                }
            }
        };
    }
  • sorted:新管道产生的元素是已排序的
    /**
     *  将一个排序的流管道追加到此流管道之后
     */
    @Override
    public final Stream<P_OUT> sorted() {
        return SortedOps.makeRef(this);
    }

SortedOps#
    /**
     *  将一个排序管道追加到 upstream 之后
     */
    static <T> Stream<T> makeRef(AbstractPipeline<?, T, ?> upstream) {
        return new OfRef<>(upstream);
    }

    /**
     *  用于对引用流进行排序的管道
     */
    private static final class OfRef<T> extends ReferencePipeline.StatefulOp<T, T> {
        /**
         * 是否是自然顺序
         */
        private final boolean isNaturalSort;
        /**
         * 排序使用的比较器
         */
        private final Comparator<? super T> comparator;

        /**
         * 使用自然顺序排序
         */
        OfRef(AbstractPipeline<?, T, ?> upstream) {
            super(upstream, StreamShape.REFERENCE, StreamOpFlag.IS_ORDERED | StreamOpFlag.IS_SORTED);
            this.isNaturalSort = true;
            final Comparator<? super T> comp = (Comparator<? super T>) Comparator.naturalOrder();
            this.comparator = comp;
        }

        /**
         * 使用指定的比较器排序
         */
        OfRef(AbstractPipeline<?, T, ?> upstream, Comparator<? super T> comparator) {
            super(upstream, StreamShape.REFERENCE, StreamOpFlag.IS_ORDERED | StreamOpFlag.NOT_SORTED);
            this.isNaturalSort = false;
            this.comparator = Objects.requireNonNull(comparator);
        }

        @Override
        public Sink<T> opWrapSink(int flags, Sink<T> sink) {
            Objects.requireNonNull(sink);

            // 1)如果上游管道是已排序的,并且是按照自然顺序排序的,则此流管道可以忽略
            if (StreamOpFlag.SORTED.isKnown(flags) && isNaturalSort) {
                return sink;
                // 2)如果上游管道是已知大小的
            } else if (StreamOpFlag.SIZED.isKnown(flags)) {
                return new SizedRefSortingSink<>(sink, comparator);
                // 3)如果上游管道是未知大小的
            } else {
                return new RefSortingSink<>(sink, comparator);
            }
        }
    }
  • skip:忽略上游管道发送的前 n 个元素
    @Override
    public final Stream<P_OUT> skip(long n) {
        if (n < 0) {
            throw new IllegalArgumentException(Long.toString(n));
        }
        if (n == 0) {
            return this;
        } else {
            return SliceOps.makeRef(this, n, -1);
        }
    }

SliceOps#
    /**
     * @param upstream 上游管道
     * @param skip 需要跳过的元素个数
     * @param limit 限制接受的元素个数
     */
    public static <T> Stream<T> makeRef(AbstractPipeline<?, T, ?> upstream,
            long skip, long limit) {
        if (skip < 0) {
            throw new IllegalArgumentException("Skip must be non-negative: " + skip);
        }

        return new ReferencePipeline.StatefulOp<>(upstream, StreamShape.REFERENCE,
                flags(limit)) {

            @Override
            Sink<T> opWrapSink(int flags, Sink<T> sink) {
                return new Sink.ChainedReference<>(sink) {
                    // 需要跳过的前 n 个元素
                    long n = skip;
                    // 只需要获取 m 个元素
                    long m = limit >= 0 ? limit : Long.MAX_VALUE;

                    @Override
                    public void begin(long size) {
                        downstream.begin(calcSize(size, skip, m));
                    }

                    @Override
                    public void accept(T t) {
                        // 已经不需要跳过元素
                        if (n == 0) {
                            // 下游需要接受的元素个数 > 0
                            if (m > 0) {
                                // 递减接收个数
                                m--;
                                // 将当前元素发送给下游管道
                                downstream.accept(t);
                            }
                        }
                        // 跳过当前元素,并递减跳过数
                        else {
                            n--;
                        }
                    }

                    @Override
                    public boolean cancellationRequested() {
                        // m == 0 表示此管道将不会发送元素到下游 || 下游拒绝接收元素
                        return m == 0 || downstream.cancellationRequested();
                    }
                };
            }
        };
    }
  • limit:只接受上游管道发送的前 maxSize 个元素
    @Override
    public final Stream<P_OUT> limit(long maxSize) {
        if (maxSize < 0) {
            throw new IllegalArgumentException(Long.toString(maxSize));
        }
        return SliceOps.makeRef(this, 0, maxSize);
    }

非短路的终端操作

  • forEach:使用函数式接口 action 消费流水线生产的所有元素
    @Override
    public void forEach(Consumer<? super P_OUT> action) {
        evaluate(ForEachOps.makeRef(action, false));
    }

ForEachOps#
    /**
     *  创建一个 TerminalOp,遍历并处理流中的每个引用对象
     */
    public static <T> TerminalOp<T, Void> makeRef(Consumer<? super T> action,
            boolean ordered) {
        Objects.requireNonNull(action);
        return new ForEachOp.OfRef<>(action, ordered);
    }

    abstract static class ForEachOp<T>
    implements TerminalOp<T, Void>, TerminalSink<T, Void> {
        // 遍历是否是有序的
        private final boolean ordered;

        protected ForEachOp(boolean ordered) {
            this.ordered = ordered;
        }

        // 获取此操作的操作标识
        @Override
        public int getOpFlags() {
            return ordered ? 0 : StreamOpFlag.NOT_ORDERED;
        }

        /**
         *  将此终端操作链接到流管道尾部,并将 spliterator 中的每个元素都发送到 sink 中
         */
        @Override
        public <S> Void evaluateSequential(PipelineHelper<T> helper,
                Spliterator<S> spliterator) {
            return helper.wrapAndCopyInto(this, spliterator).get();
        }

        static final class OfRef<T> extends ForEachOp<T> {
            /**
             * 实际消费者
             */
            final Consumer<? super T> consumer;

            OfRef(Consumer<? super T> consumer, boolean ordered) {
                super(ordered);
                this.consumer = consumer;
            }

            /**
             *  处理上游发送的单个元素
             */
            @Override
            public void accept(T t) {
                consumer.accept(t);
            }
        }
    }
  • forEachOrdered:使用函数式接口 action 顺序消费流水线生产的所有元素
    @Override
    public void forEachOrdered(Consumer<? super P_OUT> action) {
        evaluate(ForEachOps.makeRef(action, true));
    }

短路的终端操作

  • anyMatch:上游管道发送的元素中至少有一个满足函数式断言 predicate 时返回 true
    @Override
    public final boolean anyMatch(Predicate<? super P_OUT> predicate) {
        return evaluate(MatchOps.makeRef(predicate, MatchOps.MatchKind.ANY));
    }

MatchOps#
    enum MatchKind {
        /** 是否所有的元素都满足指定的断言 */
        ANY(true, true),

        /** 是否至少有一个元素满足指定的断言 */
        ALL(false, false),

        /** 是否所有的元素都不满足指定的断言 */
        NONE(true, false);

        /**
         * 是否需要在满足断言时停止接收上游元素
         */
        private final boolean stopOnPredicateMatches;
        /**
         * 操作被短路时的返回结果
         */
        private final boolean shortCircuitResult;

        private MatchKind(boolean stopOnPredicateMatches,
                          boolean shortCircuitResult) {
            this.stopOnPredicateMatches = stopOnPredicateMatches;
            this.shortCircuitResult = shortCircuitResult;
        }
    }

    public static <T> TerminalOp<T, Boolean> makeRef(Predicate<? super T> predicate,
            MatchKind matchKind) {
        Objects.requireNonNull(predicate);
        Objects.requireNonNull(matchKind);
        class MatchSink extends BooleanTerminalSink<T> {
            MatchSink() {
                super(matchKind);
            }

            @Override
            public void accept(T t) {
                /**
                 * 当前管道还能继续接收元素 && 当前元素匹配停止条件
                 */
                if (!stop && predicate.test(t) == matchKind.stopOnPredicateMatches) {
                    // 停止接收元素
                    stop = true;
                    // 写入结果值
                    value = matchKind.shortCircuitResult;
                }
            }
        }

        return new MatchOp<>(StreamShape.REFERENCE, matchKind, MatchSink::new);
    }

    private static final class MatchOp<T> implements TerminalOp<T, Boolean> {
        /**
         * 上游发送的元素类型
         */
        private final StreamShape inputShape;
        /**
         * 匹配类型
         */
        final MatchKind matchKind;
        /**
         * sink 生成器
         */
        final Supplier<BooleanTerminalSink<T>> sinkSupplier;

        MatchOp(StreamShape shape,
                MatchKind matchKind,
                Supplier<BooleanTerminalSink<T>> sinkSupplier) {
            this.inputShape = shape;
            this.matchKind = matchKind;
            this.sinkSupplier = sinkSupplier;
        }

        @Override
        public int getOpFlags() {
            // 当前管道是短路的 && 未排序的
            return StreamOpFlag.IS_SHORT_CIRCUIT | StreamOpFlag.NOT_ORDERED;
        }

        @Override
        public StreamShape inputShape() {
            return inputShape;
        }

        @Override
        public <S> Boolean evaluateSequential(PipelineHelper<T> helper,
                                              Spliterator<S> spliterator) {
            return helper.wrapAndCopyInto(sinkSupplier.get(), spliterator).getAndClearState();
        }
    }

    /**
     * 避免返回值的装箱而定义的 BooleanTerminalSink
     */
    private abstract static class BooleanTerminalSink<T> implements Sink<T> {
        /**
         * 是否停止接收
         */
        boolean stop;
        /**
         * 返回结果值
         */
        boolean value;

        BooleanTerminalSink(MatchKind matchKind) {
            value = !matchKind.shortCircuitResult;
        }

        /**
         * 情况状态并返回结果值
         */
        public boolean getAndClearState() {
            return value;
        }

        /**
         * 是否停止接收上游元素
         */
        @Override
        public boolean cancellationRequested() {
            return stop;
        }
    }
  • allMatch:上游管道发送的所有元素都满足函数式断言 predicate 时返回 true
    @Override
    public final boolean allMatch(Predicate<? super P_OUT> predicate) {
        return evaluate(MatchOps.makeRef(predicate, MatchOps.MatchKind.ALL));
    }
  • noneMatch:上游管道发送的元素没有一个满足函数式断言 predicate 时返回 true
    @Override
    public final boolean noneMatch(Predicate<? super P_OUT> predicate) {
        return evaluate(MatchOps.makeRef(predicate, MatchOps.MatchKind.NONE));
    }
  • findFirst:获取上游管道发送的第一个元素
    @Override
    public final Optional<P_OUT> findFirst() {
        return evaluate(FindOps.makeRef(true));
    }

FindOps#
    /**
     * @param mustFindFirst 是否必须是第一个元素
     */
    @SuppressWarnings("unchecked")
    public static <T> TerminalOp<T, Optional<T>> makeRef(boolean mustFindFirst) {
        return (TerminalOp<T, Optional<T>>)
                (mustFindFirst ? FindSink.OfRef.OP_FIND_FIRST : FindSink.OfRef.OP_FIND_ANY);
    }

   private abstract static class FindSink<T, O> implements TerminalSink<T, O> {
        /**
         * 是否已经找到值了
         */
        boolean hasValue;
        /**
         * 结果值
         */
        T value;

        FindSink() {} // Avoid creation of special accessor

        @Override
        public void accept(T value) {
            // 当前元素是第一个元素
            if (!hasValue) {
                // 已经找到值
                hasValue = true;
                // 写入目标值
                this.value = value;
            }
        }

        /**
         * 一旦找到值,就拒绝接收上游元素
         */
        @Override
        public boolean cancellationRequested() {
            return hasValue;
        }

        static final class OfRef<T> extends FindSink<T, Optional<T>> {
            /**
             * 获取结果值
             */
            @Override
            public Optional<T> get() {
                return hasValue ? Optional.of(value) : null;
            }

            static final TerminalOp<?, ?> OP_FIND_FIRST = new FindOp<>(true,
                    StreamShape.REFERENCE, Optional.empty(),
                    Optional::isPresent, FindSink.OfRef::new);

            static final TerminalOp<?, ?> OP_FIND_ANY = new FindOp<>(false,
                    StreamShape.REFERENCE, Optional.empty(),
                    Optional::isPresent, FindSink.OfRef::new);
        }
    }

    private static final class FindOp<T, O> implements TerminalOp<T, O> {
        /**
         * 上游发送的元素类型
         */
        private final StreamShape shape;
        /**
         * 此操作的标识
         */
        final int opFlags;
        /**
         * 未找到值时的返回值
         */
        final O emptyValue;
        /**
         * 查找断言
         */
        final Predicate<O> presentPredicate;
        /**
         * sink 生成器
         */
        final Supplier<TerminalSink<T, O>> sinkSupplier;

        FindOp(boolean mustFindFirst,
                StreamShape shape,
                O emptyValue,
                Predicate<O> presentPredicate,
                Supplier<TerminalSink<T, O>> sinkSupplier) {
            this.opFlags = StreamOpFlag.IS_SHORT_CIRCUIT | (mustFindFirst ? 0 : StreamOpFlag.NOT_ORDERED);
            this.shape = shape;
            this.emptyValue = emptyValue;
            this.presentPredicate = presentPredicate;
            this.sinkSupplier = sinkSupplier;
        }

        @Override
        public int getOpFlags() {
            return opFlags;
        }

        @Override
        public StreamShape inputShape() {
            return shape;
        }

        @Override
        public <S> O evaluateSequential(PipelineHelper<T> helper,
                Spliterator<S> spliterator) {
            // 使用 sink 顺序评估流水线产生的元素,并返回查找结果
            final O result = helper.wrapAndCopyInto(sinkSupplier.get(), spliterator).get();
            // 找到目标值,则返回它;否则返回 emptyValue
            return result != null ? result : emptyValue;
        }
    }
  • findAny:获取上游管道发送的任意一个元素【串行流取的是第一个元素】
    @Override
    public final Optional<P_OUT> findAny() {
        return evaluate(FindOps.makeRef(false));
    }

原文地址:https://www.cnblogs.com/zhuxudong/p/10152663.html

时间: 2024-10-31 18:13:38

Stream 源码分析(串行流)的相关文章

red5源码分析---9

red5源码分析-客户端publish流 接着上一章的分析结果,参考<red5源码分析-7>的分析结论,当服务器返回steamId后,客户端会执行BaseRTMPClientHandler的onCommand函数,onCommand函数会根据返回的方法名"_result"开始执行handlePendingCallResult函数,handlePendingCallResult会获取之前注册的回调函数,根据<red5源码分析-7>,该回调函数就为CreateStr

Netty源码分析第6章(解码器)----&gt;第3节: 行解码器

Netty源码分析第六章: 解码器 第三节: 行解码器 这一小节了解下行解码器LineBasedFrameDecoder, 行解码器的功能是一个字节流, 以\r\n或者直接以\n结尾进行解码, 也就是以换行符为分隔进行解析 同样, 这个解码器也继承了ByteToMessageDecoder 首先看其参数: //数据包的最大长度, 超过该长度会进行丢弃模式 private final int maxLength; //超出最大长度是否要抛出异常 private final boolean fail

5.Sentinel源码分析—Sentinel如何实现自适应限流?

Sentinel源码解析系列: 1.Sentinel源码分析-FlowRuleManager加载规则做了什么? 2. Sentinel源码分析-Sentinel是如何进行流量统计的? 3. Sentinel源码分析- QPS流量控制是如何实现的? 4.Sentinel源码分析- Sentinel是如何做到降级的? 这篇文章主要学习一下Sentinel如何实现自适应限流的. 为什么要做自适应限流,官方给了两个理由: 保证系统不被拖垮 在系统稳定的前提下,保持系统的吞吐量 我再贴一下官方的原理: 能

6.Sentinel源码分析—Sentinel是如何动态加载配置限流的?

Sentinel源码解析系列: 1.Sentinel源码分析-FlowRuleManager加载规则做了什么? 2. Sentinel源码分析-Sentinel是如何进行流量统计的? 3. Sentinel源码分析- QPS流量控制是如何实现的? 4.Sentinel源码分析- Sentinel是如何做到降级的? 5.Sentinel源码分析-Sentinel如何实现自适应限流? 有时候我们做限流的时候并不想直接写死在代码里面,然后每次要改规则,或者增加规则的时候只能去重启应用来解决.而是希望能

React躬行记(16)——React源码分析

React可大致分为三部分:Core.Reconciler和Renderer,在阅读源码之前,首先需要搭建测试环境,为了方便起见,本文直接采用了网友搭建好的环境,React版本是16.8.6,与最新版本很接近. 一.目录结构 React采用了由Lerna维护monorepo方式进行代码管理,即用一个仓库管理多个模块(module)或包(package).在React仓库的根目录中,包含三个目录: (1)fixtures,给源码贡献者准备的测试用例. (2)packages,React库提供的包的

2020了你还不会Java8新特性?(四)Collector类源码分析

Collector类源码分析 jdk8是怎么对底层完成支持的.不了解底层,平时用还可以,但是遇到问题的时候就会卡在那里.迟迟灭有解决方案.在学习一门新技术时,先学习怎么去用,不要执着于源码.但是随着用的越来越多,你去了解底层是比较好的一种学习方法. 有多种方法可以实现同一个功能.什么方式更好呢? 越具体的方法越好. 减少自动装箱拆箱操作 collect : 收集器 Collector作为collect方法的参数. Collector作为一个接口.它是一个可变的汇聚操作,将输入元素累计到一个可变的

vlc源码分析(七) 调试学习HLS协议

HTTP Live Streaming(HLS)是苹果公司提出来的流媒体传输协议.与RTP协议不同的是,HLS可以穿透某些允许HTTP协议通过的防火墙. 一.HLS播放模式 (1) 点播模式(Video on demand, VOD) 点播模式是指当前时间点可以获取到所有index文件和ts文件,二级index文件中记录了所有ts文件的地址.这种模式允许客户端访问全部内容.上面的例子中就是一个点播模式下的m3u8的结构. (2) 直播模式(Live) 直播模式是指实时生成M3u8和ts文件.它的

最新版ffmpeg源码分析

最新版ffmpeg源码分析一:框架 (ffmpeg v0.9) 框架 最新版的ffmpeg中发现了一个新的东西:avconv,而且ffmpeg.c与avconv.c一个模样,一研究才发现是libav下把ffmpeg改名为avconv了. 到底libav与ffmpeg现在是什么个关系?我也搞得希里糊涂的,先不管它了. ffmpeg的主要功能是音视频的转换和处理.其功能之强大已经到了匪夷所思的地步(有点替它吹了).它的主要特点是能做到把多个输入文件中的任意几个流重新组合到输出文件中,当然输出文件也可

MapReduce源码分析之MapTask分析(二)

SpillThread分析 为什么需要Spill 内存大小总是有效,因此在Mapper在处理过程中,数据持续输出到内存中时,必然需要有机制能将内存中的数据换出,合理的刷出到磁盘上.SpillThread就是用来完成这部分工作. SpillThread的线程处理函数只是做一层封装,当索引表中的kvstart和kvend指向一样的索引位置时,会持续处于等待过程,等待外部通知需要触发spill动作,当有spill请求时,会触发StartSpill来唤醒SpillThread线程,进入到sortAndS