aboutsummaryrefslogtreecommitdiff
path: root/libjava/gnu/java/nio/SelectorImpl.java
diff options
context:
space:
mode:
Diffstat (limited to 'libjava/gnu/java/nio/SelectorImpl.java')
-rw-r--r--libjava/gnu/java/nio/SelectorImpl.java301
1 files changed, 221 insertions, 80 deletions
diff --git a/libjava/gnu/java/nio/SelectorImpl.java b/libjava/gnu/java/nio/SelectorImpl.java
index 60a81f98458..f26e0808074 100644
--- a/libjava/gnu/java/nio/SelectorImpl.java
+++ b/libjava/gnu/java/nio/SelectorImpl.java
@@ -49,12 +49,44 @@ import java.util.Collections;
import java.util.HashSet;
import java.util.Iterator;
import java.util.Set;
+import gnu.classpath.Configuration;
public class SelectorImpl extends AbstractSelector
{
+ static
+ {
+ // load the shared library needed for native methods.
+ if (Configuration.INIT_LOAD_LIBRARY)
+ {
+ System.loadLibrary ("javanio");
+ }
+ }
+
private Set keys;
private Set selected;
+ /**
+ * A dummy object whose monitor regulates access to both our
+ * selectThread and unhandledWakeup fields.
+ */
+ private Object selectThreadMutex = new Object ();
+
+ /**
+ * Any thread that's currently blocked in a select operation.
+ */
+ private Thread selectThread;
+
+ /**
+ * Indicates whether we have an unhandled wakeup call. This can
+ * be due to either wakeup() triggering a thread interruption while
+ * a thread was blocked in a select operation (in which case we need
+ * to reset this thread's interrupt status after interrupting the
+ * select), or else that no thread was on a select operation at the
+ * time that wakeup() was called, in which case the following select()
+ * operation should return immediately with nothing selected.
+ */
+ private boolean unhandledWakeup;
+
public SelectorImpl (SelectorProvider provider)
{
super (provider);
@@ -71,30 +103,47 @@ public class SelectorImpl extends AbstractSelector
protected final void implCloseSelector()
throws IOException
{
- // FIXME: We surely need to do more here.
+ // Cancel any pending select operation.
wakeup();
+
+ synchronized (keys)
+ {
+ synchronized (selected)
+ {
+ synchronized (cancelledKeys ())
+ {
+ // FIXME: Release resources here.
+ }
+ }
+ }
}
public final Set keys()
{
+ if (!isOpen())
+ throw new ClosedSelectorException();
+
return Collections.unmodifiableSet (keys);
}
public final int selectNow()
throws IOException
{
+ // FIXME: We're simulating an immediate select
+ // via a select with a timeout of one millisecond.
return select (1);
}
public final int select()
throws IOException
{
- return select (-1);
+ return select (0);
}
- // A timeout value of -1 means block forever.
+ // A timeout value of 0 means block forever.
private static native int implSelect (int[] read, int[] write,
- int[] except, long timeout);
+ int[] except, long timeout)
+ throws IOException;
private final int[] getFDsAsArray (int ops)
{
@@ -133,111 +182,199 @@ public class SelectorImpl extends AbstractSelector
return result;
}
- public int select (long timeout)
+ public synchronized int select (long timeout)
+ throws IOException
{
if (!isOpen())
- throw new ClosedSelectorException ();
-
- if (keys == null)
- {
- return 0;
- }
-
- deregisterCancelledKeys();
-
- // Set only keys with the needed interest ops into the arrays.
- int[] read = getFDsAsArray (SelectionKey.OP_READ | SelectionKey.OP_ACCEPT);
- int[] write = getFDsAsArray (SelectionKey.OP_WRITE | SelectionKey.OP_CONNECT);
- int[] except = new int [0]; // FIXME: We dont need to check this yet
- int anzahl = read.length + write.length + except.length;
-
- // Call the native select() on all file descriptors.
- begin();
- int result = implSelect (read, write, except, timeout);
- end();
-
- Iterator it = keys.iterator ();
-
- while (it.hasNext ())
+ throw new ClosedSelectorException();
+
+ synchronized (keys)
{
- int ops = 0;
- SelectionKeyImpl key = (SelectionKeyImpl) it.next ();
-
- // If key is already selected retrieve old ready ops.
- if (selected.contains (key))
+ synchronized (selected)
{
- ops = key.readyOps ();
- }
-
- // Set new ready read/accept ops
- for (int i = 0; i < read.length; i++)
- {
- if (key.getNativeFD() == read[i])
+ deregisterCancelledKeys();
+
+ // Set only keys with the needed interest ops into the arrays.
+ int[] read = getFDsAsArray (SelectionKey.OP_READ
+ | SelectionKey.OP_ACCEPT);
+ int[] write = getFDsAsArray (SelectionKey.OP_WRITE
+ | SelectionKey.OP_CONNECT);
+
+ // FIXME: We dont need to check this yet
+ int[] except = new int [0];
+
+ // Test to see if we've got an unhandled wakeup call,
+ // in which case we return immediately. Otherwise,
+ // remember our current thread and jump into the select.
+ // The monitor for dummy object selectThreadMutex regulates
+ // access to these fields.
+
+ // FIXME: Not sure from the spec at what point we should
+ // return "immediately". Is it here or immediately upon
+ // entry to this function?
+
+ // NOTE: There's a possibility of another thread calling
+ // wakeup() immediately after our thread releases
+ // selectThreadMutex's monitor here, in which case we'll
+ // do the select anyway. Since calls to wakeup() and select()
+ // among different threads happen in non-deterministic order,
+ // I don't think this is an issue.
+ synchronized (selectThreadMutex)
{
- if (key.channel () instanceof ServerSocketChannelImpl)
+ if (unhandledWakeup)
{
- ops = ops | SelectionKey.OP_ACCEPT;
+ unhandledWakeup = false;
+ return 0;
}
else
{
- ops = ops | SelectionKey.OP_READ;
+ selectThread = Thread.currentThread ();
}
}
- }
- // Set new ready write ops
- for (int i = 0; i < write.length; i++)
- {
- if (key.getNativeFD() == write[i])
+ // Call the native select() on all file descriptors.
+ int result = 0;
+ try
{
- ops = ops | SelectionKey.OP_WRITE;
-
-// if (key.channel ().isConnected ())
-// {
-// ops = ops | SelectionKey.OP_WRITE;
-// }
-// else
-// {
-// ops = ops | SelectionKey.OP_CONNECT;
-// }
- }
- }
+ begin();
+ result = implSelect (read, write, except, timeout);
+ }
+ finally
+ {
+ end();
+ }
+
+ // If our unhandled wakeup flag is set at this point,
+ // reset our thread's interrupt flag because we were
+ // awakened by wakeup() instead of an external thread
+ // interruption.
+ //
+ // NOTE: If we were blocked in a select() and one thread
+ // called Thread.interrupt() on the blocked thread followed
+ // by another thread calling Selector.wakeup(), then race
+ // conditions could make it so that the thread's interrupt
+ // flag is reset even though the Thread.interrupt() call
+ // "was there first". I don't think we need to care about
+ // this scenario.
+ synchronized (selectThreadMutex)
+ {
+ if (unhandledWakeup)
+ {
+ unhandledWakeup = false;
+ selectThread.interrupted ();
+ }
+ selectThread = null;
+ }
- // FIXME: We dont handle exceptional file descriptors yet.
+ Iterator it = keys.iterator ();
- // If key is not yet selected add it.
- if (!selected.contains (key))
- {
- selected.add (key);
- }
+ while (it.hasNext ())
+ {
+ int ops = 0;
+ SelectionKeyImpl key = (SelectionKeyImpl) it.next ();
- // Set new ready ops
- key.readyOps (key.interestOps () & ops);
- }
+ // If key is already selected retrieve old ready ops.
+ if (selected.contains (key))
+ {
+ ops = key.readyOps ();
+ }
- deregisterCancelledKeys();
- return result;
+ // Set new ready read/accept ops
+ for (int i = 0; i < read.length; i++)
+ {
+ if (key.getNativeFD() == read[i])
+ {
+ if (key.channel () instanceof ServerSocketChannelImpl)
+ {
+ ops = ops | SelectionKey.OP_ACCEPT;
+ }
+ else
+ {
+ ops = ops | SelectionKey.OP_READ;
+ }
+ }
+ }
+
+ // Set new ready write ops
+ for (int i = 0; i < write.length; i++)
+ {
+ if (key.getNativeFD() == write[i])
+ {
+ ops = ops | SelectionKey.OP_WRITE;
+
+ // if (key.channel ().isConnected ())
+ // {
+ // ops = ops | SelectionKey.OP_WRITE;
+ // }
+ // else
+ // {
+ // ops = ops | SelectionKey.OP_CONNECT;
+ // }
+ }
+ }
+
+ // FIXME: We dont handle exceptional file descriptors yet.
+
+ // If key is not yet selected add it.
+ if (!selected.contains (key))
+ {
+ selected.add (key);
+ }
+
+ // Set new ready ops
+ key.readyOps (key.interestOps () & ops);
+ }
+ deregisterCancelledKeys();
+
+ return result;
+ }
+ }
}
public final Set selectedKeys()
{
+ if (!isOpen())
+ throw new ClosedSelectorException();
+
return selected;
}
public final Selector wakeup()
{
- return null;
+ // IMPLEMENTATION NOTE: Whereas the specification says that
+ // thread interruption should trigger a call to wakeup, we
+ // do the reverse under the covers: wakeup triggers a thread
+ // interrupt followed by a subsequent reset of the thread's
+ // interrupt status within select().
+
+ // First, acquire the monitor of the object regulating
+ // access to our selectThread and unhandledWakeup fields.
+ synchronized (selectThreadMutex)
+ {
+ unhandledWakeup = true;
+
+ // Interrupt any thread which is currently blocked in
+ // a select operation.
+ if (selectThread != null)
+ selectThread.interrupt ();
+ }
+
+ return this;
}
private final void deregisterCancelledKeys()
{
- Iterator it = cancelledKeys().iterator();
-
- while (it.hasNext ())
- {
- keys.remove ((SelectionKeyImpl) it.next ());
- it.remove ();
- }
+ Set ckeys = cancelledKeys ();
+ synchronized (ckeys)
+ {
+ Iterator it = ckeys.iterator();
+
+ while (it.hasNext ())
+ {
+ keys.remove ((SelectionKeyImpl) it.next ());
+ it.remove ();
+ }
+ }
}
protected SelectionKey register (SelectableChannel ch, int ops, Object att)
@@ -270,7 +407,11 @@ public class SelectorImpl extends AbstractSelector
throw new InternalError ("No known channel type");
}
- keys.add (result);
+ synchronized (keys)
+ {
+ keys.add (result);
+ }
+
result.interestOps (ops);
result.attach (att);
return result;