/*
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements. See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache license, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License. You may obtain a copy of the License at
 *
 *      http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the license for the specific language governing permissions and
 * limitations under the license.
 */
package org.apache.logging.log4j.core.appender;

import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TransferQueue;
import java.util.concurrent.atomic.AtomicLong;

import org.apache.logging.log4j.core.AbstractLogEvent;
import org.apache.logging.log4j.core.Appender;
import org.apache.logging.log4j.core.Core;
import org.apache.logging.log4j.core.Filter;
import org.apache.logging.log4j.core.LogEvent;
import org.apache.logging.log4j.core.async.ArrayBlockingQueueFactory;
import org.apache.logging.log4j.core.async.AsyncQueueFullMessageUtil;
import org.apache.logging.log4j.core.async.AsyncQueueFullPolicy;
import org.apache.logging.log4j.core.async.AsyncQueueFullPolicyFactory;
import org.apache.logging.log4j.core.async.BlockingQueueFactory;
import org.apache.logging.log4j.core.async.DiscardingAsyncQueueFullPolicy;
import org.apache.logging.log4j.core.async.EventRoute;
import org.apache.logging.log4j.core.async.InternalAsyncUtil;
import org.apache.logging.log4j.core.config.AppenderControl;
import org.apache.logging.log4j.core.config.AppenderRef;
import org.apache.logging.log4j.core.config.Configuration;
import org.apache.logging.log4j.core.config.ConfigurationException;
import org.apache.logging.log4j.core.config.Property;
import org.apache.logging.log4j.core.config.plugins.Plugin;
import org.apache.logging.log4j.core.config.plugins.PluginAliases;
import org.apache.logging.log4j.core.config.plugins.PluginBuilderAttribute;
import org.apache.logging.log4j.core.config.plugins.PluginBuilderFactory;
import org.apache.logging.log4j.core.config.plugins.PluginConfiguration;
import org.apache.logging.log4j.core.config.plugins.PluginElement;
import org.apache.logging.log4j.core.config.plugins.validation.constraints.Required;
import org.apache.logging.log4j.core.filter.AbstractFilterable;
import org.apache.logging.log4j.core.impl.Log4jLogEvent;
import org.apache.logging.log4j.core.util.Log4jThread;
import org.apache.logging.log4j.spi.AbstractLogger;

Appends to one or more Appenders asynchronously. You can configure an AsyncAppender with one or more Appenders and an Appender to append to if the queue is full. The AsyncAppender does not allow a filter to be specified on the Appender references.
/** * Appends to one or more Appenders asynchronously. You can configure an AsyncAppender with one or more Appenders and an * Appender to append to if the queue is full. The AsyncAppender does not allow a filter to be specified on the Appender * references. */
@Plugin(name = "Async", category = Core.CATEGORY_NAME, elementType = Appender.ELEMENT_TYPE, printObject = true) public final class AsyncAppender extends AbstractAppender { private static final int DEFAULT_QUEUE_SIZE = 1024; private static final LogEvent SHUTDOWN_LOG_EVENT = new AbstractLogEvent() { private static final long serialVersionUID = -1761035149477086330L; }; private static final AtomicLong THREAD_SEQUENCE = new AtomicLong(1); private final BlockingQueue<LogEvent> queue; private final int queueSize; private final boolean blocking; private final long shutdownTimeout; private final Configuration config; private final AppenderRef[] appenderRefs; private final String errorRef; private final boolean includeLocation; private AppenderControl errorAppender; private AsyncThread thread; private AsyncQueueFullPolicy asyncQueueFullPolicy; private AsyncAppender(final String name, final Filter filter, final AppenderRef[] appenderRefs, final String errorRef, final int queueSize, final boolean blocking, final boolean ignoreExceptions, final long shutdownTimeout, final Configuration config, final boolean includeLocation, final BlockingQueueFactory<LogEvent> blockingQueueFactory, final Property[] properties) { super(name, filter, null, ignoreExceptions, properties); this.queue = blockingQueueFactory.create(queueSize); this.queueSize = queueSize; this.blocking = blocking; this.shutdownTimeout = shutdownTimeout; this.config = config; this.appenderRefs = appenderRefs; this.errorRef = errorRef; this.includeLocation = includeLocation; } @Override public void start() { final Map<String, Appender> map = config.getAppenders(); final List<AppenderControl> appenders = new ArrayList<>(); for (final AppenderRef appenderRef : appenderRefs) { final Appender appender = map.get(appenderRef.getRef()); if (appender != null) { appenders.add(new AppenderControl(appender, appenderRef.getLevel(), appenderRef.getFilter())); } else { LOGGER.error("No appender named {} was configured", 
appenderRef); } } if (errorRef != null) { final Appender appender = map.get(errorRef); if (appender != null) { errorAppender = new AppenderControl(appender, null, null); } else { LOGGER.error("Unable to set up error Appender. No appender named {} was configured", errorRef); } } if (appenders.size() > 0) { thread = new AsyncThread(appenders, queue); thread.setName("AsyncAppender-" + getName()); } else if (errorRef == null) { throw new ConfigurationException("No appenders are available for AsyncAppender " + getName()); } asyncQueueFullPolicy = AsyncQueueFullPolicyFactory.create(); thread.start(); super.start(); } @Override public boolean stop(final long timeout, final TimeUnit timeUnit) { setStopping(); super.stop(timeout, timeUnit, false); LOGGER.trace("AsyncAppender stopping. Queue still has {} events.", queue.size()); thread.shutdown(); try { thread.join(shutdownTimeout); } catch (final InterruptedException ex) { LOGGER.warn("Interrupted while stopping AsyncAppender {}", getName()); } LOGGER.trace("AsyncAppender stopped. Queue has {} events.", queue.size()); if (DiscardingAsyncQueueFullPolicy.getDiscardCount(asyncQueueFullPolicy) > 0) { LOGGER.trace("AsyncAppender: {} discarded {} events.", asyncQueueFullPolicy, DiscardingAsyncQueueFullPolicy.getDiscardCount(asyncQueueFullPolicy)); } setStopped(); return true; }
Actual writing occurs here.
Params:
  • logEvent – The LogEvent.
/** * Actual writing occurs here. * * @param logEvent The LogEvent. */
@Override public void append(final LogEvent logEvent) { if (!isStarted()) { throw new IllegalStateException("AsyncAppender " + getName() + " is not active"); } final Log4jLogEvent memento = Log4jLogEvent.createMemento(logEvent, includeLocation); InternalAsyncUtil.makeMessageImmutable(logEvent.getMessage()); if (!transfer(memento)) { if (blocking) { if (AbstractLogger.getRecursionDepth() > 1) { // LOG4J2-1518, LOG4J2-2031 // If queue is full AND we are in a recursive call, call appender directly to prevent deadlock AsyncQueueFullMessageUtil.logWarningToStatusLogger(); logMessageInCurrentThread(logEvent); } else { // delegate to the event router (which may discard, enqueue and block, or log in current thread) final EventRoute route = asyncQueueFullPolicy.getRoute(thread.getId(), memento.getLevel()); route.logMessage(this, memento); } } else { error("Appender " + getName() + " is unable to write primary appenders. queue is full"); logToErrorAppenderIfNecessary(false, memento); } } } private boolean transfer(final LogEvent memento) { return queue instanceof TransferQueue ? ((TransferQueue<LogEvent>) queue).tryTransfer(memento) : queue.offer(memento); }
FOR INTERNAL USE ONLY.
Params:
  • logEvent – the event to log
/** * FOR INTERNAL USE ONLY. * * @param logEvent the event to log */
public void logMessageInCurrentThread(final LogEvent logEvent) { logEvent.setEndOfBatch(queue.isEmpty()); final boolean appendSuccessful = thread.callAppenders(logEvent); logToErrorAppenderIfNecessary(appendSuccessful, logEvent); }
FOR INTERNAL USE ONLY.
Params:
  • logEvent – the event to log
/** * FOR INTERNAL USE ONLY. * * @param logEvent the event to log */
public void logMessageInBackgroundThread(final LogEvent logEvent) { try { // wait for free slots in the queue queue.put(logEvent); } catch (final InterruptedException e) { final boolean appendSuccessful = handleInterruptedException(logEvent); logToErrorAppenderIfNecessary(appendSuccessful, logEvent); } } // LOG4J2-1049: Some applications use Thread.interrupt() to send // messages between application threads. This does not necessarily // mean that the queue is full. To prevent dropping a log message, // quickly try to offer the event to the queue again. // (Yes, this means there is a possibility the same event is logged twice.) // // Finally, catching the InterruptedException means the // interrupted flag has been cleared on the current thread. // This may interfere with the application's expectation of // being interrupted, so when we are done, we set the interrupted // flag again. private boolean handleInterruptedException(final LogEvent memento) { final boolean appendSuccessful = queue.offer(memento); if (!appendSuccessful) { LOGGER.warn("Interrupted while waiting for a free slot in the AsyncAppender LogEvent-queue {}", getName()); } // set the interrupted flag again. Thread.currentThread().interrupt(); return appendSuccessful; } private void logToErrorAppenderIfNecessary(final boolean appendSuccessful, final LogEvent logEvent) { if (!appendSuccessful && errorAppender != null) { errorAppender.callAppender(logEvent); } }
Create an AsyncAppender. This method is retained for backwards compatibility. New code should use the Builder instead. This factory will use ArrayBlockingQueueFactory by default as was the behavior pre-2.7.
Params:
  • appenderRefs – The Appenders to reference.
  • errorRef – An optional Appender to write to if the queue is full or other errors occur.
  • blocking – True if the Appender should wait when the queue is full. The default is true.
  • shutdownTimeout – How many milliseconds the Appender should wait to flush outstanding log events in the queue on shutdown. The default is zero which means to wait forever.
  • size – The size of the event queue. The default is 128.
  • name – The name of the Appender.
  • includeLocation – whether to include location information. The default is false.
  • filter – The Filter or null.
  • config – The Configuration.
  • ignoreExceptions – If "true" (default) exceptions encountered when appending events are logged; otherwise they are propagated to the caller.
Returns:The AsyncAppender.
Deprecated:use Builder instead
/** * Create an AsyncAppender. This method is retained for backwards compatibility. New code should use the * {@link Builder} instead. This factory will use {@link ArrayBlockingQueueFactory} by default as was the behavior * pre-2.7. * * @param appenderRefs The Appenders to reference. * @param errorRef An optional Appender to write to if the queue is full or other errors occur. * @param blocking True if the Appender should wait when the queue is full. The default is true. * @param shutdownTimeout How many milliseconds the Appender should wait to flush outstanding log events * in the queue on shutdown. The default is zero which means to wait forever. * @param size The size of the event queue. The default is 128. * @param name The name of the Appender. * @param includeLocation whether to include location information. The default is false. * @param filter The Filter or null. * @param config The Configuration. * @param ignoreExceptions If {@code "true"} (default) exceptions encountered when appending events are logged; * otherwise they are propagated to the caller. * @return The AsyncAppender. * @deprecated use {@link Builder} instead */
@Deprecated public static AsyncAppender createAppender(final AppenderRef[] appenderRefs, final String errorRef, final boolean blocking, final long shutdownTimeout, final int size, final String name, final boolean includeLocation, final Filter filter, final Configuration config, final boolean ignoreExceptions) { if (name == null) { LOGGER.error("No name provided for AsyncAppender"); return null; } if (appenderRefs == null) { LOGGER.error("No appender references provided to AsyncAppender {}", name); } return new AsyncAppender(name, filter, appenderRefs, errorRef, size, blocking, ignoreExceptions, shutdownTimeout, config, includeLocation, new ArrayBlockingQueueFactory<LogEvent>(), null); } @PluginBuilderFactory public static Builder newBuilder() { return new Builder(); } public static class Builder<B extends Builder<B>> extends AbstractFilterable.Builder<B> implements org.apache.logging.log4j.core.util.Builder<AsyncAppender> { @PluginElement("AppenderRef") @Required(message = "No appender references provided to AsyncAppender") private AppenderRef[] appenderRefs; @PluginBuilderAttribute @PluginAliases("error-ref") private String errorRef; @PluginBuilderAttribute private boolean blocking = true; @PluginBuilderAttribute private long shutdownTimeout = 0L; @PluginBuilderAttribute private int bufferSize = DEFAULT_QUEUE_SIZE; @PluginBuilderAttribute @Required(message = "No name provided for AsyncAppender") private String name; @PluginBuilderAttribute private boolean includeLocation = false; @PluginConfiguration private Configuration configuration; @PluginBuilderAttribute private boolean ignoreExceptions = true; @PluginElement(BlockingQueueFactory.ELEMENT_TYPE) private BlockingQueueFactory<LogEvent> blockingQueueFactory = new ArrayBlockingQueueFactory<>(); public Builder setAppenderRefs(final AppenderRef[] appenderRefs) { this.appenderRefs = appenderRefs; return this; } public Builder setErrorRef(final String errorRef) { this.errorRef = errorRef; return this; } public Builder 
setBlocking(final boolean blocking) { this.blocking = blocking; return this; } public Builder setShutdownTimeout(final long shutdownTimeout) { this.shutdownTimeout = shutdownTimeout; return this; } public Builder setBufferSize(final int bufferSize) { this.bufferSize = bufferSize; return this; } public Builder setName(final String name) { this.name = name; return this; } public Builder setIncludeLocation(final boolean includeLocation) { this.includeLocation = includeLocation; return this; } public Builder setConfiguration(final Configuration configuration) { this.configuration = configuration; return this; } public Builder setIgnoreExceptions(final boolean ignoreExceptions) { this.ignoreExceptions = ignoreExceptions; return this; } public Builder setBlockingQueueFactory(final BlockingQueueFactory<LogEvent> blockingQueueFactory) { this.blockingQueueFactory = blockingQueueFactory; return this; } @Override public AsyncAppender build() { return new AsyncAppender(name, getFilter(), appenderRefs, errorRef, bufferSize, blocking, ignoreExceptions, shutdownTimeout, configuration, includeLocation, blockingQueueFactory, getPropertyArray()); } }
Thread that calls the Appenders.
/** * Thread that calls the Appenders. */
private class AsyncThread extends Log4jThread { private volatile boolean shutdown = false; private final List<AppenderControl> appenders; private final BlockingQueue<LogEvent> queue; public AsyncThread(final List<AppenderControl> appenders, final BlockingQueue<LogEvent> queue) { super("AsyncAppender-" + THREAD_SEQUENCE.getAndIncrement()); this.appenders = appenders; this.queue = queue; setDaemon(true); } @Override public void run() { while (!shutdown) { LogEvent event; try { event = queue.take(); if (event == SHUTDOWN_LOG_EVENT) { shutdown = true; continue; } } catch (final InterruptedException ex) { break; // LOG4J2-830 } event.setEndOfBatch(queue.isEmpty()); final boolean success = callAppenders(event); if (!success && errorAppender != null) { try { errorAppender.callAppender(event); } catch (final Exception ex) { // Silently accept the error. } } } // Process any remaining items in the queue. LOGGER.trace("AsyncAppender.AsyncThread shutting down. Processing remaining {} queue events.", queue.size()); int count = 0; int ignored = 0; while (!queue.isEmpty()) { try { final LogEvent event = queue.take(); if (event instanceof Log4jLogEvent) { final Log4jLogEvent logEvent = (Log4jLogEvent) event; logEvent.setEndOfBatch(queue.isEmpty()); callAppenders(logEvent); count++; } else { ignored++; LOGGER.trace("Ignoring event of class {}", event.getClass().getName()); } } catch (final InterruptedException ex) { // May have been interrupted to shut down. // Here we ignore interrupts and try to process all remaining events. } } LOGGER.trace("AsyncAppender.AsyncThread stopped. Queue has {} events remaining. " + "Processed {} and ignored {} events since shutdown started.", queue.size(), count, ignored); }
Calls callAppender on all registered AppenderControl objects, and returns true if at least one appender call was successful, false otherwise. Any exceptions are silently ignored.
Params:
  • event – the event to forward to the registered appenders
Returns:true if at least one appender call succeeded, false otherwise
/** * Calls {@link AppenderControl#callAppender(LogEvent) callAppender} on all registered {@code AppenderControl} * objects, and returns {@code true} if at least one appender call was successful, {@code false} otherwise. Any * exceptions are silently ignored. * * @param event the event to forward to the registered appenders * @return {@code true} if at least one appender call succeeded, {@code false} otherwise */
boolean callAppenders(final LogEvent event) { boolean success = false; for (final AppenderControl control : appenders) { try { control.callAppender(event); success = true; } catch (final Exception ex) { // If no appender is successful the error appender will get it. } } return success; } public void shutdown() { shutdown = true; if (queue.isEmpty()) { queue.offer(SHUTDOWN_LOG_EVENT); } if (getState() == State.TIMED_WAITING || getState() == State.WAITING) { this.interrupt(); // LOG4J2-1422: if underlying appender is stuck in wait/sleep/join/park call } } }
Returns the names of the appenders that this asyncAppender delegates to as an array of Strings.
Returns:the names of the sink appenders
/** * Returns the names of the appenders that this asyncAppender delegates to as an array of Strings. * * @return the names of the sink appenders */
public String[] getAppenderRefStrings() { final String[] result = new String[appenderRefs.length]; for (int i = 0; i < result.length; i++) { result[i] = appenderRefs[i].getRef(); } return result; }
Returns true if this AsyncAppender will take a snapshot of the stack with every log event to determine the class and method where the logging call was made.
Returns:true if location is included with every event, false otherwise
/** * Returns {@code true} if this AsyncAppender will take a snapshot of the stack with every log event to determine * the class and method where the logging call was made. * * @return {@code true} if location is included with every event, {@code false} otherwise */
public boolean isIncludeLocation() { return includeLocation; }
Returns true if this AsyncAppender will block when the queue is full, or false if events are dropped when the queue is full.
Returns:whether this AsyncAppender will block or drop events when the queue is full.
/** * Returns {@code true} if this AsyncAppender will block when the queue is full, or {@code false} if events are * dropped when the queue is full. * * @return whether this AsyncAppender will block or drop events when the queue is full. */
public boolean isBlocking() { return blocking; }
Returns the name of the appender that any errors are logged to or null.
Returns:the name of the appender that any errors are logged to or null
/** * Returns the name of the appender that any errors are logged to or {@code null}. * * @return the name of the appender that any errors are logged to or {@code null} */
public String getErrorRef() { return errorRef; } public int getQueueCapacity() { return queueSize; } public int getQueueRemainingCapacity() { return queue.remainingCapacity(); }
/**
 * Returns the number of elements in the queue.
 *
 * @return the number of elements in the queue.
 * @since 2.11.1
 */
public int getQueueSize() {
    return this.queue.size();
}
}