package com.datastax.oss.driver.internal.core.metadata;
import com.datastax.oss.driver.api.core.AsyncAutoCloseable;
import com.datastax.oss.driver.api.core.config.DefaultDriverOption;
import com.datastax.oss.driver.api.core.config.DriverExecutionProfile;
import com.datastax.oss.driver.api.core.metadata.EndPoint;
import com.datastax.oss.driver.api.core.metadata.Metadata;
import com.datastax.oss.driver.api.core.metadata.Node;
import com.datastax.oss.driver.internal.core.config.ConfigChangeEvent;
import com.datastax.oss.driver.internal.core.context.InternalDriverContext;
import com.datastax.oss.driver.internal.core.control.ControlConnection;
import com.datastax.oss.driver.internal.core.metadata.schema.parsing.SchemaParserFactory;
import com.datastax.oss.driver.internal.core.metadata.schema.queries.SchemaQueriesFactory;
import com.datastax.oss.driver.internal.core.metadata.schema.queries.SchemaRows;
import com.datastax.oss.driver.internal.core.metadata.schema.refresh.SchemaRefresh;
import com.datastax.oss.driver.internal.core.util.Loggers;
import com.datastax.oss.driver.internal.core.util.NanoTime;
import com.datastax.oss.driver.internal.core.util.concurrent.CompletableFutures;
import com.datastax.oss.driver.internal.core.util.concurrent.Debouncer;
import com.datastax.oss.driver.internal.core.util.concurrent.RunOrSchedule;
import com.datastax.oss.driver.shaded.guava.common.annotations.VisibleForTesting;
import com.datastax.oss.driver.shaded.guava.common.collect.ImmutableSet;
import edu.umd.cs.findbugs.annotations.NonNull;
import io.netty.util.concurrent.EventExecutor;
import java.net.InetSocketAddress;
import java.util.Collections;
import java.util.List;
import java.util.Set;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionStage;
import net.jcip.annotations.ThreadSafe;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@ThreadSafe
public class MetadataManager implements AsyncAutoCloseable {
private static final Logger LOG = LoggerFactory.getLogger(MetadataManager.class);
static final EndPoint DEFAULT_CONTACT_POINT =
new DefaultEndPoint(new InetSocketAddress("127.0.0.1", 9042));
private final InternalDriverContext context;
private final String logPrefix;
private final EventExecutor adminExecutor;
private final DriverExecutionProfile config;
private final SingleThreaded singleThreaded;
private final ControlConnection controlConnection;
private volatile DefaultMetadata metadata;
private volatile boolean schemaEnabledInConfig;
private volatile List<String> refreshedKeyspaces;
private volatile Boolean schemaEnabledProgrammatically;
private volatile boolean tokenMapEnabled;
private volatile Set<DefaultNode> contactPoints;
private volatile boolean wasImplicitContactPoint;
public MetadataManager(InternalDriverContext context) {
this(context, DefaultMetadata.EMPTY);
}
protected MetadataManager(InternalDriverContext context, DefaultMetadata initialMetadata) {
this.context = context;
this.metadata = initialMetadata;
this.logPrefix = context.getSessionName();
this.adminExecutor = context.getNettyOptions().adminEventExecutorGroup().next();
this.config = context.getConfig().getDefaultProfile();
this.singleThreaded = new SingleThreaded(context, config);
this.controlConnection = context.getControlConnection();
this.schemaEnabledInConfig = config.getBoolean(DefaultDriverOption.METADATA_SCHEMA_ENABLED);
this.refreshedKeyspaces =
config.getStringList(
DefaultDriverOption.METADATA_SCHEMA_REFRESHED_KEYSPACES, Collections.emptyList());
this.tokenMapEnabled = config.getBoolean(DefaultDriverOption.METADATA_TOKEN_MAP_ENABLED);
context.getEventBus().register(ConfigChangeEvent.class, this::onConfigChanged);
}
private void onConfigChanged(@SuppressWarnings("unused") ConfigChangeEvent event) {
boolean schemaEnabledBefore = isSchemaEnabled();
boolean tokenMapEnabledBefore = tokenMapEnabled;
List<String> keyspacesBefore = this.refreshedKeyspaces;
this.schemaEnabledInConfig = config.getBoolean(DefaultDriverOption.METADATA_SCHEMA_ENABLED);
this.refreshedKeyspaces =
config.getStringList(
DefaultDriverOption.METADATA_SCHEMA_REFRESHED_KEYSPACES, Collections.emptyList());
this.tokenMapEnabled = config.getBoolean(DefaultDriverOption.METADATA_TOKEN_MAP_ENABLED);
if ((!schemaEnabledBefore
|| !keyspacesBefore.equals(refreshedKeyspaces)
|| (!tokenMapEnabledBefore && tokenMapEnabled))
&& isSchemaEnabled()) {
refreshSchema(null, false, true)
.whenComplete(
(metadata, error) -> {
if (error != null) {
Loggers.warnWithException(
LOG,
"[{}] Unexpected error while refreshing schema after it was re-enabled "
+ "in the configuration, keeping previous version",
logPrefix,
error);
}
});
}
}
public Metadata getMetadata() {
return this.metadata;
}
public void addContactPoints(Set<EndPoint> providedContactPoints) {
ImmutableSet.Builder<DefaultNode> contactPointsBuilder = ImmutableSet.builder();
if (providedContactPoints == null || providedContactPoints.isEmpty()) {
LOG.info(
"[{}] No contact points provided, defaulting to {}", logPrefix, DEFAULT_CONTACT_POINT);
this.wasImplicitContactPoint = true;
contactPointsBuilder.add(new DefaultNode(DEFAULT_CONTACT_POINT, context));
} else {
for (EndPoint endPoint : providedContactPoints) {
contactPointsBuilder.add(new DefaultNode(endPoint, context));
}
}
this.contactPoints = contactPointsBuilder.build();
LOG.debug("[{}] Adding initial contact points {}", logPrefix, contactPoints);
}
public Set<DefaultNode> getContactPoints() {
return contactPoints;
}
public boolean wasImplicitContactPoint() {
return wasImplicitContactPoint;
}
public CompletionStage<Void> refreshNodes() {
return context
.getTopologyMonitor()
.refreshNodeList()
.thenApplyAsync(singleThreaded::refreshNodes, adminExecutor);
}
public CompletionStage<Void> refreshNode(Node node) {
return context
.getTopologyMonitor()
.refreshNode(node)
.thenApplyAsync(
maybeInfo -> {
if (maybeInfo.isPresent()) {
boolean tokensChanged =
NodesRefresh.copyInfos(maybeInfo.get(), (DefaultNode) node, null, context);
if (tokensChanged) {
apply(new TokensChangedRefresh());
}
} else {
LOG.debug(
"[{}] Topology monitor did not return any info for the refresh of {}, skipping",
logPrefix,
node);
}
return null;
},
adminExecutor);
}
public void addNode(InetSocketAddress broadcastRpcAddress) {
context
.getTopologyMonitor()
.getNewNodeInfo(broadcastRpcAddress)
.whenCompleteAsync(
(info, error) -> {
if (error != null) {
LOG.debug(
"[{}] Error refreshing node info for {}, "
+ "this will be retried on the next full refresh",
logPrefix,
broadcastRpcAddress,
error);
} else {
singleThreaded.addNode(broadcastRpcAddress, info.orElse(null));
}
},
adminExecutor);
}
public void removeNode(InetSocketAddress broadcastRpcAddress) {
RunOrSchedule.on(adminExecutor, () -> singleThreaded.removeNode(broadcastRpcAddress));
}
public CompletionStage<Metadata> refreshSchema(
String keyspace, boolean evenIfDisabled, boolean flushNow) {
CompletableFuture<Metadata> future = new CompletableFuture<>();
RunOrSchedule.on(
adminExecutor,
() -> singleThreaded.refreshSchema(keyspace, evenIfDisabled, flushNow, future));
return future;
}
public boolean isSchemaEnabled() {
return (schemaEnabledProgrammatically != null)
? schemaEnabledProgrammatically
: schemaEnabledInConfig;
}
public CompletionStage<Metadata> setSchemaEnabled(Boolean newValue) {
boolean wasEnabledBefore = isSchemaEnabled();
schemaEnabledProgrammatically = newValue;
if (!wasEnabledBefore && isSchemaEnabled()) {
return refreshSchema(null, false, true);
} else {
return CompletableFuture.completedFuture(metadata);
}
}
public CompletionStage<Void> firstSchemaRefreshFuture() {
return singleThreaded.firstSchemaRefreshFuture;
}
@NonNull
@Override
public CompletionStage<Void> closeFuture() {
return singleThreaded.closeFuture;
}
@NonNull
@Override
public CompletionStage<Void> closeAsync() {
RunOrSchedule.on(adminExecutor, singleThreaded::close);
return singleThreaded.closeFuture;
}
@NonNull
@Override
public CompletionStage<Void> forceCloseAsync() {
return this.closeAsync();
}
private class SingleThreaded {
private final CompletableFuture<Void> closeFuture = new CompletableFuture<>();
private boolean closeWasCalled;
private final CompletableFuture<Void> firstSchemaRefreshFuture = new CompletableFuture<>();
private final Debouncer<CompletableFuture<Metadata>, CompletableFuture<Metadata>>
schemaRefreshDebouncer;
private final SchemaQueriesFactory schemaQueriesFactory;
private final SchemaParserFactory schemaParserFactory;
private CompletableFuture<Metadata> currentSchemaRefresh;
private CompletableFuture<Metadata> queuedSchemaRefresh;
private boolean didFirstNodeListRefresh;
private SingleThreaded(InternalDriverContext context, DriverExecutionProfile config) {
this.schemaRefreshDebouncer =
new Debouncer<>(
adminExecutor,
this::coalesceSchemaRequests,
this::startSchemaRequest,
config.getDuration(DefaultDriverOption.METADATA_SCHEMA_WINDOW),
config.getInt(DefaultDriverOption.METADATA_SCHEMA_MAX_EVENTS));
this.schemaQueriesFactory = context.getSchemaQueriesFactory();
this.schemaParserFactory = context.getSchemaParserFactory();
}
private Void refreshNodes(Iterable<NodeInfo> nodeInfos) {
MetadataRefresh refresh =
didFirstNodeListRefresh
? new FullNodeListRefresh(nodeInfos)
: new InitialNodeListRefresh(nodeInfos, contactPoints);
didFirstNodeListRefresh = true;
return apply(refresh);
}
private void addNode(InetSocketAddress address, NodeInfo info) {
try {
if (info != null) {
if (!address.equals(info.getBroadcastRpcAddress().orElse(null))) {
LOG.warn(
"[{}] Received a request to add a node for broadcast RPC address {}, "
+ "but the provided info reports {}, ignoring it",
logPrefix,
address,
info.getBroadcastAddress());
} else {
apply(new AddNodeRefresh(info));
}
} else {
LOG.debug(
"[{}] Ignoring node addition for {} because the "
+ "topology monitor didn't return any information",
logPrefix,
address);
}
} catch (Throwable t) {
LOG.warn("[" + logPrefix + "] Unexpected exception while handling added node", logPrefix);
}
}
private void removeNode(InetSocketAddress broadcastRpcAddress) {
apply(new RemoveNodeRefresh(broadcastRpcAddress));
}
private void refreshSchema(
String keyspace,
boolean evenIfDisabled,
boolean flushNow,
CompletableFuture<Metadata> future) {
if (!didFirstNodeListRefresh) {
future.complete(metadata);
return;
}
boolean isRefreshedKeyspace =
keyspace == null || refreshedKeyspaces.isEmpty() || refreshedKeyspaces.contains(keyspace);
if (isRefreshedKeyspace && (evenIfDisabled || isSchemaEnabled())) {
acceptSchemaRequest(future, flushNow);
} else {
future.complete(metadata);
singleThreaded.firstSchemaRefreshFuture.complete(null);
}
}
private void acceptSchemaRequest(CompletableFuture<Metadata> future, boolean flushNow) {
assert adminExecutor.inEventLoop();
if (closeWasCalled) {
future.complete(metadata);
} else {
schemaRefreshDebouncer.receive(future);
if (flushNow) {
schemaRefreshDebouncer.flushNow();
}
}
}
private CompletableFuture<Metadata> coalesceSchemaRequests(
List<CompletableFuture<Metadata>> futures) {
assert adminExecutor.inEventLoop();
assert !futures.isEmpty();
CompletableFuture<Metadata> result = null;
for (CompletableFuture<Metadata> future : futures) {
if (result == null) {
result = future;
} else {
CompletableFutures.completeFrom(result, future);
}
}
return result;
}
private void startSchemaRequest(CompletableFuture<Metadata> future) {
assert adminExecutor.inEventLoop();
if (closeWasCalled) {
future.complete(metadata);
return;
}
if (currentSchemaRefresh == null) {
currentSchemaRefresh = future;
LOG.debug("[{}] Starting schema refresh", logPrefix);
initControlConnectionForSchema()
.thenCompose(v -> context.getTopologyMonitor().checkSchemaAgreement())
.thenCompose(b -> schemaQueriesFactory.newInstance(future).execute())
.thenApplyAsync(this::parseAndApplySchemaRows, adminExecutor)
.whenComplete(
(v, error) -> {
if (error != null) {
currentSchemaRefresh.completeExceptionally(error);
}
singleThreaded.firstSchemaRefreshFuture.complete(null);
});
} else if (queuedSchemaRefresh == null) {
queuedSchemaRefresh = future;
} else {
CompletableFutures.completeFrom(queuedSchemaRefresh, future);
}
}
private CompletionStage<Void> initControlConnectionForSchema() {
if (firstSchemaRefreshFuture.isDone()) {
return firstSchemaRefreshFuture;
} else {
return controlConnection.init(false, true, false);
}
}
private Void parseAndApplySchemaRows(SchemaRows schemaRows) {
assert adminExecutor.inEventLoop();
assert schemaRows.refreshFuture() == currentSchemaRefresh;
try {
SchemaRefresh schemaRefresh = schemaParserFactory.newInstance(schemaRows).parse();
long start = System.nanoTime();
apply(schemaRefresh);
currentSchemaRefresh.complete(metadata);
LOG.debug(
"[{}] Applying schema refresh took {}", logPrefix, NanoTime.formatTimeSince(start));
} catch (Throwable t) {
currentSchemaRefresh.completeExceptionally(t);
}
currentSchemaRefresh = null;
if (queuedSchemaRefresh != null) {
CompletableFuture<Metadata> tmp = this.queuedSchemaRefresh;
this.queuedSchemaRefresh = null;
startSchemaRequest(tmp);
}
return null;
}
private void close() {
if (closeWasCalled) {
return;
}
closeWasCalled = true;
LOG.debug("[{}] Closing", logPrefix);
if (queuedSchemaRefresh != null) {
queuedSchemaRefresh.completeExceptionally(new IllegalStateException("Cluster is closed"));
}
closeFuture.complete(null);
}
}
@VisibleForTesting
Void apply(MetadataRefresh refresh) {
assert adminExecutor.inEventLoop();
MetadataRefresh.Result result = refresh.compute(metadata, tokenMapEnabled, context);
metadata = result.newMetadata;
boolean isFirstSchemaRefresh =
refresh instanceof SchemaRefresh && !singleThreaded.firstSchemaRefreshFuture.isDone();
if (!singleThreaded.closeWasCalled && !isFirstSchemaRefresh) {
for (Object event : result.events) {
context.getEventBus().fire(event);
}
}
return null;
}
}