Skip to content

Commit

Permalink
encode & decode DML event passed.
Browse files Browse the repository at this point in the history
  • Loading branch information
3AceShowHand committed Jan 25, 2024
1 parent a01a0f7 commit 5cfdb61
Show file tree
Hide file tree
Showing 10 changed files with 262 additions and 518 deletions.
59 changes: 59 additions & 0 deletions pkg/sink/codec/simple/avro.go
Original file line number Diff line number Diff line change
Expand Up @@ -470,6 +470,37 @@ func newMessageFromAvroGoGenMessage(holder *avro.Message, result *message) error
result.CommitTs = uint64(watermark.CommitTs)
result.BuildTs = watermark.BuildTs
case avro.UnionWatermarkBootstrapDDLDMLTypeEnumDML:
event := holder.Payload.DML
result.Version = int(event.Version)
result.Type = EventType(event.Type)
result.CommitTs = uint64(event.CommitTs)
result.BuildTs = event.BuildTs
result.Schema = event.Database
result.Table = event.Table
result.TableID = event.TableID
result.SchemaVersion = uint64(event.SchemaVersion)

if event.HandleKeyOnly != nil {
result.HandleKeyOnly = event.HandleKeyOnly.Bool
}
if event.ClaimCheckLocation != nil {
result.ClaimCheckLocation = event.ClaimCheckLocation.String
}
if event.Checksum != nil {
result.Checksum = &checksum{
Version: int(event.Checksum.Checksum.Version),
Corrupted: event.Checksum.Checksum.Corrupted,
Current: uint32(event.Checksum.Checksum.Current),
Previous: uint32(event.Checksum.Checksum.Previous),
}
}

if event.Data != nil {
result.Data = newDataMap4GoGenAvro(event.Data.MapUnionNullLongFloatDoubleStringBytes)
}
if event.Old != nil {
result.Old = newDataMap4GoGenAvro(event.Old.MapUnionNullLongFloatDoubleStringBytes)
}
case avro.UnionWatermarkBootstrapDDLDMLTypeEnumBootstrap:
event := holder.Payload.Bootstrap
result.Version = int(event.Version)
Expand All @@ -495,6 +526,34 @@ func newMessageFromAvroGoGenMessage(holder *avro.Message, result *message) error
return nil
}

func newDataMap4GoGenAvro(rawDataMap map[string]*avro.UnionNullLongFloatDoubleStringBytes) map[string]interface{} {
if rawDataMap == nil {
return nil
}
data := make(map[string]interface{})
for key, value := range rawDataMap {
if value == nil {
data[key] = nil
continue
}
switch value.UnionType {
case avro.UnionNullLongFloatDoubleStringBytesTypeEnumLong:
data[key] = value.Long
case avro.UnionNullLongFloatDoubleStringBytesTypeEnumFloat:
data[key] = value.Float
case avro.UnionNullLongFloatDoubleStringBytesTypeEnumDouble:
data[key] = value.Double
case avro.UnionNullLongFloatDoubleStringBytesTypeEnumString:
data[key] = value.String
case avro.UnionNullLongFloatDoubleStringBytesTypeEnumBytes:
data[key] = []byte(value.Bytes)
default:
log.Panic("unknown enum type found", zap.Any("value", value))
}
}
return data
}

func newTableSchemaFromGoGenAvro(avroTableSchema avro.TableSchema) *TableSchema {
columns := make([]*columnSchema, 0, len(avroTableSchema.Columns))
for _, col := range avroTableSchema.Columns {
Expand Down
12 changes: 6 additions & 6 deletions pkg/sink/codec/simple/avro/dml.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

This file was deleted.

4 changes: 2 additions & 2 deletions pkg/sink/codec/simple/avro/message.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

Loading

0 comments on commit 5cfdb61

Please sign in to comment.