/*
 * Copyright (c) 2012, 2013, Oracle and/or its affiliates. All rights reserved.
 * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
 *
 * This code is free software; you can redistribute it and/or modify it
 * under the terms of the GNU General Public License version 2 only, as
 * published by the Free Software Foundation. Oracle designates this
 * particular file as subject to the "Classpath" exception as provided
 * by Oracle in the LICENSE file that accompanied this code.
 *
 * This code is distributed in the hope that it will be useful, but WITHOUT
 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
 * FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License
 * version 2 for more details (a copy is included in the LICENSE file that
 * accompanied this code).
 *
 * You should have received a copy of the GNU General Public License version
 * 2 along with this work; if not, write to the Free Software Foundation,
 * Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA.
 *
 * Please contact Oracle, 500 Oracle Parkway, Redwood Shores, CA 94065 USA
 * or visit www.oracle.com if you need additional information or have any
 * questions.
 */

/**
 * Classes to support functional-style operations on streams of elements, such
 * as map-reduce transformations on collections. For example:
 *
 * <pre>{@code
 *     int sum = widgets.stream()
 *                      .filter(b -> b.getColor() == RED)
 *                      .mapToInt(b -> b.getWeight())
 *                      .sum();
 * }</pre>
 *
 * <p>Here we use {@code widgets}, a {@code Collection<Widget>},
 * as a source for a stream, and then perform a filter-map-reduce on the stream
 * to obtain the sum of the weights of the red widgets. (Summation is an
 * example of a <a href="package-summary.html#Reduction">reduction</a>
 * operation.)
 *
 * <p>The key abstraction introduced in this package is <em>stream</em>.
 * The classes {@link java.util.stream.Stream}, {@link java.util.stream.IntStream},
 * {@link java.util.stream.LongStream}, and {@link java.util.stream.DoubleStream}
 * are streams over objects and the primitive {@code int}, {@code long} and
 * {@code double} types. Streams differ from collections in several ways:
 *
 * <ul>
 *     <li>No storage. A stream is not a data structure that stores elements;
 *     instead, it conveys elements from a source such as a data structure,
 *     an array, a generator function, or an I/O channel, through a pipeline of
 *     computational operations.</li>
 *     <li>Functional in nature. An operation on a stream produces a result,
 *     but does not modify its source. For example, filtering a {@code Stream}
 *     obtained from a collection produces a new {@code Stream} without the
 *     filtered elements, rather than removing elements from the source
 *     collection.</li>
 *     <li>Laziness-seeking. Many stream operations, such as filtering, mapping,
 *     or duplicate removal, can be implemented lazily, exposing opportunities
 *     for optimization. For example, "find the first {@code String} with
 *     three consecutive vowels" need not examine all the input strings.
 *     Stream operations are divided into intermediate ({@code Stream}-producing)
 *     operations and terminal (value- or side-effect-producing) operations.
 *     Intermediate operations are always lazy.</li>
 *     <li>Possibly unbounded. While collections have a finite size, streams
 *     need not. Short-circuiting operations such as {@code limit(n)} or
 *     {@code findFirst()} can allow computations on infinite streams to
 *     complete in finite time.</li>
 *     <li>Consumable. The elements of a stream are only visited once during
 *     the life of a stream. Like an {@link java.util.Iterator}, a new stream
 *     must be generated to revisit the same elements of the source.
 *     </li>
 * </ul>
 *
 * <p>Streams can be obtained in a number of ways.
 * Some examples include:
 * <ul>
 *     <li>From a {@link java.util.Collection} via the {@code stream()} and
 *     {@code parallelStream()} methods;</li>
 *     <li>From an array via {@link java.util.Arrays#stream(Object[])};</li>
 *     <li>From static factory methods on the stream classes, such as
 *     {@link java.util.stream.Stream#of(Object[])},
 *     {@link java.util.stream.IntStream#range(int, int)}
 *     or {@link java.util.stream.Stream#iterate(Object, UnaryOperator)};</li>
 *     <li>The lines of a file can be obtained from {@link java.io.BufferedReader#lines()};</li>
 *     <li>Streams of file paths can be obtained from methods in {@link java.nio.file.Files};</li>
 *     <li>Streams of random numbers can be obtained from {@link java.util.Random#ints()};</li>
 *     <li>Numerous other stream-bearing methods in the JDK, including
 *     {@link java.util.BitSet#stream()},
 *     {@link java.util.regex.Pattern#splitAsStream(java.lang.CharSequence)},
 *     and {@link java.util.jar.JarFile#stream()}.</li>
 * </ul>
 *
 * <p>Additional stream sources can be provided by third-party libraries using
 * <a href="package-summary.html#StreamSources">these techniques</a>.
 *
 * <h2><a name="StreamOps">Stream operations and pipelines</a></h2>
 *
 * <p>Stream operations are divided into <em>intermediate</em> and
 * <em>terminal</em> operations, and are combined to form <em>stream
 * pipelines</em>. A stream pipeline consists of a source (such as a
 * {@code Collection}, an array, a generator function, or an I/O channel);
 * followed by zero or more intermediate operations such as
 * {@code Stream.filter} or {@code Stream.map}; and a terminal operation such
 * as {@code Stream.forEach} or {@code Stream.reduce}.
 *
 * <p>Intermediate operations return a new stream.
 * They are always <em>lazy</em>; executing an intermediate operation such as
 * {@code filter()} does not actually perform any filtering, but instead
 * creates a new stream that, when traversed, contains the elements of
 * the initial stream that match the given predicate. Traversal
 * of the pipeline source does not begin until the terminal operation of the
 * pipeline is executed.
 *
 * <p>Terminal operations, such as {@code Stream.forEach} or
 * {@code IntStream.sum}, may traverse the stream to produce a result or a
 * side-effect. After the terminal operation is performed, the stream pipeline
 * is considered consumed, and can no longer be used; if you need to traverse
 * the same data source again, you must return to the data source to get a new
 * stream. In almost all cases, terminal operations are <em>eager</em>,
 * completing their traversal of the data source and processing of the pipeline
 * before returning. Only the terminal operations {@code iterator()} and
 * {@code spliterator()} are not; these are provided as an "escape hatch" to enable
 * arbitrary client-controlled pipeline traversals in the event that the
 * existing operations are not sufficient to the task.
 *
 * <p>Processing streams lazily allows for significant efficiencies; in a
 * pipeline such as the filter-map-sum example above, filtering, mapping, and
 * summing can be fused into a single pass on the data, with minimal
 * intermediate state. Laziness also makes it possible to avoid examining all
 * the data when that is not necessary; for operations such as "find the first
 * string longer than 1000 characters", it is only necessary to examine enough
 * strings to find one that has the desired characteristics without examining
 * all of the strings available from the source. (This behavior becomes even
 * more important when the input stream is infinite and not merely large.)
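 *
 * <p>As a rough illustration of laziness and short-circuiting, the following
 * sketch uses {@code peek()} purely as a debugging aid to record which elements
 * are pulled from the source. {@code LazinessDemo}, {@code examinedWhileSearching},
 * and {@code firstSquareAbove} are hypothetical names chosen for this example.
 * With the JDK's sequential implementation, nothing is examined until
 * {@code findFirst()} runs, and the search stops at the first match. The same
 * short-circuiting lets {@code findFirst()} finish on an infinite
 * {@code Stream.iterate} source.
 *
 * <pre>{@code
 * import java.util.ArrayList;
 * import java.util.Arrays;
 * import java.util.List;
 * import java.util.stream.Stream;
 *
 * public class LazinessDemo {
 *     // Returns the elements pulled from the source while searching for the
 *     // first string longer than minLength (sequential stream only).
 *     static List<String> examinedWhileSearching(List<String> source, int minLength) {
 *         List<String> examined = new ArrayList<>();
 *         Stream<String> pipeline = source.stream()
 *                 .peek(examined::add)              // debugging-style side-effect
 *                 .filter(s -> s.length() > minLength);
 *         // Building the pipeline has not traversed the source yet.
 *         if (!examined.isEmpty()) {
 *             throw new AssertionError("intermediate operations ran eagerly");
 *         }
 *         pipeline.findFirst();                     // terminal and short-circuiting
 *         return examined;
 *     }
 *
 *     // A short-circuiting terminal operation on an infinite source.
 *     static int firstSquareAbove(int threshold) {
 *         return Stream.iterate(1, n -> n + 1)
 *                 .map(n -> n * n)
 *                 .filter(sq -> sq > threshold)
 *                 .findFirst()
 *                 .get();
 *     }
 *
 *     public static void main(String[] args) {
 *         List<String> words = Arrays.asList("a", "bb", "cccc", "dd", "eeeee");
 *         System.out.println(examinedWhileSearching(words, 3)); // [a, bb, cccc]
 *         System.out.println(firstSquareAbove(50));             // 64
 *     }
 * }
 * }</pre>
 *
 * <p>Here {@code "dd"} and {@code "eeeee"} are never examined. Because
 * {@code peek()} is used only to observe the pipeline, the sketch stays
 * sequential.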
 *
 * <p>Intermediate operations are further divided into <em>stateless</em>
 * and <em>stateful</em> operations. Stateless operations, such as {@code filter}
 * and {@code map}, retain no state from previously seen elements when processing
 * a new element -- each element can be processed
 * independently of operations on other elements. Stateful operations, such as
 * {@code distinct} and {@code sorted}, may incorporate state from previously
 * seen elements when processing new elements.
 *
 * <p>Stateful operations may need to process the entire input
 * before producing a result. For example, one cannot produce any results from
 * sorting a stream until one has seen all elements of the stream. As a result,
 * under parallel computation, some pipelines containing stateful intermediate
 * operations may require multiple passes on the data or may need to buffer
 * significant data. Pipelines containing exclusively stateless intermediate
 * operations can be processed in a single pass, whether sequential or parallel,
 * with minimal data buffering.
 *
 * <p>Further, some operations are deemed <em>short-circuiting</em> operations.
 * An intermediate operation is short-circuiting if, when presented with
 * infinite input, it may produce a finite stream as a result. A terminal
 * operation is short-circuiting if, when presented with infinite input, it may
 * terminate in finite time. Having a short-circuiting operation in the pipeline
 * is a necessary, but not sufficient, condition for the processing of an infinite
 * stream to terminate normally in finite time.
 *
 * <h3>Parallelism</h3>
 *
 * <p>Processing elements with an explicit {@code for-}loop is inherently serial.
 * Streams facilitate parallel execution by reframing the computation as a pipeline of
 * aggregate operations, rather than as imperative operations on each individual
 * element.
 * All stream operations can execute either in serial or in parallel.
 * The stream implementations in the JDK create serial streams unless parallelism is
 * explicitly requested. For example, {@code Collection} has methods
 * {@link java.util.Collection#stream} and {@link java.util.Collection#parallelStream},
 * which produce sequential and parallel streams respectively; other
 * stream-bearing methods such as {@link java.util.stream.IntStream#range(int, int)}
 * produce sequential streams, but these streams can be efficiently parallelized by
 * invoking their {@link java.util.stream.BaseStream#parallel()} method.
 * To execute the prior "sum of weights of widgets" query in parallel, we would
 * do:
 *
 * <pre>{@code
 *     int sumOfWeights = widgets.}<code><b>parallelStream()</b></code>{@code
 *                               .filter(b -> b.getColor() == RED)
 *                               .mapToInt(b -> b.getWeight())
 *                               .sum();
 * }</pre>
 *
 * <p>The only difference between the serial and parallel versions of this
 * example is the creation of the initial stream, using "{@code parallelStream()}"
 * instead of "{@code stream()}". When the terminal operation is initiated,
 * the stream pipeline is executed sequentially or in parallel depending on the
 * orientation of the stream on which it is invoked. Whether a stream will execute in serial or
 * parallel can be determined with the {@code isParallel()} method, and the
 * orientation of a stream can be modified with the
 * {@link java.util.stream.BaseStream#sequential()} and
 * {@link java.util.stream.BaseStream#parallel()} operations.
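 *
 * <p>The following sketch illustrates changing a stream's orientation and
 * checking it with {@code isParallel()}. It uses a simple numeric workload in
 * place of the {@code widgets} example, and {@code ParallelDemo},
 * {@code sumOfSquares}, and {@code isParallelAfterSwitching} are hypothetical
 * names. Because addition is associative and the lambda is stateless, the
 * sequential and parallel runs produce the same sum.
 *
 * <pre>{@code
 * import java.util.stream.IntStream;
 *
 * public class ParallelDemo {
 *     // Sum of the squares of 1..n, computed sequentially or in parallel.
 *     static long sumOfSquares(int n, boolean parallel) {
 *         IntStream s = IntStream.rangeClosed(1, n);  // sequential by default
 *         if (parallel) {
 *             s = s.parallel();                       // same pipeline, parallel orientation
 *         }
 *         return s.mapToLong(i -> (long) i * i).sum();
 *     }
 *
 *     // The orientation may be changed more than once; isParallel() reports the current one.
 *     static boolean isParallelAfterSwitching() {
 *         return IntStream.range(0, 10).sequential().parallel().isParallel();
 *     }
 *
 *     public static void main(String[] args) {
 *         System.out.println(IntStream.range(0, 10).isParallel()); // false
 *         System.out.println(isParallelAfterSwitching());          // true
 *         System.out.println(sumOfSquares(1000, false));           // 333833500
 *         System.out.println(sumOfSquares(1000, true));            // 333833500
 *     }
 * }
 * }</pre>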
 *
 * <p>Except for operations identified as explicitly nondeterministic, such
 * as {@code findAny()}, whether a stream executes sequentially or in parallel
 * should not change the result of the computation.
 *
 * <p>Most stream operations accept parameters that describe user-specified
 * behavior. To preserve correct behavior,
 * these <em>behavioral parameters</em> must be <em>non-interfering</em>, and in
 * most cases must be <em>stateless</em>. Such parameters are always instances
 * of a <a href="../function/package-summary.html">functional interface</a> such
 * as {@link java.util.function.Function}, and are often lambda expressions or
 * method references.
 *
 * <h3><a name="NonInterference">Non-interference</a></h3>
 *
 * Streams enable you to execute possibly-parallel aggregate operations over a
 * variety of data sources, including even non-thread-safe collections such as
 * {@code ArrayList}. This is possible only if we can prevent
 * <em>interference</em> with the data source during the execution of a stream
 * pipeline. Except for the escape-hatch operations {@code iterator()} and
 * {@code spliterator()}, execution begins when the terminal operation is
 * invoked, and ends when the terminal operation completes. For most data
 * sources, preventing interference means ensuring that the data source is
 * <em>not modified at all</em> during the execution of the stream pipeline.
 * The notable exceptions are streams whose sources are concurrent
 * collections, which are specifically designed to handle concurrent modification.
 * Concurrent stream sources are those whose {@code Spliterator} reports the
 * {@code CONCURRENT} characteristic.
 *
 * <p>Accordingly, behavioral parameters in stream pipelines whose source might
 * not be concurrent should never modify the stream's data source.
 * A behavioral parameter is said to <em>interfere</em> with a non-concurrent
 * data source if it modifies, or causes to be
 * modified, the stream's data source. The need for non-interference applies
 * to all pipelines, not just parallel ones. Unless the stream source is
 * concurrent, modifying a stream's data source during execution of a stream
 * pipeline can cause exceptions, incorrect answers, or nonconformant behavior.
 *
 * <p>For well-behaved stream sources, the source can be modified before the
 * terminal operation commences and those modifications will be reflected in
 * the covered elements. For example, consider the following code:
 *
 * <pre>{@code
 *     List<String> l = new ArrayList<>(Arrays.asList("one", "two"));
 *     Stream<String> sl = l.stream();
 *     l.add("three");
 *     String s = sl.collect(Collectors.joining(" "));
 * }</pre>
 *
 * <p>First a list is created consisting of two strings: "one" and "two". Then a
 * stream is created from that list. Next the list is modified by adding a third
 * string: "three". Finally the elements of the stream are collected and joined
 * together. Since the list was modified before the terminal {@code collect}
 * operation commenced, the result will be a string of "one two three". All the
 * streams returned from JDK collections, and most other JDK classes,
 * are well-behaved in this manner; for streams generated by other libraries, see
 * <a href="package-summary.html#StreamSources">Low-level stream
 * construction</a> for requirements for building well-behaved streams.
 *
 * <h3><a name="Statelessness">Stateless behaviors</a></h3>
 *
 * Stream pipeline results may be nondeterministic or incorrect if the behavioral
 * parameters to the stream operations are <em>stateful</em>.
 * A stateful lambda
 * (or other object implementing the appropriate functional interface) is one
 * whose result depends on any state which might change during the execution
 * of the stream pipeline. An example of a stateful lambda is the parameter
 * to {@code map()} in:
 *
 * <pre>{@code
 *     Set<Integer> seen = Collections.synchronizedSet(new HashSet<>());
 *     stream.parallel().map(e -> { if (seen.add(e)) return 0; else return e; })...
 * }</pre>
 *
 * <p>Here, if the mapping operation is performed in parallel, the results for the
 * same input could vary from run to run, due to thread scheduling differences,
 * whereas with a stateless lambda expression the results would always be the
 * same.
 *
 * <p>Note also that attempting to access mutable state from behavioral parameters
 * presents you with a bad choice with respect to safety and performance; if
 * you do not synchronize access to that state, you have a data race and
 * therefore your code is broken, but if you do synchronize access to that
 * state, you risk having contention undermine the parallelism you are seeking
 * to benefit from. The best approach is to avoid stateful behavioral
 * parameters to stream operations entirely; there is usually a way to
 * restructure the stream pipeline to avoid statefulness.
 *
 * <h3>Side-effects</h3>
 *
 * Side-effects in behavioral parameters to stream operations are, in general,
 * discouraged, as they can often lead to unwitting violations of the
 * statelessness requirement, as well as other thread-safety hazards.
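 *
 * <p>As a hedged sketch of restructuring a pipeline to avoid shared state,
 * suppose the shared state exists for deduplication or counting. That is an
 * assumption; the {@code seen} example above computes something different.
 * Both goals can be met with built-in operations whose state, if any, is
 * managed by the library: {@code distinct()} for deduplication and
 * {@code count()} as a reduction. {@code StatelessDemo} and its methods are
 * hypothetical names.
 *
 * <pre>{@code
 * import java.util.Arrays;
 * import java.util.List;
 * import java.util.stream.Collectors;
 *
 * public class StatelessDemo {
 *     // Deduplication without a shared "seen" set: distinct() manages its own
 *     // state and, on an ordered stream, keeps the first occurrence of each value.
 *     static List<Integer> distinctEvens(List<Integer> input) {
 *         return input.parallelStream()
 *                 .filter(n -> n % 2 == 0)   // stateless: depends only on n
 *                 .distinct()
 *                 .collect(Collectors.toList());
 *     }
 *
 *     // Counting without a shared counter: count() is a reduction.
 *     static long countEvens(List<Integer> input) {
 *         return input.parallelStream()
 *                 .filter(n -> n % 2 == 0)
 *                 .count();
 *     }
 *
 *     public static void main(String[] args) {
 *         List<Integer> input = Arrays.asList(4, 1, 2, 4, 3, 2, 6);
 *         System.out.println(distinctEvens(input)); // [4, 2, 6]
 *         System.out.println(countEvens(input));    // 5
 *     }
 * }
 * }</pre>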
 *
 * <p>If the behavioral parameters do have side-effects, unless explicitly
 * stated, there are no guarantees as to the
 * <a href="../concurrent/package-summary.html#MemoryVisibility"><i>visibility</i></a>
 * of those side-effects to other threads, nor are there any guarantees that
 * different operations on the "same" element within the same stream pipeline
 * are executed in the same thread. Further, the ordering of those effects
 * may be surprising. Even when a pipeline is constrained to produce a
 * <em>result</em> that is consistent with the encounter order of the stream
 * source (for example, {@code IntStream.range(0,5).parallel().map(x -> x*2).toArray()}
 * must produce {@code [0, 2, 4, 6, 8]}), no guarantees are made as to the order
 * in which the mapper function is applied to individual elements, or in what
 * thread any behavioral parameter is executed for a given element.
 *
 * <p>Many computations where one might be tempted to use side-effects can be more
 * safely and efficiently expressed without side-effects, such as using
 * <a href="package-summary.html#Reduction">reduction</a> instead of mutable
 * accumulators. However, side-effects such as using {@code println()} for debugging
 * purposes are usually harmless. A small number of stream operations, such as
 * {@code forEach()} and {@code peek()}, can operate only via side-effects;
 * these should be used with care.
 *
 * <p>As an example of how to transform a stream pipeline that inappropriately
 * uses side-effects to one that does not, the following code searches a stream
 * of strings for those matching a given regular expression, and puts the
 * matches in a list.
 *
 * <pre>{@code
 *     ArrayList<String> results = new ArrayList<>();
 *     stream.filter(s -> pattern.matcher(s).matches())
 *           .forEach(s -> results.add(s));  // Unnecessary use of side-effects!
 * }</pre>
 *
 * <p>This code unnecessarily uses side-effects. If executed in parallel, the
 * non-thread-safety of {@code ArrayList} would cause incorrect results, and
 * adding needed synchronization would cause contention, undermining the
 * benefit of parallelism. Furthermore, the side-effects are entirely avoidable
 * here; the {@code forEach()} can simply be replaced with a reduction
 * operation that is safer, more efficient, and more amenable to
 * parallelization:
 *
 * <pre>{@code
 *     List<String> results =
 *         stream.filter(s -> pattern.matcher(s).matches())
 *               .collect(Collectors.toList());  // No side-effects!
 * }</pre>
 *
 * <h3><a name="Ordering">Ordering</a></h3>
 *
 * <p>Streams may or may not have a defined <em>encounter order</em>. Whether
 * or not a stream has an encounter order depends on the source and the
 * intermediate operations. Certain stream sources (such as {@code List} or
 * arrays) are intrinsically ordered, whereas others (such as {@code HashSet})
 * are not. Some intermediate operations, such as {@code sorted()}, may impose
 * an encounter order on an otherwise unordered stream, and others may render an
 * ordered stream unordered, such as {@link java.util.stream.BaseStream#unordered()}.
 * Further, some terminal operations may ignore encounter order, such as
 * {@code forEach()}.
 *
 * <p>If a stream is ordered, most operations are constrained to operate on the
 * elements in their encounter order; if the source of a stream is a {@code List}
 * containing {@code [1, 2, 3]}, then the result of executing {@code map(x -> x*2)}
 * must be {@code [2, 4, 6]}. However, if the source has no defined encounter
 * order, then any permutation of the values {@code [2, 4, 6]} would be a valid
 * result.
 *
 * <p>For sequential streams, the presence or absence of an encounter order does
 * not affect performance, only determinism.
 * If a stream is ordered, repeated
 * execution of identical stream pipelines on an identical source will produce
 * an identical result; if it is not ordered, repeated execution might produce
 * different results.
 *
 * <p>For parallel streams, relaxing the ordering constraint can sometimes enable
 * more efficient execution. Certain aggregate operations,
 * such as filtering duplicates ({@code distinct()}) or grouped reductions
 * ({@code Collectors.groupingBy()}), can be implemented more efficiently if ordering of elements
 * is not relevant. Similarly, operations that are intrinsically tied to encounter order,
 * such as {@code limit()}, may require
 * buffering to ensure proper ordering, undermining the benefit of parallelism.
 * In cases where the stream has an encounter order, but the user does not
 * particularly <em>care</em> about that encounter order, explicitly de-ordering
 * the stream with {@link java.util.stream.BaseStream#unordered() unordered()} may
 * improve parallel performance for some stateful or terminal operations.
 * However, most stream pipelines, such as the "sum of weights of widgets" example
 * above, still parallelize efficiently even under ordering constraints.
 *
 * <h2><a name="Reduction">Reduction operations</a></h2>
 *
 * A <em>reduction</em> operation (also called a <em>fold</em>) takes a sequence
 * of input elements and combines them into a single summary result by repeated
 * application of a combining operation, such as finding the sum or maximum of
 * a set of numbers, or accumulating elements into a list.
 * The streams classes have
 * multiple forms of general reduction operations, called
 * {@link java.util.stream.Stream#reduce(java.util.function.BinaryOperator) reduce()}
 * and {@link java.util.stream.Stream#collect(java.util.stream.Collector) collect()},
 * as well as multiple specialized reduction forms such as
 * {@link java.util.stream.IntStream#sum() sum()}, {@link java.util.stream.IntStream#max() max()},
 * or {@link java.util.stream.IntStream#count() count()}.
 *
 * <p>Of course, such operations can be readily implemented as simple sequential
 * loops, as in:
 * <pre>{@code
 *    int sum = 0;
 *    for (int x : numbers) {
 *       sum += x;
 *    }
 * }</pre>
 * However, there are good reasons to prefer a reduce operation
 * over a mutative accumulation such as the above. Not only is a reduction
 * "more abstract" -- it operates on the stream as a whole rather than individual
 * elements -- but a properly constructed reduce operation is inherently
 * parallelizable, so long as the function(s) used to process the elements
 * are <a href="package-summary.html#Associativity">associative</a> and
 * <a href="package-summary.html#Statelessness">stateless</a>.
 * For example, given a stream of numbers for which we want to find the sum, we
 * can write:
 * <pre>{@code
 *    int sum = numbers.stream().reduce(0, (x,y) -> x+y);
 * }</pre>
 * or:
 * <pre>{@code
 *    int sum = numbers.stream().reduce(0, Integer::sum);
 * }</pre>
 *
 * <p>These reduction operations can run safely in parallel with almost no
 * modification:
 * <pre>{@code
 *    int sum = numbers.parallelStream().reduce(0, Integer::sum);
 * }</pre>
 *
 * <p>Reduction parallelizes well because the implementation
 * can operate on subsets of the data in parallel, and then combine the
 * intermediate results to get the final correct answer.
 * (Even if the language
 * had a "parallel for-each" construct, the mutative accumulation approach would
 * still require the developer to provide
 * thread-safe updates to the shared accumulating variable {@code sum}, and
 * the required synchronization would then likely eliminate any performance gain from
 * parallelism.) Using {@code reduce()} instead removes all of the
 * burden of parallelizing the reduction operation, and the library can provide
 * an efficient parallel implementation with no additional synchronization
 * required.
 *
 * <p>The "widgets" examples shown earlier show how reduction combines with
 * other operations to replace for loops with bulk operations. If {@code widgets}
 * is a collection of {@code Widget} objects, which have a {@code getWeight} method,
 * we can find the weight of the heaviest widget with:
 * <pre>{@code
 *     OptionalInt heaviest = widgets.parallelStream()
 *                                   .mapToInt(Widget::getWeight)
 *                                   .max();
 * }</pre>
 *
 * <p>In its more general form, a {@code reduce} operation on elements of type
 * {@code <T>} yielding a result of type {@code <U>} requires three parameters:
 * <pre>{@code
 * <U> U reduce(U identity,
 *              BiFunction<U, ? super T, U> accumulator,
 *              BinaryOperator<U> combiner);
 * }</pre>
 * Here, the <em>identity</em> element is both an initial seed value for the reduction
 * and a default result if there are no input elements. The <em>accumulator</em>
 * function takes a partial result and the next element, and produces a new
 * partial result. The <em>combiner</em> function combines two partial results
 * to produce a new partial result. (The combiner is necessary in parallel
 * reductions, where the input is partitioned, a partial accumulation computed
 * for each partition, and then the partial results are combined to produce a
 * final result.)
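 *
 * <p>A minimal runnable sketch of the three-argument form follows. It assumes
 * a list of strings whose total length we want, and {@code ReduceDemo},
 * {@code ACCUMULATOR}, {@code COMBINER}, {@code totalLength}, and
 * {@code compatible} are hypothetical names. The identity is {@code 0}, the
 * accumulator folds one {@code String} into an {@code Integer} partial result,
 * and the combiner adds two partial results. {@code compatible} spot-checks,
 * for one pair of values, that folding an element directly into a partial
 * result agrees with folding it into the identity and then combining.
 *
 * <pre>{@code
 * import java.util.Arrays;
 * import java.util.List;
 * import java.util.function.BiFunction;
 * import java.util.function.BinaryOperator;
 *
 * public class ReduceDemo {
 *     // accumulator: folds one String into an Integer partial result
 *     static final BiFunction<Integer, String, Integer> ACCUMULATOR =
 *             (partial, s) -> partial + s.length();
 *     // combiner: merges two partial results; 0 is its identity
 *     static final BinaryOperator<Integer> COMBINER = Integer::sum;
 *
 *     static int totalLength(List<String> words, boolean parallel) {
 *         return (parallel ? words.parallelStream() : words.stream())
 *                 .reduce(0, ACCUMULATOR, COMBINER);
 *     }
 *
 *     // combiner(u, accumulator(identity, t)) should equal accumulator(u, t)
 *     static boolean compatible(int u, String t) {
 *         return COMBINER.apply(u, ACCUMULATOR.apply(0, t)).equals(ACCUMULATOR.apply(u, t));
 *     }
 *
 *     public static void main(String[] args) {
 *         List<String> words = Arrays.asList("map", "filter", "reduce", "collect");
 *         System.out.println(totalLength(words, false)); // 22
 *         System.out.println(totalLength(words, true));  // 22
 *         System.out.println(compatible(5, "stream"));   // true
 *     }
 * }
 * }</pre>
 *
 * <p>In practice the equivalent {@code mapToInt(String::length).sum()} reads
 * more clearly. The three-argument form is shown only to make the roles of
 * identity, accumulator, and combiner concrete.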
 *
 * <p>More formally, the {@code identity} value must be an <em>identity</em> for
 * the combiner function. This means that for all {@code u},
 * {@code combiner.apply(identity, u)} is equal to {@code u}. Additionally, the
 * {@code combiner} function must be <a href="package-summary.html#Associativity">associative</a> and
 * must be compatible with the {@code accumulator} function: for all {@code u}
 * and {@code t}, {@code combiner.apply(u, accumulator.apply(identity, t))} must
 * be {@code equals()} to {@code accumulator.apply(u, t)}.
 *
 * <p>The three-argument form is a generalization of the two-argument form,
 * incorporating a mapping step into the accumulation step. We could
 * re-cast the simple sum-of-weights example using the more general form as
 * follows:
 * <pre>{@code
 *     int sumOfWeights = widgets.stream()
 *                               .reduce(0,
 *                                       (sum, b) -> sum + b.getWeight(),
 *                                       Integer::sum);
 * }</pre>
 * though the explicit map-reduce form is more readable and therefore should
 * usually be preferred. The generalized form is provided for cases where
 * significant work can be optimized away by combining mapping and reducing
 * into a single function.
 *
 * <h3><a name="MutableReduction">Mutable reduction</a></h3>
 *
 * A <em>mutable reduction operation</em> accumulates input elements into a
 * mutable result container, such as a {@code Collection} or {@code StringBuilder},
 * as it processes the elements in the stream.
 *
 * <p>If we wanted to take a stream of strings and concatenate them into a
 * single long string, we <em>could</em> achieve this with ordinary reduction:
 * <pre>{@code
 *     String concatenated = strings.reduce("", String::concat);
 * }</pre>
 *
 * <p>We would get the desired result, and it would even work in parallel. However,
 * we might not be happy about the performance!
 * Such an implementation would do
 * a great deal of string copying, and the run time would be <em>O(n^2)</em> in
 * the number of characters. A more performant approach would be to accumulate
 * the results into a {@link java.lang.StringBuilder}, which is a mutable
 * container for accumulating strings. We can use the same technique to
 * parallelize mutable reduction as we do with ordinary reduction.
 *
 * <p>The mutable reduction operation is called
 * {@link java.util.stream.Stream#collect(Collector) collect()},
 * as it collects together the desired results into a result container such
 * as a {@code Collection}.
 * A {@code collect} operation requires three functions:
 * a supplier function to construct new instances of the result container, an
 * accumulator function to incorporate an input element into a result
 * container, and a combining function to merge the contents of one result
 * container into another. The form of this is very similar to the general
 * form of ordinary reduction:
 * <pre>{@code
 * <R> R collect(Supplier<R> supplier,
 *               BiConsumer<R, ? super T> accumulator,
 *               BiConsumer<R, R> combiner);
 * }</pre>
 * <p>As with {@code reduce()}, a benefit of expressing {@code collect} in this
 * abstract way is that it is directly amenable to parallelization: we can
 * accumulate partial results in parallel and then combine them, so long as the
 * accumulation and combining functions satisfy the appropriate requirements.
 * For example, to collect the String representations of the elements in a
 * stream into an {@code ArrayList}, we could write the obvious sequential
 * for-each form:
 * <pre>{@code
 *     ArrayList<String> strings = new ArrayList<>();
 *     for (T element : stream) {
 *         strings.add(element.toString());
 *     }
 * }</pre>
 * Or we could use a parallelizable collect form:
 * <pre>{@code
 *     ArrayList<String> strings = stream.collect(() -> new ArrayList<>(),
 *                                                (c, e) -> c.add(e.toString()),
 *                                                (c1, c2) -> c1.addAll(c2));
 * }</pre>
 * or, pulling the mapping operation out of the accumulator function, we could
 * express it more succinctly as:
 * <pre>{@code
 *     List<String> strings = stream.map(Object::toString)
 *                                  .collect(ArrayList::new, ArrayList::add, ArrayList::addAll);
 * }</pre>
 * Here, our supplier is just the {@link java.util.ArrayList#ArrayList()
 * ArrayList constructor}, the accumulator adds the stringified element to an
 * {@code ArrayList}, and the combiner simply uses {@link java.util.ArrayList#addAll addAll}
 * to copy the strings from one container into the other.
 *
 * <p>The three aspects of {@code collect} -- supplier, accumulator, and
 * combiner -- are tightly coupled. We can use the abstraction of a
 * {@link java.util.stream.Collector} to capture all three aspects. The
 * above example for collecting strings into a {@code List} can be rewritten
 * using a standard {@code Collector} as:
 * <pre>{@code
 *     List<String> strings = stream.map(Object::toString)
 *                                  .collect(Collectors.toList());
 * }</pre>
 *
 * <p>Packaging mutable reductions into a Collector has another advantage:
 * composability. The class {@link java.util.stream.Collectors} contains a
 * number of predefined factories for collectors, including combinators
 * that transform one collector into another.
 * For example, suppose we have a
 * collector that computes the sum of the salaries of a stream of
 * employees, as follows:
 *
 * <pre>{@code
 *     Collector<Employee, ?, Integer> summingSalaries
 *         = Collectors.summingInt(Employee::getSalary);
 * }</pre>
 *
 * <p>(The {@code ?} for the second type parameter merely indicates that we don't
 * care about the intermediate representation used by this collector.)
 * If we wanted to create a collector to tabulate the sum of salaries by
 * department, we could reuse {@code summingSalaries} using
 * {@link java.util.stream.Collectors#groupingBy(java.util.function.Function, java.util.stream.Collector) groupingBy}:
 *
 * <pre>{@code
 *     Map<Department, Integer> salariesByDept
 *         = employees.stream().collect(Collectors.groupingBy(Employee::getDepartment,
 *                                                            summingSalaries));
 * }</pre>
 *
 * <p>As with the regular reduction operation, {@code collect()} operations can
 * only be parallelized if appropriate conditions are met. For any partially
 * accumulated result, combining it with an empty result container must
 * produce an equivalent result. That is, for a partially accumulated result
 * {@code p} that is the result of any series of accumulator and combiner
 * invocations, {@code p} must be equivalent to
 * {@code combiner.apply(p, supplier.get())}.
 *
 * <p>Further, however the computation is split, it must produce an equivalent
 * result.
 * For any input elements {@code t1} and {@code t2}, the results
 * {@code r1} and {@code r2} in the computation below must be equivalent:
 * <pre>{@code
 *     A a1 = supplier.get();
 *     accumulator.accept(a1, t1);
 *     accumulator.accept(a1, t2);
 *     R r1 = finisher.apply(a1);  // result without splitting
 *
 *     A a2 = supplier.get();
 *     accumulator.accept(a2, t1);
 *     A a3 = supplier.get();
 *     accumulator.accept(a3, t2);
 *     R r2 = finisher.apply(combiner.apply(a2, a3));  // result with splitting
 * }</pre>
 *
 * <p>Here, equivalence generally means according to {@link java.lang.Object#equals(Object)},
 * but in some cases equivalence may be relaxed to account for differences in
 * order.
 *
 * <h3><a name="ConcurrentReduction">Reduction, concurrency, and ordering</a></h3>
 *
 * With some complex reduction operations, for example a {@code collect()} that
 * produces a {@code Map}, such as:
 * <pre>{@code
 *     Map<Buyer, List<Transaction>> salesByBuyer
 *         = txns.parallelStream()
 *               .collect(Collectors.groupingBy(Transaction::getBuyer));
 * }</pre>
 * it may actually be counterproductive to perform the operation in parallel.
 * This is because the combining step (merging one {@code Map} into another by
 * key) can be expensive for some {@code Map} implementations.
 *
 * <p>Suppose, however, that the result container used in this reduction
 * was a concurrently modifiable collection -- such as a
 * {@link java.util.concurrent.ConcurrentHashMap}. In that case, the parallel
 * invocations of the accumulator could actually deposit their results
 * concurrently into the same shared result container, eliminating the need for
 * the combiner to merge distinct result containers. This can improve
 * parallel execution performance. We call this a
 * <em>concurrent</em> reduction.
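 *
 * <p>As an illustration of a concurrent reduction, here is a minimal sketch
 * (the class {@code ConcurrentReductionSketch} and its method are
 * hypothetical names chosen for this example) that counts words by length
 * using {@link java.util.stream.Collectors#toConcurrentMap(java.util.function.Function, java.util.function.Function, java.util.function.BinaryOperator) toConcurrentMap},
 * which is documented as a concurrent and unordered collector. On a parallel
 * stream, the accumulator invocations can deposit their counts into one shared
 * {@code ConcurrentMap}, so no per-thread maps need to be merged:
 * <pre>{@code
 *     import java.util.Arrays;
 *     import java.util.List;
 *     import java.util.concurrent.ConcurrentMap;
 *     import java.util.stream.Collectors;
 *
 *     public class ConcurrentReductionSketch {
 *         // Hypothetical helper: counts words by length in one shared concurrent map.
 *         static ConcurrentMap<Integer, Long> countByLength(List<String> words) {
 *             return words.parallelStream()
 *                         .collect(Collectors.toConcurrentMap(
 *                             String::length, // key: the word's length
 *                             w -> 1L,        // each word contributes a count of one
 *                             Long::sum));    // adds counts when a key is already present
 *         }
 *
 *         public static void main(String[] args) {
 *             List<String> words = Arrays.asList("a", "bb", "cc", "ddd");
 *             System.out.println(countByLength(words));
 *         }
 *     }
 * }</pre>
 * The iteration order of the resulting map is unspecified, so only its
 * contents, not its printed form, should be relied upon.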
 *
 * <p>A {@link java.util.stream.Collector} that supports concurrent reduction is
 * marked with the {@link java.util.stream.Collector.Characteristics#CONCURRENT}
 * characteristic. However, a concurrent reduction also has a downside. If
 * multiple threads are depositing results concurrently into a shared container,
 * the order in which results are deposited is non-deterministic. Consequently,
 * a concurrent reduction is only possible if ordering is not important for the
 * stream being processed. The {@link java.util.stream.Stream#collect(Collector)}
 * implementation will only perform a concurrent reduction if
 * <ul>
 * <li>The stream is parallel;</li>
 * <li>The collector has the
 * {@link java.util.stream.Collector.Characteristics#CONCURRENT} characteristic;
 * and</li>
 * <li>Either the stream is unordered, or the collector has the
 * {@link java.util.stream.Collector.Characteristics#UNORDERED} characteristic.</li>
 * </ul>
 * You can ensure the stream is unordered by using the
 * {@link java.util.stream.BaseStream#unordered()} method. For example:
 * <pre>{@code
 *     Map<Buyer, List<Transaction>> salesByBuyer
 *         = txns.parallelStream()
 *               .unordered()
 *               .collect(Collectors.groupingByConcurrent(Transaction::getBuyer));
 * }</pre>
 * (where {@link java.util.stream.Collectors#groupingByConcurrent} is the
 * concurrent equivalent of {@code groupingBy}).
 *
 * <p>Note that if it is important that the elements for a given key appear in
 * the order they appear in the source, then we cannot use a concurrent
 * reduction, as ordering is one of the casualties of concurrent insertion.
 * We would then be constrained to implement either a sequential reduction or
 * a merge-based parallel reduction.
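 *
 * <p>The three conditions for a concurrent reduction can also be met by a
 * collector written by hand. The following is a minimal sketch, not a
 * recommended implementation; the class {@code ConcurrentCollectorSketch} and
 * the field {@code DISTINCT_LENGTHS} are hypothetical names. It uses
 * {@code Collector.of} to declare both {@code CONCURRENT} and
 * {@code UNORDERED}, backing the result with a thread-safe set obtained from
 * {@link java.util.concurrent.ConcurrentHashMap#newKeySet()}:
 * <pre>{@code
 *     import java.util.Arrays;
 *     import java.util.List;
 *     import java.util.Set;
 *     import java.util.concurrent.ConcurrentHashMap;
 *     import java.util.stream.Collector;
 *
 *     public class ConcurrentCollectorSketch {
 *         // Hypothetical collector: gathers the distinct lengths of strings.
 *         static final Collector<String, Set<Integer>, Set<Integer>> DISTINCT_LENGTHS =
 *             Collector.of(
 *                 ConcurrentHashMap::newKeySet,    // supplier: a thread-safe set
 *                 (set, s) -> set.add(s.length()), // accumulator: safe to call concurrently
 *                 (left, right) -> {               // combiner: may be used when the
 *                     left.addAll(right);          // reduction is not done concurrently
 *                     return left;
 *                 },
 *                 Collector.Characteristics.CONCURRENT,
 *                 Collector.Characteristics.UNORDERED);
 *
 *         static Set<Integer> distinctLengths(List<String> words) {
 *             // Parallel stream + CONCURRENT + UNORDERED: all three conditions hold.
 *             return words.parallelStream().collect(DISTINCT_LENGTHS);
 *         }
 *
 *         public static void main(String[] args) {
 *             System.out.println(distinctLengths(Arrays.asList("a", "bb", "cc", "ddd")));
 *         }
 *     }
 * }</pre>
 * Because {@code CONCURRENT} states that the accumulator may be invoked on one
 * shared container from multiple threads, the container itself must be
 * thread-safe; declaring the same characteristic on a collector backed by a
 * plain {@code HashSet} would not be safe.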
 *
 * <h3><a name="Associativity">Associativity</a></h3>
 *
 * An operator or function {@code op} is <em>associative</em> if the following
 * holds:
 * <pre>{@code
 *     (a op b) op c == a op (b op c)
 * }</pre>
 * The importance of this to parallel evaluation can be seen if we expand this
 * to four terms:
 * <pre>{@code
 *     a op b op c op d == (a op b) op (c op d)
 * }</pre>
 * So we can evaluate {@code (a op b)} in parallel with {@code (c op d)}, and
 * then invoke {@code op} on the results.
 *
 * <p>Examples of associative operations include numeric addition, min,
 * max, and string concatenation.
 *
 * <h2><a name="StreamSources">Low-level stream construction</a></h2>
 *
 * So far, all the stream examples have used methods like
 * {@link java.util.Collection#stream()} or {@link java.util.Arrays#stream(Object[])}
 * to obtain a stream. How are those stream-bearing methods implemented?
 *
 * <p>The class {@link java.util.stream.StreamSupport} has a number of
 * low-level methods for creating a stream, all using some form of a
 * {@link java.util.Spliterator}. A spliterator is the parallel analogue of an
 * {@link java.util.Iterator}; it describes a (possibly infinite) collection of
 * elements, with support for sequentially advancing, bulk traversal, and
 * splitting off some portion of the input into another spliterator which can
 * be processed in parallel. At the lowest level, all streams are driven by a
 * spliterator.
 *
 * <p>There are a number of choices in implementing a
 * spliterator, nearly all of which are tradeoffs between simplicity of
 * implementation and runtime performance of streams using that spliterator.
 * The simplest, but least performant, way to create a spliterator is to
 * create one from an iterator using
 * {@link java.util.Spliterators#spliteratorUnknownSize(java.util.Iterator, int)}.
 * While such a spliterator will work, it will likely offer poor parallel
 * performance, since the sizing information (how big the underlying data set
 * is) has been lost, and it is constrained to a simplistic
 * splitting algorithm.
 *
 * <p>A higher-quality spliterator will provide balanced and known-size
 * splits, accurate sizing information, and a number of other
 * {@link java.util.Spliterator#characteristics() characteristics} of the
 * spliterator or data that can be used by implementations to optimize
 * execution.
 *
 * <p>Spliterators for mutable data sources have an additional challenge:
 * timing of binding to the data, since the data could change between the time
 * the spliterator is created and the time the stream pipeline is executed.
 * Ideally, a spliterator for a stream would report a characteristic of
 * {@code IMMUTABLE} or {@code CONCURRENT}; if not, it should be
 * <a href="../Spliterator.html#binding"><em>late-binding</em></a>. If a source
 * cannot directly supply a recommended spliterator, it may indirectly supply
 * a spliterator using a {@code Supplier}, and construct a stream via the
 * {@code Supplier}-accepting versions of
 * {@link java.util.stream.StreamSupport#stream(java.util.function.Supplier, int, boolean) stream()}.
 * The spliterator is obtained from the supplier only after the terminal
 * operation of the stream pipeline commences.
 *
 * <p>These requirements significantly reduce the scope of potential
 * interference between mutations of the stream source and execution of stream
 * pipelines. Streams based on spliterators with the desired characteristics,
 * or those using the Supplier-based factory forms, are immune to
 * modifications of the data source prior to commencement of the terminal
 * operation (provided the behavioral parameters to the stream operations meet
 * the required criteria for non-interference and statelessness).
 * See
 * <a href="package-summary.html#NonInterference">Non-Interference</a>
 * for more details.
 *
 * @since 1.8
 */
package java.util.stream;

import java.util.function.BinaryOperator;
import java.util.function.UnaryOperator;