天天看點

Java 8 Stream流底層原理

作者:Running的程式員

Java Stream

函數式接口

初識lambda,函數式接口肯定是繞不過去的,函數式接口就是一個有且僅有一個抽象方法,但是可以有多個非抽象方法的接口。函數式接口可以被隐式轉換為lambda表達式。

@FunctionalInterface
public interface Closeable {
    
    void close();
}           

在java.util.function它包含了很多類,用來支援Java的函數式程式設計,該包中的函數式接口有:

Java 8 Stream流底層原理

jdk提供的原生函數式接口

操作

Java 8 Stream流底層原理

Stream流操作分類

流程

Stream相關接口繼承圖:

Java 8 Stream流底層原理

Stream相關接口繼承圖: uml圖

Stream流水線組織結構示意圖(圖檔來源網絡):

Java 8 Stream流底層原理

Collection

​ 類路徑java.util.colltction

@Override
default Spliterator<E> spliterator() {
    return Spliterators.spliterator(this, 0);
}
// 常用Stream流轉換
default Stream<E> stream() {
    return StreamSupport.stream(spliterator(), false);
}
// 并行流
default Stream<E> parallelStream() {
    return StreamSupport.stream(spliterator(), true);
}

// java.util.stream.StreamSupport#stream(java.util.Spliterator<T>, boolean)
public static <T> Stream<T> stream(Spliterator<T> spliterator, boolean parallel) {
    Objects.requireNonNull(spliterator);
    return new ReferencePipeline.Head<>(spliterator, StreamOpFlag.fromCharacteristics(spliterator), parallel);
}
           

AbstractPipeline

​ 類路徑java.util.stream.AbstractPipeline

// 反向連結到管道鍊的頭部(如果是源階段,則為自身)。
private final AbstractPipeline sourceStage;

// “上遊”管道,如果這是源階段,則為null。
private final AbstractPipeline previousStage;

// 此管道對象表示的中間操作的操作标志。
protected final int sourceOrOpFlags;

// 管道中的下一個階段;如果這是最後一個階段,則為null。 在連結到下一個管道時有效地結束。
private AbstractPipeline nextStage;

// 如果是順序的,則此管道對象與流源之間的中間操作數;如果是并行的,則為先前有狀态的中間操作數。 在管道準備進行評估時有效。
private int depth;

// 源和所有操作的組合源标志和操作标志,直到此流水線對象表示的操作為止(包括該流水線對象所代表的操作)。 在管道準備進行評估時有效。
private int combinedFlags;

// 源拆分器。 僅對頭管道有效。 如果管道使用非null值,那麼在使用管道之前, sourceSupplier必須為null。 在使用管道之後,如果非null,則将其設定為null。
private Spliterator<?> sourceSpliterator;

// 來源供應商。 僅對頭管道有效。 如果非null,則在使用管道之前, sourceSpliterator必須為null。 在使用管道之後,如果非null,則将其設定為null。
private Supplier<? extends Spliterator<?>> sourceSupplier;

// 如果已連結或使用此管道,則為True
private boolean linkedOrConsumed;

// 如果正在執行任何有狀态操作,則為true;否則為true。 僅對源階段有效。
private boolean sourceAnyStateful;

private Runnable sourceCloseAction;

// 如果管道是并行的,則為true;否則,管道為順序的;否則為true。 僅對源階段有效。
private boolean parallel;
           

ReferencePipeline

​ 類路徑:java.util.stream.ReferencePipeline

filter

// java.util.stream.ReferencePipeline#filter
@Override
public final Stream<P_OUT> filter(Predicate<? super P_OUT> predicate) {
    Objects.requireNonNull(predicate);
    // 傳回一個匿名無狀态的管道
    return new StatelessOp<P_OUT, P_OUT>(this, StreamShape.REFERENCE, StreamOpFlag.NOT_SIZED) {
        // 下遊生産線所需要的回調接口
        @Override
        Sink<P_OUT> opWrapSink(int flags, Sink<P_OUT> sink) {
            return new Sink.ChainedReference<P_OUT, P_OUT>(sink) {
                @Override
                public void begin(long size) {
                    downstream.begin(-1);
                }
			   // 真正執行操作的方法,依靠ChainedReference内置ReferencePipeline引用下遊的回調
                @Override
                public void accept(P_OUT u) {
                    // 隻有滿足條件的元素才能被下遊執行
                    if (predicate.test(u))
                        downstream.accept(u);
                }
            };
        }
    };
}
           

map

// java.util.stream.ReferencePipeline#filter
@Override
public final Stream<P_OUT> filter(Predicate<? super P_OUT> predicate) {
    Objects.requireNonNull(predicate);
    // 傳回一個匿名無狀态的管道
    return new StatelessOp<P_OUT, P_OUT>(this, StreamShape.REFERENCE, StreamOpFlag.NOT_SIZED) {
        // 下遊生産線所需要的回調接口
        @Override
        Sink<P_OUT> opWrapSink(int flags, Sink<P_OUT> sink) {
            return new Sink.ChainedReference<P_OUT, P_OUT>(sink) {
                @Override
                public void begin(long size) {
                    downstream.begin(-1);
                }
			   // 真正執行操作的方法,依靠ChainedReference内置ReferencePipeline引用下遊的回調
                @Override
                public void accept(P_OUT u) {
                    // 隻有滿足條件的元素才能被下遊執行
                    if (predicate.test(u))
                        downstream.accept(u);
                }
            };
        }
    };
}
           

flatMap

// java.util.stream.ReferencePipeline#flatMap
@Override
public final <R> Stream<R> flatMap(Function<? super P_OUT, ? extends Stream<? extends R>> mapper) {
    Objects.requireNonNull(mapper);
    // 傳回一個匿名無狀态的管道
    return new StatelessOp<P_OUT, R>(this, StreamShape.REFERENCE,
                                     StreamOpFlag.NOT_SORTED | StreamOpFlag.NOT_DISTINCT | StreamOpFlag.NOT_SIZED) {
        // 下遊生産線所需要的回調接口
        @Override
        Sink<P_OUT> opWrapSink(int flags, Sink<R> sink) {
            return new Sink.ChainedReference<P_OUT, R>(sink) {
                @Override
                public void begin(long size) {
                    downstream.begin(-1);
                }
			   // 真正執行操作的方法,依靠ChainedReference内置ReferencePipeline引用下遊的回調
                @Override
                public void accept(P_OUT u) {
                    try (Stream<? extends R> result = mapper.apply(u)) {
                       	// 劃分為多個流執行下遊(分流)
                        if (result != null)
                            result.sequential().forEach(downstream);
                    }
                }
            };
        }
    };
}
           

peek

// java.util.stream.ReferencePipeline#peek
@Override
public final Stream<P_OUT> peek(Consumer<? super P_OUT> action) {
    Objects.requireNonNull(action);
    // 傳回一個匿名無狀态的管道
    return new StatelessOp<P_OUT, P_OUT>(this, StreamShape.REFERENCE, 0) {
        // 下遊生産線所需要的回調接口
        @Override
        Sink<P_OUT> opWrapSink(int flags, Sink<P_OUT> sink) {
            return new Sink.ChainedReference<P_OUT, P_OUT>(sink) {
                // 真正執行操作的方法,依靠ChainedReference内置ReferencePipeline引用下遊的回調
                @Override
                public void accept(P_OUT u) {
                    // 先執行自定義方法,在執行下遊方法
                    action.accept(u);
                    downstream.accept(u);
                }
            };
        }
    };
}
           

sorted

@Override
public final Stream<P_OUT> sorted() {
    // 不提供Comparator,會使用元素自實作Comparator的compareTo方法
    return SortedOps.makeRef(this);
}

@Override
public final Stream<P_OUT> sorted(Comparator<? super P_OUT> comparator) {
    return SortedOps.makeRef(this, comparator);
}
// Sorted.makeRef
static <T> Stream<T> makeRef(AbstractPipeline<?, T, ?> upstream,
                             Comparator<? super T> comparator) {
    return new OfRef<>(upstream, comparator);
}
// ofRef類
private static final class OfRef<T> extends ReferencePipeline.StatefulOp<T, T> {

        private final boolean isNaturalSort;
        private final Comparator<? super T> comparator;

        @Override
        public Sink<T> opWrapSink(int flags, Sink<T> sink) {
            Objects.requireNonNull(sink);
		   // 根據不同的flag進行不同排序
            if (StreamOpFlag.SORTED.isKnown(flags) && isNaturalSort)
                return sink;
            else if (StreamOpFlag.SIZED.isKnown(flags))
                return new SizedRefSortingSink<>(sink, comparator);
            else
                return new RefSortingSink<>(sink, comparator);
        }

    }
           

distinct

@Override
public final Stream<P_OUT> distinct() {
    return DistinctOps.makeRef(this);
}
static <T> ReferencePipeline<T, T> makeRef(AbstractPipeline<?, T, ?> upstream) {
    // 傳回一個匿名有狀态的管道
    return new ReferencePipeline.StatefulOp<T, T>(upstream, StreamShape.REFERENCE, StreamOpFlag.IS_DISTINCT | StreamOpFlag.NOT_SIZED) {

        @Override
        Sink<T> opWrapSink(int flags, Sink<T> sink) {
            Objects.requireNonNull(sink);

            if (StreamOpFlag.DISTINCT.isKnown(flags)) {
                // 已經是去重過了
                return sink;
            } else if (StreamOpFlag.SORTED.isKnown(flags)) {
               	// 有序流
                return new Sink.ChainedReference<T, T>(sink) {
                    boolean seenNull;
                    // 這個為先執行的前序元素
                    T lastSeen;

                    @Override
                    public void begin(long size) {
                        seenNull = false;
                        lastSeen = null;
                        downstream.begin(-1);
                    }

                    @Override
                    public void end() {
                        seenNull = false;
                        lastSeen = null;
                        downstream.end();
                    }
				  // 這裡通過有序的特性,前序元素與後序元素比較,如果相等則跳過執行後序的元素
                    @Override
                    public void accept(T t) {
                        if (t == null) {
                            // 這裡控制元素為null隻有一個
                            if (!seenNull) {
                                seenNull = true;
                                downstream.accept(lastSeen = null);
                            }
                        } else if (lastSeen == null || !t.equals(lastSeen)) {
                            // 這裡将前序元素指派給lastSeen
                            downstream.accept(lastSeen = t);
                        }
                    }
                };
            } else {
                // 底層通過Set進行去重,是以該元素需要重寫hashCode和equals方法
                return new Sink.ChainedReference<T, T>(sink) {
                    Set<T> seen;

                    @Override
                    public void begin(long size) {
                        seen = new HashSet<>();
                        downstream.begin(-1);
                    }

                    @Override
                    public void end() {
                        seen = null;
                        downstream.end();
                    }

                    @Override
                    public void accept(T t) {
                        if (!seen.contains(t)) {
                            seen.add(t);
                            downstream.accept(t);
                        }
                    }
                };
            }
        }
    };
}
           

skip、limit

public static <T> Stream<T> makeRef(AbstractPipeline<?, T, ?> upstream,
                                        long skip, long limit) {
        if (skip < 0)
            throw new IllegalArgumentException("Skip must be non-negative: " + skip);
    	// 傳回一個匿名有狀态的管道
        return new ReferencePipeline.StatefulOp<T, T>(upstream, StreamShape.REFERENCE, flags(limit)) {
            Spliterator<T> unorderedSkipLimitSpliterator(Spliterator<T> s, long skip, long limit, long sizeIfKnown) {
                if (skip <= sizeIfKnown) {
                    limit = limit >= 0 ? Math.min(limit, sizeIfKnown - skip) : sizeIfKnown - skip;
                    skip = 0;
                }
                return new StreamSpliterators.UnorderedSliceSpliterator.OfRef<>(s, skip, limit);
            }
		   // 自己實作真正操作的方法
            @Override
            Sink<T> opWrapSink(int flags, Sink<T> sink) {
                return new Sink.ChainedReference<T, T>(sink) {
                    long n = skip;
                    long m = limit >= 0 ? limit : Long.MAX_VALUE;

                    @Override
                    public void begin(long size) {
                        downstream.begin(calcSize(size, skip, m));
                    }

                    @Override
                    public void accept(T t) {
                        if (n == 0) {
                            // limit
                            if (m > 0) {
                                m--;
                                downstream.accept(t);
                            }
                        }
                        // skip
                        else {
                            n--;
                        }
                    }

                    @Override
                    public boolean cancellationRequested() {
                        return m == 0 || downstream.cancellationRequested();
                    }
                };
            }
        };
    }
           

reduce

// java.util.stream.ReferencePipeline#reduce(P_OUT, java.util.function.BinaryOperator<P_OUT>)
@Override
public final P_OUT reduce(final P_OUT identity, final BinaryOperator<P_OUT> accumulator) {
    return evaluate(ReduceOps.makeRef(identity, accumulator, accumulator));
}
// java.util.stream.ReferencePipeline#reduce(java.util.function.BinaryOperator<P_OUT>)
@Override
public final Optional<P_OUT> reduce(BinaryOperator<P_OUT> accumulator) {
    return evaluate(ReduceOps.makeRef(accumulator));
}
// java.util.stream.ReferencePipeline#reduce(R, java.util.function.BiFunction<R,? super P_OUT,R>, java.util.function.BinaryOperator<R>)
@Override
public final <R> R reduce(R identity, BiFunction<R, ? super P_OUT, R> accumulator, BinaryOperator<R> combiner) {
    return evaluate(ReduceOps.makeRef(identity, accumulator, combiner));
}

// java.util.stream.AbstractPipeline#evaluate(java.util.stream.TerminalOp<E_OUT,R>)
final <R> R evaluate(TerminalOp<E_OUT, R> terminalOp) {
    assert getOutputShape() == terminalOp.inputShape();
    if (linkedOrConsumed)
        throw new IllegalStateException(MSG_STREAM_LINKED);
    linkedOrConsumed = true;

    return isParallel()
        ? terminalOp.evaluateParallel(this, sourceSpliterator(terminalOp.getOpFlags()))
        : terminalOp.evaluateSequential(this, sourceSpliterator(terminalOp.getOpFlags()));
}
           

collect

// java.util.stream.ReferencePipeline#collect(java.util.stream.Collector<? super P_OUT,A,R>)
@Override
@SuppressWarnings("unchecked")
public final <R, A> R collect(Collector<? super P_OUT, A, R> collector) {
    A container;
    if (isParallel()
        && (collector.characteristics().contains(Collector.Characteristics.CONCURRENT))
        && (!isOrdered() || collector.characteristics().contains(Collector.Characteristics.UNORDERED))) {
        container = collector.supplier().get();
        BiConsumer<A, ? super P_OUT> accumulator = collector.accumulator();
        forEach(u -> accumulator.accept(container, u));
    }
    else {
        container = evaluate(ReduceOps.makeRef(collector));
    }
    // 具有特定轉換的使用finisher處理
    return collector.characteristics().contains(Collector.Characteristics.IDENTITY_FINISH)
        ? (R) container
        : collector.finisher().apply(container);
}
// java.util.stream.ReferencePipeline#collect(java.util.function.Supplier<R>, java.util.function.BiConsumer<R,? super P_OUT>, java.util.function.BiConsumer<R,R>)
@Override
public final <R> R collect(Supplier<R> supplier, BiConsumer<R, ? super P_OUT> accumulator, BiConsumer<R, R> combiner) {
    return evaluate(ReduceOps.makeRef(supplier, accumulator, combiner));
}

// java.util.stream.AbstractPipeline#evaluate(java.util.stream.TerminalOp<E_OUT,R>)
final <R> R evaluate(TerminalOp<E_OUT, R> terminalOp) {
    assert getOutputShape() == terminalOp.inputShape();
    if (linkedOrConsumed)
        throw new IllegalStateException(MSG_STREAM_LINKED);
    linkedOrConsumed = true;

    return isParallel()
        ? terminalOp.evaluateParallel(this, sourceSpliterator(terminalOp.getOpFlags()))
        : terminalOp.evaluateSequential(this, sourceSpliterator(terminalOp.getOpFlags()));
}
           

forEach

// java.util.stream.ReferencePipeline#forEach
@Override
public void forEach(Consumer<? super P_OUT> action) {
    evaluate(ForEachOps.makeRef(action, false));
}

// java.util.stream.ForEachOps#makeRef
public static <T> TerminalOp<T, Void> makeRef(Consumer<? super T> action, boolean ordered) {
    Objects.requireNonNull(action);
    return new ForEachOp.OfRef<>(action, ordered);
}

// java.util.stream.ForEachOps.ForEachOp.OfRef
static final class OfRef<T> extends ForEachOp<T> {
    final Consumer<? super T> consumer;

    OfRef(Consumer<? super T> consumer, boolean ordered) {
        super(ordered);
        this.consumer = consumer;
    }

    // 隻是簡單的消費
    @Override
    public void accept(T t) {
        consumer.accept(t);
    }
}
           

Head

​ 流的資料元的頭,類路徑java.util.stream.ReferencePipeline.Head

// java.util.stream.ReferencePipeline.Head
static class Head<E_IN, E_OUT> extends ReferencePipeline<E_IN, E_OUT> {
    
    Head(Supplier<? extends Spliterator<?>> source, int sourceFlags, boolean parallel) {
        super(source, sourceFlags, parallel);
    }

    Head(Spliterator<?> source, int sourceFlags, boolean parallel) {
        super(source, sourceFlags, parallel);
    }

    @Override
    final boolean opIsStateful() {
        throw new UnsupportedOperationException();
    }

    @Override
    final Sink<E_IN> opWrapSink(int flags, Sink<E_OUT> sink) {
        throw new UnsupportedOperationException();
    }
    // Optimized sequential terminal operations for the head of the pipeline
    @Override
    public void forEach(Consumer<? super E_OUT> action) {
        if (!isParallel()) {
            sourceStageSpliterator().forEachRemaining(action);
        }
        else {
            super.forEach(action);
        }
    }

    @Override
    public void forEachOrdered(Consumer<? super E_OUT> action) {
        if (!isParallel()) {
            sourceStageSpliterator().forEachRemaining(action);
        }
        else {
            super.forEachOrdered(action);
        }
    }
}
           

StatelessOp

​ 無狀态的中間管道,類路徑java.util.stream.ReferencePipeline.StatelessOp

// java.util.stream.ReferencePipeline.StatelessOp
abstract static class StatelessOp<E_IN, E_OUT> extends ReferencePipeline<E_IN, E_OUT> {

    StatelessOp(AbstractPipeline<?, E_IN, ?> upstream, StreamShape inputShape, int opFlags) {
        super(upstream, opFlags);
        assert upstream.getOutputShape() == inputShape;
    }

    @Override
    final boolean opIsStateful() {
        return false;
    }
}
           

StatefulOp

​ 有狀态的中間管道,類路徑java.util.stream.ReferencePipeline.StatefulOp

// java.util.stream.ReferencePipeline.StatefulOp
abstract static class StatefulOp<E_IN, E_OUT> extends ReferencePipeline<E_IN, E_OUT> {

    StatefulOp(AbstractPipeline<?, E_IN, ?> upstream, StreamShape inputShape, int opFlags) {
        super(upstream, opFlags);
        assert upstream.getOutputShape() == inputShape;
    }

    @Override
    final boolean opIsStateful() {
        return true;
    }

    @Override
    abstract <P_IN> Node<E_OUT> opEvaluateParallel(PipelineHelper<E_OUT> helper,
                                                   Spliterator<P_IN> spliterator,
                                                   IntFunction<E_OUT[]> generator);
           

TerminalOp

​ 管道流的結束操作,類路徑java.util.stream.TerminalOp

interface TerminalOp<E_IN, R> {
    
	// 擷取此操作的輸入類型的形狀
    default StreamShape inputShape() { return StreamShape.REFERENCE; }

    // 擷取操作的流标志。 終端操作可以設定StreamOpFlag定義的流标志的有限子集,并且這些标志與管道的先前組合的流和中間操作标志組合在一起。
    default int getOpFlags() { return 0; }

    // 使用指定的PipelineHelper對操作執行并行評估,該操作描述上遊中間操作。
    default <P_IN> R evaluateParallel(PipelineHelper<E_IN> helper, Spliterator<P_IN> spliterator) {
        if (Tripwire.ENABLED)
            Tripwire.trip(getClass(), "{0} triggering TerminalOp.evaluateParallel serial default");
        return evaluateSequential(helper, spliterator);
    }

    // 使用指定的PipelineHelper對操作執行順序評估,該操作描述上遊中間操作。
    <P_IN> R evaluateSequential(PipelineHelper<E_IN> helper, Spliterator<P_IN> spliterator);
}
           

ReduceOp

​ 類路徑java.util.stream.ReduceOps.ReduceOp

private static abstract class ReduceOp<T, R, S extends AccumulatingSink<T, R, S>> implements TerminalOp<T, R> {
        private final StreamShape inputShape;

        ReduceOp(StreamShape shape) {
            inputShape = shape;
        }

        public abstract S makeSink();

        @Override
        public StreamShape inputShape() {
            return inputShape;
        }

        // 通過匿名子類實作makeSink()擷取Sink
        @Override
        public <P_IN> R evaluateSequential(PipelineHelper<T> helper, Spliterator<P_IN> spliterator) {
            return helper.wrapAndCopyInto(makeSink(), spliterator).get();
        }

        @Override
        public <P_IN> R evaluateParallel(PipelineHelper<T> helper, Spliterator<P_IN> spliterator) {
            return new ReduceTask<>(this, helper, spliterator).invoke().get();
        }
    }
           

MatchOp

​ 類路徑java.util.stream.MatchOps.MatchOp

private static final class MatchOp<T> implements TerminalOp<T, Boolean> {
        private final StreamShape inputShape;
        final MatchKind matchKind;
        final Supplier<BooleanTerminalSink<T>> sinkSupplier;

        MatchOp(StreamShape shape, MatchKind matchKind, Supplier<BooleanTerminalSink<T>> sinkSupplier) {
            this.inputShape = shape;
            this.matchKind = matchKind;
            this.sinkSupplier = sinkSupplier;
        }

        @Override
        public int getOpFlags() {
            return StreamOpFlag.IS_SHORT_CIRCUIT | StreamOpFlag.NOT_ORDERED;
        }

        @Override
        public StreamShape inputShape() {
            return inputShape;
        }

    	// 使用内置的sinkSupplier擷取Sink
        @Override
        public <S> Boolean evaluateSequential(PipelineHelper<T> helper, Spliterator<S> spliterator) {
            return helper.wrapAndCopyInto(sinkSupplier.get(), spliterator).getAndClearState();
        }

        @Override
        public <S> Boolean evaluateParallel(PipelineHelper<T> helper, Spliterator<S> spliterator) {
            return new MatchTask<>(this, helper, spliterator).invoke();
        }
    }
           

FindOp

​ 類路徑java.util.stream.FindOps.FindOp

private static final class FindOp<T, O> implements TerminalOp<T, O> {
        private final StreamShape shape;
        final boolean mustFindFirst;
        final O emptyValue;
        final Predicate<O> presentPredicate;
        final Supplier<TerminalSink<T, O>> sinkSupplier;

        FindOp(boolean mustFindFirst,
                       StreamShape shape,
                       O emptyValue,
                       Predicate<O> presentPredicate,
                       Supplier<TerminalSink<T, O>> sinkSupplier) {
            this.mustFindFirst = mustFindFirst;
            this.shape = shape;
            this.emptyValue = emptyValue;
            this.presentPredicate = presentPredicate;
            this.sinkSupplier = sinkSupplier;
        }

        @Override
        public int getOpFlags() {
            return StreamOpFlag.IS_SHORT_CIRCUIT | (mustFindFirst ? 0 : StreamOpFlag.NOT_ORDERED);
        }

        @Override
        public StreamShape inputShape() {
            return shape;
        }

    	// 通過内置sinkSupplier擷取Sink
        @Override
        public <S> O evaluateSequential(PipelineHelper<T> helper, Spliterator<S> spliterator) {
            O result = helper.wrapAndCopyInto(sinkSupplier.get(), spliterator).get();
            return result != null ? result : emptyValue;
        }

        @Override
        public <P_IN> O evaluateParallel(PipelineHelper<T> helper, Spliterator<P_IN> spliterator) {
            return new FindTask<>(this, helper, spliterator).invoke();
        }
    }
           

ForEachOp

​ 類路徑java.util.stream.ForEachOps.ForEachOp

static abstract class ForEachOp<T> implements TerminalOp<T, Void>, TerminalSink<T, Void> {
        private final boolean ordered;

        protected ForEachOp(boolean ordered) {
            this.ordered = ordered;
        }

        @Override
        public int getOpFlags() {
            return ordered ? 0 : StreamOpFlag.NOT_ORDERED;
        }
	
    	// 自己實作了Sink
        @Override
        public <S> Void evaluateSequential(PipelineHelper<T> helper, Spliterator<S> spliterator) {
            return helper.wrapAndCopyInto(this, spliterator).get();
        }

        @Override
        public <S> Void evaluateParallel(PipelineHelper<T> helper, Spliterator<S> spliterator) {
            if (ordered)
                new ForEachOrderedTask<>(helper, spliterator, this).invoke();
            else
                new ForEachTask<>(helper, spliterator, helper.wrapSink(this)).invoke();
            return null;
        }

        @Override
        public Void get() {
            return null;
        }

        static final class OfRef<T> extends ForEachOp<T> {
            final Consumer<? super T> consumer;

            OfRef(Consumer<? super T> consumer, boolean ordered) {
                super(ordered);
                this.consumer = consumer;
            }

            @Override
            public void accept(T t) {
                consumer.accept(t);
            }
        }
		...
    }
           

Sink

​ 類路徑java.util.stream.Sink

interface Sink<T> extends Consumer<T> {
	// 開始周遊元素之前調用該方法,通知Sink做好準備。
    default void begin(long size) {}
	// 所有元素周遊完成之後調用,通知Sink沒有更多的元素了。
    default void end() {}
	// 是否可以結束操作,可以讓短路操作盡早結束。
    default boolean cancellationRequested() {
        return false;
    }
    // 周遊元素時調用,接受一個待處理元素,并對元素進行處理。Stage把自己包含的操作和回調方法封裝到該方法裡,前一個Stage隻需要調用目前Stage.accept(T t)方法就行了。
    void accept(T t);
}
           

​ 這裡Sink的子類實作中分為兩種:中間操作匿名實作ChainedReference和TerminalOp子類所提供的Sink。

ChainedReference

​ 類路徑java.util.stream.Sink.ChainedReference,這裡是中間操作的預設模闆父類

static abstract class ChainedReference<T, E_OUT> implements Sink<T> {
        protected final Sink<? super E_OUT> downstream;

        public ChainedReference(Sink<? super E_OUT> downstream) {
            this.downstream = Objects.requireNonNull(downstream);
        }

        @Override
        public void begin(long size) {
            downstream.begin(size);
        }

        @Override
        public void end() {
            downstream.end();
        }

        @Override
        public boolean cancellationRequested() {
            return downstream.cancellationRequested();
        }
    }
           

​ 在上述的中間操作管道流中都是通過匿名類繼承ChainedReference實作onWrapSink(int, Sink)傳回一個指定操作的Sink。

TerminalSink

​ 這裡為什麼講提供呢?這是因為不同的實作TerminalOp的子類中在實作java.util.stream.TerminalOp#evaluateSequential中都是通過helper.wrapAndCopyInto(TerminalOp子類實作提供的Sink, spliterator)中通過參數傳遞的方式提供的,不同的子類傳遞的方式不一樣是以此處用了一個提供Sink

​ 由ReduceOps中實作TerminalOp所提供的ReducingSink,它是由匿名類實作java.util.stream.ReduceOps.ReduceOp#makeSink來傳遞給helper.wrapAndCopyInto(makeSink(), spliterator)的。

public static <T, U> TerminalOp<T, U> makeRef(U seed, BiFunction<U, ? super T, U> reducer, BinaryOperator<U> combiner) {
        Objects.requireNonNull(reducer);
        Objects.requireNonNull(combiner);
        class ReducingSink extends Box<U> implements AccumulatingSink<T, U, ReducingSink> {
            @Override
            public void begin(long size) {
                state = seed;
            }

            @Override
            public void accept(T t) {
                state = reducer.apply(state, t);
            }

            @Override
            public void combine(ReducingSink other) {
                state = combiner.apply(state, other.state);
            }
        }
        return new ReduceOp<T, U, ReducingSink>(StreamShape.REFERENCE) {
            @Override
            public ReducingSink makeSink() {
                return new ReducingSink();
            }
        };
    }
           

​ 由ForEachOps中實作TerminalOp所提供的是this,它的提供方式就是通過this傳遞給helper.wrapAndCopyInto(this, spliterator)。

// 這裡ForEachOp自己通過TerminalSink間接的實作了Sink
static abstract class ForEachOp<T> implements TerminalOp<T, Void>, TerminalSink<T, Void> {
        @Override
        public <S> Void evaluateSequential(PipelineHelper<T> helper, Spliterator<S> spliterator) {
            return helper.wrapAndCopyInto(this, spliterator).get();
        }
}
           

​ 由MatchOps中實作TerminalOp所提供的sinkSupplier通過構造函數由外部指派,通過Supplier接口的get()來傳遞給helper.wrapAndCopyInto(sinkSupplier.get(), spliterator)。

private static final class MatchOp<T> implements TerminalOp<T, Boolean> {
        final Supplier<BooleanTerminalSink<T>> sinkSupplier;

        @Override
        public <S> Boolean evaluateSequential(PipelineHelper<T> helper,Spliterator<S> spliterator) {
            return helper.wrapAndCopyInto(sinkSupplier.get(), spliterator).getAndClearState();
        }
    }
           

​ 由FindOps中實作TerminalOp所提供的與上述MatchOps是一緻的

private static final class FindOp<T, O> implements TerminalOp<T, O> {
        final Supplier<TerminalSink<T, O>> sinkSupplier;

        @Override
        public <S> O evaluateSequential(PipelineHelper<T> helper, Spliterator<S> spliterator) {
            O result = helper.wrapAndCopyInto(sinkSupplier.get(), spliterator).get();
            return result != null ? result : emptyValue;
        }
    }
           

Collector

​ 在Collector中有以下幾個實作接口:

  • Supplier:結果類型的提供器。
  • BiConsumer<A, T>:将元素放入結果的累加器。
  • BinaryOperator:合并部分結果的組合器。
  • Function<A, R>:對結果類型轉換為最終結果類型的轉換器。
  • Set:儲存Collector特征的集合

并行流

​ 前述都是基于串行流的講解,其實并行流也是基于上述的helper.wrapAndCopyInto(op.sinkSupplier.get(), spliterator)這個方法上面做的一層基于ForkJoinTask多線程架構的封裝。

ForkJoinTask

​ ForkJoin架構的思想就是分而治之,它将一個大任務切割為多個小任務這個過程稱為fork,将每個任務的執行的結果進行彙總的過程稱為join。ForkJoin架構相關的接口關系圖如下(圖檔來源網絡):

Java 8 Stream流底層原理

AbstractTask

​ 類路徑java.util.stream.AbstractTask,AbstractTask繼承了在JUC中已經封裝好的ForkJoinTask抽象子類java.util.concurrent.CountedCompleter。

​ 此類基于CountedCompleter ,它是fork-join任務的一種形式,其中每個任務都有未完成子代的信号量計數,并且該任務隐式完成并在其最後一個子代完成時得到通知。 内部節點任務可能會覆寫CountedCompleter的onCompletion方法,以将子任務的結果合并到目前任務的結果中。​ 拆分和設定子任務連結是由内部節點的compute()完成的。 在葉節點的compute()時間,可以確定将為所有子代設定父代的子代相關字段(包括父代子代的同級連結)。

​ 例如,執行減少任務的任務将覆寫doLeaf()以使用Spliterator對該葉節點的塊執行減少Spliterator ,并覆寫onCompletion()以合并内部節點的子任務的結果:

@Override
protected ReduceTask<P_IN, P_OUT, R, S> makeChild(Spliterator<P_IN> spliterator) {
    // 傳回一個ForkJoinTask任務
    return new ReduceTask<>(this, spliterator);
}

@Override
protected S doLeaf() {
    // 其他實作大同小異
    return helper.wrapAndCopyInto(op.makeSink(), spliterator);
}

@Override
public void onCompletion(CountedCompleter<?> caller) {
    // 非葉子節點進行結果組合
    if (!isLeaf()) {
        S leftResult = leftChild.getLocalResult();
        leftResult.combine(rightChild.getLocalResult());
        setLocalResult(leftResult);
    }
    // GC spliterator, left and right child
    super.onCompletion(caller);
}
           

​ AbstractTask封裝了分片任務的算法模闆,通過是Spliterator的trySplit()方法來實作分片的細節,詳細算法源碼如下(類路徑:java.util.stream.AbstractTask#compute):

@Override
public void compute() {
    // 将目前這個spliterator作為右節點(此時為root節點)
    Spliterator<P_IN> rs = spliterator, ls; 
    // 評估任務的大小
    long sizeEstimate = rs.estimateSize();
    // 擷取任務門檻值
    long sizeThreshold = getTargetSize(sizeEstimate);
    boolean forkRight = false;
    @SuppressWarnings("unchecked") K task = (K) this;
    // 細節不多贅述,下面我用圖來講解算法
      /**
         * 根節點指定為:右邊節點
         *              root
         *              split()
         *    left               right
         * left.fork()
         *                       split()
         *                   l            r
         *            rs = ls
         *                      right.fork()
         *    split()
         * l           r
         *    l.fork()
         */
    while (sizeEstimate > sizeThreshold && (ls = rs.trySplit()) != null) {
        K leftChild, rightChild, taskToFork;
        task.leftChild  = leftChild = task.makeChild(ls);
        task.rightChild = rightChild = task.makeChild(rs);
        task.setPendingCount(1);
        if (forkRight) {
            forkRight = false;
            // 左右節點切換進行fork和split
            rs = ls;
            task = leftChild;
            taskToFork = rightChild;
        }
        else {
            forkRight = true;
            task = rightChild;
            taskToFork = leftChild;
        }
        // fork任務加入隊列中去
        taskToFork.fork();
        sizeEstimate = rs.estimateSize();
    }
    // 将執行doLeaf底層就是單個串行流的操作
    task.setLocalResult(task.doLeaf());
    // 将結果組合成一個最終結果
    task.tryComplete();
}
           

​ AbstractTask執行與分片流程圖如下:

Java 8 Stream流底層原理

到這裡Stream流的相關知識介紹到這,這裡附上一副總體圖來加深下印象

Java 8 Stream流底層原理

原文連結:https://blog.csdn.net/jacknler/article/details/116810311

繼續閱讀