
PDP 1: Schema Registry

shivesh ranjan edited this page Jul 2, 2020 · 3 revisions

Motivation

Pravega streams store sequences of bytes, and Pravega does not validate the information in these events. However, applications typically need to encode this information in a structured format, and as business needs change, that structure may also need to change/evolve to handle new requirements.

To handle this, applications want serialization systems that support schemas and allow schemas to evolve. With schemas, they can define the structure of the data in an event while ensuring that both upstream and downstream applications use the correct structure, and over time evolve this schema to incorporate new business requirements. Without schema evolution support, updating upstream writers to publish a newer structure of the data may break downstream readers that still expect the older structure.

When schemas are only allowed to change in a compatible fashion, downstream applications can continue to consume data without breaking because of unexpected changes to structures.

In the absence of schema management support in the streaming storage layer, applications have to build an out-of-band coordination mechanism for exchanging and evolving schemas for the data in a stream. Given that schema management is a common requirement of applications working with streams, it is desirable to provide it at the streaming storage layer.

With schema management supported by the storage layer, all applications relying on the streaming storage can benefit from it. A common schema management layer can additionally enforce schema evolution strategies and prevent events with incompatible schemas from being written by upstream applications, consequently simplifying application development for stream processing and reducing coordination across teams.

Schema Serving and Management:

The goal is to build a serving and management layer for schemas describing data in Pravega streams. Making the schema for data written into a stream available has advantages for many applications, particularly generic applications like SQL/BI tools. Such applications can benefit from knowledge of the structure of the data in the stream so that they can process and make sense of it without being explicitly coded against that structure. The schema registry also serves as a centralized schema management layer that allows schemas to evolve in conformance with a desired compatibility strategy. This facilitates greater decoupling of the different teams working with the data in a stream, while ensuring that downstream applications do not break because incompatible data was ingested into the stream.

Terminology

Schema: Declaration of the structure of the data in a formal language

Schema Evolution: Changing the schema used for the data in a stream. Serialization systems that support schema evolution allow using different schemas during writes and reads.

Compatibility: Two schemas are said to be compatible if data written using one schema can be read using another schema.

Schema Version: As the schema for a message type evolves, the registry captures a chronological history of its schemas. Each schema in this history is uniquely identified by a monotonically increasing version number.

Backward Compatibility: For two schemas S1 and S2 where S1 has a lower version than S2, S2 is backward compatible with S1 if data written using S1 can be read using S2.

Forward Compatibility: For two schemas S1 and S2 where S1 has a lower version than S2, S1 is forward compatible with S2 if data written using S2 can be read using S1.

Note that a version is meaningful only within a group: compatible schemas, when included as part of an evolution group, are considered different versions of the same schema.
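To make these definitions concrete, here is a minimal sketch (not Pravega code; the Map-based record and field names are invented) of a backward-compatible read, where a reader schema that adds a field with a default can still read records written with the older schema:

```java
import java.util.HashMap;
import java.util.Map;

// Illustration of backward compatibility: the reader schema adds a field with
// a default, so records written with the older schema are still readable.
// The Map-based "record" and field names are invented for illustration only.
public class BackwardCompatDemo {
    // readerDefaults: field name -> default value used when the field is
    // absent from the written record (null would mean "required").
    public static Map<String, Object> read(Map<String, Object> written,
                                           Map<String, Object> readerDefaults) {
        Map<String, Object> result = new HashMap<>();
        for (Map.Entry<String, Object> field : readerDefaults.entrySet()) {
            if (written.containsKey(field.getKey())) {
                result.put(field.getKey(), written.get(field.getKey()));
            } else if (field.getValue() != null) {
                result.put(field.getKey(), field.getValue()); // fill default
            } else {
                throw new IllegalStateException("missing required field: " + field.getKey());
            }
        }
        return result;
    }
}
```

A record written as `{temp: 21}` under the old schema is read under the new schema as `{temp: 21, unit: <default>}`; that is exactly the "data written using S1 can be read using S2" property.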

Understanding versioning

Serialization systems that allow schema evolution do not inherently attach a version to a schema. A version is a logical property we apply to a schema when it is part of an ordered group of schemas describing an entity over the course of its evolution.

Compatibility, by definition, is the ability to read data using a different schema than the one used to write it. The direction of compatibility is applicable only in the context of an order of evolution. There are no universal compatibility rules across serialization formats; for example, the compatibility rules for Avro differ from those for Protobuf. However, compatibility and evolution are generic concepts applicable to all serialization systems that require a schema and support schema evolution.

Our objective is to build a schema management and serving layer that does not tie itself to any single serialization system. Any subsequent discussion about these concepts should be taken with that view.

Schema Management using a central service

For schema management we propose to build a centralized registry service, called the schema registry service, that is the single source of truth about schemas for data stored in a Pravega stream. It will group schemas for the data in a stream in the order in which they were added and enforce compatibility checks as schemas are evolved.

The service will provide logical groups under which the different schemas for data in a stream are stored. A schema group logically maps to a Pravega stream, but its usage is not limited to Pravega streams. A group is a name under which schemas are stored. The registry service maintains a versioned history of schemas for a group and assigns each schema a unique version number that identifies it and its position in the group's history. The registry service will provide an interface, a REST endpoint, for storing and retrieving schemas for each group.

Schema evolution for the group will be subject to compatibility checks based on the chosen strategy, which can be any of forward compatibility, backward compatibility, both, or none. The service will accept only those schemas in a group that comply with the chosen compatibility strategy. Accepted schemas become part of a versioned history and are assigned unique reference identifiers called schema versions.

It is important to note that the service can only perform compatibility checks for schemas it can parse. So in v1, we intend to support Avro, Protobuf, and JSON out of the box, starting with compatibility checks for Avro and later extending them to Protobuf and JSON. The service is not limited to storing schemas for just these three formats, however.

So far, we have described a way to store, evolve, and access schemas. These can be accessed by anyone interested in knowing the structure of the data in a group. Since the data in a stream can evolve, different events may be written with different schemas or structures. Applications could benefit if there were a way to tag the data written into a stream with the schema with which it was written.

For this, the registry service will be complemented with a registry client. The registry client can encode schema information with the payload by referencing the schema via its schema version. During the write phase, the registry client encodes the writer schema version along with the payload. During the read phase, the registry client resolves the schema version to the actual schema by querying the registry service. Including schema versions instead of actual schemas reduces payload size while still accruing the benefits of sharing the schema with downstream applications.

As mentioned earlier, the benefit of sharing schemas with downstream applications is not limited to formats like Avro where the reader requires the writer schema for deserialization. Giving applications knowledge of the exact schema with which data was written, for all serialization formats, can greatly simplify working with and building streaming data pipelines.

Using Pravega With Schema Registry

Pravega will continue to be decoupled from serialization formats and deal only with raw bytes. The schema registry is a new complementary service that can be deployed to benefit applications working with data in Pravega streams. Managing schemas via the schema registry is optional, and applications can continue to use Pravega streams without the schema management provided by a central service.

Users can choose to use Pravega in any of the following three modes:

  1. Existing way: no schema registry.
  2. Register and evolve schemas in the registry, but do not use the registry client while writing data.
  3. Register schemas in the registry and use the registry client to encode the schema id with the payload.

Because a Pravega stream can be used in any of these three modes, applications have the flexibility to choose the level of support they want from the registry.

Different types of applications can benefit from different levels of integration with the registry. We will try to classify applications and describe how each class could benefit from the registry.

Applications

Generally, applications are authored with a specific schema (or corresponding generated classes) included in the code. Reader applications apply this specific schema to the serialized data (schema on read). They typically work with generated objects and interpret the data from the stream as specifically structured objects. With compatible evolution of schemas, these applications can continue to work even as they encounter messages encoded with different schemas. There is another class of applications that are authored without any specific schema (or corresponding generated classes) and instead learn the schema at the time of data processing; examples include connectors and SQL engines. Sharing the structure of the data along with the data enables such applications.

To broadly understand the use case, we will classify applications into two broad categories.

  1. Applications with implicit knowledge of the structure of the data they work with, by having the schema embedded in the code. These are typical applications that are compiled with specific schemas (or corresponding generated classes) and whose business logic is tightly coupled with a specific structure in which they want to process the data.
  2. Applications that do not include specific schemas at compile time. They dynamically react to the data they need to process. An example is an SQL query application over streaming data: the application is not hard-coded with a schema but requests a compatible schema from the registry service, which it uses to write and read data in the stream via SQL commands.

The first category of applications benefits from sharing schemas via the registry service for formats like Avro, where the writer schema is required for deserialization. Other serialization formats do not necessarily need the exact writer schema, but such applications can still benefit from storing schemas in the registry service and evolving them in a compatible fashion so that downstream applications continue to work without worrying about incompatibility. This also keeps downstream and upstream applications highly decoupled, translating to less coordination between teams and thus faster development cycles and higher productivity.

The second category of applications benefits greatly from a central store of the schemas that describe the data within a stream. Depending on the application, some fetch a specific schema from the registry service and use the schema-on-read technique, while others fetch the exact writer schema for each event they process. These applications process data in a stream by dynamically looking up the schema corresponding to each event at runtime and applying the structure it describes to the data they read. This structured data can then be used for meaningful business processing.

Applications that want or require the writer schema can use the registry client, which encodes the schema version with the serialized data. Applications not interested in the exact structure can still use the registry service without using the registry client.

Together, the schema registry service and client enable all such applications: they make write-time schemas available to downstream applications, and the enforcement of compatibility during schema evolution guarantees that these applications will not break as schemas evolve.

Let us take a concrete example: imagine an SQL application, configured with a Pravega cluster and the registry, that provides tabular views of data in Pravega streams. It wants to view the data in the stream in tabular form, and thus assumes a fixed structure for all events as they are deserialized.

Users could simply express their SQL queries as follows.

CREATE STREAM sensorData WITH (PRAVEGA-STREAM='scope/sensorDataStream'); 
SELECT * FROM sensorData;

The application would contact the registry service to retrieve a registered (typically the latest) schema and, using its structure, apply schema on read to all events in the stream to give tabular views of the streaming data. Take another example: imagine an indexing application that wants to index each event in the stream individually, using the exact structure with which the event was written. While indexing an event, this search application would retrieve the write-time schema from the registry, apply that structure to the event, create a document with all the fields in the event, and index it with a search engine.

Such applications are made possible by managing schemas at a central location in the registry service. Without a central schema management and serving layer, schemas would have to be shared out of band or in an ad hoc manner, and each application developer would invent their own mechanism.

Schema Registry Service Design:

Applications will create a group with the schema registry service and choose a compatibility policy for it. The compatibility policy tells the registry service which kinds of changes to the schema are allowed and which are disallowed.

Pravega applications that use the schema registry will instantiate Pravega writers and readers with the schema registry client. The client can be instantiated in two modes: 1. encode the schema version with the data, or 2. do not encode the schema version with the data. The schema registry client takes care of all communication with the registry service. When an application chooses a schema to work with, a call is made to the registry service that either implicitly registers the client's schema or retrieves its version if the schema was already registered.
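The register-or-retrieve behaviour described above can be sketched as follows; `RegistryService`, `getOrRegister`, and the cache key are hypothetical names for illustration, not the actual client API:

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

// Sketch of the client-side behaviour described above: the first time an
// application uses a schema, the client registers it (or fetches its version
// if already registered) and caches the result, so the service is contacted
// only once per schema. All names here are illustrative, not the real API.
public class CachingRegistryClient {
    public interface RegistryService {
        int getOrRegister(String groupId, String schema); // returns the schema's version
    }

    private final RegistryService service;
    private final Map<String, Integer> versionCache = new ConcurrentHashMap<>();

    public CachingRegistryClient(RegistryService service) {
        this.service = service;
    }

    public int versionFor(String groupId, String schema) {
        // computeIfAbsent ensures at most one service call per (group, schema) pair.
        return versionCache.computeIfAbsent(groupId + "#" + schema,
                k -> service.getOrRegister(groupId, schema));
    }
}
```

Repeated calls with the same schema hit the cache, which matches the later observation that writers typically contact the registry service only once, during serializer initialization.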

The registry service will offer multiple compatibility types such as backward, forward, bi-directional, disabled, and allow-all. Based on the chosen compatibility setting, new schemas will be checked for compatibility against potentially all existing schemas.

It is important to call out that these compatibility settings are not specific to any serialization format. The schema registry is therefore designed to be generic and extensible, able to support multiple serialization systems.

Typically, users prefer a serialization system that supports describing the structure of data in a formal language and allows them to evolve the schema; popular choices are Avro, Protobuf, and Thrift. The new schema registry will aim to provide compatibility validation for popular serialization formats, while at the same time being designed so that custom schema compatibility checker code can be plugged in for any custom serialization system.
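One way that pluggability could be structured is sketched below; the `CompatibilityChecker` interface and the registration map are assumptions for illustration, not the actual design:

```java
import java.util.HashMap;
import java.util.Map;

// Sketch of a pluggable compatibility checker: format-specific rules live
// behind one interface, and custom formats can register their own
// implementation. Names are illustrative only.
public class CheckerRegistry {
    public interface CompatibilityChecker {
        // Can data written with writerSchema be read with readerSchema?
        boolean canRead(String writerSchema, String readerSchema);
    }

    private final Map<String, CompatibilityChecker> checkers = new HashMap<>();

    public void register(String format, CompatibilityChecker checker) {
        checkers.put(format, checker);
    }

    public boolean check(String format, String writerSchema, String readerSchema) {
        CompatibilityChecker checker = checkers.get(format);
        if (checker == null) {
            // A format with no registered checker cannot be parsed by the
            // registry, so compatibility cannot be validated for it.
            throw new IllegalArgumentException("no checker for format: " + format);
        }
        return checker.canRead(writerSchema, readerSchema);
    }
}
```

This mirrors the constraint stated earlier: the service can only perform compatibility checks for schemas it can parse, while still storing schemas for formats it cannot.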

Design Goals:

  • Provide a centralized service for storing and retrieving schemas.
  • Assign and store a versioned history of schemas for a stream.
  • Allow schemas to evolve, accepting or rejecting new schemas based on the chosen compatibility setting by validating against existing versions.
  • Support multiple schema types like Avro, Protobuf, JSON, and Thrift, and even custom schema formats, such that the Schema Registry is fully pluggable.
  • Allow each schema owner to choose a compatibility strategy.
  • Modify Pravega clients to be instantiated with specific schemas.
  • Provide serializers that handle schema storage and retrieval from the registry and include schema version information along with the serialized data.

Non-goals:

  • Upgrading applications in accordance with the compatibility policy must be done out of band. The schema registry does not enforce correctness if the upgrade order implied by the policy is not followed.

Architecture

The schema registry should be distributed, durably persisting its metadata in Pravega. All schema registry service nodes are stateless and capable of serving all queries, so there is no partitioning or distribution of groups across schema registry service nodes. All state is durably and consistently persisted in Pravega tables.

The registry service is a standalone service that acts as a serving and management layer for schemas. Schemas are grouped under a unique name with the service and evolved within that namespace. Schemas are assigned unique identifiers within that group that applications can use to refer to a schema uniquely. The service merely acts as a central registry for this information and does not impose any constraints or criteria on how its information is to be used by the applications.

Using Schema registry in Pravega applications

Pravega applications can use the schema registry service to store and retrieve schemas for the data they work with at runtime. For this, applications register schemas with the registry service and then tag their data with the unique schema identifiers generated by the service. Writer applications add the identifier as a header to the serialized events they write into Pravega. Reader applications parse the header to read the encoding identifier, resolve it with the schema registry service, and use the returned information as the structure describing the data that follows.

One of the important goals for the service is to ensure that the schema layer is completely decoupled from Pravega. Pravega is a streaming storage system and should continue to be schema-less and work with raw bytes.

It follows naturally that Pravega streams can be used both with and without schemas, which entails that Pravega applications can use Pravega in any of three modes:

  1. No schema management using the registry service. This is the same as the existing way; the schema registry is not mandated.
  2. Use the schema registry to store schemas, but do not encode schema version identifiers with the stream data. The typical use case is serialization systems that do not require the writer schema at deserialization time, and applications that can use the schema-on-read technique with any compatible schema. Such applications gain the benefits of central schema management without including the schema, or a reference to it, with the payload in the stream.
  3. Schema management with the schema registry, with the registry client encoding the schema identifier with the serialized payload so that downstream applications can access writer schemas should they wish to. This is absolutely essential for formats like Avro, and is also useful for other formats where downstream applications are interested in the exact writer schema.

Pravega reader and writer clients require users to supply an implementation of the Serializer interface. We provide applications with a suite of schema-registry-aware serializers that are responsible for all interactions with the schema registry service.

This approach makes all interaction with the schema registry transparent to the user. We will provide implementations for popular serialization formats like Avro, Protobuf, and JSON, and also provide pluggability so that users can supply implementations for custom formats.

We will next describe the interaction between the schema registry client and the service for use case 3. During writer/reader instantiation, the Pravega serializers automatically contact the schema registry with the supplied schema; the schema registry validates the schema and returns the schema identifiers for the application to use.

If schemas already exist in the registry for the group, the schema registry performs compatibility validation against the latest schema before accepting or rejecting the supplied schema.

Events written using a writer with a schema will include the schema version with the serialized data, enabling downstream readers to retrieve the schema and use it for deserialization.

Pravega Serializers:

We have created a whole suite of Pravega serializers for different serialization formats that transparently interact with the registry service and encode headers describing the encoding of the data that follows. Pravega deserializers also maintain a cache of retrieved schemas with their encoding information. Typically, writers contact the registry service only once, during initialization of the serializer, to get the unique encoding identifier, and readers communicate with the schema registry once for each new encoding id they encounter. Once they retrieve the information corresponding to an encoding id, they cache and reuse it.

We will use the following structure for encoding the schema id with each serialized event. It takes 5 additional bytes per event: the first byte is a magic byte that describes the encoding format, and the next 4 bytes encode the encoding id.

|       1 byte        |     4 bytes      | <serialized event's length>  |
|   Encoding format   |   Encoding Id    |       Serialized Data        |

Consequently, as clients deserialize an event using Pravega's serializers, the serializer first reads the schema version, retrieves the schema from the Schema Registry, and caches it. Readers therefore only call into the Schema Registry the first time they encounter a new schema id.
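The header handling implied by this layout might be sketched as follows. This is a minimal illustration assuming a 4-byte (32-bit) encoding id and a placeholder value for the format byte; it is not the actual serializer code:

```java
import java.nio.ByteBuffer;

// Sketch of the 5-byte header described above: one magic/format byte followed
// by a 4-byte (32-bit) encoding id, then the serialized payload. The concrete
// value of the format byte is an assumption for illustration.
public class HeaderCodec {
    public static final byte FORMAT = 0x0; // placeholder magic byte

    public static byte[] encode(int encodingId, byte[] payload) {
        ByteBuffer buf = ByteBuffer.allocate(5 + payload.length);
        buf.put(FORMAT);
        buf.putInt(encodingId);
        buf.put(payload);
        return buf.array();
    }

    public static int encodingId(byte[] event) {
        ByteBuffer buf = ByteBuffer.wrap(event);
        buf.get(); // skip the format byte
        return buf.getInt();
    }

    public static byte[] payload(byte[] event) {
        byte[] out = new byte[event.length - 5];
        System.arraycopy(event, 5, out, 0, out.length);
        return out;
    }
}
```

A reader would call `encodingId` on each event, consult its cache (or the registry) for that id, and then deserialize the `payload` bytes with the resolved schema.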

Example usage for Avro:

        Serializer<MyEvent> serializer = SerializerFactory.avroSerializer(serializerConfig, AvroSchema.of(MyEvent.class));
        Serializer<MyEvent> deserializer = SerializerFactory.avroDeserializer(serializerConfig, AvroSchema.of(MyEvent.class));
        Serializer<GenericRecord> genericDeserializer = SerializerFactory.avroGenericDeserializer(serializerConfig);

Writer and reader instantiation:

        EventStreamWriter<MyEvent> writer = clientFactory.createEventWriter(stream, serializer,
                EventWriterConfig.builder().build());
        EventStreamReader<MyEvent> reader = clientFactory.createReader("reader",
                readergroup,
                deserializer,
                ReaderConfig.builder().build());

The Schema Registry will support multiple types of serialization systems. A schema is encoded as bytes and sent to the registry service for persistence. For known schema formats, the registry parses the schema and applies schema compatibility rules. For a custom schema type, the user needs to provide the schema in binary form; the custom schema format is not parsed or interpreted by the schema registry or its client.

Concepts:

Group

A group refers to the name under which schemas are registered with the service and is identified by a unique group id. The group is the abstraction under which schemas are registered and evolved, and is the chief primitive exposed by the registry service. A Pravega stream would use a schema registry group to manage its schemas; conceptually, the same group can also be used by multiple streams. Note that since the schema registry and Pravega are decoupled, the link between a schema registry group and a Pravega stream is the application's own convention; typically, applications name their groups identically to the corresponding Pravega stream. The information stored by the service for a group includes group metadata such as the serialization format and compatibility policy. Codecs and schemas are registered under the group, and the schema registry service applies the compatibility policy to all schemas registered in a group.

Schemas

Schemas are encapsulated in an object called SchemaInfo, which captures all metadata for the schema. SchemaInfo has a name for the schema, a schema type, and the schema data in binary form. It also includes a key/value properties map which holds user-defined keys and values and can be used for tagging the schema.

Version

The schema registry stores schemas under a named group. Within a group, schemas are assigned versions that capture how the schemas have evolved within the group. A version is encapsulated in an object called VersionInfo. VersionInfo contains two fields: version, which is a 32-bit counter, and the schema name. The version uniquely identifies a schema within the versioned history of a group, so every schema in a group is identified by a unique version.

Codecs

Encoding of data in streams can involve more than just schemas. The schema registry supports registering codecs for a group. Codecs describe how data is encoded after serialization; a typical usage is to describe the compression applied to the data. CodecType defines the different types of encodings that can be used when writing data to the stream.

Encoding

A codec type and schema version combination uniquely identifies the encoding of the data. This is encapsulated in an object called EncodingInfo. The schema registry generates a unique identifier, an EncodingId (a 32-bit number), for each such pair of registered schema and codec. After a schema is registered, whenever its version is requested together with a codec type, the encoding id that uniquely identifies that pair is returned; Pravega applications that use schema-registry-aware serializers include this encoding id in the event header.
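The pairing of a schema version and a codec type with a stable id can be sketched as follows (illustrative names, not the service internals):

```java
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.atomic.AtomicInteger;

// Sketch of encoding id generation: each distinct (schema version, codec type)
// pair gets one stable 32-bit id, assigned on first request and returned
// unchanged afterwards. Names are illustrative, not the service internals.
public class EncodingIds {
    private final Map<String, Integer> ids = new HashMap<>();
    private final AtomicInteger counter = new AtomicInteger();

    public synchronized int encodingId(int schemaVersion, String codecType) {
        return ids.computeIfAbsent(schemaVersion + "/" + codecType,
                k -> counter.getAndIncrement());
    }
}
```

The important property is stability: asking for the same (version, codec) pair always yields the same id, while distinct pairs get distinct ids.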

Support for multiple types of schemas within the same group:

Streams are also used in scenarios where multiple types of objects can be written into the same stream; typical usage scenarios include event sourcing and message buses. The schema registry service has first-class support for the coexistence of multiple types of events within a group: the schemas for each object type are evolved independently of the other schemas in the group. To support this, a group has a property called allowMultipleTypes. If this flag is set to true, the schema registry service compares any new schema only with existing schemas that share the same schema name. If allowMultipleTypes is set to false (the default), new schemas are compared with all other schemas in the group, and only schemas with the same name are allowed.
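The comparison-set rule above can be sketched as follows (the Schema record and method names are illustrative, not the actual service types):

```java
import java.util.ArrayList;
import java.util.List;

// Sketch of the comparison-set rule described above: with allowMultipleTypes,
// a new schema is only checked against existing schemas of the same name;
// without it, it is checked against every schema in the group. The Schema
// record here is illustrative only.
public class ComparisonSet {
    public record Schema(String name, String definition) {}

    public static List<Schema> schemasToCompare(List<Schema> existing,
                                                Schema candidate,
                                                boolean allowMultipleTypes) {
        if (!allowMultipleTypes) {
            return existing; // single-type group: compare with everything
        }
        List<Schema> sameName = new ArrayList<>();
        for (Schema s : existing) {
            if (s.name().equals(candidate.name())) {
                sameName.add(s);
            }
        }
        return sameName;
    }
}
```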

Compatibility:

As part of schema management, the registry service supports validating schemas for compatibility.

Compatibility can be defined as a group-level policy. The policy is applied at the time a new schema is added to the group. The Pravega schema registry supports a range of compatibility policies that can be specified.
Broadly, they are:

  1. ALLOW_ANY: Allow all schema changes without any compatibility check. The responsibility to manage compatibility is left to the schema owner, out of band.
  2. BACKWARD: Validate that the new schema can be used to read data written using the previous schema.
  3. BACKWARD_TILL(x): Validate that the new schema is backward compatible with all schemas down to the specified schema version.
  4. BACKWARD_TRANSITIVE: Validate that the new schema is backward compatible with all previous schemas in the group.
  5. FORWARD: Validate that the previous schema can be used to read data written using the new schema.
  6. FORWARD_TILL(x): Validate that the new schema is forward compatible with all schemas down to the specified schema version.
  7. FORWARD_TRANSITIVE: Validate that the new schema is forward compatible with all previous schemas in the group.
  8. BACKWARD_TILL(x)_AND_FORWARD_TILL(y): Validate for backward compatibility down to the version identified by x and for forward compatibility down to the version identified by y.
  9. FULL: Validate against the latest schema that the new schema is compatible in both the forward and backward directions: the old schema can be used to read data written with the new schema, and the new schema can be used to read data written with the old schema.
  10. FULL_TRANSITIVE: Validate against all previous schemas for both forward and backward compatibility.
  11. DENY_ALL: Disallow any schema evolution/modification.

The compatibility policy is encapsulated in a more general construct called Compatibility. Currently, the only supported rule is Compatibility, but encapsulating it in a Rules object allows us to include additional types of rules in the future.
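The policies above differ mainly in which previous schemas a candidate is validated against. As a sketch (illustrative code, not the service implementation), the backward-flavoured variants select their validation scope like this:

```java
import java.util.ArrayList;
import java.util.List;

// Sketch of which previous schema versions a new schema is validated against
// under the backward-flavoured policies above. `history` holds versions in
// ascending order; the method names mirror the policy names, but this is
// illustrative code only.
public class ValidationScope {
    public static List<Integer> backward(List<Integer> history) {
        // BACKWARD: only the latest schema.
        return history.isEmpty() ? List.of() : List.of(history.get(history.size() - 1));
    }

    public static List<Integer> backwardTransitive(List<Integer> history) {
        // BACKWARD_TRANSITIVE: all previous schemas.
        return history;
    }

    public static List<Integer> backwardTill(List<Integer> history, int version) {
        // BACKWARD_TILL(x): all schemas from version x (inclusive) onwards.
        List<Integer> scope = new ArrayList<>();
        for (int v : history) {
            if (v >= version) {
                scope.add(v);
            }
        }
        return scope;
    }
}
```

The forward-flavoured policies select the same scopes; only the direction of the read check changes.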

How to choose compatibility policy:

The choice of compatibility policy depends on the scenario in which a stream is used. For example, backward compatibility ensures that data written with an old schema is readable by readers with a new schema, so backward compatibility should be chosen when it is possible to upgrade readers first. The registry service validates schemas for compatibility, but the corresponding upgrades have to be done out of band. Take an analogy with the service-client model, where schemas are analogous to APIs: choosing backward compatibility ensures that when the service is upgraded, it can continue to serve older clients, so the service is upgraded first, followed by the clients. The same concept applies to streams, where you upgrade the readers first, followed by the writers.

Forward compatibility, similarly, ensures that data written with a new schema can be read by readers with an old schema. Here, you upgrade the writers first, followed by the readers.

If there is no control over which side is upgraded first, then full compatibility should be chosen. If writers and readers with multiple schema versions can be present as schemas are evolved, or if historical reads are expected on the streaming data, a policy with the transitive property should be chosen accordingly.

So the choice of policy should be made carefully, by understanding the usage scenario for the stream and the level of control over upgrades of writers and readers.

APIs:

Following are the logical APIs exposed by the schema registry.

interface SchemaRegistry {
    /**
     * Adds a new group. A group refers to the name under which the schemas are registered. A group is identified by a 
     * unique id and has an associated set of group metadata {@link GroupProperties} and a list of codec types and a 
     * versioned history of schemas that were registered under the group. 
     * Add group is idempotent. If the group by the same id already exists the api will return false. 
     * 
     * @param groupId Id for the group that uniquely identifies the group. 
     * @param groupProperties Group properties for the group. These include serialization format, compatibility policy, 
     *                        and a flag to declare whether multiple schemas representing distinct object types can be 
     *                        registered with the group. Schema compatibility checks 
     *                        are always performed for schemas that share the same {@link SchemaInfo#type}.
     *                        Additionally, a user defined map of properties can be supplied.
     * @return True if the group was added successfully, false if it already exists. 
     * @throws BadArgumentException if the group properties are rejected by the service.
     * @throws UnauthorizedException if the user is unauthorized.
     */
    boolean addGroup(String groupId, GroupProperties groupProperties) throws BadArgumentException, UnauthorizedException;
    
    /**
     * Removes a group identified by the groupId. This will remove all the codec types and schemas registered under the group.
     * Remove group is idempotent. 
     * 
     * @param groupId Id for the group that uniquely identifies the group. 
     * @throws UnauthorizedException if the user is unauthorized.
     */
    void removeGroup(String groupId) throws UnauthorizedException;

    /**
     * List all groups that the user is authorized on. This returns an iterator where each element is a pair of group 
     * name and group properties. 
     * Listing groups is not an atomic call. The implementation is not necessarily consistent, as it uses paginated 
     * iteration with a continuation token. This means that as the list is being iterated over, the state on the server 
     * may change (some groups may be added or removed). For example, if a group that has already been iterated over is deleted
     * and recreated, the iterator may deliver a group with the same name twice. Similarly, if a group that has not yet been
     * iterated over is deleted, the client may or may not see the group while iterating over the response, depending on
     * whether the client had received the group from the service before it was deleted. 
     * The iterator gives a weak guarantee: all groups that were added before the point where {@link Iterator#hasNext()} 
     * returns false will have been iterated over.
     * @return Iterator over pairs of group name and group properties for all groups. 
     * @throws UnauthorizedException if the user is unauthorized.
     */
    Iterator<Map.Entry<String, GroupProperties>> listGroups() throws UnauthorizedException;
        
    /**
     * Get group properties for the group identified by the group id. 
     * 
     * {@link GroupProperties#serializationFormat} identifies the serialization format used to describe the schema.
     * {@link GroupProperties#compatibility} is the schema compatibility policy that is enforced as schemas evolve.
     * {@link GroupProperties#allowMultipleTypes} specifies whether multiple schema types may be registered in the group. 
     * Schemas are validated against existing schema versions that have the same {@link SchemaInfo#type}. 
     * {@link GroupProperties#properties} is a map of user defined properties for the group.
     * 
     * @param groupId Id for the group.
     * @return Group properties, which include the serialization format and compatibility policy. 
     * @throws ResourceNotFoundException if group is not found.
     * @throws UnauthorizedException if the user is unauthorized.
     */
    GroupProperties getGroupProperties(String groupId) throws ResourceNotFoundException, UnauthorizedException;

    /**
     * Update the group's schema compatibility policy. If a previous compatibility policy is not supplied, then the update is
     * performed unconditionally. However, if a previous compatibility policy is supplied, then the update is performed if and only if
     * the existing {@link GroupProperties#compatibility} matches the supplied previous policy. 
     * 
     * @param groupId Id for the group. 
     * @param compatibility New Compatibility for the group.
     * @param previous Previous compatibility.
     * @return true if the update was accepted by the service, false if it was rejected because of precondition failure.
     * Precondition failure occurs if a previous compatibility policy was specified and it does not match the policy set on the group. 
     * @throws ResourceNotFoundException if group is not found.
     * @throws UnauthorizedException if the user is unauthorized.
     */
    boolean updateCompatibility(String groupId, Compatibility compatibility, @Nullable Compatibility previous)
        throws ResourceNotFoundException, UnauthorizedException;

    /**
     * Gets the list of latest schemas for each object type registered under the group. Objects are identified by {@link SchemaInfo#type}.
     * Schema registry provides consistency guarantees, so all schemas added before this call will be returned by it.
     * However, the service side implementation is not guaranteed to be atomic:
     * if schemas are being added concurrently, the response may or may not include them. 
     *
     * @param groupId Id for the group. 
     * @return Unordered list of latest schemas for each object type within the group.    
     * @throws ResourceNotFoundException if group is not found.
     * @throws UnauthorizedException if the user is unauthorized.
     */
    List<SchemaWithVersion> getSchemas(String groupId) throws ResourceNotFoundException, UnauthorizedException;

    /**
     * Registers a schema with the group. Schemas are validated against existing schemas in the group that share the same 
     * {@link SchemaInfo#type}.
     * If the group is configured with {@link GroupProperties#allowMultipleTypes}, then multiple schemas with distinct
     * {@link SchemaInfo#type} can be registered. 
     * All schemas with the same type are assigned monotonically increasing version numbers. 
     * This API is idempotent: the service assigns and returns a version info object to identify the given schema, 
     * and if the schema was already registered, the existing version info is returned.  
     * 
     * @param groupId Id for the group. 
     * @param schemaInfo Schema to add. 
     * @return versionInfo which uniquely identifies where the schema is added in the group. If schema is already registered,
     * then the existing version info is returned. 
     * @throws SchemaValidationFailedException if the schema is deemed invalid by the compatibility policy, which may 
     * include comparing the schema against existing schemas in the desired direction. 
     * @throws SerializationMismatchException if serialization format does not match the group's configured serialization format.
     * @throws MalformedSchemaException for known serialization formats, if the service is unable to parse the schema binary, or, 
     * for avro and protobuf, if the {@link SchemaInfo#type} does not match the name of the record/message in the binary.
     * @throws ResourceNotFoundException if group is not found.
     * @throws UnauthorizedException if the user is unauthorized.
     */
    VersionInfo addSchema(String groupId, SchemaInfo schemaInfo) throws SchemaValidationFailedException, SerializationMismatchException, 
            MalformedSchemaException, ResourceNotFoundException, UnauthorizedException;

    /**
     * Deletes the schema associated with the given version. Users should be very careful while using this API in production, 
     * especially if the schema has already been used to write data. 
     * This API is idempotent. 
     * The service performs a soft delete of the schema, so getSchemaForVersion with the version info will still return the schema.
     * However, the schema will not participate in any compatibility checks once deleted.
     * It will not be included when listing schema versions for the group using APIs like {@link SchemaRegistryClient#getSchemaVersions},
     * {@link SchemaRegistryClient#getGroupHistory}, {@link SchemaRegistryClient#getSchemas} or 
     * {@link SchemaRegistryClient#getLatestSchemaVersion}.
     * If addSchema is called again with the deleted schema, a new version will be assigned to it upon registration. 
     * 
     * @param groupId Id for the group. 
     * @param versionInfo Version which uniquely identifies schema within a group. 
     * @throws ResourceNotFoundException if group is not found. 
     * @throws UnauthorizedException if the user is unauthorized.
     */
    void deleteSchemaVersion(String groupId, VersionInfo versionInfo) throws ResourceNotFoundException, UnauthorizedException;
    
    /**
     * Gets schema corresponding to the version. 
     * 
     * @param groupId Id for the group. 
     * @param versionInfo Version which uniquely identifies schema within a group. 
     * @return Schema info corresponding to the version info.
     * @throws ResourceNotFoundException if group or version is not found. 
     * @throws UnauthorizedException if the user is unauthorized.
     */
    SchemaInfo getSchemaForVersion(String groupId, VersionInfo versionInfo) throws ResourceNotFoundException, UnauthorizedException;
    
    /**
     * Gets encoding info against the requested encoding Id. The purpose of encoding info is to uniquely identify the encoding
     * used on the data at rest. The encoding covers two parts - 
     * 1. Schema that defines the structure of the data and is used for serialization. A specific schema version registered with
     * registry service is uniquely identified by the corresponding VersionInfo object. 
     * 2. CodecType that is used to encode the serialized data. This would typically be some compression. The codec type 
     * and schema should both be registered with the service, and the service will generate a unique identifier for each such 
     * pair. 
     * Encoding Info uniquely identifies a combination of a versionInfo and codecType.
     * EncodingInfo also includes the {@link SchemaInfo} identified by the {@link VersionInfo}.
     * 
     * @param groupId Id for the group. 
     * @param encodingId Encoding id that uniquely identifies a combination of schema version and codec type within a group. 
     * @return Encoding info corresponding to the encoding id. 
     * @throws ResourceNotFoundException if group or encoding id is not found. 
     * @throws UnauthorizedException if the user is unauthorized.
     */
    EncodingInfo getEncodingInfo(String groupId, EncodingId encodingId) throws ResourceNotFoundException, UnauthorizedException;

    /**
     * Gets an encoding id that uniquely identifies a combination of Schema version and codec type. 
     * This encoding id is a 4 byte integer and it can be used to tag the data which is serialized and encoded using the
     * schema version and codecType identified by this encoding id. 
     * This API is idempotent: the service generates a new encoding id for each unique version and codecType pair only once, 
     * and subsequent requests to get the encoding id for the same codecType and version return the previously generated id. 
     * If the schema identified by the version is deleted using the {@link SchemaRegistryClient#deleteSchemaVersion} API, 
     * and an encoding id had already been generated for that schema version and codec pair, then it will still be returned. 
     * However, if no encoding id for the versionInfo and codec pair had been generated before the schema version was deleted, 
     * then any call to getEncodingId using the deleted versionInfo will throw ResourceNotFoundException. 
     * 
     * @param groupId Id for the group. 
     * @param versionInfo version of schema 
     * @param codecType codec type
     * @return Encoding id for the pair of version and codec type.
     * @throws CodecTypeNotRegisteredException if the codec type is not registered with the group. Use {@link SchemaRegistryClient#addCodecType} to register it. 
     * @throws ResourceNotFoundException if group or version info is not found. 
     * @throws UnauthorizedException if the user is unauthorized.
     */
    EncodingId getEncodingId(String groupId, VersionInfo versionInfo, String codecType) 
            throws CodecTypeNotRegisteredException, ResourceNotFoundException, UnauthorizedException;

    /**
     * Gets the latest schema and version for the group (or type, if specified). 
     * To get the latest schema version for a specific type identified by {@link SchemaInfo#type}, supply the type. 
     * Otherwise, if the group is configured to allow multiple schemas ({@link GroupProperties#allowMultipleTypes}) and 
     * no type is specified, then the last schema added to the group across all types is returned. 
     * 
     * @param groupId Id for the group. 
     * @param type Type of object identified by {@link SchemaInfo#type}. 
     *                 
     * @return Schema with version for the last schema that was added to the group (or type).
     * @throws ResourceNotFoundException if group is not found. 
     * @throws UnauthorizedException if the user is unauthorized.
     */
    SchemaWithVersion getLatestSchemaVersion(String groupId, @Nullable String type)
        throws ResourceNotFoundException, UnauthorizedException;

    /**
     * Gets the version corresponding to the schema.  
     * For each {@link SchemaInfo#type} and {@link SchemaInfo#serializationFormat}, a versionInfo object uniquely 
     * identifies each distinct {@link SchemaInfo#schemaData}. 
     *
     * @param groupId Id for the group. 
     * @param schemaInfo SchemaInfo that describes format and structure. 
     * @return VersionInfo corresponding to schema. 
     * @throws ResourceNotFoundException if group is not found or if schema is not registered. 
     * @throws UnauthorizedException if the user is unauthorized.
     */
    VersionInfo getVersionForSchema(String groupId, SchemaInfo schemaInfo) throws ResourceNotFoundException, UnauthorizedException;

    /**
     * Gets all schemas with corresponding versions for the group (or type, if specified). 
     * For groups configured with {@link GroupProperties#allowMultipleTypes}, the type {@link SchemaInfo#type} should be 
     * supplied to view schemas specific to a type. If type is null, all schemas in the group are returned.  
     * The order in the list matches the order in which schemas were evolved within the group. 
     * 
     * @param groupId Id for the group.
     * @param type Type of object identified by {@link SchemaInfo#type}. 
     * @return Ordered list of schemas with versions and compatibility policy for all schemas in the group. 
     * @throws ResourceNotFoundException if group is not found. 
     * @throws UnauthorizedException if the user is unauthorized.
     */
    List<SchemaWithVersion> getSchemaVersions(String groupId, @Nullable String type) throws ResourceNotFoundException, UnauthorizedException;
    
    /**
     * Checks whether the given schema is valid with respect to previous schemas in the group,  
     * subject to the current {@link GroupProperties#compatibility} policy.
     * The invocation of this method performs exactly the same validations as {@link SchemaRegistryClient#addSchema(String, SchemaInfo)},
     * but without registering the schema. This is primarily intended to be used during the schema development phase, to validate that 
     * changes to a schema comply with the compatibility policy for the group.  
     * 
     * @param groupId Id for the group. 
     * @param schemaInfo Schema to check for validity. 
     * @return True if the schema satisfies the group's {@link GroupProperties#compatibility} policy, false otherwise. The supported 
     * policies are AllowAny, DenyAll, or a combination of Backward and Forward rules. 
     * @throws ResourceNotFoundException if group is not found. 
     * @throws UnauthorizedException if the user is unauthorized.
     */
    boolean validateSchema(String groupId, SchemaInfo schemaInfo) throws ResourceNotFoundException, UnauthorizedException;

    /**
     * Checks whether the given schema can be used for reads, by validating it against one or more existing schemas in the group  
     * subject to the current {@link GroupProperties#compatibility} policy.
     * 
     * @param groupId Id for the group. 
     * @param schemaInfo Schema to check to be used for reads. 
     * @return True if it can be used to read, false otherwise. 
     * @throws ResourceNotFoundException if group is not found. 
     * @throws UnauthorizedException if the user is unauthorized.
     */
    boolean canReadUsing(String groupId, SchemaInfo schemaInfo) throws ResourceNotFoundException, UnauthorizedException;

    /**
     * Gets the list of codec types used for encoding in the group. 
     * 
     * @param groupId Id for the group. 
     * @return List of codec types used for encoding in the group. 
     * @throws ResourceNotFoundException if group is not found. 
     * @throws UnauthorizedException if the user is unauthorized.
     */
    List<String> getCodecTypes(String groupId) throws ResourceNotFoundException, UnauthorizedException;

    /**
     * Adds a new codec type to be used for encoding in the group. Adding a new codec type is backward incompatible: 
     * make sure all readers are upgraded to use the new codec before any writers use it to encode data. 
     * 
     * @param groupId Id for the group. 
     * @param codecType codec type.
     * @throws ResourceNotFoundException if group is not found. 
     * @throws UnauthorizedException if the user is unauthorized.
     */
    void addCodecType(String groupId, String codecType) throws ResourceNotFoundException, UnauthorizedException;

    /**
     * Gets complete schema evolution history of the group with schemas, versions, compatibility policy and  
     * time when the schema was added to the group. 
     * The order in the list matches the order in which schemas were evolved within the group. 
     * This call will get a consistent view at the time when the request is processed on the service.
     * So all schemas that were added before this call are returned and all schemas that were deleted before this call
     * are excluded.
     * The execution of this API is non-atomic and if concurrent requests to add or delete schemas are invoked, it may or may not 
     * include those schemas in the response. 
     *
     * @param groupId Id for the group.
     * @return Ordered list of schemas with versions and compatibility policy for all schemas in the group. 
     * @throws ResourceNotFoundException if group is not found. 
     * @throws UnauthorizedException if the user is unauthorized.
     */
    List<GroupHistoryRecord> getGroupHistory(String groupId) throws ResourceNotFoundException, UnauthorizedException;

    /**
     * Finds all groups and corresponding version info for the groups where the supplied schema has been registered.
     * It is important to note that the same schema type could be part of multiple groups; however, in each group it 
     * may have gone through a separate evolution. Invocation of this method lists all groups where the specific schema 
     * (type, format and binary) is used, along with the versions that identify this schema in those groups. 
     * The user defined {@link SchemaInfo#properties} are not used for comparison. 
     * 
     * @param schemaInfo Schema info to find references for. 
     * @return Map of group Id to versionInfo identifier for the schema in that group. 
     * @throws ResourceNotFoundException if schema is not found. 
     * @throws UnauthorizedException if the user is unauthorized.
     */
    Map<String, VersionInfo> getSchemaReferences(SchemaInfo schemaInfo) throws ResourceNotFoundException, UnauthorizedException;
}
    class GroupProperties {
        private final SerializationFormat serializationFormat;
        private final Compatibility compatibility;
        private final boolean allowMultipleTypes;
        private final Map<String, String> properties;
    }
	
    class EncodingInfo {
        private final SchemaInfo schemaInfo;
        private final VersionInfo versionInfo;
        private final CodecType codecType;
    }

    class EncodingId {
        private final int id;
    }

    class VersionInfo {
        private final int version;
        private final String type;
        private final String schemaId;
    }
    
    class SchemaInfo {
        private final String type;
        private final SerializationFormat serializationFormat;
        private final byte[] schemaData;
        private final ImmutableMap<String, String> properties;
    }

    enum SerializationFormat {
        None,
        Avro,
        Protobuf,
        Json,
        Any,
        Custom
    }

    enum CodecType {
        None,
        Snappy,
        GZip,
        Custom;
    }
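To make the addSchema semantics above concrete, here is a minimal in-memory sketch of the version assignment rules: versions increase monotonically per type, and re-registering an identical schema idempotently returns the existing version. The class and field names are simplified stand-ins for the interfaces in this PDP, not the actual implementation.

```java
import java.util.ArrayList;
import java.util.List;

// In-memory model of a group's schema list, illustrating addSchema semantics.
class InMemoryGroup {
    // A registered schema: (type, schemaData) with its assigned version.
    static final class Entry {
        final String type;
        final String schemaData;
        final int version;
        Entry(String type, String schemaData, int version) {
            this.type = type;
            this.schemaData = schemaData;
            this.version = version;
        }
    }

    private final List<Entry> schemas = new ArrayList<>();

    /** Returns the version assigned to the schema; idempotent for duplicates. */
    public int addSchema(String type, String schemaData) {
        int nextVersion = 0;
        for (Entry e : schemas) {
            if (e.type.equals(type)) {
                if (e.schemaData.equals(schemaData)) {
                    return e.version;            // already registered: return existing version
                }
                nextVersion = e.version + 1;     // versions increase monotonically per type
            }
        }
        schemas.add(new Entry(type, schemaData, nextVersion));
        return nextVersion;
    }
}
```

A real implementation would additionally run the compatibility checks described later before appending the schema; this sketch only shows the versioning and idempotency contract.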

Schema Storage

Since Pravega is a highly scalable, reliable, and durable storage system, Schema Registry will use Pravega as its underlying storage layer.

Schemas are stored under groups. Each group is backed by a pravega table segment where all its metadata is durably and consistently persisted.

A common query on the registry is whether a schema already exists for the stream and, if so, what its version is. Another common query is retrieving the schema corresponding to a given version.

Typically we don't envision a large number of schemas per stream; it should be on the order of hundreds. However, retrieving and comparing all schemas from a revisioned stream could be costly, so we will maintain indexes and reverse indexes in Pravega tables to optimize all such queries.

We will index schema fingerprints with their position in the revisioned segment. The fingerprint could be a 64 bit hash, which has a very low probability of collisions because schemas are versioned per stream and there would be on the order of hundreds of schemas.
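As an illustration of the fingerprint index described above, the following sketch computes a 64 bit hash of the schema binary and uses it as the key of a reverse index from fingerprint to version. FNV-1a is used here purely as an example 64 bit hash; the actual fingerprint function is an implementation choice.

```java
import java.nio.charset.StandardCharsets;
import java.util.HashMap;
import java.util.Map;

// Sketch of the fingerprint reverse index: hash(schema bytes) -> version,
// so "does this schema exist, and at what version?" avoids scanning all schemas.
public class SchemaFingerprint {
    /** 64-bit FNV-1a hash over the schema bytes (illustrative choice of hash). */
    public static long fingerprint(byte[] schemaData) {
        long hash = 0xcbf29ce484222325L;   // FNV offset basis
        for (byte b : schemaData) {
            hash ^= (b & 0xff);
            hash *= 0x100000001b3L;        // FNV prime
        }
        return hash;
    }

    public static void main(String[] args) {
        // Reverse index as it might be kept in a Pravega table segment.
        Map<Long, Integer> index = new HashMap<>();
        byte[] schemaV0 = "{\"type\":\"record\",\"name\":\"User\"}".getBytes(StandardCharsets.UTF_8);
        index.put(fingerprint(schemaV0), 0);
        // The same bytes produce the same fingerprint, so the version is found.
        System.out.println(index.get(fingerprint(schemaV0))); // prints 0
    }
}
```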

Compatibility Checks

Schema Registry will perform schema compatibility checks while accepting changes to schemas. A new schema will only be added to the registry if it satisfies the compatibility checks. Schema Registry will contain a CompatibilityChecker for each schema format. This will be an interface, and users can plug in compatibility checkers for their serialization systems.

As Schema Registry receives a schema, it locates the compatibility checker corresponding to the schema format and the registered compatibility policy for the stream. It then applies the compatibility checker to the new schema against existing schemas, using the selected strategy.

The exact rules for compatibility for different serialization systems may vary, and hence format-specific checkers will be responsible for parsing and validating schemas.

To start with, we will only support compatibility checkers for Avro and JSON. Going forward, we will look to include compatibility checkers for popular schema formats like protobuf, thrift, etc.

In the first implementation, the allowed values of the compatibility check strategy for formats other than Avro and JSON will be either None or Disabled.
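A minimal sketch of what the pluggable checker interface could look like, implementing only the two trivial policies available to formats without a format-aware checker: AllowAny (Disabled) accepts every change, while DenyAll (None) accepts only the first schema or a byte-identical re-registration. The names here are illustrative, not the final API.

```java
import java.util.Arrays;
import java.util.List;

// Hypothetical pluggable interface for format-specific compatibility checks.
interface CompatibilityChecker {
    /** True if newSchema may be registered given the schemas already in the group. */
    boolean canBeAdded(byte[] newSchema, List<byte[]> existingSchemas);
}

/** Compatibility "Disabled": no checks, every change is accepted. */
class AllowAnyChecker implements CompatibilityChecker {
    public boolean canBeAdded(byte[] newSchema, List<byte[]> existingSchemas) {
        return true;
    }
}

/** Compatibility "None": the schema may not change; only the first schema,
 *  or a byte-identical re-registration, is accepted. */
class DenyAllChecker implements CompatibilityChecker {
    public boolean canBeAdded(byte[] newSchema, List<byte[]> existingSchemas) {
        return existingSchemas.isEmpty()
                || existingSchemas.stream().anyMatch(s -> Arrays.equals(s, newSchema));
    }
}
```

A format-aware checker (e.g. for Avro) would implement the same interface but parse the schema binaries and apply the configured backward/forward rules instead of comparing raw bytes.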

There are two reasons for starting with Avro as the first format with compatibility checks: a) Avro is one of the most popular serialization formats, and b) Avro has libraries to validate schemas for compatibility with support for multiple strategies, namely "CanRead", "CanBeRead" and "MutualRead", which can be mapped to the backward, forward and full strategies listed above. In the future we will keep adding compatibility checkers for other popular formats. Let us take a few examples of how compatibility for different serialization systems can be specified for a stream:

Example 1:
SerializationFormat: Avro ; Compatibility: None
Registry will not allow the schema in the stream to change and all new schemas will be rejected. 

Example 2:
SerializationFormat: Avro ; Compatibility: Disabled
Registry will not enforce any compatibility check and all changes to the schemas will be accepted. 

Example 3:
SerializationFormat: protobuf; Compatibility: Disabled
Registry will not enforce any compatibility check and all changes to the schemas will be accepted. 

Example 4:
SerializationFormat: custom; Compatibility: None
Registry will not allow the schema in the stream to change and all new schemas will be rejected. 

Example 5:
SerializationFormat: protobuf; Compatibility: Backward
NotSupportedException  // hopefully we can support it in future

Example 6:
SerializationFormat: avro; Compatibility: Backward 
Check for backward compatibility with the latest schema.

Example 7:
SerializationFormat: avro; Compatibility: Full Transitive
Check for compatibility in both directions with all the previously stored schemas for this stream.

Compatibility check during development phase

Preventing incompatible events from being ingested into Pravega at runtime has clear advantages, as discussed above. However, it is equally important to prevent such mistakes during the development phase. The objective is to allow developers to run schema validation against a stream in production while developing the next version of their application. They should ideally be able to run this validation in their development environment, on their dev machines. There should be tools available to facilitate integrating schema validation into a continuous integration pipeline, before every commit or as part of periodic builds.

There is merit in building tooling that integrates with the registry running in production. The registry exposes APIs to retrieve all schemas and to perform compatibility checks, so we should be able to build simple tools and maven plugins that allow developers to validate their schemas against the schemas in the registry. However, development of such tooling is outside the scope of this PDP and shall be addressed separately.

Multiple Language Support:

With multiple language binding support coming to Pravega clients, we will need PravegaSerializer wrappers in each of those languages. We use a simple encoding and leave the actual event serialization to the underlying serializer for the SerializationFormat. So as long as a serializer is available either in Rust or in the higher layer language binding, we can support schema registry by building a PravegaSerializer in the language of choice.
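As an illustration of such a simple encoding, the following sketch frames a serialized payload with a header carrying the 4 byte encoding id returned by getEncodingId, so that a reader can recover the schema and codec via getEncodingInfo. The 1 byte protocol version field is an assumption added for future evolution of the header, not something specified in this PDP.

```java
import java.nio.ByteBuffer;

// Illustrative sketch (not the final wire format) of how a serializer wrapper
// could tag serialized bytes with the 4-byte encoding id from getEncodingId.
public class EncodedEvent {
    static final byte PROTOCOL_VERSION = 1; // assumed field for header evolution

    /** Prefix the payload with [protocol version (1 byte)][encoding id (4 bytes)]. */
    public static byte[] encode(int encodingId, byte[] payload) {
        return ByteBuffer.allocate(1 + 4 + payload.length)
                .put(PROTOCOL_VERSION)
                .putInt(encodingId)
                .put(payload)
                .array();
    }

    /** Read back the encoding id so a reader can resolve schema and codec. */
    public static int readEncodingId(byte[] event) {
        ByteBuffer buf = ByteBuffer.wrap(event);
        buf.get();           // skip protocol version
        return buf.getInt(); // 4-byte encoding id
    }
}
```

Because the header only carries a small fixed-size id rather than the schema itself, the same framing is straightforward to reproduce in other language bindings.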

Discarded Alternatives

  1. Building an Avro-only schema registry. We did not want schema management to be tied to a specific serialization format. We wanted to abstract the concepts of the registry so that applications using different serialization formats can reap its benefits. It may be the case that in the first implementation more capabilities light up for Avro than for other formats (for example, compatibility checks), but restricting ourselves to one format would be detrimental.

  2. Support for storing SerDe libraries and dynamically loading SerDe jars into the runtime JVM. The idea stemmed from extending the benefits of a centralized service to serialization systems that do not support an external schema defined with an IDL. The use case and benefits would be similar to sharing a schema: essentially allowing applications to know and interpret the structure of the data. This was rejected because loading SerDe dynamically at runtime would have serious security implications, as it amounts to loading arbitrary pieces of code into a runtime.

There is still merit in making these SerDe available centrally to be used for dynamic linking at deployment time. However, that can also be achieved through other mechanisms, like out of band sharing of SerDe, and we do not need to build support for it in the schema registry.

  3. Relegating schema validation to only a development time activity, with tooling to integrate schema storage with the code repository. We discussed and evaluated whether schema management is primarily a development time activity or whether it has benefits at runtime as well. The discussion ranged from considering development time tooling for schema management to facilitating greater decoupling and reduced coordination across teams working on downstream and upstream applications. We realized that there is definitely merit in providing development time tools, but that activity is completely decoupled from the purposes a schema management layer serves at runtime. Moreover, the registry service with schema evolution support can also be leveraged during the development phase if we build tooling that allows developers to automatically validate their schema changes against the schemas in the registry.

  4. Using third party schema registries for schema management. We explored many available third party schema registries built for streaming data. They were either tightly coupled with a specific serialization format or provided a view of the schemas in the context of a specific streaming system. Another problem with these registries is their schema storage backends, which would require managing an additional durable storage system when Pravega itself is a highly scalable, durable, and consistent storage backend.

Future improvements

  1. Support for additional Data Storage/Serialization Format schemas.
  2. Pluggable Compatibility Checkers for custom schema formats.
  3. Integration with security systems and governance systems like Atlas.
  4. Multiple language support for client sdk.
  5. Storage for SerDe jars.