From fc4adf45577f1c568f13f0a81a3c53d9e961a91e Mon Sep 17 00:00:00 2001 From: amyangfei Date: Thu, 10 Jun 2021 21:30:30 +0800 Subject: [PATCH] kv/client: cherry pick #1591,#1674,#1682,#1481,#1847,#1903 to release-5.0 (#1922) --- cdc/kv/client.go | 26 +- cdc/kv/client_bench_test.go | 48 ++- cdc/kv/client_test.go | 417 +++++++++++++++++++++++++- cdc/kv/client_v2.go | 18 +- cdc/kv/region_worker.go | 526 ++++++++++++++++++++++++++------- cdc/kv/region_worker_test.go | 26 ++ cdc/kv/resolvedts_heap.go | 100 +++++-- cdc/kv/resolvedts_heap_test.go | 58 +++- cdc/server.go | 5 + cmd/server_test.go | 12 + errors.toml | 5 + pkg/config/config.go | 12 +- pkg/config/config_test.go | 4 +- pkg/config/kvclient.go | 20 ++ pkg/errors/errors.go | 1 + pkg/util/testleak/leaktest.go | 3 + 16 files changed, 1111 insertions(+), 170 deletions(-) create mode 100644 pkg/config/kvclient.go diff --git a/cdc/kv/client.go b/cdc/kv/client.go index e78efaf5be1..d7aa4dfd7ab 100644 --- a/cdc/kv/client.go +++ b/cdc/kv/client.go @@ -71,15 +71,15 @@ const ( // defines the scan region limit for each table regionScanLimitPerTable = 40 - - // time interval to force kv client to terminate gRPC stream and reconnect - reconnectInterval = 15 * time.Minute ) +// time interval to force kv client to terminate gRPC stream and reconnect +var reconnectInterval = 15 * time.Minute + // hard code switch // true: use kv client v2, which has a region worker for each stream // false: use kv client v1, which runs a goroutine for every single region -var enableKVClientV2 = false +var enableKVClientV2 = true type singleRegionInfo struct { verID tikv.RegionVerID @@ -177,6 +177,24 @@ func (s *regionFeedState) isStopped() bool { return atomic.LoadInt32(&s.stopped) > 0 } +func (s *regionFeedState) isInitialized() bool { + s.lock.RLock() + defer s.lock.RUnlock() + return s.initialized +} + +func (s *regionFeedState) getLastResolvedTs() uint64 { + s.lock.RLock() + defer s.lock.RUnlock() + return s.lastResolvedTs +} + +func (s *regionFeedState) getRegionSpan() regionspan.ComparableSpan { + s.lock.RLock() + defer s.lock.RUnlock() + return s.sri.span +} + type syncRegionFeedStateMap struct { mu *sync.Mutex regionInfoMap map[uint64]*regionFeedState diff --git a/cdc/kv/client_bench_test.go b/cdc/kv/client_bench_test.go index d347eb66c1b..5329df61b05 100644 --- a/cdc/kv/client_bench_test.go +++ b/cdc/kv/client_bench_test.go @@ -389,12 +389,34 @@ func benchmarkSingleWorkerResolvedTs(b *testing.B, clientV2 bool) { } } +func benchmarkResolvedTsClientV2(b *testing.B) { + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + InitWorkerPool() + go func() { + RunWorkerPool(ctx) //nolint:errcheck + }() + benchmarkSingleWorkerResolvedTs(b, true /* clientV2 */) +} + func BenchmarkResolvedTsClientV1(b *testing.B) { benchmarkSingleWorkerResolvedTs(b, false /* clientV1 */) } func BenchmarkResolvedTsClientV2(b *testing.B) { - benchmarkSingleWorkerResolvedTs(b, true /* clientV2 */) + benchmarkResolvedTsClientV2(b) +} + +func BenchmarkResolvedTsClientV2WorkerPool(b *testing.B) { + hwm := regionWorkerHighWatermark + lwm := regionWorkerLowWatermark + regionWorkerHighWatermark = 10000 + regionWorkerLowWatermark = 2000 + defer func() { + regionWorkerHighWatermark = hwm + regionWorkerLowWatermark = lwm + }() + benchmarkResolvedTsClientV2(b) } func benchmarkMultipleStoreResolvedTs(b *testing.B, clientV2 bool) { @@ -489,10 +511,32 @@ func benchmarkMultipleStoreResolvedTs(b *testing.B, clientV2 bool) { } } +func benchmarkMultiStoreResolvedTsClientV2(b *testing.B) { + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + InitWorkerPool() + go func() { + RunWorkerPool(ctx) //nolint:errcheck + }() + benchmarkMultipleStoreResolvedTs(b, true /* clientV2 */) +} + func BenchmarkMultiStoreResolvedTsClientV1(b *testing.B) { benchmarkMultipleStoreResolvedTs(b, false /* clientV1 */) } func BenchmarkMultiStoreResolvedTsClientV2(b *testing.B) { - benchmarkMultipleStoreResolvedTs(b, true /* clientV2 */) + benchmarkMultiStoreResolvedTsClientV2(b) +} + +func BenchmarkMultiStoreResolvedTsClientV2WorkerPool(b *testing.B) { + hwm := regionWorkerHighWatermark + lwm := regionWorkerLowWatermark + regionWorkerHighWatermark = 1000 + regionWorkerLowWatermark = 200 + defer func() { + regionWorkerHighWatermark = hwm + regionWorkerLowWatermark = lwm + }() + benchmarkMultiStoreResolvedTsClientV2(b) } diff --git a/cdc/kv/client_test.go b/cdc/kv/client_test.go index 55dd7ce1b8b..52b227af288 100644 --- a/cdc/kv/client_test.go +++ b/cdc/kv/client_test.go @@ -17,6 +17,8 @@ import ( "context" "fmt" "net" + "runtime" + "strings" "sync" "sync/atomic" "testing" @@ -31,6 +33,7 @@ import ( "github.com/pingcap/kvproto/pkg/metapb" "github.com/pingcap/log" "github.com/pingcap/ticdc/cdc/model" + "github.com/pingcap/ticdc/pkg/config" cerror "github.com/pingcap/ticdc/pkg/errors" "github.com/pingcap/ticdc/pkg/regionspan" "github.com/pingcap/ticdc/pkg/retry" @@ -46,7 +49,15 @@ import ( "google.golang.org/grpc" ) -func Test(t *testing.T) { check.TestingT(t) } +func Test(t *testing.T) { + conf := config.GetDefaultServerConfig() + config.StoreGlobalServerConfig(conf) + InitWorkerPool() + go func() { + RunWorkerPool(context.Background()) //nolint:errcheck + }() + check.TestingT(t) +} type clientSuite struct { } @@ -279,7 +290,7 @@ func newMockServiceSpecificAddr( // waitRequestID waits request ID larger than the given allocated ID func waitRequestID(c *check.C, allocatedID uint64) { - err := retry.Run(time.Millisecond*20, 10, func() error { + err := retry.Run(time.Millisecond*10, 20, func() error { if currentRequestID() > allocatedID { return nil } @@ -688,8 +699,7 @@ func (s *etcdSuite) TestCompatibilityWithSameConn(c *check.C) { cancel() } -func (s *etcdSuite) TestHandleFeedEvent(c *check.C) { - defer testleak.AfterTest(c)() +func (s *etcdSuite) testHandleFeedEvent(c *check.C) { defer s.TearDownTest(c) ctx, cancel := context.WithCancel(context.Background()) wg := &sync.WaitGroup{} @@ -966,6 +976,17 @@ func (s *etcdSuite) TestHandleFeedEvent(c *check.C) { Ts: 145, }, } + multiSize := 100 + regions := make([]uint64, multiSize) + for i := range regions { + regions[i] = 3 + } + multipleResolved := &cdcpb.ChangeDataEvent{ + ResolvedTs: &cdcpb.ResolvedTs{ + Regions: regions, + Ts: 160, + }, + } expected := []*model.RegionFeedEvent{ { @@ -1055,6 +1076,13 @@ func (s *etcdSuite) TestHandleFeedEvent(c *check.C) { RegionID: 3, }, } + multipleExpected := &model.RegionFeedEvent{ + Resolved: &model.ResolvedSpan{ + Span: regionspan.ComparableSpan{Start: []byte("a"), End: []byte("b")}, + ResolvedTs: 160, + }, + RegionID: 3, + } ch1 <- eventsBeforeInit ch1 <- initialized @@ -1071,9 +1099,37 @@ func (s *etcdSuite) TestHandleFeedEvent(c *check.C) { } } + ch1 <- multipleResolved + for i := 0; i < multiSize; i++ { + select { + case event := <-eventCh: + c.Assert(event, check.DeepEquals, multipleExpected) + case <-time.After(time.Second): + c.Errorf("expected event %v not received", multipleExpected) + } + } + cancel() } +func (s *etcdSuite) TestHandleFeedEvent(c *check.C) { + defer testleak.AfterTest(c)() + s.testHandleFeedEvent(c) +} + +func (s *etcdSuite) TestHandleFeedEventWithWorkerPool(c *check.C) { + defer testleak.AfterTest(c)() + hwm := regionWorkerHighWatermark + lwm := regionWorkerLowWatermark + regionWorkerHighWatermark = 8 + regionWorkerLowWatermark = 2 + defer func() { + regionWorkerHighWatermark = hwm + regionWorkerLowWatermark = lwm + }() + s.testHandleFeedEvent(c) +} + // TestStreamSendWithError mainly tests the scenario that the `Send` call of a gPRC // stream of kv client meets error, and kv client can clean up the broken stream, // establish a new one and recover the normal evend feed processing. @@ -1182,6 +1238,13 @@ func (s *etcdSuite) TestStreamSendWithError(c *check.C) { expectedInitRegions := map[uint64]struct{}{regionID3: {}, regionID4: {}} c.Assert(initRegions, check.DeepEquals, expectedInitRegions) + // a hack way to check the goroutine count of region worker is 1 + buf := make([]byte, 1<<20) + stacklen := runtime.Stack(buf, true) + stack := string(buf[:stacklen]) + c.Assert(strings.Count(stack, "resolveLock"), check.Equals, 1) + c.Assert(strings.Count(stack, "collectWorkpoolError"), check.Equals, 1) + cancel() } @@ -1293,7 +1356,6 @@ func (s *etcdSuite) testStreamRecvWithError(c *check.C, failpointStr string) { for _, expectedEv := range expected { select { case event := <-eventCh: - log.Info("receive event", zap.Reflect("event", event), zap.Reflect("expected", expectedEv)) c.Assert(event, check.DeepEquals, expectedEv) case <-time.After(time.Second): c.Errorf("expected event %v not received", expectedEv) @@ -2712,11 +2774,19 @@ func (s *etcdSuite) TestKVClientForceReconnect(c *check.C) { defer s.TearDownTest(c) clientv2 := enableKVClientV2 - enableKVClientV2 = false defer func() { enableKVClientV2 = clientv2 }() + // test kv client v1 + enableKVClientV2 = false + s.testKVClientForceReconnect(c) + + enableKVClientV2 = true + s.testKVClientForceReconnect(c) +} + +func (s *etcdSuite) testKVClientForceReconnect(c *check.C) { ctx, cancel := context.WithCancel(context.Background()) wg := &sync.WaitGroup{} @@ -2751,11 +2821,15 @@ func (s *etcdSuite) TestKVClientForceReconnect(c *check.C) { cluster.AddStore(1, addr1) cluster.Bootstrap(regionID3, []uint64{1}, []uint64{4}, 4) - err = failpoint.Enable("github.com/pingcap/ticdc/cdc/kv/kvClientForceReconnect", "return(true)") + err = failpoint.Enable("github.com/pingcap/ticdc/cdc/kv/kvClientResolveLockInterval", "return(1)") c.Assert(err, check.IsNil) + originalReconnectInterval := reconnectInterval + reconnectInterval = 3 * time.Second defer func() { - _ = failpoint.Disable("github.com/pingcap/ticdc/cdc/kv/kvClientForceReconnect") + _ = failpoint.Disable("github.com/pingcap/ticdc/cdc/kv/kvClientResolveLockInterval") + reconnectInterval = originalReconnectInterval }() + lockresolver := txnutil.NewLockerResolver(kvStorage) isPullInit := &mockPullerInit{} cdcClient := NewCDCClient(ctx, pdClient, kvStorage, &security.Credential{}) @@ -2774,8 +2848,20 @@ func (s *etcdSuite) TestKVClientForceReconnect(c *check.C) { ch1 <- initialized <-server1Stopped + + var requestIds sync.Map ch2 := make(chan *cdcpb.ChangeDataEvent, 10) srv2 := newMockChangeDataService(c, ch2) + srv2.recvLoop = func(server cdcpb.ChangeData_EventFeedServer) { + for { + req, err := server.Recv() + if err != nil { + log.Error("mock server error", zap.Error(err)) + return + } + requestIds.Store(req.RegionId, req.RequestId) + } + } // Reuse the same listen addresss as server 1 to simulate TiKV handles the // gRPC stream terminate and reconnect. server2, _ := newMockServiceSpecificAddr(ctx, c, srv2, addr1, wg) @@ -2785,14 +2871,26 @@ func (s *etcdSuite) TestKVClientForceReconnect(c *check.C) { wg.Wait() }() - waitRequestID(c, baseAllocatedID+2) - initialized = mockInitializedEvent(regionID3, currentRequestID()) + // The second TiKV could start up slowly, which causes the kv client retries + // to TiKV for more than one time, so we can't determine the correct requestID + // here, we must use the real request ID received by TiKV server + err = retry.Run(time.Millisecond*300, 10, func() error { + _, ok := requestIds.Load(regionID3) + if ok { + return nil + } + return errors.New("waiting for kv client requests received by server") + }) + c.Assert(err, check.IsNil) + requestID, _ := requestIds.Load(regionID3) + + initialized = mockInitializedEvent(regionID3, requestID.(uint64)) ch2 <- initialized resolved := &cdcpb.ChangeDataEvent{Events: []*cdcpb.Event{ { RegionId: regionID3, - RequestId: currentRequestID(), + RequestId: requestID.(uint64), Event: &cdcpb.Event_ResolvedTs{ResolvedTs: 135}, }, }} @@ -2952,3 +3050,300 @@ checkEvent: cancel() } + +// TestKVClientForceReconnect2 tests force reconnect gRPC stream can work, this +// test mocks the reconnectInterval tool, and simulate un-initialized regions +// can be reconnected. +func (s *etcdSuite) TestKVClientForceReconnect2(c *check.C) { + defer testleak.AfterTest(c)() + defer s.TearDownTest(c) + + ctx, cancel := context.WithCancel(context.Background()) + wg := &sync.WaitGroup{} + + server1Stopped := make(chan struct{}) + ch1 := make(chan *cdcpb.ChangeDataEvent, 10) + srv1 := newMockChangeDataService(c, ch1) + server1, addr1 := newMockService(ctx, c, srv1, wg) + srv1.recvLoop = func(server cdcpb.ChangeData_EventFeedServer) { + defer func() { + close(ch1) + server1.Stop() + server1Stopped <- struct{}{} + }() + for { + _, err := server.Recv() + if err != nil { + log.Error("mock server error", zap.Error(err)) + break + } + } + } + + rpcClient, cluster, pdClient, err := mocktikv.NewTiKVAndPDClient("") + c.Assert(err, check.IsNil) + pdClient = &mockPDClient{Client: pdClient, versionGen: defaultVersionGen} + tiStore, err := tikv.NewTestTiKVStore(rpcClient, pdClient, nil, nil, 0) + c.Assert(err, check.IsNil) + kvStorage := newStorageWithCurVersionCache(tiStore, addr1) + defer kvStorage.Close() //nolint:errcheck + + regionID3 := uint64(3) + cluster.AddStore(1, addr1) + cluster.Bootstrap(regionID3, []uint64{1}, []uint64{4}, 4) + + originalReconnectInterval := reconnectInterval + reconnectInterval = 3 * time.Second + // check interval is less than reconnect interval, so we can test both the hit and miss case + err = failpoint.Enable("github.com/pingcap/ticdc/cdc/kv/kvClientCheckUnInitRegionInterval", "return(1)") + c.Assert(err, check.IsNil) + defer func() { + _ = failpoint.Disable("github.com/pingcap/ticdc/cdc/kv/kvClientCheckUnInitRegionInterval") + reconnectInterval = originalReconnectInterval + }() + lockresolver := txnutil.NewLockerResolver(kvStorage) + isPullInit := &mockPullerInit{} + cdcClient := NewCDCClient(ctx, pdClient, kvStorage, &security.Credential{}) + eventCh := make(chan *model.RegionFeedEvent, 10) + wg.Add(1) + go func() { + err := cdcClient.EventFeed(ctx, regionspan.ComparableSpan{Start: []byte("a"), End: []byte("c")}, 100, false, lockresolver, isPullInit, eventCh) + c.Assert(errors.Cause(err), check.Equals, context.Canceled) + cdcClient.Close() //nolint:errcheck + wg.Done() + }() + + baseAllocatedID := currentRequestID() + waitRequestID(c, baseAllocatedID+1) + committed := &cdcpb.ChangeDataEvent{Events: []*cdcpb.Event{ + { + RegionId: 3, + RequestId: currentRequestID(), + Event: &cdcpb.Event_Entries_{ + Entries: &cdcpb.Event_Entries{ + Entries: []*cdcpb.Event_Row{{ + Type: cdcpb.Event_COMMITTED, + OpType: cdcpb.Event_Row_PUT, + Key: []byte("a"), + Value: []byte("b"), + StartTs: 105, + CommitTs: 115, + }}, + }, + }, + }, + }} + ch1 <- committed + + <-server1Stopped + + var requestIds sync.Map + ch2 := make(chan *cdcpb.ChangeDataEvent, 10) + srv2 := newMockChangeDataService(c, ch2) + srv2.recvLoop = func(server cdcpb.ChangeData_EventFeedServer) { + for { + req, err := server.Recv() + if err != nil { + log.Error("mock server error", zap.Error(err)) + return + } + requestIds.Store(req.RegionId, req.RequestId) + } + } + // Reuse the same listen addresss as server 1 to simulate TiKV handles the + // gRPC stream terminate and reconnect. + server2, _ := newMockServiceSpecificAddr(ctx, c, srv2, addr1, wg) + defer func() { + close(ch2) + server2.Stop() + wg.Wait() + }() + + // The second TiKV could start up slowly, which causes the kv client retries + // to TiKV for more than one time, so we can't determine the correct requestID + // here, we must use the real request ID received by TiKV server + err = retry.Run(time.Millisecond*300, 10, func() error { + _, ok := requestIds.Load(regionID3) + if ok { + return nil + } + return errors.New("waiting for kv client requests received by server") + }) + c.Assert(err, check.IsNil) + requestID, _ := requestIds.Load(regionID3) + + initialized := mockInitializedEvent(regionID3, requestID.(uint64)) + ch2 <- initialized + + resolved := &cdcpb.ChangeDataEvent{Events: []*cdcpb.Event{ + { + RegionId: regionID3, + RequestId: requestID.(uint64), + Event: &cdcpb.Event_ResolvedTs{ResolvedTs: 135}, + }, + }} + ch2 <- resolved + + expected := []*model.RegionFeedEvent{ + { + Resolved: &model.ResolvedSpan{ + Span: regionspan.ComparableSpan{Start: []byte("a"), End: []byte("c")}, + ResolvedTs: 100, + }, + RegionID: regionID3, + }, + { + Val: &model.RawKVEntry{ + OpType: model.OpTypePut, + Key: []byte("a"), + Value: []byte("b"), + StartTs: 105, + CRTs: 115, + RegionID: 3, + }, + RegionID: 3, + }, + { + Resolved: &model.ResolvedSpan{ + Span: regionspan.ComparableSpan{Start: []byte("a"), End: []byte("c")}, + ResolvedTs: 100, + }, + RegionID: regionID3, + }, + { + Resolved: &model.ResolvedSpan{ + Span: regionspan.ComparableSpan{Start: []byte("a"), End: []byte("c")}, + ResolvedTs: 135, + }, + RegionID: regionID3, + }, + } + + for _, expectedEv := range expected { + select { + case event := <-eventCh: + c.Assert(event, check.DeepEquals, expectedEv) + case <-time.After(time.Second): + c.Errorf("expected event %v not received", expectedEv) + } + } + + cancel() +} + +// TestEvTimeUpdate creates a new event feed, send N committed events every 100ms, +// use failpoint to set reconnect interval to 1s, the last event time of region +// should be updated correctly and no reconnect is triggered +func (s *etcdSuite) TestEvTimeUpdate(c *check.C) { + defer testleak.AfterTest(c)() + + defer s.TearDownTest(c) + ctx, cancel := context.WithCancel(context.Background()) + wg := &sync.WaitGroup{} + + ch1 := make(chan *cdcpb.ChangeDataEvent, 10) + srv1 := newMockChangeDataService(c, ch1) + server1, addr1 := newMockService(ctx, c, srv1, wg) + + defer func() { + close(ch1) + server1.Stop() + wg.Wait() + }() + + rpcClient, cluster, pdClient, err := mocktikv.NewTiKVAndPDClient("") + c.Assert(err, check.IsNil) + pdClient = &mockPDClient{Client: pdClient, versionGen: defaultVersionGen} + tiStore, err := tikv.NewTestTiKVStore(rpcClient, pdClient, nil, nil, 0) + c.Assert(err, check.IsNil) + kvStorage := newStorageWithCurVersionCache(tiStore, addr1) + defer kvStorage.Close() //nolint:errcheck + + cluster.AddStore(1, addr1) + cluster.Bootstrap(3, []uint64{1}, []uint64{4}, 4) + + originalReconnectInterval := reconnectInterval + reconnectInterval = 1500 * time.Millisecond + err = failpoint.Enable("github.com/pingcap/ticdc/cdc/kv/kvClientCheckUnInitRegionInterval", "return(2)") + c.Assert(err, check.IsNil) + defer func() { + _ = failpoint.Disable("github.com/pingcap/ticdc/cdc/kv/kvClientCheckUnInitRegionInterval") + reconnectInterval = originalReconnectInterval + }() + + baseAllocatedID := currentRequestID() + lockresolver := txnutil.NewLockerResolver(kvStorage) + isPullInit := &mockPullerInit{} + cdcClient := NewCDCClient(ctx, pdClient, kvStorage, &security.Credential{}) + eventCh := make(chan *model.RegionFeedEvent, 10) + wg.Add(1) + go func() { + err := cdcClient.EventFeed(ctx, regionspan.ComparableSpan{Start: []byte("a"), End: []byte("b")}, 100, false, lockresolver, isPullInit, eventCh) + c.Assert(errors.Cause(err), check.Equals, context.Canceled) + cdcClient.Close() //nolint:errcheck + wg.Done() + }() + + // wait request id allocated with: new session, new request + waitRequestID(c, baseAllocatedID+1) + + eventCount := 20 + for i := 0; i < eventCount; i++ { + events := &cdcpb.ChangeDataEvent{Events: []*cdcpb.Event{ + { + RegionId: 3, + RequestId: currentRequestID(), + Event: &cdcpb.Event_Entries_{ + Entries: &cdcpb.Event_Entries{ + Entries: []*cdcpb.Event_Row{{ + Type: cdcpb.Event_COMMITTED, + OpType: cdcpb.Event_Row_PUT, + Key: []byte("aaaa"), + Value: []byte("committed put event before init"), + StartTs: 105, + CommitTs: 115, + }}, + }, + }, + }, + }} + ch1 <- events + time.Sleep(time.Millisecond * 100) + } + + expected := []*model.RegionFeedEvent{ + { + Resolved: &model.ResolvedSpan{ + Span: regionspan.ComparableSpan{Start: []byte("a"), End: []byte("b")}, + ResolvedTs: 100, + }, + RegionID: 3, + }, + { + Val: &model.RawKVEntry{ + OpType: model.OpTypePut, + Key: []byte("aaaa"), + Value: []byte("committed put event before init"), + StartTs: 105, + CRTs: 115, + RegionID: 3, + }, + RegionID: 3, + }, + } + + for i := 0; i < eventCount+1; i++ { + select { + case event := <-eventCh: + if i == 0 { + c.Assert(event, check.DeepEquals, expected[0]) + } else { + c.Assert(event, check.DeepEquals, expected[1]) + } + case <-time.After(time.Second): + c.Errorf("expected event not received, %d received", i) + } + } + + cancel() +} diff --git a/cdc/kv/client_v2.go b/cdc/kv/client_v2.go index 88ce7432a8b..cffc960b16e 100644 --- a/cdc/kv/client_v2.go +++ b/cdc/kv/client_v2.go @@ -37,6 +37,13 @@ type regionStatefulEvent struct { changeEvent *cdcpb.Event resolvedTs *cdcpb.ResolvedTs state *regionFeedState + + // regionID is used for load balancer, we don't use fileds in state to reduce lock usage + regionID uint64 + + // finishedCounter is used to mark events that are sent from a give region + // worker to this worker(one of the workers in worker pool) are all processed. + finishedCounter *int32 } func (s *eventFeedSession) sendRegionChangeEventV2( @@ -86,7 +93,6 @@ func (s *eventFeedSession) sendRegionChangeEventV2( } state.start() - // Then spawn the goroutine to process messages of this region. worker.setRegionState(event.RegionId, state) // send resolved event when starting a single event feed @@ -109,11 +115,18 @@ func (s *eventFeedSession) sendRegionChangeEventV2( return nil } + // TODO: If a region doesn't receive any event from TiKV, this region + // can't be reconnected since the region state is not initialized. + if !state.isInitialized() { + worker.notifyEvTimeUpdate(event.RegionId, false /* isDelete */) + } + select { case <-ctx.Done(): return ctx.Err() case worker.inputCh <- ®ionStatefulEvent{ changeEvent: event, + regionID: event.RegionId, state: state, }: } @@ -140,6 +153,7 @@ func (s *eventFeedSession) sendResolvedTsV2( select { case worker.inputCh <- ®ionStatefulEvent{ resolvedTs: resolvedTs, + regionID: regionID, state: state, }: case <-ctx.Done(): @@ -193,7 +207,7 @@ func (s *eventFeedSession) receiveFromStreamV2( // always create a new region worker, because `receiveFromStreamV2` is ensured // to call exactly once from outter code logic - worker := newRegionWorker(s, limiter) + worker := newRegionWorker(s, limiter, addr) s.workersLock.Lock() s.workers[addr] = worker s.workersLock.Unlock() diff --git a/cdc/kv/region_worker.go b/cdc/kv/region_worker.go index 9d2dc8cc5ce..bbcb886813c 100644 --- a/cdc/kv/region_worker.go +++ b/cdc/kv/region_worker.go @@ -15,8 +15,10 @@ package kv import ( "context" + "reflect" "runtime" "sync" + "sync/atomic" "time" "github.com/pingcap/errors" @@ -24,9 +26,11 @@ import ( "github.com/pingcap/kvproto/pkg/cdcpb" "github.com/pingcap/log" "github.com/pingcap/ticdc/cdc/model" + "github.com/pingcap/ticdc/pkg/config" cerror "github.com/pingcap/ticdc/pkg/errors" "github.com/pingcap/ticdc/pkg/regionspan" "github.com/pingcap/ticdc/pkg/util" + "github.com/pingcap/ticdc/pkg/workerpool" "github.com/pingcap/tidb/store/tikv/oracle" "github.com/prometheus/client_golang/prometheus" "go.uber.org/zap" @@ -34,9 +38,23 @@ import ( "golang.org/x/time/rate" ) +var ( + regionWorkerPool workerpool.WorkerPool + workerPoolOnce sync.Once + // The magic number here is keep the same with some magic numbers in some + // other components in TiCDC, including worker pool task chan size, mounter + // chan size etc. + // TODO: unified channel buffer mechanism + regionWorkerInputChanSize = 128000 + regionWorkerLowWatermark = int(float64(regionWorkerInputChanSize) * 0.2) + regionWorkerHighWatermark = int(float64(regionWorkerInputChanSize) * 0.7) +) + const ( minRegionStateBucket = 4 maxRegionStateBucket = 16 + + maxWorkerPoolSize = 64 ) // regionStateManager provides the get/put way like a sync.Map, and it is divided @@ -83,6 +101,21 @@ func (rsm *regionStateManager) setState(regionID uint64, state *regionFeedState) rsm.states[bucket].Store(regionID, state) } +type regionWorkerMetrics struct { + // kv events related metrics + metricEventSize prometheus.Observer + metricPullEventInitializedCounter prometheus.Counter + metricPullEventPrewriteCounter prometheus.Counter + metricPullEventCommitCounter prometheus.Counter + metricPullEventCommittedCounter prometheus.Counter + metricPullEventRollbackCounter prometheus.Counter + metricSendEventResolvedCounter prometheus.Counter + metricSendEventCommitCounter prometheus.Counter + metricSendEventCommittedCounter prometheus.Counter + + // TODO: add region runtime related metrics +} + /* `regionWorker` maintains N regions, it runs in background for each gRPC stream, corresponding to one TiKV store. It receives `regionStatefulEvent` in a channel @@ -102,29 +135,84 @@ type regionWorker struct { inputCh chan *regionStatefulEvent outputCh chan<- *model.RegionFeedEvent + errorCh chan error + // event handlers in region worker + handles []workerpool.EventHandle + // how many workers in worker pool will be used for this region worker + concurrent int statesManager *regionStateManager - rtsManager *resolvedTsManager - rtsUpdateCh chan *regionResolvedTs + rtsManager *regionTsManager + rtsUpdateCh chan *regionTsInfo + + metrics *regionWorkerMetrics + + // evTimeManager maintains the time that last event is received of each + // uninitialized region, note the regionTsManager is not thread safe, so we + // use a single routine to handle evTimeUpdate and evTimeManager + evTimeManager *regionTsManager + evTimeUpdateCh chan *evTimeUpdate enableOldValue bool + storeAddr string } -func newRegionWorker(s *eventFeedSession, limiter *rate.Limiter) *regionWorker { +func newRegionWorker(s *eventFeedSession, limiter *rate.Limiter, addr string) *regionWorker { + cfg := config.GetGlobalServerConfig().KVClient worker := ®ionWorker{ session: s, limiter: limiter, - inputCh: make(chan *regionStatefulEvent, 1024), + inputCh: make(chan *regionStatefulEvent, regionWorkerInputChanSize), outputCh: s.eventCh, + errorCh: make(chan error, 1), statesManager: newRegionStateManager(-1), - rtsManager: newResolvedTsManager(), - rtsUpdateCh: make(chan *regionResolvedTs, 1024), + rtsManager: newRegionTsManager(), + evTimeManager: newRegionTsManager(), + rtsUpdateCh: make(chan *regionTsInfo, 1024), + evTimeUpdateCh: make(chan *evTimeUpdate, 1024), enableOldValue: s.enableOldValue, + storeAddr: addr, + concurrent: cfg.WorkerConcurrent, } return worker } +func (w *regionWorker) initMetrics(ctx context.Context) { + captureAddr := util.CaptureAddrFromCtx(ctx) + changefeedID := util.ChangefeedIDFromCtx(ctx) + + metrics := ®ionWorkerMetrics{} + metrics.metricEventSize = eventSize.WithLabelValues(captureAddr) + metrics.metricPullEventInitializedCounter = pullEventCounter.WithLabelValues(cdcpb.Event_INITIALIZED.String(), captureAddr, changefeedID) + metrics.metricPullEventCommittedCounter = pullEventCounter.WithLabelValues(cdcpb.Event_COMMITTED.String(), captureAddr, changefeedID) + metrics.metricPullEventCommitCounter = pullEventCounter.WithLabelValues(cdcpb.Event_COMMIT.String(), captureAddr, changefeedID) + metrics.metricPullEventPrewriteCounter = pullEventCounter.WithLabelValues(cdcpb.Event_PREWRITE.String(), captureAddr, changefeedID) + metrics.metricPullEventRollbackCounter = pullEventCounter.WithLabelValues(cdcpb.Event_ROLLBACK.String(), captureAddr, changefeedID) + metrics.metricSendEventResolvedCounter = sendEventCounter.WithLabelValues("native-resolved", captureAddr, changefeedID) + metrics.metricSendEventCommitCounter = sendEventCounter.WithLabelValues("commit", captureAddr, changefeedID) + metrics.metricSendEventCommittedCounter = sendEventCounter.WithLabelValues("committed", captureAddr, changefeedID) + + w.metrics = metrics +} + +type evTimeUpdate struct { + info *regionTsInfo + isDelete bool +} + +// notifyEvTimeUpdate trys to send a evTimeUpdate to evTimeUpdateCh in region worker +// to upsert or delete the last received event time for a region +func (w *regionWorker) notifyEvTimeUpdate(regionID uint64, isDelete bool) { + select { + case w.evTimeUpdateCh <- &evTimeUpdate{ + info: ®ionTsInfo{regionID: regionID, ts: newEventTimeItem()}, + isDelete: isDelete, + }: + default: + } +} + func (w *regionWorker) getRegionState(regionID uint64) (*regionFeedState, bool) { return w.statesManager.getState(regionID) } @@ -179,6 +267,49 @@ func (w *regionWorker) handleSingleRegionError(ctx context.Context, err error, s }, revokeToken) } +func (w *regionWorker) checkUnInitRegions(ctx context.Context) error { + checkInterval := time.Minute + + failpoint.Inject("kvClientCheckUnInitRegionInterval", func(val failpoint.Value) { + checkInterval = time.Duration(val.(int)) * time.Second + }) + + ticker := time.NewTicker(checkInterval) + defer ticker.Stop() + + for { + select { + case <-ctx.Done(): + return errors.Trace(ctx.Err()) + case update := <-w.evTimeUpdateCh: + if update.isDelete { + w.evTimeManager.Remove(update.info.regionID) + } else { + w.evTimeManager.Upsert(update.info) + } + case <-ticker.C: + for w.evTimeManager.Len() > 0 { + item := w.evTimeManager.Pop() + sinceLastEvent := time.Since(item.ts.eventTime) + if sinceLastEvent < reconnectInterval { + w.evTimeManager.Upsert(item) + break + } + state, ok := w.getRegionState(item.regionID) + if !ok || state.isStopped() || state.isInitialized() { + // check state is deleted, stopped, or initialized, if + // so just ignore this region, and don't need to push the + // eventTimeItem back to heap. + continue + } + log.Warn("kv client reconnect triggered", + zap.Duration("duration", sinceLastEvent), zap.Uint64("region", item.regionID)) + return errReconnect + } + } + } +} + func (w *regionWorker) resolveLock(ctx context.Context) error { resolveLockInterval := 20 * time.Second failpoint.Inject("kvClientResolveLockInterval", func(val failpoint.Value) { @@ -194,20 +325,16 @@ func (w *regionWorker) resolveLock(ctx context.Context) error { case rtsUpdate := <-w.rtsUpdateCh: w.rtsManager.Upsert(rtsUpdate) case <-advanceCheckTicker.C: - if !w.session.isPullerInit.IsInitialized() { - // Initializing a puller may take a long time, skip resolved lock to save unnecessary overhead. - continue - } version, err := w.session.kvStorage.GetCachedCurrentVersion() if err != nil { log.Warn("failed to get current version from PD", zap.Error(err)) continue } currentTimeFromPD := oracle.GetTimeFromTS(version.Ver) - expired := make([]*regionResolvedTs, 0) + expired := make([]*regionTsInfo, 0) for w.rtsManager.Len() > 0 { item := w.rtsManager.Pop() - sinceLastResolvedTs := currentTimeFromPD.Sub(oracle.GetTimeFromTS(item.resolvedTs)) + sinceLastResolvedTs := currentTimeFromPD.Sub(oracle.GetTimeFromTS(item.ts.resolvedTs)) // region does not reach resolve lock boundary, put it back if sinceLastResolvedTs < resolveLockInterval { w.rtsManager.Upsert(item) @@ -226,125 +353,275 @@ func (w *regionWorker) resolveLock(ctx context.Context) error { // and don't need to push resolved ts back to heap. continue } - state.lock.RLock() // recheck resolved ts from region state, which may be larger than that in resolved ts heap - sinceLastResolvedTs := currentTimeFromPD.Sub(oracle.GetTimeFromTS(state.lastResolvedTs)) - if sinceLastResolvedTs >= resolveLockInterval && state.initialized { + lastResolvedTs := state.getLastResolvedTs() + sinceLastResolvedTs := currentTimeFromPD.Sub(oracle.GetTimeFromTS(lastResolvedTs)) + if sinceLastResolvedTs >= resolveLockInterval { + sinceLastEvent := time.Since(rts.ts.eventTime) + if sinceLastResolvedTs > reconnectInterval && sinceLastEvent > reconnectInterval { + log.Warn("kv client reconnect triggered", + zap.Duration("duration", sinceLastResolvedTs), zap.Duration("since last event", sinceLastResolvedTs)) + return errReconnect + } log.Warn("region not receiving resolved event from tikv or resolved ts is not pushing for too long time, try to resolve lock", - zap.Uint64("regionID", rts.regionID), zap.Stringer("span", state.sri.span), - zap.Duration("duration", sinceLastResolvedTs), - zap.Uint64("resolvedTs", state.lastResolvedTs)) + zap.Uint64("regionID", rts.regionID), zap.Stringer("span", state.getRegionSpan()), + zap.Duration("duration", sinceLastResolvedTs), zap.Uint64("resolvedTs", lastResolvedTs)) err = w.session.lockResolver.Resolve(ctx, rts.regionID, maxVersion) if err != nil { log.Warn("failed to resolve lock", zap.Uint64("regionID", rts.regionID), zap.Error(err)) - state.lock.RUnlock() continue } } - rts.resolvedTs = state.lastResolvedTs + rts.ts.resolvedTs = lastResolvedTs w.rtsManager.Upsert(rts) - state.lock.RUnlock() } } } } -func (w *regionWorker) eventHandler(ctx context.Context) error { - captureAddr := util.CaptureAddrFromCtx(ctx) - changefeedID := util.ChangefeedIDFromCtx(ctx) - metricEventSize := eventSize.WithLabelValues(captureAddr) - metricPullEventInitializedCounter := pullEventCounter.WithLabelValues(cdcpb.Event_INITIALIZED.String(), captureAddr, changefeedID) - metricPullEventCommittedCounter := pullEventCounter.WithLabelValues(cdcpb.Event_COMMITTED.String(), captureAddr, changefeedID) - metricPullEventCommitCounter := pullEventCounter.WithLabelValues(cdcpb.Event_COMMIT.String(), captureAddr, changefeedID) - metricPullEventPrewriteCounter := pullEventCounter.WithLabelValues(cdcpb.Event_PREWRITE.String(), captureAddr, changefeedID) - metricPullEventRollbackCounter := pullEventCounter.WithLabelValues(cdcpb.Event_ROLLBACK.String(), captureAddr, changefeedID) - metricSendEventResolvedCounter := sendEventCounter.WithLabelValues("native-resolved", captureAddr, changefeedID) - metricSendEventCommitCounter := sendEventCounter.WithLabelValues("commit", captureAddr, changefeedID) - metricSendEventCommittedCounter := sendEventCounter.WithLabelValues("committed", captureAddr, changefeedID) - +func (w *regionWorker) processEvent(ctx context.Context, event *regionStatefulEvent) error { + if event.finishedCounter != nil { + atomic.AddInt32(event.finishedCounter, -1) + return nil + } var err error - for { + event.state.lock.Lock() + if event.changeEvent != nil { + w.metrics.metricEventSize.Observe(float64(event.changeEvent.Event.Size())) + switch x := event.changeEvent.Event.(type) { + case *cdcpb.Event_Entries_: + err = w.handleEventEntry(ctx, x, event.state) + if err != nil { + err = w.handleSingleRegionError(ctx, err, event.state) + } + case *cdcpb.Event_Admin_: + log.Info("receive admin event", zap.Stringer("event", event.changeEvent)) + case *cdcpb.Event_Error: + err = w.handleSingleRegionError( + ctx, + cerror.WrapError(cerror.ErrEventFeedEventError, &eventError{err: x.Error}), + event.state, + ) + case *cdcpb.Event_ResolvedTs: + if err = w.handleResolvedTs(ctx, x.ResolvedTs, event.state); err != nil { + err = w.handleSingleRegionError(ctx, err, event.state) + } + } + } + + if event.resolvedTs != nil { + if err = w.handleResolvedTs(ctx, event.resolvedTs.Ts, event.state); err != nil { + err = w.handleSingleRegionError(ctx, err, event.state) + } + } + event.state.lock.Unlock() + return err +} + +func (w *regionWorker) initPoolHandles(ctx context.Context, handleCount int) { + handles := make([]workerpool.EventHandle, 0, handleCount) + for i := 0; i < handleCount; i++ { + poolHandle := regionWorkerPool.RegisterEvent(func(ctx context.Context, eventI interface{}) error { + event := eventI.(*regionStatefulEvent) + return w.processEvent(ctx, event) + }).OnExit(func(err error) { + w.onHandleExit(err) + }) + handles = append(handles, poolHandle) + } + w.handles = handles +} + +func (w *regionWorker) onHandleExit(err error) { + select { + case w.errorCh <- err: + default: + } +} + +func (w *regionWorker) eventHandler(ctx context.Context) error { + preprocess := func(event *regionStatefulEvent, ok bool) ( + exitEventHandler bool, + skipEvent bool, + ) { + // event == nil means the region worker should exit and re-establish + // all existing regions. + if !ok || event == nil { + log.Info("region worker closed by error") + exitEventHandler = true + err := w.evictAllRegions(ctx) + if err != nil { + log.Warn("region worker evict all regions error", zap.Error(err)) + } + return + } + if event.state.isStopped() { + skipEvent = true + } + return + } + pollEvent := func() (event *regionStatefulEvent, ok bool, err error) { select { case <-ctx.Done(): - return errors.Trace(ctx.Err()) - case event, ok := <-w.inputCh: - // event == nil means the region worker should exit and re-establish - // all existing regions. - if !ok || event == nil { - log.Info("region worker closed by error") - return w.evictAllRegions(ctx) + err = errors.Trace(ctx.Err()) + case err = <-w.errorCh: + case event, ok = <-w.inputCh: + } + return + } + for { + event, ok, err := pollEvent() + if err != nil { + return err + } + exitEventHandler, skipEvent := preprocess(event, ok) + if exitEventHandler { + return cerror.ErrRegionWorkerExit.GenWithStackByArgs() + } + if skipEvent { + continue + } + // We measure whether the current worker is busy based on the input + // channel size. If the buffered event count is larger than the high + // watermark, we send events to worker pool to increase processing + // throughput. Otherwise we process event in local region worker to + // ensure low processing latency. + if len(w.inputCh) < regionWorkerHighWatermark { + err = w.processEvent(ctx, event) + if err != nil { + return err } - if event.state.isStopped() { - continue + } else { + err = w.handles[int(event.regionID)%w.concurrent].AddEvent(ctx, event) + if err != nil { + return err } - event.state.lock.Lock() - if event.changeEvent != nil { - metricEventSize.Observe(float64(event.changeEvent.Event.Size())) - switch x := event.changeEvent.Event.(type) { - case *cdcpb.Event_Entries_: - err = w.handleEventEntry( - ctx, x, event.state, - metricPullEventInitializedCounter, - metricPullEventPrewriteCounter, - metricPullEventCommitCounter, - metricPullEventCommittedCounter, - metricPullEventRollbackCounter, - metricSendEventCommitCounter, - metricSendEventCommittedCounter, - ) - if err != nil { - err = w.handleSingleRegionError(ctx, err, event.state) - } - case *cdcpb.Event_Admin_: - log.Info("receive admin event", zap.Stringer("event", event.changeEvent)) - case *cdcpb.Event_Error: - err = w.handleSingleRegionError( - ctx, - cerror.WrapError(cerror.ErrEventFeedEventError, &eventError{err: x.Error}), - event.state, - ) - case *cdcpb.Event_ResolvedTs: - if err = w.handleResolvedTs(ctx, x.ResolvedTs, event.state, metricSendEventResolvedCounter); err != nil { - err = w.handleSingleRegionError(ctx, err, event.state) - } + // TODO: add events in batch + for len(w.inputCh) >= regionWorkerLowWatermark { + event, ok, err = pollEvent() + if err != nil { + return err + } + exitEventHandler, skipEvent := preprocess(event, ok) + if exitEventHandler { + return cerror.ErrRegionWorkerExit.GenWithStackByArgs() + } + if skipEvent { + continue + } + err = w.handles[int(event.regionID)%w.concurrent].AddEvent(ctx, event) + if err != nil { + return err } } - - if event.resolvedTs != nil { - if err = w.handleResolvedTs(ctx, event.resolvedTs.Ts, event.state, metricSendEventResolvedCounter); err != nil { - err = w.handleSingleRegionError(ctx, err, event.state) + // Principle: events from the same region must be processed linearly. + // + // When buffered events exceed high watermark, we start to use worker + // pool to improve throughtput, and we need a mechanism to quit worker + // pool when buffered events are less than low watermark, which means + // we should have a way to know whether events sent to the worker pool + // are all processed. + // Send a dummy event to each worker pool handler, after each of these + // events are processed, we can ensure all events sent to worker pool + // from this region worker are processed. + counter := int32(len(w.handles)) + for _, handle := range w.handles { + err = handle.AddEvent(ctx, ®ionStatefulEvent{finishedCounter: &counter}) + if err != nil { + return err } } - event.state.lock.Unlock() - if err != nil { - return err + checkEventsProcessed: + for { + select { + case <-ctx.Done(): + return errors.Trace(ctx.Err()) + case err = <-w.errorCh: + return err + case <-time.After(50 * time.Millisecond): + if atomic.LoadInt32(&counter) == 0 { + break checkEventsProcessed + } + } } } } } +func (w *regionWorker) collectWorkpoolError(ctx context.Context) error { + cases := make([]reflect.SelectCase, 0, len(w.handles)+1) + cases = append(cases, reflect.SelectCase{ + Dir: reflect.SelectRecv, + Chan: reflect.ValueOf(ctx.Done()), + }) + for _, handle := range w.handles { + cases = append(cases, reflect.SelectCase{ + Dir: reflect.SelectRecv, + Chan: reflect.ValueOf(handle.ErrCh()), + }) + } + idx, value, ok := reflect.Select(cases) + if idx == 0 { + return ctx.Err() + } + if !ok { + return nil + } + return value.Interface().(error) +} + +func (w *regionWorker) checkErrorReconnect(err error) error { + if errors.Cause(err) == errReconnect { + cancel, ok := w.session.getStreamCancel(w.storeAddr) + if ok { + // cancel the stream to trigger strem.Recv with context cancel error + // Note use context cancel is the only way to terminate a gRPC stream + cancel() + // Failover in stream.Recv has 0-100ms delay, the onRegionFail + // should be called after stream has been deleted. Add a delay here + // to avoid too frequent region rebuilt. + time.Sleep(time.Second) + } + // if stream is already deleted, just ignore errReconnect + return nil + } + return err +} + func (w *regionWorker) run(ctx context.Context) error { + defer func() { + for _, h := range w.handles { + h.Unregister() + } + }() wg, ctx := errgroup.WithContext(ctx) + w.initMetrics(ctx) + w.initPoolHandles(ctx, w.concurrent) + wg.Go(func() error { + return w.checkErrorReconnect(w.resolveLock(ctx)) + }) wg.Go(func() error { - return w.resolveLock(ctx) + return w.checkErrorReconnect(w.checkUnInitRegions(ctx)) }) wg.Go(func() error { return w.eventHandler(ctx) }) - return wg.Wait() + wg.Go(func() error { + return w.collectWorkpoolError(ctx) + }) + err := wg.Wait() + // ErrRegionWorkerExit means the region worker exits normally, but we don't + // need to terminate the other goroutines in errgroup + if cerror.ErrRegionWorkerExit.Equal(err) { + return nil + } + return err } func (w *regionWorker) handleEventEntry( ctx context.Context, x *cdcpb.Event_Entries_, state *regionFeedState, - metricPullEventInitializedCounter prometheus.Counter, - metricPullEventPrewriteCounter prometheus.Counter, - metricPullEventCommitCounter prometheus.Counter, - metricPullEventCommittedCounter prometheus.Counter, - metricPullEventRollbackCounter prometheus.Counter, - metricSendEventCommitCounter prometheus.Counter, - metricSendEventCommittedCounter prometheus.Counter, ) error { regionID := state.sri.verID.GetID() for _, entry := range x.Entries.GetEntries() { @@ -364,7 +641,16 @@ func (w *regionWorker) handleEventEntry( zap.Duration("timeCost", time.Since(state.startFeedTime)), zap.Uint64("regionID", regionID)) } - metricPullEventInitializedCounter.Inc() + w.metrics.metricPullEventInitializedCounter.Inc() + + select { + case w.rtsUpdateCh <- ®ionTsInfo{regionID: regionID, ts: newResolvedTsItem(state.sri.ts)}: + default: + // rtsUpdateCh block often means too many regions are suffering + // lock resolve, the kv client status is not very healthy. + log.Warn("region is not upsert into rts manager", zap.Uint64("region-id", regionID)) + } + w.notifyEvTimeUpdate(regionID, true /* isDelete */) state.initialized = true w.session.regionRouter.Release(state.sri.rpcCtx.Addr) cachedEvents := state.matcher.matchCachedRow() @@ -375,13 +661,13 @@ func (w *regionWorker) handleEventEntry( } select { case w.outputCh <- revent: - metricSendEventCommitCounter.Inc() + w.metrics.metricSendEventCommitCounter.Inc() case <-ctx.Done(): return errors.Trace(ctx.Err()) } } case cdcpb.Event_COMMITTED: - metricPullEventCommittedCounter.Inc() + w.metrics.metricPullEventCommittedCounter.Inc() revent, err := assembleRowEvent(regionID, entry, w.enableOldValue) if err != nil { return errors.Trace(err) @@ -397,15 +683,15 @@ func (w *regionWorker) handleEventEntry( } select { case w.outputCh <- revent: - metricSendEventCommittedCounter.Inc() + w.metrics.metricSendEventCommittedCounter.Inc() case <-ctx.Done(): return errors.Trace(ctx.Err()) } case cdcpb.Event_PREWRITE: - metricPullEventPrewriteCounter.Inc() + w.metrics.metricPullEventPrewriteCounter.Inc() state.matcher.putPrewriteRow(entry) case cdcpb.Event_COMMIT: - metricPullEventCommitCounter.Inc() + w.metrics.metricPullEventCommitCounter.Inc() if entry.CommitTs <= state.lastResolvedTs { logPanic("The CommitTs must be greater than the resolvedTs", zap.String("Event Type", "COMMIT"), @@ -430,12 +716,12 @@ func (w *regionWorker) handleEventEntry( select { case w.outputCh <- revent: - metricSendEventCommitCounter.Inc() + w.metrics.metricSendEventCommitCounter.Inc() case <-ctx.Done(): return errors.Trace(ctx.Err()) } case cdcpb.Event_ROLLBACK: - metricPullEventRollbackCounter.Inc() + w.metrics.metricPullEventRollbackCounter.Inc() state.matcher.rollbackRow(entry) } } @@ -446,7 +732,6 @@ func (w *regionWorker) handleResolvedTs( ctx context.Context, resolvedTs uint64, state *regionFeedState, - metricSendEventResolvedCounter prometheus.Counter, ) error { if !state.initialized { return nil @@ -472,13 +757,13 @@ func (w *regionWorker) handleResolvedTs( // Send resolved ts update in non blocking way, since we can re-query real // resolved ts from region state even if resolved ts update is discarded. select { - case w.rtsUpdateCh <- ®ionResolvedTs{regionID: regionID, resolvedTs: resolvedTs}: + case w.rtsUpdateCh <- ®ionTsInfo{regionID: regionID, ts: newResolvedTsItem(resolvedTs)}: default: } select { case w.outputCh <- revent: - metricSendEventResolvedCounter.Inc() + w.metrics.metricSendEventResolvedCounter.Inc() case <-ctx.Done(): return errors.Trace(ctx.Err()) } @@ -516,3 +801,42 @@ func (w *regionWorker) evictAllRegions(ctx context.Context) error { } return err } + +func getWorkerPoolSize() (size int) { + cfg := config.GetGlobalServerConfig().KVClient + if cfg.WorkerPoolSize > 0 { + size = cfg.WorkerPoolSize + } else { + size = runtime.NumCPU() * 2 + } + if size > maxWorkerPoolSize { + size = maxWorkerPoolSize + } + return +} + +// InitWorkerPool initializs workerpool once, the workerpool must be initialized +// before any kv event is received. +func InitWorkerPool() { + if !enableKVClientV2 { + return + } + workerPoolOnce.Do(func() { + size := getWorkerPoolSize() + regionWorkerPool = workerpool.NewDefaultWorkerPool(size) + }) +} + +// RunWorkerPool runs the worker pool used by the region worker in kv client v2 +// It must be running before region worker starts to work +func RunWorkerPool(ctx context.Context) error { + if !enableKVClientV2 { + return nil + } + InitWorkerPool() + errg, ctx := errgroup.WithContext(ctx) + errg.Go(func() error { + return errors.Trace(regionWorkerPool.Run(ctx)) + }) + return errg.Wait() +} diff --git a/cdc/kv/region_worker_test.go b/cdc/kv/region_worker_test.go index aa8e1dda8a6..68b983d5668 100644 --- a/cdc/kv/region_worker_test.go +++ b/cdc/kv/region_worker_test.go @@ -15,9 +15,11 @@ package kv import ( "math/rand" + "runtime" "sync" "github.com/pingcap/check" + "github.com/pingcap/ticdc/pkg/config" "github.com/pingcap/ticdc/pkg/util/testleak" ) @@ -109,3 +111,27 @@ func (s *regionWorkerSuite) TestRegionStateManagerBucket(c *check.C) { rsm = newRegionStateManager(bucket) c.Assert(rsm.bucket, check.Equals, bucket) } + +func (s *regionWorkerSuite) TestRegionWorkerPoolSize(c *check.C) { + defer testleak.AfterTest(c)() + + conf := config.GetDefaultServerConfig() + conf.KVClient.WorkerPoolSize = 0 + config.StoreGlobalServerConfig(conf) + size := getWorkerPoolSize() + min := func(a, b int) int { + if a < b { + return a + } + return b + } + c.Assert(size, check.Equals, min(runtime.NumCPU()*2, maxWorkerPoolSize)) + + conf.KVClient.WorkerPoolSize = 5 + size = getWorkerPoolSize() + c.Assert(size, check.Equals, 5) + + conf.KVClient.WorkerPoolSize = maxWorkerPoolSize + 1 + size = getWorkerPoolSize() + c.Assert(size, check.Equals, maxWorkerPoolSize) +} diff --git a/cdc/kv/resolvedts_heap.go b/cdc/kv/resolvedts_heap.go index 87bbcec167d..b385a9db14e 100644 --- a/cdc/kv/resolvedts_heap.go +++ b/cdc/kv/resolvedts_heap.go @@ -13,37 +13,57 @@ package kv -import "container/heap" +import ( + "container/heap" + "time" +) -// regionResolvedTs contains region resolvedTs information -type regionResolvedTs struct { - regionID uint64 - resolvedTs uint64 - index int +type tsItem struct { + sortByEvTime bool + resolvedTs uint64 + eventTime time.Time } -type resolvedTsHeap []*regionResolvedTs +func newResolvedTsItem(ts uint64) tsItem { + return tsItem{resolvedTs: ts, eventTime: time.Now()} +} + +func newEventTimeItem() tsItem { + return tsItem{sortByEvTime: true, eventTime: time.Now()} +} + +// regionTsInfo contains region resolvedTs information +type regionTsInfo struct { + regionID uint64 + index int + ts tsItem +} -func (rh resolvedTsHeap) Len() int { return len(rh) } +type regionTsHeap []*regionTsInfo -func (rh resolvedTsHeap) Less(i, j int) bool { - return rh[i].resolvedTs < rh[j].resolvedTs +func (rh regionTsHeap) Len() int { return len(rh) } + +func (rh regionTsHeap) Less(i, j int) bool { + if rh[i].ts.sortByEvTime { + return rh[i].ts.eventTime.Before(rh[j].ts.eventTime) + } + return rh[i].ts.resolvedTs < rh[j].ts.resolvedTs } -func (rh resolvedTsHeap) Swap(i, j int) { +func (rh regionTsHeap) Swap(i, j int) { rh[i], rh[j] = rh[j], rh[i] rh[i].index = i rh[j].index = j } -func (rh *resolvedTsHeap) Push(x interface{}) { +func (rh *regionTsHeap) Push(x interface{}) { n := len(*rh) - item := x.(*regionResolvedTs) + item := x.(*regionTsInfo) item.index = n *rh = append(*rh, item) } -func (rh *resolvedTsHeap) Pop() interface{} { +func (rh *regionTsHeap) Pop() interface{} { old := *rh n := len(old) item := old[n-1] @@ -52,28 +72,36 @@ func (rh *resolvedTsHeap) Pop() interface{} { return item } -// resolvedTsManager is a used to maintain resolved ts information for N regions. +// regionTsManager is a used to maintain resolved ts information for N regions. // This struct is not thread safe -type resolvedTsManager struct { - // mapping from regionID to regionResolvedTs object - m map[uint64]*regionResolvedTs - h resolvedTsHeap +type regionTsManager struct { + // mapping from regionID to regionTsInfo object + m map[uint64]*regionTsInfo + h regionTsHeap } -func newResolvedTsManager() *resolvedTsManager { - return &resolvedTsManager{ - m: make(map[uint64]*regionResolvedTs), - h: make(resolvedTsHeap, 0), +func newRegionTsManager() *regionTsManager { + return ®ionTsManager{ + m: make(map[uint64]*regionTsInfo), + h: make(regionTsHeap, 0), } } // Upsert implements insert and update on duplicated key -func (rm *resolvedTsManager) Upsert(item *regionResolvedTs) { +func (rm *regionTsManager) Upsert(item *regionTsInfo) { if old, ok := rm.m[item.regionID]; ok { // in a single resolved ts manager, the resolved ts of a region should not be fallen back - if item.resolvedTs > old.resolvedTs { - old.resolvedTs = item.resolvedTs - heap.Fix(&rm.h, old.index) + if !item.ts.sortByEvTime { + if item.ts.resolvedTs > old.ts.resolvedTs || item.ts.eventTime.After(old.ts.eventTime) { + old.ts.resolvedTs = item.ts.resolvedTs + old.ts.eventTime = item.ts.eventTime + heap.Fix(&rm.h, old.index) + } + } else { + if item.ts.eventTime.After(old.ts.eventTime) { + old.ts.eventTime = item.ts.eventTime + heap.Fix(&rm.h, old.index) + } } } else { heap.Push(&rm.h, item) @@ -81,16 +109,26 @@ func (rm *resolvedTsManager) Upsert(item *regionResolvedTs) { } } -// Pop pops a regionResolvedTs from rts heap, delete it from region rts map -func (rm *resolvedTsManager) Pop() *regionResolvedTs { +// Pop pops a regionTsInfo from rts heap, delete it from region rts map +func (rm *regionTsManager) Pop() *regionTsInfo { if rm.Len() == 0 { return nil } - item := heap.Pop(&rm.h).(*regionResolvedTs) + item := heap.Pop(&rm.h).(*regionTsInfo) delete(rm.m, item.regionID) return item } -func (rm *resolvedTsManager) Len() int { +// Remove removes item from regionTsManager +func (rm *regionTsManager) Remove(regionID uint64) *regionTsInfo { + if item, ok := rm.m[regionID]; ok { + delete(rm.m, item.regionID) + return heap.Remove(&rm.h, item.index).(*regionTsInfo) + } + return nil +} + +// Len returns the item count in regionTsManager +func (rm *regionTsManager) Len() int { return len(rm.m) } diff --git a/cdc/kv/resolvedts_heap_test.go b/cdc/kv/resolvedts_heap_test.go index 3d4202ef6a7..0ec5b28f4cc 100644 --- a/cdc/kv/resolvedts_heap_test.go +++ b/cdc/kv/resolvedts_heap_test.go @@ -14,6 +14,8 @@ package kv import ( + "time" + "github.com/pingcap/check" "github.com/pingcap/ticdc/pkg/util/testleak" ) @@ -23,37 +25,67 @@ type rtsHeapSuite struct { var _ = check.Suite(&rtsHeapSuite{}) -func (s *rtsHeapSuite) TestResolvedTsManager(c *check.C) { +func checkRegionTsInfoWithoutEvTime(c *check.C, obtained, expected *regionTsInfo) { + c.Assert(obtained.regionID, check.Equals, expected.regionID) + c.Assert(obtained.index, check.Equals, expected.index) + c.Assert(obtained.ts.resolvedTs, check.Equals, expected.ts.resolvedTs) + c.Assert(obtained.ts.sortByEvTime, check.IsFalse) +} + +func (s *rtsHeapSuite) TestRegionTsManagerResolvedTs(c *check.C) { defer testleak.AfterTest(c)() - mgr := newResolvedTsManager() - initRegions := []*regionResolvedTs{ - {regionID: 102, resolvedTs: 1040}, - {regionID: 100, resolvedTs: 1000}, - {regionID: 101, resolvedTs: 1020}, + mgr := newRegionTsManager() + initRegions := []*regionTsInfo{ + {regionID: 102, ts: newResolvedTsItem(1040)}, + {regionID: 100, ts: newResolvedTsItem(1000)}, + {regionID: 101, ts: newResolvedTsItem(1020)}, } for _, rts := range initRegions { mgr.Upsert(rts) } c.Assert(mgr.Len(), check.Equals, 3) rts := mgr.Pop() - c.Assert(rts, check.DeepEquals, ®ionResolvedTs{regionID: 100, resolvedTs: 1000, index: -1}) + checkRegionTsInfoWithoutEvTime(c, rts, ®ionTsInfo{regionID: 100, ts: newResolvedTsItem(1000), index: -1}) // resolved ts is not updated mgr.Upsert(rts) rts = mgr.Pop() - c.Assert(rts, check.DeepEquals, ®ionResolvedTs{regionID: 100, resolvedTs: 1000, index: -1}) + checkRegionTsInfoWithoutEvTime(c, rts, ®ionTsInfo{regionID: 100, ts: newResolvedTsItem(1000), index: -1}) // resolved ts updated - rts.resolvedTs = 1001 + rts.ts.resolvedTs = 1001 mgr.Upsert(rts) - mgr.Upsert(®ionResolvedTs{regionID: 100, resolvedTs: 1100}) + mgr.Upsert(®ionTsInfo{regionID: 100, ts: newResolvedTsItem(1100)}) rts = mgr.Pop() - c.Assert(rts, check.DeepEquals, ®ionResolvedTs{regionID: 101, resolvedTs: 1020, index: -1}) + checkRegionTsInfoWithoutEvTime(c, rts, ®ionTsInfo{regionID: 101, ts: newResolvedTsItem(1020), index: -1}) rts = mgr.Pop() - c.Assert(rts, check.DeepEquals, ®ionResolvedTs{regionID: 102, resolvedTs: 1040, index: -1}) + checkRegionTsInfoWithoutEvTime(c, rts, ®ionTsInfo{regionID: 102, ts: newResolvedTsItem(1040), index: -1}) rts = mgr.Pop() - c.Assert(rts, check.DeepEquals, ®ionResolvedTs{regionID: 100, resolvedTs: 1100, index: -1}) + checkRegionTsInfoWithoutEvTime(c, rts, ®ionTsInfo{regionID: 100, ts: newResolvedTsItem(1100), index: -1}) rts = mgr.Pop() c.Assert(rts, check.IsNil) } + +func (s *rtsHeapSuite) TestRegionTsManagerEvTime(c *check.C) { + defer testleak.AfterTest(c)() + mgr := newRegionTsManager() + initRegions := []*regionTsInfo{ + {regionID: 100, ts: newEventTimeItem()}, + {regionID: 101, ts: newEventTimeItem()}, + } + for _, item := range initRegions { + mgr.Upsert(item) + } + info := mgr.Remove(101) + c.Assert(info.regionID, check.Equals, uint64(101)) + + ts := time.Now() + mgr.Upsert(®ionTsInfo{regionID: 100, ts: newEventTimeItem()}) + info = mgr.Pop() + c.Assert(info.regionID, check.Equals, uint64(100)) + c.Assert(ts.Before(info.ts.eventTime), check.IsTrue) + c.Assert(time.Now().After(info.ts.eventTime), check.IsTrue) + info = mgr.Pop() + c.Assert(info, check.IsNil) +} diff --git a/cdc/server.go b/cdc/server.go index 0af202c7256..310e2bfbf56 100644 --- a/cdc/server.go +++ b/cdc/server.go @@ -108,6 +108,7 @@ func (s *Server) Run(ctx context.Context) error { return err } + kv.InitWorkerPool() kvStore, err := kv.CreateTiStore(strings.Join(s.pdEndpoints, ","), conf.Security) if err != nil { return errors.Trace(err) @@ -263,6 +264,10 @@ func (s *Server) run(ctx context.Context) (err error) { return sorter.RunWorkerPool(cctx) }) + wg.Go(func() error { + return kv.RunWorkerPool(cctx) + }) + wg.Go(func() error { return s.capture.Run(cctx) }) diff --git a/cmd/server_test.go b/cmd/server_test.go index c79565af172..40a5e01519b 100644 --- a/cmd/server_test.go +++ b/cmd/server_test.go @@ -118,6 +118,10 @@ func (s *serverSuite) TestLoadAndVerifyServerConfig(c *check.C) { CertAllowedCN: []string{"dd", "ee"}, }, PerTableMemoryQuota: 20 * 1024 * 1024, // 20M + KVClient: &config.KVClientConfig{ + WorkerConcurrent: 8, + WorkerPoolSize: 0, + }, }) // test decode config file @@ -172,6 +176,10 @@ sort-dir = "/tmp/just_a_test" }, Security: &config.SecurityConfig{}, PerTableMemoryQuota: 20 * 1024 * 1024, // 20M + KVClient: &config.KVClientConfig{ + WorkerConcurrent: 8, + WorkerPoolSize: 0, + }, }) configContent = configContent + ` @@ -226,5 +234,9 @@ cert-allowed-cn = ["dd","ee"] CertAllowedCN: []string{"dd", "ee"}, }, PerTableMemoryQuota: 20 * 1024 * 1024, // 20M + KVClient: &config.KVClientConfig{ + WorkerConcurrent: 8, + WorkerPoolSize: 0, + }, }) } diff --git a/errors.toml b/errors.toml index a980dc8e539..bf62e195a12 100755 --- a/errors.toml +++ b/errors.toml @@ -586,6 +586,11 @@ error = ''' the reactor has done its job and should no longer be executed ''' +["CDC:ErrRegionWorkerExit"] +error = ''' +region worker exited +''' + ["CDC:ErrRegionsNotCoverSpan"] error = ''' regions not completely left cover span, span %v regions: %v diff --git a/pkg/config/config.go b/pkg/config/config.go index 22b3670890e..7149e901b61 100644 --- a/pkg/config/config.go +++ b/pkg/config/config.go @@ -162,6 +162,10 @@ var defaultServerConfig = &ServerConfig{ }, Security: &SecurityConfig{}, PerTableMemoryQuota: 20 * 1024 * 1024, // 20MB + KVClient: &KVClientConfig{ + WorkerConcurrent: 8, + WorkerPoolSize: 0, // 0 will use NumCPU() * 2 + }, } // ServerConfig represents a config for server @@ -180,10 +184,10 @@ type ServerConfig struct { OwnerFlushInterval TomlDuration `toml:"owner-flush-interval" json:"owner-flush-interval"` ProcessorFlushInterval TomlDuration `toml:"processor-flush-interval" json:"processor-flush-interval"` - Sorter *SorterConfig `toml:"sorter" json:"sorter"` - Security *SecurityConfig `toml:"security" json:"security"` - - PerTableMemoryQuota uint64 `toml:"per-table-memory-quota" json:"per-table-memory-quota"` + Sorter *SorterConfig `toml:"sorter" json:"sorter"` + Security *SecurityConfig `toml:"security" json:"security"` + PerTableMemoryQuota uint64 `toml:"per-table-memory-quota" json:"per-table-memory-quota"` + KVClient *KVClientConfig `toml:"kv-client" json:"kv-client"` } // Marshal returns the json marshal format of a ServerConfig diff --git a/pkg/config/config_test.go b/pkg/config/config_test.go index e110bc7fcec..a3d5ba2483f 100644 --- a/pkg/config/config_test.go +++ b/pkg/config/config_test.go @@ -86,9 +86,9 @@ func (s *serverConfigSuite) TestMarshal(c *check.C) { b, err := conf.Marshal() c.Assert(err, check.IsNil) - c.Assert(b, check.Equals, `{"addr":"192.155.22.33:8887","advertise-addr":"","log-file":"","log-level":"info","gc-ttl":86400,"tz":"System","capture-session-ttl":10,"owner-flush-interval":200000000,"processor-flush-interval":100000000,"sorter":{"num-concurrent-worker":4,"chunk-size-limit":999,"max-memory-percentage":80,"max-memory-consumption":8589934592,"num-workerpool-goroutine":16,"sort-dir":"/tmp/cdc_sort"},"security":{"ca-path":"","cert-path":"","key-path":"","cert-allowed-cn":null},"per-table-memory-quota":20971520}`) + c.Assert(b, check.Equals, `{"addr":"192.155.22.33:8887","advertise-addr":"","log-file":"","log-level":"info","gc-ttl":86400,"tz":"System","capture-session-ttl":10,"owner-flush-interval":200000000,"processor-flush-interval":100000000,"sorter":{"num-concurrent-worker":4,"chunk-size-limit":999,"max-memory-percentage":80,"max-memory-consumption":8589934592,"num-workerpool-goroutine":16,"sort-dir":"/tmp/cdc_sort"},"security":{"ca-path":"","cert-path":"","key-path":"","cert-allowed-cn":null},"per-table-memory-quota":20971520,"kv-client":{"worker-concurrent":8,"worker-pool-size":0}}`) conf2 := new(ServerConfig) - err = conf2.Unmarshal([]byte(`{"addr":"192.155.22.33:8887","advertise-addr":"","log-file":"","log-level":"info","gc-ttl":86400,"tz":"System","capture-session-ttl":10,"owner-flush-interval":200000000,"processor-flush-interval":100000000,"sorter":{"num-concurrent-worker":4,"chunk-size-limit":999,"max-memory-percentage":80,"max-memory-consumption":8589934592,"num-workerpool-goroutine":16,"sort-dir":"/tmp/cdc_sort"},"security":{"ca-path":"","cert-path":"","key-path":"","cert-allowed-cn":null},"per-table-memory-quota":20971520}`)) + err = conf2.Unmarshal([]byte(`{"addr":"192.155.22.33:8887","advertise-addr":"","log-file":"","log-level":"info","gc-ttl":86400,"tz":"System","capture-session-ttl":10,"owner-flush-interval":200000000,"processor-flush-interval":100000000,"sorter":{"num-concurrent-worker":4,"chunk-size-limit":999,"max-memory-percentage":80,"max-memory-consumption":8589934592,"num-workerpool-goroutine":16,"sort-dir":"/tmp/cdc_sort"},"security":{"ca-path":"","cert-path":"","key-path":"","cert-allowed-cn":null},"per-table-memory-quota":20971520,"kv-client":{"worker-concurrent":8,"worker-pool-size":0}}`)) c.Assert(err, check.IsNil) c.Assert(conf2, check.DeepEquals, conf) } diff --git a/pkg/config/kvclient.go b/pkg/config/kvclient.go new file mode 100644 index 00000000000..9a3a6b6ff0b --- /dev/null +++ b/pkg/config/kvclient.go @@ -0,0 +1,20 @@ +// Copyright 2021 PingCAP, Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// See the License for the specific language governing permissions and +// limitations under the License. + +package config + +// KVClientConfig represents config for kv client +type KVClientConfig struct { + WorkerConcurrent int `toml:"worker-concurrent" json:"worker-concurrent"` + WorkerPoolSize int `toml:"worker-pool-size" json:"worker-pool-size"` +} diff --git a/pkg/errors/errors.go b/pkg/errors/errors.go index 8efc5f26a93..e9b4fe0209e 100644 --- a/pkg/errors/errors.go +++ b/pkg/errors/errors.go @@ -53,6 +53,7 @@ var ( ErrCachedTSONotExists = errors.Normalize("GetCachedCurrentVersion: cache entry does not exist", errors.RFCCodeText("CDC:ErrCachedTSONotExists")) ErrGetStoreSnapshot = errors.Normalize("get snapshot failed", errors.RFCCodeText("CDC:ErrGetStoreSnapshot")) ErrNewStore = errors.Normalize("new store failed", errors.RFCCodeText("CDC:ErrNewStore")) + ErrRegionWorkerExit = errors.Normalize("region worker exited", errors.RFCCodeText("CDC:ErrRegionWorkerExit")) // rule related errors ErrEncodeFailed = errors.Normalize("encode failed: %s", errors.RFCCodeText("CDC:ErrEncodeFailed")) diff --git a/pkg/util/testleak/leaktest.go b/pkg/util/testleak/leaktest.go index f2f8004722b..2f9962bdade 100644 --- a/pkg/util/testleak/leaktest.go +++ b/pkg/util/testleak/leaktest.go @@ -54,6 +54,9 @@ func interestingGoroutines() (gs []string) { // TODO: remove these two lines after unified sorter is fixed "github.com/pingcap/ticdc/cdc/puller/sorter.newBackEndPool", "github.com/pingcap/ticdc/cdc/puller/sorter.(*heapSorter).flush", + // kv client region worker pool + "github.com/pingcap/ticdc/cdc/kv.RunWorkerPool", + "github.com/pingcap/ticdc/pkg/workerpool.(*defaultPoolImpl).Run", } shouldIgnore := func(stack string) bool { if stack == "" {