/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.cassandra.db.commitlog;

import java.io.*;
import java.util.*;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.zip.CRC32;

import com.google.common.annotations.VisibleForTesting;
import org.apache.commons.lang3.StringUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import org.apache.cassandra.config.DatabaseDescriptor;
import org.apache.cassandra.db.Mutation;
import org.apache.cassandra.db.UnknownColumnFamilyException;
import org.apache.cassandra.db.commitlog.CommitLogReadHandler.CommitLogReadErrorReason;
import org.apache.cassandra.db.commitlog.CommitLogReadHandler.CommitLogReadException;
import org.apache.cassandra.db.partitions.PartitionUpdate;
import org.apache.cassandra.db.rows.SerializationHelper;
import org.apache.cassandra.exceptions.ConfigurationException;
import org.apache.cassandra.io.util.ChannelProxy;
import org.apache.cassandra.io.util.DataInputBuffer;
import org.apache.cassandra.io.util.FileDataInput;
import org.apache.cassandra.io.util.RandomAccessReader;
import org.apache.cassandra.io.util.RebufferingInputStream;
import org.apache.cassandra.utils.JVMStabilityInspector;

import static org.apache.cassandra.utils.FBUtilities.updateChecksumInt;

public class CommitLogReader
{
    private static final Logger logger = LoggerFactory.getLogger(CommitLogReader.class);

    // Serialized-size value that readSection treats as "no more mutations in this segment".
    private static final int LEGACY_END_OF_SEGMENT_MARKER = 0;

    // Sentinel mutation limit meaning "no limit": ReadStatusTracker never decrements a counter holding this value.
    @VisibleForTesting
    public static final int ALL_MUTATIONS = -1;
    // Single CRC32 instance reused for every entry; readSection resets it before validating each mutation.
    private final CRC32 checksum;
    // Per-cfId count of mutations dropped by readMutation because of an UnknownColumnFamilyException.
    private final Map<UUID, AtomicInteger> invalidMutations;

    // Scratch space for the serialized bytes of the current mutation; readSection swaps in a larger array on demand.
    private byte[] buffer;

    /**
     * Creates a reader with a fresh CRC32, an empty invalid-mutation tally and a 4096-byte scratch buffer.
     */
    public CommitLogReader()
    {
        this.buffer = new byte[4096];
        this.invalidMutations = new HashMap<>();
        this.checksum = new CRC32();
    }

    /**
     * @return the entry set of the internal map that counts, per cfId, the mutations skipped because their
     *         column family was unknown at deserialization time. The set is a view of the internal map, not a copy.
     */
    public Set<Map.Entry<UUID, AtomicInteger>> getInvalidMutations()
    {
        final Set<Map.Entry<UUID, AtomicInteger>> counters = invalidMutations.entrySet();
        return counters;
    }

    
Reads all passed in files with no minimum, no start, and no mutation limit.
/** * Reads all passed in files with no minimum, no start, and no mutation limit. */
public void readAllFiles(CommitLogReadHandler handler, File[] files) throws IOException { readAllFiles(handler, files, CommitLogPosition.NONE); } private static boolean shouldSkip(File file) throws IOException, ConfigurationException { CommitLogDescriptor desc = CommitLogDescriptor.fromFileName(file.getName()); if (desc.version < CommitLogDescriptor.VERSION_21) { return false; } try(RandomAccessReader reader = RandomAccessReader.open(file)) { CommitLogDescriptor.readHeader(reader, DatabaseDescriptor.getEncryptionContext()); int end = reader.readInt(); long filecrc = reader.readInt() & 0xffffffffL; return end == 0 && filecrc == 0; } } private static List<File> filterCommitLogFiles(File[] toFilter) { List<File> filtered = new ArrayList<>(toFilter.length); for (File file: toFilter) { try { if (shouldSkip(file)) { logger.info("Skipping playback of empty log: {}", file.getName()); } else { filtered.add(file); } } catch (Exception e) { // let recover deal with it filtered.add(file); } } return filtered; }
Reads all passed in files with minPosition, no start, and no mutation limit.
/** * Reads all passed in files with minPosition, no start, and no mutation limit. */
public void readAllFiles(CommitLogReadHandler handler, File[] files, CommitLogPosition minPosition) throws IOException { List<File> filteredLogs = filterCommitLogFiles(files); int i = 0; for (File file: filteredLogs) { i++; readCommitLogSegment(handler, file, minPosition, ALL_MUTATIONS, i == filteredLogs.size()); } }
Reads passed in file fully
/** * Reads passed in file fully */
public void readCommitLogSegment(CommitLogReadHandler handler, File file, boolean tolerateTruncation) throws IOException { readCommitLogSegment(handler, file, CommitLogPosition.NONE, ALL_MUTATIONS, tolerateTruncation); }
Reads passed in file fully, up to mutationLimit count
/** * Reads passed in file fully, up to mutationLimit count */
@VisibleForTesting public void readCommitLogSegment(CommitLogReadHandler handler, File file, int mutationLimit, boolean tolerateTruncation) throws IOException { readCommitLogSegment(handler, file, CommitLogPosition.NONE, mutationLimit, tolerateTruncation); }
Reads mutations from file, handing them off to handler
Params:
  • handler – Handler that will take action based on deserialized Mutations
  • file – CommitLogSegment file to read
  • minPosition – Optional minimum CommitLogPosition - all segments with id > or matching w/greater position will be read
  • mutationLimit – Optional limit on # of mutations to replay. Local ALL_MUTATIONS serves as marker to play all.
  • tolerateTruncation – Whether or not we should allow truncation of this file or throw if EOF found
Throws:
/** * Reads mutations from file, handing them off to handler * @param handler Handler that will take action based on deserialized Mutations * @param file CommitLogSegment file to read * @param minPosition Optional minimum CommitLogPosition - all segments with id > or matching w/greater position will be read * @param mutationLimit Optional limit on # of mutations to replay. Local ALL_MUTATIONS serves as marker to play all. * @param tolerateTruncation Whether or not we should allow truncation of this file or throw if EOF found * * @throws IOException */
public void readCommitLogSegment(CommitLogReadHandler handler, File file, CommitLogPosition minPosition, int mutationLimit, boolean tolerateTruncation) throws IOException { // just transform from the file name (no reading of headers) to determine version CommitLogDescriptor desc = CommitLogDescriptor.fromFileName(file.getName()); try(RandomAccessReader reader = RandomAccessReader.open(file)) { if (desc.version < CommitLogDescriptor.VERSION_21) { if (!shouldSkipSegmentId(file, desc, minPosition)) { if (minPosition.segmentId == desc.id) reader.seek(minPosition.position); ReadStatusTracker statusTracker = new ReadStatusTracker(mutationLimit, tolerateTruncation); statusTracker.errorContext = desc.fileName(); readSection(handler, reader, minPosition, (int) reader.length(), statusTracker, desc); } return; } final long segmentIdFromFilename = desc.id; try { // The following call can either throw or legitimately return null. For either case, we need to check // desc outside this block and set it to null in the exception case. desc = CommitLogDescriptor.readHeader(reader, DatabaseDescriptor.getEncryptionContext()); } catch (Exception e) { desc = null; } if (desc == null) { // don't care about whether or not the handler thinks we can continue. We can't w/out descriptor. 
// whether or not we continue with startup will depend on whether this is the last segment handler.handleUnrecoverableError(new CommitLogReadException( String.format("Could not read commit log descriptor in file %s", file), CommitLogReadErrorReason.UNRECOVERABLE_DESCRIPTOR_ERROR, tolerateTruncation)); return; } if (segmentIdFromFilename != desc.id) { if (handler.shouldSkipSegmentOnError(new CommitLogReadException(String.format( "Segment id mismatch (filename %d, descriptor %d) in file %s", segmentIdFromFilename, desc.id, file), CommitLogReadErrorReason.RECOVERABLE_DESCRIPTOR_ERROR, false))) { return; } } if (shouldSkipSegmentId(file, desc, minPosition)) return; CommitLogSegmentReader segmentReader; try { segmentReader = new CommitLogSegmentReader(handler, desc, reader, tolerateTruncation); } catch(Exception e) { handler.handleUnrecoverableError(new CommitLogReadException( String.format("Unable to create segment reader for commit log file: %s", e), CommitLogReadErrorReason.UNRECOVERABLE_UNKNOWN_ERROR, tolerateTruncation)); return; } try { ReadStatusTracker statusTracker = new ReadStatusTracker(mutationLimit, tolerateTruncation); for (CommitLogSegmentReader.SyncSegment syncSegment : segmentReader) { // Only tolerate truncation if we allow in both global and segment statusTracker.tolerateErrorsInSection = tolerateTruncation & syncSegment.toleratesErrorsInSection; // Skip segments that are completely behind the desired minPosition if (desc.id == minPosition.segmentId && syncSegment.endPosition < minPosition.position) continue; statusTracker.errorContext = String.format("Next section at %d in %s", syncSegment.fileStartPosition, desc.fileName()); readSection(handler, syncSegment.input, minPosition, syncSegment.endPosition, statusTracker, desc); if (!statusTracker.shouldContinue()) break; } } // Unfortunately AbstractIterator cannot throw a checked exception, so we check to see if a RuntimeException // is wrapping an IOException. 
catch (RuntimeException re) { if (re.getCause() instanceof IOException) throw (IOException) re.getCause(); throw re; } logger.debug("Finished reading {}", file); } }
Any segment with id >= minPosition.segmentId is a candidate for read.
/** * Any segment with id >= minPosition.segmentId is a candidate for read. */
private boolean shouldSkipSegmentId(File file, CommitLogDescriptor desc, CommitLogPosition minPosition) { logger.debug("Reading {} (CL version {}, messaging version {}, compression {})", file.getPath(), desc.version, desc.getMessagingVersion(), desc.compression); if (minPosition.segmentId > desc.id) { logger.trace("Skipping read of fully-flushed {}", file); return true; } return false; }
Reads a section of a file containing mutations
Params:
  • handler – Handler that will take action based on deserialized Mutations
  • reader – FileDataInput / logical buffer containing commitlog mutations
  • minPosition – CommitLogPosition indicating when we should start actively replaying mutations
  • end – logical numeric end of the segment being read
  • statusTracker – ReadStatusTracker with current state of mutation count, error state, etc
  • desc – Descriptor for CommitLog serialization
/** * Reads a section of a file containing mutations * * @param handler Handler that will take action based on deserialized Mutations * @param reader FileDataInput / logical buffer containing commitlog mutations * @param minPosition CommitLogPosition indicating when we should start actively replaying mutations * @param end logical numeric end of the segment being read * @param statusTracker ReadStatusTracker with current state of mutation count, error state, etc * @param desc Descriptor for CommitLog serialization */
private void readSection(CommitLogReadHandler handler, FileDataInput reader, CommitLogPosition minPosition, int end, ReadStatusTracker statusTracker, CommitLogDescriptor desc) throws IOException { // seek rather than deserializing mutation-by-mutation to reach the desired minPosition in this SyncSegment if (desc.id == minPosition.segmentId && reader.getFilePointer() < minPosition.position) reader.seek(minPosition.position); while (statusTracker.shouldContinue() && reader.getFilePointer() < end && !reader.isEOF()) { long mutationStart = reader.getFilePointer(); if (logger.isTraceEnabled()) logger.trace("Reading mutation at {}", mutationStart); long claimedCRC32; int serializedSize; try { // We rely on reading serialized size == 0 (LEGACY_END_OF_SEGMENT_MARKER) to identify the end // of a segment, which happens naturally due to the 0 padding of the empty segment on creation. // However, it's possible with 2.1 era commitlogs that the last mutation ended less than 4 bytes // from the end of the file, which means that we'll be unable to read an a full int and instead // read an EOF here if(end - reader.getFilePointer() < 4) { logger.trace("Not enough bytes left for another mutation in this CommitLog segment, continuing"); statusTracker.requestTermination(); return; } // any of the reads may hit EOF serializedSize = reader.readInt(); if (serializedSize == LEGACY_END_OF_SEGMENT_MARKER) { logger.trace("Encountered end of segment marker at {}", reader.getFilePointer()); statusTracker.requestTermination(); return; } // Mutation must be at LEAST 10 bytes: // 3 for a non-empty Keyspace // 3 for a Key (including the 2-byte length from writeUTF/writeWithShortLength) // 4 bytes for column count. 
// This prevents CRC by being fooled by special-case garbage in the file; see CASSANDRA-2128 if (serializedSize < 10) { if (handler.shouldSkipSegmentOnError(new CommitLogReadException( String.format("Invalid mutation size %d at %d in %s", serializedSize, mutationStart, statusTracker.errorContext), CommitLogReadErrorReason.MUTATION_ERROR, statusTracker.tolerateErrorsInSection))) { statusTracker.requestTermination(); } return; } long claimedSizeChecksum = CommitLogFormat.calculateClaimedChecksum(reader, desc.version); checksum.reset(); CommitLogFormat.updateChecksum(checksum, serializedSize, desc.version); if (checksum.getValue() != claimedSizeChecksum) { if (handler.shouldSkipSegmentOnError(new CommitLogReadException( String.format("Mutation size checksum failure at %d in %s", mutationStart, statusTracker.errorContext), CommitLogReadErrorReason.MUTATION_ERROR, statusTracker.tolerateErrorsInSection))) { statusTracker.requestTermination(); } return; } if (serializedSize > buffer.length) buffer = new byte[(int) (1.2 * serializedSize)]; reader.readFully(buffer, 0, serializedSize); claimedCRC32 = CommitLogFormat.calculateClaimedCRC32(reader, desc.version); } catch (EOFException eof) { if (handler.shouldSkipSegmentOnError(new CommitLogReadException( String.format("Unexpected end of segment at %d in %s", mutationStart, statusTracker.errorContext), CommitLogReadErrorReason.EOF, statusTracker.tolerateErrorsInSection))) { statusTracker.requestTermination(); } return; } checksum.update(buffer, 0, serializedSize); if (claimedCRC32 != checksum.getValue()) { if (handler.shouldSkipSegmentOnError(new CommitLogReadException( String.format("Mutation checksum failure at %d in %s", mutationStart, statusTracker.errorContext), CommitLogReadErrorReason.MUTATION_ERROR, statusTracker.tolerateErrorsInSection))) { statusTracker.requestTermination(); } continue; } long mutationPosition = reader.getFilePointer(); readMutation(handler, buffer, serializedSize, minPosition, (int)mutationPosition, 
desc); // Only count this as a processed mutation if it is after our min as we suppress reading of mutations that // are before this mark. if (mutationPosition >= minPosition.position) statusTracker.addProcessedMutation(); } }
Deserializes and passes a Mutation to the ICommitLogReadHandler requested
Params:
  • handler – Handler that will take action based on deserialized Mutations
  • inputBuffer – raw byte array w/Mutation data
  • size – deserialized size of mutation
  • minPosition – We need to suppress replay of mutations that are before the required minPosition
  • entryLocation – filePointer offset of mutation within CommitLogSegment
  • desc – CommitLogDescriptor being worked on
/** * Deserializes and passes a Mutation to the ICommitLogReadHandler requested * * @param handler Handler that will take action based on deserialized Mutations * @param inputBuffer raw byte array w/Mutation data * @param size deserialized size of mutation * @param minPosition We need to suppress replay of mutations that are before the required minPosition * @param entryLocation filePointer offset of mutation within CommitLogSegment * @param desc CommitLogDescriptor being worked on */
@VisibleForTesting protected void readMutation(CommitLogReadHandler handler, byte[] inputBuffer, int size, CommitLogPosition minPosition, final int entryLocation, final CommitLogDescriptor desc) throws IOException { // For now, we need to go through the motions of deserializing the mutation to determine its size and move // the file pointer forward accordingly, even if we're behind the requested minPosition within this SyncSegment. boolean shouldReplay = entryLocation > minPosition.position; final Mutation mutation; try (RebufferingInputStream bufIn = new DataInputBuffer(inputBuffer, 0, size)) { mutation = Mutation.serializer.deserialize(bufIn, desc.getMessagingVersion(), SerializationHelper.Flag.LOCAL); // doublecheck that what we read is still] valid for the current schema for (PartitionUpdate upd : mutation.getPartitionUpdates()) upd.validate(); } catch (UnknownColumnFamilyException ex) { if (ex.cfId == null) return; AtomicInteger i = invalidMutations.get(ex.cfId); if (i == null) { i = new AtomicInteger(1); invalidMutations.put(ex.cfId, i); } else i.incrementAndGet(); return; } catch (Throwable t) { JVMStabilityInspector.inspectThrowable(t); File f = File.createTempFile("mutation", "dat"); try (DataOutputStream out = new DataOutputStream(new FileOutputStream(f))) { out.write(inputBuffer, 0, size); } // Checksum passed so this error can't be permissible. handler.handleUnrecoverableError(new CommitLogReadException( String.format( "Unexpected error deserializing mutation; saved to %s. " + "This may be caused by replaying a mutation against a table with the same name but incompatible schema. 
" + "Exception follows: %s", f.getAbsolutePath(), t), CommitLogReadErrorReason.MUTATION_ERROR, false)); return; } if (logger.isTraceEnabled()) logger.trace("Read mutation for {}.{}: {}", mutation.getKeyspaceName(), mutation.key(), "{" + StringUtils.join(mutation.getPartitionUpdates().iterator(), ", ") + "}"); if (shouldReplay) handler.handleMutation(mutation, size, entryLocation, desc); }
Helper methods to deal with changing formats of internals of the CommitLog without polluting deserialization code.
/** * Helper methods to deal with changing formats of internals of the CommitLog without polluting deserialization code. */
private static class CommitLogFormat { public static long calculateClaimedChecksum(FileDataInput input, int commitLogVersion) throws IOException { switch (commitLogVersion) { case CommitLogDescriptor.VERSION_12: case CommitLogDescriptor.VERSION_20: return input.readLong(); // Changed format in 2.1 default: return input.readInt() & 0xffffffffL; } } public static void updateChecksum(CRC32 checksum, int serializedSize, int commitLogVersion) { switch (commitLogVersion) { case CommitLogDescriptor.VERSION_12: checksum.update(serializedSize); break; // Changed format in 2.0 default: updateChecksumInt(checksum, serializedSize); break; } } public static long calculateClaimedCRC32(FileDataInput input, int commitLogVersion) throws IOException { switch (commitLogVersion) { case CommitLogDescriptor.VERSION_12: case CommitLogDescriptor.VERSION_20: return input.readLong(); // Changed format in 2.1 default: return input.readInt() & 0xffffffffL; } } } private static class ReadStatusTracker { private int mutationsLeft; public String errorContext = ""; public boolean tolerateErrorsInSection; private boolean error; public ReadStatusTracker(int mutationLimit, boolean tolerateErrorsInSection) { this.mutationsLeft = mutationLimit; this.tolerateErrorsInSection = tolerateErrorsInSection; } public void addProcessedMutation() { if (mutationsLeft == ALL_MUTATIONS) return; --mutationsLeft; } public boolean shouldContinue() { return !error && (mutationsLeft != 0 || mutationsLeft == ALL_MUTATIONS); } public void requestTermination() { error = true; } } }