cdc: collect slow table latency metrics (#7323)
ref #4757
overvenus authored Oct 12, 2022
1 parent 9c2e850 commit c6a9dfa
Showing 33 changed files with 884 additions and 150 deletions.
83 changes: 76 additions & 7 deletions cdc/kv/client.go
@@ -14,6 +14,7 @@
package kv

import (
"container/list"
"context"
"fmt"
"io"
@@ -198,6 +199,13 @@ type CDCKVClient interface {
lockResolver txnutil.LockResolver,
eventCh chan<- model.RegionFeedEvent,
) error

// RegionCount returns the number of captured regions.
RegionCount() uint64
// ResolvedTs returns the current ingress resolved ts.
ResolvedTs() model.Ts
// CommitTs returns the current ingress commit ts.
CommitTs() model.Ts
}

// NewCDCKVClient is the constructor of CDC KV client
@@ -212,14 +220,20 @@ type CDCClient struct {

grpcPool GrpcPool

regionCache *tikv.RegionCache
pdClock pdutil.Clock
regionCache *tikv.RegionCache
pdClock pdutil.Clock
regionLimiters *regionEventFeedLimiters

changefeed model.ChangeFeedID
tableID model.TableID
tableName string

regionLimiters *regionEventFeedLimiters
regionCounts struct {
sync.Mutex
counts *list.List
}
ingressCommitTs model.Ts
ingressResolvedTs model.Ts
}

// NewCDCClient creates a CDCClient instance
@@ -248,6 +262,12 @@ func NewCDCClient(
changefeed: changefeed,
tableID: tableID,
tableName: tableName,
regionCounts: struct {
sync.Mutex
counts *list.List
}{
counts: list.New(),
},
}
return
}
@@ -303,9 +323,35 @@ func (c *CDCClient) EventFeed(
lockResolver txnutil.LockResolver,
eventCh chan<- model.RegionFeedEvent,
) error {
c.regionCounts.Lock()
regionCount := int64(0)
c.regionCounts.counts.PushBack(&regionCount)
c.regionCounts.Unlock()
s := newEventFeedSession(
ctx, c, span, lockResolver, ts, eventCh, c.changefeed, c.tableID, c.tableName)
return s.eventFeed(ctx, ts)
return s.eventFeed(ctx, ts, &regionCount)
}

// RegionCount returns the number of captured regions.
func (c *CDCClient) RegionCount() uint64 {
c.regionCounts.Lock()
defer c.regionCounts.Unlock()

totalCount := uint64(0)
for e := c.regionCounts.counts.Front(); e != nil; e = e.Next() {
totalCount += uint64(atomic.LoadInt64(e.Value.(*int64)))
}
return totalCount
}

// ResolvedTs returns the current ingress resolved ts.
func (c *CDCClient) ResolvedTs() model.Ts {
return atomic.LoadUint64(&c.ingressResolvedTs)
}

// CommitTs returns the current ingress commit ts.
func (c *CDCClient) CommitTs() model.Ts {
return atomic.LoadUint64(&c.ingressCommitTs)
}

var currentID uint64 = 0
@@ -403,7 +449,7 @@ func newEventFeedSession(
}
}

func (s *eventFeedSession) eventFeed(ctx context.Context, ts uint64) error {
func (s *eventFeedSession) eventFeed(ctx context.Context, ts uint64, regionCount *int64) error {
eventFeedGauge.Inc()
defer eventFeedGauge.Dec()

@@ -414,7 +460,7 @@ func (s *eventFeedSession) eventFeed(ctx context.Context, ts uint64) error {
})

g.Go(func() error {
return s.requestRegionToStore(ctx, g)
return s.requestRegionToStore(ctx, g, regionCount)
})

g.Go(func() error {
@@ -593,6 +639,7 @@ func (s *eventFeedSession) onRegionFail(ctx context.Context, errorInfo regionErr
func (s *eventFeedSession) requestRegionToStore(
ctx context.Context,
g *errgroup.Group,
regionCount *int64,
) error {
// Stores pending regions info for each stream. After sending a new request, the region info will be put into the map,
// and it will be loaded by the receiver thread when it receives the first response from that region. We need this
@@ -682,7 +729,8 @@ func (s *eventFeedSession) requestRegionToStore(

g.Go(func() error {
defer s.deleteStream(storeAddr)
return s.receiveFromStream(ctx, g, storeAddr, storeID, stream.client, pendingRegions)
return s.receiveFromStream(
ctx, g, storeAddr, storeID, stream.client, pendingRegions, regionCount)
})
}

@@ -1035,6 +1083,7 @@ func (s *eventFeedSession) receiveFromStream(
storeID uint64,
stream cdcpb.ChangeData_EventFeedClient,
pendingRegions *syncRegionFeedStateMap,
regionCount *int64,
) error {
// Cancel the pending regions if the stream failed.
// Otherwise, they will remain unhandled in the pendingRegions list
@@ -1067,6 +1116,7 @@ func (s *eventFeedSession) receiveFromStream(
return worker.run(ctx)
})

maxCommitTs := model.Ts(0)
for {
cevent, err := stream.Recv()

@@ -1141,6 +1191,14 @@
zap.Int("resolvedRegionCount", regionCount))
}

if len(cevent.Events) != 0 {
if entries, ok := cevent.Events[0].Event.(*cdcpb.Event_Entries_); ok {
commitTs := entries.Entries.Entries[0].CommitTs
if maxCommitTs < commitTs {
maxCommitTs = commitTs
}
}
}
err = s.sendRegionChangeEvents(ctx, cevent.Events, worker, pendingRegions, addr)
if err != nil {
return err
@@ -1151,6 +1209,17 @@
if err != nil {
return err
}
// TiKV sends resolved ts events every second by default.
// We check and update region count here to save CPU.
atomic.StoreInt64(regionCount, worker.statesManager.regionCount())
atomic.StoreUint64(&s.client.ingressResolvedTs, cevent.ResolvedTs.Ts)
if maxCommitTs == 0 {
// In case there is no write to the table,
// we use resolved ts as maxCommitTs to make the stats meaningful.
atomic.StoreUint64(&s.client.ingressCommitTs, cevent.ResolvedTs.Ts)
} else {
atomic.StoreUint64(&s.client.ingressCommitTs, maxCommitTs)
}
}
}
}
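
For context, here is a minimal, self-contained sketch (not part of this commit) of how a caller might consume the three new CDCKVClient accessors to estimate ingress lag. The ingressStats interface, physicalTime helper, and fakeStats stub are hypothetical names; the only assumption taken from the codebase is that model.Ts is a uint64 TSO whose low 18 bits hold the logical counter.

package main

import (
	"fmt"
	"time"
)

// ingressStats is a hypothetical local mirror of the accessors added to
// CDCKVClient above (RegionCount, ResolvedTs, CommitTs); model.Ts is uint64.
type ingressStats interface {
	RegionCount() uint64
	ResolvedTs() uint64
	CommitTs() uint64
}

// physicalTime extracts the physical (millisecond) part of a TSO by dropping
// the 18-bit logical counter.
func physicalTime(ts uint64) time.Time {
	return time.UnixMilli(int64(ts >> 18))
}

// reportIngressLag prints how far the client's resolved ts and max commit ts
// trail the supplied wall-clock time.
func reportIngressLag(stats ingressStats, now time.Time) {
	fmt.Printf("regions=%d resolvedLag=%v commitLag=%v\n",
		stats.RegionCount(),
		now.Sub(physicalTime(stats.ResolvedTs())),
		now.Sub(physicalTime(stats.CommitTs())))
}

// fakeStats is a stub used only to make the sketch runnable.
type fakeStats struct{ resolved, commit uint64 }

func (f fakeStats) RegionCount() uint64 { return 3 }
func (f fakeStats) ResolvedTs() uint64  { return f.resolved }
func (f fakeStats) CommitTs() uint64    { return f.commit }

func main() {
	now := time.Now()
	ts := uint64(now.Add(-2*time.Second).UnixMilli()) << 18 // a TSO roughly 2s old
	reportIngressLag(fakeStats{resolved: ts, commit: ts}, now)
}

The same TSO-to-wall-clock conversion underlies the lag metrics updated in cdc/owner/changefeed.go later in this diff.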
7 changes: 7 additions & 0 deletions cdc/kv/region_state.go
@@ -288,3 +288,10 @@ func (rsm *regionStateManager) delState(regionID uint64) {
bucket := rsm.getBucket(regionID)
rsm.states[bucket].delByRegionID(regionID)
}

func (rsm *regionStateManager) regionCount() (count int64) {
for _, bucket := range rsm.states {
count += int64(bucket.len())
}
return
}
25 changes: 0 additions & 25 deletions cdc/model/owner.go
@@ -176,31 +176,6 @@ func (o *TableOperation) Clone() *TableOperation {
return &clone
}

// TaskWorkload records the workloads of a task
// the value of the struct is the workload
type TaskWorkload map[TableID]WorkloadInfo

// WorkloadInfo records the workload info of a table
type WorkloadInfo struct {
Workload uint64 `json:"workload"`
}

// Unmarshal unmarshals into *TaskWorkload from json marshal byte slice
func (w *TaskWorkload) Unmarshal(data []byte) error {
err := json.Unmarshal(data, w)
return errors.Annotatef(
cerror.WrapError(cerror.ErrUnmarshalFailed, err), "Unmarshal data: %v", data)
}

// Marshal returns the json marshal format of a TaskWorkload
func (w *TaskWorkload) Marshal() (string, error) {
if w == nil {
return "{}", nil
}
data, err := json.Marshal(w)
return string(data), cerror.WrapError(cerror.ErrMarshalFailed, err)
}

// TableReplicaInfo records the table replica info
type TableReplicaInfo struct {
StartTs Ts `json:"start-ts"`
24 changes: 0 additions & 24 deletions cdc/model/owner_test.go
@@ -114,30 +114,6 @@ func TestTableOperationState(t *testing.T) {
require.Nil(t, nilTableOper.Clone())
}

func TestTaskWorkloadMarshal(t *testing.T) {
t.Parallel()

workload := &TaskWorkload{
12: WorkloadInfo{Workload: uint64(1)},
15: WorkloadInfo{Workload: uint64(3)},
}
expected := `{"12":{"workload":1},"15":{"workload":3}}`

data, err := workload.Marshal()
require.Nil(t, err)
require.Equal(t, expected, data)

newWorkload := &TaskWorkload{}
err = newWorkload.Unmarshal([]byte(data))
require.Nil(t, err)
require.Equal(t, workload, newWorkload)

workload = nil
data, err = workload.Marshal()
require.Nil(t, err)
require.Equal(t, "{}", data)
}

func TestShouldBeDeepCopy(t *testing.T) {
t.Parallel()

24 changes: 18 additions & 6 deletions cdc/owner/changefeed.go
@@ -41,9 +41,9 @@ import (
"go.uber.org/zap"
)

// newSchedulerV2FromCtx creates a new schedulerV2 from context.
// newSchedulerFromCtx creates a new schedulerV2 from context.
// This function is factored out to facilitate unit testing.
func newSchedulerV2FromCtx(
func newSchedulerFromCtx(
ctx cdcContext.Context, startTs uint64,
) (ret scheduler.Scheduler, err error) {
changeFeedID := ctx.ChangefeedVars().ID
@@ -64,8 +64,10 @@ func newSchedulerV2FromCtx(
return ret, errors.Trace(err)
}

func newScheduler(ctx cdcContext.Context, startTs uint64) (scheduler.Scheduler, error) {
return newSchedulerV2FromCtx(ctx, startTs)
func newScheduler(
ctx cdcContext.Context, startTs uint64,
) (scheduler.Scheduler, error) {
return newSchedulerFromCtx(ctx, startTs)
}

type changefeed struct {
@@ -119,6 +121,7 @@ type changefeed struct {
metricsChangefeedResolvedTsGauge prometheus.Gauge
metricsChangefeedResolvedTsLagGauge prometheus.Gauge
metricsChangefeedResolvedTsLagDuration prometheus.Observer
metricsCurrentPDTsGauge prometheus.Gauge

metricsChangefeedBarrierTsGauge prometheus.Gauge
metricsChangefeedTickDuration prometheus.Observer
@@ -131,7 +134,9 @@ type changefeed struct {
) (puller.DDLPuller, error)

newSink func() DDLSink
newScheduler func(ctx cdcContext.Context, startTs uint64) (scheduler.Scheduler, error)
newScheduler func(
ctx cdcContext.Context, startTs uint64,
) (scheduler.Scheduler, error)

lastDDLTs uint64 // Timestamp of the last executed DDL. Only used for tests.
}
@@ -169,7 +174,9 @@ func newChangefeed4Test(
changefeed model.ChangeFeedID,
) (puller.DDLPuller, error),
newSink func() DDLSink,
newScheduler func(ctx cdcContext.Context, startTs uint64) (scheduler.Scheduler, error),
newScheduler func(
ctx cdcContext.Context, startTs uint64,
) (scheduler.Scheduler, error),
) *changefeed {
c := newChangefeed(id, state, up)
c.newDDLPuller = newDDLPuller
@@ -549,6 +556,7 @@ func (c *changefeed) initMetrics() {
WithLabelValues(c.id.Namespace, c.id.ID)
c.metricsChangefeedResolvedTsLagDuration = changefeedResolvedTsLagDuration.
WithLabelValues(c.id.Namespace, c.id.ID)
c.metricsCurrentPDTsGauge = currentPDTsGauge.WithLabelValues(c.id.Namespace, c.id.ID)

c.metricsChangefeedBarrierTsGauge = changefeedBarrierTsGauge.
WithLabelValues(c.id.Namespace, c.id.ID)
@@ -615,9 +623,11 @@ func (c *changefeed) cleanupMetrics() {
changefeedResolvedTsGauge.DeleteLabelValues(c.id.Namespace, c.id.ID)
changefeedResolvedTsLagGauge.DeleteLabelValues(c.id.Namespace, c.id.ID)
changefeedResolvedTsLagDuration.DeleteLabelValues(c.id.Namespace, c.id.ID)
currentPDTsGauge.DeleteLabelValues(c.id.Namespace, c.id.ID)
c.metricsChangefeedResolvedTsGauge = nil
c.metricsChangefeedResolvedTsLagGauge = nil
c.metricsChangefeedResolvedTsLagDuration = nil
c.metricsCurrentPDTsGauge = nil

changefeedTickDuration.DeleteLabelValues(c.id.Namespace, c.id.ID)
c.metricsChangefeedTickDuration = nil
@@ -903,6 +913,8 @@ func (c *changefeed) updateMetrics(currentTs int64, checkpointTs, resolvedTs mod
resolvedLag := float64(currentTs-phyRTs) / 1e3
c.metricsChangefeedResolvedTsLagGauge.Set(resolvedLag)
c.metricsChangefeedResolvedTsLagDuration.Observe(resolvedLag)

c.metricsCurrentPDTsGauge.Set(float64(currentTs))
}

func (c *changefeed) updateStatus(checkpointTs, resolvedTs model.Ts) {
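
A quick worked example (a sketch, not the commit's code) of the lag arithmetic in updateMetrics above: currentTs and phyRTs are physical timestamps in milliseconds, so dividing their difference by 1e3 gives the lag in seconds, while the new current_pd_ts gauge is set to the raw physical PD time. The concrete timestamp values below are made up.

package main

import "fmt"

// physical drops the 18-bit logical counter of a TSO, leaving the physical
// millisecond timestamp.
func physical(ts uint64) int64 { return int64(ts >> 18) }

func main() {
	currentTs := int64(1665561600000)         // physical PD time in ms (example value)
	resolvedTs := uint64(1665561597000) << 18 // an example TSO about 3s behind PD

	phyRTs := physical(resolvedTs)
	resolvedLag := float64(currentTs-phyRTs) / 1e3 // ms -> s, as in updateMetrics
	fmt.Printf("current_pd_ts=%d resolved_ts_lag=%.3fs\n", currentTs, resolvedLag)
	// Prints: current_pd_ts=1665561600000 resolved_ts_lag=3.000s
}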
8 changes: 8 additions & 0 deletions cdc/owner/metrics.go
@@ -42,6 +42,13 @@
Name: "checkpoint_ts_lag",
Help: "checkpoint ts lag of changefeeds in seconds",
}, []string{"namespace", "changefeed"})
currentPDTsGauge = prometheus.NewGaugeVec(
prometheus.GaugeOpts{
Namespace: "ticdc",
Subsystem: "owner",
Name: "current_pd_ts",
Help: "The current PD ts",
}, []string{"namespace", "changefeed"})

changefeedCheckpointLagDuration = prometheus.NewHistogramVec(
prometheus.HistogramOpts{
@@ -134,6 +141,7 @@ func InitMetrics(registry *prometheus.Registry) {
registry.MustRegister(changefeedResolvedTsGauge)
registry.MustRegister(changefeedResolvedTsLagGauge)
registry.MustRegister(changefeedResolvedTsLagDuration)
registry.MustRegister(currentPDTsGauge)

registry.MustRegister(ownershipCounter)
registry.MustRegister(changefeedStatusGauge)
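
As a standalone sketch of the metric wiring added above (assumed demo values and label contents, not the production code path), the snippet below declares the same ticdc_owner_current_pd_ts gauge vector, registers it on a fresh registry as InitMetrics does, sets one labeled series the way updateMetrics does, and gathers once to confirm the family is exported.

package main

import (
	"fmt"

	"github.com/prometheus/client_golang/prometheus"
)

// currentPDTsGauge mirrors the gauge vector declared in cdc/owner/metrics.go.
var currentPDTsGauge = prometheus.NewGaugeVec(
	prometheus.GaugeOpts{
		Namespace: "ticdc",
		Subsystem: "owner",
		Name:      "current_pd_ts",
		Help:      "The current PD ts",
	}, []string{"namespace", "changefeed"})

func main() {
	registry := prometheus.NewRegistry()
	registry.MustRegister(currentPDTsGauge)

	// Set the series for one changefeed to a physical PD timestamp in ms,
	// matching c.metricsCurrentPDTsGauge.Set(float64(currentTs)).
	currentPDTsGauge.WithLabelValues("default", "cf-1").Set(1665561600000)

	families, err := registry.Gather()
	if err != nil {
		panic(err)
	}
	for _, mf := range families {
		fmt.Println(mf.GetName(), "series:", len(mf.GetMetric())) // ticdc_owner_current_pd_ts series: 1
	}
}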
8 changes: 6 additions & 2 deletions cdc/owner/owner_test.go
@@ -61,7 +61,9 @@ func newOwner4Test(
changefeed model.ChangeFeedID,
) (puller.DDLPuller, error),
newSink func() DDLSink,
newScheduler func(ctx cdcContext.Context, startTs uint64) (scheduler.Scheduler, error),
newScheduler func(
ctx cdcContext.Context, startTs uint64,
) (scheduler.Scheduler, error),
pdClient pd.Client,
) Owner {
m := upstream.NewManager4Test(pdClient)
@@ -100,7 +102,9 @@ func createOwner4Test(ctx cdcContext.Context, t *testing.T) (*ownerImpl, *orches
return &mockDDLSink{}
},
// new scheduler
func(ctx cdcContext.Context, startTs uint64) (scheduler.Scheduler, error) {
func(
ctx cdcContext.Context, startTs uint64,
) (scheduler.Scheduler, error) {
return &mockScheduler{}, nil
},
pdClient,
(diffs for the remaining changed files are not shown)
