Skip to content

Commit

Permalink
Sync json transport protocol with proton
Browse files Browse the repository at this point in the history
  • Loading branch information
Yibo-Chen13 committed Nov 21, 2024
1 parent 688b2bd commit 08c522f
Show file tree
Hide file tree
Showing 3 changed files with 219 additions and 71 deletions.
4 changes: 1 addition & 3 deletions lib/column/column_gen.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

275 changes: 212 additions & 63 deletions lib/column/json.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,30 +20,25 @@ package column
import (
"fmt"
"reflect"
"strings"

"github.com/timeplus-io/proton-go-driver/v2/lib/binary"
)

type Json struct {
nullable bool

// leaf nodes, for example '{"id": 1, "obj": { "x": "abc", "y": 2}}', the elems is:
// <"id", []int32>,
// <"obj.x", []string>,
// <"obj.y", []int32>
columns map[string]Interface
}

func (col *Json) parse(is_nullable bool) (_ Interface, err error) {
col.nullable = is_nullable
func (col *Json) parse() (_ Interface, err error) {
col.columns = make(map[string]Interface)
return col, nil
}

func (col *Json) Type() Type {
if col.nullable {
return "nullable_json"
}
return "json"
}

Expand Down Expand Up @@ -150,7 +145,9 @@ func (col *Json) AppendRow(v interface{}) (err error) {
if col.columns[path], err = toType(reflect.TypeOf(add_v)).Column(); err != nil {
return err
}
col.columns[path].AppendRow(add_v)
if err := col.columns[path].AppendRow(add_v); err != nil {
return err
}
}
}
default:
Expand All @@ -164,79 +161,98 @@ func (col *Json) AppendRow(v interface{}) (err error) {
}

func (col *Json) Decode(decoder *binary.Decoder, rows int) (err error) {
var columns uint64
if columns, err = decoder.Uvarint(); err != nil {
// deserialize json as a tuple.
if _, err := decoder.UInt8(); err != nil {
return err
}

for i := uint64(0); i < columns; i++ {
var parts_len uint64
if parts_len, err = decoder.Uvarint(); err != nil {
return err
}

var parts []string
parts = make([]string, parts_len)
for i := uint64(0); i < parts_len; i++ {
if parts[i], err = decoder.String(); err != nil {
return err
}
if _, err = decoder.Uvarint(); err != nil {
return err
}
if _, err = decoder.Uvarint(); err != nil {
return err
}
}

var t string
if t, err = decoder.String(); err != nil {
return err
}
tuple_type, err := decoder.String()
if err != nil {
return err
}

path := BuildJsonPath(parts)
if col.columns[path], err = Type(t).Column(); err != nil {
return err
}
if err = col.columns[path].Decode(decoder, rows); err != nil {
return err
}
if err := deserialisationFromTuple(col, tuple_type, []string{}, decoder, rows); err != nil {
return err
}
return nil
}

func (col *Json) Encode(encoder *binary.Encoder) error {
if err := encoder.Uvarint(uint64(len(col.columns))); err != nil {
// serialize json as a tuple.
if err := encoder.UInt8(0); err != nil {
return err
}
nested_json, err := col.NestJson()
if err != nil {
return err
}

for path, c := range col.columns {
parts := SplitJsonPath(path)
if err := encoder.Uvarint(uint64(len(parts))); err != nil {
return err
tuple_type := toTupleType(nested_json)
if err := encoder.String(tuple_type); err != nil {
return err
}

return serialisationToTuple(nested_json, tuple_type, encoder)
}

// get full tuple type from nested json.
// for example:
// {"a": 1, "b": {"c": 2, "d": 3}, "e": [1,2,3,4]}
// -> tuple(a int,b tuple(c int, d int),e array(int))
func toTupleType(json *Json) string {
var builder strings.Builder
builder.WriteString("tuple(")
cnt := 0
for key, val := range json.columns {
builder.WriteString(fmt.Sprintf("%s ", key))
switch v := json.columns[key].(type) {
case *Json:
builder.WriteString(toTupleType(v))
default:
builder.WriteString(string(val.Type()))
}
cnt += 1
if cnt != len(json.columns) {
builder.WriteString(", ")
}
}
builder.WriteString(")")
return builder.String()
}

for _, part := range parts {
if err := encoder.String(part); err != nil {
return err
// get a nested json format from a (path,value) json fromat.
func (col *Json) NestJson() (*Json, error) {
root := &Json{
columns: map[string]Interface{},
}
var cur *Json
for path, value := range col.columns {
cur = root
parts := SplitJsonPath(path)
for i, part := range parts {
if strings.Contains(part, ".") {
part = "\"" + part + "\""
}
if err := encoder.Uvarint(0); err != nil {
return err
if i == len(parts)-1 {
cur.columns[part] = value
break
}
if err := encoder.Uvarint(0); err != nil {
return err
if _, ok := cur.columns[part]; !ok {
cur.columns[part] = &Json{
columns: make(map[string]Interface),
}
}
var ok bool
cur, ok = cur.columns[part].(*Json)
if !ok {
return nil, &Error{
ColumnType: "json",
Err: fmt.Errorf("same json path with different value type"),
}
}
}

if err := encoder.String(string(c.Type())); err != nil {
return err
}

if err := c.Encode(encoder); err != nil {
return err
}
}
return nil
return root, nil
}

var (
Expand All @@ -250,7 +266,7 @@ func checkAndGetRows(json map[string]Interface) (int, error) {
if rows == -1 {
rows = elem_rows
} else if rows != elem_rows {
return -1, fmt.Errorf("Got inconsistent row count in json")
return -1, fmt.Errorf("got inconsistent row count in json")
}
}

Expand All @@ -274,3 +290,136 @@ func toType(t reflect.Type) Type {
}
return Type(t.Name())
}

// get full type of array, tuple, nested
func fullComplexType(builder *strings.Builder, tupleTypeWithName string) {
st := 1
for _, char := range tupleTypeWithName {
if char == '(' {
st++
} else if char == ')' {
st--
}
builder.WriteRune(char)
if st == 0 {
break
}
}
}

// get a (path,value) json fromat from a full tuple type.
func deserialisationFromTuple(json *Json, tupleTypeWithName string, nowPath []string, decoder *binary.Decoder, rows int) error {
typeLen := 6
for ; typeLen < len(tupleTypeWithName) /* without ")" */ -1; typeLen += 2 {
var nameBuilder strings.Builder
var typeBuilder strings.Builder

// parse name
has_escape := false
for ; typeLen < len(tupleTypeWithName); typeLen++ {
if tupleTypeWithName[typeLen] == '`' {
has_escape = !has_escape
}
if tupleTypeWithName[typeLen] == ' ' && !has_escape {
typeLen++
break
}
nameBuilder.WriteByte(tupleTypeWithName[typeLen])
}
nowPath = append(nowPath, UnescapeIfForJsonPath(nameBuilder.String()))
if typeLen >= len(tupleTypeWithName) {
return fmt.Errorf("parse json from tuple failed with tuple type: %s", tupleTypeWithName)
}

// parse sub object
if strings.HasPrefix(tupleTypeWithName[typeLen:], "tuple(") {
typeBuilder.WriteString("tuple(")
fullComplexType(&typeBuilder, tupleTypeWithName[typeLen+6:])
if err := deserialisationFromTuple(json, typeBuilder.String(), nowPath, decoder, rows); err != nil {
return err
}
} else {
if strings.HasPrefix(tupleTypeWithName[typeLen:], "array(") {
typeBuilder.WriteString("array(")
fullComplexType(&typeBuilder, tupleTypeWithName[typeLen+6:])
} else if strings.HasPrefix(tupleTypeWithName[typeLen:], "nested(") {
typeBuilder.WriteString("nested(")
fullComplexType(&typeBuilder, tupleTypeWithName[typeLen+7:])
} else {
for _, char := range tupleTypeWithName[typeLen:] {
if char == ')' || char == ',' {
break
}
typeBuilder.WriteRune(char)
}
}

valueType, err := Type(typeBuilder.String()).Column()
if err != nil {
return err
}

path := BuildJsonPath(nowPath)
json.columns[path] = valueType
if err := json.columns[path].Decode(decoder, rows); err != nil {
return err
}
}

nowPath = nowPath[:len(nowPath)-1]
typeLen += len(typeBuilder.String())
}
return nil
}

func serialisationToTuple(json *Json, tupleTypeWithName string, encoder *binary.Encoder) error {
typeLen := 6
for ; typeLen < len(tupleTypeWithName) /* without ")" */ -1; typeLen += 2 {
var nameBuilder strings.Builder
var typeBuilder strings.Builder

// parse name
has_escape := false
for ; typeLen < len(tupleTypeWithName); typeLen++ {
if tupleTypeWithName[typeLen] == '`' {
has_escape = !has_escape
}
if tupleTypeWithName[typeLen] == ' ' && !has_escape {
typeLen++
break
}
nameBuilder.WriteByte(tupleTypeWithName[typeLen])
}

// parse sub object
if strings.HasPrefix(tupleTypeWithName[typeLen:], "tuple(") {
typeBuilder.WriteString("tuple(")
fullComplexType(&typeBuilder, tupleTypeWithName[typeLen+6:])
if err := serialisationToTuple(json.columns[nameBuilder.String()].(*Json), typeBuilder.String(), encoder); err != nil {
return err
}
} else {
if strings.HasPrefix(tupleTypeWithName[typeLen:], "array(") {
typeBuilder.WriteString("array(")
fullComplexType(&typeBuilder, tupleTypeWithName[typeLen+6:])
} else if strings.HasPrefix(tupleTypeWithName[typeLen:], "nested(") {
typeBuilder.WriteString("nested(")
fullComplexType(&typeBuilder, tupleTypeWithName[typeLen+7:])
} else {
for _, char := range tupleTypeWithName[typeLen:] {
if char == ')' || char == ',' {
break
}
typeBuilder.WriteRune(char)
}
}

if err := json.columns[nameBuilder.String()].Encode(encoder); err != nil {
return err
}
}

typeLen += len(typeBuilder.String())
}
return nil
}
Loading

0 comments on commit 08c522f

Please sign in to comment.