/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.cassandra.streaming.messages;

import java.io.IOException;
import java.net.SocketException;
import java.nio.ByteBuffer;
import java.nio.channels.ReadableByteChannel;

import org.apache.cassandra.io.util.DataOutputStreamPlus;
import org.apache.cassandra.streaming.StreamSession;

StreamMessage is an abstract base class that every messages in streaming protocol inherit. Every message carries message type(Type) and streaming protocol version byte.
/** * StreamMessage is an abstract base class that every messages in streaming protocol inherit. * * Every message carries message type({@link Type}) and streaming protocol version byte. */
public abstract class StreamMessage {
Streaming protocol version
/** Streaming protocol version */
public static final int VERSION_20 = 2; public static final int VERSION_22 = 3; public static final int VERSION_30 = 4; public static final int CURRENT_VERSION = VERSION_30; private transient volatile boolean sent = false; public static void serialize(StreamMessage message, DataOutputStreamPlus out, int version, StreamSession session) throws IOException { ByteBuffer buff = ByteBuffer.allocate(1); // message type buff.put(message.type.type); buff.flip(); out.write(buff); message.type.outSerializer.serialize(message, out, version, session); } public static StreamMessage deserialize(ReadableByteChannel in, int version, StreamSession session) throws IOException { ByteBuffer buff = ByteBuffer.allocate(1); int readBytes = in.read(buff); if (readBytes > 0) { buff.flip(); Type type = Type.get(buff.get()); return type.inSerializer.deserialize(in, version, session); } else if (readBytes == 0) { // input socket buffer was not filled yet return null; } else { // possibly socket gets closed throw new SocketException("End-of-stream reached"); } } public void sent() { sent = true; } public boolean wasSent() { return sent; }
StreamMessage serializer
/** StreamMessage serializer */
public static interface Serializer<V extends StreamMessage> { V deserialize(ReadableByteChannel in, int version, StreamSession session) throws IOException; void serialize(V message, DataOutputStreamPlus out, int version, StreamSession session) throws IOException; }
StreamMessage types
/** StreamMessage types */
public static enum Type { PREPARE(1, 5, PrepareMessage.serializer), FILE(2, 0, IncomingFileMessage.serializer, OutgoingFileMessage.serializer), RECEIVED(3, 4, ReceivedMessage.serializer), RETRY(4, 4, RetryMessage.serializer), COMPLETE(5, 1, CompleteMessage.serializer), SESSION_FAILED(6, 5, SessionFailedMessage.serializer), KEEP_ALIVE(7, 5, KeepAliveMessage.serializer); public static Type get(byte type) { for (Type t : Type.values()) { if (t.type == type) return t; } throw new IllegalArgumentException("Unknown type " + type); } private final byte type; public final int priority; public final Serializer<StreamMessage> inSerializer; public final Serializer<StreamMessage> outSerializer; @SuppressWarnings("unchecked") private Type(int type, int priority, Serializer serializer) { this(type, priority, serializer, serializer); } @SuppressWarnings("unchecked") private Type(int type, int priority, Serializer inSerializer, Serializer outSerializer) { this.type = (byte) type; this.priority = priority; this.inSerializer = inSerializer; this.outSerializer = outSerializer; } } public final Type type; protected StreamMessage(Type type) { this.type = type; }
Returns:priority of this message. higher value, higher priority.
/** * @return priority of this message. higher value, higher priority. */
public int getPriority() { return type.priority; } }