Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

TiCDC support checking if data is entirely replicated to Downstream (#10133) #10336

Merged
Merged
Show file tree
Hide file tree
Changes from 7 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
7 changes: 7 additions & 0 deletions cdc/api/v1/api_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -99,6 +99,13 @@ func (p *mockStatusProvider) GetCaptures(ctx context.Context) ([]*model.CaptureI
return args.Get(0).([]*model.CaptureInfo), args.Error(1)
}

func (p *mockStatusProvider) GetChangeFeedSyncedStatus(ctx context.Context,
changefeedID model.ChangeFeedID,
) (*model.ChangeFeedSyncedStatusForAPI, error) {
args := p.Called(ctx)
return args.Get(0).(*model.ChangeFeedSyncedStatusForAPI), args.Error(1)
}

func (p *mockStatusProvider) IsHealthy(ctx context.Context) (bool, error) {
args := p.Called(ctx)
return args.Get(0).(bool), args.Error(1)
Expand Down
1 change: 1 addition & 0 deletions cdc/api/v2/api.go
Original file line number Diff line number Diff line change
Expand Up @@ -59,6 +59,7 @@ func RegisterOpenAPIV2Routes(router *gin.Engine, api OpenAPIV2) {
changefeedGroup.POST("/:changefeed_id/resume", api.resumeChangefeed)
changefeedGroup.POST("/:changefeed_id/pause", api.pauseChangefeed)
changefeedGroup.GET("/:changefeed_id/status", api.status)
changefeedGroup.GET("/:changefeed_id/synced", api.synced)

// capture apis
captureGroup := v2.Group("/captures")
Expand Down
27 changes: 18 additions & 9 deletions cdc/api/v2/api_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -48,8 +48,8 @@ func (m *mockPDClient) UpdateServiceGCSafePoint(ctx context.Context,
}

// GetTS of mockPDClient returns a mock tso
func (m *mockPDClient) GetTS(ctx context.Context) (int64, int64, error) {
return m.logicTime, m.timestamp, nil
func (c *mockPDClient) GetTS(ctx context.Context) (int64, int64, error) {
return c.timestamp, c.logicTime, nil
}

// GetClusterID of mockPDClient returns a mock ClusterID
Expand All @@ -62,13 +62,14 @@ func (c *mockPDClient) Close() {}

type mockStatusProvider struct {
owner.StatusProvider
changefeedStatus *model.ChangeFeedStatusForAPI
changefeedInfo *model.ChangeFeedInfo
processors []*model.ProcInfoSnap
taskStatus map[model.CaptureID]*model.TaskStatus
changefeedInfos map[model.ChangeFeedID]*model.ChangeFeedInfo
changefeedStatuses map[model.ChangeFeedID]*model.ChangeFeedStatusForAPI
err error
changefeedStatus *model.ChangeFeedStatusForAPI
changefeedInfo *model.ChangeFeedInfo
processors []*model.ProcInfoSnap
taskStatus map[model.CaptureID]*model.TaskStatus
changefeedInfos map[model.ChangeFeedID]*model.ChangeFeedInfo
changefeedStatuses map[model.ChangeFeedID]*model.ChangeFeedStatusForAPI
changeFeedSyncedStatus *model.ChangeFeedSyncedStatusForAPI
err error
}

// GetChangeFeedStatus returns a changefeeds' runtime status.
Expand Down Expand Up @@ -119,3 +120,11 @@ func (m *mockStatusProvider) GetAllChangeFeedStatuses(_ context.Context) (
) {
return m.changefeedStatuses, m.err
}

// GetChangeFeedSyncedStatus returns a mock changefeed status.
func (m *mockStatusProvider) GetChangeFeedSyncedStatus(_ context.Context, changefeedID model.ChangeFeedID) (
*model.ChangeFeedSyncedStatusForAPI,
error,
) {
return m.changeFeedSyncedStatus, m.err
}
137 changes: 137 additions & 0 deletions cdc/api/v2/changefeed.go
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,8 @@ const (
apiOpVarChangefeedState = "state"
// apiOpVarChangefeedID is the key of changefeed ID in HTTP API
apiOpVarChangefeedID = "changefeed_id"
// timeout for pd client
timeout = 30 * time.Second
)

// createChangefeed handles create changefeed request,
Expand Down Expand Up @@ -749,6 +751,141 @@ func (h *OpenAPIV2) status(c *gin.Context) {
})
}

// synced get the synced status of a changefeed
// @Summary Get synced status
// @Description get the synced status of a changefeed
// @Tags changefeed,v2
// @Accept json
// @Produce json
// @Param changefeed_id path string true "changefeed_id"
// @Param namespace query string false "default"
// @Success 200 {object} SyncedStatus
// @Failure 500,400 {object} model.HTTPError
// @Router /api/v2/changefeeds/{changefeed_id}/synced [get]
func (h *OpenAPIV2) synced(c *gin.Context) {
ctx := c.Request.Context()

changefeedID := model.DefaultChangeFeedID(c.Param(apiOpVarChangefeedID))
if err := model.ValidateChangefeedID(changefeedID.ID); err != nil {
_ = c.Error(cerror.ErrAPIInvalidParam.GenWithStack("invalid changefeed_id: %s",
changefeedID.ID))
return
}

status, err := h.capture.StatusProvider().GetChangeFeedSyncedStatus(ctx, changefeedID)
if err != nil {
_ = c.Error(err)
return
}

log.Info("Get changefeed synced status:", zap.Any("status", status), zap.Any("changefeedID", changefeedID))

cfg := &ChangefeedConfig{ReplicaConfig: GetDefaultReplicaConfig()}
if (status.SyncedCheckInterval != 0) && (status.CheckpointInterval != 0) {
cfg.ReplicaConfig.SyncedStatus.CheckpointInterval = status.CheckpointInterval
cfg.ReplicaConfig.SyncedStatus.SyncedCheckInterval = status.SyncedCheckInterval
}

// try to get pd client to get pd time, and determine synced status based on the pd time
if len(cfg.PDAddrs) == 0 {
up, err := getCaptureDefaultUpstream(h.capture)
if err != nil {
_ = c.Error(err)
return
}
cfg.PDConfig = getUpstreamPDConfig(up)
}
credential := cfg.PDConfig.toCredential()

timeoutCtx, cancel := context.WithTimeout(ctx, timeout)
defer cancel()

pdClient, err := h.helpers.getPDClient(timeoutCtx, cfg.PDAddrs, credential)
if err != nil {
// case 1. we can't get pd client, pd may be unavailable.
// if pullerResolvedTs - checkpointTs > checkpointInterval, data is not synced
// otherwise, if pd is unavailable, we decide data whether is synced based on
// the time difference between current time and lastSyncedTs.
var message string
if (oracle.ExtractPhysical(status.PullerResolvedTs) - oracle.ExtractPhysical(status.CheckpointTs)) >
cfg.ReplicaConfig.SyncedStatus.CheckpointInterval*1000 {
message = fmt.Sprintf("%s. Besides the data is not finish syncing", err.Error())
} else {
message = fmt.Sprintf("%s. You should check the pd status first. If pd status is normal, means we don't finish sync data. "+
"If pd is offline, please check whether we satisfy the condition that "+
"the time difference from lastSyncedTs to the current time from the time zone of pd is greater than %v secs. "+
"If it's satisfied, means the data syncing is totally finished", err, cfg.ReplicaConfig.SyncedStatus.SyncedCheckInterval)
}
c.JSON(http.StatusOK, SyncedStatus{
Synced: false,
SinkCheckpointTs: model.JSONTime(oracle.GetTimeFromTS(status.CheckpointTs)),
PullerResolvedTs: model.JSONTime(oracle.GetTimeFromTS(status.PullerResolvedTs)),
LastSyncedTs: model.JSONTime(oracle.GetTimeFromTS(status.LastSyncedTs)),
NowTs: model.JSONTime(time.Unix(0, 0)),
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why not use the local time here?

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Because now_ts should be the time from pd, not locally. Here we can't get pd time, so we don't provider other local time, which may be misleading.

Info: message,
})
return
}
defer pdClient.Close()
// get time from pd
physicalNow, _, _ := pdClient.GetTS(ctx)

// We can normally get pd time. Thus we determine synced status based on physicalNow, lastSyncedTs, checkpointTs and pullerResolvedTs
if (physicalNow-oracle.ExtractPhysical(status.LastSyncedTs) > cfg.ReplicaConfig.SyncedStatus.SyncedCheckInterval*1000) &&
(physicalNow-oracle.ExtractPhysical(status.CheckpointTs) < cfg.ReplicaConfig.SyncedStatus.CheckpointInterval*1000) {
// case 2: If physcialNow - lastSyncedTs > SyncedCheckInterval && physcialNow - CheckpointTs < CheckpointInterval
// --> reach strict synced status
c.JSON(http.StatusOK, SyncedStatus{
Synced: true,
SinkCheckpointTs: model.JSONTime(oracle.GetTimeFromTS(status.CheckpointTs)),
PullerResolvedTs: model.JSONTime(oracle.GetTimeFromTS(status.PullerResolvedTs)),
LastSyncedTs: model.JSONTime(oracle.GetTimeFromTS(status.LastSyncedTs)),
NowTs: model.JSONTime(time.Unix(physicalNow/1e3, 0)),
Info: "Data syncing is finished",
})
return
}

if physicalNow-oracle.ExtractPhysical(status.LastSyncedTs) > cfg.ReplicaConfig.SyncedStatus.SyncedCheckInterval*1000 {
// case 3: If physcialNow - lastSyncedTs > SyncedCheckInterval && physcialNow - CheckpointTs > CheckpointInterval
// we should consider the situation that pd or tikv region is not healthy to block the advancing resolveTs.
// if pullerResolvedTs - checkpointTs > CheckpointInterval--> data is not synced
// otherwise, if pd & tikv is healthy --> data is not synced
// if not healthy --> data is synced
var message string
if (oracle.ExtractPhysical(status.PullerResolvedTs) - oracle.ExtractPhysical(status.CheckpointTs)) <
cfg.ReplicaConfig.SyncedStatus.CheckpointInterval*1000 {
message = fmt.Sprintf("Please check whether PD is online and TiKV Regions are all available. " +
"If PD is offline or some TiKV regions are not available, it means that the data syncing process is complete. " +
"To check whether TiKV regions are all available, " +
"you can view 'TiKV-Details' > 'Resolved-Ts' > 'Max Leader Resolved TS gap' on Grafana. " +
"If the gap is large, such as a few minutes, it means that some regions in TiKV are unavailable. " +
"Otherwise, if the gap is small and PD is online, it means the data syncing is incomplete, so please wait")
} else {
message = "The data syncing is not finished, please wait"
}
c.JSON(http.StatusOK, SyncedStatus{
Synced: false,
SinkCheckpointTs: model.JSONTime(oracle.GetTimeFromTS(status.CheckpointTs)),
PullerResolvedTs: model.JSONTime(oracle.GetTimeFromTS(status.PullerResolvedTs)),
LastSyncedTs: model.JSONTime(oracle.GetTimeFromTS(status.LastSyncedTs)),
NowTs: model.JSONTime(time.Unix(physicalNow/1e3, 0)),
Info: message,
})
return
}

// case 4: If physcialNow - lastSyncedTs < SyncedCheckInterval --> data is not synced
c.JSON(http.StatusOK, SyncedStatus{
Synced: false,
SinkCheckpointTs: model.JSONTime(oracle.GetTimeFromTS(status.CheckpointTs)),
PullerResolvedTs: model.JSONTime(oracle.GetTimeFromTS(status.PullerResolvedTs)),
LastSyncedTs: model.JSONTime(oracle.GetTimeFromTS(status.LastSyncedTs)),
NowTs: model.JSONTime(time.Unix(physicalNow/1e3, 0)),
Info: "The data syncing is not finished, please wait",
})
}

func toAPIModel(
info *model.ChangeFeedInfo,
resolvedTs uint64,
Expand Down
Loading
Loading