package org.apache.cassandra.cql3.restrictions;
import java.nio.ByteBuffer;
import java.util.*;
import com.google.common.collect.BoundType;
import com.google.common.collect.ImmutableRangeSet;
import com.google.common.collect.Range;
import com.google.common.collect.RangeSet;
import org.apache.cassandra.config.CFMetaData;
import org.apache.cassandra.config.ColumnDefinition;
import org.apache.cassandra.cql3.QueryOptions;
import org.apache.cassandra.cql3.functions.Function;
import org.apache.cassandra.cql3.statements.Bound;
import org.apache.cassandra.db.filter.RowFilter;
import org.apache.cassandra.dht.IPartitioner;
import org.apache.cassandra.dht.Token;
import org.apache.cassandra.exceptions.InvalidRequestException;
import org.apache.cassandra.index.SecondaryIndexManager;
import static org.apache.cassandra.cql3.statements.Bound.END;
import static org.apache.cassandra.cql3.statements.Bound.START;
/**
 * <code>PartitionKeyRestrictions</code> decorator that pairs a set of partition key restrictions with a
 * restriction expressed on the token.
 *
 * <p>While {@link #isOnToken()} is <code>false</code>, the wrapped restrictions provide the values and the
 * token restriction is only used to discard the values whose token it does not select. While it is
 * <code>true</code>, the bound related methods are answered by the token restriction instead.</p>
 */
final class TokenFilter implements PartitionKeyRestrictions
{
    /**
     * The decorated restrictions; most of the calls are simply forwarded to them.
     */
    private final PartitionKeyRestrictions restrictions;

    /**
     * The restriction on the token.
     */
    private final TokenRestriction tokenRestriction;

    /**
     * The partitioner, read from the metadata of the token restriction. It is used to compute the token of a
     * value and to deserialize the tokens provided by the token restriction.
     */
    private final IPartitioner partitioner;

    /**
     * Creates a filter decorating the specified restrictions.
     *
     * @param restrictions the restrictions to decorate
     * @param tokenRestriction the restriction on the token
     */
    public TokenFilter(PartitionKeyRestrictions restrictions, TokenRestriction tokenRestriction)
    {
        this.restrictions = restrictions;
        this.tokenRestriction = tokenRestriction;
        this.partitioner = tokenRestriction.metadata.partitioner;
    }

    /**
     * Checks if the token restriction is the one that must be used.
     *
     * @return <code>true</code> if the decorated restrictions need filtering or if their size is smaller
     * than the size of the token restriction, <code>false</code> otherwise
     */
    @Override
    public boolean isOnToken()
    {
        // NOTE(review): size() presumably counts the restricted columns, in which case the second test means
        // "not every column covered by the token restriction has its own restriction" -- to be confirmed.
        boolean filteringNeeded = needFiltering(tokenRestriction.metadata);
        return filteringNeeded || restrictions.size() < tokenRestriction.size();
    }

    /**
     * @return <code>false</code> when {@link #isOnToken()} holds, otherwise the answer of the decorated
     * restrictions
     */
    public boolean hasIN()
    {
        return !isOnToken() && restrictions.hasIN();
    }

    /**
     * @return <code>false</code> when {@link #isOnToken()} holds, otherwise the answer of the decorated
     * restrictions
     */
    public boolean hasContains()
    {
        return !isOnToken() && restrictions.hasContains();
    }

    /**
     * @return <code>false</code> when {@link #isOnToken()} holds, otherwise the answer of the decorated
     * restrictions
     */
    public boolean hasOnlyEqualityRestrictions()
    {
        return !isOnToken() && restrictions.hasOnlyEqualityRestrictions();
    }

    /**
     * Returns the restrictions applying to the specified column, collected from both the decorated
     * restrictions and the token restriction.
     *
     * @param columnDef the column definition
     * @return a new set holding the restrictions returned by both sources
     */
    @Override
    public Set<Restriction> getRestrictions(ColumnDefinition columnDef)
    {
        Set<Restriction> merged = new HashSet<>();
        merged.addAll(restrictions.getRestrictions(columnDef));
        merged.addAll(tokenRestriction.getRestrictions(columnDef));
        return merged;
    }

    /**
     * Returns the values of the decorated restrictions whose token is selected by the token restriction.
     *
     * @param options the query options
     * @return the values remaining once the token restriction has been applied
     * @throws InvalidRequestException if the values or the token bounds cannot be retrieved
     */
    @Override
    public List<ByteBuffer> values(QueryOptions options) throws InvalidRequestException
    {
        List<ByteBuffer> candidates = restrictions.values(options);
        return filter(candidates, options);
    }

    /**
     * Merges the specified restriction into this filter. A restriction on the token is merged into the token
     * restriction, any other one into the decorated restrictions.
     *
     * @param restriction the restriction to merge
     * @return a new <code>TokenFilter</code> containing the merged restriction
     * @throws InvalidRequestException if the restriction cannot be merged
     */
    @Override
    public PartitionKeyRestrictions mergeWith(Restriction restriction) throws InvalidRequestException
    {
        if (!restriction.isOnToken())
        {
            PartitionKeyRestrictions mergedRestrictions = restrictions.mergeWith(restriction);
            return new TokenFilter(mergedRestrictions, tokenRestriction);
        }

        TokenRestriction mergedToken = (TokenRestriction) tokenRestriction.mergeWith(restriction);
        return new TokenFilter(restrictions, mergedToken);
    }

    /**
     * Answered by the token restriction when {@link #isOnToken()} holds, by the decorated restrictions
     * otherwise.
     */
    @Override
    public boolean isInclusive(Bound bound)
    {
        if (isOnToken())
            return tokenRestriction.isInclusive(bound);

        return restrictions.isInclusive(bound);
    }

    /**
     * Answered by the token restriction when {@link #isOnToken()} holds, by the decorated restrictions
     * otherwise.
     */
    @Override
    public boolean hasBound(Bound bound)
    {
        if (isOnToken())
            return tokenRestriction.hasBound(bound);

        return restrictions.hasBound(bound);
    }

    /**
     * Answered by the token restriction when {@link #isOnToken()} holds, by the decorated restrictions
     * otherwise.
     */
    @Override
    public List<ByteBuffer> bounds(Bound bound, QueryOptions options) throws InvalidRequestException
    {
        if (isOnToken())
            return tokenRestriction.bounds(bound, options);

        return restrictions.bounds(bound, options);
    }

    /**
     * Removes from the specified values the ones that are not selected by the token restriction.
     *
     * @param values the values to filter
     * @param options the query options
     * @return the values whose token is selected by the token restriction
     * @throws InvalidRequestException if the token restriction cannot be evaluated
     */
    private List<ByteBuffer> filter(List<ByteBuffer> values, QueryOptions options) throws InvalidRequestException
    {
        RangeSet<Token> selectedTokens;
        if (tokenRestriction.hasSlice())
            selectedTokens = toRangeSet(tokenRestriction, options);
        else
            selectedTokens = toRangeSet(tokenRestriction.values(options));

        return filterWithRangeSet(selectedTokens, values);
    }

    /**
     * Keeps, in their original order, the values whose token belongs to the specified range set.
     *
     * @param tokens the selected tokens
     * @param values the values to filter
     * @return a new list with the values whose token is contained in <code>tokens</code>
     */
    private List<ByteBuffer> filterWithRangeSet(RangeSet<Token> tokens, List<ByteBuffer> values)
    {
        List<ByteBuffer> kept = new ArrayList<>();

        for (ByteBuffer candidate : values)
        {
            if (tokens.contains(partitioner.getToken(candidate)))
                kept.add(candidate);
        }
        return kept;
    }

    /**
     * Builds the range set made of one singleton range per serialized token.
     *
     * @param buffers the serialized tokens
     * @return the corresponding range set
     */
    private RangeSet<Token> toRangeSet(List<ByteBuffer> buffers)
    {
        ImmutableRangeSet.Builder<Token> singletons = ImmutableRangeSet.builder();

        for (ByteBuffer serialized : buffers)
        {
            Token token = deserializeToken(serialized);
            singletons.add(Range.singleton(token));
        }
        return singletons.build();
    }

    /**
     * Converts a slice restriction on the token into the set of tokens that it selects.
     *
     * <ul>
     * <li>only an END bound: everything up to that bound;</li>
     * <li>only a START bound: everything from that bound;</li>
     * <li>both bounds on the same token with at least one of them exclusive: the empty set;</li>
     * <li>both bounds with START before or equal to END: the range between them;</li>
     * <li>both bounds with START after END: everything up to END plus everything from START.</li>
     * </ul>
     *
     * @param slice the slice restriction
     * @param options the query options
     * @return the tokens selected by the slice
     * @throws InvalidRequestException if the bounds cannot be retrieved
     */
    private RangeSet<Token> toRangeSet(TokenRestriction slice, QueryOptions options) throws InvalidRequestException
    {
        if (!slice.hasBound(START))
        {
            // The END bound is read without being checked: a slice without START is expected to have one.
            Token upper = deserializeToken(slice.bounds(END, options).get(0));
            return ImmutableRangeSet.of(Range.upTo(upper, toBoundType(slice.isInclusive(END))));
        }

        Token lower = deserializeToken(slice.bounds(START, options).get(0));
        BoundType lowerType = toBoundType(slice.isInclusive(START));

        if (!slice.hasBound(END))
            return ImmutableRangeSet.of(Range.downTo(lower, lowerType));

        BoundType upperType = toBoundType(slice.isInclusive(END));
        Token upper = deserializeToken(slice.bounds(END, options).get(0));

        // Identical tokens can only be selected if both bounds are inclusive.
        boolean hasExclusiveBound = lowerType == BoundType.OPEN || upperType == BoundType.OPEN;
        if (lower.equals(upper) && hasExclusiveBound)
            return ImmutableRangeSet.of();

        if (lower.compareTo(upper) > 0)
        {
            // START is after END: select what lies on both sides of the gap between them.
            return ImmutableRangeSet.<Token> builder()
                                    .add(Range.upTo(upper, upperType))
                                    .add(Range.downTo(lower, lowerType))
                                    .build();
        }

        return ImmutableRangeSet.of(Range.range(lower, lowerType, upper, upperType));
    }

    /**
     * Deserializes a token with the token factory of the partitioner.
     *
     * @param buffer the serialized token
     * @return the token
     */
    private Token deserializeToken(ByteBuffer buffer)
    {
        return partitioner.getTokenFactory().fromByteArray(buffer);
    }

    /**
     * Converts an inclusiveness flag into the matching <code>BoundType</code>.
     *
     * @param inclusive <code>true</code> if the bound is inclusive
     * @return <code>BoundType.CLOSED</code> for an inclusive bound, <code>BoundType.OPEN</code> otherwise
     */
    private static BoundType toBoundType(boolean inclusive)
    {
        if (inclusive)
            return BoundType.CLOSED;

        return BoundType.OPEN;
    }

    // The methods below are forwarded unchanged to the decorated restrictions.

    @Override
    public ColumnDefinition getFirstColumn()
    {
        return restrictions.getFirstColumn();
    }

    @Override
    public ColumnDefinition getLastColumn()
    {
        return restrictions.getLastColumn();
    }

    @Override
    public List<ColumnDefinition> getColumnDefs()
    {
        return restrictions.getColumnDefs();
    }

    @Override
    public void addFunctionsTo(List<Function> functions)
    {
        restrictions.addFunctionsTo(functions);
    }

    @Override
    public boolean hasSupportingIndex(SecondaryIndexManager indexManager)
    {
        return restrictions.hasSupportingIndex(indexManager);
    }

    @Override
    public void addRowFilterTo(RowFilter filter, SecondaryIndexManager indexManager, QueryOptions options)
    {
        restrictions.addRowFilterTo(filter, indexManager, options);
    }

    @Override
    public boolean isEmpty()
    {
        return restrictions.isEmpty();
    }

    @Override
    public int size()
    {
        return restrictions.size();
    }

    @Override
    public boolean needFiltering(CFMetaData cfm)
    {
        return restrictions.needFiltering(cfm);
    }

    @Override
    public boolean hasUnrestrictedPartitionKeyComponents(CFMetaData cfm)
    {
        return restrictions.hasUnrestrictedPartitionKeyComponents(cfm);
    }

    @Override
    public boolean hasSlice()
    {
        return restrictions.hasSlice();
    }
}