1 /*
2  * Copyright (c) 2008, 2009, Oracle and/or its affiliates. All rights reserved.
3  * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
4  *
5  * This code is free software; you can redistribute it and/or modify it
6  * under the terms of the GNU General Public License version 2 only, as
7  * published by the Free Software Foundation.  Oracle designates this
8  * particular file as subject to the "Classpath" exception as provided
9  * by Oracle in the LICENSE file that accompanied this code.
10  *
11  * This code is distributed in the hope that it will be useful, but WITHOUT
12  * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
13  * FITNESS FOR A PARTICULAR PURPOSE.  See the GNU General Public License
14  * version 2 for more details (a copy is included in the LICENSE file that
15  * accompanied this code).
16  *
17  * You should have received a copy of the GNU General Public License version
18  * 2 along with this work; if not, write to the Free Software Foundation,
19  * Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA.
20  *
21  * Please contact Oracle, 500 Oracle Parkway, Redwood Shores, CA 94065 USA
22  * or visit www.oracle.com if you need additional information or have any
23  * questions.
24  */
25 
26 package sun.nio.ch;
27 
28 import java.nio.channels.Channel;
29 import java.nio.channels.AsynchronousChannelGroup;
30 import java.nio.channels.spi.AsynchronousChannelProvider;
31 import java.io.IOException;
32 import java.io.FileDescriptor;
33 import java.util.Queue;
34 import java.util.concurrent.*;
35 import java.util.concurrent.atomic.AtomicInteger;
36 import java.util.concurrent.atomic.AtomicBoolean;
37 import java.security.PrivilegedAction;
38 import java.security.AccessController;
39 import java.security.AccessControlContext;
40 import sun.security.action.GetIntegerAction;
41 
42 /**
43  * Base implementation of AsynchronousChannelGroup
44  */
45 
46 abstract class AsynchronousChannelGroupImpl
47     extends AsynchronousChannelGroup implements Executor
48 {
49     // number of internal threads handling I/O events when using an unbounded
50     // thread pool. Internal threads do not dispatch to completion handlers.
51     private static final int internalThreadCount = AccessController.doPrivileged(
52         new GetIntegerAction("sun.nio.ch.internalThreadPoolSize", 1));
53 
54     // associated thread pool
55     private final ThreadPool pool;
56 
57     // number of tasks running (including internal)
58     private final AtomicInteger threadCount = new AtomicInteger();
59 
60     // associated Executor for timeouts
61     private ScheduledThreadPoolExecutor timeoutExecutor;
62 
63     // task queue for when using a fixed thread pool. In that case, thread
64     // waiting on I/O events must be awokon to poll tasks from this queue.
65     private final Queue<Runnable> taskQueue;
66 
67     // group shutdown
68     private final AtomicBoolean shutdown = new AtomicBoolean();
69     private final Object shutdownNowLock = new Object();
70     private volatile boolean terminateInitiated;
71 
AsynchronousChannelGroupImpl(AsynchronousChannelProvider provider, ThreadPool pool)72     AsynchronousChannelGroupImpl(AsynchronousChannelProvider provider,
73                                  ThreadPool pool)
74     {
75         super(provider);
76         this.pool = pool;
77 
78         if (pool.isFixedThreadPool()) {
79             taskQueue = new ConcurrentLinkedQueue<Runnable>();
80         } else {
81             taskQueue = null;   // not used
82         }
83 
84         // use default thread factory as thread should not be visible to
85         // application (it doesn't execute completion handlers).
86         this.timeoutExecutor = (ScheduledThreadPoolExecutor)
87             Executors.newScheduledThreadPool(1, ThreadPool.defaultThreadFactory());
88         this.timeoutExecutor.setRemoveOnCancelPolicy(true);
89     }
90 
executor()91     final ExecutorService executor() {
92         return pool.executor();
93     }
94 
isFixedThreadPool()95     final boolean isFixedThreadPool() {
96         return pool.isFixedThreadPool();
97     }
98 
fixedThreadCount()99     final int fixedThreadCount() {
100         if (isFixedThreadPool()) {
101             return pool.poolSize();
102         } else {
103             return pool.poolSize() + internalThreadCount;
104         }
105     }
106 
bindToGroup(final Runnable task)107     private Runnable bindToGroup(final Runnable task) {
108         final AsynchronousChannelGroupImpl thisGroup = this;
109         return new Runnable() {
110             public void run() {
111                 Invoker.bindToGroup(thisGroup);
112                 task.run();
113             }
114         };
115     }
116 
117     private void startInternalThread(final Runnable task) {
118         AccessController.doPrivileged(new PrivilegedAction<Void>() {
119             @Override
120             public Void run() {
121                 // internal threads should not be visible to application so
122                 // cannot use user-supplied thread factory
123                 ThreadPool.defaultThreadFactory().newThread(task).start();
124                 return null;
125             }
126          });
127     }
128 
129     protected final void startThreads(Runnable task) {
130         if (!isFixedThreadPool()) {
131             for (int i=0; i<internalThreadCount; i++) {
132                 startInternalThread(task);
133                 threadCount.incrementAndGet();
134             }
135         }
136         if (pool.poolSize() > 0) {
137             task = bindToGroup(task);
138             try {
139                 for (int i=0; i<pool.poolSize(); i++) {
140                     pool.executor().execute(task);
141                     threadCount.incrementAndGet();
142                 }
143             } catch (RejectedExecutionException  x) {
144                 // nothing we can do
145             }
146         }
147     }
148 
149     final int threadCount() {
150         return threadCount.get();
151     }
152 
153     /**
154      * Invoked by tasks as they terminate
155      */
156     final int threadExit(Runnable task, boolean replaceMe) {
157         if (replaceMe) {
158             try {
159                 if (Invoker.isBoundToAnyGroup()) {
160                     // submit new task to replace this thread
161                     pool.executor().execute(bindToGroup(task));
162                 } else {
163                     // replace internal thread
164                     startInternalThread(task);
165                 }
166                 return threadCount.get();
167             } catch (RejectedExecutionException x) {
168                 // unable to replace
169             }
170         }
171         return threadCount.decrementAndGet();
172     }
173 
174     /**
175      * Wakes up a thread waiting for I/O events to execute the given task.
176      */
177     abstract void executeOnHandlerTask(Runnable task);
178 
179     /**
180      * For a fixed thread pool the task is queued to a thread waiting on I/O
181      * events. For other thread pools we simply submit the task to the thread
182      * pool.
183      */
184     final void executeOnPooledThread(Runnable task) {
185         if (isFixedThreadPool()) {
186             executeOnHandlerTask(task);
187         } else {
188             pool.executor().execute(bindToGroup(task));
189         }
190     }
191 
192     final void offerTask(Runnable task) {
193         taskQueue.offer(task);
194     }
195 
196     final Runnable pollTask() {
197         return (taskQueue == null) ? null : taskQueue.poll();
198     }
199 
200     final Future<?> schedule(Runnable task, long timeout, TimeUnit unit) {
201         try {
202             return timeoutExecutor.schedule(task, timeout, unit);
203         } catch (RejectedExecutionException rej) {
204             if (terminateInitiated) {
205                 // no timeout scheduled as group is terminating
206                 return null;
207             }
208             throw new AssertionError(rej);
209         }
210     }
211 
212     @Override
213     public final boolean isShutdown() {
214         return shutdown.get();
215     }
216 
217     @Override
218     public final boolean isTerminated()  {
219         return pool.executor().isTerminated();
220     }
221 
222     /**
223      * Returns true if there are no channels in the group
224      */
225     abstract boolean isEmpty();
226 
227     /**
228      * Attaches a foreign channel to this group.
229      */
230     abstract Object attachForeignChannel(Channel channel, FileDescriptor fdo)
231         throws IOException;
232 
233     /**
234      * Detaches a foreign channel from this group.
235      */
236     abstract void detachForeignChannel(Object key);
237 
238     /**
239      * Closes all channels in the group
240      */
241     abstract void closeAllChannels() throws IOException;
242 
243     /**
244      * Shutdown all tasks waiting for I/O events.
245      */
246     abstract void shutdownHandlerTasks();
247 
248     private void shutdownExecutors() {
249         AccessController.doPrivileged(new PrivilegedAction<Void>() {
250             public Void run() {
251                 pool.executor().shutdown();
252                 timeoutExecutor.shutdown();
253                 return null;
254             }
255         });
256     }
257 
258     @Override
259     public final void shutdown() {
260         if (shutdown.getAndSet(true)) {
261             // already shutdown
262             return;
263         }
264         // if there are channels in the group then shutdown will continue
265         // when the last channel is closed
266         if (!isEmpty()) {
267             return;
268         }
269         // initiate termination (acquire shutdownNowLock to ensure that other
270         // threads invoking shutdownNow will block).
271         synchronized (shutdownNowLock) {
272             if (!terminateInitiated) {
273                 terminateInitiated = true;
274                 shutdownHandlerTasks();
275                 shutdownExecutors();
276             }
277         }
278     }
279 
280     @Override
281     public final void shutdownNow() throws IOException {
282         shutdown.set(true);
283         synchronized (shutdownNowLock) {
284             if (!terminateInitiated) {
285                 terminateInitiated = true;
286                 closeAllChannels();
287                 shutdownHandlerTasks();
288                 shutdownExecutors();
289             }
290         }
291     }
292 
293     /**
294      * For use by AsynchronousFileChannel to release resources without shutting
295      * down the thread pool.
296      */
297     final void detachFromThreadPool() {
298         if (shutdown.getAndSet(true))
299             throw new AssertionError("Already shutdown");
300         if (!isEmpty())
301             throw new AssertionError("Group not empty");
302         shutdownHandlerTasks();
303     }
304 
305     @Override
306     public final boolean awaitTermination(long timeout, TimeUnit unit)
307         throws InterruptedException
308     {
309         return pool.executor().awaitTermination(timeout, unit);
310     }
311 
312     /**
313      * Executes the given command on one of the channel group's pooled threads.
314      */
315     @Override
316     public final void execute(Runnable task) {
317         SecurityManager sm = System.getSecurityManager();
318         if (sm != null) {
319             // when a security manager is installed then the user's task
320             // must be run with the current calling context
321             final AccessControlContext acc = AccessController.getContext();
322             final Runnable delegate = task;
323             task = new Runnable() {
324                 @Override
325                 public void run() {
326                     AccessController.doPrivileged(new PrivilegedAction<Void>() {
327                         @Override
328                         public Void run() {
329                             delegate.run();
330                             return null;
331                         }
332                     }, acc);
333                 }
334             };
335         }
336         executeOnPooledThread(task);
337     }
338 }
339