1  /*
2   * Copyright (c) 2012, 2013, Oracle and/or its affiliates. All rights reserved.
3   * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
4   *
5   * This code is free software; you can redistribute it and/or modify it
6   * under the terms of the GNU General Public License version 2 only, as
7   * published by the Free Software Foundation.  Oracle designates this
8   * particular file as subject to the "Classpath" exception as provided
9   * by Oracle in the LICENSE file that accompanied this code.
10   *
11   * This code is distributed in the hope that it will be useful, but WITHOUT
12   * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
13   * FITNESS FOR A PARTICULAR PURPOSE.  See the GNU General Public License
14   * version 2 for more details (a copy is included in the LICENSE file that
15   * accompanied this code).
16   *
17   * You should have received a copy of the GNU General Public License version
18   * 2 along with this work; if not, write to the Free Software Foundation,
19   * Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA.
20   *
21   * Please contact Oracle, 500 Oracle Parkway, Redwood Shores, CA 94065 USA
22   * or visit www.oracle.com if you need additional information or have any
23   * questions.
24   */
25  package java.util.stream;
26  
27  import java.util.Comparator;
28  import java.util.Objects;
29  import java.util.Spliterator;
30  import java.util.concurrent.ConcurrentHashMap;
31  import java.util.concurrent.atomic.AtomicLong;
32  import java.util.function.BooleanSupplier;
33  import java.util.function.Consumer;
34  import java.util.function.DoubleConsumer;
35  import java.util.function.DoubleSupplier;
36  import java.util.function.IntConsumer;
37  import java.util.function.IntSupplier;
38  import java.util.function.LongConsumer;
39  import java.util.function.LongSupplier;
40  import java.util.function.Supplier;
41  
42  /**
43   * Spliterator implementations for wrapping and delegating spliterators, used
44   * in the implementation of the {@link Stream#spliterator()} method.
45   *
46   * @since 1.8
47   */
48  class StreamSpliterators {
49  
50      /**
51       * Abstract wrapping spliterator that binds to the spliterator of a
52       * pipeline helper on first operation.
53       *
54       * <p>This spliterator is not late-binding and will bind to the source
55       * spliterator when first operated on.
56       *
57       * <p>A wrapping spliterator produced from a sequential stream
58       * cannot be split if there are stateful operations present.
59       */
60      private static abstract class AbstractWrappingSpliterator<P_IN, P_OUT,
61                                                                T_BUFFER extends AbstractSpinedBuffer>
62              implements Spliterator<P_OUT> {
63  
64          // @@@ Detect if stateful operations are present or not
65          //     If not then can split otherwise cannot
66  
67          /**
68           * True if this spliterator supports splitting
69           */
70          final boolean isParallel;
71  
72          final PipelineHelper<P_OUT> ph;
73  
74          /**
75           * Supplier for the source spliterator.  Client provides either a
76           * spliterator or a supplier.
77           */
78          private Supplier<Spliterator<P_IN>> spliteratorSupplier;
79  
80          /**
81           * Source spliterator.  Either provided from client or obtained from
82           * supplier.
83           */
84          Spliterator<P_IN> spliterator;
85  
86          /**
87           * Sink chain for the downstream stages of the pipeline, ultimately
88           * leading to the buffer. Used during partial traversal.
89           */
90          Sink<P_IN> bufferSink;
91  
92          /**
93           * A function that advances one element of the spliterator, pushing
94           * it to bufferSink.  Returns whether any elements were processed.
95           * Used during partial traversal.
96           */
97          BooleanSupplier pusher;
98  
99          /** Next element to consume from the buffer, used during partial traversal */
100          long nextToConsume;
101  
102          /** Buffer into which elements are pushed.  Used during partial traversal. */
103          T_BUFFER buffer;
104  
105          /**
106           * True if full traversal has occurred (with possible cancelation).
107           * If doing a partial traversal, there may be still elements in buffer.
108           */
109          boolean finished;
110  
111          /**
112           * Construct an AbstractWrappingSpliterator from a
113           * {@code Supplier<Spliterator>}.
114           */
AbstractWrappingSpliterator(PipelineHelper<P_OUT> ph, Supplier<Spliterator<P_IN>> spliteratorSupplier, boolean parallel)115          AbstractWrappingSpliterator(PipelineHelper<P_OUT> ph,
116                                      Supplier<Spliterator<P_IN>> spliteratorSupplier,
117                                      boolean parallel) {
118              this.ph = ph;
119              this.spliteratorSupplier = spliteratorSupplier;
120              this.spliterator = null;
121              this.isParallel = parallel;
122          }
123  
124          /**
125           * Construct an AbstractWrappingSpliterator from a
126           * {@code Spliterator}.
127           */
AbstractWrappingSpliterator(PipelineHelper<P_OUT> ph, Spliterator<P_IN> spliterator, boolean parallel)128          AbstractWrappingSpliterator(PipelineHelper<P_OUT> ph,
129                                      Spliterator<P_IN> spliterator,
130                                      boolean parallel) {
131              this.ph = ph;
132              this.spliteratorSupplier = null;
133              this.spliterator = spliterator;
134              this.isParallel = parallel;
135          }
136  
137          /**
138           * Called before advancing to set up spliterator, if needed.
139           */
init()140          final void init() {
141              if (spliterator == null) {
142                  spliterator = spliteratorSupplier.get();
143                  spliteratorSupplier = null;
144              }
145          }
146  
147          /**
148           * Get an element from the source, pushing it into the sink chain,
149           * setting up the buffer if needed
150           * @return whether there are elements to consume from the buffer
151           */
doAdvance()152          final boolean doAdvance() {
153              if (buffer == null) {
154                  if (finished)
155                      return false;
156  
157                  init();
158                  initPartialTraversalState();
159                  nextToConsume = 0;
160                  bufferSink.begin(spliterator.getExactSizeIfKnown());
161                  return fillBuffer();
162              }
163              else {
164                  ++nextToConsume;
165                  boolean hasNext = nextToConsume < buffer.count();
166                  if (!hasNext) {
167                      nextToConsume = 0;
168                      buffer.clear();
169                      hasNext = fillBuffer();
170                  }
171                  return hasNext;
172              }
173          }
174  
175          /**
176           * Invokes the shape-specific constructor with the provided arguments
177           * and returns the result.
178           */
179          abstract AbstractWrappingSpliterator<P_IN, P_OUT, ?> wrap(Spliterator<P_IN> s);
180  
181          /**
182           * Initializes buffer, sink chain, and pusher for a shape-specific
183           * implementation.
184           */
185          abstract void initPartialTraversalState();
186  
187          @Override
188          public Spliterator<P_OUT> trySplit() {
189              if (isParallel && !finished) {
190                  init();
191  
192                  Spliterator<P_IN> split = spliterator.trySplit();
193                  return (split == null) ? null : wrap(split);
194              }
195              else
196                  return null;
197          }
198  
199          /**
200           * If the buffer is empty, push elements into the sink chain until
201           * the source is empty or cancellation is requested.
202           * @return whether there are elements to consume from the buffer
203           */
204          private boolean fillBuffer() {
205              while (buffer.count() == 0) {
206                  if (bufferSink.cancellationRequested() || !pusher.getAsBoolean()) {
207                      if (finished)
208                          return false;
209                      else {
210                          bufferSink.end(); // might trigger more elements
211                          finished = true;
212                      }
213                  }
214              }
215              return true;
216          }
217  
218          @Override
219          public final long estimateSize() {
220              init();
221              // Use the estimate of the wrapped spliterator
222              // Note this may not be accurate if there are filter/flatMap
223              // operations filtering or adding elements to the stream
224              return spliterator.estimateSize();
225          }
226  
227          @Override
228          public final long getExactSizeIfKnown() {
229              init();
230              return StreamOpFlag.SIZED.isKnown(ph.getStreamAndOpFlags())
231                     ? spliterator.getExactSizeIfKnown()
232                     : -1;
233          }
234  
235          @Override
236          public final int characteristics() {
237              init();
238  
239              // Get the characteristics from the pipeline
240              int c = StreamOpFlag.toCharacteristics(StreamOpFlag.toStreamFlags(ph.getStreamAndOpFlags()));
241  
242              // Mask off the size and uniform characteristics and replace with
243              // those of the spliterator
244              // Note that a non-uniform spliterator can change from something
245              // with an exact size to an estimate for a sub-split, for example
246              // with HashSet where the size is known at the top level spliterator
247              // but for sub-splits only an estimate is known
248              if ((c & Spliterator.SIZED) != 0) {
249                  c &= ~(Spliterator.SIZED | Spliterator.SUBSIZED);
250                  c |= (spliterator.characteristics() & (Spliterator.SIZED | Spliterator.SUBSIZED));
251              }
252  
253              return c;
254          }
255  
256          @Override
257          public Comparator<? super P_OUT> getComparator() {
258              if (!hasCharacteristics(SORTED))
259                  throw new IllegalStateException();
260              return null;
261          }
262  
263          @Override
264          public final String toString() {
265              return String.format("%s[%s]", getClass().getName(), spliterator);
266          }
267      }
268  
269      static final class WrappingSpliterator<P_IN, P_OUT>
270              extends AbstractWrappingSpliterator<P_IN, P_OUT, SpinedBuffer<P_OUT>> {
271  
272          WrappingSpliterator(PipelineHelper<P_OUT> ph,
273                              Supplier<Spliterator<P_IN>> supplier,
274                              boolean parallel) {
275              super(ph, supplier, parallel);
276          }
277  
278          WrappingSpliterator(PipelineHelper<P_OUT> ph,
279                              Spliterator<P_IN> spliterator,
280                              boolean parallel) {
281              super(ph, spliterator, parallel);
282          }
283  
284          @Override
285          WrappingSpliterator<P_IN, P_OUT> wrap(Spliterator<P_IN> s) {
286              return new WrappingSpliterator<>(ph, s, isParallel);
287          }
288  
289          @Override
290          void initPartialTraversalState() {
291              SpinedBuffer<P_OUT> b = new SpinedBuffer<>();
292              buffer = b;
293              bufferSink = ph.wrapSink(b::accept);
294              pusher = () -> spliterator.tryAdvance(bufferSink);
295          }
296  
297          @Override
tryAdvance(Consumer<? super P_OUT> consumer)298          public boolean tryAdvance(Consumer<? super P_OUT> consumer) {
299              Objects.requireNonNull(consumer);
300              boolean hasNext = doAdvance();
301              if (hasNext)
302                  consumer.accept(buffer.get(nextToConsume));
303              return hasNext;
304          }
305  
306          @Override
forEachRemaining(Consumer<? super P_OUT> consumer)307          public void forEachRemaining(Consumer<? super P_OUT> consumer) {
308              if (buffer == null && !finished) {
309                  Objects.requireNonNull(consumer);
310                  init();
311  
312                  ph.wrapAndCopyInto((Sink<P_OUT>) consumer::accept, spliterator);
313                  finished = true;
314              }
315              else {
316                  do { } while (tryAdvance(consumer));
317              }
318          }
319      }
320  
321      static final class IntWrappingSpliterator<P_IN>
322              extends AbstractWrappingSpliterator<P_IN, Integer, SpinedBuffer.OfInt>
323              implements Spliterator.OfInt {
324  
IntWrappingSpliterator(PipelineHelper<Integer> ph, Supplier<Spliterator<P_IN>> supplier, boolean parallel)325          IntWrappingSpliterator(PipelineHelper<Integer> ph,
326                                 Supplier<Spliterator<P_IN>> supplier,
327                                 boolean parallel) {
328              super(ph, supplier, parallel);
329          }
330  
IntWrappingSpliterator(PipelineHelper<Integer> ph, Spliterator<P_IN> spliterator, boolean parallel)331          IntWrappingSpliterator(PipelineHelper<Integer> ph,
332                                 Spliterator<P_IN> spliterator,
333                                 boolean parallel) {
334              super(ph, spliterator, parallel);
335          }
336  
337          @Override
wrap(Spliterator<P_IN> s)338          AbstractWrappingSpliterator<P_IN, Integer, ?> wrap(Spliterator<P_IN> s) {
339              return new IntWrappingSpliterator<>(ph, s, isParallel);
340          }
341  
342          @Override
initPartialTraversalState()343          void initPartialTraversalState() {
344              SpinedBuffer.OfInt b = new SpinedBuffer.OfInt();
345              buffer = b;
346              bufferSink = ph.wrapSink((Sink.OfInt) b::accept);
347              pusher = () -> spliterator.tryAdvance(bufferSink);
348          }
349  
350          @Override
trySplit()351          public Spliterator.OfInt trySplit() {
352              return (Spliterator.OfInt) super.trySplit();
353          }
354  
355          @Override
tryAdvance(IntConsumer consumer)356          public boolean tryAdvance(IntConsumer consumer) {
357              Objects.requireNonNull(consumer);
358              boolean hasNext = doAdvance();
359              if (hasNext)
360                  consumer.accept(buffer.get(nextToConsume));
361              return hasNext;
362          }
363  
364          @Override
forEachRemaining(IntConsumer consumer)365          public void forEachRemaining(IntConsumer consumer) {
366              if (buffer == null && !finished) {
367                  Objects.requireNonNull(consumer);
368                  init();
369  
370                  ph.wrapAndCopyInto((Sink.OfInt) consumer::accept, spliterator);
371                  finished = true;
372              }
373              else {
374                  do { } while (tryAdvance(consumer));
375              }
376          }
377      }
378  
379      static final class LongWrappingSpliterator<P_IN>
380              extends AbstractWrappingSpliterator<P_IN, Long, SpinedBuffer.OfLong>
381              implements Spliterator.OfLong {
382  
LongWrappingSpliterator(PipelineHelper<Long> ph, Supplier<Spliterator<P_IN>> supplier, boolean parallel)383          LongWrappingSpliterator(PipelineHelper<Long> ph,
384                                  Supplier<Spliterator<P_IN>> supplier,
385                                  boolean parallel) {
386              super(ph, supplier, parallel);
387          }
388  
LongWrappingSpliterator(PipelineHelper<Long> ph, Spliterator<P_IN> spliterator, boolean parallel)389          LongWrappingSpliterator(PipelineHelper<Long> ph,
390                                  Spliterator<P_IN> spliterator,
391                                  boolean parallel) {
392              super(ph, spliterator, parallel);
393          }
394  
395          @Override
wrap(Spliterator<P_IN> s)396          AbstractWrappingSpliterator<P_IN, Long, ?> wrap(Spliterator<P_IN> s) {
397              return new LongWrappingSpliterator<>(ph, s, isParallel);
398          }
399  
400          @Override
initPartialTraversalState()401          void initPartialTraversalState() {
402              SpinedBuffer.OfLong b = new SpinedBuffer.OfLong();
403              buffer = b;
404              bufferSink = ph.wrapSink((Sink.OfLong) b::accept);
405              pusher = () -> spliterator.tryAdvance(bufferSink);
406          }
407  
408          @Override
trySplit()409          public Spliterator.OfLong trySplit() {
410              return (Spliterator.OfLong) super.trySplit();
411          }
412  
413          @Override
tryAdvance(LongConsumer consumer)414          public boolean tryAdvance(LongConsumer consumer) {
415              Objects.requireNonNull(consumer);
416              boolean hasNext = doAdvance();
417              if (hasNext)
418                  consumer.accept(buffer.get(nextToConsume));
419              return hasNext;
420          }
421  
422          @Override
forEachRemaining(LongConsumer consumer)423          public void forEachRemaining(LongConsumer consumer) {
424              if (buffer == null && !finished) {
425                  Objects.requireNonNull(consumer);
426                  init();
427  
428                  ph.wrapAndCopyInto((Sink.OfLong) consumer::accept, spliterator);
429                  finished = true;
430              }
431              else {
432                  do { } while (tryAdvance(consumer));
433              }
434          }
435      }
436  
437      static final class DoubleWrappingSpliterator<P_IN>
438              extends AbstractWrappingSpliterator<P_IN, Double, SpinedBuffer.OfDouble>
439              implements Spliterator.OfDouble {
440  
DoubleWrappingSpliterator(PipelineHelper<Double> ph, Supplier<Spliterator<P_IN>> supplier, boolean parallel)441          DoubleWrappingSpliterator(PipelineHelper<Double> ph,
442                                    Supplier<Spliterator<P_IN>> supplier,
443                                    boolean parallel) {
444              super(ph, supplier, parallel);
445          }
446  
DoubleWrappingSpliterator(PipelineHelper<Double> ph, Spliterator<P_IN> spliterator, boolean parallel)447          DoubleWrappingSpliterator(PipelineHelper<Double> ph,
448                                    Spliterator<P_IN> spliterator,
449                                    boolean parallel) {
450              super(ph, spliterator, parallel);
451          }
452  
453          @Override
wrap(Spliterator<P_IN> s)454          AbstractWrappingSpliterator<P_IN, Double, ?> wrap(Spliterator<P_IN> s) {
455              return new DoubleWrappingSpliterator<>(ph, s, isParallel);
456          }
457  
458          @Override
initPartialTraversalState()459          void initPartialTraversalState() {
460              SpinedBuffer.OfDouble b = new SpinedBuffer.OfDouble();
461              buffer = b;
462              bufferSink = ph.wrapSink((Sink.OfDouble) b::accept);
463              pusher = () -> spliterator.tryAdvance(bufferSink);
464          }
465  
466          @Override
trySplit()467          public Spliterator.OfDouble trySplit() {
468              return (Spliterator.OfDouble) super.trySplit();
469          }
470  
471          @Override
tryAdvance(DoubleConsumer consumer)472          public boolean tryAdvance(DoubleConsumer consumer) {
473              Objects.requireNonNull(consumer);
474              boolean hasNext = doAdvance();
475              if (hasNext)
476                  consumer.accept(buffer.get(nextToConsume));
477              return hasNext;
478          }
479  
480          @Override
forEachRemaining(DoubleConsumer consumer)481          public void forEachRemaining(DoubleConsumer consumer) {
482              if (buffer == null && !finished) {
483                  Objects.requireNonNull(consumer);
484                  init();
485  
486                  ph.wrapAndCopyInto((Sink.OfDouble) consumer::accept, spliterator);
487                  finished = true;
488              }
489              else {
490                  do { } while (tryAdvance(consumer));
491              }
492          }
493      }
494  
495      /**
496       * Spliterator implementation that delegates to an underlying spliterator,
497       * acquiring the spliterator from a {@code Supplier<Spliterator>} on the
498       * first call to any spliterator method.
499       * @param <T>
500       */
501      static class DelegatingSpliterator<T, T_SPLITR extends Spliterator<T>>
502              implements Spliterator<T> {
503          private final Supplier<? extends T_SPLITR> supplier;
504  
505          private T_SPLITR s;
506  
DelegatingSpliterator(Supplier<? extends T_SPLITR> supplier)507          DelegatingSpliterator(Supplier<? extends T_SPLITR> supplier) {
508              this.supplier = supplier;
509          }
510  
get()511          T_SPLITR get() {
512              if (s == null) {
513                  s = supplier.get();
514              }
515              return s;
516          }
517  
518          @Override
519          @SuppressWarnings("unchecked")
trySplit()520          public T_SPLITR trySplit() {
521              return (T_SPLITR) get().trySplit();
522          }
523  
524          @Override
tryAdvance(Consumer<? super T> consumer)525          public boolean tryAdvance(Consumer<? super T> consumer) {
526              return get().tryAdvance(consumer);
527          }
528  
529          @Override
forEachRemaining(Consumer<? super T> consumer)530          public void forEachRemaining(Consumer<? super T> consumer) {
531              get().forEachRemaining(consumer);
532          }
533  
534          @Override
estimateSize()535          public long estimateSize() {
536              return get().estimateSize();
537          }
538  
539          @Override
characteristics()540          public int characteristics() {
541              return get().characteristics();
542          }
543  
544          @Override
getComparator()545          public Comparator<? super T> getComparator() {
546              return get().getComparator();
547          }
548  
549          @Override
getExactSizeIfKnown()550          public long getExactSizeIfKnown() {
551              return get().getExactSizeIfKnown();
552          }
553  
554          @Override
toString()555          public String toString() {
556              return getClass().getName() + "[" + get() + "]";
557          }
558  
559          static class OfPrimitive<T, T_CONS, T_SPLITR extends Spliterator.OfPrimitive<T, T_CONS, T_SPLITR>>
560              extends DelegatingSpliterator<T, T_SPLITR>
561              implements Spliterator.OfPrimitive<T, T_CONS, T_SPLITR> {
OfPrimitive(Supplier<? extends T_SPLITR> supplier)562              OfPrimitive(Supplier<? extends T_SPLITR> supplier) {
563                  super(supplier);
564              }
565  
566              @Override
tryAdvance(T_CONS consumer)567              public boolean tryAdvance(T_CONS consumer) {
568                  return get().tryAdvance(consumer);
569              }
570  
571              @Override
forEachRemaining(T_CONS consumer)572              public void forEachRemaining(T_CONS consumer) {
573                  get().forEachRemaining(consumer);
574              }
575          }
576  
577          static final class OfInt
578                  extends OfPrimitive<Integer, IntConsumer, Spliterator.OfInt>
579                  implements Spliterator.OfInt {
580  
OfInt(Supplier<Spliterator.OfInt> supplier)581              OfInt(Supplier<Spliterator.OfInt> supplier) {
582                  super(supplier);
583              }
584          }
585  
586          static final class OfLong
587                  extends OfPrimitive<Long, LongConsumer, Spliterator.OfLong>
588                  implements Spliterator.OfLong {
589  
OfLong(Supplier<Spliterator.OfLong> supplier)590              OfLong(Supplier<Spliterator.OfLong> supplier) {
591                  super(supplier);
592              }
593          }
594  
595          static final class OfDouble
596                  extends OfPrimitive<Double, DoubleConsumer, Spliterator.OfDouble>
597                  implements Spliterator.OfDouble {
598  
OfDouble(Supplier<Spliterator.OfDouble> supplier)599              OfDouble(Supplier<Spliterator.OfDouble> supplier) {
600                  super(supplier);
601              }
602          }
603      }
604  
605      /**
606       * A slice Spliterator from a source Spliterator that reports
607       * {@code SUBSIZED}.
608       *
609       */
610      static abstract class SliceSpliterator<T, T_SPLITR extends Spliterator<T>> {
611          // The start index of the slice
612          final long sliceOrigin;
613          // One past the last index of the slice
614          final long sliceFence;
615  
616          // The spliterator to slice
617          T_SPLITR s;
618          // current (absolute) index, modified on advance/split
619          long index;
620          // one past last (absolute) index or sliceFence, which ever is smaller
621          long fence;
622  
SliceSpliterator(T_SPLITR s, long sliceOrigin, long sliceFence, long origin, long fence)623          SliceSpliterator(T_SPLITR s, long sliceOrigin, long sliceFence, long origin, long fence) {
624              assert s.hasCharacteristics(Spliterator.SUBSIZED);
625              this.s = s;
626              this.sliceOrigin = sliceOrigin;
627              this.sliceFence = sliceFence;
628              this.index = origin;
629              this.fence = fence;
630          }
631  
makeSpliterator(T_SPLITR s, long sliceOrigin, long sliceFence, long origin, long fence)632          protected abstract T_SPLITR makeSpliterator(T_SPLITR s, long sliceOrigin, long sliceFence, long origin, long fence);
633  
trySplit()634          public T_SPLITR trySplit() {
635              if (sliceOrigin >= fence)
636                  return null;
637  
638              if (index >= fence)
639                  return null;
640  
641              // Keep splitting until the left and right splits intersect with the slice
642              // thereby ensuring the size estimate decreases.
643              // This also avoids creating empty spliterators which can result in
644              // existing and additionally created F/J tasks that perform
645              // redundant work on no elements.
646              while (true) {
647                  @SuppressWarnings("unchecked")
648                  T_SPLITR leftSplit = (T_SPLITR) s.trySplit();
649                  if (leftSplit == null)
650                      return null;
651  
652                  long leftSplitFenceUnbounded = index + leftSplit.estimateSize();
653                  long leftSplitFence = Math.min(leftSplitFenceUnbounded, sliceFence);
654                  if (sliceOrigin >= leftSplitFence) {
655                      // The left split does not intersect with, and is to the left of, the slice
656                      // The right split does intersect
657                      // Discard the left split and split further with the right split
658                      index = leftSplitFence;
659                  }
660                  else if (leftSplitFence >= sliceFence) {
661                      // The right split does not intersect with, and is to the right of, the slice
662                      // The left split does intersect
663                      // Discard the right split and split further with the left split
664                      s = leftSplit;
665                      fence = leftSplitFence;
666                  }
667                  else if (index >= sliceOrigin && leftSplitFenceUnbounded <= sliceFence) {
668                      // The left split is contained within the slice, return the underlying left split
669                      // Right split is contained within or intersects with the slice
670                      index = leftSplitFence;
671                      return leftSplit;
672                  } else {
673                      // The left split intersects with the slice
674                      // Right split is contained within or intersects with the slice
675                      return makeSpliterator(leftSplit, sliceOrigin, sliceFence, index, index = leftSplitFence);
676                  }
677              }
678          }
679  
estimateSize()680          public long estimateSize() {
681              return (sliceOrigin < fence)
682                     ? fence - Math.max(sliceOrigin, index) : 0;
683          }
684  
characteristics()685          public int characteristics() {
686              return s.characteristics();
687          }
688  
689          static final class OfRef<T>
690                  extends SliceSpliterator<T, Spliterator<T>>
691                  implements Spliterator<T> {
692  
OfRef(Spliterator<T> s, long sliceOrigin, long sliceFence)693              OfRef(Spliterator<T> s, long sliceOrigin, long sliceFence) {
694                  this(s, sliceOrigin, sliceFence, 0, Math.min(s.estimateSize(), sliceFence));
695              }
696  
OfRef(Spliterator<T> s, long sliceOrigin, long sliceFence, long origin, long fence)697              private OfRef(Spliterator<T> s,
698                            long sliceOrigin, long sliceFence, long origin, long fence) {
699                  super(s, sliceOrigin, sliceFence, origin, fence);
700              }
701  
702              @Override
makeSpliterator(Spliterator<T> s, long sliceOrigin, long sliceFence, long origin, long fence)703              protected Spliterator<T> makeSpliterator(Spliterator<T> s,
704                                                       long sliceOrigin, long sliceFence,
705                                                       long origin, long fence) {
706                  return new OfRef<>(s, sliceOrigin, sliceFence, origin, fence);
707              }
708  
709              @Override
tryAdvance(Consumer<? super T> action)710              public boolean tryAdvance(Consumer<? super T> action) {
711                  Objects.requireNonNull(action);
712  
713                  if (sliceOrigin >= fence)
714                      return false;
715  
716                  while (sliceOrigin > index) {
717                      s.tryAdvance(e -> {});
718                      index++;
719                  }
720  
721                  if (index >= fence)
722                      return false;
723  
724                  index++;
725                  return s.tryAdvance(action);
726              }
727  
728              @Override
forEachRemaining(Consumer<? super T> action)729              public void forEachRemaining(Consumer<? super T> action) {
730                  Objects.requireNonNull(action);
731  
732                  if (sliceOrigin >= fence)
733                      return;
734  
735                  if (index >= fence)
736                      return;
737  
738                  if (index >= sliceOrigin && (index + s.estimateSize()) <= sliceFence) {
739                      // The spliterator is contained within the slice
740                      s.forEachRemaining(action);
741                      index = fence;
742                  } else {
743                      // The spliterator intersects with the slice
744                      while (sliceOrigin > index) {
745                          s.tryAdvance(e -> {});
746                          index++;
747                      }
748                      // Traverse elements up to the fence
749                      for (;index < fence; index++) {
750                          s.tryAdvance(action);
751                      }
752                  }
753              }
754          }
755  
756          static abstract class OfPrimitive<T,
757                  T_SPLITR extends Spliterator.OfPrimitive<T, T_CONS, T_SPLITR>,
758                  T_CONS>
759                  extends SliceSpliterator<T, T_SPLITR>
760                  implements Spliterator.OfPrimitive<T, T_CONS, T_SPLITR> {
761  
OfPrimitive(T_SPLITR s, long sliceOrigin, long sliceFence)762              OfPrimitive(T_SPLITR s, long sliceOrigin, long sliceFence) {
763                  this(s, sliceOrigin, sliceFence, 0, Math.min(s.estimateSize(), sliceFence));
764              }
765  
OfPrimitive(T_SPLITR s, long sliceOrigin, long sliceFence, long origin, long fence)766              private OfPrimitive(T_SPLITR s,
767                                  long sliceOrigin, long sliceFence, long origin, long fence) {
768                  super(s, sliceOrigin, sliceFence, origin, fence);
769              }
770  
771              @Override
tryAdvance(T_CONS action)772              public boolean tryAdvance(T_CONS action) {
773                  Objects.requireNonNull(action);
774  
775                  if (sliceOrigin >= fence)
776                      return false;
777  
778                  while (sliceOrigin > index) {
779                      s.tryAdvance(emptyConsumer());
780                      index++;
781                  }
782  
783                  if (index >= fence)
784                      return false;
785  
786                  index++;
787                  return s.tryAdvance(action);
788              }
789  
790              @Override
forEachRemaining(T_CONS action)791              public void forEachRemaining(T_CONS action) {
792                  Objects.requireNonNull(action);
793  
794                  if (sliceOrigin >= fence)
795                      return;
796  
797                  if (index >= fence)
798                      return;
799  
800                  if (index >= sliceOrigin && (index + s.estimateSize()) <= sliceFence) {
801                      // The spliterator is contained within the slice
802                      s.forEachRemaining(action);
803                      index = fence;
804                  } else {
805                      // The spliterator intersects with the slice
806                      while (sliceOrigin > index) {
807                          s.tryAdvance(emptyConsumer());
808                          index++;
809                      }
810                      // Traverse elements up to the fence
811                      for (;index < fence; index++) {
812                          s.tryAdvance(action);
813                      }
814                  }
815              }
816  
emptyConsumer()817              protected abstract T_CONS emptyConsumer();
818          }
819  
820          static final class OfInt extends OfPrimitive<Integer, Spliterator.OfInt, IntConsumer>
821                  implements Spliterator.OfInt {
OfInt(Spliterator.OfInt s, long sliceOrigin, long sliceFence)822              OfInt(Spliterator.OfInt s, long sliceOrigin, long sliceFence) {
823                  super(s, sliceOrigin, sliceFence);
824              }
825  
OfInt(Spliterator.OfInt s, long sliceOrigin, long sliceFence, long origin, long fence)826              OfInt(Spliterator.OfInt s,
827                    long sliceOrigin, long sliceFence, long origin, long fence) {
828                  super(s, sliceOrigin, sliceFence, origin, fence);
829              }
830  
831              @Override
makeSpliterator(Spliterator.OfInt s, long sliceOrigin, long sliceFence, long origin, long fence)832              protected Spliterator.OfInt makeSpliterator(Spliterator.OfInt s,
833                                                          long sliceOrigin, long sliceFence,
834                                                          long origin, long fence) {
835                  return new SliceSpliterator.OfInt(s, sliceOrigin, sliceFence, origin, fence);
836              }
837  
838              @Override
emptyConsumer()839              protected IntConsumer emptyConsumer() {
840                  return e -> {};
841              }
842          }
843  
844          static final class OfLong extends OfPrimitive<Long, Spliterator.OfLong, LongConsumer>
845                  implements Spliterator.OfLong {
OfLong(Spliterator.OfLong s, long sliceOrigin, long sliceFence)846              OfLong(Spliterator.OfLong s, long sliceOrigin, long sliceFence) {
847                  super(s, sliceOrigin, sliceFence);
848              }
849  
OfLong(Spliterator.OfLong s, long sliceOrigin, long sliceFence, long origin, long fence)850              OfLong(Spliterator.OfLong s,
851                     long sliceOrigin, long sliceFence, long origin, long fence) {
852                  super(s, sliceOrigin, sliceFence, origin, fence);
853              }
854  
855              @Override
makeSpliterator(Spliterator.OfLong s, long sliceOrigin, long sliceFence, long origin, long fence)856              protected Spliterator.OfLong makeSpliterator(Spliterator.OfLong s,
857                                                           long sliceOrigin, long sliceFence,
858                                                           long origin, long fence) {
859                  return new SliceSpliterator.OfLong(s, sliceOrigin, sliceFence, origin, fence);
860              }
861  
862              @Override
emptyConsumer()863              protected LongConsumer emptyConsumer() {
864                  return e -> {};
865              }
866          }
867  
868          static final class OfDouble extends OfPrimitive<Double, Spliterator.OfDouble, DoubleConsumer>
869                  implements Spliterator.OfDouble {
OfDouble(Spliterator.OfDouble s, long sliceOrigin, long sliceFence)870              OfDouble(Spliterator.OfDouble s, long sliceOrigin, long sliceFence) {
871                  super(s, sliceOrigin, sliceFence);
872              }
873  
OfDouble(Spliterator.OfDouble s, long sliceOrigin, long sliceFence, long origin, long fence)874              OfDouble(Spliterator.OfDouble s,
875                       long sliceOrigin, long sliceFence, long origin, long fence) {
876                  super(s, sliceOrigin, sliceFence, origin, fence);
877              }
878  
879              @Override
makeSpliterator(Spliterator.OfDouble s, long sliceOrigin, long sliceFence, long origin, long fence)880              protected Spliterator.OfDouble makeSpliterator(Spliterator.OfDouble s,
881                                                             long sliceOrigin, long sliceFence,
882                                                             long origin, long fence) {
883                  return new SliceSpliterator.OfDouble(s, sliceOrigin, sliceFence, origin, fence);
884              }
885  
886              @Override
emptyConsumer()887              protected DoubleConsumer emptyConsumer() {
888                  return e -> {};
889              }
890          }
891      }
892  
893      /**
894       * A slice Spliterator that does not preserve order, if any, of a source
895       * Spliterator.
896       *
     * Note: The source spliterator may report {@code ORDERED} since that
     * spliterator may be the result of a previous pipeline stage that was
     * collected to a {@code Node}. It is the order of the pipeline stage
     * that governs whether this slice spliterator is to be used or not.
901       */
902      static abstract class UnorderedSliceSpliterator<T, T_SPLITR extends Spliterator<T>> {
903          static final int CHUNK_SIZE = 1 << 7;
904  
905          // The spliterator to slice
906          protected final T_SPLITR s;
907          protected final boolean unlimited;
908          private final long skipThreshold;
909          private final AtomicLong permits;
910  
UnorderedSliceSpliterator(T_SPLITR s, long skip, long limit)911          UnorderedSliceSpliterator(T_SPLITR s, long skip, long limit) {
912              this.s = s;
913              this.unlimited = limit < 0;
914              this.skipThreshold = limit >= 0 ? limit : 0;
915              this.permits = new AtomicLong(limit >= 0 ? skip + limit : skip);
916          }
917  
UnorderedSliceSpliterator(T_SPLITR s, UnorderedSliceSpliterator<T, T_SPLITR> parent)918          UnorderedSliceSpliterator(T_SPLITR s,
919                                    UnorderedSliceSpliterator<T, T_SPLITR> parent) {
920              this.s = s;
921              this.unlimited = parent.unlimited;
922              this.permits = parent.permits;
923              this.skipThreshold = parent.skipThreshold;
924          }
925  
926          /**
927           * Acquire permission to skip or process elements.  The caller must
928           * first acquire the elements, then consult this method for guidance
929           * as to what to do with the data.
930           *
931           * <p>We use an {@code AtomicLong} to atomically maintain a counter,
932           * which is initialized as skip+limit if we are limiting, or skip only
933           * if we are not limiting.  The user should consult the method
934           * {@code checkPermits()} before acquiring data elements.
935           *
936           * @param numElements the number of elements the caller has in hand
937           * @return the number of elements that should be processed; any
938           * remaining elements should be discarded.
939           */
acquirePermits(long numElements)940          protected final long acquirePermits(long numElements) {
941              long remainingPermits;
942              long grabbing;
943              // permits never increase, and don't decrease below zero
944              assert numElements > 0;
945              do {
946                  remainingPermits = permits.get();
947                  if (remainingPermits == 0)
948                      return unlimited ? numElements : 0;
949                  grabbing = Math.min(remainingPermits, numElements);
950              } while (grabbing > 0 &&
951                       !permits.compareAndSet(remainingPermits, remainingPermits - grabbing));
952  
953              if (unlimited)
954                  return Math.max(numElements - grabbing, 0);
955              else if (remainingPermits > skipThreshold)
956                  return Math.max(grabbing - (remainingPermits - skipThreshold), 0);
957              else
958                  return grabbing;
959          }
960  
961          enum PermitStatus { NO_MORE, MAYBE_MORE, UNLIMITED }
962  
963          /** Call to check if permits might be available before acquiring data */
permitStatus()964          protected final PermitStatus permitStatus() {
965              if (permits.get() > 0)
966                  return PermitStatus.MAYBE_MORE;
967              else
968                  return unlimited ?  PermitStatus.UNLIMITED : PermitStatus.NO_MORE;
969          }
970  
trySplit()971          public final T_SPLITR trySplit() {
972              // Stop splitting when there are no more limit permits
973              if (permits.get() == 0)
974                  return null;
975              @SuppressWarnings("unchecked")
976              T_SPLITR split = (T_SPLITR) s.trySplit();
977              return split == null ? null : makeSpliterator(split);
978          }
979  
        /**
         * Wraps a split-off portion of the source in a new unordered slice
         * spliterator; called by {@code trySplit()} with the non-null result
         * of splitting the underlying spliterator.
         *
         * @param s the split-off portion of the source
         * @return the spliterator wrapping {@code s}
         */
        protected abstract T_SPLITR makeSpliterator(T_SPLITR s);
981  
        /**
         * Reports the size estimate of the underlying spliterator unchanged;
         * no adjustment for skipped or limited elements is made here.
         */
        public final long estimateSize() {
            return s.estimateSize();
        }
985  
characteristics()986          public final int characteristics() {
987              return s.characteristics() &
988                     ~(Spliterator.SIZED | Spliterator.SUBSIZED | Spliterator.ORDERED);
989          }
990  
991          static final class OfRef<T> extends UnorderedSliceSpliterator<T, Spliterator<T>>
992                  implements Spliterator<T>, Consumer<T> {
993              T tmpSlot;
994  
OfRef(Spliterator<T> s, long skip, long limit)995              OfRef(Spliterator<T> s, long skip, long limit) {
996                  super(s, skip, limit);
997              }
998  
OfRef(Spliterator<T> s, OfRef<T> parent)999              OfRef(Spliterator<T> s, OfRef<T> parent) {
1000                  super(s, parent);
1001              }
1002  
1003              @Override
accept(T t)1004              public final void accept(T t) {
1005                  tmpSlot = t;
1006              }
1007  
1008              @Override
tryAdvance(Consumer<? super T> action)1009              public boolean tryAdvance(Consumer<? super T> action) {
1010                  Objects.requireNonNull(action);
1011  
1012                  while (permitStatus() != PermitStatus.NO_MORE) {
1013                      if (!s.tryAdvance(this))
1014                          return false;
1015                      else if (acquirePermits(1) == 1) {
1016                          action.accept(tmpSlot);
1017                          tmpSlot = null;
1018                          return true;
1019                      }
1020                  }
1021                  return false;
1022              }
1023  
1024              @Override
forEachRemaining(Consumer<? super T> action)1025              public void forEachRemaining(Consumer<? super T> action) {
1026                  Objects.requireNonNull(action);
1027  
1028                  ArrayBuffer.OfRef<T> sb = null;
1029                  PermitStatus permitStatus;
1030                  while ((permitStatus = permitStatus()) != PermitStatus.NO_MORE) {
1031                      if (permitStatus == PermitStatus.MAYBE_MORE) {
1032                          // Optimistically traverse elements up to a threshold of CHUNK_SIZE
1033                          if (sb == null)
1034                              sb = new ArrayBuffer.OfRef<>(CHUNK_SIZE);
1035                          else
1036                              sb.reset();
1037                          long permitsRequested = 0;
1038                          do { } while (s.tryAdvance(sb) && ++permitsRequested < CHUNK_SIZE);
1039                          if (permitsRequested == 0)
1040                              return;
1041                          sb.forEach(action, acquirePermits(permitsRequested));
1042                      }
1043                      else {
1044                          // Must be UNLIMITED; let 'er rip
1045                          s.forEachRemaining(action);
1046                          return;
1047                      }
1048                  }
1049              }
1050  
1051              @Override
makeSpliterator(Spliterator<T> s)1052              protected Spliterator<T> makeSpliterator(Spliterator<T> s) {
1053                  return new UnorderedSliceSpliterator.OfRef<>(s, this);
1054              }
1055          }
1056  
1057          /**
1058           * Concrete sub-types must also be an instance of type {@code T_CONS}.
1059           *
1060           * @param <T_BUFF> the type of the spined buffer. Must also be a type of
1061           *        {@code T_CONS}.
1062           */
1063          static abstract class OfPrimitive<
1064                  T,
1065                  T_CONS,
1066                  T_BUFF extends ArrayBuffer.OfPrimitive<T_CONS>,
1067                  T_SPLITR extends Spliterator.OfPrimitive<T, T_CONS, T_SPLITR>>
1068                  extends UnorderedSliceSpliterator<T, T_SPLITR>
1069                  implements Spliterator.OfPrimitive<T, T_CONS, T_SPLITR> {
OfPrimitive(T_SPLITR s, long skip, long limit)1070              OfPrimitive(T_SPLITR s, long skip, long limit) {
1071                  super(s, skip, limit);
1072              }
1073  
OfPrimitive(T_SPLITR s, UnorderedSliceSpliterator.OfPrimitive<T, T_CONS, T_BUFF, T_SPLITR> parent)1074              OfPrimitive(T_SPLITR s, UnorderedSliceSpliterator.OfPrimitive<T, T_CONS, T_BUFF, T_SPLITR> parent) {
1075                  super(s, parent);
1076              }
1077  
1078              @Override
tryAdvance(T_CONS action)1079              public boolean tryAdvance(T_CONS action) {
1080                  Objects.requireNonNull(action);
1081                  @SuppressWarnings("unchecked")
1082                  T_CONS consumer = (T_CONS) this;
1083  
1084                  while (permitStatus() != PermitStatus.NO_MORE) {
1085                      if (!s.tryAdvance(consumer))
1086                          return false;
1087                      else if (acquirePermits(1) == 1) {
1088                          acceptConsumed(action);
1089                          return true;
1090                      }
1091                  }
1092                  return false;
1093              }
1094  
acceptConsumed(T_CONS action)1095              protected abstract void acceptConsumed(T_CONS action);
1096  
1097              @Override
forEachRemaining(T_CONS action)1098              public void forEachRemaining(T_CONS action) {
1099                  Objects.requireNonNull(action);
1100  
1101                  T_BUFF sb = null;
1102                  PermitStatus permitStatus;
1103                  while ((permitStatus = permitStatus()) != PermitStatus.NO_MORE) {
1104                      if (permitStatus == PermitStatus.MAYBE_MORE) {
1105                          // Optimistically traverse elements up to a threshold of CHUNK_SIZE
1106                          if (sb == null)
1107                              sb = bufferCreate(CHUNK_SIZE);
1108                          else
1109                              sb.reset();
1110                          @SuppressWarnings("unchecked")
1111                          T_CONS sbc = (T_CONS) sb;
1112                          long permitsRequested = 0;
1113                          do { } while (s.tryAdvance(sbc) && ++permitsRequested < CHUNK_SIZE);
1114                          if (permitsRequested == 0)
1115                              return;
1116                          sb.forEach(action, acquirePermits(permitsRequested));
1117                      }
1118                      else {
1119                          // Must be UNLIMITED; let 'er rip
1120                          s.forEachRemaining(action);
1121                          return;
1122                      }
1123                  }
1124              }
1125  
bufferCreate(int initialCapacity)1126              protected abstract T_BUFF bufferCreate(int initialCapacity);
1127          }
1128  
1129          static final class OfInt
1130                  extends OfPrimitive<Integer, IntConsumer, ArrayBuffer.OfInt, Spliterator.OfInt>
1131                  implements Spliterator.OfInt, IntConsumer {
1132  
1133              int tmpValue;
1134  
OfInt(Spliterator.OfInt s, long skip, long limit)1135              OfInt(Spliterator.OfInt s, long skip, long limit) {
1136                  super(s, skip, limit);
1137              }
1138  
OfInt(Spliterator.OfInt s, UnorderedSliceSpliterator.OfInt parent)1139              OfInt(Spliterator.OfInt s, UnorderedSliceSpliterator.OfInt parent) {
1140                  super(s, parent);
1141              }
1142  
1143              @Override
accept(int value)1144              public void accept(int value) {
1145                  tmpValue = value;
1146              }
1147  
1148              @Override
acceptConsumed(IntConsumer action)1149              protected void acceptConsumed(IntConsumer action) {
1150                  action.accept(tmpValue);
1151              }
1152  
1153              @Override
bufferCreate(int initialCapacity)1154              protected ArrayBuffer.OfInt bufferCreate(int initialCapacity) {
1155                  return new ArrayBuffer.OfInt(initialCapacity);
1156              }
1157  
1158              @Override
makeSpliterator(Spliterator.OfInt s)1159              protected Spliterator.OfInt makeSpliterator(Spliterator.OfInt s) {
1160                  return new UnorderedSliceSpliterator.OfInt(s, this);
1161              }
1162          }
1163  
1164          static final class OfLong
1165                  extends OfPrimitive<Long, LongConsumer, ArrayBuffer.OfLong, Spliterator.OfLong>
1166                  implements Spliterator.OfLong, LongConsumer {
1167  
1168              long tmpValue;
1169  
OfLong(Spliterator.OfLong s, long skip, long limit)1170              OfLong(Spliterator.OfLong s, long skip, long limit) {
1171                  super(s, skip, limit);
1172              }
1173  
OfLong(Spliterator.OfLong s, UnorderedSliceSpliterator.OfLong parent)1174              OfLong(Spliterator.OfLong s, UnorderedSliceSpliterator.OfLong parent) {
1175                  super(s, parent);
1176              }
1177  
1178              @Override
accept(long value)1179              public void accept(long value) {
1180                  tmpValue = value;
1181              }
1182  
1183              @Override
acceptConsumed(LongConsumer action)1184              protected void acceptConsumed(LongConsumer action) {
1185                  action.accept(tmpValue);
1186              }
1187  
1188              @Override
bufferCreate(int initialCapacity)1189              protected ArrayBuffer.OfLong bufferCreate(int initialCapacity) {
1190                  return new ArrayBuffer.OfLong(initialCapacity);
1191              }
1192  
1193              @Override
makeSpliterator(Spliterator.OfLong s)1194              protected Spliterator.OfLong makeSpliterator(Spliterator.OfLong s) {
1195                  return new UnorderedSliceSpliterator.OfLong(s, this);
1196              }
1197          }
1198  
1199          static final class OfDouble
1200                  extends OfPrimitive<Double, DoubleConsumer, ArrayBuffer.OfDouble, Spliterator.OfDouble>
1201                  implements Spliterator.OfDouble, DoubleConsumer {
1202  
1203              double tmpValue;
1204  
OfDouble(Spliterator.OfDouble s, long skip, long limit)1205              OfDouble(Spliterator.OfDouble s, long skip, long limit) {
1206                  super(s, skip, limit);
1207              }
1208  
OfDouble(Spliterator.OfDouble s, UnorderedSliceSpliterator.OfDouble parent)1209              OfDouble(Spliterator.OfDouble s, UnorderedSliceSpliterator.OfDouble parent) {
1210                  super(s, parent);
1211              }
1212  
1213              @Override
accept(double value)1214              public void accept(double value) {
1215                  tmpValue = value;
1216              }
1217  
1218              @Override
acceptConsumed(DoubleConsumer action)1219              protected void acceptConsumed(DoubleConsumer action) {
1220                  action.accept(tmpValue);
1221              }
1222  
1223              @Override
bufferCreate(int initialCapacity)1224              protected ArrayBuffer.OfDouble bufferCreate(int initialCapacity) {
1225                  return new ArrayBuffer.OfDouble(initialCapacity);
1226              }
1227  
1228              @Override
makeSpliterator(Spliterator.OfDouble s)1229              protected Spliterator.OfDouble makeSpliterator(Spliterator.OfDouble s) {
1230                  return new UnorderedSliceSpliterator.OfDouble(s, this);
1231              }
1232          }
1233      }
1234  
1235      /**
1236       * A wrapping spliterator that only reports distinct elements of the
1237       * underlying spliterator. Does not preserve size and encounter order.
1238       */
1239      static final class DistinctSpliterator<T> implements Spliterator<T>, Consumer<T> {
1240  
1241          // The value to represent null in the ConcurrentHashMap
1242          private static final Object NULL_VALUE = new Object();
1243  
1244          // The underlying spliterator
1245          private final Spliterator<T> s;
1246  
1247          // ConcurrentHashMap holding distinct elements as keys
1248          private final ConcurrentHashMap<T, Boolean> seen;
1249  
1250          // Temporary element, only used with tryAdvance
1251          private T tmpSlot;
1252  
DistinctSpliterator(Spliterator<T> s)1253          DistinctSpliterator(Spliterator<T> s) {
1254              this(s, new ConcurrentHashMap<>());
1255          }
1256  
DistinctSpliterator(Spliterator<T> s, ConcurrentHashMap<T, Boolean> seen)1257          private DistinctSpliterator(Spliterator<T> s, ConcurrentHashMap<T, Boolean> seen) {
1258              this.s = s;
1259              this.seen = seen;
1260          }
1261  
1262          @Override
accept(T t)1263          public void accept(T t) {
1264              this.tmpSlot = t;
1265          }
1266  
1267          @SuppressWarnings("unchecked")
mapNull(T t)1268          private T mapNull(T t) {
1269              return t != null ? t : (T) NULL_VALUE;
1270          }
1271  
1272          @Override
tryAdvance(Consumer<? super T> action)1273          public boolean tryAdvance(Consumer<? super T> action) {
1274              while (s.tryAdvance(this)) {
1275                  if (seen.putIfAbsent(mapNull(tmpSlot), Boolean.TRUE) == null) {
1276                      action.accept(tmpSlot);
1277                      tmpSlot = null;
1278                      return true;
1279                  }
1280              }
1281              return false;
1282          }
1283  
1284          @Override
forEachRemaining(Consumer<? super T> action)1285          public void forEachRemaining(Consumer<? super T> action) {
1286              s.forEachRemaining(t -> {
1287                  if (seen.putIfAbsent(mapNull(t), Boolean.TRUE) == null) {
1288                      action.accept(t);
1289                  }
1290              });
1291          }
1292  
1293          @Override
trySplit()1294          public Spliterator<T> trySplit() {
1295              Spliterator<T> split = s.trySplit();
1296              return (split != null) ? new DistinctSpliterator<>(split, seen) : null;
1297          }
1298  
1299          @Override
estimateSize()1300          public long estimateSize() {
1301              return s.estimateSize();
1302          }
1303  
1304          @Override
characteristics()1305          public int characteristics() {
1306              return (s.characteristics() & ~(Spliterator.SIZED | Spliterator.SUBSIZED |
1307                                              Spliterator.SORTED | Spliterator.ORDERED))
1308                     | Spliterator.DISTINCT;
1309          }
1310  
1311          @Override
getComparator()1312          public Comparator<? super T> getComparator() {
1313              return s.getComparator();
1314          }
1315      }
1316  
1317      /**
1318       * A Spliterator that infinitely supplies elements in no particular order.
1319       *
1320       * <p>Splitting divides the estimated size in two and stops when the
1321       * estimate size is 0.
1322       *
1323       * <p>The {@code forEachRemaining} method if invoked will never terminate.
1324       * The {@code tryAdvance} method always returns true.
1325       *
1326       */
1327      static abstract class InfiniteSupplyingSpliterator<T> implements Spliterator<T> {
1328          long estimate;
1329  
InfiniteSupplyingSpliterator(long estimate)1330          protected InfiniteSupplyingSpliterator(long estimate) {
1331              this.estimate = estimate;
1332          }
1333  
1334          @Override
estimateSize()1335          public long estimateSize() {
1336              return estimate;
1337          }
1338  
1339          @Override
characteristics()1340          public int characteristics() {
1341              return IMMUTABLE;
1342          }
1343  
1344          static final class OfRef<T> extends InfiniteSupplyingSpliterator<T> {
1345              final Supplier<T> s;
1346  
OfRef(long size, Supplier<T> s)1347              OfRef(long size, Supplier<T> s) {
1348                  super(size);
1349                  this.s = s;
1350              }
1351  
1352              @Override
tryAdvance(Consumer<? super T> action)1353              public boolean tryAdvance(Consumer<? super T> action) {
1354                  Objects.requireNonNull(action);
1355  
1356                  action.accept(s.get());
1357                  return true;
1358              }
1359  
1360              @Override
trySplit()1361              public Spliterator<T> trySplit() {
1362                  if (estimate == 0)
1363                      return null;
1364                  return new InfiniteSupplyingSpliterator.OfRef<>(estimate >>>= 1, s);
1365              }
1366          }
1367  
1368          static final class OfInt extends InfiniteSupplyingSpliterator<Integer>
1369                  implements Spliterator.OfInt {
1370              final IntSupplier s;
1371  
OfInt(long size, IntSupplier s)1372              OfInt(long size, IntSupplier s) {
1373                  super(size);
1374                  this.s = s;
1375              }
1376  
1377              @Override
tryAdvance(IntConsumer action)1378              public boolean tryAdvance(IntConsumer action) {
1379                  Objects.requireNonNull(action);
1380  
1381                  action.accept(s.getAsInt());
1382                  return true;
1383              }
1384  
1385              @Override
trySplit()1386              public Spliterator.OfInt trySplit() {
1387                  if (estimate == 0)
1388                      return null;
1389                  return new InfiniteSupplyingSpliterator.OfInt(estimate = estimate >>> 1, s);
1390              }
1391          }
1392  
1393          static final class OfLong extends InfiniteSupplyingSpliterator<Long>
1394                  implements Spliterator.OfLong {
1395              final LongSupplier s;
1396  
OfLong(long size, LongSupplier s)1397              OfLong(long size, LongSupplier s) {
1398                  super(size);
1399                  this.s = s;
1400              }
1401  
1402              @Override
tryAdvance(LongConsumer action)1403              public boolean tryAdvance(LongConsumer action) {
1404                  Objects.requireNonNull(action);
1405  
1406                  action.accept(s.getAsLong());
1407                  return true;
1408              }
1409  
1410              @Override
trySplit()1411              public Spliterator.OfLong trySplit() {
1412                  if (estimate == 0)
1413                      return null;
1414                  return new InfiniteSupplyingSpliterator.OfLong(estimate = estimate >>> 1, s);
1415              }
1416          }
1417  
1418          static final class OfDouble extends InfiniteSupplyingSpliterator<Double>
1419                  implements Spliterator.OfDouble {
1420              final DoubleSupplier s;
1421  
OfDouble(long size, DoubleSupplier s)1422              OfDouble(long size, DoubleSupplier s) {
1423                  super(size);
1424                  this.s = s;
1425              }
1426  
1427              @Override
tryAdvance(DoubleConsumer action)1428              public boolean tryAdvance(DoubleConsumer action) {
1429                  Objects.requireNonNull(action);
1430  
1431                  action.accept(s.getAsDouble());
1432                  return true;
1433              }
1434  
1435              @Override
trySplit()1436              public Spliterator.OfDouble trySplit() {
1437                  if (estimate == 0)
1438                      return null;
1439                  return new InfiniteSupplyingSpliterator.OfDouble(estimate = estimate >>> 1, s);
1440              }
1441          }
1442      }
1443  
1444      // @@@ Consolidate with Node.Builder
1445      static abstract class ArrayBuffer {
1446          int index;
1447  
        /**
         * Rewinds the write position to the start so the buffer can be filled
         * again. Only {@code index} is changed by this method.
         */
        void reset() {
            index = 0;
        }
1451  
1452          static final class OfRef<T> extends ArrayBuffer implements Consumer<T> {
1453              final Object[] array;
1454  
OfRef(int size)1455              OfRef(int size) {
1456                  this.array = new Object[size];
1457              }
1458  
1459              @Override
accept(T t)1460              public void accept(T t) {
1461                  array[index++] = t;
1462              }
1463  
forEach(Consumer<? super T> action, long fence)1464              public void forEach(Consumer<? super T> action, long fence) {
1465                  for (int i = 0; i < fence; i++) {
1466                      @SuppressWarnings("unchecked")
1467                      T t = (T) array[i];
1468                      action.accept(t);
1469                  }
1470              }
1471          }
1472  
1473          static abstract class OfPrimitive<T_CONS> extends ArrayBuffer {
1474              int index;
1475  
1476              @Override
reset()1477              void reset() {
1478                  index = 0;
1479              }
1480  
forEach(T_CONS action, long fence)1481              abstract void forEach(T_CONS action, long fence);
1482          }
1483  
1484          static final class OfInt extends OfPrimitive<IntConsumer>
1485                  implements IntConsumer {
1486              final int[] array;
1487  
OfInt(int size)1488              OfInt(int size) {
1489                  this.array = new int[size];
1490              }
1491  
1492              @Override
accept(int t)1493              public void accept(int t) {
1494                  array[index++] = t;
1495              }
1496  
1497              @Override
forEach(IntConsumer action, long fence)1498              public void forEach(IntConsumer action, long fence) {
1499                  for (int i = 0; i < fence; i++) {
1500                      action.accept(array[i]);
1501                  }
1502              }
1503          }
1504  
1505          static final class OfLong extends OfPrimitive<LongConsumer>
1506                  implements LongConsumer {
1507              final long[] array;
1508  
OfLong(int size)1509              OfLong(int size) {
1510                  this.array = new long[size];
1511              }
1512  
1513              @Override
accept(long t)1514              public void accept(long t) {
1515                  array[index++] = t;
1516              }
1517  
1518              @Override
forEach(LongConsumer action, long fence)1519              public void forEach(LongConsumer action, long fence) {
1520                  for (int i = 0; i < fence; i++) {
1521                      action.accept(array[i]);
1522                  }
1523              }
1524          }
1525  
1526          static final class OfDouble extends OfPrimitive<DoubleConsumer>
1527                  implements DoubleConsumer {
1528              final double[] array;
1529  
OfDouble(int size)1530              OfDouble(int size) {
1531                  this.array = new double[size];
1532              }
1533  
1534              @Override
accept(double t)1535              public void accept(double t) {
1536                  array[index++] = t;
1537              }
1538  
1539              @Override
forEach(DoubleConsumer action, long fence)1540              void forEach(DoubleConsumer action, long fence) {
1541                  for (int i = 0; i < fence; i++) {
1542                      action.accept(array[i]);
1543                  }
1544              }
1545          }
1546      }
1547  }
1548  
1549