/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.cassandra.io.compress;

import java.io.DataOutputStream;
import java.io.EOFException;
import java.io.File;
import java.io.IOException;
import java.nio.ByteBuffer;
import java.nio.channels.Channels;
import java.util.Optional;
import java.util.zip.CRC32;

import org.apache.cassandra.io.FSReadError;
import org.apache.cassandra.io.FSWriteError;
import org.apache.cassandra.io.sstable.CorruptSSTableException;
import org.apache.cassandra.io.sstable.metadata.MetadataCollector;
import org.apache.cassandra.io.util.*;
import org.apache.cassandra.schema.CompressionParams;

import static org.apache.cassandra.utils.Throwables.merge;

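/**
 * A {@link SequentialWriter} that compresses each buffered chunk before writing it to disk,
 * appends a CRC32 checksum after every chunk, and records per-chunk offsets in the
 * -CompressionInfo.db component through a {@link CompressionMetadata.Writer}.
 *
 * Rough usage sketch; {@code dataFile}, {@code offsetsPath}, {@code digestFile}, {@code params},
 * {@code collector} and {@code serializedBytes} are placeholders supplied by the caller, not
 * members of this class:
 * <pre>{@code
 * CompressedSequentialWriter writer = new CompressedSequentialWriter(
 *     dataFile, offsetsPath, digestFile, SequentialWriterOption.DEFAULT, params, collector);
 * writer.write(serializedBytes);
 * writer.finish(); // flushes the last chunk and commits the compression metadata
 * }</pre>
 */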
public class CompressedSequentialWriter extends SequentialWriter
{
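    // writes a CRC32 checksum after each compressed chunk and, if a digest file was supplied,
    // the full-file checksum when the writer is prepared for commit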
    private final ChecksumWriter crcMetadata;

    // holds offset in the file where current chunk should be written
    // changed only by flush() method where data buffer gets compressed and stored to the file
    private long chunkOffset = 0;

    // index file writer (random I/O)
    private final CompressionMetadata.Writer metadataWriter;
    private final ICompressor compressor;

    // used to store compressed data
    private ByteBuffer compressed;

    // holds a number of already written chunks
    private int chunkCount = 0;

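    // running totals of raw and compressed bytes written, used to record the compression ratio
    // in the sstable metadata when the writer is prepared for commit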
    private long uncompressedSize = 0, compressedSize = 0;

    private final MetadataCollector sstableMetadataCollector;

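    // scratch buffer for reading back a chunk's 4-byte checksum in resetAndTruncate()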
    private final ByteBuffer crcCheckBuffer = ByteBuffer.allocate(4);
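    // digest file to write the full-file checksum to, if one was supplied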
    private final Optional<File> digestFile;

    
    /**
     * Create a CompressedSequentialWriter; if {@code digestFile} is null, no digest file is written.
     *
     * @param file File to write
     * @param offsetsPath File name to write compression metadata
     * @param digestFile File to write digest (may be null)
     * @param option Write option (buffer size and type will be set the same as compression params)
     * @param parameters Compression parameters
     * @param sstableMetadataCollector Metadata collector
     */
    public CompressedSequentialWriter(File file,
                                      String offsetsPath,
                                      File digestFile,
                                      SequentialWriterOption option,
                                      CompressionParams parameters,
                                      MetadataCollector sstableMetadataCollector)
    {
        super(file, SequentialWriterOption.newBuilder()
                            .bufferSize(option.bufferSize())
                            .bufferType(option.bufferType())
                            .bufferSize(parameters.chunkLength())
                            .bufferType(parameters.getSstableCompressor().preferredBufferType())
                            .finishOnClose(option.finishOnClose())
                            .build());
        this.compressor = parameters.getSstableCompressor();
        this.digestFile = Optional.ofNullable(digestFile);

        // buffer for compression should be the same size as buffer itself
        compressed = compressor.preferredBufferType().allocate(compressor.initialCompressedBufferLength(buffer.capacity()));

        /* Index File (-CompressionInfo.db component) and its header */
        metadataWriter = CompressionMetadata.Writer.open(parameters, offsetsPath);

        this.sstableMetadataCollector = sstableMetadataCollector;
        crcMetadata = new ChecksumWriter(new DataOutputStream(Channels.newOutputStream(channel)));
    }

    @Override
    public long getOnDiskFilePointer()
    {
        try
        {
            return fchannel.position();
        }
        catch (IOException e)
        {
            throw new FSReadError(e, getPath());
        }
    }
    /**
     * Get a quick estimation of how many bytes have been written to disk.
     *
     * It should for the most part be exactly the same as getOnDiskFilePointer().
     */
    @Override
    public long getEstimatedOnDiskBytesWritten()
    {
        return chunkOffset;
    }

    @Override
    public void flush()
    {
        throw new UnsupportedOperationException();
    }

    @Override
    protected void flushData()
    {
        seekToChunkStart(); // why is this necessary? seems like it should always be at chunk start in normal operation

        try
        {
            // compressing data with buffer re-use
            buffer.flip();
            compressed.clear();
            compressor.compress(buffer, compressed);
        }
        catch (IOException e)
        {
            throw new RuntimeException("Compression exception", e); // shouldn't happen
        }

        int compressedLength = compressed.position();
        uncompressedSize += buffer.position();
        compressedSize += compressedLength;

        try
        {
            // write an offset of the newly written chunk to the index file
            metadataWriter.addOffset(chunkOffset);
            chunkCount++;

            // write out the compressed data
            compressed.flip();
            channel.write(compressed);

            // write corresponding checksum
            compressed.rewind();
            crcMetadata.appendDirect(compressed, true);
            lastFlushOffset = uncompressedSize;
        }
        catch (IOException e)
        {
            throw new FSWriteError(e, getPath());
        }

        // next chunk should be written right after current + length of the checksum (int)
        chunkOffset += compressedLength + 4;
        if (runPostFlush != null)
            runPostFlush.run();
    }

    public CompressionMetadata open(long overrideLength)
    {
        if (overrideLength <= 0)
            overrideLength = uncompressedSize;
        return metadataWriter.open(overrideLength, chunkOffset);
    }

    @Override
    public DataPosition mark()
    {
        if (!buffer.hasRemaining())
            doFlush(0);
        return new CompressedFileWriterMark(chunkOffset, current(), buffer.position(), chunkCount + 1);
    }

    @Override
    public synchronized void resetAndTruncate(DataPosition mark)
    {
        assert mark instanceof CompressedFileWriterMark;
        CompressedFileWriterMark realMark = (CompressedFileWriterMark) mark;

        // reset position
        long truncateTarget = realMark.uncDataOffset;

        if (realMark.chunkOffset == chunkOffset)
        {
            // simply drop bytes to the right of our mark
            buffer.position(realMark.validBufferBytes);
            return;
        }

        // synchronize current buffer with disk - we don't want any data loss
        syncInternal();

        chunkOffset = realMark.chunkOffset;

        // compressed chunk size (- 4 bytes reserved for checksum)
        int chunkSize = (int) (metadataWriter.chunkOffsetBy(realMark.nextChunkIndex) - chunkOffset - 4);
        if (compressed.capacity() < chunkSize)
        {
            FileUtils.clean(compressed);
            compressed = compressor.preferredBufferType().allocate(chunkSize);
        }

        try
        {
            compressed.clear();
            compressed.limit(chunkSize);
            fchannel.position(chunkOffset);
            fchannel.read(compressed);

            try
            {
                // Repopulate buffer from compressed data
                buffer.clear();
                compressed.flip();
                compressor.uncompress(compressed, buffer);
            }
            catch (IOException e)
            {
                throw new CorruptBlockException(getPath(), chunkOffset, chunkSize, e);
            }

            CRC32 checksum = new CRC32();
            compressed.rewind();
            checksum.update(compressed);

            crcCheckBuffer.clear();
            fchannel.read(crcCheckBuffer);
            crcCheckBuffer.flip();
            if (crcCheckBuffer.getInt() != (int) checksum.getValue())
                throw new CorruptBlockException(getPath(), chunkOffset, chunkSize);
        }
        catch (CorruptBlockException e)
        {
            throw new CorruptSSTableException(e, getPath());
        }
        catch (EOFException e)
        {
            throw new CorruptSSTableException(new CorruptBlockException(getPath(), chunkOffset, chunkSize), getPath());
        }
        catch (IOException e)
        {
            throw new FSReadError(e, getPath());
        }

        // Mark as dirty so we can guarantee the newly buffered bytes won't be lost on a rebuffer
        buffer.position(realMark.validBufferBytes);
        bufferOffset = truncateTarget - buffer.position();
        chunkCount = realMark.nextChunkIndex - 1;

        // truncate data and index file
        truncate(chunkOffset, bufferOffset);
        metadataWriter.resetAndTruncate(realMark.nextChunkIndex - 1);
    }

    private void truncate(long toFileSize, long toBufferOffset)
    {
        try
        {
            fchannel.truncate(toFileSize);
            lastFlushOffset = toBufferOffset;
        }
        catch (IOException e)
        {
            throw new FSWriteError(e, getPath());
        }
    }
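    /*
     * Illustrative mark/rewind pattern (hypothetical caller code, not taken from this repository):
     * a caller can capture a position with mark() and, if a subsequent write fails partway
     * through, discard everything written after it with resetAndTruncate().
     *
     *     DataPosition mark = writer.mark();
     *     try
     *     {
     *         writePartition(writer); // hypothetical helper
     *     }
     *     catch (Throwable t)
     *     {
     *         writer.resetAndTruncate(mark);
     *         throw t;
     *     }
     */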
    /**
     * Seek to the offset where the next compressed data chunk should be stored.
     */
    private void seekToChunkStart()
    {
        if (getOnDiskFilePointer() != chunkOffset)
        {
            try
            {
                fchannel.position(chunkOffset);
            }
            catch (IOException e)
            {
                throw new FSReadError(e, getPath());
            }
        }
    }

    protected class TransactionalProxy extends SequentialWriter.TransactionalProxy
    {
        @Override
        protected Throwable doCommit(Throwable accumulate)
        {
            return super.doCommit(metadataWriter.commit(accumulate));
        }

        @Override
        protected Throwable doAbort(Throwable accumulate)
        {
            return super.doAbort(metadataWriter.abort(accumulate));
        }

        @Override
        protected void doPrepare()
        {
            syncInternal();
            digestFile.ifPresent(crcMetadata::writeFullChecksum);
            sstableMetadataCollector.addCompressionRatio(compressedSize, uncompressedSize);
            metadataWriter.finalizeLength(current(), chunkCount).prepareToCommit();
        }

        @Override
        protected Throwable doPreCleanup(Throwable accumulate)
        {
            accumulate = super.doPreCleanup(accumulate);
            if (compressed != null)
            {
                try
                {
                    FileUtils.clean(compressed);
                }
                catch (Throwable t)
                {
                    accumulate = merge(accumulate, t);
                }
                compressed = null;
            }
            return accumulate;
        }
    }

    @Override
    protected SequentialWriter.TransactionalProxy txnProxy()
    {
        return new TransactionalProxy();
    }
    /**
     * Class to hold a mark to a position in the file.
     */
    protected static class CompressedFileWriterMark implements DataPosition
    {
        // chunk offset in the compressed file
        final long chunkOffset;
        // uncompressed data offset (real data offset)
        final long uncDataOffset;
        final int validBufferBytes;
        final int nextChunkIndex;

        public CompressedFileWriterMark(long chunkOffset, long uncDataOffset, int validBufferBytes, int nextChunkIndex)
        {
            this.chunkOffset = chunkOffset;
            this.uncDataOffset = uncDataOffset;
            this.validBufferBytes = validBufferBytes;
            this.nextChunkIndex = nextChunkIndex;
        }
    }
}