Java Stream
函數式接口
初識lambda,函數式接口肯定是繞不過去的,函數式接口就是一個有且僅有一個抽象方法,但是可以有多個非抽象方法的接口。函數式接口可以被隐式轉換為lambda表達式。
@FunctionalInterface
public interface Closeable {
void close();
}
在java.util.function它包含了很多類,用來支援Java的函數式程式設計,該包中的函數式接口有:
jdk提供的原生函數式接口
操作
Stream流操作分類
流程
Stream相關接口繼承圖:
Stream相關接口繼承圖: uml圖
Stream流水線組織結構示意圖(圖檔來源網絡):
Collection
類路徑java.util.colltction
@Override
default Spliterator<E> spliterator() {
return Spliterators.spliterator(this, 0);
}
// 常用Stream流轉換
default Stream<E> stream() {
return StreamSupport.stream(spliterator(), false);
}
// 并行流
default Stream<E> parallelStream() {
return StreamSupport.stream(spliterator(), true);
}
// java.util.stream.StreamSupport#stream(java.util.Spliterator<T>, boolean)
public static <T> Stream<T> stream(Spliterator<T> spliterator, boolean parallel) {
Objects.requireNonNull(spliterator);
return new ReferencePipeline.Head<>(spliterator, StreamOpFlag.fromCharacteristics(spliterator), parallel);
}
AbstractPipeline
類路徑java.util.stream.AbstractPipeline
// 反向連結到管道鍊的頭部(如果是源階段,則為自身)。
private final AbstractPipeline sourceStage;
// “上遊”管道,如果這是源階段,則為null。
private final AbstractPipeline previousStage;
// 此管道對象表示的中間操作的操作标志。
protected final int sourceOrOpFlags;
// 管道中的下一個階段;如果這是最後一個階段,則為null。 在連結到下一個管道時有效地結束。
private AbstractPipeline nextStage;
// 如果是順序的,則此管道對象與流源之間的中間操作數;如果是并行的,則為先前有狀态的中間操作數。 在管道準備進行評估時有效。
private int depth;
// 源和所有操作的組合源标志和操作标志,直到此流水線對象表示的操作為止(包括該流水線對象所代表的操作)。 在管道準備進行評估時有效。
private int combinedFlags;
// 源拆分器。 僅對頭管道有效。 如果管道使用非null值,那麼在使用管道之前, sourceSupplier必須為null。 在使用管道之後,如果非null,則将其設定為null。
private Spliterator<?> sourceSpliterator;
// 來源供應商。 僅對頭管道有效。 如果非null,則在使用管道之前, sourceSpliterator必須為null。 在使用管道之後,如果非null,則将其設定為null。
private Supplier<? extends Spliterator<?>> sourceSupplier;
// 如果已連結或使用此管道,則為True
private boolean linkedOrConsumed;
// 如果正在執行任何有狀态操作,則為true;否則為true。 僅對源階段有效。
private boolean sourceAnyStateful;
private Runnable sourceCloseAction;
// 如果管道是并行的,則為true;否則,管道為順序的;否則為true。 僅對源階段有效。
private boolean parallel;
ReferencePipeline
類路徑:java.util.stream.ReferencePipeline
filter
// java.util.stream.ReferencePipeline#filter
@Override
public final Stream<P_OUT> filter(Predicate<? super P_OUT> predicate) {
Objects.requireNonNull(predicate);
// 傳回一個匿名無狀态的管道
return new StatelessOp<P_OUT, P_OUT>(this, StreamShape.REFERENCE, StreamOpFlag.NOT_SIZED) {
// 下遊生産線所需要的回調接口
@Override
Sink<P_OUT> opWrapSink(int flags, Sink<P_OUT> sink) {
return new Sink.ChainedReference<P_OUT, P_OUT>(sink) {
@Override
public void begin(long size) {
downstream.begin(-1);
}
// 真正執行操作的方法,依靠ChainedReference内置ReferencePipeline引用下遊的回調
@Override
public void accept(P_OUT u) {
// 隻有滿足條件的元素才能被下遊執行
if (predicate.test(u))
downstream.accept(u);
}
};
}
};
}
map
// java.util.stream.ReferencePipeline#filter
@Override
public final Stream<P_OUT> filter(Predicate<? super P_OUT> predicate) {
Objects.requireNonNull(predicate);
// 傳回一個匿名無狀态的管道
return new StatelessOp<P_OUT, P_OUT>(this, StreamShape.REFERENCE, StreamOpFlag.NOT_SIZED) {
// 下遊生産線所需要的回調接口
@Override
Sink<P_OUT> opWrapSink(int flags, Sink<P_OUT> sink) {
return new Sink.ChainedReference<P_OUT, P_OUT>(sink) {
@Override
public void begin(long size) {
downstream.begin(-1);
}
// 真正執行操作的方法,依靠ChainedReference内置ReferencePipeline引用下遊的回調
@Override
public void accept(P_OUT u) {
// 隻有滿足條件的元素才能被下遊執行
if (predicate.test(u))
downstream.accept(u);
}
};
}
};
}
flatMap
// java.util.stream.ReferencePipeline#flatMap
@Override
public final <R> Stream<R> flatMap(Function<? super P_OUT, ? extends Stream<? extends R>> mapper) {
Objects.requireNonNull(mapper);
// 傳回一個匿名無狀态的管道
return new StatelessOp<P_OUT, R>(this, StreamShape.REFERENCE,
StreamOpFlag.NOT_SORTED | StreamOpFlag.NOT_DISTINCT | StreamOpFlag.NOT_SIZED) {
// 下遊生産線所需要的回調接口
@Override
Sink<P_OUT> opWrapSink(int flags, Sink<R> sink) {
return new Sink.ChainedReference<P_OUT, R>(sink) {
@Override
public void begin(long size) {
downstream.begin(-1);
}
// 真正執行操作的方法,依靠ChainedReference内置ReferencePipeline引用下遊的回調
@Override
public void accept(P_OUT u) {
try (Stream<? extends R> result = mapper.apply(u)) {
// 劃分為多個流執行下遊(分流)
if (result != null)
result.sequential().forEach(downstream);
}
}
};
}
};
}
peek
// java.util.stream.ReferencePipeline#peek
@Override
public final Stream<P_OUT> peek(Consumer<? super P_OUT> action) {
Objects.requireNonNull(action);
// 傳回一個匿名無狀态的管道
return new StatelessOp<P_OUT, P_OUT>(this, StreamShape.REFERENCE, 0) {
// 下遊生産線所需要的回調接口
@Override
Sink<P_OUT> opWrapSink(int flags, Sink<P_OUT> sink) {
return new Sink.ChainedReference<P_OUT, P_OUT>(sink) {
// 真正執行操作的方法,依靠ChainedReference内置ReferencePipeline引用下遊的回調
@Override
public void accept(P_OUT u) {
// 先執行自定義方法,在執行下遊方法
action.accept(u);
downstream.accept(u);
}
};
}
};
}
sorted
@Override
public final Stream<P_OUT> sorted() {
// 不提供Comparator,會使用元素自實作Comparator的compareTo方法
return SortedOps.makeRef(this);
}
@Override
public final Stream<P_OUT> sorted(Comparator<? super P_OUT> comparator) {
return SortedOps.makeRef(this, comparator);
}
// Sorted.makeRef
static <T> Stream<T> makeRef(AbstractPipeline<?, T, ?> upstream,
Comparator<? super T> comparator) {
return new OfRef<>(upstream, comparator);
}
// ofRef類
private static final class OfRef<T> extends ReferencePipeline.StatefulOp<T, T> {
private final boolean isNaturalSort;
private final Comparator<? super T> comparator;
@Override
public Sink<T> opWrapSink(int flags, Sink<T> sink) {
Objects.requireNonNull(sink);
// 根據不同的flag進行不同排序
if (StreamOpFlag.SORTED.isKnown(flags) && isNaturalSort)
return sink;
else if (StreamOpFlag.SIZED.isKnown(flags))
return new SizedRefSortingSink<>(sink, comparator);
else
return new RefSortingSink<>(sink, comparator);
}
}
distinct
@Override
public final Stream<P_OUT> distinct() {
return DistinctOps.makeRef(this);
}
static <T> ReferencePipeline<T, T> makeRef(AbstractPipeline<?, T, ?> upstream) {
// 傳回一個匿名有狀态的管道
return new ReferencePipeline.StatefulOp<T, T>(upstream, StreamShape.REFERENCE, StreamOpFlag.IS_DISTINCT | StreamOpFlag.NOT_SIZED) {
@Override
Sink<T> opWrapSink(int flags, Sink<T> sink) {
Objects.requireNonNull(sink);
if (StreamOpFlag.DISTINCT.isKnown(flags)) {
// 已經是去重過了
return sink;
} else if (StreamOpFlag.SORTED.isKnown(flags)) {
// 有序流
return new Sink.ChainedReference<T, T>(sink) {
boolean seenNull;
// 這個為先執行的前序元素
T lastSeen;
@Override
public void begin(long size) {
seenNull = false;
lastSeen = null;
downstream.begin(-1);
}
@Override
public void end() {
seenNull = false;
lastSeen = null;
downstream.end();
}
// 這裡通過有序的特性,前序元素與後序元素比較,如果相等則跳過執行後序的元素
@Override
public void accept(T t) {
if (t == null) {
// 這裡控制元素為null隻有一個
if (!seenNull) {
seenNull = true;
downstream.accept(lastSeen = null);
}
} else if (lastSeen == null || !t.equals(lastSeen)) {
// 這裡将前序元素指派給lastSeen
downstream.accept(lastSeen = t);
}
}
};
} else {
// 底層通過Set進行去重,是以該元素需要重寫hashCode和equals方法
return new Sink.ChainedReference<T, T>(sink) {
Set<T> seen;
@Override
public void begin(long size) {
seen = new HashSet<>();
downstream.begin(-1);
}
@Override
public void end() {
seen = null;
downstream.end();
}
@Override
public void accept(T t) {
if (!seen.contains(t)) {
seen.add(t);
downstream.accept(t);
}
}
};
}
}
};
}
skip、limit
public static <T> Stream<T> makeRef(AbstractPipeline<?, T, ?> upstream,
long skip, long limit) {
if (skip < 0)
throw new IllegalArgumentException("Skip must be non-negative: " + skip);
// 傳回一個匿名有狀态的管道
return new ReferencePipeline.StatefulOp<T, T>(upstream, StreamShape.REFERENCE, flags(limit)) {
Spliterator<T> unorderedSkipLimitSpliterator(Spliterator<T> s, long skip, long limit, long sizeIfKnown) {
if (skip <= sizeIfKnown) {
limit = limit >= 0 ? Math.min(limit, sizeIfKnown - skip) : sizeIfKnown - skip;
skip = 0;
}
return new StreamSpliterators.UnorderedSliceSpliterator.OfRef<>(s, skip, limit);
}
// 自己實作真正操作的方法
@Override
Sink<T> opWrapSink(int flags, Sink<T> sink) {
return new Sink.ChainedReference<T, T>(sink) {
long n = skip;
long m = limit >= 0 ? limit : Long.MAX_VALUE;
@Override
public void begin(long size) {
downstream.begin(calcSize(size, skip, m));
}
@Override
public void accept(T t) {
if (n == 0) {
// limit
if (m > 0) {
m--;
downstream.accept(t);
}
}
// skip
else {
n--;
}
}
@Override
public boolean cancellationRequested() {
return m == 0 || downstream.cancellationRequested();
}
};
}
};
}
reduce
// java.util.stream.ReferencePipeline#reduce(P_OUT, java.util.function.BinaryOperator<P_OUT>)
@Override
public final P_OUT reduce(final P_OUT identity, final BinaryOperator<P_OUT> accumulator) {
return evaluate(ReduceOps.makeRef(identity, accumulator, accumulator));
}
// java.util.stream.ReferencePipeline#reduce(java.util.function.BinaryOperator<P_OUT>)
@Override
public final Optional<P_OUT> reduce(BinaryOperator<P_OUT> accumulator) {
return evaluate(ReduceOps.makeRef(accumulator));
}
// java.util.stream.ReferencePipeline#reduce(R, java.util.function.BiFunction<R,? super P_OUT,R>, java.util.function.BinaryOperator<R>)
@Override
public final <R> R reduce(R identity, BiFunction<R, ? super P_OUT, R> accumulator, BinaryOperator<R> combiner) {
return evaluate(ReduceOps.makeRef(identity, accumulator, combiner));
}
// java.util.stream.AbstractPipeline#evaluate(java.util.stream.TerminalOp<E_OUT,R>)
final <R> R evaluate(TerminalOp<E_OUT, R> terminalOp) {
assert getOutputShape() == terminalOp.inputShape();
if (linkedOrConsumed)
throw new IllegalStateException(MSG_STREAM_LINKED);
linkedOrConsumed = true;
return isParallel()
? terminalOp.evaluateParallel(this, sourceSpliterator(terminalOp.getOpFlags()))
: terminalOp.evaluateSequential(this, sourceSpliterator(terminalOp.getOpFlags()));
}
collect
// java.util.stream.ReferencePipeline#collect(java.util.stream.Collector<? super P_OUT,A,R>)
@Override
@SuppressWarnings("unchecked")
public final <R, A> R collect(Collector<? super P_OUT, A, R> collector) {
A container;
if (isParallel()
&& (collector.characteristics().contains(Collector.Characteristics.CONCURRENT))
&& (!isOrdered() || collector.characteristics().contains(Collector.Characteristics.UNORDERED))) {
container = collector.supplier().get();
BiConsumer<A, ? super P_OUT> accumulator = collector.accumulator();
forEach(u -> accumulator.accept(container, u));
}
else {
container = evaluate(ReduceOps.makeRef(collector));
}
// 具有特定轉換的使用finisher處理
return collector.characteristics().contains(Collector.Characteristics.IDENTITY_FINISH)
? (R) container
: collector.finisher().apply(container);
}
// java.util.stream.ReferencePipeline#collect(java.util.function.Supplier<R>, java.util.function.BiConsumer<R,? super P_OUT>, java.util.function.BiConsumer<R,R>)
@Override
public final <R> R collect(Supplier<R> supplier, BiConsumer<R, ? super P_OUT> accumulator, BiConsumer<R, R> combiner) {
return evaluate(ReduceOps.makeRef(supplier, accumulator, combiner));
}
// java.util.stream.AbstractPipeline#evaluate(java.util.stream.TerminalOp<E_OUT,R>)
final <R> R evaluate(TerminalOp<E_OUT, R> terminalOp) {
assert getOutputShape() == terminalOp.inputShape();
if (linkedOrConsumed)
throw new IllegalStateException(MSG_STREAM_LINKED);
linkedOrConsumed = true;
return isParallel()
? terminalOp.evaluateParallel(this, sourceSpliterator(terminalOp.getOpFlags()))
: terminalOp.evaluateSequential(this, sourceSpliterator(terminalOp.getOpFlags()));
}
forEach
// java.util.stream.ReferencePipeline#forEach
@Override
public void forEach(Consumer<? super P_OUT> action) {
evaluate(ForEachOps.makeRef(action, false));
}
// java.util.stream.ForEachOps#makeRef
public static <T> TerminalOp<T, Void> makeRef(Consumer<? super T> action, boolean ordered) {
Objects.requireNonNull(action);
return new ForEachOp.OfRef<>(action, ordered);
}
// java.util.stream.ForEachOps.ForEachOp.OfRef
static final class OfRef<T> extends ForEachOp<T> {
final Consumer<? super T> consumer;
OfRef(Consumer<? super T> consumer, boolean ordered) {
super(ordered);
this.consumer = consumer;
}
// 隻是簡單的消費
@Override
public void accept(T t) {
consumer.accept(t);
}
}
Head
流的資料元的頭,類路徑java.util.stream.ReferencePipeline.Head
// java.util.stream.ReferencePipeline.Head
static class Head<E_IN, E_OUT> extends ReferencePipeline<E_IN, E_OUT> {
Head(Supplier<? extends Spliterator<?>> source, int sourceFlags, boolean parallel) {
super(source, sourceFlags, parallel);
}
Head(Spliterator<?> source, int sourceFlags, boolean parallel) {
super(source, sourceFlags, parallel);
}
@Override
final boolean opIsStateful() {
throw new UnsupportedOperationException();
}
@Override
final Sink<E_IN> opWrapSink(int flags, Sink<E_OUT> sink) {
throw new UnsupportedOperationException();
}
// Optimized sequential terminal operations for the head of the pipeline
@Override
public void forEach(Consumer<? super E_OUT> action) {
if (!isParallel()) {
sourceStageSpliterator().forEachRemaining(action);
}
else {
super.forEach(action);
}
}
@Override
public void forEachOrdered(Consumer<? super E_OUT> action) {
if (!isParallel()) {
sourceStageSpliterator().forEachRemaining(action);
}
else {
super.forEachOrdered(action);
}
}
}
StatelessOp
無狀态的中間管道,類路徑java.util.stream.ReferencePipeline.StatelessOp
// java.util.stream.ReferencePipeline.StatelessOp
abstract static class StatelessOp<E_IN, E_OUT> extends ReferencePipeline<E_IN, E_OUT> {
StatelessOp(AbstractPipeline<?, E_IN, ?> upstream, StreamShape inputShape, int opFlags) {
super(upstream, opFlags);
assert upstream.getOutputShape() == inputShape;
}
@Override
final boolean opIsStateful() {
return false;
}
}
StatefulOp
有狀态的中間管道,類路徑java.util.stream.ReferencePipeline.StatefulOp
// java.util.stream.ReferencePipeline.StatefulOp
abstract static class StatefulOp<E_IN, E_OUT> extends ReferencePipeline<E_IN, E_OUT> {
StatefulOp(AbstractPipeline<?, E_IN, ?> upstream, StreamShape inputShape, int opFlags) {
super(upstream, opFlags);
assert upstream.getOutputShape() == inputShape;
}
@Override
final boolean opIsStateful() {
return true;
}
@Override
abstract <P_IN> Node<E_OUT> opEvaluateParallel(PipelineHelper<E_OUT> helper,
Spliterator<P_IN> spliterator,
IntFunction<E_OUT[]> generator);
TerminalOp
管道流的結束操作,類路徑java.util.stream.TerminalOp
interface TerminalOp<E_IN, R> {
// 擷取此操作的輸入類型的形狀
default StreamShape inputShape() { return StreamShape.REFERENCE; }
// 擷取操作的流标志。 終端操作可以設定StreamOpFlag定義的流标志的有限子集,并且這些标志與管道的先前組合的流和中間操作标志組合在一起。
default int getOpFlags() { return 0; }
// 使用指定的PipelineHelper對操作執行并行評估,該操作描述上遊中間操作。
default <P_IN> R evaluateParallel(PipelineHelper<E_IN> helper, Spliterator<P_IN> spliterator) {
if (Tripwire.ENABLED)
Tripwire.trip(getClass(), "{0} triggering TerminalOp.evaluateParallel serial default");
return evaluateSequential(helper, spliterator);
}
// 使用指定的PipelineHelper對操作執行順序評估,該操作描述上遊中間操作。
<P_IN> R evaluateSequential(PipelineHelper<E_IN> helper, Spliterator<P_IN> spliterator);
}
ReduceOp
類路徑java.util.stream.ReduceOps.ReduceOp
private static abstract class ReduceOp<T, R, S extends AccumulatingSink<T, R, S>> implements TerminalOp<T, R> {
private final StreamShape inputShape;
ReduceOp(StreamShape shape) {
inputShape = shape;
}
public abstract S makeSink();
@Override
public StreamShape inputShape() {
return inputShape;
}
// 通過匿名子類實作makeSink()擷取Sink
@Override
public <P_IN> R evaluateSequential(PipelineHelper<T> helper, Spliterator<P_IN> spliterator) {
return helper.wrapAndCopyInto(makeSink(), spliterator).get();
}
@Override
public <P_IN> R evaluateParallel(PipelineHelper<T> helper, Spliterator<P_IN> spliterator) {
return new ReduceTask<>(this, helper, spliterator).invoke().get();
}
}
MatchOp
類路徑java.util.stream.MatchOps.MatchOp
private static final class MatchOp<T> implements TerminalOp<T, Boolean> {
private final StreamShape inputShape;
final MatchKind matchKind;
final Supplier<BooleanTerminalSink<T>> sinkSupplier;
MatchOp(StreamShape shape, MatchKind matchKind, Supplier<BooleanTerminalSink<T>> sinkSupplier) {
this.inputShape = shape;
this.matchKind = matchKind;
this.sinkSupplier = sinkSupplier;
}
@Override
public int getOpFlags() {
return StreamOpFlag.IS_SHORT_CIRCUIT | StreamOpFlag.NOT_ORDERED;
}
@Override
public StreamShape inputShape() {
return inputShape;
}
// 使用内置的sinkSupplier擷取Sink
@Override
public <S> Boolean evaluateSequential(PipelineHelper<T> helper, Spliterator<S> spliterator) {
return helper.wrapAndCopyInto(sinkSupplier.get(), spliterator).getAndClearState();
}
@Override
public <S> Boolean evaluateParallel(PipelineHelper<T> helper, Spliterator<S> spliterator) {
return new MatchTask<>(this, helper, spliterator).invoke();
}
}
FindOp
類路徑java.util.stream.FindOps.FindOp
private static final class FindOp<T, O> implements TerminalOp<T, O> {
private final StreamShape shape;
final boolean mustFindFirst;
final O emptyValue;
final Predicate<O> presentPredicate;
final Supplier<TerminalSink<T, O>> sinkSupplier;
FindOp(boolean mustFindFirst,
StreamShape shape,
O emptyValue,
Predicate<O> presentPredicate,
Supplier<TerminalSink<T, O>> sinkSupplier) {
this.mustFindFirst = mustFindFirst;
this.shape = shape;
this.emptyValue = emptyValue;
this.presentPredicate = presentPredicate;
this.sinkSupplier = sinkSupplier;
}
@Override
public int getOpFlags() {
return StreamOpFlag.IS_SHORT_CIRCUIT | (mustFindFirst ? 0 : StreamOpFlag.NOT_ORDERED);
}
@Override
public StreamShape inputShape() {
return shape;
}
// 通過内置sinkSupplier擷取Sink
@Override
public <S> O evaluateSequential(PipelineHelper<T> helper, Spliterator<S> spliterator) {
O result = helper.wrapAndCopyInto(sinkSupplier.get(), spliterator).get();
return result != null ? result : emptyValue;
}
@Override
public <P_IN> O evaluateParallel(PipelineHelper<T> helper, Spliterator<P_IN> spliterator) {
return new FindTask<>(this, helper, spliterator).invoke();
}
}
ForEachOp
類路徑java.util.stream.ForEachOps.ForEachOp
static abstract class ForEachOp<T> implements TerminalOp<T, Void>, TerminalSink<T, Void> {
private final boolean ordered;
protected ForEachOp(boolean ordered) {
this.ordered = ordered;
}
@Override
public int getOpFlags() {
return ordered ? 0 : StreamOpFlag.NOT_ORDERED;
}
// 自己實作了Sink
@Override
public <S> Void evaluateSequential(PipelineHelper<T> helper, Spliterator<S> spliterator) {
return helper.wrapAndCopyInto(this, spliterator).get();
}
@Override
public <S> Void evaluateParallel(PipelineHelper<T> helper, Spliterator<S> spliterator) {
if (ordered)
new ForEachOrderedTask<>(helper, spliterator, this).invoke();
else
new ForEachTask<>(helper, spliterator, helper.wrapSink(this)).invoke();
return null;
}
@Override
public Void get() {
return null;
}
static final class OfRef<T> extends ForEachOp<T> {
final Consumer<? super T> consumer;
OfRef(Consumer<? super T> consumer, boolean ordered) {
super(ordered);
this.consumer = consumer;
}
@Override
public void accept(T t) {
consumer.accept(t);
}
}
...
}
Sink
類路徑java.util.stream.Sink
interface Sink<T> extends Consumer<T> {
// 開始周遊元素之前調用該方法,通知Sink做好準備。
default void begin(long size) {}
// 所有元素周遊完成之後調用,通知Sink沒有更多的元素了。
default void end() {}
// 是否可以結束操作,可以讓短路操作盡早結束。
default boolean cancellationRequested() {
return false;
}
// 周遊元素時調用,接受一個待處理元素,并對元素進行處理。Stage把自己包含的操作和回調方法封裝到該方法裡,前一個Stage隻需要調用目前Stage.accept(T t)方法就行了。
void accept(T t);
}
這裡Sink的子類實作中分為兩種:中間操作匿名實作ChainedReference和TerminalOp子類所提供的Sink。
ChainedReference
類路徑java.util.stream.Sink.ChainedReference,這裡是中間操作的預設模闆父類
static abstract class ChainedReference<T, E_OUT> implements Sink<T> {
protected final Sink<? super E_OUT> downstream;
public ChainedReference(Sink<? super E_OUT> downstream) {
this.downstream = Objects.requireNonNull(downstream);
}
@Override
public void begin(long size) {
downstream.begin(size);
}
@Override
public void end() {
downstream.end();
}
@Override
public boolean cancellationRequested() {
return downstream.cancellationRequested();
}
}
在上述的中間操作管道流中都是通過匿名類繼承ChainedReference實作onWrapSink(int, Sink)傳回一個指定操作的Sink。
TerminalSink
這裡為什麼講提供呢?這是因為不同的實作TerminalOp的子類中在實作java.util.stream.TerminalOp#evaluateSequential中都是通過helper.wrapAndCopyInto(TerminalOp子類實作提供的Sink, spliterator)中通過參數傳遞的方式提供的,不同的子類傳遞的方式不一樣是以此處用了一個提供Sink
由ReduceOps中實作TerminalOp所提供的ReducingSink,它是由匿名類實作java.util.stream.ReduceOps.ReduceOp#makeSink來傳遞給helper.wrapAndCopyInto(makeSink(), spliterator)的。
public static <T, U> TerminalOp<T, U> makeRef(U seed, BiFunction<U, ? super T, U> reducer, BinaryOperator<U> combiner) {
Objects.requireNonNull(reducer);
Objects.requireNonNull(combiner);
class ReducingSink extends Box<U> implements AccumulatingSink<T, U, ReducingSink> {
@Override
public void begin(long size) {
state = seed;
}
@Override
public void accept(T t) {
state = reducer.apply(state, t);
}
@Override
public void combine(ReducingSink other) {
state = combiner.apply(state, other.state);
}
}
return new ReduceOp<T, U, ReducingSink>(StreamShape.REFERENCE) {
@Override
public ReducingSink makeSink() {
return new ReducingSink();
}
};
}
由ForEachOps中實作TerminalOp所提供的是this,它的提供方式就是通過this傳遞給helper.wrapAndCopyInto(this, spliterator)。
// 這裡ForEachOp自己通過TerminalSink間接的實作了Sink
static abstract class ForEachOp<T> implements TerminalOp<T, Void>, TerminalSink<T, Void> {
@Override
public <S> Void evaluateSequential(PipelineHelper<T> helper, Spliterator<S> spliterator) {
return helper.wrapAndCopyInto(this, spliterator).get();
}
}
由MatchOps中實作TerminalOp所提供的sinkSupplier通過構造函數由外部指派,通過Supplier接口的get()來傳遞給helper.wrapAndCopyInto(sinkSupplier.get(), spliterator)。
private static final class MatchOp<T> implements TerminalOp<T, Boolean> {
final Supplier<BooleanTerminalSink<T>> sinkSupplier;
@Override
public <S> Boolean evaluateSequential(PipelineHelper<T> helper,Spliterator<S> spliterator) {
return helper.wrapAndCopyInto(sinkSupplier.get(), spliterator).getAndClearState();
}
}
由FindOps中實作TerminalOp所提供的與上述MatchOps是一緻的
private static final class FindOp<T, O> implements TerminalOp<T, O> {
final Supplier<TerminalSink<T, O>> sinkSupplier;
@Override
public <S> O evaluateSequential(PipelineHelper<T> helper, Spliterator<S> spliterator) {
O result = helper.wrapAndCopyInto(sinkSupplier.get(), spliterator).get();
return result != null ? result : emptyValue;
}
}
Collector
在Collector中有以下幾個實作接口:
- Supplier:結果類型的提供器。
- BiConsumer<A, T>:将元素放入結果的累加器。
- BinaryOperator:合并部分結果的組合器。
- Function<A, R>:對結果類型轉換為最終結果類型的轉換器。
- Set:儲存Collector特征的集合
并行流
前述都是基于串行流的講解,其實并行流也是基于上述的helper.wrapAndCopyInto(op.sinkSupplier.get(), spliterator)這個方法上面做的一層基于ForkJoinTask多線程架構的封裝。
ForkJoinTask
ForkJoin架構的思想就是分而治之,它将一個大任務切割為多個小任務這個過程稱為fork,将每個任務的執行的結果進行彙總的過程稱為join。ForkJoin架構相關的接口關系圖如下(圖檔來源網絡):
AbstractTask
類路徑java.util.stream.AbstractTask,AbstractTask繼承了在JUC中已經封裝好的ForkJoinTask抽象子類java.util.concurrent.CountedCompleter。
此類基于CountedCompleter ,它是fork-join任務的一種形式,其中每個任務都有未完成子代的信号量計數,并且該任務隐式完成并在其最後一個子代完成時得到通知。 内部節點任務可能會覆寫CountedCompleter的onCompletion方法,以将子任務的結果合并到目前任務的結果中。 拆分和設定子任務連結是由内部節點的compute()完成的。 在葉節點的compute()時間,可以確定将為所有子代設定父代的子代相關字段(包括父代子代的同級連結)。
例如,執行減少任務的任務将覆寫doLeaf()以使用Spliterator對該葉節點的塊執行減少Spliterator ,并覆寫onCompletion()以合并内部節點的子任務的結果:
@Override
protected ReduceTask<P_IN, P_OUT, R, S> makeChild(Spliterator<P_IN> spliterator) {
// 傳回一個ForkJoinTask任務
return new ReduceTask<>(this, spliterator);
}
@Override
protected S doLeaf() {
// 其他實作大同小異
return helper.wrapAndCopyInto(op.makeSink(), spliterator);
}
@Override
public void onCompletion(CountedCompleter<?> caller) {
// 非葉子節點進行結果組合
if (!isLeaf()) {
S leftResult = leftChild.getLocalResult();
leftResult.combine(rightChild.getLocalResult());
setLocalResult(leftResult);
}
// GC spliterator, left and right child
super.onCompletion(caller);
}
AbstractTask封裝了分片任務的算法模闆,通過是Spliterator的trySplit()方法來實作分片的細節,詳細算法源碼如下(類路徑:java.util.stream.AbstractTask#compute):
@Override
public void compute() {
// 将目前這個spliterator作為右節點(此時為root節點)
Spliterator<P_IN> rs = spliterator, ls;
// 評估任務的大小
long sizeEstimate = rs.estimateSize();
// 擷取任務門檻值
long sizeThreshold = getTargetSize(sizeEstimate);
boolean forkRight = false;
@SuppressWarnings("unchecked") K task = (K) this;
// 細節不多贅述,下面我用圖來講解算法
/**
* 根節點指定為:右邊節點
* root
* split()
* left right
* left.fork()
* split()
* l r
* rs = ls
* right.fork()
* split()
* l r
* l.fork()
*/
while (sizeEstimate > sizeThreshold && (ls = rs.trySplit()) != null) {
K leftChild, rightChild, taskToFork;
task.leftChild = leftChild = task.makeChild(ls);
task.rightChild = rightChild = task.makeChild(rs);
task.setPendingCount(1);
if (forkRight) {
forkRight = false;
// 左右節點切換進行fork和split
rs = ls;
task = leftChild;
taskToFork = rightChild;
}
else {
forkRight = true;
task = rightChild;
taskToFork = leftChild;
}
// fork任務加入隊列中去
taskToFork.fork();
sizeEstimate = rs.estimateSize();
}
// 将執行doLeaf底層就是單個串行流的操作
task.setLocalResult(task.doLeaf());
// 将結果組合成一個最終結果
task.tryComplete();
}
AbstractTask執行與分片流程圖如下:
到這裡Stream流的相關知識介紹到這,這裡附上一副總體圖來加深下印象
原文連結:https://blog.csdn.net/jacknler/article/details/116810311