diff --git a/cdc/kv/client.go b/cdc/kv/client.go index 2ba11d93046..1b3efa924e8 100644 --- a/cdc/kv/client.go +++ b/cdc/kv/client.go @@ -14,6 +14,7 @@ package kv import ( + "container/list" "context" "fmt" "io" @@ -198,6 +199,13 @@ type CDCKVClient interface { lockResolver txnutil.LockResolver, eventCh chan<- model.RegionFeedEvent, ) error + + // RegionCount returns the number of captured regions. + RegionCount() uint64 + // ResolvedTs returns the current ingress resolved ts. + ResolvedTs() model.Ts + // CommitTs returns the current ingress commit ts. + CommitTs() model.Ts } // NewCDCKVClient is the constructor of CDC KV client @@ -212,14 +220,20 @@ type CDCClient struct { grpcPool GrpcPool - regionCache *tikv.RegionCache - pdClock pdutil.Clock + regionCache *tikv.RegionCache + pdClock pdutil.Clock + regionLimiters *regionEventFeedLimiters changefeed model.ChangeFeedID tableID model.TableID tableName string - regionLimiters *regionEventFeedLimiters + regionCounts struct { + sync.Mutex + counts *list.List + } + ingressCommitTs model.Ts + ingressResolvedTs model.Ts } // NewCDCClient creates a CDCClient instance @@ -248,6 +262,12 @@ func NewCDCClient( changefeed: changefeed, tableID: tableID, tableName: tableName, + regionCounts: struct { + sync.Mutex + counts *list.List + }{ + counts: list.New(), + }, } return } @@ -303,9 +323,35 @@ func (c *CDCClient) EventFeed( lockResolver txnutil.LockResolver, eventCh chan<- model.RegionFeedEvent, ) error { + c.regionCounts.Lock() + regionCount := int64(0) + c.regionCounts.counts.PushBack(&regionCount) + c.regionCounts.Unlock() s := newEventFeedSession( ctx, c, span, lockResolver, ts, eventCh, c.changefeed, c.tableID, c.tableName) - return s.eventFeed(ctx, ts) + return s.eventFeed(ctx, ts, &regionCount) +} + +// RegionCount returns the number of captured regions. +func (c *CDCClient) RegionCount() uint64 { + c.regionCounts.Lock() + defer c.regionCounts.Unlock() + + totalCount := uint64(0) + for e := c.regionCounts.counts.Front(); e != nil; e = e.Next() { + totalCount += uint64(atomic.LoadInt64(e.Value.(*int64))) + } + return totalCount +} + +// ResolvedTs returns the current ingress resolved ts. +func (c *CDCClient) ResolvedTs() model.Ts { + return atomic.LoadUint64(&c.ingressResolvedTs) +} + +// CommitTs returns the current ingress commit ts. +func (c *CDCClient) CommitTs() model.Ts { + return atomic.LoadUint64(&c.ingressCommitTs) } var currentID uint64 = 0 @@ -403,7 +449,7 @@ func newEventFeedSession( } } -func (s *eventFeedSession) eventFeed(ctx context.Context, ts uint64) error { +func (s *eventFeedSession) eventFeed(ctx context.Context, ts uint64, regionCount *int64) error { eventFeedGauge.Inc() defer eventFeedGauge.Dec() @@ -414,7 +460,7 @@ func (s *eventFeedSession) eventFeed(ctx context.Context, ts uint64) error { }) g.Go(func() error { - return s.requestRegionToStore(ctx, g) + return s.requestRegionToStore(ctx, g, regionCount) }) g.Go(func() error { @@ -593,6 +639,7 @@ func (s *eventFeedSession) onRegionFail(ctx context.Context, errorInfo regionErr func (s *eventFeedSession) requestRegionToStore( ctx context.Context, g *errgroup.Group, + regionCount *int64, ) error { // Stores pending regions info for each stream. After sending a new request, the region info wil be put to the map, // and it will be loaded by the receiver thread when it receives the first response from that region.
We need this @@ -682,7 +729,8 @@ func (s *eventFeedSession) requestRegionToStore( g.Go(func() error { defer s.deleteStream(storeAddr) - return s.receiveFromStream(ctx, g, storeAddr, storeID, stream.client, pendingRegions) + return s.receiveFromStream( + ctx, g, storeAddr, storeID, stream.client, pendingRegions, regionCount) }) } @@ -1035,6 +1083,7 @@ func (s *eventFeedSession) receiveFromStream( storeID uint64, stream cdcpb.ChangeData_EventFeedClient, pendingRegions *syncRegionFeedStateMap, + regionCount *int64, ) error { // Cancel the pending regions if the stream failed. // Otherwise, it will remain unhandled in the pendingRegions list @@ -1067,6 +1116,7 @@ func (s *eventFeedSession) receiveFromStream( return worker.run(ctx) }) + maxCommitTs := model.Ts(0) for { cevent, err := stream.Recv() @@ -1141,6 +1191,14 @@ func (s *eventFeedSession) receiveFromStream( zap.Int("resolvedRegionCount", regionCount)) } + if len(cevent.Events) != 0 { + if entries, ok := cevent.Events[0].Event.(*cdcpb.Event_Entries_); ok { + commitTs := entries.Entries.Entries[0].CommitTs + if maxCommitTs < commitTs { + maxCommitTs = commitTs + } + } + } err = s.sendRegionChangeEvents(ctx, cevent.Events, worker, pendingRegions, addr) if err != nil { return err @@ -1151,6 +1209,17 @@ func (s *eventFeedSession) receiveFromStream( if err != nil { return err } + // TiKV send resolved ts events every second by default. + // We check and update region count here to save CPU. + atomic.StoreInt64(regionCount, worker.statesManager.regionCount()) + atomic.StoreUint64(&s.client.ingressResolvedTs, cevent.ResolvedTs.Ts) + if maxCommitTs == 0 { + // In case, there is no write for the table, + // we use resolved ts as maxCommitTs to make the stats meaningful. + atomic.StoreUint64(&s.client.ingressCommitTs, cevent.ResolvedTs.Ts) + } else { + atomic.StoreUint64(&s.client.ingressCommitTs, maxCommitTs) + } } } } diff --git a/cdc/kv/region_state.go b/cdc/kv/region_state.go index 497b7afa2a8..05078099856 100644 --- a/cdc/kv/region_state.go +++ b/cdc/kv/region_state.go @@ -288,3 +288,10 @@ func (rsm *regionStateManager) delState(regionID uint64) { bucket := rsm.getBucket(regionID) rsm.states[bucket].delByRegionID(regionID) } + +func (rsm *regionStateManager) regionCount() (count int64) { + for _, bucket := range rsm.states { + count += int64(bucket.len()) + } + return +} diff --git a/cdc/model/owner.go b/cdc/model/owner.go index f2622266e87..3ab56c8175c 100644 --- a/cdc/model/owner.go +++ b/cdc/model/owner.go @@ -176,31 +176,6 @@ func (o *TableOperation) Clone() *TableOperation { return &clone } -// TaskWorkload records the workloads of a task -// the value of the struct is the workload -type TaskWorkload map[TableID]WorkloadInfo - -// WorkloadInfo records the workload info of a table -type WorkloadInfo struct { - Workload uint64 `json:"workload"` -} - -// Unmarshal unmarshals into *TaskWorkload from json marshal byte slice -func (w *TaskWorkload) Unmarshal(data []byte) error { - err := json.Unmarshal(data, w) - return errors.Annotatef( - cerror.WrapError(cerror.ErrUnmarshalFailed, err), "Unmarshal data: %v", data) -} - -// Marshal returns the json marshal format of a TaskWorkload -func (w *TaskWorkload) Marshal() (string, error) { - if w == nil { - return "{}", nil - } - data, err := json.Marshal(w) - return string(data), cerror.WrapError(cerror.ErrMarshalFailed, err) -} - // TableReplicaInfo records the table replica info type TableReplicaInfo struct { StartTs Ts `json:"start-ts"` diff --git a/cdc/model/owner_test.go 
b/cdc/model/owner_test.go index 0712ffba26f..65854b27041 100644 --- a/cdc/model/owner_test.go +++ b/cdc/model/owner_test.go @@ -114,30 +114,6 @@ func TestTableOperationState(t *testing.T) { require.Nil(t, nilTableOper.Clone()) } -func TestTaskWorkloadMarshal(t *testing.T) { - t.Parallel() - - workload := &TaskWorkload{ - 12: WorkloadInfo{Workload: uint64(1)}, - 15: WorkloadInfo{Workload: uint64(3)}, - } - expected := `{"12":{"workload":1},"15":{"workload":3}}` - - data, err := workload.Marshal() - require.Nil(t, err) - require.Equal(t, expected, data) - - newWorkload := &TaskWorkload{} - err = newWorkload.Unmarshal([]byte(data)) - require.Nil(t, err) - require.Equal(t, workload, newWorkload) - - workload = nil - data, err = workload.Marshal() - require.Nil(t, err) - require.Equal(t, "{}", data) -} - func TestShouldBeDeepCopy(t *testing.T) { t.Parallel() diff --git a/cdc/owner/changefeed.go b/cdc/owner/changefeed.go index a87573f5649..c03942196f2 100644 --- a/cdc/owner/changefeed.go +++ b/cdc/owner/changefeed.go @@ -41,9 +41,9 @@ import ( "go.uber.org/zap" ) -// newSchedulerV2FromCtx creates a new schedulerV2 from context. +// newSchedulerFromCtx creates a new schedulerV2 from context. // This function is factored out to facilitate unit testing. -func newSchedulerV2FromCtx( +func newSchedulerFromCtx( ctx cdcContext.Context, startTs uint64, ) (ret scheduler.Scheduler, err error) { changeFeedID := ctx.ChangefeedVars().ID @@ -64,8 +64,10 @@ func newSchedulerV2FromCtx( return ret, errors.Trace(err) } -func newScheduler(ctx cdcContext.Context, startTs uint64) (scheduler.Scheduler, error) { - return newSchedulerV2FromCtx(ctx, startTs) +func newScheduler( + ctx cdcContext.Context, startTs uint64, +) (scheduler.Scheduler, error) { + return newSchedulerFromCtx(ctx, startTs) } type changefeed struct { @@ -119,6 +121,7 @@ type changefeed struct { metricsChangefeedResolvedTsGauge prometheus.Gauge metricsChangefeedResolvedTsLagGauge prometheus.Gauge metricsChangefeedResolvedTsLagDuration prometheus.Observer + metricsCurrentPDTsGauge prometheus.Gauge metricsChangefeedBarrierTsGauge prometheus.Gauge metricsChangefeedTickDuration prometheus.Observer @@ -131,7 +134,9 @@ type changefeed struct { ) (puller.DDLPuller, error) newSink func() DDLSink - newScheduler func(ctx cdcContext.Context, startTs uint64) (scheduler.Scheduler, error) + newScheduler func( + ctx cdcContext.Context, startTs uint64, + ) (scheduler.Scheduler, error) lastDDLTs uint64 // Timestamp of the last executed DDL. Only used for tests. } @@ -169,7 +174,9 @@ func newChangefeed4Test( changefeed model.ChangeFeedID, ) (puller.DDLPuller, error), newSink func() DDLSink, - newScheduler func(ctx cdcContext.Context, startTs uint64) (scheduler.Scheduler, error), + newScheduler func( + ctx cdcContext.Context, startTs uint64, + ) (scheduler.Scheduler, error), ) *changefeed { c := newChangefeed(id, state, up) c.newDDLPuller = newDDLPuller @@ -549,6 +556,7 @@ func (c *changefeed) initMetrics() { WithLabelValues(c.id.Namespace, c.id.ID) c.metricsChangefeedResolvedTsLagDuration = changefeedResolvedTsLagDuration. WithLabelValues(c.id.Namespace, c.id.ID) + c.metricsCurrentPDTsGauge = currentPDTsGauge.WithLabelValues(c.id.Namespace, c.id.ID) c.metricsChangefeedBarrierTsGauge = changefeedBarrierTsGauge. 
WithLabelValues(c.id.Namespace, c.id.ID) @@ -615,9 +623,11 @@ func (c *changefeed) cleanupMetrics() { changefeedResolvedTsGauge.DeleteLabelValues(c.id.Namespace, c.id.ID) changefeedResolvedTsLagGauge.DeleteLabelValues(c.id.Namespace, c.id.ID) changefeedResolvedTsLagDuration.DeleteLabelValues(c.id.Namespace, c.id.ID) + currentPDTsGauge.DeleteLabelValues(c.id.Namespace, c.id.ID) c.metricsChangefeedResolvedTsGauge = nil c.metricsChangefeedResolvedTsLagGauge = nil c.metricsChangefeedResolvedTsLagDuration = nil + c.metricsCurrentPDTsGauge = nil changefeedTickDuration.DeleteLabelValues(c.id.Namespace, c.id.ID) c.metricsChangefeedTickDuration = nil @@ -903,6 +913,8 @@ func (c *changefeed) updateMetrics(currentTs int64, checkpointTs, resolvedTs mod resolvedLag := float64(currentTs-phyRTs) / 1e3 c.metricsChangefeedResolvedTsLagGauge.Set(resolvedLag) c.metricsChangefeedResolvedTsLagDuration.Observe(resolvedLag) + + c.metricsCurrentPDTsGauge.Set(float64(currentTs)) } func (c *changefeed) updateStatus(checkpointTs, resolvedTs model.Ts) { diff --git a/cdc/owner/metrics.go b/cdc/owner/metrics.go index 7d430fa45aa..abafa22a231 100644 --- a/cdc/owner/metrics.go +++ b/cdc/owner/metrics.go @@ -42,6 +42,13 @@ var ( Name: "checkpoint_ts_lag", Help: "checkpoint ts lag of changefeeds in seconds", }, []string{"namespace", "changefeed"}) + currentPDTsGauge = prometheus.NewGaugeVec( + prometheus.GaugeOpts{ + Namespace: "ticdc", + Subsystem: "owner", + Name: "current_pd_ts", + Help: "The current PD ts", + }, []string{"namespace", "changefeed"}) changefeedCheckpointLagDuration = prometheus.NewHistogramVec( prometheus.HistogramOpts{ @@ -134,6 +141,7 @@ func InitMetrics(registry *prometheus.Registry) { registry.MustRegister(changefeedResolvedTsGauge) registry.MustRegister(changefeedResolvedTsLagGauge) registry.MustRegister(changefeedResolvedTsLagDuration) + registry.MustRegister(currentPDTsGauge) registry.MustRegister(ownershipCounter) registry.MustRegister(changefeedStatusGauge) diff --git a/cdc/owner/owner_test.go b/cdc/owner/owner_test.go index 2097ca91ccc..de46c008f08 100644 --- a/cdc/owner/owner_test.go +++ b/cdc/owner/owner_test.go @@ -61,7 +61,9 @@ func newOwner4Test( changefeed model.ChangeFeedID, ) (puller.DDLPuller, error), newSink func() DDLSink, - newScheduler func(ctx cdcContext.Context, startTs uint64) (scheduler.Scheduler, error), + newScheduler func( + ctx cdcContext.Context, startTs uint64, + ) (scheduler.Scheduler, error), pdClient pd.Client, ) Owner { m := upstream.NewManager4Test(pdClient) @@ -100,7 +102,9 @@ func createOwner4Test(ctx cdcContext.Context, t *testing.T) (*ownerImpl, *orches return &mockDDLSink{} }, // new scheduler - func(ctx cdcContext.Context, startTs uint64) (scheduler.Scheduler, error) { + func( + ctx cdcContext.Context, startTs uint64, + ) (scheduler.Scheduler, error) { return &mockScheduler{}, nil }, pdClient, diff --git a/cdc/processor/pipeline/puller.go b/cdc/processor/pipeline/puller.go index 62a88951f33..5706ebe9769 100644 --- a/cdc/processor/pipeline/puller.go +++ b/cdc/processor/pipeline/puller.go @@ -31,6 +31,7 @@ import ( type pullerNode struct { tableName string // quoted schema and table, used in metircs only + plr puller.Puller tableID model.TableID startTs model.Ts changefeed model.ChangeFeedID @@ -70,7 +71,7 @@ func (n *pullerNode) start(ctx pipeline.NodeContext, kvCfg := config.GetGlobalServerConfig().KVClient // NOTICE: always pull the old value internally // See also: https://github.com/pingcap/tiflow/issues/2301. 
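The new currentPDTsGauge, together with the per-stage lag gauges added later in this patch, works on TSO timestamps: lag in seconds is the difference between the physical (millisecond) halves of two TSOs divided by 1e3. A minimal sketch of that computation using client-go's oracle package follows; the lagSeconds helper is illustrative and not part of the patch.

package main

import (
	"fmt"
	"time"

	"github.com/tikv/client-go/v2/oracle"
)

// lagSeconds mirrors how the owner and scheduler metrics derive *_lag values:
// compare only the physical (millisecond) parts of the two TSO timestamps.
func lagSeconds(currentTs, checkpointTs uint64) float64 {
	return float64(oracle.ExtractPhysical(currentTs)-oracle.ExtractPhysical(checkpointTs)) / 1e3
}

func main() {
	now := time.Now()
	currentTs := oracle.ComposeTS(oracle.GetPhysical(now), 0)
	checkpointTs := oracle.ComposeTS(oracle.GetPhysical(now.Add(-3*time.Second)), 0)
	fmt.Printf("lag: %.1fs\n", lagSeconds(currentTs, checkpointTs)) // ~3.0s
}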
- plr := puller.New( + n.plr = puller.New( ctxC, up.PDClient, up.GrpcPool, @@ -85,7 +86,7 @@ func (n *pullerNode) start(ctx pipeline.NodeContext, n.tableName, ) n.wg.Go(func() error { - ctx.Throw(errors.Trace(plr.Run(ctxC))) + ctx.Throw(errors.Trace(n.plr.Run(ctxC))) return nil }) n.wg.Go(func() error { @@ -93,7 +94,7 @@ func (n *pullerNode) start(ctx pipeline.NodeContext, select { case <-ctxC.Done(): return nil - case rawKV := <-plr.Output(): + case rawKV := <-n.plr.Output(): if rawKV == nil { continue } diff --git a/cdc/processor/pipeline/sink.go b/cdc/processor/pipeline/sink.go index 5ec58d9a45a..da8619358b6 100755 --- a/cdc/processor/pipeline/sink.go +++ b/cdc/processor/pipeline/sink.go @@ -431,3 +431,18 @@ func (n *sinkNode) verifySplitTxn(e *model.PolymorphicEvent) error { } return nil } + +func (n *sinkNode) Stats() Stats { + return Stats{ + CheckpointTs: n.CheckpointTs(), + ResolvedTs: n.getResolvedTs().Ts, + BarrierTs: n.barrierTs, + } +} + +// Stats of a sink. +type Stats struct { + CheckpointTs model.Ts + ResolvedTs model.Ts + BarrierTs model.Ts +} diff --git a/cdc/processor/pipeline/sorter_test.go b/cdc/processor/pipeline/sorter_test.go index f31437693bd..57d3ffc0d2a 100644 --- a/cdc/processor/pipeline/sorter_test.go +++ b/cdc/processor/pipeline/sorter_test.go @@ -77,6 +77,7 @@ func TestSorterResolvedTs(t *testing.T) { } type checkSorter struct { + sorter.EventSorter ch chan *model.PolymorphicEvent } diff --git a/cdc/processor/pipeline/table_actor.go b/cdc/processor/pipeline/table_actor.go index 1a87521a6e0..3850465fff4 100644 --- a/cdc/processor/pipeline/table_actor.go +++ b/cdc/processor/pipeline/table_actor.go @@ -35,6 +35,7 @@ import ( cerror "github.com/pingcap/tiflow/pkg/errors" pmessage "github.com/pingcap/tiflow/pkg/pipeline/message" "github.com/pingcap/tiflow/pkg/upstream" + "github.com/tikv/client-go/v2/oracle" uberatomic "go.uber.org/atomic" "go.uber.org/zap" "golang.org/x/sync/errgroup" @@ -42,10 +43,9 @@ import ( ) var ( - _ tablepb.TablePipeline = (*tableActor)(nil) - _ actor.Actor[pmessage.Message] = (*tableActor)(nil) - stopped = uint32(1) - workload = model.WorkloadInfo{Workload: 1} + _ tablepb.TablePipeline = (*tableActor)(nil) + _ actor.Actor[pmessage.Message] = (*tableActor)(nil) + stopped = uint32(1) ) // Assume 1KB per row in upstream TiDB, it takes about 250 MB (1024*4*64) for @@ -454,10 +454,39 @@ func (t *tableActor) AsyncStop() bool { return true } -// Workload returns the workload of this table pipeline -func (t *tableActor) Workload() model.WorkloadInfo { - // We temporarily set the value to constant 1 - return workload +// Stats returns the statistics of this table pipeline +func (t *tableActor) Stats() tablepb.Stats { + pullerStats := t.pullerNode.plr.Stats() + sorterStats := t.sortNode.sorter.Stats() + sinkStats := t.sinkNode.Stats() + now, _ := t.upstream.PDClock.CurrentTime() + + return tablepb.Stats{ + RegionCount: pullerStats.RegionCount, + CurrentTs: oracle.ComposeTS(oracle.GetPhysical(now), 0), + StageCheckpoints: map[string]tablepb.Checkpoint{ + "puller-ingress": { + CheckpointTs: pullerStats.CheckpointTsIngress, + ResolvedTs: pullerStats.ResolvedTsIngress, + }, + "puller-egress": { + CheckpointTs: pullerStats.CheckpointTsEgress, + ResolvedTs: pullerStats.ResolvedTsEgress, + }, + "sorter-ingress": { + CheckpointTs: sorterStats.CheckpointTsIngress, + ResolvedTs: sorterStats.ResolvedTsIngress, + }, + "sorter-egress": { + CheckpointTs: sorterStats.CheckpointTsEgress, + ResolvedTs: sorterStats.ResolvedTsEgress, + }, + "sink": { + CheckpointTs: 
sinkStats.CheckpointTs, + ResolvedTs: sinkStats.ResolvedTs, + }, + }, + } } // State returns the state of this table pipeline diff --git a/cdc/processor/processor.go b/cdc/processor/processor.go index f00a439d1c4..de208fa03d5 100644 --- a/cdc/processor/processor.go +++ b/cdc/processor/processor.go @@ -390,6 +390,7 @@ func (p *processor) GetTableStatus(tableID model.TableID) tablepb.TableStatus { ResolvedTs: table.ResolvedTs(), }, State: table.State(), + Stats: table.Stats(), } } diff --git a/cdc/processor/processor_test.go b/cdc/processor/processor_test.go index 831f9ca7bad..f54c321d5b0 100644 --- a/cdc/processor/processor_test.go +++ b/cdc/processor/processor_test.go @@ -168,8 +168,8 @@ func (m *mockTablePipeline) AsyncStop() bool { return true } -func (m *mockTablePipeline) Workload() model.WorkloadInfo { - return model.WorkloadInfo{Workload: 1} +func (m *mockTablePipeline) Stats() tablepb.Stats { + return tablepb.Stats{} } func (m *mockTablePipeline) RemainEvents() int64 { diff --git a/cdc/processor/tablepb/table.go b/cdc/processor/tablepb/table.go index a5c0f7eea97..74a054b1aea 100644 --- a/cdc/processor/tablepb/table.go +++ b/cdc/processor/tablepb/table.go @@ -47,8 +47,8 @@ type TablePipeline interface { // Start the sink consume data from the given `ts` Start(ts model.Ts) - // Workload returns the workload of this table - Workload() model.WorkloadInfo + // Stats returns statistic for a table. + Stats() Stats // State returns the state of this table pipeline State() TableState // Cancel stops this table pipeline immediately and destroy all resources created by this table pipeline diff --git a/cdc/processor/tablepb/table.pb.go b/cdc/processor/tablepb/table.pb.go index c6418c1ed0b..8d28942066a 100644 --- a/cdc/processor/tablepb/table.pb.go +++ b/cdc/processor/tablepb/table.pb.go @@ -26,13 +26,13 @@ const _ = proto.GoGoProtoPackageIsVersion3 // please upgrade the proto package // TableState is the state of table replication in processor. // -// ┌────────┐ ┌───────────┐ ┌──────────┐ -// │ Absent ├─> │ Preparing ├─> │ Prepared │ -// └────────┘ └───────────┘ └─────┬────┘ -// v -// ┌─────────┐ ┌──────────┐ ┌─────────────┐ -// │ Stopped │ <─┤ Stopping │ <─┤ Replicating │ -// └─────────┘ └──────────┘ └─────────────┘ +// ┌────────┐ ┌───────────┐ ┌──────────┐ +// │ Absent ├─> │ Preparing ├─> │ Prepared │ +// └────────┘ └───────────┘ └─────┬────┘ +// v +// ┌─────────┐ ┌──────────┐ ┌─────────────┐ +// │ Stopped │ <─┤ Stopping │ <─┤ Replicating │ +// └─────────┘ └──────────┘ └─────────────┘ type TableState int32 const ( @@ -125,18 +125,83 @@ func (m *Checkpoint) GetResolvedTs() github_com_pingcap_tiflow_cdc_model.Ts { return 0 } +// Stats holds a statistic for a table. +type Stats struct { + // Number of captured regions. + RegionCount uint64 `protobuf:"varint,1,opt,name=region_count,json=regionCount,proto3" json:"region_count,omitempty"` + // The current timestamp from the table's point of view. + CurrentTs github_com_pingcap_tiflow_cdc_model.Ts `protobuf:"varint,2,opt,name=current_ts,json=currentTs,proto3,casttype=github.com/pingcap/tiflow/cdc/model.Ts" json:"current_ts,omitempty"` + // Checkponits at each stage. 
+ StageCheckpoints map[string]Checkpoint `protobuf:"bytes,3,rep,name=stage_checkpoints,json=stageCheckpoints,proto3" json:"stage_checkpoints" protobuf_key:"bytes,1,opt,name=key,proto3" protobuf_val:"bytes,2,opt,name=value,proto3"` +} + +func (m *Stats) Reset() { *m = Stats{} } +func (m *Stats) String() string { return proto.CompactTextString(m) } +func (*Stats) ProtoMessage() {} +func (*Stats) Descriptor() ([]byte, []int) { + return fileDescriptor_ae83c9c6cf5ef75c, []int{1} +} +func (m *Stats) XXX_Unmarshal(b []byte) error { + return m.Unmarshal(b) +} +func (m *Stats) XXX_Marshal(b []byte, deterministic bool) ([]byte, error) { + if deterministic { + return xxx_messageInfo_Stats.Marshal(b, m, deterministic) + } else { + b = b[:cap(b)] + n, err := m.MarshalToSizedBuffer(b) + if err != nil { + return nil, err + } + return b[:n], nil + } +} +func (m *Stats) XXX_Merge(src proto.Message) { + xxx_messageInfo_Stats.Merge(m, src) +} +func (m *Stats) XXX_Size() int { + return m.Size() +} +func (m *Stats) XXX_DiscardUnknown() { + xxx_messageInfo_Stats.DiscardUnknown(m) +} + +var xxx_messageInfo_Stats proto.InternalMessageInfo + +func (m *Stats) GetRegionCount() uint64 { + if m != nil { + return m.RegionCount + } + return 0 +} + +func (m *Stats) GetCurrentTs() github_com_pingcap_tiflow_cdc_model.Ts { + if m != nil { + return m.CurrentTs + } + return 0 +} + +func (m *Stats) GetStageCheckpoints() map[string]Checkpoint { + if m != nil { + return m.StageCheckpoints + } + return nil +} + // TableStatus is the running status of a table. type TableStatus struct { TableID github_com_pingcap_tiflow_cdc_model.TableID `protobuf:"varint,1,opt,name=table_id,json=tableId,proto3,casttype=github.com/pingcap/tiflow/cdc/model.TableID" json:"table_id,omitempty"` State TableState `protobuf:"varint,2,opt,name=state,proto3,enum=pingcap.tiflow.cdc.processor.tablepb.TableState" json:"state,omitempty"` Checkpoint Checkpoint `protobuf:"bytes,3,opt,name=checkpoint,proto3" json:"checkpoint"` + Stats Stats `protobuf:"bytes,4,opt,name=stats,proto3" json:"stats"` } func (m *TableStatus) Reset() { *m = TableStatus{} } func (m *TableStatus) String() string { return proto.CompactTextString(m) } func (*TableStatus) ProtoMessage() {} func (*TableStatus) Descriptor() ([]byte, []int) { - return fileDescriptor_ae83c9c6cf5ef75c, []int{1} + return fileDescriptor_ae83c9c6cf5ef75c, []int{2} } func (m *TableStatus) XXX_Unmarshal(b []byte) error { return m.Unmarshal(b) @@ -186,46 +251,63 @@ func (m *TableStatus) GetCheckpoint() Checkpoint { return Checkpoint{} } +func (m *TableStatus) GetStats() Stats { + if m != nil { + return m.Stats + } + return Stats{} +} + func init() { proto.RegisterEnum("pingcap.tiflow.cdc.processor.tablepb.TableState", TableState_name, TableState_value) proto.RegisterType((*Checkpoint)(nil), "pingcap.tiflow.cdc.processor.tablepb.Checkpoint") + proto.RegisterType((*Stats)(nil), "pingcap.tiflow.cdc.processor.tablepb.Stats") + proto.RegisterMapType((map[string]Checkpoint)(nil), "pingcap.tiflow.cdc.processor.tablepb.Stats.StageCheckpointsEntry") proto.RegisterType((*TableStatus)(nil), "pingcap.tiflow.cdc.processor.tablepb.TableStatus") } func init() { proto.RegisterFile("processor/tablepb/table.proto", fileDescriptor_ae83c9c6cf5ef75c) } var fileDescriptor_ae83c9c6cf5ef75c = []byte{ - // 473 bytes of a gzipped FileDescriptorProto - 0x1f, 0x8b, 0x08, 0x00, 0x00, 0x00, 0x00, 0x00, 0x02, 0xff, 0x94, 0x93, 0xc1, 0x6e, 0xd3, 0x30, - 0x18, 0xc7, 0x93, 0xb6, 0x6b, 0xc7, 0x57, 0x40, 0xc1, 0x6c, 0x30, 0x22, 0x91, 0x46, 0xd5, 0x34, - 
0x4d, 0x45, 0x72, 0x10, 0xdc, 0xb8, 0x51, 0x10, 0xd2, 0x84, 0x10, 0x28, 0xeb, 0x38, 0x70, 0x99, - 0x12, 0xdb, 0x64, 0xd1, 0xb2, 0xd8, 0x8a, 0x3d, 0xf6, 0x02, 0x9c, 0x72, 0xe2, 0xc4, 0x2d, 0x2f, - 0xc0, 0x93, 0xec, 0xd8, 0x23, 0xa7, 0x0a, 0xda, 0xb7, 0xd8, 0x09, 0x25, 0xe9, 0xe2, 0xa9, 0x48, - 0xa8, 0x9c, 0x62, 0xe7, 0xff, 0xff, 0xff, 0x3e, 0x7f, 0x5f, 0x62, 0x78, 0x2c, 0x32, 0x4e, 0x98, - 0x94, 0x3c, 0xf3, 0x54, 0x10, 0x26, 0x4c, 0x84, 0xf5, 0x13, 0x8b, 0x8c, 0x2b, 0x8e, 0x76, 0x45, - 0x9c, 0x46, 0x24, 0x10, 0x58, 0xc5, 0x9f, 0x13, 0x7e, 0x81, 0x09, 0x25, 0xb8, 0x49, 0xe0, 0x65, - 0xc2, 0xde, 0x8a, 0x78, 0xc4, 0xab, 0x80, 0x57, 0xae, 0xea, 0xec, 0xf0, 0x87, 0x09, 0xf0, 0xea, - 0x84, 0x91, 0x53, 0xc1, 0xe3, 0x54, 0xa1, 0xf7, 0x70, 0x87, 0x34, 0xbb, 0x63, 0x25, 0x77, 0x4c, - 0xd7, 0xdc, 0xef, 0x8c, 0x47, 0x57, 0xb3, 0xc1, 0x5e, 0x14, 0xab, 0x93, 0xf3, 0x10, 0x13, 0x7e, - 0xe6, 0x2d, 0x0b, 0x7a, 0x75, 0x41, 0x8f, 0x50, 0xe2, 0x9d, 0x71, 0xca, 0x12, 0x3c, 0x91, 0xfe, - 0x6d, 0x0d, 0x98, 0x48, 0xf4, 0x16, 0xfa, 0x19, 0x93, 0x3c, 0xf9, 0xc2, 0x68, 0x89, 0x6b, 0xfd, - 0x37, 0x0e, 0xae, 0xe3, 0x13, 0x39, 0xfc, 0xda, 0x82, 0xfe, 0xa4, 0x6c, 0xe7, 0x50, 0x05, 0xea, - 0x5c, 0xa2, 0x23, 0xd8, 0xac, 0xba, 0x3b, 0x8e, 0x69, 0x75, 0xd0, 0xf6, 0xf8, 0xc5, 0x7c, 0x36, - 0xe8, 0x55, 0x96, 0x83, 0xd7, 0x57, 0xb3, 0xc1, 0x93, 0xb5, 0x8a, 0xd4, 0x76, 0xbf, 0x57, 0xb1, - 0x0e, 0x28, 0x7a, 0x03, 0x1b, 0x52, 0x05, 0x8a, 0x55, 0xa7, 0xbd, 0xfb, 0xec, 0x29, 0x5e, 0x67, - 0xbe, 0xb8, 0x39, 0x18, 0xf3, 0xeb, 0x38, 0xfa, 0x08, 0xa0, 0x67, 0xb1, 0xd3, 0x76, 0xcd, 0xfd, - 0xfe, 0xba, 0x30, 0xfd, 0x49, 0xc6, 0x9d, 0xcb, 0xd9, 0xc0, 0xf0, 0x6f, 0x90, 0x46, 0xdf, 0x5b, - 0x00, 0xba, 0x1a, 0x1a, 0x42, 0xef, 0x28, 0x3d, 0x4d, 0xf9, 0x45, 0x6a, 0x19, 0xf6, 0x76, 0x5e, - 0xb8, 0xf7, 0xb4, 0xb8, 0x14, 0x90, 0x0b, 0xdd, 0x97, 0xa1, 0x64, 0xa9, 0xb2, 0x4c, 0x7b, 0x2b, - 0x2f, 0x5c, 0x4b, 0x5b, 0xea, 0xf7, 0x68, 0x0f, 0x6e, 0x7d, 0xc8, 0x98, 0x08, 0xb2, 0x38, 0x8d, - 0xac, 0x96, 0xfd, 0x30, 0x2f, 0xdc, 0xfb, 0xda, 0xd4, 0x48, 0x68, 0x17, 0x36, 0xeb, 0x0d, 0xa3, - 0x56, 0xdb, 0x7e, 0x90, 0x17, 0x2e, 0x5a, 0xb5, 0x31, 0x8a, 0x46, 0xd0, 0xf7, 0x99, 0x48, 0x62, - 0x12, 0xa8, 0x92, 0xd7, 0xb1, 0x1f, 0xe5, 0x85, 0xbb, 0x7d, 0x63, 0x44, 0x5a, 0x2c, 0x89, 0x87, - 0x8a, 0x8b, 0x72, 0x2e, 0xd6, 0xc6, 0x2a, 0xf1, 0x5a, 0x29, 0xbb, 0xac, 0xd6, 0x8c, 0x5a, 0xdd, - 0xd5, 0x2e, 0x97, 0xc2, 0xf8, 0xdd, 0xf4, 0xb7, 0x63, 0x5c, 0xce, 0x1d, 0x73, 0x3a, 0x77, 0xcc, - 0x5f, 0x73, 0xc7, 0xfc, 0xb6, 0x70, 0x8c, 0xe9, 0xc2, 0x31, 0x7e, 0x2e, 0x1c, 0xe3, 0x93, 0xf7, - 0xef, 0x9f, 0xe1, 0xaf, 0x3b, 0x16, 0x76, 0xab, 0x2b, 0xf2, 0xfc, 0x4f, 0x00, 0x00, 0x00, 0xff, - 0xff, 0x20, 0x55, 0xc1, 0xb8, 0x7f, 0x03, 0x00, 0x00, + // 597 bytes of a gzipped FileDescriptorProto + 0x1f, 0x8b, 0x08, 0x00, 0x00, 0x00, 0x00, 0x00, 0x02, 0xff, 0x9c, 0x54, 0xcb, 0x6e, 0xd3, 0x40, + 0x14, 0x8d, 0xed, 0xf4, 0x75, 0x5d, 0x90, 0x3b, 0xb4, 0x10, 0x2c, 0xe1, 0x98, 0xa8, 0xaa, 0xaa, + 0x56, 0xb2, 0x51, 0xd9, 0xa0, 0xee, 0x9a, 0xf2, 0x50, 0x85, 0x10, 0xc8, 0x4d, 0x59, 0xb0, 0x89, + 0x9c, 0xf1, 0xe0, 0x5a, 0x75, 0x3d, 0x96, 0x67, 0xdc, 0xaa, 0xbf, 0x90, 0x15, 0x2b, 0x76, 0xf9, + 0x01, 0xbe, 0xa4, 0x1b, 0xa4, 0x2e, 0x59, 0x45, 0x90, 0xfe, 0x45, 0x57, 0x68, 0x3c, 0x6e, 0x5c, + 0x05, 0x84, 0x12, 0x36, 0xc9, 0xcc, 0x9c, 0x7b, 0xce, 0x3d, 0xe7, 0x8e, 0x35, 0xf0, 0x24, 0xcd, + 0x28, 0x26, 0x8c, 0xd1, 0xcc, 0xe5, 0x7e, 0x2f, 0x26, 0x69, 0x4f, 0xfe, 0x3b, 0x69, 0x46, 0x39, + 0x45, 0xeb, 0x69, 0x94, 0x84, 0xd8, 0x4f, 0x1d, 0x1e, 0x7d, 0x8e, 0xe9, 0xb9, 0x83, 0x03, 0xec, + 0x8c, 0x19, 0x4e, 
0xc9, 0x30, 0x57, 0x43, 0x1a, 0xd2, 0x82, 0xe0, 0x8a, 0x95, 0xe4, 0xb6, 0xbe, + 0x29, 0x00, 0xfb, 0xc7, 0x04, 0x9f, 0xa4, 0x34, 0x4a, 0x38, 0x7a, 0x0f, 0xf7, 0xf0, 0x78, 0xd7, + 0xe5, 0xac, 0xa1, 0xd8, 0xca, 0x66, 0xbd, 0xbd, 0x75, 0x33, 0x6c, 0x6e, 0x84, 0x11, 0x3f, 0xce, + 0x7b, 0x0e, 0xa6, 0xa7, 0x6e, 0xd9, 0xd0, 0x95, 0x0d, 0x5d, 0x1c, 0x60, 0xf7, 0x94, 0x06, 0x24, + 0x76, 0x3a, 0xcc, 0x5b, 0xae, 0x04, 0x3a, 0x0c, 0xbd, 0x05, 0x3d, 0x23, 0x8c, 0xc6, 0x67, 0x24, + 0x10, 0x72, 0xea, 0xcc, 0x72, 0x70, 0x4b, 0xef, 0xb0, 0xd6, 0x48, 0x85, 0xb9, 0x43, 0xee, 0x73, + 0x86, 0x9e, 0xc2, 0x72, 0x46, 0xc2, 0x88, 0x26, 0x5d, 0x4c, 0xf3, 0x84, 0x4b, 0x9b, 0x9e, 0x2e, + 0xcf, 0xf6, 0xc5, 0x11, 0x3a, 0x00, 0xc0, 0x79, 0x96, 0x11, 0x99, 0x63, 0xf6, 0xc6, 0x4b, 0x25, + 0xbb, 0xc3, 0x10, 0x87, 0x15, 0xc6, 0xfd, 0x90, 0x74, 0xab, 0x68, 0xac, 0xa1, 0xd9, 0xda, 0xa6, + 0xbe, 0xb3, 0xe7, 0x4c, 0x33, 0x7c, 0xa7, 0x70, 0x2d, 0x7e, 0x43, 0x52, 0x4d, 0x9b, 0xbd, 0x4a, + 0x78, 0x76, 0xd1, 0xae, 0x5f, 0x0e, 0x9b, 0x35, 0xcf, 0x60, 0x13, 0xa0, 0x99, 0xc3, 0xda, 0x5f, + 0x09, 0xc8, 0x00, 0xed, 0x84, 0x5c, 0x14, 0x99, 0x97, 0x3c, 0xb1, 0x44, 0xaf, 0x61, 0xee, 0xcc, + 0x8f, 0x73, 0x52, 0xc4, 0xd4, 0x77, 0x9e, 0x4d, 0x67, 0xaa, 0x12, 0xf6, 0x24, 0x7d, 0x57, 0x7d, + 0xa1, 0xb4, 0xbe, 0xab, 0xa0, 0x77, 0x44, 0x85, 0xf0, 0x9c, 0x33, 0x74, 0x04, 0x8b, 0x05, 0xa1, + 0x1b, 0x05, 0x45, 0x4b, 0xad, 0xbd, 0x3b, 0x1a, 0x36, 0x17, 0x8a, 0x92, 0x83, 0x97, 0x37, 0xc3, + 0xe6, 0xf6, 0x54, 0x03, 0x95, 0xe5, 0xde, 0x42, 0xa1, 0x75, 0x10, 0x08, 0xcb, 0x8c, 0xfb, 0x5c, + 0x5a, 0xbe, 0x3f, 0xad, 0xe5, 0xb1, 0x31, 0xe2, 0x49, 0x3a, 0xfa, 0x08, 0x50, 0xdd, 0x4a, 0x43, + 0xfb, 0xbf, 0xfc, 0xe5, 0x1d, 0xdc, 0x51, 0x42, 0x6f, 0xa4, 0x3f, 0xd6, 0xa8, 0x17, 0x92, 0xdb, + 0x33, 0xdc, 0x73, 0xa9, 0x26, 0xf9, 0x5b, 0x5f, 0x55, 0x80, 0xca, 0x36, 0x6a, 0xc1, 0xc2, 0x51, + 0x72, 0x92, 0xd0, 0xf3, 0xc4, 0xa8, 0x99, 0x6b, 0xfd, 0x81, 0xbd, 0x52, 0x81, 0x25, 0x80, 0x6c, + 0x98, 0xdf, 0xeb, 0x31, 0x92, 0x70, 0x43, 0x31, 0x57, 0xfb, 0x03, 0xdb, 0xa8, 0x4a, 0xe4, 0x39, + 0xda, 0x80, 0xa5, 0x0f, 0x19, 0x49, 0xfd, 0x2c, 0x4a, 0x42, 0x43, 0x35, 0x1f, 0xf5, 0x07, 0xf6, + 0x83, 0xaa, 0x68, 0x0c, 0xa1, 0x75, 0x58, 0x94, 0x1b, 0x12, 0x18, 0x9a, 0xf9, 0xb0, 0x3f, 0xb0, + 0xd1, 0x64, 0x19, 0x09, 0xd0, 0x16, 0xe8, 0x1e, 0x49, 0xe3, 0x08, 0xfb, 0x5c, 0xe8, 0xd5, 0xcd, + 0xc7, 0xfd, 0x81, 0xbd, 0x76, 0x67, 0xd6, 0x15, 0x28, 0x14, 0x0f, 0x39, 0x4d, 0xc5, 0x34, 0x8c, + 0xb9, 0x49, 0xc5, 0x5b, 0x44, 0xa4, 0x2c, 0xd6, 0x24, 0x30, 0xe6, 0x27, 0x53, 0x96, 0x40, 0xfb, + 0xdd, 0xd5, 0x2f, 0xab, 0x76, 0x39, 0xb2, 0x94, 0xab, 0x91, 0xa5, 0xfc, 0x1c, 0x59, 0xca, 0x97, + 0x6b, 0xab, 0x76, 0x75, 0x6d, 0xd5, 0x7e, 0x5c, 0x5b, 0xb5, 0x4f, 0xee, 0xbf, 0xbf, 0xaa, 0x3f, + 0x5e, 0xc4, 0xde, 0x7c, 0xf1, 0xa0, 0x3d, 0xff, 0x1d, 0x00, 0x00, 0xff, 0xff, 0x50, 0x46, 0x7c, + 0xc9, 0x2d, 0x05, 0x00, 0x00, } func (m *Checkpoint) Marshal() (dAtA []byte, err error) { @@ -261,6 +343,63 @@ func (m *Checkpoint) MarshalToSizedBuffer(dAtA []byte) (int, error) { return len(dAtA) - i, nil } +func (m *Stats) Marshal() (dAtA []byte, err error) { + size := m.Size() + dAtA = make([]byte, size) + n, err := m.MarshalToSizedBuffer(dAtA[:size]) + if err != nil { + return nil, err + } + return dAtA[:n], nil +} + +func (m *Stats) MarshalTo(dAtA []byte) (int, error) { + size := m.Size() + return m.MarshalToSizedBuffer(dAtA[:size]) +} + +func (m *Stats) MarshalToSizedBuffer(dAtA []byte) (int, error) { + i := len(dAtA) + _ = i + var l int + _ = l + if len(m.StageCheckpoints) > 0 { + for k := range 
m.StageCheckpoints { + v := m.StageCheckpoints[k] + baseI := i + { + size, err := (&v).MarshalToSizedBuffer(dAtA[:i]) + if err != nil { + return 0, err + } + i -= size + i = encodeVarintTable(dAtA, i, uint64(size)) + } + i-- + dAtA[i] = 0x12 + i -= len(k) + copy(dAtA[i:], k) + i = encodeVarintTable(dAtA, i, uint64(len(k))) + i-- + dAtA[i] = 0xa + i = encodeVarintTable(dAtA, i, uint64(baseI-i)) + i-- + dAtA[i] = 0x1a + } + } + if m.CurrentTs != 0 { + i = encodeVarintTable(dAtA, i, uint64(m.CurrentTs)) + i-- + dAtA[i] = 0x10 + } + if m.RegionCount != 0 { + i = encodeVarintTable(dAtA, i, uint64(m.RegionCount)) + i-- + dAtA[i] = 0x8 + } + return len(dAtA) - i, nil +} + func (m *TableStatus) Marshal() (dAtA []byte, err error) { size := m.Size() dAtA = make([]byte, size) @@ -281,6 +420,16 @@ func (m *TableStatus) MarshalToSizedBuffer(dAtA []byte) (int, error) { _ = i var l int _ = l + { + size, err := m.Stats.MarshalToSizedBuffer(dAtA[:i]) + if err != nil { + return 0, err + } + i -= size + i = encodeVarintTable(dAtA, i, uint64(size)) + } + i-- + dAtA[i] = 0x22 { size, err := m.Checkpoint.MarshalToSizedBuffer(dAtA[:i]) if err != nil { @@ -330,6 +479,30 @@ func (m *Checkpoint) Size() (n int) { return n } +func (m *Stats) Size() (n int) { + if m == nil { + return 0 + } + var l int + _ = l + if m.RegionCount != 0 { + n += 1 + sovTable(uint64(m.RegionCount)) + } + if m.CurrentTs != 0 { + n += 1 + sovTable(uint64(m.CurrentTs)) + } + if len(m.StageCheckpoints) > 0 { + for k, v := range m.StageCheckpoints { + _ = k + _ = v + l = v.Size() + mapEntrySize := 1 + len(k) + sovTable(uint64(len(k))) + 1 + l + sovTable(uint64(l)) + n += mapEntrySize + 1 + sovTable(uint64(mapEntrySize)) + } + } + return n +} + func (m *TableStatus) Size() (n int) { if m == nil { return 0 @@ -344,6 +517,8 @@ func (m *TableStatus) Size() (n int) { } l = m.Checkpoint.Size() n += 1 + l + sovTable(uint64(l)) + l = m.Stats.Size() + n += 1 + l + sovTable(uint64(l)) return n } @@ -441,6 +616,223 @@ func (m *Checkpoint) Unmarshal(dAtA []byte) error { } return nil } +func (m *Stats) Unmarshal(dAtA []byte) error { + l := len(dAtA) + iNdEx := 0 + for iNdEx < l { + preIndex := iNdEx + var wire uint64 + for shift := uint(0); ; shift += 7 { + if shift >= 64 { + return ErrIntOverflowTable + } + if iNdEx >= l { + return io.ErrUnexpectedEOF + } + b := dAtA[iNdEx] + iNdEx++ + wire |= uint64(b&0x7F) << shift + if b < 0x80 { + break + } + } + fieldNum := int32(wire >> 3) + wireType := int(wire & 0x7) + if wireType == 4 { + return fmt.Errorf("proto: Stats: wiretype end group for non-group") + } + if fieldNum <= 0 { + return fmt.Errorf("proto: Stats: illegal tag %d (wire type %d)", fieldNum, wire) + } + switch fieldNum { + case 1: + if wireType != 0 { + return fmt.Errorf("proto: wrong wireType = %d for field RegionCount", wireType) + } + m.RegionCount = 0 + for shift := uint(0); ; shift += 7 { + if shift >= 64 { + return ErrIntOverflowTable + } + if iNdEx >= l { + return io.ErrUnexpectedEOF + } + b := dAtA[iNdEx] + iNdEx++ + m.RegionCount |= uint64(b&0x7F) << shift + if b < 0x80 { + break + } + } + case 2: + if wireType != 0 { + return fmt.Errorf("proto: wrong wireType = %d for field CurrentTs", wireType) + } + m.CurrentTs = 0 + for shift := uint(0); ; shift += 7 { + if shift >= 64 { + return ErrIntOverflowTable + } + if iNdEx >= l { + return io.ErrUnexpectedEOF + } + b := dAtA[iNdEx] + iNdEx++ + m.CurrentTs |= github_com_pingcap_tiflow_cdc_model.Ts(b&0x7F) << shift + if b < 0x80 { + break + } + } + case 3: + if wireType != 2 { + return 
fmt.Errorf("proto: wrong wireType = %d for field StageCheckpoints", wireType) + } + var msglen int + for shift := uint(0); ; shift += 7 { + if shift >= 64 { + return ErrIntOverflowTable + } + if iNdEx >= l { + return io.ErrUnexpectedEOF + } + b := dAtA[iNdEx] + iNdEx++ + msglen |= int(b&0x7F) << shift + if b < 0x80 { + break + } + } + if msglen < 0 { + return ErrInvalidLengthTable + } + postIndex := iNdEx + msglen + if postIndex < 0 { + return ErrInvalidLengthTable + } + if postIndex > l { + return io.ErrUnexpectedEOF + } + if m.StageCheckpoints == nil { + m.StageCheckpoints = make(map[string]Checkpoint) + } + var mapkey string + mapvalue := &Checkpoint{} + for iNdEx < postIndex { + entryPreIndex := iNdEx + var wire uint64 + for shift := uint(0); ; shift += 7 { + if shift >= 64 { + return ErrIntOverflowTable + } + if iNdEx >= l { + return io.ErrUnexpectedEOF + } + b := dAtA[iNdEx] + iNdEx++ + wire |= uint64(b&0x7F) << shift + if b < 0x80 { + break + } + } + fieldNum := int32(wire >> 3) + if fieldNum == 1 { + var stringLenmapkey uint64 + for shift := uint(0); ; shift += 7 { + if shift >= 64 { + return ErrIntOverflowTable + } + if iNdEx >= l { + return io.ErrUnexpectedEOF + } + b := dAtA[iNdEx] + iNdEx++ + stringLenmapkey |= uint64(b&0x7F) << shift + if b < 0x80 { + break + } + } + intStringLenmapkey := int(stringLenmapkey) + if intStringLenmapkey < 0 { + return ErrInvalidLengthTable + } + postStringIndexmapkey := iNdEx + intStringLenmapkey + if postStringIndexmapkey < 0 { + return ErrInvalidLengthTable + } + if postStringIndexmapkey > l { + return io.ErrUnexpectedEOF + } + mapkey = string(dAtA[iNdEx:postStringIndexmapkey]) + iNdEx = postStringIndexmapkey + } else if fieldNum == 2 { + var mapmsglen int + for shift := uint(0); ; shift += 7 { + if shift >= 64 { + return ErrIntOverflowTable + } + if iNdEx >= l { + return io.ErrUnexpectedEOF + } + b := dAtA[iNdEx] + iNdEx++ + mapmsglen |= int(b&0x7F) << shift + if b < 0x80 { + break + } + } + if mapmsglen < 0 { + return ErrInvalidLengthTable + } + postmsgIndex := iNdEx + mapmsglen + if postmsgIndex < 0 { + return ErrInvalidLengthTable + } + if postmsgIndex > l { + return io.ErrUnexpectedEOF + } + mapvalue = &Checkpoint{} + if err := mapvalue.Unmarshal(dAtA[iNdEx:postmsgIndex]); err != nil { + return err + } + iNdEx = postmsgIndex + } else { + iNdEx = entryPreIndex + skippy, err := skipTable(dAtA[iNdEx:]) + if err != nil { + return err + } + if (skippy < 0) || (iNdEx+skippy) < 0 { + return ErrInvalidLengthTable + } + if (iNdEx + skippy) > postIndex { + return io.ErrUnexpectedEOF + } + iNdEx += skippy + } + } + m.StageCheckpoints[mapkey] = *mapvalue + iNdEx = postIndex + default: + iNdEx = preIndex + skippy, err := skipTable(dAtA[iNdEx:]) + if err != nil { + return err + } + if (skippy < 0) || (iNdEx+skippy) < 0 { + return ErrInvalidLengthTable + } + if (iNdEx + skippy) > l { + return io.ErrUnexpectedEOF + } + iNdEx += skippy + } + } + + if iNdEx > l { + return io.ErrUnexpectedEOF + } + return nil +} func (m *TableStatus) Unmarshal(dAtA []byte) error { l := len(dAtA) iNdEx := 0 @@ -541,6 +933,39 @@ func (m *TableStatus) Unmarshal(dAtA []byte) error { return err } iNdEx = postIndex + case 4: + if wireType != 2 { + return fmt.Errorf("proto: wrong wireType = %d for field Stats", wireType) + } + var msglen int + for shift := uint(0); ; shift += 7 { + if shift >= 64 { + return ErrIntOverflowTable + } + if iNdEx >= l { + return io.ErrUnexpectedEOF + } + b := dAtA[iNdEx] + iNdEx++ + msglen |= int(b&0x7F) << shift + if b < 0x80 { + break + } + } + if 
msglen < 0 { + return ErrInvalidLengthTable + } + postIndex := iNdEx + msglen + if postIndex < 0 { + return ErrInvalidLengthTable + } + if postIndex > l { + return io.ErrUnexpectedEOF + } + if err := m.Stats.Unmarshal(dAtA[iNdEx:postIndex]); err != nil { + return err + } + iNdEx = postIndex default: iNdEx = preIndex skippy, err := skipTable(dAtA[iNdEx:]) diff --git a/cdc/processor/tablepb/table.proto b/cdc/processor/tablepb/table.proto index dc7e9142be3..683a0040043 100644 --- a/cdc/processor/tablepb/table.proto +++ b/cdc/processor/tablepb/table.proto @@ -43,6 +43,16 @@ message Checkpoint { uint64 resolved_ts = 2 [(gogoproto.casttype) = "github.com/pingcap/tiflow/cdc/model.Ts"]; } +// Stats holds a statistic for a table. +message Stats { + // Number of captured regions. + uint64 region_count = 1; + // The current timestamp from the table's point of view. + uint64 current_ts = 2 [(gogoproto.casttype) = "github.com/pingcap/tiflow/cdc/model.Ts"]; + // Checkponits at each stage. + map<string, Checkpoint> stage_checkpoints = 3 [(gogoproto.nullable) = false]; +} + // TableStatus is the running status of a table. message TableStatus { int64 table_id = 1 [ @@ -51,4 +61,5 @@ ]; TableState state = 2; Checkpoint checkpoint = 3 [(gogoproto.nullable) = false]; + Stats stats = 4 [(gogoproto.nullable) = false]; } diff --git a/cdc/puller/ddl_puller_test.go b/cdc/puller/ddl_puller_test.go index 78a41eb2856..5b2356945be 100644 --- a/cdc/puller/ddl_puller_test.go +++ b/cdc/puller/ddl_puller_test.go @@ -81,6 +81,10 @@ func (m *mockPuller) Output() <-chan *model.RawKVEntry { return m.outCh } +func (m *mockPuller) Stats() Stats { + return Stats{} +} + func (m *mockPuller) append(e *model.RawKVEntry) { m.inCh <- e } diff --git a/cdc/puller/puller.go b/cdc/puller/puller.go index a0267c82897..17d92f7c03d 100644 --- a/cdc/puller/puller.go +++ b/cdc/puller/puller.go @@ -41,22 +41,34 @@ const ( defaultPullerOutputChanSize = 128 ) +// Stats of a puller. +type Stats struct { + RegionCount uint64 + CheckpointTsIngress model.Ts + ResolvedTsIngress model.Ts + CheckpointTsEgress model.Ts + ResolvedTsEgress model.Ts +} + // Puller pull data from tikv and push changes into a buffer. type Puller interface { // Run the puller, continually fetch event from TiKV and add event into buffer. Run(ctx context.Context) error GetResolvedTs() uint64 Output() <-chan *model.RawKVEntry + Stats() Stats } type pullerImpl struct { - kvCli kv.CDCKVClient - kvStorage tikv.Storage + kvCli kv.CDCKVClient + kvStorage tikv.Storage + spans []regionspan.ComparableSpan + outputCh chan *model.RawKVEntry + tsTracker frontier.Frontier + // The commit ts of the latest raw kv event that puller has sent. checkpointTs uint64 - spans []regionspan.ComparableSpan - outputCh chan *model.RawKVEntry - tsTracker frontier.Frontier - resolvedTs uint64 + // The latest resolved ts that puller has sent.
+ resolvedTs uint64 changefeed model.ChangeFeedID tableID model.TableID @@ -169,10 +181,14 @@ func (p *pullerImpl) Run(ctx context.Context) error { zap.Any("row", raw)) return nil } + commitTs := raw.CRTs select { case <-ctx.Done(): return errors.Trace(ctx.Err()) case p.outputCh <- raw: + if atomic.LoadUint64(&p.checkpointTs) < commitTs { + atomic.StoreUint64(&p.checkpointTs, commitTs) + } } return nil } @@ -255,3 +271,13 @@ func (p *pullerImpl) GetResolvedTs() uint64 { func (p *pullerImpl) Output() <-chan *model.RawKVEntry { return p.outputCh } + +func (p *pullerImpl) Stats() Stats { + return Stats{ + RegionCount: p.kvCli.RegionCount(), + ResolvedTsIngress: p.kvCli.ResolvedTs(), + CheckpointTsIngress: p.kvCli.CommitTs(), + ResolvedTsEgress: atomic.LoadUint64(&p.resolvedTs), + CheckpointTsEgress: atomic.LoadUint64(&p.checkpointTs), + } +} diff --git a/cdc/puller/puller_test.go b/cdc/puller/puller_test.go index 347d7864ae5..a305324344d 100644 --- a/cdc/puller/puller_test.go +++ b/cdc/puller/puller_test.go @@ -47,6 +47,7 @@ func (mc *mockPdClientForPullerTest) GetClusterID(ctx context.Context) uint64 { } type mockCDCKVClient struct { + kv.CDCKVClient expectations chan model.RegionFeedEvent } @@ -91,10 +92,6 @@ func (mc *mockCDCKVClient) EventFeed( } } -func (mc *mockCDCKVClient) RegionCount() uint64 { - return 0 -} - func (mc *mockCDCKVClient) Close() error { close(mc.expectations) if len(mc.expectations) > 0 { diff --git a/cdc/scheduler/internal/v3/replication/metrics.go b/cdc/scheduler/internal/v3/replication/metrics.go index a454e7b8c48..05d7b469c84 100644 --- a/cdc/scheduler/internal/v3/replication/metrics.go +++ b/cdc/scheduler/internal/v3/replication/metrics.go @@ -74,6 +74,57 @@ var ( Name: "slow_table_replication_state", Help: "The replication state of the slowest table", }, []string{"namespace", "changefeed"}) + slowestTableStageCheckpointTsGaugeVec = prometheus.NewGaugeVec( + prometheus.GaugeOpts{ + Namespace: "ticdc", + Subsystem: "scheduler", + Name: "slow_table_stage_checkpoint_ts", + Help: "Checkpoint ts of each stage of the slowest table", + }, []string{"namespace", "changefeed", "stage"}) + slowestTableStageResolvedTsGaugeVec = prometheus.NewGaugeVec( + prometheus.GaugeOpts{ + Namespace: "ticdc", + Subsystem: "scheduler", + Name: "slow_table_stage_resolved_ts", + Help: "Resolved ts of each stage of the slowest table", + }, []string{"namespace", "changefeed", "stage"}) + slowestTableStageCheckpointTsLagGaugeVec = prometheus.NewGaugeVec( + prometheus.GaugeOpts{ + Namespace: "ticdc", + Subsystem: "scheduler", + Name: "slow_table_stage_checkpoint_ts_lag", + Help: "Checkpoint ts lag of each stage of the slowest table", + }, []string{"namespace", "changefeed", "stage"}) + slowestTableStageResolvedTsLagGaugeVec = prometheus.NewGaugeVec( + prometheus.GaugeOpts{ + Namespace: "ticdc", + Subsystem: "scheduler", + Name: "slow_table_stage_resolved_ts_lag", + Help: "Resolved ts lag of each stage of the slowest table", + }, []string{"namespace", "changefeed", "stage"}) + slowestTableStageCheckpointTsLagHistogramVec = prometheus.NewHistogramVec( + prometheus.HistogramOpts{ + Namespace: "ticdc", + Subsystem: "scheduler", + Name: "slow_table_stage_checkpoint_ts_lag_histogram", + Help: "Histogram of the slowest table checkpoint ts lag of each stage", + Buckets: prometheus.LinearBuckets(0.5, 1, 16), + }, []string{"namespace", "changefeed", "stage"}) + slowestTableStageResolvedTsLagHistogramVec = prometheus.NewHistogramVec( + prometheus.HistogramOpts{ + Namespace: "ticdc", + Subsystem: 
"scheduler", + Name: "slow_table_stage_resolved_ts_lag_histogram", + Help: "Histogram of the slowest table resolved ts lag of each stage", + Buckets: prometheus.LinearBuckets(0.5, 1, 16), + }, []string{"namespace", "changefeed", "stage"}) + slowestTableRegionGaugeVec = prometheus.NewGaugeVec( + prometheus.GaugeOpts{ + Namespace: "ticdc", + Subsystem: "scheduler", + Name: "slow_table_region_count", + Help: "The number of regions captured by the slowest table", + }, []string{"namespace", "changefeed"}) ) // InitMetrics registers all metrics used in scheduler @@ -86,4 +137,11 @@ func InitMetrics(registry *prometheus.Registry) { registry.MustRegister(slowestTableCheckpointTsGauge) registry.MustRegister(slowestTableResolvedTsGauge) registry.MustRegister(slowestTableStateGauge) + registry.MustRegister(slowestTableStageCheckpointTsGaugeVec) + registry.MustRegister(slowestTableStageResolvedTsGaugeVec) + registry.MustRegister(slowestTableStageCheckpointTsLagGaugeVec) + registry.MustRegister(slowestTableStageResolvedTsLagGaugeVec) + registry.MustRegister(slowestTableStageCheckpointTsLagHistogramVec) + registry.MustRegister(slowestTableStageResolvedTsLagHistogramVec) + registry.MustRegister(slowestTableRegionGaugeVec) } diff --git a/cdc/scheduler/internal/v3/replication/replication_manager.go b/cdc/scheduler/internal/v3/replication/replication_manager.go index 7a89476963e..d4872b1624e 100644 --- a/cdc/scheduler/internal/v3/replication/replication_manager.go +++ b/cdc/scheduler/internal/v3/replication/replication_manager.go @@ -518,6 +518,29 @@ func (r *Manager) CollectMetrics() { phyRTs := oracle.ExtractPhysical(table.Checkpoint.ResolvedTs) slowestTableResolvedTsGauge. WithLabelValues(cf.Namespace, cf.ID).Set(float64(phyRTs)) + phyCurrentTs := oracle.ExtractPhysical(table.Stats.CurrentTs) + for stage, checkpoint := range table.Stats.StageCheckpoints { + // Checkpoint ts + phyCkpTs := oracle.ExtractPhysical(checkpoint.CheckpointTs) + slowestTableStageCheckpointTsGaugeVec. + WithLabelValues(cf.Namespace, cf.ID, stage).Set(float64(phyCkpTs)) + checkpointLag := float64(phyCurrentTs-phyCkpTs) / 1e3 + slowestTableStageCheckpointTsLagGaugeVec. + WithLabelValues(cf.Namespace, cf.ID, stage).Set(checkpointLag) + slowestTableStageCheckpointTsLagHistogramVec. + WithLabelValues(cf.Namespace, cf.ID, stage).Observe(checkpointLag) + // Resolved ts + phyRTs := oracle.ExtractPhysical(checkpoint.ResolvedTs) + slowestTableStageResolvedTsGaugeVec. + WithLabelValues(cf.Namespace, cf.ID, stage).Set(float64(phyRTs)) + resolvedTsLag := float64(phyCurrentTs-phyRTs) / 1e3 + slowestTableStageResolvedTsLagGaugeVec. + WithLabelValues(cf.Namespace, cf.ID, stage).Set(resolvedTsLag) + slowestTableStageResolvedTsLagHistogramVec. + WithLabelValues(cf.Namespace, cf.ID, stage).Observe(resolvedTsLag) + } + slowestTableRegionGaugeVec. + WithLabelValues(cf.Namespace, cf.ID).Set(float64(table.Stats.RegionCount)) } metricAcceptScheduleTask := acceptScheduleTaskCounter.MustCurryWith(map[string]string{ "namespace": cf.Namespace, "changefeed": cf.ID, @@ -593,6 +616,13 @@ func (r *Manager) CleanMetrics() { tableStateGauge. 
DeleteLabelValues(cf.Namespace, cf.ID, ReplicationSetState(s).String()) } + slowestTableStageCheckpointTsGaugeVec.Reset() + slowestTableStageResolvedTsGaugeVec.Reset() + slowestTableStageCheckpointTsLagGaugeVec.Reset() + slowestTableStageResolvedTsLagGaugeVec.Reset() + slowestTableStageCheckpointTsLagHistogramVec.Reset() + slowestTableStageResolvedTsLagHistogramVec.Reset() + slowestTableRegionGaugeVec.Reset() } // SetReplicationSetForTests is only used in tests. diff --git a/cdc/scheduler/internal/v3/replication/replication_set.go b/cdc/scheduler/internal/v3/replication/replication_set.go index d598d08195b..0ed591a2d63 100644 --- a/cdc/scheduler/internal/v3/replication/replication_set.go +++ b/cdc/scheduler/internal/v3/replication/replication_set.go @@ -136,6 +136,7 @@ type ReplicationSet struct { //nolint:revive // CaptureRolePrimary. Captures map[model.CaptureID]Role Checkpoint tablepb.Checkpoint + Stats tablepb.Stats } // NewReplicationSet returns a new replication set. @@ -162,7 +163,7 @@ func NewReplicationSet( return nil, r.inconsistentError(table, captureID, "schedulerv3: table id inconsistent") } - r.updateCheckpoint(table.Checkpoint) + r.updateCheckpointAndStats(table.Checkpoint, table.Stats) switch table.State { case tablepb.TableStateReplicating: @@ -478,7 +479,7 @@ func (r *ReplicationSet) pollOnPrepare( } case tablepb.TableStateReplicating: if r.Primary == captureID { - r.updateCheckpoint(input.Checkpoint) + r.updateCheckpointAndStats(input.Checkpoint, input.Stats) return nil, false, nil } case tablepb.TableStateStopping, tablepb.TableStateStopped: @@ -580,7 +581,7 @@ func (r *ReplicationSet) pollOnCommit( case tablepb.TableStateStopped, tablepb.TableStateAbsent: if r.Primary == captureID { - r.updateCheckpoint(input.Checkpoint) + r.updateCheckpointAndStats(input.Checkpoint, input.Stats) original := r.Primary r.clearPrimary() if !r.hasRole(RoleSecondary) { @@ -644,7 +645,7 @@ func (r *ReplicationSet) pollOnCommit( case tablepb.TableStateReplicating: if r.Primary == captureID { - r.updateCheckpoint(input.Checkpoint) + r.updateCheckpointAndStats(input.Checkpoint, input.Stats) if r.hasRole(RoleSecondary) { // Original primary is not stopped, ask for stopping. return &schedulepb.Message{ @@ -677,7 +678,7 @@ func (r *ReplicationSet) pollOnCommit( case tablepb.TableStateStopping: if r.Primary == captureID && r.hasRole(RoleSecondary) { - r.updateCheckpoint(input.Checkpoint) + r.updateCheckpointAndStats(input.Checkpoint, input.Stats) return nil, false, nil } else if r.isInRole(captureID, RoleUndetermined) { log.Info("schedulerv3: capture is stopping during Commit", @@ -702,7 +703,7 @@ func (r *ReplicationSet) pollOnReplicating( switch input.State { case tablepb.TableStateReplicating: if r.Primary == captureID { - r.updateCheckpoint(input.Checkpoint) + r.updateCheckpointAndStats(input.Checkpoint, input.Stats) return nil, false, nil } return nil, false, r.multiplePrimaryError( @@ -714,7 +715,7 @@ func (r *ReplicationSet) pollOnReplicating( case tablepb.TableStateStopping: case tablepb.TableStateStopped: if r.Primary == captureID { - r.updateCheckpoint(input.Checkpoint) + r.updateCheckpointAndStats(input.Checkpoint, input.Stats) // Primary is stopped, but we still has secondary. // Clear primary and promote secondary when it's prepared. 
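The Stats field that ReplicationSet now carries, and that CollectMetrics reads above, is the new tablepb.Stats message defined earlier in the patch. A minimal round-trip sketch of that message, assuming the tiflow module is on the import path; the literal values are arbitrary.

package main

import (
	"fmt"

	"github.com/pingcap/tiflow/cdc/processor/tablepb"
)

func main() {
	// Build a Stats value the way table_actor.Stats() does, then exercise the
	// generated Marshal/Unmarshal added in table.pb.go above.
	in := tablepb.Stats{
		RegionCount: 4,
		CurrentTs:   442,
		StageCheckpoints: map[string]tablepb.Checkpoint{
			"puller-ingress": {CheckpointTs: 440, ResolvedTs: 441},
			"sink":           {CheckpointTs: 438, ResolvedTs: 439},
		},
	}
	data, err := in.Marshal()
	if err != nil {
		panic(err)
	}
	var out tablepb.Stats
	if err := out.Unmarshal(data); err != nil {
		panic(err)
	}
	fmt.Println(out.RegionCount, out.StageCheckpoints["sink"].CheckpointTs) // 4 438
}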
@@ -898,11 +899,14 @@ func (r *ReplicationSet) handleCaptureShutdown( return msgs, true, errors.Trace(err) } -func (r *ReplicationSet) updateCheckpoint(checkpoint tablepb.Checkpoint) { +func (r *ReplicationSet) updateCheckpointAndStats( + checkpoint tablepb.Checkpoint, stats tablepb.Stats, +) { if r.Checkpoint.CheckpointTs < checkpoint.CheckpointTs { r.Checkpoint.CheckpointTs = checkpoint.CheckpointTs } if r.Checkpoint.ResolvedTs < checkpoint.ResolvedTs { r.Checkpoint.ResolvedTs = checkpoint.ResolvedTs } + r.Stats = stats } diff --git a/cdc/scheduler/schedulepb/table_schedule.pb.go b/cdc/scheduler/schedulepb/table_schedule.pb.go index 76f072262d0..01e5d466e3c 100644 --- a/cdc/scheduler/schedulepb/table_schedule.pb.go +++ b/cdc/scheduler/schedulepb/table_schedule.pb.go @@ -354,6 +354,7 @@ func (m *RemoveTableResponse) GetCheckpoint() tablepb.Checkpoint { type DispatchTableResponse struct { // Types that are valid to be assigned to Response: + // // *DispatchTableResponse_AddTable // *DispatchTableResponse_RemoveTable Response isDispatchTableResponse_Response `protobuf_oneof:"response"` diff --git a/cdc/sorter/db/reader.go b/cdc/sorter/db/reader.go index 6f98583cc82..50de36a9fa3 100644 --- a/cdc/sorter/db/reader.go +++ b/cdc/sorter/db/reader.go @@ -21,6 +21,7 @@ import ( "github.com/pingcap/errors" "github.com/pingcap/log" "github.com/pingcap/tiflow/cdc/model" + "github.com/pingcap/tiflow/cdc/sorter" "github.com/pingcap/tiflow/cdc/sorter/db/message" "github.com/pingcap/tiflow/cdc/sorter/encoding" "github.com/pingcap/tiflow/pkg/actor" @@ -52,6 +53,13 @@ type reader struct { var _ actor.Actor[message.Task] = (*reader)(nil) +func (r *reader) stats() sorter.Stats { + return sorter.Stats{ + CheckpointTsEgress: atomic.LoadUint64(&r.lastSentCommitTs), + ResolvedTsEgress: atomic.LoadUint64(&r.lastSentResolvedTs), + } +} + // setTaskDelete set delete range if there are too many events can be deleted or // it has been a long time since last delete. func (r *reader) setTaskDelete(task *message.Task, deleteKeys []message.Key) { @@ -84,7 +92,7 @@ func (r *reader) output(event *model.PolymorphicEvent) bool { select { case r.outputCh <- event: r.lastEvent = event - r.lastSentCommitTs = event.CRTs + atomic.StoreUint64(&r.lastSentCommitTs, event.CRTs) return true default: return false @@ -96,7 +104,7 @@ func (r *reader) outputResolvedTs(rts model.Ts) { ok := r.output(model.NewResolvedPolymorphicEvent(0, rts)) if ok { r.metricTotalEventsResolved.Inc() - r.lastSentResolvedTs = rts + atomic.StoreUint64(&r.lastSentResolvedTs, rts) } } @@ -262,6 +270,8 @@ type pollState struct { // Iterator is released once it exceeds `iterMaxAliveDuration`. iterAliveTime time.Time iterMaxAliveDuration time.Duration + // A timestamp when we request an iterator. + iterRequestTime time.Time // A channel for receiving iterator asynchronously. iterCh chan *message.LimitedIterator // An iterator for reading resolved events, up to the `iterResolvedTs`. @@ -270,6 +280,7 @@ type pollState struct { // A flag to mark whether the current position has been read. iterHasRead bool + metricIterRequest prometheus.Observer metricIterFirst prometheus.Observer metricIterRelease prometheus.Observer } @@ -331,6 +342,7 @@ func (state *pollState) tryGetIterator(uid uint32, tableID uint64) (*message.Ite // We haven't sent request. 
iterCh := make(chan *message.LimitedIterator, 1) state.iterCh = iterCh + state.iterRequestTime = time.Now() readerRouter := state.readerRouter readerID := state.readerID lowerBoundTs := atomic.LoadUint64(&state.startTs) @@ -365,6 +377,8 @@ func (state *pollState) tryGetIterator(uid uint32, tableID uint64) (*message.Ite state.iterCh = nil state.iter = iter start := time.Now() + requestDuration := start.Sub(state.iterRequestTime) + state.metricIterRequest.Observe(requestDuration.Seconds()) state.iterAliveTime = start state.iterResolvedTs = iter.ResolvedTs state.iterHasRead = false diff --git a/cdc/sorter/db/reader_test.go b/cdc/sorter/db/reader_test.go index 2925361826b..1d0755a7db8 100644 --- a/cdc/sorter/db/reader_test.go +++ b/cdc/sorter/db/reader_test.go @@ -54,6 +54,7 @@ func newTestReader() *reader { serde: &encoding.MsgPackGenSerde{}, }, state: pollState{ + metricIterRequest: metricIterDuration.WithLabelValues("request"), metricIterFirst: metricIterDuration.WithLabelValues("first"), metricIterRelease: metricIterDuration.WithLabelValues("release"), }, @@ -808,6 +809,7 @@ func TestReaderPoll(t *testing.T) { // Do not send delete range. r.delete.countThreshold = 1000 r.delete.period = 100 * time.Second + r.state.metricIterRequest = metricIterDuration.WithLabelValues("request") r.state.metricIterFirst = metricIterDuration.WithLabelValues("first") r.state.metricIterRelease = metricIterDuration.WithLabelValues("release") for j, cs := range css { diff --git a/cdc/sorter/db/sorter.go b/cdc/sorter/db/sorter.go index bd01b13b669..e12cadfbd15 100644 --- a/cdc/sorter/db/sorter.go +++ b/cdc/sorter/db/sorter.go @@ -34,7 +34,7 @@ import ( const ( // Capacity of db sorter input and output channels. - sorterInputCap, sorterOutputCap = 64, 64 + sorterInputCap, sorterOutputCap = 1, 64 // Max size of received event batch. 
batchReceiveEventSize = 32 ) @@ -75,9 +75,11 @@ type Sorter struct { writerRouter *actor.Router[message.Task] writerActorID actor.ID + writer *writer readerRouter *actor.Router[message.Task] ReaderActorID actor.ID + reader *reader outputCh chan *model.PolymorphicEvent closed int32 @@ -152,6 +154,7 @@ func NewSorter( iterMaxAliveDuration: time.Duration(cfg.IteratorMaxAliveDuration) * time.Millisecond, iterFirstSlowDuration: time.Duration(cfg.IteratorSlowReadDuration) * time.Millisecond, + metricIterRequest: metricIterDuration.WithLabelValues("request"), metricIterFirst: metricIterDuration.WithLabelValues("first"), metricIterRelease: metricIterDuration.WithLabelValues("release"), }, @@ -179,8 +182,10 @@ func NewSorter( common: c, writerRouter: writerRouter, writerActorID: actorID, + writer: w, readerRouter: readerRouter, ReaderActorID: actorID, + reader: r, outputCh: r.outputCh, }, nil } @@ -262,3 +267,15 @@ func (ls *Sorter) EmitStartTs(ctx context.Context, ts uint64) { }) _ = ls.readerRouter.SendB(ctx, ls.ReaderActorID, msg) } + +// Stats implement sorter interface +func (ls *Sorter) Stats() sorter.Stats { + ingress := ls.writer.stats() + egress := ls.reader.stats() + return sorter.Stats{ + CheckpointTsEgress: egress.CheckpointTsEgress, + ResolvedTsEgress: egress.ResolvedTsEgress, + CheckpointTsIngress: ingress.CheckpointTsIngress, + ResolvedTsIngress: ingress.ResolvedTsIngress, + } +} diff --git a/cdc/sorter/db/writer.go b/cdc/sorter/db/writer.go index 98ed4c03dd1..ac11bd22fa5 100644 --- a/cdc/sorter/db/writer.go +++ b/cdc/sorter/db/writer.go @@ -15,8 +15,10 @@ package db import ( "context" + "sync/atomic" "github.com/pingcap/log" + "github.com/pingcap/tiflow/cdc/sorter" "github.com/pingcap/tiflow/cdc/sorter/db/message" "github.com/pingcap/tiflow/cdc/sorter/encoding" "github.com/pingcap/tiflow/pkg/actor" @@ -45,6 +47,7 @@ var _ actor.Actor[message.Task] = (*writer)(nil) func (w *writer) Poll(ctx context.Context, msgs []actormsg.Message[message.Task]) (running bool) { kvEventCount, resolvedEventCount := 0, 0 + maxCommitTs, maxResolvedTs := uint64(0), uint64(0) writes := make(map[message.Key][]byte) for i := range msgs { switch msgs[i].Tp { @@ -57,14 +60,14 @@ func (w *writer) Poll(ctx context.Context, msgs []actormsg.Message[message.Task] ev := msgs[i].Value.InputEvent if ev.IsResolved() { - if w.maxResolvedTs < ev.CRTs { - w.maxResolvedTs = ev.CRTs + if maxResolvedTs < ev.CRTs { + maxResolvedTs = ev.CRTs } resolvedEventCount++ continue } - if w.maxCommitTs < ev.CRTs { - w.maxCommitTs = ev.CRTs + if maxCommitTs < ev.CRTs { + maxCommitTs = ev.CRTs } kvEventCount++ @@ -79,6 +82,12 @@ func (w *writer) Poll(ctx context.Context, msgs []actormsg.Message[message.Task] } w.metricTotalEventsKV.Add(float64(kvEventCount)) w.metricTotalEventsResolved.Add(float64(resolvedEventCount)) + if atomic.LoadUint64(&w.maxCommitTs) < maxCommitTs { + atomic.StoreUint64(&w.maxCommitTs, maxCommitTs) + } + if atomic.LoadUint64(&w.maxResolvedTs) < maxResolvedTs { + atomic.StoreUint64(&w.maxResolvedTs, maxResolvedTs) + } if len(writes) != 0 { // Send write task to db. 
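In writer.Poll above and in puller.Run, a timestamp high-water mark is advanced with an atomic load followed by an atomic store. That is sufficient when a single goroutine writes the counter and others only read it, which appears to be the case here. If several writers could race, a CAS loop would be needed instead; a minimal sketch of that variant follows (the storeMaxTs helper is not part of the patch).

package main

import (
	"fmt"
	"sync/atomic"
)

// storeMaxTs advances *ts to v monotonically and stays correct even with
// concurrent writers, unlike a plain load-then-store.
func storeMaxTs(ts *uint64, v uint64) {
	for {
		old := atomic.LoadUint64(ts)
		if v <= old || atomic.CompareAndSwapUint64(ts, old, v) {
			return
		}
	}
}

func main() {
	var maxCommitTs uint64
	storeMaxTs(&maxCommitTs, 420)
	storeMaxTs(&maxCommitTs, 400) // smaller, ignored
	fmt.Println(atomic.LoadUint64(&maxCommitTs)) // 420
}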
diff --git a/cdc/sorter/db/writer.go b/cdc/sorter/db/writer.go
index 98ed4c03dd1..ac11bd22fa5 100644
--- a/cdc/sorter/db/writer.go
+++ b/cdc/sorter/db/writer.go
@@ -15,8 +15,10 @@ package db
 
 import (
 	"context"
+	"sync/atomic"
 
 	"github.com/pingcap/log"
+	"github.com/pingcap/tiflow/cdc/sorter"
 	"github.com/pingcap/tiflow/cdc/sorter/db/message"
 	"github.com/pingcap/tiflow/cdc/sorter/encoding"
 	"github.com/pingcap/tiflow/pkg/actor"
@@ -45,6 +47,7 @@ var _ actor.Actor[message.Task] = (*writer)(nil)
 
 func (w *writer) Poll(ctx context.Context, msgs []actormsg.Message[message.Task]) (running bool) {
 	kvEventCount, resolvedEventCount := 0, 0
+	maxCommitTs, maxResolvedTs := uint64(0), uint64(0)
 	writes := make(map[message.Key][]byte)
 	for i := range msgs {
 		switch msgs[i].Tp {
@@ -57,14 +60,14 @@ func (w *writer) Poll(ctx context.Context, msgs []actormsg.Message[message.Task]
 
 			ev := msgs[i].Value.InputEvent
 			if ev.IsResolved() {
-				if w.maxResolvedTs < ev.CRTs {
-					w.maxResolvedTs = ev.CRTs
+				if maxResolvedTs < ev.CRTs {
+					maxResolvedTs = ev.CRTs
 				}
 				resolvedEventCount++
 				continue
 			}
-			if w.maxCommitTs < ev.CRTs {
-				w.maxCommitTs = ev.CRTs
+			if maxCommitTs < ev.CRTs {
+				maxCommitTs = ev.CRTs
 			}
 			kvEventCount++
 
@@ -79,6 +82,12 @@ func (w *writer) Poll(ctx context.Context, msgs []actormsg.Message[message.Task]
 	}
 	w.metricTotalEventsKV.Add(float64(kvEventCount))
 	w.metricTotalEventsResolved.Add(float64(resolvedEventCount))
+	if atomic.LoadUint64(&w.maxCommitTs) < maxCommitTs {
+		atomic.StoreUint64(&w.maxCommitTs, maxCommitTs)
+	}
+	if atomic.LoadUint64(&w.maxResolvedTs) < maxResolvedTs {
+		atomic.StoreUint64(&w.maxResolvedTs, maxResolvedTs)
+	}
 
 	if len(writes) != 0 {
 		// Send write task to db.
@@ -117,8 +126,8 @@ func (w *writer) Poll(ctx context.Context, msgs []actormsg.Message[message.Task]
 			// --------------------------------------------->
 			// writer:                                        ^ actual maxCommitTs
 			// reader:  ^ maxCommitTs  ^ exhaustedResolvedTs  ^ maxResolvedTs
-			MaxCommitTs:   w.maxCommitTs,
-			MaxResolvedTs: w.maxResolvedTs,
+			MaxCommitTs:   atomic.LoadUint64(&w.maxCommitTs),
+			MaxResolvedTs: atomic.LoadUint64(&w.maxResolvedTs),
 		},
 	})
 	// It's ok if send fails, as resolved ts events are received periodically.
@@ -134,3 +143,17 @@ func (w *writer) OnClose() {
 	w.stopped = true
 	w.common.closedWg.Done()
 }
+
+func (w *writer) stats() sorter.Stats {
+	maxCommitTs := atomic.LoadUint64(&w.maxCommitTs)
+	maxResolvedTs := atomic.LoadUint64(&w.maxResolvedTs)
+	if maxCommitTs < maxResolvedTs {
+		// In case there is no write for the table,
+		// we use maxResolvedTs as maxCommitTs to make the stats meaningful.
+		maxCommitTs = maxResolvedTs
+	}
+	return sorter.Stats{
+		CheckpointTsIngress: maxCommitTs,
+		ResolvedTsIngress:   maxResolvedTs,
+	}
+}
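The writer above publishes maxCommitTs and maxResolvedTs with a plain atomic Load-then-Store, which is safe only because the actor's Poll goroutine is the single writer while stats() merely reads. If several goroutines could ever publish concurrently, the usual alternative is a CAS loop; a general-purpose sketch for comparison (not part of the patch):

```go
package sketch

import "sync/atomic"

// advanceTs raises *slot to candidate if candidate is newer, tolerating
// concurrent writers. The single-writer Poll loop in this patch does not
// need the retry, so it uses Load-then-Store instead.
func advanceTs(slot *uint64, candidate uint64) {
	for {
		current := atomic.LoadUint64(slot)
		if candidate <= current {
			return // already up to date
		}
		if atomic.CompareAndSwapUint64(slot, current, candidate) {
			return
		}
	}
}
```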
diff --git a/cdc/sorter/memory/entry_sorter.go b/cdc/sorter/memory/entry_sorter.go
index 08a13f1089e..f49f4d93c68 100644
--- a/cdc/sorter/memory/entry_sorter.go
+++ b/cdc/sorter/memory/entry_sorter.go
@@ -24,6 +24,7 @@ import (
 	"github.com/pingcap/log"
 	"github.com/pingcap/tiflow/cdc/contextutil"
 	"github.com/pingcap/tiflow/cdc/model"
+	"github.com/pingcap/tiflow/cdc/sorter"
 	"github.com/pingcap/tiflow/pkg/notify"
 	"go.uber.org/zap"
 	"golang.org/x/sync/errgroup"
@@ -157,6 +158,9 @@ func (es *EntrySorter) Output() <-chan *model.PolymorphicEvent {
 // EmitStartTs implement sorter interface
 func (es *EntrySorter) EmitStartTs(_ context.Context, _ uint64) {}
 
+// Stats implement sorter interface
+func (es *EntrySorter) Stats() sorter.Stats { return sorter.Stats{} }
+
 func eventLess(i *model.PolymorphicEvent, j *model.PolymorphicEvent) bool {
 	return model.ComparePolymorphicEvents(i, j)
 }
diff --git a/cdc/sorter/sorter.go b/cdc/sorter/sorter.go
index 7a97b0d47b3..3fb88656759 100644
--- a/cdc/sorter/sorter.go
+++ b/cdc/sorter/sorter.go
@@ -19,6 +19,14 @@ import (
 	"github.com/pingcap/tiflow/cdc/model"
 )
 
+// Stats of a sorter.
+type Stats struct {
+	CheckpointTsIngress model.Ts
+	ResolvedTsIngress   model.Ts
+	CheckpointTsEgress  model.Ts
+	ResolvedTsEgress    model.Ts
+}
+
 // EventSorter accepts unsorted PolymorphicEvents, sort them in background and returns
 // sorted PolymorphicEvents in Output channel
 type EventSorter interface {
@@ -45,4 +53,6 @@ type EventSorter interface {
 
 	// EmitStartTs let sorter know the start timestamp for consuming data
 	EmitStartTs(ctx context.Context, ts uint64)
+
+	Stats() Stats
 }
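The Stats fields are TiDB TSO timestamps: the bits above the 18-bit logical counter hold a Unix timestamp in milliseconds, so ingress/egress lag can be derived with a shift and a subtraction. A sketch with hypothetical helper names:

```go
package sketch

import (
	"time"

	"github.com/pingcap/tiflow/cdc/sorter"
)

// tsToTime extracts the physical part of a TSO (milliseconds above the
// 18-bit logical suffix) and converts it to wall-clock time.
func tsToTime(ts uint64) time.Time {
	return time.UnixMilli(int64(ts >> 18))
}

// egressResolvedLag reports how far the sorter's egress resolved ts trails
// the local clock; the helper name and the choice of field are illustrative.
func egressResolvedLag(stats sorter.Stats) time.Duration {
	return time.Since(tsToTime(uint64(stats.ResolvedTsEgress)))
}
```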
diff --git a/cdc/sorter/unified/unified_sorter.go b/cdc/sorter/unified/unified_sorter.go
index cf39b21b262..a7773925c31 100644
--- a/cdc/sorter/unified/unified_sorter.go
+++ b/cdc/sorter/unified/unified_sorter.go
@@ -22,6 +22,7 @@ import (
 	"github.com/pingcap/log"
 	"github.com/pingcap/tiflow/cdc/contextutil"
 	"github.com/pingcap/tiflow/cdc/model"
+	"github.com/pingcap/tiflow/cdc/sorter"
 	"github.com/pingcap/tiflow/pkg/config"
 	cerror "github.com/pingcap/tiflow/pkg/errors"
 	"github.com/pingcap/tiflow/pkg/util"
@@ -263,3 +264,6 @@ func RunWorkerPool(ctx context.Context) error {
 
 // EmitStartTs implement sorter interface
 func (s *Sorter) EmitStartTs(ctx context.Context, ts uint64) {}
+
+// Stats implement sorter interface
+func (s *Sorter) Stats() sorter.Stats { return sorter.Stats{} }
diff --git a/dm/pb/dmmaster.pb.go b/dm/pb/dmmaster.pb.go
index 3552a0dca73..1917e98f5ff 100644
--- a/dm/pb/dmmaster.pb.go
+++ b/dm/pb/dmmaster.pb.go
@@ -448,8 +448,10 @@ func (m *OperateTaskResponse) GetSources() []*CommonWorkerResponse {
 
 // UpdateTaskRequest used to update task after it has beed started
 // task: task's configuration, yaml format
-// now, only support to update config for routes, filters, column-mappings, block-allow-list
-// support update partial config for syncer, loader, etc later
+//
+// now, only support to update config for routes, filters, column-mappings, block-allow-list
+// support update partial config for syncer, loader, etc later
+//
 // sources need to do update, empty for all sources in processing the task
 type UpdateTaskRequest struct {
 	Task string `protobuf:"bytes,1,opt,name=task,proto3" json:"task,omitempty"`
@@ -686,7 +688,9 @@ func (m *QueryStatusListResponse) GetSources() []*QueryStatusResponse {
 // ShowDDLLocksRequest used to query DDL locks which are un-resolved
 // task: task's name, empty for all tasks
 // sources: source need to query, empty for all sources
-// any DDL lock in which the source is synced or unsynced will return
+//
+// any DDL lock in which the source is synced or unsynced will return
+//
 // if specify task and sources both, and sources not doing the task , it will return empty DDL locks
 type ShowDDLLocksRequest struct {
 	Task string `protobuf:"bytes,1,opt,name=task,proto3" json:"task,omitempty"`
@@ -2174,6 +2178,7 @@ func (m *ListWorkerMember) GetWorkers() []*WorkerInfo {
 
 type Members struct {
 	// Types that are valid to be assigned to Member:
+	//
 	//	*Members_Leader
 	//	*Members_Master
 	//	*Members_Worker
@@ -3236,9 +3241,11 @@ type StartValidationRequest struct {
 	// see https://github.com/protocolbuffers/protobuf/issues/1606
 	//
 	// Types that are valid to be assigned to Mode:
+	//
 	//	*StartValidationRequest_ModeValue
 	Mode isStartValidationRequest_Mode `protobuf_oneof:"mode"`
 	// Types that are valid to be assigned to StartTime:
+	//
 	//	*StartValidationRequest_StartTimeValue
 	StartTime isStartValidationRequest_StartTime `protobuf_oneof:"startTime"`
 	Sources []string `protobuf:"bytes,3,rep,name=sources,proto3" json:"sources,omitempty"`
diff --git a/engine/enginepb/error.pb.go b/engine/enginepb/error.pb.go
index ce8f903127f..cc718ea48a6 100644
--- a/engine/enginepb/error.pb.go
+++ b/engine/enginepb/error.pb.go
@@ -34,7 +34,6 @@ const (
 	ErrorCode_SubJobSubmitFailed ErrorCode = 4
 	// TombstoneExecutor
 	ErrorCode_TombstoneExecutor ErrorCode = 5
-	//
 	ErrorCode_SubJobBuildFailed ErrorCode = 6
 	// create gRPC connection failed
 	ErrorCode_BuildGrpcConnFailed ErrorCode = 7
diff --git a/pkg/pdutil/clock.go b/pkg/pdutil/clock.go
index 2687bc18033..ee7edec660b 100644
--- a/pkg/pdutil/clock.go
+++ b/pkg/pdutil/clock.go
@@ -26,7 +26,7 @@ import (
 	"go.uber.org/zap"
 )
 
-const pdTimeUpdateInterval = 200 * time.Millisecond
+const pdTimeUpdateInterval = 10 * time.Millisecond
 
 // Clock is a time source of PD cluster.
 type Clock interface {
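The last hunk shortens pdTimeUpdateInterval from 200ms to 10ms, so the cached PD timestamp behind pdutil's Clock is refreshed far more often and consumers see a fresher "now". A simplified, hypothetical cached-clock loop that only illustrates what the constant controls; it is not the real pdutil implementation:

```go
package sketch

import (
	"context"
	"sync"
	"time"
)

// cachedClock periodically refreshes a cached timestamp, the role
// pdTimeUpdateInterval plays in pkg/pdutil/clock.go. Fetching the time
// (from PD, in the real code) is abstracted behind fetch; error handling
// is omitted.
type cachedClock struct {
	mu      sync.RWMutex
	current time.Time
}

func (c *cachedClock) run(ctx context.Context, interval time.Duration, fetch func() time.Time) {
	ticker := time.NewTicker(interval)
	defer ticker.Stop()
	for {
		select {
		case <-ctx.Done():
			return
		case <-ticker.C:
			t := fetch()
			c.mu.Lock()
			c.current = t
			c.mu.Unlock()
		}
	}
}

// currentTime returns the most recently fetched timestamp.
func (c *cachedClock) currentTime() time.Time {
	c.mu.RLock()
	defer c.mu.RUnlock()
	return c.current
}
```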