1 /* 2 * Copyright (c) 2008, 2009, Oracle and/or its affiliates. All rights reserved. 3 * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER. 4 * 5 * This code is free software; you can redistribute it and/or modify it 6 * under the terms of the GNU General Public License version 2 only, as 7 * published by the Free Software Foundation. Oracle designates this 8 * particular file as subject to the "Classpath" exception as provided 9 * by Oracle in the LICENSE file that accompanied this code. 10 * 11 * This code is distributed in the hope that it will be useful, but WITHOUT 12 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or 13 * FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License 14 * version 2 for more details (a copy is included in the LICENSE file that 15 * accompanied this code). 16 * 17 * You should have received a copy of the GNU General Public License version 18 * 2 along with this work; if not, write to the Free Software Foundation, 19 * Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA. 20 * 21 * Please contact Oracle, 500 Oracle Parkway, Redwood Shores, CA 94065 USA 22 * or visit www.oracle.com if you need additional information or have any 23 * questions. 24 */ 25 26 package sun.nio.ch; 27 28 import java.nio.channels.Channel; 29 import java.nio.channels.AsynchronousChannelGroup; 30 import java.nio.channels.spi.AsynchronousChannelProvider; 31 import java.io.IOException; 32 import java.io.FileDescriptor; 33 import java.util.Queue; 34 import java.util.concurrent.*; 35 import java.util.concurrent.atomic.AtomicInteger; 36 import java.util.concurrent.atomic.AtomicBoolean; 37 import java.security.PrivilegedAction; 38 import java.security.AccessController; 39 import java.security.AccessControlContext; 40 import sun.security.action.GetIntegerAction; 41 42 /** 43 * Base implementation of AsynchronousChannelGroup 44 */ 45 46 abstract class AsynchronousChannelGroupImpl 47 extends AsynchronousChannelGroup implements Executor 48 { 49 // number of internal threads handling I/O events when using an unbounded 50 // thread pool. Internal threads do not dispatch to completion handlers. 51 private static final int internalThreadCount = AccessController.doPrivileged( 52 new GetIntegerAction("sun.nio.ch.internalThreadPoolSize", 1)); 53 54 // associated thread pool 55 private final ThreadPool pool; 56 57 // number of tasks running (including internal) 58 private final AtomicInteger threadCount = new AtomicInteger(); 59 60 // associated Executor for timeouts 61 private ScheduledThreadPoolExecutor timeoutExecutor; 62 63 // task queue for when using a fixed thread pool. In that case, thread 64 // waiting on I/O events must be awokon to poll tasks from this queue. 65 private final Queue<Runnable> taskQueue; 66 67 // group shutdown 68 private final AtomicBoolean shutdown = new AtomicBoolean(); 69 private final Object shutdownNowLock = new Object(); 70 private volatile boolean terminateInitiated; 71 AsynchronousChannelGroupImpl(AsynchronousChannelProvider provider, ThreadPool pool)72 AsynchronousChannelGroupImpl(AsynchronousChannelProvider provider, 73 ThreadPool pool) 74 { 75 super(provider); 76 this.pool = pool; 77 78 if (pool.isFixedThreadPool()) { 79 taskQueue = new ConcurrentLinkedQueue<Runnable>(); 80 } else { 81 taskQueue = null; // not used 82 } 83 84 // use default thread factory as thread should not be visible to 85 // application (it doesn't execute completion handlers). 86 this.timeoutExecutor = (ScheduledThreadPoolExecutor) 87 Executors.newScheduledThreadPool(1, ThreadPool.defaultThreadFactory()); 88 this.timeoutExecutor.setRemoveOnCancelPolicy(true); 89 } 90 executor()91 final ExecutorService executor() { 92 return pool.executor(); 93 } 94 isFixedThreadPool()95 final boolean isFixedThreadPool() { 96 return pool.isFixedThreadPool(); 97 } 98 fixedThreadCount()99 final int fixedThreadCount() { 100 if (isFixedThreadPool()) { 101 return pool.poolSize(); 102 } else { 103 return pool.poolSize() + internalThreadCount; 104 } 105 } 106 bindToGroup(final Runnable task)107 private Runnable bindToGroup(final Runnable task) { 108 final AsynchronousChannelGroupImpl thisGroup = this; 109 return new Runnable() { 110 public void run() { 111 Invoker.bindToGroup(thisGroup); 112 task.run(); 113 } 114 }; 115 } 116 117 private void startInternalThread(final Runnable task) { 118 AccessController.doPrivileged(new PrivilegedAction<Void>() { 119 @Override 120 public Void run() { 121 // internal threads should not be visible to application so 122 // cannot use user-supplied thread factory 123 ThreadPool.defaultThreadFactory().newThread(task).start(); 124 return null; 125 } 126 }); 127 } 128 129 protected final void startThreads(Runnable task) { 130 if (!isFixedThreadPool()) { 131 for (int i=0; i<internalThreadCount; i++) { 132 startInternalThread(task); 133 threadCount.incrementAndGet(); 134 } 135 } 136 if (pool.poolSize() > 0) { 137 task = bindToGroup(task); 138 try { 139 for (int i=0; i<pool.poolSize(); i++) { 140 pool.executor().execute(task); 141 threadCount.incrementAndGet(); 142 } 143 } catch (RejectedExecutionException x) { 144 // nothing we can do 145 } 146 } 147 } 148 149 final int threadCount() { 150 return threadCount.get(); 151 } 152 153 /** 154 * Invoked by tasks as they terminate 155 */ 156 final int threadExit(Runnable task, boolean replaceMe) { 157 if (replaceMe) { 158 try { 159 if (Invoker.isBoundToAnyGroup()) { 160 // submit new task to replace this thread 161 pool.executor().execute(bindToGroup(task)); 162 } else { 163 // replace internal thread 164 startInternalThread(task); 165 } 166 return threadCount.get(); 167 } catch (RejectedExecutionException x) { 168 // unable to replace 169 } 170 } 171 return threadCount.decrementAndGet(); 172 } 173 174 /** 175 * Wakes up a thread waiting for I/O events to execute the given task. 176 */ 177 abstract void executeOnHandlerTask(Runnable task); 178 179 /** 180 * For a fixed thread pool the task is queued to a thread waiting on I/O 181 * events. For other thread pools we simply submit the task to the thread 182 * pool. 183 */ 184 final void executeOnPooledThread(Runnable task) { 185 if (isFixedThreadPool()) { 186 executeOnHandlerTask(task); 187 } else { 188 pool.executor().execute(bindToGroup(task)); 189 } 190 } 191 192 final void offerTask(Runnable task) { 193 taskQueue.offer(task); 194 } 195 196 final Runnable pollTask() { 197 return (taskQueue == null) ? null : taskQueue.poll(); 198 } 199 200 final Future<?> schedule(Runnable task, long timeout, TimeUnit unit) { 201 try { 202 return timeoutExecutor.schedule(task, timeout, unit); 203 } catch (RejectedExecutionException rej) { 204 if (terminateInitiated) { 205 // no timeout scheduled as group is terminating 206 return null; 207 } 208 throw new AssertionError(rej); 209 } 210 } 211 212 @Override 213 public final boolean isShutdown() { 214 return shutdown.get(); 215 } 216 217 @Override 218 public final boolean isTerminated() { 219 return pool.executor().isTerminated(); 220 } 221 222 /** 223 * Returns true if there are no channels in the group 224 */ 225 abstract boolean isEmpty(); 226 227 /** 228 * Attaches a foreign channel to this group. 229 */ 230 abstract Object attachForeignChannel(Channel channel, FileDescriptor fdo) 231 throws IOException; 232 233 /** 234 * Detaches a foreign channel from this group. 235 */ 236 abstract void detachForeignChannel(Object key); 237 238 /** 239 * Closes all channels in the group 240 */ 241 abstract void closeAllChannels() throws IOException; 242 243 /** 244 * Shutdown all tasks waiting for I/O events. 245 */ 246 abstract void shutdownHandlerTasks(); 247 248 private void shutdownExecutors() { 249 AccessController.doPrivileged(new PrivilegedAction<Void>() { 250 public Void run() { 251 pool.executor().shutdown(); 252 timeoutExecutor.shutdown(); 253 return null; 254 } 255 }); 256 } 257 258 @Override 259 public final void shutdown() { 260 if (shutdown.getAndSet(true)) { 261 // already shutdown 262 return; 263 } 264 // if there are channels in the group then shutdown will continue 265 // when the last channel is closed 266 if (!isEmpty()) { 267 return; 268 } 269 // initiate termination (acquire shutdownNowLock to ensure that other 270 // threads invoking shutdownNow will block). 271 synchronized (shutdownNowLock) { 272 if (!terminateInitiated) { 273 terminateInitiated = true; 274 shutdownHandlerTasks(); 275 shutdownExecutors(); 276 } 277 } 278 } 279 280 @Override 281 public final void shutdownNow() throws IOException { 282 shutdown.set(true); 283 synchronized (shutdownNowLock) { 284 if (!terminateInitiated) { 285 terminateInitiated = true; 286 closeAllChannels(); 287 shutdownHandlerTasks(); 288 shutdownExecutors(); 289 } 290 } 291 } 292 293 /** 294 * For use by AsynchronousFileChannel to release resources without shutting 295 * down the thread pool. 296 */ 297 final void detachFromThreadPool() { 298 if (shutdown.getAndSet(true)) 299 throw new AssertionError("Already shutdown"); 300 if (!isEmpty()) 301 throw new AssertionError("Group not empty"); 302 shutdownHandlerTasks(); 303 } 304 305 @Override 306 public final boolean awaitTermination(long timeout, TimeUnit unit) 307 throws InterruptedException 308 { 309 return pool.executor().awaitTermination(timeout, unit); 310 } 311 312 /** 313 * Executes the given command on one of the channel group's pooled threads. 314 */ 315 @Override 316 public final void execute(Runnable task) { 317 SecurityManager sm = System.getSecurityManager(); 318 if (sm != null) { 319 // when a security manager is installed then the user's task 320 // must be run with the current calling context 321 final AccessControlContext acc = AccessController.getContext(); 322 final Runnable delegate = task; 323 task = new Runnable() { 324 @Override 325 public void run() { 326 AccessController.doPrivileged(new PrivilegedAction<Void>() { 327 @Override 328 public Void run() { 329 delegate.run(); 330 return null; 331 } 332 }, acc); 333 } 334 }; 335 } 336 executeOnPooledThread(task); 337 } 338 } 339