package com.netflix.hystrix.metric.consumer;
import com.netflix.hystrix.HystrixCommandKey;
import com.netflix.hystrix.HystrixCommandMetrics;
import com.netflix.hystrix.HystrixCommandProperties;
import com.netflix.hystrix.HystrixEventType;
import com.netflix.hystrix.metric.HystrixCommandCompletion;
import com.netflix.hystrix.metric.HystrixCommandCompletionStream;
import rx.functions.Func2;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
public class HealthCountsStream extends BucketedRollingCounterStream<HystrixCommandCompletion, long[], HystrixCommandMetrics.HealthCounts> {
private static final ConcurrentMap<String, HealthCountsStream> streams = new ConcurrentHashMap<String, HealthCountsStream>();
private static final int NUM_EVENT_TYPES = HystrixEventType.values().length;
private static final Func2<HystrixCommandMetrics.HealthCounts, long[], HystrixCommandMetrics.HealthCounts> healthCheckAccumulator = new Func2<HystrixCommandMetrics.HealthCounts, long[], HystrixCommandMetrics.HealthCounts>() {
@Override
public HystrixCommandMetrics.HealthCounts call(HystrixCommandMetrics.HealthCounts healthCounts, long[] bucketEventCounts) {
return healthCounts.plus(bucketEventCounts);
}
};
public static HealthCountsStream getInstance(HystrixCommandKey commandKey, HystrixCommandProperties properties) {
final int healthCountBucketSizeInMs = properties.metricsHealthSnapshotIntervalInMilliseconds().get();
if (healthCountBucketSizeInMs == 0) {
throw new RuntimeException("You have set the bucket size to 0ms. Please set a positive number, so that the metric stream can be properly consumed");
}
final int numHealthCountBuckets = properties.metricsRollingStatisticalWindowInMilliseconds().get() / healthCountBucketSizeInMs;
return getInstance(commandKey, numHealthCountBuckets, healthCountBucketSizeInMs);
}
public static HealthCountsStream getInstance(HystrixCommandKey commandKey, int numBuckets, int bucketSizeInMs) {
HealthCountsStream initialStream = streams.get(commandKey.name());
if (initialStream != null) {
return initialStream;
} else {
final HealthCountsStream healthStream;
synchronized (HealthCountsStream.class) {
HealthCountsStream existingStream = streams.get(commandKey.name());
if (existingStream == null) {
HealthCountsStream newStream = new HealthCountsStream(commandKey, numBuckets, bucketSizeInMs,
HystrixCommandMetrics.appendEventToBucket);
streams.putIfAbsent(commandKey.name(), newStream);
healthStream = newStream;
} else {
healthStream = existingStream;
}
}
healthStream.startCachingStreamValuesIfUnstarted();
return healthStream;
}
}
public static void reset() {
streams.clear();
}
public static void removeByKey(HystrixCommandKey key) {
streams.remove(key.name());
}
private HealthCountsStream(final HystrixCommandKey commandKey, final int numBuckets, final int bucketSizeInMs,
Func2<long[], HystrixCommandCompletion, long[]> reduceCommandCompletion) {
super(HystrixCommandCompletionStream.getInstance(commandKey), numBuckets, bucketSizeInMs, reduceCommandCompletion, healthCheckAccumulator);
}
@Override
long[] getEmptyBucketSummary() {
return new long[NUM_EVENT_TYPES];
}
@Override
HystrixCommandMetrics.HealthCounts getEmptyOutputValue() {
return HystrixCommandMetrics.HealthCounts.empty();
}
}