
Commit

Merge branch 'release-4.0' into cherry-pick-3002-to-release-4.0
3AceShowHand authored Oct 14, 2021
2 parents 3dfb369 + d33e738 commit b88d5d8
Showing 9 changed files with 265 additions and 30 deletions.
8 changes: 8 additions & 0 deletions cdc/kv/metrics.go
@@ -79,6 +79,13 @@ var (
Name: "region_token",
Help: "size of region token in kv client",
}, []string{"store", "changefeed", "capture"})
cachedRegionSize = prometheus.NewGaugeVec(
prometheus.GaugeOpts{
Namespace: "ticdc",
Subsystem: "kvclient",
Name: "cached_region",
Help: "cached region that has not requested to TiKV in kv client",
}, []string{"store", "changefeed", "capture"})
batchResolvedEventSize = prometheus.NewHistogramVec(
prometheus.HistogramOpts{
Namespace: "ticdc",
@@ -113,6 +120,7 @@ func InitMetrics(registry *prometheus.Registry) {
registry.MustRegister(sendEventCounter)
registry.MustRegister(clientChannelSize)
registry.MustRegister(clientRegionTokenSize)
registry.MustRegister(cachedRegionSize)
registry.MustRegister(batchResolvedEventSize)
registry.MustRegister(etcdRequestCounter)
registry.MustRegister(grpcPoolStreamGauge)
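The new gauge follows the standard client_golang flow: build the vector, register it once in InitMetrics, then bind per-store label values and Inc/Sub as regions are buffered or drained. A minimal self-contained sketch of that flow; the store address, changefeed, and capture names below are placeholders, not TiCDC code:

package main

import (
	"fmt"

	"github.com/prometheus/client_golang/prometheus"
)

// Mirrors the vector added in cdc/kv/metrics.go.
var cachedRegionSize = prometheus.NewGaugeVec(
	prometheus.GaugeOpts{
		Namespace: "ticdc",
		Subsystem: "kvclient",
		Name:      "cached_region",
		Help:      "cached region that has not requested to TiKV in kv client",
	}, []string{"store", "changefeed", "capture"})

func main() {
	registry := prometheus.NewRegistry()
	registry.MustRegister(cachedRegionSize) // as InitMetrics does

	// One child gauge per (store, changefeed, capture) label set.
	g := cachedRegionSize.WithLabelValues("tikv-1:20160", "cf-demo", "capture-demo")
	g.Inc()  // a region was buffered
	g.Sub(1) // a buffered region was flushed

	families, _ := registry.Gather()
	fmt.Println("registered metric families:", len(families))
}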
25 changes: 20 additions & 5 deletions cdc/kv/token_region.go
@@ -45,22 +45,28 @@ type LimitRegionRouter interface {
Run(ctx context.Context) error
}

// srrMetrics keeps metrics of a Sized Region Router
type srrMetrics struct {
capture string
changefeed string
tokens map[string]prometheus.Gauge
// mapping from id(TiKV store address) to token used
tokens map[string]prometheus.Gauge
// mapping from id(TiKV store address) to cached regions
cachedRegions map[string]prometheus.Gauge
}

func newSrrMetrics(ctx context.Context) *srrMetrics {
captureAddr := util.CaptureAddrFromCtx(ctx)
changefeed := util.ChangefeedIDFromCtx(ctx)
return &srrMetrics{
capture: captureAddr,
changefeed: changefeed,
tokens: make(map[string]prometheus.Gauge),
capture: captureAddr,
changefeed: changefeed,
tokens: make(map[string]prometheus.Gauge),
cachedRegions: make(map[string]prometheus.Gauge),
}
}

// each changefeed on a capture maintains a sizedRegionRouter
type sizedRegionRouter struct {
buffer map[string][]singleRegionInfo
output chan singleRegionInfo
@@ -96,10 +102,16 @@ func (r *sizedRegionRouter) AddRegion(sri singleRegionInfo) {
r.output <- sri
} else {
r.buffer[id] = append(r.buffer[id], sri)
if _, ok := r.metrics.cachedRegions[id]; !ok {
r.metrics.cachedRegions[id] = cachedRegionSize.WithLabelValues(id, r.metrics.changefeed, r.metrics.capture)
}
r.metrics.cachedRegions[id].Inc()
}
r.lock.Unlock()
}

// Acquire implements LimitRegionRouter.Acquire
// param: id is TiKV store address
func (r *sizedRegionRouter) Acquire(id string) {
r.lock.Lock()
defer r.lock.Unlock()
@@ -110,6 +122,8 @@ func (r *sizedRegionRouter) Acquire(id string) {
r.metrics.tokens[id].Inc()
}

// Release implements LimitRegionRouter.Release
// param: id is TiKV store address
func (r *sizedRegionRouter) Release(id string) {
r.lock.Lock()
defer r.lock.Unlock()
@@ -131,7 +145,7 @@ func (r *sizedRegionRouter) Run(ctx context.Context) error {
r.lock.Lock()
for id, buf := range r.buffer {
available := r.sizeLimit - r.tokens[id]
// the tokens used could be more then size limit, since we have
// the tokens used could be more than size limit, since we have
// a sized channel as level1 cache
if available <= 0 {
continue
Expand All @@ -156,6 +170,7 @@ func (r *sizedRegionRouter) Run(ctx context.Context) error {
}
}
r.buffer[id] = r.buffer[id][available:]
r.metrics.cachedRegions[id].Sub(float64(available))
}
r.lock.Unlock()
}
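In the router above, the gauge tracks the per-store backlog: AddRegion increments it whenever a region has to be buffered instead of forwarded, and the Run loop subtracts however many buffered regions it flushes once tokens are available. A stripped-down, standalone sketch of that accounting; the admission check, types, and fixed limit are simplifications (the real sizedRegionRouter uses channels, locking, context cancellation, and Acquire/Release to move tokens):

package main

import "fmt"

type regionInfo struct{ id uint64 }

// miniRouter keeps only the token/backlog bookkeeping of sizedRegionRouter.
type miniRouter struct {
	sizeLimit int
	tokens    map[string]int          // tokens in use per TiKV store address
	buffer    map[string][]regionInfo // regions waiting for quota
	cached    map[string]int          // what the cached_region gauge would report
	output    []regionInfo            // stands in for the output channel
}

func newMiniRouter(limit int) *miniRouter {
	return &miniRouter{
		sizeLimit: limit,
		tokens:    map[string]int{},
		buffer:    map[string][]regionInfo{},
		cached:    map[string]int{},
	}
}

// AddRegion forwards the region if the store still has quota; otherwise it
// buffers the region and grows the backlog counter, as in the diff.
func (r *miniRouter) AddRegion(store string, sri regionInfo) {
	if r.sizeLimit > r.tokens[store] && len(r.buffer[store]) == 0 {
		r.output = append(r.output, sri)
		return
	}
	r.buffer[store] = append(r.buffer[store], sri)
	r.cached[store]++
}

// Flush mimics one pass of Run: drain up to the available token budget and
// subtract the drained count, like cachedRegions[id].Sub(float64(available)).
func (r *miniRouter) Flush(store string) {
	available := r.sizeLimit - r.tokens[store]
	if available <= 0 {
		return
	}
	if available > len(r.buffer[store]) {
		available = len(r.buffer[store])
	}
	r.output = append(r.output, r.buffer[store][:available]...)
	r.buffer[store] = r.buffer[store][available:]
	r.cached[store] -= available
}

func main() {
	r := newMiniRouter(2)
	r.tokens["tikv-1:20160"] = 2 // quota exhausted: both tokens held downstream
	r.AddRegion("tikv-1:20160", regionInfo{id: 1})
	r.AddRegion("tikv-1:20160", regionInfo{id: 2})
	fmt.Println("backlog:", r.cached["tikv-1:20160"]) // 2

	r.tokens["tikv-1:20160"] = 0 // pretend Release returned the tokens
	r.Flush("tikv-1:20160")
	fmt.Println("backlog after flush:", r.cached["tikv-1:20160"]) // 0
}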
14 changes: 6 additions & 8 deletions cdc/owner/ddl_puller.go
@@ -77,21 +77,19 @@ func newDDLPuller(ctx cdcContext.Context, startTs uint64) (DDLPuller, error) {
}, nil
}

const ddlPullerName = "DDL_PULLER"

func (h *ddlPullerImpl) Run(ctx cdcContext.Context) error {
ctx, cancel := cdcContext.WithCancel(ctx)
h.cancel = cancel
log.Debug("DDL puller started", zap.String("changefeed-id", ctx.ChangefeedVars().ID))
stdCtx := util.PutTableInfoInCtx(ctx, -1, ddlPullerName)
stdCtx := util.PutTableInfoInCtx(ctx, -1, puller.DDLPullerTableName)
stdCtx = util.PutChangefeedIDInCtx(stdCtx, ctx.ChangefeedVars().ID)
errg, stdCtx := errgroup.WithContext(stdCtx)
ctx = cdcContext.WithStd(ctx, stdCtx)

errg.Go(func() error {
return h.puller.Run(ctx)
return h.puller.Run(stdCtx)
})

rawDDLCh := puller.SortOutput(ctx, h.puller.Output())
rawDDLCh := puller.SortOutput(stdCtx, h.puller.Output())

receiveDDL := func(rawDDL *model.RawKVEntry) error {
if rawDDL == nil {
@@ -130,8 +128,8 @@ func (h *ddlPullerImpl) Run(ctx cdcContext.Context) error {
errg.Go(func() error {
for {
select {
case <-ctx.Done():
return ctx.Err()
case <-stdCtx.Done():
return stdCtx.Err()
case e := <-rawDDLCh:
if err := receiveDDL(e); err != nil {
return errors.Trace(err)
6 changes: 4 additions & 2 deletions cdc/processor/processor.go
@@ -441,8 +441,10 @@ func (p *processor) createAndDriveSchemaStorage(ctx cdcContext.Context) (entry.S
kvStorage := ctx.GlobalVars().KVStorage
ddlspans := []regionspan.Span{regionspan.GetDDLSpan(), regionspan.GetAddIndexDDLSpan()}
checkpointTs := p.changefeed.Info.GetCheckpointTs(p.changefeed.Status)
stdCtx := util.PutTableInfoInCtx(ctx, -1, puller.DDLPullerTableName)
stdCtx = util.PutChangefeedIDInCtx(stdCtx, ctx.ChangefeedVars().ID)
ddlPuller := puller.NewPuller(
ctx,
stdCtx,
ctx.GlobalVars().PDClient,
ctx.GlobalVars().GrpcPool,
ctx.GlobalVars().KVStorage,
@@ -458,7 +460,7 @@ func (p *processor) createAndDriveSchemaStorage(ctx cdcContext.Context) (entry.S
p.wg.Add(1)
go func() {
defer p.wg.Done()
p.sendError(ddlPuller.Run(ctx))
p.sendError(ddlPuller.Run(stdCtx))
}()
ddlRawKVCh := puller.SortOutput(ctx, ddlPuller.Output())
p.wg.Add(1)
3 changes: 3 additions & 0 deletions cdc/puller/puller.go
@@ -34,6 +34,9 @@ import (
"golang.org/x/sync/errgroup"
)

// DDLPullerTableName is the fake table name for ddl puller
const DDLPullerTableName = "DDL_PULLER"

const (
defaultPullerEventChanSize = 128
defaultPullerOutputChanSize = 128
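This exported constant replaces the private ddlPullerName in the owner and gives the processor the same label; both call sites tag the context before creating and running the DDL puller, as the two diffs above show. A rough sketch of the shared pattern in a hypothetical helper, assuming the release-4.0 import paths (github.com/pingcap/ticdc/...) and using only the helpers visible in the diff:

package owner // illustrative; the same lines appear in cdc/owner and cdc/processor

import (
	"context"

	"github.com/pingcap/ticdc/cdc/puller"
	"github.com/pingcap/ticdc/pkg/util"
)

// tagDDLPullerCtx (hypothetical helper) attaches the fake DDL table name and
// the changefeed ID to the context handed to puller.NewPuller / puller.Run,
// so kv-client metrics and logs for the DDL puller are labelled consistently.
func tagDDLPullerCtx(ctx context.Context, changefeedID string) context.Context {
	stdCtx := util.PutTableInfoInCtx(ctx, -1, puller.DDLPullerTableName)
	return util.PutChangefeedIDInCtx(stdCtx, changefeedID)
}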
6 changes: 3 additions & 3 deletions cmd/server_test.go
@@ -168,7 +168,7 @@ func (s *serverSuite) TestLoadAndVerifyServerConfig(c *check.C) {
KeyPath: "cc",
CertAllowedCN: []string{"dd", "ee"},
},
PerTableMemoryQuota: 20 * 1024 * 1024, // 20M
PerTableMemoryQuota: 10 * 1024 * 1024, // 10M
KVClient: &config.KVClientConfig{
WorkerConcurrent: 8,
WorkerPoolSize: 0,
@@ -241,7 +241,7 @@ sort-dir = "/tmp/just_a_test"
SortDir: config.DefaultSortDir,
},
Security: &config.SecurityConfig{},
PerTableMemoryQuota: 20 * 1024 * 1024, // 20M
PerTableMemoryQuota: 10 * 1024 * 1024, // 10M
KVClient: &config.KVClientConfig{
WorkerConcurrent: 8,
WorkerPoolSize: 0,
@@ -309,7 +309,7 @@ cert-allowed-cn = ["dd","ee"]
KeyPath: "cc",
CertAllowedCN: []string{"dd", "ee"},
},
PerTableMemoryQuota: 20 * 1024 * 1024, // 20M
PerTableMemoryQuota: 10 * 1024 * 1024, // 10M
KVClient: &config.KVClientConfig{
WorkerConcurrent: 8,
WorkerPoolSize: 0,
