package org.glassfish.jersey.server.internal.monitoring;
import java.util.Collection;
import java.util.Collections;
import java.util.Iterator;
import java.util.List;
import java.util.Queue;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.logging.Level;
import java.util.logging.Logger;
import javax.ws.rs.ProcessingException;
import javax.ws.rs.core.Configuration;
import org.glassfish.jersey.internal.inject.InjectionManager;
import org.glassfish.jersey.internal.util.PropertiesHelper;
import org.glassfish.jersey.server.BackgroundSchedulerLiteral;
import org.glassfish.jersey.server.ExtendedResourceContext;
import org.glassfish.jersey.server.ServerProperties;
import org.glassfish.jersey.server.internal.LocalizationMessages;
import org.glassfish.jersey.server.model.ResourceMethod;
import org.glassfish.jersey.server.model.ResourceModel;
import org.glassfish.jersey.server.monitoring.MonitoringStatisticsListener;
import org.glassfish.jersey.server.monitoring.RequestEvent;
/**
 * Periodically drains the event queues exposed by a {@link MonitoringEventListener}, feeds the
 * drained events into a {@link MonitoringStatisticsImpl.Builder} and passes the statistics built
 * from it to every registered {@link MonitoringStatisticsListener}.
 */
final class MonitoringStatisticsProcessor {

    private static final Logger LOGGER = Logger.getLogger(MonitoringStatisticsProcessor.class.getName());

    /** Delay between two refresh runs, in milliseconds, used when the configuration supplies none. */
    private static final int DEFAULT_INTERVAL = 500;

    /** Number of seconds {@link #shutDown()} waits for the scheduler to terminate. */
    private static final int SHUTDOWN_TIMEOUT = 10;

    private final MonitoringEventListener monitoringEventListener;
    private final MonitoringStatisticsImpl.Builder statisticsBuilder;
    private final List<MonitoringStatisticsListener> statisticsCallbackList;
    private final ScheduledExecutorService scheduler;
    private final int interval;

    /**
     * Creates the processor and looks up everything it needs from the injection manager: the
     * resource model, all statistics listeners, the background scheduler and the configured
     * refresh interval.
     *
     * @param injectionManager        source of the resource model, listeners, scheduler and configuration.
     * @param monitoringEventListener listener whose queued events are processed by this instance.
     */
    MonitoringStatisticsProcessor(
            final InjectionManager injectionManager, final MonitoringEventListener monitoringEventListener) {
        this.monitoringEventListener = monitoringEventListener;

        final ExtendedResourceContext resourceContext = injectionManager.getInstance(ExtendedResourceContext.class);
        final ResourceModel model = resourceContext.getResourceModel();
        this.statisticsBuilder = new MonitoringStatisticsImpl.Builder(model);

        this.statisticsCallbackList = injectionManager.getAllInstances(MonitoringStatisticsListener.class);
        this.scheduler =
                injectionManager.getInstance(ScheduledExecutorService.class, BackgroundSchedulerLiteral.INSTANCE);

        final Configuration configuration = injectionManager.getInstance(Configuration.class);
        this.interval = PropertiesHelper.getValue(configuration.getProperties(),
                ServerProperties.MONITORING_STATISTICS_REFRESH_INTERVAL, DEFAULT_INTERVAL,
                Collections.<String, String>emptyMap());
    }

    /**
     * Schedules the recurring refresh task on the background scheduler. The first run starts
     * immediately; each following run starts {@code interval} milliseconds after the previous
     * one has finished.
     */
    public void startMonitoringWorker() {
        final Runnable refreshTask = new Runnable() {
            @Override
            public void run() {
                refreshStatistics();
                notifyListeners();
            }
        };
        scheduler.scheduleWithFixedDelay(refreshTask, 0, interval, TimeUnit.MILLISECONDS);
    }

    /**
     * Drains the request, response-code and exception-mapper queues into the statistics builder.
     * Any failure is logged and rethrown wrapped in a {@link ProcessingException}; when that
     * exception leaves the scheduled task, the executor suppresses all further executions.
     */
    private void refreshStatistics() {
        try {
            processRequestItems();
            processResponseCodeEvents();
            processExceptionMapperEvents();
        } catch (final Throwable failure) {
            LOGGER.log(Level.SEVERE, LocalizationMessages.ERROR_MONITORING_STATISTICS_GENERATION(), failure);
            throw new ProcessingException(LocalizationMessages.ERROR_MONITORING_STATISTICS_GENERATION(), failure);
        }
    }

    /**
     * Builds the current statistics and passes them to each registered listener, stopping early
     * when the current thread has been interrupted. A listener that throws is logged and removed
     * from the callback list.
     */
    private void notifyListeners() {
        final MonitoringStatisticsImpl snapshot = statisticsBuilder.build();
        for (final Iterator<MonitoringStatisticsListener> callbacks = statisticsCallbackList.iterator();
             callbacks.hasNext() && !Thread.currentThread().isInterrupted();) {
            final MonitoringStatisticsListener callback = callbacks.next();
            try {
                callback.onStatistics(snapshot);
            } catch (final Throwable failure) {
                LOGGER.log(Level.SEVERE,
                        LocalizationMessages.ERROR_MONITORING_STATISTICS_LISTENER(callback.getClass()), failure);
                callbacks.remove();
            }
        }
    }

    /**
     * Drains the exception-mapper event queue, recording for every event the mapper that ran
     * (when there is one) and whether the response was mapped successfully.
     */
    private void processExceptionMapperEvents() {
        final Queue<RequestEvent> pending = monitoringEventListener.getExceptionMapperEvents();
        final FloodingLogger floodGuard = new FloodingLogger(pending);

        while (!pending.isEmpty()) {
            floodGuard.conditionallyLogFlooding();

            final RequestEvent mapperEvent = pending.remove();
            final ExceptionMapperStatisticsImpl.Builder builder =
                    statisticsBuilder.getExceptionMapperStatisticsBuilder();

            if (mapperEvent.getExceptionMapper() != null) {
                builder.addExceptionMapperExecution(mapperEvent.getExceptionMapper().getClass(), 1);
            }
            builder.addMapping(mapperEvent.isResponseSuccessfullyMapped(), 1);
        }
    }

    /**
     * Drains the queue of finished requests, recording the timing of every request and, when
     * method statistics are attached, the timing of the executed resource method as well.
     */
    private void processRequestItems() {
        final Queue<MonitoringEventListener.RequestStats> pending = monitoringEventListener.getRequestQueuedItems();
        final FloodingLogger floodGuard = new FloodingLogger(pending);

        while (!pending.isEmpty()) {
            floodGuard.conditionallyLogFlooding();

            final MonitoringEventListener.RequestStats item = pending.remove();
            final MonitoringEventListener.TimeStats requestTiming = item.getRequestStats();
            statisticsBuilder.addRequestExecution(requestTiming.getStartTime(), requestTiming.getDuration());

            final MonitoringEventListener.MethodStats methodTiming = item.getMethodStats();
            if (methodTiming == null) {
                // Nothing method-specific to record for this request.
                continue;
            }

            final ResourceMethod resourceMethod = methodTiming.getMethod();
            statisticsBuilder.addExecution(item.getRequestUri(), resourceMethod,
                    methodTiming.getStartTime(), methodTiming.getDuration(),
                    requestTiming.getStartTime(), requestTiming.getDuration());
        }
    }

    /**
     * Drains the queue of response status codes into the statistics builder.
     */
    private void processResponseCodeEvents() {
        final Queue<Integer> pending = monitoringEventListener.getResponseStatuses();
        final FloodingLogger floodGuard = new FloodingLogger(pending);

        while (!pending.isEmpty()) {
            floodGuard.conditionallyLogFlooding();
            final Integer status = pending.remove();
            statisticsBuilder.addResponseCode(status);
        }
    }

    /**
     * Shuts the scheduler down and waits up to {@link #SHUTDOWN_TIMEOUT} seconds for it to
     * terminate, logging a warning when it does not terminate in time.
     *
     * @throws InterruptedException if the calling thread is interrupted while waiting.
     */
    void shutDown() throws InterruptedException {
        scheduler.shutdown();
        if (!scheduler.awaitTermination(SHUTDOWN_TIMEOUT, TimeUnit.SECONDS)) {
            LOGGER.warning(LocalizationMessages.ERROR_MONITORING_SCHEDULER_DESTROY_TIMEOUT());
        }
    }

    /**
     * Warns when a collection keeps growing while it is being drained. A size check is made only
     * when more whole warning intervals have elapsed since construction than checks have already
     * been performed; a warning is logged if the collection is larger than at the previous check.
     */
    private static class FloodingLogger {

        private static final int FLOODING_WARNING_LOG_INTERVAL_MILLIS = 5_000;

        /** The warning interval expressed in nanoseconds, to match {@link System#nanoTime()}. */
        private static final long FLOODING_WARNING_LOG_INTERVAL_NANOS =
                TimeUnit.MILLISECONDS.toNanos(FLOODING_WARNING_LOG_INTERVAL_MILLIS);

        private final Collection<?> collection;
        private final long startTime;
        /** Number of size checks performed so far. */
        private int checksDone;
        /** Size of the collection at construction or at the most recent check. */
        private int lastSize;

        /**
         * @param collection the collection whose size is watched.
         */
        public FloodingLogger(final Collection<?> collection) {
            this.startTime = System.nanoTime();
            this.checksDone = 0;
            this.collection = collection;
            this.lastSize = collection.size();
        }

        /**
         * Performs a size check if one is due and logs a warning when the collection has grown
         * since the previous check.
         */
        public void conditionallyLogFlooding() {
            final long elapsedIntervals = (System.nanoTime() - startTime) / FLOODING_WARNING_LOG_INTERVAL_NANOS;
            if (elapsedIntervals > checksDone) {
                if (collection.size() > lastSize) {
                    LOGGER.warning(LocalizationMessages.ERROR_MONITORING_QUEUE_FLOODED(collection.size()));
                }
                checksDone++;
                lastSize = collection.size();
            }
        }
    }
}