Skip to content

Commit

Permalink
This is an automated cherry-pick of pingcap#10901
Browse files Browse the repository at this point in the history
Signed-off-by: ti-chi-bot <ti-community-prow-bot@tidb.io>
  • Loading branch information
3AceShowHand authored and ti-chi-bot committed Apr 28, 2024
1 parent 4757596 commit 4268a83
Show file tree
Hide file tree
Showing 2 changed files with 209 additions and 0 deletions.
17 changes: 17 additions & 0 deletions pkg/sink/codec/avro/avro.go
Original file line number Diff line number Diff line change
Expand Up @@ -80,6 +80,23 @@ func (a *BatchEncoder) AppendRowChangedEvent(
e *model.RowChangedEvent,
callback func(),
) error {
<<<<<<< HEAD
=======
topic = sanitizeTopic(topic)

key, err := a.encodeKey(ctx, topic, e)
if err != nil {
log.Error("avro encoding key failed", zap.Error(err), zap.Any("event", e))
return errors.Trace(err)
}

value, err := a.encodeValue(ctx, topic, e)
if err != nil {
log.Error("avro encoding value failed", zap.Error(err), zap.Any("event", e))
return errors.Trace(err)
}

>>>>>>> ba6db077a8 (codec(ticdc): add more logs to the avro encoding to help detect potential key encoding failed issure (#10901))
message := common.NewMsg(
config.ProtocolAvro,
nil,
Expand Down
192 changes: 192 additions & 0 deletions pkg/sink/codec/avro/decoder_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,192 @@
// Copyright 2020 PingCAP, Inc.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// See the License for the specific language governing permissions and
// limitations under the License.

package avro

import (
"context"
"math/rand"
"testing"
"time"

timodel "github.com/pingcap/tidb/pkg/parser/model"
"github.com/pingcap/tiflow/cdc/entry"
"github.com/pingcap/tiflow/cdc/model"
"github.com/pingcap/tiflow/pkg/config"
"github.com/pingcap/tiflow/pkg/sink/codec/common"
"github.com/stretchr/testify/require"
)

func TestDMLEventE2E(t *testing.T) {
helper := entry.NewSchemaTestHelper(t)
defer helper.Close()

helper.Tk().MustExec("use test")

ctx, cancel := context.WithCancel(context.Background())
defer cancel()

codecConfig := common.NewConfig(config.ProtocolAvro)
codecConfig.EnableTiDBExtension = true
encoder, err := SetupEncoderAndSchemaRegistry4Testing(ctx, codecConfig)
defer TeardownEncoderAndSchemaRegistry4Testing()
require.NoError(t, err)
require.NotNil(t, encoder)

_ = helper.DDL2Event(`create table t(a varchar(64) not null, b varchar(64) default null, primary key(a))`)

event := helper.DML2Event(`insert into t values('a', 'b')`, "test", "t")

topic := "avro-test-topic"
err = encoder.AppendRowChangedEvent(ctx, topic, event, func() {})
require.NoError(t, err)

event = helper.DML2Event(`insert into t(a) values ('b')`, "test", "t")
err = encoder.AppendRowChangedEvent(ctx, topic, event, func() {})
require.NoError(t, err)

event = helper.DML2Event(`insert into t(a) values ('')`, "test", "t")
err = encoder.AppendRowChangedEvent(ctx, topic, event, func() {})
require.NoError(t, err)
}

func TestDecodeEvent(t *testing.T) {
codecConfig := common.NewConfig(config.ProtocolAvro)
codecConfig.EnableTiDBExtension = true
ctx, cancel := context.WithCancel(context.Background())
defer cancel()

encoder, err := SetupEncoderAndSchemaRegistry4Testing(ctx, codecConfig)
defer TeardownEncoderAndSchemaRegistry4Testing()
require.NoError(t, err)
require.NotNil(t, encoder)

event := newLargeEvent()
colInfos := event.TableInfo.GetColInfosForRowChangedEvent()

rand.New(rand.NewSource(time.Now().Unix())).Shuffle(len(event.Columns), func(i, j int) {
event.Columns[i], event.Columns[j] = event.Columns[j], event.Columns[i]
colInfos[i], colInfos[j] = colInfos[j], colInfos[i]
})

topic := "avro-test-topic"
err = encoder.AppendRowChangedEvent(ctx, topic, event, func() {})
require.NoError(t, err)

messages := encoder.Build()
require.Len(t, messages, 1)
message := messages[0]

schemaM, err := NewConfluentSchemaManager(ctx, "http://127.0.0.1:8081", nil)
require.NoError(t, err)

decoder := NewDecoder(codecConfig, schemaM, topic)
err = decoder.AddKeyValue(message.Key, message.Value)
require.NoError(t, err)

messageType, exist, err := decoder.HasNext()
require.NoError(t, err)
require.True(t, exist)
require.Equal(t, model.MessageTypeRow, messageType)

decodedEvent, err := decoder.NextRowChangedEvent()
require.NoError(t, err)
require.NotNil(t, decodedEvent)
}

func TestDecodeDDLEvent(t *testing.T) {
t.Parallel()

config := &common.Config{
EnableTiDBExtension: true,
AvroEnableWatermark: true,
}

encoder := &BatchEncoder{
namespace: model.DefaultNamespace,
result: make([]*common.Message, 0, 1),
config: config,
}

message, err := encoder.EncodeDDLEvent(&model.DDLEvent{
StartTs: 1020,
CommitTs: 1030,
TableInfo: &model.TableInfo{
TableName: model.TableName{
Schema: "test",
Table: "t1",
TableID: 0,
IsPartition: false,
},
},
Type: timodel.ActionAddColumn,
Query: "ALTER TABLE test.t1 ADD COLUMN a int",
})
require.NoError(t, err)
require.NotNil(t, message)

topic := "test-topic"
decoder := NewDecoder(config, nil, topic)
err = decoder.AddKeyValue(message.Key, message.Value)
require.NoError(t, err)

messageType, exist, err := decoder.HasNext()
require.NoError(t, err)
require.True(t, exist)
require.Equal(t, model.MessageTypeDDL, messageType)

decodedEvent, err := decoder.NextDDLEvent()
require.NoError(t, err)
require.NotNil(t, decodedEvent)
require.Equal(t, uint64(1030), decodedEvent.CommitTs)
require.Equal(t, timodel.ActionAddColumn, decodedEvent.Type)
require.Equal(t, "ALTER TABLE test.t1 ADD COLUMN a int", decodedEvent.Query)
require.Equal(t, "test", decodedEvent.TableInfo.TableName.Schema)
require.Equal(t, "t1", decodedEvent.TableInfo.TableName.Table)
require.Equal(t, int64(0), decodedEvent.TableInfo.TableName.TableID)
require.False(t, decodedEvent.TableInfo.TableName.IsPartition)
}

func TestDecodeResolvedEvent(t *testing.T) {
t.Parallel()

config := &common.Config{
EnableTiDBExtension: true,
AvroEnableWatermark: true,
}

encoder := &BatchEncoder{
namespace: model.DefaultNamespace,
config: config,
result: make([]*common.Message, 0, 1),
}

resolvedTs := uint64(1591943372224)
message, err := encoder.EncodeCheckpointEvent(resolvedTs)
require.NoError(t, err)
require.NotNil(t, message)

topic := "test-topic"
decoder := NewDecoder(config, nil, topic)
err = decoder.AddKeyValue(message.Key, message.Value)
require.NoError(t, err)

messageType, exist, err := decoder.HasNext()
require.NoError(t, err)
require.True(t, exist)
require.Equal(t, model.MessageTypeResolved, messageType)

obtained, err := decoder.NextResolvedEvent()
require.NoError(t, err)
require.Equal(t, resolvedTs, obtained)
}

0 comments on commit 4268a83

Please sign in to comment.