package com.datastax.oss.driver.internal.core.metadata;
import com.datastax.oss.driver.api.core.Version;
import com.datastax.oss.driver.api.core.config.DefaultDriverOption;
import com.datastax.oss.driver.api.core.config.DriverExecutionProfile;
import com.datastax.oss.driver.api.core.metadata.EndPoint;
import com.datastax.oss.driver.api.core.metadata.Node;
import com.datastax.oss.driver.internal.core.adminrequest.AdminRequestHandler;
import com.datastax.oss.driver.internal.core.adminrequest.AdminResult;
import com.datastax.oss.driver.internal.core.adminrequest.AdminRow;
import com.datastax.oss.driver.internal.core.adminrequest.UnexpectedResponseException;
import com.datastax.oss.driver.internal.core.channel.DriverChannel;
import com.datastax.oss.driver.internal.core.context.InternalDriverContext;
import com.datastax.oss.driver.internal.core.control.ControlConnection;
import com.datastax.oss.driver.internal.core.util.concurrent.CompletableFutures;
import com.datastax.oss.driver.shaded.guava.common.annotations.VisibleForTesting;
import com.datastax.oss.driver.shaded.guava.common.collect.ImmutableMap;
import com.datastax.oss.protocol.internal.ProtocolConstants;
import com.datastax.oss.protocol.internal.response.Error;
import edu.umd.cs.findbugs.annotations.NonNull;
import edu.umd.cs.findbugs.annotations.Nullable;
import java.net.InetAddress;
import java.net.InetSocketAddress;
import java.net.SocketAddress;
import java.time.Duration;
import java.util.ArrayList;
import java.util.Collections;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Optional;
import java.util.UUID;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionStage;
import net.jcip.annotations.ThreadSafe;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@ThreadSafe
public class DefaultTopologyMonitor implements TopologyMonitor {
private static final Logger LOG = LoggerFactory.getLogger(DefaultTopologyMonitor.class);
// Page size handed to AdminRequestHandler.query for every admin query issued by this class.
// NOTE(review): the name suggests that -1 disables paging; confirm against AdminRequestHandler.
private static final int INFINITE_PAGE_SIZE = -1;
// Prefix inserted in every log message of this class; set to the context's session name.
private final String logPrefix;
// Driver context; used here for the address translator and passed to SchemaAgreementChecker.
private final InternalDriverContext context;
// Provides the channel on which all system-table queries of this class are executed.
private final ControlConnection controlConnection;
// Timeout passed to each admin query; read from CONTROL_CONNECTION_TIMEOUT (default profile).
private final Duration timeout;
// Forwarded to ControlConnection.init(); read from RECONNECT_ON_INIT (default profile).
private final boolean reconnectOnInit;
// Completed by closeAsync(); the public operations check isDone() on it before doing any work.
private final CompletableFuture<Void> closeFuture;
// Selects the peers table: system.peers_v2 when true, system.peers when false. Starts as true and
// is switched to false by refreshNodeList() when the system.peers_v2 query fails with an INVALID
// error, or with a SERVER_ERROR whose message mentions the unknown system.peers_v2 table.
@VisibleForTesting volatile boolean isSchemaV2;
// Port of the control channel's endpoint as recorded by savePort(); stays -1 until it is known.
// Used by getBroadcastRpcAddress() as a fallback when a row carries no usable port.
@VisibleForTesting volatile int port = -1;
/**
 * Creates a topology monitor backed by the control connection of the given context.
 *
 * <p>The admin query timeout and the reconnect-on-init flag are read once, from the default
 * execution profile. The monitor starts out assuming that {@code system.peers_v2} is available.
 *
 * @param context the driver context that supplies the session name (used as log prefix), the
 *     control connection and the configuration.
 */
public DefaultTopologyMonitor(InternalDriverContext context) {
  // Plain state first; none of it depends on the context lookups below.
  this.context = context;
  this.closeFuture = new CompletableFuture<>();
  this.isSchemaV2 = true;

  // Context lookups, in the same order as before: session name, control connection, config.
  this.logPrefix = context.getSessionName();
  this.controlConnection = context.getControlConnection();
  DriverExecutionProfile defaultProfile = context.getConfig().getDefaultProfile();
  this.timeout = defaultProfile.getDuration(DefaultDriverOption.CONTROL_CONNECTION_TIMEOUT);
  this.reconnectOnInit = defaultProfile.getBoolean(DefaultDriverOption.RECONNECT_ON_INIT);
}
/**
 * Initializes the control connection this monitor relies on.
 *
 * @return the stage returned by {@code controlConnection.init(true, reconnectOnInit, true)}, or a
 *     stage failed with {@code IllegalStateException("closed")} if this monitor was already
 *     closed.
 */
@Override
public CompletionStage<Void> init() {
  if (!closeFuture.isDone()) {
    return controlConnection.init(true, reconnectOnInit, true);
  }
  IllegalStateException closed = new IllegalStateException("closed");
  return CompletableFutures.failedFuture(closed);
}
/**
 * Exposes the initialization stage of the control connection.
 *
 * @return whatever {@code controlConnection.initFuture()} returns; this method adds no logic of
 *     its own.
 */
@Override
public CompletionStage<Void> initFuture() {
  CompletionStage<Void> controlConnectionInit = controlConnection.initFuture();
  return controlConnectionInit;
}
/**
 * Fetches up-to-date information about a single node from the peers table.
 *
 * <p>Three cases are distinguished:
 *
 * <ul>
 *   <li>the node's endpoint equals the control channel's endpoint: nothing is queried and an
 *       empty result is returned;
 *   <li>the node has no broadcast address: the whole peers table is read and searched by host id;
 *   <li>otherwise: the peers table is queried by {@code peer} (and also {@code peer_port} when
 *       {@code system.peers_v2} is in use) and the first returned row is converted.
 * </ul>
 *
 * @param node the node to refresh.
 * @return a stage holding the refreshed info, or an empty optional if no usable row was found; a
 *     stage failed with {@code IllegalStateException("closed")} if this monitor is closed.
 */
@Override
public CompletionStage<Optional<NodeInfo>> refreshNode(Node node) {
  if (closeFuture.isDone()) {
    return CompletableFutures.failedFuture(new IllegalStateException("closed"));
  }
  LOG.debug("[{}] Refreshing info for {}", logPrefix, node);
  DriverChannel controlChannel = controlConnection.channel();
  EndPoint controlEndPoint = controlChannel.getEndPoint();

  // Case 1: the node is the control node itself.
  if (node.getEndPoint().equals(controlChannel.getEndPoint())) {
    LOG.debug("[{}] Ignoring refresh of control node", logPrefix);
    return CompletableFuture.completedFuture(Optional.empty());
  }

  // Case 2: no broadcast address to filter on, scan all peers and match on the host id.
  if (!node.getBroadcastAddress().isPresent()) {
    return query(controlChannel, "SELECT * FROM " + getPeerTableName())
        .thenApply(peers -> findInPeers(peers, node.getHostId(), controlEndPoint));
  }

  // Case 3: targeted lookup; the shape of the WHERE clause depends on the peers table in use.
  CompletionStage<AdminResult> peerLookup;
  if (!isSchemaV2) {
    String cql = "SELECT * FROM " + getPeerTableName() + " WHERE peer = :address";
    Map<String, Object> parameters =
        ImmutableMap.of("address", node.getBroadcastAddress().get().getAddress());
    peerLookup = query(controlChannel, cql, parameters);
  } else {
    String cql =
        "SELECT * FROM " + getPeerTableName() + " WHERE peer = :address and peer_port = :port";
    Map<String, Object> parameters =
        ImmutableMap.of(
            "address",
            node.getBroadcastAddress().get().getAddress(),
            "port",
            node.getBroadcastAddress().get().getPort());
    peerLookup = query(controlChannel, cql, parameters);
  }
  return peerLookup.thenApply(rows -> firstPeerRowAsNodeInfo(rows, controlEndPoint));
}
/**
 * Looks for a node that is not known yet, by reading the whole peers table and searching for a
 * row whose broadcast RPC address matches.
 *
 * @param broadcastRpcAddress the broadcast RPC address of the node to find.
 * @return a stage holding the matching node info, or an empty optional if no valid row matches; a
 *     stage failed with {@code IllegalStateException("closed")} if this monitor is closed.
 */
@Override
public CompletionStage<Optional<NodeInfo>> getNewNodeInfo(InetSocketAddress broadcastRpcAddress) {
  if (closeFuture.isDone()) {
    IllegalStateException closed = new IllegalStateException("closed");
    return CompletableFutures.failedFuture(closed);
  }
  LOG.debug("[{}] Fetching info for new node {}", logPrefix, broadcastRpcAddress);
  DriverChannel controlChannel = controlConnection.channel();
  EndPoint controlEndPoint = controlChannel.getEndPoint();
  String allPeersCql = "SELECT * FROM " + getPeerTableName();
  CompletionStage<AdminResult> allPeers = query(controlChannel, allPeersCql);
  return allPeers.thenApply(rows -> findInPeers(rows, broadcastRpcAddress, controlEndPoint));
}
/**
 * Reads the complete node list: the control node from {@code system.local}, and every other node
 * from the peers table.
 *
 * <p>{@code system.peers_v2} is always tried first. If that query fails in a way that indicates
 * the table does not exist (see {@link #isPeersV2Unavailable(Throwable)}), {@code isSchemaV2} is
 * switched to {@code false} and {@code system.peers} is queried instead. Any other failure is
 * propagated as-is.
 *
 * <p>Peer rows that are invalid (see {@code isPeerValid}) or that yield no broadcast RPC address
 * are skipped.
 *
 * @return a stage holding the node infos, control node first; a stage failed with {@code
 *     IllegalStateException("closed")} if this monitor is closed. The stage fails if either
 *     query fails, or if {@code system.local} returns no row.
 */
@Override
public CompletionStage<Iterable<NodeInfo>> refreshNodeList() {
  if (closeFuture.isDone()) {
    return CompletableFutures.failedFuture(new IllegalStateException("closed"));
  }
  LOG.debug("[{}] Refreshing node list", logPrefix);
  DriverChannel channel = controlConnection.channel();
  EndPoint localEndPoint = channel.getEndPoint();
  savePort(channel);

  CompletionStage<AdminResult> localQuery = query(channel, "SELECT * FROM system.local");
  CompletionStage<AdminResult> peersV2Query = query(channel, "SELECT * FROM system.peers_v2");
  // Completed either with the peers_v2 result, or with the result of the system.peers fallback.
  CompletableFuture<AdminResult> peersQuery = new CompletableFuture<>();

  peersV2Query.whenComplete(
      (r, t) -> {
        // The stage returned by whenComplete is discarded, so an exception escaping this
        // callback would be lost and peersQuery would never complete. Catch everything and
        // route it to peersQuery instead, so that the caller always gets an outcome.
        try {
          if (t == null) {
            peersQuery.complete(r);
          } else if (isPeersV2Unavailable(t)) {
            this.isSchemaV2 = false;
            CompletableFutures.completeFrom(
                query(channel, "SELECT * FROM system.peers"), peersQuery);
          } else {
            peersQuery.completeExceptionally(t);
          }
        } catch (RuntimeException e) {
          if (t != null && t != e) {
            // Keep the original query failure visible for diagnostics.
            e.addSuppressed(t);
          }
          // No-op if peersQuery was already completed above.
          peersQuery.completeExceptionally(e);
        }
      });

  return localQuery.thenCombine(
      peersQuery,
      (controlNodeResult, peersResult) -> {
        List<NodeInfo> nodeInfos = new ArrayList<>();
        // The control node is described by the (single expected) row of system.local.
        AdminRow localRow = controlNodeResult.iterator().next();
        InetSocketAddress localBroadcastRpcAddress =
            getBroadcastRpcAddress(localRow, localEndPoint);
        nodeInfos.add(nodeInfoBuilder(localRow, localBroadcastRpcAddress, localEndPoint).build());
        for (AdminRow peerRow : peersResult) {
          if (isPeerValid(peerRow)) {
            InetSocketAddress peerBroadcastRpcAddress =
                getBroadcastRpcAddress(peerRow, localEndPoint);
            // A null address means the row could not be used (or designates the control node).
            if (peerBroadcastRpcAddress != null) {
              NodeInfo nodeInfo =
                  nodeInfoBuilder(peerRow, peerBroadcastRpcAddress, localEndPoint).build();
              nodeInfos.add(nodeInfo);
            }
          }
        }
        return nodeInfos;
      });
}

/**
 * Tells whether a failure of the {@code system.peers_v2} query should trigger the fallback to
 * {@code system.peers}.
 *
 * <p>That is the case when the failure is an {@link UnexpectedResponseException} carrying a
 * protocol {@code Error} whose code is {@code INVALID}, or whose code is {@code SERVER_ERROR} with
 * a message containing {@code "Unknown keyspace/cf pair (system.peers_v2)"}. A null error message
 * is treated as "does not match".
 */
private static boolean isPeersV2Unavailable(Throwable failure) {
  if (!(failure instanceof UnexpectedResponseException)) {
    return false;
  }
  Object response = ((UnexpectedResponseException) failure).message;
  if (!(response instanceof Error)) {
    return false;
  }
  Error error = (Error) response;
  if (error.code == ProtocolConstants.ErrorCode.INVALID) {
    return true;
  }
  return error.code == ProtocolConstants.ErrorCode.SERVER_ERROR
      && error.message != null
      && error.message.contains("Unknown keyspace/cf pair (system.peers_v2)");
}
/**
 * Checks schema agreement by running a new {@code SchemaAgreementChecker} on the control channel.
 *
 * @return the stage produced by the checker, or an already completed stage holding {@code true}
 *     if this monitor is closed (no check is performed in that case).
 */
@Override
public CompletionStage<Boolean> checkSchemaAgreement() {
  if (!closeFuture.isDone()) {
    DriverChannel controlChannel = controlConnection.channel();
    SchemaAgreementChecker checker =
        new SchemaAgreementChecker(controlChannel, context, logPrefix);
    return checker.run();
  }
  return CompletableFuture.completedFuture(true);
}
/**
 * Returns the stage that tracks the closing of this monitor.
 *
 * @return the internal close future; it is completed by {@link #closeAsync()}.
 */
@NonNull
@Override
public CompletionStage<Void> closeFuture() {
  return this.closeFuture;
}
/**
 * Marks this monitor as closed by completing the close future with {@code null}. Calling it more
 * than once has no further effect, since the future is already completed.
 *
 * @return the (now completed) close future.
 */
@NonNull
@Override
public CompletionStage<Void> closeAsync() {
  CompletableFuture<Void> closing = this.closeFuture;
  closing.complete(null);
  return closing;
}
/**
 * Forcefully closes this monitor. There is no difference with a regular close here: the call is
 * simply forwarded to {@link #closeAsync()}.
 *
 * @return the stage returned by {@link #closeAsync()}.
 */
@NonNull
@Override
public CompletionStage<Void> forceCloseAsync() {
  CompletionStage<Void> closed = closeAsync();
  return closed;
}
/**
 * Runs an admin query on the given channel and returns the stage of its result.
 *
 * <p>The request is built with {@code AdminRequestHandler.query}, using this monitor's timeout,
 * {@code INFINITE_PAGE_SIZE} as the page size and the log prefix, and is started immediately.
 * The method is {@code protected} and annotated {@code @VisibleForTesting}, presumably so that
 * tests can override it to stub query results (confirm against the test suite).
 *
 * @param channel the channel to send the query on.
 * @param queryString the CQL query, possibly with named placeholders.
 * @param parameters the values of the named placeholders, keyed by name.
 * @return the stage returned by starting the request.
 */
@VisibleForTesting
protected CompletionStage<AdminResult> query(
DriverChannel channel, String queryString, Map<String, Object> parameters) {
return AdminRequestHandler.query(
channel, queryString, parameters, timeout, INFINITE_PAGE_SIZE, logPrefix)
.start();
}
/**
 * Runs an admin query that takes no parameters; shorthand for the three-argument overload with
 * an empty parameter map.
 */
private CompletionStage<AdminResult> query(DriverChannel channel, String queryString) {
  Map<String, Object> noParameters = Collections.emptyMap();
  return query(channel, queryString, noParameters);
}
/**
 * Returns the name of the peers table to query, depending on the current value of {@code
 * isSchemaV2}: {@code system.peers_v2} when it is set, {@code system.peers} otherwise.
 */
private String getPeerTableName() {
  if (isSchemaV2) {
    return "system.peers_v2";
  }
  return "system.peers";
}
/**
 * Converts the first row of a peers query result into a node info.
 *
 * @return the node info built from the first row; or an empty optional if the result has no row,
 *     if the first row is not valid (see {@code isPeerValid}), or if no broadcast RPC address
 *     could be determined for it. Rows after the first one are never looked at.
 */
private Optional<NodeInfo> firstPeerRowAsNodeInfo(AdminResult result, EndPoint localEndPoint) {
  Iterator<AdminRow> rows = result.iterator();
  if (!rows.hasNext()) {
    return Optional.empty();
  }
  AdminRow firstRow = rows.next();
  if (!isPeerValid(firstRow)) {
    return Optional.empty();
  }
  InetSocketAddress rpcAddress = getBroadcastRpcAddress(firstRow, localEndPoint);
  if (rpcAddress == null) {
    return Optional.empty();
  }
  NodeInfo nodeInfo = nodeInfoBuilder(firstRow, rpcAddress, localEndPoint).build();
  return Optional.ofNullable(nodeInfo);
}
/**
 * Creates a node info builder pre-populated from a row of {@code system.local} or of a peers
 * table.
 *
 * <p>Address handling:
 *
 * <ul>
 *   <li>broadcast address: IP from {@code broadcast_address}, else from {@code peer}; port from
 *       {@code broadcast_port} if the row has that column, else from {@code peer_port} if it has
 *       that one, else 0. Left null if the IP or the port value is null;
 *   <li>listen address: IP from {@code listen_address}; port from {@code listen_port} if the row
 *       has that column, else 0. Left null if the IP or the port value is null.
 * </ul>
 *
 * <p>The DSE version is added as an extra only if the row has a {@code dse_version} column.
 *
 * @param row the row to read from.
 * @param broadcastRpcAddress the broadcast RPC address computed for this row.
 * @param localEndPoint the endpoint of the control node.
 * @return the populated builder.
 * @throws NullPointerException if the row has no {@code host_id}.
 */
@NonNull
protected DefaultNodeInfo.Builder nodeInfoBuilder(
    @NonNull AdminRow row,
    @Nullable InetSocketAddress broadcastRpcAddress,
    @NonNull EndPoint localEndPoint) {
  EndPoint nodeEndPoint = buildNodeEndPoint(row, broadcastRpcAddress, localEndPoint);

  // Broadcast address.
  InetAddress broadcastIp = row.getInetAddress("broadcast_address");
  if (broadcastIp == null) {
    broadcastIp = row.getInetAddress("peer");
  }
  Integer broadcastPort;
  if (row.contains("broadcast_port")) {
    broadcastPort = row.getInteger("broadcast_port");
  } else if (row.contains("peer_port")) {
    broadcastPort = row.getInteger("peer_port");
  } else {
    broadcastPort = 0;
  }
  InetSocketAddress broadcastAddress;
  if (broadcastIp == null || broadcastPort == null) {
    broadcastAddress = null;
  } else {
    broadcastAddress = new InetSocketAddress(broadcastIp, broadcastPort);
  }

  // Listen address.
  InetAddress listenIp = row.getInetAddress("listen_address");
  Integer listenPort;
  if (row.contains("listen_port")) {
    listenPort = row.getInteger("listen_port");
  } else {
    listenPort = 0;
  }
  InetSocketAddress listenAddress;
  if (listenIp == null || listenPort == null) {
    listenAddress = null;
  } else {
    listenAddress = new InetSocketAddress(listenIp, listenPort);
  }

  DefaultNodeInfo.Builder builder =
      DefaultNodeInfo.builder()
          .withEndPoint(nodeEndPoint)
          .withBroadcastRpcAddress(broadcastRpcAddress)
          .withBroadcastAddress(broadcastAddress)
          .withListenAddress(listenAddress)
          .withDatacenter(row.getString("data_center"))
          .withRack(row.getString("rack"))
          .withCassandraVersion(row.getString("release_version"))
          .withTokens(row.getSetOfString("tokens"))
          .withPartitioner(row.getString("partitioner"))
          .withHostId(Objects.requireNonNull(row.getUuid("host_id")))
          .withSchemaVersion(row.getUuid("schema_version"));

  if (!row.contains("dse_version")) {
    return builder;
  }
  return builder.withExtra(
      NodeProperties.DSE_VERSION, Version.parse(row.getString("dse_version")));
}
/**
 * Determines the endpoint to use for the node described by a row.
 *
 * <p>A row that has a {@code peer} column is treated as a peer: its broadcast RPC address is run
 * through the context's address translator and wrapped in a {@code DefaultEndPoint}. Any other
 * row is taken to describe the control node, for which {@code localEndPoint} is returned as-is.
 *
 * @param row the row describing the node.
 * @param broadcastRpcAddress the node's broadcast RPC address; required for peer rows.
 * @param localEndPoint the endpoint of the control node.
 * @return the endpoint for the node.
 * @throws NullPointerException if the row is a peer row and {@code broadcastRpcAddress} is null.
 */
@NonNull
protected EndPoint buildNodeEndPoint(
    @NonNull AdminRow row,
    @Nullable InetSocketAddress broadcastRpcAddress,
    @NonNull EndPoint localEndPoint) {
  if (!row.contains("peer")) {
    return localEndPoint;
  }
  InetSocketAddress peerRpcAddress =
      Objects.requireNonNull(
          broadcastRpcAddress, "broadcastRpcAddress cannot be null for a peer row");
  return new DefaultEndPoint(context.getAddressTranslator().translate(peerRpcAddress));
}
/**
 * Searches a peers result for the first valid row whose broadcast RPC address equals the given
 * one.
 *
 * @return the node info built from that row, or an empty optional (after a debug log) if there
 *     is no such row.
 */
private Optional<NodeInfo> findInPeers(
    AdminResult result, InetSocketAddress broadcastRpcAddressToFind, EndPoint localEndPoint) {
  for (AdminRow candidate : result) {
    InetSocketAddress candidateAddress = getBroadcastRpcAddress(candidate, localEndPoint);
    if (candidateAddress == null || !candidateAddress.equals(broadcastRpcAddressToFind)) {
      continue;
    }
    // Validity is only checked (and possibly logged) for rows that match the address.
    if (!isPeerValid(candidate)) {
      continue;
    }
    return Optional.of(nodeInfoBuilder(candidate, candidateAddress, localEndPoint).build());
  }
  LOG.debug("[{}] Could not find any peer row matching {}", logPrefix, broadcastRpcAddressToFind);
  return Optional.empty();
}
/**
 * Searches a peers result for the first valid row whose {@code host_id} equals the given one.
 *
 * <p>The search stops at that row: if no broadcast RPC address can be determined for it, an empty
 * optional is returned right away and the remaining rows are not examined.
 *
 * @return the node info built from the matching row, or an empty optional if the row has no
 *     usable address or (after a debug log) if no row matches.
 */
private Optional<NodeInfo> findInPeers(
    AdminResult result, UUID hostIdToFind, EndPoint localEndPoint) {
  for (AdminRow candidate : result) {
    UUID candidateId = candidate.getUuid("host_id");
    boolean sameHost = candidateId != null && candidateId.equals(hostIdToFind);
    // Validity is only checked (and possibly logged) for rows that match the host id.
    if (!sameHost || !isPeerValid(candidate)) {
      continue;
    }
    InetSocketAddress rpcAddress = getBroadcastRpcAddress(candidate, localEndPoint);
    if (rpcAddress == null) {
      return Optional.empty();
    }
    NodeInfo nodeInfo = nodeInfoBuilder(candidate, rpcAddress, localEndPoint).build();
    return Optional.ofNullable(nodeInfo);
  }
  LOG.debug("[{}] Could not find any peer row matching {}", logPrefix, hostIdToFind);
  return Optional.empty();
}
/**
 * Records the port of the channel's endpoint in {@code port}, unless a port was already recorded.
 * Nothing is recorded if the endpoint does not resolve to an {@code InetSocketAddress}.
 */
private void savePort(DriverChannel channel) {
  if (port >= 0) {
    // Already known.
    return;
  }
  SocketAddress resolved = channel.getEndPoint().resolve();
  if (resolved instanceof InetSocketAddress) {
    InetSocketAddress inetAddress = (InetSocketAddress) resolved;
    port = inetAddress.getPort();
  }
}
/**
 * Computes the broadcast RPC address of the node described by a row.
 *
 * <p>The IP is read from {@code rpc_address}, falling back to {@code native_address}. The port is
 * read from {@code rpc_port}, falling back to {@code native_port} and finally to the port saved
 * from the control connection (or 0 if that one is still unknown); a null or zero value counts
 * as absent.
 *
 * <p>If the row has a {@code peer} column and the computed address equals the resolved address of
 * the control node, the row is ignored and a warning is logged.
 *
 * @param row the row to read from.
 * @param localEndPoint the endpoint of the control node.
 * @return the address, or null if the row has no IP or designates the control node.
 */
@Nullable
protected InetSocketAddress getBroadcastRpcAddress(
    @NonNull AdminRow row, @NonNull EndPoint localEndPoint) {
  InetAddress rpcIp = row.getInetAddress("rpc_address");
  if (rpcIp == null) {
    rpcIp = row.getInetAddress("native_address");
  }
  if (rpcIp == null) {
    return null;
  }

  Integer rpcPort = row.getInteger("rpc_port");
  if (rpcPort == null || rpcPort.intValue() == 0) {
    rpcPort = row.getInteger("native_port");
  }
  if (rpcPort == null || rpcPort.intValue() == 0) {
    rpcPort = port != -1 ? port : 0;
  }

  InetSocketAddress candidate = new InetSocketAddress(rpcIp, rpcPort);
  boolean designatesControlNode =
      row.contains("peer") && candidate.equals(localEndPoint.resolve());
  if (!designatesControlNode) {
    return candidate;
  }
  LOG.warn(
      "[{}] Control node {} has an entry for itself in {}: this entry will be ignored. "
          + "This is likely due to a misconfiguration; please verify your rpc_address "
          + "configuration in cassandra.yaml on all nodes in your cluster.",
      logPrefix,
      localEndPoint,
      getPeerTableName());
  return null;
}
/**
 * Checks that a peer row carries the minimum information needed to use it: a {@code host_id}, and
 * either an {@code rpc_address}, or both a {@code native_address} and a {@code native_port}.
 *
 * <p>A warning is logged for every row that fails the check.
 *
 * @param peerRow the row to check.
 * @return whether the row is usable.
 */
protected boolean isPeerValid(AdminRow peerRow) {
  boolean legacyRpcAddressPresent = peerRow.getInetAddress("rpc_address") != null;
  boolean v2RpcAddressPresent =
      peerRow.getInetAddress("native_address") != null
          && peerRow.getInteger("native_port") != null;
  boolean hostIdPresent = peerRow.getUuid("host_id") != null;

  if ((v2RpcAddressPresent || legacyRpcAddressPresent) && hostIdPresent) {
    return true;
  }
  LOG.warn(
      "[{}] Found invalid row in {} for peer: {}. "
          + "This is likely a gossip or snitch issue, this node will be ignored.",
      logPrefix,
      getPeerTableName(),
      peerRow.getInetAddress("peer"));
  return false;
}
}