/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.cassandra.service;

import java.net.InetAddress;
import java.util.Collection;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicIntegerFieldUpdater;

import com.google.common.collect.Iterables;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import org.apache.cassandra.config.DatabaseDescriptor;
import org.apache.cassandra.db.ConsistencyLevel;
import org.apache.cassandra.db.Keyspace;
import org.apache.cassandra.db.WriteType;
import org.apache.cassandra.exceptions.*;
import org.apache.cassandra.net.IAsyncCallbackWithFailure;
import org.apache.cassandra.net.MessageIn;
import org.apache.cassandra.utils.concurrent.SimpleCondition;

public abstract class AbstractWriteResponseHandler<T> implements IAsyncCallbackWithFailure<T>
{
    protected static final Logger logger = LoggerFactory.getLogger( AbstractWriteResponseHandler.class );

    private final SimpleCondition condition = new SimpleCondition();
    protected final Keyspace keyspace;
    protected final Collection<InetAddress> naturalEndpoints;
    public final ConsistencyLevel consistencyLevel;
    protected final Runnable callback;
    protected final Collection<InetAddress> pendingEndpoints;
    protected final WriteType writeType;
    private static final AtomicIntegerFieldUpdater<AbstractWriteResponseHandler> failuresUpdater
        = AtomicIntegerFieldUpdater.newUpdater(AbstractWriteResponseHandler.class, "failures");
    private volatile int failures = 0;
    private final Map<InetAddress, RequestFailureReason> failureReasonByEndpoint;
    private final long queryStartNanoTime;
    private volatile boolean supportsBackPressure = true;

    
Params:
  • callback – A callback to be called when the write is successful.
  • queryStartNanoTime –
/** * @param callback A callback to be called when the write is successful. * @param queryStartNanoTime */
protected AbstractWriteResponseHandler(Keyspace keyspace, Collection<InetAddress> naturalEndpoints, Collection<InetAddress> pendingEndpoints, ConsistencyLevel consistencyLevel, Runnable callback, WriteType writeType, long queryStartNanoTime) { this.keyspace = keyspace; this.pendingEndpoints = pendingEndpoints; this.consistencyLevel = consistencyLevel; this.naturalEndpoints = naturalEndpoints; this.callback = callback; this.writeType = writeType; this.failureReasonByEndpoint = new ConcurrentHashMap<>(); this.queryStartNanoTime = queryStartNanoTime; } public void get() throws WriteTimeoutException, WriteFailureException { long timeout = currentTimeout(); boolean success; try { success = condition.await(timeout, TimeUnit.NANOSECONDS); } catch (InterruptedException ex) { throw new AssertionError(ex); } if (!success) { int blockedFor = totalBlockFor(); int acks = ackCount(); // It's pretty unlikely, but we can race between exiting await above and here, so // that we could now have enough acks. In that case, we "lie" on the acks count to // avoid sending confusing info to the user (see CASSANDRA-6491). if (acks >= blockedFor) acks = blockedFor - 1; throw new WriteTimeoutException(writeType, consistencyLevel, acks, blockedFor); } if (totalBlockFor() + failures > totalEndpoints()) { throw new WriteFailureException(consistencyLevel, ackCount(), totalBlockFor(), writeType, failureReasonByEndpoint); } } public final long currentTimeout() { long requestTimeout = writeType == WriteType.COUNTER ? DatabaseDescriptor.getCounterWriteRpcTimeout() : DatabaseDescriptor.getWriteRpcTimeout(); return TimeUnit.MILLISECONDS.toNanos(requestTimeout) - (System.nanoTime() - queryStartNanoTime); }
Returns:the minimum number of endpoints that must reply.
/** * @return the minimum number of endpoints that must reply. */
protected int totalBlockFor() { // During bootstrap, we have to include the pending endpoints or we may fail the consistency level // guarantees (see #833) return consistencyLevel.blockFor(keyspace) + pendingEndpoints.size(); }
Returns:the total number of endpoints the request has been sent to.
/** * @return the total number of endpoints the request has been sent to. */
protected int totalEndpoints() { return naturalEndpoints.size() + pendingEndpoints.size(); }
Returns:true if the message counts towards the totalBlockFor() threshold
/** * @return true if the message counts towards the totalBlockFor() threshold */
protected boolean waitingFor(InetAddress from) { return true; }
Returns:number of responses received
/** * @return number of responses received */
protected abstract int ackCount();
null message means "response from local write"
/** null message means "response from local write" */
public abstract void response(MessageIn<T> msg); public void assureSufficientLiveNodes() throws UnavailableException { consistencyLevel.assureSufficientLiveNodes(keyspace, Iterables.filter(Iterables.concat(naturalEndpoints, pendingEndpoints), isAlive)); } protected void signal() { condition.signalAll(); if (callback != null) callback.run(); } @Override public void onFailure(InetAddress from, RequestFailureReason failureReason) { logger.trace("Got failure from {}", from); int n = waitingFor(from) ? failuresUpdater.incrementAndGet(this) : failures; failureReasonByEndpoint.put(from, failureReason); if (totalBlockFor() + n > totalEndpoints()) signal(); } @Override public boolean supportsBackPressure() { return supportsBackPressure; } public void setSupportsBackPressure(boolean supportsBackPressure) { this.supportsBackPressure = supportsBackPressure; } }