/*
 *  Licensed to the Apache Software Foundation (ASF) under one or more
 *  contributor license agreements.  See the NOTICE file distributed with
 *  this work for additional information regarding copyright ownership.
 *  The ASF licenses this file to You under the Apache License, Version 2.0
 *  (the "License"); you may not use this file except in compliance with
 *  the License.  You may obtain a copy of the License at
 *
 *      http://www.apache.org/licenses/LICENSE-2.0
 *
 *  Unless required by applicable law or agreed to in writing, software
 *  distributed under the License is distributed on an "AS IS" BASIS,
 *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 *  See the License for the specific language governing permissions and
 *  limitations under the License.
 */
package org.apache.coyote;

import java.net.InetAddress;
import java.nio.ByteBuffer;
import java.util.Collections;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Executor;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLong;

import javax.management.InstanceNotFoundException;
import javax.management.MBeanRegistration;
import javax.management.MBeanRegistrationException;
import javax.management.MBeanServer;
import javax.management.MalformedObjectNameException;
import javax.management.ObjectName;
import javax.servlet.http.HttpUpgradeHandler;
import javax.servlet.http.WebConnection;

import org.apache.coyote.http11.upgrade.InternalHttpUpgradeHandler;
import org.apache.juli.logging.Log;
import org.apache.tomcat.InstanceManager;
import org.apache.tomcat.util.ExceptionUtils;
import org.apache.tomcat.util.collections.SynchronizedStack;
import org.apache.tomcat.util.modeler.Registry;
import org.apache.tomcat.util.net.AbstractEndpoint;
import org.apache.tomcat.util.net.AbstractEndpoint.Handler;
import org.apache.tomcat.util.net.SocketEvent;
import org.apache.tomcat.util.net.SocketWrapperBase;
import org.apache.tomcat.util.res.StringManager;

public abstract class AbstractProtocol<S> implements ProtocolHandler,
        MBeanRegistration {

    
The string manager for this package.
/** * The string manager for this package. */
private static final StringManager sm = StringManager.getManager(AbstractProtocol.class);
Counter used to generate unique JMX names for connectors using automatic port binding.
/** * Counter used to generate unique JMX names for connectors using automatic * port binding. */
private static final AtomicInteger nameCounter = new AtomicInteger(0);
Name of MBean for the Global Request Processor.
/** * Name of MBean for the Global Request Processor. */
protected ObjectName rgOname = null;
Unique ID for this connector. Only used if the connector is configured to use a random port as the port will change if stop(), start() is called.
/** * Unique ID for this connector. Only used if the connector is configured * to use a random port as the port will change if stop(), start() is * called. */
private int nameIndex = 0;
Endpoint that provides low-level network I/O - must be matched to the ProtocolHandler implementation (ProtocolHandler using NIO, requires NIO Endpoint etc.).
/** * Endpoint that provides low-level network I/O - must be matched to the * ProtocolHandler implementation (ProtocolHandler using NIO, requires NIO * Endpoint etc.). */
private final AbstractEndpoint<S,?> endpoint; private Handler<S> handler; private final Set<Processor> waitingProcessors = Collections.newSetFromMap(new ConcurrentHashMap<Processor, Boolean>());
Controller for the timeout scheduling.
/** * Controller for the timeout scheduling. */
private ScheduledFuture<?> timeoutFuture = null; private ScheduledFuture<?> monitorFuture; public AbstractProtocol(AbstractEndpoint<S,?> endpoint) { this.endpoint = endpoint; setConnectionLinger(Constants.DEFAULT_CONNECTION_LINGER); setTcpNoDelay(Constants.DEFAULT_TCP_NO_DELAY); } // ----------------------------------------------- Generic property handling
Generic property setter used by the digester. Other code should not need to use this. The digester will only use this method if it can't find a more specific setter. That means the property belongs to the Endpoint, the ServerSocketFactory or some other lower level component. This method ensures that it is visible to both.
Params:
  • name – The name of the property to set
  • value – The value, in string form, to set for the property
Returns:true if the property was set successfully, otherwise false
/** * Generic property setter used by the digester. Other code should not need * to use this. The digester will only use this method if it can't find a * more specific setter. That means the property belongs to the Endpoint, * the ServerSocketFactory or some other lower level component. This method * ensures that it is visible to both. * * @param name The name of the property to set * @param value The value, in string form, to set for the property * * @return <code>true</code> if the property was set successfully, otherwise * <code>false</code> */
public boolean setProperty(String name, String value) { return endpoint.setProperty(name, value); }
Generic property getter used by the digester. Other code should not need to use this.
Params:
  • name – The name of the property to get
Returns:The value of the property converted to a string
/** * Generic property getter used by the digester. Other code should not need * to use this. * * @param name The name of the property to get * * @return The value of the property converted to a string */
public String getProperty(String name) { return endpoint.getProperty(name); } // ------------------------------- Properties managed by the ProtocolHandler
The adapter provides the link between the ProtocolHandler and the connector.
/** * The adapter provides the link between the ProtocolHandler and the * connector. */
protected Adapter adapter; @Override public void setAdapter(Adapter adapter) { this.adapter = adapter; } @Override public Adapter getAdapter() { return adapter; }
The maximum number of idle processors that will be retained in the cache and re-used with a subsequent request. The default is 200. A value of -1 means unlimited. In the unlimited case, the theoretical maximum number of cached Processor objects is getMaxConnections() although it will usually be closer to getMaxThreads().
/** * The maximum number of idle processors that will be retained in the cache * and re-used with a subsequent request. The default is 200. A value of -1 * means unlimited. In the unlimited case, the theoretical maximum number of * cached Processor objects is {@link #getMaxConnections()} although it will * usually be closer to {@link #getMaxThreads()}. */
protected int processorCache = 200; public int getProcessorCache() { return this.processorCache; } public void setProcessorCache(int processorCache) { this.processorCache = processorCache; } private String clientCertProvider = null;
When client certificate information is presented in a form other than instances of X509Certificate it needs to be converted before it can be used and this property controls which JSSE provider is used to perform the conversion. For example it is used with the AJP connectors, the HTTP APR connector and with the SSLValve. If not specified, the default provider will be used.
Returns:The name of the JSSE provider to use
/** * When client certificate information is presented in a form other than * instances of {@link java.security.cert.X509Certificate} it needs to be * converted before it can be used and this property controls which JSSE * provider is used to perform the conversion. For example it is used with * the AJP connectors, the HTTP APR connector and with the * {@link org.apache.catalina.valves.SSLValve}. If not specified, the * default provider will be used. * * @return The name of the JSSE provider to use */
public String getClientCertProvider() { return clientCertProvider; } public void setClientCertProvider(String s) { this.clientCertProvider = s; } private int maxHeaderCount = 100; public int getMaxHeaderCount() { return maxHeaderCount; } public void setMaxHeaderCount(int maxHeaderCount) { this.maxHeaderCount = maxHeaderCount; } @Override public boolean isAprRequired() { return false; } @Override public boolean isSendfileSupported() { return endpoint.getUseSendfile(); } // ---------------------- Properties that are passed through to the EndPoint @Override public Executor getExecutor() { return endpoint.getExecutor(); } @Override public void setExecutor(Executor executor) { endpoint.setExecutor(executor); } @Override public ScheduledExecutorService getUtilityExecutor() { return endpoint.getUtilityExecutor(); } @Override public void setUtilityExecutor(ScheduledExecutorService utilityExecutor) { endpoint.setUtilityExecutor(utilityExecutor); } public int getMaxThreads() { return endpoint.getMaxThreads(); } public void setMaxThreads(int maxThreads) { endpoint.setMaxThreads(maxThreads); } public int getMaxConnections() { return endpoint.getMaxConnections(); } public void setMaxConnections(int maxConnections) { endpoint.setMaxConnections(maxConnections); } public int getMinSpareThreads() { return endpoint.getMinSpareThreads(); } public void setMinSpareThreads(int minSpareThreads) { endpoint.setMinSpareThreads(minSpareThreads); } public int getThreadPriority() { return endpoint.getThreadPriority(); } public void setThreadPriority(int threadPriority) { endpoint.setThreadPriority(threadPriority); } public int getAcceptCount() { return endpoint.getAcceptCount(); } public void setAcceptCount(int acceptCount) { endpoint.setAcceptCount(acceptCount); } public boolean getTcpNoDelay() { return endpoint.getTcpNoDelay(); } public void setTcpNoDelay(boolean tcpNoDelay) { endpoint.setTcpNoDelay(tcpNoDelay); } public int getConnectionLinger() { return endpoint.getConnectionLinger(); } public void setConnectionLinger(int connectionLinger) { endpoint.setConnectionLinger(connectionLinger); }
The time Tomcat will wait for a subsequent request before closing the connection. The default is getConnectionTimeout().
Returns:The timeout in milliseconds
/** * The time Tomcat will wait for a subsequent request before closing the * connection. The default is {@link #getConnectionTimeout()}. * * @return The timeout in milliseconds */
public int getKeepAliveTimeout() { return endpoint.getKeepAliveTimeout(); } public void setKeepAliveTimeout(int keepAliveTimeout) { endpoint.setKeepAliveTimeout(keepAliveTimeout); } public InetAddress getAddress() { return endpoint.getAddress(); } public void setAddress(InetAddress ia) { endpoint.setAddress(ia); } public int getPort() { return endpoint.getPort(); } public void setPort(int port) { endpoint.setPort(port); } public int getPortOffset() { return endpoint.getPortOffset(); } public void setPortOffset(int portOffset) { endpoint.setPortOffset(portOffset); } public int getPortWithOffset() { return endpoint.getPortWithOffset(); } public int getLocalPort() { return endpoint.getLocalPort(); } /* * When Tomcat expects data from the client, this is the time Tomcat will * wait for that data to arrive before closing the connection. */ public int getConnectionTimeout() { return endpoint.getConnectionTimeout(); } public void setConnectionTimeout(int timeout) { endpoint.setConnectionTimeout(timeout); } public long getConnectionCount() { return endpoint.getConnectionCount(); }
NO-OP.
Params:
  • threadCount – Unused
Deprecated:Will be removed in Tomcat 10.
/** * NO-OP. * * @param threadCount Unused * * @deprecated Will be removed in Tomcat 10. */
@Deprecated public void setAcceptorThreadCount(int threadCount) { }
Always returns 1.
Returns:Always 1.
Deprecated:Will be removed in Tomcat 10.
/** * Always returns 1. * * @return Always 1. * * @deprecated Will be removed in Tomcat 10. */
@Deprecated public int getAcceptorThreadCount() { return 1; } public void setAcceptorThreadPriority(int threadPriority) { endpoint.setAcceptorThreadPriority(threadPriority); } public int getAcceptorThreadPriority() { return endpoint.getAcceptorThreadPriority(); } // ---------------------------------------------------------- Public methods public synchronized int getNameIndex() { if (nameIndex == 0) { nameIndex = nameCounter.incrementAndGet(); } return nameIndex; }
The name will be prefix-address-port if address is non-null and prefix-port if the address is null.
Returns:A name for this protocol instance that is appropriately quoted for use in an ObjectName.
/** * The name will be prefix-address-port if address is non-null and * prefix-port if the address is null. * * @return A name for this protocol instance that is appropriately quoted * for use in an ObjectName. */
public String getName() { return ObjectName.quote(getNameInternal()); } private String getNameInternal() { StringBuilder name = new StringBuilder(getNamePrefix()); name.append('-'); if (getAddress() != null) { name.append(getAddress().getHostAddress()); name.append('-'); } int port = getPortWithOffset(); if (port == 0) { // Auto binding is in use. Check if port is known name.append("auto-"); name.append(getNameIndex()); port = getLocalPort(); if (port != -1) { name.append('-'); name.append(port); } } else { name.append(port); } return name.toString(); } public void addWaitingProcessor(Processor processor) { waitingProcessors.add(processor); } public void removeWaitingProcessor(Processor processor) { waitingProcessors.remove(processor); } // ----------------------------------------------- Accessors for sub-classes protected AbstractEndpoint<S,?> getEndpoint() { return endpoint; } protected Handler<S> getHandler() { return handler; } protected void setHandler(Handler<S> handler) { this.handler = handler; } // -------------------------------------------------------- Abstract methods
Concrete implementations need to provide access to their logger to be used by the abstract classes.
Returns:the logger
/** * Concrete implementations need to provide access to their logger to be * used by the abstract classes. * @return the logger */
protected abstract Log getLog();
Obtain the prefix to be used when construction a name for this protocol handler. The name will be prefix-address-port.
Returns:the prefix
/** * Obtain the prefix to be used when construction a name for this protocol * handler. The name will be prefix-address-port. * @return the prefix */
protected abstract String getNamePrefix();
Obtain the name of the protocol, (Http, Ajp, etc.). Used with JMX.
Returns:the protocol name
/** * Obtain the name of the protocol, (Http, Ajp, etc.). Used with JMX. * @return the protocol name */
protected abstract String getProtocolName();
Find a suitable handler for the protocol negotiated at the network layer.
Params:
  • name – The name of the requested negotiated protocol.
Returns:The instance where UpgradeProtocol.getAlpnName() matches the requested protocol
/** * Find a suitable handler for the protocol negotiated * at the network layer. * @param name The name of the requested negotiated protocol. * @return The instance where {@link UpgradeProtocol#getAlpnName()} matches * the requested protocol */
protected abstract UpgradeProtocol getNegotiatedProtocol(String name);
Find a suitable handler for the protocol upgraded name specified. This is used for direct connection protocol selection.
Params:
  • name – The name of the requested negotiated protocol.
Returns:The instance where UpgradeProtocol.getAlpnName() matches the requested protocol
/** * Find a suitable handler for the protocol upgraded name specified. This * is used for direct connection protocol selection. * @param name The name of the requested negotiated protocol. * @return The instance where {@link UpgradeProtocol#getAlpnName()} matches * the requested protocol */
protected abstract UpgradeProtocol getUpgradeProtocol(String name);
Create and configure a new Processor instance for the current protocol implementation.
Returns:A fully configured Processor instance that is ready to use
/** * Create and configure a new Processor instance for the current protocol * implementation. * * @return A fully configured Processor instance that is ready to use */
protected abstract Processor createProcessor(); protected abstract Processor createUpgradeProcessor( SocketWrapperBase<?> socket, UpgradeToken upgradeToken); // ----------------------------------------------------- JMX related methods protected String domain; protected ObjectName oname; protected MBeanServer mserver; public ObjectName getObjectName() { return oname; } public String getDomain() { return domain; } @Override public ObjectName preRegister(MBeanServer server, ObjectName name) throws Exception { oname = name; mserver = server; domain = name.getDomain(); return name; } @Override public void postRegister(Boolean registrationDone) { // NOOP } @Override public void preDeregister() throws Exception { // NOOP } @Override public void postDeregister() { // NOOP } private ObjectName createObjectName() throws MalformedObjectNameException { // Use the same domain as the connector domain = getAdapter().getDomain(); if (domain == null) { return null; } StringBuilder name = new StringBuilder(getDomain()); name.append(":type=ProtocolHandler,port="); int port = getPortWithOffset(); if (port > 0) { name.append(port); } else { name.append("auto-"); name.append(getNameIndex()); } InetAddress address = getAddress(); if (address != null) { name.append(",address="); name.append(ObjectName.quote(address.getHostAddress())); } return new ObjectName(name.toString()); } // ------------------------------------------------------- Lifecycle methods /* * NOTE: There is no maintenance of state or checking for valid transitions * within this class. It is expected that the connector will maintain state * and prevent invalid state transitions. */ @Override public void init() throws Exception { if (getLog().isInfoEnabled()) { getLog().info(sm.getString("abstractProtocolHandler.init", getName())); logPortOffset(); } if (oname == null) { // Component not pre-registered so register it oname = createObjectName(); if (oname != null) { Registry.getRegistry(null, null).registerComponent(this, oname, null); } } if (this.domain != null) { rgOname = new ObjectName(domain + ":type=GlobalRequestProcessor,name=" + getName()); Registry.getRegistry(null, null).registerComponent( getHandler().getGlobal(), rgOname, null); } String endpointName = getName(); endpoint.setName(endpointName.substring(1, endpointName.length()-1)); endpoint.setDomain(domain); endpoint.init(); } @Override public void start() throws Exception { if (getLog().isInfoEnabled()) { getLog().info(sm.getString("abstractProtocolHandler.start", getName())); logPortOffset(); } endpoint.start(); monitorFuture = getUtilityExecutor().scheduleWithFixedDelay( new Runnable() { @Override public void run() { if (!isPaused()) { startAsyncTimeout(); } } }, 0, 60, TimeUnit.SECONDS); }
Note: The name of this method originated with the Servlet 3.0 asynchronous processing but evolved over time to represent a timeout that is triggered independently of the socket read/write timeouts.
/** * Note: The name of this method originated with the Servlet 3.0 * asynchronous processing but evolved over time to represent a timeout that * is triggered independently of the socket read/write timeouts. */
protected void startAsyncTimeout() { if (timeoutFuture == null || (timeoutFuture != null && timeoutFuture.isDone())) { if (timeoutFuture != null && timeoutFuture.isDone()) { // There was an error executing the scheduled task, get it and log it try { timeoutFuture.get(); } catch (InterruptedException | ExecutionException e) { getLog().error(sm.getString("abstractProtocolHandler.asyncTimeoutError"), e); } } timeoutFuture = getUtilityExecutor().scheduleAtFixedRate( new Runnable() { @Override public void run() { long now = System.currentTimeMillis(); for (Processor processor : waitingProcessors) { processor.timeoutAsync(now); } } }, 1, 1, TimeUnit.SECONDS); } } protected void stopAsyncTimeout() { if (timeoutFuture != null) { timeoutFuture.cancel(false); timeoutFuture = null; } } @Override public void pause() throws Exception { if (getLog().isInfoEnabled()) { getLog().info(sm.getString("abstractProtocolHandler.pause", getName())); } stopAsyncTimeout(); endpoint.pause(); } public boolean isPaused() { return endpoint.isPaused(); } @Override public void resume() throws Exception { if(getLog().isInfoEnabled()) { getLog().info(sm.getString("abstractProtocolHandler.resume", getName())); } endpoint.resume(); startAsyncTimeout(); } @Override public void stop() throws Exception { if(getLog().isInfoEnabled()) { getLog().info(sm.getString("abstractProtocolHandler.stop", getName())); logPortOffset(); } if (monitorFuture != null) { monitorFuture.cancel(true); monitorFuture = null; } stopAsyncTimeout(); // Timeout any waiting processor for (Processor processor : waitingProcessors) { processor.timeoutAsync(-1); } endpoint.stop(); } @Override public void destroy() throws Exception { if(getLog().isInfoEnabled()) { getLog().info(sm.getString("abstractProtocolHandler.destroy", getName())); logPortOffset(); } try { endpoint.destroy(); } finally { if (oname != null) { if (mserver == null) { Registry.getRegistry(null, null).unregisterComponent(oname); } else { // Possibly registered with a different MBeanServer try { mserver.unregisterMBean(oname); } catch (MBeanRegistrationException | InstanceNotFoundException e) { getLog().info(sm.getString("abstractProtocol.mbeanDeregistrationFailed", oname, mserver)); } } } if (rgOname != null) { Registry.getRegistry(null, null).unregisterComponent(rgOname); } } } @Override public void closeServerSocketGraceful() { endpoint.closeServerSocketGraceful(); } private void logPortOffset() { if (getPort() != getPortWithOffset()) { getLog().info(sm.getString("abstractProtocolHandler.portOffset", getName(), String.valueOf(getPort()), String.valueOf(getPortOffset()))); } } // ------------------------------------------- Connection handler base class protected static class ConnectionHandler<S> implements AbstractEndpoint.Handler<S> { private final AbstractProtocol<S> proto; private final RequestGroupInfo global = new RequestGroupInfo(); private final AtomicLong registerCount = new AtomicLong(0); private final Map<S,Processor> connections = new ConcurrentHashMap<>(); private final RecycledProcessors recycledProcessors = new RecycledProcessors(this); public ConnectionHandler(AbstractProtocol<S> proto) { this.proto = proto; } protected AbstractProtocol<S> getProtocol() { return proto; } protected Log getLog() { return getProtocol().getLog(); } @Override public Object getGlobal() { return global; } @Override public void recycle() { recycledProcessors.clear(); } @Override public SocketState process(SocketWrapperBase<S> wrapper, SocketEvent status) { if (getLog().isDebugEnabled()) { getLog().debug(sm.getString("abstractConnectionHandler.process", wrapper.getSocket(), status)); } if (wrapper == null) { // Nothing to do. Socket has been closed. return SocketState.CLOSED; } S socket = wrapper.getSocket(); Processor processor = connections.get(socket); if (getLog().isDebugEnabled()) { getLog().debug(sm.getString("abstractConnectionHandler.connectionsGet", processor, socket)); } // Timeouts are calculated on a dedicated thread and then // dispatched. Because of delays in the dispatch process, the // timeout may no longer be required. Check here and avoid // unnecessary processing. if (SocketEvent.TIMEOUT == status && (processor == null || !processor.isAsync() && !processor.isUpgrade() || processor.isAsync() && !processor.checkAsyncTimeoutGeneration())) { // This is effectively a NO-OP return SocketState.OPEN; } if (processor != null) { // Make sure an async timeout doesn't fire getProtocol().removeWaitingProcessor(processor); } else if (status == SocketEvent.DISCONNECT || status == SocketEvent.ERROR) { // Nothing to do. Endpoint requested a close and there is no // longer a processor associated with this socket. return SocketState.CLOSED; } ContainerThreadMarker.set(); try { if (processor == null) { String negotiatedProtocol = wrapper.getNegotiatedProtocol(); // OpenSSL typically returns null whereas JSSE typically // returns "" when no protocol is negotiated if (negotiatedProtocol != null && negotiatedProtocol.length() > 0) { UpgradeProtocol upgradeProtocol = getProtocol().getNegotiatedProtocol(negotiatedProtocol); if (upgradeProtocol != null) { processor = upgradeProtocol.getProcessor( wrapper, getProtocol().getAdapter()); } else if (negotiatedProtocol.equals("http/1.1")) { // Explicitly negotiated the default protocol. // Obtain a processor below. } else { // TODO: // OpenSSL 1.0.2's ALPN callback doesn't support // failing the handshake with an error if no // protocol can be negotiated. Therefore, we need to // fail the connection here. Once this is fixed, // replace the code below with the commented out // block. if (getLog().isDebugEnabled()) { getLog().debug(sm.getString( "abstractConnectionHandler.negotiatedProcessor.fail", negotiatedProtocol)); } return SocketState.CLOSED; /* * To replace the code above once OpenSSL 1.1.0 is * used. // Failed to create processor. This is a bug. throw new IllegalStateException(sm.getString( "abstractConnectionHandler.negotiatedProcessor.fail", negotiatedProtocol)); */ } } } if (processor == null) { processor = recycledProcessors.pop(); if (getLog().isDebugEnabled()) { getLog().debug(sm.getString("abstractConnectionHandler.processorPop", processor)); } } if (processor == null) { processor = getProtocol().createProcessor(); register(processor); } processor.setSslSupport( wrapper.getSslSupport(getProtocol().getClientCertProvider())); // Associate the processor with the connection connections.put(socket, processor); SocketState state = SocketState.CLOSED; do { state = processor.process(wrapper, status); if (state == SocketState.UPGRADING) { // Get the HTTP upgrade handler UpgradeToken upgradeToken = processor.getUpgradeToken(); // Retrieve leftover input ByteBuffer leftOverInput = processor.getLeftoverInput(); if (upgradeToken == null) { // Assume direct HTTP/2 connection UpgradeProtocol upgradeProtocol = getProtocol().getUpgradeProtocol("h2c"); if (upgradeProtocol != null) { processor = upgradeProtocol.getProcessor( wrapper, getProtocol().getAdapter()); wrapper.unRead(leftOverInput); // Associate with the processor with the connection connections.put(socket, processor); } else { if (getLog().isDebugEnabled()) { getLog().debug(sm.getString( "abstractConnectionHandler.negotiatedProcessor.fail", "h2c")); } return SocketState.CLOSED; } } else { HttpUpgradeHandler httpUpgradeHandler = upgradeToken.getHttpUpgradeHandler(); // Release the Http11 processor to be re-used release(processor); // Create the upgrade processor processor = getProtocol().createUpgradeProcessor(wrapper, upgradeToken); if (getLog().isDebugEnabled()) { getLog().debug(sm.getString("abstractConnectionHandler.upgradeCreate", processor, wrapper)); } wrapper.unRead(leftOverInput); // Mark the connection as upgraded wrapper.setUpgraded(true); // Associate with the processor with the connection connections.put(socket, processor); // Initialise the upgrade handler (which may trigger // some IO using the new protocol which is why the lines // above are necessary) // This cast should be safe. If it fails the error // handling for the surrounding try/catch will deal with // it. if (upgradeToken.getInstanceManager() == null) { httpUpgradeHandler.init((WebConnection) processor); } else { ClassLoader oldCL = upgradeToken.getContextBind().bind(false, null); try { httpUpgradeHandler.init((WebConnection) processor); } finally { upgradeToken.getContextBind().unbind(false, oldCL); } } if (httpUpgradeHandler instanceof InternalHttpUpgradeHandler) { if (((InternalHttpUpgradeHandler) httpUpgradeHandler).hasAsyncIO()) { // The handler will initiate all further I/O state = SocketState.LONG; } } } } } while ( state == SocketState.UPGRADING); if (state == SocketState.LONG) { // In the middle of processing a request/response. Keep the // socket associated with the processor. Exact requirements // depend on type of long poll longPoll(wrapper, processor); if (processor.isAsync()) { getProtocol().addWaitingProcessor(processor); } } else if (state == SocketState.OPEN) { // In keep-alive but between requests. OK to recycle // processor. Continue to poll for the next request. connections.remove(socket); release(processor); wrapper.registerReadInterest(); } else if (state == SocketState.SENDFILE) { // Sendfile in progress. If it fails, the socket will be // closed. If it works, the socket either be added to the // poller (or equivalent) to await more data or processed // if there are any pipe-lined requests remaining. } else if (state == SocketState.UPGRADED) { // Don't add sockets back to the poller if this was a // non-blocking write otherwise the poller may trigger // multiple read events which may lead to thread starvation // in the connector. The write() method will add this socket // to the poller if necessary. if (status != SocketEvent.OPEN_WRITE) { longPoll(wrapper, processor); getProtocol().addWaitingProcessor(processor); } } else if (state == SocketState.SUSPENDED) { // Don't add sockets back to the poller. // The resumeProcessing() method will add this socket // to the poller. } else { // Connection closed. OK to recycle the processor. Upgrade // processors are not recycled. connections.remove(socket); if (processor.isUpgrade()) { UpgradeToken upgradeToken = processor.getUpgradeToken(); HttpUpgradeHandler httpUpgradeHandler = upgradeToken.getHttpUpgradeHandler(); InstanceManager instanceManager = upgradeToken.getInstanceManager(); if (instanceManager == null) { httpUpgradeHandler.destroy(); } else { ClassLoader oldCL = upgradeToken.getContextBind().bind(false, null); try { httpUpgradeHandler.destroy(); } finally { try { instanceManager.destroyInstance(httpUpgradeHandler); } catch (Throwable e) { ExceptionUtils.handleThrowable(e); getLog().error(sm.getString("abstractConnectionHandler.error"), e); } upgradeToken.getContextBind().unbind(false, oldCL); } } } else { release(processor); } } return state; } catch(java.net.SocketException e) { // SocketExceptions are normal getLog().debug(sm.getString( "abstractConnectionHandler.socketexception.debug"), e); } catch (java.io.IOException e) { // IOExceptions are normal getLog().debug(sm.getString( "abstractConnectionHandler.ioexception.debug"), e); } catch (ProtocolException e) { // Protocol exceptions normally mean the client sent invalid or // incomplete data. getLog().debug(sm.getString( "abstractConnectionHandler.protocolexception.debug"), e); } // Future developers: if you discover any other // rare-but-nonfatal exceptions, catch them here, and log as // above. catch (OutOfMemoryError oome) { // Try and handle this here to give Tomcat a chance to close the // connection and prevent clients waiting until they time out. // Worst case, it isn't recoverable and the attempt at logging // will trigger another OOME. getLog().error(sm.getString("abstractConnectionHandler.oome"), oome); } catch (Throwable e) { ExceptionUtils.handleThrowable(e); // any other exception or error is odd. Here we log it // with "ERROR" level, so it will show up even on // less-than-verbose logs. getLog().error(sm.getString("abstractConnectionHandler.error"), e); } finally { ContainerThreadMarker.clear(); } // Make sure socket/processor is removed from the list of current // connections connections.remove(socket); release(processor); return SocketState.CLOSED; } protected void longPoll(SocketWrapperBase<?> socket, Processor processor) { if (!processor.isAsync()) { // This is currently only used with HTTP // Either: // - this is an upgraded connection // - the request line/headers have not been completely // read socket.registerReadInterest(); } } @Override public Set<S> getOpenSockets() { return connections.keySet(); }
Expected to be used by the handler once the processor is no longer required.
Params:
  • processor – Processor being released (that was associated with the socket)
/** * Expected to be used by the handler once the processor is no longer * required. * * @param processor Processor being released (that was associated with * the socket) */
private void release(Processor processor) { if (processor != null) { processor.recycle(); // After recycling, only instances of UpgradeProcessorBase will // return true for isUpgrade(). // Instances of UpgradeProcessorBase should not be added to // recycledProcessors since that pool is only for AJP or HTTP // processors if (!processor.isUpgrade()) { recycledProcessors.push(processor); getLog().debug("Pushed Processor [" + processor + "]"); } } }
Expected to be used by the Endpoint to release resources on socket close, errors etc.
/** * Expected to be used by the Endpoint to release resources on socket * close, errors etc. */
@Override public void release(SocketWrapperBase<S> socketWrapper) { S socket = socketWrapper.getSocket(); Processor processor = connections.remove(socket); release(processor); } protected void register(Processor processor) { if (getProtocol().getDomain() != null) { synchronized (this) { try { long count = registerCount.incrementAndGet(); RequestInfo rp = processor.getRequest().getRequestProcessor(); rp.setGlobalProcessor(global); ObjectName rpName = new ObjectName( getProtocol().getDomain() + ":type=RequestProcessor,worker=" + getProtocol().getName() + ",name=" + getProtocol().getProtocolName() + "Request" + count); if (getLog().isDebugEnabled()) { getLog().debug("Register " + rpName); } Registry.getRegistry(null, null).registerComponent(rp, rpName, null); rp.setRpName(rpName); } catch (Exception e) { getLog().warn("Error registering request"); } } } } protected void unregister(Processor processor) { if (getProtocol().getDomain() != null) { synchronized (this) { try { Request r = processor.getRequest(); if (r == null) { // Probably an UpgradeProcessor return; } RequestInfo rp = r.getRequestProcessor(); rp.setGlobalProcessor(null); ObjectName rpName = rp.getRpName(); if (getLog().isDebugEnabled()) { getLog().debug("Unregister " + rpName); } Registry.getRegistry(null, null).unregisterComponent( rpName); rp.setRpName(null); } catch (Exception e) { getLog().warn("Error unregistering request", e); } } } } @Override public final void pause() { /* * Inform all the processors associated with current connections * that the endpoint is being paused. Most won't care. Those * processing multiplexed streams may wish to take action. For * example, HTTP/2 may wish to stop accepting new streams. * * Note that even if the endpoint is resumed, there is (currently) * no API to inform the Processors of this. */ for (Processor processor : connections.values()) { processor.pause(); } } } protected static class RecycledProcessors extends SynchronizedStack<Processor> { private final transient ConnectionHandler<?> handler; protected final AtomicInteger size = new AtomicInteger(0); public RecycledProcessors(ConnectionHandler<?> handler) { this.handler = handler; } @SuppressWarnings("sync-override") // Size may exceed cache size a bit @Override public boolean push(Processor processor) { int cacheSize = handler.getProtocol().getProcessorCache(); boolean offer = cacheSize == -1 ? true : size.get() < cacheSize; //avoid over growing our cache or add after we have stopped boolean result = false; if (offer) { result = super.push(processor); if (result) { size.incrementAndGet(); } } if (!result) handler.unregister(processor); return result; } @SuppressWarnings("sync-override") // OK if size is too big briefly @Override public Processor pop() { Processor result = super.pop(); if (result != null) { size.decrementAndGet(); } return result; } @Override public synchronized void clear() { Processor next = pop(); while (next != null) { handler.unregister(next); next = pop(); } super.clear(); size.set(0); } } }