/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.cassandra.db.filter;

import java.io.IOException;
import java.nio.ByteBuffer;

import org.apache.cassandra.config.CFMetaData;
import org.apache.cassandra.db.*;
import org.apache.cassandra.db.aggregation.GroupMaker;
import org.apache.cassandra.db.aggregation.GroupingState;
import org.apache.cassandra.db.aggregation.AggregationSpecification;
import org.apache.cassandra.db.rows.*;
import org.apache.cassandra.db.partitions.*;
import org.apache.cassandra.db.transform.BasePartitions;
import org.apache.cassandra.db.transform.BaseRows;
import org.apache.cassandra.db.transform.StoppingTransformation;
import org.apache.cassandra.db.transform.Transformation;
import org.apache.cassandra.io.util.DataInputPlus;
import org.apache.cassandra.io.util.DataOutputPlus;
import org.apache.cassandra.utils.ByteBufferUtil;

/**
 * Object in charge of tracking if we have fetched enough data for a given query.
 *
 * The reason this is not just a simple integer is that Thrift and CQL3 count
 * stuffs in different ways. This is what abstracts those differences.
 */
public abstract class DataLimits
{
    public static final Serializer serializer = new Serializer();

    /** Sentinel meaning "no limit" for any of the counts tracked by this class. */
    public static final int NO_LIMIT = Integer.MAX_VALUE;

    /** Limits that enforce nothing: every filter method is the identity and cached data is never deemed sufficient. */
    public static final DataLimits NONE = new CQLLimits(NO_LIMIT)
    {
        @Override
        public boolean hasEnoughLiveData(CachedPartition cached, int nowInSec, boolean countPartitionsWithOnlyStaticData, boolean enforceStrictLiveness)
        {
            return false;
        }

        @Override
        public UnfilteredPartitionIterator filter(UnfilteredPartitionIterator iter, int nowInSec, boolean countPartitionsWithOnlyStaticData)
        {
            return iter;
        }

        @Override
        public UnfilteredRowIterator filter(UnfilteredRowIterator iter, int nowInSec, boolean countPartitionsWithOnlyStaticData)
        {
            return iter;
        }

        @Override
        public PartitionIterator filter(PartitionIterator iter, int nowInSec, boolean countPartitionsWithOnlyStaticData, boolean enforceStrictLiveness)
        {
            return iter;
        }
    };

    // We currently deal with distinct queries by querying full partitions but limiting the result at 1 row per
    // partition (see SelectStatement.makeFilter). So an "unbounded" distinct is still actually doing some filtering.
    public static final DataLimits DISTINCT_NONE = new CQLLimits(NO_LIMIT, 1, true);

    /** Discriminator used by the serializer to identify the concrete limits implementation. */
    public enum Kind { CQL_LIMIT, CQL_PAGING_LIMIT, THRIFT_LIMIT, SUPER_COLUMN_COUNTING_LIMIT, CQL_GROUP_BY_LIMIT, CQL_GROUP_BY_PAGING_LIMIT }

    /** Returns CQL limits bounding the total number of rows returned (shares {@link #NONE} when unbounded). */
    public static DataLimits cqlLimits(int cqlRowLimit)
    {
        return cqlRowLimit == NO_LIMIT ? NONE : new CQLLimits(cqlRowLimit);
    }

    // mixed mode partition range scans on compact storage tables without clustering columns coordinated by 2.x are
    // returned as one (cql) row per cell, but we need to count each partition as a single row. So we just return a
    // CQLLimits instance that doesn't count rows towards its limit. See CASSANDRA-15072
    public static DataLimits legacyCompactStaticCqlLimits(int cqlRowLimits)
    {
        return new CQLLimits(cqlRowLimits)
        {
            public Counter newCounter(int nowInSec, boolean assumeLiveData, boolean countPartitionsWithOnlyStaticData, boolean enforceStrictLiveness)
            {
                return new CQLCounter(nowInSec, assumeLiveData, countPartitionsWithOnlyStaticData, enforceStrictLiveness)
                {
                    public Row applyToRow(Row row)
                    {
                        // noop: only count full partitions
                        return row;
                    }
                };
            }
        };
    }

    /** Returns CQL limits bounding both the total number of rows and the rows returned per partition. */
    public static DataLimits cqlLimits(int cqlRowLimit, int perPartitionLimit)
    {
        return cqlRowLimit == NO_LIMIT && perPartitionLimit == NO_LIMIT
             ? NONE
             : new CQLLimits(cqlRowLimit, perPartitionLimit);
    }

    private static DataLimits cqlLimits(int cqlRowLimit, int perPartitionLimit, boolean isDistinct)
    {
        return cqlRowLimit == NO_LIMIT && perPartitionLimit == NO_LIMIT && !isDistinct
             ? NONE
             : new CQLLimits(cqlRowLimit, perPartitionLimit, isDistinct);
    }

    /** Returns limits for a GROUP BY query, bounding groups, groups per partition and rows. */
    public static DataLimits groupByLimits(int groupLimit,
                                           int groupPerPartitionLimit,
                                           int rowLimit,
                                           AggregationSpecification groupBySpec)
    {
        return new CQLGroupByLimits(groupLimit, groupPerPartitionLimit, rowLimit, groupBySpec);
    }

    /** Returns limits for a SELECT DISTINCT query (at most one row counted per partition). */
    public static DataLimits distinctLimits(int cqlRowLimit)
    {
        return CQLLimits.distinct(cqlRowLimit);
    }

    /** Returns thrift-style limits, which count partitions and cells rather than CQL rows. */
    public static DataLimits thriftLimits(int partitionLimit, int cellPerPartitionLimit)
    {
        return new ThriftLimits(partitionLimit, cellPerPartitionLimit);
    }

    /** Returns thrift-style limits that count super-column-family cells. */
    public static DataLimits superColumnCountingLimits(int partitionLimit, int cellPerPartitionLimit)
    {
        return new SuperColumnCountingLimits(partitionLimit, cellPerPartitionLimit);
    }

    public abstract Kind kind();

    public abstract boolean isUnlimited();

    public abstract boolean isDistinct();

    public boolean isGroupByLimit()
    {
        return false;
    }

    /** Whether the given counter has reached these limits (i.e. no more results should be fetched). */
    public boolean isExhausted(Counter counter)
    {
        return counter.counted() < count();
    }

    public abstract DataLimits forPaging(int pageSize);

    public abstract DataLimits forPaging(int pageSize, ByteBuffer lastReturnedKey, int lastReturnedKeyRemaining);

    public abstract DataLimits forShortReadRetry(int toFetch);
/**
 * Creates a {@code DataLimits} instance to be used for paginating internally GROUP BY queries.
 *
 * @param state the {@code GroupMaker} state
 * @return a {@code DataLimits} instance to be used for paginating internally GROUP BY queries
 */
public DataLimits forGroupByInternalPaging(GroupingState state)
{
    // Only group-by limits support internal paging; every other kind rejects the call.
    throw new UnsupportedOperationException();
}

/**
 * Whether the provided cached partition already contains enough live data to satisfy these limits,
 * so the read can be answered from the row cache.
 *
 * @param cached the cached partition to inspect.
 * @param nowInSec the current time in seconds (used to decide liveness/expiration).
 * @param countPartitionsWithOnlyStaticData if {@code true}, a partition with only static data counts as 1 row.
 * @param enforceStrictLiveness whether a row without PK liveness info should be considered dead.
 */
public abstract boolean hasEnoughLiveData(CachedPartition cached, int nowInSec, boolean countPartitionsWithOnlyStaticData, boolean enforceStrictLiveness);
/**
 * Returns a new {@code Counter} for this limits.
 *
 * @param nowInSec the current time in second (to decide what is expired or not).
 * @param assumeLiveData if true, the counter will assume that every row passed is live and won't
 * thus check for liveness, otherwise it will. This should be {@code true} when used on a
 * {@code RowIterator} (since it only returns live rows), false otherwise.
 * @param countPartitionsWithOnlyStaticData if {@code true} the partitions with only static data should be counted
 * as 1 valid row.
 * @param enforceStrictLiveness whether the row should be purged if there is no PK liveness info,
 * normally retrieved from {@link CFMetaData#enforceStrictLiveness()}
 * @return a new {@code Counter} for this limits.
 */
public abstract Counter newCounter(int nowInSec, boolean assumeLiveData, boolean countPartitionsWithOnlyStaticData, boolean enforceStrictLiveness);
/**
 * The max number of results this limits enforces.
 * <p>
 * Note that the actual definition of "results" depends a bit: for CQL, it's always rows, but for
 * thrift, it means cells.
 *
 * @return the maximum number of results this limits enforces.
 */
public abstract int count();

/**
 * The maximum number of results this limits enforces per partition (same definition of "results"
 * as for {@link #count}).
 */
public abstract int perPartitionCount();
/**
 * Returns equivalent limits but where any internal state kept to track where we are of paging
 * and/or grouping is discarded.
 */
public abstract DataLimits withoutState();

/**
 * Wraps the provided iterator so it stops once these limits are reached.
 * assumeLiveData is false because an unfiltered iterator may return non-live rows.
 */
public UnfilteredPartitionIterator filter(UnfilteredPartitionIterator iter, int nowInSec, boolean countPartitionsWithOnlyStaticData)
{
    return this.newCounter(nowInSec, false, countPartitionsWithOnlyStaticData, iter.metadata().enforceStrictLiveness())
               .applyTo(iter);
}

/** Wraps the provided single-partition unfiltered iterator so it stops once these limits are reached. */
public UnfilteredRowIterator filter(UnfilteredRowIterator iter, int nowInSec, boolean countPartitionsWithOnlyStaticData)
{
    return this.newCounter(nowInSec, false, countPartitionsWithOnlyStaticData, iter.metadata().enforceStrictLiveness())
               .applyTo(iter);
}

/**
 * Wraps the provided filtered iterator so it stops once these limits are reached.
 * assumeLiveData is true because a filtered PartitionIterator only returns live rows.
 */
public PartitionIterator filter(PartitionIterator iter, int nowInSec, boolean countPartitionsWithOnlyStaticData, boolean enforceStrictLiveness)
{
    return this.newCounter(nowInSec, true, countPartitionsWithOnlyStaticData, enforceStrictLiveness)
               .applyTo(iter);
}
/**
 * Estimate the number of results (the definition of "results" will be rows for CQL queries
 * and partitions for thrift ones) that a full scan of the provided cfs would yield.
 */
public abstract float estimateTotalResults(ColumnFamilyStore cfs);

/**
 * Transformation that counts results as they stream by and (optionally) stops the underlying
 * iterators once the limits are reached.
 */
public static abstract class Counter extends StoppingTransformation<BaseRowIterator<?>>
{
    protected final int nowInSec;
    protected final boolean assumeLiveData;
    private final boolean enforceStrictLiveness;

    // false means we do not propagate our stop signals onto the iterator, we only count
    private boolean enforceLimits = true;

    protected Counter(int nowInSec, boolean assumeLiveData, boolean enforceStrictLiveness)
    {
        this.nowInSec = nowInSec;
        this.assumeLiveData = assumeLiveData;
        this.enforceStrictLiveness = enforceStrictLiveness;
    }

    /** Switches this counter to pure counting mode: limits are tracked but iterators are never stopped. */
    public Counter onlyCount()
    {
        this.enforceLimits = false;
        return this;
    }

    public PartitionIterator applyTo(PartitionIterator partitions)
    {
        return Transformation.apply(partitions, this);
    }

    public UnfilteredPartitionIterator applyTo(UnfilteredPartitionIterator partitions)
    {
        return Transformation.apply(partitions, this);
    }

    public UnfilteredRowIterator applyTo(UnfilteredRowIterator partition)
    {
        return (UnfilteredRowIterator) applyToPartition(partition);
    }

    public RowIterator applyTo(RowIterator partition)
    {
        return (RowIterator) applyToPartition(partition);
    }
/**
 * The number of results counted.
 * <p>
 * Note that the definition of "results" should be the same that for {@link #count}.
 *
 * @return the number of results counted.
 */
public abstract int counted();

/** The number of results counted in the partition currently being processed. */
public abstract int countedInCurrentPartition();
/**
 * The number of rows counted.
 *
 * @return the number of rows counted.
 */
public abstract int rowCounted();
/**
 * The number of rows counted in the current partition.
 *
 * @return the number of rows counted in the current partition.
 */
public abstract int rowCountedInCurrentPartition();

public abstract boolean isDone();

public abstract boolean isDoneForPartition();

/** Whether the row counts as live for this counter (either liveness is assumed or the row has live data). */
protected boolean isLive(Row row)
{
    return assumeLiveData || row.hasLiveData(nowInSec, enforceStrictLiveness);
}

@Override
protected BaseRowIterator<?> applyToPartition(BaseRowIterator<?> partition)
{
    return partition instanceof UnfilteredRowIterator ? Transformation.apply((UnfilteredRowIterator) partition, this)
                                                      : Transformation.apply((RowIterator) partition, this);
}

// called before we process a given partition
protected abstract void applyToPartition(DecoratedKey partitionKey, Row staticRow);

@Override
protected void attachTo(BasePartitions partitions)
{
    // Only propagate stop signals when limits are enforced; otherwise we just count.
    if (enforceLimits)
        super.attachTo(partitions);
    if (isDone())
        stop();
}

@Override
protected void attachTo(BaseRows rows)
{
    if (enforceLimits)
        super.attachTo(rows);
    applyToPartition(rows.partitionKey(), rows.staticRow());
    if (isDoneForPartition())
        stopInPartition();
}

@Override
public void onClose()
{
    super.onClose();
}
}
/**
 * Limits used by CQL; this counts rows.
 */
private static class CQLLimits extends DataLimits
{
    protected final int rowLimit;
    protected final int perPartitionLimit;

    // Whether the query is a distinct query or not.
    protected final boolean isDistinct;

    private CQLLimits(int rowLimit)
    {
        this(rowLimit, NO_LIMIT);
    }

    private CQLLimits(int rowLimit, int perPartitionLimit)
    {
        this(rowLimit, perPartitionLimit, false);
    }

    private CQLLimits(int rowLimit, int perPartitionLimit, boolean isDistinct)
    {
        this.rowLimit = rowLimit;
        this.perPartitionLimit = perPartitionLimit;
        this.isDistinct = isDistinct;
    }

    // DISTINCT is implemented as "at most 1 row per partition".
    private static CQLLimits distinct(int rowLimit)
    {
        return new CQLLimits(rowLimit, 1, true);
    }

    public Kind kind()
    {
        return Kind.CQL_LIMIT;
    }

    public boolean isUnlimited()
    {
        return rowLimit == NO_LIMIT && perPartitionLimit == NO_LIMIT;
    }

    public boolean isDistinct()
    {
        return isDistinct;
    }

    public DataLimits forPaging(int pageSize)
    {
        return new CQLLimits(pageSize, perPartitionLimit, isDistinct);
    }

    public DataLimits forPaging(int pageSize, ByteBuffer lastReturnedKey, int lastReturnedKeyRemaining)
    {
        return new CQLPagingLimits(pageSize, perPartitionLimit, isDistinct, lastReturnedKey, lastReturnedKeyRemaining);
    }

    public DataLimits forShortReadRetry(int toFetch)
    {
        return new CQLLimits(toFetch, perPartitionLimit, isDistinct);
    }

    public boolean hasEnoughLiveData(CachedPartition cached, int nowInSec, boolean countPartitionsWithOnlyStaticData, boolean enforceStrictLiveness)
    {
        // We want the number of row that are currently live. Getting that precise number forces
        // us to iterate the cached partition in general, but we can avoid that if:
        //   - The number of rows with at least one non-expiring cell is greater than what we ask,
        //     in which case we know we have enough live.
        //   - The number of rows is less than requested, in which case we know we won't have enough.
        if (cached.rowsWithNonExpiringCells() >= rowLimit)
            return true;
        if (cached.rowCount() < rowLimit)
            return false;

        // Otherwise, we need to re-count
        DataLimits.Counter counter = newCounter(nowInSec, false, countPartitionsWithOnlyStaticData, enforceStrictLiveness);
        try (UnfilteredRowIterator cacheIter = cached.unfilteredIterator(ColumnFilter.selection(cached.columns()), Slices.ALL, false);
             UnfilteredRowIterator iter = counter.applyTo(cacheIter))
        {
            // Consume the iterator until we've counted enough
            while (iter.hasNext())
                iter.next();
            return counter.isDone();
        }
    }

    public Counter newCounter(int nowInSec, boolean assumeLiveData, boolean countPartitionsWithOnlyStaticData, boolean enforceStrictLiveness)
    {
        return new CQLCounter(nowInSec, assumeLiveData, countPartitionsWithOnlyStaticData, enforceStrictLiveness);
    }

    public int count()
    {
        return rowLimit;
    }

    public int perPartitionCount()
    {
        return perPartitionLimit;
    }

    public DataLimits withoutState()
    {
        // Plain CQL limits carry no paging/grouping state, so there is nothing to discard.
        return this;
    }

    public float estimateTotalResults(ColumnFamilyStore cfs)
    {
        // TODO: we should start storing stats on the number of rows (instead of the number of cells, which
        // is what getMeanColumns returns)
        float rowsPerPartition = ((float) cfs.getMeanColumns()) / cfs.metadata.partitionColumns().regulars.size();
        return rowsPerPartition * (cfs.estimateKeys());
    }

    /** Counter for plain CQL limits: counts live rows, globally and per partition. */
    protected class CQLCounter extends Counter
    {
        protected int rowCounted;
        protected int rowInCurrentPartition;
        protected final boolean countPartitionsWithOnlyStaticData;

        // Whether the current partition has a live static row (counted only if no regular row is).
        protected boolean hasLiveStaticRow;

        public CQLCounter(int nowInSec,
                          boolean assumeLiveData,
                          boolean countPartitionsWithOnlyStaticData,
                          boolean enforceStrictLiveness)
        {
            super(nowInSec, assumeLiveData, enforceStrictLiveness);
            this.countPartitionsWithOnlyStaticData = countPartitionsWithOnlyStaticData;
        }

        @Override
        public void applyToPartition(DecoratedKey partitionKey, Row staticRow)
        {
            rowInCurrentPartition = 0;
            hasLiveStaticRow = !staticRow.isEmpty() && isLive(staticRow);
        }

        @Override
        public Row applyToRow(Row row)
        {
            if (isLive(row))
                incrementRowCount();
            return row;
        }

        @Override
        public void onPartitionClose()
        {
            // Normally, we don't count static rows as from a CQL point of view, it will be merge with other
            // rows in the partition. However, if we only have the static row, it will be returned as one row
            // so count it.
            if (countPartitionsWithOnlyStaticData && hasLiveStaticRow && rowInCurrentPartition == 0)
                incrementRowCount();
            super.onPartitionClose();
        }

        protected void incrementRowCount()
        {
            if (++rowCounted >= rowLimit)
                stop();
            if (++rowInCurrentPartition >= perPartitionLimit)
                stopInPartition();
        }

        public int counted()
        {
            return rowCounted;
        }

        public int countedInCurrentPartition()
        {
            return rowInCurrentPartition;
        }

        public int rowCounted()
        {
            return rowCounted;
        }

        public int rowCountedInCurrentPartition()
        {
            return rowInCurrentPartition;
        }

        public boolean isDone()
        {
            return rowCounted >= rowLimit;
        }

        public boolean isDoneForPartition()
        {
            return isDone() || rowInCurrentPartition >= perPartitionLimit;
        }
    }

    @Override
    public String toString()
    {
        StringBuilder sb = new StringBuilder();

        if (rowLimit != NO_LIMIT)
        {
            sb.append("LIMIT ").append(rowLimit);
            if (perPartitionLimit != NO_LIMIT)
                sb.append(' ');
        }

        if (perPartitionLimit != NO_LIMIT)
            sb.append("PER PARTITION LIMIT ").append(perPartitionLimit);

        return sb.toString();
    }
}

/** CQL limits carrying paging state: knows the last partition key returned and how many rows remain for it. */
private static class CQLPagingLimits extends CQLLimits
{
    private final ByteBuffer lastReturnedKey;
    private final int lastReturnedKeyRemaining;

    public CQLPagingLimits(int rowLimit, int perPartitionLimit, boolean isDistinct, ByteBuffer lastReturnedKey, int lastReturnedKeyRemaining)
    {
        super(rowLimit, perPartitionLimit, isDistinct);
        this.lastReturnedKey = lastReturnedKey;
        this.lastReturnedKeyRemaining = lastReturnedKeyRemaining;
    }

    @Override
    public Kind kind()
    {
        return Kind.CQL_PAGING_LIMIT;
    }

    @Override
    public DataLimits forPaging(int pageSize)
    {
        // Already carrying paging state; re-paging must go through forPaging(pageSize, key, remaining).
        throw new UnsupportedOperationException();
    }

    @Override
    public DataLimits forPaging(int pageSize, ByteBuffer lastReturnedKey, int lastReturnedKeyRemaining)
    {
        throw new UnsupportedOperationException();
    }

    @Override
    public DataLimits withoutState()
    {
        return new CQLLimits(rowLimit, perPartitionLimit, isDistinct);
    }

    @Override
    public Counter newCounter(int nowInSec, boolean assumeLiveData, boolean countPartitionsWithOnlyStaticData, boolean enforceStrictLiveness)
    {
        return new PagingAwareCounter(nowInSec, assumeLiveData, countPartitionsWithOnlyStaticData, enforceStrictLiveness);
    }

    /** Counter that resumes mid-partition when the first partition is the one the previous page stopped in. */
    private class PagingAwareCounter extends CQLCounter
    {
        private PagingAwareCounter(int nowInSec,
                                   boolean assumeLiveData,
                                   boolean countPartitionsWithOnlyStaticData,
                                   boolean enforceStrictLiveness)
        {
            super(nowInSec, assumeLiveData, countPartitionsWithOnlyStaticData, enforceStrictLiveness);
        }

        @Override
        public void applyToPartition(DecoratedKey partitionKey, Row staticRow)
        {
            if (partitionKey.getKey().equals(lastReturnedKey))
            {
                rowInCurrentPartition = perPartitionLimit - lastReturnedKeyRemaining;
                // lastReturnedKey is the last key for which we're returned rows in the first page.
                // So, since we know we have returned rows, we know we have accounted for the static row
                // if any already, so force hasLiveStaticRow to false so we make sure to not count it
                // once more.
                hasLiveStaticRow = false;
            }
            else
            {
                super.applyToPartition(partitionKey, staticRow);
            }
        }
    }
}
/**
 * {@code CQLLimits} used for GROUP BY queries or queries with aggregates.
 * <p>Internally, GROUP BY queries are always paginated by number of rows to avoid OOMExceptions. By consequence,
 * the limits keep track of the number of rows as well as the number of groups.</p>
 * <p>A group can only be counted if the next group or the end of the data is reached.</p>
 */
private static class CQLGroupByLimits extends CQLLimits
{
    /**
     * The {@code GroupMaker} state
     */
    protected final GroupingState state;

    /**
     * The GROUP BY specification
     */
    protected final AggregationSpecification groupBySpec;

    /**
     * The limit on the number of groups
     */
    protected final int groupLimit;

    /**
     * The limit on the number of groups per partition
     */
    protected final int groupPerPartitionLimit;

    public CQLGroupByLimits(int groupLimit,
                            int groupPerPartitionLimit,
                            int rowLimit,
                            AggregationSpecification groupBySpec)
    {
        this(groupLimit, groupPerPartitionLimit, rowLimit, groupBySpec, GroupingState.EMPTY_STATE);
    }

    private CQLGroupByLimits(int groupLimit,
                             int groupPerPartitionLimit,
                             int rowLimit,
                             AggregationSpecification groupBySpec,
                             GroupingState state)
    {
        // Row limit is tracked by the CQLLimits superclass; group limits are tracked here.
        super(rowLimit, NO_LIMIT, false);
        this.groupLimit = groupLimit;
        this.groupPerPartitionLimit = groupPerPartitionLimit;
        this.groupBySpec = groupBySpec;
        this.state = state;
    }

    @Override
    public Kind kind()
    {
        return Kind.CQL_GROUP_BY_LIMIT;
    }

    @Override
    public boolean isGroupByLimit()
    {
        return true;
    }

    public boolean isUnlimited()
    {
        return groupLimit == NO_LIMIT && groupPerPartitionLimit == NO_LIMIT && rowLimit == NO_LIMIT;
    }

    public DataLimits forShortReadRetry(int toFetch)
    {
        // Short read retries fetch a fixed number of rows, so plain CQL limits are enough.
        return new CQLLimits(toFetch);
    }

    @Override
    public float estimateTotalResults(ColumnFamilyStore cfs)
    {
        // For the moment, we return the estimated number of rows as we have no good way of estimating
        // the number of groups that will be returned. Hopefully, we should be able to fix
        // that problem at some point.
        return super.estimateTotalResults(cfs);
    }

    @Override
    public DataLimits forPaging(int pageSize)
    {
        // Pages are expressed in number of groups: the page size replaces the group limit.
        return new CQLGroupByLimits(pageSize,
                                    groupPerPartitionLimit,
                                    rowLimit,
                                    groupBySpec,
                                    state);
    }

    @Override
    public DataLimits forPaging(int pageSize, ByteBuffer lastReturnedKey, int lastReturnedKeyRemaining)
    {
        return new CQLGroupByPagingLimits(pageSize,
                                          groupPerPartitionLimit,
                                          rowLimit,
                                          groupBySpec,
                                          state,
                                          lastReturnedKey,
                                          lastReturnedKeyRemaining);
    }

    @Override
    public DataLimits forGroupByInternalPaging(GroupingState state)
    {
        // Internal paging is done by rows, hence rowLimit is used as the group limit here.
        return new CQLGroupByLimits(rowLimit,
                                    groupPerPartitionLimit,
                                    rowLimit,
                                    groupBySpec,
                                    state);
    }

    @Override
    public Counter newCounter(int nowInSec, boolean assumeLiveData, boolean countPartitionsWithOnlyStaticData, boolean enforceStrictLiveness)
    {
        return new GroupByAwareCounter(nowInSec, assumeLiveData, countPartitionsWithOnlyStaticData, enforceStrictLiveness);
    }

    @Override
    public int count()
    {
        // For group-by limits, the "result" unit is groups, not rows.
        return groupLimit;
    }

    @Override
    public int perPartitionCount()
    {
        return groupPerPartitionLimit;
    }

    @Override
    public DataLimits withoutState()
    {
        return state == GroupingState.EMPTY_STATE
             ? this
             : new CQLGroupByLimits(groupLimit, groupPerPartitionLimit, rowLimit, groupBySpec);
    }

    @Override
    public String toString()
    {
        StringBuilder sb = new StringBuilder();

        if (groupLimit != NO_LIMIT)
        {
            sb.append("GROUP LIMIT ").append(groupLimit);
            if (groupPerPartitionLimit != NO_LIMIT || rowLimit != NO_LIMIT)
                sb.append(' ');
        }

        if (groupPerPartitionLimit != NO_LIMIT)
        {
            sb.append("GROUP PER PARTITION LIMIT ").append(groupPerPartitionLimit);
            if (rowLimit != NO_LIMIT)
                sb.append(' ');
        }

        if (rowLimit != NO_LIMIT)
        {
            sb.append("LIMIT ").append(rowLimit);
        }

        return sb.toString();
    }

    @Override
    public boolean isExhausted(Counter counter)
    {
        // Exhausted only when neither the row limit nor the group limit stopped the counting.
        return ((GroupByAwareCounter) counter).rowCounted < rowLimit
               && counter.counted() < groupLimit;
    }

    /** Counter that tracks both rows and groups, closing a group only when the next group (or end of data) is seen. */
    protected class GroupByAwareCounter extends Counter
    {
        private final GroupMaker groupMaker;

        protected final boolean countPartitionsWithOnlyStaticData;

        /**
         * The key of the partition being processed.
         */
        protected DecoratedKey currentPartitionKey;

        /**
         * The number of rows counted so far.
         */
        protected int rowCounted;

        /**
         * The number of rows counted so far in the current partition.
         */
        protected int rowCountedInCurrentPartition;

        /**
         * The number of groups counted so far. A group is counted only once it is complete
         * (e.g the next one has been reached).
         */
        protected int groupCounted;

        /**
         * The number of groups in the current partition.
         */
        protected int groupInCurrentPartition;

        // Whether a group is currently open (rows seen but the group not yet closed/counted).
        protected boolean hasGroupStarted;

        protected boolean hasLiveStaticRow;

        protected boolean hasReturnedRowsFromCurrentPartition;

        private GroupByAwareCounter(int nowInSec,
                                    boolean assumeLiveData,
                                    boolean countPartitionsWithOnlyStaticData,
                                    boolean enforceStrictLiveness)
        {
            super(nowInSec, assumeLiveData, enforceStrictLiveness);
            this.groupMaker = groupBySpec.newGroupMaker(state);
            this.countPartitionsWithOnlyStaticData = countPartitionsWithOnlyStaticData;

            // If the end of the partition was reached at the same time than the row limit, the last group might
            // not have been counted yet. Due to that we need to guess, based on the state, if the previous group
            // is still open.
            hasGroupStarted = state.hasClustering();
        }

        @Override
        public void applyToPartition(DecoratedKey partitionKey, Row staticRow)
        {
            if (partitionKey.getKey().equals(state.partitionKey()))
            {
                // The only case were we could have state.partitionKey() equals to the partition key
                // is if some of the partition rows have been returned in the previous page but the
                // partition was not exhausted (as the state partition key has not been updated yet).
                // Since we know we have returned rows, we know we have accounted for
                // the static row if any already, so force hasLiveStaticRow to false so we make sure to not count it
                // once more.
                hasLiveStaticRow = false;
                hasReturnedRowsFromCurrentPartition = true;
                hasGroupStarted = true;
            }
            else
            {
                // We need to increment our count of groups if we have reached a new one and unless we had no new
                // content added since we closed our last group (that is, if hasGroupStarted). Note that we may get
                // here with hasGroupStarted == false in the following cases:
                // * the partition limit was reached for the previous partition
                // * the previous partition was containing only one static row
                // * the rows of the last group of the previous partition were all marked as deleted
                if (hasGroupStarted && groupMaker.isNewGroup(partitionKey, Clustering.STATIC_CLUSTERING))
                {
                    incrementGroupCount();
                    // If we detect, before starting the new partition, that we are done, we need to increase
                    // the per partition group count of the previous partition as the next page will start from
                    // there.
                    if (isDone())
                        incrementGroupInCurrentPartitionCount();
                    hasGroupStarted = false;
                }
                hasReturnedRowsFromCurrentPartition = false;
                hasLiveStaticRow = !staticRow.isEmpty() && isLive(staticRow);
            }
            currentPartitionKey = partitionKey;
            // If we are done we need to preserve the groupInCurrentPartition and rowCountedInCurrentPartition
            // because the pager need to retrieve the count associated to the last value it has returned.
            if (!isDone())
            {
                groupInCurrentPartition = 0;
                rowCountedInCurrentPartition = 0;
            }
        }

        @Override
        protected Row applyToStatic(Row row)
        {
            // It's possible that we're "done" if the partition we just started bumped the number of groups (in
            // applyToPartition() above), in which case Transformation will still call this method. In that case, we
            // want to ignore the static row, it should (and will) be returned with the next page/group if needs be.
            if (isDone())
            {
                hasLiveStaticRow = false; // The row has not been returned
                return Rows.EMPTY_STATIC_ROW;
            }
            return row;
        }

        @Override
        public Row applyToRow(Row row)
        {
            // We want to check if the row belongs to a new group even if it has been deleted. The goal being
            // to minimize the chances of having to go through the same data twice if we detect on the next
            // non deleted row that we have reached the limit.
            if (groupMaker.isNewGroup(currentPartitionKey, row.clustering()))
            {
                if (hasGroupStarted)
                {
                    incrementGroupCount();
                    incrementGroupInCurrentPartitionCount();
                }
                hasGroupStarted = false;
            }

            // That row may have made us increment the group count, which may mean we're done for this partition, in
            // which case we shouldn't count this row (it won't be returned).
            if (isDoneForPartition())
            {
                hasGroupStarted = false;
                return null;
            }

            if (isLive(row))
            {
                hasGroupStarted = true;
                incrementRowCount();
                hasReturnedRowsFromCurrentPartition = true;
            }

            return row;
        }

        @Override
        public int counted()
        {
            return groupCounted;
        }

        @Override
        public int countedInCurrentPartition()
        {
            return groupInCurrentPartition;
        }

        @Override
        public int rowCounted()
        {
            return rowCounted;
        }

        @Override
        public int rowCountedInCurrentPartition()
        {
            return rowCountedInCurrentPartition;
        }

        protected void incrementRowCount()
        {
            rowCountedInCurrentPartition++;
            if (++rowCounted >= rowLimit)
                stop();
        }

        private void incrementGroupCount()
        {
            groupCounted++;
            if (groupCounted >= groupLimit)
                stop();
        }

        private void incrementGroupInCurrentPartitionCount()
        {
            groupInCurrentPartition++;
            if (groupInCurrentPartition >= groupPerPartitionLimit)
                stopInPartition();
        }

        @Override
        public boolean isDoneForPartition()
        {
            return isDone() || groupInCurrentPartition >= groupPerPartitionLimit;
        }

        @Override
        public boolean isDone()
        {
            return groupCounted >= groupLimit;
        }

        @Override
        public void onPartitionClose()
        {
            // Normally, we don't count static rows as from a CQL point of view, it will be merge with other
            // rows in the partition. However, if we only have the static row, it will be returned as one group
            // so count it.
            if (countPartitionsWithOnlyStaticData && hasLiveStaticRow && !hasReturnedRowsFromCurrentPartition)
            {
                incrementRowCount();
                incrementGroupCount();
                incrementGroupInCurrentPartitionCount();
                hasGroupStarted = false;
            }
            super.onPartitionClose();
        }

        @Override
        public void onClose()
        {
            // Groups are only counted when the end of the group is reached.
            // The end of a group is detected by 2 ways:
            // 1) a new group is reached
            // 2) the end of the data is reached
            // We know that the end of the data is reached if the group limit has not been reached
            // and the number of rows counted is smaller than the internal page size.
            if (hasGroupStarted && groupCounted < groupLimit && rowCounted < rowLimit)
            {
                incrementGroupCount();
                incrementGroupInCurrentPartitionCount();
            }
            super.onClose();
        }
    }
}

/** Group-by limits carrying paging state: resumes group counting in the last partition of the previous page. */
private static class CQLGroupByPagingLimits extends CQLGroupByLimits
{
    private final ByteBuffer lastReturnedKey;

    private final int lastReturnedKeyRemaining;

    public CQLGroupByPagingLimits(int groupLimit,
                                  int groupPerPartitionLimit,
                                  int rowLimit,
                                  AggregationSpecification groupBySpec,
                                  GroupingState state,
                                  ByteBuffer lastReturnedKey,
                                  int lastReturnedKeyRemaining)
    {
        super(groupLimit,
              groupPerPartitionLimit,
              rowLimit,
              groupBySpec,
              state);

        this.lastReturnedKey = lastReturnedKey;
        this.lastReturnedKeyRemaining = lastReturnedKeyRemaining;
    }

    @Override
    public Kind kind()
    {
        return Kind.CQL_GROUP_BY_PAGING_LIMIT;
    }

    @Override
    public DataLimits forPaging(int pageSize)
    {
        throw new UnsupportedOperationException();
    }

    @Override
    public DataLimits forPaging(int pageSize, ByteBuffer lastReturnedKey, int lastReturnedKeyRemaining)
    {
        throw new UnsupportedOperationException();
    }

    @Override
    public DataLimits forGroupByInternalPaging(GroupingState state)
    {
        throw new UnsupportedOperationException();
    }

    @Override
    public Counter newCounter(int nowInSec, boolean assumeLiveData, boolean countPartitionsWithOnlyStaticData, boolean enforceStrictLiveness)
    {
        // The grouping state, if any, must belong to the partition where the previous page stopped.
        assert state == GroupingState.EMPTY_STATE || lastReturnedKey.equals(state.partitionKey());
        return new PagingGroupByAwareCounter(nowInSec, assumeLiveData, countPartitionsWithOnlyStaticData, enforceStrictLiveness);
    }

    @Override
    public DataLimits withoutState()
    {
        return new CQLGroupByLimits(groupLimit, groupPerPartitionLimit, rowLimit, groupBySpec);
    }

    private class PagingGroupByAwareCounter extends GroupByAwareCounter
    {
        private PagingGroupByAwareCounter(int nowInSec,
                                          boolean assumeLiveData,
                                          boolean countPartitionsWithOnlyStaticData,
                                          boolean enforceStrictLiveness)
        {
            super(nowInSec, assumeLiveData, countPartitionsWithOnlyStaticData, enforceStrictLiveness);
        }

        @Override
        public void applyToPartition(DecoratedKey partitionKey, Row staticRow)
        {
            if (partitionKey.getKey().equals(lastReturnedKey))
            {
                // Resuming in the partition of the previous page: restore per-partition group count
                // and avoid re-counting the static row.
                currentPartitionKey = partitionKey;
                groupInCurrentPartition = groupPerPartitionLimit - lastReturnedKeyRemaining;
                hasReturnedRowsFromCurrentPartition = true;
                hasLiveStaticRow = false;
                hasGroupStarted = state.hasClustering();
            }
            else
            {
                super.applyToPartition(partitionKey, staticRow);
            }
        }
    }
}
Limits used by Thrift; these count partitions and cells.
/**
 * Limits used by Thrift; these count partitions and cells.
 */
private static class ThriftLimits extends DataLimits
{
    // Maximum number of partitions to return.
    protected final int partitionLimit;
    // Maximum number of cells to count per partition.
    protected final int cellPerPartitionLimit;

    private ThriftLimits(int partitionLimit, int cellPerPartitionLimit)
    {
        this.partitionLimit = partitionLimit;
        this.cellPerPartitionLimit = cellPerPartitionLimit;
    }

    public Kind kind()
    {
        return Kind.THRIFT_LIMIT;
    }

    public boolean isUnlimited()
    {
        return partitionLimit == NO_LIMIT && cellPerPartitionLimit == NO_LIMIT;
    }

    public boolean isDistinct()
    {
        return false;
    }

    public DataLimits forPaging(int pageSize)
    {
        // We don't support paging on thrift in general but do use paging under the hood for get_count. For
        // that case, we only care about limiting cellPerPartitionLimit (since it's paging over a single
        // partition). We do check that the partition limit is 1 however to make sure this is not misused
        // (as this wouldn't work properly for range queries).
        assert partitionLimit == 1;
        return new ThriftLimits(partitionLimit, pageSize);
    }

    public DataLimits forPaging(int pageSize, ByteBuffer lastReturnedKey, int lastReturnedKeyRemaining)
    {
        throw new UnsupportedOperationException();
    }

    public DataLimits forShortReadRetry(int toFetch)
    {
        // Short read retries are always done for a single partition at a time, so it's ok to ignore the
        // partition limit for those
        return new ThriftLimits(1, toFetch);
    }

    public boolean hasEnoughLiveData(CachedPartition cached,
                                     int nowInSec,
                                     boolean countPartitionsWithOnlyStaticData,
                                     boolean enforceStrictLiveness)
    {
        // We want the number of cells that are currently live. Getting that precise number forces
        // us to iterate the cached partition in general, but we can avoid that if:
        //   - The number of non-expiring live cells is greater than the number of cells asked (we then
        //     know we have enough live cells).
        //   - The number of cells cached is less than requested, in which case we know we won't have enough.
        if (cached.nonExpiringLiveCells() >= cellPerPartitionLimit)
            return true;

        if (cached.nonTombstoneCellCount() < cellPerPartitionLimit)
            return false;

        // Otherwise, we need to re-count
        DataLimits.Counter counter = newCounter(nowInSec, false, countPartitionsWithOnlyStaticData, enforceStrictLiveness);
        try (UnfilteredRowIterator cacheIter = cached.unfilteredIterator(ColumnFilter.selection(cached.columns()), Slices.ALL, false);
             UnfilteredRowIterator iter = counter.applyTo(cacheIter))
        {
            // Consume the iterator until we've counted enough
            while (iter.hasNext())
                iter.next();
            return counter.isDone();
        }
    }

    public Counter newCounter(int nowInSec, boolean assumeLiveData, boolean countPartitionsWithOnlyStaticData, boolean enforceStrictLiveness)
    {
        return new ThriftCounter(nowInSec, assumeLiveData, enforceStrictLiveness);
    }

    public int count()
    {
        return partitionLimit * cellPerPartitionLimit;
    }

    public int perPartitionCount()
    {
        return cellPerPartitionLimit;
    }

    public DataLimits withoutState()
    {
        // Thrift limits carry no paging state, so there is nothing to strip.
        return this;
    }

    public float estimateTotalResults(ColumnFamilyStore cfs)
    {
        // remember that getMeanColumns returns a number of cells: we should clean nomenclature
        float cellsPerPartition = ((float) cfs.getMeanColumns()) / cfs.metadata.partitionColumns().regulars.size();
        return cellsPerPartition * cfs.estimateKeys();
    }

    // Counter for Thrift limits: counts cells (per partition) and partitions, not CQL rows.
    protected class ThriftCounter extends Counter
    {
        protected int partitionsCounted;
        protected int cellsCounted;
        protected int cellsInCurrentPartition;

        public ThriftCounter(int nowInSec, boolean assumeLiveData, boolean enforceStrictLiveness)
        {
            super(nowInSec, assumeLiveData, enforceStrictLiveness);
        }

        @Override
        public void applyToPartition(DecoratedKey partitionKey, Row staticRow)
        {
            cellsInCurrentPartition = 0;
            // The static row's cells count toward the partition's cell total like any other row's.
            if (!staticRow.isEmpty())
                applyToRow(staticRow);
        }

        @Override
        public Row applyToRow(Row row)
        {
            for (Cell cell : row.cells())
            {
                if (assumeLiveData || cell.isLive(nowInSec))
                {
                    ++cellsCounted;
                    if (++cellsInCurrentPartition >= cellPerPartitionLimit)
                        stopInPartition();
                }
            }
            return row;
        }

        @Override
        public void onPartitionClose()
        {
            // Partitions are counted when fully consumed; stop once the partition limit is reached.
            if (++partitionsCounted >= partitionLimit)
                stop();
            super.onPartitionClose();
        }

        public int counted()
        {
            return cellsCounted;
        }

        public int countedInCurrentPartition()
        {
            return cellsInCurrentPartition;
        }

        // Row counts are a CQL notion and are not tracked by Thrift counters.
        public int rowCounted()
        {
            throw new UnsupportedOperationException();
        }

        public int rowCountedInCurrentPartition()
        {
            throw new UnsupportedOperationException();
        }

        public boolean isDone()
        {
            return partitionsCounted >= partitionLimit;
        }

        public boolean isDoneForPartition()
        {
            return isDone() || cellsInCurrentPartition >= cellPerPartitionLimit;
        }
    }

    @Override
    public String toString()
    {
        // This is not valid CQL, but that's ok since it's not used for CQL queries.
        return String.format("THRIFT LIMIT (partitions=%d, cells_per_partition=%d)", partitionLimit, cellPerPartitionLimit);
    }
}
Limits used for Thrift get_count when we only want to count super columns.
/**
 * Limits used for Thrift get_count when we only want to count super columns.
 */
private static class SuperColumnCountingLimits extends ThriftLimits
{
    private SuperColumnCountingLimits(int partitionLimit, int cellPerPartitionLimit)
    {
        super(partitionLimit, cellPerPartitionLimit);
    }

    public Kind kind()
    {
        return Kind.SUPER_COLUMN_COUNTING_LIMIT;
    }

    public DataLimits forPaging(int pageSize)
    {
        // We don't support paging on thrift in general but do use paging under the hood for get_count. For
        // that case, we only care about limiting cellPerPartitionLimit (since it's paging over a single
        // partition). We do check that the partition limit is 1 however to make sure this is not misused
        // (as this wouldn't work properly for range queries).
        assert partitionLimit == 1;
        return new SuperColumnCountingLimits(partitionLimit, pageSize);
    }

    public DataLimits forShortReadRetry(int toFetch)
    {
        // Short read retries are always done for a single partition at a time, so it's ok to ignore the
        // partition limit for those
        return new SuperColumnCountingLimits(1, toFetch);
    }

    @Override
    public Counter newCounter(int nowInSec, boolean assumeLiveData, boolean countPartitionsWithOnlyStaticData, boolean enforceStrictLiveness)
    {
        return new SuperColumnCountingCounter(nowInSec, assumeLiveData, enforceStrictLiveness);
    }

    // Counter that counts whole rows (super columns in the internal format) instead of cells.
    protected class SuperColumnCountingCounter extends ThriftCounter
    {
        private final boolean enforceStrictLiveness;

        public SuperColumnCountingCounter(int nowInSec, boolean assumeLiveData, boolean enforceStrictLiveness)
        {
            super(nowInSec, assumeLiveData, enforceStrictLiveness);
            this.enforceStrictLiveness = enforceStrictLiveness;
        }

        @Override
        public Row applyToRow(Row row)
        {
            // In the internal format, a row == a super column, so that's what we want to count.
            if (isLive(row))
            {
                ++cellsCounted;
                if (++cellsInCurrentPartition >= cellPerPartitionLimit)
                    stopInPartition();
            }
            return row;
        }
    }
}

/**
 * Serializer for {@code DataLimits}. The kind ordinal is written first, followed by the fields
 * specific to that kind; {@code deserialize} reads them back in the same order.
 */
public static class Serializer
{
    public void serialize(DataLimits limits, DataOutputPlus out, int version, ClusteringComparator comparator) throws IOException
    {
        out.writeByte(limits.kind().ordinal());
        switch (limits.kind())
        {
            case CQL_LIMIT:
            case CQL_PAGING_LIMIT:
                CQLLimits cqlLimits = (CQLLimits)limits;
                out.writeUnsignedVInt(cqlLimits.rowLimit);
                out.writeUnsignedVInt(cqlLimits.perPartitionLimit);
                out.writeBoolean(cqlLimits.isDistinct);
                // Paging limits additionally record where the previous page stopped.
                if (limits.kind() == Kind.CQL_PAGING_LIMIT)
                {
                    CQLPagingLimits pagingLimits = (CQLPagingLimits)cqlLimits;
                    ByteBufferUtil.writeWithVIntLength(pagingLimits.lastReturnedKey, out);
                    out.writeUnsignedVInt(pagingLimits.lastReturnedKeyRemaining);
                }
                break;
            case CQL_GROUP_BY_LIMIT:
            case CQL_GROUP_BY_PAGING_LIMIT:
                CQLGroupByLimits groupByLimits = (CQLGroupByLimits) limits;
                out.writeUnsignedVInt(groupByLimits.groupLimit);
                out.writeUnsignedVInt(groupByLimits.groupPerPartitionLimit);
                out.writeUnsignedVInt(groupByLimits.rowLimit);

                AggregationSpecification groupBySpec = groupByLimits.groupBySpec;
                AggregationSpecification.serializer.serialize(groupBySpec, out, version);

                GroupingState.serializer.serialize(groupByLimits.state, out, version, comparator);

                if (limits.kind() == Kind.CQL_GROUP_BY_PAGING_LIMIT)
                {
                    CQLGroupByPagingLimits pagingLimits = (CQLGroupByPagingLimits) groupByLimits;
                    ByteBufferUtil.writeWithVIntLength(pagingLimits.lastReturnedKey, out);
                    out.writeUnsignedVInt(pagingLimits.lastReturnedKeyRemaining);
                }
                break;
            case THRIFT_LIMIT:
            case SUPER_COLUMN_COUNTING_LIMIT:
                ThriftLimits thriftLimits = (ThriftLimits)limits;
                out.writeUnsignedVInt(thriftLimits.partitionLimit);
                out.writeUnsignedVInt(thriftLimits.cellPerPartitionLimit);
                break;
        }
    }

    public DataLimits deserialize(DataInputPlus in, int version, ClusteringComparator comparator) throws IOException
    {
        Kind kind = Kind.values()[in.readUnsignedByte()];
        switch (kind)
        {
            case CQL_LIMIT:
            case CQL_PAGING_LIMIT:
            {
                int rowLimit = (int) in.readUnsignedVInt();
                int perPartitionLimit = (int) in.readUnsignedVInt();
                boolean isDistinct = in.readBoolean();
                if (kind == Kind.CQL_LIMIT)
                    return cqlLimits(rowLimit, perPartitionLimit, isDistinct);
                ByteBuffer lastKey = ByteBufferUtil.readWithVIntLength(in);
                int lastRemaining = (int) in.readUnsignedVInt();
                return new CQLPagingLimits(rowLimit, perPartitionLimit, isDistinct, lastKey, lastRemaining);
            }
            case CQL_GROUP_BY_LIMIT:
            case CQL_GROUP_BY_PAGING_LIMIT:
            {
                int groupLimit = (int) in.readUnsignedVInt();
                int groupPerPartitionLimit = (int) in.readUnsignedVInt();
                int rowLimit = (int) in.readUnsignedVInt();

                AggregationSpecification groupBySpec = AggregationSpecification.serializer.deserialize(in, version, comparator);

                GroupingState state = GroupingState.serializer.deserialize(in, version, comparator);

                if (kind == Kind.CQL_GROUP_BY_LIMIT)
                    return new CQLGroupByLimits(groupLimit, groupPerPartitionLimit, rowLimit, groupBySpec, state);

                ByteBuffer lastKey = ByteBufferUtil.readWithVIntLength(in);
                int lastRemaining = (int) in.readUnsignedVInt();
                return new CQLGroupByPagingLimits(groupLimit, groupPerPartitionLimit, rowLimit, groupBySpec, state, lastKey, lastRemaining);
            }
            case THRIFT_LIMIT:
            case SUPER_COLUMN_COUNTING_LIMIT:
                int partitionLimit = (int) in.readUnsignedVInt();
                int cellPerPartitionLimit = (int) in.readUnsignedVInt();
                return kind == Kind.THRIFT_LIMIT
                     ? new ThriftLimits(partitionLimit, cellPerPartitionLimit)
                     : new SuperColumnCountingLimits(partitionLimit, cellPerPartitionLimit);
        }
        // Every Kind is handled above; reaching here means a corrupt/unknown kind byte.
        throw new AssertionError();
    }

    public long serializedSize(DataLimits limits, int version, ClusteringComparator comparator)
    {
        // Mirrors serialize(): one byte for the kind, then the kind-specific fields.
        long size = TypeSizes.sizeof((byte) limits.kind().ordinal());
        switch (limits.kind())
        {
            case CQL_LIMIT:
            case CQL_PAGING_LIMIT:
                CQLLimits cqlLimits = (CQLLimits) limits;
                size += TypeSizes.sizeofUnsignedVInt(cqlLimits.rowLimit);
                size += TypeSizes.sizeofUnsignedVInt(cqlLimits.perPartitionLimit);
                size += TypeSizes.sizeof(cqlLimits.isDistinct);
                if (limits.kind() == Kind.CQL_PAGING_LIMIT)
                {
                    CQLPagingLimits pagingLimits = (CQLPagingLimits) cqlLimits;
                    size += ByteBufferUtil.serializedSizeWithVIntLength(pagingLimits.lastReturnedKey);
                    size += TypeSizes.sizeofUnsignedVInt(pagingLimits.lastReturnedKeyRemaining);
                }
                break;
            case CQL_GROUP_BY_LIMIT:
            case CQL_GROUP_BY_PAGING_LIMIT:
                CQLGroupByLimits groupByLimits = (CQLGroupByLimits) limits;
                size += TypeSizes.sizeofUnsignedVInt(groupByLimits.groupLimit);
                size += TypeSizes.sizeofUnsignedVInt(groupByLimits.groupPerPartitionLimit);
                size += TypeSizes.sizeofUnsignedVInt(groupByLimits.rowLimit);

                AggregationSpecification groupBySpec = groupByLimits.groupBySpec;
                size += AggregationSpecification.serializer.serializedSize(groupBySpec, version);

                size += GroupingState.serializer.serializedSize(groupByLimits.state, version, comparator);

                if (limits.kind() == Kind.CQL_GROUP_BY_PAGING_LIMIT)
                {
                    CQLGroupByPagingLimits pagingLimits = (CQLGroupByPagingLimits) groupByLimits;
                    size += ByteBufferUtil.serializedSizeWithVIntLength(pagingLimits.lastReturnedKey);
                    size += TypeSizes.sizeofUnsignedVInt(pagingLimits.lastReturnedKeyRemaining);
                }
                break;
            case THRIFT_LIMIT:
            case SUPER_COLUMN_COUNTING_LIMIT:
                ThriftLimits thriftLimits = (ThriftLimits) limits;
                size += TypeSizes.sizeofUnsignedVInt(thriftLimits.partitionLimit);
                size += TypeSizes.sizeofUnsignedVInt(thriftLimits.cellPerPartitionLimit);
                break;
            default:
                throw new AssertionError();
        }
        return size;
    }
}
}