Skip to content

Commit

Permalink
Instead of carrying enum values, carry colInfos and query when needed
Browse files Browse the repository at this point in the history
  • Loading branch information
zhangyangyu committed Feb 27, 2022
1 parent c1db795 commit 012c834
Show file tree
Hide file tree
Showing 9 changed files with 111 additions and 70 deletions.
7 changes: 5 additions & 2 deletions cdc/entry/mounter.go
Original file line number Diff line number Diff line change
Expand Up @@ -400,6 +400,8 @@ func (m *mounterImpl) mountRowKVEntry(tableInfo *model.TableInfo, row *rowKVEntr
tableInfoVersion = tableInfo.TableInfoVersion
}

_, _, colInfos := tableInfo.GetRowColInfos()

return &model.RowChangedEvent{
StartTs: row.StartTs,
CommitTs: row.CRTs,
Expand All @@ -411,6 +413,7 @@ func (m *mounterImpl) mountRowKVEntry(tableInfo *model.TableInfo, row *rowKVEntr
TableID: row.PhysicalTableID,
IsPartition: tableInfo.GetPartitionInfo() != nil,
},
ColInfos: colInfos,
Columns: cols,
PreColumns: preCols,
IndexColumns: tableInfo.IndexColumnsOffset,
Expand Down Expand Up @@ -467,11 +470,11 @@ func formatColVal(datum types.Datum, tp byte) (
v := d.String()
return v, sizeOfString(v), "", nil
case mysql.TypeEnum:
v := datum.GetMysqlEnum()
v := datum.GetMysqlEnum().Value
const sizeOfV = unsafe.Sizeof(v)
return v, int(sizeOfV), "", nil
case mysql.TypeSet:
v := datum.GetMysqlSet()
v := datum.GetMysqlSet().Value
const sizeOfV = unsafe.Sizeof(v)
return v, int(sizeOfV), "", nil
case mysql.TypeBit:
Expand Down
13 changes: 3 additions & 10 deletions cdc/entry/mounter_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -401,14 +401,7 @@ func prepareCheckSQL(t *testing.T, tableName string, cols []*model.Column) (stri
_, err = sb.WriteString(col.Name + " IS NULL")
require.Nil(t, err)
continue
}
if col.Type == mysql.TypeEnum {
params = append(params, col.Value.(types.Enum).Value)
} else if col.Type == mysql.TypeSet {
params = append(params, col.Value.(types.Set).Value)
} else {
params = append(params, col.Value)
}
params = append(params, col.Value)
if col.Type == mysql.TypeJSON {
_, err = sb.WriteString(col.Name + " = CAST(? AS JSON)")
} else {
Expand Down Expand Up @@ -886,7 +879,7 @@ func TestGetDefaultZeroValue(t *testing.T) {
},
// TypeEnum value will be a string and then translate to []byte
// NotNull && no default will choose first element
Res: types.Enum{Value: 0, Name: "e0"},
Res: uint64(0),
},
// mysql.TypeEnum + notnull + default
{
Expand All @@ -909,7 +902,7 @@ func TestGetDefaultZeroValue(t *testing.T) {
Flag: mysql.NotNullFlag,
},
},
Res: types.Set{},
Res: uint64(0),
},
// mysql.TypeSet + notnull + default
{
Expand Down
4 changes: 3 additions & 1 deletion cdc/model/sink.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ import (

"github.com/pingcap/log"
"github.com/pingcap/tidb/parser/model"
"github.com/pingcap/tidb/util/rowcodec"
"github.com/pingcap/tiflow/pkg/quotes"
"github.com/pingcap/tiflow/pkg/util"
"go.uber.org/zap"
Expand Down Expand Up @@ -253,7 +254,8 @@ type RowChangedEvent struct {

RowID int64 `json:"row-id" msg:"-"` // Deprecated. It is empty when the RowID comes from clustered index table.

Table *TableName `json:"table" msg:"table"`
Table *TableName `json:"table" msg:"table"`
ColInfos []rowcodec.ColInfo `json:"column-infos" msg:"-"`

TableInfoVersion uint64 `json:"table-info-version,omitempty" msg:"table-info-version"`

Expand Down
31 changes: 20 additions & 11 deletions cdc/sink/codec/avro.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@ import (
"github.com/pingcap/log"
"github.com/pingcap/tidb/parser/mysql"
"github.com/pingcap/tidb/types"
"github.com/pingcap/tidb/util/rowcodec"
"github.com/pingcap/tiflow/cdc/model"
"github.com/pingcap/tiflow/pkg/config"
cerror "github.com/pingcap/tiflow/pkg/errors"
Expand Down Expand Up @@ -91,7 +92,7 @@ func (a *AvroEventBatchEncoder) AppendRowChangedEvent(e *model.RowChangedEvent)
mqMessage := NewMQMessage(config.ProtocolAvro, nil, nil, e.CommitTs, model.MqMessageTypeRow, &e.Table.Schema, &e.Table.Table)

if !e.IsDelete() {
res, err := avroEncode(e.Table, a.valueSchemaManager, e.TableInfoVersion, e.Columns, a.tz)
res, err := avroEncode(e.Table, a.valueSchemaManager, e.TableInfoVersion, e.Columns, e.ColInfos, a.tz)
if err != nil {
log.Warn("AppendRowChangedEvent: avro encoding failed", zap.String("table", e.Table.String()))
return errors.Annotate(err, "AppendRowChangedEvent could not encode to Avro")
Expand All @@ -110,7 +111,7 @@ func (a *AvroEventBatchEncoder) AppendRowChangedEvent(e *model.RowChangedEvent)

pkeyCols := e.HandleKeyColumns()

res, err := avroEncode(e.Table, a.keySchemaManager, e.TableInfoVersion, pkeyCols, a.tz)
res, err := avroEncode(e.Table, a.keySchemaManager, e.TableInfoVersion, pkeyCols, e.ColInfos, a.tz)
if err != nil {
log.Warn("AppendRowChangedEvent: avro encoding failed", zap.String("table", e.Table.String()))
return errors.Annotate(err, "AppendRowChangedEvent could not encode to Avro")
Expand Down Expand Up @@ -165,7 +166,7 @@ func (a *AvroEventBatchEncoder) SetParams(params map[string]string) error {
return nil
}

func avroEncode(table *model.TableName, manager *AvroSchemaManager, tableVersion uint64, cols []*model.Column, tz *time.Location) (*avroEncodeResult, error) {
func avroEncode(table *model.TableName, manager *AvroSchemaManager, tableVersion uint64, cols []*model.Column, colInfos []rowcodec.ColInfo, tz *time.Location) (*avroEncodeResult, error) {
schemaGen := func() (string, error) {
schema, err := ColumnInfoToAvroSchema(table.Table, cols)
if err != nil {
Expand All @@ -180,7 +181,7 @@ func avroEncode(table *model.TableName, manager *AvroSchemaManager, tableVersion
return nil, errors.Annotate(err, "AvroEventBatchEncoder: get-or-register failed")
}

native, err := rowToAvroNativeData(cols, tz)
native, err := rowToAvroNativeData(cols, colInfos, tz)
if err != nil {
return nil, errors.Annotate(err, "AvroEventBatchEncoder: converting to native failed")
}
Expand Down Expand Up @@ -251,13 +252,13 @@ func ColumnInfoToAvroSchema(name string, columnInfo []*model.Column) (string, er
return string(str), nil
}

func rowToAvroNativeData(cols []*model.Column, tz *time.Location) (interface{}, error) {
func rowToAvroNativeData(cols []*model.Column, colInfos []rowcodec.ColInfo, tz *time.Location) (interface{}, error) {
ret := make(map[string]interface{}, len(cols))
for _, col := range cols {
for i, col := range cols {
if col == nil {
continue
}
data, str, err := columnToAvroNativeData(col, tz)
data, str, err := columnToAvroNativeData(col, colInfos[i].Ft, tz)
if err != nil {
return nil, err
}
Expand Down Expand Up @@ -368,7 +369,7 @@ var (
zeroDateStr = types.NewTime(types.ZeroCoreTime, mysql.TypeDate, 0).String()
)

func columnToAvroNativeData(col *model.Column, tz *time.Location) (interface{}, string, error) {
func columnToAvroNativeData(col *model.Column, ft *types.FieldType, tz *time.Location) (interface{}, string, error) {
if col.Value == nil {
return nil, "null", nil
}
Expand Down Expand Up @@ -447,7 +448,7 @@ func columnToAvroNativeData(col *model.Column, tz *time.Location) (interface{},
d := types.NewDuration(hours, minutes, seconds, int(fracInt), fsp).Duration
const fullType = "int." + timeMillis
return d, string(fullType), nil
case mysql.TypeVarchar, mysql.TypeString, mysql.TypeVarString, mysql.TypeTinyBlob, mysql.TypeMediumBlob, mysql.TypeLongBlob, mysql.TypeBlob:
case mysql.TypeVarchar, mysql.TypeString, mysql.TypeVarString, mysql.TypeTinyBlob, mysql.TypeBlob, mysql.TypeMediumBlob, mysql.TypeLongBlob:
if col.Flag.IsBinary() {
switch val := col.Value.(type) {
case string:
Expand All @@ -472,9 +473,17 @@ func columnToAvroNativeData(col *model.Column, tz *time.Location) (interface{},
case mysql.TypeNewDecimal:
return col.Value.(string), "string", nil
case mysql.TypeEnum:
return col.Value.(types.Enum).Name, "string", nil
enumVar, err := types.ParseEnumValue(ft.Elems, col.Value.(uint64))
if err != nil {
return nil, "", cerror.WrapError(cerror.ErrAvroEncodeFailed, err)
}
return enumVar.Name, "string", nil
case mysql.TypeSet:
return col.Value.(types.Set).Name, "string", nil
setVar, err := types.ParseSetValue(ft.Elems, col.Value.(uint64))
if err != nil {
return nil, "", cerror.WrapError(cerror.ErrAvroEncodeFailed, err)
}
return setVar.Name, "string", nil
case mysql.TypeBit:
return handleUnsignedInt64()
case mysql.TypeTiny, mysql.TypeShort, mysql.TypeInt24:
Expand Down
97 changes: 77 additions & 20 deletions cdc/sink/codec/avro_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ import (
model2 "github.com/pingcap/tidb/parser/model"
"github.com/pingcap/tidb/parser/mysql"
"github.com/pingcap/tidb/types"
"github.com/pingcap/tidb/util/rowcodec"
"github.com/pingcap/tiflow/cdc/model"
"github.com/pingcap/tiflow/cdc/puller"
"github.com/pingcap/tiflow/pkg/regionspan"
Expand Down Expand Up @@ -58,6 +59,21 @@ func (s *avroBatchEncoderSuite) TearDownSuite(c *check.C) {
stopHTTPInterceptForTestingRegistry()
}

// setBinChsClnFlag is a test helper that passes ft to types.SetBinChsClnFlag
// and hands the same pointer back, so the call can be used inline inside a
// composite literal.
// NOTE(review): judging by its name, types.SetBinChsClnFlag presumably marks
// the field type as binary charset/collation — confirm against the tidb
// types package.
func setBinChsClnFlag(ft *types.FieldType) *types.FieldType {
	fieldType := ft
	types.SetBinChsClnFlag(fieldType)
	return fieldType
}

// setFlag is a test helper that calls types.SetTypeFlag on ft.Flag with the
// given flag and `true`, then returns the same ft pointer so the call can be
// chained inline.
// NOTE(review): the trailing `true` presumably means "switch the flag on" —
// confirm against the tidb types package.
func setFlag(ft *types.FieldType, flag uint) *types.FieldType {
	const enable = true
	target := &ft.Flag
	types.SetTypeFlag(target, flag, enable)
	return ft
}

// setElems is a test helper that stores elems in ft.Elems and returns the
// same ft pointer so the call can be used inline. The slice is assigned
// as-is (not copied), so ft shares its backing array with the caller.
func setElems(ft *types.FieldType, elems []string) *types.FieldType {
	fieldType := ft
	fieldType.Elems = elems
	return fieldType
}

func (s *avroBatchEncoderSuite) TestAvroEncodeOnly(c *check.C) {
defer testleak.AfterTest(c)()

Expand Down Expand Up @@ -85,23 +101,56 @@ func (s *avroBatchEncoderSuite) TestAvroEncodeOnly(c *check.C) {
{Name: "mystring5", Value: "Hello World", Type: mysql.TypeVarString},
{Name: "mystring6", Value: "Hello World", Type: mysql.TypeString},
{Name: "mystring7", Value: "Hello World", Type: mysql.TypeVarchar},
{Name: "myenum", Value: types.Enum{Value: 1, Name: "v"}, Type: mysql.TypeEnum},
{Name: "myset", Value: types.Set{Value: 1, Name: "v"}, Type: mysql.TypeSet},
{Name: "myenum", Value: uint64(1), Type: mysql.TypeEnum},
{Name: "myset", Value: uint64(1), Type: mysql.TypeSet},
{Name: "ts", Value: time.Now().Format(types.TimeFSPFormat), Type: mysql.TypeTimestamp},
{Name: "myjson", Value: "{\"foo\": \"bar\"}", Type: mysql.TypeJSON},
}

colInfos := []rowcodec.ColInfo{
{ID: 1, IsPKHandle: true, VirtualGenCol: false, Ft: types.NewFieldType(mysql.TypeLong)},
{ID: 2, IsPKHandle: false, VirtualGenCol: false, Ft: types.NewFieldType(mysql.TypeLong)},
{ID: 3, IsPKHandle: false, VirtualGenCol: false, Ft: types.NewFieldType(mysql.TypeTiny)},
{ID: 4, IsPKHandle: false, VirtualGenCol: false, Ft: types.NewFieldType(mysql.TypeFloat)},
{ID: 5, IsPKHandle: false, VirtualGenCol: false, Ft: setBinChsClnFlag(types.NewFieldType(mysql.TypeBlob))},
{ID: 6, IsPKHandle: false, VirtualGenCol: false, Ft: setBinChsClnFlag(types.NewFieldType(mysql.TypeMediumBlob))},
{ID: 7, IsPKHandle: false, VirtualGenCol: false, Ft: setBinChsClnFlag(types.NewFieldType(mysql.TypeTinyBlob))},
{ID: 8, IsPKHandle: false, VirtualGenCol: false, Ft: setBinChsClnFlag(types.NewFieldType(mysql.TypeLongBlob))},
{ID: 9, IsPKHandle: false, VirtualGenCol: false, Ft: setBinChsClnFlag(types.NewFieldType(mysql.TypeVarString))},
{ID: 10, IsPKHandle: false, VirtualGenCol: false, Ft: setBinChsClnFlag(types.NewFieldType(mysql.TypeString))},
{ID: 11, IsPKHandle: false, VirtualGenCol: false, Ft: setBinChsClnFlag(types.NewFieldType(mysql.TypeVarchar))},
{ID: 12, IsPKHandle: false, VirtualGenCol: false, Ft: types.NewFieldType(mysql.TypeBlob)},
{ID: 13, IsPKHandle: false, VirtualGenCol: false, Ft: types.NewFieldType(mysql.TypeMediumBlob)},
{ID: 14, IsPKHandle: false, VirtualGenCol: false, Ft: types.NewFieldType(mysql.TypeTinyBlob)},
{ID: 15, IsPKHandle: false, VirtualGenCol: false, Ft: types.NewFieldType(mysql.TypeLongBlob)},
{ID: 16, IsPKHandle: false, VirtualGenCol: false, Ft: types.NewFieldType(mysql.TypeVarString)},
{ID: 17, IsPKHandle: false, VirtualGenCol: false, Ft: types.NewFieldType(mysql.TypeString)},
{ID: 18, IsPKHandle: false, VirtualGenCol: false, Ft: types.NewFieldType(mysql.TypeVarchar)},
{ID: 19, IsPKHandle: false, VirtualGenCol: false, Ft: setElems(types.NewFieldType(mysql.TypeEnum), []string{"a", "b"})},
{ID: 20, IsPKHandle: false, VirtualGenCol: false, Ft: setElems(types.NewFieldType(mysql.TypeSet), []string{"a", "b"})},
{ID: 21, IsPKHandle: false, VirtualGenCol: false, Ft: types.NewFieldType(mysql.TypeTimestamp)},
{ID: 22, IsPKHandle: false, VirtualGenCol: false, Ft: types.NewFieldType(mysql.TypeJSON)},
}

schema, err := ColumnInfoToAvroSchema(table.Table, cols)
c.Assert(err, check.IsNil)
avroCodec, err := goavro.NewCodec(schema)
c.Assert(err, check.IsNil)

r, err := avroEncode(&table, s.encoder.valueSchemaManager, 1, cols, time.Local)
r, err := avroEncode(&table, s.encoder.valueSchemaManager, 1, cols, colInfos, time.Local)
c.Assert(err, check.IsNil)

res, _, err := avroCodec.NativeFromBinary(r.data)
c.Check(err, check.IsNil)
c.Check(res, check.NotNil)
for k, v := range res.(map[string]interface{}) {
if k == "myenum" || k == "myset" {
if vmap, ok := v.(map[string]interface{}); ok {
_, exists := vmap["string"]
c.Check(exists, check.IsTrue)
}
}
}

txt, err := avroCodec.TextualFromNative(nil, res)
c.Check(err, check.IsNil)
Expand All @@ -110,21 +159,6 @@ func (s *avroBatchEncoderSuite) TestAvroEncodeOnly(c *check.C) {

func (s *avroBatchEncoderSuite) TestAvroTimeZone(c *check.C) {
defer testleak.AfterTest(c)()
avroCodec, err := goavro.NewCodec(`
{
"type": "record",
"name": "TestAvroTimeZone",
"fields" : [
{"name": "id", "type": ["null", "int"], "default": null},
{"name": "myint", "type": ["null", "int"], "default": null},
{"name": "mybool", "type": ["null", "int"], "default": null},
{"name": "myfloat", "type": ["null", "float"], "default": null},
{"name": "mystring", "type": ["null", "string"], "default": null},
{"name": "ts", "type": ["null", {"type": "long", "logicalType": "timestamp-millis"}], "default": null}
]
}`)

c.Assert(err, check.IsNil)

table := model.TableName{
Schema: "testdb",
Expand All @@ -135,14 +169,30 @@ func (s *avroBatchEncoderSuite) TestAvroTimeZone(c *check.C) {
c.Check(err, check.IsNil)

timestamp := time.Now()
r, err := avroEncode(&table, s.encoder.valueSchemaManager, 1, []*model.Column{
cols := []*model.Column{
{Name: "id", Value: int64(1), Type: mysql.TypeLong},
{Name: "myint", Value: int64(2), Type: mysql.TypeLong},
{Name: "mybool", Value: int64(1), Type: mysql.TypeTiny},
{Name: "myfloat", Value: float64(3.14), Type: mysql.TypeFloat},
{Name: "mystring", Value: []byte("Hello World"), Type: mysql.TypeBlob},
{Name: "ts", Value: timestamp.In(location).Format(types.TimeFSPFormat), Type: mysql.TypeTimestamp},
}, location)
}

colInfos := []rowcodec.ColInfo{
{ID: 1, IsPKHandle: true, VirtualGenCol: false, Ft: types.NewFieldType(mysql.TypeLong)},
{ID: 2, IsPKHandle: false, VirtualGenCol: false, Ft: types.NewFieldType(mysql.TypeLong)},
{ID: 3, IsPKHandle: false, VirtualGenCol: false, Ft: types.NewFieldType(mysql.TypeTiny)},
{ID: 4, IsPKHandle: false, VirtualGenCol: false, Ft: types.NewFieldType(mysql.TypeFloat)},
{ID: 5, IsPKHandle: false, VirtualGenCol: false, Ft: types.NewFieldType(mysql.TypeBlob)},
{ID: 6, IsPKHandle: false, VirtualGenCol: false, Ft: types.NewFieldType(mysql.TypeTimestamp)},
}

schema, err := ColumnInfoToAvroSchema(table.Table, cols)
c.Assert(err, check.IsNil)
avroCodec, err := goavro.NewCodec(schema)
c.Assert(err, check.IsNil)

r, err := avroEncode(&table, s.encoder.valueSchemaManager, 1, cols, colInfos, location)
c.Assert(err, check.IsNil)

res, _, err := avroCodec.NativeFromBinary(r.data)
Expand Down Expand Up @@ -206,6 +256,13 @@ func (s *avroBatchEncoderSuite) TestAvroEncode(c *check.C) {
{Name: "utiny", Type: mysql.TypeTiny, Flag: model.UnsignedFlag, Value: uint64(100)},
{Name: "comment", Type: mysql.TypeBlob, Value: []byte("测试")},
},
ColInfos: []rowcodec.ColInfo{
{ID: 1, IsPKHandle: true, VirtualGenCol: false, Ft: types.NewFieldType(mysql.TypeLong)},
{ID: 2, IsPKHandle: false, VirtualGenCol: false, Ft: types.NewFieldType(mysql.TypeVarchar)},
{ID: 3, IsPKHandle: false, VirtualGenCol: false, Ft: types.NewFieldType(mysql.TypeTiny)},
{ID: 4, IsPKHandle: false, VirtualGenCol: false, Ft: setFlag(types.NewFieldType(mysql.TypeTiny), uint(model.UnsignedFlag))},
{ID: 5, IsPKHandle: false, VirtualGenCol: false, Ft: types.NewFieldType(mysql.TypeBlob)},
},
}

testCaseDdl := &model.DDLEvent{
Expand Down
4 changes: 0 additions & 4 deletions cdc/sink/codec/canal.go
Original file line number Diff line number Diff line change
Expand Up @@ -243,10 +243,6 @@ func (b *canalEntryBuilder) formatValue(value interface{}, javaType JavaSQLType)
}
result = string(decoded)
}
case types.Enum:
result = strconv.FormatUint(v.Value, 10)
case types.Set:
result = strconv.FormatUint(v.Value, 10)
default:
result = fmt.Sprintf("%v", v)
}
Expand Down
5 changes: 2 additions & 3 deletions cdc/sink/codec/canal_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,6 @@ import (
"github.com/pingcap/check"
mm "github.com/pingcap/tidb/parser/model"
"github.com/pingcap/tidb/parser/mysql"
"github.com/pingcap/tidb/types"
"golang.org/x/text/encoding/charmap"

"github.com/pingcap/tiflow/cdc/model"
Expand Down Expand Up @@ -444,8 +443,8 @@ var testColumnsTable = []*testColumnTuple{
{&model.Column{Name: "time", Type: mysql.TypeDuration, Value: "02:20:20"}, "time", JavaSQLTypeTIME, "02:20:20"},
{&model.Column{Name: "year", Type: mysql.TypeYear, Value: "2020", Flag: model.UnsignedFlag}, "year", JavaSQLTypeVARCHAR, "2020"},

{&model.Column{Name: "enum", Type: mysql.TypeEnum, Value: types.Enum{Name: "v", Value: 1}}, "enum", JavaSQLTypeINTEGER, "1"},
{&model.Column{Name: "set", Type: mysql.TypeSet, Value: types.Set{Name: "v", Value: 3}}, "set", JavaSQLTypeBIT, "3"},
{&model.Column{Name: "enum", Type: mysql.TypeEnum, Value: uint64(1)}, "enum", JavaSQLTypeINTEGER, "1"},
{&model.Column{Name: "set", Type: mysql.TypeSet, Value: uint64(3)}, "set", JavaSQLTypeBIT, "3"},
{&model.Column{Name: "bit", Type: mysql.TypeBit, Value: uint64(65), Flag: model.UnsignedFlag | model.BinaryFlag}, "bit", JavaSQLTypeBIT, "65"},
{&model.Column{Name: "json", Type: mysql.TypeJSON, Value: "{\"key1\": \"value1\"}", Flag: model.BinaryFlag}, "json", JavaSQLTypeVARCHAR, "{\"key1\": \"value1\"}"},
}
Expand Down
7 changes: 1 addition & 6 deletions cdc/sink/codec/craft/encoder.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,6 @@ import (
"unsafe"

"github.com/pingcap/tidb/parser/mysql"
"github.com/pingcap/tidb/types"
"github.com/pingcap/tiflow/cdc/model"
)

Expand Down Expand Up @@ -194,11 +193,7 @@ func EncodeTiDBType(allocator *SliceAllocator, ty byte, flag model.ColumnFlagTyp
case mysql.TypeDate, mysql.TypeDatetime, mysql.TypeNewDate, mysql.TypeTimestamp, mysql.TypeDuration, mysql.TypeJSON, mysql.TypeNewDecimal:
// value type for these mysql types are string
return unsafeStringToBytes(value.(string))
case mysql.TypeEnum:
return encodeUvarint(allocator.byteSlice(binary.MaxVarintLen64)[:0], value.(types.Enum).Value)
case mysql.TypeSet:
return encodeUvarint(allocator.byteSlice(binary.MaxVarintLen64)[:0], value.(types.Set).Value)
case mysql.TypeBit:
case mysql.TypeEnum, mysql.TypeSet, mysql.TypeBit:
// value type for these mysql types are uint64
return encodeUvarint(allocator.byteSlice(binary.MaxVarintLen64)[:0], value.(uint64))
case mysql.TypeString, mysql.TypeVarString, mysql.TypeVarchar,
Expand Down
Loading

0 comments on commit 012c834

Please sign in to comment.