From 3e4be7f763bcb4361a221bd3170a1e48f5cdeb1b Mon Sep 17 00:00:00 2001 From: "Taro L. Saito" Date: Thu, 18 Jan 2024 00:43:47 -0800 Subject: [PATCH] parquet (feature): Encode complex objects as JSON columns (#3343) Previously ParquetWriter used MessagePack for embedding complex object data into a column. For compatibility with the Parquet ecosystem (e.g., DuckDB), using JSON is better. --- .../wvlet/airframe/msgpack/spi/Value.scala | 35 +++++++++++++------ .../parquet/ParquetRecordReader.scala | 23 ++++++++++-- .../airframe/parquet/ParquetSchema.scala | 28 +++++++++++---- .../airframe/parquet/ParquetWriteCodec.scala | 14 ++++++-- .../wvlet/airframe/parquet/ParquetTest.scala | 3 +- 5 files changed, 79 insertions(+), 24 deletions(-) diff --git a/airframe-msgpack/src/main/scala/wvlet/airframe/msgpack/spi/Value.scala b/airframe-msgpack/src/main/scala/wvlet/airframe/msgpack/spi/Value.scala index d88af69ff9..6315f2550d 100644 --- a/airframe-msgpack/src/main/scala/wvlet/airframe/msgpack/spi/Value.scala +++ b/airframe-msgpack/src/main/scala/wvlet/airframe/msgpack/spi/Value.scala @@ -25,6 +25,12 @@ import wvlet.airframe.msgpack.spi.MessageException.* trait Value { override def toString = toJson def toJson: String + + /** + * Unlike toJson, toUnquotedString does not quote string/timestamp values. 
+ * @return + */ + def toUnquotedString: String = toJson def valueType: ValueType /** @@ -139,13 +145,14 @@ object Value { appendJsonString(b, toRawString) b.result() } - protected def toRawString: String + def toRawString: String } case class StringValue(v: String) extends RawValue { - override def toString = v - override protected def toRawString: String = v - override def valueType: ValueType = ValueType.STRING + override def toString: String = v + override def toUnquotedString: String = v + override def toRawString: String = v + override def valueType: ValueType = ValueType.STRING override def writeTo(packer: Packer): Unit = { packer.packString(v) } @@ -153,15 +160,15 @@ object Value { case class BinaryValue(v: Array[Byte]) extends RawValue { @transient private var decodedStringCache: String = null - - override def valueType: ValueType = ValueType.BINARY + override def toUnquotedString: String = toRawString + override def valueType: ValueType = ValueType.BINARY override def writeTo(packer: Packer): Unit = { packer.packBinaryHeader(v.length) packer.writePayload(v) } // Produces Base64 encoded strings - override protected def toRawString: String = { + override def toRawString: String = { synchronized { if (decodedStringCache == null) { decodedStringCache = Base64.getEncoder.encodeToString(v) @@ -212,8 +219,10 @@ object Value { appendJsonString(b, toRawString) b.result() } - def toRawString = v.toString - override def valueType: ValueType = ValueType.EXTENSION // ValueType.TIMESTAMP + + override def toUnquotedString: String = toRawString + def toRawString = v.toString + override def valueType: ValueType = ValueType.EXTENSION // ValueType.TIMESTAMP override def writeTo(packer: Packer): Unit = { packer.packTimestamp(v) } @@ -244,7 +253,13 @@ object Value { def isEmpty: Boolean = entries.isEmpty def nonEmpty: Boolean = entries.nonEmpty override def toJson: String = { - s"{${entries.map(x => s"${x._1.toJson}:${x._2.toJson}").mkString(",")}}" + entries + .map { kv => + 
// JSON requires Map key must be a quoted UTF-8 string + val jsonKey = new StringBuilder() + appendJsonString(jsonKey, kv._1.toUnquotedString) + s"""${jsonKey.result()}:${kv._2.toJson}""" + }.mkString("{", ",", "}") } override def valueType: ValueType = ValueType.MAP override def writeTo(packer: Packer): Unit = { diff --git a/airframe-parquet/src/main/scala/wvlet/airframe/parquet/ParquetRecordReader.scala b/airframe-parquet/src/main/scala/wvlet/airframe/parquet/ParquetRecordReader.scala index fe3fa62e46..6aa04df1e9 100644 --- a/airframe-parquet/src/main/scala/wvlet/airframe/parquet/ParquetRecordReader.scala +++ b/airframe-parquet/src/main/scala/wvlet/airframe/parquet/ParquetRecordReader.scala @@ -16,11 +16,12 @@ package wvlet.airframe.parquet import org.apache.parquet.io.api.{Binary, Converter, GroupConverter, PrimitiveConverter, RecordMaterializer} import org.apache.parquet.schema.{GroupType, MessageType} import org.apache.parquet.schema.PrimitiveType.PrimitiveTypeName -import org.apache.parquet.schema.LogicalTypeAnnotation.stringType +import org.apache.parquet.schema.LogicalTypeAnnotation.{jsonType, stringType} import wvlet.airframe.codec.MessageCodec import wvlet.airframe.codec.PrimitiveCodec.ValueCodec import wvlet.airframe.surface.Surface import wvlet.log.LogSupport + import scala.jdk.CollectionConverters.* object ParquetRecordReader extends LogSupport { @@ -56,7 +57,21 @@ object ParquetRecordReader extends LogSupport { } private class MsgPackConverter(fieldName: String, holder: RecordBuilder) extends PrimitiveConverter { override def addBinary(value: Binary): Unit = { - holder.add(fieldName, ValueCodec.fromMsgPack(value.getBytes)) + val v = ValueCodec.fromMsgPack(value.getBytes) + holder.add(fieldName, v) + } + } + private class JsonConverter(fieldName: String, holder: RecordBuilder) extends PrimitiveConverter { + override def addBinary(value: Binary): Unit = { + val jsonStr = value.toStringUsingUTF8 + val obj: Any = + if (jsonStr.startsWith("{") || 
jsonStr.startsWith("[")) { + // Map to message pack value for handling nested objects + ValueCodec.fromJson(jsonStr) + } else { + jsonStr + } + holder.add(fieldName, obj) } } @@ -84,8 +99,10 @@ class ParquetRecordReader[A]( case PrimitiveTypeName.BOOLEAN => new BooleanConverter(f.getName, recordBuilder) case PrimitiveTypeName.FLOAT => new FloatConverter(f.getName, recordBuilder) case PrimitiveTypeName.DOUBLE => new DoubleConverter(f.getName, recordBuilder) - case PrimitiveTypeName.BINARY if p.getLogicalTypeAnnotation == stringType => + case PrimitiveTypeName.BINARY if p.getLogicalTypeAnnotation == stringType() => new StringConverter(f.getName, recordBuilder) + case PrimitiveTypeName.BINARY if p.getLogicalTypeAnnotation == jsonType() => + new JsonConverter(f.getName, recordBuilder) case PrimitiveTypeName.BINARY => new MsgPackConverter(f.getName, recordBuilder) case _ => ??? diff --git a/airframe-parquet/src/main/scala/wvlet/airframe/parquet/ParquetSchema.scala b/airframe-parquet/src/main/scala/wvlet/airframe/parquet/ParquetSchema.scala index 0365594630..2bd48684df 100644 --- a/airframe-parquet/src/main/scala/wvlet/airframe/parquet/ParquetSchema.scala +++ b/airframe-parquet/src/main/scala/wvlet/airframe/parquet/ParquetSchema.scala @@ -13,14 +13,16 @@ */ package wvlet.airframe.parquet -import org.apache.parquet.schema.LogicalTypeAnnotation.stringType +import org.apache.parquet.schema.LogicalTypeAnnotation.{jsonType, stringType} import org.apache.parquet.schema.PrimitiveType.PrimitiveTypeName import org.apache.parquet.schema.Type.Repetition import org.apache.parquet.schema.{LogicalTypeAnnotation, MessageType, PrimitiveType, Type, Types} import org.apache.parquet.schema.Types.{MapBuilder, PrimitiveBuilder} +import wvlet.airframe.json.Json import wvlet.airframe.msgpack.spi.MsgPack import wvlet.airframe.surface.Primitive.PrimitiveSurface import wvlet.airframe.surface.{ + Alias, ArraySurface, OptionSurface, Parameter, @@ -30,11 +32,12 @@ import wvlet.airframe.surface.{ 
Surface } import wvlet.airframe.ulid.ULID +import wvlet.log.LogSupport import java.util.UUID import scala.jdk.CollectionConverters.* -object ParquetSchema { +object ParquetSchema extends LogSupport { // Convert surface into primitive private def toParquetPrimitiveTypeName(s: PrimitiveSurface): PrimitiveTypeName = { @@ -77,12 +80,18 @@ object ParquetSchema { toParquetPrimitive(p, rep) case o: OptionSurface => buildParquetType(o.elementSurface, Some(Repetition.OPTIONAL)) + case s: Surface if s == Surface.of[MsgPack] => + Types.primitive(PrimitiveTypeName.BINARY, rep.getOrElse(Repetition.OPTIONAL)) + case s: Surface if s == Surface.of[Json] => + Types.primitive(PrimitiveTypeName.BINARY, rep.getOrElse(Repetition.OPTIONAL)).as(jsonType()) + case s: Surface if classOf[wvlet.airframe.msgpack.spi.Value].isAssignableFrom(s.rawType) => + Types.primitive(PrimitiveTypeName.BINARY, rep.getOrElse(Repetition.OPTIONAL)) case s: Surface if s.isSeq || s.isArray => val elementSurface = s.typeArgs(0) buildParquetType(elementSurface, Some(Repetition.REPEATED)) case m: Surface if m.isMap => - // Encode Map[_, _] type as Binary and make it optional as Map can be empty - Types.primitive(PrimitiveTypeName.BINARY, Repetition.OPTIONAL) + // Encode Map[_, _] type as Json and make it optional as Map can be empty + Types.primitive(PrimitiveTypeName.BINARY, Repetition.OPTIONAL).as(jsonType()) // case m: Surface if m.isMap => // val keySurface = m.typeArgs(0) // val valueSurface = m.typeArgs(1) @@ -101,8 +110,8 @@ object ParquetSchema { } groupType case s: Surface => - // Use MsgPack for other types - Types.primitive(PrimitiveTypeName.BINARY, rep.getOrElse(Repetition.OPTIONAL)) + // Use JSON for other types + Types.primitive(PrimitiveTypeName.BINARY, rep.getOrElse(Repetition.OPTIONAL)).as(jsonType()) } } @@ -123,8 +132,13 @@ object ParquetSchema { Primitive.Boolean case PrimitiveTypeName.BINARY if p.getLogicalTypeAnnotation == stringType() => Primitive.String - case _ => + case 
PrimitiveTypeName.BINARY if p.getLogicalTypeAnnotation == jsonType() => + Primitive.String + case PrimitiveTypeName.BINARY => Surface.of[MsgPack] + case _ => + // Use JSON for other types + Surface.of[Json] } } else { val g = t.asGroupType() diff --git a/airframe-parquet/src/main/scala/wvlet/airframe/parquet/ParquetWriteCodec.scala b/airframe-parquet/src/main/scala/wvlet/airframe/parquet/ParquetWriteCodec.scala index 29a90d4425..ff6a1e35b8 100644 --- a/airframe-parquet/src/main/scala/wvlet/airframe/parquet/ParquetWriteCodec.scala +++ b/airframe-parquet/src/main/scala/wvlet/airframe/parquet/ParquetWriteCodec.scala @@ -14,11 +14,11 @@ package wvlet.airframe.parquet import org.apache.parquet.io.api.{Binary, RecordConsumer} -import org.apache.parquet.schema.LogicalTypeAnnotation.stringType +import org.apache.parquet.schema.LogicalTypeAnnotation.{jsonType, stringType} import org.apache.parquet.schema.PrimitiveType.PrimitiveTypeName import org.apache.parquet.schema.{MessageType, Type} import org.apache.parquet.schema.Type.Repetition -import wvlet.airframe.codec.MessageCodec +import wvlet.airframe.codec.{JSONCodec, MessageCodec} import wvlet.airframe.codec.PrimitiveCodec.{ BooleanCodec, DoubleCodec, @@ -29,6 +29,7 @@ import wvlet.airframe.codec.PrimitiveCodec.{ ValueCodec } import wvlet.airframe.msgpack.spi.MsgPack +import wvlet.airframe.msgpack.spi.Value.{StringValue, TimestampValue} import wvlet.airframe.surface.Surface import wvlet.log.LogSupport @@ -103,12 +104,19 @@ object ParquetWriteCodec extends LogSupport { recordConsumer.addDouble(DoubleCodec.fromMsgPack(msgpack)) } } - case PrimitiveTypeName.BINARY if tpe.getLogicalTypeAnnotation == stringType => + case PrimitiveTypeName.BINARY if tpe.getLogicalTypeAnnotation == stringType() => new PrimitiveParquetCodec(codec) { override protected def writeValue(recordConsumer: RecordConsumer, msgpack: MsgPack): Unit = { recordConsumer.addBinary(Binary.fromString(StringCodec.fromMsgPack(msgpack))) } } + case 
PrimitiveTypeName.BINARY if tpe.getLogicalTypeAnnotation == jsonType() => + new PrimitiveParquetCodec(codec) { + override protected def writeValue(recordConsumer: RecordConsumer, msgpack: MsgPack): Unit = { + val json: String = ValueCodec.fromMsgPack(msgpack).toUnquotedString + recordConsumer.addBinary(Binary.fromString(json)) + } + } case _ => // For the other primitive type values new PrimitiveParquetCodec(codec) { diff --git a/airframe-parquet/src/test/scala/wvlet/airframe/parquet/ParquetTest.scala b/airframe-parquet/src/test/scala/wvlet/airframe/parquet/ParquetTest.scala index 9b615b537f..3f1c00d9fe 100644 --- a/airframe-parquet/src/test/scala/wvlet/airframe/parquet/ParquetTest.scala +++ b/airframe-parquet/src/test/scala/wvlet/airframe/parquet/ParquetTest.scala @@ -101,7 +101,7 @@ object ParquetTest extends AirSpec { p7: Boolean = true, p8: Boolean = false, json: Json = """{"id":1,"param":"json param"}""", - jsonValue: JSONValue = JSON.parse("""{"id":1,"param":"json param"}"""), + jsonValue: JSONValue = JSON.parse("""{"id":2,"param":"json param2"}"""), seqValue: Seq[String] = Seq("s1", "s2"), mapValue: Map[String, Any] = Map("param1" -> "hello", "feature1" -> true), ulid: ULID = ULID.newULID, @@ -127,6 +127,7 @@ object ParquetTest extends AirSpec { withResource(Parquet.newReader[MyData](path = file.getPath)) { reader => val r1 = reader.read() + r1.json shouldBe d1.json r1 shouldBe d1 val r2 = reader.read() r2 shouldBe d2