Skip to content

Commit e513ba0

Browse files
[#72] It is not possible to understand a AvroSchemaManager configured endianess
Changes: - add `getEndianness()` to AvroSchemaManager - add `writeHeaderToStream` to AvroSchemaManager which is the same as the one in AvroSingleObjectEncodingUtils but "inherits" the endianness from the AvroSchemaManager instance
1 parent 3ff178e commit e513ba0

File tree

1 file changed

+91
-80
lines changed

1 file changed

+91
-80
lines changed
Lines changed: 91 additions & 80 deletions
Original file line numberDiff line numberDiff line change
@@ -1,104 +1,115 @@
11
package it.agilelab.darwin.manager
22

3-
import java.io.OutputStream
3+
import java.io.{InputStream, OutputStream}
44
import java.nio.{ByteBuffer, ByteOrder}
55

6+
import it.agilelab.darwin.common.compat._
67
import it.agilelab.darwin.common.{Connector, Logging}
78
import it.agilelab.darwin.manager.exception.DarwinException
89
import it.agilelab.darwin.manager.util.AvroSingleObjectEncodingUtils
910
import org.apache.avro.Schema
1011

11-
import it.agilelab.darwin.common.compat._
12-
import java.io.InputStream
13-
1412
/**
15-
* The main entry point of the Darwin library.
16-
* An instance of AvroSchemaManager should ALWAYS be obtained through the AvroSchemaManagerFactory.
17-
* The manager is responsible for schemas registration, retrieval and updates.
18-
*
19-
* @param connector the connector used to retrieve and persist schemas
20-
* @param endianness the endianness that will be used to persist and parse fingerprint bytes, it won't affect how avro
21-
* payload is written, that is up to the darwin user
22-
*/
13+
* The main entry point of the Darwin library.
14+
* An instance of AvroSchemaManager should ALWAYS be obtained through the AvroSchemaManagerFactory.
15+
* The manager is responsible for schemas registration, retrieval and updates.
16+
*
17+
* @param connector the connector used to retrieve and persist schemas
18+
* @param endianness the endianness that will be used to persist and parse fingerprint bytes, it won't affect how avro
19+
* payload is written, that is up to the darwin user
20+
*/
2321
abstract class AvroSchemaManager(connector: Connector, endianness: ByteOrder) extends Logging {
2422

23+
/**
24+
* @return the configured endianness of this AvroSchemaManager instance
25+
*/
26+
def getEndianness(): ByteOrder = endianness
2527

2628
/**
27-
* Retrieves all registered schemas
28-
*
29-
* @return A Sequence of (ID, Schema)
30-
*/
29+
* Retrieves all registered schemas
30+
*
31+
* @return A Sequence of (ID, Schema)
32+
*/
3133
def getAll: Seq[(Long, Schema)]
3234

3335
/**
34-
* Extracts the ID from a Schema.
35-
*
36-
* @param schema a Schema with unknown ID
37-
* @return the ID associated with the input schema
38-
*/
36+
* Extracts the ID from a Schema.
37+
*
38+
* @param schema a Schema with unknown ID
39+
* @return the ID associated with the input schema
40+
*/
3941
def getId(schema: Schema): Long = AvroSingleObjectEncodingUtils.getId(schema)
4042

4143
/**
42-
* Extracts the Schema from its ID.
43-
*
44-
* @param id a Long representing an ID
45-
* @return the Schema associated to the input ID
46-
*/
44+
* Extracts the Schema from its ID.
45+
*
46+
* @param id a Long representing an ID
47+
* @return the Schema associated to the input ID
48+
*/
4749
def getSchema(id: Long): Option[Schema]
4850

4951
/**
50-
* Checks if all the input Schema elements are already in the cache. Then, it performs an insert on the
51-
* storage for all the elements not found on the cache, and then returns each input schema paired with its ID.
52-
*
53-
* @param schemas all the Schema that should be registered
54-
* @return a sequence of pairs of the input schemas associated with their IDs
55-
*/
52+
* Checks if all the input Schema elements are already in the cache. Then, it performs an insert on the
53+
* storage for all the elements not found on the cache, and then returns each input schema paired with its ID.
54+
*
55+
* @param schemas all the Schema that should be registered
56+
* @return a sequence of pairs of the input schemas associated with their IDs
57+
*/
5658
def registerAll(schemas: Seq[Schema]): Seq[(Long, Schema)]
5759

5860
/**
59-
* JAVA API: Checks if all the input Schema elements are already in the cache. Then, it performs an insert on the
60-
* storage for all the elements not found on the cache, and then returns each input schema paired with its ID.
61-
*
62-
* @param schemas all the Schema that should be registered
63-
* @return a sequence of pairs of the input schemas associated with their IDs
64-
*/
61+
* JAVA API: Checks if all the input Schema elements are already in the cache. Then, it performs an insert on the
62+
* storage for all the elements not found on the cache, and then returns each input schema paired with its ID.
63+
*
64+
* @param schemas all the Schema that should be registered
65+
* @return a sequence of pairs of the input schemas associated with their IDs
66+
*/
6567
def registerAll(schemas: java.lang.Iterable[Schema]): java.lang.Iterable[IdSchemaPair] = {
6668
registerAll(schemas.toScala.toSeq).map { case (id, schema) => IdSchemaPair.create(id, schema) }.toJava
6769
}
6870

71+
/** Writes to the given OutputStream the Single Object Encoding header and returns the OutputStream
72+
*
73+
* @return the input OutputStream
74+
*/
75+
def writeHeaderToStream(byteStream: OutputStream,
76+
schemaId: Long): OutputStream = {
77+
AvroSingleObjectEncodingUtils.writeHeaderToStream(byteStream, schemaId, endianness)
78+
}
79+
6980
/** Create an array that creates a Single-Object encoded byte array.
70-
* By specifications the encoded array is obtained concatenating the V1_HEADER, the schema id and the avro-encoded
71-
* payload.
72-
*
73-
* @param avroPayload avro-serialized payload
74-
* @param schema the schema used to encode the payload
75-
* @return a Single-Object encoded byte array
76-
*/
81+
* By specifications the encoded array is obtained concatenating the V1_HEADER, the schema id and the avro-encoded
82+
* payload.
83+
*
84+
* @param avroPayload avro-serialized payload
85+
* @param schema the schema used to encode the payload
86+
* @return a Single-Object encoded byte array
87+
*/
7788
def generateAvroSingleObjectEncoded(avroPayload: Array[Byte], schema: Schema): Array[Byte] = {
7889
AvroSingleObjectEncodingUtils.generateAvroSingleObjectEncoded(avroPayload, getId(schema), endianness)
7990
}
8091

8192
/** Writes to the given OutputStream the Single Object Encoding header then the avroValue and returns the OutputStream
82-
*
83-
* @param byteStream the stream to write to
84-
* @param avroValue the value to be written to the stream
85-
* @param schemaId id of the schema used to encode the payload
86-
* @return the input OutputStream
87-
*/
93+
*
94+
* @param byteStream the stream to write to
95+
* @param avroValue the value to be written to the stream
96+
* @param schemaId id of the schema used to encode the payload
97+
* @return the input OutputStream
98+
*/
8899
def generateAvroSingleObjectEncoded(byteStream: OutputStream,
89100
avroValue: Array[Byte],
90101
schemaId: Long): OutputStream = {
91102
AvroSingleObjectEncodingUtils.generateAvroSingleObjectEncoded(byteStream, avroValue, schemaId, endianness)
92103
}
93104

94105
/** Writes to the given OutputStream the Single Object Encoding header then calls the avroWriter function to
95-
* possibly add data to the stream and finally returns the OutputStream
96-
*
97-
* @param byteStream the stream to write to
98-
* @param schemaId id of the schema used to encode the payload
99-
* @param avroWriter function that will be called to add user generated avro to the stream
100-
* @return the input OutputStream
101-
*/
106+
* possibly add data to the stream and finally returns the OutputStream
107+
*
108+
* @param byteStream the stream to write to
109+
* @param schemaId id of the schema used to encode the payload
110+
* @param avroWriter function that will be called to add user generated avro to the stream
111+
* @return the input OutputStream
112+
*/
102113
def generateAvroSingleObjectEncoded(byteStream: OutputStream,
103114
schemaId: Long)
104115
(avroWriter: OutputStream => OutputStream): OutputStream = {
@@ -107,10 +118,10 @@ abstract class AvroSchemaManager(connector: Connector, endianness: ByteOrder) ex
107118

108119

109120
/** Extracts a Tuple2 that contains the Schema and the Avro-encoded payload
110-
*
111-
* @param avroSingleObjectEncoded a byte array of a Single-Object encoded payload
112-
* @return a pair containing the Schema and the payload of the input array
113-
*/
121+
*
122+
* @param avroSingleObjectEncoded a byte array of a Single-Object encoded payload
123+
* @return a pair containing the Schema and the payload of the input array
124+
*/
114125
def retrieveSchemaAndAvroPayload(avroSingleObjectEncoded: Array[Byte]): (Schema, Array[Byte]) = {
115126
if (AvroSingleObjectEncodingUtils.isAvroSingleObjectEncoded(avroSingleObjectEncoded)) {
116127
val id = AvroSingleObjectEncodingUtils.extractId(avroSingleObjectEncoded, endianness)
@@ -127,11 +138,11 @@ abstract class AvroSchemaManager(connector: Connector, endianness: ByteOrder) ex
127138
}
128139

129140
/** Extracts the Schema from the ByteBuffer after the method call the ByteBuffer position will be right after the
130-
* header.
131-
*
132-
* @param avroSingleObjectEncoded a ByteBuffer of a Single-Object encoded payload
133-
* @return the avro Schema
134-
*/
141+
* header.
142+
*
143+
* @param avroSingleObjectEncoded a ByteBuffer of a Single-Object encoded payload
144+
* @return the avro Schema
145+
*/
135146
def retrieveSchemaAndAvroPayload(avroSingleObjectEncoded: ByteBuffer): Schema = {
136147
if (AvroSingleObjectEncodingUtils.isAvroSingleObjectEncoded(avroSingleObjectEncoded)) {
137148
val id = AvroSingleObjectEncodingUtils.extractId(avroSingleObjectEncoded, endianness)
@@ -146,31 +157,31 @@ abstract class AvroSchemaManager(connector: Connector, endianness: ByteOrder) ex
146157
}
147158

148159
/** Extracts the schema from the avro single-object encoded at the head of this input stream.
149-
* The input stream will have 10 bytes consumed if the first two bytes correspond to the single object encoded
150-
* header, or zero bytes consumed if the InputStream supports marking; if it doesn't, the first bytes (up to 2) will
151-
* be consumed and returned in the Left part of the Either
152-
*
153-
* @param inputStream avro single-object encoded input stream
154-
* @return the schema ID extracted from the input data
155-
*/
160+
* The input stream will have 10 bytes consumed if the first two bytes correspond to the single object encoded
161+
* header, or zero bytes consumed if the InputStream supports marking; if it doesn't, the first bytes (up to 2) will
162+
* be consumed and returned in the Left part of the Either
163+
*
164+
* @param inputStream avro single-object encoded input stream
165+
* @return the schema ID extracted from the input data
166+
*/
156167
def extractSchema(inputStream: InputStream): Either[Array[Byte], Schema] = {
157168
AvroSingleObjectEncodingUtils.extractId(inputStream, endianness).rightMap { id =>
158169
getSchema(id).getOrElse(throw new DarwinException(s"No schema found for ID $id"))
159170
}
160171
}
161172

162173
/** Extracts a [[SchemaPayloadPair]] that contains the Schema and the Avro-encoded payload
163-
*
164-
* @param avroSingleObjectEncoded a byte array of a Single-Object encoded payload
165-
* @return a [[SchemaPayloadPair]] containing the Schema and the payload of the input array
166-
*/
174+
*
175+
* @param avroSingleObjectEncoded a byte array of a Single-Object encoded payload
176+
* @return a [[SchemaPayloadPair]] containing the Schema and the payload of the input array
177+
*/
167178
def retrieveSchemaAndPayload(avroSingleObjectEncoded: Array[Byte]): SchemaPayloadPair = {
168179
val (schema, payload) = retrieveSchemaAndAvroPayload(avroSingleObjectEncoded)
169180
SchemaPayloadPair.create(schema, payload)
170181
}
171182

172183
/**
173-
* Reloads all the schemas from the previously configured storage.
174-
*/
184+
* Reloads all the schemas from the previously configured storage.
185+
*/
175186
def reload(): AvroSchemaManager
176187
}

0 commit comments

Comments
 (0)