59 changes: 52 additions & 7 deletions README.md
@@ -46,7 +46,7 @@ In order to access Darwin core functionalities add the core dependency to you
#### sbt
```scala
libraryDependencies += "it.agilelab" %% "darwin-core" % "1.2.1-SNAPSHOT"
```
#### maven
```xml
<dependency>
@@ -63,7 +63,7 @@ Then add the connector of your choice, either HBase:
#### sbt
```scala
libraryDependencies += "it.agilelab" %% "darwin-hbase-connector" % "1.2.1-SNAPSHOT"
```
#### maven
```xml
<dependency>
@@ -81,7 +81,7 @@ Or PostgreSQL:

```scala
libraryDependencies += "it.agilelab" %% "darwin-postgres-connector" % "1.2.1-SNAPSHOT"
```
#### maven
```xml
<dependency>
@@ -100,7 +100,7 @@ Or Rest

```scala
libraryDependencies += "it.agilelab" %% "darwin-rest-connector" % "1.2.1-SNAPSHOT"
```
#### maven
```xml
<dependency>
@@ -122,7 +122,7 @@ Or Mock (only for test scenarios):

```scala
libraryDependencies += "it.agilelab" %% "darwin-mock-connector" % "1.2.1-SNAPSHOT"
```
#### maven
```xml
<dependency>
@@ -141,7 +141,7 @@ Darwin can be used as a *facade* over Confluent Schema Registry.

```scala
libraryDependencies += "it.agilelab" %% "darwin-confluent-connector" % "1.2.1-SNAPSHOT"
```
#### maven
```xml
<dependency>
@@ -384,7 +384,6 @@ timeout = 5000
## REST

The configuration keys managed by the `RestConnector` are:
- **protocol**: http or https
- **host**: the hostname where rest-server (or an http proxy) is deployed
- **port**: the port where rest-server (or an http proxy) is listening
@@ -526,3 +525,49 @@ Here is an example of configuration:
"resources": ["schemas/Apple.avsc", "schemas/Orange.avsc"]
"mode": "permissive"
```

----

## Multi-Connector

The multi-connector delegates to multiple connectors in a hierarchical order. It is useful when schemas are registered in different datastores (e.g. Confluent plus HBase).

You configure it in the following way:

```
darwin {
type = "lazy"
connector = "multi"
registrar = "hbase"
confluent-single-object-encoding: "confluent"
standard-single-object-encoding: ["hbase", "mongo"]
confluent {
endpoints: ["http://schema-registry-00:7777", "http://schema-registry-01:7777"]
max-cached-schemas: 1000
}
hbase {
isSecure: false
namespace: "DARWIN"
table: "REPOSITORY"
coreSite: "/etc/hadoop/conf/core-site.xml"
hbaseSite: "/etc/hadoop/conf/hbase-site.xml"
}
mongo {
username = "mongo"
password = "mongo"
host = ["localhost:12345"]
database = "test"
collection = "collection_test"
timeout = 5000
}
}
```
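
For reference, a minimal sketch of loading this configuration, assuming the block above lives under the `darwin` key of your `application.conf` and using Darwin's usual `AvroSchemaManagerFactory` entry point:

```scala
import com.typesafe.config.ConfigFactory
import it.agilelab.darwin.manager.{ AvroSchemaManager, AvroSchemaManagerFactory }

// Load the `darwin` section shown above; `connector = "multi"` selects the
// multi-connector, which in turn instantiates the configured sub-connectors.
val config = ConfigFactory.load().getConfig("darwin")
val manager: AvroSchemaManager = AvroSchemaManagerFactory.initialize(config)
```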

When extracting the schema id, the multi-connector first checks whether the payload uses the "confluent" or the "standard" single-object encoding.
Based on that, it walks the corresponding chain, `confluent-single-object-encoding` or `standard-single-object-encoding`, **in order**.
The first connector that matches is the one used.
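
The dispatch is possible because the two encodings carry different headers: Confluent payloads start with the magic byte `0x00` followed by a 4-byte schema id, while the standard Avro single-object encoding starts with `0xC3 0x01` followed by an 8-byte fingerprint (see the `V1_HEADER` constants in the diffs below). A purely illustrative sketch of that check, not Darwin's actual code:

```scala
// Confluent framing: 1 magic byte (0x00) + 4-byte schema id.
def looksConfluent(payload: Array[Byte]): Boolean =
  payload.length >= 5 && payload(0) == 0x00.toByte

// Standard Avro single-object framing: 2 header bytes (0xC3 0x01) + 8-byte id.
def looksStandard(payload: Array[Byte]): Boolean =
  payload.length >= 10 && payload(0) == 0xC3.toByte && payload(1) == 0x01.toByte
```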

To initialize the individual connectors, a configuration is created by merging the connector-specific section
(e.g. `hbase`, `mongo`, `confluent`) with the outer layer; for duplicated keys, the more specific value wins.
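
This merge has the same semantics as Typesafe Config's `withFallback`; a minimal sketch of the idea (the helper name is hypothetical):

```scala
import com.typesafe.config.Config

// Keys defined in the connector-specific section (e.g. `hbase`) take
// precedence over keys defined at the outer `darwin` level.
def connectorConfig(outer: Config, connectorName: String): Config =
  outer.getConfig(connectorName).withFallback(outer)
```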

Schema registration is performed by the connector configured as `registrar` (`hbase` in the example above).
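
For example (a sketch assuming the `manager` built earlier and the `registerAll` API), newly registered schemas would be written through HBase, the registrar configured above:

```scala
import org.apache.avro.Schema

// Registration goes through the registrar connector only (HBase here);
// schema lookups still walk the whole connector chain.
val schema: Schema = new Schema.Parser().parse(
  """{"type":"record","name":"Apple","fields":[{"name":"id","type":"long"}]}"""
)
manager.registerAll(Seq(schema))
```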
14 changes: 12 additions & 2 deletions build.sbt
@@ -7,7 +7,7 @@ import sbt.Keys.baseDirectory
* See project/Dependencies.scala for the dependencies definitions.
* See project/Versions.scala for the versions definitions.
*/
ThisBuild / dynverVTagPrefix := false

lazy val root = Project("darwin", file("."))
.settings(Settings.commonSettings: _*)
@@ -22,7 +22,8 @@ lazy val root = Project("darwin", file("."))
mockApplication,
restConnector,
mongoConnector,
confluentConnector,
multiConnector
)

lazy val core = Project("darwin-core", file("core"))
@@ -113,3 +114,12 @@ lazy val sparkApplication = Project("darwin-spark-application", file("spark-appl
.settings(libraryDependencies ++= Dependencies.spark_app)
.settings(crossScalaVersions := Seq(Versions.scala, Versions.scala_211))
.settings(Settings.notPublishSettings)

lazy val multiConnector = Project("darwin-multi-connector", file("multi-connector"))
.settings(Settings.commonSettings: _*)
.dependsOn(coreCommon)
.dependsOn(core)
.dependsOn(mockConnector % Test)
.dependsOn(confluentConnector % Test)
.settings(crossScalaVersions := Versions.crossScalaVersions)
.settings(libraryDependencies += Dependencies.scalatest)
@@ -98,5 +98,12 @@ package object compat {
case _ => self.asInstanceOf[Either[L, R1]]
}
}

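/** Right-biased `flatMap` on `Either`, mirroring `rightMap` above; part of this
  * compat layer because `Either` is right-biased only from Scala 2.12 onwards. */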
def rightFlatMap[L1 >: L, R1](f: R => Either[L1, R1]): Either[L1, R1] = {
self match {
case Right(v) => f(v)
case _ => self.asInstanceOf[Either[L1, R1]]
}
}
}
}
@@ -1,18 +1,18 @@
package it.agilelab.darwin.manager.util

import it.agilelab.darwin.common.DarwinConcurrentHashMap
import it.agilelab.darwin.manager.exception.DarwinException
import it.agilelab.darwin.manager.util.ByteArrayUtils._
import org.apache.avro.Schema

import java.io.{ InputStream, OutputStream }
import java.nio.{ ByteBuffer, ByteOrder }
import java.util

object AvroSingleObjectEncodingUtils {
val V1_HEADER: Array[Byte] = Array[Byte](0xc3.toByte, 0x01.toByte)
private val ID_SIZE = 8
private val HEADER_LENGTH = V1_HEADER.length + ID_SIZE

private val schemaMap = DarwinConcurrentHashMap.empty[Schema, Long]

@@ -219,15 +219,19 @@ object AvroSingleObjectEncodingUtils {
if (inputStream.markSupported()) {
inputStream.reset()
inputStream.mark(0)
Left(Array.emptyByteArray)
} else {
Left(buffer.slice(0, V1_HEADER.length))
}
}
} else {
if (inputStream.markSupported()) {
inputStream.reset()
inputStream.mark(0)
Left(Array.emptyByteArray)
} else {
Left(buffer.slice(0, bytesReadMagicBytes))
}
}
}

@@ -1,17 +1,16 @@
package it.agilelab.darwin.manager.util

import it.agilelab.darwin.common.DarwinConcurrentHashMap
import it.agilelab.darwin.manager.exception.DarwinException
import it.agilelab.darwin.manager.util.ByteArrayUtils.EnrichedInt
import org.apache.avro.Schema

import java.io.{ InputStream, OutputStream }
import java.nio.{ ByteBuffer, ByteOrder }
import java.util

object ConfluentSingleObjectEncoding {
val V1_HEADER = Array[Byte](0x00.toByte)
private val ID_SIZE = 4
private val HEADER_LENGTH = V1_HEADER.length + ID_SIZE

@@ -222,15 +221,19 @@ object ConfluentSingleObjectEncoding {
if (inputStream.markSupported()) {
inputStream.reset()
inputStream.mark(0)
Left(Array.emptyByteArray)
} else {
Left(buffer.slice(0, V1_HEADER.length))
}
}
} else {
if (inputStream.markSupported()) {
inputStream.reset()
inputStream.mark(0)
Left(Array.emptyByteArray)
} else {
Left(buffer.slice(0, bytesReadMagicBytes))
}
}
}

@@ -65,15 +65,15 @@ abstract class AvroSingleObjectEncodingUtilsSpec(val endianness: ByteOrder) exte
"extractId(InputStream)" should "return Left if the input stream has only one byte" in {
val stream = new ByteArrayInputStream(Array(Random.nextInt().toByte))
val id = AvroSingleObjectEncodingUtils.extractId(stream, endianness)
id.left.map(_.length == 0) should be(Left(true))
stream.read() should not be (-1)
stream.read() should be(-1)
}

"extractId(InputStream)" should "return Left if the input stream does not have the expected header" in {
val stream = new ByteArrayInputStream(Array(0xc3.toByte, 0x02.toByte))
val id = AvroSingleObjectEncodingUtils.extractId(stream, endianness)
id.left.map(_.length == 0) should be(Left(true))
stream.read().toByte should be(0xc3.toByte)
stream.read().toByte should be(0x02.toByte)
stream.read() should be(-1)
@@ -1,19 +1,18 @@
package it.agilelab.darwin.manager.util

import it.agilelab.darwin.common.compat._
import it.agilelab.darwin.manager.util.ByteArrayUtils._
import org.apache.avro.{ Schema, SchemaNormalization }
import org.apache.avro.generic.{ GenericData, GenericDatumReader, GenericDatumWriter, GenericRecord }
import org.apache.avro.io.{ DecoderFactory, EncoderFactory }
import org.apache.avro.util.ByteBufferInputStream
import org.scalatest.flatspec.AnyFlatSpec
import org.scalatest.matchers.should.Matchers

import java.io.{ ByteArrayInputStream, ByteArrayOutputStream }
import java.nio.{ BufferUnderflowException, ByteBuffer, ByteOrder }
import java.util
import scala.util.Random

abstract class ConfluentAvroSingleObjectEncodingSpec(val endianness: ByteOrder) extends AnyFlatSpec with Matchers {
val sizeOfBuffer = 200
@@ -63,17 +62,17 @@ abstract class ConfluentAvroSingleObjectEncodingSpec(val endianness: ByteOrder)
}

"extractId(InputStream)" should "return Left if the input stream has only one byte" in {
val stream = new ByteArrayInputStream(Array((Random.nextInt(2048) + 1).toByte)) // scalastyle:ignore
val id = ConfluentSingleObjectEncoding.extractId(stream, endianness)
id.left.map(_.length == 0) should be(Left(true))
stream.read() should not be (-1)
stream.read() should be(-1)
}

"extractId(InputStream)" should "return Left if the input stream does not have the expected header" in {
val stream = new ByteArrayInputStream(Array(0x01.toByte))
val id = ConfluentSingleObjectEncoding.extractId(stream, endianness)
id.left.map(_.length == 0) should be(Left(true))
stream.read().toByte should be(0x01.toByte)
stream.read() should be(-1)
}
@@ -6,6 +6,7 @@ import it.agilelab.darwin.common.Connector
import it.agilelab.darwin.common.compat._
import it.agilelab.darwin.manager.SchemaPayloadPair
import it.agilelab.darwin.manager.exception.DarwinException
import it.agilelab.darwin.manager.util.ConfluentSingleObjectEncoding
import org.apache.avro.Schema

import java.io.{ IOException, InputStream, OutputStream }
@@ -0,0 +1 @@
it.agilelab.darwin.connector.multi.MultiConnectorCreator
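
This new one-line resource is a Java `ServiceLoader` registration (presumably under `META-INF/services`), which is how Darwin discovers connector implementations on the classpath. An illustrative sketch of the lookup it enables, assuming the `ConnectorCreator` SPI from Darwin's core-common module:

```scala
import it.agilelab.darwin.common.ConnectorCreator

import java.util.ServiceLoader
import scala.collection.JavaConverters._

// Enumerate every ConnectorCreator registered through a META-INF/services
// entry like the one above.
val creators = ServiceLoader.load(classOf[ConnectorCreator]).iterator().asScala.toList
creators.foreach(c => println(c.getClass.getName))
```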