diff --git a/Migrating-from-v1.md b/Migrating-from-v1.md
index e3811ae3..88b4fd34 100644
--- a/Migrating-from-v1.md
+++ b/Migrating-from-v1.md
@@ -155,9 +155,8 @@ val dataSequence = sequenceOf(
     TheDataClass(...),
     TheDataClass(...),
 )
-val avro = Avro { fieldNamingStrategy = FieldNamingStrategy.SnakeCase }
 Files.newOutputStream(Path("/your/file.avro")).use { outputStream ->
-    AvroObjectContainerFile(avro)
+    AvroObjectContainer { fieldNamingStrategy = FieldNamingStrategy.SnakeCase }
         .encodeToStream(dataSequence, outputStream) {
             codec(CodecFactory.snappyCodec())
             // you can also add your metadata !
diff --git a/README.md b/README.md
index f3402ac3..2528b9f0 100644
--- a/README.md
+++ b/README.md
@@ -96,9 +96,15 @@ fun main() {
 
 ## Object container
 
-Avro4k provides a way to encode and decode object container — also known as data file — with `AvroObjectContainerFile` class. This encoding will prefix the binary data with the
+Avro4k provides a way to encode and decode object containers — also known as data files — with the `AvroObjectContainer` class. This encoding will prefix the binary data with the
 full schema to
-allow knowing the writer schema when reading the data. This format is perfect for storing multiple long-term objects in a single file.
+allow knowing the writer schema when reading the data. This format is perfect for storing many long-term objects in a single file.
+
+The main difference from the other encodings is that you encode and decode a `Sequence` of objects instead of a single object.
+
+Be aware that the decoded `Sequence` must be consumed **before** closing the stream, or you will get an exception: the sequence is a lazy, single-pass source,
+which means that even if there are millions of objects in the file, objects are extracted one by one only when they are requested. If you take only the first 10 objects and close the stream,
+the remaining objects won't be extracted. Use `sequence.toList()` carefully, as materializing millions of objects may not fit in memory and can lead to an `OutOfMemoryError`.
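+
+For instance, a minimal sketch of this caveat (reusing the `Project` class and the file from the example below): the sequence is consumed while the stream is still open, and only the requested objects are read.
+
+```kotlin
+Files.newInputStream(Path("your-file.bin")).use { fileStream ->
+    val projects = AvroObjectContainer.decodeFromStream<Project>(fileStream)
+    // consume the sequence while the stream is still open:
+    // only the first 10 objects are actually read from the file, the rest are never extracted
+    projects.take(10).forEach { println(it) }
+    // returning `projects` out of this block would fail later, as the stream gets closed here
+}
+```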
 
 Example:
 
@@ -108,23 +114,27 @@ package myapp
 
 import com.github.avrokotlin.avro4k.*
+import java.nio.file.Files
+import kotlin.io.path.Path
 import kotlinx.serialization.*
-import org.apache.avro.SchemaNormalization
 
 @Serializable
 data class Project(val name: String, val language: String)
 
 fun main() {
-    val schema = Avro.schema<Project>()
-    val schemasByFingerprint = mapOf(SchemaNormalization.parsingFingerprint64(schema) to schema)
-    val singleObjectInstance = AvroSingleObject { schemasByFingerprint[it] }
-
-    // Serializing objects
-    val data = Project("kotlinx.serialization", "Kotlin")
-    val bytes = singleObjectInstance.encodeToByteArray(data)
+    val valuesToEncode = sequenceOf(
+        Project("kotlinx.serialization", "Kotlin"),
+        Project("java.lang", "Java"),
+        Project("avro4k", "Kotlin"),
+    )
+    Files.newOutputStream(Path("your-file.bin")).use { fileStream ->
+        AvroObjectContainer.encodeToStream(valuesToEncode, fileStream)
+    }
 
     // Deserializing objects
-    val obj = singleObjectInstance.decodeFromByteArray(bytes)
-    println(obj) // Project(name=kotlinx.serialization, language=Kotlin)
+    Files.newInputStream(Path("your-file.bin")).use { fileStream ->
+        AvroObjectContainer.decodeFromStream<Project>(fileStream).forEach {
+            println(it) // Project(name=kotlinx.serialization, language=Kotlin) ...
+        }
+    }
 }
 ```
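+
+You can also compress the file and attach custom metadata when writing, through the encoding builder. A minimal sketch (assuming `org.apache.avro.file.CodecFactory` and a Snappy dependency are on the classpath; the metadata key used here is arbitrary):
+
+```kotlin
+Files.newOutputStream(Path("your-file.bin")).use { fileStream ->
+    AvroObjectContainer.encodeToStream(valuesToEncode, fileStream) {
+        codec(CodecFactory.snappyCodec())
+        metadata("creation-epoch-millis", System.currentTimeMillis())
+    }
+}
+```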
diff --git a/src/main/kotlin/com/github/avrokotlin/avro4k/AvroObjectContainerFile.kt b/src/main/kotlin/com/github/avrokotlin/avro4k/AvroObjectContainer.kt
similarity index 68%
rename from src/main/kotlin/com/github/avrokotlin/avro4k/AvroObjectContainerFile.kt
rename to src/main/kotlin/com/github/avrokotlin/avro4k/AvroObjectContainer.kt
index 16078a7b..9956f3a5 100644
--- a/src/main/kotlin/com/github/avrokotlin/avro4k/AvroObjectContainerFile.kt
+++ b/src/main/kotlin/com/github/avrokotlin/avro4k/AvroObjectContainer.kt
@@ -21,62 +21,80 @@ import java.io.OutputStream
  * [spec](https://avro.apache.org/docs/1.11.1/specification/#object-container-files)
  */
 @ExperimentalSerializationApi
-public class AvroObjectContainerFile(
+public sealed class AvroObjectContainer(
     @PublishedApi
-    internal val avro: Avro = Avro,
+    internal val avro: Avro,
 ) {
+    public companion object Default : AvroObjectContainer(Avro)
+
+    /**
+     * Encodes the given sequence to the given output stream.
+     *
+     * Note that the output stream is not closed after the operation, which means you need to handle it to avoid resource leaks.
+     */
     public fun <T> encodeToStream(
         schema: Schema,
         serializer: SerializationStrategy<T>,
         values: Sequence<T>,
         outputStream: OutputStream,
-        builder: AvroObjectContainerFileBuilder.() -> Unit = {},
+        builder: AvroObjectContainerBuilder.() -> Unit = {},
     ) {
         val datumWriter: DatumWriter<T> = KotlinxSerializationDatumWriter(serializer, avro)
         val dataFileWriter = DataFileWriter(datumWriter)
-
-        builder(AvroObjectContainerFileBuilder(dataFileWriter))
-        dataFileWriter.create(schema, outputStream)
-        values.forEach {
-            dataFileWriter.append(it)
+        try {
+            builder(AvroObjectContainerBuilder(dataFileWriter))
+            dataFileWriter.create(schema, outputStream)
+            values.forEach {
+                dataFileWriter.append(it)
+            }
+        } finally {
+            dataFileWriter.flush()
         }
-        dataFileWriter.flush()
     }
 
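+    /**
+     * Decodes the given input stream as a lazy sequence of objects.
+     *
+     * Note that the input stream is not closed after the operation. The returned sequence can be consumed only once,
+     * and it must be fully consumed before the input stream is closed.
+     */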
     public fun <T> decodeFromStream(
         deserializer: DeserializationStrategy<T>,
         inputStream: InputStream,
-        metadataDumper: AvroObjectContainerFileMetadataDumper.() -> Unit = {},
-    ): Sequence<T> =
-        sequence {
+        metadataDumper: AvroObjectContainerMetadataDumper.() -> Unit = {},
+    ): Sequence<T> {
+        return sequence {
             val datumReader: DatumReader<T> = KotlinxSerializationDatumReader(deserializer, avro)
-            DataFileStream(inputStream, datumReader).use { dataFileStream ->
-                metadataDumper(AvroObjectContainerFileMetadataDumper(dataFileStream))
-                yieldAll(dataFileStream.iterator())
-            }
+            val dataFileStream = DataFileStream(inputStream, datumReader)
+            metadataDumper(AvroObjectContainerMetadataDumper(dataFileStream))
+            yieldAll(dataFileStream.iterator())
         }.constrainOnce()
+    }
+}
+
+private class AvroObjectContainerImpl(avro: Avro) : AvroObjectContainer(avro)
+
+public fun AvroObjectContainer(
+    from: Avro = Avro,
+    builderAction: AvroBuilder.() -> Unit,
+): AvroObjectContainer {
+    return AvroObjectContainerImpl(Avro(from, builderAction))
 }
 
 @ExperimentalSerializationApi
-public inline fun <reified T> AvroObjectContainerFile.encodeToStream(
+public inline fun <reified T> AvroObjectContainer.encodeToStream(
     values: Sequence<T>,
     outputStream: OutputStream,
-    noinline builder: AvroObjectContainerFileBuilder.() -> Unit = {},
+    noinline builder: AvroObjectContainerBuilder.() -> Unit = {},
 ) {
     val serializer = avro.serializersModule.serializer<T>()
     encodeToStream(avro.schema(serializer), serializer, values, outputStream, builder)
 }
 
 @ExperimentalSerializationApi
-public inline fun <reified T> AvroObjectContainerFile.decodeFromStream(
+public inline fun <reified T> AvroObjectContainer.decodeFromStream(
     inputStream: InputStream,
-    noinline metadataDumper: AvroObjectContainerFileMetadataDumper.() -> Unit = {},
+    noinline metadataDumper: AvroObjectContainerMetadataDumper.() -> Unit = {},
 ): Sequence<T> {
     val serializer = avro.serializersModule.serializer<T>()
     return decodeFromStream(serializer, inputStream, metadataDumper)
 }
 
-public class AvroObjectContainerFileBuilder(private val fileWriter: DataFileWriter<*>) {
+public class AvroObjectContainerBuilder(private val fileWriter: DataFileWriter<*>) {
     public fun metadata(
         key: String,
         value: ByteArray,
@@ -103,7 +121,7 @@ public class AvroObjectContainerFileBuilder(private val fileWriter: DataFileWrit
     }
 }
 
-public class AvroObjectContainerFileMetadataDumper(private val fileStream: DataFileStream<*>) {
+public class AvroObjectContainerMetadataDumper(private val fileStream: DataFileStream<*>) {
     public fun metadata(key: String): MetadataAccessor? {
         return fileStream.getMeta(key)?.let { MetadataAccessor(it) }
     }
diff --git a/src/test/kotlin/com/github/avrokotlin/avro4k/AvroObjectContainerFileTest.kt b/src/test/kotlin/com/github/avrokotlin/avro4k/AvroObjectContainerTest.kt
similarity index 73%
rename from src/test/kotlin/com/github/avrokotlin/avro4k/AvroObjectContainerFileTest.kt
rename to src/test/kotlin/com/github/avrokotlin/avro4k/AvroObjectContainerTest.kt
index a8cf6fed..5f5f5f2d 100644
--- a/src/test/kotlin/com/github/avrokotlin/avro4k/AvroObjectContainerFileTest.kt
+++ b/src/test/kotlin/com/github/avrokotlin/avro4k/AvroObjectContainerTest.kt
@@ -1,5 +1,6 @@
 package com.github.avrokotlin.avro4k
 
+import io.kotest.assertions.throwables.shouldThrow
 import io.kotest.core.spec.style.StringSpec
 import io.kotest.matchers.shouldBe
 import kotlinx.serialization.Contextual
@@ -11,9 +12,11 @@ import org.apache.avro.generic.GenericDatumReader
 import org.apache.avro.generic.GenericDatumWriter
 import org.apache.avro.generic.GenericRecord
 import java.io.ByteArrayOutputStream
+import java.io.InputStream
+import java.io.OutputStream
 import java.util.UUID
 
-internal class AvroObjectContainerFileTest : StringSpec({
+internal class AvroObjectContainerTest : StringSpec({
     val firstProfile = UserProfile(
         id = UserId(UUID.randomUUID()),
@@ -54,7 +57,7 @@ internal class AvroObjectContainerFileTest : StringSpec({
         // write with avro4k
         val bytes =
             ByteArrayOutputStream().use {
-                AvroObjectContainerFile().encodeToStream(sequenceOf(firstProfile, secondProfile), it) {
+                AvroObjectContainer.encodeToStream(sequenceOf(firstProfile, secondProfile), it) {
                     metadata("meta-string", "awesome string")
                     metadata("meta-long", 42)
                     metadata("bytes", byteArrayOf(1, 3, 2, 42))
@@ -62,10 +65,7 @@ internal class AvroObjectContainerFileTest : StringSpec({
                 it.toByteArray()
             }
         // read with apache avro lib
-        val dataFile =
-            bytes.inputStream().use {
-                DataFileStream(it, GenericDatumReader<GenericRecord>(Avro.schema<UserProfile>()))
-            }
+        val dataFile = DataFileStream(bytes.inputStream(), GenericDatumReader<GenericRecord>(Avro.schema<UserProfile>()))
         dataFile.getMetaString("meta-string") shouldBe "awesome string"
         dataFile.getMetaLong("meta-long") shouldBe 42
         dataFile.getMeta("bytes") shouldBe byteArrayOf(1, 3, 2, 42)
@@ -90,7 +90,7 @@ internal class AvroObjectContainerFileTest : StringSpec({
         // read with avro4k
         val profiles =
            bytes.inputStream().use {
-                AvroObjectContainerFile().decodeFromStream<UserProfile>(it) {
+                AvroObjectContainer.decodeFromStream<UserProfile>(it) {
                     metadata("meta-string")?.asString() shouldBe "awesome string"
                     metadata("meta-long")?.asLong() shouldBe 42
                     metadata("bytes")?.asBytes() shouldBe byteArrayOf(1, 3, 2, 42)
@@ -100,6 +100,44 @@ internal class AvroObjectContainerFileTest : StringSpec({
         profiles[0] shouldBe firstProfile
         profiles[1] shouldBe secondProfile
     }
+    "encoding error is not closing the stream" {
+        class SimpleOutputStream : OutputStream() {
+            var closed = false
+
+            override fun write(b: Int) {
+                throw UnsupportedOperationException()
+            }
+
+            override fun close() {
+                closed = true
+            }
+        }
+
+        val os = SimpleOutputStream()
+        shouldThrow<UnsupportedOperationException> {
+            AvroObjectContainer.encodeToStream<UserProfile>(sequence {}, os)
+        }
+        os.closed shouldBe false
+    }
+    "decoding error is not closing the stream" {
+        class SimpleInputStream : InputStream() {
+            var closed = false
+
+            override fun read(): Int {
+                throw UnsupportedOperationException()
+            }
+
+            override fun close() {
+                closed = true
+            }
+        }
+
+        val input = SimpleInputStream()
+        shouldThrow<UnsupportedOperationException> {
+            AvroObjectContainer.decodeFromStream<UserProfile>(input).toList()
+        }
+        input.closed shouldBe false
+    }
 }) {
     @Serializable
     private data class UserProfile(