Skip to content

Commit

Permalink
Protobuf performance fix (#473)
Browse files Browse the repository at this point in the history
* Fix protobuf performance issue

* Improve protobuf varint encoding performance

* ScalaFix

* Restricting zoned date time protobuf encoding test to fix JDK 1.8 issue

* Another attempt to fix JDK 8

* ScalaFix

* Fix test on JDK8
  • Loading branch information
vigoo authored Dec 23, 2022
1 parent 8e3dad0 commit dc294f2
Show file tree
Hide file tree
Showing 3 changed files with 291 additions and 182 deletions.
Original file line number Diff line number Diff line change
@@ -1,8 +1,10 @@
package zio.schema.codec

import org.openjdk.jmh.annotations._
import zio.Chunk

import java.util.concurrent.TimeUnit
import scala.util.Random

@State(Scope.Thread)
@BenchmarkMode(Array(Mode.Throughput))
Expand All @@ -17,12 +19,24 @@ class ProtobufBenchmarks {
@Param(Array("1000"))
var size: Int = _

@Param(Array("30721"))
var bigSize: Int = _

var outputs: Array[Any] = _

var bigByteChunk: Chunk[Byte] = _
var encodedBigByteChunk: Chunk[Byte] = _
var byteChunkCodec: BinaryCodec[Chunk[Byte]] = _

@Setup
def allocateOutputs(): Unit =
def setup(): Unit = {
outputs = Array.ofDim[Any](size)

byteChunkCodec = ProtobufCodec.protobufCodec[Chunk[Byte]]
bigByteChunk = Chunk.fromArray(Random.nextBytes(bigSize))
encodedBigByteChunk = byteChunkCodec.encode(bigByteChunk)
}

@Benchmark
def enumEncoding(): Array[Any] = {
for (i <- 0 until size) {
Expand All @@ -39,4 +53,24 @@ class ProtobufBenchmarks {
outputs
}

/**
 * Benchmark: encodes the pre-generated `bigByteChunk` with `byteChunkCodec`
 * and returns the encoded bytes.
 */
@Benchmark
def encodeLargeByteChunk(): Chunk[Byte] =
byteChunkCodec.encode(bigByteChunk)

/**
 * Benchmark: decodes the pre-encoded `encodedBigByteChunk` with `byteChunkCodec`
 * and returns the decoding result (either a `DecodeError` or the decoded chunk).
 */
@Benchmark
def decodeLargeByteChunk(): Either[DecodeError, Chunk[Byte]] =
byteChunkCodec.decode(encodedBigByteChunk)
}

/**
 * Stand-alone entry point for attaching a profiler to protobuf decoding of a
 * large byte chunk.
 *
 * A random chunk of `bigSize` bytes is encoded once when the object is
 * initialised; `main` then decodes it in an endless loop, printing the decoded
 * length each iteration. The process never terminates on its own and has to be
 * stopped externally.
 *
 * The loop lives in an explicit `main` method rather than in the body of an
 * `App` object: on Scala 3 the body of an `App` object runs in the object
 * initializer, which some JVMs only interpret, and that would distort the
 * profile this harness exists to produce.
 */
object ProtobufBenchmarksProfiling {

  val bigSize             = 30721
  val byteChunkCodec      = ProtobufCodec.protobufCodec[Chunk[Byte]]
  val bigByteChunk        = Chunk.fromArray(Random.nextBytes(bigSize))
  val encodedBigByteChunk = byteChunkCodec.encode(bigByteChunk)

  /**
   * Decodes `encodedBigByteChunk` forever, printing the result length each time.
   *
   * @param args command-line arguments (unused)
   */
  def main(args: Array[String]): Unit =
    while (true) {
      val decoded = byteChunkCodec.decode(encodedBigByteChunk)
      println(s"Decoded ${decoded.map(_.length)} bytes")
    }
}
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ package zio.schema.codec
import java.nio.charset.StandardCharsets
import java.nio.{ ByteBuffer, ByteOrder }
import java.time._
import java.time.format.DateTimeFormatter
import java.util.UUID

import scala.collection.immutable.ListMap
Expand All @@ -13,7 +14,7 @@ import zio.schema._
import zio.schema.codec.DecodeError.{ ExtraFields, MalformedField, MissingField }
import zio.schema.codec.ProtobufCodec.Protobuf.WireType.LengthDelimited
import zio.stream.ZPipeline
import zio.{ Cause, Chunk, Unsafe, ZIO }
import zio.{ Cause, Chunk, ChunkBuilder, Unsafe, ZIO }

object ProtobufCodec {

Expand Down Expand Up @@ -391,22 +392,37 @@ object ProtobufCodec {
case (StandardType.OffsetDateTimeType, v: OffsetDateTime) =>
encodePrimitive(fieldNumber, StandardType.StringType, v.toString)
case (StandardType.ZonedDateTimeType, v: ZonedDateTime) =>
encodePrimitive(fieldNumber, StandardType.StringType, v.toString)
encodePrimitive(fieldNumber, StandardType.StringType, v.format(DateTimeFormatter.ISO_ZONED_DATE_TIME))
case (_, _) =>
throw new NotImplementedError(s"No encoder for $standardType")
}

private def encodeVarInt(value: Int): Chunk[Byte] =
encodeVarInt(value.toLong)
private def encodeVarInt(value: Int): Chunk[Byte] = {
val builder = ChunkBuilder.make[Byte](5)
encodeVarInt(value.toLong, builder)
builder.result()
}

private def encodeVarInt(value: Long): Chunk[Byte] = {
val base128 = value & 0x7F
val higherBits = value >>> 7
val builder = ChunkBuilder.make[Byte](10)
encodeVarInt(value, builder)
builder.result()
}

if (higherBits != 0x00) {
(0x80 | base128).byteValue() +: encodeVarInt(higherBits)
} else {
Chunk(base128.byteValue())
/**
 * Appends the base-128 varint encoding of `value` to `builder`.
 *
 * The value is treated as unsigned (`>>>` shifts) and emitted in 7-bit groups,
 * least significant group first. Every group except the last has the 0x80
 * continuation bit set.
 *
 * @param value   the number to encode
 * @param builder the builder receiving the encoded bytes
 */
private def encodeVarInt(value: Long, builder: ChunkBuilder[Byte]): Unit = {
  var remaining = value

  // While more significant bits follow, emit the low 7 bits with the continuation flag.
  while ((remaining >>> 7) != 0L) {
    builder += (0x80 | (remaining & 0x7F)).byteValue()
    remaining = remaining >>> 7
  }

  // Final group: continuation bit left clear.
  builder += (remaining & 0x7F).byteValue()
  ()
}

Expand Down Expand Up @@ -445,6 +461,8 @@ object ProtobufCodec {
/**
 * Returns the slice of `chunk` that starts at the current `position` and spans
 * `length(context)` bytes. This method itself does not assign `position`.
 */
def peek(context: DecoderContext): Chunk[Byte] =
chunk.slice(position, position + length(context))

/** Returns the single byte of `chunk` at the current `position`; `position` is not changed. */
def peek: Byte = chunk.byte(position)

/**
 * Advances `position` by `count`.
 *
 * @param count the amount added to `position` (no bounds check is performed here)
 */
def move(count: Int): Unit =
position += count

Expand Down Expand Up @@ -929,28 +947,41 @@ object ProtobufCodec {
/**
* Decodes bytes to the following types: int32, int64, uint32, uint64, sint32, sint64, bool, enumN.
* Takes the index of the first byte which is inside the 0 - 127 range.
* Returns remainder of the bytes together with computed value.
*
* (0 -> 127) & 0x80 => 0, (128 -> 255) & 0x80 => 128
* 0 << 7 => 0, 1 << 7 => 128, 2 << 7 => 256, 3 << 7 => 384
* 1 & 0x7F => 1, 127 & 0x7F => 127, 128 & 0x7F => 0, 129 & 0x7F => 1
*/
private def varIntDecoder(context: DecoderContext): Long =
if (state.length(context) == 0) {
private def varIntDecoder(context: DecoderContext): Long = {
val maxLength = state.length(context)
if (maxLength == 0) {
throw MalformedField(Schema.primitive[Long], "Failed to decode VarInt. Unexpected end of chunk")
} else {
val chunk = state.peek(context)
val length = chunk.indexWhere(octet => (octet.longValue() & 0x80) != 0x80) + 1
if (length <= 0) {
var count = 0
var done = false
var result = 0L
while (count < maxLength && !done) {
val byte = state.peek
result = result | (byte & 0x7f).toLong << (count * 7)

state.move(1)
if ((byte & 0x80) == 0) {
done = true
} else {
count += 1
}
}

if (!done) {
throw MalformedField(
Schema.primitive[Long],
"Failed to decode VarInt. No byte within the range 0 - 127 are present"
)
} else {
state.move(length)
chunk.take(length).foldRight(0L)((octet, v) => (v << 7) + (octet & 0x7F))
}

result
}
}

/**
 * Decodes a binary value by returning whatever `state.all(context)` yields.
 * NOTE(review): presumably this is every byte still available for `context` —
 * confirm against the definition of `all`, which is not visible here.
 */
private def binaryDecoder(context: DecoderContext): Chunk[Byte] =
state.all(context)
Expand Down
Loading

0 comments on commit dc294f2

Please sign in to comment.