cdc: collect slow table latency metrics #7323

Merged
merged 11 commits on Oct 12, 2022
83 changes: 76 additions & 7 deletions cdc/kv/client.go
@@ -14,6 +14,7 @@
package kv

import (
"container/list"
"context"
"fmt"
"io"
@@ -198,6 +199,13 @@ type CDCKVClient interface {
lockResolver txnutil.LockResolver,
eventCh chan<- model.RegionFeedEvent,
) error

// RegionCount returns the number of captured regions.
RegionCount() uint64
// ResolvedTs returns the current ingress resolved ts.
ResolvedTs() model.Ts
// CommitTs returns the current ingress commit ts.
CommitTs() model.Ts
}

// NewCDCKVClient is the constructor of CDC KV client
@@ -212,14 +220,20 @@ type CDCClient struct {

grpcPool GrpcPool

regionCache *tikv.RegionCache
pdClock pdutil.Clock
regionCache *tikv.RegionCache
pdClock pdutil.Clock
regionLimiters *regionEventFeedLimiters

changefeed model.ChangeFeedID
tableID model.TableID
tableName string

regionLimiters *regionEventFeedLimiters
regionCounts struct {
sync.Mutex
counts *list.List
}
ingressCommitTs model.Ts
ingressResolvedTs model.Ts
}

// NewCDCClient creates a CDCClient instance
@@ -248,6 +262,12 @@ func NewCDCClient(
changefeed: changefeed,
tableID: tableID,
tableName: tableName,
regionCounts: struct {
sync.Mutex
counts *list.List
}{
counts: list.New(),
},
}
return
}
@@ -303,9 +323,35 @@ func (c *CDCClient) EventFeed(
lockResolver txnutil.LockResolver,
eventCh chan<- model.RegionFeedEvent,
) error {
c.regionCounts.Lock()
regionCount := int64(0)
c.regionCounts.counts.PushBack(&regionCount)
c.regionCounts.Unlock()
s := newEventFeedSession(
ctx, c, span, lockResolver, ts, eventCh, c.changefeed, c.tableID, c.tableName)
return s.eventFeed(ctx, ts)
return s.eventFeed(ctx, ts, &regionCount)
}

// RegionCount returns the number of captured regions.
func (c *CDCClient) RegionCount() uint64 {
c.regionCounts.Lock()
defer c.regionCounts.Unlock()

totalCount := uint64(0)
for e := c.regionCounts.counts.Front(); e != nil; e = e.Next() {
totalCount += uint64(atomic.LoadInt64(e.Value.(*int64)))
}
return totalCount
}

// ResolvedTs returns the current ingress resolved ts.
func (c *CDCClient) ResolvedTs() model.Ts {
return atomic.LoadUint64(&c.ingressResolvedTs)
}

// CommitTs returns the current ingress commit ts.
func (c *CDCClient) CommitTs() model.Ts {
return atomic.LoadUint64(&c.ingressCommitTs)
}

var currentID uint64 = 0
@@ -403,7 +449,7 @@ func newEventFeedSession(
}
}

func (s *eventFeedSession) eventFeed(ctx context.Context, ts uint64) error {
func (s *eventFeedSession) eventFeed(ctx context.Context, ts uint64, regionCount *int64) error {
eventFeedGauge.Inc()
defer eventFeedGauge.Dec()

@@ -414,7 +460,7 @@ func (s *eventFeedSession) eventFeed(ctx context.Context, ts uint64) error {
})

g.Go(func() error {
return s.requestRegionToStore(ctx, g)
return s.requestRegionToStore(ctx, g, regionCount)
})

g.Go(func() error {
@@ -593,6 +639,7 @@ func (s *eventFeedSession) onRegionFail(ctx context.Context, errorInfo regionErr
func (s *eventFeedSession) requestRegionToStore(
ctx context.Context,
g *errgroup.Group,
regionCount *int64,
) error {
// Stores pending regions info for each stream. After sending a new request, the region info will be put into the map,
// and it will be loaded by the receiver thread when it receives the first response from that region. We need this
@@ -682,7 +729,8 @@ func (s *eventFeedSession) requestRegionToStore(

g.Go(func() error {
defer s.deleteStream(storeAddr)
return s.receiveFromStream(ctx, g, storeAddr, storeID, stream.client, pendingRegions)
return s.receiveFromStream(
ctx, g, storeAddr, storeID, stream.client, pendingRegions, regionCount)
})
}

@@ -1035,6 +1083,7 @@ func (s *eventFeedSession) receiveFromStream(
storeID uint64,
stream cdcpb.ChangeData_EventFeedClient,
pendingRegions *syncRegionFeedStateMap,
regionCount *int64,
) error {
// Cancel the pending regions if the stream failed.
// Otherwise, they will remain unhandled in the pendingRegions list
@@ -1067,6 +1116,7 @@
return worker.run(ctx)
})

maxCommitTs := model.Ts(0)
for {
cevent, err := stream.Recv()

@@ -1141,6 +1191,14 @@
zap.Int("resolvedRegionCount", regionCount))
}

if len(cevent.Events) != 0 {
if entries, ok := cevent.Events[0].Event.(*cdcpb.Event_Entries_); ok {
commitTs := entries.Entries.Entries[0].CommitTs
if maxCommitTs < commitTs {
maxCommitTs = commitTs
}
}
}
err = s.sendRegionChangeEvents(ctx, cevent.Events, worker, pendingRegions, addr)
if err != nil {
return err
@@ -1151,6 +1209,17 @@
if err != nil {
return err
}
// TiKV sends resolved ts events every second by default.
// We check and update the region count here to save CPU.
atomic.StoreInt64(regionCount, worker.statesManager.regionCount())
atomic.StoreUint64(&s.client.ingressResolvedTs, cevent.ResolvedTs.Ts)
if maxCommitTs == 0 {
// In case there is no write for the table,
// we use the resolved ts as maxCommitTs to make the stats meaningful.
atomic.StoreUint64(&s.client.ingressCommitTs, cevent.ResolvedTs.Ts)
} else {
atomic.StoreUint64(&s.client.ingressCommitTs, maxCommitTs)
}
}
}
}
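Note: the ingress watermarks updated above are exactly what the new CDCKVClient getters expose. As a rough, hypothetical illustration (none of the names below are part of this diff), a consumer could turn them into per-table latency numbers like this, assuming the standard TiDB TSO layout of physical milliseconds shifted left by 18 logical bits:

package main

import "fmt"

// ingressMetrics mirrors the three getters added to CDCKVClient in this PR.
type ingressMetrics interface {
	RegionCount() uint64
	ResolvedTs() uint64 // model.Ts in the real interface
	CommitTs() uint64
}

// physicalMs extracts the physical part of a TSO in milliseconds,
// assuming the usual layout: physical ms << 18 | logical bits.
func physicalMs(ts uint64) int64 { return int64(ts >> 18) }

// ingressLagSeconds reports how far the table's ingress watermarks trail
// the current PD physical time.
func ingressLagSeconds(m ingressMetrics, pdNowMs int64) (resolvedLag, commitLag float64) {
	resolvedLag = float64(pdNowMs-physicalMs(m.ResolvedTs())) / 1e3
	commitLag = float64(pdNowMs-physicalMs(m.CommitTs())) / 1e3
	return
}

// fakeClient is a stand-in for CDCClient, used only to exercise the helper.
type fakeClient struct{ resolved, commit uint64 }

func (f fakeClient) RegionCount() uint64 { return 42 }
func (f fakeClient) ResolvedTs() uint64  { return f.resolved }
func (f fakeClient) CommitTs() uint64    { return f.commit }

func main() {
	now := int64(1_665_561_600_000) // PD physical time in ms
	c := fakeClient{
		resolved: uint64(now-2_000) << 18, // resolved ts 2s behind
		commit:   uint64(now-5_000) << 18, // last commit 5s behind
	}
	r, w := ingressLagSeconds(c, now)
	fmt.Printf("regions=%d resolved lag=%.1fs commit lag=%.1fs\n", c.RegionCount(), r, w)
}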
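For the region count itself, the pattern in this file is: each EventFeed call registers its own int64 counter in a mutex-guarded list, the stream receiver updates it atomically, and RegionCount sums all live counters on demand. A stripped-down, self-contained sketch of the same idea (hypothetical names, not the actual CDCClient):

package main

import (
	"container/list"
	"fmt"
	"sync"
	"sync/atomic"
)

// regionCounter aggregates per-feed counters, mirroring the
// CDCClient.regionCounts field introduced in this change.
type regionCounter struct {
	mu     sync.Mutex
	counts *list.List // elements are *int64, one per running event feed
}

func newRegionCounter() *regionCounter {
	return &regionCounter{counts: list.New()}
}

// register hands out a counter owned by a single event feed; the feed's
// receiver goroutine writes to it with atomic.StoreInt64.
func (r *regionCounter) register() *int64 {
	r.mu.Lock()
	defer r.mu.Unlock()
	c := new(int64)
	r.counts.PushBack(c)
	return c
}

// total mirrors CDCClient.RegionCount: walk the list and sum the counters
// with atomic loads, so readers never block the receiver goroutines.
func (r *regionCounter) total() uint64 {
	r.mu.Lock()
	defer r.mu.Unlock()
	var sum uint64
	for e := r.counts.Front(); e != nil; e = e.Next() {
		sum += uint64(atomic.LoadInt64(e.Value.(*int64)))
	}
	return sum
}

func main() {
	rc := newRegionCounter()
	feedA, feedB := rc.register(), rc.register()
	atomic.StoreInt64(feedA, 12) // 12 regions captured by the first feed
	atomic.StoreInt64(feedB, 30)
	fmt.Println(rc.total()) // 42
}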
7 changes: 7 additions & 0 deletions cdc/kv/region_state.go
@@ -288,3 +288,10 @@ func (rsm *regionStateManager) delState(regionID uint64) {
bucket := rsm.getBucket(regionID)
rsm.states[bucket].delByRegionID(regionID)
}

func (rsm *regionStateManager) regionCount() (count int64) {
for _, bucket := range rsm.states {
count += int64(bucket.len())
}
return
}
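The new regionCount simply sums the sizes of the manager's hash buckets. A simplified sketch of that layout, assuming each bucket guards its own map with a read-write lock (the real bucket type is not shown in this diff):

package main

import (
	"fmt"
	"sync"
)

// regionFeedState is a placeholder for the per-region state the real
// manager tracks.
type regionFeedState struct{}

// stateBucket guards one shard of the region-state map.
type stateBucket struct {
	mu     sync.RWMutex
	states map[uint64]*regionFeedState // keyed by regionID
}

func (b *stateBucket) len() int {
	b.mu.RLock()
	defer b.mu.RUnlock()
	return len(b.states)
}

// regionStateManager shards states across buckets so concurrent workers
// rarely contend on the same lock.
type regionStateManager struct {
	buckets []*stateBucket
}

// regionCount sums the per-bucket sizes, matching the new method above.
func (m *regionStateManager) regionCount() (count int64) {
	for _, b := range m.buckets {
		count += int64(b.len())
	}
	return
}

func main() {
	m := &regionStateManager{buckets: []*stateBucket{
		{states: map[uint64]*regionFeedState{1: {}, 2: {}}},
		{states: map[uint64]*regionFeedState{3: {}}},
	}}
	fmt.Println(m.regionCount()) // 3
}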
25 changes: 0 additions & 25 deletions cdc/model/owner.go
@@ -176,31 +176,6 @@ func (o *TableOperation) Clone() *TableOperation {
return &clone
}

// TaskWorkload records the workloads of a task
// the value of the struct is the workload
type TaskWorkload map[TableID]WorkloadInfo

// WorkloadInfo records the workload info of a table
type WorkloadInfo struct {
Workload uint64 `json:"workload"`
}

// Unmarshal unmarshals into *TaskWorkload from json marshal byte slice
func (w *TaskWorkload) Unmarshal(data []byte) error {
err := json.Unmarshal(data, w)
return errors.Annotatef(
cerror.WrapError(cerror.ErrUnmarshalFailed, err), "Unmarshal data: %v", data)
}

// Marshal returns the json marshal format of a TaskWorkload
func (w *TaskWorkload) Marshal() (string, error) {
if w == nil {
return "{}", nil
}
data, err := json.Marshal(w)
return string(data), cerror.WrapError(cerror.ErrMarshalFailed, err)
}

// TableReplicaInfo records the table replica info
type TableReplicaInfo struct {
StartTs Ts `json:"start-ts"`
24 changes: 0 additions & 24 deletions cdc/model/owner_test.go
@@ -114,30 +114,6 @@ func TestTableOperationState(t *testing.T) {
require.Nil(t, nilTableOper.Clone())
}

func TestTaskWorkloadMarshal(t *testing.T) {
t.Parallel()

workload := &TaskWorkload{
12: WorkloadInfo{Workload: uint64(1)},
15: WorkloadInfo{Workload: uint64(3)},
}
expected := `{"12":{"workload":1},"15":{"workload":3}}`

data, err := workload.Marshal()
require.Nil(t, err)
require.Equal(t, expected, data)

newWorkload := &TaskWorkload{}
err = newWorkload.Unmarshal([]byte(data))
require.Nil(t, err)
require.Equal(t, workload, newWorkload)

workload = nil
data, err = workload.Marshal()
require.Nil(t, err)
require.Equal(t, "{}", data)
}

func TestShouldBeDeepCopy(t *testing.T) {
t.Parallel()

24 changes: 18 additions & 6 deletions cdc/owner/changefeed.go
@@ -41,9 +41,9 @@ import (
"go.uber.org/zap"
)

// newSchedulerV2FromCtx creates a new schedulerV2 from context.
// newSchedulerFromCtx creates a new schedulerV2 from context.
// This function is factored out to facilitate unit testing.
func newSchedulerV2FromCtx(
func newSchedulerFromCtx(
ctx cdcContext.Context, startTs uint64,
) (ret scheduler.Scheduler, err error) {
changeFeedID := ctx.ChangefeedVars().ID
@@ -64,8 +64,10 @@ func newSchedulerV2FromCtx(
return ret, errors.Trace(err)
}

func newScheduler(ctx cdcContext.Context, startTs uint64) (scheduler.Scheduler, error) {
return newSchedulerV2FromCtx(ctx, startTs)
func newScheduler(
ctx cdcContext.Context, startTs uint64,
) (scheduler.Scheduler, error) {
return newSchedulerFromCtx(ctx, startTs)
}

type changefeed struct {
Expand Down Expand Up @@ -119,6 +121,7 @@ type changefeed struct {
metricsChangefeedResolvedTsGauge prometheus.Gauge
metricsChangefeedResolvedTsLagGauge prometheus.Gauge
metricsChangefeedResolvedTsLagDuration prometheus.Observer
metricsCurrentPDTsGauge prometheus.Gauge

metricsChangefeedBarrierTsGauge prometheus.Gauge
metricsChangefeedTickDuration prometheus.Observer
@@ -131,7 +134,9 @@ type changefeed struct {
) (puller.DDLPuller, error)

newSink func() DDLSink
newScheduler func(ctx cdcContext.Context, startTs uint64) (scheduler.Scheduler, error)
newScheduler func(
ctx cdcContext.Context, startTs uint64,
) (scheduler.Scheduler, error)

lastDDLTs uint64 // Timestamp of the last executed DDL. Only used for tests.
}
@@ -169,7 +174,9 @@ func newChangefeed4Test(
changefeed model.ChangeFeedID,
) (puller.DDLPuller, error),
newSink func() DDLSink,
newScheduler func(ctx cdcContext.Context, startTs uint64) (scheduler.Scheduler, error),
newScheduler func(
ctx cdcContext.Context, startTs uint64,
) (scheduler.Scheduler, error),
) *changefeed {
c := newChangefeed(id, state, up)
c.newDDLPuller = newDDLPuller
@@ -549,6 +556,7 @@ func (c *changefeed) initMetrics() {
WithLabelValues(c.id.Namespace, c.id.ID)
c.metricsChangefeedResolvedTsLagDuration = changefeedResolvedTsLagDuration.
WithLabelValues(c.id.Namespace, c.id.ID)
c.metricsCurrentPDTsGauge = currentPDTsGauge.WithLabelValues(c.id.Namespace, c.id.ID)

c.metricsChangefeedBarrierTsGauge = changefeedBarrierTsGauge.
WithLabelValues(c.id.Namespace, c.id.ID)
@@ -615,9 +623,11 @@ func (c *changefeed) cleanupMetrics() {
changefeedResolvedTsGauge.DeleteLabelValues(c.id.Namespace, c.id.ID)
changefeedResolvedTsLagGauge.DeleteLabelValues(c.id.Namespace, c.id.ID)
changefeedResolvedTsLagDuration.DeleteLabelValues(c.id.Namespace, c.id.ID)
currentPDTsGauge.DeleteLabelValues(c.id.Namespace, c.id.ID)
c.metricsChangefeedResolvedTsGauge = nil
c.metricsChangefeedResolvedTsLagGauge = nil
c.metricsChangefeedResolvedTsLagDuration = nil
c.metricsCurrentPDTsGauge = nil

changefeedTickDuration.DeleteLabelValues(c.id.Namespace, c.id.ID)
c.metricsChangefeedTickDuration = nil
@@ -903,6 +913,8 @@ func (c *changefeed) updateMetrics(currentTs int64, checkpointTs, resolvedTs mod
resolvedLag := float64(currentTs-phyRTs) / 1e3
c.metricsChangefeedResolvedTsLagGauge.Set(resolvedLag)
c.metricsChangefeedResolvedTsLagDuration.Observe(resolvedLag)

c.metricsCurrentPDTsGauge.Set(float64(currentTs))
}

func (c *changefeed) updateStatus(checkpointTs, resolvedTs model.Ts) {
8 changes: 8 additions & 0 deletions cdc/owner/metrics.go
@@ -42,6 +42,13 @@
Name: "checkpoint_ts_lag",
Help: "checkpoint ts lag of changefeeds in seconds",
}, []string{"namespace", "changefeed"})
currentPDTsGauge = prometheus.NewGaugeVec(
prometheus.GaugeOpts{
Namespace: "ticdc",
Subsystem: "owner",
Name: "current_pd_ts",
Help: "The current PD ts",
}, []string{"namespace", "changefeed"})

changefeedCheckpointLagDuration = prometheus.NewHistogramVec(
prometheus.HistogramOpts{
@@ -134,6 +141,7 @@ func InitMetrics(registry *prometheus.Registry) {
registry.MustRegister(changefeedResolvedTsGauge)
registry.MustRegister(changefeedResolvedTsLagGauge)
registry.MustRegister(changefeedResolvedTsLagDuration)
registry.MustRegister(currentPDTsGauge)

registry.MustRegister(ownershipCounter)
registry.MustRegister(changefeedStatusGauge)
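The new currentPDTsGauge follows the same lifecycle as the other per-changefeed owner metrics: the vector is registered once in InitMetrics, a labeled child is bound in initMetrics, updated on every tick in updateMetrics, and the series is deleted in cleanupMetrics. A minimal sketch of that lifecycle with the client_golang API (the changefeed ID below is made up):

package main

import "github.com/prometheus/client_golang/prometheus"

var currentPDTsGauge = prometheus.NewGaugeVec(
	prometheus.GaugeOpts{
		Namespace: "ticdc",
		Subsystem: "owner",
		Name:      "current_pd_ts",
		Help:      "The current PD ts",
	}, []string{"namespace", "changefeed"})

func main() {
	registry := prometheus.NewRegistry()
	registry.MustRegister(currentPDTsGauge) // InitMetrics: register once

	// initMetrics: bind the labels for one changefeed.
	g := currentPDTsGauge.WithLabelValues("default", "simple-replication-task")

	// updateMetrics: record the current PD physical ts (milliseconds).
	g.Set(1.6655616e12)

	// cleanupMetrics: drop the labeled series when the changefeed is removed.
	currentPDTsGauge.DeleteLabelValues("default", "simple-replication-task")
}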
8 changes: 6 additions & 2 deletions cdc/owner/owner_test.go
@@ -61,7 +61,9 @@ func newOwner4Test(
changefeed model.ChangeFeedID,
) (puller.DDLPuller, error),
newSink func() DDLSink,
newScheduler func(ctx cdcContext.Context, startTs uint64) (scheduler.Scheduler, error),
newScheduler func(
ctx cdcContext.Context, startTs uint64,
) (scheduler.Scheduler, error),
pdClient pd.Client,
) Owner {
m := upstream.NewManager4Test(pdClient)
@@ -100,7 +102,9 @@ func createOwner4Test(ctx cdcContext.Context, t *testing.T) (*ownerImpl, *orches
return &mockDDLSink{}
},
// new scheduler
func(ctx cdcContext.Context, startTs uint64) (scheduler.Scheduler, error) {
func(
ctx cdcContext.Context, startTs uint64,
) (scheduler.Scheduler, error) {
return &mockScheduler{}, nil
},
pdClient,