package io.reactivex.internal.operators.flowable;
import org.reactivestreams.Subscriber;
import io.reactivex.Flowable;
import io.reactivex.exceptions.*;
import io.reactivex.functions.Function;
import io.reactivex.internal.functions.ObjectHelper;
import io.reactivex.internal.subscribers.SinglePostCompleteSubscriber;
/**
 * Flowable operator that, when the upstream signals an error, asks a function for a
 * replacement value and hands that value to the downstream instead of the error.
 *
 * @param <T> the element type of both the upstream and the downstream
 */
public final class FlowableOnErrorReturn<T> extends AbstractFlowableWithUpstream<T, T> {

    /** Maps the upstream's Throwable to the value delivered in its place. */
    final Function<? super Throwable, ? extends T> valueSupplier;

    /**
     * Creates the operator.
     *
     * @param source the upstream Flowable to observe
     * @param valueSupplier function invoked with the upstream error to obtain the fallback value
     */
    public FlowableOnErrorReturn(Flowable<T> source, Function<? super Throwable, ? extends T> valueSupplier) {
        super(source);
        this.valueSupplier = valueSupplier;
    }

    /**
     * Subscribes to the upstream through an intermediate subscriber that wraps {@code s}.
     *
     * @param s the downstream subscriber
     */
    @Override
    protected void subscribeActual(Subscriber<? super T> s) {
        OnErrorReturnSubscriber<T> parent = new OnErrorReturnSubscriber<T>(s, valueSupplier);
        source.subscribe(parent);
    }

    /**
     * Relays items and completion unchanged and turns an upstream error into a fallback value.
     *
     * @param <T> the element type
     */
    static final class OnErrorReturnSubscriber<T>
            extends SinglePostCompleteSubscriber<T, T> {

        private static final long serialVersionUID = -3740826063558713822L;

        /** Maps the upstream's Throwable to the value delivered in its place. */
        final Function<? super Throwable, ? extends T> valueSupplier;

        /**
         * @param actual the downstream subscriber receiving the relayed signals
         * @param valueSupplier function invoked with the upstream error to obtain the fallback value
         */
        OnErrorReturnSubscriber(Subscriber<? super T> actual, Function<? super Throwable, ? extends T> valueSupplier) {
            super(actual);
            this.valueSupplier = valueSupplier;
        }

        /**
         * Counts the item in the inherited {@code produced} field, then forwards it downstream.
         */
        @Override
        public void onNext(T t) {
            // NOTE(review): 'produced' is declared in the superclass; presumably it lets the
            // final fallback value be reconciled with outstanding demand - confirm there.
            produced++;
            downstream.onNext(t);
        }

        /**
         * Applies the value supplier to the upstream error and passes the result to the
         * inherited {@code complete(T)}. If the supplier throws, or its result is rejected by
         * the null check, the downstream receives a {@link CompositeException} carrying both
         * the upstream error and the supplier failure.
         */
        @Override
        public void onError(Throwable t) {
            T fallback;
            try {
                T supplied = valueSupplier.apply(t);
                fallback = ObjectHelper.requireNonNull(supplied, "The valueSupplier returned a null value");
            } catch (Throwable supplierFailure) {
                // Give throwIfFatal the first chance at the failure before reporting it downstream.
                Exceptions.throwIfFatal(supplierFailure);
                // Keep the original upstream error alongside the failure raised while recovering.
                downstream.onError(new CompositeException(t, supplierFailure));
                return;
            }
            // NOTE(review): complete(T) is inherited; presumably it emits the value when the
            // downstream has requested it and then completes - confirm in the superclass.
            complete(fallback);
        }

        /** Forwards normal completion to the downstream untouched. */
        @Override
        public void onComplete() {
            downstream.onComplete();
        }
    }
}