Skip to content

Commit

Permalink
Merge remote-tracking branch 'pingcap/master' into cdc-remove-and-add…
Browse files Browse the repository at this point in the history
…-partitioning
  • Loading branch information
mjonss committed Sep 21, 2023
2 parents 8aeb695 + f5ecaf9 commit 2e0ac9b
Show file tree
Hide file tree
Showing 173 changed files with 4,018 additions and 2,532 deletions.
9 changes: 8 additions & 1 deletion Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -163,6 +163,9 @@ kafka_consumer:
storage_consumer:
$(GOBUILD) -ldflags '$(LDFLAGS)' -o bin/cdc_storage_consumer ./cmd/storage-consumer/main.go

pulsar_consumer:
$(GOBUILD) -ldflags '$(LDFLAGS)' -o bin/cdc_pulsar_consumer ./cmd/pulsar-consumer/main.go

kafka_consumer_image:
@which docker || (echo "docker not found in ${PATH}"; exit 1)
DOCKER_BUILDKIT=1 docker build -f ./deployments/ticdc/docker/kafka-consumer.Dockerfile . -t ticdc:kafka-consumer --platform linux/amd64
Expand All @@ -171,6 +174,10 @@ storage_consumer_image:
@which docker || (echo "docker not found in ${PATH}"; exit 1)
DOCKER_BUILDKIT=1 docker build -f ./deployments/ticdc/docker/storage-consumer.Dockerfile . -t ticdc:storage-consumer --platform linux/amd64

pulsar_consumer_image:
@which docker || (echo "docker not found in ${PATH}"; exit 1)
DOCKER_BUILDKIT=1 docker build -f ./deployments/ticdc/docker/pulsar-consumer.Dockerfile . -t ticdc:pulsar-consumer --platform linux/amd64

filter_helper:
$(GOBUILD) -ldflags '$(LDFLAGS)' -o bin/cdc_filter_helper ./cmd/filter-helper/main.go

Expand Down Expand Up @@ -215,7 +222,7 @@ check_third_party_binary:
@which bin/minio
@which bin/bin/schema-registry-start

integration_test_build: check_failpoint_ctl storage_consumer kafka_consumer
integration_test_build: check_failpoint_ctl storage_consumer kafka_consumer pulsar_consumer
$(FAILPOINT_ENABLE)
$(GOTEST) -ldflags '$(LDFLAGS)' -c -cover -covermode=atomic \
-coverpkg=github.com/pingcap/tiflow/... \
Expand Down
11 changes: 3 additions & 8 deletions cdc/api/v1/api.go
Original file line number Diff line number Diff line change
Expand Up @@ -308,13 +308,6 @@ func (h *OpenAPI) CreateChangefeed(c *gin.Context) {
_ = c.Error(err)
return
}

infoStr, err := info.Marshal()
if err != nil {
_ = c.Error(err)
return
}

upstreamInfo := &model.UpstreamInfo{
ID: up.ID,
PDEndpoints: strings.Join(up.PdEndpoints, ","),
Expand All @@ -331,7 +324,9 @@ func (h *OpenAPI) CreateChangefeed(c *gin.Context) {
return
}

log.Info("Create changefeed successfully!", zap.String("id", changefeedConfig.ID), zap.String("changefeed", infoStr))
log.Info("Create changefeed successfully!",
zap.String("id", changefeedConfig.ID),
zap.String("changefeed", info.String()))
c.Status(http.StatusAccepted)
}

Expand Down
5 changes: 2 additions & 3 deletions cdc/api/v1/validator.go
Original file line number Diff line number Diff line change
Expand Up @@ -120,14 +120,13 @@ func verifyCreateChangefeedConfig(
if err != nil {
return nil, err
}
// set sortEngine and EnableOldValue
// set sortEngine
cdcClusterVer, err := version.GetTiCDCClusterVersion(model.ListVersionsFromCaptureInfos(captureInfos))
if err != nil {
return nil, err
}
sortEngine := model.SortUnified
if !cdcClusterVer.ShouldEnableOldValueByDefault() {
replicaConfig.EnableOldValue = false
if !cdcClusterVer.LessThan500RC() {
log.Warn("The TiCDC cluster is built from unknown branch or less than 5.0.0-rc, the old-value are disabled by default.")
if !cdcClusterVer.ShouldEnableUnifiedSorterByDefault() {
sortEngine = model.SortInMemory
Expand Down
1 change: 0 additions & 1 deletion cdc/api/v2/api_helpers.go
Original file line number Diff line number Diff line change
Expand Up @@ -381,7 +381,6 @@ func (APIV2HelpersImpl) verifyUpdateChangefeedConfig(
if cfg.CertAllowedCN != nil {
newUpInfo.CertAllowedCN = cfg.CertAllowedCN
}

changefeedInfoChanged := diff.Changed(oldInfo, newInfo)
upstreamInfoChanged := diff.Changed(oldUpInfo, newUpInfo)
if !changefeedInfoChanged && !upstreamInfoChanged {
Expand Down
32 changes: 8 additions & 24 deletions cdc/api/v2/api_helpers_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -48,7 +48,6 @@ func TestVerifyCreateChangefeedConfig(t *testing.T) {
})
cfg.ReplicaConfig = GetDefaultReplicaConfig()
cfg.ReplicaConfig.ForceReplicate = true
cfg.ReplicaConfig.EnableOldValue = false
cfg.SinkURI = "mysql://"
ctrl.EXPECT().IsChangefeedExists(gomock.Any(), gomock.Any()).Return(false, nil)
// disable old value but force replicate, and using mysql sink.
Expand All @@ -68,47 +67,44 @@ func TestVerifyCreateChangefeedConfig(t *testing.T) {

// invalid changefeed id or namespace id
cfg.ID = "abdc/sss"
cfInfo, err = h.verifyCreateChangefeedConfig(ctx, cfg, pdClient, ctrl, "en", storage)
_, err = h.verifyCreateChangefeedConfig(ctx, cfg, pdClient, ctrl, "en", storage)
require.NotNil(t, err)
cfg.ID = ""
cfg.Namespace = "abdc/sss"
cfInfo, err = h.verifyCreateChangefeedConfig(ctx, cfg, pdClient, ctrl, "en", storage)
_, err = h.verifyCreateChangefeedConfig(ctx, cfg, pdClient, ctrl, "en", storage)
require.NotNil(t, err)
cfg.ID = ""
cfg.Namespace = ""
// changefeed already exists
ctrl.EXPECT().IsChangefeedExists(gomock.Any(), gomock.Any()).Return(true, nil)
cfInfo, err = h.verifyCreateChangefeedConfig(ctx, cfg, pdClient, ctrl, "en", storage)
_, err = h.verifyCreateChangefeedConfig(ctx, cfg, pdClient, ctrl, "en", storage)
require.NotNil(t, err)
ctrl.EXPECT().IsChangefeedExists(gomock.Any(), gomock.Any()).Return(false, cerror.ErrChangeFeedNotExists.GenWithStackByArgs("aaa"))
cfInfo, err = h.verifyCreateChangefeedConfig(ctx, cfg, pdClient, ctrl, "en", storage)
_, err = h.verifyCreateChangefeedConfig(ctx, cfg, pdClient, ctrl, "en", storage)
require.Nil(t, err)
require.Equal(t, uint64(123), cfInfo.UpstreamID)
cfg.TargetTs = 3
cfg.StartTs = 4
ctrl.EXPECT().IsChangefeedExists(gomock.Any(), gomock.Any()).Return(false, nil)
cfInfo, err = h.verifyCreateChangefeedConfig(ctx, cfg, pdClient, ctrl, "en", storage)
_, err = h.verifyCreateChangefeedConfig(ctx, cfg, pdClient, ctrl, "en", storage)
require.NotNil(t, err)
cfg.TargetTs = 6
cfg.ReplicaConfig.EnableOldValue = false
cfg.SinkURI = "aaab://"
ctrl.EXPECT().IsChangefeedExists(gomock.Any(), gomock.Any()).Return(false, nil)
cfInfo, err = h.verifyCreateChangefeedConfig(ctx, cfg, pdClient, ctrl, "en", storage)
_, err = h.verifyCreateChangefeedConfig(ctx, cfg, pdClient, ctrl, "en", storage)
require.NotNil(t, err)
cfg.SinkURI = string([]byte{0x7f, ' '})
ctrl.EXPECT().IsChangefeedExists(gomock.Any(), gomock.Any()).Return(false, nil)
cfInfo, err = h.verifyCreateChangefeedConfig(ctx, cfg, pdClient, ctrl, "en", storage)
_, err = h.verifyCreateChangefeedConfig(ctx, cfg, pdClient, ctrl, "en", storage)
require.NotNil(t, err)

cfg.StartTs = 0
// use blackhole to workaround
cfg.SinkURI = "blackhole://127.0.0.1:9092/test?protocol=avro"
cfg.ReplicaConfig.EnableOldValue = true
cfg.ReplicaConfig.ForceReplicate = false
ctrl.EXPECT().IsChangefeedExists(gomock.Any(), gomock.Any()).Return(false, nil)
cfInfo, err = h.verifyCreateChangefeedConfig(ctx, cfg, pdClient, ctrl, "en", storage)
_, err = h.verifyCreateChangefeedConfig(ctx, cfg, pdClient, ctrl, "en", storage)
require.NoError(t, err)
require.False(t, cfInfo.Config.EnableOldValue)

cfg.ReplicaConfig.ForceReplicate = true
ctrl.EXPECT().IsChangefeedExists(gomock.Any(), gomock.Any()).Return(false, nil)
Expand Down Expand Up @@ -165,16 +161,4 @@ func TestVerifyUpdateChangefeedConfig(t *testing.T) {
cfg.TargetTs = 9
newCfInfo, newUpInfo, err = h.verifyUpdateChangefeedConfig(ctx, cfg, oldInfo, oldUpInfo, storage, 0)
require.NotNil(t, err)

cfg.StartTs = 0
cfg.TargetTs = 0
cfg.ReplicaConfig.EnableOldValue = true
cfg.SinkURI = "blackhole://127.0.0.1:9092/test?protocol=avro"
newCfInfo, newUpInfo, err = h.verifyUpdateChangefeedConfig(ctx, cfg, oldInfo, oldUpInfo, storage, 0)
require.NoError(t, err)
require.False(t, newCfInfo.Config.EnableOldValue)

cfg.ReplicaConfig.ForceReplicate = true
newCfInfo, newUpInfo, err = h.verifyUpdateChangefeedConfig(ctx, cfg, oldInfo, oldUpInfo, storage, 0)
require.Error(t, cerror.ErrOldValueNotEnabled, err)
}
75 changes: 39 additions & 36 deletions cdc/api/v2/changefeed.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ package v2

import (
"context"
"fmt"
"net/http"
"sort"
"strings"
Expand All @@ -23,16 +24,17 @@ import (
"github.com/gin-gonic/gin"
"github.com/pingcap/errors"
"github.com/pingcap/log"
tidbkv "github.com/pingcap/tidb/kv"
"github.com/pingcap/tiflow/cdc/api"
"github.com/pingcap/tiflow/cdc/capture"
"github.com/pingcap/tiflow/cdc/model"
cerror "github.com/pingcap/tiflow/pkg/errors"
"github.com/pingcap/tiflow/pkg/retry"
"github.com/pingcap/tiflow/pkg/security"
"github.com/pingcap/tiflow/pkg/txnutil/gc"
"github.com/pingcap/tiflow/pkg/upstream"
"github.com/pingcap/tiflow/pkg/util"
"github.com/tikv/client-go/v2/oracle"
pd "github.com/tikv/pd/client"
clientv3 "go.etcd.io/etcd/client/v3"
"go.uber.org/zap"
)
Expand Down Expand Up @@ -135,12 +137,6 @@ func (h *OpenAPIV2) createChangefeed(c *gin.Context) {
CAPath: cfg.CAPath,
CertAllowedCN: cfg.CertAllowedCN,
}
infoStr, err := info.Marshal()
if err != nil {
needRemoveGCSafePoint = true
_ = c.Error(cerror.WrapError(cerror.ErrAPIInvalidParam, err))
return
}

// cannot create changefeed if there are running lightning/restore tasks
tlsCfg, err := credential.ToTLSConfig()
Expand Down Expand Up @@ -176,7 +172,7 @@ func (h *OpenAPIV2) createChangefeed(c *gin.Context) {

log.Info("Create changefeed successfully!",
zap.String("id", info.ID),
zap.String("changefeed", infoStr))
zap.String("changefeed", info.String()))
c.JSON(http.StatusOK, toAPIModel(info,
info.StartTs, info.StartTs,
nil, true))
Expand Down Expand Up @@ -446,26 +442,31 @@ func (h *OpenAPIV2) updateChangefeed(c *gin.Context) {
zap.String("changefeedInfo", oldCfInfo.String()),
zap.Any("upstreamInfo", OldUpInfo))

var pdAddrs []string
var credentials *security.Credential
if OldUpInfo != nil {
pdAddrs = strings.Split(OldUpInfo.PDEndpoints, ",")
credentials = &security.Credential{
CAPath: OldUpInfo.CAPath,
CertPath: OldUpInfo.CertPath,
KeyPath: OldUpInfo.KeyPath,
CertAllowedCN: OldUpInfo.CertAllowedCN,
}
}
if len(updateCfConfig.PDAddrs) != 0 {
pdAddrs = updateCfConfig.PDAddrs
credentials = updateCfConfig.PDConfig.toCredential()
upManager, err := h.capture.GetUpstreamManager()
if err != nil {
_ = c.Error(err)
return
}

storage, err := h.helpers.createTiStore(pdAddrs, credentials)
if err != nil {
_ = c.Error(errors.Trace(err))
var storage tidbkv.Storage
// if PDAddrs is not empty, use it to create a new kvstore
// Note: upManager is nil in some unit test cases
if len(updateCfConfig.PDAddrs) != 0 || upManager == nil {
pdAddrs := updateCfConfig.PDAddrs
credentials := updateCfConfig.PDConfig.toCredential()
storage, err = h.helpers.createTiStore(pdAddrs, credentials)
if err != nil {
_ = c.Error(errors.Trace(err))
}
} else { // get the upstream of the changefeed to get the kvstore
up, ok := upManager.Get(oldCfInfo.UpstreamID)
if !ok {
_ = c.Error(errors.New(fmt.Sprintf("upstream %d not found", oldCfInfo.UpstreamID)))
return
}
storage = up.KVStorage
}

newCfInfo, newUpInfo, err := h.helpers.verifyUpdateChangefeedConfig(ctx,
updateCfConfig, oldCfInfo, OldUpInfo, storage, cfStatus.CheckpointTs)
if err != nil {
Expand Down Expand Up @@ -719,24 +720,26 @@ func (h *OpenAPIV2) resumeChangefeed(c *gin.Context) {
return
}

var pdClient pd.Client
// if PDAddrs is empty, use the default pdClient
if len(cfg.PDAddrs) == 0 {
up, err := getCaptureDefaultUpstream(h.capture)
if err != nil {
_ = c.Error(err)
return
}
cfg.PDConfig = getUpstreamPDConfig(up)
}
credential := cfg.PDConfig.toCredential()

timeoutCtx, cancel := context.WithTimeout(ctx, 30*time.Second)
defer cancel()
pdClient, err := h.helpers.getPDClient(timeoutCtx, cfg.PDAddrs, credential)
if err != nil {
_ = c.Error(cerror.WrapError(cerror.ErrAPIInvalidParam, err))
return
pdClient = up.PDClient
} else {
credential := cfg.PDConfig.toCredential()
timeoutCtx, cancel := context.WithTimeout(ctx, 30*time.Second)
defer cancel()
pdClient, err = h.helpers.getPDClient(timeoutCtx, cfg.PDAddrs, credential)
if err != nil {
_ = c.Error(cerror.WrapError(cerror.ErrAPIInvalidParam, err))
return
}
defer pdClient.Close()
}
defer pdClient.Close()

if err := h.helpers.verifyResumeChangefeedConfig(
ctx,
Expand Down
18 changes: 11 additions & 7 deletions cdc/api/v2/changefeed_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -333,14 +333,14 @@ func TestUpdateChangefeed(t *testing.T) {
t.Parallel()
update := testCase{url: "/api/v2/changefeeds/%s", method: "PUT"}
helpers := NewMockAPIV2Helpers(gomock.NewController(t))
cp := mock_capture.NewMockCapture(gomock.NewController(t))
apiV2 := NewOpenAPIV2ForTest(cp, helpers)
mockCapture := mock_capture.NewMockCapture(gomock.NewController(t))
apiV2 := NewOpenAPIV2ForTest(mockCapture, helpers)
router := newRouter(apiV2)

statusProvider := &mockStatusProvider{}
cp.EXPECT().StatusProvider().Return(statusProvider).AnyTimes()
cp.EXPECT().IsReady().Return(true).AnyTimes()
cp.EXPECT().IsController().Return(true).AnyTimes()
mockCapture.EXPECT().StatusProvider().Return(statusProvider).AnyTimes()
mockCapture.EXPECT().IsReady().Return(true).AnyTimes()
mockCapture.EXPECT().IsController().Return(true).AnyTimes()

// case 1 invalid id
invalidID := "Invalid_#"
Expand Down Expand Up @@ -394,7 +394,7 @@ func TestUpdateChangefeed(t *testing.T) {
etcdClient.EXPECT().
GetUpstreamInfo(gomock.Any(), gomock.Eq(uint64(100)), gomock.Any()).
Return(nil, cerrors.ErrUpstreamNotFound).Times(1)
cp.EXPECT().GetEtcdClient().Return(etcdClient).AnyTimes()
mockCapture.EXPECT().GetEtcdClient().Return(etcdClient).AnyTimes()

w = httptest.NewRecorder()
req, _ = http.NewRequestWithContext(context.Background(), update.method,
Expand All @@ -411,7 +411,7 @@ func TestUpdateChangefeed(t *testing.T) {
etcdClient.EXPECT().
GetUpstreamInfo(gomock.Any(), gomock.Eq(uint64(1)), gomock.Any()).
Return(nil, nil).AnyTimes()
cp.EXPECT().GetEtcdClient().Return(etcdClient).AnyTimes()
mockCapture.EXPECT().GetEtcdClient().Return(etcdClient).AnyTimes()

w = httptest.NewRecorder()
req, _ = http.NewRequestWithContext(context.Background(), update.method,
Expand Down Expand Up @@ -448,6 +448,7 @@ func TestUpdateChangefeed(t *testing.T) {
createTiStore(gomock.Any(), gomock.Any()).
Return(nil, nil).
AnyTimes()
mockCapture.EXPECT().GetUpstreamManager().Return(nil, nil).AnyTimes()
helpers.EXPECT().
verifyUpdateChangefeedConfig(gomock.Any(), gomock.Any(), gomock.Any(), gomock.Any(), gomock.Any(), gomock.Any()).
Return(&model.ChangeFeedInfo{}, &model.UpstreamInfo{}, cerrors.ErrChangefeedUpdateRefused).
Expand All @@ -467,6 +468,7 @@ func TestUpdateChangefeed(t *testing.T) {
require.Equal(t, http.StatusBadRequest, w.Code)

// case 7: update transaction failed
mockCapture.EXPECT().GetUpstreamManager().Return(upstream.NewManager4Test(&mockPDClient{}), nil).AnyTimes()
helpers.EXPECT().
verifyUpdateChangefeedConfig(gomock.Any(), gomock.Any(), gomock.Any(), gomock.Any(), gomock.Any(), gomock.Any()).
Return(&model.ChangeFeedInfo{}, &model.UpstreamInfo{}, nil).
Expand All @@ -490,6 +492,7 @@ func TestUpdateChangefeed(t *testing.T) {
verifyUpdateChangefeedConfig(gomock.Any(), gomock.Any(), gomock.Any(), gomock.Any(), gomock.Any(), gomock.Any()).
Return(oldCfInfo, &model.UpstreamInfo{}, nil).
Times(1)
mockCapture.EXPECT().GetUpstreamManager().Return(upstream.NewManager4Test(&mockPDClient{}), nil).AnyTimes()
etcdClient.EXPECT().
UpdateChangefeedAndUpstream(gomock.Any(), gomock.Any(), gomock.Any(), gomock.Any()).
Return(nil).Times(1)
Expand All @@ -506,6 +509,7 @@ func TestUpdateChangefeed(t *testing.T) {
verifyUpdateChangefeedConfig(gomock.Any(), gomock.Any(), gomock.Any(), gomock.Any(), gomock.Any(), gomock.Any()).
Return(oldCfInfo, &model.UpstreamInfo{}, nil).
Times(1)
mockCapture.EXPECT().GetUpstreamManager().Return(upstream.NewManager4Test(&mockPDClient{}), nil).AnyTimes()
etcdClient.EXPECT().
UpdateChangefeedAndUpstream(gomock.Any(), gomock.Any(), gomock.Any(), gomock.Any()).
Return(nil).Times(1)
Expand Down
3 changes: 0 additions & 3 deletions cdc/api/v2/model.go
Original file line number Diff line number Diff line change
Expand Up @@ -177,7 +177,6 @@ func (d *JSONDuration) UnmarshalJSON(b []byte) error {
type ReplicaConfig struct {
MemoryQuota uint64 `json:"memory_quota"`
CaseSensitive bool `json:"case_sensitive"`
EnableOldValue bool `json:"enable_old_value"`
ForceReplicate bool `json:"force_replicate"`
IgnoreIneligibleTable bool `json:"ignore_ineligible_table"`
CheckGCSafePoint bool `json:"check_gc_safe_point"`
Expand Down Expand Up @@ -206,7 +205,6 @@ func (c *ReplicaConfig) toInternalReplicaConfigWithOriginConfig(
) *config.ReplicaConfig {
res.MemoryQuota = c.MemoryQuota
res.CaseSensitive = c.CaseSensitive
res.EnableOldValue = c.EnableOldValue
res.ForceReplicate = c.ForceReplicate
res.CheckGCSafePoint = c.CheckGCSafePoint
res.EnableSyncPoint = c.EnableSyncPoint
Expand Down Expand Up @@ -487,7 +485,6 @@ func ToAPIReplicaConfig(c *config.ReplicaConfig) *ReplicaConfig {
res := &ReplicaConfig{
MemoryQuota: cloned.MemoryQuota,
CaseSensitive: cloned.CaseSensitive,
EnableOldValue: cloned.EnableOldValue,
ForceReplicate: cloned.ForceReplicate,
IgnoreIneligibleTable: cloned.IgnoreIneligibleTable,
CheckGCSafePoint: cloned.CheckGCSafePoint,
Expand Down
Loading

0 comments on commit 2e0ac9b

Please sign in to comment.