package org.glassfish.grizzly.utils;
import java.io.IOException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.TimeUnit;
import java.util.logging.Logger;
import org.glassfish.grizzly.Connection;
import org.glassfish.grizzly.Grizzly;
import org.glassfish.grizzly.attributes.Attribute;
import org.glassfish.grizzly.filterchain.BaseFilter;
import org.glassfish.grizzly.filterchain.FilterChainContext;
import org.glassfish.grizzly.filterchain.NextAction;
/**
 * Filter that keeps a per-connection activity deadline and registers
 * connections with a {@link DelayedExecutor} delay queue.
 *
 * <p>Accepted and connected connections are added to the queue with the
 * configured timeout. Every read or write event pushes the connection's
 * deadline to "now + timeout". Closing a connection removes it from the queue.
 * The default worker notifies an optional {@link TimeoutHandler} and then
 * closes the connection silently.</p>
 *
 * <p>NOTE(review): when exactly the worker is invoked is decided by
 * {@link DelayedExecutor}, which is not part of this file; presumably it fires
 * once a connection's stored deadline has passed - confirm against that
 * class.</p>
 */
public class ActivityCheckFilter extends BaseFilter {
    private static final Logger LOGGER = Grizzly.logger(ActivityCheckFilter.class);

    /** Name of the connection attribute that holds the activity deadline record. */
    public static final String ACTIVE_ATTRIBUTE_NAME = "connection-active-attribute";

    /** Connection attribute storing the {@link ActiveRecord}; created lazily per connection. */
    private static final Attribute<ActiveRecord> IDLE_ATTR =
            Grizzly.DEFAULT_ATTRIBUTE_BUILDER.createAttribute(
                    ACTIVE_ATTRIBUTE_NAME, new NullaryFunction<ActiveRecord>() {
                        @Override
                        public ActiveRecord evaluate() {
                            return new ActiveRecord();
                        }
                    });

    /** Configured inactivity timeout, in milliseconds. */
    private final long timeoutMillis;

    /** Delay queue, created from the supplied executor, that tracks the connections. */
    private final DelayedExecutor.DelayQueue<Connection> queue;

    /**
     * Creates a filter without a {@link TimeoutHandler}.
     *
     * @param executor    the executor used to create the delay queue; must not be {@code null}
     * @param timeout     the inactivity timeout
     * @param timeoutUnit the unit of {@code timeout}; must not be {@code null}
     */
    public ActivityCheckFilter(final DelayedExecutor executor,
                               final long timeout,
                               final TimeUnit timeoutUnit) {
        this(executor, timeout, timeoutUnit, null);
    }

    /**
     * Creates a filter whose default worker notifies {@code handler} (if not
     * {@code null}) and then closes the connection.
     *
     * @param executor    the executor used to create the delay queue; must not be {@code null}
     * @param timeout     the inactivity timeout
     * @param timeoutUnit the unit of {@code timeout}; must not be {@code null}
     * @param handler     optional callback invoked before the connection is closed; may be {@code null}
     */
    public ActivityCheckFilter(final DelayedExecutor executor,
                               final long timeout,
                               final TimeUnit timeoutUnit,
                               final TimeoutHandler handler) {
        this(executor, new DefaultWorker(handler), timeout, timeoutUnit);
    }

    /**
     * Creates a filter with a custom worker.
     *
     * @param executor    the executor used to create the delay queue; must not be {@code null}
     * @param worker      the worker passed to {@code executor.createDelayQueue(...)}
     * @param timeout     the inactivity timeout
     * @param timeoutUnit the unit of {@code timeout}; must not be {@code null}
     * @throws IllegalArgumentException if {@code executor} is {@code null}
     * @throws NullPointerException     if {@code timeoutUnit} is {@code null}
     */
    protected ActivityCheckFilter(final DelayedExecutor executor,
                                  final DelayedExecutor.Worker<Connection> worker,
                                  final long timeout,
                                  final TimeUnit timeoutUnit) {
        if (executor == null) {
            throw new IllegalArgumentException("executor cannot be null");
        }
        // A null unit already failed with a NullPointerException inside convert();
        // keep the exception type but make the cause explicit.
        if (timeoutUnit == null) {
            throw new NullPointerException("timeoutUnit cannot be null");
        }

        this.timeoutMillis = TimeUnit.MILLISECONDS.convert(timeout, timeoutUnit);
        queue = executor.createDelayQueue(worker, new Resolver());
    }

    /**
     * Registers the accepted connection with the delay queue.
     *
     * @param ctx the filter chain context of the accept event
     * @return the invoke action, so processing continues with the next filter
     * @throws IOException declared by the overridden method
     */
    @Override
    public NextAction handleAccept(final FilterChainContext ctx) throws IOException {
        startTracking(ctx.getConnection());
        return ctx.getInvokeAction();
    }

    /**
     * Registers the connected connection with the delay queue.
     *
     * @param ctx the filter chain context of the connect event
     * @return the invoke action, so processing continues with the next filter
     * @throws IOException declared by the overridden method
     */
    @Override
    public NextAction handleConnect(final FilterChainContext ctx) throws IOException {
        startTracking(ctx.getConnection());
        return ctx.getInvokeAction();
    }

    /**
     * Pushes the connection's deadline forward because data was read.
     *
     * @param ctx the filter chain context of the read event
     * @return the invoke action, so processing continues with the next filter
     * @throws IOException declared by the overridden method
     */
    @Override
    public NextAction handleRead(final FilterChainContext ctx) throws IOException {
        refreshDeadline(ctx.getConnection());
        return ctx.getInvokeAction();
    }

    /**
     * Pushes the connection's deadline forward because data is being written.
     *
     * @param ctx the filter chain context of the write event
     * @return the invoke action, so processing continues with the next filter
     * @throws IOException declared by the overridden method
     */
    @Override
    public NextAction handleWrite(final FilterChainContext ctx) throws IOException {
        refreshDeadline(ctx.getConnection());
        return ctx.getInvokeAction();
    }

    /**
     * Removes the closed connection from the delay queue.
     *
     * @param ctx the filter chain context of the close event
     * @return the invoke action, so processing continues with the next filter
     * @throws IOException declared by the overridden method
     */
    @Override
    public NextAction handleClose(final FilterChainContext ctx) throws IOException {
        queue.remove(ctx.getConnection());
        return ctx.getInvokeAction();
    }

    /**
     * Creates a {@link DelayedExecutor} with a check interval of 1000 milliseconds.
     *
     * @return a new executor backed by a single daemon thread
     */
    @SuppressWarnings({"UnusedDeclaration"})
    public static DelayedExecutor createDefaultIdleDelayedExecutor() {
        return createDefaultIdleDelayedExecutor(1000, TimeUnit.MILLISECONDS);
    }

    /**
     * Creates a {@link DelayedExecutor} backed by a single daemon thread.
     *
     * @param checkInterval     the check interval; a non-positive value falls back to {@code 1000}
     * @param checkIntervalUnit the unit of {@code checkInterval}; {@code null} falls back to
     *                          {@link TimeUnit#MILLISECONDS}
     * @return a new executor
     */
    @SuppressWarnings({"UnusedDeclaration"})
    public static DelayedExecutor createDefaultIdleDelayedExecutor(final long checkInterval,
                                                                   final TimeUnit checkIntervalUnit) {
        final ExecutorService executor = Executors.newSingleThreadExecutor(new ThreadFactory() {
            @Override
            public Thread newThread(Runnable r) {
                final Thread newThread = new Thread(r);
                newThread.setName("Grizzly-ActiveTimeoutFilter-IdleCheck");
                // Daemon thread, so it does not keep the JVM alive on its own.
                newThread.setDaemon(true);
                return newThread;
            }
        });

        // Each argument falls back independently to its default.
        final long interval = (checkInterval > 0) ? checkInterval : 1000L;
        final TimeUnit unit = (checkIntervalUnit != null) ? checkIntervalUnit : TimeUnit.MILLISECONDS;
        return new DelayedExecutor(executor, interval, unit);
    }

    /**
     * Returns the configured timeout converted to the given unit.
     *
     * @param timeunit the unit to convert to
     * @return the timeout expressed in {@code timeunit}
     */
    @SuppressWarnings({"UnusedDeclaration"})
    public long getTimeout(TimeUnit timeunit) {
        return timeunit.convert(timeoutMillis, TimeUnit.MILLISECONDS);
    }

    /** Adds the connection to the delay queue with the configured timeout. */
    private void startTracking(final Connection connection) {
        queue.add(connection, timeoutMillis, TimeUnit.MILLISECONDS);
    }

    /** Stores a new deadline ("now + timeout") in the connection's activity record. */
    private void refreshDeadline(final Connection connection) {
        IDLE_ATTR.get(connection).timeoutMillis = nextDeadlineMillis();
    }

    /**
     * Computes "now + timeout" in milliseconds, saturating at
     * {@link Long#MAX_VALUE} instead of wrapping around.
     *
     * <p>{@code TimeUnit.convert} saturates huge timeouts to
     * {@code Long.MAX_VALUE}; adding the current time to such a value would
     * overflow into a negative deadline, i.e. one far in the past.</p>
     */
    private long nextDeadlineMillis() {
        final long deadline = System.currentTimeMillis() + timeoutMillis;
        return (timeoutMillis > 0 && deadline < 0) ? Long.MAX_VALUE : deadline;
    }

    /**
     * Callback notified by the default worker before it closes a connection.
     */
    public interface TimeoutHandler {
        /**
         * Invoked for a connection that the default worker is about to close.
         *
         * @param c the affected connection
         */
        void onTimeout(final Connection c);
    }

    /**
     * Resolver that reads and writes the deadline stored in the connection's
     * {@link ActiveRecord} attribute.
     */
    private static final class Resolver implements DelayedExecutor.Resolver<Connection> {
        /**
         * Resets the stored deadline to {@code 0}.
         *
         * <p>NOTE(review): confirm that {@code 0} is the value
         * {@link DelayedExecutor} treats as "no timeout set".</p>
         *
         * @return always {@code true}
         */
        @Override
        public boolean removeTimeout(final Connection connection) {
            IDLE_ATTR.get(connection).timeoutMillis = 0;
            return true;
        }

        /** Returns the deadline currently stored for the connection. */
        @Override
        public long getTimeoutMillis(final Connection connection) {
            return IDLE_ATTR.get(connection).timeoutMillis;
        }

        /** Stores the given deadline for the connection. */
        @Override
        public void setTimeoutMillis(final Connection connection,
                                     final long timeoutMillis) {
            IDLE_ATTR.get(connection).timeoutMillis = timeoutMillis;
        }
    }

    /** Per-connection holder of the activity deadline. */
    private static final class ActiveRecord {
        // volatile: written from the filter callbacks and through the Resolver,
        // which presumably run on different threads - visibility only.
        private volatile long timeoutMillis;
    }

    /**
     * Default worker: notifies the optional {@link TimeoutHandler} and closes
     * the connection silently.
     */
    private static final class DefaultWorker implements DelayedExecutor.Worker<Connection> {
        private final TimeoutHandler handler;

        /**
         * @param handler callback to notify before closing; may be {@code null}
         */
        DefaultWorker(final TimeoutHandler handler) {
            this.handler = handler;
        }

        /**
         * Notifies the handler, if any, and closes the connection.
         *
         * @param connection the connection to close
         * @return always {@code true}
         */
        @Override
        public boolean doWork(final Connection connection) {
            try {
                if (handler != null) {
                    handler.onTimeout(connection);
                }
            } finally {
                // Close even if the handler throws, so a faulty callback cannot
                // leave the connection open; the exception still propagates.
                connection.closeSilently();
            }
            return true;
        }
    }
}