1 /* 2 * Copyright (c) 2012, 2013, Oracle and/or its affiliates. All rights reserved. 3 * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER. 4 * 5 * This code is free software; you can redistribute it and/or modify it 6 * under the terms of the GNU General Public License version 2 only, as 7 * published by the Free Software Foundation. Oracle designates this 8 * particular file as subject to the "Classpath" exception as provided 9 * by Oracle in the LICENSE file that accompanied this code. 10 * 11 * This code is distributed in the hope that it will be useful, but WITHOUT 12 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or 13 * FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License 14 * version 2 for more details (a copy is included in the LICENSE file that 15 * accompanied this code). 16 * 17 * You should have received a copy of the GNU General Public License version 18 * 2 along with this work; if not, write to the Free Software Foundation, 19 * Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA. 20 * 21 * Please contact Oracle, 500 Oracle Parkway, Redwood Shores, CA 94065 USA 22 * or visit www.oracle.com if you need additional information or have any 23 * questions. 
24 */ 25 package java.util.stream; 26 27 import java.util.Comparator; 28 import java.util.Objects; 29 import java.util.Spliterator; 30 import java.util.concurrent.ConcurrentHashMap; 31 import java.util.concurrent.atomic.AtomicLong; 32 import java.util.function.BooleanSupplier; 33 import java.util.function.Consumer; 34 import java.util.function.DoubleConsumer; 35 import java.util.function.DoubleSupplier; 36 import java.util.function.IntConsumer; 37 import java.util.function.IntSupplier; 38 import java.util.function.LongConsumer; 39 import java.util.function.LongSupplier; 40 import java.util.function.Supplier; 41 42 /** 43 * Spliterator implementations for wrapping and delegating spliterators, used 44 * in the implementation of the {@link Stream#spliterator()} method. 45 * 46 * @since 1.8 47 */ 48 class StreamSpliterators { 49 50 /** 51 * Abstract wrapping spliterator that binds to the spliterator of a 52 * pipeline helper on first operation. 53 * 54 * <p>This spliterator is not late-binding and will bind to the source 55 * spliterator when first operated on. 56 * 57 * <p>A wrapping spliterator produced from a sequential stream 58 * cannot be split if there are stateful operations present. 59 */ 60 private static abstract class AbstractWrappingSpliterator<P_IN, P_OUT, 61 T_BUFFER extends AbstractSpinedBuffer> 62 implements Spliterator<P_OUT> { 63 64 // @@@ Detect if stateful operations are present or not 65 // If not then can split otherwise cannot 66 67 /** 68 * True if this spliterator supports splitting 69 */ 70 final boolean isParallel; 71 72 final PipelineHelper<P_OUT> ph; 73 74 /** 75 * Supplier for the source spliterator. Client provides either a 76 * spliterator or a supplier. 77 */ 78 private Supplier<Spliterator<P_IN>> spliteratorSupplier; 79 80 /** 81 * Source spliterator. Either provided from client or obtained from 82 * supplier. 
83 */ 84 Spliterator<P_IN> spliterator; 85 86 /** 87 * Sink chain for the downstream stages of the pipeline, ultimately 88 * leading to the buffer. Used during partial traversal. 89 */ 90 Sink<P_IN> bufferSink; 91 92 /** 93 * A function that advances one element of the spliterator, pushing 94 * it to bufferSink. Returns whether any elements were processed. 95 * Used during partial traversal. 96 */ 97 BooleanSupplier pusher; 98 99 /** Next element to consume from the buffer, used during partial traversal */ 100 long nextToConsume; 101 102 /** Buffer into which elements are pushed. Used during partial traversal. */ 103 T_BUFFER buffer; 104 105 /** 106 * True if full traversal has occurred (with possible cancelation). 107 * If doing a partial traversal, there may be still elements in buffer. 108 */ 109 boolean finished; 110 111 /** 112 * Construct an AbstractWrappingSpliterator from a 113 * {@code Supplier<Spliterator>}. 114 */ AbstractWrappingSpliterator(PipelineHelper<P_OUT> ph, Supplier<Spliterator<P_IN>> spliteratorSupplier, boolean parallel)115 AbstractWrappingSpliterator(PipelineHelper<P_OUT> ph, 116 Supplier<Spliterator<P_IN>> spliteratorSupplier, 117 boolean parallel) { 118 this.ph = ph; 119 this.spliteratorSupplier = spliteratorSupplier; 120 this.spliterator = null; 121 this.isParallel = parallel; 122 } 123 124 /** 125 * Construct an AbstractWrappingSpliterator from a 126 * {@code Spliterator}. 127 */ AbstractWrappingSpliterator(PipelineHelper<P_OUT> ph, Spliterator<P_IN> spliterator, boolean parallel)128 AbstractWrappingSpliterator(PipelineHelper<P_OUT> ph, 129 Spliterator<P_IN> spliterator, 130 boolean parallel) { 131 this.ph = ph; 132 this.spliteratorSupplier = null; 133 this.spliterator = spliterator; 134 this.isParallel = parallel; 135 } 136 137 /** 138 * Called before advancing to set up spliterator, if needed. 
139 */ init()140 final void init() { 141 if (spliterator == null) { 142 spliterator = spliteratorSupplier.get(); 143 spliteratorSupplier = null; 144 } 145 } 146 147 /** 148 * Get an element from the source, pushing it into the sink chain, 149 * setting up the buffer if needed 150 * @return whether there are elements to consume from the buffer 151 */ doAdvance()152 final boolean doAdvance() { 153 if (buffer == null) { 154 if (finished) 155 return false; 156 157 init(); 158 initPartialTraversalState(); 159 nextToConsume = 0; 160 bufferSink.begin(spliterator.getExactSizeIfKnown()); 161 return fillBuffer(); 162 } 163 else { 164 ++nextToConsume; 165 boolean hasNext = nextToConsume < buffer.count(); 166 if (!hasNext) { 167 nextToConsume = 0; 168 buffer.clear(); 169 hasNext = fillBuffer(); 170 } 171 return hasNext; 172 } 173 } 174 175 /** 176 * Invokes the shape-specific constructor with the provided arguments 177 * and returns the result. 178 */ 179 abstract AbstractWrappingSpliterator<P_IN, P_OUT, ?> wrap(Spliterator<P_IN> s); 180 181 /** 182 * Initializes buffer, sink chain, and pusher for a shape-specific 183 * implementation. 184 */ 185 abstract void initPartialTraversalState(); 186 187 @Override 188 public Spliterator<P_OUT> trySplit() { 189 if (isParallel && !finished) { 190 init(); 191 192 Spliterator<P_IN> split = spliterator.trySplit(); 193 return (split == null) ? null : wrap(split); 194 } 195 else 196 return null; 197 } 198 199 /** 200 * If the buffer is empty, push elements into the sink chain until 201 * the source is empty or cancellation is requested. 
202 * @return whether there are elements to consume from the buffer 203 */ 204 private boolean fillBuffer() { 205 while (buffer.count() == 0) { 206 if (bufferSink.cancellationRequested() || !pusher.getAsBoolean()) { 207 if (finished) 208 return false; 209 else { 210 bufferSink.end(); // might trigger more elements 211 finished = true; 212 } 213 } 214 } 215 return true; 216 } 217 218 @Override 219 public final long estimateSize() { 220 init(); 221 // Use the estimate of the wrapped spliterator 222 // Note this may not be accurate if there are filter/flatMap 223 // operations filtering or adding elements to the stream 224 return spliterator.estimateSize(); 225 } 226 227 @Override 228 public final long getExactSizeIfKnown() { 229 init(); 230 return StreamOpFlag.SIZED.isKnown(ph.getStreamAndOpFlags()) 231 ? spliterator.getExactSizeIfKnown() 232 : -1; 233 } 234 235 @Override 236 public final int characteristics() { 237 init(); 238 239 // Get the characteristics from the pipeline 240 int c = StreamOpFlag.toCharacteristics(StreamOpFlag.toStreamFlags(ph.getStreamAndOpFlags())); 241 242 // Mask off the size and uniform characteristics and replace with 243 // those of the spliterator 244 // Note that a non-uniform spliterator can change from something 245 // with an exact size to an estimate for a sub-split, for example 246 // with HashSet where the size is known at the top level spliterator 247 // but for sub-splits only an estimate is known 248 if ((c & Spliterator.SIZED) != 0) { 249 c &= ~(Spliterator.SIZED | Spliterator.SUBSIZED); 250 c |= (spliterator.characteristics() & (Spliterator.SIZED | Spliterator.SUBSIZED)); 251 } 252 253 return c; 254 } 255 256 @Override 257 public Comparator<? 
super P_OUT> getComparator() { 258 if (!hasCharacteristics(SORTED)) 259 throw new IllegalStateException(); 260 return null; 261 } 262 263 @Override 264 public final String toString() { 265 return String.format("%s[%s]", getClass().getName(), spliterator); 266 } 267 } 268 269 static final class WrappingSpliterator<P_IN, P_OUT> 270 extends AbstractWrappingSpliterator<P_IN, P_OUT, SpinedBuffer<P_OUT>> { 271 272 WrappingSpliterator(PipelineHelper<P_OUT> ph, 273 Supplier<Spliterator<P_IN>> supplier, 274 boolean parallel) { 275 super(ph, supplier, parallel); 276 } 277 278 WrappingSpliterator(PipelineHelper<P_OUT> ph, 279 Spliterator<P_IN> spliterator, 280 boolean parallel) { 281 super(ph, spliterator, parallel); 282 } 283 284 @Override 285 WrappingSpliterator<P_IN, P_OUT> wrap(Spliterator<P_IN> s) { 286 return new WrappingSpliterator<>(ph, s, isParallel); 287 } 288 289 @Override 290 void initPartialTraversalState() { 291 SpinedBuffer<P_OUT> b = new SpinedBuffer<>(); 292 buffer = b; 293 bufferSink = ph.wrapSink(b::accept); 294 pusher = () -> spliterator.tryAdvance(bufferSink); 295 } 296 297 @Override tryAdvance(Consumer<? super P_OUT> consumer)298 public boolean tryAdvance(Consumer<? super P_OUT> consumer) { 299 Objects.requireNonNull(consumer); 300 boolean hasNext = doAdvance(); 301 if (hasNext) 302 consumer.accept(buffer.get(nextToConsume)); 303 return hasNext; 304 } 305 306 @Override forEachRemaining(Consumer<? super P_OUT> consumer)307 public void forEachRemaining(Consumer<? 
super P_OUT> consumer) { 308 if (buffer == null && !finished) { 309 Objects.requireNonNull(consumer); 310 init(); 311 312 ph.wrapAndCopyInto((Sink<P_OUT>) consumer::accept, spliterator); 313 finished = true; 314 } 315 else { 316 do { } while (tryAdvance(consumer)); 317 } 318 } 319 } 320 321 static final class IntWrappingSpliterator<P_IN> 322 extends AbstractWrappingSpliterator<P_IN, Integer, SpinedBuffer.OfInt> 323 implements Spliterator.OfInt { 324 IntWrappingSpliterator(PipelineHelper<Integer> ph, Supplier<Spliterator<P_IN>> supplier, boolean parallel)325 IntWrappingSpliterator(PipelineHelper<Integer> ph, 326 Supplier<Spliterator<P_IN>> supplier, 327 boolean parallel) { 328 super(ph, supplier, parallel); 329 } 330 IntWrappingSpliterator(PipelineHelper<Integer> ph, Spliterator<P_IN> spliterator, boolean parallel)331 IntWrappingSpliterator(PipelineHelper<Integer> ph, 332 Spliterator<P_IN> spliterator, 333 boolean parallel) { 334 super(ph, spliterator, parallel); 335 } 336 337 @Override wrap(Spliterator<P_IN> s)338 AbstractWrappingSpliterator<P_IN, Integer, ?> wrap(Spliterator<P_IN> s) { 339 return new IntWrappingSpliterator<>(ph, s, isParallel); 340 } 341 342 @Override initPartialTraversalState()343 void initPartialTraversalState() { 344 SpinedBuffer.OfInt b = new SpinedBuffer.OfInt(); 345 buffer = b; 346 bufferSink = ph.wrapSink((Sink.OfInt) b::accept); 347 pusher = () -> spliterator.tryAdvance(bufferSink); 348 } 349 350 @Override trySplit()351 public Spliterator.OfInt trySplit() { 352 return (Spliterator.OfInt) super.trySplit(); 353 } 354 355 @Override tryAdvance(IntConsumer consumer)356 public boolean tryAdvance(IntConsumer consumer) { 357 Objects.requireNonNull(consumer); 358 boolean hasNext = doAdvance(); 359 if (hasNext) 360 consumer.accept(buffer.get(nextToConsume)); 361 return hasNext; 362 } 363 364 @Override forEachRemaining(IntConsumer consumer)365 public void forEachRemaining(IntConsumer consumer) { 366 if (buffer == null && !finished) { 367 
Objects.requireNonNull(consumer); 368 init(); 369 370 ph.wrapAndCopyInto((Sink.OfInt) consumer::accept, spliterator); 371 finished = true; 372 } 373 else { 374 do { } while (tryAdvance(consumer)); 375 } 376 } 377 } 378 379 static final class LongWrappingSpliterator<P_IN> 380 extends AbstractWrappingSpliterator<P_IN, Long, SpinedBuffer.OfLong> 381 implements Spliterator.OfLong { 382 LongWrappingSpliterator(PipelineHelper<Long> ph, Supplier<Spliterator<P_IN>> supplier, boolean parallel)383 LongWrappingSpliterator(PipelineHelper<Long> ph, 384 Supplier<Spliterator<P_IN>> supplier, 385 boolean parallel) { 386 super(ph, supplier, parallel); 387 } 388 LongWrappingSpliterator(PipelineHelper<Long> ph, Spliterator<P_IN> spliterator, boolean parallel)389 LongWrappingSpliterator(PipelineHelper<Long> ph, 390 Spliterator<P_IN> spliterator, 391 boolean parallel) { 392 super(ph, spliterator, parallel); 393 } 394 395 @Override wrap(Spliterator<P_IN> s)396 AbstractWrappingSpliterator<P_IN, Long, ?> wrap(Spliterator<P_IN> s) { 397 return new LongWrappingSpliterator<>(ph, s, isParallel); 398 } 399 400 @Override initPartialTraversalState()401 void initPartialTraversalState() { 402 SpinedBuffer.OfLong b = new SpinedBuffer.OfLong(); 403 buffer = b; 404 bufferSink = ph.wrapSink((Sink.OfLong) b::accept); 405 pusher = () -> spliterator.tryAdvance(bufferSink); 406 } 407 408 @Override trySplit()409 public Spliterator.OfLong trySplit() { 410 return (Spliterator.OfLong) super.trySplit(); 411 } 412 413 @Override tryAdvance(LongConsumer consumer)414 public boolean tryAdvance(LongConsumer consumer) { 415 Objects.requireNonNull(consumer); 416 boolean hasNext = doAdvance(); 417 if (hasNext) 418 consumer.accept(buffer.get(nextToConsume)); 419 return hasNext; 420 } 421 422 @Override forEachRemaining(LongConsumer consumer)423 public void forEachRemaining(LongConsumer consumer) { 424 if (buffer == null && !finished) { 425 Objects.requireNonNull(consumer); 426 init(); 427 428 
ph.wrapAndCopyInto((Sink.OfLong) consumer::accept, spliterator); 429 finished = true; 430 } 431 else { 432 do { } while (tryAdvance(consumer)); 433 } 434 } 435 } 436 437 static final class DoubleWrappingSpliterator<P_IN> 438 extends AbstractWrappingSpliterator<P_IN, Double, SpinedBuffer.OfDouble> 439 implements Spliterator.OfDouble { 440 DoubleWrappingSpliterator(PipelineHelper<Double> ph, Supplier<Spliterator<P_IN>> supplier, boolean parallel)441 DoubleWrappingSpliterator(PipelineHelper<Double> ph, 442 Supplier<Spliterator<P_IN>> supplier, 443 boolean parallel) { 444 super(ph, supplier, parallel); 445 } 446 DoubleWrappingSpliterator(PipelineHelper<Double> ph, Spliterator<P_IN> spliterator, boolean parallel)447 DoubleWrappingSpliterator(PipelineHelper<Double> ph, 448 Spliterator<P_IN> spliterator, 449 boolean parallel) { 450 super(ph, spliterator, parallel); 451 } 452 453 @Override wrap(Spliterator<P_IN> s)454 AbstractWrappingSpliterator<P_IN, Double, ?> wrap(Spliterator<P_IN> s) { 455 return new DoubleWrappingSpliterator<>(ph, s, isParallel); 456 } 457 458 @Override initPartialTraversalState()459 void initPartialTraversalState() { 460 SpinedBuffer.OfDouble b = new SpinedBuffer.OfDouble(); 461 buffer = b; 462 bufferSink = ph.wrapSink((Sink.OfDouble) b::accept); 463 pusher = () -> spliterator.tryAdvance(bufferSink); 464 } 465 466 @Override trySplit()467 public Spliterator.OfDouble trySplit() { 468 return (Spliterator.OfDouble) super.trySplit(); 469 } 470 471 @Override tryAdvance(DoubleConsumer consumer)472 public boolean tryAdvance(DoubleConsumer consumer) { 473 Objects.requireNonNull(consumer); 474 boolean hasNext = doAdvance(); 475 if (hasNext) 476 consumer.accept(buffer.get(nextToConsume)); 477 return hasNext; 478 } 479 480 @Override forEachRemaining(DoubleConsumer consumer)481 public void forEachRemaining(DoubleConsumer consumer) { 482 if (buffer == null && !finished) { 483 Objects.requireNonNull(consumer); 484 init(); 485 486 ph.wrapAndCopyInto((Sink.OfDouble) 
consumer::accept, spliterator); 487 finished = true; 488 } 489 else { 490 do { } while (tryAdvance(consumer)); 491 } 492 } 493 } 494 495 /** 496 * Spliterator implementation that delegates to an underlying spliterator, 497 * acquiring the spliterator from a {@code Supplier<Spliterator>} on the 498 * first call to any spliterator method. 499 * @param <T> 500 */ 501 static class DelegatingSpliterator<T, T_SPLITR extends Spliterator<T>> 502 implements Spliterator<T> { 503 private final Supplier<? extends T_SPLITR> supplier; 504 505 private T_SPLITR s; 506 DelegatingSpliterator(Supplier<? extends T_SPLITR> supplier)507 DelegatingSpliterator(Supplier<? extends T_SPLITR> supplier) { 508 this.supplier = supplier; 509 } 510 get()511 T_SPLITR get() { 512 if (s == null) { 513 s = supplier.get(); 514 } 515 return s; 516 } 517 518 @Override 519 @SuppressWarnings("unchecked") trySplit()520 public T_SPLITR trySplit() { 521 return (T_SPLITR) get().trySplit(); 522 } 523 524 @Override tryAdvance(Consumer<? super T> consumer)525 public boolean tryAdvance(Consumer<? super T> consumer) { 526 return get().tryAdvance(consumer); 527 } 528 529 @Override forEachRemaining(Consumer<? super T> consumer)530 public void forEachRemaining(Consumer<? super T> consumer) { 531 get().forEachRemaining(consumer); 532 } 533 534 @Override estimateSize()535 public long estimateSize() { 536 return get().estimateSize(); 537 } 538 539 @Override characteristics()540 public int characteristics() { 541 return get().characteristics(); 542 } 543 544 @Override getComparator()545 public Comparator<? 
super T> getComparator() { 546 return get().getComparator(); 547 } 548 549 @Override getExactSizeIfKnown()550 public long getExactSizeIfKnown() { 551 return get().getExactSizeIfKnown(); 552 } 553 554 @Override toString()555 public String toString() { 556 return getClass().getName() + "[" + get() + "]"; 557 } 558 559 static class OfPrimitive<T, T_CONS, T_SPLITR extends Spliterator.OfPrimitive<T, T_CONS, T_SPLITR>> 560 extends DelegatingSpliterator<T, T_SPLITR> 561 implements Spliterator.OfPrimitive<T, T_CONS, T_SPLITR> { OfPrimitive(Supplier<? extends T_SPLITR> supplier)562 OfPrimitive(Supplier<? extends T_SPLITR> supplier) { 563 super(supplier); 564 } 565 566 @Override tryAdvance(T_CONS consumer)567 public boolean tryAdvance(T_CONS consumer) { 568 return get().tryAdvance(consumer); 569 } 570 571 @Override forEachRemaining(T_CONS consumer)572 public void forEachRemaining(T_CONS consumer) { 573 get().forEachRemaining(consumer); 574 } 575 } 576 577 static final class OfInt 578 extends OfPrimitive<Integer, IntConsumer, Spliterator.OfInt> 579 implements Spliterator.OfInt { 580 OfInt(Supplier<Spliterator.OfInt> supplier)581 OfInt(Supplier<Spliterator.OfInt> supplier) { 582 super(supplier); 583 } 584 } 585 586 static final class OfLong 587 extends OfPrimitive<Long, LongConsumer, Spliterator.OfLong> 588 implements Spliterator.OfLong { 589 OfLong(Supplier<Spliterator.OfLong> supplier)590 OfLong(Supplier<Spliterator.OfLong> supplier) { 591 super(supplier); 592 } 593 } 594 595 static final class OfDouble 596 extends OfPrimitive<Double, DoubleConsumer, Spliterator.OfDouble> 597 implements Spliterator.OfDouble { 598 OfDouble(Supplier<Spliterator.OfDouble> supplier)599 OfDouble(Supplier<Spliterator.OfDouble> supplier) { 600 super(supplier); 601 } 602 } 603 } 604 605 /** 606 * A slice Spliterator from a source Spliterator that reports 607 * {@code SUBSIZED}. 
608 * 609 */ 610 static abstract class SliceSpliterator<T, T_SPLITR extends Spliterator<T>> { 611 // The start index of the slice 612 final long sliceOrigin; 613 // One past the last index of the slice 614 final long sliceFence; 615 616 // The spliterator to slice 617 T_SPLITR s; 618 // current (absolute) index, modified on advance/split 619 long index; 620 // one past last (absolute) index or sliceFence, which ever is smaller 621 long fence; 622 SliceSpliterator(T_SPLITR s, long sliceOrigin, long sliceFence, long origin, long fence)623 SliceSpliterator(T_SPLITR s, long sliceOrigin, long sliceFence, long origin, long fence) { 624 assert s.hasCharacteristics(Spliterator.SUBSIZED); 625 this.s = s; 626 this.sliceOrigin = sliceOrigin; 627 this.sliceFence = sliceFence; 628 this.index = origin; 629 this.fence = fence; 630 } 631 makeSpliterator(T_SPLITR s, long sliceOrigin, long sliceFence, long origin, long fence)632 protected abstract T_SPLITR makeSpliterator(T_SPLITR s, long sliceOrigin, long sliceFence, long origin, long fence); 633 trySplit()634 public T_SPLITR trySplit() { 635 if (sliceOrigin >= fence) 636 return null; 637 638 if (index >= fence) 639 return null; 640 641 // Keep splitting until the left and right splits intersect with the slice 642 // thereby ensuring the size estimate decreases. 643 // This also avoids creating empty spliterators which can result in 644 // existing and additionally created F/J tasks that perform 645 // redundant work on no elements. 
646 while (true) { 647 @SuppressWarnings("unchecked") 648 T_SPLITR leftSplit = (T_SPLITR) s.trySplit(); 649 if (leftSplit == null) 650 return null; 651 652 long leftSplitFenceUnbounded = index + leftSplit.estimateSize(); 653 long leftSplitFence = Math.min(leftSplitFenceUnbounded, sliceFence); 654 if (sliceOrigin >= leftSplitFence) { 655 // The left split does not intersect with, and is to the left of, the slice 656 // The right split does intersect 657 // Discard the left split and split further with the right split 658 index = leftSplitFence; 659 } 660 else if (leftSplitFence >= sliceFence) { 661 // The right split does not intersect with, and is to the right of, the slice 662 // The left split does intersect 663 // Discard the right split and split further with the left split 664 s = leftSplit; 665 fence = leftSplitFence; 666 } 667 else if (index >= sliceOrigin && leftSplitFenceUnbounded <= sliceFence) { 668 // The left split is contained within the slice, return the underlying left split 669 // Right split is contained within or intersects with the slice 670 index = leftSplitFence; 671 return leftSplit; 672 } else { 673 // The left split intersects with the slice 674 // Right split is contained within or intersects with the slice 675 return makeSpliterator(leftSplit, sliceOrigin, sliceFence, index, index = leftSplitFence); 676 } 677 } 678 } 679 estimateSize()680 public long estimateSize() { 681 return (sliceOrigin < fence) 682 ? 
fence - Math.max(sliceOrigin, index) : 0; 683 } 684 characteristics()685 public int characteristics() { 686 return s.characteristics(); 687 } 688 689 static final class OfRef<T> 690 extends SliceSpliterator<T, Spliterator<T>> 691 implements Spliterator<T> { 692 OfRef(Spliterator<T> s, long sliceOrigin, long sliceFence)693 OfRef(Spliterator<T> s, long sliceOrigin, long sliceFence) { 694 this(s, sliceOrigin, sliceFence, 0, Math.min(s.estimateSize(), sliceFence)); 695 } 696 OfRef(Spliterator<T> s, long sliceOrigin, long sliceFence, long origin, long fence)697 private OfRef(Spliterator<T> s, 698 long sliceOrigin, long sliceFence, long origin, long fence) { 699 super(s, sliceOrigin, sliceFence, origin, fence); 700 } 701 702 @Override makeSpliterator(Spliterator<T> s, long sliceOrigin, long sliceFence, long origin, long fence)703 protected Spliterator<T> makeSpliterator(Spliterator<T> s, 704 long sliceOrigin, long sliceFence, 705 long origin, long fence) { 706 return new OfRef<>(s, sliceOrigin, sliceFence, origin, fence); 707 } 708 709 @Override tryAdvance(Consumer<? super T> action)710 public boolean tryAdvance(Consumer<? super T> action) { 711 Objects.requireNonNull(action); 712 713 if (sliceOrigin >= fence) 714 return false; 715 716 while (sliceOrigin > index) { 717 s.tryAdvance(e -> {}); 718 index++; 719 } 720 721 if (index >= fence) 722 return false; 723 724 index++; 725 return s.tryAdvance(action); 726 } 727 728 @Override forEachRemaining(Consumer<? super T> action)729 public void forEachRemaining(Consumer<? 
super T> action) { 730 Objects.requireNonNull(action); 731 732 if (sliceOrigin >= fence) 733 return; 734 735 if (index >= fence) 736 return; 737 738 if (index >= sliceOrigin && (index + s.estimateSize()) <= sliceFence) { 739 // The spliterator is contained within the slice 740 s.forEachRemaining(action); 741 index = fence; 742 } else { 743 // The spliterator intersects with the slice 744 while (sliceOrigin > index) { 745 s.tryAdvance(e -> {}); 746 index++; 747 } 748 // Traverse elements up to the fence 749 for (;index < fence; index++) { 750 s.tryAdvance(action); 751 } 752 } 753 } 754 } 755 756 static abstract class OfPrimitive<T, 757 T_SPLITR extends Spliterator.OfPrimitive<T, T_CONS, T_SPLITR>, 758 T_CONS> 759 extends SliceSpliterator<T, T_SPLITR> 760 implements Spliterator.OfPrimitive<T, T_CONS, T_SPLITR> { 761 OfPrimitive(T_SPLITR s, long sliceOrigin, long sliceFence)762 OfPrimitive(T_SPLITR s, long sliceOrigin, long sliceFence) { 763 this(s, sliceOrigin, sliceFence, 0, Math.min(s.estimateSize(), sliceFence)); 764 } 765 OfPrimitive(T_SPLITR s, long sliceOrigin, long sliceFence, long origin, long fence)766 private OfPrimitive(T_SPLITR s, 767 long sliceOrigin, long sliceFence, long origin, long fence) { 768 super(s, sliceOrigin, sliceFence, origin, fence); 769 } 770 771 @Override tryAdvance(T_CONS action)772 public boolean tryAdvance(T_CONS action) { 773 Objects.requireNonNull(action); 774 775 if (sliceOrigin >= fence) 776 return false; 777 778 while (sliceOrigin > index) { 779 s.tryAdvance(emptyConsumer()); 780 index++; 781 } 782 783 if (index >= fence) 784 return false; 785 786 index++; 787 return s.tryAdvance(action); 788 } 789 790 @Override forEachRemaining(T_CONS action)791 public void forEachRemaining(T_CONS action) { 792 Objects.requireNonNull(action); 793 794 if (sliceOrigin >= fence) 795 return; 796 797 if (index >= fence) 798 return; 799 800 if (index >= sliceOrigin && (index + s.estimateSize()) <= sliceFence) { 801 // The spliterator is contained 
within the slice 802 s.forEachRemaining(action); 803 index = fence; 804 } else { 805 // The spliterator intersects with the slice 806 while (sliceOrigin > index) { 807 s.tryAdvance(emptyConsumer()); 808 index++; 809 } 810 // Traverse elements up to the fence 811 for (;index < fence; index++) { 812 s.tryAdvance(action); 813 } 814 } 815 } 816 emptyConsumer()817 protected abstract T_CONS emptyConsumer(); 818 } 819 820 static final class OfInt extends OfPrimitive<Integer, Spliterator.OfInt, IntConsumer> 821 implements Spliterator.OfInt { OfInt(Spliterator.OfInt s, long sliceOrigin, long sliceFence)822 OfInt(Spliterator.OfInt s, long sliceOrigin, long sliceFence) { 823 super(s, sliceOrigin, sliceFence); 824 } 825 OfInt(Spliterator.OfInt s, long sliceOrigin, long sliceFence, long origin, long fence)826 OfInt(Spliterator.OfInt s, 827 long sliceOrigin, long sliceFence, long origin, long fence) { 828 super(s, sliceOrigin, sliceFence, origin, fence); 829 } 830 831 @Override makeSpliterator(Spliterator.OfInt s, long sliceOrigin, long sliceFence, long origin, long fence)832 protected Spliterator.OfInt makeSpliterator(Spliterator.OfInt s, 833 long sliceOrigin, long sliceFence, 834 long origin, long fence) { 835 return new SliceSpliterator.OfInt(s, sliceOrigin, sliceFence, origin, fence); 836 } 837 838 @Override emptyConsumer()839 protected IntConsumer emptyConsumer() { 840 return e -> {}; 841 } 842 } 843 844 static final class OfLong extends OfPrimitive<Long, Spliterator.OfLong, LongConsumer> 845 implements Spliterator.OfLong { OfLong(Spliterator.OfLong s, long sliceOrigin, long sliceFence)846 OfLong(Spliterator.OfLong s, long sliceOrigin, long sliceFence) { 847 super(s, sliceOrigin, sliceFence); 848 } 849 OfLong(Spliterator.OfLong s, long sliceOrigin, long sliceFence, long origin, long fence)850 OfLong(Spliterator.OfLong s, 851 long sliceOrigin, long sliceFence, long origin, long fence) { 852 super(s, sliceOrigin, sliceFence, origin, fence); 853 } 854 855 @Override 
makeSpliterator(Spliterator.OfLong s, long sliceOrigin, long sliceFence, long origin, long fence)856 protected Spliterator.OfLong makeSpliterator(Spliterator.OfLong s, 857 long sliceOrigin, long sliceFence, 858 long origin, long fence) { 859 return new SliceSpliterator.OfLong(s, sliceOrigin, sliceFence, origin, fence); 860 } 861 862 @Override emptyConsumer()863 protected LongConsumer emptyConsumer() { 864 return e -> {}; 865 } 866 } 867 868 static final class OfDouble extends OfPrimitive<Double, Spliterator.OfDouble, DoubleConsumer> 869 implements Spliterator.OfDouble { OfDouble(Spliterator.OfDouble s, long sliceOrigin, long sliceFence)870 OfDouble(Spliterator.OfDouble s, long sliceOrigin, long sliceFence) { 871 super(s, sliceOrigin, sliceFence); 872 } 873 OfDouble(Spliterator.OfDouble s, long sliceOrigin, long sliceFence, long origin, long fence)874 OfDouble(Spliterator.OfDouble s, 875 long sliceOrigin, long sliceFence, long origin, long fence) { 876 super(s, sliceOrigin, sliceFence, origin, fence); 877 } 878 879 @Override makeSpliterator(Spliterator.OfDouble s, long sliceOrigin, long sliceFence, long origin, long fence)880 protected Spliterator.OfDouble makeSpliterator(Spliterator.OfDouble s, 881 long sliceOrigin, long sliceFence, 882 long origin, long fence) { 883 return new SliceSpliterator.OfDouble(s, sliceOrigin, sliceFence, origin, fence); 884 } 885 886 @Override emptyConsumer()887 protected DoubleConsumer emptyConsumer() { 888 return e -> {}; 889 } 890 } 891 } 892 893 /** 894 * A slice Spliterator that does not preserve order, if any, of a source 895 * Spliterator. 896 * 897 * Note: The source spliterator may report {@code ORDERED} since that 898 * spliterator be the result of a previous pipeline stage that was 899 * collected to a {@code Node}. It is the order of the pipeline stage 900 * that governs whether the this slice spliterator is to be used or not. 
901 */ 902 static abstract class UnorderedSliceSpliterator<T, T_SPLITR extends Spliterator<T>> { 903 static final int CHUNK_SIZE = 1 << 7; 904 905 // The spliterator to slice 906 protected final T_SPLITR s; 907 protected final boolean unlimited; 908 private final long skipThreshold; 909 private final AtomicLong permits; 910 UnorderedSliceSpliterator(T_SPLITR s, long skip, long limit)911 UnorderedSliceSpliterator(T_SPLITR s, long skip, long limit) { 912 this.s = s; 913 this.unlimited = limit < 0; 914 this.skipThreshold = limit >= 0 ? limit : 0; 915 this.permits = new AtomicLong(limit >= 0 ? skip + limit : skip); 916 } 917 UnorderedSliceSpliterator(T_SPLITR s, UnorderedSliceSpliterator<T, T_SPLITR> parent)918 UnorderedSliceSpliterator(T_SPLITR s, 919 UnorderedSliceSpliterator<T, T_SPLITR> parent) { 920 this.s = s; 921 this.unlimited = parent.unlimited; 922 this.permits = parent.permits; 923 this.skipThreshold = parent.skipThreshold; 924 } 925 926 /** 927 * Acquire permission to skip or process elements. The caller must 928 * first acquire the elements, then consult this method for guidance 929 * as to what to do with the data. 930 * 931 * <p>We use an {@code AtomicLong} to atomically maintain a counter, 932 * which is initialized as skip+limit if we are limiting, or skip only 933 * if we are not limiting. The user should consult the method 934 * {@code checkPermits()} before acquiring data elements. 935 * 936 * @param numElements the number of elements the caller has in hand 937 * @return the number of elements that should be processed; any 938 * remaining elements should be discarded. 939 */ acquirePermits(long numElements)940 protected final long acquirePermits(long numElements) { 941 long remainingPermits; 942 long grabbing; 943 // permits never increase, and don't decrease below zero 944 assert numElements > 0; 945 do { 946 remainingPermits = permits.get(); 947 if (remainingPermits == 0) 948 return unlimited ? 
numElements : 0; 949 grabbing = Math.min(remainingPermits, numElements); 950 } while (grabbing > 0 && 951 !permits.compareAndSet(remainingPermits, remainingPermits - grabbing)); 952 953 if (unlimited) 954 return Math.max(numElements - grabbing, 0); 955 else if (remainingPermits > skipThreshold) 956 return Math.max(grabbing - (remainingPermits - skipThreshold), 0); 957 else 958 return grabbing; 959 } 960 961 enum PermitStatus { NO_MORE, MAYBE_MORE, UNLIMITED } 962 963 /** Call to check if permits might be available before acquiring data */ permitStatus()964 protected final PermitStatus permitStatus() { 965 if (permits.get() > 0) 966 return PermitStatus.MAYBE_MORE; 967 else 968 return unlimited ? PermitStatus.UNLIMITED : PermitStatus.NO_MORE; 969 } 970 trySplit()971 public final T_SPLITR trySplit() { 972 // Stop splitting when there are no more limit permits 973 if (permits.get() == 0) 974 return null; 975 @SuppressWarnings("unchecked") 976 T_SPLITR split = (T_SPLITR) s.trySplit(); 977 return split == null ? null : makeSpliterator(split); 978 } 979 makeSpliterator(T_SPLITR s)980 protected abstract T_SPLITR makeSpliterator(T_SPLITR s); 981 estimateSize()982 public final long estimateSize() { 983 return s.estimateSize(); 984 } 985 characteristics()986 public final int characteristics() { 987 return s.characteristics() & 988 ~(Spliterator.SIZED | Spliterator.SUBSIZED | Spliterator.ORDERED); 989 } 990 991 static final class OfRef<T> extends UnorderedSliceSpliterator<T, Spliterator<T>> 992 implements Spliterator<T>, Consumer<T> { 993 T tmpSlot; 994 OfRef(Spliterator<T> s, long skip, long limit)995 OfRef(Spliterator<T> s, long skip, long limit) { 996 super(s, skip, limit); 997 } 998 OfRef(Spliterator<T> s, OfRef<T> parent)999 OfRef(Spliterator<T> s, OfRef<T> parent) { 1000 super(s, parent); 1001 } 1002 1003 @Override accept(T t)1004 public final void accept(T t) { 1005 tmpSlot = t; 1006 } 1007 1008 @Override tryAdvance(Consumer<? 
super T> action)1009 public boolean tryAdvance(Consumer<? super T> action) { 1010 Objects.requireNonNull(action); 1011 1012 while (permitStatus() != PermitStatus.NO_MORE) { 1013 if (!s.tryAdvance(this)) 1014 return false; 1015 else if (acquirePermits(1) == 1) { 1016 action.accept(tmpSlot); 1017 tmpSlot = null; 1018 return true; 1019 } 1020 } 1021 return false; 1022 } 1023 1024 @Override forEachRemaining(Consumer<? super T> action)1025 public void forEachRemaining(Consumer<? super T> action) { 1026 Objects.requireNonNull(action); 1027 1028 ArrayBuffer.OfRef<T> sb = null; 1029 PermitStatus permitStatus; 1030 while ((permitStatus = permitStatus()) != PermitStatus.NO_MORE) { 1031 if (permitStatus == PermitStatus.MAYBE_MORE) { 1032 // Optimistically traverse elements up to a threshold of CHUNK_SIZE 1033 if (sb == null) 1034 sb = new ArrayBuffer.OfRef<>(CHUNK_SIZE); 1035 else 1036 sb.reset(); 1037 long permitsRequested = 0; 1038 do { } while (s.tryAdvance(sb) && ++permitsRequested < CHUNK_SIZE); 1039 if (permitsRequested == 0) 1040 return; 1041 sb.forEach(action, acquirePermits(permitsRequested)); 1042 } 1043 else { 1044 // Must be UNLIMITED; let 'er rip 1045 s.forEachRemaining(action); 1046 return; 1047 } 1048 } 1049 } 1050 1051 @Override makeSpliterator(Spliterator<T> s)1052 protected Spliterator<T> makeSpliterator(Spliterator<T> s) { 1053 return new UnorderedSliceSpliterator.OfRef<>(s, this); 1054 } 1055 } 1056 1057 /** 1058 * Concrete sub-types must also be an instance of type {@code T_CONS}. 1059 * 1060 * @param <T_BUFF> the type of the spined buffer. Must also be a type of 1061 * {@code T_CONS}. 
1062 */ 1063 static abstract class OfPrimitive< 1064 T, 1065 T_CONS, 1066 T_BUFF extends ArrayBuffer.OfPrimitive<T_CONS>, 1067 T_SPLITR extends Spliterator.OfPrimitive<T, T_CONS, T_SPLITR>> 1068 extends UnorderedSliceSpliterator<T, T_SPLITR> 1069 implements Spliterator.OfPrimitive<T, T_CONS, T_SPLITR> { OfPrimitive(T_SPLITR s, long skip, long limit)1070 OfPrimitive(T_SPLITR s, long skip, long limit) { 1071 super(s, skip, limit); 1072 } 1073 OfPrimitive(T_SPLITR s, UnorderedSliceSpliterator.OfPrimitive<T, T_CONS, T_BUFF, T_SPLITR> parent)1074 OfPrimitive(T_SPLITR s, UnorderedSliceSpliterator.OfPrimitive<T, T_CONS, T_BUFF, T_SPLITR> parent) { 1075 super(s, parent); 1076 } 1077 1078 @Override tryAdvance(T_CONS action)1079 public boolean tryAdvance(T_CONS action) { 1080 Objects.requireNonNull(action); 1081 @SuppressWarnings("unchecked") 1082 T_CONS consumer = (T_CONS) this; 1083 1084 while (permitStatus() != PermitStatus.NO_MORE) { 1085 if (!s.tryAdvance(consumer)) 1086 return false; 1087 else if (acquirePermits(1) == 1) { 1088 acceptConsumed(action); 1089 return true; 1090 } 1091 } 1092 return false; 1093 } 1094 acceptConsumed(T_CONS action)1095 protected abstract void acceptConsumed(T_CONS action); 1096 1097 @Override forEachRemaining(T_CONS action)1098 public void forEachRemaining(T_CONS action) { 1099 Objects.requireNonNull(action); 1100 1101 T_BUFF sb = null; 1102 PermitStatus permitStatus; 1103 while ((permitStatus = permitStatus()) != PermitStatus.NO_MORE) { 1104 if (permitStatus == PermitStatus.MAYBE_MORE) { 1105 // Optimistically traverse elements up to a threshold of CHUNK_SIZE 1106 if (sb == null) 1107 sb = bufferCreate(CHUNK_SIZE); 1108 else 1109 sb.reset(); 1110 @SuppressWarnings("unchecked") 1111 T_CONS sbc = (T_CONS) sb; 1112 long permitsRequested = 0; 1113 do { } while (s.tryAdvance(sbc) && ++permitsRequested < CHUNK_SIZE); 1114 if (permitsRequested == 0) 1115 return; 1116 sb.forEach(action, acquirePermits(permitsRequested)); 1117 } 1118 else { 1119 // 
Must be UNLIMITED; let 'er rip 1120 s.forEachRemaining(action); 1121 return; 1122 } 1123 } 1124 } 1125 bufferCreate(int initialCapacity)1126 protected abstract T_BUFF bufferCreate(int initialCapacity); 1127 } 1128 1129 static final class OfInt 1130 extends OfPrimitive<Integer, IntConsumer, ArrayBuffer.OfInt, Spliterator.OfInt> 1131 implements Spliterator.OfInt, IntConsumer { 1132 1133 int tmpValue; 1134 OfInt(Spliterator.OfInt s, long skip, long limit)1135 OfInt(Spliterator.OfInt s, long skip, long limit) { 1136 super(s, skip, limit); 1137 } 1138 OfInt(Spliterator.OfInt s, UnorderedSliceSpliterator.OfInt parent)1139 OfInt(Spliterator.OfInt s, UnorderedSliceSpliterator.OfInt parent) { 1140 super(s, parent); 1141 } 1142 1143 @Override accept(int value)1144 public void accept(int value) { 1145 tmpValue = value; 1146 } 1147 1148 @Override acceptConsumed(IntConsumer action)1149 protected void acceptConsumed(IntConsumer action) { 1150 action.accept(tmpValue); 1151 } 1152 1153 @Override bufferCreate(int initialCapacity)1154 protected ArrayBuffer.OfInt bufferCreate(int initialCapacity) { 1155 return new ArrayBuffer.OfInt(initialCapacity); 1156 } 1157 1158 @Override makeSpliterator(Spliterator.OfInt s)1159 protected Spliterator.OfInt makeSpliterator(Spliterator.OfInt s) { 1160 return new UnorderedSliceSpliterator.OfInt(s, this); 1161 } 1162 } 1163 1164 static final class OfLong 1165 extends OfPrimitive<Long, LongConsumer, ArrayBuffer.OfLong, Spliterator.OfLong> 1166 implements Spliterator.OfLong, LongConsumer { 1167 1168 long tmpValue; 1169 OfLong(Spliterator.OfLong s, long skip, long limit)1170 OfLong(Spliterator.OfLong s, long skip, long limit) { 1171 super(s, skip, limit); 1172 } 1173 OfLong(Spliterator.OfLong s, UnorderedSliceSpliterator.OfLong parent)1174 OfLong(Spliterator.OfLong s, UnorderedSliceSpliterator.OfLong parent) { 1175 super(s, parent); 1176 } 1177 1178 @Override accept(long value)1179 public void accept(long value) { 1180 tmpValue = value; 1181 } 1182 1183 
@Override acceptConsumed(LongConsumer action)1184 protected void acceptConsumed(LongConsumer action) { 1185 action.accept(tmpValue); 1186 } 1187 1188 @Override bufferCreate(int initialCapacity)1189 protected ArrayBuffer.OfLong bufferCreate(int initialCapacity) { 1190 return new ArrayBuffer.OfLong(initialCapacity); 1191 } 1192 1193 @Override makeSpliterator(Spliterator.OfLong s)1194 protected Spliterator.OfLong makeSpliterator(Spliterator.OfLong s) { 1195 return new UnorderedSliceSpliterator.OfLong(s, this); 1196 } 1197 } 1198 1199 static final class OfDouble 1200 extends OfPrimitive<Double, DoubleConsumer, ArrayBuffer.OfDouble, Spliterator.OfDouble> 1201 implements Spliterator.OfDouble, DoubleConsumer { 1202 1203 double tmpValue; 1204 OfDouble(Spliterator.OfDouble s, long skip, long limit)1205 OfDouble(Spliterator.OfDouble s, long skip, long limit) { 1206 super(s, skip, limit); 1207 } 1208 OfDouble(Spliterator.OfDouble s, UnorderedSliceSpliterator.OfDouble parent)1209 OfDouble(Spliterator.OfDouble s, UnorderedSliceSpliterator.OfDouble parent) { 1210 super(s, parent); 1211 } 1212 1213 @Override accept(double value)1214 public void accept(double value) { 1215 tmpValue = value; 1216 } 1217 1218 @Override acceptConsumed(DoubleConsumer action)1219 protected void acceptConsumed(DoubleConsumer action) { 1220 action.accept(tmpValue); 1221 } 1222 1223 @Override bufferCreate(int initialCapacity)1224 protected ArrayBuffer.OfDouble bufferCreate(int initialCapacity) { 1225 return new ArrayBuffer.OfDouble(initialCapacity); 1226 } 1227 1228 @Override makeSpliterator(Spliterator.OfDouble s)1229 protected Spliterator.OfDouble makeSpliterator(Spliterator.OfDouble s) { 1230 return new UnorderedSliceSpliterator.OfDouble(s, this); 1231 } 1232 } 1233 } 1234 1235 /** 1236 * A wrapping spliterator that only reports distinct elements of the 1237 * underlying spliterator. Does not preserve size and encounter order. 
*/
    static final class DistinctSpliterator<T> implements Spliterator<T>, Consumer<T> {

        // The value to represent null in the ConcurrentHashMap
        private static final Object NULL_VALUE = new Object();

        // The underlying spliterator
        private final Spliterator<T> s;

        // ConcurrentHashMap holding distinct elements as keys; the same map
        // is handed to every spliterator created through trySplit
        private final ConcurrentHashMap<T, Boolean> seen;

        // Temporary element, only used with tryAdvance
        private T tmpSlot;

        /**
         * Creates a distinct-filtering spliterator with an empty set of seen
         * elements.
         *
         * @param s the underlying spliterator
         */
        DistinctSpliterator(Spliterator<T> s) {
            this(s, new ConcurrentHashMap<>());
        }

        private DistinctSpliterator(Spliterator<T> s, ConcurrentHashMap<T, Boolean> seen) {
            this.s = s;
            this.seen = seen;
        }

        @Override
        public void accept(T t) {
            this.tmpSlot = t;
        }

        /** Substitutes {@code NULL_VALUE} for {@code null} so it can be used as a map key. */
        @SuppressWarnings("unchecked")
        private T mapNull(T t) {
            return t != null ? t : (T) NULL_VALUE;
        }

        /**
         * Advances the underlying spliterator until an element not seen
         * before is found and passes that element to {@code action}.
         *
         * @throws NullPointerException if {@code action} is null
         */
        @Override
        public boolean tryAdvance(Consumer<? super T> action) {
            // Fail fast even when the source turns out to be exhausted
            Objects.requireNonNull(action);

            while (s.tryAdvance(this)) {
                if (seen.putIfAbsent(mapNull(tmpSlot), Boolean.TRUE) == null) {
                    action.accept(tmpSlot);
                    tmpSlot = null;
                    return true;
                }
            }
            return false;
        }

        /**
         * Passes every remaining element not seen before to {@code action}.
         *
         * @throws NullPointerException if {@code action} is null
         */
        @Override
        public void forEachRemaining(Consumer<? super T> action) {
            // Fail fast even when the source has no remaining elements
            Objects.requireNonNull(action);

            s.forEachRemaining(t -> {
                if (seen.putIfAbsent(mapNull(t), Boolean.TRUE) == null) {
                    action.accept(t);
                }
            });
        }

        @Override
        public Spliterator<T> trySplit() {
            Spliterator<T> split = s.trySplit();
            return (split != null) ? new DistinctSpliterator<>(split, seen) : null;
        }

        /** Returns the size estimate of the underlying spliterator, unadjusted for duplicates. */
        @Override
        public long estimateSize() {
            return s.estimateSize();
        }

        /**
         * Returns the characteristics of the underlying spliterator with
         * {@code SIZED}, {@code SUBSIZED}, {@code SORTED} and {@code ORDERED}
         * cleared and {@code DISTINCT} set.
         */
        @Override
        public int characteristics() {
            return (s.characteristics() & ~(Spliterator.SIZED | Spliterator.SUBSIZED |
                                            Spliterator.SORTED | Spliterator.ORDERED))
                   | Spliterator.DISTINCT;
        }

        /** Delegates to the underlying spliterator. */
        @Override
        public Comparator<? super T> getComparator() {
            return s.getComparator();
        }
    }

    /**
     * A Spliterator that infinitely supplies elements in no particular order.
     *
     * <p>Splitting divides the estimated size in two and stops when the
     * estimate size is 0.
     *
     * <p>The {@code forEachRemaining} method if invoked will never terminate.
     * The {@code tryAdvance} method always returns true.
     *
     */
    abstract static class InfiniteSupplyingSpliterator<T> implements Spliterator<T> {
        // Current size estimate; halved by every successful trySplit
        long estimate;

        protected InfiniteSupplyingSpliterator(long estimate) {
            this.estimate = estimate;
        }

        @Override
        public long estimateSize() {
            return estimate;
        }

        @Override
        public int characteristics() {
            return IMMUTABLE;
        }

        /** Infinite spliterator supplying reference elements. */
        static final class OfRef<T> extends InfiniteSupplyingSpliterator<T> {
            final Supplier<T> s;

            OfRef(long size, Supplier<T> s) {
                super(size);
                this.s = s;
            }

            @Override
            public boolean tryAdvance(Consumer<? super T> action) {
                Objects.requireNonNull(action);

                action.accept(s.get());
                return true;
            }

            @Override
            public Spliterator<T> trySplit() {
                if (estimate == 0)
                    return null;
                // Both halves end up with the halved estimate
                return new InfiniteSupplyingSpliterator.OfRef<>(estimate >>>= 1, s);
            }
        }

        /** Infinite spliterator supplying {@code int} elements. */
        static final class OfInt extends InfiniteSupplyingSpliterator<Integer>
                implements Spliterator.OfInt {
            final IntSupplier s;

            OfInt(long size, IntSupplier s) {
                super(size);
                this.s = s;
            }

            @Override
            public boolean tryAdvance(IntConsumer action) {
                Objects.requireNonNull(action);

                action.accept(s.getAsInt());
                return true;
            }

            @Override
            public Spliterator.OfInt trySplit() {
                if (estimate == 0)
                    return null;
                // Both halves end up with the halved estimate
                return new InfiniteSupplyingSpliterator.OfInt(estimate >>>= 1, s);
            }
        }

        /** Infinite spliterator supplying {@code long} elements. */
        static final class OfLong extends InfiniteSupplyingSpliterator<Long>
                implements Spliterator.OfLong {
            final LongSupplier s;

            OfLong(long size, LongSupplier s) {
                super(size);
                this.s = s;
            }

            @Override
            public boolean tryAdvance(LongConsumer action) {
                Objects.requireNonNull(action);

                action.accept(s.getAsLong());
                return true;
            }

            @Override
            public Spliterator.OfLong trySplit() {
                if (estimate == 0)
                    return null;
                // Both halves end up with the halved estimate
                return new InfiniteSupplyingSpliterator.OfLong(estimate >>>= 1, s);
            }
        }

        /** Infinite spliterator supplying {@code double} elements. */
        static final class OfDouble extends InfiniteSupplyingSpliterator<Double>
                implements Spliterator.OfDouble {
            final DoubleSupplier s;

            OfDouble(long size, DoubleSupplier s) {
                super(size);
                this.s = s;
            }

            @Override
            public boolean tryAdvance(DoubleConsumer action) {
                Objects.requireNonNull(action);

                action.accept(s.getAsDouble());
                return true;
            }

            @Override
            public Spliterator.OfDouble trySplit() {
                if (estimate == 0)
                    return null;
                // Both halves end up with the halved estimate
                return new InfiniteSupplyingSpliterator.OfDouble(estimate >>>= 1, s);
            }
        }
    }

    // @@@ Consolidate with Node.Builder
    /**
     * Fixed-capacity array buffer that accumulates elements through its
     * consumer method and replays a prefix of them through {@code forEach}.
     * The {@code accept} methods perform no capacity check of their own.
     */
    abstract static class ArrayBuffer {
        // Position in the backing array at which the next element is stored
        int index;

        /** Rewinds the write position; the backing array is not cleared. */
        void reset() {
            index = 0;
        }

        /** Buffer of reference elements. */
        static final class OfRef<T> extends ArrayBuffer implements Consumer<T> {
            final Object[] array;

            OfRef(int size) {
                this.array = new Object[size];
            }

            @Override
            public void accept(T t) {
                array[index++] = t;
            }

            /**
             * Passes the first {@code fence} buffered elements to
             * {@code action}, in the order they were stored.
             *
             * @param action the consumer to receive the elements
             * @param fence the number of leading elements to pass on
             */
            public void forEach(Consumer<? super T> action, long fence) {
                for (int i = 0; i < fence; i++) {
                    @SuppressWarnings("unchecked")
                    T t = (T) array[i];
                    action.accept(t);
                }
            }
        }

        /**
         * Base class of the primitive buffers.  The write position and
         * {@code reset()} are inherited from {@code ArrayBuffer}.
         *
         * @param <T_CONS> the primitive consumer type
         */
        abstract static class OfPrimitive<T_CONS> extends ArrayBuffer {
            /**
             * Passes the first {@code fence} buffered elements to
             * {@code action}, in the order they were stored.
             *
             * @param action the consumer to receive the elements
             * @param fence the number of leading elements to pass on
             */
            abstract void forEach(T_CONS action, long fence);
        }

        /** Buffer of {@code int} elements. */
        static final class OfInt extends OfPrimitive<IntConsumer>
                implements IntConsumer {
            final int[] array;

            OfInt(int size) {
                this.array = new int[size];
            }

            @Override
            public void accept(int t) {
                array[index++] = t;
            }

            @Override
            public void forEach(IntConsumer action, long fence) {
                for (int i = 0; i < fence; i++) {
                    action.accept(array[i]);
                }
            }
        }

        /** Buffer of {@code long} elements. */
        static final class OfLong extends OfPrimitive<LongConsumer>
                implements LongConsumer {
            final long[] array;

            OfLong(int size) {
                this.array = new long[size];
            }

            @Override
            public void accept(long t) {
                array[index++] = t;
            }

            @Override
            public void forEach(LongConsumer action, long fence) {
                for (int i = 0; i < fence; i++) {
                    action.accept(array[i]);
                }
            }
        }

        /** Buffer of {@code double} elements. */
        static final class OfDouble extends OfPrimitive<DoubleConsumer>
                implements DoubleConsumer {
            final double[] array;

            OfDouble(int size) {
                this.array = new double[size];
            }

            @Override
            public void accept(double t) {
                array[index++] = t;
            }

            @Override
            void forEach(DoubleConsumer action, long fence) {
                for (int i = 0; i < fence; i++) {
                    action.accept(array[i]);
                }
            }
        }
    }
}