package org.springframework.boot.actuate.cassandra;
import java.util.Collection;
import java.util.Optional;
import com.datastax.oss.driver.api.core.CqlSession;
import com.datastax.oss.driver.api.core.metadata.Node;
import com.datastax.oss.driver.api.core.metadata.NodeState;
import reactor.core.publisher.Mono;
import org.springframework.boot.actuate.health.AbstractReactiveHealthIndicator;
import org.springframework.boot.actuate.health.Health;
import org.springframework.boot.actuate.health.ReactiveHealthIndicator;
import org.springframework.boot.actuate.health.Status;
import org.springframework.util.Assert;
public class CassandraDriverReactiveHealthIndicator extends AbstractReactiveHealthIndicator {
private final CqlSession session;
public CassandraDriverReactiveHealthIndicator(CqlSession session) {
super("Cassandra health check failed");
Assert.notNull(session, "session must not be null");
this.session = session;
}
@Override
protected Mono<Health> doHealthCheck(Health.Builder builder) {
return Mono.fromSupplier(() -> {
Collection<Node> nodes = this.session.getMetadata().getNodes().values();
Optional<Node> nodeUp = nodes.stream().filter((node) -> node.getState() == NodeState.UP).findAny();
builder.status(nodeUp.isPresent() ? Status.UP : Status.DOWN);
nodeUp.map(Node::getCassandraVersion).ifPresent((version) -> builder.withDetail("version", version));
return builder.build();
});
}
}