Skip to content

Commit abfeb3b

Browse files
[#104] Implement retrieveLatestSchema API(#105)
The API allows to retrieve the latest schema for a given subject, obviously this works only for the connectors that support subjects which is only Confluent at the moment.
1 parent 914c37f commit abfeb3b

File tree

13 files changed

+196
-54
lines changed

13 files changed

+196
-54
lines changed

common/src/main/scala/it/agilelab/darwin/common/Connector.scala

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -57,6 +57,12 @@ trait Connector extends Serializable {
5757
*/
5858
def findSchema(id: Long): Option[Schema]
5959

60+
/**
61+
* Retrieves the latest schema for a given string identifier (not to be confused with the fingerprint id).
62+
* This API might not be implemented by all connectors, which should return None
63+
*/
64+
def retrieveLatestSchema(identifier: String): Option[(Long, Schema)]
65+
6066
/**
6167
* Generate a fingerprint for a schema, the default implementation is SchemaNormalization.parsingFingerprint64
6268
*

confluent/src/main/scala/it/agilelab/darwin/connector/confluent/ConfluentConnector.scala

Lines changed: 33 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -1,15 +1,16 @@
11
package it.agilelab.darwin.connector.confluent
22

3-
import java.io.{ InputStream, OutputStream }
4-
import java.nio.{ ByteBuffer, ByteOrder }
5-
3+
import io.confluent.kafka.schemaregistry.client.rest.exceptions.RestClientException
64
import io.confluent.kafka.schemaregistry.client.{ SchemaMetadata, SchemaRegistryClient }
75
import it.agilelab.darwin.common.Connector
86
import it.agilelab.darwin.common.compat._
97
import it.agilelab.darwin.manager.SchemaPayloadPair
108
import it.agilelab.darwin.manager.exception.DarwinException
119
import org.apache.avro.Schema
1210

11+
import java.io.{ IOException, InputStream, OutputStream }
12+
import java.nio.{ ByteBuffer, ByteOrder }
13+
1314
class ConfluentConnector(options: ConfluentConnectorOptions, client: SchemaRegistryClient) extends Connector {
1415

1516
/**
@@ -264,4 +265,33 @@ class ConfluentConnector(options: ConfluentConnectorOptions, client: SchemaRegis
264265
override def extractId(avroSingleObjectEncoded: ByteBuffer, endianness: ByteOrder): Long = {
265266
ConfluentSingleObjectEncoding.extractId(avroSingleObjectEncoded, endianness)
266267
}
268+
269+
override def retrieveLatestSchema(identifier: String): Option[(Long, Schema)] = {
270+
def safeGet(): Option[SchemaMetadata] = {
271+
try {
272+
Option(client.getLatestSchemaMetadata(identifier))
273+
} catch {
274+
// this is the mock connector case
275+
case io: IOException =>
276+
if (io.getMessage == "No schema registered under subject!") {
277+
None
278+
} else {
279+
throw io
280+
}
281+
// this is the *real* case tested with server 5.x
282+
case re: RestClientException =>
283+
if (re.getErrorCode == 40401) {
284+
None
285+
} else {
286+
throw re
287+
}
288+
}
289+
}
290+
291+
for {
292+
metadata <- safeGet()
293+
id = metadata.getId.toLong
294+
schema = new Schema.Parser().parse(metadata.getSchema)
295+
} yield (id, schema)
296+
}
267297
}

confluent/src/test/scala/it/agilelab/darwin/connector/confluent/ConfluentConnectorSuite.scala

Lines changed: 14 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -126,4 +126,18 @@ class ConfluentConnectorSuite extends AnyFlatSpec with BeforeAndAfterEach with O
126126
exception.getMessage should be("Schema does not contain the [x-darwin-subject] extension")
127127

128128
}
129+
130+
it should "return None if fetching latest schema of non-existing subject" in {
131+
132+
val mockRegistryClient = new MockSchemaRegistryClient()
133+
134+
val maxCachedSchemas = 1000
135+
136+
val connector = new ConfluentConnector(
137+
options = ConfluentConnectorOptions(List.empty, Collections.emptyMap(), maxCachedSchemas),
138+
client = mockRegistryClient
139+
)
140+
141+
connector.retrieveLatestSchema("pippo") shouldBe None
142+
}
129143
}

core/src/main/scala/it/agilelab/darwin/manager/AvroSchemaManager.scala

Lines changed: 30 additions & 22 deletions
Original file line numberDiff line numberDiff line change
@@ -1,13 +1,13 @@
11
package it.agilelab.darwin.manager
22

3-
import java.io.{ InputStream, OutputStream }
4-
import java.nio.{ ByteBuffer, ByteOrder }
5-
63
import it.agilelab.darwin.common.compat._
74
import it.agilelab.darwin.common.{ Connector, Logging }
85
import it.agilelab.darwin.manager.util.AvroSingleObjectEncodingUtils
96
import org.apache.avro.Schema
107

8+
import java.io.{ InputStream, OutputStream }
9+
import java.nio.{ ByteBuffer, ByteOrder }
10+
1111
/**
1212
* The main entry point of the Darwin library.
1313
* An instance of AvroSchemaManager should ALWAYS be obtained through the AvroSchemaManagerFactory.
@@ -180,37 +180,45 @@ abstract class AvroSchemaManager(connector: Connector, endianness: ByteOrder) ex
180180
def reload(): AvroSchemaManager
181181

182182
/**
183-
* Extracts the schema ID from the avro single-object encoded byte array
184-
*
185-
* @param array avro single-object encoded byte array
186-
* @return the schema ID extracted from the input data
187-
*/
183+
* Extracts the schema ID from the avro single-object encoded byte array
184+
*
185+
* @param array avro single-object encoded byte array
186+
* @return the schema ID extracted from the input data
187+
*/
188188
def extractId(array: Array[Byte]): Long = {
189189
connector.extractId(array, endianness)
190190
}
191191

192192
/**
193-
* Extracts the schema ID from the avro single-object encoded at the head of this input stream.
194-
* The input stream will have 10 bytes consumed if the first two bytes correspond to the single object encoded
195-
* header, or zero bytes consumed if the InputStream supports marking; if it doesn't, the first bytes (up to 2) will
196-
* be consumed and returned in the Left part of the Either.
197-
*
198-
* @param inputStream avro single-object encoded input stream
199-
* @return the schema ID extracted from the input data
200-
*/
193+
* Extracts the schema ID from the avro single-object encoded at the head of this input stream.
194+
* The input stream will have 10 bytes consumed if the first two bytes correspond to the single object encoded
195+
* header, or zero bytes consumed if the InputStream supports marking; if it doesn't, the first bytes (up to 2) will
196+
* be consumed and returned in the Left part of the Either.
197+
*
198+
* @param inputStream avro single-object encoded input stream
199+
* @return the schema ID extracted from the input data
200+
*/
201201
def extractId(inputStream: InputStream): Either[Array[Byte], Long] = {
202202
connector.extractId(inputStream, endianness)
203203
}
204204

205205
/**
206-
* Extracts the schema ID from the avro single-object encoded ByteBuffer, the ByteBuffer position will be after the
207-
* header when this method returns
208-
*
209-
* @param avroSingleObjectEncoded avro single-object encoded byte array
210-
* @return the schema ID extracted from the input data
211-
*/
206+
* Extracts the schema ID from the avro single-object encoded ByteBuffer, the ByteBuffer position will be after the
207+
* header when this method returns
208+
*
209+
* @param avroSingleObjectEncoded avro single-object encoded byte array
210+
* @return the schema ID extracted from the input data
211+
*/
212212
def extractId(avroSingleObjectEncoded: ByteBuffer): Long = {
213213
connector.extractId(avroSingleObjectEncoded, endianness)
214214
}
215215

216+
/**
217+
* Retrieves the latest schema for a given string identifier (not to be confused with the fingerprint id).
218+
* This API might not be implemented by all connectors, which should return None
219+
*/
220+
def retrieveLatestSchema(identifier: String): Option[(Long, Schema)] = {
221+
connector.retrieveLatestSchema(identifier)
222+
}
223+
216224
}

hbase/src/main/scala/it/agilelab/darwin/connector/hbase/HBaseConnector.scala

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -175,4 +175,10 @@ case class HBaseConnector(config: Config) extends Connector with Logging {
175175
log.debug(s"$schema loaded from HBase for id = $id")
176176
schema
177177
}
178+
179+
/**
180+
* Retrieves the latest schema for a given string identifier (not to be confused with the fingerprint id).
181+
* This API might not be implemented by all connectors, which should return None
182+
*/
183+
override def retrieveLatestSchema(identifier: String): Option[(Long, Schema)] = None
178184
}

mock-application/src/test/scala/it/agilelab/darwin/app/mock/CachedEagerApplicationSuite.scala

Lines changed: 22 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -1,19 +1,19 @@
11
package it.agilelab.darwin.app.mock
22

3-
import java.lang.reflect.Modifier
4-
import java.nio.ByteOrder
5-
63
import com.typesafe.config.{ Config, ConfigFactory }
74
import it.agilelab.darwin.annotations.AvroSerde
85
import it.agilelab.darwin.app.mock.classes.{ MyClass, MyNestedClass, NewClass, OneField }
6+
import it.agilelab.darwin.common.compat._
97
import it.agilelab.darwin.common.{ Connector, ConnectorFactory, SchemaReader }
108
import it.agilelab.darwin.manager.{ AvroSchemaManager, CachedEagerAvroSchemaManager }
11-
import org.apache.avro.{ Schema, SchemaNormalization }
129
import org.apache.avro.reflect.ReflectData
10+
import org.apache.avro.{ Schema, SchemaNormalization }
1311
import org.reflections.Reflections
1412
import org.scalatest.flatspec.AnyFlatSpec
1513
import org.scalatest.matchers.should.Matchers
16-
import it.agilelab.darwin.common.compat._
14+
15+
import java.lang.reflect.Modifier
16+
import java.nio.ByteOrder
1717

1818
class BigEndianCachedEagerApplicationSuite extends CachedEagerApplicationSuite(ByteOrder.BIG_ENDIAN)
1919

@@ -88,12 +88,13 @@ abstract class CachedEagerApplicationSuite(val endianness: ByteOrder) extends An
8888
var calls = 0
8989
val manager = new CachedEagerAvroSchemaManager(
9090
new Connector {
91-
override def createTable(): Unit = ()
92-
override def tableExists(): Boolean = true
93-
override def tableCreationHint(): String = ""
94-
override def fullLoad(): Seq[(Long, Schema)] = Seq.empty
95-
override def insert(schemas: Seq[(Long, Schema)]): Unit = ()
96-
override def findSchema(id: Long): Option[Schema] = Some(oneFieldSchema)
91+
override def createTable(): Unit = ()
92+
override def tableExists(): Boolean = true
93+
override def tableCreationHint(): String = ""
94+
override def fullLoad(): Seq[(Long, Schema)] = Seq.empty
95+
override def insert(schemas: Seq[(Long, Schema)]): Unit = ()
96+
override def findSchema(id: Long): Option[Schema] = Some(oneFieldSchema)
97+
override def retrieveLatestSchema(identifier: String): Option[(Long, Schema)] = Some(1L -> oneFieldSchema)
9798
},
9899
endianness
99100
) {
@@ -105,4 +106,14 @@ abstract class CachedEagerApplicationSuite(val endianness: ByteOrder) extends An
105106
manager.getSchema(3L) shouldNot be(null) // scalastyle:ignore
106107
calls shouldBe 0
107108
}
109+
110+
it should "not find the latest schema" in {
111+
manager.retrieveLatestSchema("asdf") shouldBe None
112+
}
113+
114+
it should "find the latest schema" in {
115+
manager.retrieveLatestSchema("it.agilelab.darwin.connector.mock.testclasses.MockClassParent") shouldBe Some(
116+
mockClassParentFingerprint -> manager.getSchema(mockClassParentFingerprint).get
117+
)
118+
}
108119
}

mock-application/src/test/scala/it/agilelab/darwin/app/mock/CachedLazyApplicationSuite.scala

Lines changed: 7 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -86,12 +86,13 @@ abstract class CachedLazyApplicationSuite(val endianness: ByteOrder) extends Any
8686
var calls = 0
8787
val manager = new CachedLazyAvroSchemaManager(
8888
new Connector {
89-
override def createTable(): Unit = ()
90-
override def tableExists(): Boolean = true
91-
override def tableCreationHint(): String = ""
92-
override def fullLoad(): Seq[(Long, Schema)] = Seq.empty
93-
override def insert(schemas: Seq[(Long, Schema)]): Unit = ()
94-
override def findSchema(id: Long): Option[Schema] = Some(oneFieldSchema)
89+
override def createTable(): Unit = ()
90+
override def tableExists(): Boolean = true
91+
override def tableCreationHint(): String = ""
92+
override def fullLoad(): Seq[(Long, Schema)] = Seq.empty
93+
override def insert(schemas: Seq[(Long, Schema)]): Unit = ()
94+
override def findSchema(id: Long): Option[Schema] = Some(oneFieldSchema)
95+
override def retrieveLatestSchema(identifier: String): Option[(Long, Schema)] = Some(1L -> oneFieldSchema)
9596
},
9697
endianness
9798
) {

mock-application/src/test/scala/it/agilelab/darwin/app/mock/LazyApplicationSuite.scala

Lines changed: 10 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -1,8 +1,5 @@
11
package it.agilelab.darwin.app.mock
22

3-
import java.lang.reflect.Modifier
4-
import java.nio.ByteOrder
5-
63
import com.typesafe.config.{ Config, ConfigFactory }
74
import it.agilelab.darwin.annotations.AvroSerde
85
import it.agilelab.darwin.app.mock.classes.{ MyClass, MyNestedClass, NewClass, OneField }
@@ -15,6 +12,9 @@ import org.reflections.Reflections
1512
import org.scalatest.flatspec.AnyFlatSpec
1613
import org.scalatest.matchers.should.Matchers
1714

15+
import java.lang.reflect.Modifier
16+
import java.nio.ByteOrder
17+
1818
class BigEndianLazyApplicationSuite extends LazyApplicationSuite(ByteOrder.BIG_ENDIAN)
1919

2020
class LittleEndianLazyApplicationSuite extends LazyApplicationSuite(ByteOrder.LITTLE_ENDIAN)
@@ -85,12 +85,13 @@ abstract class LazyApplicationSuite(endianness: ByteOrder) extends AnyFlatSpec w
8585
var calls = 0
8686
val manager = new LazyAvroSchemaManager(
8787
new Connector {
88-
override def createTable(): Unit = ()
89-
override def tableExists(): Boolean = true
90-
override def tableCreationHint(): String = ""
91-
override def fullLoad(): Seq[(Long, Schema)] = Seq.empty
92-
override def insert(schemas: Seq[(Long, Schema)]): Unit = ()
93-
override def findSchema(id: Long): Option[Schema] = Some(oneFieldSchema)
88+
override def createTable(): Unit = ()
89+
override def tableExists(): Boolean = true
90+
override def tableCreationHint(): String = ""
91+
override def fullLoad(): Seq[(Long, Schema)] = Seq.empty
92+
override def insert(schemas: Seq[(Long, Schema)]): Unit = ()
93+
override def findSchema(id: Long): Option[Schema] = Some(oneFieldSchema)
94+
override def retrieveLatestSchema(identifier: String): Option[(Long, Schema)] = Some(1L -> oneFieldSchema)
9495
},
9596
endianness
9697
) {

mock-connector/src/main/scala/it/agilelab/darwin/connector/mock/MockConnector.scala

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -105,4 +105,11 @@ class MockConnector(config: Config) extends Connector with Logging {
105105
override def tableExists(): Boolean = true
106106

107107
override def tableCreationHint(): String = "No table needs to be created since mock connecto"
108+
109+
/**
110+
* Retrieves the latest schema for a given string identifier (not to be confused with the fingerprint id).
111+
* This API might not be implemented by all connectors, which should return None
112+
*/
113+
override def retrieveLatestSchema(identifier: String): Option[(Long, Schema)] =
114+
table.find(_._2.getFullName == identifier)
108115
}

mock-connector/src/test/scala/it/agilelab/darwin/connector/mock/MockConnectorSpec.scala

Lines changed: 43 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -1,14 +1,14 @@
11
package it.agilelab.darwin.connector.mock
22

3-
import java.nio.file.Paths
4-
import java.util
5-
63
import com.typesafe.config.ConfigFactory
74
import org.apache.avro.Schema
85
import org.apache.avro.Schema.Type
96
import org.scalatest.flatspec.AnyFlatSpec
107
import org.scalatest.matchers.should.Matchers
118

9+
import java.nio.file.Paths
10+
import java.util
11+
1212
class MockConnectorSpec extends AnyFlatSpec with Matchers {
1313

1414
private val p = Paths
@@ -82,4 +82,44 @@ class MockConnectorSpec extends AnyFlatSpec with Matchers {
8282
}
8383
}
8484

85+
it should "return Some schema if asked for the latest schema" in {
86+
val connector =
87+
new MockConnectorCreator()
88+
.create(ConfigFactory.parseMap {
89+
new java.util.HashMap[String, Object] {
90+
put(
91+
ConfigurationKeys.FILES,
92+
util.Arrays.asList(
93+
p.resolve("MockClassAlone.avsc").toString,
94+
p.resolve("MockClassParent.avsc").toString
95+
)
96+
)
97+
}
98+
})
99+
val all = connector.fullLoad()
100+
connector.retrieveLatestSchema("it.agilelab.darwin.connector.mock.testclasses.MockClassAlone") shouldBe all.find(
101+
_._2.getName == "MockClassAlone"
102+
)
103+
104+
}
105+
106+
it should "return None schema if asked for the latest schema" in {
107+
val connector =
108+
new MockConnectorCreator()
109+
.create(ConfigFactory.parseMap {
110+
new java.util.HashMap[String, Object] {
111+
put(
112+
ConfigurationKeys.FILES,
113+
util.Arrays.asList(
114+
p.resolve("MockClassAlone.avsc").toString,
115+
p.resolve("MockClassParent.avsc").toString
116+
)
117+
)
118+
}
119+
})
120+
connector.fullLoad()
121+
connector.retrieveLatestSchema("DoesNotExists") shouldBe None
122+
123+
}
124+
85125
}

0 commit comments

Comments
 (0)