/*
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements.  See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.lucene.store;

import java.io.EOFException;
import java.nio.BufferUnderflowException;
import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.Locale;
import java.util.stream.Collectors;

import org.apache.lucene.util.Accountable;
import org.apache.lucene.util.RamUsageEstimator;

A DataInput implementing RandomAccessInput and reading data from a list of ByteBuffers.
/** * A {@link DataInput} implementing {@link RandomAccessInput} and reading data from a * list of {@link ByteBuffer}s. */
public final class ByteBuffersDataInput extends DataInput implements Accountable, RandomAccessInput { private final ByteBuffer[] blocks; private final int blockBits; private final int blockMask; private final long size; private final long offset; private long pos;
Read data from a set of contiguous buffers. All data buffers except for the last one must have an identical remaining number of bytes in the buffer (that is a power of two). The last buffer can be of an arbitrary remaining length.
/** * Read data from a set of contiguous buffers. All data buffers except for the last one * must have an identical remaining number of bytes in the buffer (that is a power of two). The last * buffer can be of an arbitrary remaining length. */
public ByteBuffersDataInput(List<ByteBuffer> buffers) { ensureAssumptions(buffers); this.blocks = buffers.stream().map(buf -> buf.asReadOnlyBuffer()).toArray(ByteBuffer[]::new); if (blocks.length == 1) { this.blockBits = 32; this.blockMask = ~0; } else { final int blockBytes = determineBlockPage(buffers); this.blockBits = Integer.numberOfTrailingZeros(blockBytes); this.blockMask = (1 << blockBits) - 1; } this.size = Arrays.stream(blocks).mapToLong(block -> block.remaining()).sum(); // The initial "position" of this stream is shifted by the position of the first block. this.offset = blocks[0].position(); this.pos = offset; } public long size() { return size; } @Override public long ramBytesUsed() { // Return a rough estimation for allocated blocks. Note that we do not make // any special distinction for what the type of buffer is (direct vs. heap-based). return RamUsageEstimator.NUM_BYTES_OBJECT_REF * blocks.length + Arrays.stream(blocks).mapToLong(buf -> buf.capacity()).sum(); } @Override public byte readByte() throws EOFException { try { ByteBuffer block = blocks[blockIndex(pos)]; byte v = block.get(blockOffset(pos)); pos++; return v; } catch (IndexOutOfBoundsException e) { if (pos >= size()) { throw new EOFException(); } else { throw e; // Something is wrong. } } }
Reads exactly len bytes into the given buffer. The buffer must have enough remaining limit. If there are fewer than len bytes in the input, EOFException is thrown.
/** * Reads exactly {@code len} bytes into the given buffer. The buffer must have * enough remaining limit. * * If there are fewer than {@code len} bytes in the input, {@link EOFException} * is thrown. */
public void readBytes(ByteBuffer buffer, int len) throws EOFException { try { while (len > 0) { ByteBuffer block = blocks[blockIndex(pos)].duplicate(); int blockOffset = blockOffset(pos); block.position(blockOffset); int chunk = Math.min(len, block.remaining()); if (chunk == 0) { throw new EOFException(); } // Update pos early on for EOF detection on output buffer, then try to get buffer content. pos += chunk; block.limit(blockOffset + chunk); buffer.put(block); len -= chunk; } } catch (BufferUnderflowException | ArrayIndexOutOfBoundsException e) { if (pos >= size()) { throw new EOFException(); } else { throw e; // Something is wrong. } } } @Override public void readBytes(byte[] arr, int off, int len) throws EOFException { try { while (len > 0) { ByteBuffer block = blocks[blockIndex(pos)].duplicate(); block.position(blockOffset(pos)); int chunk = Math.min(len, block.remaining()); if (chunk == 0) { throw new EOFException(); } // Update pos early on for EOF detection, then try to get buffer content. pos += chunk; block.get(arr, off, chunk); len -= chunk; off += chunk; } } catch (BufferUnderflowException | ArrayIndexOutOfBoundsException e) { if (pos >= size()) { throw new EOFException(); } else { throw e; // Something is wrong. } } } @Override public byte readByte(long pos) { pos += offset; return blocks[blockIndex(pos)].get(blockOffset(pos)); } @Override public short readShort(long pos) { long absPos = offset + pos; int blockOffset = blockOffset(absPos); if (blockOffset + Short.BYTES <= blockMask) { return blocks[blockIndex(absPos)].getShort(blockOffset); } else { return (short) ((readByte(pos ) & 0xFF) << 8 | (readByte(pos + 1) & 0xFF)); } } @Override public int readInt(long pos) { long absPos = offset + pos; int blockOffset = blockOffset(absPos); if (blockOffset + Integer.BYTES <= blockMask) { return blocks[blockIndex(absPos)].getInt(blockOffset); } else { return ((readByte(pos ) ) << 24 | (readByte(pos + 1) & 0xFF) << 16 | (readByte(pos + 2) & 0xFF) << 8 | (readByte(pos + 3) & 0xFF)); } } @Override public long readLong(long pos) { long absPos = offset + pos; int blockOffset = blockOffset(absPos); if (blockOffset + Long.BYTES <= blockMask) { return blocks[blockIndex(absPos)].getLong(blockOffset); } else { return (((long) readInt(pos)) << 32) | (readInt(pos + 4) & 0xFFFFFFFFL); } } public long position() { return pos - offset; } public void seek(long position) throws EOFException { this.pos = position + offset; if (position > size()) { this.pos = size(); throw new EOFException(); } } public ByteBuffersDataInput slice(long offset, long length) { if (offset < 0 || length < 0 || offset + length > this.size) { throw new IllegalArgumentException(String.format(Locale.ROOT, "slice(offset=%s, length=%s) is out of bounds: %s", offset, length, this)); } return new ByteBuffersDataInput(sliceBufferList(Arrays.asList(this.blocks), offset, length)); } @Override public String toString() { return String.format(Locale.ROOT, "%,d bytes, block size: %,d, blocks: %,d, position: %,d%s", size(), blockSize(), blocks.length, position(), offset == 0 ? "" : String.format(Locale.ROOT, " [offset: %,d]", offset)); } private final int blockIndex(long pos) { return Math.toIntExact(pos >> blockBits); } private final int blockOffset(long pos) { return (int) pos & blockMask; } private int blockSize() { return 1 << blockBits; } private static final boolean isPowerOfTwo(int v) { return (v & (v - 1)) == 0; } private static void ensureAssumptions(List<ByteBuffer> buffers) { if (buffers.isEmpty()) { throw new IllegalArgumentException("Buffer list must not be empty."); } if (buffers.size() == 1) { // Special case of just a single buffer, conditions don't apply. } else { final int blockPage = determineBlockPage(buffers); // First buffer decides on block page length. if (!isPowerOfTwo(blockPage)) { throw new IllegalArgumentException("The first buffer must have power-of-two position() + remaining(): 0x" + Integer.toHexString(blockPage)); } // Any block from 2..last-1 should have the same page size. for (int i = 1, last = buffers.size() - 1; i < last; i++) { ByteBuffer buffer = buffers.get(i); if (buffer.position() != 0) { throw new IllegalArgumentException("All buffers except for the first one must have position() == 0: " + buffer); } if (i != last && buffer.remaining() != blockPage) { throw new IllegalArgumentException("Intermediate buffers must share an identical remaining() power-of-two block size: 0x" + Integer.toHexString(blockPage)); } } } } static int determineBlockPage(List<ByteBuffer> buffers) { ByteBuffer first = buffers.get(0); final int blockPage = Math.toIntExact((long) first.position() + first.remaining()); return blockPage; } private static List<ByteBuffer> sliceBufferList(List<ByteBuffer> buffers, long offset, long length) { ensureAssumptions(buffers); if (buffers.size() == 1) { ByteBuffer cloned = buffers.get(0).asReadOnlyBuffer(); cloned.position(Math.toIntExact(cloned.position() + offset)); cloned.limit(Math.toIntExact(cloned.position() + length)); return Arrays.asList(cloned); } else { long absStart = buffers.get(0).position() + offset; long absEnd = absStart + length; int blockBytes = ByteBuffersDataInput.determineBlockPage(buffers); int blockBits = Integer.numberOfTrailingZeros(blockBytes); long blockMask = (1L << blockBits) - 1; int endOffset = Math.toIntExact(absEnd & blockMask); ArrayList<ByteBuffer> cloned = buffers.subList(Math.toIntExact(absStart / blockBytes), Math.toIntExact(absEnd / blockBytes + (endOffset == 0 ? 0 : 1))) .stream() .map(buf -> buf.asReadOnlyBuffer()) .collect(Collectors.toCollection(ArrayList::new)); if (endOffset == 0) { cloned.add(ByteBuffer.allocate(0)); } cloned.get(0).position(Math.toIntExact(absStart & blockMask)); cloned.get(cloned.size() - 1).limit(endOffset); return cloned; } } }