/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.cassandra.service.pager;

import java.nio.ByteBuffer;
import java.util.NoSuchElementException;

import org.apache.cassandra.config.CFMetaData;
import org.apache.cassandra.db.*;
import org.apache.cassandra.db.aggregation.GroupingState;
import org.apache.cassandra.db.filter.DataLimits;
import org.apache.cassandra.db.partitions.PartitionIterator;
import org.apache.cassandra.db.rows.Row;
import org.apache.cassandra.db.rows.RowIterator;
import org.apache.cassandra.service.ClientState;

/**
 * {@code QueryPager} that takes care of fetching the pages for aggregation queries.
 * <p>
 * For aggregation/group by queries, the user page size is expressed in number of groups. But each group could be
 * composed of very many rows, so to avoid running into OOMs this pager breaks the internal queries into sub-pages.
 * Each call to {@link #fetchPage} may therefore (transparently) issue multiple internal queries (sub-pages).
 */
public final class AggregationQueryPager implements QueryPager
{
    private final DataLimits limits;

    // The sub-pager, used to retrieve the next sub-page.
    private QueryPager subPager;

    public AggregationQueryPager(QueryPager subPager, DataLimits limits)
    {
        this.subPager = subPager;
        this.limits = limits;
    }

    @Override
    public PartitionIterator fetchPage(int pageSize,
                                       ConsistencyLevel consistency,
                                       ClientState clientState,
                                       long queryStartNanoTime)
    {
        if (limits.isGroupByLimit())
            return new GroupByPartitionIterator(pageSize, consistency, clientState, queryStartNanoTime);

        return new AggregationPartitionIterator(pageSize, consistency, clientState, queryStartNanoTime);
    }

    @Override
    public ReadExecutionController executionController()
    {
        return subPager.executionController();
    }

    @Override
    public PartitionIterator fetchPageInternal(int pageSize, ReadExecutionController executionController)
    {
        if (limits.isGroupByLimit())
            return new GroupByPartitionIterator(pageSize, executionController, System.nanoTime());

        return new AggregationPartitionIterator(pageSize, executionController, System.nanoTime());
    }

    @Override
    public boolean isExhausted()
    {
        return subPager.isExhausted();
    }

    @Override
    public int maxRemaining()
    {
        return subPager.maxRemaining();
    }

    @Override
    public PagingState state()
    {
        return subPager.state();
    }

    @Override
    public QueryPager withUpdatedLimit(DataLimits newLimits)
    {
        throw new UnsupportedOperationException();
    }
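
    /*
     * Usage sketch (illustrative only): how a caller might drive this pager. Obtaining the underlying
     * sub-pager, limits, consistency level and client state is outside the scope of this class, so the
     * variables below are assumptions, not part of this API.
     *
     *     QueryPager pager = new AggregationQueryPager(subPager, limits);
     *     try (PartitionIterator page = pager.fetchPage(100, consistency, clientState, System.nanoTime()))
     *     {
     *         // "100" is a number of groups; behind the scenes the pager may issue several internal
     *         // sub-queries (sub-pages) to build that page without materialising every row at once.
     *     }
     */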
    /**
     * <code>PartitionIterator</code> that automatically fetches a new sub-page of data, if needed, when the current
     * iterator is exhausted.
     */
    public class GroupByPartitionIterator implements PartitionIterator
    {
        /**
         * The top-level page size in number of groups.
         */
        private final int pageSize;

        // For "normal" queries
        private final ConsistencyLevel consistency;
        private final ClientState clientState;

        // For internal queries
        private final ReadExecutionController executionController;
        /**
         * The <code>PartitionIterator</code> over the last page retrieved.
         */
        private PartitionIterator partitionIterator;
        /**
         * The next <code>RowIterator</code> to be returned.
         */
        private RowIterator next;
        /**
         * Specifies if all the data has been returned.
         */
        private boolean endOfData;
        /**
         * Keeps track of whether the partitionIterator has been closed or not.
         */
        private boolean closed;
        /**
         * The key of the last partition processed.
         */
        private ByteBuffer lastPartitionKey;
        /**
         * The clustering of the last row processed.
         */
        private Clustering lastClustering;
        /**
         * The initial number of rows remaining.
         */
        private int initialMaxRemaining;

        private long queryStartNanoTime;

        public GroupByPartitionIterator(int pageSize,
                                        ConsistencyLevel consistency,
                                        ClientState clientState,
                                        long queryStartNanoTime)
        {
            this(pageSize, consistency, clientState, null, queryStartNanoTime);
        }

        public GroupByPartitionIterator(int pageSize,
                                        ReadExecutionController executionController,
                                        long queryStartNanoTime)
        {
            this(pageSize, null, null, executionController, queryStartNanoTime);
        }

        private GroupByPartitionIterator(int pageSize,
                                         ConsistencyLevel consistency,
                                         ClientState clientState,
                                         ReadExecutionController executionController,
                                         long queryStartNanoTime)
        {
            this.pageSize = handlePagingOff(pageSize);
            this.consistency = consistency;
            this.clientState = clientState;
            this.executionController = executionController;
            this.queryStartNanoTime = queryStartNanoTime;
        }

        private int handlePagingOff(int pageSize)
        {
            // If the paging is off, the pageSize will be <= 0. So we need to replace
            // it by DataLimits.NO_LIMIT
            return pageSize <= 0 ? DataLimits.NO_LIMIT : pageSize;
        }

        public final void close()
        {
            if (!closed)
            {
                closed = true;
                partitionIterator.close();
            }
        }

        public final boolean hasNext()
        {
            if (endOfData)
                return false;

            if (next != null)
                return true;

            fetchNextRowIterator();

            return next != null;
        }
        /**
         * Loads the next <code>RowIterator</code> to be returned.
         */
        private void fetchNextRowIterator()
        {
            if (partitionIterator == null)
            {
                initialMaxRemaining = subPager.maxRemaining();
                partitionIterator = fetchSubPage(pageSize);
            }

            while (!partitionIterator.hasNext())
            {
                partitionIterator.close();

                int counted = initialMaxRemaining - subPager.maxRemaining();

                if (isDone(pageSize, counted) || subPager.isExhausted())
                {
                    endOfData = true;
                    closed = true;
                    return;
                }

                subPager = updatePagerLimit(subPager, limits, lastPartitionKey, lastClustering);
                partitionIterator = fetchSubPage(computeSubPageSize(pageSize, counted));
            }

            next = partitionIterator.next();
        }

        protected boolean isDone(int pageSize, int counted)
        {
            return counted == pageSize;
        }
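
        /*
         * Worked example of the bookkeeping above (the numbers are illustrative): with a top-level page size
         * of 10 groups, suppose the first sub-page is exhausted after counting 7 groups (initialMaxRemaining
         * minus subPager.maxRemaining()). Since 7 != 10 and the sub-pager is not exhausted, the limits are
         * updated to resume from the last seen (partition key, clustering) and the next sub-page asks for
         * computeSubPageSize(10, 7) = 3 more groups. The loop repeats until the page holds 10 groups or the
         * data runs out.
         */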
        /**
         * Updates the pager with the new limits if needed.
         *
         * @param pager the pager previously used
         * @param limits the DataLimits
         * @param lastPartitionKey the partition key of the last row returned
         * @param lastClustering the clustering of the last row returned
         * @return the pager to use to query the next page of data
         */
        protected QueryPager updatePagerLimit(QueryPager pager,
                                              DataLimits limits,
                                              ByteBuffer lastPartitionKey,
                                              Clustering lastClustering)
        {
            GroupingState state = new GroupingState(lastPartitionKey, lastClustering);
            DataLimits newLimits = limits.forGroupByInternalPaging(state);
            return pager.withUpdatedLimit(newLimits);
        }
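
        /*
         * Illustrative note (an assumption about the intent, not part of the original comments): if the
         * previous sub-page ended on the row identified by (lastPartitionKey, lastClustering), the
         * GroupingState built above lets the internal paging know which group that row belongs to, so rows
         * of the same group returned by the next sub-page are not counted as the start of a new group.
         */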
        /**
         * Computes the size of the next sub-page to retrieve.
         *
         * @param pageSize the top-level page size
         * @param counted the number of results returned so far by the previous sub-pages
         * @return the size of the next sub-page to retrieve
         */
        protected int computeSubPageSize(int pageSize, int counted)
        {
            return pageSize - counted;
        }
        /**
         * Fetches the next sub-page.
         *
         * @param subPageSize the sub-page size in number of groups
         * @return the next sub-page
         */
        private final PartitionIterator fetchSubPage(int subPageSize)
        {
            // Distributed queries go through the consistency-level path; internal queries reuse the
            // caller's ReadExecutionController.
            return consistency != null
                 ? subPager.fetchPage(subPageSize, consistency, clientState, queryStartNanoTime)
                 : subPager.fetchPageInternal(subPageSize, executionController);
        }

        public final RowIterator next()
        {
            if (!hasNext())
                throw new NoSuchElementException();

            RowIterator iterator = new GroupByRowIterator(next);
            lastPartitionKey = iterator.partitionKey().getKey();
            next = null;
            return iterator;
        }

        private class GroupByRowIterator implements RowIterator
        {
            /**
             * The decorated <code>RowIterator</code>.
             */
            private RowIterator rowIterator;
            /**
             * Keeps track of whether the decorated iterator has been closed or not.
             */
            private boolean closed;

            public GroupByRowIterator(RowIterator delegate)
            {
                this.rowIterator = delegate;
            }

            public CFMetaData metadata()
            {
                return rowIterator.metadata();
            }

            public boolean isReverseOrder()
            {
                return rowIterator.isReverseOrder();
            }

            public PartitionColumns columns()
            {
                return rowIterator.columns();
            }

            public DecoratedKey partitionKey()
            {
                return rowIterator.partitionKey();
            }

            public Row staticRow()
            {
                Row row = rowIterator.staticRow();
                lastClustering = null;
                return row;
            }

            public boolean isEmpty()
            {
                return this.rowIterator.isEmpty() && !hasNext();
            }

            public void close()
            {
                if (!closed)
                    rowIterator.close();
            }

            public boolean hasNext()
            {
                if (rowIterator.hasNext())
                    return true;

                DecoratedKey partitionKey = rowIterator.partitionKey();

                rowIterator.close();

                // Fetch the next RowIterator
                GroupByPartitionIterator.this.hasNext();

                // if the previous page was ending within the partition the
                // next RowIterator is the continuation of this one
                if (next != null && partitionKey.equals(next.partitionKey()))
                {
                    rowIterator = next;
                    next = null;
                    return rowIterator.hasNext();
                }

                closed = true;
                return false;
            }

            public Row next()
            {
                Row row = this.rowIterator.next();
                lastClustering = row.clustering();
                return row;
            }
        }
    }
    /**
     * <code>PartitionIterator</code> for queries without Group By but with aggregates.
     * <p>
     * For maintaining backward compatibility we are forced to use the {@link org.apache.cassandra.db.filter.DataLimits.CQLLimits}
     * instead of the {@link org.apache.cassandra.db.filter.DataLimits.CQLGroupByLimits}. Due to that, pages need to be
     * fetched in a different way.
     * </p>
     */
    public final class AggregationPartitionIterator extends GroupByPartitionIterator
    {
        public AggregationPartitionIterator(int pageSize,
                                            ConsistencyLevel consistency,
                                            ClientState clientState,
                                            long queryStartNanoTime)
        {
            super(pageSize, consistency, clientState, queryStartNanoTime);
        }

        public AggregationPartitionIterator(int pageSize,
                                            ReadExecutionController executionController,
                                            long queryStartNanoTime)
        {
            super(pageSize, executionController, queryStartNanoTime);
        }

        @Override
        protected QueryPager updatePagerLimit(QueryPager pager,
                                              DataLimits limits,
                                              ByteBuffer lastPartitionKey,
                                              Clustering lastClustering)
        {
            return pager;
        }

        @Override
        protected boolean isDone(int pageSize, int counted)
        {
            return false;
        }

        @Override
        protected int computeSubPageSize(int pageSize, int counted)
        {
            return pageSize;
        }
    }
}