
Commit 1f2f11d

[#65] confluent rest connector (#87)
New features and improvements:
- Move encoding responsibilities to Connector instead of AvroSchemaManager.
- Add "confluent" connector as a facade to Confluent Schema Registry.
1 parent 8d11a8a commit 1f2f11d

File tree

24 files changed, +1530 -74 lines changed


README.md

Lines changed: 108 additions & 0 deletions
@@ -20,6 +20,7 @@ Table of contents
- [HBase](#hbase)
- [PostgreSql](#postgresql)
- [REST](#rest)
- [Confluent](#confluent)
---

Overview
@@ -130,6 +131,26 @@ libraryDependencies += "it.agilelab" %% "darwin-mock-connector" % "1.1.0-SNAPSHO
</dependency>
```

### Confluent Schema Registry Connector

Darwin can be used as a *facade* over the Confluent Schema Registry.

#### sbt

```scala
libraryDependencies += "it.agilelab" %% "darwin-confluent-connector" % "1.1.0-SNAPSHOT"
```

#### maven

```xml
<dependency>
    <groupId>it.agilelab</groupId>
    <artifactId>darwin-confluent-connector_2.11</artifactId>
    <version>1.1.0-SNAPSHOT</version>
</dependency>
```

Background
-------------
In systems where objects encoded using Avro are stored, a problem arises when there is an evolution of the structure
@@ -402,6 +423,93 @@ darwin-rest {
}
```

## Confluent

Darwin can be used as a *facade* over the Confluent Schema Registry.

Connecting to the Confluent Schema Registry lets applications that already use Darwin keep functioning correctly
when running on the Confluent platform.

The connector can be used even if the only Confluent component in use is the Schema Registry.

When the confluent connector is used, the Avro single object encoding is performed using the *Confluent* flavour.

### Confluent Single object encoding

The Schema Registry assigns globally unique IDs to schemas; each Avro message is encoded as follows:

```
0x00                | 1 byte magic number representing Confluent-encoded Avro
0xXX 0xXX 0xXX 0xXX | 4 byte schema identifier interpreted as an integer
...                 | Avro-encoded payload without schema (raw Avro bytes not prepended with the JSON schema)
```
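
For illustration only, here is a minimal sketch of how such a frame could be produced and parsed with a plain `ByteBuffer`; `ConfluentFraming`, `wrap` and `unwrap` are hypothetical helper names (not part of Darwin or the Confluent client), and the 4-byte schema id is assumed to be big-endian:

```scala
import java.nio.ByteBuffer

// Hypothetical helpers illustrating the Confluent framing described above.
object ConfluentFraming {
  val MagicByte: Byte = 0x00

  // Prepend the 1-byte magic number and the 4-byte schema id (assumed big-endian) to the raw Avro bytes.
  def wrap(schemaId: Int, avroPayload: Array[Byte]): Array[Byte] = {
    val buffer = ByteBuffer.allocate(1 + 4 + avroPayload.length)
    buffer.put(MagicByte).putInt(schemaId).put(avroPayload)
    buffer.array()
  }

  // Split a framed message back into the schema id and the raw Avro payload.
  def unwrap(message: Array[Byte]): (Int, Array[Byte]) = {
    val buffer = ByteBuffer.wrap(message)
    require(buffer.get() == MagicByte, "not a Confluent-encoded Avro message")
    val schemaId = buffer.getInt()
    val payload  = new Array[Byte](buffer.remaining())
    buffer.get(payload)
    (schemaId, payload)
  }
}
```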

### Subject

The Confluent Schema Registry supports attaching schemas to a `subject`; the subject is the granularity at which schema
compatibility is enforced. Schemas can be registered with three subject strategies:

* topic: the subject is the name of the topic (a topic contains a single Avro data type)
* record: the subject is the fully qualified name of the record (multiple topics can contain the same Avro data type)
* topic-record: the subject is derived from the topic and the record's fully qualified name (a topic can contain
  multiple data types, and compatibility on the same Avro data type is enforced per topic instead of globally)

In order to support this scheme, Avro schemas registered via Darwin should carry a custom extension property
(`x-darwin-subject`), like in this example:
```json
{
  "type" : "record",
  "name" : "record",
  "fields" : [ {
    "name" : "stringField",
    "type" : "string"
  }, {
    "name" : "stringField2",
    "type" : [ "string", "null" ],
    "default" : "default-for-nullable"
  } ],
  "x-darwin-subject" : "subject-string"
}
```
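
Since `x-darwin-subject` is an ordinary top-level attribute of the schema, it can be read back with Avro's standard API. A small sketch using the example above (the `schemaJson` value is simply the same JSON repeated as a string):

```scala
import org.apache.avro.Schema

// Parse the example schema and read the custom extension property back.
val schemaJson =
  """{
    |  "type": "record",
    |  "name": "record",
    |  "fields": [
    |    { "name": "stringField", "type": "string" },
    |    { "name": "stringField2", "type": ["string", "null"], "default": "default-for-nullable" }
    |  ],
    |  "x-darwin-subject": "subject-string"
    |}""".stripMargin

val schema: Schema = new Schema.Parser().parse(schemaJson)

// getProp returns the value of a non-reserved top-level attribute, or null if it is absent.
val subject: Option[String] = Option(schema.getProp("x-darwin-subject")) // Some("subject-string")
```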

## Configuration

```hocon
darwin {
  type = "lazy"
  connector = "confluent"

  endpoints: ["http://schema-registry-00:7777", "http://schema-registry-01:7777"]
  max-cached-schemas: 1000
  kafka.schemaregistry.standard-property-1: 1
  kafka.schemaregistry.standard-property-2: "default"
}
```

The confluent connector is enabled by declaring `confluent` as the connector.

The `endpoints` setting is a list of URLs pointing to the Confluent Schema Registry instances.

The `max-cached-schemas` setting configures how many schemas are cached internally by the confluent schema registry connector.

All other properties are injected into the Confluent Schema Registry client configuration.

For example, if the Confluent Schema Registry declares a property `kafka.schemaregistry.auth`, it can simply be added
to the Darwin configuration like this:
```hocon
darwin {
  type = "lazy"
  connector = "confluent"

  endpoints: ["http://schema-registry-00:7777", "http://schema-registry-01:7777"]
  max-cached-schemas: 1000
  kafka.schemaregistry.auth: "true"
}
```
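
For completeness, a sketch of wiring this configuration into an application. `ConfigFactory.load()` is the standard Typesafe Config entry point; `AvroSchemaManagerFactory.initialize` is assumed here to be Darwin's usual bootstrap call and should be checked against the Getting Started section:

```scala
import com.typesafe.config.ConfigFactory
import it.agilelab.darwin.manager.{ AvroSchemaManager, AvroSchemaManagerFactory }

// Load the "darwin" block from application.conf. Keys that Darwin itself does not know
// (e.g. kafka.schemaregistry.auth) stay in the Config and are forwarded to the
// Confluent Schema Registry client by the confluent connector.
val darwinConfig = ConfigFactory.load().getConfig("darwin")

// Assumed entry point: Darwin's factory builds the manager (and the confluent connector
// with it) from the configuration above.
val manager: AvroSchemaManager = AvroSchemaManagerFactory.initialize(darwinConfig)
```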
## Mock

MockConnector can be conveniently used during tests or if all the schemas (past and current) are known when launching

build.sbt

Lines changed: 14 additions & 2 deletions
@@ -23,7 +23,8 @@ lazy val root = Project("darwin", file("."))
    mockConnector,
    mockApplication,
    restConnector,
    mongoConnector,
    confluentConnector
  )

lazy val core = Project("darwin-core", file("core"))
@@ -65,7 +66,6 @@ lazy val hbaseConnector2 = Project("darwin-hbase2-connector", file("hbase2"))
  .settings(Settings.hbase2TestSettings)
  .enablePlugins(JavaAppPackaging)

lazy val postgresConnector = Project("darwin-postgres-connector", file("postgres"))
  .settings(Settings.commonSettings: _*)
  .dependsOn(coreCommon)
@@ -85,6 +85,18 @@ lazy val restConnector = Project("darwin-rest-connector", file("rest"))
  .settings(crossScalaVersions := Seq(Versions.scala, Versions.scala_211, Versions.scala_213))
  .enablePlugins(JavaAppPackaging)

lazy val confluentConnector = Project("darwin-confluent-connector", file("confluent"))
  .settings(Settings.commonSettings: _*)
  .dependsOn(coreCommon)
  .settings(pgpPassphrase := Settings.pgpPass)
  .settings(
    libraryDependencies ++= Dependencies.core_deps ++
      Dependencies.wireMock ++
      Dependencies.confluentSchemaRegistryDependencies :+ Dependencies.scalatest
  )
  .settings(crossScalaVersions := Versions.crossScalaVersions)
  .enablePlugins(JavaAppPackaging)

lazy val restServer = Project("darwin-rest-server", file("rest-server"))
  .settings(Settings.commonSettings: _*)
  .dependsOn(coreCommon, mockConnector)

common/src/main/scala/it/agilelab/darwin/common/Connector.scala

Lines changed: 178 additions & 1 deletion
@@ -1,6 +1,13 @@
package it.agilelab.darwin.common

import java.io.{ InputStream, OutputStream }
import java.nio.{ ByteBuffer, ByteOrder }

import it.agilelab.darwin.common.compat.RightBiasedEither
import it.agilelab.darwin.manager.SchemaPayloadPair
import it.agilelab.darwin.manager.exception.DarwinException
import it.agilelab.darwin.manager.util.AvroSingleObjectEncodingUtils
import org.apache.avro.{ Schema, SchemaNormalization }

/**
 * Generic abstraction of a component capable of reading and writing Schema entities in an external storage.
@@ -49,4 +56,174 @@ trait Connector extends Serializable {
   * @return an option that is empty if no schema was found for the ID or defined if a schema was found
   */
  def findSchema(id: Long): Option[Schema]

  /**
   * Generates a fingerprint for a schema; the default implementation is SchemaNormalization.parsingFingerprint64.
   *
   * @param schema the schema to fingerprint
   * @return the schema id
   */
  def fingerprint(schema: Schema): Long = {
    SchemaNormalization.parsingFingerprint64(schema)
  }

  /**
   * Writes to the given OutputStream the Single Object Encoding header and returns the OutputStream.
   *
   * @return the input OutputStream
   */
  def writeHeaderToStream(byteStream: OutputStream, schemaId: Long, endianness: ByteOrder): OutputStream = {
    AvroSingleObjectEncodingUtils.writeHeaderToStream(byteStream, schemaId, endianness)
  }

  /**
   * Creates a Single-Object encoded byte array.
   * By specification the encoded array is obtained by concatenating the V1_HEADER, the schema id and the
   * avro-encoded payload.
   *
   * @param avroPayload avro-serialized payload
   * @param schema      the schema used to encode the payload
   * @return a Single-Object encoded byte array
   */
  def generateAvroSingleObjectEncoded(
    avroPayload: Array[Byte],
    schema: Schema,
    endianness: ByteOrder,
    getId: Schema => Long
  ): Array[Byte] = {
    AvroSingleObjectEncodingUtils.generateAvroSingleObjectEncoded(avroPayload, getId(schema), endianness)
  }

  /**
   * Writes to the given OutputStream the Single Object Encoding header, then the avroValue, and returns the
   * OutputStream.
   *
   * @param byteStream the stream to write to
   * @param avroValue  the value to be written to the stream
   * @param schemaId   id of the schema used to encode the payload
   * @return the input OutputStream
   */
  def generateAvroSingleObjectEncoded(
    byteStream: OutputStream,
    avroValue: Array[Byte],
    schemaId: Long,
    endianness: ByteOrder
  ): OutputStream = {
    AvroSingleObjectEncodingUtils.generateAvroSingleObjectEncoded(byteStream, avroValue, schemaId, endianness)
  }

  /**
   * Writes to the given OutputStream the Single Object Encoding header, then calls the avroWriter function to
   * possibly add data to the stream, and finally returns the OutputStream.
   *
   * @param byteStream the stream to write to
   * @param schemaId   id of the schema used to encode the payload
   * @param avroWriter function that will be called to add user generated avro to the stream
   * @return the input OutputStream
   */
  def generateAvroSingleObjectEncoded(byteStream: OutputStream, schemaId: Long, endianness: ByteOrder)(
    avroWriter: OutputStream => OutputStream
  ): OutputStream = {
    AvroSingleObjectEncodingUtils.generateAvroSingleObjectEncoded(byteStream, schemaId, endianness)(avroWriter)
  }

  /**
   * Extracts a Tuple2 that contains the Schema and the Avro-encoded payload.
   *
   * @param avroSingleObjectEncoded a byte array of a Single-Object encoded payload
   * @return a pair containing the Schema and the payload of the input array
   */
  def retrieveSchemaAndAvroPayload(
    avroSingleObjectEncoded: Array[Byte],
    endianness: ByteOrder,
    getSchema: Long => Option[Schema]
  ): (Schema, Array[Byte]) = {
    if (AvroSingleObjectEncodingUtils.isAvroSingleObjectEncoded(avroSingleObjectEncoded)) {
      val id = AvroSingleObjectEncodingUtils.extractId(avroSingleObjectEncoded, endianness)
      getSchema(id) match {
        case Some(schema) =>
          schema -> AvroSingleObjectEncodingUtils.dropHeader(avroSingleObjectEncoded)
        case _ =>
          throw new DarwinException(s"No schema found for ID $id")
      }
    } else {
      throw AvroSingleObjectEncodingUtils.parseException()
    }
  }

  /**
   * Extracts the Schema from the ByteBuffer; after the method call the ByteBuffer position will be right after the
   * header.
   *
   * @param avroSingleObjectEncoded a ByteBuffer of a Single-Object encoded payload
   * @return the avro Schema
   */
  def retrieveSchemaAndAvroPayload(
    avroSingleObjectEncoded: ByteBuffer,
    endianness: ByteOrder,
    getSchema: Long => Option[Schema]
  ): Schema = {
    if (AvroSingleObjectEncodingUtils.isAvroSingleObjectEncoded(avroSingleObjectEncoded)) {
      val id = AvroSingleObjectEncodingUtils.extractId(avroSingleObjectEncoded, endianness)
      getSchema(id) match {
        case Some(schema) => schema
        case _            => throw new DarwinException(s"No schema found for ID $id")
      }
    } else {
      throw AvroSingleObjectEncodingUtils.parseException()
    }
  }

  /**
   * Extracts the schema from the avro single-object encoding at the head of this input stream.
   * The input stream will have 10 bytes consumed if the first two bytes correspond to the single object encoded
   * header, or zero bytes consumed if the InputStream supports marking; if it doesn't, the first bytes (up to 2) will
   * be consumed and returned in the Left part of the Either.
   *
   * @param inputStream avro single-object encoded input stream
   * @return the schema ID extracted from the input data
   */
  def extractSchema(
    inputStream: InputStream,
    endianness: ByteOrder,
    getSchema: Long => Option[Schema]
  ): Either[Array[Byte], Schema] = {
    AvroSingleObjectEncodingUtils.extractId(inputStream, endianness).rightMap { id =>
      getSchema(id).getOrElse(throw new DarwinException(s"No schema found for ID $id"))
    }
  }

  /**
   * Extracts the schema from the avro single-object encoding in the input array.
   *
   * @param array avro single-object encoded array
   * @return the schema ID extracted from the input data
   */
  def extractSchema(
    array: Array[Byte],
    endianness: ByteOrder,
    getSchema: Long => Option[Schema]
  ): Either[Exception, Schema] = {
    try {
      val id = AvroSingleObjectEncodingUtils.extractId(array, endianness)
      getSchema(id)
        .toRight(new RuntimeException(s"Cannot find schema with id $id"))
    } catch {
      case ie: IllegalArgumentException => Left(ie)
    }
  }

  /**
   * Extracts a SchemaPayloadPair that contains the Schema and the Avro-encoded payload.
   *
   * @param avroSingleObjectEncoded a byte array of a Single-Object encoded payload
   * @return a SchemaPayloadPair containing the Schema and the payload of the input array
   */
  def retrieveSchemaAndPayload(
    avroSingleObjectEncoded: Array[Byte],
    endianness: ByteOrder,
    getSchema: Long => Option[Schema]
  ): SchemaPayloadPair = {
    val (schema, payload) = retrieveSchemaAndAvroPayload(avroSingleObjectEncoded, endianness, getSchema)
    SchemaPayloadPair.create(schema, payload)
  }
}
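
A brief usage sketch of the new default methods, assuming a concrete `connector` whose storage already contains the schema; the `roundTrip` helper and the choice of `BIG_ENDIAN` are illustrative only, and the endianness must match whatever the writer used:

```scala
import java.nio.ByteOrder

import it.agilelab.darwin.common.Connector
import org.apache.avro.Schema

// Hypothetical round trip over the default helpers added to Connector.
// `connector`, `schema` and `payload` are assumed to exist in the caller's code,
// and the schema is assumed to be registered so that findSchema can resolve its id.
def roundTrip(connector: Connector, schema: Schema, payload: Array[Byte]): (Schema, Array[Byte]) = {
  // Frame the raw Avro bytes with the single-object-encoding header,
  // using the connector's own fingerprint as the schema id.
  val encoded: Array[Byte] =
    connector.generateAvroSingleObjectEncoded(payload, schema, ByteOrder.BIG_ENDIAN, connector.fingerprint)

  // Strip the header again, resolving the embedded id back to a Schema.
  connector.retrieveSchemaAndAvroPayload(encoded, ByteOrder.BIG_ENDIAN, connector.findSchema)
}
```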
