package org.eclipse.jetty.server;
import java.io.IOException;
import java.nio.ByteBuffer;
import java.nio.channels.WritePendingException;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.LongAdder;
import org.eclipse.jetty.http.HttpCompliance;
import org.eclipse.jetty.http.HttpField;
import org.eclipse.jetty.http.HttpGenerator;
import org.eclipse.jetty.http.HttpHeader;
import org.eclipse.jetty.http.HttpHeaderValue;
import org.eclipse.jetty.http.HttpParser;
import org.eclipse.jetty.http.HttpParser.RequestHandler;
import org.eclipse.jetty.http.HttpStatus;
import org.eclipse.jetty.http.MetaData;
import org.eclipse.jetty.http.PreEncodedHttpField;
import org.eclipse.jetty.io.AbstractConnection;
import org.eclipse.jetty.io.ByteBufferPool;
import org.eclipse.jetty.io.Connection;
import org.eclipse.jetty.io.EndPoint;
import org.eclipse.jetty.io.EofException;
import org.eclipse.jetty.io.WriteFlusher;
import org.eclipse.jetty.util.BufferUtil;
import org.eclipse.jetty.util.Callback;
import org.eclipse.jetty.util.IteratingCallback;
import org.eclipse.jetty.util.log.Log;
import org.eclipse.jetty.util.log.Logger;
public class HttpConnection extends AbstractConnection implements Runnable, HttpTransport, WriteFlusher.Listener, Connection.UpgradeFrom, Connection.UpgradeTo
{
private static final Logger LOG = Log.getLogger(HttpConnection.class);
public static final HttpField CONNECTION_CLOSE = new PreEncodedHttpField(HttpHeader.CONNECTION, HttpHeaderValue.CLOSE.asString());
public static final String UPGRADE_CONNECTION_ATTRIBUTE = "org.eclipse.jetty.server.HttpConnection.UPGRADE";
private static final ThreadLocal<HttpConnection> __currentConnection = new ThreadLocal<>();
private final HttpConfiguration _config;
private final Connector _connector;
private final ByteBufferPool _bufferPool;
private final HttpInput _input;
private final HttpGenerator _generator;
private final HttpChannelOverHttp _channel;
private final HttpParser _parser;
private final AtomicInteger _contentBufferReferences = new AtomicInteger();
private volatile ByteBuffer _requestBuffer = null;
private volatile ByteBuffer _chunk = null;
private final BlockingReadCallback _blockingReadCallback = new BlockingReadCallback();
private final AsyncReadCallback _asyncReadCallback = new AsyncReadCallback();
private final SendCallback _sendCallback = new SendCallback();
private final boolean _recordHttpComplianceViolations;
private final LongAdder bytesIn = new LongAdder();
private final LongAdder bytesOut = new LongAdder();
public static HttpConnection getCurrentConnection()
{
return __currentConnection.get();
}
protected static HttpConnection setCurrentConnection(HttpConnection connection)
{
HttpConnection last = __currentConnection.get();
__currentConnection.set(connection);
return last;
}
public HttpConnection(HttpConfiguration config, Connector connector, EndPoint endPoint, boolean recordComplianceViolations)
{
super(endPoint, connector.getExecutor());
_config = config;
_connector = connector;
_bufferPool = _connector.getByteBufferPool();
_generator = newHttpGenerator();
_channel = newHttpChannel();
_input = _channel.getRequest().getHttpInput();
_parser = newHttpParser(config.getHttpCompliance());
_recordHttpComplianceViolations = recordComplianceViolations;
if (LOG.isDebugEnabled())
LOG.debug("New HTTP Connection {}", this);
}
public HttpConfiguration getHttpConfiguration()
{
return _config;
}
public boolean isRecordHttpComplianceViolations()
{
return _recordHttpComplianceViolations;
}
protected HttpGenerator newHttpGenerator()
{
return new HttpGenerator(_config.getSendServerVersion(), _config.getSendXPoweredBy());
}
protected HttpChannelOverHttp newHttpChannel()
{
return new HttpChannelOverHttp(this, _connector, _config, getEndPoint(), this);
}
protected HttpParser newHttpParser(HttpCompliance compliance)
{
return new HttpParser(newRequestHandler(), getHttpConfiguration().getRequestHeaderSize(), compliance);
}
protected HttpParser.RequestHandler newRequestHandler()
{
return _channel;
}
public Server getServer()
{
return _connector.getServer();
}
public Connector getConnector()
{
return _connector;
}
public HttpChannel getHttpChannel()
{
return _channel;
}
public HttpParser getParser()
{
return _parser;
}
public HttpGenerator getGenerator()
{
return _generator;
}
@Override
public boolean isOptimizedForDirectBuffers()
{
return getEndPoint().isOptimizedForDirectBuffers();
}
@Override
public long getMessagesIn()
{
return getHttpChannel().getRequests();
}
@Override
public long getMessagesOut()
{
return getHttpChannel().getRequests();
}
@Override
public ByteBuffer onUpgradeFrom()
{
if (BufferUtil.hasContent(_requestBuffer))
{
ByteBuffer buffer = _requestBuffer;
_requestBuffer = null;
return buffer;
}
return null;
}
@Override
public void onUpgradeTo(ByteBuffer buffer)
{
if (BufferUtil.hasContent(buffer))
BufferUtil.append(getRequestBuffer(), buffer);
}
@Override
public void onFlushed(long bytes) throws IOException
{
_channel.getResponse().getHttpOutput().onFlushed(bytes);
}
void releaseRequestBuffer()
{
if (_requestBuffer != null && !_requestBuffer.hasRemaining())
{
if (LOG.isDebugEnabled())
LOG.debug("releaseRequestBuffer {}", this);
ByteBuffer buffer = _requestBuffer;
_requestBuffer = null;
_bufferPool.release(buffer);
}
}
public ByteBuffer getRequestBuffer()
{
if (_requestBuffer == null)
_requestBuffer = _bufferPool.acquire(getInputBufferSize(), _config.isUseDirectByteBuffers());
return _requestBuffer;
}
public boolean isRequestBufferEmpty()
{
return BufferUtil.isEmpty(_requestBuffer);
}
@Override
public void onFillable()
{
if (LOG.isDebugEnabled())
LOG.debug("{} onFillable enter {} {}", this, _channel.getState(), BufferUtil.toDetailString(_requestBuffer));
HttpConnection last = setCurrentConnection(this);
try
{
while (getEndPoint().isOpen())
{
int filled = fillRequestBuffer();
if (filled > 0)
bytesIn.add(filled);
else if (filled == -1 && getEndPoint().isOutputShutdown())
close();
boolean handle = parseRequestBuffer();
if (getEndPoint().getConnection() != this)
break;
if (handle)
{
boolean suspended = !_channel.handle();
if (suspended || getEndPoint().getConnection() != this)
break;
}
else if (filled == 0)
{
fillInterested();
break;
}
else if (filled < 0)
{
switch (_channel.getState().getState())
{
case COMPLETING:
case COMPLETED:
case IDLE:
case THROWN:
case ASYNC_ERROR:
getEndPoint().shutdownOutput();
break;
default:
break;
}
break;
}
}
}
finally
{
setCurrentConnection(last);
if (LOG.isDebugEnabled())
LOG.debug("{} onFillable exit {} {}", this, _channel.getState(), BufferUtil.toDetailString(_requestBuffer));
}
}
protected boolean fillAndParseForContent()
{
boolean handled = false;
while (_parser.inContentState())
{
int filled = fillRequestBuffer();
handled = parseRequestBuffer();
if (handled || filled <= 0 || _input.hasContent())
break;
}
return handled;
}
private int fillRequestBuffer()
{
if (_contentBufferReferences.get() > 0)
{
LOG.warn("{} fill with unconsumed content!", this);
return 0;
}
if (BufferUtil.isEmpty(_requestBuffer))
{
_requestBuffer = getRequestBuffer();
try
{
int filled = getEndPoint().fill(_requestBuffer);
if (filled == 0)
filled = getEndPoint().fill(_requestBuffer);
if (filled < 0)
_parser.atEOF();
if (LOG.isDebugEnabled())
LOG.debug("{} filled {} {}", this, filled, BufferUtil.toDetailString(_requestBuffer));
return filled;
}
catch (IOException e)
{
LOG.debug(e);
_parser.atEOF();
return -1;
}
}
return 0;
}
private boolean parseRequestBuffer()
{
if (LOG.isDebugEnabled())
LOG.debug("{} parse {} {}", this, BufferUtil.toDetailString(_requestBuffer));
boolean handle = _parser.parseNext(_requestBuffer == null ? BufferUtil.EMPTY_BUFFER : _requestBuffer);
if (LOG.isDebugEnabled())
LOG.debug("{} parsed {} {}", this, handle, _parser);
if (_contentBufferReferences.get() == 0)
releaseRequestBuffer();
return handle;
}
@Override
public void onCompleted()
{
if (_channel.getResponse().getStatus() == HttpStatus.SWITCHING_PROTOCOLS_101)
{
Connection connection = (Connection)_channel.getRequest().getAttribute(UPGRADE_CONNECTION_ATTRIBUTE);
if (connection != null)
{
if (LOG.isDebugEnabled())
LOG.debug("Upgrade from {} to {}", this, connection);
_channel.getState().upgrade();
getEndPoint().upgrade(connection);
_channel.recycle();
_parser.reset();
_generator.reset();
if (_contentBufferReferences.get() == 0)
releaseRequestBuffer();
else
{
LOG.warn("{} lingering content references?!?!", this);
_requestBuffer = null;
_contentBufferReferences.set(0);
}
return;
}
}
if (_channel.isExpecting100Continue())
{
_parser.close();
}
else if (_parser.inContentState() && _generator.isPersistent())
{
if (_input.isAsync())
{
if (LOG.isDebugEnabled())
LOG.debug("{}unconsumed input {}", _parser.isChunking() ? "Possible " : "", this);
_channel.abort(new IOException("unconsumed input"));
}
else
{
if (LOG.isDebugEnabled())
LOG.debug("{}unconsumed input {}", _parser.isChunking() ? "Possible " : "", this);
if (!_input.consumeAll())
_channel.abort(new IOException("unconsumed input"));
}
}
_channel.recycle();
if (!_parser.isClosed())
{
if (_generator.isPersistent())
_parser.reset();
else
_parser.close();
}
if (_chunk != null)
_bufferPool.release(_chunk);
_chunk = null;
_generator.reset();
if (getCurrentConnection() != this)
{
if (_parser.isStart())
{
if (BufferUtil.isEmpty(_requestBuffer))
{
fillInterested();
}
else if (getConnector().isRunning())
{
try
{
getExecutor().execute(this);
}
catch (RejectedExecutionException e)
{
if (getConnector().isRunning())
LOG.warn(e);
else
LOG.ignore(e);
getEndPoint().close();
}
}
else
{
getEndPoint().close();
}
}
else if (getEndPoint().isOpen())
fillInterested();
}
}
@Override
protected boolean onReadTimeout(Throwable timeout)
{
return _channel.onIdleTimeout(timeout);
}
@Override
protected void onFillInterestedFailed(Throwable cause)
{
_parser.close();
super.onFillInterestedFailed(cause);
}
@Override
public void onOpen()
{
super.onOpen();
if (isRequestBufferEmpty())
fillInterested();
else
getExecutor().execute(this);
}
@Override
public void onClose(Throwable cause)
{
if (cause == null)
_sendCallback.close();
else
_sendCallback.failed(cause);
super.onClose(cause);
}
@Override
public void run()
{
onFillable();
}
@Override
public void send(MetaData.Response info, boolean head, ByteBuffer content, boolean lastContent, Callback callback)
{
if (info == null)
{
if (!lastContent && BufferUtil.isEmpty(content))
{
callback.succeeded();
return;
}
}
else
{
if (_channel.isExpecting100Continue())
_generator.setPersistent(false);
}
if (_sendCallback.reset(info, head, content, lastContent, callback))
{
_sendCallback.iterate();
}
}
HttpInput.Content newContent(ByteBuffer c)
{
return new Content(c);
}
@Override
public void abort(Throwable failure)
{
if (LOG.isDebugEnabled())
LOG.debug("abort {} {}", this, failure);
getEndPoint().close();
}
@Override
public boolean isPushSupported()
{
return false;
}
@Override
public void push(org.eclipse.jetty.http.MetaData.Request request)
{
LOG.debug("ignore push in {}", this);
}
public void asyncReadFillInterested()
{
getEndPoint().fillInterested(_asyncReadCallback);
}
public void blockingReadFillInterested()
{
getEndPoint().tryFillInterested(_blockingReadCallback);
}
public void blockingReadFailure(Throwable e)
{
_blockingReadCallback.failed(e);
}
@Override
public long getBytesIn()
{
return bytesIn.longValue();
}
@Override
public long getBytesOut()
{
return bytesOut.longValue();
}
@Override
public String toConnectionString()
{
return String.format("%s@%x[p=%s,g=%s]=>%s",
getClass().getSimpleName(),
hashCode(),
_parser,
_generator,
_channel);
}
private class Content extends HttpInput.Content
{
public Content(ByteBuffer content)
{
super(content);
_contentBufferReferences.incrementAndGet();
}
@Override
public void succeeded()
{
if (_contentBufferReferences.decrementAndGet() == 0)
releaseRequestBuffer();
}
@Override
public void failed(Throwable x)
{
succeeded();
}
}
private class BlockingReadCallback implements Callback
{
@Override
public void succeeded()
{
_input.unblock();
}
@Override
public void failed(Throwable x)
{
_input.failed(x);
}
@Override
public InvocationType getInvocationType()
{
return InvocationType.NON_BLOCKING;
}
}
private class AsyncReadCallback implements Callback
{
@Override
public void succeeded()
{
if (_channel.getState().onReadPossible())
_channel.handle();
}
@Override
public void failed(Throwable x)
{
if (_input.failed(x))
_channel.handle();
}
}
private class SendCallback extends IteratingCallback
{
private MetaData.Response _info;
private boolean _head;
private ByteBuffer _content;
private boolean _lastContent;
private Callback _callback;
private ByteBuffer ;
private boolean _shutdownOut;
private SendCallback()
{
super(true);
}
@Override
public InvocationType getInvocationType()
{
return _callback.getInvocationType();
}
private boolean reset(MetaData.Response info, boolean head, ByteBuffer content, boolean last, Callback callback)
{
if (reset())
{
_info = info;
_head = head;
_content = content;
_lastContent = last;
_callback = callback;
_header = null;
_shutdownOut = false;
if (getConnector().isShutdown())
_generator.setPersistent(false);
return true;
}
if (isClosed())
callback.failed(new EofException());
else
callback.failed(new WritePendingException());
return false;
}
@Override
public Action process() throws Exception
{
if (_callback == null)
throw new IllegalStateException();
ByteBuffer chunk = _chunk;
while (true)
{
HttpGenerator.Result result = _generator.generateResponse(_info, _head, _header, chunk, _content, _lastContent);
if (LOG.isDebugEnabled())
LOG.debug("{} generate: {} ({},{},{})@{}",
this,
result,
BufferUtil.toSummaryString(_header),
BufferUtil.toSummaryString(_content),
_lastContent,
_generator.getState());
switch (result)
{
case NEED_INFO:
throw new EofException("request lifecycle violation");
case NEED_HEADER:
{
_header = _bufferPool.acquire(_config.getResponseHeaderSize(), _config.isUseDirectByteBuffers());
continue;
}
case NEED_CHUNK:
{
chunk = _chunk = _bufferPool.acquire(HttpGenerator.CHUNK_SIZE, _config.isUseDirectByteBuffers());
continue;
}
case NEED_CHUNK_TRAILER:
{
if (_chunk != null)
_bufferPool.release(_chunk);
chunk = _chunk = _bufferPool.acquire(_config.getResponseHeaderSize(), _config.isUseDirectByteBuffers());
continue;
}
case FLUSH:
{
if (_head || _generator.isNoContent())
{
BufferUtil.clear(chunk);
BufferUtil.clear(_content);
}
byte gatherWrite = 0;
long bytes = 0;
if (BufferUtil.hasContent(_header))
{
gatherWrite += 4;
bytes += _header.remaining();
}
if (BufferUtil.hasContent(chunk))
{
gatherWrite += 2;
bytes += chunk.remaining();
}
if (BufferUtil.hasContent(_content))
{
gatherWrite += 1;
bytes += _content.remaining();
}
HttpConnection.this.bytesOut.add(bytes);
switch (gatherWrite)
{
case 7:
getEndPoint().write(this, _header, chunk, _content);
break;
case 6:
getEndPoint().write(this, _header, chunk);
break;
case 5:
getEndPoint().write(this, _header, _content);
break;
case 4:
getEndPoint().write(this, _header);
break;
case 3:
getEndPoint().write(this, chunk, _content);
break;
case 2:
getEndPoint().write(this, chunk);
break;
case 1:
getEndPoint().write(this, _content);
break;
default:
succeeded();
}
return Action.SCHEDULED;
}
case SHUTDOWN_OUT:
{
_shutdownOut = true;
continue;
}
case DONE:
{
if (getConnector().isShutdown())
_shutdownOut = true;
return Action.SUCCEEDED;
}
case CONTINUE:
{
break;
}
default:
{
throw new IllegalStateException("generateResponse=" + result);
}
}
}
}
private void ()
{
ByteBuffer h = _header;
_header = null;
if (h != null)
_bufferPool.release(h);
}
@Override
protected void onCompleteSuccess()
{
releaseHeader();
_callback.succeeded();
if (_shutdownOut)
getEndPoint().shutdownOutput();
}
@Override
public void onCompleteFailure(final Throwable x)
{
releaseHeader();
failedCallback(_callback, x);
if (_shutdownOut)
getEndPoint().shutdownOutput();
}
@Override
public String toString()
{
return String.format("%s[i=%s,cb=%s]", super.toString(), _info, _callback);
}
}
}