/*
 ***** BEGIN LICENSE BLOCK *****
 * Version: EPL 2.0/GPL 2.0/LGPL 2.1
 *
 * The contents of this file are subject to the Eclipse Public
 * License Version 2.0 (the "License"); you may not use this file
 * except in compliance with the License. You may obtain a copy of
 * the License at http://www.eclipse.org/legal/epl-v20.html
 *
 * Software distributed under the License is distributed on an "AS
 * IS" basis, WITHOUT WARRANTY OF ANY KIND, either express or
 * implied. See the License for the specific language governing
 * rights and limitations under the License.
 *
 * Alternatively, the contents of this file may be used under the terms of
 * either of the GNU General Public License Version 2 or later (the "GPL"),
 * or the GNU Lesser General Public License Version 2.1 or later (the "LGPL"),
 * in which case the provisions of the GPL or the LGPL are applicable instead
 * of those above. If you wish to allow use of your version of this file only
 * under the terms of either the GPL or the LGPL, and not to allow others to
 * use your version of this file under the terms of the EPL, indicate your
 * decision by deleting the provisions above and replace them with the notice
 * and other provisions required by the GPL or the LGPL. If you do not delete
 * the provisions above, a recipient may use your version of this file under
 * the terms of any one of the EPL, the GPL or the LGPL.
 ***** END LICENSE BLOCK *****/

package org.jruby.util.io;

import org.jruby.Ruby;
import org.jruby.RubyArray;
import org.jruby.RubyInteger;
import org.jruby.RubyFloat;
import org.jruby.RubyIO;
import org.jruby.RubyThread;
import org.jruby.runtime.ThreadContext;
import org.jruby.runtime.builtin.IRubyObject;

import java.io.IOException;
import java.nio.ByteBuffer;
import java.nio.channels.*;
import java.nio.channels.spi.SelectorProvider;
import java.util.*;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;
import org.jruby.exceptions.RaiseException;

import static com.headius.backport9.buffer.Buffers.flipBuffer;

This is a reimplementation of MRI's IO#select logic. It has been rewritten from an earlier version in JRuby to improve performance and readability. This version avoids allocating a selector or any data structures to hold data about the channels/IOs being selected unless absolutely necessary. It also uses simple boolean arrays to track characteristics like whether an IO is pending or unselectable, rather than maintaining Set structures. It avoids hitting Java Integration code to get IO objects out of the incoming Array. Finally, it tries to build a minimal number of data structures an reuse them as much as possible.
/** * This is a reimplementation of MRI's IO#select logic. It has been rewritten * from an earlier version in JRuby to improve performance and readability. * * This version avoids allocating a selector or any data structures to hold * data about the channels/IOs being selected unless absolutely necessary. It * also uses simple boolean arrays to track characteristics like whether an IO * is pending or unselectable, rather than maintaining Set structures. It avoids * hitting Java Integration code to get IO objects out of the incoming Array. * Finally, it tries to build a minimal number of data structures an reuse them * as much as possible. */
@Deprecated public class SelectBlob { public SelectBlob() {} public IRubyObject goForIt(ThreadContext context, Ruby runtime, IRubyObject[] args) { this.runtime = runtime; try { processReads(runtime, args, context); processWrites(runtime, args, context); if (args.length > 2 && !args[2].isNil()) { checkArrayType(runtime, args[2]); // Java's select doesn't do anything about this, so we leave it be. } final boolean has_timeout = args.length > 3 && !args[3].isNil(); final long timeout = !has_timeout ? 0 : convertTimeout(context, args[3]); if (timeout < 0) { throw runtime.newArgumentError("time interval must be positive"); } // If all streams are nil, just sleep the specified time (JRUBY-4699) if (args[0].isNil() && args[1].isNil() && args[2].isNil()) { RubyThread thread = context.getThread(); if (has_timeout) { if (timeout > 0) { long now = System.currentTimeMillis(); thread.sleep(timeout); // Guard against spurious wakeup while (System.currentTimeMillis() < now + timeout) { thread.sleep(1); } } } else { thread.sleep(0); } } else { doSelect(runtime, has_timeout, timeout); processSelectedKeys(runtime); processPendingAndUnselectable(); tidyUp(); } if (readResults == null && writeResults == null && errorResults == null) { return runtime.getNil(); } return constructResults(runtime); } catch (BadDescriptorException e) { throw runtime.newErrnoEBADFError(); } catch (CancelledKeyException e) { throw runtime.newErrnoEBADFError(); } catch (IOException e) { throw runtime.newIOErrorFromException(e); } catch (InterruptedException ie) { throw runtime.newThreadError("select interrupted"); } finally { for (Selector selector : selectors.values()) { try { selector.close(); } catch (Exception e) { } } } } private void processReads(Ruby runtime, IRubyObject[] args, ThreadContext context) throws BadDescriptorException, IOException { if (!args[0].isNil()) { // read checkArrayType(runtime, args[0]); readArray = (RubyArray) args[0]; readSize = readArray.size(); if (readSize == 0) { // clear reference; we aren't going to do anything readArray = null; } else { readIOs = new RubyIO[readSize]; Map<Character,Integer> attachment = new HashMap<Character,Integer>(1); for (int i = 0; i < readSize; i++) { RubyIO ioObj = saveReadIO(i, context); saveReadBlocking(ioObj, i); saveBufferedRead(ioObj, i); attachment.clear(); attachment.put('r', i); trySelectRead(context, attachment, ioObj.getOpenFileChecked()); } } } } private RubyIO saveReadIO(int i, ThreadContext context) { IRubyObject obj = readArray.eltOk(i); RubyIO ioObj = RubyIO.convertToIO(context, obj); readIOs[i] = ioObj; return ioObj; } private void saveReadBlocking(RubyIO ioObj, int i) { // save blocking state if (ioObj.getChannel() instanceof SelectableChannel) { getReadBlocking()[i] = ((SelectableChannel) ioObj.getChannel()).isBlocking(); } } private void saveBufferedRead(RubyIO ioObj, int i) throws BadDescriptorException { // already buffered data? don't bother selecting if (ioObj.getOpenFile().READ_DATA_BUFFERED()) { getUnselectableReads()[i] = true; } } private void trySelectRead(ThreadContext context, Map<Character,Integer> attachment, OpenFile fptr) throws IOException { if (fptr.selectChannel() != null && registerSelect(getSelector(context, fptr.selectChannel()), attachment, fptr.selectChannel(), READ_ACCEPT_OPS)) { selectedReads++; if (fptr.READ_CHAR_PENDING() || fptr.READ_DATA_PENDING()) { getPendingReads()[attachment.get('r')] = true; } } else { if (fptr.isReadable()) { getUnselectableReads()[attachment.get('r')] = true; } } } private void processWrites(Ruby runtime, IRubyObject[] args, ThreadContext context) throws IOException { if (args.length > 1 && !args[1].isNil()) { // write checkArrayType(runtime, args[1]); writeArray = (RubyArray) args[1]; writeSize = writeArray.size(); if (writeArray.size() == 0) { // clear reference; we aren't going to do anything writeArray = null; } else { writeIOs = new RubyIO[writeSize]; Map<Character,Integer> attachment = new HashMap<Character,Integer>(1); for (int i = 0; i < writeSize; i++) { RubyIO ioObj = saveWriteIO(i, context); saveWriteBlocking(ioObj, i); attachment.clear(); attachment.put('w', i); trySelectWrite(context, attachment, ioObj.getOpenFileChecked()); } } } } private RubyIO saveWriteIO(int i, ThreadContext context) { IRubyObject obj = writeArray.eltOk(i); RubyIO ioObj = RubyIO.convertToIO(context, obj); writeIOs[i] = ioObj.GetWriteIO(); return ioObj; } private void saveWriteBlocking(RubyIO ioObj, int i) { if (ioObj.getChannel() instanceof SelectableChannel) { // save blocking state if (readBlocking != null) { // some read has saved blocking state // find obj int readIndex = fastSearch(readIOs, ioObj); if (readIndex == -1) { // save blocking only if not found getWriteBlocking()[i] = ((SelectableChannel) ioObj.getChannel()).isBlocking(); } } else { getWriteBlocking()[i] = ((SelectableChannel) ioObj.getChannel()).isBlocking(); } } } private void trySelectWrite(ThreadContext context, Map<Character,Integer> attachment, OpenFile fptr) throws IOException { if (fptr.selectChannel() == null || false == registerSelect(getSelector(context, fptr.selectChannel()), attachment, fptr.selectChannel(), WRITE_CONNECT_OPS)) { selectedReads++; if (fptr.isWritable()) { getUnselectableWrites()[attachment.get('w')] = true; } } } private static long convertTimeout(final ThreadContext context, IRubyObject timeoutArg) { final long timeout; if (timeoutArg instanceof RubyFloat) { timeout = Math.round(((RubyFloat) timeoutArg).getDoubleValue() * 1000); } else if (timeoutArg instanceof RubyInteger) { timeout = Math.round(((RubyInteger) timeoutArg).getDoubleValue() * 1000); } else { final Ruby runtime = context.runtime; if ( ! runtime.is1_8() ) { RubyFloat t = null; try { t = timeoutArg.callMethod(context, "to_f").convertToFloat(); } catch (RaiseException e) { /* fallback to TypeError */ } timeout = t != null ? Math.round(t.getDoubleValue() * 1000) : -1; } else timeout = -1; if ( timeout == -1 ) { throw runtime.newTypeError("can't convert " + timeoutArg.getMetaClass().getName() + " into time interval"); } } if ( timeout < 0 ) throw context.runtime.newArgumentError("negative timeout given"); return timeout; } private void doSelect(Ruby runtime, final boolean has_timeout, long timeout) throws IOException { if (mainSelector != null) { if (pendingReads == null && unselectableReads == null && unselectableWrites == null) { if (has_timeout && timeout == 0) { for (Selector selector : selectors.values()) selector.selectNow(); } else { List<Future> futures = new ArrayList<Future>(enxioSelectors.size()); for (ENXIOSelector enxioSelector : enxioSelectors) { futures.add(runtime.getExecutor().submit(enxioSelector)); } mainSelector.select(has_timeout ? timeout : 0); for (ENXIOSelector enxioSelector : enxioSelectors) enxioSelector.selector.wakeup(); // ensure all the enxio threads have finished for (Future f : futures) try { f.get(); } catch (InterruptedException iex) { } catch (ExecutionException eex) { if (eex.getCause() instanceof IOException) { throw (IOException) eex.getCause(); } } } } else { for (Selector selector : selectors.values()) selector.selectNow(); } } // If any enxio selectors woke up, remove them from the selected key set of the main selector for (ENXIOSelector enxioSelector : enxioSelectors) { Pipe.SourceChannel source = enxioSelector.pipe.source(); SelectionKey key = source.keyFor(mainSelector); if (key != null && mainSelector.selectedKeys().contains(key)) { mainSelector.selectedKeys().remove(key); ByteBuffer buf = ByteBuffer.allocate(1); source.read(buf); } } } public static final int READ_ACCEPT_OPS = SelectExecutor.READ_ACCEPT_OPS; public static final int WRITE_CONNECT_OPS = SelectExecutor.WRITE_CONNECT_OPS; private static final int CANCELLED_OPS = SelectionKey.OP_READ | SelectionKey.OP_ACCEPT | SelectionKey.OP_CONNECT; private static boolean ready(int ops, int mask) { return (ops & mask) != 0; } private static boolean readAcceptReady(int ops) { return ready(ops, READ_ACCEPT_OPS); } private static boolean writeConnectReady(int ops) { return ready(ops, WRITE_CONNECT_OPS); } private static boolean cancelReady(int ops) { return ready(ops, CANCELLED_OPS); } private static boolean writeReady(int ops) { return ready(ops, SelectionKey.OP_WRITE); } @SuppressWarnings("unchecked") private void processSelectedKeys(Ruby runtime) throws IOException { for (Selector selector : selectors.values()) { for (SelectionKey key : selector.selectedKeys()) { int readIoIndex = 0; int writeIoIndex = 0; int interestAndReady = key.interestOps() & key.readyOps(); if (readArray != null && readAcceptReady(interestAndReady)) { readIoIndex = ((Map<Character,Integer>)key.attachment()).get('r'); getReadResults().append(readArray.eltOk(readIoIndex)); if (pendingReads != null) { pendingReads[readIoIndex] = false; } } if (writeArray != null && writeConnectReady(interestAndReady)) { writeIoIndex = ((Map<Character,Integer>)key.attachment()).get('w'); getWriteResults().append(writeArray.eltOk(writeIoIndex)); } } } } private void processPendingAndUnselectable() { if (pendingReads != null) { for (int i = 0; i < pendingReads.length; i++) { if (pendingReads[i]) { getReadResults().append(readArray.eltOk(i)); } } } if (unselectableReads != null) { for (int i = 0; i < unselectableReads.length; i++) { if (unselectableReads[i]) { getReadResults().append(readArray.eltOk(i)); } } } if (unselectableWrites != null) { for (int i = 0; i < unselectableWrites.length; i++) { if (unselectableWrites[i]) { getWriteResults().append(writeArray.eltOk(i)); } } } } private void tidyUp() throws IOException { // make all sockets blocking as configured again for (Selector selector : selectors.values()) { selector.close(); // close unregisters all channels, so we can safely reset blocking modes } for (ENXIOSelector enxioSelector : enxioSelectors) { enxioSelector.pipe.sink().close(); enxioSelector.pipe.source().close(); } if (readBlocking != null) { for (int i = 0; i < readBlocking.length; i++) { if (readBlocking[i] != null) { try { ((SelectableChannel) readIOs[i].getChannel()).configureBlocking(readBlocking[i]); } catch (IllegalBlockingModeException ibme) { throw runtime.newConcurrencyError("can not set IO blocking after select; concurrent select detected?"); } } } } if (writeBlocking != null) { for (int i = 0; i < writeBlocking.length; i++) { if (writeBlocking[i] != null) { try { ((SelectableChannel) writeIOs[i].getChannel()).configureBlocking(writeBlocking[i]); } catch (IllegalBlockingModeException ibme) { throw runtime.newConcurrencyError("can not set IO blocking after select; concurrent select detected?"); } } } } } private RubyArray getReadResults() { if (readResults == null) { readResults = RubyArray.newArray(runtime, readArray.size()); } return readResults; } private RubyArray getWriteResults() { if (writeResults == null) { writeResults = RubyArray.newArray(runtime, writeArray.size()); } return writeResults; } private RubyArray getErrorResults() { if (errorResults != null) { errorResults = RubyArray.newArray(runtime, readArray.size() + writeArray.size()); } return errorResults; } private Selector getSelector(ThreadContext context, SelectableChannel channel) throws IOException { Selector selector = selectors.get(channel.provider()); if (selector == null) { selector = SelectorFactory.openWithRetryFrom(context.runtime, channel.provider()); if (selectors.isEmpty()) { selectors = new HashMap<SelectorProvider, Selector>(); } selectors.put(channel.provider(), selector); if (!selector.provider().equals(SelectorProvider.provider())) { // need to create pipe between alt impl selector and native NIO selector Pipe pipe = Pipe.open(); ENXIOSelector enxioSelector = new ENXIOSelector(selector, pipe); if (enxioSelectors.isEmpty()) enxioSelectors = new ArrayList<ENXIOSelector>(); enxioSelectors.add(enxioSelector); pipe.source().configureBlocking(false); pipe.source().register(getSelector(context, pipe.source()), SelectionKey.OP_READ, enxioSelector); } else if (mainSelector == null) { mainSelector = selector; } } return selector; } private Boolean[] getReadBlocking() { if (readBlocking == null) { readBlocking = new Boolean[readSize]; } return readBlocking; } private Boolean[] getWriteBlocking() { if (writeBlocking == null) { writeBlocking = new Boolean[writeSize]; } return writeBlocking; } private boolean[] getUnselectableReads() { if (unselectableReads == null) { unselectableReads = new boolean[readSize]; } return unselectableReads; } private boolean[] getUnselectableWrites() { if (unselectableWrites == null) { unselectableWrites = new boolean[writeSize]; } return unselectableWrites; } private boolean[] getPendingReads() { if (pendingReads == null) { pendingReads = new boolean[readSize]; } return pendingReads; } private IRubyObject constructResults(Ruby runtime) { return RubyArray.newArrayLight( runtime, readResults == null ? RubyArray.newEmptyArray(runtime) : readResults, writeResults == null ? RubyArray.newEmptyArray(runtime) : writeResults, errorResults == null ? RubyArray.newEmptyArray(runtime) : errorResults); } private int fastSearch(Object[] ary, Object obj) { for (int i = 0; i < ary.length; i++) { if (ary[i] == obj) { return i; } } return -1; } private static void checkArrayType(Ruby runtime, IRubyObject obj) { if (!(obj instanceof RubyArray)) { throw runtime.newTypeError("wrong argument type " + obj.getMetaClass().getName() + " (expected Array)"); } } @SuppressWarnings("unchecked") private static boolean registerSelect(Selector selector, Map<Character,Integer> obj, SelectableChannel channel, int ops) throws IOException { channel.configureBlocking(false); int real_ops = channel.validOps() & ops; SelectionKey key = channel.keyFor(selector); if (key == null) { Map<Character,Integer> attachment = new HashMap<Character,Integer> (1); attachment.putAll(obj); channel.register(selector, real_ops, attachment ); } else { key.interestOps(key.interestOps() | real_ops); Map<Character,Integer> att = (Map<Character,Integer>)key.attachment(); att.putAll(obj); key.attach(att); } return true; } private static final class ENXIOSelector implements Callable<Object> { private final Selector selector; private final Pipe pipe; private ENXIOSelector(Selector selector, Pipe pipe) { this.selector = selector; this.pipe = pipe; } public Object call() throws Exception { try { selector.select(); } finally { ByteBuffer buf = ByteBuffer.allocate(1); buf.put((byte) 0); flipBuffer(buf); pipe.sink().write(buf); } return null; } } Ruby runtime; RubyArray readArray = null; int readSize = 0; RubyIO[] readIOs = null; boolean[] unselectableReads = null; boolean[] pendingReads = null; Boolean[] readBlocking = null; int selectedReads = 0; RubyArray writeArray = null; int writeSize = 0; RubyIO[] writeIOs = null; boolean[] unselectableWrites = null; Boolean[] writeBlocking = null; int selectedWrites = 0; Selector mainSelector = null; Map<SelectorProvider, Selector> selectors = Collections.emptyMap(); Collection<ENXIOSelector> enxioSelectors = Collections.emptyList(); RubyArray readResults = null; RubyArray writeResults = null; RubyArray errorResults = null; }