/*
 * Copyright 2016 The Netty Project
 *
 * The Netty Project licenses this file to you under the Apache License,
 * version 2.0 (the "License"); you may not use this file except in compliance
 * with the License. You may obtain a copy of the License at:
 *
 *   http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
 * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
 * License for the specific language governing permissions and limitations
 * under the License.
 */
package io.netty.handler.flush;

import io.netty.channel.Channel;
import io.netty.channel.ChannelDuplexHandler;
import io.netty.channel.ChannelHandler;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelOutboundHandler;
import io.netty.channel.ChannelOutboundInvoker;
import io.netty.channel.ChannelPipeline;
import io.netty.channel.ChannelPromise;

import java.util.concurrent.Future;

ChannelDuplexHandler which consolidates Channel.flush() / ChannelHandlerContext.flush() operations (which also include Channel.writeAndFlush(Object) / Channel.writeAndFlush(Object, ChannelPromise) and ChannelOutboundInvoker.writeAndFlush(Object) / ChannelOutboundInvoker.writeAndFlush(Object, ChannelPromise)).

Flush operations are generally speaking expensive as these may trigger a syscall on the transport level. Thus it is in most cases (where write latency can be traded with throughput) a good idea to try to minimize flush operations as much as possible.

If a read loop is currently ongoing, flush(ChannelHandlerContext) will not be passed on to the next ChannelOutboundHandler in the ChannelPipeline, as it will pick up any pending flushes when channelReadComplete(ChannelHandlerContext) is triggered. If no read loop is ongoing, the behavior depends on the consolidateWhenNoReadInProgress constructor argument:

  • if false, flushes are passed on to the next handler directly;
  • if true, the invocation of the next handler is submitted as a separate task on the event loop. Under high throughput, this gives the opportunity to process other flushes before the task gets executed, thus batching multiple flushes into one.
If the explicitFlushAfterFlushes limit is reached, the flush is forwarded as well (whether while in a read loop, or while batching outside of a read loop).

If the Channel becomes non-writable it will also try to execute any pending flush operations.

The FlushConsolidationHandler should be put as the first ChannelHandler in the ChannelPipeline to have the best effect.

/** * {@link ChannelDuplexHandler} which consolidates {@link Channel#flush()} / {@link ChannelHandlerContext#flush()} * operations (which also includes * {@link Channel#writeAndFlush(Object)} / {@link Channel#writeAndFlush(Object, ChannelPromise)} and * {@link ChannelOutboundInvoker#writeAndFlush(Object)} / * {@link ChannelOutboundInvoker#writeAndFlush(Object, ChannelPromise)}). * <p> * Flush operations are generally speaking expensive as these may trigger a syscall on the transport level. Thus it is * in most cases (where write latency can be traded with throughput) a good idea to try to minimize flush operations * as much as possible. * <p> * If a read loop is currently ongoing, {@link #flush(ChannelHandlerContext)} will not be passed on to the next * {@link ChannelOutboundHandler} in the {@link ChannelPipeline}, as it will pick up any pending flushes when * {@link #channelReadComplete(ChannelHandlerContext)} is triggered. * If no read loop is ongoing, the behavior depends on the {@code consolidateWhenNoReadInProgress} constructor argument: * <ul> * <li>if {@code false}, flushes are passed on to the next handler directly;</li> * <li>if {@code true}, the invocation of the next handler is submitted as a separate task on the event loop. Under * high throughput, this gives the opportunity to process other flushes before the task gets executed, thus * batching multiple flushes into one.</li> * </ul> * If {@code explicitFlushAfterFlushes} is reached the flush will also be forwarded as well (whether while in a read * loop, or while batching outside of a read loop). * <p> * If the {@link Channel} becomes non-writable it will also try to execute any pending flush operations. * <p> * The {@link FlushConsolidationHandler} should be put as first {@link ChannelHandler} in the * {@link ChannelPipeline} to have the best effect. */
public class FlushConsolidationHandler extends ChannelDuplexHandler { private final int explicitFlushAfterFlushes; private final boolean consolidateWhenNoReadInProgress; private final Runnable flushTask; private int flushPendingCount; private boolean readInProgress; private ChannelHandlerContext ctx; private Future<?> nextScheduledFlush;
Create new instance which explicit flush after 256 pending flush operations latest.
/** * Create new instance which explicit flush after 256 pending flush operations latest. */
public FlushConsolidationHandler() { this(256, false); }
Create new instance which doesn't consolidate flushes when no read is in progress.
Params:
  • explicitFlushAfterFlushes – the number of flushes after which an explicit flush will be done.
/** * Create new instance which doesn't consolidate flushes when no read is in progress. * * @param explicitFlushAfterFlushes the number of flushes after which an explicit flush will be done. */
public FlushConsolidationHandler(int explicitFlushAfterFlushes) { this(explicitFlushAfterFlushes, false); }
Create new instance.
Params:
  • explicitFlushAfterFlushes – the number of flushes after which an explicit flush will be done.
  • consolidateWhenNoReadInProgress – whether to consolidate flushes even when no read loop is currently ongoing.
/** * Create new instance. * * @param explicitFlushAfterFlushes the number of flushes after which an explicit flush will be done. * @param consolidateWhenNoReadInProgress whether to consolidate flushes even when no read loop is currently * ongoing. */
public FlushConsolidationHandler(int explicitFlushAfterFlushes, boolean consolidateWhenNoReadInProgress) { if (explicitFlushAfterFlushes <= 0) { throw new IllegalArgumentException("explicitFlushAfterFlushes: " + explicitFlushAfterFlushes + " (expected: > 0)"); } this.explicitFlushAfterFlushes = explicitFlushAfterFlushes; this.consolidateWhenNoReadInProgress = consolidateWhenNoReadInProgress; flushTask = consolidateWhenNoReadInProgress ? new Runnable() { @Override public void run() { if (flushPendingCount > 0 && !readInProgress) { flushPendingCount = 0; ctx.flush(); nextScheduledFlush = null; } // else we'll flush when the read completes } } : null; } @Override public void handlerAdded(ChannelHandlerContext ctx) throws Exception { this.ctx = ctx; } @Override public void flush(ChannelHandlerContext ctx) throws Exception { if (readInProgress) { // If there is still a read in progress we are sure we will see a channelReadComplete(...) call. Thus // we only need to flush if we reach the explicitFlushAfterFlushes limit. if (++flushPendingCount == explicitFlushAfterFlushes) { flushNow(ctx); } } else if (consolidateWhenNoReadInProgress) { // Flush immediately if we reach the threshold, otherwise schedule if (++flushPendingCount == explicitFlushAfterFlushes) { flushNow(ctx); } else { scheduleFlush(ctx); } } else { // Always flush directly flushNow(ctx); } } @Override public void channelReadComplete(ChannelHandlerContext ctx) throws Exception { // This may be the last event in the read loop, so flush now! resetReadAndFlushIfNeeded(ctx); ctx.fireChannelReadComplete(); } @Override public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception { readInProgress = true; ctx.fireChannelRead(msg); } @Override public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception { // To ensure we not miss to flush anything, do it now. 
resetReadAndFlushIfNeeded(ctx); ctx.fireExceptionCaught(cause); } @Override public void disconnect(ChannelHandlerContext ctx, ChannelPromise promise) throws Exception { // Try to flush one last time if flushes are pending before disconnect the channel. resetReadAndFlushIfNeeded(ctx); ctx.disconnect(promise); } @Override public void close(ChannelHandlerContext ctx, ChannelPromise promise) throws Exception { // Try to flush one last time if flushes are pending before close the channel. resetReadAndFlushIfNeeded(ctx); ctx.close(promise); } @Override public void channelWritabilityChanged(ChannelHandlerContext ctx) throws Exception { if (!ctx.channel().isWritable()) { // The writability of the channel changed to false, so flush all consolidated flushes now to free up memory. flushIfNeeded(ctx); } ctx.fireChannelWritabilityChanged(); } @Override public void handlerRemoved(ChannelHandlerContext ctx) throws Exception { flushIfNeeded(ctx); } private void resetReadAndFlushIfNeeded(ChannelHandlerContext ctx) { readInProgress = false; flushIfNeeded(ctx); } private void flushIfNeeded(ChannelHandlerContext ctx) { if (flushPendingCount > 0) { flushNow(ctx); } } private void flushNow(ChannelHandlerContext ctx) { cancelScheduledFlush(); flushPendingCount = 0; ctx.flush(); } private void scheduleFlush(final ChannelHandlerContext ctx) { if (nextScheduledFlush == null) { // Run as soon as possible, but still yield to give a chance for additional writes to enqueue. nextScheduledFlush = ctx.channel().eventLoop().submit(flushTask); } } private void cancelScheduledFlush() { if (nextScheduledFlush != null) { nextScheduledFlush.cancel(false); nextScheduledFlush = null; } } }