1 /* 2 * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER. 3 * 4 * This code is free software; you can redistribute it and/or modify it 5 * under the terms of the GNU General Public License version 2 only, as 6 * published by the Free Software Foundation. Oracle designates this 7 * particular file as subject to the "Classpath" exception as provided 8 * by Oracle in the LICENSE file that accompanied this code. 9 * 10 * This code is distributed in the hope that it will be useful, but WITHOUT 11 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or 12 * FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License 13 * version 2 for more details (a copy is included in the LICENSE file that 14 * accompanied this code). 15 * 16 * You should have received a copy of the GNU General Public License version 17 * 2 along with this work; if not, write to the Free Software Foundation, 18 * Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA. 19 * 20 * Please contact Oracle, 500 Oracle Parkway, Redwood Shores, CA 94065 USA 21 * or visit www.oracle.com if you need additional information or have any 22 * questions. 23 */ 24 25 /* 26 * This file is available under and governed by the GNU General Public 27 * License version 2 only, as published by the Free Software Foundation. 28 * However, the following notice accompanied the original version of this 29 * file: 30 * 31 * Written by Doug Lea with assistance from members of JCP JSR-166 32 * Expert Group and released to the public domain, as explained at 33 * http://creativecommons.org/publicdomain/zero/1.0/ 34 */ 35 36 package java.util.concurrent; 37 38 import java.lang.ref.WeakReference; 39 import java.util.AbstractQueue; 40 import java.util.Arrays; 41 import java.util.Collection; 42 import java.util.Iterator; 43 import java.util.NoSuchElementException; 44 import java.util.Objects; 45 import java.util.Spliterator; 46 import java.util.Spliterators; 47 import java.util.concurrent.locks.Condition; 48 import java.util.concurrent.locks.ReentrantLock; 49 50 // BEGIN android-note 51 // removed link to collections framework docs 52 // END android-note 53 54 /** 55 * A bounded {@linkplain BlockingQueue blocking queue} backed by an 56 * array. This queue orders elements FIFO (first-in-first-out). The 57 * <em>head</em> of the queue is that element that has been on the 58 * queue the longest time. The <em>tail</em> of the queue is that 59 * element that has been on the queue the shortest time. New elements 60 * are inserted at the tail of the queue, and the queue retrieval 61 * operations obtain elements at the head of the queue. 62 * 63 * <p>This is a classic "bounded buffer", in which a 64 * fixed-sized array holds elements inserted by producers and 65 * extracted by consumers. Once created, the capacity cannot be 66 * changed. Attempts to {@code put} an element into a full queue 67 * will result in the operation blocking; attempts to {@code take} an 68 * element from an empty queue will similarly block. 69 * 70 * <p>This class supports an optional fairness policy for ordering 71 * waiting producer and consumer threads. By default, this ordering 72 * is not guaranteed. However, a queue constructed with fairness set 73 * to {@code true} grants threads access in FIFO order. Fairness 74 * generally decreases throughput but reduces variability and avoids 75 * starvation. 76 * 77 * <p>This class and its iterator implement all of the 78 * <em>optional</em> methods of the {@link Collection} and {@link 79 * Iterator} interfaces. 80 * 81 * @since 1.5 82 * @author Doug Lea 83 * @param <E> the type of elements held in this queue 84 */ 85 public class ArrayBlockingQueue<E> extends AbstractQueue<E> 86 implements BlockingQueue<E>, java.io.Serializable { 87 88 /** 89 * Serialization ID. This class relies on default serialization 90 * even for the items array, which is default-serialized, even if 91 * it is empty. Otherwise it could not be declared final, which is 92 * necessary here. 93 */ 94 private static final long serialVersionUID = -817911632652898426L; 95 96 /** The queued items */ 97 final Object[] items; 98 99 /** items index for next take, poll, peek or remove */ 100 int takeIndex; 101 102 /** items index for next put, offer, or add */ 103 int putIndex; 104 105 /** Number of elements in the queue */ 106 int count; 107 108 /* 109 * Concurrency control uses the classic two-condition algorithm 110 * found in any textbook. 111 */ 112 113 /** Main lock guarding all access */ 114 final ReentrantLock lock; 115 116 /** Condition for waiting takes */ 117 private final Condition notEmpty; 118 119 /** Condition for waiting puts */ 120 private final Condition notFull; 121 122 /** 123 * Shared state for currently active iterators, or null if there 124 * are known not to be any. Allows queue operations to update 125 * iterator state. 126 */ 127 transient Itrs itrs; 128 129 // Internal helper methods 130 131 /** 132 * Circularly decrements array index i. 133 */ dec(int i)134 final int dec(int i) { 135 return ((i == 0) ? items.length : i) - 1; 136 } 137 138 /** 139 * Returns item at index i. 140 */ 141 @SuppressWarnings("unchecked") itemAt(int i)142 final E itemAt(int i) { 143 return (E) items[i]; 144 } 145 146 /** 147 * Inserts element at current put position, advances, and signals. 148 * Call only when holding lock. 149 */ enqueue(E x)150 private void enqueue(E x) { 151 // assert lock.getHoldCount() == 1; 152 // assert items[putIndex] == null; 153 final Object[] items = this.items; 154 items[putIndex] = x; 155 if (++putIndex == items.length) putIndex = 0; 156 count++; 157 notEmpty.signal(); 158 } 159 160 /** 161 * Extracts element at current take position, advances, and signals. 162 * Call only when holding lock. 163 */ dequeue()164 private E dequeue() { 165 // assert lock.getHoldCount() == 1; 166 // assert items[takeIndex] != null; 167 final Object[] items = this.items; 168 @SuppressWarnings("unchecked") 169 E x = (E) items[takeIndex]; 170 items[takeIndex] = null; 171 if (++takeIndex == items.length) takeIndex = 0; 172 count--; 173 if (itrs != null) 174 itrs.elementDequeued(); 175 notFull.signal(); 176 return x; 177 } 178 179 /** 180 * Deletes item at array index removeIndex. 181 * Utility for remove(Object) and iterator.remove. 182 * Call only when holding lock. 183 */ removeAt(final int removeIndex)184 void removeAt(final int removeIndex) { 185 // assert lock.getHoldCount() == 1; 186 // assert items[removeIndex] != null; 187 // assert removeIndex >= 0 && removeIndex < items.length; 188 final Object[] items = this.items; 189 if (removeIndex == takeIndex) { 190 // removing front item; just advance 191 items[takeIndex] = null; 192 if (++takeIndex == items.length) takeIndex = 0; 193 count--; 194 if (itrs != null) 195 itrs.elementDequeued(); 196 } else { 197 // an "interior" remove 198 199 // slide over all others up through putIndex. 200 for (int i = removeIndex, putIndex = this.putIndex;;) { 201 int pred = i; 202 if (++i == items.length) i = 0; 203 if (i == putIndex) { 204 items[pred] = null; 205 this.putIndex = pred; 206 break; 207 } 208 items[pred] = items[i]; 209 } 210 count--; 211 if (itrs != null) 212 itrs.removedAt(removeIndex); 213 } 214 notFull.signal(); 215 } 216 217 /** 218 * Creates an {@code ArrayBlockingQueue} with the given (fixed) 219 * capacity and default access policy. 220 * 221 * @param capacity the capacity of this queue 222 * @throws IllegalArgumentException if {@code capacity < 1} 223 */ ArrayBlockingQueue(int capacity)224 public ArrayBlockingQueue(int capacity) { 225 this(capacity, false); 226 } 227 228 /** 229 * Creates an {@code ArrayBlockingQueue} with the given (fixed) 230 * capacity and the specified access policy. 231 * 232 * @param capacity the capacity of this queue 233 * @param fair if {@code true} then queue accesses for threads blocked 234 * on insertion or removal, are processed in FIFO order; 235 * if {@code false} the access order is unspecified. 236 * @throws IllegalArgumentException if {@code capacity < 1} 237 */ ArrayBlockingQueue(int capacity, boolean fair)238 public ArrayBlockingQueue(int capacity, boolean fair) { 239 if (capacity <= 0) 240 throw new IllegalArgumentException(); 241 this.items = new Object[capacity]; 242 lock = new ReentrantLock(fair); 243 notEmpty = lock.newCondition(); 244 notFull = lock.newCondition(); 245 } 246 247 /** 248 * Creates an {@code ArrayBlockingQueue} with the given (fixed) 249 * capacity, the specified access policy and initially containing the 250 * elements of the given collection, 251 * added in traversal order of the collection's iterator. 252 * 253 * @param capacity the capacity of this queue 254 * @param fair if {@code true} then queue accesses for threads blocked 255 * on insertion or removal, are processed in FIFO order; 256 * if {@code false} the access order is unspecified. 257 * @param c the collection of elements to initially contain 258 * @throws IllegalArgumentException if {@code capacity} is less than 259 * {@code c.size()}, or less than 1. 260 * @throws NullPointerException if the specified collection or any 261 * of its elements are null 262 */ ArrayBlockingQueue(int capacity, boolean fair, Collection<? extends E> c)263 public ArrayBlockingQueue(int capacity, boolean fair, 264 Collection<? extends E> c) { 265 this(capacity, fair); 266 267 final ReentrantLock lock = this.lock; 268 lock.lock(); // Lock only for visibility, not mutual exclusion 269 try { 270 int i = 0; 271 try { 272 for (E e : c) 273 items[i++] = Objects.requireNonNull(e); 274 } catch (ArrayIndexOutOfBoundsException ex) { 275 throw new IllegalArgumentException(); 276 } 277 count = i; 278 putIndex = (i == capacity) ? 0 : i; 279 } finally { 280 lock.unlock(); 281 } 282 } 283 284 /** 285 * Inserts the specified element at the tail of this queue if it is 286 * possible to do so immediately without exceeding the queue's capacity, 287 * returning {@code true} upon success and throwing an 288 * {@code IllegalStateException} if this queue is full. 289 * 290 * @param e the element to add 291 * @return {@code true} (as specified by {@link Collection#add}) 292 * @throws IllegalStateException if this queue is full 293 * @throws NullPointerException if the specified element is null 294 */ add(E e)295 public boolean add(E e) { 296 return super.add(e); 297 } 298 299 /** 300 * Inserts the specified element at the tail of this queue if it is 301 * possible to do so immediately without exceeding the queue's capacity, 302 * returning {@code true} upon success and {@code false} if this queue 303 * is full. This method is generally preferable to method {@link #add}, 304 * which can fail to insert an element only by throwing an exception. 305 * 306 * @throws NullPointerException if the specified element is null 307 */ offer(E e)308 public boolean offer(E e) { 309 Objects.requireNonNull(e); 310 final ReentrantLock lock = this.lock; 311 lock.lock(); 312 try { 313 if (count == items.length) 314 return false; 315 else { 316 enqueue(e); 317 return true; 318 } 319 } finally { 320 lock.unlock(); 321 } 322 } 323 324 /** 325 * Inserts the specified element at the tail of this queue, waiting 326 * for space to become available if the queue is full. 327 * 328 * @throws InterruptedException {@inheritDoc} 329 * @throws NullPointerException {@inheritDoc} 330 */ put(E e)331 public void put(E e) throws InterruptedException { 332 Objects.requireNonNull(e); 333 final ReentrantLock lock = this.lock; 334 lock.lockInterruptibly(); 335 try { 336 while (count == items.length) 337 notFull.await(); 338 enqueue(e); 339 } finally { 340 lock.unlock(); 341 } 342 } 343 344 /** 345 * Inserts the specified element at the tail of this queue, waiting 346 * up to the specified wait time for space to become available if 347 * the queue is full. 348 * 349 * @throws InterruptedException {@inheritDoc} 350 * @throws NullPointerException {@inheritDoc} 351 */ offer(E e, long timeout, TimeUnit unit)352 public boolean offer(E e, long timeout, TimeUnit unit) 353 throws InterruptedException { 354 355 Objects.requireNonNull(e); 356 long nanos = unit.toNanos(timeout); 357 final ReentrantLock lock = this.lock; 358 lock.lockInterruptibly(); 359 try { 360 while (count == items.length) { 361 if (nanos <= 0L) 362 return false; 363 nanos = notFull.awaitNanos(nanos); 364 } 365 enqueue(e); 366 return true; 367 } finally { 368 lock.unlock(); 369 } 370 } 371 poll()372 public E poll() { 373 final ReentrantLock lock = this.lock; 374 lock.lock(); 375 try { 376 return (count == 0) ? null : dequeue(); 377 } finally { 378 lock.unlock(); 379 } 380 } 381 take()382 public E take() throws InterruptedException { 383 final ReentrantLock lock = this.lock; 384 lock.lockInterruptibly(); 385 try { 386 while (count == 0) 387 notEmpty.await(); 388 return dequeue(); 389 } finally { 390 lock.unlock(); 391 } 392 } 393 poll(long timeout, TimeUnit unit)394 public E poll(long timeout, TimeUnit unit) throws InterruptedException { 395 long nanos = unit.toNanos(timeout); 396 final ReentrantLock lock = this.lock; 397 lock.lockInterruptibly(); 398 try { 399 while (count == 0) { 400 if (nanos <= 0L) 401 return null; 402 nanos = notEmpty.awaitNanos(nanos); 403 } 404 return dequeue(); 405 } finally { 406 lock.unlock(); 407 } 408 } 409 peek()410 public E peek() { 411 final ReentrantLock lock = this.lock; 412 lock.lock(); 413 try { 414 return itemAt(takeIndex); // null when queue is empty 415 } finally { 416 lock.unlock(); 417 } 418 } 419 420 // this doc comment is overridden to remove the reference to collections 421 // greater in size than Integer.MAX_VALUE 422 /** 423 * Returns the number of elements in this queue. 424 * 425 * @return the number of elements in this queue 426 */ size()427 public int size() { 428 final ReentrantLock lock = this.lock; 429 lock.lock(); 430 try { 431 return count; 432 } finally { 433 lock.unlock(); 434 } 435 } 436 437 // this doc comment is a modified copy of the inherited doc comment, 438 // without the reference to unlimited queues. 439 /** 440 * Returns the number of additional elements that this queue can ideally 441 * (in the absence of memory or resource constraints) accept without 442 * blocking. This is always equal to the initial capacity of this queue 443 * less the current {@code size} of this queue. 444 * 445 * <p>Note that you <em>cannot</em> always tell if an attempt to insert 446 * an element will succeed by inspecting {@code remainingCapacity} 447 * because it may be the case that another thread is about to 448 * insert or remove an element. 449 */ remainingCapacity()450 public int remainingCapacity() { 451 final ReentrantLock lock = this.lock; 452 lock.lock(); 453 try { 454 return items.length - count; 455 } finally { 456 lock.unlock(); 457 } 458 } 459 460 /** 461 * Removes a single instance of the specified element from this queue, 462 * if it is present. More formally, removes an element {@code e} such 463 * that {@code o.equals(e)}, if this queue contains one or more such 464 * elements. 465 * Returns {@code true} if this queue contained the specified element 466 * (or equivalently, if this queue changed as a result of the call). 467 * 468 * <p>Removal of interior elements in circular array based queues 469 * is an intrinsically slow and disruptive operation, so should 470 * be undertaken only in exceptional circumstances, ideally 471 * only when the queue is known not to be accessible by other 472 * threads. 473 * 474 * @param o element to be removed from this queue, if present 475 * @return {@code true} if this queue changed as a result of the call 476 */ remove(Object o)477 public boolean remove(Object o) { 478 if (o == null) return false; 479 final ReentrantLock lock = this.lock; 480 lock.lock(); 481 try { 482 if (count > 0) { 483 final Object[] items = this.items; 484 final int putIndex = this.putIndex; 485 int i = takeIndex; 486 do { 487 if (o.equals(items[i])) { 488 removeAt(i); 489 return true; 490 } 491 if (++i == items.length) i = 0; 492 } while (i != putIndex); 493 } 494 return false; 495 } finally { 496 lock.unlock(); 497 } 498 } 499 500 /** 501 * Returns {@code true} if this queue contains the specified element. 502 * More formally, returns {@code true} if and only if this queue contains 503 * at least one element {@code e} such that {@code o.equals(e)}. 504 * 505 * @param o object to be checked for containment in this queue 506 * @return {@code true} if this queue contains the specified element 507 */ contains(Object o)508 public boolean contains(Object o) { 509 if (o == null) return false; 510 final ReentrantLock lock = this.lock; 511 lock.lock(); 512 try { 513 if (count > 0) { 514 final Object[] items = this.items; 515 final int putIndex = this.putIndex; 516 int i = takeIndex; 517 do { 518 if (o.equals(items[i])) 519 return true; 520 if (++i == items.length) i = 0; 521 } while (i != putIndex); 522 } 523 return false; 524 } finally { 525 lock.unlock(); 526 } 527 } 528 529 /** 530 * Returns an array containing all of the elements in this queue, in 531 * proper sequence. 532 * 533 * <p>The returned array will be "safe" in that no references to it are 534 * maintained by this queue. (In other words, this method must allocate 535 * a new array). The caller is thus free to modify the returned array. 536 * 537 * <p>This method acts as bridge between array-based and collection-based 538 * APIs. 539 * 540 * @return an array containing all of the elements in this queue 541 */ toArray()542 public Object[] toArray() { 543 final ReentrantLock lock = this.lock; 544 lock.lock(); 545 try { 546 final Object[] items = this.items; 547 final int end = takeIndex + count; 548 final Object[] a = Arrays.copyOfRange(items, takeIndex, end); 549 if (end != putIndex) 550 System.arraycopy(items, 0, a, items.length - takeIndex, putIndex); 551 return a; 552 } finally { 553 lock.unlock(); 554 } 555 } 556 557 /** 558 * Returns an array containing all of the elements in this queue, in 559 * proper sequence; the runtime type of the returned array is that of 560 * the specified array. If the queue fits in the specified array, it 561 * is returned therein. Otherwise, a new array is allocated with the 562 * runtime type of the specified array and the size of this queue. 563 * 564 * <p>If this queue fits in the specified array with room to spare 565 * (i.e., the array has more elements than this queue), the element in 566 * the array immediately following the end of the queue is set to 567 * {@code null}. 568 * 569 * <p>Like the {@link #toArray()} method, this method acts as bridge between 570 * array-based and collection-based APIs. Further, this method allows 571 * precise control over the runtime type of the output array, and may, 572 * under certain circumstances, be used to save allocation costs. 573 * 574 * <p>Suppose {@code x} is a queue known to contain only strings. 575 * The following code can be used to dump the queue into a newly 576 * allocated array of {@code String}: 577 * 578 * <pre> {@code String[] y = x.toArray(new String[0]);}</pre> 579 * 580 * Note that {@code toArray(new Object[0])} is identical in function to 581 * {@code toArray()}. 582 * 583 * @param a the array into which the elements of the queue are to 584 * be stored, if it is big enough; otherwise, a new array of the 585 * same runtime type is allocated for this purpose 586 * @return an array containing all of the elements in this queue 587 * @throws ArrayStoreException if the runtime type of the specified array 588 * is not a supertype of the runtime type of every element in 589 * this queue 590 * @throws NullPointerException if the specified array is null 591 */ 592 @SuppressWarnings("unchecked") toArray(T[] a)593 public <T> T[] toArray(T[] a) { 594 final ReentrantLock lock = this.lock; 595 lock.lock(); 596 try { 597 final Object[] items = this.items; 598 final int count = this.count; 599 final int firstLeg = Math.min(items.length - takeIndex, count); 600 if (a.length < count) { 601 a = (T[]) Arrays.copyOfRange(items, takeIndex, takeIndex + count, 602 a.getClass()); 603 } else { 604 System.arraycopy(items, takeIndex, a, 0, firstLeg); 605 if (a.length > count) 606 a[count] = null; 607 } 608 if (firstLeg < count) 609 System.arraycopy(items, 0, a, firstLeg, putIndex); 610 return a; 611 } finally { 612 lock.unlock(); 613 } 614 } 615 toString()616 public String toString() { 617 return Helpers.collectionToString(this); 618 } 619 620 /** 621 * Atomically removes all of the elements from this queue. 622 * The queue will be empty after this call returns. 623 */ clear()624 public void clear() { 625 final ReentrantLock lock = this.lock; 626 lock.lock(); 627 try { 628 int k = count; 629 if (k > 0) { 630 final Object[] items = this.items; 631 final int putIndex = this.putIndex; 632 int i = takeIndex; 633 do { 634 items[i] = null; 635 if (++i == items.length) i = 0; 636 } while (i != putIndex); 637 takeIndex = putIndex; 638 count = 0; 639 if (itrs != null) 640 itrs.queueIsEmpty(); 641 for (; k > 0 && lock.hasWaiters(notFull); k--) 642 notFull.signal(); 643 } 644 } finally { 645 lock.unlock(); 646 } 647 } 648 649 /** 650 * @throws UnsupportedOperationException {@inheritDoc} 651 * @throws ClassCastException {@inheritDoc} 652 * @throws NullPointerException {@inheritDoc} 653 * @throws IllegalArgumentException {@inheritDoc} 654 */ drainTo(Collection<? super E> c)655 public int drainTo(Collection<? super E> c) { 656 return drainTo(c, Integer.MAX_VALUE); 657 } 658 659 /** 660 * @throws UnsupportedOperationException {@inheritDoc} 661 * @throws ClassCastException {@inheritDoc} 662 * @throws NullPointerException {@inheritDoc} 663 * @throws IllegalArgumentException {@inheritDoc} 664 */ drainTo(Collection<? super E> c, int maxElements)665 public int drainTo(Collection<? super E> c, int maxElements) { 666 Objects.requireNonNull(c); 667 if (c == this) 668 throw new IllegalArgumentException(); 669 if (maxElements <= 0) 670 return 0; 671 final Object[] items = this.items; 672 final ReentrantLock lock = this.lock; 673 lock.lock(); 674 try { 675 int n = Math.min(maxElements, count); 676 int take = takeIndex; 677 int i = 0; 678 try { 679 while (i < n) { 680 @SuppressWarnings("unchecked") 681 E x = (E) items[take]; 682 c.add(x); 683 items[take] = null; 684 if (++take == items.length) take = 0; 685 i++; 686 } 687 return n; 688 } finally { 689 // Restore invariants even if c.add() threw 690 if (i > 0) { 691 count -= i; 692 takeIndex = take; 693 if (itrs != null) { 694 if (count == 0) 695 itrs.queueIsEmpty(); 696 else if (i > take) 697 itrs.takeIndexWrapped(); 698 } 699 for (; i > 0 && lock.hasWaiters(notFull); i--) 700 notFull.signal(); 701 } 702 } 703 } finally { 704 lock.unlock(); 705 } 706 } 707 708 /** 709 * Returns an iterator over the elements in this queue in proper sequence. 710 * The elements will be returned in order from first (head) to last (tail). 711 * 712 * <p>The returned iterator is 713 * <a href="package-summary.html#Weakly"><i>weakly consistent</i></a>. 714 * 715 * @return an iterator over the elements in this queue in proper sequence 716 */ iterator()717 public Iterator<E> iterator() { 718 return new Itr(); 719 } 720 721 /** 722 * Shared data between iterators and their queue, allowing queue 723 * modifications to update iterators when elements are removed. 724 * 725 * This adds a lot of complexity for the sake of correctly 726 * handling some uncommon operations, but the combination of 727 * circular-arrays and supporting interior removes (i.e., those 728 * not at head) would cause iterators to sometimes lose their 729 * places and/or (re)report elements they shouldn't. To avoid 730 * this, when a queue has one or more iterators, it keeps iterator 731 * state consistent by: 732 * 733 * (1) keeping track of the number of "cycles", that is, the 734 * number of times takeIndex has wrapped around to 0. 735 * (2) notifying all iterators via the callback removedAt whenever 736 * an interior element is removed (and thus other elements may 737 * be shifted). 738 * 739 * These suffice to eliminate iterator inconsistencies, but 740 * unfortunately add the secondary responsibility of maintaining 741 * the list of iterators. We track all active iterators in a 742 * simple linked list (accessed only when the queue's lock is 743 * held) of weak references to Itr. The list is cleaned up using 744 * 3 different mechanisms: 745 * 746 * (1) Whenever a new iterator is created, do some O(1) checking for 747 * stale list elements. 748 * 749 * (2) Whenever takeIndex wraps around to 0, check for iterators 750 * that have been unused for more than one wrap-around cycle. 751 * 752 * (3) Whenever the queue becomes empty, all iterators are notified 753 * and this entire data structure is discarded. 754 * 755 * So in addition to the removedAt callback that is necessary for 756 * correctness, iterators have the shutdown and takeIndexWrapped 757 * callbacks that help remove stale iterators from the list. 758 * 759 * Whenever a list element is examined, it is expunged if either 760 * the GC has determined that the iterator is discarded, or if the 761 * iterator reports that it is "detached" (does not need any 762 * further state updates). Overhead is maximal when takeIndex 763 * never advances, iterators are discarded before they are 764 * exhausted, and all removals are interior removes, in which case 765 * all stale iterators are discovered by the GC. But even in this 766 * case we don't increase the amortized complexity. 767 * 768 * Care must be taken to keep list sweeping methods from 769 * reentrantly invoking another such method, causing subtle 770 * corruption bugs. 771 */ 772 class Itrs { 773 774 /** 775 * Node in a linked list of weak iterator references. 776 */ 777 private class Node extends WeakReference<Itr> { 778 Node next; 779 Node(Itr iterator, Node next)780 Node(Itr iterator, Node next) { 781 super(iterator); 782 this.next = next; 783 } 784 } 785 786 /** Incremented whenever takeIndex wraps around to 0 */ 787 int cycles; 788 789 /** Linked list of weak iterator references */ 790 private Node head; 791 792 /** Used to expunge stale iterators */ 793 private Node sweeper; 794 795 private static final int SHORT_SWEEP_PROBES = 4; 796 private static final int LONG_SWEEP_PROBES = 16; 797 Itrs(Itr initial)798 Itrs(Itr initial) { 799 register(initial); 800 } 801 802 /** 803 * Sweeps itrs, looking for and expunging stale iterators. 804 * If at least one was found, tries harder to find more. 805 * Called only from iterating thread. 806 * 807 * @param tryHarder whether to start in try-harder mode, because 808 * there is known to be at least one iterator to collect 809 */ doSomeSweeping(boolean tryHarder)810 void doSomeSweeping(boolean tryHarder) { 811 // assert lock.getHoldCount() == 1; 812 // assert head != null; 813 int probes = tryHarder ? LONG_SWEEP_PROBES : SHORT_SWEEP_PROBES; 814 Node o, p; 815 final Node sweeper = this.sweeper; 816 boolean passedGo; // to limit search to one full sweep 817 818 if (sweeper == null) { 819 o = null; 820 p = head; 821 passedGo = true; 822 } else { 823 o = sweeper; 824 p = o.next; 825 passedGo = false; 826 } 827 828 for (; probes > 0; probes--) { 829 if (p == null) { 830 if (passedGo) 831 break; 832 o = null; 833 p = head; 834 passedGo = true; 835 } 836 final Itr it = p.get(); 837 final Node next = p.next; 838 if (it == null || it.isDetached()) { 839 // found a discarded/exhausted iterator 840 probes = LONG_SWEEP_PROBES; // "try harder" 841 // unlink p 842 p.clear(); 843 p.next = null; 844 if (o == null) { 845 head = next; 846 if (next == null) { 847 // We've run out of iterators to track; retire 848 itrs = null; 849 return; 850 } 851 } 852 else 853 o.next = next; 854 } else { 855 o = p; 856 } 857 p = next; 858 } 859 860 this.sweeper = (p == null) ? null : o; 861 } 862 863 /** 864 * Adds a new iterator to the linked list of tracked iterators. 865 */ register(Itr itr)866 void register(Itr itr) { 867 // assert lock.getHoldCount() == 1; 868 head = new Node(itr, head); 869 } 870 871 /** 872 * Called whenever takeIndex wraps around to 0. 873 * 874 * Notifies all iterators, and expunges any that are now stale. 875 */ takeIndexWrapped()876 void takeIndexWrapped() { 877 // assert lock.getHoldCount() == 1; 878 cycles++; 879 for (Node o = null, p = head; p != null;) { 880 final Itr it = p.get(); 881 final Node next = p.next; 882 if (it == null || it.takeIndexWrapped()) { 883 // unlink p 884 // assert it == null || it.isDetached(); 885 p.clear(); 886 p.next = null; 887 if (o == null) 888 head = next; 889 else 890 o.next = next; 891 } else { 892 o = p; 893 } 894 p = next; 895 } 896 if (head == null) // no more iterators to track 897 itrs = null; 898 } 899 900 /** 901 * Called whenever an interior remove (not at takeIndex) occurred. 902 * 903 * Notifies all iterators, and expunges any that are now stale. 904 */ removedAt(int removedIndex)905 void removedAt(int removedIndex) { 906 for (Node o = null, p = head; p != null;) { 907 final Itr it = p.get(); 908 final Node next = p.next; 909 if (it == null || it.removedAt(removedIndex)) { 910 // unlink p 911 // assert it == null || it.isDetached(); 912 p.clear(); 913 p.next = null; 914 if (o == null) 915 head = next; 916 else 917 o.next = next; 918 } else { 919 o = p; 920 } 921 p = next; 922 } 923 if (head == null) // no more iterators to track 924 itrs = null; 925 } 926 927 /** 928 * Called whenever the queue becomes empty. 929 * 930 * Notifies all active iterators that the queue is empty, 931 * clears all weak refs, and unlinks the itrs datastructure. 932 */ queueIsEmpty()933 void queueIsEmpty() { 934 // assert lock.getHoldCount() == 1; 935 for (Node p = head; p != null; p = p.next) { 936 Itr it = p.get(); 937 if (it != null) { 938 p.clear(); 939 it.shutdown(); 940 } 941 } 942 head = null; 943 itrs = null; 944 } 945 946 /** 947 * Called whenever an element has been dequeued (at takeIndex). 948 */ elementDequeued()949 void elementDequeued() { 950 // assert lock.getHoldCount() == 1; 951 if (count == 0) 952 queueIsEmpty(); 953 else if (takeIndex == 0) 954 takeIndexWrapped(); 955 } 956 } 957 958 /** 959 * Iterator for ArrayBlockingQueue. 960 * 961 * To maintain weak consistency with respect to puts and takes, we 962 * read ahead one slot, so as to not report hasNext true but then 963 * not have an element to return. 964 * 965 * We switch into "detached" mode (allowing prompt unlinking from 966 * itrs without help from the GC) when all indices are negative, or 967 * when hasNext returns false for the first time. This allows the 968 * iterator to track concurrent updates completely accurately, 969 * except for the corner case of the user calling Iterator.remove() 970 * after hasNext() returned false. Even in this case, we ensure 971 * that we don't remove the wrong element by keeping track of the 972 * expected element to remove, in lastItem. Yes, we may fail to 973 * remove lastItem from the queue if it moved due to an interleaved 974 * interior remove while in detached mode. 975 */ 976 private class Itr implements Iterator<E> { 977 /** Index to look for new nextItem; NONE at end */ 978 private int cursor; 979 980 /** Element to be returned by next call to next(); null if none */ 981 private E nextItem; 982 983 /** Index of nextItem; NONE if none, REMOVED if removed elsewhere */ 984 private int nextIndex; 985 986 /** Last element returned; null if none or not detached. */ 987 private E lastItem; 988 989 /** Index of lastItem, NONE if none, REMOVED if removed elsewhere */ 990 private int lastRet; 991 992 /** Previous value of takeIndex, or DETACHED when detached */ 993 private int prevTakeIndex; 994 995 /** Previous value of iters.cycles */ 996 private int prevCycles; 997 998 /** Special index value indicating "not available" or "undefined" */ 999 private static final int NONE = -1; 1000 1001 /** 1002 * Special index value indicating "removed elsewhere", that is, 1003 * removed by some operation other than a call to this.remove(). 1004 */ 1005 private static final int REMOVED = -2; 1006 1007 /** Special value for prevTakeIndex indicating "detached mode" */ 1008 private static final int DETACHED = -3; 1009 Itr()1010 Itr() { 1011 // assert lock.getHoldCount() == 0; 1012 lastRet = NONE; 1013 final ReentrantLock lock = ArrayBlockingQueue.this.lock; 1014 lock.lock(); 1015 try { 1016 if (count == 0) { 1017 // assert itrs == null; 1018 cursor = NONE; 1019 nextIndex = NONE; 1020 prevTakeIndex = DETACHED; 1021 } else { 1022 final int takeIndex = ArrayBlockingQueue.this.takeIndex; 1023 prevTakeIndex = takeIndex; 1024 nextItem = itemAt(nextIndex = takeIndex); 1025 cursor = incCursor(takeIndex); 1026 if (itrs == null) { 1027 itrs = new Itrs(this); 1028 } else { 1029 itrs.register(this); // in this order 1030 itrs.doSomeSweeping(false); 1031 } 1032 prevCycles = itrs.cycles; 1033 // assert takeIndex >= 0; 1034 // assert prevTakeIndex == takeIndex; 1035 // assert nextIndex >= 0; 1036 // assert nextItem != null; 1037 } 1038 } finally { 1039 lock.unlock(); 1040 } 1041 } 1042 isDetached()1043 boolean isDetached() { 1044 // assert lock.getHoldCount() == 1; 1045 return prevTakeIndex < 0; 1046 } 1047 incCursor(int index)1048 private int incCursor(int index) { 1049 // assert lock.getHoldCount() == 1; 1050 if (++index == items.length) index = 0; 1051 if (index == putIndex) index = NONE; 1052 return index; 1053 } 1054 1055 /** 1056 * Returns true if index is invalidated by the given number of 1057 * dequeues, starting from prevTakeIndex. 1058 */ invalidated(int index, int prevTakeIndex, long dequeues, int length)1059 private boolean invalidated(int index, int prevTakeIndex, 1060 long dequeues, int length) { 1061 if (index < 0) 1062 return false; 1063 int distance = index - prevTakeIndex; 1064 if (distance < 0) 1065 distance += length; 1066 return dequeues > distance; 1067 } 1068 1069 /** 1070 * Adjusts indices to incorporate all dequeues since the last 1071 * operation on this iterator. Call only from iterating thread. 1072 */ incorporateDequeues()1073 private void incorporateDequeues() { 1074 // assert lock.getHoldCount() == 1; 1075 // assert itrs != null; 1076 // assert !isDetached(); 1077 // assert count > 0; 1078 1079 final int cycles = itrs.cycles; 1080 final int takeIndex = ArrayBlockingQueue.this.takeIndex; 1081 final int prevCycles = this.prevCycles; 1082 final int prevTakeIndex = this.prevTakeIndex; 1083 1084 if (cycles != prevCycles || takeIndex != prevTakeIndex) { 1085 final int len = items.length; 1086 // how far takeIndex has advanced since the previous 1087 // operation of this iterator 1088 long dequeues = (cycles - prevCycles) * len 1089 + (takeIndex - prevTakeIndex); 1090 1091 // Check indices for invalidation 1092 if (invalidated(lastRet, prevTakeIndex, dequeues, len)) 1093 lastRet = REMOVED; 1094 if (invalidated(nextIndex, prevTakeIndex, dequeues, len)) 1095 nextIndex = REMOVED; 1096 if (invalidated(cursor, prevTakeIndex, dequeues, len)) 1097 cursor = takeIndex; 1098 1099 if (cursor < 0 && nextIndex < 0 && lastRet < 0) 1100 detach(); 1101 else { 1102 this.prevCycles = cycles; 1103 this.prevTakeIndex = takeIndex; 1104 } 1105 } 1106 } 1107 1108 /** 1109 * Called when itrs should stop tracking this iterator, either 1110 * because there are no more indices to update (cursor < 0 && 1111 * nextIndex < 0 && lastRet < 0) or as a special exception, when 1112 * lastRet >= 0, because hasNext() is about to return false for the 1113 * first time. Call only from iterating thread. 1114 */ detach()1115 private void detach() { 1116 // Switch to detached mode 1117 // assert lock.getHoldCount() == 1; 1118 // assert cursor == NONE; 1119 // assert nextIndex < 0; 1120 // assert lastRet < 0 || nextItem == null; 1121 // assert lastRet < 0 ^ lastItem != null; 1122 if (prevTakeIndex >= 0) { 1123 // assert itrs != null; 1124 prevTakeIndex = DETACHED; 1125 // try to unlink from itrs (but not too hard) 1126 itrs.doSomeSweeping(true); 1127 } 1128 } 1129 1130 /** 1131 * For performance reasons, we would like not to acquire a lock in 1132 * hasNext in the common case. To allow for this, we only access 1133 * fields (i.e. nextItem) that are not modified by update operations 1134 * triggered by queue modifications. 1135 */ hasNext()1136 public boolean hasNext() { 1137 // assert lock.getHoldCount() == 0; 1138 if (nextItem != null) 1139 return true; 1140 noNext(); 1141 return false; 1142 } 1143 noNext()1144 private void noNext() { 1145 final ReentrantLock lock = ArrayBlockingQueue.this.lock; 1146 lock.lock(); 1147 try { 1148 // assert cursor == NONE; 1149 // assert nextIndex == NONE; 1150 if (!isDetached()) { 1151 // assert lastRet >= 0; 1152 incorporateDequeues(); // might update lastRet 1153 if (lastRet >= 0) { 1154 lastItem = itemAt(lastRet); 1155 // assert lastItem != null; 1156 detach(); 1157 } 1158 } 1159 // assert isDetached(); 1160 // assert lastRet < 0 ^ lastItem != null; 1161 } finally { 1162 lock.unlock(); 1163 } 1164 } 1165 next()1166 public E next() { 1167 // assert lock.getHoldCount() == 0; 1168 final E x = nextItem; 1169 if (x == null) 1170 throw new NoSuchElementException(); 1171 final ReentrantLock lock = ArrayBlockingQueue.this.lock; 1172 lock.lock(); 1173 try { 1174 if (!isDetached()) 1175 incorporateDequeues(); 1176 // assert nextIndex != NONE; 1177 // assert lastItem == null; 1178 lastRet = nextIndex; 1179 final int cursor = this.cursor; 1180 if (cursor >= 0) { 1181 nextItem = itemAt(nextIndex = cursor); 1182 // assert nextItem != null; 1183 this.cursor = incCursor(cursor); 1184 } else { 1185 nextIndex = NONE; 1186 nextItem = null; 1187 } 1188 } finally { 1189 lock.unlock(); 1190 } 1191 return x; 1192 } 1193 remove()1194 public void remove() { 1195 // assert lock.getHoldCount() == 0; 1196 final ReentrantLock lock = ArrayBlockingQueue.this.lock; 1197 lock.lock(); 1198 try { 1199 if (!isDetached()) 1200 incorporateDequeues(); // might update lastRet or detach 1201 final int lastRet = this.lastRet; 1202 this.lastRet = NONE; 1203 if (lastRet >= 0) { 1204 if (!isDetached()) 1205 removeAt(lastRet); 1206 else { 1207 final E lastItem = this.lastItem; 1208 // assert lastItem != null; 1209 this.lastItem = null; 1210 if (itemAt(lastRet) == lastItem) 1211 removeAt(lastRet); 1212 } 1213 } else if (lastRet == NONE) 1214 throw new IllegalStateException(); 1215 // else lastRet == REMOVED and the last returned element was 1216 // previously asynchronously removed via an operation other 1217 // than this.remove(), so nothing to do. 1218 1219 if (cursor < 0 && nextIndex < 0) 1220 detach(); 1221 } finally { 1222 lock.unlock(); 1223 // assert lastRet == NONE; 1224 // assert lastItem == null; 1225 } 1226 } 1227 1228 /** 1229 * Called to notify the iterator that the queue is empty, or that it 1230 * has fallen hopelessly behind, so that it should abandon any 1231 * further iteration, except possibly to return one more element 1232 * from next(), as promised by returning true from hasNext(). 1233 */ shutdown()1234 void shutdown() { 1235 // assert lock.getHoldCount() == 1; 1236 cursor = NONE; 1237 if (nextIndex >= 0) 1238 nextIndex = REMOVED; 1239 if (lastRet >= 0) { 1240 lastRet = REMOVED; 1241 lastItem = null; 1242 } 1243 prevTakeIndex = DETACHED; 1244 // Don't set nextItem to null because we must continue to be 1245 // able to return it on next(). 1246 // 1247 // Caller will unlink from itrs when convenient. 1248 } 1249 distance(int index, int prevTakeIndex, int length)1250 private int distance(int index, int prevTakeIndex, int length) { 1251 int distance = index - prevTakeIndex; 1252 if (distance < 0) 1253 distance += length; 1254 return distance; 1255 } 1256 1257 /** 1258 * Called whenever an interior remove (not at takeIndex) occurred. 1259 * 1260 * @return true if this iterator should be unlinked from itrs 1261 */ removedAt(int removedIndex)1262 boolean removedAt(int removedIndex) { 1263 // assert lock.getHoldCount() == 1; 1264 if (isDetached()) 1265 return true; 1266 1267 final int takeIndex = ArrayBlockingQueue.this.takeIndex; 1268 final int prevTakeIndex = this.prevTakeIndex; 1269 final int len = items.length; 1270 // distance from prevTakeIndex to removedIndex 1271 final int removedDistance = 1272 len * (itrs.cycles - this.prevCycles 1273 + ((removedIndex < takeIndex) ? 1 : 0)) 1274 + (removedIndex - prevTakeIndex); 1275 // assert itrs.cycles - this.prevCycles >= 0; 1276 // assert itrs.cycles - this.prevCycles <= 1; 1277 // assert removedDistance > 0; 1278 // assert removedIndex != takeIndex; 1279 int cursor = this.cursor; 1280 if (cursor >= 0) { 1281 int x = distance(cursor, prevTakeIndex, len); 1282 if (x == removedDistance) { 1283 if (cursor == putIndex) 1284 this.cursor = cursor = NONE; 1285 } 1286 else if (x > removedDistance) { 1287 // assert cursor != prevTakeIndex; 1288 this.cursor = cursor = dec(cursor); 1289 } 1290 } 1291 int lastRet = this.lastRet; 1292 if (lastRet >= 0) { 1293 int x = distance(lastRet, prevTakeIndex, len); 1294 if (x == removedDistance) 1295 this.lastRet = lastRet = REMOVED; 1296 else if (x > removedDistance) 1297 this.lastRet = lastRet = dec(lastRet); 1298 } 1299 int nextIndex = this.nextIndex; 1300 if (nextIndex >= 0) { 1301 int x = distance(nextIndex, prevTakeIndex, len); 1302 if (x == removedDistance) 1303 this.nextIndex = nextIndex = REMOVED; 1304 else if (x > removedDistance) 1305 this.nextIndex = nextIndex = dec(nextIndex); 1306 } 1307 if (cursor < 0 && nextIndex < 0 && lastRet < 0) { 1308 this.prevTakeIndex = DETACHED; 1309 return true; 1310 } 1311 return false; 1312 } 1313 1314 /** 1315 * Called whenever takeIndex wraps around to zero. 1316 * 1317 * @return true if this iterator should be unlinked from itrs 1318 */ takeIndexWrapped()1319 boolean takeIndexWrapped() { 1320 // assert lock.getHoldCount() == 1; 1321 if (isDetached()) 1322 return true; 1323 if (itrs.cycles - prevCycles > 1) { 1324 // All the elements that existed at the time of the last 1325 // operation are gone, so abandon further iteration. 1326 shutdown(); 1327 return true; 1328 } 1329 return false; 1330 } 1331 1332 // /** Uncomment for debugging. */ 1333 // public String toString() { 1334 // return ("cursor=" + cursor + " " + 1335 // "nextIndex=" + nextIndex + " " + 1336 // "lastRet=" + lastRet + " " + 1337 // "nextItem=" + nextItem + " " + 1338 // "lastItem=" + lastItem + " " + 1339 // "prevCycles=" + prevCycles + " " + 1340 // "prevTakeIndex=" + prevTakeIndex + " " + 1341 // "size()=" + size() + " " + 1342 // "remainingCapacity()=" + remainingCapacity()); 1343 // } 1344 } 1345 1346 /** 1347 * Returns a {@link Spliterator} over the elements in this queue. 1348 * 1349 * <p>The returned spliterator is 1350 * <a href="package-summary.html#Weakly"><i>weakly consistent</i></a>. 1351 * 1352 * <p>The {@code Spliterator} reports {@link Spliterator#CONCURRENT}, 1353 * {@link Spliterator#ORDERED}, and {@link Spliterator#NONNULL}. 1354 * 1355 * @implNote 1356 * The {@code Spliterator} implements {@code trySplit} to permit limited 1357 * parallelism. 1358 * 1359 * @return a {@code Spliterator} over the elements in this queue 1360 * @since 1.8 1361 */ spliterator()1362 public Spliterator<E> spliterator() { 1363 return Spliterators.spliterator 1364 (this, (Spliterator.ORDERED | 1365 Spliterator.NONNULL | 1366 Spliterator.CONCURRENT)); 1367 } 1368 1369 } 1370