/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.cassandra.db.compaction;

import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.TreeSet;
import java.util.concurrent.TimeUnit;

import com.google.common.annotations.VisibleForTesting;
import com.google.common.collect.*;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import org.apache.cassandra.db.ColumnFamilyStore;
import org.apache.cassandra.db.lifecycle.SSTableSet;
import org.apache.cassandra.db.lifecycle.LifecycleTransaction;
import org.apache.cassandra.exceptions.ConfigurationException;
import org.apache.cassandra.io.sstable.format.SSTableReader;
import org.apache.cassandra.schema.CompactionParams;
import org.apache.cassandra.utils.Pair;

import static com.google.common.collect.Iterables.filter;

public class TimeWindowCompactionStrategy extends AbstractCompactionStrategy
{
    private static final Logger logger = LoggerFactory.getLogger(TimeWindowCompactionStrategy.class);

    private final TimeWindowCompactionStrategyOptions options;
    protected volatile int estimatedRemainingTasks;
    private final Set<SSTableReader> sstables = new HashSet<>();
    private long lastExpiredCheck;
    private long highestWindowSeen;

    public TimeWindowCompactionStrategy(ColumnFamilyStore cfs, Map<String, String> options)
    {
        super(cfs, options);
        this.estimatedRemainingTasks = 0;
        this.options = new TimeWindowCompactionStrategyOptions(options);
        if (!options.containsKey(AbstractCompactionStrategy.TOMBSTONE_COMPACTION_INTERVAL_OPTION) && !options.containsKey(AbstractCompactionStrategy.TOMBSTONE_THRESHOLD_OPTION))
        {
            disableTombstoneCompactions = true;
            logger.debug("Disabling tombstone compactions for TWCS");
        }
        else
            logger.debug("Enabling tombstone compactions for TWCS");
    }
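
    // Illustrative sketch (not part of the original source): the options map handed to this
    // constructor carries the table-level compaction settings consumed by
    // TimeWindowCompactionStrategyOptions, e.g. a one-day window:
    //
    //   Map<String, String> opts = new HashMap<>();
    //   opts.put("compaction_window_unit", "DAYS");
    //   opts.put("compaction_window_size", "1");
    //   // new TimeWindowCompactionStrategy(cfs, opts) would then bucket sstables by day.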

    @Override
    @SuppressWarnings("resource") // transaction is closed by AbstractCompactionTask::execute
    public AbstractCompactionTask getNextBackgroundTask(int gcBefore)
    {
        List<SSTableReader> previousCandidate = null;
        while (true)
        {
            List<SSTableReader> latestBucket = getNextBackgroundSSTables(gcBefore);

            if (latestBucket.isEmpty())
                return null;

            // Already tried acquiring references without success. It means there is a race with
            // the tracker but candidate SSTables were not yet replaced in the compaction strategy manager
            if (latestBucket.equals(previousCandidate))
            {
                logger.warn("Could not acquire references for compacting SSTables {} which is not a problem per se," +
                            "unless it happens frequently, in which case it must be reported. Will retry later.",
                            latestBucket);
                return null;
            }

            LifecycleTransaction modifier = cfs.getTracker().tryModify(latestBucket, OperationType.COMPACTION);
            if (modifier != null)
                return new TimeWindowCompactionTask(cfs, modifier, gcBefore, options.ignoreOverlaps);
            previousCandidate = latestBucket;
        }
    }

    
    /**
     * @param gcBefore threshold (seconds) below which tombstones are considered purgeable
     * @return the next sstables to compact in the background; fully expired sstables are always
     *         included, and the list is empty when nothing qualifies
     */
    private synchronized List<SSTableReader> getNextBackgroundSSTables(final int gcBefore)
    {
        if (Iterables.isEmpty(cfs.getSSTables(SSTableSet.LIVE)))
            return Collections.emptyList();

        Set<SSTableReader> uncompacting = ImmutableSet.copyOf(filter(cfs.getUncompactingSSTables(), sstables::contains));

        // Find fully expired SSTables. Those will be included no matter what.
        Set<SSTableReader> expired = Collections.emptySet();

        if (System.currentTimeMillis() - lastExpiredCheck > options.expiredSSTableCheckFrequency)
        {
            logger.debug("TWCS expired check sufficiently far in the past, checking for fully expired SSTables");
            expired = CompactionController.getFullyExpiredSSTables(cfs, uncompacting,
                                                                   options.ignoreOverlaps ? Collections.emptySet() : cfs.getOverlappingLiveSSTables(uncompacting),
                                                                   gcBefore, options.ignoreOverlaps);
            lastExpiredCheck = System.currentTimeMillis();
        }
        else
        {
            logger.debug("TWCS skipping check for fully expired SSTables");
        }

        Set<SSTableReader> candidates = Sets.newHashSet(filterSuspectSSTables(uncompacting));

        List<SSTableReader> compactionCandidates = new ArrayList<>(getNextNonExpiredSSTables(Sets.difference(candidates, expired), gcBefore));
        if (!expired.isEmpty())
        {
            logger.debug("Including expired sstables: {}", expired);
            compactionCandidates.addAll(expired);
        }

        return compactionCandidates;
    }

    private List<SSTableReader> getNextNonExpiredSSTables(Iterable<SSTableReader> nonExpiringSSTables, final int gcBefore)
    {
        List<SSTableReader> mostInteresting = getCompactionCandidates(nonExpiringSSTables);
        if (mostInteresting != null)
        {
            return mostInteresting;
        }

        // If there is no sstable to compact in the standard way, try compacting a single sstable
        // whose droppable tombstone ratio is greater than the threshold.
        List<SSTableReader> sstablesWithTombstones = new ArrayList<>();
        for (SSTableReader sstable : nonExpiringSSTables)
        {
            if (worthDroppingTombstones(sstable, gcBefore))
                sstablesWithTombstones.add(sstable);
        }
        if (sstablesWithTombstones.isEmpty())
            return Collections.emptyList();

        return Collections.singletonList(Collections.min(sstablesWithTombstones, SSTableReader.sizeComparator));
    }

    private List<SSTableReader> getCompactionCandidates(Iterable<SSTableReader> candidateSSTables)
    {
        Pair<HashMultimap<Long, SSTableReader>, Long> buckets = getBuckets(candidateSSTables, options.sstableWindowUnit, options.sstableWindowSize, options.timestampResolution);
        // Update the highest window seen, if necessary
        if (buckets.right > this.highestWindowSeen)
            this.highestWindowSeen = buckets.right;

        updateEstimatedCompactionsByTasks(buckets.left);
        List<SSTableReader> mostInteresting = newestBucket(buckets.left,
                                                           cfs.getMinimumCompactionThreshold(),
                                                           cfs.getMaximumCompactionThreshold(),
                                                           options.stcsOptions,
                                                           this.highestWindowSeen);
        if (!mostInteresting.isEmpty())
            return mostInteresting;
        return null;
    }

    @Override
    public synchronized void addSSTable(SSTableReader sstable)
    {
        sstables.add(sstable);
    }

    @Override
    public synchronized void removeSSTable(SSTableReader sstable)
    {
        sstables.remove(sstable);
    }
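
    // Note on the background-selection pipeline above (summary added for clarity, not in the
    // original source):
    //   1. the scan for fully expired sstables is throttled by options.expiredSSTableCheckFrequency,
    //   2. the remaining candidates are bucketed by time window (getCompactionCandidates),
    //   3. if no bucket qualifies, a single sstable whose droppable-tombstone ratio crosses the
    //      threshold is chosen as a fallback,
    // and any fully expired sstables are appended to whatever step 2 or 3 picked.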

    /**
     * Find the lowest and highest timestamps in a given timestamp/unit pair
     * Returns milliseconds, caller should adjust accordingly
     */
    public static Pair<Long, Long> getWindowBoundsInMillis(TimeUnit windowTimeUnit, int windowTimeSize, long timestampInMillis)
    {
        long lowerTimestamp;
        long upperTimestamp;
        long timestampInSeconds = TimeUnit.SECONDS.convert(timestampInMillis, TimeUnit.MILLISECONDS);

        switch (windowTimeUnit)
        {
            case MINUTES:
                lowerTimestamp = timestampInSeconds - ((timestampInSeconds) % (60L * windowTimeSize));
                upperTimestamp = (lowerTimestamp + (60L * (windowTimeSize - 1L))) + 59L;
                break;
            case HOURS:
                lowerTimestamp = timestampInSeconds - ((timestampInSeconds) % (3600L * windowTimeSize));
                upperTimestamp = (lowerTimestamp + (3600L * (windowTimeSize - 1L))) + 3599L;
                break;
            case DAYS:
            default:
                lowerTimestamp = timestampInSeconds - ((timestampInSeconds) % (86400L * windowTimeSize));
                upperTimestamp = (lowerTimestamp + (86400L * (windowTimeSize - 1L))) + 86399L;
                break;
        }

        return Pair.create(TimeUnit.MILLISECONDS.convert(lowerTimestamp, TimeUnit.SECONDS),
                           TimeUnit.MILLISECONDS.convert(upperTimestamp, TimeUnit.SECONDS));
    }
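
    // Worked example (illustrative, not part of the original source): with a one-hour window,
    // a timestamp of 1600000000000 ms (2020-09-13 12:26:40 UTC) falls into the hour-aligned
    // window starting at 12:00:00 UTC:
    //
    //   Pair<Long, Long> bounds = getWindowBoundsInMillis(TimeUnit.HOURS, 1, 1600000000000L);
    //   // bounds.left  == 1599998400000L  (window lower bound; used as the bucket key)
    //   // bounds.right == 1600001999000L  (window upper bound; second granularity converted to ms)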

    /**
     * Group files with similar max timestamp into buckets.
     *
     * @param files sstables to group by max timestamp
     * @param sstableWindowUnit unit of the compaction window (MINUTES, HOURS or DAYS)
     * @param sstableWindowSize number of units per window
     * @param timestampResolution resolution of the sstable max timestamps
     * @return A pair, where the left element is the bucket representation (map of window start timestamp to sstables),
     *         and the right is the highest window start timestamp seen
     */
    @VisibleForTesting
    static Pair<HashMultimap<Long, SSTableReader>, Long> getBuckets(Iterable<SSTableReader> files, TimeUnit sstableWindowUnit, int sstableWindowSize, TimeUnit timestampResolution)
    {
        HashMultimap<Long, SSTableReader> buckets = HashMultimap.create();

        long maxTimestamp = 0;
        // Create hash map to represent buckets
        // For each sstable, add sstable to the time bucket
        // Where the bucket is the file's max timestamp rounded to the nearest window bucket
        for (SSTableReader f : files)
        {
            assert TimeWindowCompactionStrategyOptions.validTimestampTimeUnits.contains(timestampResolution);
            long tStamp = TimeUnit.MILLISECONDS.convert(f.getMaxTimestamp(), timestampResolution);
            Pair<Long, Long> bounds = getWindowBoundsInMillis(sstableWindowUnit, sstableWindowSize, tStamp);
            buckets.put(bounds.left, f);
            if (bounds.left > maxTimestamp)
                maxTimestamp = bounds.left;
        }

        logger.trace("buckets {}, max timestamp {}", buckets, maxTimestamp);
        return Pair.create(buckets, maxTimestamp);
    }

    private void updateEstimatedCompactionsByTasks(HashMultimap<Long, SSTableReader> tasks)
    {
        int n = 0;
        long now = this.highestWindowSeen;

        for (Long key : tasks.keySet())
        {
            // For current window, make sure it's compactable
            if (key.compareTo(now) >= 0 && tasks.get(key).size() >= cfs.getMinimumCompactionThreshold())
                n++;
            else if (key.compareTo(now) < 0 && tasks.get(key).size() >= 2)
                n++;
        }
        this.estimatedRemainingTasks = n;
    }
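
    // Illustrative sketch (not from the original source): given a one-day window and
    // microsecond sstable timestamps, getBuckets keys each sstable by the start of the
    // day containing its max timestamp:
    //
    //   Pair<HashMultimap<Long, SSTableReader>, Long> result =
    //       getBuckets(sstables, TimeUnit.DAYS, 1, TimeUnit.MICROSECONDS);
    //   // result.left  : window start (ms) -> sstables whose max timestamp falls in that day
    //   // result.right : the newest window start seen, compared against highestWindowSeen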

    /**
     * @param buckets list of buckets, sorted from newest to oldest, from which to return the newest bucket within thresholds.
     * @param minThreshold minimum number of sstables in a bucket to qualify.
     * @param maxThreshold maximum number of sstables to compact at once (the returned bucket will be trimmed down to this).
     * @return a bucket (list) of sstables to compact.
     */
    @VisibleForTesting
    static List<SSTableReader> newestBucket(HashMultimap<Long, SSTableReader> buckets, int minThreshold, int maxThreshold, SizeTieredCompactionStrategyOptions stcsOptions, long now)
    {
        // If the current bucket has at least minThreshold SSTables, choose that one.
        // For any other bucket, at least 2 SSTables is enough.
        // In any case, limit to maxThreshold SSTables.
        TreeSet<Long> allKeys = new TreeSet<>(buckets.keySet());

        Iterator<Long> it = allKeys.descendingIterator();
        while (it.hasNext())
        {
            Long key = it.next();
            Set<SSTableReader> bucket = buckets.get(key);
            logger.trace("Key {}, now {}", key, now);
            if (bucket.size() >= minThreshold && key >= now)
            {
                // If we're in the newest bucket, we'll use STCS to prioritize sstables
                List<Pair<SSTableReader, Long>> pairs = SizeTieredCompactionStrategy.createSSTableAndLengthPairs(bucket);
                List<List<SSTableReader>> stcsBuckets = SizeTieredCompactionStrategy.getBuckets(pairs, stcsOptions.bucketHigh, stcsOptions.bucketLow, stcsOptions.minSSTableSize);
                logger.debug("Using STCS compaction for first window of bucket: data files {} , options {}", pairs, stcsOptions);
                List<SSTableReader> stcsInterestingBucket = SizeTieredCompactionStrategy.mostInterestingBucket(stcsBuckets, minThreshold, maxThreshold);

                // If the tables in the current bucket aren't eligible in the STCS strategy, we'll skip it and look for other buckets
                if (!stcsInterestingBucket.isEmpty())
                    return stcsInterestingBucket;
            }
            else if (bucket.size() >= 2 && key < now)
            {
                logger.debug("bucket size {} >= 2 and not in current bucket, compacting what's here: {}", bucket.size(), bucket);
                return trimToThreshold(bucket, maxThreshold);
            }
            else
            {
                logger.trace("No compaction necessary for bucket size {} , key {}, now {}", bucket.size(), key, now);
            }
        }
        return Collections.<SSTableReader>emptyList();
    }
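
    // Illustrative walk-through (not part of the original source): with minThreshold = 4,
    // maxThreshold = 32 and three windows { W2 (newest, 3 sstables), W1 (2 sstables),
    // W0 (1 sstable) }, W2 is skipped (3 < 4 and no interesting STCS bucket), W1 is chosen
    // because any older window only needs 2 sstables, and W0 is left alone:
    //
    //   List<SSTableReader> chosen = newestBucket(buckets, 4, 32, stcsOptions, w2Start);
    //   // chosen == the two sstables in W1, trimmed to at most 32 by trimToThreshold()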

    /**
     * @param bucket set of sstables
     * @param maxThreshold maximum number of sstables in a single compaction task.
     * @return A bucket trimmed to the maxThreshold smallest sstables.
     */
    @VisibleForTesting
    static List<SSTableReader> trimToThreshold(Set<SSTableReader> bucket, int maxThreshold)
    {
        List<SSTableReader> ssTableReaders = new ArrayList<>(bucket);

        // Trim the largest sstables off the end to meet the maxThreshold
        Collections.sort(ssTableReaders, SSTableReader.sizeComparator);

        return ImmutableList.copyOf(Iterables.limit(ssTableReaders, maxThreshold));
    }

    @Override
    @SuppressWarnings("resource") // transaction is closed by AbstractCompactionTask::execute
    public synchronized Collection<AbstractCompactionTask> getMaximalTask(int gcBefore, boolean splitOutput)
    {
        Iterable<SSTableReader> filteredSSTables = filterSuspectSSTables(sstables);
        if (Iterables.isEmpty(filteredSSTables))
            return null;

        LifecycleTransaction txn = cfs.getTracker().tryModify(filteredSSTables, OperationType.COMPACTION);
        if (txn == null)
            return null;

        return Collections.singleton(new TimeWindowCompactionTask(cfs, txn, gcBefore, options.ignoreOverlaps));
    }

    @Override
    @SuppressWarnings("resource") // transaction is closed by AbstractCompactionTask::execute
    public synchronized AbstractCompactionTask getUserDefinedTask(Collection<SSTableReader> sstables, int gcBefore)
    {
        assert !sstables.isEmpty(); // checked for by CM.submitUserDefined

        LifecycleTransaction modifier = cfs.getTracker().tryModify(sstables, OperationType.COMPACTION);
        if (modifier == null)
        {
            logger.debug("Unable to mark {} for compaction; probably a background compaction got to it first. You can disable background compactions temporarily if this is a problem", sstables);
            return null;
        }

        return new TimeWindowCompactionTask(cfs, modifier, gcBefore, options.ignoreOverlaps).setUserDefined(true);
    }

    public int getEstimatedRemainingTasks()
    {
        return this.estimatedRemainingTasks;
    }

    public long getMaxSSTableBytes()
    {
        return Long.MAX_VALUE;
    }

    public static Map<String, String> validateOptions(Map<String, String> options) throws ConfigurationException
    {
        Map<String, String> uncheckedOptions = AbstractCompactionStrategy.validateOptions(options);
        uncheckedOptions = TimeWindowCompactionStrategyOptions.validateOptions(options, uncheckedOptions);

        uncheckedOptions.remove(CompactionParams.Option.MIN_THRESHOLD.toString());
        uncheckedOptions.remove(CompactionParams.Option.MAX_THRESHOLD.toString());

        return uncheckedOptions;
    }

    public String toString()
    {
        return String.format("TimeWindowCompactionStrategy[%s/%s]",
                             cfs.getMinimumCompactionThreshold(),
                             cfs.getMaximumCompactionThreshold());
    }
}