/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.cassandra.io.util;

import java.nio.ByteBuffer;
import java.nio.channels.FileChannel;
import java.util.Arrays;

import org.slf4j.LoggerFactory;

import org.apache.cassandra.io.FSReadError;
import org.apache.cassandra.io.compress.CompressionMetadata;
import org.apache.cassandra.utils.JVMStabilityInspector;
import org.apache.cassandra.utils.Throwables;
import org.apache.cassandra.utils.concurrent.RefCounted;
import org.apache.cassandra.utils.concurrent.SharedCloseableImpl;

import static java.util.stream.Stream.of;
import static org.apache.cassandra.utils.Throwables.perform;

public class MmappedRegions extends SharedCloseableImpl
{
    
In a perfect world, MAX_SEGMENT_SIZE would be final, but we need to test with a smaller size
/** In a perfect world, MAX_SEGMENT_SIZE would be final, but we need to test with a smaller size */
public static int MAX_SEGMENT_SIZE = Integer.MAX_VALUE;
When we need to grow the arrays, we add this number of region slots
/** When we need to grow the arrays, we add this number of region slots */
static final int REGION_ALLOC_SIZE = 15;
The original state, which is shared with the tidier and contains all the regions mapped so far. It also does the actual mapping.
/** The original state, which is shared with the tidier and * contains all the regions mapped so far. It also * does the actual mapping. */
private final State state;
A copy of the latest state. We update this each time the original state is updated and we share this with copies. If we are a copy, then this is null. Copies can only access existing regions, they cannot create new ones. This is for thread safety and because MmappedRegions is reference counted, only the original state will be cleaned-up, therefore only the original state should create new mapped regions.
/** A copy of the latest state. We update this each time the original state is * updated and we share this with copies. If we are a copy, then this * is null. Copies can only access existing regions, they cannot create * new ones. This is for thread safety and because MmappedRegions is * reference counted, only the original state will be cleaned-up, * therefore only the original state should create new mapped regions. */
private volatile State copy; private MmappedRegions(ChannelProxy channel, CompressionMetadata metadata, long length) { this(new State(channel), metadata, length); } private MmappedRegions(State state, CompressionMetadata metadata, long length) { super(new Tidier(state)); this.state = state; if (metadata != null) { assert length == 0 : "expected no length with metadata"; updateState(metadata); } else if (length > 0) { updateState(length); } this.copy = new State(state); } private MmappedRegions(MmappedRegions original) { super(original); this.state = original.copy; } public static MmappedRegions empty(ChannelProxy channel) { return new MmappedRegions(channel, null, 0); }
Params:
  • channel – file to map. the MmappedRegions instance will hold shared copy of given channel.
  • metadata –
Returns:new instance
/** * @param channel file to map. the MmappedRegions instance will hold shared copy of given channel. * @param metadata * @return new instance */
public static MmappedRegions map(ChannelProxy channel, CompressionMetadata metadata) { if (metadata == null) throw new IllegalArgumentException("metadata cannot be null"); return new MmappedRegions(channel, metadata, 0); } public static MmappedRegions map(ChannelProxy channel, long length) { if (length <= 0) throw new IllegalArgumentException("Length must be positive"); return new MmappedRegions(channel, null, length); }
Returns:a snapshot of the memory mapped regions. The snapshot can only use existing regions, it cannot create new ones.
/** * @return a snapshot of the memory mapped regions. The snapshot can * only use existing regions, it cannot create new ones. */
public MmappedRegions sharedCopy() { return new MmappedRegions(this); } private boolean isCopy() { return copy == null; } public void extend(long length) { if (length < 0) throw new IllegalArgumentException("Length must not be negative"); assert !isCopy() : "Copies cannot be extended"; if (length <= state.length) return; updateState(length); copy = new State(state); } private void updateState(long length) { state.length = length; long pos = state.getPosition(); while (pos < length) { long size = Math.min(MAX_SEGMENT_SIZE, length - pos); state.add(pos, size); pos += size; } } private void updateState(CompressionMetadata metadata) { long offset = 0; long lastSegmentOffset = 0; long segmentSize = 0; while (offset < metadata.dataLength) { CompressionMetadata.Chunk chunk = metadata.chunkFor(offset); //Reached a new mmap boundary if (segmentSize + chunk.length + 4 > MAX_SEGMENT_SIZE) { if (segmentSize > 0) { state.add(lastSegmentOffset, segmentSize); lastSegmentOffset += segmentSize; segmentSize = 0; } } segmentSize += chunk.length + 4; //checksum offset += metadata.chunkLength(); } if (segmentSize > 0) state.add(lastSegmentOffset, segmentSize); state.length = lastSegmentOffset + segmentSize; } public boolean isValid(ChannelProxy channel) { return state.isValid(channel); } public boolean isEmpty() { return state.isEmpty(); } public Region floor(long position) { assert !isCleanedUp() : "Attempted to use closed region"; return state.floor(position); } public void closeQuietly() { Throwable err = close(null); if (err != null) { JVMStabilityInspector.inspectThrowable(err); // This is not supposed to happen LoggerFactory.getLogger(getClass()).error("Error while closing mmapped regions", err); } } public static final class Region implements Rebufferer.BufferHolder { public final long offset; public final ByteBuffer buffer; public Region(long offset, ByteBuffer buffer) { this.offset = offset; this.buffer = buffer; } public ByteBuffer buffer() { return buffer.duplicate(); } public long offset() { return offset; } public long end() { return offset + buffer.capacity(); } public void release() { // only released after no readers are present } } private static final class State {
The file channel
/** The file channel */
private final ChannelProxy channel;
An array of region buffers, synchronized with offsets
/** An array of region buffers, synchronized with offsets */
private ByteBuffer[] buffers;
An array of region offsets, synchronized with buffers
/** An array of region offsets, synchronized with buffers */
private long[] offsets;
The maximum file length we have mapped
/** The maximum file length we have mapped */
private long length;
The index to the last region added
/** The index to the last region added */
private int last; private State(ChannelProxy channel) { this.channel = channel.sharedCopy(); this.buffers = new ByteBuffer[REGION_ALLOC_SIZE]; this.offsets = new long[REGION_ALLOC_SIZE]; this.length = 0; this.last = -1; } private State(State original) { this.channel = original.channel; this.buffers = original.buffers; this.offsets = original.offsets; this.length = original.length; this.last = original.last; } private boolean isEmpty() { return last < 0; } private boolean isValid(ChannelProxy channel) { return this.channel.filePath().equals(channel.filePath()); } private Region floor(long position) { assert 0 <= position && position <= length : String.format("%d > %d", position, length); int idx = Arrays.binarySearch(offsets, 0, last +1, position); assert idx != -1 : String.format("Bad position %d for regions %s, last %d in %s", position, Arrays.toString(offsets), last, channel); if (idx < 0) idx = -(idx + 2); // round down to entry at insertion point return new Region(offsets[idx], buffers[idx]); } private long getPosition() { return last < 0 ? 0 : offsets[last] + buffers[last].capacity(); } private void add(long pos, long size) { ByteBuffer buffer = channel.map(FileChannel.MapMode.READ_ONLY, pos, size); ++last; if (last == offsets.length) { offsets = Arrays.copyOf(offsets, offsets.length + REGION_ALLOC_SIZE); buffers = Arrays.copyOf(buffers, buffers.length + REGION_ALLOC_SIZE); } offsets[last] = pos; buffers[last] = buffer; } private Throwable close(Throwable accumulate) { accumulate = channel.close(accumulate); /* * Try forcing the unmapping of segments using undocumented unsafe sun APIs. * If this fails (non Sun JVM), we'll have to wait for the GC to finalize the mapping. * If this works and a thread tries to access any segment, hell will unleash on earth. */ if (!FileUtils.isCleanerAvailable) return accumulate; return perform(accumulate, channel.filePath(), Throwables.FileOpType.READ, of(buffers) .map((buffer) -> () -> { if (buffer != null) FileUtils.clean(buffer); })); } } public static final class Tidier implements RefCounted.Tidy { final State state; Tidier(State state) { this.state = state; } public String name() { return state.channel.filePath(); } public void tidy() { try { Throwables.maybeFail(state.close(null)); } catch (Exception e) { throw new FSReadError(e, state.channel.filePath()); } } } }