Skip to content

Commit

Permalink
add ConcurrentReportBucket
Browse files Browse the repository at this point in the history
Signed-off-by: bufferflies <1045931706@qq.com>
  • Loading branch information
bufferflies committed Mar 31, 2022
1 parent d34547c commit 9d481c8
Show file tree
Hide file tree
Showing 5 changed files with 52 additions and 11 deletions.
9 changes: 6 additions & 3 deletions server/cluster/cluster.go
Original file line number Diff line number Diff line change
Expand Up @@ -606,8 +606,8 @@ func (c *RaftCluster) HandleStoreHeartbeat(stats *pdpb.StoreStats) error {
return nil
}

// processBucketHeartbeat update the bucket information.
func (c *RaftCluster) processBucketHeartbeat(buckets *metapb.Buckets) error {
// processReportBuckets update the bucket information.
func (c *RaftCluster) processReportBuckets(buckets *metapb.Buckets) error {
region := c.core.GetRegion(buckets.GetRegionId())
if region == nil {
bucketEventCounter.WithLabelValues("region_cache_miss").Inc()
Expand All @@ -624,11 +624,14 @@ func (c *RaftCluster) processBucketHeartbeat(buckets *metapb.Buckets) error {
bucketEventCounter.WithLabelValues("version_not_match").Inc()
return nil
}
failpoint.Inject("concurrentBucketHeartbeat", func() {
time.Sleep(500 * time.Millisecond)
})
if ok := region.UpdateBuckets(buckets, old); ok {
return nil
}
}
bucketEventCounter.WithLabelValues("bucket_update_failed").Inc()
bucketEventCounter.WithLabelValues("update_failed").Inc()
return nil
}

Expand Down
34 changes: 30 additions & 4 deletions server/cluster/cluster_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -439,7 +439,7 @@ func (s *testClusterInfoSuite) TestBucketHeartbeat(c *C) {
Version: 1,
Keys: [][]byte{{'1'}, {'2'}},
}
c.Assert(cluster.processBucketHeartbeat(buckets), NotNil)
c.Assert(cluster.processReportBuckets(buckets), NotNil)

// case2: bucket can be processed after the region update.
stores := newTestStores(3, "2.0.0")
Expand All @@ -451,14 +451,14 @@ func (s *testClusterInfoSuite) TestBucketHeartbeat(c *C) {

c.Assert(cluster.processRegionHeartbeat(regions[0]), IsNil)
c.Assert(cluster.GetRegion(uint64(0)).GetBuckets(), IsNil)
c.Assert(cluster.processBucketHeartbeat(buckets), IsNil)
c.Assert(cluster.processReportBuckets(buckets), IsNil)
c.Assert(cluster.GetRegion(uint64(0)).GetBuckets(), DeepEquals, buckets)

// case3: the bucket version is same.
c.Assert(cluster.processBucketHeartbeat(buckets), IsNil)
c.Assert(cluster.processReportBuckets(buckets), IsNil)
// case4: the bucket version is changed.
buckets.Version = 3
c.Assert(cluster.processBucketHeartbeat(buckets), IsNil)
c.Assert(cluster.processReportBuckets(buckets), IsNil)
c.Assert(cluster.GetRegion(uint64(0)).GetBuckets(), DeepEquals, buckets)

//case5: region update should inherit buckets.
Expand Down Expand Up @@ -702,6 +702,32 @@ func (s *testClusterInfoSuite) TestRegionFlowChanged(c *C) {
c.Assert(newRegion.GetBytesRead(), Equals, uint64(1000))
}

func (s *testClusterInfoSuite) TestConcurrentReportBucket(c *C) {
_, opt, err := newTestScheduleConfig()
c.Assert(err, IsNil)
cluster := newTestRaftCluster(s.ctx, mockid.NewIDAllocator(), opt, storage.NewStorageWithMemoryBackend(), core.NewBasicCluster())
cluster.coordinator = newCoordinator(s.ctx, cluster, nil)

regions := []*core.RegionInfo{core.NewTestRegionInfo([]byte{}, []byte{})}
heartbeatRegions(c, cluster, regions)
c.Assert(cluster.GetRegion(0), NotNil)

bucket1 := &metapb.Buckets{RegionId: 0, Version: 3}
bucket2 := &metapb.Buckets{RegionId: 0, Version: 2}
var wg sync.WaitGroup
wg.Add(1)
c.Assert(failpoint.Enable("github.com/tikv/pd/server/cluster/concurrentBucketHeartbeat", "return(true)"), IsNil)
go func() {
defer wg.Done()
cluster.processReportBuckets(bucket1)
}()
time.Sleep(100 * time.Millisecond)
c.Assert(failpoint.Disable("github.com/tikv/pd/server/cluster/concurrentBucketHeartbeat"), IsNil)
c.Assert(cluster.processReportBuckets(bucket2), IsNil)
wg.Wait()
c.Assert(cluster.GetRegion(0).GetBuckets(), DeepEquals, bucket1)
}

func (s *testClusterInfoSuite) TestConcurrentRegionHeartbeat(c *C) {
_, opt, err := newTestScheduleConfig()
c.Assert(err, IsNil)
Expand Down
6 changes: 3 additions & 3 deletions server/cluster/cluster_worker.go
Original file line number Diff line number Diff line change
Expand Up @@ -224,7 +224,7 @@ func (c *RaftCluster) HandleBatchReportSplit(request *pdpb.ReportBatchSplitReque
return &pdpb.ReportBatchSplitResponse{}, nil
}

// HandleBucketHeartbeat processes RegionInfo reports from client
func (c *RaftCluster) HandleBucketHeartbeat(buckets *metapb.Buckets) error {
return c.processBucketHeartbeat(buckets)
// HandleReportBuckets processes RegionInfo reports from client
func (c *RaftCluster) HandleReportBuckets(buckets *metapb.Buckets) error {
return c.processReportBuckets(buckets)
}
12 changes: 12 additions & 0 deletions server/core/region_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -600,6 +600,18 @@ func checkRegions(c *C, regions *RegionsInfo) {
}
}

func BenchmarkUpdateBuckets(b *testing.B) {
region := NewTestRegionInfo([]byte{}, []byte{})
b.ResetTimer()
for i := 0; i < b.N; i++ {
buckets := &metapb.Buckets{RegionId: 0, Version: uint64(i)}
region.UpdateBuckets(buckets, region.GetBuckets())
}
if region.GetBuckets().GetVersion() != uint64(b.N-1) {
b.Fatal("update buckets failed")
}
}

func BenchmarkRandomRegion(b *testing.B) {
regions := NewRegionsInfo()
for i := 0; i < 5000000; i++ {
Expand Down
2 changes: 1 addition & 1 deletion server/grpc_service.go
Original file line number Diff line number Diff line change
Expand Up @@ -801,7 +801,7 @@ func (s *GrpcServer) ReportBuckets(stream pdpb.PD_ReportBucketsServer) error {
bucketReportCounter.WithLabelValues(storeAddress, storeLabel, "report", "recv").Inc()

start := time.Now()
err = rc.HandleBucketHeartbeat(buckets)
err = rc.HandleReportBuckets(buckets)
if err != nil {
bucketReportCounter.WithLabelValues(storeAddress, storeLabel, "report", "err").Inc()
continue
Expand Down

0 comments on commit 9d481c8

Please sign in to comment.