package org.terracotta.statistics.derived.latency;
import org.terracotta.statistics.Sample;
import org.terracotta.statistics.SampledStatistic;
import org.terracotta.statistics.StatisticType;
import org.terracotta.statistics.Time;
import org.terracotta.statistics.observer.ChainedEventObserver;
import java.util.List;
import java.util.Queue;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.Consumer;
import java.util.function.LongSupplier;
import java.util.stream.Collectors;
/**
 * Keeps a bounded history of the maximum latency observed per time window.
 *
 * <p>Incoming events are folded into the most recent {@link LatencyPeriodAccumulator}. When that
 * accumulator refuses an event, a new accumulator is created for it and appended to a fixed-capacity
 * archive; once the archive is full the oldest accumulator is evicted and handed to the configured sink.
 *
 * <p>Accumulator timestamps live on the {@link Time#time()} time base, while samples are reported on the
 * time base of the supplied {@code timeSupplier}. The offset between the two ({@code drift}) is
 * re-measured every time a new accumulator is installed.
 */
public class MaximumLatencyHistory implements ChainedEventObserver, SampledStatistic<Long> {

  /** Scale factor between the millisecond and nanosecond values handled by this class. */
  private static final long NANOS_PER_MILLI = 1_000_000L;

  private final AtomicReference<LatencyPeriodAccumulator> latestAccumulator = new AtomicReference<>();
  private final Queue<LatencyPeriodAccumulator> archive;
  private final long windowSizeNs;
  private final Consumer<LatencyPeriodAccumulator> sink;
  private final LongSupplier timeSupplier;
  /** {@code Time.time()} minus the supplier time scaled to nanoseconds, as of the last measurement. */
  private volatile long drift;

  /**
   * Creates a history whose evicted accumulators are silently discarded.
   *
   * @param historySize maximum number of accumulators kept in the archive
   * @param windowSize length of one accumulation window, expressed in {@code windowSizeUnit}
   * @param windowSizeUnit unit of {@code windowSize}
   * @param timeSupplier clock used to timestamp samples; its value is treated as milliseconds
   */
  public MaximumLatencyHistory(int historySize, long windowSize, TimeUnit windowSizeUnit, LongSupplier timeSupplier) {
    this(historySize, windowSize, windowSizeUnit, timeSupplier, accumulator -> {});
  }

  /**
   * Creates a history that passes every evicted accumulator to {@code sink}.
   *
   * @param historySize maximum number of accumulators kept in the archive
   * @param windowSize length of one accumulation window, expressed in {@code windowSizeUnit}
   * @param windowSizeUnit unit of {@code windowSize}
   * @param timeSupplier clock used to timestamp samples; its value is treated as milliseconds
   * @param sink receives each accumulator removed from the archive to make room for a newer one
   */
  public MaximumLatencyHistory(int historySize, long windowSize, TimeUnit windowSizeUnit, LongSupplier timeSupplier, Consumer<LatencyPeriodAccumulator> sink) {
    this.archive = new ArrayBlockingQueue<>(historySize);
    this.windowSizeNs = TimeUnit.NANOSECONDS.convert(windowSize, windowSizeUnit);
    this.sink = sink;
    this.timeSupplier = timeSupplier;
    this.drift = computeDrift();
  }

  /**
   * Records one latency measurement.
   *
   * @param timeNs time of the event, in nanoseconds
   * @param latencyNs observed latency, in nanoseconds
   */
  @Override
  public void event(long timeNs, long latencyNs) {
    while (true) {
      LatencyPeriodAccumulator accumulator = latestAccumulator.get();
      // Presumably tryAccumulate returns false when the event does not belong to this
      // accumulator's window -- its implementation is not visible from this file.
      if (accumulator != null && accumulator.tryAccumulate(timeNs, latencyNs)) {
        return;
      }
      LatencyPeriodAccumulator newAccumulator = new LatencyPeriodAccumulator(timeNs, windowSizeNs, latencyNs);
      // Only the thread that wins the CAS archives its accumulator; losers loop and retry
      // against whichever accumulator was installed instead.
      if (latestAccumulator.compareAndSet(accumulator, newAccumulator)) {
        this.drift = computeDrift();
        insert(newAccumulator);
        return;
      }
    }
  }

  /**
   * Returns the maximum of the latest accumulator.
   *
   * @return the latest accumulator's maximum, or {@code null} when no event has been recorded yet or the
   *         latest accumulator's end is not after the current {@link Time#time()}
   */
  @Override
  public Long value() {
    LatencyPeriodAccumulator accumulator = latestAccumulator.get();
    if (accumulator == null || accumulator.end() <= Time.time()) {
      return null;
    }
    return accumulator.maximum();
  }

  @Override
  public StatisticType type() {
    return StatisticType.GAUGE;
  }

  /**
   * Returns one sample per archived accumulator, in archive order.
   *
   * @return samples holding each accumulator's start (converted to the supplier's time base) and maximum
   */
  @Override
  public List<Sample<Long>> history() {
    long drift = this.drift; // single volatile read so every sample uses the same offset
    return archive.stream()
        .map(accumulator -> toSample(accumulator, drift))
        .collect(Collectors.toList());
  }

  /**
   * Returns the samples of the archived accumulators that started at or after {@code sinceMillis}.
   *
   * @param sinceMillis inclusive lower bound, on the same time base as the sample timestamps
   * @return the matching samples, in archive order
   */
  @Override
  public List<Sample<Long>> history(long sinceMillis) {
    long drift = this.drift; // single volatile read so filter and mapping agree
    return archive.stream()
        // Equivalent to start >= sinceMillis * 1_000_000 + drift, but the comparison is done in
        // milliseconds so an extreme sinceMillis cannot overflow the multiplication and wrap.
        .filter(accumulator -> Math.floorDiv(accumulator.start() - drift, NANOS_PER_MILLI) >= sinceMillis)
        .map(accumulator -> toSample(accumulator, drift))
        .collect(Collectors.toList());
  }

  /** Measures the current offset between {@code Time.time()} and the supplier time scaled to nanoseconds. */
  private long computeDrift() {
    return Time.time() - timeSupplier.getAsLong() * NANOS_PER_MILLI;
  }

  /** Builds the sample for an accumulator: its drift-corrected start in milliseconds and its maximum. */
  private Sample<Long> toSample(LatencyPeriodAccumulator accumulator, long drift) {
    return new Sample<>((accumulator.start() - drift) / NANOS_PER_MILLI, accumulator.maximum());
  }

  /** Appends to the archive, evicting head entries into the sink until the new accumulator fits. */
  private void insert(LatencyPeriodAccumulator newAccumulator) {
    while (!archive.offer(newAccumulator)) {
      // poll() may return null if another thread drained the queue in between; just retry the offer.
      LatencyPeriodAccumulator removed = archive.poll();
      if (removed != null) {
        sink.accept(removed);
      }
    }
  }
}