Copyright (C) 2009-2013 Barchart, Inc. All rights reserved. Licensed under the OSI BSD License. http://www.opensource.org/licenses/bsd-license.php
/** * Copyright (C) 2009-2013 Barchart, Inc. <http://www.barchart.com/> * * All rights reserved. Licensed under the OSI BSD License. * * http://www.opensource.org/licenses/bsd-license.php */
package com.barchart.udt.nio; import java.io.IOException; import java.net.InetSocketAddress; import java.net.SocketAddress; import java.nio.ByteBuffer; import java.nio.channels.ClosedChannelException; import java.nio.channels.ConnectionPendingException; import java.nio.channels.IllegalBlockingModeException; import java.nio.channels.SocketChannel; import java.nio.channels.UnresolvedAddressException; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import com.barchart.udt.ExceptionUDT; import com.barchart.udt.SocketUDT; import com.barchart.udt.TypeUDT; import com.barchart.udt.anno.ThreadSafe;
SocketChannel-like wrapper for SocketUDT, can be either stream or message oriented, depending on TypeUDT

The UDT socket that this SocketChannel wraps will be switched to blocking mode since this is the default for all SocketChannels on construction. If you require non-blocking functionality, you will need to call configureBlocking on the constructed SocketChannel class.

you must use SelectorProviderUDT.openSocketChannel() to obtain instance of this class; do not use JDK SocketChannel.open();

example:

SelectorProvider provider = SelectorProviderUDT.DATAGRAM;
SocketChannel clientChannel = provider.openSocketChannel();
clientChannel.configureBlocking(true);
Socket clientSocket = clientChannel.socket();
InetSocketAddress clientAddress = new InetSocketAddress("localhost", 10000);
clientSocket.bind(clientAddress);
assert clientSocket.isBound();
InetSocketAddress serverAddress = new InetSocketAddress("localhost", 12345);
clientChannel.connect(serverAddress);
assert clientSocket.isConnected();
/** * {@link SocketChannel}-like wrapper for {@link SocketUDT}, can be either * stream or message oriented, depending on {@link TypeUDT} * <p> * The UDT socket that this SocketChannel wraps will be switched to blocking * mode since this is the default for all SocketChannels on construction. If you * require non-blocking functionality, you will need to call configureBlocking * on the constructed SocketChannel class. * <p> * you must use {@link SelectorProviderUDT#openSocketChannel()} to obtain * instance of this class; do not use JDK * {@link java.nio.channels.SocketChannel#open()}; * <p> * example: * * <pre> * SelectorProvider provider = SelectorProviderUDT.DATAGRAM; * SocketChannel clientChannel = provider.openSocketChannel(); * clientChannel.configureBlocking(true); * Socket clientSocket = clientChannel.socket(); * InetSocketAddress clientAddress = new InetSocketAddress(&quot;localhost&quot;, 10000); * clientSocket.bind(clientAddress); * assert clientSocket.isBound(); * InetSocketAddress serverAddress = new InetSocketAddress(&quot;localhost&quot;, 12345); * clientChannel.connect(serverAddress); * assert clientSocket.isConnected(); * </pre> */
public class SocketChannelUDT extends SocketChannel implements ChannelUDT { protected static final Logger log = LoggerFactory .getLogger(SocketChannelUDT.class); protected final Object connectLock = new Object();
local volatile variable, which mirrors super.blocking, to avoid the cost of synchronized call inside isBlocking()
/** * local volatile variable, which mirrors super.blocking, to avoid the cost * of synchronized call inside isBlocking() */
protected volatile boolean isBlockingMode = isBlocking(); protected volatile boolean isConnectFinished; protected volatile boolean isConnectionPending; @ThreadSafe("this") protected NioSocketUDT socketAdapter; protected final SocketUDT socketUDT; protected SocketChannelUDT( // final SelectorProviderUDT provider, // final SocketUDT socketUDT // ) throws ExceptionUDT { super(provider); this.socketUDT = socketUDT; this.socketUDT.setBlocking(true); } protected SocketChannelUDT( // final SelectorProviderUDT provider, // final SocketUDT socketUDT, // final boolean isConnected // ) throws ExceptionUDT { this(provider, socketUDT); if (isConnected) { isConnectFinished = true; isConnectionPending = false; } else { isConnectFinished = false; isConnectionPending = true; } } @Override public boolean connect(final SocketAddress remote) throws IOException { if (!isOpen()) { throw new ClosedChannelException(); } if (isConnected()) { log.warn("already connected; ignoring remote={}", remote); return true; } if (remote == null) { close(); log.error("remote == null"); throw new NullPointerException(); } final InetSocketAddress remoteSocket = (InetSocketAddress) remote; if (remoteSocket.isUnresolved()) { log.error("can not use unresolved address: remote={}", remote); close(); throw new UnresolvedAddressException(); } if (isBlocking()) { synchronized (connectLock) { try { if (isConnectionPending) { close(); throw new ConnectionPendingException(); } isConnectionPending = true; begin(); socketUDT.connect(remoteSocket); } finally { end(true); isConnectionPending = false; connectLock.notifyAll(); } } return socketUDT.isConnected(); } else { /** non Blocking */ if (!isRegistered()) { /** this channel is independent of any selector */ log.error("UDT channel is in NON blocking mode; " + "must register with a selector " // + "before trying to connect(); " // + "socketId=" + socketUDT.id()); throw new IllegalBlockingModeException(); } /** this channel is registered with a selector */ synchronized (connectLock) { if (isConnectionPending) { close(); log.error("connection already in progress"); throw new ConnectionPendingException(); } isConnectFinished = false; isConnectionPending = true; socketUDT.connect(remoteSocket); } /** * connection operation must later be completed by invoking the * #finishConnect() method. */ return false; } } @Override public boolean finishConnect() throws IOException { if (!isOpen()) { throw new ClosedChannelException(); } if (isBlocking()) { synchronized (connectLock) { while (isConnectionPending) { try { connectLock.wait(); } catch (final InterruptedException e) { throw new IOException(e); } } } } if (isConnected()) { isConnectFinished = true; isConnectionPending = false; return true; } else { log.error("connect failure : {}", socketUDT); throw new IOException(); } } @Override protected void implCloseSelectableChannel() throws IOException { socketUDT.close(); } @Override protected void implConfigureBlocking(final boolean block) throws IOException { socketUDT.setBlocking(block); isBlockingMode = block; } @Override public boolean isConnected() { return socketUDT.isConnected(); } @Override public boolean isConnectFinished() { return isConnectFinished; } @Override public boolean isConnectionPending() { return isConnectionPending; } @Override public KindUDT kindUDT() { return KindUDT.CONNECTOR; } @Override public SelectorProviderUDT providerUDT() { return (SelectorProviderUDT) super.provider(); } //
See SocketChannel.read(ByteBuffer) contract; note: this method does not return (-1) as EOS (end of stream flag)
See Also:
Returns:<0 should not happen
=0 blocking mode: timeout occurred on receive
=0 non-blocking mode: nothing is received by the underlying UDT socket
>0 actual bytes received count
/** * See {@link java.nio.channels.SocketChannel#read(ByteBuffer)} contract; * note: this method does not return (-1) as EOS (end of stream flag) * * @return <code><0</code> should not happen<br> * <code>=0</code> blocking mode: timeout occurred on receive<br> * <code>=0</code> non-blocking mode: nothing is received by the * underlying UDT socket<br> * <code>>0</code> actual bytes received count<br> * @see com.barchart.udt.SocketUDT#receive(ByteBuffer) * @see com.barchart.udt.SocketUDT#receive(byte[], int, int) */
@Override public int read(final ByteBuffer buffer) throws IOException { final int remaining = buffer.remaining(); if (remaining <= 0) { return 0; } final SocketUDT socket = socketUDT; final boolean isBlocking = isBlockingMode; final int sizeReceived; try { if (isBlocking) { begin(); // JDK contract for NIO blocking calls } if (buffer.isDirect()) { sizeReceived = socket.receive(buffer); } else { final byte[] array = buffer.array(); final int position = buffer.position(); final int limit = buffer.limit(); sizeReceived = socket.receive(array, position, limit); if (0 < sizeReceived && sizeReceived <= remaining) { buffer.position(position + sizeReceived); } } } finally { if (isBlocking) { end(true); // JDK contract for NIO blocking calls } } // see contract for receive() if (sizeReceived < 0) { // log.trace("nothing was received; socket={}", socket); return 0; } if (sizeReceived == 0) { // log.trace("receive timeout; socket={}", socket); return 0; } if (sizeReceived <= remaining) { return sizeReceived; } else { log.error("should not happen: socket={}", socket); return 0; } } @Override public long read(final ByteBuffer[] dsts, final int offset, final int length) throws IOException { throw new RuntimeException("feature not available"); } @Override public synchronized NioSocketUDT socket() { if (socketAdapter == null) { try { socketAdapter = new NioSocketUDT(this); } catch (final ExceptionUDT e) { log.error("failed to make socket", e); } } return socketAdapter; } @Override public SocketUDT socketUDT() { return socketUDT; } @Override public String toString() { return socketUDT.toString(); }
See Also:
Returns:<0 should not happen
=0 blocking mode: timeout occurred on send
=0 non-blocking mode: buffer is full in the underlying UDT socket; nothing is sent
>0 actual bytes sent count
/** * See {@link java.nio.channels.SocketChannel#write(ByteBuffer)} contract; * * @return <code><0</code> should not happen<br> * <code>=0</code> blocking mode: timeout occurred on send<br> * <code>=0</code> non-blocking mode: buffer is full in the * underlying UDT socket; nothing is sent<br> * <code>>0</code> actual bytes sent count<br> * @see com.barchart.udt.SocketUDT#send(ByteBuffer) * @see com.barchart.udt.SocketUDT#send(byte[], int, int) */
@Override public int write(final ByteBuffer buffer) throws IOException { // writeCount.incrementAndGet(); if (buffer == null) { throw new NullPointerException("buffer == null"); } final int remaining = buffer.remaining(); if (remaining <= 0) { return 0; } final SocketUDT socket = socketUDT; final boolean isBlocking = isBlockingMode; int sizeSent = 0; int ret = 0; try { if (isBlocking) { begin(); // JDK contract for NIO blocking calls } if (buffer.isDirect()) { do { ret = socket.send(buffer); if (ret > 0) sizeSent += ret; } while (buffer.hasRemaining() && isBlocking); } else { final byte[] array = buffer.array(); int position = buffer.position(); final int limit = buffer.limit(); do { ret = socket.send(array, position, limit); if (0 < ret && ret <= remaining) { sizeSent += ret; position += ret; buffer.position(position); } } while (buffer.hasRemaining() && isBlocking); } } finally { if (isBlocking) { end(true); // JDK contract for NIO blocking calls } } // see contract for send() if (ret < 0) { // log.trace("no buffer space; socket={}", socket); return 0; } if (ret == 0) { // log.trace("send timeout; socket={}", socket); return 0; } if (sizeSent <= remaining) { return sizeSent; } else { log.error("should not happen; socket={}", socket); return 0; } } @Override public long write(final ByteBuffer[] bufferArray, final int offset, final int length) throws IOException { try { long total = 0; for (int index = offset; index < offset + length; index++) { final ByteBuffer buffer = bufferArray[index]; final int remaining = buffer.remaining(); final int processed = write(buffer); if (remaining == processed) { total += processed; } else { throw new IllegalStateException( "failed to write buffer in array"); } } return total; } catch (final Throwable e) { throw new IOException("failed to write buffer array", e); } } @Override public TypeUDT typeUDT() { return providerUDT().type(); }
java 7
/** java 7 */
public SocketChannelUDT bind(final SocketAddress localAddress) throws IOException { socketUDT.bind((InetSocketAddress) localAddress); return this; } }