/*
 * Copyright DataStax, Inc.
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 * http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package com.datastax.oss.driver.internal.core.session.throttling;

import com.datastax.oss.driver.api.core.RequestThrottlingException;
import com.datastax.oss.driver.api.core.config.DefaultDriverOption;
import com.datastax.oss.driver.api.core.config.DriverExecutionProfile;
import com.datastax.oss.driver.api.core.context.DriverContext;
import com.datastax.oss.driver.api.core.session.throttling.RequestThrottler;
import com.datastax.oss.driver.api.core.session.throttling.Throttled;
import com.datastax.oss.driver.shaded.guava.common.annotations.VisibleForTesting;
import edu.umd.cs.findbugs.annotations.NonNull;
import java.util.ArrayDeque;
import java.util.Deque;
import java.util.concurrent.locks.ReentrantLock;
import net.jcip.annotations.GuardedBy;
import net.jcip.annotations.ThreadSafe;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

A request throttler that limits the number of concurrent requests.

To activate this throttler, modify the advanced.throttler section in the driver configuration, for example:

datastax-java-driver {
  advanced.throttler {
    class = ConcurrencyLimitingRequestThrottler
    max-concurrent-requests = 10000
    max-queue-size = 10000
  }
}
See reference.conf (in the manual or core driver JAR) for more details.
/** * A request throttler that limits the number of concurrent requests. * * <p>To activate this throttler, modify the {@code advanced.throttler} section in the driver * configuration, for example: * * <pre> * datastax-java-driver { * advanced.throttler { * class = ConcurrencyLimitingRequestThrottler * max-concurrent-requests = 10000 * max-queue-size = 10000 * } * } * </pre> * * See {@code reference.conf} (in the manual or core driver JAR) for more details. */
@ThreadSafe public class ConcurrencyLimitingRequestThrottler implements RequestThrottler { private static final Logger LOG = LoggerFactory.getLogger(ConcurrencyLimitingRequestThrottler.class); private final String logPrefix; private final int maxConcurrentRequests; private final int maxQueueSize; private final ReentrantLock lock = new ReentrantLock(); @GuardedBy("lock") private int concurrentRequests; @GuardedBy("lock") private Deque<Throttled> queue = new ArrayDeque<>(); @GuardedBy("lock") private boolean closed; public ConcurrencyLimitingRequestThrottler(DriverContext context) { this.logPrefix = context.getSessionName(); DriverExecutionProfile config = context.getConfig().getDefaultProfile(); this.maxConcurrentRequests = config.getInt(DefaultDriverOption.REQUEST_THROTTLER_MAX_CONCURRENT_REQUESTS); this.maxQueueSize = config.getInt(DefaultDriverOption.REQUEST_THROTTLER_MAX_QUEUE_SIZE); LOG.debug( "[{}] Initializing with maxConcurrentRequests = {}, maxQueueSize = {}", logPrefix, maxConcurrentRequests, maxQueueSize); } @Override public void register(@NonNull Throttled request) { lock.lock(); try { if (closed) { LOG.trace("[{}] Rejecting request after shutdown", logPrefix); fail(request, "The session is shutting down"); } else if (queue.isEmpty() && concurrentRequests < maxConcurrentRequests) { // We have capacity for one more concurrent request LOG.trace("[{}] Starting newly registered request", logPrefix); concurrentRequests += 1; request.onThrottleReady(false); } else if (queue.size() < maxQueueSize) { LOG.trace("[{}] Enqueuing request", logPrefix); queue.add(request); } else { LOG.trace("[{}] Rejecting request because of full queue", logPrefix); fail( request, String.format( "The session has reached its maximum capacity " + "(concurrent requests: %d, queue size: %d)", maxConcurrentRequests, maxQueueSize)); } } finally { lock.unlock(); } } @Override public void signalSuccess(@NonNull Throttled request) { lock.lock(); try { onRequestDone(); } finally { lock.unlock(); } } @Override public void signalError(@NonNull Throttled request, @NonNull Throwable error) { signalSuccess(request); // not treated differently } @Override public void signalTimeout(@NonNull Throttled request) { lock.lock(); try { if (!closed) { if (queue.remove(request)) { // The request timed out before it was active LOG.trace("[{}] Removing timed out request from the queue", logPrefix); } else { onRequestDone(); } } } finally { lock.unlock(); } } @SuppressWarnings("GuardedBy") // this method is only called with the lock held private void onRequestDone() { assert lock.isHeldByCurrentThread(); if (!closed) { if (queue.isEmpty()) { concurrentRequests -= 1; } else { LOG.trace("[{}] Starting dequeued request", logPrefix); queue.poll().onThrottleReady(true); // don't touch concurrentRequests since we finished one but started another } } } @Override public void close() { lock.lock(); try { closed = true; LOG.debug("[{}] Rejecting {} queued requests after shutdown", logPrefix, queue.size()); for (Throttled request : queue) { fail(request, "The session is shutting down"); } } finally { lock.unlock(); } } public int getQueueSize() { lock.lock(); try { return queue.size(); } finally { lock.unlock(); } } @VisibleForTesting int getConcurrentRequests() { lock.lock(); try { return concurrentRequests; } finally { lock.unlock(); } } @VisibleForTesting Deque<Throttled> getQueue() { lock.lock(); try { return queue; } finally { lock.unlock(); } } private static void fail(Throttled request, String message) { request.onThrottleFailure(new RequestThrottlingException(message)); } }