-
Notifications
You must be signed in to change notification settings - Fork 288
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
codec(ticdc): fix some avro encoding bugs #4704
Changes from all commits
078c562
e85f8c3
11e8526
961134e
8a296f3
4741fc6
14b7490
b9b05cb
ee85c59
2612e5b
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -28,6 +28,7 @@ import ( | |
"github.com/pingcap/log" | ||
"github.com/pingcap/tidb/parser/mysql" | ||
"github.com/pingcap/tidb/types" | ||
"github.com/pingcap/tidb/util/rowcodec" | ||
"github.com/pingcap/tiflow/cdc/model" | ||
"github.com/pingcap/tiflow/pkg/config" | ||
cerror "github.com/pingcap/tiflow/pkg/errors" | ||
|
@@ -90,7 +91,7 @@ func (a *AvroEventBatchEncoder) AppendRowChangedEvent(e *model.RowChangedEvent) | |
mqMessage := NewMQMessage(config.ProtocolAvro, nil, nil, e.CommitTs, model.MqMessageTypeRow, &e.Table.Schema, &e.Table.Table) | ||
|
||
if !e.IsDelete() { | ||
res, err := avroEncode(e.Table, a.valueSchemaManager, e.TableInfoVersion, e.Columns, a.tz) | ||
res, err := avroEncode(e.Table, a.valueSchemaManager, e.TableInfoVersion, e.Columns, e.ColInfos, a.tz) | ||
if err != nil { | ||
log.Warn("AppendRowChangedEvent: avro encoding failed", zap.String("table", e.Table.String())) | ||
return errors.Annotate(err, "AppendRowChangedEvent could not encode to Avro") | ||
|
@@ -109,7 +110,7 @@ func (a *AvroEventBatchEncoder) AppendRowChangedEvent(e *model.RowChangedEvent) | |
|
||
pkeyCols := e.HandleKeyColumns() | ||
|
||
res, err := avroEncode(e.Table, a.keySchemaManager, e.TableInfoVersion, pkeyCols, a.tz) | ||
res, err := avroEncode(e.Table, a.keySchemaManager, e.TableInfoVersion, pkeyCols, e.ColInfos, a.tz) | ||
if err != nil { | ||
log.Warn("AppendRowChangedEvent: avro encoding failed", zap.String("table", e.Table.String())) | ||
return errors.Annotate(err, "AppendRowChangedEvent could not encode to Avro") | ||
|
@@ -158,7 +159,7 @@ func (a *AvroEventBatchEncoder) Size() int { | |
return sum | ||
} | ||
|
||
func avroEncode(table *model.TableName, manager *AvroSchemaManager, tableVersion uint64, cols []*model.Column, tz *time.Location) (*avroEncodeResult, error) { | ||
func avroEncode(table *model.TableName, manager *AvroSchemaManager, tableVersion uint64, cols []*model.Column, colInfos []rowcodec.ColInfo, tz *time.Location) (*avroEncodeResult, error) { | ||
schemaGen := func() (string, error) { | ||
schema, err := ColumnInfoToAvroSchema(table.Table, cols) | ||
if err != nil { | ||
|
@@ -173,7 +174,7 @@ func avroEncode(table *model.TableName, manager *AvroSchemaManager, tableVersion | |
return nil, errors.Annotate(err, "AvroEventBatchEncoder: get-or-register failed") | ||
} | ||
|
||
native, err := rowToAvroNativeData(cols, tz) | ||
native, err := rowToAvroNativeData(cols, colInfos, tz) | ||
if err != nil { | ||
return nil, errors.Annotate(err, "AvroEventBatchEncoder: converting to native failed") | ||
} | ||
|
@@ -244,13 +245,13 @@ func ColumnInfoToAvroSchema(name string, columnInfo []*model.Column) (string, er | |
return string(str), nil | ||
} | ||
|
||
func rowToAvroNativeData(cols []*model.Column, tz *time.Location) (interface{}, error) { | ||
func rowToAvroNativeData(cols []*model.Column, colInfos []rowcodec.ColInfo, tz *time.Location) (interface{}, error) { | ||
ret := make(map[string]interface{}, len(cols)) | ||
for _, col := range cols { | ||
for i, col := range cols { | ||
if col == nil { | ||
continue | ||
} | ||
data, str, err := columnToAvroNativeData(col, tz) | ||
data, str, err := columnToAvroNativeData(col, colInfos[i].Ft, tz) | ||
if err != nil { | ||
return nil, err | ||
} | ||
|
@@ -293,7 +294,7 @@ func getAvroDataTypeFallback(v interface{}) (string, error) { | |
var unsignedLongAvroType = avroLogicalType{ | ||
Type: "bytes", | ||
LogicalType: decimalType, | ||
Precision: 8, | ||
Precision: 64, // enough to hold all values and is the default precision of confluent schema registry | ||
Scale: 0, | ||
} | ||
|
||
|
@@ -305,6 +306,9 @@ func getAvroDataTypeFromColumn(col *model.Column) (interface{}, error) { | |
case mysql.TypeDouble: | ||
return "double", nil | ||
case mysql.TypeVarchar, mysql.TypeString, mysql.TypeVarString: | ||
if col.Flag.IsBinary() { | ||
There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. Could you add some unit tests? There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. In the future, maybe we need a flag to control the behavior? Like https://debezium.io/documentation/reference/connectors/mysql.html#mysql-property-binary-handling-mode Cc @leoppro |
||
return "bytes", nil | ||
} | ||
return "string", nil | ||
case mysql.TypeDate, mysql.TypeDatetime, mysql.TypeTimestamp: | ||
return avroLogicalType{ | ||
|
@@ -317,9 +321,9 @@ func getAvroDataTypeFromColumn(col *model.Column) (interface{}, error) { | |
LogicalType: timeMillis, | ||
}, nil | ||
case mysql.TypeEnum: | ||
return unsignedLongAvroType, nil | ||
return "string", nil | ||
case mysql.TypeSet: | ||
return unsignedLongAvroType, nil | ||
return "string", nil | ||
case mysql.TypeBit: | ||
return unsignedLongAvroType, nil | ||
case mysql.TypeNewDecimal: | ||
|
@@ -341,7 +345,10 @@ func getAvroDataTypeFromColumn(col *model.Column) (interface{}, error) { | |
case mysql.TypeJSON: | ||
return "string", nil | ||
case mysql.TypeTinyBlob, mysql.TypeMediumBlob, mysql.TypeLongBlob, mysql.TypeBlob: | ||
return "bytes", nil | ||
if col.Flag.IsBinary() { | ||
return "bytes", nil | ||
} | ||
return "string", nil | ||
case mysql.TypeYear: | ||
return "long", nil | ||
default: | ||
|
@@ -355,7 +362,7 @@ var ( | |
zeroDateStr = types.NewTime(types.ZeroCoreTime, mysql.TypeDate, 0).String() | ||
) | ||
|
||
func columnToAvroNativeData(col *model.Column, tz *time.Location) (interface{}, string, error) { | ||
func columnToAvroNativeData(col *model.Column, ft *types.FieldType, tz *time.Location) (interface{}, string, error) { | ||
if col.Value == nil { | ||
return nil, "null", nil | ||
} | ||
|
@@ -434,7 +441,7 @@ func columnToAvroNativeData(col *model.Column, tz *time.Location) (interface{}, | |
d := types.NewDuration(hours, minutes, seconds, int(fracInt), fsp).Duration | ||
const fullType = "int." + timeMillis | ||
return d, string(fullType), nil | ||
case mysql.TypeVarchar, mysql.TypeString, mysql.TypeVarString: | ||
case mysql.TypeVarchar, mysql.TypeString, mysql.TypeVarString, mysql.TypeTinyBlob, mysql.TypeBlob, mysql.TypeMediumBlob, mysql.TypeLongBlob: | ||
if col.Flag.IsBinary() { | ||
switch val := col.Value.(type) { | ||
case string: | ||
|
@@ -459,9 +466,17 @@ func columnToAvroNativeData(col *model.Column, tz *time.Location) (interface{}, | |
case mysql.TypeNewDecimal: | ||
return col.Value.(string), "string", nil | ||
case mysql.TypeEnum: | ||
return handleUnsignedInt64() | ||
enumVar, err := types.ParseEnumValue(ft.Elems, col.Value.(uint64)) | ||
if err != nil { | ||
return nil, "", cerror.WrapError(cerror.ErrAvroEncodeFailed, err) | ||
} | ||
return enumVar.Name, "string", nil | ||
case mysql.TypeSet: | ||
return handleUnsignedInt64() | ||
setVar, err := types.ParseSetValue(ft.Elems, col.Value.(uint64)) | ||
if err != nil { | ||
return nil, "", cerror.WrapError(cerror.ErrAvroEncodeFailed, err) | ||
} | ||
return setVar.Name, "string", nil | ||
case mysql.TypeBit: | ||
return handleUnsignedInt64() | ||
case mysql.TypeTiny, mysql.TypeShort, mysql.TypeInt24: | ||
|
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -24,6 +24,7 @@ import ( | |
model2 "github.com/pingcap/tidb/parser/model" | ||
"github.com/pingcap/tidb/parser/mysql" | ||
"github.com/pingcap/tidb/types" | ||
"github.com/pingcap/tidb/util/rowcodec" | ||
"github.com/pingcap/tiflow/cdc/model" | ||
"github.com/pingcap/tiflow/cdc/puller" | ||
"github.com/pingcap/tiflow/pkg/regionspan" | ||
|
@@ -58,43 +59,98 @@ func (s *avroBatchEncoderSuite) TearDownSuite(c *check.C) { | |
stopHTTPInterceptForTestingRegistry() | ||
} | ||
|
||
// setBinChsClnFlag is a test helper that calls types.SetBinChsClnFlag on ft and returns the same pointer, so it can be used inline when building a rowcodec.ColInfo literal. | ||
func setBinChsClnFlag(ft *types.FieldType) *types.FieldType { | ||
types.SetBinChsClnFlag(ft) | ||
return ft | ||
} | ||
|
||
// setFlag is a test helper that turns on the given flag bits in ft.Flag via types.SetTypeFlag (third argument true) and returns the same pointer for inline use. | ||
func setFlag(ft *types.FieldType, flag uint) *types.FieldType { | ||
types.SetTypeFlag(&ft.Flag, flag, true) | ||
return ft | ||
} | ||
|
||
// setElems is a test helper that assigns elems to ft.Elems and returns the same pointer; the tests use it to supply the element names of enum and set columns. | ||
func setElems(ft *types.FieldType, elems []string) *types.FieldType { | ||
ft.Elems = elems | ||
return ft | ||
} | ||
|
||
func (s *avroBatchEncoderSuite) TestAvroEncodeOnly(c *check.C) { | ||
defer testleak.AfterTest(c)() | ||
avroCodec, err := goavro.NewCodec(` | ||
{ | ||
"type": "record", | ||
"name": "test1", | ||
"fields" : [ | ||
{"name": "id", "type": ["null", "int"], "default": null}, | ||
{"name": "myint", "type": ["null", "int"], "default": null}, | ||
{"name": "mybool", "type": ["null", "int"], "default": null}, | ||
{"name": "myfloat", "type": ["null", "float"], "default": null}, | ||
{"name": "mybytes", "type": ["null", "bytes"], "default": null}, | ||
{"name": "ts", "type": ["null", {"type": "long", "logicalType": "timestamp-millis"}], "default": null} | ||
] | ||
}`) | ||
|
||
c.Assert(err, check.IsNil) | ||
|
||
table := model.TableName{ | ||
Schema: "testdb", | ||
Table: "test1", | ||
Table: "TestAvroEncodeOnly", | ||
} | ||
|
||
r, err := avroEncode(&table, s.encoder.valueSchemaManager, 1, []*model.Column{ | ||
cols := []*model.Column{ | ||
{Name: "id", Value: int64(1), Type: mysql.TypeLong}, | ||
{Name: "myint", Value: int64(2), Type: mysql.TypeLong}, | ||
{Name: "mybool", Value: int64(1), Type: mysql.TypeTiny}, | ||
{Name: "myfloat", Value: float64(3.14), Type: mysql.TypeFloat}, | ||
{Name: "mybytes", Value: []byte("Hello World"), Type: mysql.TypeBlob}, | ||
{Name: "mybytes1", Value: []byte("Hello World"), Flag: model.BinaryFlag, Type: mysql.TypeBlob}, | ||
{Name: "mybytes2", Value: []byte("Hello World"), Flag: model.BinaryFlag, Type: mysql.TypeMediumBlob}, | ||
{Name: "mybytes3", Value: []byte("Hello World"), Flag: model.BinaryFlag, Type: mysql.TypeTinyBlob}, | ||
{Name: "mybytes4", Value: []byte("Hello World"), Flag: model.BinaryFlag, Type: mysql.TypeLongBlob}, | ||
{Name: "mybytes5", Value: []byte("Hello World"), Flag: model.BinaryFlag, Type: mysql.TypeVarString}, | ||
{Name: "mybytes6", Value: []byte("Hello World"), Flag: model.BinaryFlag, Type: mysql.TypeString}, | ||
{Name: "mybytes7", Value: []byte("Hello World"), Flag: model.BinaryFlag, Type: mysql.TypeVarchar}, | ||
{Name: "mystring1", Value: "Hello World", Type: mysql.TypeBlob}, | ||
{Name: "mystring2", Value: "Hello World", Type: mysql.TypeMediumBlob}, | ||
{Name: "mystring3", Value: "Hello World", Type: mysql.TypeTinyBlob}, | ||
{Name: "mystring4", Value: "Hello World", Type: mysql.TypeLongBlob}, | ||
{Name: "mystring5", Value: "Hello World", Type: mysql.TypeVarString}, | ||
{Name: "mystring6", Value: "Hello World", Type: mysql.TypeString}, | ||
{Name: "mystring7", Value: "Hello World", Type: mysql.TypeVarchar}, | ||
{Name: "myenum", Value: uint64(1), Type: mysql.TypeEnum}, | ||
{Name: "myset", Value: uint64(1), Type: mysql.TypeSet}, | ||
{Name: "ts", Value: time.Now().Format(types.TimeFSPFormat), Type: mysql.TypeTimestamp}, | ||
{Name: "myjson", Value: "{\"foo\": \"bar\"}", Type: mysql.TypeJSON}, | ||
}, time.Local) | ||
} | ||
|
||
colInfos := []rowcodec.ColInfo{ | ||
There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. Maybe we can organize this data by structured arrays? Controlling correspondence by index may be difficult with large test data. There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. Yes. Actually, using arrays restricts the usage pattern. We have to iterate the array (currently no problem). I tried to use the tp map, but it can't be used since the column |
||
{ID: 1, IsPKHandle: true, VirtualGenCol: false, Ft: types.NewFieldType(mysql.TypeLong)}, | ||
{ID: 2, IsPKHandle: false, VirtualGenCol: false, Ft: types.NewFieldType(mysql.TypeLong)}, | ||
{ID: 3, IsPKHandle: false, VirtualGenCol: false, Ft: types.NewFieldType(mysql.TypeTiny)}, | ||
{ID: 4, IsPKHandle: false, VirtualGenCol: false, Ft: types.NewFieldType(mysql.TypeFloat)}, | ||
{ID: 5, IsPKHandle: false, VirtualGenCol: false, Ft: setBinChsClnFlag(types.NewFieldType(mysql.TypeBlob))}, | ||
{ID: 6, IsPKHandle: false, VirtualGenCol: false, Ft: setBinChsClnFlag(types.NewFieldType(mysql.TypeMediumBlob))}, | ||
{ID: 7, IsPKHandle: false, VirtualGenCol: false, Ft: setBinChsClnFlag(types.NewFieldType(mysql.TypeTinyBlob))}, | ||
{ID: 8, IsPKHandle: false, VirtualGenCol: false, Ft: setBinChsClnFlag(types.NewFieldType(mysql.TypeLongBlob))}, | ||
{ID: 9, IsPKHandle: false, VirtualGenCol: false, Ft: setBinChsClnFlag(types.NewFieldType(mysql.TypeVarString))}, | ||
{ID: 10, IsPKHandle: false, VirtualGenCol: false, Ft: setBinChsClnFlag(types.NewFieldType(mysql.TypeString))}, | ||
{ID: 11, IsPKHandle: false, VirtualGenCol: false, Ft: setBinChsClnFlag(types.NewFieldType(mysql.TypeVarchar))}, | ||
{ID: 12, IsPKHandle: false, VirtualGenCol: false, Ft: types.NewFieldType(mysql.TypeBlob)}, | ||
{ID: 13, IsPKHandle: false, VirtualGenCol: false, Ft: types.NewFieldType(mysql.TypeMediumBlob)}, | ||
{ID: 14, IsPKHandle: false, VirtualGenCol: false, Ft: types.NewFieldType(mysql.TypeTinyBlob)}, | ||
{ID: 15, IsPKHandle: false, VirtualGenCol: false, Ft: types.NewFieldType(mysql.TypeLongBlob)}, | ||
{ID: 16, IsPKHandle: false, VirtualGenCol: false, Ft: types.NewFieldType(mysql.TypeVarString)}, | ||
{ID: 17, IsPKHandle: false, VirtualGenCol: false, Ft: types.NewFieldType(mysql.TypeString)}, | ||
{ID: 18, IsPKHandle: false, VirtualGenCol: false, Ft: types.NewFieldType(mysql.TypeVarchar)}, | ||
{ID: 19, IsPKHandle: false, VirtualGenCol: false, Ft: setElems(types.NewFieldType(mysql.TypeEnum), []string{"a", "b"})}, | ||
{ID: 20, IsPKHandle: false, VirtualGenCol: false, Ft: setElems(types.NewFieldType(mysql.TypeSet), []string{"a", "b"})}, | ||
{ID: 21, IsPKHandle: false, VirtualGenCol: false, Ft: types.NewFieldType(mysql.TypeTimestamp)}, | ||
{ID: 22, IsPKHandle: false, VirtualGenCol: false, Ft: types.NewFieldType(mysql.TypeJSON)}, | ||
} | ||
|
||
schema, err := ColumnInfoToAvroSchema(table.Table, cols) | ||
c.Assert(err, check.IsNil) | ||
avroCodec, err := goavro.NewCodec(schema) | ||
c.Assert(err, check.IsNil) | ||
|
||
r, err := avroEncode(&table, s.encoder.valueSchemaManager, 1, cols, colInfos, time.Local) | ||
c.Assert(err, check.IsNil) | ||
|
||
res, _, err := avroCodec.NativeFromBinary(r.data) | ||
c.Check(err, check.IsNil) | ||
c.Check(res, check.NotNil) | ||
for k, v := range res.(map[string]interface{}) { | ||
if k == "myenum" || k == "myset" { | ||
if vmap, ok := v.(map[string]interface{}); ok { | ||
_, exists := vmap["string"] | ||
c.Check(exists, check.IsTrue) | ||
} | ||
} | ||
} | ||
|
||
txt, err := avroCodec.TextualFromNative(nil, res) | ||
c.Check(err, check.IsNil) | ||
|
@@ -103,39 +159,40 @@ func (s *avroBatchEncoderSuite) TestAvroEncodeOnly(c *check.C) { | |
|
||
func (s *avroBatchEncoderSuite) TestAvroTimeZone(c *check.C) { | ||
defer testleak.AfterTest(c)() | ||
avroCodec, err := goavro.NewCodec(` | ||
{ | ||
"type": "record", | ||
"name": "test1", | ||
"fields" : [ | ||
{"name": "id", "type": ["null", "int"], "default": null}, | ||
{"name": "myint", "type": ["null", "int"], "default": null}, | ||
{"name": "mybool", "type": ["null", "int"], "default": null}, | ||
{"name": "myfloat", "type": ["null", "float"], "default": null}, | ||
{"name": "mybytes", "type": ["null", "bytes"], "default": null}, | ||
{"name": "ts", "type": ["null", {"type": "long", "logicalType": "timestamp-millis"}], "default": null} | ||
] | ||
}`) | ||
|
||
c.Assert(err, check.IsNil) | ||
|
||
table := model.TableName{ | ||
Schema: "testdb", | ||
Table: "test1", | ||
Table: "TestAvroTimeZone", | ||
} | ||
|
||
location, err := time.LoadLocation("UTC") | ||
c.Check(err, check.IsNil) | ||
|
||
timestamp := time.Now() | ||
r, err := avroEncode(&table, s.encoder.valueSchemaManager, 1, []*model.Column{ | ||
cols := []*model.Column{ | ||
{Name: "id", Value: int64(1), Type: mysql.TypeLong}, | ||
{Name: "myint", Value: int64(2), Type: mysql.TypeLong}, | ||
{Name: "mybool", Value: int64(1), Type: mysql.TypeTiny}, | ||
{Name: "myfloat", Value: float64(3.14), Type: mysql.TypeFloat}, | ||
{Name: "mybytes", Value: []byte("Hello World"), Type: mysql.TypeBlob}, | ||
{Name: "mystring", Value: []byte("Hello World"), Type: mysql.TypeBlob}, | ||
{Name: "ts", Value: timestamp.In(location).Format(types.TimeFSPFormat), Type: mysql.TypeTimestamp}, | ||
}, location) | ||
} | ||
|
||
colInfos := []rowcodec.ColInfo{ | ||
{ID: 1, IsPKHandle: true, VirtualGenCol: false, Ft: types.NewFieldType(mysql.TypeLong)}, | ||
{ID: 2, IsPKHandle: false, VirtualGenCol: false, Ft: types.NewFieldType(mysql.TypeLong)}, | ||
{ID: 3, IsPKHandle: false, VirtualGenCol: false, Ft: types.NewFieldType(mysql.TypeTiny)}, | ||
{ID: 4, IsPKHandle: false, VirtualGenCol: false, Ft: types.NewFieldType(mysql.TypeFloat)}, | ||
{ID: 5, IsPKHandle: false, VirtualGenCol: false, Ft: types.NewFieldType(mysql.TypeBlob)}, | ||
{ID: 6, IsPKHandle: false, VirtualGenCol: false, Ft: types.NewFieldType(mysql.TypeTimestamp)}, | ||
} | ||
|
||
schema, err := ColumnInfoToAvroSchema(table.Table, cols) | ||
c.Assert(err, check.IsNil) | ||
avroCodec, err := goavro.NewCodec(schema) | ||
c.Assert(err, check.IsNil) | ||
|
||
r, err := avroEncode(&table, s.encoder.valueSchemaManager, 1, cols, colInfos, location) | ||
c.Assert(err, check.IsNil) | ||
|
||
res, _, err := avroCodec.NativeFromBinary(r.data) | ||
|
@@ -150,7 +207,7 @@ func (s *avroBatchEncoderSuite) TestAvroEnvelope(c *check.C) { | |
avroCodec, err := goavro.NewCodec(` | ||
{ | ||
"type": "record", | ||
"name": "test2", | ||
"name": "TestAvroEnvelope", | ||
"fields" : [ | ||
{"name": "id", "type": "int", "default": 0} | ||
] | ||
|
@@ -199,6 +256,13 @@ func (s *avroBatchEncoderSuite) TestAvroEncode(c *check.C) { | |
{Name: "utiny", Type: mysql.TypeTiny, Flag: model.UnsignedFlag, Value: uint64(100)}, | ||
{Name: "comment", Type: mysql.TypeBlob, Value: []byte("测试")}, | ||
}, | ||
ColInfos: []rowcodec.ColInfo{ | ||
{ID: 1, IsPKHandle: true, VirtualGenCol: false, Ft: types.NewFieldType(mysql.TypeLong)}, | ||
{ID: 2, IsPKHandle: false, VirtualGenCol: false, Ft: types.NewFieldType(mysql.TypeVarchar)}, | ||
{ID: 3, IsPKHandle: false, VirtualGenCol: false, Ft: types.NewFieldType(mysql.TypeTiny)}, | ||
{ID: 4, IsPKHandle: false, VirtualGenCol: false, Ft: setFlag(types.NewFieldType(mysql.TypeTiny), uint(model.UnsignedFlag))}, | ||
{ID: 5, IsPKHandle: false, VirtualGenCol: false, Ft: types.NewFieldType(mysql.TypeBlob)}, | ||
}, | ||
} | ||
|
||
testCaseDdl := &model.DDLEvent{ | ||
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Please use `column_infos`.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Sorry, I don't really get it. Do you mean changing `json:"column-infos"` to `json:"column_infos"`? But all other fields use hyphens instead of underscores.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
`msg:"-"` seems to prevent the information from being persisted for Redo. Could you verify the compatibility of this change with Redo?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Redo currently only applies to the MySQL sink, so it has nothing to do with Avro. So preventing `ColInfos` from being persisted for Redo seems right.
But I think `ColInfos` is important, and maybe in the future Redo could support Avro? Persisting `ColInfos` seems to bring no trouble. Upgrading is also safe, since a nil `ColInfos` field is safe for other codecs. But as @liuzix tells me, it will make the redo log large. So I'd like to leave it unpersisted in this pull request.
Could you take a look @ben1009?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
lgtm for redo part