6834246: (ch) AsynchronousSocketChannel#write completes with wrong number of bytes written under load (win)
Reviewed-by: sherman
1.1 --- a/src/windows/classes/sun/nio/ch/WindowsAsynchronousSocketChannelImpl.java Fri May 01 12:06:14 2009 -0700
1.2 +++ b/src/windows/classes/sun/nio/ch/WindowsAsynchronousSocketChannelImpl.java Mon May 04 19:25:14 2009 +0100
1.3 @@ -475,49 +475,40 @@
1.4 // get an OVERLAPPED structure (from the cache or allocate)
1.5 overlapped = ioCache.add(result);
1.6
1.7 - // synchronize on result to allow this thread handle the case
1.8 - // where the read completes immediately.
1.9 - synchronized (result) {
1.10 - int n = read0(handle, numBufs, readBufferArray, overlapped);
1.11 - if (n == IOStatus.UNAVAILABLE) {
1.12 - // I/O is pending
1.13 - pending = true;
1.14 - return;
1.15 + // initiate read
1.16 + int n = read0(handle, numBufs, readBufferArray, overlapped);
1.17 + if (n == IOStatus.UNAVAILABLE) {
1.18 + // I/O is pending
1.19 + pending = true;
1.20 + return;
1.21 + }
1.22 + if (n == IOStatus.EOF) {
1.23 + // input shutdown
1.24 + enableReading();
1.25 + if (scatteringRead) {
1.26 + result.setResult((V)Long.valueOf(-1L));
1.27 + } else {
1.28 + result.setResult((V)Integer.valueOf(-1));
1.29 }
1.30 - // read completed immediately:
1.31 - // 1. update buffer position
1.32 - // 2. reset read flag
1.33 - // 3. release waiters
1.34 - if (n == 0) {
1.35 - n = -1;
1.36 - } else {
1.37 - updateBuffers(n);
1.38 - }
1.39 - enableReading();
1.40 -
1.41 - if (scatteringRead) {
1.42 - result.setResult((V)Long.valueOf(n));
1.43 - } else {
1.44 - result.setResult((V)Integer.valueOf(n));
1.45 - }
1.46 + } else {
1.47 + throw new InternalError("Read completed immediately");
1.48 }
1.49 } catch (Throwable x) {
1.50 - // failed to initiate read:
1.51 - // 1. reset read flag
1.52 - // 2. free resources
1.53 - // 3. release waiters
1.54 + // failed to initiate read
1.55 + // reset read flag before releasing waiters
1.56 enableReading();
1.57 - if (overlapped != 0L)
1.58 - ioCache.remove(overlapped);
1.59 if (x instanceof ClosedChannelException)
1.60 x = new AsynchronousCloseException();
1.61 if (!(x instanceof IOException))
1.62 x = new IOException(x);
1.63 result.setFailure(x);
1.64 } finally {
1.65 - if (prepared && !pending) {
1.66 - // return direct buffer(s) to cache if substituted
1.67 - releaseBuffers();
1.68 + // release resources if I/O not pending
1.69 + if (!pending) {
1.70 + if (overlapped != 0L)
1.71 + ioCache.remove(overlapped);
1.72 + if (prepared)
1.73 + releaseBuffers();
1.74 }
1.75 end();
1.76 }
1.77 @@ -721,7 +712,6 @@
1.78 @Override
1.79 @SuppressWarnings("unchecked")
1.80 public void run() {
1.81 - int n = -1;
1.82 long overlapped = 0L;
1.83 boolean prepared = false;
1.84 boolean pending = false;
1.85 @@ -736,56 +726,34 @@
1.86
1.87 // get an OVERLAPPED structure (from the cache or allocate)
1.88 overlapped = ioCache.add(result);
1.89 -
1.90 - // synchronize on result to allow this thread handle the case
1.91 - // where the read completes immediately.
1.92 - synchronized (result) {
1.93 - n = write0(handle, numBufs, writeBufferArray, overlapped);
1.94 - if (n == IOStatus.UNAVAILABLE) {
1.95 - // I/O is pending
1.96 - pending = true;
1.97 - return;
1.98 - }
1.99 -
1.100 - enableWriting();
1.101 -
1.102 - if (n == IOStatus.EOF) {
1.103 - // special case for shutdown output
1.104 - shutdown = true;
1.105 - throw new ClosedChannelException();
1.106 - }
1.107 -
1.108 - // write completed immediately:
1.109 - // 1. enable writing
1.110 - // 2. update buffer position
1.111 - // 3. release waiters
1.112 - updateBuffers(n);
1.113 -
1.114 - // result is a Long or Integer
1.115 - if (gatheringWrite) {
1.116 - result.setResult((V)Long.valueOf(n));
1.117 - } else {
1.118 - result.setResult((V)Integer.valueOf(n));
1.119 - }
1.120 + int n = write0(handle, numBufs, writeBufferArray, overlapped);
1.121 + if (n == IOStatus.UNAVAILABLE) {
1.122 + // I/O is pending
1.123 + pending = true;
1.124 + return;
1.125 }
1.126 + if (n == IOStatus.EOF) {
1.127 + // special case for shutdown output
1.128 + shutdown = true;
1.129 + throw new ClosedChannelException();
1.130 + }
1.131 + // write completed immediately
1.132 + throw new InternalError("Write completed immediately");
1.133 } catch (Throwable x) {
1.134 + // write failed. Enable writing before releasing waiters.
1.135 enableWriting();
1.136 -
1.137 - // failed to initiate read:
1.138 if (!shutdown && (x instanceof ClosedChannelException))
1.139 x = new AsynchronousCloseException();
1.140 if (!(x instanceof IOException))
1.141 x = new IOException(x);
1.142 result.setFailure(x);
1.143 -
1.144 - // release resources
1.145 - if (overlapped != 0L)
1.146 - ioCache.remove(overlapped);
1.147 -
1.148 } finally {
1.149 - if (prepared && !pending) {
1.150 - // return direct buffer(s) to cache if substituted
1.151 - releaseBuffers();
1.152 + // release resources if I/O not pending
1.153 + if (!pending) {
1.154 + if (overlapped != 0L)
1.155 + ioCache.remove(overlapped);
1.156 + if (prepared)
1.157 + releaseBuffers();
1.158 }
1.159 end();
1.160 }
2.1 --- a/src/windows/native/sun/nio/ch/WindowsAsynchronousSocketChannelImpl.c Fri May 01 12:06:14 2009 -0700
2.2 +++ b/src/windows/native/sun/nio/ch/WindowsAsynchronousSocketChannelImpl.c Mon May 04 19:25:14 2009 +0100
2.3 @@ -157,14 +157,13 @@
2.4 WSABUF* lpWsaBuf = (WSABUF*) jlong_to_ptr(address);
2.5 OVERLAPPED* lpOverlapped = (OVERLAPPED*) jlong_to_ptr(ov);
2.6 BOOL res;
2.7 - DWORD nread = 0;
2.8 DWORD flags = 0;
2.9
2.10 ZeroMemory((PVOID)lpOverlapped, sizeof(OVERLAPPED));
2.11 res = WSARecv(s,
2.12 lpWsaBuf,
2.13 (DWORD)count,
2.14 - &nread,
2.15 + NULL,
2.16 &flags,
2.17 lpOverlapped,
2.18 NULL);
2.19 @@ -175,17 +174,12 @@
2.20 return IOS_UNAVAILABLE;
2.21 }
2.22 if (error == WSAESHUTDOWN) {
2.23 - return 0; // input shutdown
2.24 + return IOS_EOF; // input shutdown
2.25 }
2.26 JNU_ThrowIOExceptionWithLastError(env, "WSARecv failed");
2.27 return IOS_THROWN;
2.28 }
2.29 - if (nread == 0) {
2.30 - // Handle graceful close or bytes not yet available cases
2.31 - // via completion port notification.
2.32 - return IOS_UNAVAILABLE;
2.33 - }
2.34 - return (jint)nread;
2.35 + return IOS_UNAVAILABLE;
2.36 }
2.37
2.38 JNIEXPORT jint JNICALL
2.39 @@ -196,13 +190,12 @@
2.40 WSABUF* lpWsaBuf = (WSABUF*) jlong_to_ptr(address);
2.41 OVERLAPPED* lpOverlapped = (OVERLAPPED*) jlong_to_ptr(ov);
2.42 BOOL res;
2.43 - DWORD nwritten;
2.44
2.45 ZeroMemory((PVOID)lpOverlapped, sizeof(OVERLAPPED));
2.46 res = WSASend(s,
2.47 lpWsaBuf,
2.48 (DWORD)count,
2.49 - &nwritten,
2.50 + NULL,
2.51 0,
2.52 lpOverlapped,
2.53 NULL);
2.54 @@ -218,5 +211,5 @@
2.55 JNU_ThrowIOExceptionWithLastError(env, "WSASend failed");
2.56 return IOS_THROWN;
2.57 }
2.58 - return (jint)nwritten;
2.59 + return IOS_UNAVAILABLE;
2.60 }
3.1 --- /dev/null Thu Jan 01 00:00:00 1970 +0000
3.2 +++ b/test/java/nio/channels/AsynchronousSocketChannel/StressLoopback.java Mon May 04 19:25:14 2009 +0100
3.3 @@ -0,0 +1,183 @@
3.4 +/*
3.5 + * Copyright 2008-2009 Sun Microsystems, Inc. All Rights Reserved.
3.6 + * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
3.7 + *
3.8 + * This code is free software; you can redistribute it and/or modify it
3.9 + * under the terms of the GNU General Public License version 2 only, as
3.10 + * published by the Free Software Foundation.
3.11 + *
3.12 + * This code is distributed in the hope that it will be useful, but WITHOUT
3.13 + * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
3.14 + * FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License
3.15 + * version 2 for more details (a copy is included in the LICENSE file that
3.16 + * accompanied this code).
3.17 + *
3.18 + * You should have received a copy of the GNU General Public License version
3.19 + * 2 along with this work; if not, write to the Free Software Foundation,
3.20 + * Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA.
3.21 + *
3.22 + * Please contact Sun Microsystems, Inc., 4150 Network Circle, Santa Clara,
3.23 + * CA 95054 USA or visit www.sun.com if you need additional information or
3.24 + * have any questions.
3.25 + */
3.26 +
3.27 +/* @test
3.28 + * @bug 6834246
3.29 + * @summary Stress test connections through the loopback interface
3.30 + */
3.31 +
3.32 +import java.nio.ByteBuffer;
3.33 +import java.net.*;
3.34 +import java.nio.channels.*;
3.35 +import java.util.Random;
3.36 +import java.io.IOException;
3.37 +
3.38 +public class StressLoopback {
3.39 + static final Random rand = new Random();
3.40 +
3.41 + public static void main(String[] args) throws Exception {
3.42 + // setup listener
3.43 + AsynchronousServerSocketChannel listener =
3.44 + AsynchronousServerSocketChannel.open().bind(new InetSocketAddress(0));
3.45 + int port =((InetSocketAddress)(listener.getLocalAddress())).getPort();
3.46 + InetAddress lh = InetAddress.getLocalHost();
3.47 + SocketAddress remote = new InetSocketAddress(lh, port);
3.48 +
3.49 + // create sources and sinks
3.50 + int count = 2 + rand.nextInt(9);
3.51 + Source[] source = new Source[count];
3.52 + Sink[] sink = new Sink[count];
3.53 + for (int i=0; i<count; i++) {
3.54 + AsynchronousSocketChannel ch = AsynchronousSocketChannel.open();
3.55 + ch.connect(remote).get();
3.56 + source[i] = new Source(ch);
3.57 + sink[i] = new Sink(listener.accept().get());
3.58 + }
3.59 +
3.60 + // start the sinks and sources
3.61 + for (int i=0; i<count; i++) {
3.62 + sink[i].start();
3.63 + source[i].start();
3.64 + }
3.65 +
3.66 + // let the test run for a while
3.67 + Thread.sleep(20*1000);
3.68 +
3.69 + // wait until everyone is done
3.70 + boolean failed = false;
3.71 + long total = 0L;
3.72 + for (int i=0; i<count; i++) {
3.73 + long nwrote = source[i].finish();
3.74 + long nread = sink[i].finish();
3.75 + if (nread != nwrote)
3.76 + failed = true;
3.77 + System.out.format("%d -> %d (%s)\n",
3.78 + nwrote, nread, (failed) ? "FAIL" : "PASS");
3.79 + total += nwrote;
3.80 + }
3.81 + if (failed)
3.82 + throw new RuntimeException("Test failed - see log for details");
3.83 + System.out.format("Total sent %d MB\n", total / (1024L * 1024L));
3.84 + }
3.85 +
3.86 + /**
3.87 + * Writes bytes to a channel until "done". When done the channel is closed.
3.88 + */
3.89 + static class Source {
3.90 + private final AsynchronousByteChannel channel;
3.91 + private final ByteBuffer sentBuffer;
3.92 + private volatile long bytesSent;
3.93 + private volatile boolean finished;
3.94 +
3.95 + Source(AsynchronousByteChannel channel) {
3.96 + this.channel = channel;
3.97 + int size = 1024 + rand.nextInt(10000);
3.98 + this.sentBuffer = (rand.nextBoolean()) ?
3.99 + ByteBuffer.allocateDirect(size) : ByteBuffer.allocate(size);
3.100 + }
3.101 +
3.102 + void start() {
3.103 + sentBuffer.position(0);
3.104 + sentBuffer.limit(sentBuffer.capacity());
3.105 + channel.write(sentBuffer, null, new CompletionHandler<Integer,Void> () {
3.106 + public void completed(Integer nwrote, Void att) {
3.107 + bytesSent += nwrote;
3.108 + if (finished) {
3.109 + closeUnchecked(channel);
3.110 + } else {
3.111 + sentBuffer.position(0);
3.112 + sentBuffer.limit(sentBuffer.capacity());
3.113 + channel.write(sentBuffer, null, this);
3.114 + }
3.115 + }
3.116 + public void failed(Throwable exc, Void att) {
3.117 + exc.printStackTrace();
3.118 + closeUnchecked(channel);
3.119 + }
3.120 + public void cancelled(Void att) {
3.121 + }
3.122 + });
3.123 + }
3.124 +
3.125 + long finish() {
3.126 + finished = true;
3.127 + waitUntilClosed(channel);
3.128 + return bytesSent;
3.129 + }
3.130 + }
3.131 +
3.132 + /**
3.133 + * Read bytes from a channel until EOF is received.
3.134 + */
3.135 + static class Sink {
3.136 + private final AsynchronousByteChannel channel;
3.137 + private final ByteBuffer readBuffer;
3.138 + private volatile long bytesRead;
3.139 +
3.140 + Sink(AsynchronousByteChannel channel) {
3.141 + this.channel = channel;
3.142 + int size = 1024 + rand.nextInt(10000);
3.143 + this.readBuffer = (rand.nextBoolean()) ?
3.144 + ByteBuffer.allocateDirect(size) : ByteBuffer.allocate(size);
3.145 + }
3.146 +
3.147 + void start() {
3.148 + channel.read(readBuffer, null, new CompletionHandler<Integer,Void> () {
3.149 + public void completed(Integer nread, Void att) {
3.150 + if (nread < 0) {
3.151 + closeUnchecked(channel);
3.152 + } else {
3.153 + bytesRead += nread;
3.154 + readBuffer.clear();
3.155 + channel.read(readBuffer, null, this);
3.156 + }
3.157 + }
3.158 + public void failed(Throwable exc, Void att) {
3.159 + exc.printStackTrace();
3.160 + closeUnchecked(channel);
3.161 + }
3.162 + public void cancelled(Void att) {
3.163 + }
3.164 + });
3.165 + }
3.166 +
3.167 + long finish() {
3.168 + waitUntilClosed(channel);
3.169 + return bytesRead;
3.170 + }
3.171 + }
3.172 +
3.173 + static void waitUntilClosed(Channel c) {
3.174 + while (c.isOpen()) {
3.175 + try {
3.176 + Thread.sleep(100);
3.177 + } catch (InterruptedException ignore) { }
3.178 + }
3.179 + }
3.180 +
3.181 + static void closeUnchecked(Channel c) {
3.182 + try {
3.183 + c.close();
3.184 + } catch (IOException ignore) { }
3.185 + }
3.186 +}