6834246: (ch) AsynchronousSocketChannel#write completes with wrong number of bytes written under load (win)
authoralanb
Mon, 04 May 2009 19:25:14 +0100
changeset 1191e1a713f0361f
parent 1190 d2114c1adb2d
child 1192 b3720710a4ba
6834246: (ch) AsynchronousSocketChannel#write completes with wrong number of bytes written under load (win)
Reviewed-by: sherman
src/windows/classes/sun/nio/ch/WindowsAsynchronousSocketChannelImpl.java
src/windows/native/sun/nio/ch/WindowsAsynchronousSocketChannelImpl.c
test/java/nio/channels/AsynchronousSocketChannel/StressLoopback.java
     1.1 --- a/src/windows/classes/sun/nio/ch/WindowsAsynchronousSocketChannelImpl.java	Fri May 01 12:06:14 2009 -0700
     1.2 +++ b/src/windows/classes/sun/nio/ch/WindowsAsynchronousSocketChannelImpl.java	Mon May 04 19:25:14 2009 +0100
     1.3 @@ -475,49 +475,40 @@
     1.4                  // get an OVERLAPPED structure (from the cache or allocate)
     1.5                  overlapped = ioCache.add(result);
     1.6  
     1.7 -                // synchronize on result to allow this thread handle the case
     1.8 -                // where the read completes immediately.
     1.9 -                synchronized (result) {
    1.10 -                    int n = read0(handle, numBufs, readBufferArray, overlapped);
    1.11 -                    if (n == IOStatus.UNAVAILABLE) {
    1.12 -                        // I/O is pending
    1.13 -                        pending = true;
    1.14 -                        return;
    1.15 +                // initiate read
    1.16 +                int n = read0(handle, numBufs, readBufferArray, overlapped);
    1.17 +                if (n == IOStatus.UNAVAILABLE) {
    1.18 +                    // I/O is pending
    1.19 +                    pending = true;
    1.20 +                    return;
    1.21 +                }
    1.22 +                if (n == IOStatus.EOF) {
    1.23 +                    // input shutdown
    1.24 +                    enableReading();
    1.25 +                    if (scatteringRead) {
    1.26 +                        result.setResult((V)Long.valueOf(-1L));
    1.27 +                    } else {
    1.28 +                        result.setResult((V)Integer.valueOf(-1));
    1.29                      }
    1.30 -                    // read completed immediately:
    1.31 -                    // 1. update buffer position
    1.32 -                    // 2. reset read flag
    1.33 -                    // 3. release waiters
    1.34 -                    if (n == 0) {
    1.35 -                        n = -1;
    1.36 -                    } else {
    1.37 -                        updateBuffers(n);
    1.38 -                    }
    1.39 -                    enableReading();
    1.40 -
    1.41 -                    if (scatteringRead) {
    1.42 -                        result.setResult((V)Long.valueOf(n));
    1.43 -                    } else {
    1.44 -                        result.setResult((V)Integer.valueOf(n));
    1.45 -                    }
    1.46 +                } else {
    1.47 +                    throw new InternalError("Read completed immediately");
    1.48                  }
    1.49              } catch (Throwable x) {
    1.50 -                // failed to initiate read:
    1.51 -                // 1. reset read flag
    1.52 -                // 2. free resources
    1.53 -                // 3. release waiters
    1.54 +                // failed to initiate read
    1.55 +                // reset read flag before releasing waiters
    1.56                  enableReading();
    1.57 -                if (overlapped != 0L)
    1.58 -                    ioCache.remove(overlapped);
    1.59                  if (x instanceof ClosedChannelException)
    1.60                      x = new AsynchronousCloseException();
    1.61                  if (!(x instanceof IOException))
    1.62                      x = new IOException(x);
    1.63                  result.setFailure(x);
    1.64              } finally {
    1.65 -                if (prepared && !pending) {
    1.66 -                    // return direct buffer(s) to cache if substituted
    1.67 -                    releaseBuffers();
    1.68 +                // release resources if I/O not pending
    1.69 +                if (!pending) {
    1.70 +                    if (overlapped != 0L)
    1.71 +                        ioCache.remove(overlapped);
    1.72 +                    if (prepared)
    1.73 +                        releaseBuffers();
    1.74                  }
    1.75                  end();
    1.76              }
    1.77 @@ -721,7 +712,6 @@
    1.78          @Override
    1.79          @SuppressWarnings("unchecked")
    1.80          public void run() {
    1.81 -            int n = -1;
    1.82              long overlapped = 0L;
    1.83              boolean prepared = false;
    1.84              boolean pending = false;
    1.85 @@ -736,56 +726,34 @@
    1.86  
    1.87                  // get an OVERLAPPED structure (from the cache or allocate)
    1.88                  overlapped = ioCache.add(result);
    1.89 -
    1.90 -                // synchronize on result to allow this thread handle the case
    1.91 -                // where the read completes immediately.
    1.92 -                synchronized (result) {
    1.93 -                    n = write0(handle, numBufs, writeBufferArray, overlapped);
    1.94 -                    if (n == IOStatus.UNAVAILABLE) {
    1.95 -                        // I/O is pending
    1.96 -                        pending = true;
    1.97 -                        return;
    1.98 -                    }
    1.99 -
   1.100 -                    enableWriting();
   1.101 -
   1.102 -                    if (n == IOStatus.EOF) {
   1.103 -                        // special case for shutdown output
   1.104 -                        shutdown = true;
   1.105 -                        throw new ClosedChannelException();
   1.106 -                    }
   1.107 -
   1.108 -                    // write completed immediately:
   1.109 -                    // 1. enable writing
   1.110 -                    // 2. update buffer position
   1.111 -                    // 3. release waiters
   1.112 -                    updateBuffers(n);
   1.113 -
   1.114 -                    // result is a Long or Integer
   1.115 -                    if (gatheringWrite) {
   1.116 -                        result.setResult((V)Long.valueOf(n));
   1.117 -                    } else {
   1.118 -                        result.setResult((V)Integer.valueOf(n));
   1.119 -                    }
   1.120 +                int n = write0(handle, numBufs, writeBufferArray, overlapped);
   1.121 +                if (n == IOStatus.UNAVAILABLE) {
   1.122 +                    // I/O is pending
   1.123 +                    pending = true;
   1.124 +                    return;
   1.125                  }
   1.126 +                if (n == IOStatus.EOF) {
   1.127 +                    // special case for shutdown output
   1.128 +                    shutdown = true;
   1.129 +                    throw new ClosedChannelException();
   1.130 +                }
   1.131 +                // write completed immediately
   1.132 +                throw new InternalError("Write completed immediately");
   1.133              } catch (Throwable x) {
   1.134 +                // write failed. Enable writing before releasing waiters.
   1.135                  enableWriting();
   1.136 -
   1.137 -                // failed to initiate read:
   1.138                  if (!shutdown && (x instanceof ClosedChannelException))
   1.139                      x = new AsynchronousCloseException();
   1.140                  if (!(x instanceof IOException))
   1.141                      x = new IOException(x);
   1.142                  result.setFailure(x);
   1.143 -
   1.144 -                // release resources
   1.145 -                if (overlapped != 0L)
   1.146 -                    ioCache.remove(overlapped);
   1.147 -
   1.148              } finally {
   1.149 -                if (prepared && !pending) {
   1.150 -                    // return direct buffer(s) to cache if substituted
   1.151 -                    releaseBuffers();
   1.152 +                // release resources if I/O not pending
   1.153 +                if (!pending) {
   1.154 +                    if (overlapped != 0L)
   1.155 +                        ioCache.remove(overlapped);
   1.156 +                    if (prepared)
   1.157 +                        releaseBuffers();
   1.158                  }
   1.159                  end();
   1.160              }
     2.1 --- a/src/windows/native/sun/nio/ch/WindowsAsynchronousSocketChannelImpl.c	Fri May 01 12:06:14 2009 -0700
     2.2 +++ b/src/windows/native/sun/nio/ch/WindowsAsynchronousSocketChannelImpl.c	Mon May 04 19:25:14 2009 +0100
     2.3 @@ -157,14 +157,13 @@
     2.4      WSABUF* lpWsaBuf = (WSABUF*) jlong_to_ptr(address);
     2.5      OVERLAPPED* lpOverlapped = (OVERLAPPED*) jlong_to_ptr(ov);
     2.6      BOOL res;
     2.7 -    DWORD nread = 0;
     2.8      DWORD flags = 0;
     2.9  
    2.10      ZeroMemory((PVOID)lpOverlapped, sizeof(OVERLAPPED));
    2.11      res = WSARecv(s,
    2.12                    lpWsaBuf,
    2.13                    (DWORD)count,
    2.14 -                  &nread,
    2.15 +                  NULL,
    2.16                    &flags,
    2.17                    lpOverlapped,
    2.18                    NULL);
    2.19 @@ -175,17 +174,12 @@
    2.20              return IOS_UNAVAILABLE;
    2.21          }
    2.22          if (error == WSAESHUTDOWN) {
    2.23 -            return 0;       // input shutdown
    2.24 +            return IOS_EOF;       // input shutdown
    2.25          }
    2.26          JNU_ThrowIOExceptionWithLastError(env, "WSARecv failed");
    2.27          return IOS_THROWN;
    2.28      }
    2.29 -    if (nread == 0) {
    2.30 -        // Handle graceful close or bytes not yet available cases
    2.31 -        // via completion port notification.
    2.32 -        return IOS_UNAVAILABLE;
    2.33 -    }
    2.34 -    return (jint)nread;
    2.35 +    return IOS_UNAVAILABLE;
    2.36  }
    2.37  
    2.38  JNIEXPORT jint JNICALL
    2.39 @@ -196,13 +190,12 @@
    2.40      WSABUF* lpWsaBuf = (WSABUF*) jlong_to_ptr(address);
    2.41      OVERLAPPED* lpOverlapped = (OVERLAPPED*) jlong_to_ptr(ov);
    2.42      BOOL res;
    2.43 -    DWORD nwritten;
    2.44  
    2.45      ZeroMemory((PVOID)lpOverlapped, sizeof(OVERLAPPED));
    2.46      res = WSASend(s,
    2.47                    lpWsaBuf,
    2.48                    (DWORD)count,
    2.49 -                  &nwritten,
    2.50 +                  NULL,
    2.51                    0,
    2.52                    lpOverlapped,
    2.53                    NULL);
    2.54 @@ -218,5 +211,5 @@
    2.55          JNU_ThrowIOExceptionWithLastError(env, "WSASend failed");
    2.56          return IOS_THROWN;
    2.57      }
    2.58 -    return (jint)nwritten;
    2.59 +    return IOS_UNAVAILABLE;
    2.60  }
     3.1 --- /dev/null	Thu Jan 01 00:00:00 1970 +0000
     3.2 +++ b/test/java/nio/channels/AsynchronousSocketChannel/StressLoopback.java	Mon May 04 19:25:14 2009 +0100
     3.3 @@ -0,0 +1,183 @@
     3.4 +/*
     3.5 + * Copyright 2008-2009 Sun Microsystems, Inc.  All Rights Reserved.
     3.6 + * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
     3.7 + *
     3.8 + * This code is free software; you can redistribute it and/or modify it
     3.9 + * under the terms of the GNU General Public License version 2 only, as
    3.10 + * published by the Free Software Foundation.
    3.11 + *
    3.12 + * This code is distributed in the hope that it will be useful, but WITHOUT
    3.13 + * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
    3.14 + * FITNESS FOR A PARTICULAR PURPOSE.  See the GNU General Public License
    3.15 + * version 2 for more details (a copy is included in the LICENSE file that
    3.16 + * accompanied this code).
    3.17 + *
    3.18 + * You should have received a copy of the GNU General Public License version
    3.19 + * 2 along with this work; if not, write to the Free Software Foundation,
    3.20 + * Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA.
    3.21 + *
    3.22 + * Please contact Sun Microsystems, Inc., 4150 Network Circle, Santa Clara,
    3.23 + * CA 95054 USA or visit www.sun.com if you need additional information or
    3.24 + * have any questions.
    3.25 + */
    3.26 +
    3.27 +/* @test
    3.28 + * @bug 6834246
    3.29 + * @summary Stress test connections through the loopback interface
    3.30 + */
    3.31 +
    3.32 +import java.nio.ByteBuffer;
    3.33 +import java.net.*;
    3.34 +import java.nio.channels.*;
    3.35 +import java.util.Random;
    3.36 +import java.io.IOException;
    3.37 +
    3.38 +public class StressLoopback {
    3.39 +    static final Random rand = new Random();
    3.40 +
    3.41 +    public static void main(String[] args) throws Exception {
    3.42 +        // setup listener
    3.43 +        AsynchronousServerSocketChannel listener =
    3.44 +            AsynchronousServerSocketChannel.open().bind(new InetSocketAddress(0));
    3.45 +        int port =((InetSocketAddress)(listener.getLocalAddress())).getPort();
    3.46 +        InetAddress lh = InetAddress.getLocalHost();
    3.47 +        SocketAddress remote = new InetSocketAddress(lh, port);
    3.48 +
    3.49 +        // create sources and sinks
    3.50 +        int count = 2 + rand.nextInt(9);
    3.51 +        Source[] source = new Source[count];
    3.52 +        Sink[] sink = new Sink[count];
    3.53 +        for (int i=0; i<count; i++) {
    3.54 +            AsynchronousSocketChannel ch = AsynchronousSocketChannel.open();
    3.55 +            ch.connect(remote).get();
    3.56 +            source[i] = new Source(ch);
    3.57 +            sink[i] = new Sink(listener.accept().get());
    3.58 +        }
    3.59 +
    3.60 +        // start the sinks and sources
    3.61 +        for (int i=0; i<count; i++) {
    3.62 +            sink[i].start();
    3.63 +            source[i].start();
    3.64 +        }
    3.65 +
    3.66 +        // let the test run for a while
    3.67 +        Thread.sleep(20*1000);
    3.68 +
    3.69 +        // wait until everyone is done
    3.70 +        boolean failed = false;
    3.71 +        long total = 0L;
    3.72 +        for (int i=0; i<count; i++) {
    3.73 +            long nwrote = source[i].finish();
    3.74 +            long nread = sink[i].finish();
    3.75 +            if (nread != nwrote)
    3.76 +                failed = true;
    3.77 +            System.out.format("%d -> %d (%s)\n",
    3.78 +                nwrote, nread, (failed) ? "FAIL" : "PASS");
    3.79 +            total += nwrote;
    3.80 +        }
    3.81 +        if (failed)
    3.82 +            throw new RuntimeException("Test failed - see log for details");
    3.83 +        System.out.format("Total sent %d MB\n", total / (1024L * 1024L));
    3.84 +    }
    3.85 +
    3.86 +    /**
    3.87 +     * Writes bytes to a channel until "done". When done the channel is closed.
    3.88 +     */
    3.89 +    static class Source {
    3.90 +        private final AsynchronousByteChannel channel;
    3.91 +        private final ByteBuffer sentBuffer;
    3.92 +        private volatile long bytesSent;
    3.93 +        private volatile boolean finished;
    3.94 +
    3.95 +        Source(AsynchronousByteChannel channel) {
    3.96 +            this.channel = channel;
    3.97 +            int size = 1024 + rand.nextInt(10000);
    3.98 +            this.sentBuffer = (rand.nextBoolean()) ?
    3.99 +                ByteBuffer.allocateDirect(size) : ByteBuffer.allocate(size);
   3.100 +        }
   3.101 +
   3.102 +        void start() {
   3.103 +            sentBuffer.position(0);
   3.104 +            sentBuffer.limit(sentBuffer.capacity());
   3.105 +            channel.write(sentBuffer, null, new CompletionHandler<Integer,Void> () {
   3.106 +                public void completed(Integer nwrote, Void att) {
   3.107 +                    bytesSent += nwrote;
   3.108 +                    if (finished) {
   3.109 +                        closeUnchecked(channel);
   3.110 +                    } else {
   3.111 +                        sentBuffer.position(0);
   3.112 +                        sentBuffer.limit(sentBuffer.capacity());
   3.113 +                        channel.write(sentBuffer, null, this);
   3.114 +                    }
   3.115 +                }
   3.116 +                public void failed(Throwable exc, Void att) {
   3.117 +                    exc.printStackTrace();
   3.118 +                    closeUnchecked(channel);
   3.119 +                }
   3.120 +                public void cancelled(Void att) {
   3.121 +                }
   3.122 +            });
   3.123 +        }
   3.124 +
   3.125 +        long finish() {
   3.126 +            finished = true;
   3.127 +            waitUntilClosed(channel);
   3.128 +            return bytesSent;
   3.129 +        }
   3.130 +    }
   3.131 +
   3.132 +    /**
   3.133 +     * Read bytes from a channel until EOF is received.
   3.134 +     */
   3.135 +    static class Sink {
   3.136 +        private final AsynchronousByteChannel channel;
   3.137 +        private final ByteBuffer readBuffer;
   3.138 +        private volatile long bytesRead;
   3.139 +
   3.140 +        Sink(AsynchronousByteChannel channel) {
   3.141 +            this.channel = channel;
   3.142 +            int size = 1024 + rand.nextInt(10000);
   3.143 +            this.readBuffer = (rand.nextBoolean()) ?
   3.144 +                ByteBuffer.allocateDirect(size) : ByteBuffer.allocate(size);
   3.145 +        }
   3.146 +
   3.147 +        void start() {
   3.148 +            channel.read(readBuffer, null, new CompletionHandler<Integer,Void> () {
   3.149 +                public void completed(Integer nread, Void att) {
   3.150 +                    if (nread < 0) {
   3.151 +                        closeUnchecked(channel);
   3.152 +                    } else {
   3.153 +                        bytesRead += nread;
   3.154 +                        readBuffer.clear();
   3.155 +                        channel.read(readBuffer, null, this);
   3.156 +                    }
   3.157 +                }
   3.158 +                public void failed(Throwable exc, Void att) {
   3.159 +                    exc.printStackTrace();
   3.160 +                    closeUnchecked(channel);
   3.161 +                }
   3.162 +                public void cancelled(Void att) {
   3.163 +                }
   3.164 +            });
   3.165 +        }
   3.166 +
   3.167 +        long finish() {
   3.168 +            waitUntilClosed(channel);
   3.169 +            return bytesRead;
   3.170 +        }
   3.171 +    }
   3.172 +
   3.173 +    static void waitUntilClosed(Channel c) {
   3.174 +        while (c.isOpen()) {
   3.175 +            try {
   3.176 +                Thread.sleep(100);
   3.177 +            } catch (InterruptedException ignore) { }
   3.178 +        }
   3.179 +    }
   3.180 +
   3.181 +    static void closeUnchecked(Channel c) {
   3.182 +        try {
   3.183 +            c.close();
   3.184 +        } catch (IOException ignore) { }
   3.185 +    }
   3.186 +}