1 /*
2  * Copyright (c) 2013, Oracle and/or its affiliates. All rights reserved.
3  * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
4  *
5  * This code is free software; you can redistribute it and/or modify it
6  * under the terms of the GNU General Public License version 2 only, as
7  * published by the Free Software Foundation.  Oracle designates this
8  * particular file as subject to the "Classpath" exception as provided
9  * by Oracle in the LICENSE file that accompanied this code.
10  *
11  * This code is distributed in the hope that it will be useful, but WITHOUT
12  * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
13  * FITNESS FOR A PARTICULAR PURPOSE.  See the GNU General Public License
14  * version 2 for more details (a copy is included in the LICENSE file that
15  * accompanied this code).
16  *
17  * You should have received a copy of the GNU General Public License version
18  * 2 along with this work; if not, write to the Free Software Foundation,
19  * Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA.
20  *
21  * Please contact Oracle, 500 Oracle Parkway, Redwood Shores, CA 94065 USA
22  * or visit www.oracle.com if you need additional information or have any
23  * questions.
24  */
25 package java.util.stream;
26 
27 import java.util.LongSummaryStatistics;
28 import java.util.Objects;
29 import java.util.OptionalDouble;
30 import java.util.OptionalLong;
31 import java.util.PrimitiveIterator;
32 import java.util.Spliterator;
33 import java.util.Spliterators;
34 import java.util.function.BiConsumer;
35 import java.util.function.BinaryOperator;
36 import java.util.function.IntFunction;
37 import java.util.function.LongBinaryOperator;
38 import java.util.function.LongConsumer;
39 import java.util.function.LongFunction;
40 import java.util.function.LongPredicate;
41 import java.util.function.LongToDoubleFunction;
42 import java.util.function.LongToIntFunction;
43 import java.util.function.LongUnaryOperator;
44 import java.util.function.ObjLongConsumer;
45 import java.util.function.Supplier;
46 
47 /**
48  * Abstract base class for an intermediate pipeline stage or pipeline source
49  * stage implementing whose elements are of type {@code long}.
50  *
51  * @param <E_IN> type of elements in the upstream source
52  * @since 1.8
53  * @hide Visible for CTS testing only (OpenJDK8 tests).
54  */
55 // Android-changed: Made public for CTS tests only.
56 public abstract class LongPipeline<E_IN>
57         extends AbstractPipeline<E_IN, Long, LongStream>
58         implements LongStream {
59 
60     /**
61      * Constructor for the head of a stream pipeline.
62      *
63      * @param source {@code Supplier<Spliterator>} describing the stream source
64      * @param sourceFlags the source flags for the stream source, described in
65      *        {@link StreamOpFlag}
66      * @param parallel {@code true} if the pipeline is parallel
67      */
LongPipeline(Supplier<? extends Spliterator<Long>> source, int sourceFlags, boolean parallel)68     LongPipeline(Supplier<? extends Spliterator<Long>> source,
69                  int sourceFlags, boolean parallel) {
70         super(source, sourceFlags, parallel);
71     }
72 
73     /**
74      * Constructor for the head of a stream pipeline.
75      *
76      * @param source {@code Spliterator} describing the stream source
77      * @param sourceFlags the source flags for the stream source, described in
78      *        {@link StreamOpFlag}
79      * @param parallel {@code true} if the pipeline is parallel
80      */
LongPipeline(Spliterator<Long> source, int sourceFlags, boolean parallel)81     LongPipeline(Spliterator<Long> source,
82                  int sourceFlags, boolean parallel) {
83         super(source, sourceFlags, parallel);
84     }
85 
86     /**
87      * Constructor for appending an intermediate operation onto an existing pipeline.
88      *
89      * @param upstream the upstream element source.
90      * @param opFlags the operation flags
91      */
LongPipeline(AbstractPipeline<?, E_IN, ?> upstream, int opFlags)92     LongPipeline(AbstractPipeline<?, E_IN, ?> upstream, int opFlags) {
93         super(upstream, opFlags);
94     }
95 
96     /**
97      * Adapt a {@code Sink<Long> to an {@code LongConsumer}, ideally simply
98      * by casting.
99      */
adapt(Sink<Long> sink)100     private static LongConsumer adapt(Sink<Long> sink) {
101         if (sink instanceof LongConsumer) {
102             return (LongConsumer) sink;
103         } else {
104             if (Tripwire.ENABLED)
105                 Tripwire.trip(AbstractPipeline.class,
106                               "using LongStream.adapt(Sink<Long> s)");
107             return sink::accept;
108         }
109     }
110 
111     /**
112      * Adapt a {@code Spliterator<Long>} to a {@code Spliterator.OfLong}.
113      *
114      * @implNote
115      * The implementation attempts to cast to a Spliterator.OfLong, and throws
116      * an exception if this cast is not possible.
117      */
adapt(Spliterator<Long> s)118     private static Spliterator.OfLong adapt(Spliterator<Long> s) {
119         if (s instanceof Spliterator.OfLong) {
120             return (Spliterator.OfLong) s;
121         } else {
122             if (Tripwire.ENABLED)
123                 Tripwire.trip(AbstractPipeline.class,
124                               "using LongStream.adapt(Spliterator<Long> s)");
125             throw new UnsupportedOperationException("LongStream.adapt(Spliterator<Long> s)");
126         }
127     }
128 
129 
130     // Shape-specific methods
131 
132     @Override
133     // Android-changed: Make public, to match the method it's overriding.
getOutputShape()134     public final StreamShape getOutputShape() {
135         return StreamShape.LONG_VALUE;
136     }
137 
138     @Override
139     // Android-changed: Make public, to match the method it's overriding.
evaluateToNode(PipelineHelper<Long> helper, Spliterator<P_IN> spliterator, boolean flattenTree, IntFunction<Long[]> generator)140     public final <P_IN> Node<Long> evaluateToNode(PipelineHelper<Long> helper,
141                                            Spliterator<P_IN> spliterator,
142                                            boolean flattenTree,
143                                            IntFunction<Long[]> generator) {
144         return Nodes.collectLong(helper, spliterator, flattenTree);
145     }
146 
147     @Override
148     // Android-changed: Make public, to match the method it's overriding.
wrap(PipelineHelper<Long> ph, Supplier<Spliterator<P_IN>> supplier, boolean isParallel)149     public final <P_IN> Spliterator<Long> wrap(PipelineHelper<Long> ph,
150                                         Supplier<Spliterator<P_IN>> supplier,
151                                         boolean isParallel) {
152         return new StreamSpliterators.LongWrappingSpliterator<>(ph, supplier, isParallel);
153     }
154 
155     @Override
156     @SuppressWarnings("unchecked")
157     // Android-changed: Make public, to match the method it's overriding.
lazySpliterator(Supplier<? extends Spliterator<Long>> supplier)158     public final Spliterator.OfLong lazySpliterator(Supplier<? extends Spliterator<Long>> supplier) {
159         return new StreamSpliterators.DelegatingSpliterator.OfLong((Supplier<Spliterator.OfLong>) supplier);
160     }
161 
162     @Override
163     // Android-changed: Make public, to match the method it's overriding.
forEachWithCancel(Spliterator<Long> spliterator, Sink<Long> sink)164     public final void forEachWithCancel(Spliterator<Long> spliterator, Sink<Long> sink) {
165         Spliterator.OfLong spl = adapt(spliterator);
166         LongConsumer adaptedSink =  adapt(sink);
167         do { } while (!sink.cancellationRequested() && spl.tryAdvance(adaptedSink));
168     }
169 
170     @Override
171     // Android-changed: Make public, to match the method it's overriding.
makeNodeBuilder(long exactSizeIfKnown, IntFunction<Long[]> generator)172     public final Node.Builder<Long> makeNodeBuilder(long exactSizeIfKnown, IntFunction<Long[]> generator) {
173         return Nodes.longBuilder(exactSizeIfKnown);
174     }
175 
176 
177     // LongStream
178 
179     @Override
iterator()180     public final PrimitiveIterator.OfLong iterator() {
181         return Spliterators.iterator(spliterator());
182     }
183 
184     @Override
spliterator()185     public final Spliterator.OfLong spliterator() {
186         return adapt(super.spliterator());
187     }
188 
189     // Stateless intermediate ops from LongStream
190 
191     @Override
asDoubleStream()192     public final DoubleStream asDoubleStream() {
193         return new DoublePipeline.StatelessOp<Long>(this, StreamShape.LONG_VALUE,
194                                                     StreamOpFlag.NOT_SORTED | StreamOpFlag.NOT_DISTINCT) {
195             @Override
196             // Android-changed: Make public, to match the method it's overriding.
197             public Sink<Long> opWrapSink(int flags, Sink<Double> sink) {
198                 return new Sink.ChainedLong<Double>(sink) {
199                     @Override
200                     public void accept(long t) {
201                         downstream.accept((double) t);
202                     }
203                 };
204             }
205         };
206     }
207 
208     @Override
209     public final Stream<Long> boxed() {
210         return mapToObj(Long::valueOf);
211     }
212 
213     @Override
214     public final LongStream map(LongUnaryOperator mapper) {
215         Objects.requireNonNull(mapper);
216         return new StatelessOp<Long>(this, StreamShape.LONG_VALUE,
217                                      StreamOpFlag.NOT_SORTED | StreamOpFlag.NOT_DISTINCT) {
218             @Override
219             // Android-changed: Make public, to match the method it's overriding.
220             public Sink<Long> opWrapSink(int flags, Sink<Long> sink) {
221                 return new Sink.ChainedLong<Long>(sink) {
222                     @Override
223                     public void accept(long t) {
224                         downstream.accept(mapper.applyAsLong(t));
225                     }
226                 };
227             }
228         };
229     }
230 
231     @Override
232     public final <U> Stream<U> mapToObj(LongFunction<? extends U> mapper) {
233         Objects.requireNonNull(mapper);
234         return new ReferencePipeline.StatelessOp<Long, U>(this, StreamShape.LONG_VALUE,
235                                                           StreamOpFlag.NOT_SORTED | StreamOpFlag.NOT_DISTINCT) {
236             @Override
237             // Android-changed: Make public, to match the method it's overriding.
238             public Sink<Long> opWrapSink(int flags, Sink<U> sink) {
239                 return new Sink.ChainedLong<U>(sink) {
240                     @Override
241                     public void accept(long t) {
242                         downstream.accept(mapper.apply(t));
243                     }
244                 };
245             }
246         };
247     }
248 
249     @Override
250     public final IntStream mapToInt(LongToIntFunction mapper) {
251         Objects.requireNonNull(mapper);
252         return new IntPipeline.StatelessOp<Long>(this, StreamShape.LONG_VALUE,
253                                                  StreamOpFlag.NOT_SORTED | StreamOpFlag.NOT_DISTINCT) {
254             @Override
255             // Android-changed: Make public, to match the method it's overriding.
256             public Sink<Long> opWrapSink(int flags, Sink<Integer> sink) {
257                 return new Sink.ChainedLong<Integer>(sink) {
258                     @Override
259                     public void accept(long t) {
260                         downstream.accept(mapper.applyAsInt(t));
261                     }
262                 };
263             }
264         };
265     }
266 
267     @Override
268     public final DoubleStream mapToDouble(LongToDoubleFunction mapper) {
269         Objects.requireNonNull(mapper);
270         return new DoublePipeline.StatelessOp<Long>(this, StreamShape.LONG_VALUE,
271                                                     StreamOpFlag.NOT_SORTED | StreamOpFlag.NOT_DISTINCT) {
272             @Override
273             // Android-changed: Make public, to match the method it's overriding.
274             public Sink<Long> opWrapSink(int flags, Sink<Double> sink) {
275                 return new Sink.ChainedLong<Double>(sink) {
276                     @Override
277                     public void accept(long t) {
278                         downstream.accept(mapper.applyAsDouble(t));
279                     }
280                 };
281             }
282         };
283     }
284 
285     @Override
286     public final LongStream flatMap(LongFunction<? extends LongStream> mapper) {
287         return new StatelessOp<Long>(this, StreamShape.LONG_VALUE,
288                                      StreamOpFlag.NOT_SORTED | StreamOpFlag.NOT_DISTINCT | StreamOpFlag.NOT_SIZED) {
289             @Override
290             // Android-changed: Make public, to match the method it's overriding.
291             public Sink<Long> opWrapSink(int flags, Sink<Long> sink) {
292                 return new Sink.ChainedLong<Long>(sink) {
293                     @Override
294                     public void begin(long size) {
295                         downstream.begin(-1);
296                     }
297 
298                     @Override
299                     public void accept(long t) {
300                         try (LongStream result = mapper.apply(t)) {
301                             // We can do better that this too; optimize for depth=0 case and just grab spliterator and forEach it
302                             if (result != null)
303                                 result.sequential().forEach(i -> downstream.accept(i));
304                         }
305                     }
306                 };
307             }
308         };
309     }
310 
311     @Override
312     public LongStream unordered() {
313         if (!isOrdered())
314             return this;
315         return new StatelessOp<Long>(this, StreamShape.LONG_VALUE, StreamOpFlag.NOT_ORDERED) {
316             @Override
317             // Android-changed: Make public, to match the method it's overriding.
318             public Sink<Long> opWrapSink(int flags, Sink<Long> sink) {
319                 return sink;
320             }
321         };
322     }
323 
324     @Override
325     public final LongStream filter(LongPredicate predicate) {
326         Objects.requireNonNull(predicate);
327         return new StatelessOp<Long>(this, StreamShape.LONG_VALUE,
328                                      StreamOpFlag.NOT_SIZED) {
329             @Override
330             // Android-changed: Make public, to match the method it's overriding.
331             public Sink<Long> opWrapSink(int flags, Sink<Long> sink) {
332                 return new Sink.ChainedLong<Long>(sink) {
333                     @Override
334                     public void begin(long size) {
335                         downstream.begin(-1);
336                     }
337 
338                     @Override
339                     public void accept(long t) {
340                         if (predicate.test(t))
341                             downstream.accept(t);
342                     }
343                 };
344             }
345         };
346     }
347 
348     @Override
349     public final LongStream peek(LongConsumer action) {
350         Objects.requireNonNull(action);
351         return new StatelessOp<Long>(this, StreamShape.LONG_VALUE,
352                                      0) {
353             @Override
354             // Android-changed: Make public, to match the method it's overriding.
355             public Sink<Long> opWrapSink(int flags, Sink<Long> sink) {
356                 return new Sink.ChainedLong<Long>(sink) {
357                     @Override
358                     public void accept(long t) {
359                         action.accept(t);
360                         downstream.accept(t);
361                     }
362                 };
363             }
364         };
365     }
366 
367     // Stateful intermediate ops from LongStream
368 
369     @Override
370     public final LongStream limit(long maxSize) {
371         if (maxSize < 0)
372             throw new IllegalArgumentException(Long.toString(maxSize));
373         return SliceOps.makeLong(this, 0, maxSize);
374     }
375 
376     @Override
377     public final LongStream skip(long n) {
378         if (n < 0)
379             throw new IllegalArgumentException(Long.toString(n));
380         if (n == 0)
381             return this;
382         else
383             return SliceOps.makeLong(this, n, -1);
384     }
385 
386     @Override
387     public final LongStream sorted() {
388         return SortedOps.makeLong(this);
389     }
390 
391     @Override
392     public final LongStream distinct() {
393         // While functional and quick to implement, this approach is not very efficient.
394         // An efficient version requires a long-specific map/set implementation.
395         return boxed().distinct().mapToLong(i -> (long) i);
396     }
397 
398     // Terminal ops from LongStream
399 
400     @Override
401     public void forEach(LongConsumer action) {
402         evaluate(ForEachOps.makeLong(action, false));
403     }
404 
405     @Override
406     public void forEachOrdered(LongConsumer action) {
407         evaluate(ForEachOps.makeLong(action, true));
408     }
409 
410     @Override
411     public final long sum() {
412         // use better algorithm to compensate for intermediate overflow?
413         return reduce(0, Long::sum);
414     }
415 
416     @Override
417     public final OptionalLong min() {
418         return reduce(Math::min);
419     }
420 
421     @Override
422     public final OptionalLong max() {
423         return reduce(Math::max);
424     }
425 
426     @Override
427     public final OptionalDouble average() {
428         long[] avg = collect(() -> new long[2],
429                              (ll, i) -> {
430                                  ll[0]++;
431                                  ll[1] += i;
432                              },
433                              (ll, rr) -> {
434                                  ll[0] += rr[0];
435                                  ll[1] += rr[1];
436                              });
437         return avg[0] > 0
438                ? OptionalDouble.of((double) avg[1] / avg[0])
439                : OptionalDouble.empty();
440     }
441 
442     @Override
443     public final long count() {
444         return map(e -> 1L).sum();
445     }
446 
447     @Override
448     public final LongSummaryStatistics summaryStatistics() {
449         return collect(LongSummaryStatistics::new, LongSummaryStatistics::accept,
450                        LongSummaryStatistics::combine);
451     }
452 
453     @Override
454     public final long reduce(long identity, LongBinaryOperator op) {
455         return evaluate(ReduceOps.makeLong(identity, op));
456     }
457 
458     @Override
459     public final OptionalLong reduce(LongBinaryOperator op) {
460         return evaluate(ReduceOps.makeLong(op));
461     }
462 
463     @Override
464     public final <R> R collect(Supplier<R> supplier,
465                                ObjLongConsumer<R> accumulator,
466                                BiConsumer<R, R> combiner) {
467         BinaryOperator<R> operator = (left, right) -> {
468             combiner.accept(left, right);
469             return left;
470         };
471         return evaluate(ReduceOps.makeLong(supplier, accumulator, operator));
472     }
473 
474     @Override
475     public final boolean anyMatch(LongPredicate predicate) {
476         return evaluate(MatchOps.makeLong(predicate, MatchOps.MatchKind.ANY));
477     }
478 
479     @Override
480     public final boolean allMatch(LongPredicate predicate) {
481         return evaluate(MatchOps.makeLong(predicate, MatchOps.MatchKind.ALL));
482     }
483 
484     @Override
485     public final boolean noneMatch(LongPredicate predicate) {
486         return evaluate(MatchOps.makeLong(predicate, MatchOps.MatchKind.NONE));
487     }
488 
489     @Override
490     public final OptionalLong findFirst() {
491         return evaluate(FindOps.makeLong(true));
492     }
493 
494     @Override
495     public final OptionalLong findAny() {
496         return evaluate(FindOps.makeLong(false));
497     }
498 
499     @Override
500     public final long[] toArray() {
501         return Nodes.flattenLong((Node.OfLong) evaluateToArrayNode(Long[]::new))
502                 .asPrimitiveArray();
503     }
504 
505 
506     //
507 
508     /**
509      * Source stage of a LongPipeline.
510      *
511      * @param <E_IN> type of elements in the upstream source
512      * @since 1.8
513      * @hide Made public for CTS tests only (OpenJDK 8 streams tests).
514      */
515     // Android-changed: Made public for CTS tests only.
516     public static class Head<E_IN> extends LongPipeline<E_IN> {
517         /**
518          * Constructor for the source stage of a LongStream.
519          *
520          * @param source {@code Supplier<Spliterator>} describing the stream
521          *               source
522          * @param sourceFlags the source flags for the stream source, described
523          *                    in {@link StreamOpFlag}
524          * @param parallel {@code true} if the pipeline is parallel
525          */
526         // Android-changed: Made public for CTS tests only.
527         public Head(Supplier<? extends Spliterator<Long>> source,
528              int sourceFlags, boolean parallel) {
529             super(source, sourceFlags, parallel);
530         }
531 
532         /**
533          * Constructor for the source stage of a LongStream.
534          *
535          * @param source {@code Spliterator} describing the stream source
536          * @param sourceFlags the source flags for the stream source, described
537          *                    in {@link StreamOpFlag}
538          * @param parallel {@code true} if the pipeline is parallel
539          */
540         // Android-changed: Made public for CTS tests only.
541         public Head(Spliterator<Long> source,
542              int sourceFlags, boolean parallel) {
543             super(source, sourceFlags, parallel);
544         }
545 
546         @Override
547         // Android-changed: Make public, to match the method it's overriding.
548         public final boolean opIsStateful() {
549             throw new UnsupportedOperationException();
550         }
551 
552         @Override
553         // Android-changed: Make public, to match the method it's overriding.
554         public final Sink<E_IN> opWrapSink(int flags, Sink<Long> sink) {
555             throw new UnsupportedOperationException();
556         }
557 
558         // Optimized sequential terminal operations for the head of the pipeline
559 
560         @Override
561         public void forEach(LongConsumer action) {
562             if (!isParallel()) {
563                 adapt(sourceStageSpliterator()).forEachRemaining(action);
564             } else {
565                 super.forEach(action);
566             }
567         }
568 
569         @Override
570         public void forEachOrdered(LongConsumer action) {
571             if (!isParallel()) {
572                 adapt(sourceStageSpliterator()).forEachRemaining(action);
573             } else {
574                 super.forEachOrdered(action);
575             }
576         }
577     }
578 
579     /** Base class for a stateless intermediate stage of a LongStream.
580      *
581      * @param <E_IN> type of elements in the upstream source
582      * @since 1.8
583      * @hide Visible for CTS testing only (OpenJDK8 tests).
584      */
585     // Android-changed: Made public for CTS tests only.
586     public abstract static class StatelessOp<E_IN> extends LongPipeline<E_IN> {
587         /**
588          * Construct a new LongStream by appending a stateless intermediate
589          * operation to an existing stream.
590          * @param upstream The upstream pipeline stage
591          * @param inputShape The stream shape for the upstream pipeline stage
592          * @param opFlags Operation flags for the new stage
593          */
594         // Android-changed: Made public for CTS tests only.
595         public StatelessOp(AbstractPipeline<?, E_IN, ?> upstream,
596                     StreamShape inputShape,
597                     int opFlags) {
598             super(upstream, opFlags);
599             assert upstream.getOutputShape() == inputShape;
600         }
601 
602         @Override
603         // Android-changed: Make public, to match the method it's overriding.
604         public final boolean opIsStateful() {
605             return false;
606         }
607     }
608 
609     /**
610      * Base class for a stateful intermediate stage of a LongStream.
611      *
612      * @param <E_IN> type of elements in the upstream source
613      * @since 1.8
614      * @hide Visible for CTS testing only (OpenJDK8 tests).
615      */
616     // Android-changed: Made public for CTS tests only.
617     public abstract static class StatefulOp<E_IN> extends LongPipeline<E_IN> {
618         /**
619          * Construct a new LongStream by appending a stateful intermediate
620          * operation to an existing stream.
621          * @param upstream The upstream pipeline stage
622          * @param inputShape The stream shape for the upstream pipeline stage
623          * @param opFlags Operation flags for the new stage
624          * @hide Visible for CTS testing only (OpenJDK8 tests).
625          */
626         // Android-changed: Made public for CTS tests only.
627         public StatefulOp(AbstractPipeline<?, E_IN, ?> upstream,
628                    StreamShape inputShape,
629                    int opFlags) {
630             super(upstream, opFlags);
631             assert upstream.getOutputShape() == inputShape;
632         }
633 
634         @Override
635         // Android-changed: Make public, to match the method it's overriding.
636         public final boolean opIsStateful() {
637             return true;
638         }
639 
640         @Override
641         // Android-changed: Make public, to match the method it's overriding.
642         public abstract <P_IN> Node<Long> opEvaluateParallel(PipelineHelper<Long> helper,
643                                                       Spliterator<P_IN> spliterator,
644                                                       IntFunction<Long[]> generator);
645     }
646 }
647