package com.netflix.hystrix.metric;
import com.netflix.hystrix.HystrixCommandKey;
import rx.Observable;
import rx.subjects.PublishSubject;
import rx.subjects.SerializedSubject;
import rx.subjects.Subject;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
public class HystrixCommandStartStream implements HystrixEventStream<HystrixCommandExecutionStarted> {
private final HystrixCommandKey commandKey;
private final Subject<HystrixCommandExecutionStarted, HystrixCommandExecutionStarted> writeOnlySubject;
private final Observable<HystrixCommandExecutionStarted> readOnlyStream;
private static final ConcurrentMap<String, HystrixCommandStartStream> streams = new ConcurrentHashMap<String, HystrixCommandStartStream>();
public static HystrixCommandStartStream getInstance(HystrixCommandKey commandKey) {
HystrixCommandStartStream initialStream = streams.get(commandKey.name());
if (initialStream != null) {
return initialStream;
} else {
synchronized (HystrixCommandStartStream.class) {
HystrixCommandStartStream existingStream = streams.get(commandKey.name());
if (existingStream == null) {
HystrixCommandStartStream newStream = new HystrixCommandStartStream(commandKey);
streams.putIfAbsent(commandKey.name(), newStream);
return newStream;
} else {
return existingStream;
}
}
}
}
HystrixCommandStartStream(final HystrixCommandKey commandKey) {
this.commandKey = commandKey;
this.writeOnlySubject = new SerializedSubject<HystrixCommandExecutionStarted, HystrixCommandExecutionStarted>(PublishSubject.<HystrixCommandExecutionStarted>create());
this.readOnlyStream = writeOnlySubject.share();
}
public static void reset() {
streams.clear();
}
public void write(HystrixCommandExecutionStarted event) {
writeOnlySubject.onNext(event);
}
@Override
public Observable<HystrixCommandExecutionStarted> observe() {
return readOnlyStream;
}
@Override
public String toString() {
return "HystrixCommandStartStream(" + commandKey.name() + ")";
}
}