package com.datastax.oss.driver.internal.core.util.concurrent;
import com.datastax.oss.driver.api.core.connection.ReconnectionPolicy.ReconnectionSchedule;
import com.datastax.oss.driver.internal.core.util.Loggers;
import io.netty.util.concurrent.EventExecutor;
import io.netty.util.concurrent.Future;
import io.netty.util.concurrent.ScheduledFuture;
import java.time.Duration;
import java.util.concurrent.Callable;
import java.util.concurrent.CancellationException;
import java.util.concurrent.CompletionStage;
import java.util.concurrent.TimeUnit;
import java.util.function.Supplier;
import net.jcip.annotations.NotThreadSafe;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@NotThreadSafe
public class Reconnection {
private static final Logger LOG = LoggerFactory.getLogger(Reconnection.class);
private enum State {
STOPPED,
SCHEDULED,
ATTEMPT_IN_PROGRESS,
STOP_AFTER_CURRENT,
;
}
private final String logPrefix;
private final EventExecutor executor;
private final Supplier<ReconnectionSchedule> scheduleSupplier;
private final Callable<CompletionStage<Boolean>> reconnectionTask;
private final Runnable onStart;
private final Runnable onStop;
private State state = State.STOPPED;
private ReconnectionSchedule reconnectionSchedule;
private ScheduledFuture<CompletionStage<Boolean>> nextAttempt;
public Reconnection(
String logPrefix,
EventExecutor executor,
Supplier<ReconnectionSchedule> scheduleSupplier,
Callable<CompletionStage<Boolean>> reconnectionTask,
Runnable onStart,
Runnable onStop) {
this.logPrefix = logPrefix;
this.executor = executor;
this.scheduleSupplier = scheduleSupplier;
this.reconnectionTask = reconnectionTask;
this.onStart = onStart;
this.onStop = onStop;
}
public Reconnection(
String logPrefix,
EventExecutor executor,
Supplier<ReconnectionSchedule> scheduleSupplier,
Callable<CompletionStage<Boolean>> reconnectionTask) {
this(logPrefix, executor, scheduleSupplier, reconnectionTask, () -> {}, () -> {});
}
public boolean isRunning() {
assert executor.inEventLoop();
return state != State.STOPPED;
}
public void start() {
start(null);
}
public void start(ReconnectionSchedule customSchedule) {
assert executor.inEventLoop();
switch (state) {
case SCHEDULED:
case ATTEMPT_IN_PROGRESS:
break;
case STOP_AFTER_CURRENT:
state = State.ATTEMPT_IN_PROGRESS;
break;
case STOPPED:
reconnectionSchedule = (customSchedule == null) ? scheduleSupplier.get() : customSchedule;
onStart.run();
scheduleNextAttempt();
break;
}
}
public void reconnectNow(boolean forceIfStopped) {
assert executor.inEventLoop();
if (state == State.ATTEMPT_IN_PROGRESS || state == State.STOP_AFTER_CURRENT) {
LOG.debug(
"[{}] reconnectNow and current attempt was still running, letting it complete",
logPrefix);
if (state == State.STOP_AFTER_CURRENT) {
state = State.ATTEMPT_IN_PROGRESS;
}
} else if (state == State.STOPPED && !forceIfStopped) {
LOG.debug("[{}] reconnectNow(false) while stopped, nothing to do", logPrefix);
} else {
assert state == State.SCHEDULED || (state == State.STOPPED && forceIfStopped);
LOG.debug("[{}] Forcing next attempt now", logPrefix);
if (nextAttempt != null) {
nextAttempt.cancel(true);
}
try {
onNextAttemptStarted(reconnectionTask.call());
} catch (Exception e) {
Loggers.warnWithException(
LOG, "[{}] Uncaught error while starting reconnection attempt", logPrefix, e);
scheduleNextAttempt();
}
}
}
public void stop() {
assert executor.inEventLoop();
switch (state) {
case STOPPED:
case STOP_AFTER_CURRENT:
break;
case ATTEMPT_IN_PROGRESS:
state = State.STOP_AFTER_CURRENT;
break;
case SCHEDULED:
reallyStop();
break;
}
}
private void reallyStop() {
LOG.debug("[{}] Stopping reconnection", logPrefix);
state = State.STOPPED;
if (nextAttempt != null) {
nextAttempt.cancel(true);
nextAttempt = null;
}
onStop.run();
reconnectionSchedule = null;
}
private void scheduleNextAttempt() {
assert executor.inEventLoop();
state = State.SCHEDULED;
if (reconnectionSchedule == null) {
reconnectionSchedule = scheduleSupplier.get();
}
Duration nextInterval = reconnectionSchedule.nextDelay();
LOG.debug("[{}] Scheduling next reconnection in {}", logPrefix, nextInterval);
nextAttempt = executor.schedule(reconnectionTask, nextInterval.toNanos(), TimeUnit.NANOSECONDS);
nextAttempt.addListener(
(Future<CompletionStage<Boolean>> f) -> {
if (f.isSuccess()) {
onNextAttemptStarted(f.getNow());
} else if (!f.isCancelled()) {
Loggers.warnWithException(
LOG,
"[{}] Uncaught error while starting reconnection attempt",
logPrefix,
f.cause());
scheduleNextAttempt();
}
});
}
private void onNextAttemptStarted(CompletionStage<Boolean> futureOutcome) {
assert executor.inEventLoop();
state = State.ATTEMPT_IN_PROGRESS;
futureOutcome
.whenCompleteAsync(this::onNextAttemptCompleted, executor)
.exceptionally(UncaughtExceptions::log);
}
private void onNextAttemptCompleted(Boolean success, Throwable error) {
assert executor.inEventLoop();
if (success) {
LOG.debug("[{}] Reconnection successful", logPrefix);
reallyStop();
} else {
if (error != null && !(error instanceof CancellationException)) {
Loggers.warnWithException(
LOG, "[{}] Uncaught error while starting reconnection attempt", logPrefix, error);
}
if (state == State.STOP_AFTER_CURRENT) {
reallyStop();
} else {
assert state == State.ATTEMPT_IN_PROGRESS;
scheduleNextAttempt();
}
}
}
}