Skip to content

Commit

Permalink
This is an automated cherry-pick of pingcap#10133
Browse files Browse the repository at this point in the history
Signed-off-by: ti-chi-bot <ti-community-prow-bot@tidb.io>
  • Loading branch information
hongyunyan authored and ti-chi-bot committed Jan 29, 2024
1 parent 84c2df7 commit 6b25733
Show file tree
Hide file tree
Showing 37 changed files with 1,537 additions and 170 deletions.
7 changes: 7 additions & 0 deletions cdc/api/v1/api_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -99,6 +99,13 @@ func (p *mockStatusProvider) GetCaptures(ctx context.Context) ([]*model.CaptureI
return args.Get(0).([]*model.CaptureInfo), args.Error(1)
}

// GetChangeFeedSyncedStatus returns the canned synced status configured on the
// mock: the first testify Called() return value cast to
// *model.ChangeFeedSyncedStatusForAPI, paired with the recorded error.
func (p *mockStatusProvider) GetChangeFeedSyncedStatus(ctx context.Context,
changefeedID model.ChangeFeedID,
) (*model.ChangeFeedSyncedStatusForAPI, error) {
args := p.Called(ctx)
return args.Get(0).(*model.ChangeFeedSyncedStatusForAPI), args.Error(1)
}

func (p *mockStatusProvider) IsHealthy(ctx context.Context) (bool, error) {
args := p.Called(ctx)
return args.Get(0).(bool), args.Error(1)
Expand Down
13 changes: 13 additions & 0 deletions cdc/api/v2/api.go
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,7 @@ func RegisterOpenAPIV2Routes(router *gin.Engine, api OpenAPIV2) {

// changefeed apis
changefeedGroup := v2.Group("/changefeeds")
<<<<<<< HEAD
changefeedGroup.Use(middleware.ForwardToOwnerMiddleware(api.capture))
changefeedGroup.GET("/:changefeed_id", api.getChangeFeed)
changefeedGroup.POST("", api.createChangefeed)
Expand All @@ -59,6 +60,18 @@ func RegisterOpenAPIV2Routes(router *gin.Engine, api OpenAPIV2) {
changefeedGroup.POST("/:changefeed_id/resume", api.resumeChangefeed)
changefeedGroup.POST("/:changefeed_id/pause", api.pauseChangefeed)
changefeedGroup.GET("/:changefeed_id/status", api.status)
=======
changefeedGroup.GET("/:changefeed_id", changefeedOwnerMiddleware, api.getChangeFeed)
changefeedGroup.POST("", controllerMiddleware, api.createChangefeed)
changefeedGroup.GET("", controllerMiddleware, api.listChangeFeeds)
changefeedGroup.PUT("/:changefeed_id", changefeedOwnerMiddleware, api.updateChangefeed)
changefeedGroup.DELETE("/:changefeed_id", controllerMiddleware, api.deleteChangefeed)
changefeedGroup.GET("/:changefeed_id/meta_info", changefeedOwnerMiddleware, api.getChangeFeedMetaInfo)
changefeedGroup.POST("/:changefeed_id/resume", changefeedOwnerMiddleware, api.resumeChangefeed)
changefeedGroup.POST("/:changefeed_id/pause", changefeedOwnerMiddleware, api.pauseChangefeed)
changefeedGroup.GET("/:changefeed_id/status", changefeedOwnerMiddleware, api.status)
changefeedGroup.GET("/:changefeed_id/synced", changefeedOwnerMiddleware, api.synced)
>>>>>>> 058786f385 (TiCDC support checking if data is entirely replicated to Downstream (#10133))

// capture apis
captureGroup := v2.Group("/captures")
Expand Down
35 changes: 28 additions & 7 deletions cdc/api/v2/api_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -48,8 +48,13 @@ func (m *mockPDClient) UpdateServiceGCSafePoint(ctx context.Context,
}

// GetTS of mockPDClient returns a mock tso
<<<<<<< HEAD
func (m *mockPDClient) GetTS(ctx context.Context) (int64, int64, error) {
return m.logicTime, m.timestamp, nil
=======
func (c *mockPDClient) GetTS(ctx context.Context) (int64, int64, error) {
return c.timestamp, c.logicTime, nil
>>>>>>> 058786f385 (TiCDC support checking if data is entirely replicated to Downstream (#10133))
}

// GetClusterID of mockPDClient returns a mock ClusterID
Expand All @@ -62,13 +67,14 @@ func (c *mockPDClient) Close() {}

// mockStatusProvider stubs owner.StatusProvider for API tests; each method
// returns the canned value stored in the corresponding field.
//
// NOTE(review): the scraped diff showed both the pre- and post-change field
// lists without +/- markers, which would be duplicate (invalid) fields; this
// keeps the post-change set, which adds changeFeedSyncedStatus.
type mockStatusProvider struct {
	owner.StatusProvider
	changefeedStatus       *model.ChangeFeedStatusForAPI
	changefeedInfo         *model.ChangeFeedInfo
	processors             []*model.ProcInfoSnap
	taskStatus             map[model.CaptureID]*model.TaskStatus
	changefeedInfos        map[model.ChangeFeedID]*model.ChangeFeedInfo
	changefeedStatuses     map[model.ChangeFeedID]*model.ChangeFeedStatusForAPI
	changeFeedSyncedStatus *model.ChangeFeedSyncedStatusForAPI
	err                    error
}

// GetChangeFeedStatus returns a changefeeds' runtime status.
Expand Down Expand Up @@ -119,3 +125,18 @@ func (m *mockStatusProvider) GetAllChangeFeedStatuses(_ context.Context) (
) {
return m.changefeedStatuses, m.err
}
<<<<<<< HEAD
=======

// GetChangeFeedSyncedStatus returns a mock changefeed status.
func (m *mockStatusProvider) GetChangeFeedSyncedStatus(_ context.Context, changefeedID model.ChangeFeedID) (
*model.ChangeFeedSyncedStatusForAPI,
error,
) {
return m.changeFeedSyncedStatus, m.err
}

func (m *mockStatusProvider) IsChangefeedOwner(_ context.Context, id model.ChangeFeedID) (bool, error) {
return true, nil
}
>>>>>>> 058786f385 (TiCDC support checking if data is entirely replicated to Downstream (#10133))
143 changes: 143 additions & 0 deletions cdc/api/v2/changefeed.go
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,13 @@ const (
apiOpVarChangefeedState = "state"
// apiOpVarChangefeedID is the key of changefeed ID in HTTP API
apiOpVarChangefeedID = "changefeed_id"
<<<<<<< HEAD
=======
// apiOpVarNamespace is the key of changefeed namespace in HTTP API
apiOpVarNamespace = "namespace"
// timeout for pd client
timeout = 30 * time.Second
>>>>>>> 058786f385 (TiCDC support checking if data is entirely replicated to Downstream (#10133))
)

// createChangefeed handles create changefeed request,
Expand Down Expand Up @@ -826,6 +833,142 @@ func (h *OpenAPIV2) status(c *gin.Context) {
})
}

// synced get the synced status of a changefeed, i.e. whether the data has been
// entirely replicated to the downstream.
// @Summary Get synced status
// @Description get the synced status of a changefeed
// @Tags changefeed,v2
// @Accept json
// @Produce json
// @Param changefeed_id path string true "changefeed_id"
// @Param namespace query string false "default"
// @Success 200 {object} SyncedStatus
// @Failure 500,400 {object} model.HTTPError
// @Router /api/v2/changefeeds/{changefeed_id}/synced [get]
func (h *OpenAPIV2) synced(c *gin.Context) {
	ctx := c.Request.Context()

	namespace := getNamespaceValueWithDefault(c)
	changefeedID := model.ChangeFeedID{Namespace: namespace, ID: c.Param(apiOpVarChangefeedID)}
	if err := model.ValidateChangefeedID(changefeedID.ID); err != nil {
		_ = c.Error(cerror.ErrAPIInvalidParam.GenWithStack("invalid changefeed_id: %s",
			changefeedID.ID))
		return
	}

	status, err := h.capture.StatusProvider().GetChangeFeedSyncedStatus(ctx, changefeedID)
	if err != nil {
		_ = c.Error(err)
		return
	}

	log.Info("Get changefeed synced status:", zap.Any("status", status), zap.Any("changefeedID", changefeedID))

	// Start from the default thresholds; a changefeed that carries its own
	// synced-status configuration (both intervals non-zero) overrides them.
	cfg := &ChangefeedConfig{ReplicaConfig: GetDefaultReplicaConfig()}
	if (status.SyncedCheckInterval != 0) && (status.CheckpointInterval != 0) {
		cfg.ReplicaConfig.SyncedStatus.CheckpointInterval = status.CheckpointInterval
		cfg.ReplicaConfig.SyncedStatus.SyncedCheckInterval = status.SyncedCheckInterval
	}

	// try to get pd client to get pd time, and determine synced status based on the pd time
	if len(cfg.PDAddrs) == 0 {
		up, err := getCaptureDefaultUpstream(h.capture)
		if err != nil {
			_ = c.Error(err)
			return
		}
		cfg.PDConfig = getUpstreamPDConfig(up)
	}
	credential := cfg.PDConfig.toCredential()

	timeoutCtx, cancel := context.WithTimeout(ctx, timeout)
	defer cancel()

	pdClient, err := h.helpers.getPDClient(timeoutCtx, cfg.PDAddrs, credential)
	if err != nil {
		// case 1. we can't get pd client, pd may be unavailable.
		// if pullerResolvedTs - checkpointTs > checkpointInterval, data is not synced
		// otherwise, if pd is unavailable, we decide data whether is synced based on
		// the time difference between current time and lastSyncedTs.
		var message string
		if (oracle.ExtractPhysical(status.PullerResolvedTs) - oracle.ExtractPhysical(status.CheckpointTs)) >
			cfg.ReplicaConfig.SyncedStatus.CheckpointInterval*1000 {
			message = fmt.Sprintf("%s. Besides the data is not finish syncing", err.Error())
		} else {
			message = fmt.Sprintf("%s. You should check the pd status first. If pd status is normal, means we don't finish sync data. "+
				"If pd is offline, please check whether we satisfy the condition that "+
				"the time difference from lastSyncedTs to the current time from the time zone of pd is greater than %v secs. "+
				"If it's satisfied, means the data syncing is totally finished", err, cfg.ReplicaConfig.SyncedStatus.SyncedCheckInterval)
		}
		c.JSON(http.StatusOK, SyncedStatus{
			Synced:           false,
			SinkCheckpointTs: model.JSONTime(oracle.GetTimeFromTS(status.CheckpointTs)),
			PullerResolvedTs: model.JSONTime(oracle.GetTimeFromTS(status.PullerResolvedTs)),
			LastSyncedTs:     model.JSONTime(oracle.GetTimeFromTS(status.LastSyncedTs)),
			NowTs:            model.JSONTime(time.Unix(0, 0)),
			Info:             message,
		})
		return
	}
	defer pdClient.Close()
	// Get the current time from pd. Previously the error was discarded, which
	// on failure left physicalNow at 0 and poisoned every threshold comparison
	// below; surface it instead. Also use the bounded timeoutCtx so a hung pd
	// cannot stall this handler indefinitely.
	physicalNow, _, err := pdClient.GetTS(timeoutCtx)
	if err != nil {
		_ = c.Error(err)
		return
	}

	// We can normally get pd time. Thus we determine synced status based on physicalNow, lastSyncedTs, checkpointTs and pullerResolvedTs
	if (physicalNow-oracle.ExtractPhysical(status.LastSyncedTs) > cfg.ReplicaConfig.SyncedStatus.SyncedCheckInterval*1000) &&
		(physicalNow-oracle.ExtractPhysical(status.CheckpointTs) < cfg.ReplicaConfig.SyncedStatus.CheckpointInterval*1000) {
		// case 2: If physicalNow - lastSyncedTs > SyncedCheckInterval && physicalNow - CheckpointTs < CheckpointInterval
		// --> reach strict synced status
		c.JSON(http.StatusOK, SyncedStatus{
			Synced:           true,
			SinkCheckpointTs: model.JSONTime(oracle.GetTimeFromTS(status.CheckpointTs)),
			PullerResolvedTs: model.JSONTime(oracle.GetTimeFromTS(status.PullerResolvedTs)),
			LastSyncedTs:     model.JSONTime(oracle.GetTimeFromTS(status.LastSyncedTs)),
			NowTs:            model.JSONTime(time.Unix(physicalNow/1e3, 0)),
			Info:             "Data syncing is finished",
		})
		return
	}

	if physicalNow-oracle.ExtractPhysical(status.LastSyncedTs) > cfg.ReplicaConfig.SyncedStatus.SyncedCheckInterval*1000 {
		// case 3: If physicalNow - lastSyncedTs > SyncedCheckInterval && physicalNow - CheckpointTs > CheckpointInterval
		// we should consider the situation that pd or tikv region is not healthy to block the advancing resolveTs.
		// if pullerResolvedTs - checkpointTs > CheckpointInterval--> data is not synced
		// otherwise, if pd & tikv is healthy --> data is not synced
		// if not healthy --> data is synced
		var message string
		if (oracle.ExtractPhysical(status.PullerResolvedTs) - oracle.ExtractPhysical(status.CheckpointTs)) <
			cfg.ReplicaConfig.SyncedStatus.CheckpointInterval*1000 {
			message = fmt.Sprintf("Please check whether pd is healthy and tikv region is all available. " +
				"If pd is not healthy or tikv region is not available, the data syncing is finished. " +
				"When pd is offline means that pd is not healthy. For tikv region, you can check the grafana info " +
				"in 'TiKV-Details-Resolved-Ts-Max Leader Resolved TS gap'. If the gap is a large value, such as a few minutes, " +
				"it means some regions in tikv are unavailable. " +
				" Otherwise the data syncing is not finished, please wait")
		} else {
			message = "The data syncing is not finished, please wait"
		}
		c.JSON(http.StatusOK, SyncedStatus{
			Synced:           false,
			SinkCheckpointTs: model.JSONTime(oracle.GetTimeFromTS(status.CheckpointTs)),
			PullerResolvedTs: model.JSONTime(oracle.GetTimeFromTS(status.PullerResolvedTs)),
			LastSyncedTs:     model.JSONTime(oracle.GetTimeFromTS(status.LastSyncedTs)),
			NowTs:            model.JSONTime(time.Unix(physicalNow/1e3, 0)),
			Info:             message,
		})
		return
	}

	// case 4: If physicalNow - lastSyncedTs < SyncedCheckInterval --> data is not synced
	c.JSON(http.StatusOK, SyncedStatus{
		Synced:           false,
		SinkCheckpointTs: model.JSONTime(oracle.GetTimeFromTS(status.CheckpointTs)),
		PullerResolvedTs: model.JSONTime(oracle.GetTimeFromTS(status.PullerResolvedTs)),
		LastSyncedTs:     model.JSONTime(oracle.GetTimeFromTS(status.LastSyncedTs)),
		NowTs:            model.JSONTime(time.Unix(physicalNow/1e3, 0)),
		Info:             "The data syncing is not finished, please wait",
	})
}

func toAPIModel(
info *model.ChangeFeedInfo,
resolvedTs uint64,
Expand Down
Loading

0 comments on commit 6b25733

Please sign in to comment.