package org.apache.cassandra.db.rows;
import java.io.IOException;
import java.io.IOError;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.apache.cassandra.config.CFMetaData;
import org.apache.cassandra.db.*;
import org.apache.cassandra.db.filter.ColumnFilter;
import org.apache.cassandra.io.util.DataInputPlus;
import org.apache.cassandra.io.util.DataOutputPlus;
import org.apache.cassandra.utils.ByteBufferUtil;
/**
 * Serializes and deserializes the content of a single {@link UnfilteredRowIterator}.
 *
 * <p>The layout produced by {@code serialize} is, in order:
 * <ol>
 *   <li>the partition key, prefixed by its length as a vint;</li>
 *   <li>one byte of flags (see the {@code IS_*} / {@code HAS_*} masks below);</li>
 *   <li>nothing more if {@code IS_EMPTY} is set; otherwise:</li>
 *   <li>the {@link SerializationHeader} in its "for messaging" form;</li>
 *   <li>the partition level deletion, only if {@code HAS_PARTITION_DELETION} is set;</li>
 *   <li>the static row, only if {@code HAS_STATIC_ROW} is set;</li>
 *   <li>the row estimate as an unsigned vint, only if {@code HAS_ROW_ESTIMATE} is set;</li>
 *   <li>every remaining {@link Unfiltered} of the iterator, followed by an end-of-partition marker.</li>
 * </ol>
 */
public class UnfilteredRowIteratorSerializer
{
    protected static final Logger logger = LoggerFactory.getLogger(UnfilteredRowIteratorSerializer.class);

    // Bit masks stored in the single flags byte that follows the partition key.
    private static final int IS_EMPTY = 0x01;
    private static final int IS_REVERSED = 0x02;
    private static final int HAS_PARTITION_DELETION = 0x04;
    private static final int HAS_STATIC_ROW = 0x08;
    private static final int HAS_ROW_ESTIMATE = 0x10;

    // Sentinel used when no row estimate is written to / read from the stream.
    private static final int NO_ROW_ESTIMATE = -1;

    public static final UnfilteredRowIteratorSerializer serializer = new UnfilteredRowIteratorSerializer();

    /**
     * Serializes {@code iterator} without a row estimate.
     *
     * @param iterator  the iterator to write; it is iterated to exhaustion unless it is empty
     * @param selection the column filter passed on to the header serializer
     * @param out       where to write
     * @param version   the version forwarded to the row serializer
     * @throws IOException if writing to {@code out} fails
     */
    public void serialize(UnfilteredRowIterator iterator, ColumnFilter selection, DataOutputPlus out, int version) throws IOException
    {
        serialize(iterator, selection, out, version, NO_ROW_ESTIMATE);
    }

    /**
     * Serializes {@code iterator} using a header derived from the iterator itself.
     *
     * @param iterator    the iterator to write; it is iterated to exhaustion unless it is empty
     * @param selection   the column filter passed on to the header serializer
     * @param out         where to write
     * @param version     the version forwarded to the row serializer
     * @param rowEstimate an estimate of the number of rows; a negative value means none is written
     * @throws IOException if writing to {@code out} fails
     */
    public void serialize(UnfilteredRowIterator iterator, ColumnFilter selection, DataOutputPlus out, int version, int rowEstimate) throws IOException
    {
        serialize(iterator, messagingHeaderFor(iterator), selection, out, version, rowEstimate);
    }

    /**
     * Serializes {@code iterator} using the provided header.
     *
     * @param iterator    the iterator to write; it is iterated to exhaustion unless it is empty
     * @param header      the header used to encode rows; asserted not to be an sstable header
     * @param selection   the column filter passed on to the header serializer
     * @param out         where to write
     * @param version     the version forwarded to the row serializer
     * @param rowEstimate an estimate of the number of rows; a negative value means none is written
     * @throws IOException if writing to {@code out} fails
     */
    public void serialize(UnfilteredRowIterator iterator, SerializationHeader header, ColumnFilter selection, DataOutputPlus out, int version, int rowEstimate) throws IOException
    {
        assert !header.isForSSTable();

        ByteBufferUtil.writeWithVIntLength(iterator.partitionKey().getKey(), out);

        int flagBits = iterator.isReverseOrder() ? IS_REVERSED : 0;

        // An empty partition is fully described by its key and flags.
        if (iterator.isEmpty())
        {
            out.writeByte((byte)(flagBits | IS_EMPTY));
            return;
        }

        DeletionTime deletion = iterator.partitionLevelDeletion();
        boolean writeDeletion = !deletion.isLive();
        if (writeDeletion)
        {
            flagBits |= HAS_PARTITION_DELETION;
        }

        Row statics = iterator.staticRow();
        boolean writeStatics = statics != Rows.EMPTY_STATIC_ROW;
        if (writeStatics)
        {
            flagBits |= HAS_STATIC_ROW;
        }

        boolean writeEstimate = rowEstimate >= 0;
        if (writeEstimate)
        {
            flagBits |= HAS_ROW_ESTIMATE;
        }

        out.writeByte((byte)flagBits);

        SerializationHeader.serializer.serializeForMessaging(header, selection, out, writeStatics);

        // The optional sections below are written in the same order deserializeHeader reads them.
        if (writeDeletion)
        {
            header.writeDeletionTime(deletion, out);
        }
        if (writeStatics)
        {
            UnfilteredSerializer.serializer.serialize(statics, header, out, version);
        }
        if (writeEstimate)
        {
            out.writeUnsignedVInt(rowEstimate);
        }

        while (iterator.hasNext())
        {
            UnfilteredSerializer.serializer.serialize(iterator.next(), header, out, version);
        }
        UnfilteredSerializer.serializer.writeEndOfPartition(out);
    }

    /**
     * Computes the number of bytes the 5-argument {@code serialize} would write for {@code iterator}.
     *
     * <p>Note that this consumes {@code iterator}: unless it is empty, it is iterated to exhaustion.
     * Unlike {@code serialize}, this method asserts that {@code rowEstimate} is non-negative.
     *
     * @param iterator    the iterator to measure
     * @param selection   the column filter passed on to the header serializer
     * @param version     the version forwarded to the row serializer
     * @param rowEstimate an estimate of the number of rows; asserted to be {@code >= 0}
     * @return the serialized size in bytes
     */
    public long serializedSize(UnfilteredRowIterator iterator, ColumnFilter selection, int version, int rowEstimate)
    {
        SerializationHeader header = messagingHeaderFor(iterator);

        assert rowEstimate >= 0;

        long total = ByteBufferUtil.serializedSizeWithVIntLength(iterator.partitionKey().getKey())
                   + 1; // the flags byte

        if (iterator.isEmpty())
        {
            return total;
        }

        DeletionTime deletion = iterator.partitionLevelDeletion();
        Row statics = iterator.staticRow();
        boolean withStatics = statics != Rows.EMPTY_STATIC_ROW;

        total += SerializationHeader.serializer.serializedSizeForMessaging(header, selection, withStatics);

        if (!deletion.isLive())
        {
            total += header.deletionTimeSerializedSize(deletion);
        }
        if (withStatics)
        {
            total += UnfilteredSerializer.serializer.serializedSize(statics, header, version);
        }
        if (rowEstimate >= 0)
        {
            total += TypeSizes.sizeofUnsignedVInt(rowEstimate);
        }

        while (iterator.hasNext())
        {
            total += UnfilteredSerializer.serializer.serializedSize(iterator.next(), header, version);
        }
        total += UnfilteredSerializer.serializer.serializedSizeEndOfPartition();

        return total;
    }

    /**
     * Reads everything that precedes the rows of a serialized partition: key, flags, serialization
     * header, and the optional partition deletion, static row and row estimate.
     *
     * @param metadata  the table metadata, used to decorate the key and to decode the header
     * @param selection the column filter passed on to the header deserializer
     * @param in        where to read from
     * @param version   the version used to build the helper that decodes the static row
     * @param flag      the flag used to build the helper that decodes the static row
     * @return the decoded {@link Header}; for an empty partition its {@code partitionDeletion} and
     *         {@code staticRow} are {@code null} and its {@code rowEstimate} is 0
     * @throws IOException if reading from {@code in} fails
     */
    public Header deserializeHeader(CFMetaData metadata, ColumnFilter selection, DataInputPlus in, int version, SerializationHelper.Flag flag) throws IOException
    {
        DecoratedKey key = metadata.decorateKey(ByteBufferUtil.readWithVIntLength(in));
        int flagBits = in.readUnsignedByte();
        boolean reversed = isSet(flagBits, IS_REVERSED);

        if (isSet(flagBits, IS_EMPTY))
        {
            // Nothing follows the flags for an empty partition, so build a header with no columns and no stats.
            SerializationHeader emptyHeader = new SerializationHeader(false, metadata, PartitionColumns.NONE, EncodingStats.NO_STATS);
            return new Header(emptyHeader, key, reversed, true, null, null, 0);
        }

        boolean withDeletion = isSet(flagBits, HAS_PARTITION_DELETION);
        boolean withStatics = isSet(flagBits, HAS_STATIC_ROW);
        boolean withEstimate = isSet(flagBits, HAS_ROW_ESTIMATE);

        SerializationHeader header = SerializationHeader.serializer.deserializeForMessaging(in, metadata, selection, withStatics);

        DeletionTime deletion;
        if (withDeletion)
        {
            deletion = header.readDeletionTime(in);
        }
        else
        {
            deletion = DeletionTime.LIVE;
        }

        Row statics = Rows.EMPTY_STATIC_ROW;
        if (withStatics)
        {
            statics = UnfilteredSerializer.serializer.deserializeStaticRow(in, header, new SerializationHelper(metadata, version, flag));
        }

        int estimate;
        if (withEstimate)
        {
            estimate = (int)in.readUnsignedVInt();
        }
        else
        {
            estimate = NO_ROW_ESTIMATE;
        }

        return new Header(header, key, reversed, false, deletion, statics, estimate);
    }

    /**
     * Builds an iterator over the rows that follow an already decoded {@link Header}.
     *
     * <p>The returned iterator reads from {@code in} lazily, one {@link Unfiltered} per call to
     * {@code computeNext()}; an {@link IOException} raised while doing so is rethrown wrapped in an
     * {@link IOError}.
     *
     * @param in       where to read the rows from
     * @param version  the version used to build the deserialization helper
     * @param metadata the table metadata
     * @param flag     the flag used to build the deserialization helper
     * @param header   the header previously returned by {@code deserializeHeader} for this partition
     * @return an empty iterator if {@code header.isEmpty}, otherwise a lazily decoding iterator
     * @throws IOException declared by the signature; the visible body itself performs no eager read
     */
    public UnfilteredRowIterator deserialize(DataInputPlus in, int version, CFMetaData metadata, SerializationHelper.Flag flag, Header header) throws IOException
    {
        if (header.isEmpty)
        {
            return EmptyIterators.unfilteredRow(metadata, header.key, header.isReversed);
        }

        final SerializationHeader wireHeader = header.sHeader;
        final SerializationHelper rowHelper = new SerializationHelper(metadata, version, flag);

        return new AbstractUnfilteredRowIterator(metadata, header.key, header.partitionDeletion, wireHeader.columns(), header.staticRow, header.isReversed, wireHeader.stats())
        {
            // A single builder instance is handed to every row deserialization of this iterator.
            private final Row.Builder builder = BTreeRow.sortedBuilder();

            protected Unfiltered computeNext()
            {
                Unfiltered next;
                try
                {
                    next = UnfilteredSerializer.serializer.deserialize(in, wireHeader, rowHelper, builder);
                }
                catch (IOException e)
                {
                    throw new IOError(e);
                }

                // The row deserializer returns null once there is nothing left to read for this partition.
                if (next == null)
                {
                    return endOfData();
                }
                return next;
            }
        };
    }

    /**
     * Reads the header of a serialized partition from {@code in} and returns an iterator over its rows.
     *
     * @param in        where to read from
     * @param version   the version used to build the deserialization helper
     * @param metadata  the table metadata
     * @param selection the column filter passed on to the header deserializer
     * @param flag      the flag used to build the deserialization helper
     * @return an iterator over the deserialized partition
     * @throws IOException if reading the header from {@code in} fails
     */
    public UnfilteredRowIterator deserialize(DataInputPlus in, int version, CFMetaData metadata, ColumnFilter selection, SerializationHelper.Flag flag) throws IOException
    {
        Header header = deserializeHeader(metadata, selection, in, version, flag);
        return deserialize(in, version, metadata, flag, header);
    }

    /**
     * Builds the (non-sstable) serialization header describing {@code iterator}'s metadata, columns and stats.
     */
    private static SerializationHeader messagingHeaderFor(UnfilteredRowIterator iterator)
    {
        return new SerializationHeader(false,
                                       iterator.metadata(),
                                       iterator.columns(),
                                       iterator.stats());
    }

    /**
     * Tells whether any bit of {@code mask} is set in {@code flagBits}.
     */
    private static boolean isSet(int flagBits, int mask)
    {
        return (flagBits & mask) != 0;
    }

    /**
     * Everything read by {@code deserializeHeader}, i.e. all that precedes the rows of a serialized partition.
     */
    public static class Header
    {
        public final SerializationHeader sHeader;
        public final DecoratedKey key;
        public final boolean isReversed;
        public final boolean isEmpty;
        // null when isEmpty is true
        public final DeletionTime partitionDeletion;
        // null when isEmpty is true
        public final Row staticRow;
        // -1 when the stream carried no estimate; 0 when isEmpty is true
        public final int rowEstimate;

        private Header(SerializationHeader sHeader,
                       DecoratedKey key,
                       boolean isReversed,
                       boolean isEmpty,
                       DeletionTime partitionDeletion,
                       Row staticRow,
                       int rowEstimate)
        {
            this.sHeader = sHeader;
            this.key = key;
            this.isReversed = isReversed;
            this.isEmpty = isEmpty;
            this.partitionDeletion = partitionDeletion;
            this.staticRow = staticRow;
            this.rowEstimate = rowEstimate;
        }

        @Override
        public String toString()
        {
            final String layout = "{header=%s, key=%s, isReversed=%b, isEmpty=%b, del=%s, staticRow=%s, rowEstimate=%d}";
            return String.format(layout, sHeader, key, isReversed, isEmpty, partitionDeletion, staticRow, rowEstimate);
        }
    }
}