diff options
Diffstat (limited to 'libjava/gnu/java/nio/SelectorImpl.java')
-rw-r--r-- | libjava/gnu/java/nio/SelectorImpl.java | 301 |
1 files changed, 221 insertions, 80 deletions
diff --git a/libjava/gnu/java/nio/SelectorImpl.java b/libjava/gnu/java/nio/SelectorImpl.java index 60a81f98458..f26e0808074 100644 --- a/libjava/gnu/java/nio/SelectorImpl.java +++ b/libjava/gnu/java/nio/SelectorImpl.java @@ -49,12 +49,44 @@ import java.util.Collections; import java.util.HashSet; import java.util.Iterator; import java.util.Set; +import gnu.classpath.Configuration; public class SelectorImpl extends AbstractSelector { + static + { + // load the shared library needed for native methods. + if (Configuration.INIT_LOAD_LIBRARY) + { + System.loadLibrary ("javanio"); + } + } + private Set keys; private Set selected; + /** + * A dummy object whose monitor regulates access to both our + * selectThread and unhandledWakeup fields. + */ + private Object selectThreadMutex = new Object (); + + /** + * Any thread that's currently blocked in a select operation. + */ + private Thread selectThread; + + /** + * Indicates whether we have an unhandled wakeup call. This can + * be due to either wakeup() triggering a thread interruption while + * a thread was blocked in a select operation (in which case we need + * to reset this thread's interrupt status after interrupting the + * select), or else that no thread was on a select operation at the + * time that wakeup() was called, in which case the following select() + * operation should return immediately with nothing selected. + */ + private boolean unhandledWakeup; + public SelectorImpl (SelectorProvider provider) { super (provider); @@ -71,30 +103,47 @@ public class SelectorImpl extends AbstractSelector protected final void implCloseSelector() throws IOException { - // FIXME: We surely need to do more here. + // Cancel any pending select operation. wakeup(); + + synchronized (keys) + { + synchronized (selected) + { + synchronized (cancelledKeys ()) + { + // FIXME: Release resources here. + } + } + } } public final Set keys() { + if (!isOpen()) + throw new ClosedSelectorException(); + return Collections.unmodifiableSet (keys); } public final int selectNow() throws IOException { + // FIXME: We're simulating an immediate select + // via a select with a timeout of one millisecond. return select (1); } public final int select() throws IOException { - return select (-1); + return select (0); } - // A timeout value of -1 means block forever. + // A timeout value of 0 means block forever. private static native int implSelect (int[] read, int[] write, - int[] except, long timeout); + int[] except, long timeout) + throws IOException; private final int[] getFDsAsArray (int ops) { @@ -133,111 +182,199 @@ public class SelectorImpl extends AbstractSelector return result; } - public int select (long timeout) + public synchronized int select (long timeout) + throws IOException { if (!isOpen()) - throw new ClosedSelectorException (); - - if (keys == null) - { - return 0; - } - - deregisterCancelledKeys(); - - // Set only keys with the needed interest ops into the arrays. - int[] read = getFDsAsArray (SelectionKey.OP_READ | SelectionKey.OP_ACCEPT); - int[] write = getFDsAsArray (SelectionKey.OP_WRITE | SelectionKey.OP_CONNECT); - int[] except = new int [0]; // FIXME: We dont need to check this yet - int anzahl = read.length + write.length + except.length; - - // Call the native select() on all file descriptors. - begin(); - int result = implSelect (read, write, except, timeout); - end(); - - Iterator it = keys.iterator (); - - while (it.hasNext ()) + throw new ClosedSelectorException(); + + synchronized (keys) { - int ops = 0; - SelectionKeyImpl key = (SelectionKeyImpl) it.next (); - - // If key is already selected retrieve old ready ops. - if (selected.contains (key)) + synchronized (selected) { - ops = key.readyOps (); - } - - // Set new ready read/accept ops - for (int i = 0; i < read.length; i++) - { - if (key.getNativeFD() == read[i]) + deregisterCancelledKeys(); + + // Set only keys with the needed interest ops into the arrays. + int[] read = getFDsAsArray (SelectionKey.OP_READ + | SelectionKey.OP_ACCEPT); + int[] write = getFDsAsArray (SelectionKey.OP_WRITE + | SelectionKey.OP_CONNECT); + + // FIXME: We dont need to check this yet + int[] except = new int [0]; + + // Test to see if we've got an unhandled wakeup call, + // in which case we return immediately. Otherwise, + // remember our current thread and jump into the select. + // The monitor for dummy object selectThreadMutex regulates + // access to these fields. + + // FIXME: Not sure from the spec at what point we should + // return "immediately". Is it here or immediately upon + // entry to this function? + + // NOTE: There's a possibility of another thread calling + // wakeup() immediately after our thread releases + // selectThreadMutex's monitor here, in which case we'll + // do the select anyway. Since calls to wakeup() and select() + // among different threads happen in non-deterministic order, + // I don't think this is an issue. + synchronized (selectThreadMutex) { - if (key.channel () instanceof ServerSocketChannelImpl) + if (unhandledWakeup) { - ops = ops | SelectionKey.OP_ACCEPT; + unhandledWakeup = false; + return 0; } else { - ops = ops | SelectionKey.OP_READ; + selectThread = Thread.currentThread (); } } - } - // Set new ready write ops - for (int i = 0; i < write.length; i++) - { - if (key.getNativeFD() == write[i]) + // Call the native select() on all file descriptors. + int result = 0; + try { - ops = ops | SelectionKey.OP_WRITE; - -// if (key.channel ().isConnected ()) -// { -// ops = ops | SelectionKey.OP_WRITE; -// } -// else -// { -// ops = ops | SelectionKey.OP_CONNECT; -// } - } - } + begin(); + result = implSelect (read, write, except, timeout); + } + finally + { + end(); + } + + // If our unhandled wakeup flag is set at this point, + // reset our thread's interrupt flag because we were + // awakened by wakeup() instead of an external thread + // interruption. + // + // NOTE: If we were blocked in a select() and one thread + // called Thread.interrupt() on the blocked thread followed + // by another thread calling Selector.wakeup(), then race + // conditions could make it so that the thread's interrupt + // flag is reset even though the Thread.interrupt() call + // "was there first". I don't think we need to care about + // this scenario. + synchronized (selectThreadMutex) + { + if (unhandledWakeup) + { + unhandledWakeup = false; + selectThread.interrupted (); + } + selectThread = null; + } - // FIXME: We dont handle exceptional file descriptors yet. + Iterator it = keys.iterator (); - // If key is not yet selected add it. - if (!selected.contains (key)) - { - selected.add (key); - } + while (it.hasNext ()) + { + int ops = 0; + SelectionKeyImpl key = (SelectionKeyImpl) it.next (); - // Set new ready ops - key.readyOps (key.interestOps () & ops); - } + // If key is already selected retrieve old ready ops. + if (selected.contains (key)) + { + ops = key.readyOps (); + } - deregisterCancelledKeys(); - return result; + // Set new ready read/accept ops + for (int i = 0; i < read.length; i++) + { + if (key.getNativeFD() == read[i]) + { + if (key.channel () instanceof ServerSocketChannelImpl) + { + ops = ops | SelectionKey.OP_ACCEPT; + } + else + { + ops = ops | SelectionKey.OP_READ; + } + } + } + + // Set new ready write ops + for (int i = 0; i < write.length; i++) + { + if (key.getNativeFD() == write[i]) + { + ops = ops | SelectionKey.OP_WRITE; + + // if (key.channel ().isConnected ()) + // { + // ops = ops | SelectionKey.OP_WRITE; + // } + // else + // { + // ops = ops | SelectionKey.OP_CONNECT; + // } + } + } + + // FIXME: We dont handle exceptional file descriptors yet. + + // If key is not yet selected add it. + if (!selected.contains (key)) + { + selected.add (key); + } + + // Set new ready ops + key.readyOps (key.interestOps () & ops); + } + deregisterCancelledKeys(); + + return result; + } + } } public final Set selectedKeys() { + if (!isOpen()) + throw new ClosedSelectorException(); + return selected; } public final Selector wakeup() { - return null; + // IMPLEMENTATION NOTE: Whereas the specification says that + // thread interruption should trigger a call to wakeup, we + // do the reverse under the covers: wakeup triggers a thread + // interrupt followed by a subsequent reset of the thread's + // interrupt status within select(). + + // First, acquire the monitor of the object regulating + // access to our selectThread and unhandledWakeup fields. + synchronized (selectThreadMutex) + { + unhandledWakeup = true; + + // Interrupt any thread which is currently blocked in + // a select operation. + if (selectThread != null) + selectThread.interrupt (); + } + + return this; } private final void deregisterCancelledKeys() { - Iterator it = cancelledKeys().iterator(); - - while (it.hasNext ()) - { - keys.remove ((SelectionKeyImpl) it.next ()); - it.remove (); - } + Set ckeys = cancelledKeys (); + synchronized (ckeys) + { + Iterator it = ckeys.iterator(); + + while (it.hasNext ()) + { + keys.remove ((SelectionKeyImpl) it.next ()); + it.remove (); + } + } } protected SelectionKey register (SelectableChannel ch, int ops, Object att) @@ -270,7 +407,11 @@ public class SelectorImpl extends AbstractSelector throw new InternalError ("No known channel type"); } - keys.add (result); + synchronized (keys) + { + keys.add (result); + } + result.interestOps (ops); result.attach (att); return result; |