Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

codec(ticdc): fix some avro encoding bugs #4704

Merged
merged 10 commits into from
Mar 7, 2022
Prev Previous commit
Next Next commit
instead of carry enum values, carry colInfos and query when needed
  • Loading branch information
zhangyangyu committed Feb 28, 2022
commit 8a296f3f41a386c91f4dd963d14ce1e4e55d9972
7 changes: 5 additions & 2 deletions cdc/entry/mounter.go
Original file line number Diff line number Diff line change
Expand Up @@ -400,6 +400,8 @@ func (m *mounterImpl) mountRowKVEntry(tableInfo *model.TableInfo, row *rowKVEntr
tableInfoVersion = tableInfo.TableInfoVersion
}

_, _, colInfos := tableInfo.GetRowColInfos()

return &model.RowChangedEvent{
StartTs: row.StartTs,
CommitTs: row.CRTs,
Expand All @@ -411,6 +413,7 @@ func (m *mounterImpl) mountRowKVEntry(tableInfo *model.TableInfo, row *rowKVEntr
TableID: row.PhysicalTableID,
IsPartition: tableInfo.GetPartitionInfo() != nil,
},
ColInfos: colInfos,
Columns: cols,
PreColumns: preCols,
IndexColumns: tableInfo.IndexColumnsOffset,
Expand Down Expand Up @@ -467,11 +470,11 @@ func formatColVal(datum types.Datum, tp byte) (
v := d.String()
return v, sizeOfString(v), "", nil
case mysql.TypeEnum:
v := datum.GetMysqlEnum()
v := datum.GetMysqlEnum().Value
const sizeOfV = unsafe.Sizeof(v)
return v, int(sizeOfV), "", nil
case mysql.TypeSet:
v := datum.GetMysqlSet()
v := datum.GetMysqlSet().Value
const sizeOfV = unsafe.Sizeof(v)
return v, int(sizeOfV), "", nil
case mysql.TypeBit:
Expand Down
13 changes: 3 additions & 10 deletions cdc/entry/mounter_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -401,14 +401,7 @@ func prepareCheckSQL(t *testing.T, tableName string, cols []*model.Column) (stri
_, err = sb.WriteString(col.Name + " IS NULL")
require.Nil(t, err)
continue
}
if col.Type == mysql.TypeEnum {
params = append(params, col.Value.(types.Enum).Value)
} else if col.Type == mysql.TypeSet {
params = append(params, col.Value.(types.Set).Value)
} else {
params = append(params, col.Value)
}
params = append(params, col.Value)
if col.Type == mysql.TypeJSON {
_, err = sb.WriteString(col.Name + " = CAST(? AS JSON)")
} else {
Expand Down Expand Up @@ -886,7 +879,7 @@ func TestGetDefaultZeroValue(t *testing.T) {
},
// TypeEnum value will be a string and then translate to []byte
// NotNull && no default will choose first element
Res: types.Enum{Value: 0, Name: "e0"},
Res: uint64(0),
},
// mysql.TypeEnum + notnull + default
{
Expand All @@ -909,7 +902,7 @@ func TestGetDefaultZeroValue(t *testing.T) {
Flag: mysql.NotNullFlag,
},
},
Res: types.Set{},
Res: uint64(0),
},
// mysql.TypeSet + notnull + default
{
Expand Down
4 changes: 3 additions & 1 deletion cdc/model/sink.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ import (

"github.com/pingcap/log"
"github.com/pingcap/tidb/parser/model"
"github.com/pingcap/tidb/util/rowcodec"
"github.com/pingcap/tiflow/pkg/quotes"
"github.com/pingcap/tiflow/pkg/util"
"go.uber.org/zap"
Expand Down Expand Up @@ -253,7 +254,8 @@ type RowChangedEvent struct {

RowID int64 `json:"row-id" msg:"-"` // Deprecated. It is empty when the RowID comes from clustered index table.

Table *TableName `json:"table" msg:"table"`
Table *TableName `json:"table" msg:"table"`
ColInfos []rowcodec.ColInfo `json:"column-infos" msg:"-"`
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Please use column_infos.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Sorry I don't really get it. Do you mean change the json:"column-infos" to json:"column_infos"? But all other fields use hyphens instead of underscores.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

msg:"-" seems to prevent the information from being persisted for Redo. Could you verify the compatibility of this change with Redo?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Redo only applies to mysql sink currently so does none of avro business. So prevent ColInfos from being persisted for Redo seems right.

But I think the ColInfos is important and maybe in future Redo could support avro? Persist ColInfos seems bring no trouble. Upgrade is also safe since a nil ColInfos field is safe for other codecs. But as @liuzix tells me, it will make the redo log large. So I'd like to leave it unpersist in this pull request.

Could you take a look @ben1009 ?

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

lgtm for redo part


TableInfoVersion uint64 `json:"table-info-version,omitempty" msg:"table-info-version"`

Expand Down
31 changes: 20 additions & 11 deletions cdc/sink/codec/avro.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@ import (
"github.com/pingcap/log"
"github.com/pingcap/tidb/parser/mysql"
"github.com/pingcap/tidb/types"
"github.com/pingcap/tidb/util/rowcodec"
"github.com/pingcap/tiflow/cdc/model"
"github.com/pingcap/tiflow/pkg/config"
cerror "github.com/pingcap/tiflow/pkg/errors"
Expand Down Expand Up @@ -90,7 +91,7 @@ func (a *AvroEventBatchEncoder) AppendRowChangedEvent(e *model.RowChangedEvent)
mqMessage := NewMQMessage(config.ProtocolAvro, nil, nil, e.CommitTs, model.MqMessageTypeRow, &e.Table.Schema, &e.Table.Table)

if !e.IsDelete() {
res, err := avroEncode(e.Table, a.valueSchemaManager, e.TableInfoVersion, e.Columns, a.tz)
res, err := avroEncode(e.Table, a.valueSchemaManager, e.TableInfoVersion, e.Columns, e.ColInfos, a.tz)
if err != nil {
log.Warn("AppendRowChangedEvent: avro encoding failed", zap.String("table", e.Table.String()))
return errors.Annotate(err, "AppendRowChangedEvent could not encode to Avro")
Expand All @@ -109,7 +110,7 @@ func (a *AvroEventBatchEncoder) AppendRowChangedEvent(e *model.RowChangedEvent)

pkeyCols := e.HandleKeyColumns()

res, err := avroEncode(e.Table, a.keySchemaManager, e.TableInfoVersion, pkeyCols, a.tz)
res, err := avroEncode(e.Table, a.keySchemaManager, e.TableInfoVersion, pkeyCols, e.ColInfos, a.tz)
if err != nil {
log.Warn("AppendRowChangedEvent: avro encoding failed", zap.String("table", e.Table.String()))
return errors.Annotate(err, "AppendRowChangedEvent could not encode to Avro")
Expand Down Expand Up @@ -158,7 +159,7 @@ func (a *AvroEventBatchEncoder) Size() int {
return sum
}

func avroEncode(table *model.TableName, manager *AvroSchemaManager, tableVersion uint64, cols []*model.Column, tz *time.Location) (*avroEncodeResult, error) {
func avroEncode(table *model.TableName, manager *AvroSchemaManager, tableVersion uint64, cols []*model.Column, colInfos []rowcodec.ColInfo, tz *time.Location) (*avroEncodeResult, error) {
schemaGen := func() (string, error) {
schema, err := ColumnInfoToAvroSchema(table.Table, cols)
if err != nil {
Expand All @@ -173,7 +174,7 @@ func avroEncode(table *model.TableName, manager *AvroSchemaManager, tableVersion
return nil, errors.Annotate(err, "AvroEventBatchEncoder: get-or-register failed")
}

native, err := rowToAvroNativeData(cols, tz)
native, err := rowToAvroNativeData(cols, colInfos, tz)
if err != nil {
return nil, errors.Annotate(err, "AvroEventBatchEncoder: converting to native failed")
}
Expand Down Expand Up @@ -244,13 +245,13 @@ func ColumnInfoToAvroSchema(name string, columnInfo []*model.Column) (string, er
return string(str), nil
}

func rowToAvroNativeData(cols []*model.Column, tz *time.Location) (interface{}, error) {
func rowToAvroNativeData(cols []*model.Column, colInfos []rowcodec.ColInfo, tz *time.Location) (interface{}, error) {
ret := make(map[string]interface{}, len(cols))
for _, col := range cols {
for i, col := range cols {
if col == nil {
continue
}
data, str, err := columnToAvroNativeData(col, tz)
data, str, err := columnToAvroNativeData(col, colInfos[i].Ft, tz)
if err != nil {
return nil, err
}
Expand Down Expand Up @@ -361,7 +362,7 @@ var (
zeroDateStr = types.NewTime(types.ZeroCoreTime, mysql.TypeDate, 0).String()
)

func columnToAvroNativeData(col *model.Column, tz *time.Location) (interface{}, string, error) {
func columnToAvroNativeData(col *model.Column, ft *types.FieldType, tz *time.Location) (interface{}, string, error) {
if col.Value == nil {
return nil, "null", nil
}
Expand Down Expand Up @@ -440,7 +441,7 @@ func columnToAvroNativeData(col *model.Column, tz *time.Location) (interface{},
d := types.NewDuration(hours, minutes, seconds, int(fracInt), fsp).Duration
const fullType = "int." + timeMillis
return d, string(fullType), nil
case mysql.TypeVarchar, mysql.TypeString, mysql.TypeVarString, mysql.TypeTinyBlob, mysql.TypeMediumBlob, mysql.TypeLongBlob, mysql.TypeBlob:
case mysql.TypeVarchar, mysql.TypeString, mysql.TypeVarString, mysql.TypeTinyBlob, mysql.TypeBlob, mysql.TypeMediumBlob, mysql.TypeLongBlob:
if col.Flag.IsBinary() {
switch val := col.Value.(type) {
case string:
Expand All @@ -465,9 +466,17 @@ func columnToAvroNativeData(col *model.Column, tz *time.Location) (interface{},
case mysql.TypeNewDecimal:
return col.Value.(string), "string", nil
case mysql.TypeEnum:
return col.Value.(types.Enum).Name, "string", nil
enumVar, err := types.ParseEnumValue(ft.Elems, col.Value.(uint64))
if err != nil {
return nil, "", cerror.WrapError(cerror.ErrAvroEncodeFailed, err)
}
return enumVar.Name, "string", nil
case mysql.TypeSet:
return col.Value.(types.Set).Name, "string", nil
setVar, err := types.ParseSetValue(ft.Elems, col.Value.(uint64))
if err != nil {
return nil, "", cerror.WrapError(cerror.ErrAvroEncodeFailed, err)
}
return setVar.Name, "string", nil
case mysql.TypeBit:
return handleUnsignedInt64()
case mysql.TypeTiny, mysql.TypeShort, mysql.TypeInt24:
Expand Down
97 changes: 77 additions & 20 deletions cdc/sink/codec/avro_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ import (
model2 "github.com/pingcap/tidb/parser/model"
"github.com/pingcap/tidb/parser/mysql"
"github.com/pingcap/tidb/types"
"github.com/pingcap/tidb/util/rowcodec"
"github.com/pingcap/tiflow/cdc/model"
"github.com/pingcap/tiflow/cdc/puller"
"github.com/pingcap/tiflow/pkg/regionspan"
Expand Down Expand Up @@ -58,6 +59,21 @@ func (s *avroBatchEncoderSuite) TearDownSuite(c *check.C) {
stopHTTPInterceptForTestingRegistry()
}

func setBinChsClnFlag(ft *types.FieldType) *types.FieldType {
types.SetBinChsClnFlag(ft)
return ft
}

func setFlag(ft *types.FieldType, flag uint) *types.FieldType {
types.SetTypeFlag(&ft.Flag, flag, true)
return ft
}

func setElems(ft *types.FieldType, elems []string) *types.FieldType {
ft.Elems = elems
return ft
}

func (s *avroBatchEncoderSuite) TestAvroEncodeOnly(c *check.C) {
defer testleak.AfterTest(c)()

Expand Down Expand Up @@ -85,23 +101,56 @@ func (s *avroBatchEncoderSuite) TestAvroEncodeOnly(c *check.C) {
{Name: "mystring5", Value: "Hello World", Type: mysql.TypeVarString},
{Name: "mystring6", Value: "Hello World", Type: mysql.TypeString},
{Name: "mystring7", Value: "Hello World", Type: mysql.TypeVarchar},
{Name: "myenum", Value: types.Enum{Value: 1, Name: "v"}, Type: mysql.TypeEnum},
{Name: "myset", Value: types.Set{Value: 1, Name: "v"}, Type: mysql.TypeSet},
{Name: "myenum", Value: uint64(1), Type: mysql.TypeEnum},
{Name: "myset", Value: uint64(1), Type: mysql.TypeSet},
{Name: "ts", Value: time.Now().Format(types.TimeFSPFormat), Type: mysql.TypeTimestamp},
{Name: "myjson", Value: "{\"foo\": \"bar\"}", Type: mysql.TypeJSON},
}

colInfos := []rowcodec.ColInfo{
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Maybe we can organize this data by structured arrays? Controlling correspondence by index may be difficult with large test data.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes. Actually using arrays restricts the usage pattern. We have to iterate the array(currently no problem). I tried to use the tp map, but it can't be used since the column Id is lost in Column struct. Add fields to Column will cause much code churn. Any suggestions?

{ID: 1, IsPKHandle: true, VirtualGenCol: false, Ft: types.NewFieldType(mysql.TypeLong)},
{ID: 2, IsPKHandle: false, VirtualGenCol: false, Ft: types.NewFieldType(mysql.TypeLong)},
{ID: 3, IsPKHandle: false, VirtualGenCol: false, Ft: types.NewFieldType(mysql.TypeTiny)},
{ID: 4, IsPKHandle: false, VirtualGenCol: false, Ft: types.NewFieldType(mysql.TypeFloat)},
{ID: 5, IsPKHandle: false, VirtualGenCol: false, Ft: setBinChsClnFlag(types.NewFieldType(mysql.TypeBlob))},
{ID: 6, IsPKHandle: false, VirtualGenCol: false, Ft: setBinChsClnFlag(types.NewFieldType(mysql.TypeMediumBlob))},
{ID: 7, IsPKHandle: false, VirtualGenCol: false, Ft: setBinChsClnFlag(types.NewFieldType(mysql.TypeTinyBlob))},
{ID: 8, IsPKHandle: false, VirtualGenCol: false, Ft: setBinChsClnFlag(types.NewFieldType(mysql.TypeLongBlob))},
{ID: 9, IsPKHandle: false, VirtualGenCol: false, Ft: setBinChsClnFlag(types.NewFieldType(mysql.TypeVarString))},
{ID: 10, IsPKHandle: false, VirtualGenCol: false, Ft: setBinChsClnFlag(types.NewFieldType(mysql.TypeString))},
{ID: 11, IsPKHandle: false, VirtualGenCol: false, Ft: setBinChsClnFlag(types.NewFieldType(mysql.TypeVarchar))},
{ID: 12, IsPKHandle: false, VirtualGenCol: false, Ft: types.NewFieldType(mysql.TypeBlob)},
{ID: 13, IsPKHandle: false, VirtualGenCol: false, Ft: types.NewFieldType(mysql.TypeMediumBlob)},
{ID: 14, IsPKHandle: false, VirtualGenCol: false, Ft: types.NewFieldType(mysql.TypeTinyBlob)},
{ID: 15, IsPKHandle: false, VirtualGenCol: false, Ft: types.NewFieldType(mysql.TypeLongBlob)},
{ID: 16, IsPKHandle: false, VirtualGenCol: false, Ft: types.NewFieldType(mysql.TypeVarString)},
{ID: 17, IsPKHandle: false, VirtualGenCol: false, Ft: types.NewFieldType(mysql.TypeString)},
{ID: 18, IsPKHandle: false, VirtualGenCol: false, Ft: types.NewFieldType(mysql.TypeVarchar)},
{ID: 19, IsPKHandle: false, VirtualGenCol: false, Ft: setElems(types.NewFieldType(mysql.TypeEnum), []string{"a", "b"})},
{ID: 20, IsPKHandle: false, VirtualGenCol: false, Ft: setElems(types.NewFieldType(mysql.TypeSet), []string{"a", "b"})},
{ID: 21, IsPKHandle: false, VirtualGenCol: false, Ft: types.NewFieldType(mysql.TypeTimestamp)},
{ID: 22, IsPKHandle: false, VirtualGenCol: false, Ft: types.NewFieldType(mysql.TypeJSON)},
}

schema, err := ColumnInfoToAvroSchema(table.Table, cols)
c.Assert(err, check.IsNil)
avroCodec, err := goavro.NewCodec(schema)
c.Assert(err, check.IsNil)

r, err := avroEncode(&table, s.encoder.valueSchemaManager, 1, cols, time.Local)
r, err := avroEncode(&table, s.encoder.valueSchemaManager, 1, cols, colInfos, time.Local)
c.Assert(err, check.IsNil)

res, _, err := avroCodec.NativeFromBinary(r.data)
c.Check(err, check.IsNil)
c.Check(res, check.NotNil)
for k, v := range res.(map[string]interface{}) {
if k == "myenum" || k == "myset" {
if vmap, ok := v.(map[string]interface{}); ok {
_, exists := vmap["string"]
c.Check(exists, check.IsTrue)
}
}
}

txt, err := avroCodec.TextualFromNative(nil, res)
c.Check(err, check.IsNil)
Expand All @@ -110,21 +159,6 @@ func (s *avroBatchEncoderSuite) TestAvroEncodeOnly(c *check.C) {

func (s *avroBatchEncoderSuite) TestAvroTimeZone(c *check.C) {
defer testleak.AfterTest(c)()
avroCodec, err := goavro.NewCodec(`
{
"type": "record",
"name": "TestAvroTimeZone",
"fields" : [
{"name": "id", "type": ["null", "int"], "default": null},
{"name": "myint", "type": ["null", "int"], "default": null},
{"name": "mybool", "type": ["null", "int"], "default": null},
{"name": "myfloat", "type": ["null", "float"], "default": null},
{"name": "mystring", "type": ["null", "string"], "default": null},
{"name": "ts", "type": ["null", {"type": "long", "logicalType": "timestamp-millis"}], "default": null}
]
}`)

c.Assert(err, check.IsNil)

table := model.TableName{
Schema: "testdb",
Expand All @@ -135,14 +169,30 @@ func (s *avroBatchEncoderSuite) TestAvroTimeZone(c *check.C) {
c.Check(err, check.IsNil)

timestamp := time.Now()
r, err := avroEncode(&table, s.encoder.valueSchemaManager, 1, []*model.Column{
cols := []*model.Column{
{Name: "id", Value: int64(1), Type: mysql.TypeLong},
{Name: "myint", Value: int64(2), Type: mysql.TypeLong},
{Name: "mybool", Value: int64(1), Type: mysql.TypeTiny},
{Name: "myfloat", Value: float64(3.14), Type: mysql.TypeFloat},
{Name: "mystring", Value: []byte("Hello World"), Type: mysql.TypeBlob},
{Name: "ts", Value: timestamp.In(location).Format(types.TimeFSPFormat), Type: mysql.TypeTimestamp},
}, location)
}

colInfos := []rowcodec.ColInfo{
{ID: 1, IsPKHandle: true, VirtualGenCol: false, Ft: types.NewFieldType(mysql.TypeLong)},
{ID: 2, IsPKHandle: false, VirtualGenCol: false, Ft: types.NewFieldType(mysql.TypeLong)},
{ID: 3, IsPKHandle: false, VirtualGenCol: false, Ft: types.NewFieldType(mysql.TypeTiny)},
{ID: 4, IsPKHandle: false, VirtualGenCol: false, Ft: types.NewFieldType(mysql.TypeFloat)},
{ID: 5, IsPKHandle: false, VirtualGenCol: false, Ft: types.NewFieldType(mysql.TypeBlob)},
{ID: 6, IsPKHandle: false, VirtualGenCol: false, Ft: types.NewFieldType(mysql.TypeTimestamp)},
}

schema, err := ColumnInfoToAvroSchema(table.Table, cols)
c.Assert(err, check.IsNil)
avroCodec, err := goavro.NewCodec(schema)
c.Assert(err, check.IsNil)

r, err := avroEncode(&table, s.encoder.valueSchemaManager, 1, cols, colInfos, location)
c.Assert(err, check.IsNil)

res, _, err := avroCodec.NativeFromBinary(r.data)
Expand Down Expand Up @@ -206,6 +256,13 @@ func (s *avroBatchEncoderSuite) TestAvroEncode(c *check.C) {
{Name: "utiny", Type: mysql.TypeTiny, Flag: model.UnsignedFlag, Value: uint64(100)},
{Name: "comment", Type: mysql.TypeBlob, Value: []byte("测试")},
},
ColInfos: []rowcodec.ColInfo{
{ID: 1, IsPKHandle: true, VirtualGenCol: false, Ft: types.NewFieldType(mysql.TypeLong)},
{ID: 2, IsPKHandle: false, VirtualGenCol: false, Ft: types.NewFieldType(mysql.TypeVarchar)},
{ID: 3, IsPKHandle: false, VirtualGenCol: false, Ft: types.NewFieldType(mysql.TypeTiny)},
{ID: 4, IsPKHandle: false, VirtualGenCol: false, Ft: setFlag(types.NewFieldType(mysql.TypeTiny), uint(model.UnsignedFlag))},
{ID: 5, IsPKHandle: false, VirtualGenCol: false, Ft: types.NewFieldType(mysql.TypeBlob)},
},
}

testCaseDdl := &model.DDLEvent{
Expand Down
4 changes: 0 additions & 4 deletions cdc/sink/codec/canal.go
Original file line number Diff line number Diff line change
Expand Up @@ -242,10 +242,6 @@ func (b *canalEntryBuilder) formatValue(value interface{}, javaType JavaSQLType)
}
result = string(decoded)
}
case types.Enum:
result = strconv.FormatUint(v.Value, 10)
case types.Set:
result = strconv.FormatUint(v.Value, 10)
default:
result = fmt.Sprintf("%v", v)
}
Expand Down
5 changes: 2 additions & 3 deletions cdc/sink/codec/canal_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,6 @@ import (
"github.com/pingcap/check"
mm "github.com/pingcap/tidb/parser/model"
"github.com/pingcap/tidb/parser/mysql"
"github.com/pingcap/tidb/types"
"golang.org/x/text/encoding/charmap"

"github.com/pingcap/tiflow/cdc/model"
Expand Down Expand Up @@ -444,8 +443,8 @@ var testColumnsTable = []*testColumnTuple{
{&model.Column{Name: "time", Type: mysql.TypeDuration, Value: "02:20:20"}, "time", JavaSQLTypeTIME, "02:20:20"},
{&model.Column{Name: "year", Type: mysql.TypeYear, Value: "2020", Flag: model.UnsignedFlag}, "year", JavaSQLTypeVARCHAR, "2020"},

{&model.Column{Name: "enum", Type: mysql.TypeEnum, Value: types.Enum{Name: "v", Value: 1}}, "enum", JavaSQLTypeINTEGER, "1"},
{&model.Column{Name: "set", Type: mysql.TypeSet, Value: types.Set{Name: "v", Value: 3}}, "set", JavaSQLTypeBIT, "3"},
{&model.Column{Name: "enum", Type: mysql.TypeEnum, Value: uint64(1)}, "enum", JavaSQLTypeINTEGER, "1"},
{&model.Column{Name: "set", Type: mysql.TypeSet, Value: uint64(3)}, "set", JavaSQLTypeBIT, "3"},
{&model.Column{Name: "bit", Type: mysql.TypeBit, Value: uint64(65), Flag: model.UnsignedFlag | model.BinaryFlag}, "bit", JavaSQLTypeBIT, "65"},
{&model.Column{Name: "json", Type: mysql.TypeJSON, Value: "{\"key1\": \"value1\"}", Flag: model.BinaryFlag}, "json", JavaSQLTypeVARCHAR, "{\"key1\": \"value1\"}"},
}
Expand Down
7 changes: 1 addition & 6 deletions cdc/sink/codec/craft/encoder.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,6 @@ import (
"unsafe"

"github.com/pingcap/tidb/parser/mysql"
"github.com/pingcap/tidb/types"
"github.com/pingcap/tiflow/cdc/model"
)

Expand Down Expand Up @@ -194,11 +193,7 @@ func EncodeTiDBType(allocator *SliceAllocator, ty byte, flag model.ColumnFlagTyp
case mysql.TypeDate, mysql.TypeDatetime, mysql.TypeNewDate, mysql.TypeTimestamp, mysql.TypeDuration, mysql.TypeJSON, mysql.TypeNewDecimal:
// value type for these mysql types are string
return unsafeStringToBytes(value.(string))
case mysql.TypeEnum:
return encodeUvarint(allocator.byteSlice(binary.MaxVarintLen64)[:0], value.(types.Enum).Value)
case mysql.TypeSet:
return encodeUvarint(allocator.byteSlice(binary.MaxVarintLen64)[:0], value.(types.Set).Value)
case mysql.TypeBit:
case mysql.TypeEnum, mysql.TypeSet, mysql.TypeBit:
// value type for these mysql types are uint64
return encodeUvarint(allocator.byteSlice(binary.MaxVarintLen64)[:0], value.(uint64))
case mysql.TypeString, mysql.TypeVarString, mysql.TypeVarchar,
Expand Down
Loading