/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.cassandra.db;

import java.io.File;
import java.io.IOException;
import java.io.PrintStream;
import java.nio.ByteBuffer;
import java.nio.file.Files;
import java.util.*;
import java.util.concurrent.*;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicReference;
import java.util.regex.Pattern;
import javax.management.*;
import javax.management.openmbean.*;

import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.*;
import com.google.common.base.Throwables;
import com.google.common.collect.*;
import com.google.common.util.concurrent.*;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import com.clearspring.analytics.stream.Counter;
import org.apache.cassandra.cache.*;
import org.apache.cassandra.concurrent.*;
import org.apache.cassandra.config.*;
import org.apache.cassandra.db.commitlog.CommitLog;
import org.apache.cassandra.db.commitlog.CommitLogPosition;
import org.apache.cassandra.db.compaction.*;
import org.apache.cassandra.db.filter.ClusteringIndexFilter;
import org.apache.cassandra.db.filter.DataLimits;
import org.apache.cassandra.db.view.TableViews;
import org.apache.cassandra.db.lifecycle.*;
import org.apache.cassandra.db.partitions.CachedPartition;
import org.apache.cassandra.db.partitions.PartitionUpdate;
import org.apache.cassandra.db.rows.CellPath;
import org.apache.cassandra.dht.*;
import org.apache.cassandra.dht.Range;
import org.apache.cassandra.exceptions.ConfigurationException;
import org.apache.cassandra.exceptions.StartupException;
import org.apache.cassandra.index.SecondaryIndexManager;
import org.apache.cassandra.index.internal.CassandraIndex;
import org.apache.cassandra.index.transactions.UpdateTransaction;
import org.apache.cassandra.io.FSError;
import org.apache.cassandra.io.FSWriteError;
import org.apache.cassandra.io.sstable.Component;
import org.apache.cassandra.io.sstable.CorruptSSTableException;
import org.apache.cassandra.io.sstable.Descriptor;
import org.apache.cassandra.io.sstable.SSTableMultiWriter;
import org.apache.cassandra.io.sstable.format.*;
import org.apache.cassandra.io.sstable.format.big.BigFormat;
import org.apache.cassandra.io.sstable.metadata.MetadataCollector;
import org.apache.cassandra.io.util.FileUtils;
import org.apache.cassandra.metrics.TableMetrics;
import org.apache.cassandra.metrics.TableMetrics.Sampler;
import org.apache.cassandra.schema.*;
import org.apache.cassandra.schema.CompactionParams.TombstoneOption;
import org.apache.cassandra.service.CacheService;
import org.apache.cassandra.service.StorageService;
import org.apache.cassandra.utils.*;
import org.apache.cassandra.utils.TopKSampler.SamplerResult;
import org.apache.cassandra.utils.concurrent.OpOrder;
import org.apache.cassandra.utils.concurrent.Refs;
import org.apache.cassandra.utils.memory.MemtableAllocator;
import org.json.simple.JSONArray;
import org.json.simple.JSONObject;

import static org.apache.cassandra.utils.ExecutorUtils.awaitTermination;
import static org.apache.cassandra.utils.ExecutorUtils.shutdown;
import static org.apache.cassandra.utils.Throwables.maybeFail;

public class ColumnFamilyStore implements ColumnFamilyStoreMBean
{
    // The directories which will be searched for sstables on cfs instantiation.
    // Volatile because addInitialDirectories() publishes a freshly built array instead of
    // mutating this one, and getInitialDirectories() only ever hands out copies of it.
    private static volatile Directories.DataDirectory[] initialDirectories = Directories.dataDirectories;

    
/**
 * A hook to add additional directories to initialDirectories.
 * Any additional directories should be added prior to ColumnFamilyStore instantiation on startup.
 *
 * Since the directories used by a given table are determined by the compaction strategy,
 * it's possible for sstables to be written to directories specified outside of cassandra.yaml.
 * By adding additional directories to initialDirectories, sstables in these extra locations are
 * made discoverable on sstable instantiation.
 *
 * A directory is appended at most once: entries already present in initialDirectories and
 * entries repeated within {@code newDirectories} are skipped.
 *
 * @param newDirectories the directories to append; must not be null
 */
public static synchronized void addInitialDirectories(Directories.DataDirectory[] newDirectories)
{
    assert newDirectories != null;

    Set<Directories.DataDirectory> seen = Sets.newHashSet(initialDirectories);
    List<Directories.DataDirectory> replacementList = Lists.newArrayList(initialDirectories);
    for (Directories.DataDirectory directory : newDirectories)
    {
        // Set.add() reports whether the element was new, so this both skips directories we
        // already know about and de-duplicates repeats inside newDirectories itself.
        if (seen.add(directory))
            replacementList.add(directory);
    }

    // Publish a brand new array; the previously published one is never modified.
    initialDirectories = replacementList.toArray(new Directories.DataDirectory[replacementList.size()]);
}

/**
 * @return a copy of the current initial directories; modifying the returned array does not
 *         affect the directories used by this class
 */
public static Directories.DataDirectory[] getInitialDirectories()
{
    Directories.DataDirectory[] src = initialDirectories;
    return Arrays.copyOf(src, src.length);
}

private static final Logger logger = LoggerFactory.getLogger(ColumnFamilyStore.class);

/*
We keep a pool of threads for each data directory, size of each pool is memtable_flush_writers.
When flushing we start a Flush runnable in the flushExecutor. Flush calculates how to split the
memtable ranges over the existing data directories and creates a FlushRunnable for each of the directories.
The FlushRunnables are executed in the perDiskflushExecutors and the Flush will block until all FlushRunnables
are finished. By having flushExecutor size the same size as each of the perDiskflushExecutors we make sure we can
have that many flushes going at the same time.
*/
private static final ExecutorService flushExecutor = new JMXEnabledThreadPoolExecutor(DatabaseDescriptor.getFlushWriters(),
                                                                                      StageManager.KEEPALIVE,
                                                                                      TimeUnit.SECONDS,
                                                                                      new LinkedBlockingQueue<Runnable>(),
                                                                                      new NamedThreadFactory("MemtableFlushWriter"),
                                                                                      "internal");

private static final ExecutorService [] perDiskflushExecutors = new ExecutorService[DatabaseDescriptor.getAllDataFileLocations().length];
static
{
    // One pool per slot of the array sized above; iterate over the array's own length so the
    // loop bound cannot drift from the allocation.
    for (int i = 0; i < perDiskflushExecutors.length; i++)
    {
        perDiskflushExecutors[i] = new JMXEnabledThreadPoolExecutor(DatabaseDescriptor.getFlushWriters(),
                                                                    StageManager.KEEPALIVE,
                                                                    TimeUnit.SECONDS,
                                                                    new LinkedBlockingQueue<Runnable>(),
                                                                    new NamedThreadFactory("PerDiskMemtableFlushWriter_"+i),
                                                                    "internal");
    }
}

// post-flush executor is single threaded to provide guarantee that any flush Future on a CF will never return until prior flushes have completed
private static final ExecutorService postFlushExecutor = new JMXEnabledThreadPoolExecutor(1,
                                                                                          StageManager.KEEPALIVE,
                                                                                          TimeUnit.SECONDS,
                                                                                          new LinkedBlockingQueue<Runnable>(),
                                                                                          new NamedThreadFactory("MemtablePostFlush"),
                                                                                          "internal");

private static final ExecutorService reclaimExecutor = new JMXEnabledThreadPoolExecutor(1,
                                                                                        StageManager.KEEPALIVE,
                                                                                        TimeUnit.SECONDS,
                                                                                        new LinkedBlockingQueue<Runnable>(),
                                                                                        new NamedThreadFactory("MemtableReclaimMemory"),
                                                                                        "internal");

// JMX open-type descriptors for sampling results: item names, their descriptions and the
// composite/tabular types that are built from them in the static initializer below.
private static final String[] COUNTER_NAMES = new String[]{"raw", "count", "error", "string"};
private static final String[] COUNTER_DESCS = new String[]
{ "partition key in raw hex bytes",
  "value of this partition for given sampler",
  "value is within the error bounds plus or minus of this",
  "the partition key turned into a human readable format" };
private static final CompositeType COUNTER_COMPOSITE_TYPE;
private static final TabularType COUNTER_TYPE;

private static final String[] SAMPLER_NAMES = new String[]{"cardinality", "partitions"};
private static final String[] SAMPLER_DESCS = new String[]
{ "cardinality of partitions",
  "list of counter results" };

private static final String SAMPLING_RESULTS_NAME = "SAMPLING_RESULTS";
private static final CompositeType SAMPLING_RESULT;

public static final String SNAPSHOT_TRUNCATE_PREFIX = "truncated";
public static final String SNAPSHOT_DROP_PREFIX = "dropped";

static
{
    try
    {
        OpenType<?>[] counterTypes = new OpenType[] { SimpleType.STRING, SimpleType.LONG, SimpleType.LONG, SimpleType.STRING };
        COUNTER_COMPOSITE_TYPE = new CompositeType(SAMPLING_RESULTS_NAME, SAMPLING_RESULTS_NAME, COUNTER_NAMES, COUNTER_DESCS, counterTypes);
        COUNTER_TYPE = new TabularType(SAMPLING_RESULTS_NAME, SAMPLING_RESULTS_NAME, COUNTER_COMPOSITE_TYPE, COUNTER_NAMES);

        OpenType<?>[] samplerTypes = new OpenType[] { SimpleType.LONG, COUNTER_TYPE };

        SAMPLING_RESULT = new CompositeType(SAMPLING_RESULTS_NAME, SAMPLING_RESULTS_NAME, SAMPLER_NAMES, SAMPLER_DESCS, samplerTypes);
    }
    catch (OpenDataException e)
    {
        // the descriptors above are compile-time constants, so a failure here is a programming error
        throw Throwables.propagate(e);
    }
}

public final Keyspace keyspace;
public final String name;
public final CFMetaData metadata;
private final String mbeanName;
@Deprecated
private final String oldMBeanName;
// cleared by invalidate()
private volatile boolean valid = true;
Memtables and SSTables on disk for this column family. We synchronize on the Tracker to ensure isolation when we want to make sure that the memtable we're acting on doesn't change out from under us. I.e., flush syncronizes on it to make sure it can submit on both executors atomically, so anyone else who wants to make sure flush doesn't interfere should as well.
/** * Memtables and SSTables on disk for this column family. * * We synchronize on the Tracker to ensure isolation when we want to make sure * that the memtable we're acting on doesn't change out from under us. I.e., flush * syncronizes on it to make sure it can submit on both executors atomically, * so anyone else who wants to make sure flush doesn't interfere should as well. */
private final Tracker data; /* The read order, used to track accesses to off-heap memtable storage */ public final OpOrder readOrdering = new OpOrder(); /* This is used to generate the next index for a SSTable */ private final AtomicInteger fileIndexGenerator = new AtomicInteger(0); public final SecondaryIndexManager indexManager; public final TableViews viewManager; /* These are locally held copies to be changed from the config during runtime */ private volatile DefaultValue<Integer> minCompactionThreshold; private volatile DefaultValue<Integer> maxCompactionThreshold; private volatile DefaultValue<Double> crcCheckChance; private final CompactionStrategyManager compactionStrategyManager; private volatile Directories directories; public final TableMetrics metric; public volatile long sampleLatencyNanos; private final ScheduledFuture<?> latencyCalculator; private volatile boolean compactionSpaceCheck = true; @VisibleForTesting final DiskBoundaryManager diskBoundaryManager = new DiskBoundaryManager(); public static void shutdownFlushExecutor() throws InterruptedException { flushExecutor.shutdown(); flushExecutor.awaitTermination(60, TimeUnit.SECONDS); } public static void shutdownPostFlushExecutor() throws InterruptedException { postFlushExecutor.shutdown(); postFlushExecutor.awaitTermination(60, TimeUnit.SECONDS); } public static void shutdownExecutorsAndWait(long timeout, TimeUnit unit) throws InterruptedException, TimeoutException { List<ExecutorService> executors = new ArrayList<>(perDiskflushExecutors.length + 3); Collections.addAll(executors, reclaimExecutor, postFlushExecutor, flushExecutor); Collections.addAll(executors, perDiskflushExecutors); ExecutorUtils.shutdownAndWait(timeout, unit, executors); } public void reload() { // metadata object has been mutated directly. make all the members jibe with new settings. // only update these runtime-modifiable settings if they have not been modified. 
if (!minCompactionThreshold.isModified()) for (ColumnFamilyStore cfs : concatWithIndexes()) cfs.minCompactionThreshold = new DefaultValue(metadata.params.compaction.minCompactionThreshold()); if (!maxCompactionThreshold.isModified()) for (ColumnFamilyStore cfs : concatWithIndexes()) cfs.maxCompactionThreshold = new DefaultValue(metadata.params.compaction.maxCompactionThreshold()); if (!crcCheckChance.isModified()) for (ColumnFamilyStore cfs : concatWithIndexes()) cfs.crcCheckChance = new DefaultValue(metadata.params.crcCheckChance); compactionStrategyManager.maybeReload(metadata); directories = compactionStrategyManager.getDirectories(); scheduleFlush(); indexManager.reload(); // If the CF comparator has changed, we need to change the memtable, // because the old one still aliases the previous comparator. if (data.getView().getCurrentMemtable().initialComparator != metadata.comparator) switchMemtable(); } void scheduleFlush() { int period = metadata.params.memtableFlushPeriodInMs; if (period > 0) { logger.trace("scheduling flush in {} ms", period); WrappedRunnable runnable = new WrappedRunnable() { protected void runMayThrow() { synchronized (data) { Memtable current = data.getView().getCurrentMemtable(); // if we're not expired, we've been hit by a scheduled flush for an already flushed memtable, so ignore if (current.isExpired()) { if (current.isClean()) { // if we're still clean, instead of swapping just reschedule a flush for later scheduleFlush(); } else { // we'll be rescheduled by the constructor of the Memtable. 
forceFlush(); } } } } }; ScheduledExecutors.scheduledTasks.schedule(runnable, period, TimeUnit.MILLISECONDS); } } public static Runnable getBackgroundCompactionTaskSubmitter() { return new Runnable() { public void run() { for (Keyspace keyspace : Keyspace.all()) for (ColumnFamilyStore cfs : keyspace.getColumnFamilyStores()) CompactionManager.instance.submitBackground(cfs); } }; } public void setCompactionParametersJson(String options) { setCompactionParameters(FBUtilities.fromJsonMap(options)); } public String getCompactionParametersJson() { return FBUtilities.json(getCompactionParameters()); } public void setCompactionParameters(Map<String, String> options) { try { CompactionParams compactionParams = CompactionParams.fromMap(options); compactionParams.validate(); compactionStrategyManager.setNewLocalCompactionStrategy(compactionParams); } catch (Throwable t) { logger.error("Could not set new local compaction strategy", t); // dont propagate the ConfigurationException over jmx, user will only see a ClassNotFoundException throw new IllegalArgumentException("Could not set new local compaction strategy: "+t.getMessage()); } } public Map<String, String> getCompactionParameters() { return compactionStrategyManager.getCompactionParams().asMap(); } public Map<String,String> getCompressionParameters() { return metadata.params.compression.asMap(); } public void setCompressionParameters(Map<String,String> opts) { try { metadata.compression(CompressionParams.fromMap(opts)); metadata.params.compression.validate(); } catch (ConfigurationException e) { throw new IllegalArgumentException(e.getMessage()); } } @VisibleForTesting public ColumnFamilyStore(Keyspace keyspace, String columnFamilyName, int generation, CFMetaData metadata, Directories directories, boolean loadSSTables, boolean registerBookeeping, boolean offline) { assert directories != null; assert metadata != null : "null metadata for " + keyspace + ":" + columnFamilyName; this.keyspace = keyspace; this.metadata = 
metadata; name = columnFamilyName; minCompactionThreshold = new DefaultValue<>(metadata.params.compaction.minCompactionThreshold()); maxCompactionThreshold = new DefaultValue<>(metadata.params.compaction.maxCompactionThreshold()); crcCheckChance = new DefaultValue<>(metadata.params.crcCheckChance); indexManager = new SecondaryIndexManager(this); viewManager = keyspace.viewManager.forTable(metadata); metric = new TableMetrics(this); fileIndexGenerator.set(generation); sampleLatencyNanos = TimeUnit.MILLISECONDS.toNanos(DatabaseDescriptor.getReadRpcTimeout() / 2); logger.info("Initializing {}.{}", keyspace.getName(), name); // Create Memtable only on online Memtable initialMemtable = null; if (DatabaseDescriptor.isDaemonInitialized()) initialMemtable = new Memtable(new AtomicReference<>(CommitLog.instance.getCurrentPosition()), this); data = new Tracker(initialMemtable, loadSSTables); // scan for sstables corresponding to this cf and load them if (data.loadsstables) { Directories.SSTableLister sstableFiles = directories.sstableLister(Directories.OnTxnErr.IGNORE).skipTemporary(true); Collection<SSTableReader> sstables = SSTableReader.openAll(sstableFiles.list().entrySet(), metadata); data.addInitialSSTables(sstables); } /** * When creating a CFS offline we change the default logic needed by CASSANDRA-8671 * and link the passed directories to be picked up by the compaction strategy */ if (offline) this.directories = directories; else this.directories = new Directories(metadata, Directories.dataDirectories); // compaction strategy should be created after the CFS has been prepared compactionStrategyManager = new CompactionStrategyManager(this); // Since compaction can re-define data dir we need to reinit directories this.directories = compactionStrategyManager.getDirectories(); if (maxCompactionThreshold.value() <= 0 || minCompactionThreshold.value() <=0) { logger.warn("Disabling compaction strategy by setting compaction thresholds to 0 is deprecated, set the compaction 
option 'enabled' to 'false' instead."); this.compactionStrategyManager.disable(); } // create the private ColumnFamilyStores for the secondary column indexes for (IndexMetadata info : metadata.getIndexes()) indexManager.addIndex(info); if (registerBookeeping) { // register the mbean mbeanName = String.format("org.apache.cassandra.db:type=%s,keyspace=%s,table=%s", isIndex() ? "IndexTables" : "Tables", keyspace.getName(), name); oldMBeanName = String.format("org.apache.cassandra.db:type=%s,keyspace=%s,columnfamily=%s", isIndex() ? "IndexColumnFamilies" : "ColumnFamilies", keyspace.getName(), name); try { ObjectName[] objectNames = {new ObjectName(mbeanName), new ObjectName(oldMBeanName)}; for (ObjectName objectName : objectNames) { MBeanWrapper.instance.registerMBean(this, objectName); } } catch (Exception e) { throw new RuntimeException(e); } logger.trace("retryPolicy for {} is {}", name, this.metadata.params.speculativeRetry); latencyCalculator = ScheduledExecutors.optionalTasks.scheduleWithFixedDelay(new Runnable() { public void run() { SpeculativeRetryParam retryPolicy = ColumnFamilyStore.this.metadata.params.speculativeRetry; switch (retryPolicy.kind()) { case PERCENTILE: // get percentile in nanos sampleLatencyNanos = (long) (metric.coordinatorReadLatency.getSnapshot().getValue(retryPolicy.threshold())); break; case CUSTOM: sampleLatencyNanos = (long) retryPolicy.threshold(); break; default: sampleLatencyNanos = Long.MAX_VALUE; break; } } }, DatabaseDescriptor.getReadRpcTimeout(), DatabaseDescriptor.getReadRpcTimeout(), TimeUnit.MILLISECONDS); } else { latencyCalculator = ScheduledExecutors.optionalTasks.schedule(Runnables.doNothing(), 0, TimeUnit.NANOSECONDS); mbeanName = null; oldMBeanName= null; } } public Directories getDirectories() { return directories; } public SSTableMultiWriter createSSTableMultiWriter(Descriptor descriptor, long keyCount, long repairedAt, int sstableLevel, SerializationHeader header, LifecycleNewTracker lifecycleNewTracker) { 
MetadataCollector collector = new MetadataCollector(metadata.comparator).sstableLevel(sstableLevel); return createSSTableMultiWriter(descriptor, keyCount, repairedAt, collector, header, lifecycleNewTracker); } public SSTableMultiWriter createSSTableMultiWriter(Descriptor descriptor, long keyCount, long repairedAt, MetadataCollector metadataCollector, SerializationHeader header, LifecycleNewTracker lifecycleNewTracker) { return getCompactionStrategyManager().createSSTableMultiWriter(descriptor, keyCount, repairedAt, metadataCollector, header, indexManager.listIndexes(), lifecycleNewTracker); } public boolean supportsEarlyOpen() { return compactionStrategyManager.supportsEarlyOpen(); }
call when dropping or renaming a CF. Performs mbean housekeeping and invalidates CFS to other operations
/** call when dropping or renaming a CF. Performs mbean housekeeping and invalidates CFS to other operations */
public void invalidate() { invalidate(true); } public void invalidate(boolean expectMBean) { // disable and cancel in-progress compactions before invalidating valid = false; try { unregisterMBean(); } catch (Exception e) { if (expectMBean) { JVMStabilityInspector.inspectThrowable(e); // this shouldn't block anything. logger.warn("Failed unregistering mbean: {}", mbeanName, e); } } latencyCalculator.cancel(false); compactionStrategyManager.shutdown(); SystemKeyspace.removeTruncationRecord(metadata.cfId); data.dropSSTables(); LifecycleTransaction.waitForDeletions(); indexManager.invalidateAllIndexesBlocking(); invalidateCaches(); }
Removes every SSTable in the directory from the Tracker's view.
Params:
  • directory – the unreadable directory, possibly with SSTables in it, but not necessarily.
/** * Removes every SSTable in the directory from the Tracker's view. * @param directory the unreadable directory, possibly with SSTables in it, but not necessarily. */
void maybeRemoveUnreadableSSTables(File directory) { data.removeUnreadableSSTables(directory); } void unregisterMBean() throws MalformedObjectNameException { ObjectName[] objectNames = {new ObjectName(mbeanName), new ObjectName(oldMBeanName)}; for (ObjectName objectName : objectNames) { if (MBeanWrapper.instance.isRegistered(objectName)) MBeanWrapper.instance.unregisterMBean(objectName); } // unregister metrics metric.release(); } public static ColumnFamilyStore createColumnFamilyStore(Keyspace keyspace, CFMetaData metadata, boolean loadSSTables) { return createColumnFamilyStore(keyspace, metadata.cfName, metadata, loadSSTables); } public static synchronized ColumnFamilyStore createColumnFamilyStore(Keyspace keyspace, String columnFamily, CFMetaData metadata, boolean loadSSTables) { Directories directories = new Directories(metadata, initialDirectories); return createColumnFamilyStore(keyspace, columnFamily, metadata, directories, loadSSTables, true, false); }
This is only directly used by offline tools
/** This is only directly used by offline tools */
public static synchronized ColumnFamilyStore createColumnFamilyStore(Keyspace keyspace, String columnFamily, CFMetaData metadata, Directories directories, boolean loadSSTables, boolean registerBookkeeping, boolean offline) { // get the max generation number, to prevent generation conflicts Directories.SSTableLister lister = directories.sstableLister(Directories.OnTxnErr.IGNORE).includeBackups(true); List<Integer> generations = new ArrayList<Integer>(); for (Map.Entry<Descriptor, Set<Component>> entry : lister.list().entrySet()) { Descriptor desc = entry.getKey(); generations.add(desc.generation); if (!desc.isCompatible()) throw new RuntimeException(String.format("Incompatible SSTable found. Current version %s is unable to read file: %s. Please run upgradesstables.", desc.getFormat().getLatestVersion(), desc)); } Collections.sort(generations); int value = (generations.size() > 0) ? (generations.get(generations.size() - 1)) : 0; return new ColumnFamilyStore(keyspace, columnFamily, value, metadata, directories, loadSSTables, registerBookkeeping, offline); }
Removes unnecessary files from the cf directory at startup: these include temp files, orphans, zero-length files and compacted sstables. Files that cannot be recognized will be ignored.
/** * Removes unnecessary files from the cf directory at startup: these include temp files, orphans, zero-length files * and compacted sstables. Files that cannot be recognized will be ignored. */
public static void scrubDataDirectories(CFMetaData metadata) throws StartupException { Directories directories = new Directories(metadata, initialDirectories); Set<File> cleanedDirectories = new HashSet<>(); // clear ephemeral snapshots that were not properly cleared last session (CASSANDRA-7357) clearEphemeralSnapshots(directories); directories.removeTemporaryDirectories(); logger.trace("Removing temporary or obsoleted files from unfinished operations for table {}", metadata.cfName); if (!LifecycleTransaction.removeUnfinishedLeftovers(metadata)) throw new StartupException(StartupException.ERR_WRONG_DISK_STATE, String.format("Cannot remove temporary or obsoleted files for %s.%s due to a problem with transaction " + "log files. Please check records with problems in the log messages above and fix them. " + "Refer to the 3.0 upgrading instructions in NEWS.txt " + "for a description of transaction log files.", metadata.ksName, metadata.cfName)); logger.trace("Further extra check for orphan sstable files for {}", metadata.cfName); for (Map.Entry<Descriptor,Set<Component>> sstableFiles : directories.sstableLister(Directories.OnTxnErr.IGNORE).list().entrySet()) { Descriptor desc = sstableFiles.getKey(); File directory = desc.directory; Set<Component> components = sstableFiles.getValue(); if (!cleanedDirectories.contains(directory)) { cleanedDirectories.add(directory); for (File tmpFile : desc.getTemporaryFiles()) tmpFile.delete(); } File dataFile = new File(desc.filenameFor(Component.DATA)); if (components.contains(Component.DATA) && dataFile.length() > 0) // everything appears to be in order... moving on. continue; // missing the DATA file! 
all components are orphaned logger.warn("Removing orphans for {}: {}", desc, components); for (Component component : components) { File file = new File(desc.filenameFor(component)); if (file.exists()) FileUtils.deleteWithConfirm(desc.filenameFor(component)); } } // cleanup incomplete saved caches Pattern tmpCacheFilePattern = Pattern.compile(metadata.ksName + "-" + metadata.cfName + "-(Key|Row)Cache.*\\.tmp$"); File dir = new File(DatabaseDescriptor.getSavedCachesLocation()); if (dir.exists()) { assert dir.isDirectory(); for (File file : dir.listFiles()) if (tmpCacheFilePattern.matcher(file.getName()).matches()) if (!file.delete()) logger.warn("could not delete {}", file.getAbsolutePath()); } // also clean out any index leftovers. for (IndexMetadata index : metadata.getIndexes()) if (!index.isCustom()) { CFMetaData indexMetadata = CassandraIndex.indexCfsMetadata(metadata, index); scrubDataDirectories(indexMetadata); } }
See #StorageService.loadNewSSTables(String, String) for more info
Params:
  • ksName – The keyspace name
  • cfName – The columnFamily name
/** * See #{@code StorageService.loadNewSSTables(String, String)} for more info * * @param ksName The keyspace name * @param cfName The columnFamily name */
public static void loadNewSSTables(String ksName, String cfName) { /** ks/cf existence checks will be done by open and getCFS methods for us */ Keyspace keyspace = Keyspace.open(ksName); keyspace.getColumnFamilyStore(cfName).loadNewSSTables(); }
#{@inheritDoc}
/**
 * {@inheritDoc}
 */
public synchronized void loadNewSSTables()
{
    logger.info("Loading new SSTables for {}/{}...", keyspace.getName(), name);

    // descriptors of the sstables this store already knows about
    Set<Descriptor> knownDescriptors = new HashSet<>();
    for (SSTableReader tracked : getSSTables(SSTableSet.CANONICAL))
        knownDescriptors.add(tracked.descriptor);

    Set<SSTableReader> opened = new HashSet<>();

    Directories.SSTableLister lister = getDirectories().sstableLister(Directories.OnTxnErr.IGNORE).skipTemporary(true);
    for (Map.Entry<Descriptor, Set<Component>> entry : lister.list().entrySet())
    {
        Descriptor found = entry.getKey();
        Set<Component> components = entry.getValue();

        if (knownDescriptors.contains(found))
            continue; // old (initialized) SSTable found, skipping

        if (!found.isCompatible())
        {
            String problem = String.format("Can't open incompatible SSTable! Current version %s, found file: %s",
                                           found.getFormat().getLatestVersion(),
                                           found);
            throw new RuntimeException(problem);
        }

        // force foreign sstables to level 0
        try
        {
            File statsFile = new File(found.filenameFor(Component.STATS));
            if (statsFile.exists())
                found.getMetadataSerializer().mutateLevel(found, 0);
        }
        catch (IOException e)
        {
            FileUtils.handleCorruptSSTable(new CorruptSSTableException(e, found.filenameFor(Component.STATS)));
            logger.error("Cannot read sstable {}; other IO error, skipping table", entry, e);
            continue;
        }

        // Increment the generation until we find a filename that doesn't exist. This is needed because the new
        // SSTables that are being loaded might already use these generation numbers.
        Descriptor renamed;
        do
        {
            renamed = new Descriptor(found.version,
                                     found.directory,
                                     found.ksname,
                                     found.cfname,
                                     fileIndexGenerator.incrementAndGet(),
                                     found.formatType,
                                     found.digestComponent);
        }
        while (new File(renamed.filenameFor(Component.DATA)).exists());

        logger.info("Renaming new SSTable {} to {}", found, renamed);
        SSTableWriter.rename(found, renamed, components);

        SSTableReader reader;
        try
        {
            reader = SSTableReader.open(renamed, components, metadata);
        }
        catch (CorruptSSTableException ex)
        {
            FileUtils.handleCorruptSSTable(ex);
            logger.error("Corrupt sstable {}; skipping table", entry, ex);
            continue;
        }
        catch (FSError ex)
        {
            FileUtils.handleFSError(ex);
            logger.error("Cannot read sstable {}; file system error, skipping table", entry, ex);
            continue;
        }
        catch (IOException ex)
        {
            FileUtils.handleCorruptSSTable(new CorruptSSTableException(ex, found.filenameFor(Component.DATA)));
            logger.error("Cannot read sstable {}; other IO error, skipping table", entry, ex);
            continue;
        }
        opened.add(reader);
    }

    if (opened.isEmpty())
    {
        logger.info("No new SSTables were found for {}/{}", keyspace.getName(), name);
        return;
    }

    logger.info("Loading new SSTables and building secondary indexes for {}/{}: {}", keyspace.getName(), name, opened);

    // references are held for the duration of the add + index build
    try (Refs<SSTableReader> refs = Refs.ref(opened))
    {
        data.addSSTables(opened);
        indexManager.buildAllIndexesBlocking(opened);
    }

    logger.info("Done loading load new SSTables for {}/{}", keyspace.getName(), name);
}

/**
 * Rebuilds a single secondary index of this table by delegating to the static variant.
 *
 * @param idxName name of the index to rebuild
 */
public void rebuildSecondaryIndex(String idxName)
{
    String ksName = keyspace.getName();
    rebuildSecondaryIndex(ksName, metadata.cfName, idxName);
}

/**
 * Rebuilds the named secondary indexes of the given table from its canonical sstables,
 * holding references on those sstables while the blocking rebuild runs.
 *
 * @param ksName   The keyspace name
 * @param cfName   The columnFamily name
 * @param idxNames names of the indexes to rebuild
 */
public static void rebuildSecondaryIndex(String ksName, String cfName, String... idxNames)
{
    ColumnFamilyStore cfs = Keyspace.open(ksName).getColumnFamilyStore(cfName);

    Set<String> requested = new HashSet<>(Arrays.asList(idxNames));

    Iterable<SSTableReader> canonical = cfs.getSSTables(SSTableSet.CANONICAL);
    try (Refs<SSTableReader> refs = Refs.ref(canonical))
    {
        logger.info("User Requested secondary index re-build for {}/{} indexes: {}", ksName, cfName, Joiner.on(',').join(idxNames));
        cfs.indexManager.rebuildIndexesBlocking(refs, requested);
    }
}

/**
 * @return the table name; kept as a deprecated alias of {@link #getTableName()}
 */
@Deprecated
public String getColumnFamilyName()
{
    return getTableName();
}

/**
 * @return the name of this table
 */
public String getTableName()
{
    return name;
}

/**
 * Builds the DATA component file name for a new sstable in the given directory, using the
 * current sstable format and its latest version. Consumes a new generation number.
 */
public String getSSTablePath(File directory)
{
    SSTableFormat.Type current = SSTableFormat.Type.current();
    return getSSTablePath(directory, current.info.getLatestVersion(), current);
}

/**
 * Same as {@link #getSSTablePath(File)} but for an explicitly chosen format (latest version of it).
 */
public String getSSTablePath(File directory, SSTableFormat.Type format)
{
    Version latest = format.info.getLatestVersion();
    return getSSTablePath(directory, latest, format);
}

// Creates a descriptor with the next generation number and returns its DATA file name.
private String getSSTablePath(File directory, Version version, SSTableFormat.Type format)
{
    Descriptor fresh = new Descriptor(version,
                                      directory,
                                      keyspace.getName(),
                                      name,
                                      fileIndexGenerator.incrementAndGet(),
                                      format,
                                      Component.digestFor(BigFormat.latestVersion.uncompressedChecksumType()));
    return fresh.filenameFor(Component.DATA);
}
Switches the memtable iff the live memtable is the one provided
Params:
  • memtable –
/** * Switches the memtable iff the live memtable is the one provided * * @param memtable */
public ListenableFuture<CommitLogPosition> switchMemtableIfCurrent(Memtable memtable) { synchronized (data) { if (data.getView().getCurrentMemtable() == memtable) return switchMemtable(); } return waitForFlushes(); } /* * switchMemtable puts Memtable.getSortedContents on the writer executor. When the write is complete, * we turn the writer into an SSTableReader and add it to ssTables where it is available for reads. * This method does not block except for synchronizing on Tracker, but the Future it returns will * not complete until the Memtable (and all prior Memtables) have been successfully flushed, and the CL * marked clean up to the position owned by the Memtable. */ public ListenableFuture<CommitLogPosition> switchMemtable() { synchronized (data) { logFlush(); Flush flush = new Flush(false); flushExecutor.execute(flush); postFlushExecutor.execute(flush.postFlushTask); return flush.postFlushTask; } } // print out size of all memtables we're enqueuing private void logFlush() { // reclaiming includes that which we are GC-ing; float onHeapRatio = 0, offHeapRatio = 0; long onHeapTotal = 0, offHeapTotal = 0; Memtable memtable = getTracker().getView().getCurrentMemtable(); onHeapRatio += memtable.getAllocator().onHeap().ownershipRatio(); offHeapRatio += memtable.getAllocator().offHeap().ownershipRatio(); onHeapTotal += memtable.getAllocator().onHeap().owns(); offHeapTotal += memtable.getAllocator().offHeap().owns(); for (ColumnFamilyStore indexCfs : indexManager.getAllIndexColumnFamilyStores()) { MemtableAllocator allocator = indexCfs.getTracker().getView().getCurrentMemtable().getAllocator(); onHeapRatio += allocator.onHeap().ownershipRatio(); offHeapRatio += allocator.offHeap().ownershipRatio(); onHeapTotal += allocator.onHeap().owns(); offHeapTotal += allocator.offHeap().owns(); } logger.debug("Enqueuing flush of {}: {}", name, String.format("%s (%.0f%%) on-heap, %s (%.0f%%) off-heap", FBUtilities.prettyPrintMemory(onHeapTotal), onHeapRatio * 100, 
FBUtilities.prettyPrintMemory(offHeapTotal), offHeapRatio * 100)); }
Flush if there is unflushed data in the memtables
Returns:a Future yielding the commit log position that can be guaranteed to have been successfully written to sstables for this table once the future completes
/**
 * Flush if there is unflushed data in the memtables, i.e. if the live memtable of this table
 * or of any store returned by concatWithIndexes() is not clean.
 *
 * @return a Future yielding the commit log position that can be guaranteed to have been successfully written
 *         to sstables for this table once the future completes
 */
public ListenableFuture<CommitLogPosition> forceFlush()
{
    synchronized (data)
    {
        Memtable current = data.getView().getCurrentMemtable();

        boolean anyDirty = false;
        for (ColumnFamilyStore cfs : concatWithIndexes())
        {
            if (!cfs.data.getView().getCurrentMemtable().isClean())
            {
                anyDirty = true;
                break;
            }
        }

        return anyDirty ? switchMemtableIfCurrent(current) : waitForFlushes();
    }
}
Flush if there is unflushed data that was written to the CommitLog before @param flushIfDirtyBefore (inclusive).
Returns:a Future yielding the commit log position that can be guaranteed to have been successfully written to sstables for this table once the future completes
/** * Flush if there is unflushed data that was written to the CommitLog before @param flushIfDirtyBefore * (inclusive). * * @return a Future yielding the commit log position that can be guaranteed to have been successfully written * to sstables for this table once the future completes */
public ListenableFuture<?> forceFlush(CommitLogPosition flushIfDirtyBefore) { // we don't loop through the remaining memtables since here we only care about commit log dirtiness // and this does not vary between a table and its table-backed indexes Memtable current = data.getView().getCurrentMemtable(); if (current.mayContainDataBefore(flushIfDirtyBefore)) return switchMemtableIfCurrent(current); return waitForFlushes(); }
Returns:a Future yielding the commit log position that can be guaranteed to have been successfully written to sstables for this table once the future completes
/** * @return a Future yielding the commit log position that can be guaranteed to have been successfully written * to sstables for this table once the future completes */
private ListenableFuture<CommitLogPosition> waitForFlushes() { // we grab the current memtable; once any preceding memtables have flushed, we know its // commitLogLowerBound has been set (as this it is set with the upper bound of the preceding memtable) final Memtable current = data.getView().getCurrentMemtable(); ListenableFutureTask<CommitLogPosition> task = ListenableFutureTask.create(() -> { logger.debug("forceFlush requested but everything is clean in {}", name); return current.getCommitLogLowerBound(); }); postFlushExecutor.execute(task); return task; } public CommitLogPosition forceBlockingFlush() { return FBUtilities.waitOnFuture(forceFlush()); }
Both synchronises custom secondary indexes and provides ordering guarantees for futures on switchMemtable/flush etc, which expect to be able to wait until the flush (and all prior flushes) requested have completed.
/** * Both synchronises custom secondary indexes and provides ordering guarantees for futures on switchMemtable/flush * etc, which expect to be able to wait until the flush (and all prior flushes) requested have completed. */
private final class PostFlush implements Callable<CommitLogPosition> { final CountDownLatch latch = new CountDownLatch(1); final List<Memtable> memtables; volatile Throwable flushFailure = null; private PostFlush(List<Memtable> memtables) { this.memtables = memtables; } public CommitLogPosition call() { try { // we wait on the latch for the commitLogUpperBound to be set, and so that waiters // on this task can rely on all prior flushes being complete latch.await(); } catch (InterruptedException e) { throw new IllegalStateException(); } CommitLogPosition commitLogUpperBound = CommitLogPosition.NONE; // If a flush errored out but the error was ignored, make sure we don't discard the commit log. if (flushFailure == null && !memtables.isEmpty()) { Memtable memtable = memtables.get(0); commitLogUpperBound = memtable.getCommitLogUpperBound(); CommitLog.instance.discardCompletedSegments(metadata.cfId, memtable.getCommitLogLowerBound(), commitLogUpperBound); } metric.pendingFlushes.dec(); if (flushFailure != null) throw Throwables.propagate(flushFailure); return commitLogUpperBound; } }
Should only be constructed/used from switchMemtable() or truncate(), with ownership of the Tracker monitor. In the constructor the current memtable(s) are swapped, and a barrier on outstanding writes is issued; when run by the flushWriter the barrier is waited on to ensure all outstanding writes have completed before all memtables are immediately written, and the CL is either immediately marked clean or, if there are custom secondary indexes, the post flush clean up is left to update those indexes and mark the CL clean
/** * Should only be constructed/used from switchMemtable() or truncate(), with ownership of the Tracker monitor. * In the constructor the current memtable(s) are swapped, and a barrier on outstanding writes is issued; * when run by the flushWriter the barrier is waited on to ensure all outstanding writes have completed * before all memtables are immediately written, and the CL is either immediately marked clean or, if * there are custom secondary indexes, the post flush clean up is left to update those indexes and mark * the CL clean */
private final class Flush implements Runnable { final OpOrder.Barrier writeBarrier; final List<Memtable> memtables = new ArrayList<>(); final ListenableFutureTask<CommitLogPosition> postFlushTask; final PostFlush postFlush; final boolean truncate; private Flush(boolean truncate) { // if true, we won't flush, we'll just wait for any outstanding writes, switch the memtable, and discard this.truncate = truncate; metric.pendingFlushes.inc(); /** * To ensure correctness of switch without blocking writes, run() needs to wait for all write operations * started prior to the switch to complete. We do this by creating a Barrier on the writeOrdering * that all write operations register themselves with, and assigning this barrier to the memtables, * after which we *.issue()* the barrier. This barrier is used to direct write operations started prior * to the barrier.issue() into the memtable we have switched out, and any started after to its replacement. * In doing so it also tells the write operations to update the commitLogUpperBound of the memtable, so * that we know the CL position we are dirty to, which can be marked clean when we complete. 
*/ writeBarrier = keyspace.writeOrder.newBarrier(); // submit flushes for the memtable for any indexed sub-cfses, and our own AtomicReference<CommitLogPosition> commitLogUpperBound = new AtomicReference<>(); for (ColumnFamilyStore cfs : concatWithIndexes()) { // switch all memtables, regardless of their dirty status, setting the barrier // so that we can reach a coordinated decision about cleanliness once they // are no longer possible to be modified Memtable newMemtable = new Memtable(commitLogUpperBound, cfs); Memtable oldMemtable = cfs.data.switchMemtable(truncate, newMemtable); oldMemtable.setDiscarding(writeBarrier, commitLogUpperBound); memtables.add(oldMemtable); } // we then ensure an atomic decision is made about the upper bound of the continuous range of commit log // records owned by this memtable setCommitLogUpperBound(commitLogUpperBound); // we then issue the barrier; this lets us wait for all operations started prior to the barrier to complete; // since this happens after wiring up the commitLogUpperBound, we also know all operations with earlier // commit log segment position have also completed, i.e. 
the memtables are done and ready to flush writeBarrier.issue(); postFlush = new PostFlush(memtables); postFlushTask = ListenableFutureTask.create(postFlush); } public void run() { // mark writes older than the barrier as blocking progress, permitting them to exceed our memory limit // if they are stuck waiting on it, then wait for them all to complete writeBarrier.markBlocking(); writeBarrier.await(); // mark all memtables as flushing, removing them from the live memtable list for (Memtable memtable : memtables) memtable.cfs.data.markFlushing(memtable); metric.memtableSwitchCount.inc(); try { // Flush "data" memtable with non-cf 2i first; flushMemtable(memtables.get(0), true); for (int i = 1; i < memtables.size(); i++) flushMemtable(memtables.get(i), false); } catch (Throwable t) { JVMStabilityInspector.inspectThrowable(t); postFlush.flushFailure = t; } // signal the post-flush we've done our work postFlush.latch.countDown(); } public Collection<SSTableReader> flushMemtable(Memtable memtable, boolean flushNonCf2i) { if (memtable.isClean() || truncate) { memtable.cfs.replaceFlushed(memtable, Collections.emptyList()); reclaim(memtable); return Collections.emptyList(); } List<Future<SSTableMultiWriter>> futures = new ArrayList<>(); long totalBytesOnDisk = 0; long maxBytesOnDisk = 0; long minBytesOnDisk = Long.MAX_VALUE; List<SSTableReader> sstables = new ArrayList<>(); try (LifecycleTransaction txn = LifecycleTransaction.offline(OperationType.FLUSH)) { List<Memtable.FlushRunnable> flushRunnables = null; List<SSTableMultiWriter> flushResults = null; try { // flush the memtable flushRunnables = memtable.flushRunnables(txn); for (int i = 0; i < flushRunnables.size(); i++) futures.add(perDiskflushExecutors[i].submit(flushRunnables.get(i))); /** * we can flush 2is as soon as the barrier completes, as they will be consistent with (or ahead of) the * flushed memtables and CL position, which is as good as we can guarantee. 
* TODO: SecondaryIndex should support setBarrier(), so custom implementations can co-ordinate exactly * with CL as we do with memtables/CFS-backed SecondaryIndexes. */ if (flushNonCf2i) indexManager.flushAllNonCFSBackedIndexesBlocking(); flushResults = Lists.newArrayList(FBUtilities.waitOnFutures(futures)); } catch (Throwable t) { t = memtable.abortRunnables(flushRunnables, t); t = txn.abort(t); throw Throwables.propagate(t); } try { Iterator<SSTableMultiWriter> writerIterator = flushResults.iterator(); while (writerIterator.hasNext()) { @SuppressWarnings("resource") SSTableMultiWriter writer = writerIterator.next(); if (writer.getFilePointer() > 0) { writer.setOpenResult(true).prepareToCommit(); } else { maybeFail(writer.abort(null)); writerIterator.remove(); } } } catch (Throwable t) { for (SSTableMultiWriter writer : flushResults) t = writer.abort(t); t = txn.abort(t); Throwables.propagate(t); } txn.prepareToCommit(); Throwable accumulate = null; for (SSTableMultiWriter writer : flushResults) accumulate = writer.commit(accumulate); maybeFail(txn.commit(accumulate)); for (SSTableMultiWriter writer : flushResults) { Collection<SSTableReader> flushedSSTables = writer.finished(); for (SSTableReader sstable : flushedSSTables) { if (sstable != null) { sstables.add(sstable); long size = sstable.bytesOnDisk(); totalBytesOnDisk += size; maxBytesOnDisk = Math.max(maxBytesOnDisk, size); minBytesOnDisk = Math.min(minBytesOnDisk, size); } } } } memtable.cfs.replaceFlushed(memtable, sstables); reclaim(memtable); memtable.cfs.compactionStrategyManager.compactionLogger.flush(sstables); logger.debug("Flushed to {} ({} sstables, {}), biggest {}, smallest {}", sstables, sstables.size(), FBUtilities.prettyPrintMemory(totalBytesOnDisk), FBUtilities.prettyPrintMemory(maxBytesOnDisk), FBUtilities.prettyPrintMemory(minBytesOnDisk)); return sstables; } private void reclaim(final Memtable memtable) { // issue a read barrier for reclaiming the memory, and offload the wait to another 
thread final OpOrder.Barrier readBarrier = readOrdering.newBarrier(); readBarrier.issue(); postFlushTask.addListener(new WrappedRunnable() { public void runMayThrow() { readBarrier.await(); memtable.setDiscarded(); } }, reclaimExecutor); } } // atomically set the upper bound for the commit log private static void setCommitLogUpperBound(AtomicReference<CommitLogPosition> commitLogUpperBound) { // we attempt to set the holder to the current commit log context. at the same time all writes to the memtables are // also maintaining this value, so if somebody sneaks ahead of us somehow (should be rare) we simply retry, // so that we know all operations prior to the position have not reached it yet CommitLogPosition lastReplayPosition; while (true) { lastReplayPosition = new Memtable.LastCommitLogPosition((CommitLog.instance.getCurrentPosition())); CommitLogPosition currentLast = commitLogUpperBound.get(); if ((currentLast == null || currentLast.compareTo(lastReplayPosition) <= 0) && commitLogUpperBound.compareAndSet(currentLast, lastReplayPosition)) break; } }
Finds the largest memtable, as a percentage of *either* on- or off-heap memory limits, and immediately queues it for flushing. If the memtable selected is flushed before this completes, no work is done.
/** * Finds the largest memtable, as a percentage of *either* on- or off-heap memory limits, and immediately * queues it for flushing. If the memtable selected is flushed before this completes, no work is done. */
public static class FlushLargestColumnFamily implements Runnable { public void run() { float largestRatio = 0f; Memtable largest = null; float liveOnHeap = 0, liveOffHeap = 0; for (ColumnFamilyStore cfs : ColumnFamilyStore.all()) { // we take a reference to the current main memtable for the CF prior to snapping its ownership ratios // to ensure we have some ordering guarantee for performing the switchMemtableIf(), i.e. we will only // swap if the memtables we are measuring here haven't already been swapped by the time we try to swap them Memtable current = cfs.getTracker().getView().getCurrentMemtable(); // find the total ownership ratio for the memtable and all SecondaryIndexes owned by this CF, // both on- and off-heap, and select the largest of the two ratios to weight this CF float onHeap = 0f, offHeap = 0f; onHeap += current.getAllocator().onHeap().ownershipRatio(); offHeap += current.getAllocator().offHeap().ownershipRatio(); for (ColumnFamilyStore indexCfs : cfs.indexManager.getAllIndexColumnFamilyStores()) { MemtableAllocator allocator = indexCfs.getTracker().getView().getCurrentMemtable().getAllocator(); onHeap += allocator.onHeap().ownershipRatio(); offHeap += allocator.offHeap().ownershipRatio(); } float ratio = Math.max(onHeap, offHeap); if (ratio > largestRatio) { largest = current; largestRatio = ratio; } liveOnHeap += onHeap; liveOffHeap += offHeap; } if (largest != null) { float usedOnHeap = Memtable.MEMORY_POOL.onHeap.usedRatio(); float usedOffHeap = Memtable.MEMORY_POOL.offHeap.usedRatio(); float flushingOnHeap = Memtable.MEMORY_POOL.onHeap.reclaimingRatio(); float flushingOffHeap = Memtable.MEMORY_POOL.offHeap.reclaimingRatio(); float thisOnHeap = largest.getAllocator().onHeap().ownershipRatio(); float thisOffHeap = largest.getAllocator().offHeap().ownershipRatio(); logger.debug("Flushing largest {} to free up room. 
Used total: {}, live: {}, flushing: {}, this: {}", largest.cfs, ratio(usedOnHeap, usedOffHeap), ratio(liveOnHeap, liveOffHeap), ratio(flushingOnHeap, flushingOffHeap), ratio(thisOnHeap, thisOffHeap)); largest.cfs.switchMemtableIfCurrent(largest); } } } private static String ratio(float onHeap, float offHeap) { return String.format("%.2f/%.2f", onHeap, offHeap); }
Insert/Update the column family for this key. Caller is responsible for acquiring Keyspace.switchLock param @ lock - lock that needs to be used. param @ key - key for update/insert param @ columnFamily - columnFamily changes
/** * Insert/Update the column family for this key. * Caller is responsible for acquiring Keyspace.switchLock * param @ lock - lock that needs to be used. * param @ key - key for update/insert * param @ columnFamily - columnFamily changes */
public void apply(PartitionUpdate update, UpdateTransaction indexer, OpOrder.Group opGroup, CommitLogPosition commitLogPosition) { long start = System.nanoTime(); try { Memtable mt = data.getMemtableFor(opGroup, commitLogPosition); long timeDelta = mt.put(update, indexer, opGroup); DecoratedKey key = update.partitionKey(); invalidateCachedPartition(key); metric.samplers.get(Sampler.WRITES).addSample(key.getKey(), key.hashCode(), 1); StorageHook.instance.reportWrite(metadata.cfId, update); metric.writeLatency.addNano(System.nanoTime() - start); // CASSANDRA-11117 - certain resolution paths on memtable put can result in very // large time deltas, either through a variety of sentinel timestamps (used for empty values, ensuring // a minimal write, etc). This limits the time delta to the max value the histogram // can bucket correctly. This also filters the Long.MAX_VALUE case where there was no previous value // to update. if(timeDelta < Long.MAX_VALUE) metric.colUpdateTimeDeltaHistogram.update(Math.min(18165375903306L, timeDelta)); } catch (RuntimeException e) { throw new RuntimeException(e.getMessage() + " for ks: " + keyspace.getName() + ", table: " + name, e); } }
Params:
  • sstables –
Returns:sstables whose key range overlaps with that of the given sstables, not including itself. (The given sstables may or may not overlap with each other.)
/** * @param sstables * @return sstables whose key range overlaps with that of the given sstables, not including itself. * (The given sstables may or may not overlap with each other.) */
public Collection<SSTableReader> getOverlappingLiveSSTables(Iterable<SSTableReader> sstables) { logger.trace("Checking for sstables overlapping {}", sstables); // a normal compaction won't ever have an empty sstables list, but we create a skeleton // compaction controller for streaming, and that passes an empty list. if (!sstables.iterator().hasNext()) return ImmutableSet.of(); View view = data.getView(); List<SSTableReader> sortedByFirst = Lists.newArrayList(sstables); Collections.sort(sortedByFirst, (o1, o2) -> o1.first.compareTo(o2.first)); List<AbstractBounds<PartitionPosition>> bounds = new ArrayList<>(); DecoratedKey first = null, last = null; /* normalize the intervals covered by the sstables assume we have sstables like this (brackets representing first/last key in the sstable); [ ] [ ] [ ] [ ] [ ] [ ] then we can, instead of searching the interval tree 6 times, normalize the intervals and only query the tree 2 times, for these intervals; [ ] [ ] */ for (SSTableReader sstable : sortedByFirst) { if (first == null) { first = sstable.first; last = sstable.last; } else { if (sstable.first.compareTo(last) <= 0) // we do overlap { if (sstable.last.compareTo(last) > 0) last = sstable.last; } else { bounds.add(AbstractBounds.bounds(first, true, last, true)); first = sstable.first; last = sstable.last; } } } bounds.add(AbstractBounds.bounds(first, true, last, true)); Set<SSTableReader> results = new HashSet<>(); for (AbstractBounds<PartitionPosition> bound : bounds) Iterables.addAll(results, view.liveSSTablesInBounds(bound.left, bound.right)); return Sets.difference(results, ImmutableSet.copyOf(sstables)); }
like getOverlappingSSTables, but acquires references before returning
/**
 * like getOverlappingLiveSSTables, but acquires references before returning; the lookup is repeated
 * until references on the whole overlapping set could be taken
 */
public Refs<SSTableReader> getAndReferenceOverlappingLiveSSTables(Iterable<SSTableReader> sstables)
{
    Refs<SSTableReader> refs;
    do
    {
        Iterable<SSTableReader> overlapped = getOverlappingLiveSSTables(sstables);
        refs = Refs.tryRef(overlapped);
    }
    while (refs == null);
    return refs;
}

/*
 * Called after a BinaryMemtable flushes its in-memory data, or we add a file
 * via bootstrap. This information is cached in the ColumnFamilyStore.
 * This is useful for reads because the ColumnFamilyStore first looks in
 * the in-memory store and then into the disk to find the key. If invoked
 * during recoveryMode the onMemtableFlush() need not be invoked.
 *
 * param @ sstable - the sstable to add; it must belong to this table
 */
public void addSSTable(SSTableReader sstable)
{
    assert sstable.getColumnFamilyName().equals(name);
    List<SSTableReader> single = Arrays.asList(sstable);
    addSSTables(single);
}

/**
 * Adds the given sstables to the tracker and then submits this store for background compaction.
 */
public void addSSTables(Collection<SSTableReader> sstables)
{
    data.addSSTables(sstables);
    CompactionManager.instance.submitBackground(this);
}
Calculate expected file size of SSTable after compaction. If operation type is CLEANUP and we're not dealing with an index sstable, then we calculate expected file size with checking token range to be eliminated. Otherwise, we just add up all the files' size, which is the worst case file size for compaction of all the list of files given.
Params:
  • sstables – SSTables to calculate expected compacted file size
  • operation – Operation type
Returns:Expected file size of SSTable after compaction
/** * Calculate expected file size of SSTable after compaction. * * If operation type is {@code CLEANUP} and we're not dealing with an index sstable, * then we calculate expected file size with checking token range to be eliminated. * * Otherwise, we just add up all the files' size, which is the worst case file * size for compaction of all the list of files given. * * @param sstables SSTables to calculate expected compacted file size * @param operation Operation type * @return Expected file size of SSTable after compaction */
public long getExpectedCompactedFileSize(Iterable<SSTableReader> sstables, OperationType operation) { if (operation != OperationType.CLEANUP || isIndex()) { return SSTableReader.getTotalBytes(sstables); } // cleanup size estimation only counts bytes for keys local to this node long expectedFileSize = 0; Collection<Range<Token>> ranges = StorageService.instance.getLocalRanges(keyspace.getName()); for (SSTableReader sstable : sstables) { List<Pair<Long, Long>> positions = sstable.getPositionsForRanges(ranges); for (Pair<Long, Long> position : positions) expectedFileSize += position.right - position.left; } double compressionRatio = metric.compressionRatio.getValue(); if (compressionRatio > 0d) expectedFileSize *= compressionRatio; return expectedFileSize; } /* * Find the maximum size file in the list . */ public SSTableReader getMaxSizeFile(Iterable<SSTableReader> sstables) { long maxSize = 0L; SSTableReader maxFile = null; for (SSTableReader sstable : sstables) { if (sstable.onDiskLength() > maxSize) { maxSize = sstable.onDiskLength(); maxFile = sstable; } } return maxFile; } public CompactionManager.AllSSTableOpStatus forceCleanup(int jobs) throws ExecutionException, InterruptedException { return CompactionManager.instance.performCleanup(ColumnFamilyStore.this, jobs); } public CompactionManager.AllSSTableOpStatus scrub(boolean disableSnapshot, boolean skipCorrupted, boolean checkData, boolean reinsertOverflowedTTL, int jobs) throws ExecutionException, InterruptedException { return scrub(disableSnapshot, skipCorrupted, reinsertOverflowedTTL, false, checkData, jobs); } @VisibleForTesting public CompactionManager.AllSSTableOpStatus scrub(boolean disableSnapshot, boolean skipCorrupted, boolean reinsertOverflowedTTL, boolean alwaysFail, boolean checkData, int jobs) throws ExecutionException, InterruptedException { // skip snapshot creation during scrub, SEE JIRA 5891 if(!disableSnapshot) snapshotWithoutFlush("pre-scrub-" + System.currentTimeMillis()); try { return 
CompactionManager.instance.performScrub(ColumnFamilyStore.this, skipCorrupted, checkData, reinsertOverflowedTTL, jobs); } catch(Throwable t) { if (!rebuildOnFailedScrub(t)) throw t; return alwaysFail ? CompactionManager.AllSSTableOpStatus.ABORTED : CompactionManager.AllSSTableOpStatus.SUCCESSFUL; } }
CASSANDRA-5174 : For an index cfs we may be able to discard everything and just rebuild the index when a scrub fails.
Returns:true if we are an index cfs and we successfully rebuilt the index
/**
 * CASSANDRA-5174 : For an index cfs we may be able to discard everything and just rebuild
 * the index when a scrub fails.
 *
 * @param failure the throwable raised by the failed scrub; only its message is logged
 * @return true if we are an index cfs and we successfully rebuilt the index
 */
public boolean rebuildOnFailedScrub(Throwable failure)
{
    boolean rebuildable = isIndex() && SecondaryIndexManager.isIndexColumnFamilyStore(this);
    if (!rebuildable)
        return false;

    truncateBlocking();

    logger.warn("Rebuilding index for {} because of <{}>", name, failure.getMessage());

    ColumnFamilyStore parent = SecondaryIndexManager.getParentCfs(this);
    assert parent.indexManager.getAllIndexColumnFamilyStores().contains(this);
    parent.rebuildSecondaryIndex(SecondaryIndexManager.getIndexName(this));
    return true;
}

/** Runs verification of this store through the CompactionManager. */
public CompactionManager.AllSSTableOpStatus verify(boolean extendedVerify) throws ExecutionException, InterruptedException
{
    return CompactionManager.instance.performVerify(this, extendedVerify);
}

/** Runs an sstable rewrite of this store through the CompactionManager. */
public CompactionManager.AllSSTableOpStatus sstablesRewrite(boolean excludeCurrentVersion, int jobs) throws ExecutionException, InterruptedException
{
    return CompactionManager.instance.performSSTableRewrite(this, excludeCurrentVersion, jobs);
}

/** Runs sstable relocation for this store through the CompactionManager. */
public CompactionManager.AllSSTableOpStatus relocateSSTables(int jobs) throws ExecutionException, InterruptedException
{
    return CompactionManager.instance.relocateSSTables(this, jobs);
}

/** Runs garbage collection for this store through the CompactionManager. */
public CompactionManager.AllSSTableOpStatus garbageCollect(TombstoneOption tombstoneOption, int jobs) throws ExecutionException, InterruptedException
{
    return CompactionManager.instance.performGarbageCollection(this, tombstoneOption, jobs);
}

/**
 * Drops the given (non-empty) collection of sstables from the tracker; any failure reported by
 * the tracker is passed to maybeFail.
 */
public void markObsolete(Collection<SSTableReader> sstables, OperationType compactionType)
{
    assert !sstables.isEmpty();
    Throwable failure = data.dropSSTables(Predicates.in(sstables), compactionType, null);
    maybeFail(failure);
}

// Hands the flushed memtable and its resulting sstables to the compaction strategy manager.
void replaceFlushed(Memtable memtable, Collection<SSTableReader> sstables)
{
    compactionStrategyManager.replaceFlushed(memtable, sstables);
}

/** @return the current value of the {@code valid} flag */
public boolean isValid()
{
    return valid;
}
Package protected for access from the CompactionManager.
/** * Package protected for access from the CompactionManager. */
public Tracker getTracker() { return data; } public Set<SSTableReader> getLiveSSTables() { return data.getView().liveSSTables(); } public Iterable<SSTableReader> getSSTables(SSTableSet sstableSet) { return data.getView().select(sstableSet); } public Iterable<SSTableReader> getUncompactingSSTables() { return data.getUncompacting(); } public boolean isFilterFullyCoveredBy(ClusteringIndexFilter filter, DataLimits limits, CachedPartition cached, int nowInSec) { // We can use the cached value only if we know that no data it doesn't contain could be covered // by the query filter, that is if: // 1) either the whole partition is cached // 2) or we can ensure than any data the filter selects is in the cached partition // We can guarantee that a partition is fully cached if the number of rows it contains is less than // what we're caching. Wen doing that, we should be careful about expiring cells: we should count // something expired that wasn't when the partition was cached, or we could decide that the whole // partition is cached when it's not. This is why we use CachedPartition#cachedLiveRows. if (cached.cachedLiveRows() < metadata.params.caching.rowsPerPartitionToCache()) return true; // If the whole partition isn't cached, then we must guarantee that the filter cannot select data that // is not in the cache. We can guarantee that if either the filter is a "head filter" and the cached // partition has more live rows that queried (where live rows refers to the rows that are live now), // or if we can prove that everything the filter selects is in the cached partition based on its content. 
return (filter.isHeadFilter() && limits.hasEnoughLiveData(cached, nowInSec, filter.selectsAllPartition(), metadata.enforceStrictLiveness())) || filter.isFullyCoveredBy(cached); } public int gcBefore(int nowInSec) { return nowInSec - metadata.params.gcGraceSeconds; } @SuppressWarnings("resource") public RefViewFragment selectAndReference(Function<View, Iterable<SSTableReader>> filter) { long failingSince = -1L; while (true) { ViewFragment view = select(filter); Refs<SSTableReader> refs = Refs.tryRef(view.sstables); if (refs != null) return new RefViewFragment(view.sstables, view.memtables, refs); if (failingSince <= 0) { failingSince = System.nanoTime(); } else if (System.nanoTime() - failingSince > TimeUnit.MILLISECONDS.toNanos(100)) { List<SSTableReader> released = new ArrayList<>(); for (SSTableReader reader : view.sstables) if (reader.selfRef().globalCount() == 0) released.add(reader); NoSpamLogger.log(logger, NoSpamLogger.Level.WARN, 1, TimeUnit.SECONDS, "Spinning trying to capture readers {}, released: {}, ", view.sstables, released); failingSince = System.nanoTime(); } } } public ViewFragment select(Function<View, Iterable<SSTableReader>> filter) { View view = data.getView(); List<SSTableReader> sstables = Lists.newArrayList(filter.apply(view)); return new ViewFragment(sstables, view.getAllMemtables()); } // WARNING: this returns the set of LIVE sstables only, which may be only partially written public List<String> getSSTablesForKey(String key) { return getSSTablesForKey(key, false); } public List<String> getSSTablesForKey(String key, boolean hexFormat) { ByteBuffer keyBuffer = hexFormat ? 
ByteBufferUtil.hexToBytes(key) : metadata.getKeyValidator().fromString(key); DecoratedKey dk = decorateKey(keyBuffer); try (OpOrder.Group op = readOrdering.start()) { List<String> files = new ArrayList<>(); for (SSTableReader sstr : select(View.select(SSTableSet.LIVE, dk)).sstables) { // check if the key actually exists in this sstable, without updating cache and stats if (sstr.getPosition(dk, SSTableReader.Operator.EQ, false) != null) files.add(sstr.getFilename()); } return files; } } public void beginLocalSampling(String sampler, int capacity) { metric.samplers.get(Sampler.valueOf(sampler)).beginSampling(capacity); } public CompositeData finishLocalSampling(String sampler, int count) throws OpenDataException { SamplerResult<ByteBuffer> samplerResults = metric.samplers.get(Sampler.valueOf(sampler)) .finishSampling(count); TabularDataSupport result = new TabularDataSupport(COUNTER_TYPE); for (Counter<ByteBuffer> counter : samplerResults.topK) { //Not duplicating the buffer for safety because AbstractSerializer and ByteBufferUtil.bytesToHex //don't modify position or limit ByteBuffer key = counter.getItem(); result.put(new CompositeDataSupport(COUNTER_COMPOSITE_TYPE, COUNTER_NAMES, new Object[] { ByteBufferUtil.bytesToHex(key), // raw counter.getCount(), // count counter.getError(), // error metadata.getKeyValidator().getString(key) })); // string } return new CompositeDataSupport(SAMPLING_RESULT, SAMPLER_NAMES, new Object[]{ samplerResults.cardinality, result}); } public boolean isCompactionDiskSpaceCheckEnabled() { return compactionSpaceCheck; } public void compactionDiskSpaceCheck(boolean enable) { compactionSpaceCheck = enable; } public void cleanupCache() { Collection<Range<Token>> ranges = StorageService.instance.getLocalRanges(keyspace.getName()); for (Iterator<RowCacheKey> keyIter = CacheService.instance.rowCache.keyIterator(); keyIter.hasNext(); ) { RowCacheKey key = keyIter.next(); DecoratedKey dk = decorateKey(ByteBuffer.wrap(key.key)); if 
(key.ksAndCFName.equals(metadata.ksAndCFName) && !Range.isInRanges(dk.getToken(), ranges)) invalidateCachedPartition(dk); } if (metadata.isCounter()) { for (Iterator<CounterCacheKey> keyIter = CacheService.instance.counterCache.keyIterator(); keyIter.hasNext(); ) { CounterCacheKey key = keyIter.next(); DecoratedKey dk = decorateKey(ByteBuffer.wrap(key.partitionKey)); if (key.ksAndCFName.equals(metadata.ksAndCFName) && !Range.isInRanges(dk.getToken(), ranges)) CacheService.instance.counterCache.remove(key); } } } public ClusteringComparator getComparator() { return metadata.comparator; } public void snapshotWithoutFlush(String snapshotName) { snapshotWithoutFlush(snapshotName, null, false); }
Params:
  • ephemeral – If this flag is set to true, the snapshot will be cleaned during next startup
/** * @param ephemeral If this flag is set to true, the snapshot will be cleaned during next startup */
public Set<SSTableReader> snapshotWithoutFlush(String snapshotName, Predicate<SSTableReader> predicate, boolean ephemeral) { Set<SSTableReader> snapshottedSSTables = new HashSet<>(); for (ColumnFamilyStore cfs : concatWithIndexes()) { final JSONArray filesJSONArr = new JSONArray(); try (RefViewFragment currentView = cfs.selectAndReference(View.select(SSTableSet.CANONICAL, (x) -> predicate == null || predicate.apply(x)))) { for (SSTableReader ssTable : currentView.sstables) { File snapshotDirectory = Directories.getSnapshotDirectory(ssTable.descriptor, snapshotName); ssTable.createLinks(snapshotDirectory.getPath()); // hard links filesJSONArr.add(ssTable.descriptor.relativeFilenameFor(Component.DATA)); if (logger.isTraceEnabled()) logger.trace("Snapshot for {} keyspace data file {} created in {}", keyspace, ssTable.getFilename(), snapshotDirectory); snapshottedSSTables.add(ssTable); } writeSnapshotManifest(filesJSONArr, snapshotName); if (!SchemaConstants.isLocalSystemKeyspace(metadata.ksName) && !SchemaConstants.isReplicatedSystemKeyspace(metadata.ksName)) writeSnapshotSchema(snapshotName); } } if (ephemeral) createEphemeralSnapshotMarkerFile(snapshotName); return snapshottedSSTables; } private void writeSnapshotManifest(final JSONArray filesJSONArr, final String snapshotName) { final File manifestFile = getDirectories().getSnapshotManifestFile(snapshotName); try { if (!manifestFile.getParentFile().exists()) manifestFile.getParentFile().mkdirs(); try (PrintStream out = new PrintStream(manifestFile)) { final JSONObject manifestJSON = new JSONObject(); manifestJSON.put("files", filesJSONArr); out.println(manifestJSON.toJSONString()); } } catch (IOException e) { throw new FSWriteError(e, manifestFile); } } private void writeSnapshotSchema(final String snapshotName) { final File schemaFile = getDirectories().getSnapshotSchemaFile(snapshotName); try { if (!schemaFile.getParentFile().exists()) schemaFile.getParentFile().mkdirs(); try (PrintStream out = new 
PrintStream(schemaFile)) { for (String s: ColumnFamilyStoreCQLHelper.dumpReCreateStatements(metadata)) out.println(s); } } catch (IOException e) { throw new FSWriteError(e, schemaFile); } } private void createEphemeralSnapshotMarkerFile(final String snapshot) { final File ephemeralSnapshotMarker = getDirectories().getNewEphemeralSnapshotMarkerFile(snapshot); try { if (!ephemeralSnapshotMarker.getParentFile().exists()) ephemeralSnapshotMarker.getParentFile().mkdirs(); Files.createFile(ephemeralSnapshotMarker.toPath()); logger.trace("Created ephemeral snapshot marker file on {}.", ephemeralSnapshotMarker.getAbsolutePath()); } catch (IOException e) { logger.warn(String.format("Could not create marker file %s for ephemeral snapshot %s. " + "In case there is a failure in the operation that created " + "this snapshot, you may need to clean it manually afterwards.", ephemeralSnapshotMarker.getAbsolutePath(), snapshot), e); } } protected static void clearEphemeralSnapshots(Directories directories) { for (String ephemeralSnapshot : directories.listEphemeralSnapshots()) { logger.trace("Clearing ephemeral snapshot {} leftover from previous session.", ephemeralSnapshot); Directories.clearSnapshot(ephemeralSnapshot, directories.getCFDirectories()); } } public Refs<SSTableReader> getSnapshotSSTableReader(String tag) throws IOException { Map<Integer, SSTableReader> active = new HashMap<>(); for (SSTableReader sstable : getSSTables(SSTableSet.CANONICAL)) active.put(sstable.descriptor.generation, sstable); Map<Descriptor, Set<Component>> snapshots = getDirectories().sstableLister(Directories.OnTxnErr.IGNORE).snapshots(tag).list(); Refs<SSTableReader> refs = new Refs<>(); try { for (Map.Entry<Descriptor, Set<Component>> entries : snapshots.entrySet()) { // Try acquire reference to an active sstable instead of snapshot if it exists, // to avoid opening new sstables. If it fails, use the snapshot reference instead. 
SSTableReader sstable = active.get(entries.getKey().generation); if (sstable == null || !refs.tryRef(sstable)) { if (logger.isTraceEnabled()) logger.trace("using snapshot sstable {}", entries.getKey()); // open offline so we don't modify components or track hotness. sstable = SSTableReader.open(entries.getKey(), entries.getValue(), metadata, true, true); refs.tryRef(sstable); // release the self ref as we never add the snapshot sstable to DataTracker where it is otherwise released sstable.selfRef().release(); } else if (logger.isTraceEnabled()) { logger.trace("using active sstable {}", entries.getKey()); } } } catch (IOException | RuntimeException e) { // In case one of the snapshot sstables fails to open, // we must release the references to the ones we opened so far refs.release(); throw e; } return refs; }
Take a snapshot of this column family store.
Params:
  • snapshotName – the name associated with the snapshot
/**
 * Take a snapshot of this column family store. Delegates to
 * {@code snapshot(snapshotName, false)}, i.e. the memtable flush is not skipped.
 *
 * @param snapshotName the name associated with the snapshot
 * @return the sstables returned by the delegate overload
 */
public Set<SSTableReader> snapshot(String snapshotName) { return snapshot(snapshotName, false); }
Take a snapshot of this column family store.
Params:
  • snapshotName – the name associated with the snapshot
  • skipFlush – Skip blocking flush of memtable
/**
 * Take a snapshot of this column family store. Delegates to the four-argument overload
 * with a {@code null} predicate and a {@code false} ephemeral flag.
 *
 * @param snapshotName the name associated with the snapshot
 * @param skipFlush Skip blocking flush of memtable
 * @return the sstables returned by the delegate overload
 */
public Set<SSTableReader> snapshot(String snapshotName, boolean skipFlush) { return snapshot(snapshotName, null, false, skipFlush); }
Params:
  • ephemeral – If this flag is set to true, the snapshot will be cleaned up during next startup
  • skipFlush – Skip blocking flush of memtable
/**
 * Takes a snapshot of this store, flushing first unless told otherwise, and then delegates
 * to {@code snapshotWithoutFlush}.
 *
 * @param snapshotName the name of the snapshot
 * @param predicate    forwarded unchanged to {@code snapshotWithoutFlush}
 * @param ephemeral    If this flag is set to true, the snapshot will be cleaned up during next startup
 * @param skipFlush    Skip blocking flush of memtable
 * @return whatever {@code snapshotWithoutFlush} returns
 */
public Set<SSTableReader> snapshot(String snapshotName, Predicate<SSTableReader> predicate, boolean ephemeral, boolean skipFlush)
{
    boolean flushFirst = !skipFlush;
    if (flushFirst)
        forceBlockingFlush();

    return snapshotWithoutFlush(snapshotName, predicate, ephemeral);
}

/**
 * @param snapshotName the snapshot to look for
 * @return whether this store's directories report the snapshot as existing
 */
public boolean snapshotExists(String snapshotName)
{
    Directories directories = getDirectories();
    return directories.snapshotExists(snapshotName);
}

/**
 * @param snapshotName the snapshot to look up
 * @return the creation time this store's directories report for the snapshot
 */
public long getSnapshotCreationTime(String snapshotName)
{
    Directories directories = getDirectories();
    return directories.snapshotCreationTime(snapshotName);
}
Clear all the snapshots for a given column family.
Params:
  • snapshotName – the user supplied snapshot name. If left empty, all the snapshots will be cleaned.
/**
 * Clear all the snapshots for a given column family, by passing this store's
 * column family directories to {@code Directories.clearSnapshot}.
 *
 * @param snapshotName the user supplied snapshot name. If left empty,
 *                     all the snapshots will be cleaned.
 */
public void clearSnapshot(String snapshotName) { List<File> snapshotDirs = getDirectories().getCFDirectories(); Directories.clearSnapshot(snapshotName, snapshotDirs); }
Returns: a map of all snapshots to the space being used. The pair for a snapshot has true size and size on disk.
/**
 * Returns the snapshot details reported by this store's directories.
 *
 * @return a map of all snapshots to the space being used.
 *         The pair for a snapshot has true size and size on disk.
 */
public Map<String, Pair<Long,Long>> getSnapshotDetails() { return getDirectories().getSnapshotDetails(); }
Returns: the cached partition for @param key if it is already present in the cache. Note that this will not readAndCache the partition if it is not present, nor are these calls counted in cache statistics. Note that this WILL cause deserialization of a SerializingCache partition, so if all you need to know is whether a partition is present or not, use containsCachedParition instead.
/**
 * @return the cached partition for @param key if it is already present in the cache.
 * Note that this will not readAndCache the partition if it is not present, nor
 * are these calls counted in cache statistics.
 *
 * Note that this WILL cause deserialization of a SerializingCache partition, so if all you
 * need to know is whether a partition is present or not, use containsCachedParition instead.
 *
 * Returns null when the row cache is disabled for this table, when there is no entry, or
 * when the entry is a {@code RowCacheSentinel}.
 */
public CachedPartition getRawCachedPartition(DecoratedKey key)
{
    if (!isRowCacheEnabled())
        return null;

    RowCacheKey cacheKey = new RowCacheKey(metadata.ksAndCFName, key);
    IRowCacheEntry entry = CacheService.instance.rowCache.getInternal(cacheKey);
    if (entry == null || entry instanceof RowCacheSentinel)
        return null;

    return (CachedPartition) entry;
}

/**
 * Invalidates this table's entries in the key cache and row cache, and in the counter cache
 * as well when the table is a counter table.
 */
private void invalidateCaches()
{
    CacheService caches = CacheService.instance;
    caches.invalidateKeyCacheForCf(metadata.ksAndCFName);
    caches.invalidateRowCacheForCf(metadata.ksAndCFName);
    if (metadata.isCounter())
        caches.invalidateCounterCacheForCf(metadata.ksAndCFName);
}

/**
 * Invalidates the row cache entries of this table whose token lies within any of the given bounds.
 *
 * @param boundsToInvalidate the token bounds to match against
 * @return the number of keys that matched
 */
public int invalidateRowCache(Collection<Bounds<Token>> boundsToInvalidate)
{
    int matched = 0;
    Iterator<RowCacheKey> cachedKeys = CacheService.instance.rowCache.keyIterator();
    while (cachedKeys.hasNext())
    {
        RowCacheKey cacheKey = cachedKeys.next();
        DecoratedKey decorated = decorateKey(ByteBuffer.wrap(cacheKey.key));
        if (cacheKey.ksAndCFName.equals(metadata.ksAndCFName) && Bounds.isInBounds(decorated.getToken(), boundsToInvalidate))
        {
            invalidateCachedPartition(decorated);
            matched++;
        }
    }
    return matched;
}

/**
 * Removes the counter cache entries of this table whose token lies within any of the given bounds.
 *
 * @param boundsToInvalidate the token bounds to match against
 * @return the number of keys that matched
 */
public int invalidateCounterCache(Collection<Bounds<Token>> boundsToInvalidate)
{
    int matched = 0;
    Iterator<CounterCacheKey> cachedKeys = CacheService.instance.counterCache.keyIterator();
    while (cachedKeys.hasNext())
    {
        CounterCacheKey cacheKey = cachedKeys.next();
        DecoratedKey decorated = decorateKey(ByteBuffer.wrap(cacheKey.partitionKey));
        if (cacheKey.ksAndCFName.equals(metadata.ksAndCFName) && Bounds.isInBounds(decorated.getToken(), boundsToInvalidate))
        {
            CacheService.instance.counterCache.remove(cacheKey);
            matched++;
        }
    }
    return matched;
}
Returns:true if @param key is contained in the row cache
/**
 * @return true if @param key is contained in the row cache (always false when the row
 *         cache capacity is zero)
 */
public boolean containsCachedParition(DecoratedKey key)
{
    if (CacheService.instance.rowCache.getCapacity() == 0)
        return false;

    return CacheService.instance.rowCache.containsKey(new RowCacheKey(metadata.ksAndCFName, key));
}

/**
 * Removes the given key from the row cache.
 */
public void invalidateCachedPartition(RowCacheKey key)
{
    CacheService.instance.rowCache.remove(key);
}

/**
 * Removes this table's row cache entry for the given partition key; does nothing when the
 * row cache is not enabled for this table.
 */
public void invalidateCachedPartition(DecoratedKey key)
{
    if (isRowCacheEnabled())
        invalidateCachedPartition(new RowCacheKey(metadata.ksAndCFName, key));
}

/**
 * Looks up a counter value in the counter cache.
 *
 * @return the cached value, or null when the counter cache capacity is zero
 *         (otherwise whatever the cache lookup returns)
 */
public ClockAndCount getCachedCounter(ByteBuffer partitionKey, Clustering clustering, ColumnDefinition column, CellPath path)
{
    boolean cacheDisabled = CacheService.instance.counterCache.getCapacity() == 0L; // counter cache disabled.
    if (cacheDisabled)
        return null;

    CounterCacheKey cacheKey = CounterCacheKey.create(metadata.ksAndCFName, partitionKey, clustering, column, path);
    return CacheService.instance.counterCache.get(cacheKey);
}

/**
 * Stores a counter value in the counter cache; does nothing when the cache capacity is zero.
 */
public void putCachedCounter(ByteBuffer partitionKey, Clustering clustering, ColumnDefinition column, CellPath path, ClockAndCount clockAndCount)
{
    boolean cacheDisabled = CacheService.instance.counterCache.getCapacity() == 0L; // counter cache disabled.
    if (cacheDisabled)
        return;

    CounterCacheKey cacheKey = CounterCacheKey.create(metadata.ksAndCFName, partitionKey, clustering, column, path);
    CacheService.instance.counterCache.put(cacheKey, clockAndCount);
}

/**
 * Runs a maximal compaction with {@code splitOutput} set to false.
 */
public void forceMajorCompaction() throws InterruptedException, ExecutionException
{
    final boolean splitOutput = false;
    forceMajorCompaction(splitOutput);
}

/**
 * Delegates to {@code CompactionManager.performMaximal} for this store.
 *
 * @param splitOutput forwarded unchanged to the compaction manager
 */
public void forceMajorCompaction(boolean splitOutput) throws InterruptedException, ExecutionException
{
    CompactionManager manager = CompactionManager.instance;
    manager.performMaximal(this, splitOutput);
}

/**
 * Delegates to {@code CompactionManager.forceCompactionForTokenRange} for this store.
 *
 * @param tokenRanges forwarded unchanged to the compaction manager
 */
public void forceCompactionForTokenRange(Collection<Range<Token>> tokenRanges) throws ExecutionException, InterruptedException
{
    CompactionManager manager = CompactionManager.instance;
    manager.forceCompactionForTokenRange(this, tokenRanges);
}

/**
 * @return the concatenation of the column family stores of every keyspace returned by
 *         {@code Keyspace.all()}
 */
public static Iterable<ColumnFamilyStore> all()
{
    int expectedKeyspaces = Schema.instance.getKeyspaces().size();
    List<Iterable<ColumnFamilyStore>> perKeyspace = new ArrayList<>(expectedKeyspaces);
    for (Keyspace ks : Keyspace.all())
        perKeyspace.add(ks.getColumnFamilyStores());

    return Iterables.concat(perKeyspace);
}

/**
 * @param range the token range to sample
 * @return the concatenated key samples of every canonical sstable for {@code range}
 */
public Iterable<DecoratedKey> keySamples(Range<Token> range)
{
    try (RefViewFragment view = selectAndReference(View.selectFunction(SSTableSet.CANONICAL)))
    {
        int sstableCount = view.sstables.size();
        Iterable<DecoratedKey>[] perSSTable = new Iterable[sstableCount];
        for (int idx = 0; idx < sstableCount; idx++)
            perSSTable[idx] = view.sstables.get(idx).getKeySamples(range);

        return Iterables.concat(perSSTable);
    }
}

/**
 * @param range the token range to estimate
 * @return the sum over all canonical sstables of their estimated key count for {@code range}
 */
public long estimatedKeysForRange(Range<Token> range)
{
    try (RefViewFragment view = selectAndReference(View.selectFunction(SSTableSet.CANONICAL)))
    {
        long total = 0;
        for (SSTableReader reader : view.sstables)
        {
            total += reader.estimatedKeysForRanges(Collections.singleton(range));
        }
        return total;
    }
}
For testing. No effort is made to clear historical or even the current memtables, nor for thread safety. All we do is wipe the sstable containers clean, while leaving the actual data files present on disk. (This allows tests to easily call loadNewSSTables on them.)
/**
 * For testing. No effort is made to clear historical or even the current memtables, nor for
 * thread safety. All we do is wipe the sstable containers clean, while leaving the actual
 * data files present on disk. (This allows tests to easily call loadNewSSTables on them.)
 * <p>
 * Applies to this store and each of its index stores, with compactions disabled while the
 * tracker is reset.
 */
@VisibleForTesting
public void clearUnsafe()
{
    for (final ColumnFamilyStore cfs : concatWithIndexes())
    {
        // reset the tracker onto a brand new memtable positioned at CommitLogPosition.NONE
        Callable<Void> resetTracker = () -> {
            cfs.data.reset(new Memtable(new AtomicReference<>(CommitLogPosition.NONE), cfs));
            return null;
        };
        cfs.runWithCompactionsDisabled(resetTracker, true, false);
    }
}
Truncate deletes the entire column family's data with no expensive tombstone creation
/** * Truncate deletes the entire column family's data with no expensive tombstone creation */
public void truncateBlocking() { // We have two goals here: // - truncate should delete everything written before truncate was invoked // - but not delete anything that isn't part of the snapshot we create. // We accomplish this by first flushing manually, then snapshotting, and // recording the timestamp IN BETWEEN those actions. Any sstables created // with this timestamp or greater time, will not be marked for delete. // // Bonus complication: since we store commit log segment position in sstable metadata, // truncating those sstables means we will replay any CL segments from the // beginning if we restart before they [the CL segments] are discarded for // normal reasons post-truncate. To prevent this, we store truncation // position in the System keyspace. logger.info("Truncating {}.{}", keyspace.getName(), name); final long truncatedAt; final CommitLogPosition replayAfter; if (keyspace.getMetadata().params.durableWrites || DatabaseDescriptor.isAutoSnapshot()) { replayAfter = forceBlockingFlush(); viewManager.forceBlockingFlush(); } else { // just nuke the memtable data w/o writing to disk first viewManager.dumpMemtables(); try { replayAfter = dumpMemtable().get(); } catch (Exception e) { throw new RuntimeException(e); } } long now = System.currentTimeMillis(); // make sure none of our sstables are somehow in the future (clock drift, perhaps) for (ColumnFamilyStore cfs : concatWithIndexes()) for (SSTableReader sstable : cfs.getLiveSSTables()) now = Math.max(now, sstable.maxDataAge); truncatedAt = now; Runnable truncateRunnable = new Runnable() { public void run() { logger.debug("Discarding sstable data for truncated CF + indexes"); data.notifyTruncated(truncatedAt); if (DatabaseDescriptor.isAutoSnapshot()) snapshot(Keyspace.getTimestampedSnapshotNameWithPrefix(name, SNAPSHOT_TRUNCATE_PREFIX)); discardSSTables(truncatedAt); indexManager.truncateAllIndexesBlocking(truncatedAt); viewManager.truncateBlocking(replayAfter, truncatedAt); 
SystemKeyspace.saveTruncationRecord(ColumnFamilyStore.this, truncatedAt, replayAfter); logger.trace("cleaning out row cache"); invalidateCaches(); } }; runWithCompactionsDisabled(Executors.callable(truncateRunnable), true, true); logger.info("Truncate of {}.{} is complete", keyspace.getName(), name); }
Drops current memtable without flushing to disk. This should only be called when truncating a column family which is not durable.
/** * Drops current memtable without flushing to disk. This should only be called when truncating a column family which is not durable. */
public Future<CommitLogPosition> dumpMemtable() { synchronized (data) { final Flush flush = new Flush(true); flushExecutor.execute(flush); postFlushExecutor.execute(flush.postFlushTask); return flush.postFlushTask; } } public <V> V runWithCompactionsDisabled(Callable<V> callable, boolean interruptValidation, boolean interruptViews) { // synchronize so that concurrent invocations don't re-enable compactions partway through unexpectedly, // and so we only run one major compaction at a time synchronized (this) { logger.trace("Cancelling in-progress compactions for {}", metadata.cfName); Iterable<ColumnFamilyStore> selfWithAuxiliaryCfs = interruptViews ? Iterables.concat(concatWithIndexes(), viewManager.allViewsCfs()) : concatWithIndexes(); for (ColumnFamilyStore cfs : selfWithAuxiliaryCfs) cfs.getCompactionStrategyManager().pause(); try { // interrupt in-progress compactions CompactionManager.instance.interruptCompactionForCFs(selfWithAuxiliaryCfs, interruptValidation); CompactionManager.instance.waitForCessation(selfWithAuxiliaryCfs); // doublecheck that we finished, instead of timing out for (ColumnFamilyStore cfs : selfWithAuxiliaryCfs) { if (!cfs.getTracker().getCompacting().isEmpty()) { logger.warn("Unable to cancel in-progress compactions for {}. 
Perhaps there is an unusually large row in progress somewhere, or the system is simply overloaded.", metadata.cfName); return null; } } logger.trace("Compactions successfully cancelled"); // run our task try { return callable.call(); } catch (Exception e) { throw new RuntimeException(e); } } finally { for (ColumnFamilyStore cfs : selfWithAuxiliaryCfs) cfs.getCompactionStrategyManager().resume(); } } } public LifecycleTransaction markAllCompacting(final OperationType operationType) { Callable<LifecycleTransaction> callable = new Callable<LifecycleTransaction>() { public LifecycleTransaction call() { assert data.getCompacting().isEmpty() : data.getCompacting(); Iterable<SSTableReader> sstables = getLiveSSTables(); sstables = AbstractCompactionStrategy.filterSuspectSSTables(sstables); LifecycleTransaction modifier = data.tryModify(sstables, operationType); assert modifier != null: "something marked things compacting while compactions are disabled"; return modifier; } }; return runWithCompactionsDisabled(callable, false, false); } @Override public String toString() { return "CFS(" + "Keyspace='" + keyspace.getName() + '\'' + ", ColumnFamily='" + name + '\'' + ')'; } public void disableAutoCompaction() { // we don't use CompactionStrategy.pause since we don't want users flipping that on and off // during runWithCompactionsDisabled compactionStrategyManager.disable(); } public void enableAutoCompaction() { enableAutoCompaction(false); }
used for tests - to be able to check things after a minor compaction
Params:
  • waitForFutures – if we should block until autocompaction is done
/**
 * used for tests - to be able to check things after a minor compaction
 * <p>
 * Enables the compaction strategy manager and submits background compaction for this store.
 *
 * @param waitForFutures if we should block until autocompaction is done
 */
@VisibleForTesting
public void enableAutoCompaction(boolean waitForFutures)
{
    compactionStrategyManager.enable();
    List<Future<?>> submitted = CompactionManager.instance.submitBackground(this);
    if (!waitForFutures)
        return;

    FBUtilities.waitOnFutures(submitted);
}

/**
 * @return true when the compaction strategy manager is not enabled
 */
public boolean isAutoCompactionDisabled()
{
    boolean enabled = this.compactionStrategyManager.isEnabled();
    return !enabled;
}

/*
 JMX getters and setters for the Default<T>s.
   - get/set minCompactionThreshold
   - get/set maxCompactionThreshold
   - get     memsize
   - get     memops
   - get/set memtime
 */

public CompactionStrategyManager getCompactionStrategyManager()
{
    return compactionStrategyManager;
}

/**
 * Validates the new CRC check chance and applies it to this store, its index stores and
 * each of their live sstables.
 *
 * @param crcCheckChance the new value
 * @throws IllegalArgumentException carrying the validation message when the value is rejected
 */
public void setCrcCheckChance(double crcCheckChance)
{
    try
    {
        TableParams.builder().crcCheckChance(crcCheckChance).build().validate();
    }
    catch (ConfigurationException e)
    {
        throw new IllegalArgumentException(e.getMessage());
    }

    for (ColumnFamilyStore store : concatWithIndexes())
    {
        store.crcCheckChance.set(crcCheckChance);
        for (SSTableReader reader : store.getSSTables(SSTableSet.LIVE))
            reader.setCrcCheckChance(crcCheckChance);
    }
}

public Double getCrcCheckChance()
{
    return crcCheckChance.value();
}

/**
 * Validates and sets both compaction thresholds, then submits background compaction.
 */
public void setCompactionThresholds(int minThreshold, int maxThreshold)
{
    validateCompactionThresholds(minThreshold, maxThreshold);

    minCompactionThreshold.set(minThreshold);
    maxCompactionThreshold.set(maxThreshold);
    CompactionManager.instance.submitBackground(this);
}

public int getMinimumCompactionThreshold()
{
    return minCompactionThreshold.value();
}

/**
 * Sets the minimum threshold after validating it against the current maximum.
 */
public void setMinimumCompactionThreshold(int minCompactionThreshold)
{
    int currentMax = maxCompactionThreshold.value();
    validateCompactionThresholds(minCompactionThreshold, currentMax);
    this.minCompactionThreshold.set(minCompactionThreshold);
}

public int getMaximumCompactionThreshold()
{
    return maxCompactionThreshold.value();
}

/**
 * Sets the maximum threshold after validating it against the current minimum.
 */
public void setMaximumCompactionThreshold(int maxCompactionThreshold)
{
    int currentMin = minCompactionThreshold.value();
    validateCompactionThresholds(currentMin, maxCompactionThreshold);
    this.maxCompactionThreshold.set(maxCompactionThreshold);
}

/**
 * Rejects threshold pairs where min exceeds max, or where either value is zero.
 *
 * @throws RuntimeException describing the violated rule
 */
private void validateCompactionThresholds(int minThreshold, int maxThreshold)
{
    boolean inverted = minThreshold > maxThreshold;
    if (inverted)
        throw new RuntimeException(String.format("The min_compaction_threshold cannot be larger than the max_compaction_threshold. " +
                                                 "Min is '%d', Max is '%d'.", minThreshold, maxThreshold));

    boolean zeroed = maxThreshold == 0 || minThreshold == 0;
    if (zeroed)
        throw new RuntimeException("Disabling compaction by setting min_compaction_threshold or max_compaction_threshold to 0 " +
                                   "is deprecated, set the compaction strategy option 'enabled' to 'false' instead or use the nodetool command 'disableautocompaction'.");
}

// End JMX get/set.

/**
 * @return the count-weighted mean of the estimated column counts of the canonical sstables,
 *         truncated to an int; 0 when the total count is zero
 */
public int getMeanColumns()
{
    long weightedSum = 0;
    long totalCount = 0;
    for (SSTableReader reader : getSSTables(SSTableSet.CANONICAL))
    {
        long n = reader.getEstimatedColumnCount().count();
        weightedSum += reader.getEstimatedColumnCount().mean() * n;
        totalCount += n;
    }

    if (totalCount > 0)
        return (int) (weightedSum / totalCount);
    return 0;
}

/**
 * @return the count-weighted mean of the estimated partition sizes of the canonical sstables;
 *         0 when the total count is zero
 */
public double getMeanPartitionSize()
{
    long weightedSum = 0;
    long totalCount = 0;
    for (SSTableReader reader : getSSTables(SSTableSet.CANONICAL))
    {
        long n = reader.getEstimatedPartitionSize().count();
        weightedSum += reader.getEstimatedPartitionSize().mean() * n;
        totalCount += n;
    }

    if (totalCount > 0)
        return weightedSum * 1.0 / totalCount;
    return 0;
}

/**
 * @return the sum of the estimated key counts of the canonical sstables
 */
public long estimateKeys()
{
    long total = 0;
    for (SSTableReader reader : getSSTables(SSTableSet.CANONICAL))
    {
        total += reader.estimatedKeys();
    }
    return total;
}

public IPartitioner getPartitioner()
{
    return metadata.partitioner;
}

/**
 * @param key the raw partition key
 * @return the key as decorated by the table metadata
 */
public DecoratedKey decorateKey(ByteBuffer key)
{
    return metadata.decorateKey(key);
}
true if this CFS contains secondary index data
/** true if this CFS contains secondary index data */
public boolean isIndex() { return metadata.isIndex(); } public Iterable<ColumnFamilyStore> concatWithIndexes() { // we return the main CFS first, which we rely on for simplicity in switchMemtable(), for getting the // latest commit log segment position return Iterables.concat(Collections.singleton(this), indexManager.getAllIndexColumnFamilyStores()); } public List<String> getBuiltIndexes() { return indexManager.getBuiltIndexNames(); } public int getUnleveledSSTables() { return compactionStrategyManager.getUnleveledSSTables(); } public int[] getSSTableCountPerLevel() { return compactionStrategyManager.getSSTableCountPerLevel(); } public int getLevelFanoutSize() { return compactionStrategyManager.getLevelFanoutSize(); } public static class ViewFragment { public final List<SSTableReader> sstables; public final Iterable<Memtable> memtables; public ViewFragment(List<SSTableReader> sstables, Iterable<Memtable> memtables) { this.sstables = sstables; this.memtables = memtables; } } public static class RefViewFragment extends ViewFragment implements AutoCloseable { public final Refs<SSTableReader> refs; public RefViewFragment(List<SSTableReader> sstables, Iterable<Memtable> memtables, Refs<SSTableReader> refs) { super(sstables, memtables); this.refs = refs; } public void release() { refs.release(); } public void close() { refs.release(); } } public boolean isEmpty() { return data.getView().isEmpty(); } public boolean isRowCacheEnabled() { boolean retval = metadata.params.caching.cacheRows() && CacheService.instance.rowCache.getCapacity() > 0; assert(!retval || !isIndex()); return retval; } public boolean isCounterCacheEnabled() { return metadata.isCounter() && CacheService.instance.counterCache.getCapacity() > 0; } public boolean isKeyCacheEnabled() { return metadata.params.caching.cacheKeys() && CacheService.instance.keyCache.getCapacity() > 0; }
Discard all SSTables that were created before the given timestamp. The caller should first ensure that compactions have quiesced.
Params:
  • truncatedAt – The timestamp of the truncation (all SSTables before that timestamp are going to be marked as compacted)
/**
 * Discards every LIVE SSTable that is not newer than the given timestamp by marking it obsolete.
 *
 * The caller should first ensure that compactions have quiesced; this is asserted on entry.
 *
 * @param truncatedAt The timestamp of the truncation
 *                    (all SSTables that are not new since that timestamp are marked obsolete)
 */
public void discardSSTables(long truncatedAt)
{
    assert data.getCompacting().isEmpty() : data.getCompacting();

    List<SSTableReader> obsolete = new ArrayList<>();
    for (SSTableReader reader : getSSTables(SSTableSet.LIVE))
    {
        if (reader.newSince(truncatedAt))
            continue;
        obsolete.add(reader);
    }

    if (obsolete.isEmpty())
        return;

    markObsolete(obsolete, OperationType.UNKNOWN);
}

/**
 * Computes, over the CANONICAL sstables, the ratio between the droppable tombstones (those before
 * the current time in seconds minus the sstable's gc grace seconds) and the estimated column total
 * (mean times count of each sstable's estimated column count).
 *
 * @return the ratio, or 0 when the estimated column total is not positive
 */
public double getDroppableTombstoneRatio()
{
    double droppableTotal = 0;
    long columnTotal = 0;
    int nowInSeconds = (int) (System.currentTimeMillis() / 1000);

    for (SSTableReader reader : getSSTables(SSTableSet.CANONICAL))
    {
        int gcBefore = nowInSeconds - reader.metadata.params.gcGraceSeconds;
        droppableTotal += reader.getDroppableTombstonesBefore(gcBefore);
        columnTotal += reader.getEstimatedColumnCount().mean() * reader.getEstimatedColumnCount().count();
    }

    if (columnTotal > 0)
        return droppableTotal / columnTotal;
    return 0;
}

/**
 * @return the true snapshots size reported by this store's directories
 */
public long trueSnapshotsSize()
{
    return getDirectories().trueSnapshotsSize();
}

/**
 * Resets the file index generator to 0.
 */
@VisibleForTesting
void resetFileIndexGenerator()
{
    fileIndexGenerator.set(0);
}
Returns a ColumnFamilyStore by cfId if it exists, null otherwise. Unlike other lookups, this method does not throw an exception if the table does not exist.
/**
 * Looks up a ColumnFamilyStore by cfId.
 * Returns null, rather than throwing, when the schema has no entry for the id or when the
 * keyspace cannot be opened.
 *
 * @param cfId the table id to look up
 * @return the matching store, or null if the schema lookup or keyspace lookup yields null
 */
public static ColumnFamilyStore getIfExists(UUID cfId)
{
    Pair<String, String> names = Schema.instance.getCF(cfId);
    if (names == null)
        return null;

    // names.left is used as the keyspace name
    Keyspace ks = Keyspace.open(names.left);
    return ks == null ? null : ks.getColumnFamilyStore(cfId);
}
Returns a ColumnFamilyStore by ksname and cfname if it exists, null otherwise. Unlike other lookups, this method does not throw an exception if the keyspace or table does not exist.
/**
 * Looks up a ColumnFamilyStore by keyspace name and table name.
 * Returns null, rather than throwing, when either name is null, when the keyspace cannot be
 * opened, or when the schema has no id for the pair.
 *
 * @param ksName the keyspace name
 * @param cfName the table name
 * @return the matching store, or null if any step of the lookup yields null
 */
public static ColumnFamilyStore getIfExists(String ksName, String cfName)
{
    if (ksName == null)
        return null;
    if (cfName == null)
        return null;

    Keyspace ks = Keyspace.open(ksName);
    if (ks == null)
        return null;

    UUID tableId = Schema.instance.getId(ksName, cfName);
    return tableId == null ? null : ks.getColumnFamilyStore(tableId);
}

/**
 * Returns the metrics object of the store identified by the given table id.
 *
 * The result of {@link #getIfExists(UUID)} is dereferenced without a null check, so an unknown
 * id results in a {@link NullPointerException}.
 *
 * @param tableId the table id
 * @return the {@code metric} field of the matching store
 */
public static TableMetrics metricsFor(UUID tableId)
{
    ColumnFamilyStore store = getIfExists(tableId);
    return store.metric;
}

/**
 * @return the disk boundaries the disk boundary manager provides for this store
 */
public DiskBoundaries getDiskBoundaries()
{
    return diskBoundaryManager.getDiskBoundaries(this);
}

/**
 * Asks the disk boundary manager to invalidate its state.
 */
public void invalidateDiskBoundaries()
{
    diskBoundaryManager.invalidate();
}
}