/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     https://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.avro;

import java.io.ByteArrayInputStream;
import java.io.File;
import java.io.InputStream;
import java.io.StringWriter;
import java.nio.charset.StandardCharsets;
import java.io.IOException;
import java.security.MessageDigest;
import java.util.ArrayList;
import java.util.Iterator;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.Collection;
import java.util.Collections;
import java.util.Set;
import java.util.HashSet;

import org.apache.avro.Schema.Field;
import org.apache.avro.Schema.Field.Order;

import com.fasterxml.jackson.core.JsonGenerator;
import com.fasterxml.jackson.core.JsonParser;
import com.fasterxml.jackson.databind.JsonNode;

A set of messages forming an application protocol.

A protocol consists of:

  • a name for the protocol;
  • an optional namespace, further qualifying the name;
  • a list of types, or named schemas;
  • a list of errors, or named schemas for exceptions;
  • a list of named messages, each of which specifies,
    • request, the parameter schemas;
    • one of either;
      • one-way
      or
      • response, the response schema;
      • errors, an optional list of potential error schema names.
/** * A set of messages forming an application protocol. * <p> * A protocol consists of: * <ul> * <li>a <i>name</i> for the protocol; * <li>an optional <i>namespace</i>, further qualifying the name; * <li>a list of <i>types</i>, or named {@link Schema schemas}; * <li>a list of <i>errors</i>, or named {@link Schema schemas} for exceptions; * <li>a list of named <i>messages</i>, each of which specifies, * <ul> * <li><i>request</i>, the parameter schemas; * <li>one of either; * <ul> * <li>one-way</li> * </ul> * or * <ul> * <li><i>response</i>, the response schema; * <li><i>errors</i>, an optional list of potential error schema names. * </ul> * </ul> * </ul> */
public class Protocol extends JsonProperties {
The version of the protocol specification implemented here.
/** The version of the protocol specification implemented here. */
public static final long VERSION = 1; // Support properties for both Protocol and Message objects private static final Set<String> MESSAGE_RESERVED = new HashSet<>(); static { Collections.addAll(MESSAGE_RESERVED, "doc", "response", "request", "errors", "one-way"); } private static final Set<String> FIELD_RESERVED = new HashSet<>(); static { Collections.addAll(FIELD_RESERVED, "name", "type", "doc", "default", "aliases"); }
A protocol message.
/** A protocol message. */
public class Message extends JsonProperties { private String name; private String doc; private Schema request;
Construct a message.
/** Construct a message. */
private Message(String name, String doc, JsonProperties propMap, Schema request) { super(MESSAGE_RESERVED); this.name = name; this.doc = doc; this.request = request; if (propMap != null) // copy props addAllProps(propMap); } private Message(String name, String doc, Map<String, ?> propMap, Schema request) { super(MESSAGE_RESERVED, propMap); this.name = name; this.doc = doc; this.request = request; }
The name of this message.
/** The name of this message. */
public String getName() { return name; }
The parameters of this message.
/** The parameters of this message. */
public Schema getRequest() { return request; }
The returned data.
/** The returned data. */
public Schema getResponse() { return Schema.create(Schema.Type.NULL); }
Errors that might be thrown.
/** Errors that might be thrown. */
public Schema getErrors() { return Schema.createUnion(new ArrayList<>()); }
Returns true if this is a one-way message, with no response or errors.
/** Returns true if this is a one-way message, with no response or errors. */
public boolean isOneWay() { return true; } @Override public String toString() { try { StringWriter writer = new StringWriter(); JsonGenerator gen = Schema.FACTORY.createGenerator(writer); toJson(gen); gen.flush(); return writer.toString(); } catch (IOException e) { throw new AvroRuntimeException(e); } } void toJson(JsonGenerator gen) throws IOException { gen.writeStartObject(); if (doc != null) gen.writeStringField("doc", doc); writeProps(gen); // write out properties gen.writeFieldName("request"); request.fieldsToJson(types, gen); toJson1(gen); gen.writeEndObject(); } void toJson1(JsonGenerator gen) throws IOException { gen.writeStringField("response", "null"); gen.writeBooleanField("one-way", true); } @Override public boolean equals(Object o) { if (o == this) return true; if (!(o instanceof Message)) return false; Message that = (Message) o; return this.name.equals(that.name) && this.request.equals(that.request) && propsEqual(that); } @Override public int hashCode() { return name.hashCode() + request.hashCode() + propsHashCode(); } public String getDoc() { return doc; } } private class TwoWayMessage extends Message { private Schema response; private Schema errors;
Construct a message.
/** Construct a message. */
private TwoWayMessage(String name, String doc, Map<String, ?> propMap, Schema request, Schema response, Schema errors) { super(name, doc, propMap, request); this.response = response; this.errors = errors; } private TwoWayMessage(String name, String doc, JsonProperties propMap, Schema request, Schema response, Schema errors) { super(name, doc, propMap, request); this.response = response; this.errors = errors; } @Override public Schema getResponse() { return response; } @Override public Schema getErrors() { return errors; } @Override public boolean isOneWay() { return false; } @Override public boolean equals(Object o) { if (!super.equals(o)) return false; if (!(o instanceof TwoWayMessage)) return false; TwoWayMessage that = (TwoWayMessage) o; return this.response.equals(that.response) && this.errors.equals(that.errors); } @Override public int hashCode() { return super.hashCode() + response.hashCode() + errors.hashCode(); } @Override void toJson1(JsonGenerator gen) throws IOException { gen.writeFieldName("response"); response.toJson(types, gen); List<Schema> errs = errors.getTypes(); // elide system error if (errs.size() > 1) { Schema union = Schema.createUnion(errs.subList(1, errs.size())); gen.writeFieldName("errors"); union.toJson(types, gen); } } } private String name; private String namespace; private String doc; private Schema.Names types = new Schema.Names(); private Map<String, Message> messages = new LinkedHashMap<>(); private byte[] md5;
An error that can be thrown by any message.
/** An error that can be thrown by any message. */
public static final Schema SYSTEM_ERROR = Schema.create(Schema.Type.STRING);
Union type for generating system errors.
/** Union type for generating system errors. */
public static final Schema SYSTEM_ERRORS; static { List<Schema> errors = new ArrayList<>(); errors.add(SYSTEM_ERROR); SYSTEM_ERRORS = Schema.createUnion(errors); } private static final Set<String> PROTOCOL_RESERVED = new HashSet<>(); static { Collections.addAll(PROTOCOL_RESERVED, "namespace", "protocol", "doc", "messages", "types", "errors"); } private Protocol() { super(PROTOCOL_RESERVED); }
Constructs a similar Protocol instance with the same name, doc, and namespace as {code p} has. It also copies all the props.
/** * Constructs a similar Protocol instance with the same {@code name}, * {@code doc}, and {@code namespace} as {code p} has. It also copies all the * {@code props}. */
public Protocol(Protocol p) { this(p.getName(), p.getDoc(), p.getNamespace()); putAll(p); } public Protocol(String name, String doc, String namespace) { super(PROTOCOL_RESERVED); this.name = name; this.doc = doc; this.namespace = namespace; } public Protocol(String name, String namespace) { this(name, null, namespace); }
The name of this protocol.
/** The name of this protocol. */
public String getName() { return name; }
The namespace of this protocol. Qualifies its name.
/** The namespace of this protocol. Qualifies its name. */
public String getNamespace() { return namespace; }
Doc string for this protocol.
/** Doc string for this protocol. */
public String getDoc() { return doc; }
The types of this protocol.
/** The types of this protocol. */
public Collection<Schema> getTypes() { return types.values(); }
Returns the named type.
/** Returns the named type. */
public Schema getType(String name) { return types.get(name); }
Set the types of this protocol.
/** Set the types of this protocol. */
public void setTypes(Collection<Schema> newTypes) { types = new Schema.Names(); for (Schema s : newTypes) types.add(s); }
The messages of this protocol.
/** The messages of this protocol. */
public Map<String, Message> getMessages() { return messages; }
Create a one-way message.
/** Create a one-way message. */
@Deprecated public Message createMessage(String name, String doc, Schema request) { return new Message(name, doc, new LinkedHashMap<String, String>(), request); }
Create a one-way message using the name, doc, and props of m.
/** * Create a one-way message using the {@code name}, {@code doc}, and * {@code props} of {@code m}. */
public Message createMessage(Message m, Schema request) { return new Message(name, doc, m, request); }
Create a one-way message.
/** Create a one-way message. */
public <T> Message createMessage(String name, String doc, JsonProperties propMap, Schema request) { return new Message(name, doc, propMap, request); }
Create a one-way message.
/** Create a one-way message. */
public <T> Message createMessage(String name, String doc, Map<String, ?> propMap, Schema request) { return new Message(name, doc, propMap, request); }
Create a two-way message.
/** Create a two-way message. */
@Deprecated public Message createMessage(String name, String doc, Schema request, Schema response, Schema errors) { return new TwoWayMessage(name, doc, new LinkedHashMap<String, String>(), request, response, errors); }
Create a two-way message using the name, doc, and props of m.
/** * Create a two-way message using the {@code name}, {@code doc}, and * {@code props} of {@code m}. */
public Message createMessage(Message m, Schema request, Schema response, Schema errors) { return new TwoWayMessage(m.getName(), m.getDoc(), m, request, response, errors); }
Create a two-way message.
/** Create a two-way message. */
public <T> Message createMessage(String name, String doc, JsonProperties propMap, Schema request, Schema response, Schema errors) { return new TwoWayMessage(name, doc, propMap, request, response, errors); }
Create a two-way message.
/** Create a two-way message. */
public <T> Message createMessage(String name, String doc, Map<String, ?> propMap, Schema request, Schema response, Schema errors) { return new TwoWayMessage(name, doc, propMap, request, response, errors); } @Override public boolean equals(Object o) { if (o == this) return true; if (!(o instanceof Protocol)) return false; Protocol that = (Protocol) o; return this.name.equals(that.name) && this.namespace.equals(that.namespace) && this.types.equals(that.types) && this.messages.equals(that.messages) && this.propsEqual(that); } @Override public int hashCode() { return name.hashCode() + namespace.hashCode() + types.hashCode() + messages.hashCode() + propsHashCode(); }
Render this as JSON.
/** Render this as <a href="https://json.org/">JSON</a>. */
@Override public String toString() { return toString(false); }
Render this as JSON.
Params:
  • pretty – if true, pretty-print JSON.
/** * Render this as <a href="https://json.org/">JSON</a>. * * @param pretty if true, pretty-print JSON. */
public String toString(boolean pretty) { try { StringWriter writer = new StringWriter(); JsonGenerator gen = Schema.FACTORY.createGenerator(writer); if (pretty) gen.useDefaultPrettyPrinter(); toJson(gen); gen.flush(); return writer.toString(); } catch (IOException e) { throw new AvroRuntimeException(e); } } void toJson(JsonGenerator gen) throws IOException { types.space(namespace); gen.writeStartObject(); gen.writeStringField("protocol", name); gen.writeStringField("namespace", namespace); if (doc != null) gen.writeStringField("doc", doc); writeProps(gen); gen.writeArrayFieldStart("types"); Schema.Names resolved = new Schema.Names(namespace); for (Schema type : types.values()) if (!resolved.contains(type)) type.toJson(resolved, gen); gen.writeEndArray(); gen.writeObjectFieldStart("messages"); for (Map.Entry<String, Message> e : messages.entrySet()) { gen.writeFieldName(e.getKey()); e.getValue().toJson(gen); } gen.writeEndObject(); gen.writeEndObject(); }
Return the MD5 hash of the text of this protocol.
/** Return the MD5 hash of the text of this protocol. */
public byte[] getMD5() { if (md5 == null) try { md5 = MessageDigest.getInstance("MD5").digest(this.toString().getBytes(StandardCharsets.UTF_8)); } catch (Exception e) { throw new AvroRuntimeException(e); } return md5; }
Read a protocol from a Json file.
/** Read a protocol from a Json file. */
public static Protocol parse(File file) throws IOException { return parse(Schema.FACTORY.createParser(file)); }
Read a protocol from a Json stream.
/** Read a protocol from a Json stream. */
public static Protocol parse(InputStream stream) throws IOException { return parse(Schema.FACTORY.createParser(stream)); }
Read a protocol from one or more json strings
/** Read a protocol from one or more json strings */
public static Protocol parse(String string, String... more) { StringBuilder b = new StringBuilder(string); for (String part : more) b.append(part); return parse(b.toString()); }
Read a protocol from a Json string.
/** Read a protocol from a Json string. */
public static Protocol parse(String string) { try { return parse(Schema.FACTORY.createParser(new ByteArrayInputStream(string.getBytes(StandardCharsets.UTF_8)))); } catch (IOException e) { throw new AvroRuntimeException(e); } } private static Protocol parse(JsonParser parser) { try { Protocol protocol = new Protocol(); protocol.parse((JsonNode) Schema.MAPPER.readTree(parser)); return protocol; } catch (IOException e) { throw new SchemaParseException(e); } } private void parse(JsonNode json) { parseNamespace(json); parseName(json); parseTypes(json); parseMessages(json); parseDoc(json); parseProps(json); } private void parseNamespace(JsonNode json) { JsonNode nameNode = json.get("namespace"); if (nameNode == null) return; // no namespace defined this.namespace = nameNode.textValue(); types.space(this.namespace); } private void parseDoc(JsonNode json) { this.doc = parseDocNode(json); } private String parseDocNode(JsonNode json) { JsonNode nameNode = json.get("doc"); if (nameNode == null) return null; // no doc defined return nameNode.textValue(); } private void parseName(JsonNode json) { JsonNode nameNode = json.get("protocol"); if (nameNode == null) throw new SchemaParseException("No protocol name specified: " + json); this.name = nameNode.textValue(); } private void parseTypes(JsonNode json) { JsonNode defs = json.get("types"); if (defs == null) return; // no types defined if (!defs.isArray()) throw new SchemaParseException("Types not an array: " + defs); for (JsonNode type : defs) { if (!type.isObject()) throw new SchemaParseException("Type not an object: " + type); Schema.parse(type, types); } } private void parseProps(JsonNode json) { for (Iterator<String> i = json.fieldNames(); i.hasNext();) { String p = i.next(); // add non-reserved as props if (!PROTOCOL_RESERVED.contains(p)) this.addProp(p, json.get(p)); } } private void parseMessages(JsonNode json) { JsonNode defs = json.get("messages"); if (defs == null) return; // no messages defined for (Iterator<String> i = defs.fieldNames(); i.hasNext();) { String prop = i.next(); this.messages.put(prop, parseMessage(prop, defs.get(prop))); } } private Message parseMessage(String messageName, JsonNode json) { String doc = parseDocNode(json); Map<String, JsonNode> mProps = new LinkedHashMap<>(); for (Iterator<String> i = json.fieldNames(); i.hasNext();) { String p = i.next(); // add non-reserved as props if (!MESSAGE_RESERVED.contains(p)) mProps.put(p, json.get(p)); } JsonNode requestNode = json.get("request"); if (requestNode == null || !requestNode.isArray()) throw new SchemaParseException("No request specified: " + json); List<Field> fields = new ArrayList<>(); for (JsonNode field : requestNode) { JsonNode fieldNameNode = field.get("name"); if (fieldNameNode == null) throw new SchemaParseException("No param name: " + field); JsonNode fieldTypeNode = field.get("type"); if (fieldTypeNode == null) throw new SchemaParseException("No param type: " + field); String name = fieldNameNode.textValue(); String fieldDoc = null; JsonNode fieldDocNode = field.get("doc"); if (fieldDocNode != null) fieldDoc = fieldDocNode.textValue(); Field newField = new Field(name, Schema.parse(fieldTypeNode, types), fieldDoc, field.get("default"), true, Order.ASCENDING); Set<String> aliases = Schema.parseAliases(field); if (aliases != null) { // add aliases for (String alias : aliases) newField.addAlias(alias); } Iterator<String> i = field.fieldNames(); while (i.hasNext()) { // add properties String prop = i.next(); if (!FIELD_RESERVED.contains(prop)) // ignore reserved newField.addProp(prop, field.get(prop)); } fields.add(newField); } Schema request = Schema.createRecord(fields); boolean oneWay = false; JsonNode oneWayNode = json.get("one-way"); if (oneWayNode != null) { if (!oneWayNode.isBoolean()) throw new SchemaParseException("one-way must be boolean: " + json); oneWay = oneWayNode.booleanValue(); } JsonNode responseNode = json.get("response"); if (!oneWay && responseNode == null) throw new SchemaParseException("No response specified: " + json); JsonNode decls = json.get("errors"); if (oneWay) { if (decls != null) throw new SchemaParseException("one-way can't have errors: " + json); if (responseNode != null && Schema.parse(responseNode, types).getType() != Schema.Type.NULL) throw new SchemaParseException("One way response must be null: " + json); return new Message(messageName, doc, mProps, request); } Schema response = Schema.parse(responseNode, types); List<Schema> errs = new ArrayList<>(); errs.add(SYSTEM_ERROR); // every method can throw if (decls != null) { if (!decls.isArray()) throw new SchemaParseException("Errors not an array: " + json); for (JsonNode decl : decls) { String name = decl.textValue(); Schema schema = this.types.get(name); if (schema == null) throw new SchemaParseException("Undefined error: " + name); if (!schema.isError()) throw new SchemaParseException("Not an error: " + name); errs.add(schema); } } return new TwoWayMessage(messageName, doc, mProps, request, response, Schema.createUnion(errs)); } public static void main(String[] args) throws Exception { System.out.println(Protocol.parse(new File(args[0]))); } }