Skip to content

Commit b637354

Browse files
[#74] Avoid useless IO operations
1 parent e513ba0 commit b637354

File tree

3 files changed

+55
-28
lines changed

3 files changed

+55
-28
lines changed

core/src/main/scala/it/agilelab/darwin/manager/AvroSchemaManager.scala

Lines changed: 15 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -170,6 +170,21 @@ abstract class AvroSchemaManager(connector: Connector, endianness: ByteOrder) ex
170170
}
171171
}
172172

173+
/** Extracts the schema from the avro single-object encoded in the input array.
174+
*
175+
* @param array avro single-object encoded array
176+
* @return the schema ID extracted from the input data
177+
*/
178+
def extractSchema(array: Array[Byte]): Either[Exception, Schema] = {
179+
try {
180+
val id = AvroSingleObjectEncodingUtils.extractId(array, endianness)
181+
getSchema(id)
182+
.toRight(new RuntimeException(s"Cannot find schema with id $id"))
183+
} catch {
184+
case ie: IllegalArgumentException => Left(ie)
185+
}
186+
}
187+
173188
/** Extracts a [[SchemaPayloadPair]] that contains the Schema and the Avro-encoded payload
174189
*
175190
* @param avroSingleObjectEncoded a byte array of a Single-Object encoded payload

hbase/src/main/scala/it/agilelab/darwin/connector/hbase/HBaseConnector.scala

Lines changed: 16 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -115,17 +115,22 @@ case class HBaseConnector(config: Config) extends Connector with Logging {
115115
}
116116

117117
override def insert(schemas: Seq[(Long, Schema)]): Unit = {
118-
log.debug(s"inserting ${schemas.size} schemas in HBase table $NAMESPACE_STRING:$TABLE_NAME_STRING")
119-
val mutator = connection.getBufferedMutator(TABLE_NAME)
120-
schemas.map { case (id, schema) =>
121-
val put = new Put(Bytes.toBytes(id))
122-
put.addColumn(CF, QUALIFIER_SCHEMA, Bytes.toBytes(schema.toString))
123-
put.addColumn(CF, QUALIFIER_NAME, Bytes.toBytes(schema.getName))
124-
put.addColumn(CF, QUALIFIER_NAMESPACE, Bytes.toBytes(schema.getNamespace))
125-
put
126-
}.foreach(mutator.mutate)
127-
mutator.flush()
128-
log.debug(s"insertion of schemas into $NAMESPACE_STRING:$TABLE_NAME_STRING successful")
118+
if (schemas.nonEmpty) {
119+
120+
log.debug(s"inserting ${schemas.size} schemas in HBase table $NAMESPACE_STRING:$TABLE_NAME_STRING")
121+
using(connection.getBufferedMutator(TABLE_NAME)) { mutator =>
122+
schemas.map { case (id, schema) =>
123+
val put = new Put(Bytes.toBytes(id))
124+
put.addColumn(CF, QUALIFIER_SCHEMA, Bytes.toBytes(schema.toString))
125+
put.addColumn(CF, QUALIFIER_NAME, Bytes.toBytes(schema.getName))
126+
put.addColumn(CF, QUALIFIER_NAMESPACE, Bytes.toBytes(schema.getNamespace))
127+
put
128+
}.foreach(mutator.mutate)
129+
mutator.flush()
130+
log.debug(s"insertion of schemas into $NAMESPACE_STRING:$TABLE_NAME_STRING successful")
131+
}
132+
}
133+
129134
}
130135

131136
override def createTable(): Unit = {

postgres/src/main/scala/it/agilelab/darwin/connector/postgres/PostgresConnector.scala

Lines changed: 24 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -61,9 +61,11 @@ class PostgresConnector(config: Config) extends Connector with PostgresConnectio
6161
}
6262

6363
override def insert(schemas: Seq[(Long, Schema)]): Unit = {
64-
MODE match {
65-
case ExceptionDriven => insertExceptionDriven(schemas)
66-
case OneTransaction => insertOneTransaction(schemas)
64+
if (schemas.nonEmpty) {
65+
MODE match {
66+
case ExceptionDriven => insertExceptionDriven(schemas)
67+
case OneTransaction => insertOneTransaction(schemas)
68+
}
6769
}
6870
}
6971

@@ -151,21 +153,26 @@ class PostgresConnector(config: Config) extends Connector with PostgresConnectio
151153

152154
private def findSchemas(connection: Connection, ids: Seq[(Long, Schema)]): Map[Long, Schema] = {
153155

154-
val withIdx = ids.zipWithIndex
155-
val statement = connection.prepareStatement(s"select * from $TABLE_NAME where id in " +
156-
withIdx.map(_ => "?").mkString("(", ",", ")")
157-
)
158-
withIdx.foreach { case (f, idx) =>
159-
statement.setLong(idx + 1, f._1)
160-
}
161-
using(statement.executeQuery()) { resultSet =>
162-
val schemas = Map.newBuilder[Long, Schema]
163-
while (resultSet.next()) {
164-
val id = resultSet.getLong("id")
165-
val schema = parser.parse(resultSet.getString("schema"))
166-
schemas += (id -> schema)
156+
if (ids.nonEmpty) {
157+
val withIdx = ids.zipWithIndex
158+
using(connection.prepareStatement(s"select * from $TABLE_NAME where id in " +
159+
withIdx.map(_ => "?").mkString("(", ",", ")")
160+
)) { statement =>
161+
withIdx.foreach { case (f, idx) =>
162+
statement.setLong(idx + 1, f._1)
163+
}
164+
using(statement.executeQuery()) { resultSet =>
165+
val schemas = Map.newBuilder[Long, Schema]
166+
while (resultSet.next()) {
167+
val id = resultSet.getLong("id")
168+
val schema = parser.parse(resultSet.getString("schema"))
169+
schemas += (id -> schema)
170+
}
171+
schemas.result()
172+
}
167173
}
168-
schemas.result()
174+
} else {
175+
Map.empty
169176
}
170177
}
171178

0 commit comments

Comments
 (0)