/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.cassandra.db.columniterator;

import java.io.IOException;
import java.util.NoSuchElementException;

import org.apache.cassandra.db.*;
import org.apache.cassandra.db.filter.ColumnFilter;
import org.apache.cassandra.db.rows.*;
import org.apache.cassandra.io.sstable.format.SSTableReader;
import org.apache.cassandra.io.util.FileDataInput;
import org.apache.cassandra.io.util.FileHandle;

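/*
 * A minimal usage sketch, assuming a Cassandra 3.x SSTableReader ("sstable"), a
 * partition key ("key") and a caller-provided process() method; these names are
 * illustrative. An SSTableIterator is normally obtained through SSTableReader
 * rather than constructed directly:
 *
 *     try (UnfilteredRowIterator iter =
 *              sstable.iterator(key, Slices.ALL, ColumnFilter.all(sstable.metadata), false, false))
 *     {
 *         while (iter.hasNext())
 *             process(iter.next()); // rows and range tombstone markers, in clustering order
 *     }
 */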
/**
 *  A Cell Iterator over SSTable
 */
public class SSTableIterator extends AbstractSSTableIterator
{
    /**
     * The index of the slice being processed.
     */
    private int slice;

    public SSTableIterator(SSTableReader sstable,
                           FileDataInput file,
                           DecoratedKey key,
                           RowIndexEntry indexEntry,
                           Slices slices,
                           ColumnFilter columns,
                           boolean isForThrift,
                           FileHandle ifile)
    {
        super(sstable, file, key, indexEntry, slices, columns, isForThrift, ifile);
    }

    protected Reader createReaderInternal(RowIndexEntry indexEntry, FileDataInput file, boolean shouldCloseFile)
    {
        return indexEntry.isIndexed()
             ? new ForwardIndexedReader(indexEntry, file, shouldCloseFile)
             : new ForwardReader(file, shouldCloseFile);
    }

    protected int nextSliceIndex()
    {
        int next = slice;
        slice++;
        return next;
    }

    protected boolean hasMoreSlices()
    {
        return slice < slices.size();
    }

    public boolean isReverseOrder()
    {
        return false;
    }
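    /*
     * Reader life cycle, as a sketch (the driving loop lives in AbstractSSTableIterator,
     * so this describes the expected contract rather than code in this file): for each
     * slice index handed out by nextSliceIndex(), the parent iterator calls
     * setForSlice(slice) on the current Reader, then pulls results through
     * hasNextInternal()/nextInternal() until the slice is exhausted, and moves on to
     * the next slice while hasMoreSlices() returns true.
     */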
    private class ForwardReader extends Reader
    {
        // The start of the current slice. This will be null as soon as we know we've passed that bound.
        protected ClusteringBound start;
        // The end of the current slice. Will never be null.
        protected ClusteringBound end = ClusteringBound.TOP;

        // The next element to return: this is computed by hasNextInternal().
        protected Unfiltered next;
        // Set to true once we know we have no more results for the slice. This is in particular
        // used by the indexed reader when we know we can't have results based on the index.
        protected boolean sliceDone;

        private ForwardReader(FileDataInput file, boolean shouldCloseFile)
        {
            super(file, shouldCloseFile);
        }

        public void setForSlice(Slice slice) throws IOException
        {
            start = slice.start() == ClusteringBound.BOTTOM ? null : slice.start();
            end = slice.end();

            sliceDone = false;
            next = null;
        }

        // Skip all data that comes before the currently set slice.
        // Return what should be returned at the end of this, or null if nothing should.
        private Unfiltered handlePreSliceData() throws IOException
        {
            assert deserializer != null;

            // Note that the following comparison is not strict. The reason is that the only case
            // where it can be == is if the "next" is a RT start marker (either a '[' or a ')[' boundary),
            // and if we had a strict inequality and an open RT marker before this, we would issue
            // the open marker first, and then return the next marker later, which would send both
            // '[' (or '(') and then ')[' in the stream for the same clustering value, which is wrong.
            // By using a non-strict inequality, we avoid that problem (if we do get ')[' for the same
            // clustering value as the slice start, we'll simply record it in 'openMarker').
            while (deserializer.hasNext() && deserializer.compareNextTo(start) <= 0)
            {
                if (deserializer.nextIsRow())
                    deserializer.skipNext();
                else
                    updateOpenMarker((RangeTombstoneMarker) deserializer.readNext());
            }

            ClusteringBound sliceStart = start;
            start = null;

            // We've reached the beginning of our queried slice. If we have an open marker
            // we should return that first.
            if (openMarker != null)
                return new RangeTombstoneBoundMarker(sliceStart, openMarker);

            return null;
        }
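        // A worked example of the bound comparisons above and below (the clustering
        // values here are hypothetical): for a slice (INCL_START(2), EXCL_END(5)) over
        // an int clustering column, handlePreSliceData skips every unfiltered comparing
        // <= INCL_START(2), so a row at clustering 2 is kept (it sorts after the
        // inclusive start), while computeNext stops at the first unfiltered comparing
        // >= EXCL_END(5), so a row at clustering 5 is excluded (it sorts after the
        // exclusive end).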
        // Compute the next element to return, assuming we're in the middle of the slice
        // and the next element is either in the slice, or just after it. Returns null
        // if we're done with the slice.
        protected Unfiltered computeNext() throws IOException
        {
            assert deserializer != null;

            while (true)
            {
                // We use the same reasoning as in handlePreSliceData regarding the strictness of the inequality below.
                // We want to exclude deserialized unfiltereds equal to end, because 1) we won't miss any rows since those
                // wouldn't be equal to a slice bound and 2) an end bound can be equal to a start bound
                // (EXCL_END(x) == INCL_START(x) for instance), and in that case we don't want to return the start bound
                // because it's fundamentally excluded. And if the bound is an end (for a range tombstone), it means it's
                // exactly our slice end, but in that case we will properly close the range tombstone anyway as part of
                // our "close an open marker" code in hasNextInternal.
                if (!deserializer.hasNext() || deserializer.compareNextTo(end) >= 0)
                    return null;

                Unfiltered next = deserializer.readNext();
                // We may get an empty row for the same reason expressed in UnfilteredSerializer.deserializeOne.
                if (next.isEmpty())
                    continue;

                if (next.kind() == Unfiltered.Kind.RANGE_TOMBSTONE_MARKER)
                    updateOpenMarker((RangeTombstoneMarker) next);
                return next;
            }
        }

        protected boolean hasNextInternal() throws IOException
        {
            if (next != null)
                return true;

            if (sliceDone)
                return false;

            if (start != null)
            {
                Unfiltered unfiltered = handlePreSliceData();
                if (unfiltered != null)
                {
                    next = unfiltered;
                    return true;
                }
            }

            next = computeNext();
            if (next != null)
                return true;

            // No more data for the current slice will come from deserialization.
            sliceDone = true;
            // If we have an open marker, emit its close at the slice end but keep 'openMarker'
            // set: the range tombstone may cover later slices too.
            if (openMarker != null)
            {
                next = new RangeTombstoneBoundMarker(end, openMarker);
                return true;
            }
            return false;
        }

        protected Unfiltered nextInternal() throws IOException
        {
            if (!hasNextInternal())
                throw new NoSuchElementException();

            Unfiltered toReturn = next;
            next = null;
            return toReturn;
        }
    }
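    /*
     * A sketch of the indexed variant below (IndexState is a helper defined in
     * AbstractSSTableIterator): the partition's row index is searched for the first
     * and last index blocks that can intersect the requested slice, so blocks lying
     * entirely before the slice are skipped via a seek instead of being deserialized,
     * and iteration can stop as soon as the last relevant block has been read.
     */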
    private class ForwardIndexedReader extends ForwardReader
    {
        private final IndexState indexState;

        private int lastBlockIdx; // the last index block that has data for the current query

        private ForwardIndexedReader(RowIndexEntry indexEntry, FileDataInput file, boolean shouldCloseFile)
        {
            super(file, shouldCloseFile);
            this.indexState = new IndexState(this, sstable.metadata.comparator, indexEntry, false, ifile);
            this.lastBlockIdx = indexState.blocksCount(); // if we never call setForSlice, that's where we want to stop
        }

        @Override
        public void close() throws IOException
        {
            super.close();
            this.indexState.close();
        }

        @Override
        public void setForSlice(Slice slice) throws IOException
        {
            super.setForSlice(slice);

            // If our previous slicing already got us the biggest row in the sstable, we're done.
            if (indexState.isDone())
            {
                sliceDone = true;
                return;
            }

            // Find the first index block we'll need to read for the slice.
            int startIdx = indexState.findBlockIndex(slice.start(), indexState.currentBlockIdx());
            if (startIdx >= indexState.blocksCount())
            {
                sliceDone = true;
                return;
            }

            // Find the last index block we'll need to read for the slice.
            lastBlockIdx = indexState.findBlockIndex(slice.end(), startIdx);

            // If the slice end is before the very first block, we have nothing for that slice.
            if (lastBlockIdx < 0)
            {
                assert startIdx < 0;
                sliceDone = true;
                return;
            }

            // If we start before the very first block, just read from the first one.
            if (startIdx < 0)
                startIdx = 0;

            // If that's the last block we were reading, we're already where we want to be.
            // Otherwise, seek to that first block.
            if (startIdx != indexState.currentBlockIdx())
                indexState.setToBlock(startIdx);

            // The index search is based on the last name of the index blocks, so at that point we have that:
            //   1) indexes[currentIdx - 1].lastName < slice.start <= indexes[currentIdx].lastName
            //   2) indexes[lastBlockIdx - 1].lastName < slice.end <= indexes[lastBlockIdx].lastName
            // so if currentIdx == lastBlockIdx and slice.end < indexes[currentIdx].firstName, we're guaranteed that the
            // whole slice is between the previous block end and this block start, and thus has no corresponding
            // data. One exception is if the previous block ends with an openMarker, as it will cover our slice
            // and we need to return it (we also don't skip the slice for the old format, because we didn't have the
            // openMarker info in that case and can't rely on this optimization).
            if (indexState.currentBlockIdx() == lastBlockIdx
                && metadata().comparator.compare(slice.end(), indexState.currentIndex().firstName) < 0
                && openMarker == null
                && sstable.descriptor.version.storeRows())
            {
                sliceDone = true;
            }
        }

        @Override
        protected Unfiltered computeNext() throws IOException
        {
            while (true)
            {
                // Our previous read might have made us cross an index block boundary. If so, update our information.
                // If we read from the beginning of the partition, this is also what will initialize the index state.
                indexState.updateBlock();

                // Return the next unfiltered unless we've reached the end, or we're beyond our slice
                // end (note that unless we're on the last block for the slice, there is no point
                // in checking the slice end).
                if (indexState.isDone()
                    || indexState.currentBlockIdx() > lastBlockIdx
                    || !deserializer.hasNext()
                    || (indexState.currentBlockIdx() == lastBlockIdx && deserializer.compareNextTo(end) >= 0))
                    return null;

                Unfiltered next = deserializer.readNext();
                // We may get an empty row for the same reason expressed in UnfilteredSerializer.deserializeOne.
                if (next.isEmpty())
                    continue;

                if (next.kind() == Unfiltered.Kind.RANGE_TOMBSTONE_MARKER)
                    updateOpenMarker((RangeTombstoneMarker) next);
                return next;
            }
        }
    }
}