/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.cassandra.locator;

import java.lang.reflect.Constructor;
import java.lang.reflect.InvocationTargetException;
import java.net.InetAddress;
import java.util.*;

import com.google.common.annotations.VisibleForTesting;
import com.google.common.collect.HashMultimap;
import com.google.common.collect.Multimap;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import org.apache.cassandra.db.ConsistencyLevel;
import org.apache.cassandra.db.Keyspace;
import org.apache.cassandra.db.WriteType;
import org.apache.cassandra.dht.Range;
import org.apache.cassandra.dht.RingPosition;
import org.apache.cassandra.dht.Token;
import org.apache.cassandra.exceptions.ConfigurationException;
import org.apache.cassandra.service.AbstractWriteResponseHandler;
import org.apache.cassandra.service.DatacenterSyncWriteResponseHandler;
import org.apache.cassandra.service.DatacenterWriteResponseHandler;
import org.apache.cassandra.service.WriteResponseHandler;
import org.apache.cassandra.utils.FBUtilities;
import org.cliffc.high_scale_lib.NonBlockingHashMap;

A abstract parent for all replication strategies.
/** * A abstract parent for all replication strategies. */
public abstract class AbstractReplicationStrategy { private static final Logger logger = LoggerFactory.getLogger(AbstractReplicationStrategy.class); @VisibleForTesting final String keyspaceName; private Keyspace keyspace; public final Map<String, String> configOptions; private final TokenMetadata tokenMetadata; // track when the token range changes, signaling we need to invalidate our endpoint cache private volatile long lastInvalidatedVersion = 0; public IEndpointSnitch snitch; protected AbstractReplicationStrategy(String keyspaceName, TokenMetadata tokenMetadata, IEndpointSnitch snitch, Map<String, String> configOptions) { assert keyspaceName != null; assert snitch != null; assert tokenMetadata != null; this.tokenMetadata = tokenMetadata; this.snitch = snitch; this.configOptions = configOptions == null ? Collections.<String, String>emptyMap() : configOptions; this.keyspaceName = keyspaceName; // lazy-initialize keyspace itself since we don't create them until after the replication strategies } private final Map<Token, ArrayList<InetAddress>> cachedEndpoints = new NonBlockingHashMap<Token, ArrayList<InetAddress>>(); public ArrayList<InetAddress> getCachedEndpoints(Token t) { long lastVersion = tokenMetadata.getRingVersion(); if (lastVersion > lastInvalidatedVersion) { synchronized (this) { if (lastVersion > lastInvalidatedVersion) { logger.trace("clearing cached endpoints"); cachedEndpoints.clear(); lastInvalidatedVersion = lastVersion; } } } return cachedEndpoints.get(t); }
get the (possibly cached) endpoints that should store the given Token. Note that while the endpoints are conceptually a Set (no duplicates will be included), we return a List to avoid an extra allocation when sorting by proximity later
Params:
  • searchPosition – the position the natural endpoints are requested for
Returns:a copy of the natural endpoints for the given token
/** * get the (possibly cached) endpoints that should store the given Token. * Note that while the endpoints are conceptually a Set (no duplicates will be included), * we return a List to avoid an extra allocation when sorting by proximity later * @param searchPosition the position the natural endpoints are requested for * @return a copy of the natural endpoints for the given token */
public ArrayList<InetAddress> getNaturalEndpoints(RingPosition searchPosition) { Token searchToken = searchPosition.getToken(); Token keyToken = TokenMetadata.firstToken(tokenMetadata.sortedTokens(), searchToken); ArrayList<InetAddress> endpoints = getCachedEndpoints(keyToken); if (endpoints == null) { TokenMetadata tm = tokenMetadata.cachedOnlyTokenMap(); // if our cache got invalidated, it's possible there is a new token to account for too keyToken = TokenMetadata.firstToken(tm.sortedTokens(), searchToken); endpoints = new ArrayList<InetAddress>(calculateNaturalEndpoints(searchToken, tm)); cachedEndpoints.put(keyToken, endpoints); } return new ArrayList<InetAddress>(endpoints); }
calculate the natural endpoints for the given token
Params:
  • searchToken – the token the natural endpoints are requested for
See Also:
  • getNaturalEndpoints(RingPosition)
Returns:a copy of the natural endpoints for the given token
/** * calculate the natural endpoints for the given token * * @see #getNaturalEndpoints(org.apache.cassandra.dht.RingPosition) * * @param searchToken the token the natural endpoints are requested for * @return a copy of the natural endpoints for the given token */
public abstract List<InetAddress> calculateNaturalEndpoints(Token searchToken, TokenMetadata tokenMetadata); public <T> AbstractWriteResponseHandler<T> getWriteResponseHandler(Collection<InetAddress> naturalEndpoints, Collection<InetAddress> pendingEndpoints, ConsistencyLevel consistency_level, Runnable callback, WriteType writeType, long queryStartNanoTime) { if (consistency_level.isDatacenterLocal()) { // block for in this context will be localnodes block. return new DatacenterWriteResponseHandler<T>(naturalEndpoints, pendingEndpoints, consistency_level, getKeyspace(), callback, writeType, queryStartNanoTime); } else if (consistency_level == ConsistencyLevel.EACH_QUORUM && (this instanceof NetworkTopologyStrategy)) { return new DatacenterSyncWriteResponseHandler<T>(naturalEndpoints, pendingEndpoints, consistency_level, getKeyspace(), callback, writeType, queryStartNanoTime); } return new WriteResponseHandler<T>(naturalEndpoints, pendingEndpoints, consistency_level, getKeyspace(), callback, writeType, queryStartNanoTime); } private Keyspace getKeyspace() { if (keyspace == null) keyspace = Keyspace.open(keyspaceName); return keyspace; }
calculate the RF based on strategy_options. When overwriting, ensure that this get() is FAST, as this is called often.
Returns:the replication factor
/** * calculate the RF based on strategy_options. When overwriting, ensure that this get() * is FAST, as this is called often. * * @return the replication factor */
public abstract int getReplicationFactor(); /* * NOTE: this is pretty inefficient. also the inverse (getRangeAddresses) below. * this is fine as long as we don't use this on any critical path. * (fixing this would probably require merging tokenmetadata into replicationstrategy, * so we could cache/invalidate cleanly.) */ public Multimap<InetAddress, Range<Token>> getAddressRanges(TokenMetadata metadata) { Multimap<InetAddress, Range<Token>> map = HashMultimap.create(); for (Token token : metadata.sortedTokens()) { Range<Token> range = metadata.getPrimaryRangeFor(token); for (InetAddress ep : calculateNaturalEndpoints(token, metadata)) { map.put(ep, range); } } return map; } public Multimap<Range<Token>, InetAddress> getRangeAddresses(TokenMetadata metadata) { Multimap<Range<Token>, InetAddress> map = HashMultimap.create(); for (Token token : metadata.sortedTokens()) { Range<Token> range = metadata.getPrimaryRangeFor(token); for (InetAddress ep : calculateNaturalEndpoints(token, metadata)) { map.put(range, ep); } } return map; } public Multimap<InetAddress, Range<Token>> getAddressRanges() { return getAddressRanges(tokenMetadata.cloneOnlyTokenMap()); } public Collection<Range<Token>> getPendingAddressRanges(TokenMetadata metadata, Token pendingToken, InetAddress pendingAddress) { return getPendingAddressRanges(metadata, Arrays.asList(pendingToken), pendingAddress); } public Collection<Range<Token>> getPendingAddressRanges(TokenMetadata metadata, Collection<Token> pendingTokens, InetAddress pendingAddress) { TokenMetadata temp = metadata.cloneOnlyTokenMap(); temp.updateNormalTokens(pendingTokens, pendingAddress); return getAddressRanges(temp).get(pendingAddress); } public abstract void validateOptions() throws ConfigurationException; /* * The options recognized by the strategy. * The empty collection means that no options are accepted, but null means * that any option is accepted. */ public Collection<String> recognizedOptions() { // We default to null for backward compatibility sake return null; } private static AbstractReplicationStrategy createInternal(String keyspaceName, Class<? extends AbstractReplicationStrategy> strategyClass, TokenMetadata tokenMetadata, IEndpointSnitch snitch, Map<String, String> strategyOptions) throws ConfigurationException { AbstractReplicationStrategy strategy; Class [] parameterTypes = new Class[] {String.class, TokenMetadata.class, IEndpointSnitch.class, Map.class}; try { Constructor<? extends AbstractReplicationStrategy> constructor = strategyClass.getConstructor(parameterTypes); strategy = constructor.newInstance(keyspaceName, tokenMetadata, snitch, strategyOptions); } catch (InvocationTargetException e) { Throwable targetException = e.getTargetException(); throw new ConfigurationException(targetException.getMessage(), targetException); } catch (Exception e) { throw new ConfigurationException("Error constructing replication strategy class", e); } return strategy; } public static AbstractReplicationStrategy createReplicationStrategy(String keyspaceName, Class<? extends AbstractReplicationStrategy> strategyClass, TokenMetadata tokenMetadata, IEndpointSnitch snitch, Map<String, String> strategyOptions) { AbstractReplicationStrategy strategy = createInternal(keyspaceName, strategyClass, tokenMetadata, snitch, strategyOptions); // Because we used to not properly validate unrecognized options, we only log a warning if we find one. try { strategy.validateExpectedOptions(); } catch (ConfigurationException e) { logger.warn("Ignoring {}", e.getMessage()); } strategy.validateOptions(); return strategy; } public static void validateReplicationStrategy(String keyspaceName, Class<? extends AbstractReplicationStrategy> strategyClass, TokenMetadata tokenMetadata, IEndpointSnitch snitch, Map<String, String> strategyOptions) throws ConfigurationException { AbstractReplicationStrategy strategy = createInternal(keyspaceName, strategyClass, tokenMetadata, snitch, strategyOptions); strategy.validateExpectedOptions(); strategy.validateOptions(); } public static Class<AbstractReplicationStrategy> getClass(String cls) throws ConfigurationException { String className = cls.contains(".") ? cls : "org.apache.cassandra.locator." + cls; Class<AbstractReplicationStrategy> strategyClass = FBUtilities.classForName(className, "replication strategy"); if (!AbstractReplicationStrategy.class.isAssignableFrom(strategyClass)) { throw new ConfigurationException(String.format("Specified replication strategy class (%s) is not derived from AbstractReplicationStrategy", className)); } return strategyClass; } public boolean hasSameSettings(AbstractReplicationStrategy other) { return getClass().equals(other.getClass()) && getReplicationFactor() == other.getReplicationFactor(); } protected void validateReplicationFactor(String rf) throws ConfigurationException { try { if (Integer.parseInt(rf) < 0) { throw new ConfigurationException("Replication factor must be non-negative; found " + rf); } } catch (NumberFormatException e2) { throw new ConfigurationException("Replication factor must be numeric; found " + rf); } } private void validateExpectedOptions() throws ConfigurationException { Collection expectedOptions = recognizedOptions(); if (expectedOptions == null) return; for (String key : configOptions.keySet()) { if (!expectedOptions.contains(key)) throw new ConfigurationException(String.format("Unrecognized strategy option {%s} passed to %s for keyspace %s", key, getClass().getSimpleName(), keyspaceName)); } } }