/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.cassandra.db.commitlog;

import java.io.File;
import java.io.IOException;
import java.util.*;
import java.util.concurrent.*;
import java.util.concurrent.atomic.AtomicLong;
import java.util.function.BooleanSupplier;

import com.google.common.annotations.VisibleForTesting;
import com.google.common.util.concurrent.*;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import net.nicoulaj.compilecommand.annotations.DontInline;
import org.apache.cassandra.concurrent.NamedThreadFactory;
import org.apache.cassandra.config.DatabaseDescriptor;
import org.apache.cassandra.config.Schema;
import org.apache.cassandra.db.*;
import org.apache.cassandra.utils.*;
import org.apache.cassandra.utils.concurrent.WaitQueue;

import static org.apache.cassandra.db.commitlog.CommitLogSegment.Allocation;

Performs eager-creation of commit log segments in a background thread. All the public methods are thread safe.
/** * Performs eager-creation of commit log segments in a background thread. All the * public methods are thread safe. */
public abstract class AbstractCommitLogSegmentManager { static final Logger logger = LoggerFactory.getLogger(AbstractCommitLogSegmentManager.class);
Segment that is ready to be used. The management thread fills this and blocks until consumed. A single management thread produces this, and consumers are already synchronizing to make sure other work is performed atomically with consuming this. Volatile to make sure writes by the management thread become visible (ordered/lazySet would suffice). Consumers (advanceAllocatingFrom and discardAvailableSegment) must synchronize on 'this'.
/** * Segment that is ready to be used. The management thread fills this and blocks until consumed. * * A single management thread produces this, and consumers are already synchronizing to make sure other work is * performed atomically with consuming this. Volatile to make sure writes by the management thread become * visible (ordered/lazySet would suffice). Consumers (advanceAllocatingFrom and discardAvailableSegment) must * synchronize on 'this'. */
private volatile CommitLogSegment availableSegment = null; private final WaitQueue segmentPrepared = new WaitQueue();
Active segments, containing unflushed data. The tail of this queue is the one we allocate writes to
/** Active segments, containing unflushed data. The tail of this queue is the one we allocate writes to */
private final ConcurrentLinkedQueue<CommitLogSegment> activeSegments = new ConcurrentLinkedQueue<>();
The segment we are currently allocating commit log records to. Written by advanceAllocatingFrom which synchronizes on 'this'. Volatile to ensure reads get current value.
/** * The segment we are currently allocating commit log records to. * * Written by advanceAllocatingFrom which synchronizes on 'this'. Volatile to ensure reads get current value. */
private volatile CommitLogSegment allocatingFrom = null; final String storageDirectory;
Tracks commitlog size, in multiples of the segment size. We need to do this so we can "promise" size adjustments ahead of actually adding/freeing segments on disk, so that the "evict oldest segment" logic can see the effect of recycling segments immediately (even though they're really happening asynchronously on the manager thread, which will take a ms or two).
/** * Tracks commitlog size, in multiples of the segment size. We need to do this so we can "promise" size * adjustments ahead of actually adding/freeing segments on disk, so that the "evict oldest segment" logic * can see the effect of recycling segments immediately (even though they're really happening asynchronously * on the manager thread, which will take a ms or two). */
private final AtomicLong size = new AtomicLong(); private Thread managerThread; protected final CommitLog commitLog; private volatile boolean shutdown; private final BooleanSupplier managerThreadWaitCondition = () -> (availableSegment == null && !atSegmentBufferLimit()) || shutdown; private final WaitQueue managerThreadWaitQueue = new WaitQueue(); private static final SimpleCachedBufferPool bufferPool = new SimpleCachedBufferPool(DatabaseDescriptor.getCommitLogMaxCompressionBuffersInPool(), DatabaseDescriptor.getCommitLogSegmentSize()); AbstractCommitLogSegmentManager(final CommitLog commitLog, String storageDirectory) { this.commitLog = commitLog; this.storageDirectory = storageDirectory; } void start() { // The run loop for the manager thread Runnable runnable = new WrappedRunnable() { public void runMayThrow() throws Exception { while (!shutdown) { try { assert availableSegment == null; logger.trace("No segments in reserve; creating a fresh one"); availableSegment = createSegment(); if (shutdown) { // If shutdown() started and finished during segment creation, we are now left with a // segment that no one will consume. Discard it. discardAvailableSegment(); return; } segmentPrepared.signalAll(); Thread.yield(); if (availableSegment == null && !atSegmentBufferLimit()) // Writing threads need another segment now. continue; // Writing threads are not waiting for new segments, we can spend time on other tasks. // flush old Cfs if we're full maybeFlushToReclaim(); } catch (Throwable t) { JVMStabilityInspector.inspectThrowable(t); if (!CommitLog.handleCommitError("Failed managing commit log segments", t)) return; // sleep some arbitrary period to avoid spamming CL Uninterruptibles.sleepUninterruptibly(1, TimeUnit.SECONDS); // If we offered a segment, wait for it to be taken before reentering the loop. // There could be a new segment in next not offered, but only on failure to discard it while // shutting down-- nothing more can or needs to be done in that case. } WaitQueue.waitOnCondition(managerThreadWaitCondition, managerThreadWaitQueue); } } }; shutdown = false; managerThread = NamedThreadFactory.createThread(runnable, "COMMIT-LOG-ALLOCATOR"); managerThread.start(); // for simplicity, ensure the first segment is allocated before continuing advanceAllocatingFrom(null); } private boolean atSegmentBufferLimit() { return CommitLogSegment.usesBufferPool(commitLog) && bufferPool.atLimit(); } private void maybeFlushToReclaim() { long unused = unusedCapacity(); if (unused < 0) { long flushingSize = 0; List<CommitLogSegment> segmentsToRecycle = new ArrayList<>(); for (CommitLogSegment segment : activeSegments) { if (segment == allocatingFrom) break; flushingSize += segment.onDiskSize(); segmentsToRecycle.add(segment); if (flushingSize + unused >= 0) break; } flushDataFrom(segmentsToRecycle, false); } }
Allocate a segment within this CLSM. Should either succeed or throw.
/** * Allocate a segment within this CLSM. Should either succeed or throw. */
public abstract Allocation allocate(Mutation mutation, int size);
The recovery and replay process replays mutations into memtables and flushes them to disk. Individual CLSM decide what to do with those segments on disk after they've been replayed.
/** * The recovery and replay process replays mutations into memtables and flushes them to disk. Individual CLSM * decide what to do with those segments on disk after they've been replayed. */
abstract void handleReplayedSegment(final File file);
Hook to allow segment managers to track state surrounding creation of new segments. Onl perform as task submit to segment manager so it's performed on segment management thread.
/** * Hook to allow segment managers to track state surrounding creation of new segments. Onl perform as task submit * to segment manager so it's performed on segment management thread. */
abstract CommitLogSegment createSegment();
Indicates that a segment file has been flushed and is no longer needed. Only perform as task submit to segment manager so it's performend on segment management thread, or perform while segment management thread is shutdown during testing resets.
Params:
  • segment – segment to be discarded
  • delete – whether or not the segment is safe to be deleted.
/** * Indicates that a segment file has been flushed and is no longer needed. Only perform as task submit to segment * manager so it's performend on segment management thread, or perform while segment management thread is shutdown * during testing resets. * * @param segment segment to be discarded * @param delete whether or not the segment is safe to be deleted. */
abstract void discard(CommitLogSegment segment, boolean delete);
Advances the allocatingFrom pointer to the next prepared segment, but only if it is currently the segment provided. WARNING: Assumes segment management thread always succeeds in allocating a new segment or kills the JVM.
/** * Advances the allocatingFrom pointer to the next prepared segment, but only if it is currently the segment provided. * * WARNING: Assumes segment management thread always succeeds in allocating a new segment or kills the JVM. */
@DontInline void advanceAllocatingFrom(CommitLogSegment old) { while (true) { synchronized (this) { // do this in a critical section so we can maintain the order of segment construction when moving to allocatingFrom/activeSegments if (allocatingFrom != old) return; // If a segment is ready, take it now, otherwise wait for the management thread to construct it. if (availableSegment != null) { // Success! Change allocatingFrom and activeSegments (which must be kept in order) before leaving // the critical section. activeSegments.add(allocatingFrom = availableSegment); availableSegment = null; break; } } awaitAvailableSegment(old); } // Signal the management thread to prepare a new segment. wakeManager(); if (old != null) { // Now we can run the user defined command just after switching to the new commit log. // (Do this here instead of in the recycle call so we can get a head start on the archive.) commitLog.archiver.maybeArchive(old); // ensure we don't continue to use the old file; not strictly necessary, but cleaner to enforce it old.discardUnusedTail(); } // request that the CL be synced out-of-band, as we've finished a segment commitLog.requestExtraSync(); } void awaitAvailableSegment(CommitLogSegment currentAllocatingFrom) { do { WaitQueue.Signal prepared = segmentPrepared.register(commitLog.metrics.waitingOnSegmentAllocation.time()); if (availableSegment == null && allocatingFrom == currentAllocatingFrom) prepared.awaitUninterruptibly(); else prepared.cancel(); } while (availableSegment == null && allocatingFrom == currentAllocatingFrom); }
Switch to a new segment, regardless of how much is left in the current one. Flushes any dirty CFs for this segment and any older segments, and then discards the segments
/** * Switch to a new segment, regardless of how much is left in the current one. * * Flushes any dirty CFs for this segment and any older segments, and then discards the segments */
void forceRecycleAll(Iterable<UUID> droppedCfs) { List<CommitLogSegment> segmentsToRecycle = new ArrayList<>(activeSegments); CommitLogSegment last = segmentsToRecycle.get(segmentsToRecycle.size() - 1); advanceAllocatingFrom(last); // wait for the commit log modifications last.waitForModifications(); // make sure the writes have materialized inside of the memtables by waiting for all outstanding writes // to complete Keyspace.writeOrder.awaitNewBarrier(); // flush and wait for all CFs that are dirty in segments up-to and including 'last' Future<?> future = flushDataFrom(segmentsToRecycle, true); try { future.get(); for (CommitLogSegment segment : activeSegments) for (UUID cfId : droppedCfs) segment.markClean(cfId, CommitLogPosition.NONE, segment.getCurrentCommitLogPosition()); // now recycle segments that are unused, as we may not have triggered a discardCompletedSegments() // if the previous active segment was the only one to recycle (since an active segment isn't // necessarily dirty, and we only call dCS after a flush). for (CommitLogSegment segment : activeSegments) { if (segment.isUnused()) archiveAndDiscard(segment); } CommitLogSegment first; if ((first = activeSegments.peek()) != null && first.id <= last.id) logger.error("Failed to force-recycle all segments; at least one segment is still in use with dirty CFs."); } catch (Throwable t) { // for now just log the error logger.error("Failed waiting for a forced recycle of in-use commit log segments", t); } }
Indicates that a segment is no longer in use and that it should be discarded.
Params:
  • segment – segment that is no longer in use
/** * Indicates that a segment is no longer in use and that it should be discarded. * * @param segment segment that is no longer in use */
void archiveAndDiscard(final CommitLogSegment segment) { boolean archiveSuccess = commitLog.archiver.maybeWaitForArchiving(segment.getName()); if (!activeSegments.remove(segment)) return; // already discarded // if archiving (command) was not successful then leave the file alone. don't delete or recycle. logger.debug("Segment {} is no longer active and will be deleted {}", segment, archiveSuccess ? "now" : "by the archive script"); discard(segment, archiveSuccess); }
Adjust the tracked on-disk size. Called by individual segments to reflect writes, allocations and discards.
Params:
  • addedSize –
/** * Adjust the tracked on-disk size. Called by individual segments to reflect writes, allocations and discards. * @param addedSize */
void addSize(long addedSize) { size.addAndGet(addedSize); }
Returns:the space (in bytes) used by all segment files.
/** * @return the space (in bytes) used by all segment files. */
public long onDiskSize() { return size.get(); } private long unusedCapacity() { long total = DatabaseDescriptor.getTotalCommitlogSpaceInMB() * 1024 * 1024; long currentSize = size.get(); logger.trace("Total active commitlog segment space used is {} out of {}", currentSize, total); return total - currentSize; }
Force a flush on all CFs that are still dirty in @param segments.
Returns:a Future that will finish when all the flushes are complete.
/** * Force a flush on all CFs that are still dirty in @param segments. * * @return a Future that will finish when all the flushes are complete. */
private Future<?> flushDataFrom(List<CommitLogSegment> segments, boolean force) { if (segments.isEmpty()) return Futures.immediateFuture(null); final CommitLogPosition maxCommitLogPosition = segments.get(segments.size() - 1).getCurrentCommitLogPosition(); // a map of CfId -> forceFlush() to ensure we only queue one flush per cf final Map<UUID, ListenableFuture<?>> flushes = new LinkedHashMap<>(); for (CommitLogSegment segment : segments) { for (UUID dirtyCFId : segment.getDirtyCFIDs()) { Pair<String,String> pair = Schema.instance.getCF(dirtyCFId); if (pair == null) { // even though we remove the schema entry before a final flush when dropping a CF, // it's still possible for a writer to race and finish his append after the flush. logger.trace("Marking clean CF {} that doesn't exist anymore", dirtyCFId); segment.markClean(dirtyCFId, CommitLogPosition.NONE, segment.getCurrentCommitLogPosition()); } else if (!flushes.containsKey(dirtyCFId)) { String keyspace = pair.left; final ColumnFamilyStore cfs = Keyspace.open(keyspace).getColumnFamilyStore(dirtyCFId); // can safely call forceFlush here as we will only ever block (briefly) for other attempts to flush, // no deadlock possibility since switchLock removal flushes.put(dirtyCFId, force ? cfs.forceFlush() : cfs.forceFlush(maxCommitLogPosition)); } } } return Futures.allAsList(flushes.values()); }
Stops CL, for testing purposes. DO NOT USE THIS OUTSIDE OF TESTS. Only call this after the AbstractCommitLogService is shut down.
/** * Stops CL, for testing purposes. DO NOT USE THIS OUTSIDE OF TESTS. * Only call this after the AbstractCommitLogService is shut down. */
public void stopUnsafe(boolean deleteSegments) { logger.debug("CLSM closing and clearing existing commit log segments..."); shutdown(); try { awaitTermination(); } catch (InterruptedException e) { throw new RuntimeException(e); } for (CommitLogSegment segment : activeSegments) closeAndDeleteSegmentUnsafe(segment, deleteSegments); activeSegments.clear(); size.set(0L); logger.trace("CLSM done with closing and clearing existing commit log segments."); }
To be used by tests only. Not safe if mutation slots are being allocated concurrently.
/** * To be used by tests only. Not safe if mutation slots are being allocated concurrently. */
void awaitManagementTasksCompletion() { if (availableSegment == null && !atSegmentBufferLimit()) { awaitAvailableSegment(allocatingFrom); } }
Explicitly for use only during resets in unit testing.
/** * Explicitly for use only during resets in unit testing. */
private void closeAndDeleteSegmentUnsafe(CommitLogSegment segment, boolean delete) { try { discard(segment, delete); } catch (AssertionError ignored) { // segment file does not exist } }
Initiates the shutdown process for the management thread.
/** * Initiates the shutdown process for the management thread. */
public void shutdown() { assert !shutdown; shutdown = true; // Release the management thread and delete prepared segment. // Do not block as another thread may claim the segment (this can happen during unit test initialization). discardAvailableSegment(); wakeManager(); } private void discardAvailableSegment() { CommitLogSegment next = null; synchronized (this) { next = availableSegment; availableSegment = null; } if (next != null) next.discard(true); }
Returns when the management thread terminates.
/** * Returns when the management thread terminates. */
public void awaitTermination() throws InterruptedException { managerThread.join(); managerThread = null; for (CommitLogSegment segment : activeSegments) segment.close(); bufferPool.shutdown(); }
Returns:a read-only collection of the active commit log segments
/** * @return a read-only collection of the active commit log segments */
@VisibleForTesting public Collection<CommitLogSegment> getActiveSegments() { return Collections.unmodifiableCollection(activeSegments); }
Returns:the current CommitLogPosition of the active segment we're allocating from
/** * @return the current CommitLogPosition of the active segment we're allocating from */
CommitLogPosition getCurrentPosition() { return allocatingFrom.getCurrentCommitLogPosition(); }
Requests commit log files sync themselves, if needed. This may or may not involve flushing to disk.
Params:
  • flush – Request that the sync operation flush the file to disk.
/** * Requests commit log files sync themselves, if needed. This may or may not involve flushing to disk. * * @param flush Request that the sync operation flush the file to disk. */
public void sync(boolean flush) throws IOException { CommitLogSegment current = allocatingFrom; for (CommitLogSegment segment : getActiveSegments()) { // Do not sync segments that became active after sync started. if (segment.id > current.id) return; segment.sync(flush); } }
Used by compressed and encrypted segments to share a buffer pool across the CLSM.
/** * Used by compressed and encrypted segments to share a buffer pool across the CLSM. */
SimpleCachedBufferPool getBufferPool() { return bufferPool; } void wakeManager() { managerThreadWaitQueue.signalAll(); }
Called by commit log segments when a buffer is freed to wake the management thread, which may be waiting for a buffer to become available.
/** * Called by commit log segments when a buffer is freed to wake the management thread, which may be waiting for * a buffer to become available. */
void notifyBufferFreed() { wakeManager(); }
Read-only access to current segment for subclasses.
/** Read-only access to current segment for subclasses. */
CommitLogSegment allocatingFrom() { return allocatingFrom; } }