Skip to content

Commit

Permalink
fix documentation
Browse files Browse the repository at this point in the history
  • Loading branch information
Chuckame committed Jul 16, 2024
1 parent 6a8dd19 commit 84e88f4
Show file tree
Hide file tree
Showing 4 changed files with 107 additions and 42 deletions.
3 changes: 1 addition & 2 deletions Migrating-from-v1.md
Original file line number Diff line number Diff line change
Expand Up @@ -155,9 +155,8 @@ val dataSequence = sequenceOf(
TheDataClass(...),
TheDataClass(...),
)
val avro = Avro { fieldNamingStrategy = FieldNamingStrategy.SnakeCase }
Files.newOutputStream(Path("/your/file.avro")).use { outputStream ->
AvroObjectContainerFile(avro)
AvroObjectContainer { fieldNamingStrategy = FieldNamingStrategy.SnakeCase }
.encodeToStream(dataSequence, outputStream) {
codec(CodecFactory.snappyCodec())
// you can also add your metadata !
Expand Down
32 changes: 21 additions & 11 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -96,9 +96,15 @@ fun main() {
## Object container

Avro4k provides a way to encode and decode object container — also known as data file — with `AvroObjectContainerFile` class. This encoding will prefix the binary data with the
Avro4k provides a way to encode and decode object containers — also known as data files — with the `AvroObjectContainer` class. This encoding will prefix the binary data with the
full schema to
allow knowing the writer schema when reading the data. This format is perfect for storing multiple long-term objects in a single file.
allow knowing the writer schema when reading the data. This format is perfect for storing many long-term objects in a single file.

The main difference when using `AvroObjectContainer` is that you encode and decode a `Sequence` of objects instead of single objects.

Be aware that the decoded `Sequence` must be consumed **before** closing the stream, or you will get an exception, because the sequence is evaluated lazily:
even if the file contains millions of objects, they are extracted one by one, only when requested. If you take only the first 10 objects and close the stream,
the remaining objects won't be extracted. Use `sequence.toList()` carefully, as it can lead to an `OutOfMemoryError`: millions of extracted objects may not fit in memory.

<details>
<summary>Example:</summary>
Expand All @@ -108,23 +114,27 @@ package myapp

import com.github.avrokotlin.avro4k.*
import kotlinx.serialization.*
import org.apache.avro.SchemaNormalization

@Serializable
data class Project(val name: String, val language: String)

fun main() {
val schema = Avro.schema<Project>()
val schemasByFingerprint = mapOf(SchemaNormalization.parsingFingerprint64(schema), schema)
val singleObjectInstance = AvroSingleObject { schemasByFingerprint[it] }

// Serializing objects
val data = Project("kotlinx.serialization", "Kotlin")
val bytes = singleObjectInstance.encodeToByteArray(data)
val valuesToEncode = sequenceOf(
Project("kotlinx.serialization", "Kotlin"),
Project("java.lang", "Java"),
Project("avro4k", "Kotlin"),
)
Files.newOutputStream(Path("your-file.bin")).use { fileStream ->
AvroObjectContainer.encodeToStream(valuesToEncode, fileStream, builder)
}

// Deserializing objects
val obj = singleObjectInstance.decodeFromByteArray<Project>(bytes)
println(obj) // Project(name=kotlinx.serialization, language=Kotlin)
Files.newInputStream(Path("your-file.bin")).use { fileStream ->
AvroObjectContainer.decodeFromStream<Project>(valuesToEncode, fileStream, builder).forEach {
println(it) // Project(name=kotlinx.serialization, language=Kotlin) ...
}
}
}
```

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,62 +21,80 @@ import java.io.OutputStream
* [spec](https://avro.apache.org/docs/1.11.1/specification/#object-container-files)
*/
@ExperimentalSerializationApi
public class AvroObjectContainerFile(
public sealed class AvroObjectContainer(
@PublishedApi
internal val avro: Avro = Avro,
internal val avro: Avro,
) {
public companion object Default : AvroObjectContainer(Avro)

/**
* Encodes the given sequence to the given output stream.
*
* Note that the output stream is not closed after the operation, which means you need to handle it to avoid resource leaks.
*/
public fun <T> encodeToStream(
schema: Schema,
serializer: SerializationStrategy<T>,
values: Sequence<T>,
outputStream: OutputStream,
builder: AvroObjectContainerFileBuilder.() -> Unit = {},
builder: AvroObjectContainerBuilder.() -> Unit = {},
) {
val datumWriter: DatumWriter<T> = KotlinxSerializationDatumWriter(serializer, avro)
val dataFileWriter = DataFileWriter(datumWriter)

builder(AvroObjectContainerFileBuilder(dataFileWriter))
dataFileWriter.create(schema, outputStream)
values.forEach {
dataFileWriter.append(it)
try {
builder(AvroObjectContainerBuilder(dataFileWriter))
dataFileWriter.create(schema, outputStream)
values.forEach {
dataFileWriter.append(it)
}
} finally {
dataFileWriter.flush()
}
dataFileWriter.flush()
}

public fun <T> decodeFromStream(
deserializer: DeserializationStrategy<T>,
inputStream: InputStream,
metadataDumper: AvroObjectContainerFileMetadataDumper.() -> Unit = {},
): Sequence<T> =
sequence {
metadataDumper: AvroObjectContainerMetadataDumper.() -> Unit = {},
): Sequence<T> {
return sequence {
val datumReader: DatumReader<T> = KotlinxSerializationDatumReader(deserializer, avro)
DataFileStream(inputStream, datumReader).use { dataFileStream ->
metadataDumper(AvroObjectContainerFileMetadataDumper(dataFileStream))
yieldAll(dataFileStream.iterator())
}
val dataFileStream = DataFileStream(inputStream, datumReader)
metadataDumper(AvroObjectContainerMetadataDumper(dataFileStream))
yieldAll(dataFileStream.iterator())
}.constrainOnce()
}
}

private class AvroObjectContainerImpl(avro: Avro) : AvroObjectContainer(avro)

public fun AvroObjectContainer(
from: Avro = Avro,
builderAction: AvroBuilder.() -> Unit,
): AvroObjectContainer {
return AvroObjectContainerImpl(Avro(from, builderAction))
}

@ExperimentalSerializationApi
public inline fun <reified T> AvroObjectContainerFile.encodeToStream(
public inline fun <reified T> AvroObjectContainer.encodeToStream(
values: Sequence<T>,
outputStream: OutputStream,
noinline builder: AvroObjectContainerFileBuilder.() -> Unit = {},
noinline builder: AvroObjectContainerBuilder.() -> Unit = {},
) {
val serializer = avro.serializersModule.serializer<T>()
encodeToStream(avro.schema(serializer), serializer, values, outputStream, builder)
}

@ExperimentalSerializationApi
public inline fun <reified T> AvroObjectContainerFile.decodeFromStream(
public inline fun <reified T> AvroObjectContainer.decodeFromStream(
inputStream: InputStream,
noinline metadataDumper: AvroObjectContainerFileMetadataDumper.() -> Unit = {},
noinline metadataDumper: AvroObjectContainerMetadataDumper.() -> Unit = {},
): Sequence<T> {
val serializer = avro.serializersModule.serializer<T>()
return decodeFromStream(serializer, inputStream, metadataDumper)
}

public class AvroObjectContainerFileBuilder(private val fileWriter: DataFileWriter<*>) {
public class AvroObjectContainerBuilder(private val fileWriter: DataFileWriter<*>) {
public fun metadata(
key: String,
value: ByteArray,
Expand All @@ -103,7 +121,7 @@ public class AvroObjectContainerFileBuilder(private val fileWriter: DataFileWrit
}
}

public class AvroObjectContainerFileMetadataDumper(private val fileStream: DataFileStream<*>) {
public class AvroObjectContainerMetadataDumper(private val fileStream: DataFileStream<*>) {
public fun metadata(key: String): MetadataAccessor? {
return fileStream.getMeta(key)?.let { MetadataAccessor(it) }
}
Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
package com.github.avrokotlin.avro4k

import io.kotest.assertions.throwables.shouldThrow
import io.kotest.core.spec.style.StringSpec
import io.kotest.matchers.shouldBe
import kotlinx.serialization.Contextual
Expand All @@ -11,9 +12,11 @@ import org.apache.avro.generic.GenericDatumReader
import org.apache.avro.generic.GenericDatumWriter
import org.apache.avro.generic.GenericRecord
import java.io.ByteArrayOutputStream
import java.io.InputStream
import java.io.OutputStream
import java.util.UUID

internal class AvroObjectContainerFileTest : StringSpec({
internal class AvroObjectContainerTest : StringSpec({
val firstProfile =
UserProfile(
id = UserId(UUID.randomUUID()),
Expand Down Expand Up @@ -54,18 +57,15 @@ internal class AvroObjectContainerFileTest : StringSpec({
// write with avro4k
val bytes =
ByteArrayOutputStream().use {
AvroObjectContainerFile().encodeToStream(sequenceOf(firstProfile, secondProfile), it) {
AvroObjectContainer.encodeToStream(sequenceOf(firstProfile, secondProfile), it) {
metadata("meta-string", "awesome string")
metadata("meta-long", 42)
metadata("bytes", byteArrayOf(1, 3, 2, 42))
}
it.toByteArray()
}
// read with apache avro lib
val dataFile =
bytes.inputStream().use {
DataFileStream<GenericRecord>(it, GenericDatumReader(Avro.schema<UserProfile>()))
}
val dataFile = DataFileStream<GenericRecord>(bytes.inputStream(), GenericDatumReader(Avro.schema<UserProfile>()))
dataFile.getMetaString("meta-string") shouldBe "awesome string"
dataFile.getMetaLong("meta-long") shouldBe 42
dataFile.getMeta("bytes") shouldBe byteArrayOf(1, 3, 2, 42)
Expand All @@ -90,7 +90,7 @@ internal class AvroObjectContainerFileTest : StringSpec({
// read with avro4k
val profiles =
bytes.inputStream().use {
AvroObjectContainerFile().decodeFromStream<UserProfile>(it) {
AvroObjectContainer.decodeFromStream<UserProfile>(it) {
metadata("meta-string")?.asString() shouldBe "awesome string"
metadata("meta-long")?.asLong() shouldBe 42
metadata("bytes")?.asBytes() shouldBe byteArrayOf(1, 3, 2, 42)
Expand All @@ -100,6 +100,44 @@ internal class AvroObjectContainerFileTest : StringSpec({
profiles[0] shouldBe firstProfile
profiles[1] shouldBe secondProfile
}
"encoding error is not closing the stream" {
class SimpleOutputStream: OutputStream() {
var closed = false

override fun write(b: Int) {
throw UnsupportedOperationException()
}

override fun close() {
closed = true
}
}

val os = SimpleOutputStream()
shouldThrow<UnsupportedOperationException> {
AvroObjectContainer.encodeToStream<UserId>(sequence {}, os)
}
os.closed shouldBe false
}
"decoding error is not closing the stream" {
class SimpleInputStream: InputStream() {
var closed = false

override fun read(): Int {
throw UnsupportedOperationException()
}

override fun close() {
closed = true
}
}

val input = SimpleInputStream()
shouldThrow<UnsupportedOperationException> {
AvroObjectContainer.decodeFromStream<UserId>(input).toList()
}
input.closed shouldBe false
}
}) {
@Serializable
private data class UserProfile(
Expand Down

0 comments on commit 84e88f4

Please sign in to comment.