/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.cassandra.cql3.selection;

import java.nio.ByteBuffer;
import java.util.*;

import com.google.common.base.MoreObjects;
import com.google.common.base.Predicate;
import com.google.common.collect.Iterables;
import com.google.common.collect.Iterators;
import com.google.common.collect.Lists;

import org.apache.cassandra.config.CFMetaData;
import org.apache.cassandra.config.ColumnDefinition;
import org.apache.cassandra.cql3.*;
import org.apache.cassandra.cql3.functions.Function;
import org.apache.cassandra.db.Clustering;
import org.apache.cassandra.db.DecoratedKey;
import org.apache.cassandra.db.aggregation.AggregationSpecification;
import org.apache.cassandra.db.aggregation.GroupMaker;
import org.apache.cassandra.db.context.CounterContext;
import org.apache.cassandra.db.marshal.UTF8Type;
import org.apache.cassandra.db.rows.Cell;
import org.apache.cassandra.exceptions.InvalidRequestException;
import org.apache.cassandra.transport.ProtocolVersion;
import org.apache.cassandra.utils.ByteBufferUtil;

public abstract class Selection
{
    
A predicate that returns true for static columns.
/** * A predicate that returns <code>true</code> for static columns. */
private static final Predicate<ColumnDefinition> STATIC_COLUMN_FILTER = new Predicate<ColumnDefinition>() { public boolean apply(ColumnDefinition def) { return def.isStatic(); } }; private final CFMetaData cfm; private final List<ColumnDefinition> columns; private final SelectionColumnMapping columnMapping; private final ResultSet.ResultMetadata metadata; private final boolean collectTimestamps; private final boolean collectTTLs; // Columns used to order the result set for multi-partition queries private Map<ColumnDefinition, Integer> orderingIndex; protected Selection(CFMetaData cfm, List<ColumnDefinition> columns, SelectionColumnMapping columnMapping, boolean collectTimestamps, boolean collectTTLs) { this.cfm = cfm; this.columns = columns; this.columnMapping = columnMapping; this.metadata = new ResultSet.ResultMetadata(columnMapping.getColumnSpecifications()); this.collectTimestamps = collectTimestamps; this.collectTTLs = collectTTLs; } // Overriden by SimpleSelection when appropriate. public boolean isWildcard() { return false; }
Checks if this selection contains static columns.
Returns:true if this selection contains static columns, false otherwise;
/** * Checks if this selection contains static columns. * @return <code>true</code> if this selection contains static columns, <code>false</code> otherwise; */
public boolean containsStaticColumns() { if (cfm.isStaticCompactTable() || !cfm.hasStaticColumns()) return false; if (isWildcard()) return true; return !Iterables.isEmpty(Iterables.filter(columns, STATIC_COLUMN_FILTER)); }
Checks if this selection contains only static columns.
Returns:true if this selection contains only static columns, false otherwise;
/** * Checks if this selection contains only static columns. * @return <code>true</code> if this selection contains only static columns, <code>false</code> otherwise; */
public boolean containsOnlyStaticColumns() { if (!containsStaticColumns()) return false; if (isWildcard()) return false; for (ColumnDefinition def : getColumns()) { if (!def.isPartitionKey() && !def.isStatic()) return false; } return true; }
Checks if this selection contains a complex column.
Returns:true if this selection contains a multicell collection or UDT, false otherwise.
/** * Checks if this selection contains a complex column. * * @return <code>true</code> if this selection contains a multicell collection or UDT, <code>false</code> otherwise. */
public boolean containsAComplexColumn() { for (ColumnDefinition def : getColumns()) if (def.isComplex()) return true; return false; } public Map<ColumnDefinition, Integer> getOrderingIndex(boolean isJson) { if (!isJson) return orderingIndex; // If we order post-query in json, the first and only column that we ship to the client is the json column. // In that case, we should keep ordering columns around to perform the ordering, then these columns will // be placed after the json column. As a consequence of where the colums are placed, we should give the // ordering index a value based on their position in the json encoding and discard the original index. // (CASSANDRA-14286) int columnIndex = 1; Map<ColumnDefinition, Integer> jsonOrderingIndex = new LinkedHashMap<>(orderingIndex.size()); for (ColumnDefinition column : orderingIndex.keySet()) jsonOrderingIndex.put(column, columnIndex++); return jsonOrderingIndex; } public ResultSet.ResultMetadata getResultMetadata(boolean isJson) { if (!isJson) return metadata; ColumnSpecification firstColumn = metadata.names.get(0); ColumnSpecification jsonSpec = new ColumnSpecification(firstColumn.ksName, firstColumn.cfName, Json.JSON_COLUMN_ID, UTF8Type.instance); ResultSet.ResultMetadata resultMetadata = new ResultSet.ResultMetadata(Lists.newArrayList(jsonSpec)); if (orderingIndex != null) { for (ColumnDefinition orderingColumn : orderingIndex.keySet()) resultMetadata.addNonSerializedColumn(orderingColumn); } return resultMetadata; } public static Selection wildcard(CFMetaData cfm) { List<ColumnDefinition> all = new ArrayList<>(cfm.allColumns().size()); Iterators.addAll(all, cfm.allColumnsInSelectOrder()); return new SimpleSelection(cfm, all, true); } public static Selection wildcardWithGroupBy(CFMetaData cfm, VariableSpecifications boundNames) { List<RawSelector> rawSelectors = new ArrayList<>(cfm.allColumns().size()); Iterator<ColumnDefinition> iter = cfm.allColumnsInSelectOrder(); while (iter.hasNext()) { ColumnDefinition.Raw raw = ColumnDefinition.Raw.forColumn(iter.next()); rawSelectors.add(new RawSelector(raw, null)); } return fromSelectors(cfm, rawSelectors, boundNames, true); } public static Selection forColumns(CFMetaData cfm, List<ColumnDefinition> columns) { return new SimpleSelection(cfm, columns, false); } public void addColumnForOrdering(ColumnDefinition c) { if (orderingIndex == null) orderingIndex = new LinkedHashMap<>(); int index = getResultSetIndex(c); if (index < 0) index = addOrderingColumn(c); orderingIndex.put(c, index); } protected int addOrderingColumn(ColumnDefinition c) { columns.add(c); metadata.addNonSerializedColumn(c); return columns.size() - 1; } public void addFunctionsTo(List<Function> functions) { } private static boolean processesSelection(List<RawSelector> rawSelectors) { for (RawSelector rawSelector : rawSelectors) { if (rawSelector.processesSelection()) return true; } return false; } public static Selection fromSelectors(CFMetaData cfm, List<RawSelector> rawSelectors, VariableSpecifications boundNames, boolean hasGroupBy) { List<ColumnDefinition> defs = new ArrayList<>(); SelectorFactories factories = SelectorFactories.createFactoriesAndCollectColumnDefinitions(RawSelector.toSelectables(rawSelectors, cfm), null, cfm, defs, boundNames); SelectionColumnMapping mapping = collectColumnMappings(cfm, rawSelectors, factories); return (processesSelection(rawSelectors) || rawSelectors.size() != defs.size() || hasGroupBy) ? new SelectionWithProcessing(cfm, defs, mapping, factories) : new SimpleSelection(cfm, defs, mapping, false); }
Returns the index of the specified column within the resultset
Params:
  • c – the column
Returns:the index of the specified column within the resultset or -1
/** * Returns the index of the specified column within the resultset * @param c the column * @return the index of the specified column within the resultset or -1 */
public int getResultSetIndex(ColumnDefinition c) { return getColumnIndex(c); }
Returns the index of the specified column
Params:
  • c – the column
Returns:the index of the specified column or -1
/** * Returns the index of the specified column * @param c the column * @return the index of the specified column or -1 */
protected final int getColumnIndex(ColumnDefinition c) { for (int i = 0, m = columns.size(); i < m; i++) if (columns.get(i).name.equals(c.name)) return i; return -1; } private static SelectionColumnMapping collectColumnMappings(CFMetaData cfm, List<RawSelector> rawSelectors, SelectorFactories factories) { SelectionColumnMapping selectionColumns = SelectionColumnMapping.newMapping(); Iterator<RawSelector> iter = rawSelectors.iterator(); for (Selector.Factory factory : factories) { ColumnSpecification colSpec = factory.getColumnSpecification(cfm); ColumnIdentifier alias = iter.next().alias; factory.addColumnMapping(selectionColumns, alias == null ? colSpec : colSpec.withAlias(alias)); } return selectionColumns; } protected abstract Selectors newSelectors(QueryOptions options) throws InvalidRequestException;
Returns:the list of CQL3 columns value this SelectionClause needs.
/** * @return the list of CQL3 columns value this SelectionClause needs. */
public List<ColumnDefinition> getColumns() { return columns; }
Returns:the mappings between resultset columns and the underlying columns
/** * @return the mappings between resultset columns and the underlying columns */
public SelectionColumns getColumnMapping() { return columnMapping; } public ResultSetBuilder resultSetBuilder(QueryOptions options, boolean isJson) { return new ResultSetBuilder(options, isJson); } public ResultSetBuilder resultSetBuilder(QueryOptions options, boolean isJson, AggregationSpecification aggregationSpec) { return aggregationSpec == null ? new ResultSetBuilder(options, isJson) : new ResultSetBuilder(options, isJson, aggregationSpec.newGroupMaker()); } public abstract boolean isAggregate(); @Override public String toString() { return MoreObjects.toStringHelper(this) .add("columns", columns) .add("columnMapping", columnMapping) .add("metadata", metadata) .add("collectTimestamps", collectTimestamps) .add("collectTTLs", collectTTLs) .toString(); } public static List<ByteBuffer> rowToJson(List<ByteBuffer> row, ProtocolVersion protocolVersion, ResultSet.ResultMetadata metadata) { StringBuilder sb = new StringBuilder("{"); for (int i = 0; i < metadata.getColumnCount(); i++) { if (i > 0) sb.append(", "); ColumnSpecification spec = metadata.names.get(i); String columnName = spec.name.toString(); if (!columnName.equals(columnName.toLowerCase(Locale.US))) columnName = "\"" + columnName + "\""; ByteBuffer buffer = row.get(i); sb.append('"'); sb.append(Json.quoteAsJsonString(columnName)); sb.append("\": "); if (buffer == null) sb.append("null"); else if (!buffer.hasRemaining()) sb.append("\"\""); else sb.append(spec.type.toJSONString(buffer, protocolVersion)); } sb.append("}"); List<ByteBuffer> jsonRow = new ArrayList<>(); jsonRow.add(UTF8Type.instance.getSerializer().serialize(sb.toString())); return jsonRow; } public class ResultSetBuilder { private final ResultSet resultSet; private final ProtocolVersion protocolVersion;
As multiple thread can access a Selection instance each ResultSetBuilder will use its own Selectors instance.
/** * As multiple thread can access a <code>Selection</code> instance each <code>ResultSetBuilder</code> will use * its own <code>Selectors</code> instance. */
private final Selectors selectors;
The GroupMaker used to build the aggregates.
/** * The <code>GroupMaker</code> used to build the aggregates. */
private final GroupMaker groupMaker; /* * We'll build CQL3 row one by one. * The currentRow is the values for the (CQL3) columns we've fetched. * We also collect timestamps and ttls for the case where the writetime and * ttl functions are used. Note that we might collect timestamp and/or ttls * we don't care about, but since the array below are allocated just once, * it doesn't matter performance wise. */ List<ByteBuffer> current; final long[] timestamps; final int[] ttls; private final boolean isJson; private ResultSetBuilder(QueryOptions options, boolean isJson) { this(options, isJson, null); } private ResultSetBuilder(QueryOptions options, boolean isJson, GroupMaker groupMaker) { this.resultSet = new ResultSet(getResultMetadata(isJson).copy(), new ArrayList<List<ByteBuffer>>()); this.protocolVersion = options.getProtocolVersion(); this.selectors = newSelectors(options); this.groupMaker = groupMaker; this.timestamps = collectTimestamps ? new long[columns.size()] : null; this.ttls = collectTTLs ? new int[columns.size()] : null; this.isJson = isJson; // We use MIN_VALUE to indicate no timestamp and -1 for no ttl if (timestamps != null) Arrays.fill(timestamps, Long.MIN_VALUE); if (ttls != null) Arrays.fill(ttls, -1); } public void add(ByteBuffer v) { current.add(v); } public void add(Cell c, int nowInSec) { if (c == null) { current.add(null); return; } current.add(value(c)); if (timestamps != null) timestamps[current.size() - 1] = c.timestamp(); if (ttls != null) ttls[current.size() - 1] = remainingTTL(c, nowInSec); } private int remainingTTL(Cell c, int nowInSec) { if (!c.isExpiring()) return -1; int remaining = c.localDeletionTime() - nowInSec; return remaining >= 0 ? remaining : -1; } private ByteBuffer value(Cell c) { return c.isCounterCell() ? ByteBufferUtil.bytes(CounterContext.instance().total(c.value())) : c.value(); }
Notifies this Builder that a new row is being processed.
Params:
  • partitionKey – the partition key of the new row
  • clustering – the clustering of the new row
/** * Notifies this <code>Builder</code> that a new row is being processed. * * @param partitionKey the partition key of the new row * @param clustering the clustering of the new row */
public void newRow(DecoratedKey partitionKey, Clustering clustering) { // The groupMaker needs to be called for each row boolean isNewAggregate = groupMaker == null || groupMaker.isNewGroup(partitionKey, clustering); if (current != null) { selectors.addInputRow(protocolVersion, this); if (isNewAggregate) { resultSet.addRow(getOutputRow()); selectors.reset(); } } current = new ArrayList<>(columns.size()); // Timestamps and TTLs are arrays per row, we must null them out between rows if (timestamps != null) Arrays.fill(timestamps, Long.MIN_VALUE); if (ttls != null) Arrays.fill(ttls, -1); }
Builds the ResultSet
/** * Builds the <code>ResultSet</code> */
public ResultSet build() { if (current != null) { selectors.addInputRow(protocolVersion, this); resultSet.addRow(getOutputRow()); selectors.reset(); current = null; } // For aggregates we need to return a row even it no records have been found if (resultSet.isEmpty() && groupMaker != null && groupMaker.returnAtLeastOneRow()) resultSet.addRow(getOutputRow()); return resultSet; } private List<ByteBuffer> getOutputRow() { List<ByteBuffer> outputRow = selectors.getOutputRow(protocolVersion); if (isJson) { // Keep all columns around for possible post-query ordering. (CASSANDRA-14286) List<ByteBuffer> jsonRow = rowToJson(outputRow, protocolVersion, metadata); // Keep ordering columns around for possible post-query ordering. (CASSANDRA-14286) if (orderingIndex != null) { for (Integer orderingColumnIndex : orderingIndex.values()) jsonRow.add(outputRow.get(orderingColumnIndex)); } outputRow = jsonRow; } return outputRow; } } private static interface Selectors { public boolean isAggregate();
Adds the current row of the specified ResultSetBuilder.
Params:
  • protocolVersion –
  • rs – the ResultSetBuilder
Throws:
/** * Adds the current row of the specified <code>ResultSetBuilder</code>. * * @param protocolVersion * @param rs the <code>ResultSetBuilder</code> * @throws InvalidRequestException */
public void addInputRow(ProtocolVersion protocolVersion, ResultSetBuilder rs) throws InvalidRequestException; public List<ByteBuffer> getOutputRow(ProtocolVersion protocolVersion) throws InvalidRequestException; public void reset(); } // Special cased selection for when only columns are selected. private static class SimpleSelection extends Selection { private final boolean isWildcard; public SimpleSelection(CFMetaData cfm, List<ColumnDefinition> columns, boolean isWildcard) { this(cfm, columns, SelectionColumnMapping.simpleMapping(columns), isWildcard); } public SimpleSelection(CFMetaData cfm, List<ColumnDefinition> columns, SelectionColumnMapping metadata, boolean isWildcard) { /* * In theory, even a simple selection could have multiple time the same column, so we * could filter those duplicate out of columns. But since we're very unlikely to * get much duplicate in practice, it's more efficient not to bother. */ super(cfm, columns, metadata, false, false); this.isWildcard = isWildcard; } @Override public boolean isWildcard() { return isWildcard; } public boolean isAggregate() { return false; } protected Selectors newSelectors(QueryOptions options) { return new Selectors() { private List<ByteBuffer> current; public void reset() { current = null; } public List<ByteBuffer> getOutputRow(ProtocolVersion protocolVersion) { return current; } public void addInputRow(ProtocolVersion protocolVersion, ResultSetBuilder rs) throws InvalidRequestException { current = rs.current; } public boolean isAggregate() { return false; } }; } } private static class SelectionWithProcessing extends Selection { private final SelectorFactories factories; public SelectionWithProcessing(CFMetaData cfm, List<ColumnDefinition> columns, SelectionColumnMapping metadata, SelectorFactories factories) throws InvalidRequestException { super(cfm, columns, metadata, factories.containsWritetimeSelectorFactory(), factories.containsTTLSelectorFactory()); this.factories = factories; } @Override public void addFunctionsTo(List<Function> functions) { factories.addFunctionsTo(functions); } @Override public int getResultSetIndex(ColumnDefinition c) { int index = getColumnIndex(c); if (index < 0) return -1; for (int i = 0, m = factories.size(); i < m; i++) if (factories.get(i).isSimpleSelectorFactory(index)) return i; return -1; } @Override protected int addOrderingColumn(ColumnDefinition c) { int index = super.addOrderingColumn(c); factories.addSelectorForOrdering(c, index); return factories.size() - 1; } public boolean isAggregate() { return factories.doesAggregation(); } protected Selectors newSelectors(final QueryOptions options) throws InvalidRequestException { return new Selectors() { private final List<Selector> selectors = factories.newInstances(options); public void reset() { for (Selector selector : selectors) selector.reset(); } public boolean isAggregate() { return factories.doesAggregation(); } public List<ByteBuffer> getOutputRow(ProtocolVersion protocolVersion) throws InvalidRequestException { List<ByteBuffer> outputRow = new ArrayList<>(selectors.size()); for (Selector selector: selectors) outputRow.add(selector.getOutput(protocolVersion)); return outputRow; } public void addInputRow(ProtocolVersion protocolVersion, ResultSetBuilder rs) throws InvalidRequestException { for (Selector selector : selectors) selector.addInput(protocolVersion, rs); } }; } } }