1 /* 2 * Copyright (c) 2008, 2013, Oracle and/or its affiliates. All rights reserved. 3 * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER. 4 * 5 * This code is free software; you can redistribute it and/or modify it 6 * under the terms of the GNU General Public License version 2 only, as 7 * published by the Free Software Foundation. Oracle designates this 8 * particular file as subject to the "Classpath" exception as provided 9 * by Oracle in the LICENSE file that accompanied this code. 10 * 11 * This code is distributed in the hope that it will be useful, but WITHOUT 12 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or 13 * FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License 14 * version 2 for more details (a copy is included in the LICENSE file that 15 * accompanied this code). 16 * 17 * You should have received a copy of the GNU General Public License version 18 * 2 along with this work; if not, write to the Free Software Foundation, 19 * Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA. 20 * 21 * Please contact Oracle, 500 Oracle Parkway, Redwood Shores, CA 94065 USA 22 * or visit www.oracle.com if you need additional information or have any 23 * questions. 24 */ 25 26 package sun.nio.ch; 27 28 import java.nio.ByteBuffer; 29 import java.nio.channels.*; 30 import java.net.SocketOption; 31 import java.net.StandardSocketOptions; 32 import java.net.SocketAddress; 33 import java.net.InetSocketAddress; 34 import java.io.IOException; 35 import java.io.FileDescriptor; 36 import java.util.Set; 37 import java.util.HashSet; 38 import java.util.Collections; 39 import java.util.concurrent.*; 40 import java.util.concurrent.locks.*; 41 import sun.net.NetHooks; 42 import sun.net.ExtendedOptionsImpl; 43 44 /** 45 * Base implementation of AsynchronousSocketChannel 46 */ 47 48 abstract class AsynchronousSocketChannelImpl 49 extends AsynchronousSocketChannel 50 implements Cancellable, Groupable 51 { 52 protected final FileDescriptor fd; 53 54 // protects state, localAddress, and remoteAddress 55 protected final Object stateLock = new Object(); 56 57 protected volatile InetSocketAddress localAddress = null; 58 protected volatile InetSocketAddress remoteAddress = null; 59 60 // State, increases monotonically 61 static final int ST_UNINITIALIZED = -1; 62 static final int ST_UNCONNECTED = 0; 63 static final int ST_PENDING = 1; 64 static final int ST_CONNECTED = 2; 65 protected volatile int state = ST_UNINITIALIZED; 66 67 // reading state 68 private final Object readLock = new Object(); 69 private boolean reading; 70 private boolean readShutdown; 71 private boolean readKilled; // further reading disallowed due to timeout 72 73 // writing state 74 private final Object writeLock = new Object(); 75 private boolean writing; 76 private boolean writeShutdown; 77 private boolean writeKilled; // further writing disallowed due to timeout 78 79 // close support 80 private final ReadWriteLock closeLock = new ReentrantReadWriteLock(); 81 private volatile boolean open = true; 82 83 // set true when exclusive binding is on and SO_REUSEADDR is emulated 84 private boolean isReuseAddress; 85 AsynchronousSocketChannelImpl(AsynchronousChannelGroupImpl group)86 AsynchronousSocketChannelImpl(AsynchronousChannelGroupImpl group) 87 throws IOException 88 { 89 super(group.provider()); 90 this.fd = Net.socket(true); 91 this.state = ST_UNCONNECTED; 92 } 93 94 // Constructor for sockets obtained from AsynchronousServerSocketChannelImpl AsynchronousSocketChannelImpl(AsynchronousChannelGroupImpl group, FileDescriptor fd, InetSocketAddress remote)95 AsynchronousSocketChannelImpl(AsynchronousChannelGroupImpl group, 96 FileDescriptor fd, 97 InetSocketAddress remote) 98 throws IOException 99 { 100 super(group.provider()); 101 this.fd = fd; 102 this.state = ST_CONNECTED; 103 this.localAddress = Net.localAddress(fd); 104 this.remoteAddress = remote; 105 } 106 107 @Override isOpen()108 public final boolean isOpen() { 109 return open; 110 } 111 112 /** 113 * Marks beginning of access to file descriptor/handle 114 */ begin()115 final void begin() throws IOException { 116 closeLock.readLock().lock(); 117 if (!isOpen()) 118 throw new ClosedChannelException(); 119 } 120 121 /** 122 * Marks end of access to file descriptor/handle 123 */ end()124 final void end() { 125 closeLock.readLock().unlock(); 126 } 127 128 /** 129 * Invoked to close socket and release other resources. 130 */ implClose()131 abstract void implClose() throws IOException; 132 133 @Override close()134 public final void close() throws IOException { 135 // synchronize with any threads initiating asynchronous operations 136 closeLock.writeLock().lock(); 137 try { 138 if (!open) 139 return; // already closed 140 open = false; 141 } finally { 142 closeLock.writeLock().unlock(); 143 } 144 implClose(); 145 } 146 enableReading(boolean killed)147 final void enableReading(boolean killed) { 148 synchronized (readLock) { 149 reading = false; 150 if (killed) 151 readKilled = true; 152 } 153 } 154 enableReading()155 final void enableReading() { 156 enableReading(false); 157 } 158 enableWriting(boolean killed)159 final void enableWriting(boolean killed) { 160 synchronized (writeLock) { 161 writing = false; 162 if (killed) 163 writeKilled = true; 164 } 165 } 166 enableWriting()167 final void enableWriting() { 168 enableWriting(false); 169 } 170 killReading()171 final void killReading() { 172 synchronized (readLock) { 173 readKilled = true; 174 } 175 } 176 killWriting()177 final void killWriting() { 178 synchronized (writeLock) { 179 writeKilled = true; 180 } 181 } 182 killConnect()183 final void killConnect() { 184 // when a connect is cancelled then the connection may have been 185 // established so prevent reading or writing. 186 killReading(); 187 killWriting(); 188 } 189 190 /** 191 * Invoked by connect to initiate the connect operation. 192 */ implConnect(SocketAddress remote, A attachment, CompletionHandler<Void,? super A> handler)193 abstract <A> Future<Void> implConnect(SocketAddress remote, 194 A attachment, 195 CompletionHandler<Void,? super A> handler); 196 197 @Override connect(SocketAddress remote)198 public final Future<Void> connect(SocketAddress remote) { 199 return implConnect(remote, null, null); 200 } 201 202 @Override connect(SocketAddress remote, A attachment, CompletionHandler<Void,? super A> handler)203 public final <A> void connect(SocketAddress remote, 204 A attachment, 205 CompletionHandler<Void,? super A> handler) 206 { 207 if (handler == null) 208 throw new NullPointerException("'handler' is null"); 209 implConnect(remote, attachment, handler); 210 } 211 212 /** 213 * Invoked by read to initiate the I/O operation. 214 */ implRead(boolean isScatteringRead, ByteBuffer dst, ByteBuffer[] dsts, long timeout, TimeUnit unit, A attachment, CompletionHandler<V,? super A> handler)215 abstract <V extends Number,A> Future<V> implRead(boolean isScatteringRead, 216 ByteBuffer dst, 217 ByteBuffer[] dsts, 218 long timeout, 219 TimeUnit unit, 220 A attachment, 221 CompletionHandler<V,? super A> handler); 222 223 @SuppressWarnings("unchecked") read(boolean isScatteringRead, ByteBuffer dst, ByteBuffer[] dsts, long timeout, TimeUnit unit, A att, CompletionHandler<V,? super A> handler)224 private <V extends Number,A> Future<V> read(boolean isScatteringRead, 225 ByteBuffer dst, 226 ByteBuffer[] dsts, 227 long timeout, 228 TimeUnit unit, 229 A att, 230 CompletionHandler<V,? super A> handler) 231 { 232 if (!isOpen()) { 233 Throwable e = new ClosedChannelException(); 234 if (handler == null) 235 return CompletedFuture.withFailure(e); 236 Invoker.invoke(this, handler, att, null, e); 237 return null; 238 } 239 240 if (remoteAddress == null) 241 throw new NotYetConnectedException(); 242 243 boolean hasSpaceToRead = isScatteringRead || dst.hasRemaining(); 244 boolean shutdown = false; 245 246 // check and update state 247 synchronized (readLock) { 248 if (readKilled) 249 throw new IllegalStateException("Reading not allowed due to timeout or cancellation"); 250 if (reading) 251 throw new ReadPendingException(); 252 if (readShutdown) { 253 shutdown = true; 254 } else { 255 if (hasSpaceToRead) { 256 reading = true; 257 } 258 } 259 } 260 261 // immediately complete with -1 if shutdown for read 262 // immediately complete with 0 if no space remaining 263 if (shutdown || !hasSpaceToRead) { 264 Number result; 265 if (isScatteringRead) { 266 result = (shutdown) ? Long.valueOf(-1L) : Long.valueOf(0L); 267 } else { 268 result = (shutdown) ? -1 : 0; 269 } 270 if (handler == null) 271 return CompletedFuture.withResult((V)result); 272 Invoker.invoke(this, handler, att, (V)result, null); 273 return null; 274 } 275 276 return implRead(isScatteringRead, dst, dsts, timeout, unit, att, handler); 277 } 278 279 @Override read(ByteBuffer dst)280 public final Future<Integer> read(ByteBuffer dst) { 281 if (dst.isReadOnly()) 282 throw new IllegalArgumentException("Read-only buffer"); 283 return read(false, dst, null, 0L, TimeUnit.MILLISECONDS, null, null); 284 } 285 286 @Override read(ByteBuffer dst, long timeout, TimeUnit unit, A attachment, CompletionHandler<Integer,? super A> handler)287 public final <A> void read(ByteBuffer dst, 288 long timeout, 289 TimeUnit unit, 290 A attachment, 291 CompletionHandler<Integer,? super A> handler) 292 { 293 if (handler == null) 294 throw new NullPointerException("'handler' is null"); 295 if (dst.isReadOnly()) 296 throw new IllegalArgumentException("Read-only buffer"); 297 read(false, dst, null, timeout, unit, attachment, handler); 298 } 299 300 @Override read(ByteBuffer[] dsts, int offset, int length, long timeout, TimeUnit unit, A attachment, CompletionHandler<Long,? super A> handler)301 public final <A> void read(ByteBuffer[] dsts, 302 int offset, 303 int length, 304 long timeout, 305 TimeUnit unit, 306 A attachment, 307 CompletionHandler<Long,? super A> handler) 308 { 309 if (handler == null) 310 throw new NullPointerException("'handler' is null"); 311 if ((offset < 0) || (length < 0) || (offset > dsts.length - length)) 312 throw new IndexOutOfBoundsException(); 313 ByteBuffer[] bufs = Util.subsequence(dsts, offset, length); 314 for (int i=0; i<bufs.length; i++) { 315 if (bufs[i].isReadOnly()) 316 throw new IllegalArgumentException("Read-only buffer"); 317 } 318 read(true, null, bufs, timeout, unit, attachment, handler); 319 } 320 321 /** 322 * Invoked by write to initiate the I/O operation. 323 */ implWrite(boolean isGatheringWrite, ByteBuffer src, ByteBuffer[] srcs, long timeout, TimeUnit unit, A attachment, CompletionHandler<V,? super A> handler)324 abstract <V extends Number,A> Future<V> implWrite(boolean isGatheringWrite, 325 ByteBuffer src, 326 ByteBuffer[] srcs, 327 long timeout, 328 TimeUnit unit, 329 A attachment, 330 CompletionHandler<V,? super A> handler); 331 332 @SuppressWarnings("unchecked") write(boolean isGatheringWrite, ByteBuffer src, ByteBuffer[] srcs, long timeout, TimeUnit unit, A att, CompletionHandler<V,? super A> handler)333 private <V extends Number,A> Future<V> write(boolean isGatheringWrite, 334 ByteBuffer src, 335 ByteBuffer[] srcs, 336 long timeout, 337 TimeUnit unit, 338 A att, 339 CompletionHandler<V,? super A> handler) 340 { 341 boolean hasDataToWrite = isGatheringWrite || src.hasRemaining(); 342 343 boolean closed = false; 344 if (isOpen()) { 345 if (remoteAddress == null) 346 throw new NotYetConnectedException(); 347 // check and update state 348 synchronized (writeLock) { 349 if (writeKilled) 350 throw new IllegalStateException("Writing not allowed due to timeout or cancellation"); 351 if (writing) 352 throw new WritePendingException(); 353 if (writeShutdown) { 354 closed = true; 355 } else { 356 if (hasDataToWrite) 357 writing = true; 358 } 359 } 360 } else { 361 closed = true; 362 } 363 364 // channel is closed or shutdown for write 365 if (closed) { 366 Throwable e = new ClosedChannelException(); 367 if (handler == null) 368 return CompletedFuture.withFailure(e); 369 Invoker.invoke(this, handler, att, null, e); 370 return null; 371 } 372 373 // nothing to write so complete immediately 374 if (!hasDataToWrite) { 375 Number result = (isGatheringWrite) ? (Number)0L : (Number)0; 376 if (handler == null) 377 return CompletedFuture.withResult((V)result); 378 Invoker.invoke(this, handler, att, (V)result, null); 379 return null; 380 } 381 382 return implWrite(isGatheringWrite, src, srcs, timeout, unit, att, handler); 383 } 384 385 @Override write(ByteBuffer src)386 public final Future<Integer> write(ByteBuffer src) { 387 return write(false, src, null, 0L, TimeUnit.MILLISECONDS, null, null); 388 } 389 390 @Override write(ByteBuffer src, long timeout, TimeUnit unit, A attachment, CompletionHandler<Integer,? super A> handler)391 public final <A> void write(ByteBuffer src, 392 long timeout, 393 TimeUnit unit, 394 A attachment, 395 CompletionHandler<Integer,? super A> handler) 396 { 397 if (handler == null) 398 throw new NullPointerException("'handler' is null"); 399 write(false, src, null, timeout, unit, attachment, handler); 400 } 401 402 @Override write(ByteBuffer[] srcs, int offset, int length, long timeout, TimeUnit unit, A attachment, CompletionHandler<Long,? super A> handler)403 public final <A> void write(ByteBuffer[] srcs, 404 int offset, 405 int length, 406 long timeout, 407 TimeUnit unit, 408 A attachment, 409 CompletionHandler<Long,? super A> handler) 410 { 411 if (handler == null) 412 throw new NullPointerException("'handler' is null"); 413 if ((offset < 0) || (length < 0) || (offset > srcs.length - length)) 414 throw new IndexOutOfBoundsException(); 415 srcs = Util.subsequence(srcs, offset, length); 416 write(true, null, srcs, timeout, unit, attachment, handler); 417 } 418 419 @Override bind(SocketAddress local)420 public final AsynchronousSocketChannel bind(SocketAddress local) 421 throws IOException 422 { 423 try { 424 begin(); 425 synchronized (stateLock) { 426 if (state == ST_PENDING) 427 throw new ConnectionPendingException(); 428 if (localAddress != null) 429 throw new AlreadyBoundException(); 430 InetSocketAddress isa = (local == null) ? 431 new InetSocketAddress(0) : Net.checkAddress(local); 432 SecurityManager sm = System.getSecurityManager(); 433 if (sm != null) { 434 sm.checkListen(isa.getPort()); 435 } 436 NetHooks.beforeTcpBind(fd, isa.getAddress(), isa.getPort()); 437 Net.bind(fd, isa.getAddress(), isa.getPort()); 438 localAddress = Net.localAddress(fd); 439 } 440 } finally { 441 end(); 442 } 443 return this; 444 } 445 446 @Override getLocalAddress()447 public final SocketAddress getLocalAddress() throws IOException { 448 if (!isOpen()) 449 throw new ClosedChannelException(); 450 return Net.getRevealedLocalAddress(localAddress); 451 } 452 453 @Override setOption(SocketOption<T> name, T value)454 public final <T> AsynchronousSocketChannel setOption(SocketOption<T> name, T value) 455 throws IOException 456 { 457 if (name == null) 458 throw new NullPointerException(); 459 if (!supportedOptions().contains(name)) 460 throw new UnsupportedOperationException("'" + name + "' not supported"); 461 462 try { 463 begin(); 464 if (writeShutdown) 465 throw new IOException("Connection has been shutdown for writing"); 466 if (name == StandardSocketOptions.SO_REUSEADDR && 467 Net.useExclusiveBind()) 468 { 469 // SO_REUSEADDR emulated when using exclusive bind 470 isReuseAddress = (Boolean)value; 471 } else { 472 Net.setSocketOption(fd, Net.UNSPEC, name, value); 473 } 474 return this; 475 } finally { 476 end(); 477 } 478 } 479 480 @Override 481 @SuppressWarnings("unchecked") getOption(SocketOption<T> name)482 public final <T> T getOption(SocketOption<T> name) throws IOException { 483 if (name == null) 484 throw new NullPointerException(); 485 if (!supportedOptions().contains(name)) 486 throw new UnsupportedOperationException("'" + name + "' not supported"); 487 488 try { 489 begin(); 490 if (name == StandardSocketOptions.SO_REUSEADDR && 491 Net.useExclusiveBind()) 492 { 493 // SO_REUSEADDR emulated when using exclusive bind 494 return (T)Boolean.valueOf(isReuseAddress); 495 } 496 return (T) Net.getSocketOption(fd, Net.UNSPEC, name); 497 } finally { 498 end(); 499 } 500 } 501 502 private static class DefaultOptionsHolder { 503 static final Set<SocketOption<?>> defaultOptions = defaultOptions(); 504 defaultOptions()505 private static Set<SocketOption<?>> defaultOptions() { 506 HashSet<SocketOption<?>> set = new HashSet<SocketOption<?>>(5); 507 set.add(StandardSocketOptions.SO_SNDBUF); 508 set.add(StandardSocketOptions.SO_RCVBUF); 509 set.add(StandardSocketOptions.SO_KEEPALIVE); 510 set.add(StandardSocketOptions.SO_REUSEADDR); 511 set.add(StandardSocketOptions.TCP_NODELAY); 512 if (ExtendedOptionsImpl.flowSupported()) { 513 set.add(jdk.net.ExtendedSocketOptions.SO_FLOW_SLA); 514 } 515 return Collections.unmodifiableSet(set); 516 } 517 } 518 519 @Override supportedOptions()520 public final Set<SocketOption<?>> supportedOptions() { 521 return DefaultOptionsHolder.defaultOptions; 522 } 523 524 @Override getRemoteAddress()525 public final SocketAddress getRemoteAddress() throws IOException { 526 if (!isOpen()) 527 throw new ClosedChannelException(); 528 return remoteAddress; 529 } 530 531 @Override shutdownInput()532 public final AsynchronousSocketChannel shutdownInput() throws IOException { 533 try { 534 begin(); 535 if (remoteAddress == null) 536 throw new NotYetConnectedException(); 537 synchronized (readLock) { 538 if (!readShutdown) { 539 Net.shutdown(fd, Net.SHUT_RD); 540 readShutdown = true; 541 } 542 } 543 } finally { 544 end(); 545 } 546 return this; 547 } 548 549 @Override shutdownOutput()550 public final AsynchronousSocketChannel shutdownOutput() throws IOException { 551 try { 552 begin(); 553 if (remoteAddress == null) 554 throw new NotYetConnectedException(); 555 synchronized (writeLock) { 556 if (!writeShutdown) { 557 Net.shutdown(fd, Net.SHUT_WR); 558 writeShutdown = true; 559 } 560 } 561 } finally { 562 end(); 563 } 564 return this; 565 } 566 567 @Override toString()568 public final String toString() { 569 StringBuilder sb = new StringBuilder(); 570 sb.append(this.getClass().getName()); 571 sb.append('['); 572 synchronized (stateLock) { 573 if (!isOpen()) { 574 sb.append("closed"); 575 } else { 576 switch (state) { 577 case ST_UNCONNECTED: 578 sb.append("unconnected"); 579 break; 580 case ST_PENDING: 581 sb.append("connection-pending"); 582 break; 583 case ST_CONNECTED: 584 sb.append("connected"); 585 if (readShutdown) 586 sb.append(" ishut"); 587 if (writeShutdown) 588 sb.append(" oshut"); 589 break; 590 } 591 if (localAddress != null) { 592 sb.append(" local="); 593 sb.append( 594 Net.getRevealedLocalAddressAsString(localAddress)); 595 } 596 if (remoteAddress != null) { 597 sb.append(" remote="); 598 sb.append(remoteAddress.toString()); 599 } 600 } 601 } 602 sb.append(']'); 603 return sb.toString(); 604 } 605 } 606