/*
 * JBoss, Home of Professional Open Source
 *
 * Copyright 2008 Red Hat, Inc. and/or its affiliates.
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.xnio;

import java.io.Closeable;
import java.io.IOException;
import java.net.DatagramSocket;
import java.net.ServerSocket;
import java.net.Socket;
import java.nio.ByteBuffer;
import java.nio.channels.ClosedChannelException;
import java.nio.channels.ReadableByteChannel;
import java.nio.channels.Selector;
import java.nio.channels.Channel;
import java.nio.channels.WritableByteChannel;
import java.util.Random;
import java.util.concurrent.CancellationException;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Executor;
import java.util.concurrent.Future;
import java.util.concurrent.ThreadLocalRandom;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.zip.ZipFile;
import org.xnio.channels.SuspendableReadChannel;

import java.util.logging.Handler;

import static org.xnio._private.Messages.closeMsg;
import static org.xnio._private.Messages.msg;

General I/O utility methods.
@apiviz.exclude
/** * General I/O utility methods. * * @apiviz.exclude */
public final class IoUtils { private static final Executor NULL_EXECUTOR = new Executor() { private final String string = String.format("null executor <%s>", Integer.toHexString(hashCode())); public void execute(final Runnable command) { // no operation } public String toString() { return string; } }; private static final Executor DIRECT_EXECUTOR = new Executor() { private final String string = String.format("direct executor <%s>", Integer.toHexString(hashCode())); public void execute(final Runnable command) { command.run(); } public String toString() { return string; } }; private static final Closeable NULL_CLOSEABLE = new Closeable() { private final String string = String.format("null closeable <%s>", Integer.toHexString(hashCode())); public void close() throws IOException { // no operation } public String toString() { return string; } }; private static final Cancellable NULL_CANCELLABLE = new Cancellable() { public Cancellable cancel() { return this; } }; @SuppressWarnings("rawtypes") private static final IoUtils.ResultNotifier RESULT_NOTIFIER = new IoUtils.ResultNotifier(); private IoUtils() {}
Get the direct executor. This is an executor that executes the provided task in the same thread.
Returns:a direct executor
/** * Get the direct executor. This is an executor that executes the provided task in the same thread. * * @return a direct executor */
public static Executor directExecutor() { return DIRECT_EXECUTOR; }
Get the null executor. This is an executor that never actually executes the provided task.
Returns:a null executor
/** * Get the null executor. This is an executor that never actually executes the provided task. * * @return a null executor */
public static Executor nullExecutor() { return NULL_EXECUTOR; }
Get the null closeable. This is a simple Closeable instance that does nothing when its close() method is invoked.
Returns:the null closeable
/** * Get the null closeable. This is a simple {@code Closeable} instance that does nothing when its {@code close()} * method is invoked. * * @return the null closeable */
public static Closeable nullCloseable() { return NULL_CLOSEABLE; }
Close a resource, logging an error if an error occurs.
Params:
  • resource – the resource to close
/** * Close a resource, logging an error if an error occurs. * * @param resource the resource to close */
public static void safeClose(final AutoCloseable resource) { try { if (resource != null) { closeMsg.closingResource(resource); resource.close(); } } catch (ClosedChannelException ignored) { } catch (Throwable t) { closeMsg.resourceCloseFailed(t, resource); } }
Close a resource, logging an error if an error occurs.
Params:
  • resource – the resource to close
/** * Close a resource, logging an error if an error occurs. * * @param resource the resource to close */
public static void safeClose(final Closeable resource) { try { if (resource != null) { closeMsg.closingResource(resource); resource.close(); } } catch (ClosedChannelException ignored) { msg.tracef("safeClose, ignoring ClosedChannelException exception"); } catch (Throwable t) { closeMsg.resourceCloseFailed(t, resource); } }
Close a series of resources, logging errors if they occur.
Params:
  • resources – the resources to close
/** * Close a series of resources, logging errors if they occur. * * @param resources the resources to close */
public static void safeClose(final Closeable... resources) { for (Closeable resource : resources) { safeClose(resource); } }
Close a resource, logging an error if an error occurs.
Params:
  • resource – the resource to close
/** * Close a resource, logging an error if an error occurs. * * @param resource the resource to close */
public static void safeClose(final Socket resource) { try { if (resource != null) { closeMsg.closingResource(resource); resource.close(); } } catch (ClosedChannelException ignored) { } catch (Throwable t) { closeMsg.resourceCloseFailed(t, resource); } }
Close a resource, logging an error if an error occurs.
Params:
  • resource – the resource to close
/** * Close a resource, logging an error if an error occurs. * * @param resource the resource to close */
public static void safeClose(final DatagramSocket resource) { try { if (resource != null) { closeMsg.closingResource(resource); resource.close(); } } catch (Throwable t) { closeMsg.resourceCloseFailed(t, resource); } }
Close a resource, logging an error if an error occurs.
Params:
  • resource – the resource to close
/** * Close a resource, logging an error if an error occurs. * * @param resource the resource to close */
public static void safeClose(final Selector resource) { try { if (resource != null) { closeMsg.closingResource(resource); resource.close(); } } catch (ClosedChannelException ignored) { } catch (Throwable t) { closeMsg.resourceCloseFailed(t, resource); } }
Close a resource, logging an error if an error occurs.
Params:
  • resource – the resource to close
/** * Close a resource, logging an error if an error occurs. * * @param resource the resource to close */
public static void safeClose(final ServerSocket resource) { try { if (resource != null) { closeMsg.closingResource(resource); resource.close(); } } catch (ClosedChannelException ignored) { } catch (Throwable t) { closeMsg.resourceCloseFailed(t, resource); } }
Close a resource, logging an error if an error occurs.
Params:
  • resource – the resource to close
/** * Close a resource, logging an error if an error occurs. * * @param resource the resource to close */
public static void safeClose(final ZipFile resource) { try { if (resource != null) { closeMsg.closingResource(resource); resource.close(); } } catch (Throwable t) { closeMsg.resourceCloseFailed(t, resource); } }
Close a resource, logging an error if an error occurs.
Params:
  • resource – the resource to close
/** * Close a resource, logging an error if an error occurs. * * @param resource the resource to close */
public static void safeClose(final Handler resource) { try { if (resource != null) { closeMsg.closingResource(resource); resource.close(); } } catch (Throwable t) { closeMsg.resourceCloseFailed(t, resource); } }
Close a future resource, logging an error if an error occurs. Attempts to cancel the operation if it is still in progress.
Params:
  • futureResource – the resource to close
/** * Close a future resource, logging an error if an error occurs. Attempts to cancel the operation if it is * still in progress. * * @param futureResource the resource to close */
public static void safeClose(final IoFuture<? extends Closeable> futureResource) { if (futureResource != null) { futureResource.cancel().addNotifier(closingNotifier(), null); } } private static final IoFuture.Notifier<Object, Closeable> ATTACHMENT_CLOSING_NOTIFIER = new IoFuture.Notifier<Object, Closeable>() { public void notify(final IoFuture<?> future, final Closeable attachment) { IoUtils.safeClose(attachment); } }; private static final IoFuture.Notifier<Closeable, Void> CLOSING_NOTIFIER = new IoFuture.HandlingNotifier<Closeable, Void>() { public void handleDone(final Closeable result, final Void attachment) { IoUtils.safeClose(result); } };
Get a notifier that closes the attachment.
Returns:a notifier which will close its attachment
/** * Get a notifier that closes the attachment. * * @return a notifier which will close its attachment */
public static IoFuture.Notifier<Object, Closeable> attachmentClosingNotifier() { return ATTACHMENT_CLOSING_NOTIFIER; }
Get a notifier that closes the result.
Returns:a notifier which will close the result of the operation (if successful)
/** * Get a notifier that closes the result. * * @return a notifier which will close the result of the operation (if successful) */
public static IoFuture.Notifier<Closeable, Void> closingNotifier() { return CLOSING_NOTIFIER; }
Get a notifier that runs the supplied action.
Params:
  • runnable – the notifier type
Type parameters:
  • <T> – the future type (not used)
Returns:a notifier which will run the given command
/** * Get a notifier that runs the supplied action. * * @param runnable the notifier type * @param <T> the future type (not used) * @return a notifier which will run the given command */
public static <T> IoFuture.Notifier<T, Void> runnableNotifier(final Runnable runnable) { return new IoFuture.Notifier<T, Void>() { public void notify(final IoFuture<? extends T> future, final Void attachment) { runnable.run(); } }; }
Get the result notifier. This notifier will forward the result of the IoFuture to the attached Result.
Type parameters:
  • <T> – the result type
Returns:the notifier
/** * Get the result notifier. This notifier will forward the result of the {@code IoFuture} to the attached * {@code Result}. * * @param <T> the result type * @return the notifier */
@SuppressWarnings({ "unchecked" }) public static <T> IoFuture.Notifier<T, Result<T>> resultNotifier() { return RESULT_NOTIFIER; }
Get the notifier that invokes the channel listener given as an attachment.
Type parameters:
  • <T> – the channel type
Returns:the notifier
/** * Get the notifier that invokes the channel listener given as an attachment. * * @param <T> the channel type * @return the notifier */
@SuppressWarnings({ "unchecked" }) public static <T extends Channel> IoFuture.Notifier<T, ChannelListener<? super T>> channelListenerNotifier() { return CHANNEL_LISTENER_NOTIFIER; } @SuppressWarnings("rawtypes") private static final IoFuture.Notifier CHANNEL_LISTENER_NOTIFIER = new IoFuture.HandlingNotifier<Channel, ChannelListener<? super Channel>>() { @SuppressWarnings({ "unchecked" }) public void handleDone(final Channel channel, final ChannelListener channelListener) { channelListener.handleEvent(channel); } };
Get a java.util.concurrent-style Future instance wrapper for an IoFuture instance.
Params:
  • ioFuture – the IoFuture to wrap
Returns:a Future
/** * Get a {@code java.util.concurrent}-style {@code Future} instance wrapper for an {@code IoFuture} instance. * * @param ioFuture the {@code IoFuture} to wrap * @return a {@code Future} */
public static <T> Future<T> getFuture(final IoFuture<T> ioFuture) { return new Future<T>() { public boolean cancel(final boolean mayInterruptIfRunning) { ioFuture.cancel(); return ioFuture.await() == IoFuture.Status.CANCELLED; } public boolean isCancelled() { return ioFuture.getStatus() == IoFuture.Status.CANCELLED; } public boolean isDone() { return ioFuture.getStatus() == IoFuture.Status.DONE; } public T get() throws InterruptedException, ExecutionException { try { return ioFuture.getInterruptibly(); } catch (IOException e) { throw new ExecutionException(e); } } public T get(final long timeout, final TimeUnit unit) throws InterruptedException, ExecutionException, TimeoutException { try { if (ioFuture.awaitInterruptibly(timeout, unit) == IoFuture.Status.WAITING) { throw msg.opTimedOut(); } return ioFuture.getInterruptibly(); } catch (IOException e) { throw new ExecutionException(e); } } public String toString() { return String.format("java.util.concurrent.Future wrapper <%s> for %s", Integer.toHexString(hashCode()), ioFuture); } }; } private static final IoFuture.Notifier<Object, CountDownLatch> COUNT_DOWN_NOTIFIER = new IoFuture.Notifier<Object, CountDownLatch>() { public void notify(final IoFuture<?> future, final CountDownLatch latch) { latch.countDown(); } };
Wait for all the futures to complete.
Params:
  • futures – the futures to wait for
/** * Wait for all the futures to complete. * * @param futures the futures to wait for */
public static void awaitAll(IoFuture<?>... futures) { final int len = futures.length; final CountDownLatch cdl = new CountDownLatch(len); for (IoFuture<?> future : futures) { future.addNotifier(COUNT_DOWN_NOTIFIER, cdl); } boolean intr = false; try { while (cdl.getCount() > 0L) { try { cdl.await(); } catch (InterruptedException e) { intr = true; } } } finally { if (intr) { Thread.currentThread().interrupt(); } } }
Wait for all the futures to complete.
Params:
  • futures – the futures to wait for
Throws:
/** * Wait for all the futures to complete. * * @param futures the futures to wait for * @throws InterruptedException if the current thread is interrupted while waiting */
public static void awaitAllInterruptibly(IoFuture<?>... futures) throws InterruptedException { final int len = futures.length; final CountDownLatch cdl = new CountDownLatch(len); for (IoFuture<?> future : futures) { future.addNotifier(COUNT_DOWN_NOTIFIER, cdl); } cdl.await(); }
Create an IoFuture which wraps another IoFuture, but returns a different type.
Params:
  • parent – the original IoFuture
  • type – the class of the new IoFuture
Type parameters:
  • <I> – the type of the original result
  • <O> – the type of the wrapped result
Returns:a wrapper IoFuture
/** * Create an {@code IoFuture} which wraps another {@code IoFuture}, but returns a different type. * * @param parent the original {@code IoFuture} * @param type the class of the new {@code IoFuture} * @param <I> the type of the original result * @param <O> the type of the wrapped result * @return a wrapper {@code IoFuture} */
public static <I, O> IoFuture<? extends O> cast(final IoFuture<I> parent, final Class<O> type) { return new CastingIoFuture<O, I>(parent, type); }
Safely shutdown reads on the given channel.
Params:
  • channel – the channel
/** * Safely shutdown reads on the given channel. * * @param channel the channel */
public static void safeShutdownReads(final SuspendableReadChannel channel) { if (channel != null) { try { channel.shutdownReads(); } catch (IOException e) { closeMsg.resourceReadShutdownFailed(null, null); } } }
Platform-independent channel-to-channel transfer method. Uses regular read and write operations to move bytes from the source channel to the sink channel. After this call, the throughBuffer should be checked for remaining bytes; if there are any, they should be written to the sink channel before proceeding. This method may be used with NIO channels, XNIO channels, or a combination of the two.

If either or both of the given channels are blocking channels, then this method may block.

Params:
  • source – the source channel to read bytes from
  • count – the number of bytes to transfer (must be >= 0L)
  • throughBuffer – the buffer to transfer through (must not be null)
  • sink – the sink channel to write bytes to
Throws:
  • IOException – if an I/O error occurs during the transfer of bytes
Returns:the number of bytes actually transferred (possibly 0)
/** * Platform-independent channel-to-channel transfer method. Uses regular {@code read} and {@code write} operations * to move bytes from the {@code source} channel to the {@code sink} channel. After this call, the {@code throughBuffer} * should be checked for remaining bytes; if there are any, they should be written to the {@code sink} channel before * proceeding. This method may be used with NIO channels, XNIO channels, or a combination of the two. * <p> * If either or both of the given channels are blocking channels, then this method may block. * * @param source the source channel to read bytes from * @param count the number of bytes to transfer (must be >= {@code 0L}) * @param throughBuffer the buffer to transfer through (must not be {@code null}) * @param sink the sink channel to write bytes to * @return the number of bytes actually transferred (possibly 0) * @throws IOException if an I/O error occurs during the transfer of bytes */
public static long transfer(final ReadableByteChannel source, final long count, final ByteBuffer throughBuffer, final WritableByteChannel sink) throws IOException { long res; long total = 0L; throughBuffer.limit(0); while (total < count) { throughBuffer.compact(); try { if (count - total < (long) throughBuffer.remaining()) { throughBuffer.limit((int) (count - total)); } res = source.read(throughBuffer); if (res <= 0) { return total == 0L ? res : total; } } finally { throughBuffer.flip(); } res = sink.write(throughBuffer); if (res == 0) { return total; } total += res; } return total; } // nested classes private static class CastingIoFuture<O, I> implements IoFuture<O> { private final IoFuture<I> parent; private final Class<O> type; private CastingIoFuture(final IoFuture<I> parent, final Class<O> type) { this.parent = parent; this.type = type; } public IoFuture<O> cancel() { parent.cancel(); return this; } public Status getStatus() { return parent.getStatus(); } public Status await() { return parent.await(); } public Status await(final long time, final TimeUnit timeUnit) { return parent.await(time, timeUnit); } public Status awaitInterruptibly() throws InterruptedException { return parent.awaitInterruptibly(); } public Status awaitInterruptibly(final long time, final TimeUnit timeUnit) throws InterruptedException { return parent.awaitInterruptibly(time, timeUnit); } public O get() throws IOException, CancellationException { return type.cast(parent.get()); } public O getInterruptibly() throws IOException, InterruptedException, CancellationException { return type.cast(parent.getInterruptibly()); } public IOException getException() throws IllegalStateException { return parent.getException(); } public <A> IoFuture<O> addNotifier(final Notifier<? super O, A> notifier, final A attachment) { parent.addNotifier(new Notifier<I, A>() { public void notify(final IoFuture<? extends I> future, final A attachment) { notifier.notify(CastingIoFuture.this, attachment); } }, attachment); return this; } }
Get a notifier which forwards the result to another IoFuture's manager.
Type parameters:
  • <T> – the channel type
Returns:the notifier
/** * Get a notifier which forwards the result to another {@code IoFuture}'s manager. * * @param <T> the channel type * @return the notifier */
@SuppressWarnings({ "unchecked" }) public static <T> IoFuture.Notifier<T, FutureResult<T>> getManagerNotifier() { return MANAGER_NOTIFIER; } @SuppressWarnings("rawtypes") private static final ManagerNotifier MANAGER_NOTIFIER = new ManagerNotifier(); private static class ManagerNotifier<T extends Channel> extends IoFuture.HandlingNotifier<T, FutureResult<T>> { public void handleCancelled(final FutureResult<T> manager) { manager.setCancelled(); } public void handleFailed(final IOException exception, final FutureResult<T> manager) { manager.setException(exception); } public void handleDone(final T result, final FutureResult<T> manager) { manager.setResult(result); } }
A channel source which tries to acquire a channel from a delegate channel source the given number of times before giving up.
Params:
  • delegate – the delegate channel source
  • maxTries – the number of times to retry
Type parameters:
  • <T> – the channel type
Returns:the retrying channel source
/** * A channel source which tries to acquire a channel from a delegate channel source the given number of times before * giving up. * * @param delegate the delegate channel source * @param maxTries the number of times to retry * @param <T> the channel type * @return the retrying channel source */
public static <T extends Channel> ChannelSource<T> getRetryingChannelSource(final ChannelSource<T> delegate, final int maxTries) throws IllegalArgumentException { if (maxTries < 1) { throw msg.minRange("maxTries", 1); } return new RetryingChannelSource<T>(maxTries, delegate); } private static class RetryingNotifier<T extends Channel> extends IoFuture.HandlingNotifier<T, Result<T>> { private volatile int remaining; private final int maxTries; private final Result<T> result; private final ChannelSource<T> delegate; private final ChannelListener<? super T> openListener; RetryingNotifier(final int maxTries, final Result<T> result, final ChannelSource<T> delegate, final ChannelListener<? super T> openListener) { this.maxTries = maxTries; this.result = result; this.delegate = delegate; this.openListener = openListener; remaining = maxTries; } public void handleFailed(final IOException exception, final Result<T> attachment) { if (remaining-- == 0) { result.setException(new IOException("Failed to create channel after " + maxTries + " tries", exception)); return; } tryOne(attachment); } public void handleCancelled(final Result<T> attachment) { result.setCancelled(); } public void handleDone(final T data, final Result<T> attachment) { result.setResult(data); } void tryOne(final Result<T> attachment) { final IoFuture<? extends T> ioFuture = delegate.open(openListener); ioFuture.addNotifier(this, attachment); } } private static class RetryingChannelSource<T extends Channel> implements ChannelSource<T> { private final int maxTries; private final ChannelSource<T> delegate; RetryingChannelSource(final int maxTries, final ChannelSource<T> delegate) { this.maxTries = maxTries; this.delegate = delegate; } public IoFuture<T> open(final ChannelListener<? super T> openListener) { final FutureResult<T> result = new FutureResult<T>(); final IoUtils.RetryingNotifier<T> notifier = new IoUtils.RetryingNotifier<T>(maxTries, result, delegate, openListener); notifier.tryOne(result); return result.getIoFuture(); } }
A cancellable which closes the given resource on cancel.
Params:
  • c – the resource
Returns:the cancellable
/** * A cancellable which closes the given resource on cancel. * * @param c the resource * @return the cancellable */
public static Cancellable closingCancellable(final Closeable c) { return new ClosingCancellable(c); } private static class ClosingCancellable implements Cancellable { private final Closeable c; ClosingCancellable(final Closeable c) { this.c = c; } public Cancellable cancel() { safeClose(c); return this; } }
Get the null cancellable.
Returns:the null cancellable
/** * Get the null cancellable. * * @return the null cancellable */
public static Cancellable nullCancellable() { return NULL_CANCELLABLE; } private static class ResultNotifier<T> extends IoFuture.HandlingNotifier<T, Result<T>> { public void handleCancelled(final Result<T> result) { result.setCancelled(); } public void handleFailed(final IOException exception, final Result<T> result) { result.setException(exception); } public void handleDone(final T value, final Result<T> result) { result.setResult(value); } }
Get a thread-local RNG. Do not share this instance with other threads.
Returns:the thread-local RNG
/** * Get a thread-local RNG. Do not share this instance with other threads. * * @return the thread-local RNG */
public static Random getThreadLocalRandom() { return ThreadLocalRandom.current(); } }