/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.cassandra.db.rows;

import java.io.IOException;
import java.io.IOError;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import org.apache.cassandra.config.CFMetaData;
import org.apache.cassandra.db.*;
import org.apache.cassandra.db.filter.ColumnFilter;
import org.apache.cassandra.io.util.DataInputPlus;
import org.apache.cassandra.io.util.DataOutputPlus;
import org.apache.cassandra.utils.ByteBufferUtil;

Serialize/Deserialize an unfiltered row iterator. The serialization is composed of a header, follows by the rows and range tombstones of the iterator serialized until we read the end of the partition (see UnfilteredSerializer for details). The header itself is: <cfid><key><flags><s_header>[<partition_deletion>][<static_row>][<row_estimate>] where: <cfid> is the table cfid. <key> is the partition key. <flags> contains bit flags. Each flag is set if it's corresponding bit is set. From rightmost bit to leftmost one, the flags are: - is empty: whether the iterator is empty. If so, nothing follows the <flags> - is reversed: whether the iterator is in reversed clustering order - has partition deletion: whether or not there is a <partition_deletion> following - has static row: whether or not there is a <static_row> following - has row estimate: whether or not there is a <row_estimate> following <s_header> is the {@code SerializationHeader}. It contains in particular the columns contains in the serialized iterator as well as other information necessary to decoding the serialized rows (see {@code SerializationHeader.Serializer for details}). <partition_deletion> is the deletion time for the partition (delta-encoded) <static_row> is the static row for this partition as serialized by UnfilteredSerializer. <row_estimate> is the (potentially estimated) number of rows serialized. This is only used for the purpose of sizing on the receiving end and should not be relied upon too strongly. Please note that the format described above is the on-wire format. On-disk, the format is basically the same, but the header is written once per sstable, not once per-partition. Further, the actual row and range tombstones are not written using this class, but rather by ColumnIndex.
/** * Serialize/Deserialize an unfiltered row iterator. * * The serialization is composed of a header, follows by the rows and range tombstones of the iterator serialized * until we read the end of the partition (see UnfilteredSerializer for details). The header itself * is: * {@code * <cfid><key><flags><s_header>[<partition_deletion>][<static_row>][<row_estimate>] * where: * <cfid> is the table cfid. * <key> is the partition key. * <flags> contains bit flags. Each flag is set if it's corresponding bit is set. From rightmost * bit to leftmost one, the flags are: * - is empty: whether the iterator is empty. If so, nothing follows the <flags> * - is reversed: whether the iterator is in reversed clustering order * - has partition deletion: whether or not there is a <partition_deletion> following * - has static row: whether or not there is a <static_row> following * - has row estimate: whether or not there is a <row_estimate> following * <s_header> is the {@code SerializationHeader}. It contains in particular the columns contains in the serialized * iterator as well as other information necessary to decoding the serialized rows * (see {@code SerializationHeader.Serializer for details}). * <partition_deletion> is the deletion time for the partition (delta-encoded) * <static_row> is the static row for this partition as serialized by UnfilteredSerializer. * <row_estimate> is the (potentially estimated) number of rows serialized. This is only used for * the purpose of sizing on the receiving end and should not be relied upon too strongly. * } * * Please note that the format described above is the on-wire format. On-disk, the format is basically the * same, but the header is written once per sstable, not once per-partition. Further, the actual row and * range tombstones are not written using this class, but rather by {@link ColumnIndex}. */
public class UnfilteredRowIteratorSerializer { protected static final Logger logger = LoggerFactory.getLogger(UnfilteredRowIteratorSerializer.class); private static final int IS_EMPTY = 0x01; private static final int IS_REVERSED = 0x02; private static final int HAS_PARTITION_DELETION = 0x04; private static final int HAS_STATIC_ROW = 0x08; private static final int HAS_ROW_ESTIMATE = 0x10; public static final UnfilteredRowIteratorSerializer serializer = new UnfilteredRowIteratorSerializer(); // Should only be used for the on-wire format. public void serialize(UnfilteredRowIterator iterator, ColumnFilter selection, DataOutputPlus out, int version) throws IOException { serialize(iterator, selection, out, version, -1); } // Should only be used for the on-wire format. public void serialize(UnfilteredRowIterator iterator, ColumnFilter selection, DataOutputPlus out, int version, int rowEstimate) throws IOException { SerializationHeader header = new SerializationHeader(false, iterator.metadata(), iterator.columns(), iterator.stats()); serialize(iterator, header, selection, out, version, rowEstimate); } // Should only be used for the on-wire format. public void serialize(UnfilteredRowIterator iterator, SerializationHeader header, ColumnFilter selection, DataOutputPlus out, int version, int rowEstimate) throws IOException { assert !header.isForSSTable(); ByteBufferUtil.writeWithVIntLength(iterator.partitionKey().getKey(), out); int flags = 0; if (iterator.isReverseOrder()) flags |= IS_REVERSED; if (iterator.isEmpty()) { out.writeByte((byte)(flags | IS_EMPTY)); return; } DeletionTime partitionDeletion = iterator.partitionLevelDeletion(); if (!partitionDeletion.isLive()) flags |= HAS_PARTITION_DELETION; Row staticRow = iterator.staticRow(); boolean hasStatic = staticRow != Rows.EMPTY_STATIC_ROW; if (hasStatic) flags |= HAS_STATIC_ROW; if (rowEstimate >= 0) flags |= HAS_ROW_ESTIMATE; out.writeByte((byte)flags); SerializationHeader.serializer.serializeForMessaging(header, selection, out, hasStatic); if (!partitionDeletion.isLive()) header.writeDeletionTime(partitionDeletion, out); if (hasStatic) UnfilteredSerializer.serializer.serialize(staticRow, header, out, version); if (rowEstimate >= 0) out.writeUnsignedVInt(rowEstimate); while (iterator.hasNext()) UnfilteredSerializer.serializer.serialize(iterator.next(), header, out, version); UnfilteredSerializer.serializer.writeEndOfPartition(out); } // Please note that this consume the iterator, and as such should not be called unless we have a simple way to // recreate an iterator for both serialize and serializedSize, which is mostly only PartitionUpdate/ArrayBackedCachedPartition. public long serializedSize(UnfilteredRowIterator iterator, ColumnFilter selection, int version, int rowEstimate) { SerializationHeader header = new SerializationHeader(false, iterator.metadata(), iterator.columns(), iterator.stats()); assert rowEstimate >= 0; long size = ByteBufferUtil.serializedSizeWithVIntLength(iterator.partitionKey().getKey()) + 1; // flags if (iterator.isEmpty()) return size; DeletionTime partitionDeletion = iterator.partitionLevelDeletion(); Row staticRow = iterator.staticRow(); boolean hasStatic = staticRow != Rows.EMPTY_STATIC_ROW; size += SerializationHeader.serializer.serializedSizeForMessaging(header, selection, hasStatic); if (!partitionDeletion.isLive()) size += header.deletionTimeSerializedSize(partitionDeletion); if (hasStatic) size += UnfilteredSerializer.serializer.serializedSize(staticRow, header, version); if (rowEstimate >= 0) size += TypeSizes.sizeofUnsignedVInt(rowEstimate); while (iterator.hasNext()) size += UnfilteredSerializer.serializer.serializedSize(iterator.next(), header, version); size += UnfilteredSerializer.serializer.serializedSizeEndOfPartition(); return size; } public Header deserializeHeader(CFMetaData metadata, ColumnFilter selection, DataInputPlus in, int version, SerializationHelper.Flag flag) throws IOException { DecoratedKey key = metadata.decorateKey(ByteBufferUtil.readWithVIntLength(in)); int flags = in.readUnsignedByte(); boolean isReversed = (flags & IS_REVERSED) != 0; if ((flags & IS_EMPTY) != 0) { SerializationHeader sh = new SerializationHeader(false, metadata, PartitionColumns.NONE, EncodingStats.NO_STATS); return new Header(sh, key, isReversed, true, null, null, 0); } boolean hasPartitionDeletion = (flags & HAS_PARTITION_DELETION) != 0; boolean hasStatic = (flags & HAS_STATIC_ROW) != 0; boolean hasRowEstimate = (flags & HAS_ROW_ESTIMATE) != 0; SerializationHeader header = SerializationHeader.serializer.deserializeForMessaging(in, metadata, selection, hasStatic); DeletionTime partitionDeletion = hasPartitionDeletion ? header.readDeletionTime(in) : DeletionTime.LIVE; Row staticRow = Rows.EMPTY_STATIC_ROW; if (hasStatic) staticRow = UnfilteredSerializer.serializer.deserializeStaticRow(in, header, new SerializationHelper(metadata, version, flag)); int rowEstimate = hasRowEstimate ? (int)in.readUnsignedVInt() : -1; return new Header(header, key, isReversed, false, partitionDeletion, staticRow, rowEstimate); } public UnfilteredRowIterator deserialize(DataInputPlus in, int version, CFMetaData metadata, SerializationHelper.Flag flag, Header header) throws IOException { if (header.isEmpty) return EmptyIterators.unfilteredRow(metadata, header.key, header.isReversed); final SerializationHelper helper = new SerializationHelper(metadata, version, flag); final SerializationHeader sHeader = header.sHeader; return new AbstractUnfilteredRowIterator(metadata, header.key, header.partitionDeletion, sHeader.columns(), header.staticRow, header.isReversed, sHeader.stats()) { private final Row.Builder builder = BTreeRow.sortedBuilder(); protected Unfiltered computeNext() { try { Unfiltered unfiltered = UnfilteredSerializer.serializer.deserialize(in, sHeader, helper, builder); return unfiltered == null ? endOfData() : unfiltered; } catch (IOException e) { throw new IOError(e); } } }; } public UnfilteredRowIterator deserialize(DataInputPlus in, int version, CFMetaData metadata, ColumnFilter selection, SerializationHelper.Flag flag) throws IOException { return deserialize(in, version, metadata, flag, deserializeHeader(metadata, selection, in, version, flag)); } public static class Header { public final SerializationHeader sHeader; public final DecoratedKey key; public final boolean isReversed; public final boolean isEmpty; public final DeletionTime partitionDeletion; public final Row staticRow; public final int rowEstimate; // -1 if no estimate private Header(SerializationHeader sHeader, DecoratedKey key, boolean isReversed, boolean isEmpty, DeletionTime partitionDeletion, Row staticRow, int rowEstimate) { this.sHeader = sHeader; this.key = key; this.isReversed = isReversed; this.isEmpty = isEmpty; this.partitionDeletion = partitionDeletion; this.staticRow = staticRow; this.rowEstimate = rowEstimate; } @Override public String toString() { return String.format("{header=%s, key=%s, isReversed=%b, isEmpty=%b, del=%s, staticRow=%s, rowEstimate=%d}", sHeader, key, isReversed, isEmpty, partitionDeletion, staticRow, rowEstimate); } } }