package com.netflix.hystrix.metric.consumer;
import com.netflix.hystrix.metric.CachedValuesHistogram;
import com.netflix.hystrix.metric.HystrixEvent;
import com.netflix.hystrix.metric.HystrixEventStream;
import org.HdrHistogram.Histogram;
import rx.Observable;
import rx.Subscription;
import rx.functions.Func0;
import rx.functions.Func1;
import rx.functions.Func2;
import rx.subjects.BehaviorSubject;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReference;
public class RollingDistributionStream<Event extends HystrixEvent> {
private AtomicReference<Subscription> rollingDistributionSubscription = new AtomicReference<Subscription>(null);
private final BehaviorSubject<CachedValuesHistogram> rollingDistribution = BehaviorSubject.create(CachedValuesHistogram.backedBy(CachedValuesHistogram.getNewHistogram()));
private final Observable<CachedValuesHistogram> rollingDistributionStream;
private static final Func2<Histogram, Histogram, Histogram> distributionAggregator = new Func2<Histogram, Histogram, Histogram>() {
@Override
public Histogram call(Histogram initialDistribution, Histogram distributionToAdd) {
initialDistribution.add(distributionToAdd);
return initialDistribution;
}
};
private static final Func1<Observable<Histogram>, Observable<Histogram>> reduceWindowToSingleDistribution = new Func1<Observable<Histogram>, Observable<Histogram>>() {
@Override
public Observable<Histogram> call(Observable<Histogram> window) {
return window.reduce(distributionAggregator);
}
};
private static final Func1<Histogram, CachedValuesHistogram> cacheHistogramValues = new Func1<Histogram, CachedValuesHistogram>() {
@Override
public CachedValuesHistogram call(Histogram histogram) {
return CachedValuesHistogram.backedBy(histogram);
}
};
private static final Func1<Observable<CachedValuesHistogram>, Observable<List<CachedValuesHistogram>>> convertToList =
new Func1<Observable<CachedValuesHistogram>, Observable<List<CachedValuesHistogram>>>() {
@Override
public Observable<List<CachedValuesHistogram>> call(Observable<CachedValuesHistogram> windowOf2) {
return windowOf2.toList();
}
};
protected RollingDistributionStream(final HystrixEventStream<Event> stream, final int numBuckets, final int bucketSizeInMs,
final Func2<Histogram, Event, Histogram> addValuesToBucket) {
final List<Histogram> emptyDistributionsToStart = new ArrayList<Histogram>();
for (int i = 0; i < numBuckets; i++) {
emptyDistributionsToStart.add(CachedValuesHistogram.getNewHistogram());
}
final Func1<Observable<Event>, Observable<Histogram>> reduceBucketToSingleDistribution = new Func1<Observable<Event>, Observable<Histogram>>() {
@Override
public Observable<Histogram> call(Observable<Event> bucket) {
return bucket.reduce(CachedValuesHistogram.getNewHistogram(), addValuesToBucket);
}
};
rollingDistributionStream = stream
.observe()
.window(bucketSizeInMs, TimeUnit.MILLISECONDS)
.flatMap(reduceBucketToSingleDistribution)
.startWith(emptyDistributionsToStart)
.window(numBuckets, 1)
.flatMap(reduceWindowToSingleDistribution)
.map(cacheHistogramValues)
.share()
.onBackpressureDrop();
}
public Observable<CachedValuesHistogram> observe() {
return rollingDistributionStream;
}
public int getLatestMean() {
CachedValuesHistogram latest = getLatest();
if (latest != null) {
return latest.getMean();
} else {
return 0;
}
}
public int getLatestPercentile(double percentile) {
CachedValuesHistogram latest = getLatest();
if (latest != null) {
return latest.getValueAtPercentile(percentile);
} else {
return 0;
}
}
public void startCachingStreamValuesIfUnstarted() {
if (rollingDistributionSubscription.get() == null) {
Subscription candidateSubscription = observe().subscribe(rollingDistribution);
if (rollingDistributionSubscription.compareAndSet(null, candidateSubscription)) {
} else {
candidateSubscription.unsubscribe();
}
}
}
CachedValuesHistogram getLatest() {
startCachingStreamValuesIfUnstarted();
if (rollingDistribution.hasValue()) {
return rollingDistribution.getValue();
} else {
return null;
}
}
public void unsubscribe() {
Subscription s = rollingDistributionSubscription.get();
if (s != null) {
s.unsubscribe();
rollingDistributionSubscription.compareAndSet(s, null);
}
}
}