package com.datastax.oss.driver.internal.core.cql;
import com.datastax.oss.driver.api.core.cql.PrepareRequest;
import com.datastax.oss.driver.api.core.cql.PreparedStatement;
import com.datastax.oss.driver.api.core.session.Request;
import com.datastax.oss.driver.api.core.type.reflect.GenericType;
import com.datastax.oss.driver.internal.core.context.InternalDriverContext;
import com.datastax.oss.driver.internal.core.session.DefaultSession;
import com.datastax.oss.driver.internal.core.session.RequestProcessor;
import com.datastax.oss.driver.internal.core.util.concurrent.CompletableFutures;
import com.datastax.oss.driver.shaded.guava.common.cache.Cache;
import com.datastax.oss.driver.shaded.guava.common.cache.CacheBuilder;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionStage;
import java.util.concurrent.ExecutionException;
import net.jcip.annotations.ThreadSafe;
@ThreadSafe
public class CqlPrepareAsyncProcessor
implements RequestProcessor<PrepareRequest, CompletionStage<PreparedStatement>> {
protected final Cache<PrepareRequest, CompletableFuture<PreparedStatement>> cache;
public CqlPrepareAsyncProcessor() {
this(CacheBuilder.newBuilder().weakValues().build());
}
protected CqlPrepareAsyncProcessor(
Cache<PrepareRequest, CompletableFuture<PreparedStatement>> cache) {
this.cache = cache;
}
@Override
public boolean canProcess(Request request, GenericType<?> resultType) {
return request instanceof PrepareRequest && resultType.equals(PrepareRequest.ASYNC);
}
@Override
public CompletionStage<PreparedStatement> process(
PrepareRequest request,
DefaultSession session,
InternalDriverContext context,
String sessionLogPrefix) {
try {
CompletableFuture<PreparedStatement> result = cache.getIfPresent(request);
if (result == null) {
CompletableFuture<PreparedStatement> mine = new CompletableFuture<>();
result = cache.get(request, () -> mine);
if (result == mine) {
new CqlPrepareHandler(request, session, context, sessionLogPrefix)
.handle()
.whenComplete(
(preparedStatement, error) -> {
if (error != null) {
mine.completeExceptionally(error);
cache.invalidate(request);
} else {
mine.complete(preparedStatement);
}
});
}
}
return result;
} catch (ExecutionException e) {
return CompletableFutures.failedFuture(e.getCause());
}
}
@Override
public CompletionStage<PreparedStatement> newFailure(RuntimeException error) {
return CompletableFutures.failedFuture(error);
}
public Cache<PrepareRequest, CompletableFuture<PreparedStatement>> getCache() {
return cache;
}
}