package org.glassfish.grizzly.portunif;
import java.io.IOException;
import java.util.Set;
import java.util.concurrent.CancellationException;
import java.util.logging.Level;
import java.util.logging.Logger;
import org.glassfish.grizzly.CompletionHandler;
import org.glassfish.grizzly.Connection;
import org.glassfish.grizzly.Context;
import org.glassfish.grizzly.Grizzly;
import org.glassfish.grizzly.IOEvent;
import org.glassfish.grizzly.IOEventLifeCycleListener;
import org.glassfish.grizzly.ProcessorExecutor;
import org.glassfish.grizzly.attributes.Attribute;
import org.glassfish.grizzly.filterchain.BaseFilter;
import org.glassfish.grizzly.filterchain.Filter;
import org.glassfish.grizzly.filterchain.FilterChain;
import org.glassfish.grizzly.filterchain.FilterChainBuilder;
import org.glassfish.grizzly.filterchain.FilterChainContext;
import org.glassfish.grizzly.filterchain.FilterChainContext.CopyListener;
import org.glassfish.grizzly.filterchain.FilterChainEvent;
import org.glassfish.grizzly.filterchain.NextAction;
import org.glassfish.grizzly.utils.ArraySet;
/**
 * Filter that selects, per {@link Connection}, one of the registered
 * {@link PUProtocol}s by consulting their {@link ProtocolFinder}s, and then
 * runs the read through that protocol's own {@link FilterChain} while the
 * context of the chain this filter lives in stays suspended.
 *
 * <p>Every registered protocol chain must start with this filter's
 * {@link #getBackChannelFilter() back channel filter}; use
 * {@link #getPUFilterChainBuilder()} to obtain a builder that already
 * satisfies that requirement.
 */
public class PUFilter extends BaseFilter {
    private static final Logger LOGGER = Grizzly.logger(PUFilter.class);

    /** Shared listener that carries the suspended parent context over to copies of a child context. */
    private final SuspendedContextCopyListener suspendedContextCopyListener =
            new SuspendedContextCopyListener();

    /** Filter every registered protocol chain is required to start with. */
    private final BackChannelFilter backChannelFilter =
            new BackChannelFilter(this);

    private final ArraySet<PUProtocol> protocols =
            new ArraySet<PUProtocol>(PUProtocol.class);

    /** Per-connection protocol detection state. */
    final Attribute<PUContext> puContextAttribute;

    /** Links a child (protocol chain) context to the parent context suspended on its behalf. */
    final Attribute<FilterChainContext> suspendedContextAttribute;

    private final boolean isCloseUnrecognizedConnection;

    /**
     * Creates a filter that closes connections for which no protocol was found;
     * equivalent to {@code new PUFilter(true)}.
     */
    public PUFilter() {
        this(true);
    }

    /**
     * Creates a filter.
     *
     * @param isCloseUnrecognizedConnection {@code true} to close a connection
     *        once the {@link PUContext} reports that no protocol was found,
     *        {@code false} to pass the read on to the next filter of this
     *        chain instead
     */
    public PUFilter(final boolean isCloseUnrecognizedConnection) {
        this.isCloseUnrecognizedConnection = isCloseUnrecognizedConnection;

        // The hash code is part of the attribute names so that each PUFilter
        // instance gets its own pair of attributes.
        puContextAttribute =
                Grizzly.DEFAULT_ATTRIBUTE_BUILDER.createAttribute(
                        PUFilter.class.getName() + '-' + hashCode() + ".puContext");
        suspendedContextAttribute =
                Grizzly.DEFAULT_ATTRIBUTE_BUILDER.createAttribute(
                        PUFilter.class.getName() + '-' + hashCode() + ".suspendedContext");
    }

    /**
     * Wraps the finder and the chain into a {@link PUProtocol} and registers it.
     *
     * @param protocolFinder finder that recognizes the protocol
     * @param filterChain chain that processes the protocol; its first filter
     *        must be {@link #getBackChannelFilter()}
     * @return the registered {@link PUProtocol}
     * @throws IllegalStateException if the chain is empty or does not start
     *         with the back channel filter
     */
    public PUProtocol register(final ProtocolFinder protocolFinder,
            final FilterChain filterChain) {
        final PUProtocol puProtocol = new PUProtocol(protocolFinder, filterChain);
        register(puProtocol);
        return puProtocol;
    }

    /**
     * Registers a protocol so that its finder takes part in detection.
     *
     * @param puProtocol protocol to register
     * @throws IllegalStateException if the protocol's chain is empty or does
     *         not start with the back channel filter
     */
    public void register(final PUProtocol puProtocol) {
        final FilterChain filterChain = puProtocol.getFilterChain();

        // An empty chain has no first filter at all, so it breaks the same
        // requirement; report it the same way instead of failing inside get(0).
        if (filterChain.isEmpty() || filterChain.get(0) != backChannelFilter) {
            throw new IllegalStateException("The first Filter in the protocol should be the BackChannelFilter");
        }

        protocols.add(puProtocol);
    }

    /**
     * Removes a previously registered protocol.
     *
     * @param puProtocol protocol to remove
     */
    public void deregister(final PUProtocol puProtocol) {
        protocols.remove(puProtocol);
    }

    /**
     * @return the set of registered protocols; this is the filter's own
     *         backing set, not a copy
     */
    public Set<PUProtocol> getProtocols() {
        return protocols;
    }

    /**
     * @return the filter every registered protocol chain has to start with
     */
    public Filter getBackChannelFilter() {
        return backChannelFilter;
    }

    /**
     * @return a new stateless {@link FilterChainBuilder} whose first filter is
     *         already the back channel filter
     */
    public FilterChainBuilder getPUFilterChainBuilder() {
        final FilterChainBuilder builder = FilterChainBuilder.stateless();
        builder.add(backChannelFilter);
        return builder;
    }

    /**
     * @return {@code true} if connections for which no protocol was found are closed
     */
    public boolean isCloseUnrecognizedConnection() {
        return isCloseUnrecognizedConnection;
    }

    /**
     * Detects the protocol of the connection if that has not happened yet and,
     * once a protocol is known, executes its chain with the current message
     * while this context is suspended.
     *
     * @param ctx context of the chain this filter belongs to
     * @return the suspend action when a protocol chain was started; the invoke
     *         action when no protocol was found and the connection is kept
     *         open; otherwise a stop action
     * @throws IOException declared by the {@link Filter} contract
     */
    @Override
    public NextAction handleRead(final FilterChainContext ctx) throws IOException {
        final Connection connection = ctx.getConnection();

        PUContext puContext = puContextAttribute.get(connection);
        if (puContext == null) {
            puContext = new PUContext(this);
            puContextAttribute.set(connection, puContext);
        } else if (puContext.noProtocolsFound()) {
            // Detection already failed on an earlier read and the connection
            // was kept open: just pass the data on.
            return ctx.getInvokeAction();
        }

        PUProtocol protocol = puContext.protocol;
        if (protocol == null) {
            findProtocol(puContext, ctx);
            protocol = puContext.protocol;
        }

        if (protocol != null) {
            if (!puContext.isSticky) {
                // presumably clears the selection so the next read detects
                // the protocol again -- confirm against PUContext#reset
                puContext.reset();
            }

            final FilterChainContext filterChainContext =
                    obtainChildFilterChainContext(protocol, connection, ctx);
            filterChainContext.addCopyListener(suspendedContextCopyListener);
            suspendedContextAttribute.set(filterChainContext, ctx);

            // Everything needed from ctx is taken before the child chain is
            // started; the child's life cycle listener resumes ctx later.
            final NextAction suspendAction = ctx.getSuspendAction();
            ctx.suspend();
            ProcessorExecutor.execute(filterChainContext.getInternalContext());
            return suspendAction;
        }

        if (puContext.noProtocolsFound()) {
            if (isCloseUnrecognizedConnection) {
                connection.closeSilently();
                return ctx.getStopAction();
            }
            return ctx.getInvokeAction();
        }

        // No protocol selected, yet detection has not failed either: stop and
        // hand the current message to the stop action (presumably so it is
        // kept for the next read -- confirm against FilterChainContext).
        return ctx.getStopAction(ctx.getMessage());
    }

    /**
     * Obtains a READ context on the protocol's chain that carries the address
     * holder and the message of the parent context.
     */
    private FilterChainContext obtainChildFilterChainContext(
            final PUProtocol protocol,
            final Connection connection,
            final FilterChainContext ctx) {
        final FilterChain filterChain = protocol.getFilterChain();
        final FilterChainContext filterChainContext =
                filterChain.obtainFilterChainContext(connection);

        final Context context = filterChainContext.getInternalContext();
        context.setIoEvent(IOEvent.READ);
        context.addLifeCycleListener(new InternalProcessingHandler());

        filterChainContext.setAddressHolder(ctx.getAddressHolder());
        filterChainContext.setMessage(ctx.getMessage());
        return filterChainContext;
    }

    /**
     * Forwards an upstream event to the chain of the protocol selected for the
     * connection, suspending this context until the forwarded notification
     * finishes.
     *
     * @param ctx context of the chain this filter belongs to
     * @param event event being propagated
     * @return the suspend action when the event was forwarded, otherwise the
     *         invoke action
     * @throws IOException declared by the {@link Filter} contract
     */
    @Override
    public NextAction handleEvent(final FilterChainContext ctx,
            final FilterChainEvent event) throws IOException {
        if (isUpstream(ctx)) {
            final Connection connection = ctx.getConnection();
            final PUContext puContext = puContextAttribute.get(connection);
            final PUProtocol protocol;

            if (puContext != null && (protocol = puContext.protocol) != null) {
                final FilterChain filterChain = protocol.getFilterChain();
                final FilterChainContext context = filterChain.obtainFilterChainContext(connection);

                // Position the child context before the first filter and let
                // it run through the whole protocol chain.
                context.setStartIdx(-1);
                context.setFilterIdx(-1);
                context.setEndIdx(filterChain.size());

                suspendedContextAttribute.set(context, ctx);
                ctx.suspend();
                final NextAction suspendAction = ctx.getSuspendAction();
                context.notifyUpstream(event, new InternalCompletionHandler(ctx));
                return suspendAction;
            }
        }

        return ctx.getInvokeAction();
    }

    /**
     * Delegates to {@link BaseFilter#handleClose(FilterChainContext)}.
     */
    @Override
    public NextAction handleClose(FilterChainContext ctx) throws IOException {
        return super.handleClose(ctx);
    }

    /**
     * Asks the finders of the registered protocols, in registration-array
     * order, whether they recognize the data in {@code ctx}. The first finder
     * answering {@code FOUND} decides the protocol; finders answering
     * {@code NOT_FOUND} are flagged in {@code puContext.skippedProtocolFinders}
     * and are not consulted again for that context. A finder that throws is
     * logged and treated as undecided.
     *
     * @param puContext detection state to update
     * @param ctx context holding the data to inspect
     */
    protected void findProtocol(final PUContext puContext,
            final FilterChainContext ctx) {
        final PUProtocol[] protocolArray = protocols.getArray();
        if (protocolArray == null) {
            // NOTE(review): defensive -- guards against getArray() handing
            // back null for an empty set; confirm against ArraySet.
            return;
        }

        // NOTE(review): finders are tracked with one bit each (1 << i); if
        // skippedProtocolFinders is an int, more than 32 registered protocols
        // would share bits -- confirm against PUContext.
        for (int i = 0; i < protocolArray.length; i++) {
            final int finderBit = 1 << i;
            if ((puContext.skippedProtocolFinders & finderBit) != 0) {
                continue;
            }

            final PUProtocol protocol = protocolArray[i];
            try {
                final ProtocolFinder.Result result =
                        protocol.getProtocolFinder().find(puContext, ctx);
                switch (result) {
                    case FOUND:
                        puContext.protocol = protocol;
                        return;
                    case NOT_FOUND:
                        // Set (rather than toggle) the bit so the marking
                        // stays correct even without the guard above.
                        puContext.skippedProtocolFinders |= finderBit;
                        break;
                    default:
                        // Any other result leaves this finder undecided.
                        break;
                }
            } catch (Exception e) {
                LOGGER.log(Level.WARNING,
                        "ProtocolFinder " + protocol.getProtocolFinder() +
                        " reported error", e);
            }
        }
    }

    /**
     * @return {@code true} if the context's start index is lower than its end index
     */
    private static boolean isUpstream(final FilterChainContext context) {
        return context.getStartIdx() < context.getEndIdx();
    }

    /**
     * Life cycle listener attached to the child context created in
     * {@link #handleRead(FilterChainContext)}; it resumes the suspended parent
     * context that is stored in {@link #suspendedContextAttribute}.
     */
    private class InternalProcessingHandler extends IOEventLifeCycleListener.Adapter {
        private InternalProcessingHandler() {
        }

        /** Resumes the parent with its fork action; the attribute is left in place. */
        @Override
        public void onReregister(final Context context) throws IOException {
            final FilterChainContext suspendedContext =
                    suspendedContextAttribute.get(context);
            assert suspendedContext != null;

            suspendedContext.resume(suspendedContext.getForkAction());
        }

        /** Detaches the parent from the child context and resumes it with its stop action. */
        @Override
        public void onComplete(final Context context, final Object data)
                throws IOException {
            final FilterChainContext suspendedContext =
                    suspendedContextAttribute.remove(context);
            assert suspendedContext != null;

            suspendedContext.resume(suspendedContext.getStopAction());
        }
    }

    /**
     * Completion handler for the event forwarded in
     * {@link #handleEvent(FilterChainContext, FilterChainEvent)}; it resumes
     * or fails the suspended parent context.
     */
    private static class InternalCompletionHandler implements
            CompletionHandler<FilterChainContext> {
        private final FilterChainContext suspendedContext;

        public InternalCompletionHandler(FilterChainContext suspendedContext) {
            this.suspendedContext = suspendedContext;
        }

        /** Treats cancellation as a failure with a {@link CancellationException}. */
        @Override
        public void cancelled() {
            failed(new CancellationException());
        }

        @Override
        public void failed(final Throwable throwable) {
            suspendedContext.fail(throwable);
        }

        /** Resumes the parent with its stop action. */
        @Override
        public void completed(final FilterChainContext context) {
            suspendedContext.resume(suspendedContext.getStopAction());
        }

        /** Intermediate updates are ignored. */
        @Override
        public void updated(FilterChainContext result) {
        }
    }

    /**
     * When a child context is copied, stores a copy of its suspended parent
     * context on the new child and registers itself on that child as well, so
     * that further copies are handled the same way.
     */
    private class SuspendedContextCopyListener implements CopyListener {
        @Override
        public void onCopy(final FilterChainContext srcContext,
                final FilterChainContext copiedContext) {
            final FilterChainContext suspendedContextCopy =
                    suspendedContextAttribute.get(srcContext).copy();
            suspendedContextAttribute.set(copiedContext, suspendedContextCopy);
            copiedContext.addCopyListener(this);
        }
    }
}