diff --git a/collection_rangescan.go b/collection_rangescan.go
index d5f1c31..7bda453 100644
--- a/collection_rangescan.go
+++ b/collection_rangescan.go
@@ -28,6 +28,14 @@ type ScanOptions struct {
 	// Defaults to 50. A value of 0 is equivalent to no limit.
 	BatchItemLimit *uint32
 
+	// Concurrency specifies the maximum number of scans that can be active at the same time.
+	// Defaults to 1. Care must be taken to ensure that the server does not run out of resources due to concurrent scans.
+	//
+	// # UNCOMMITTED
+	//
+	// This API is UNCOMMITTED and may change in the future.
+	Concurrency uint16
+
 	// Internal: This should never be used and is not supported.
 	Internal struct {
 		User string
diff --git a/kvprovider_core.go b/kvprovider_core.go
index 7a7f133..979054c 100644
--- a/kvprovider_core.go
+++ b/kvprovider_core.go
@@ -27,7 +27,7 @@ func (p *kvProviderCore) Scan(c *Collection, scanType ScanType, opts *ScanOption
 	opm.SetTimeout(opts.Timeout)
 	opm.SetItemLimit(opts.BatchItemLimit)
 	opm.SetByteLimit(opts.BatchByteLimit)
-	opm.SetMaxConcurrency(1)
+	opm.SetMaxConcurrency(opts.Concurrency)
 
 	config, err := p.snapshotProvider.WaitForConfigSnapshot(opts.Context, time.Now().Add(opm.Timeout()))
 	if err != nil {
@@ -48,6 +48,22 @@ func (p *kvProviderCore) Scan(c *Collection, scanType ScanType, opts *ScanOption
 
 	opm.SetNumVbuckets(numVbuckets)
 
+	serverToVbucketMap := make(map[int][]uint16)
+	numServers, err := config.NumServers()
+	if err != nil {
+		opm.Finish()
+		return nil, err
+	}
+	for serverIndex := 0; serverIndex < numServers; serverIndex++ {
+		vbuckets, err := config.VbucketsOnServer(serverIndex)
+		if err != nil {
+			opm.Finish()
+			return nil, err
+		}
+		serverToVbucketMap[serverIndex] = vbuckets
+	}
+	opm.SetServerToVbucketMap(serverToVbucketMap)
+
 	cid, err := p.getCollectionID(opts.Context, c, opm.TraceSpan(), opm.Timeout(), opm.Impersonate())
 	if err != nil {
 		opm.Finish()
diff --git a/kvprovider_core_provider.go b/kvprovider_core_provider.go
index 4888d47..e0bc382 100644
--- a/kvprovider_core_provider.go
+++ b/kvprovider_core_provider.go
@@ -41,6 +41,8 @@ type coreConfigSnapshot interface {
 	RevID() int64
 	NumVbuckets() (int, error)
 	NumReplicas() (int, error)
+	NumServers() (int, error)
+	VbucketsOnServer(index int) ([]uint16, error)
 }
 
 type stdCoreConfigSnapshotProvider struct {
diff --git a/rangescanopmanager.go b/rangescanopmanager.go
index e08904a..700b719 100644
--- a/rangescanopmanager.go
+++ b/rangescanopmanager.go
@@ -5,7 +5,9 @@ import (
 	"encoding/hex"
 	"errors"
 	"io"
+	"math"
 	"math/rand"
+	"sync"
 	"sync/atomic"
 	"time"
 	"unsafe"
@@ -51,17 +53,23 @@ type rangeScanOpManager struct {
 	samplingOptions       *gocbcore.RangeScanCreateRandomSamplingConfig
 	vBucketToSnapshotOpts map[uint16]gocbcore.RangeScanCreateSnapshotRequirements
 
-	numVbuckets    int
-	keysOnly       bool
-	itemLimit      uint32
-	byteLimit      uint32
-	maxConcurrency uint16
+	numVbuckets        int
+	serverToVbucketMap map[int][]uint16
+	keysOnly           bool
+	itemLimit          uint32
+	byteLimit          uint32
+	maxConcurrency     uint16
 
 	result *ScanResult
 
 	cancelled uint32
 }
 
+type rangeScanVbucket struct {
+	id     uint16
+	server int
+}
+
 func (p *kvProviderCore) newRangeScanOpManager(c *Collection, scanType ScanType, agent kvProviderCoreProvider, parentSpan RequestSpan, consistentWith *MutationState, keysOnly bool) (*rangeScanOpManager, error) {
 	var tracectx RequestSpanContext
 
@@ -210,6 +218,10 @@ func (m *rangeScanOpManager) SetNumVbuckets(numVbuckets int) {
 	m.span.SetAttribute("num_partitions", numVbuckets)
 }
 
+func (m *rangeScanOpManager) SetServerToVbucketMap(serverVbucketMap map[int][]uint16) {
+	m.serverToVbucketMap = serverVbucketMap
+}
+
 func (m *rangeScanOpManager) SetTimeout(timeout time.Duration) {
 	m.timeout = timeout
 }
@@ -312,7 +324,7 @@ func (m *rangeScanOpManager) CheckReadyForOp() error {
 	m.deadline = time.Now().Add(timeout)
 
 	if m.numVbuckets == 0 {
-		return errors.New("range sacn op manager had no number of partitions specified")
+		return errors.New("range scan op manager had no number of partitions specified")
 	}
 
 	return nil
@@ -371,7 +383,7 @@ func (m *rangeScanOpManager) Scan(ctx context.Context) (*ScanResult, error) {
 	}
 	m.SetResult(r)
 
-	vbucketCh := m.createVbucketChannel()
+	balancer := m.createLoadBalancer()
 
 	var complete uint32
 	var seenData uint32
@@ -387,32 +399,30 @@ func (m *rangeScanOpManager) Scan(ctx context.Context) (*ScanResult, error) {
 			defer func() {
 				if atomic.AddUint32(&complete, 1) == uint32(m.maxConcurrency) {
 					m.Finish()
-					close(vbucketCh)
+					balancer.close()
 					close(resultCh)
 				}
 			}()
 
-			for vbID := range vbucketCh {
+			for vbucket, ok := balancer.selectVbucket(); ok; vbucket, ok = balancer.selectVbucket() {
 				if atomic.LoadUint32(&m.cancelled) == 1 {
 					return
 				}
 
 				deadline := time.Now().Add(m.Timeout())
-				failPoint, err := m.scanPartition(ctx, deadline, vbID, resultCh)
+				failPoint, err := m.scanPartition(ctx, deadline, vbucket.id, resultCh)
+				balancer.scanEnded(vbucket)
 				if err != nil {
 					err = m.EnhanceErr(err)
 					if failPoint == scanFailPointCreate {
 						if errors.Is(err, gocbcore.ErrDocumentNotFound) {
-							logDebugf("Ignoring vbid %d as no documents exist for that vbucket", vbID)
-							if len(vbucketCh) == 0 {
-								return
-							}
+							logDebugf("Ignoring vbid %d as no documents exist for that vbucket", vbucket.id)
 							continue
 						}
 
 						if errors.Is(err, ErrTemporaryFailure) || errors.Is(err, gocbcore.ErrBusy) {
 							// Put the vbucket back into the channel to be retried later.
-							vbucketCh <- vbID
+							balancer.retryScan(vbucket)
 
 							if errors.Is(err, gocbcore.ErrBusy) {
 								// Busy indicates that the server is reporting too many active scans.
@@ -425,18 +435,10 @@ func (m *rangeScanOpManager) Scan(ctx context.Context) (*ScanResult, error) {
 							}
 						}
 
-						if len(vbucketCh) == 0 {
-							return
-						}
-
 						continue
 					}
 
 					if !m.IsRangeScan() {
-						// We can ignore stream create errors for sampling scans.
-						if len(vbucketCh) == 0 {
-							return
-						}
 						continue
 					}
 
@@ -477,16 +479,8 @@ func (m *rangeScanOpManager) Scan(ctx context.Context) (*ScanResult, error) {
 							return
 						}
 
-						if len(vbucketCh) == 0 {
-							return
-						}
-
 						continue
 					}
-
-					if len(vbucketCh) == 0 {
-						return
-					}
 			}
 		}()
 	}
@@ -717,20 +711,111 @@ func (m *rangeScanOpManager) cancelStream(ctx context.Context, spanCtx RequestSp
 	}
 }
 
-func (m *rangeScanOpManager) createVbucketChannel() chan uint16 {
-	var vbuckets []uint16
-	for vbucket := 0; vbucket < m.numVbuckets; vbucket++ {
-		vbuckets = append(vbuckets, uint16(vbucket))
+func (m *rangeScanOpManager) createLoadBalancer() *rangeScanLoadBalancer {
+	var seed int64
+	if m.SamplingOptions() != nil && m.SamplingOptions().Seed != 0 {
+		// Using the sampling scan seed for the load balancer ensures that when concurrency is 1 the vbuckets are
+		// always scanned in the same order for a given seed
+		seed = int64(m.SamplingOptions().Seed)
+	} else {
+		seed = time.Now().UnixNano()
 	}
 
-	rand.Seed(time.Now().UnixNano())
-	rand.Shuffle(m.numVbuckets, func(i, j int) {
-		vbuckets[i], vbuckets[j] = vbuckets[j], vbuckets[i]
-	})
-	vbucketCh := make(chan uint16, m.numVbuckets)
-	for _, vbucket := range vbuckets {
-		vbucketCh <- vbucket
+	return newRangeScanLoadBalancer(m.serverToVbucketMap, seed)
+}
+
+type rangeScanLoadBalancer struct {
+	vbucketChannels    map[int]chan uint16
+	servers            []int
+	activeScansPerNode sync.Map
+	selectLock         sync.Mutex
+}
+
+func newRangeScanLoadBalancer(serverToVbucketMap map[int][]uint16, seed int64) *rangeScanLoadBalancer {
+	b := &rangeScanLoadBalancer{
+		vbucketChannels:    make(map[int]chan uint16),
+		activeScansPerNode: sync.Map{},
 	}
 
-	return vbucketCh
+	for server, vbuckets := range serverToVbucketMap {
+		b.servers = append(b.servers, server)
+
+		b.vbucketChannels[server] = make(chan uint16, len(vbuckets))
+
+		r := rand.New(rand.NewSource(seed)) // #nosec G404
+		r.Shuffle(len(vbuckets), func(i, j int) {
+			vbuckets[i], vbuckets[j] = vbuckets[j], vbuckets[i]
+		})
+
+		for _, vbucket := range vbuckets {
+			b.vbucketChannels[server] <- vbucket
+		}
+	}
+
+	return b
+}
+
+func (b *rangeScanLoadBalancer) retryScan(vbucket rangeScanVbucket) {
+	b.vbucketChannels[vbucket.server] <- vbucket.id
+}
+
+func (b *rangeScanLoadBalancer) scanEnded(vbucket rangeScanVbucket) {
+	zeroVal := uint32(0)
+	val, _ := b.activeScansPerNode.LoadOrStore(vbucket.server, &zeroVal)
+	atomic.AddUint32(val.(*uint32), ^uint32(0))
+}
+
+func (b *rangeScanLoadBalancer) scanStarting(vbucket rangeScanVbucket) {
+	zeroVal := uint32(0)
+	val, _ := b.activeScansPerNode.LoadOrStore(vbucket.server, &zeroVal)
+	atomic.AddUint32(val.(*uint32), uint32(1))
+}
+
+// close closes all the vbucket channels. This should only be called if no more vbucket scans will happen, i.e. selectVbucket should not be called after close.
+func (b *rangeScanLoadBalancer) close() {
+	for _, ch := range b.vbucketChannels {
+		close(ch)
+	}
+}
+
+// selectVbucket returns the vbucket id, alongside the corresponding node index for a vbucket that is on the node with
+// the smallest number of active scans. The boolean return value is false if there are no more vbuckets to scan.
+func (b *rangeScanLoadBalancer) selectVbucket() (rangeScanVbucket, bool) {
+	b.selectLock.Lock()
+	defer b.selectLock.Unlock()
+
+	var selectedServer int
+	selected := false
+	min := uint32(math.MaxUint32)
+
+	for s := range b.servers {
+		if len(b.vbucketChannels[s]) == 0 {
+			continue
+		}
+		zeroVal := uint32(0)
+		val, _ := b.activeScansPerNode.LoadOrStore(s, &zeroVal)
+		activeScans := atomic.LoadUint32(val.(*uint32))
+		if activeScans < min {
+			min = activeScans
+			selectedServer = s
+			selected = true
+		}
+	}
+
+	if !selected {
+		return rangeScanVbucket{}, false
+	}
+
+	selectedVbucket, ok := <-b.vbucketChannels[selectedServer]
+	if !ok {
+		// This should be unreachable. selectVbucket should not be called after close.
+		logWarnf("Vbucket channel has been closed before the range scan has finished")
+		return rangeScanVbucket{}, false
+	}
+	vbucket := rangeScanVbucket{
+		id:     selectedVbucket,
+		server: selectedServer,
+	}
+	b.scanStarting(vbucket)
+	return vbucket, true
 }
diff --git a/rangescanopmanager_test.go b/rangescanopmanager_test.go
index 3d16078..9b65676 100644
--- a/rangescanopmanager_test.go
+++ b/rangescanopmanager_test.go
@@ -3,6 +3,7 @@ package gocb
 import (
 	"context"
 	"errors"
+	"fmt"
 	"sync/atomic"
 	"time"
 
@@ -21,9 +22,10 @@ func (p *mockConfigSnapshotProvider) WaitForConfigSnapshot(ctx context.Context,
 }
 
 type mockConfigSnapshot struct {
-	revID       int64
-	numVbuckets int
-	numReplicas int
+	revID            int64
+	numVbuckets      int
+	numReplicas      int
+	serverToVbuckets map[int][]uint16
 }
 
 func (p *mockConfigSnapshot) RevID() int64 {
@@ -38,6 +40,38 @@ func (p *mockConfigSnapshot) NumReplicas() (int, error) {
 	return p.numReplicas, nil
 }
 
+func (p *mockConfigSnapshot) NumServers() (int, error) {
+	return len(p.serverToVbuckets), nil
+}
+
+func (p *mockConfigSnapshot) VbucketsOnServer(index int) ([]uint16, error) {
+	vbuckets, ok := p.serverToVbuckets[index]
+	if !ok {
+		return nil, fmt.Errorf("could not find server with index %d", index)
+	}
+
+	return vbuckets, nil
+}
+
+func newMockConfigSnapshot(numVbuckets int, numServers int) *mockConfigSnapshot {
+	serverToVbuckets := make(map[int][]uint16)
+
+	// Divide the vbuckets across the nodes
+	remainingVbucketCount := numVbuckets
+	for serverIndex := 0; serverIndex < numServers; serverIndex++ {
+		lowerBound := numVbuckets - remainingVbucketCount
+		upperBound := lowerBound + remainingVbucketCount/(numServers-serverIndex)
+		for vbID := uint16(lowerBound); vbID < uint16(upperBound); vbID++ {
+			remainingVbucketCount--
+			serverToVbuckets[serverIndex] = append(serverToVbuckets[serverIndex], vbID)
+		}
+	}
+	return &mockConfigSnapshot{
+		numVbuckets:      numVbuckets,
+		serverToVbuckets: serverToVbuckets,
+	}
+}
+
 func (suite *UnitTestSuite) TestScanAllScansTmpFailAtCreate() {
 	test := func(scan ScanType) (*ScanResult, error) {
 		start := time.Now()
@@ -57,7 +91,7 @@ func (suite *UnitTestSuite) TestScanAllScansTmpFailAtCreate() {
 		cb(nil, ErrTemporaryFailure)
 	})
 
-	snap := &mockConfigSnapshot{numVbuckets: 8}
+	snap := newMockConfigSnapshot(8, 1)
 	agent := &kvProviderCore{agent: provider, snapshotProvider: &mockConfigSnapshotProvider{snapshot: snap}}
 
 	col := suite.collection("mock", "", "", agent)
@@ -109,7 +143,7 @@ func (suite *UnitTestSuite) TestScanAllScansEmpty() {
 		IDsOnly: true,
 	}
 
-	snap := &mockConfigSnapshot{numVbuckets: 8}
+	snap := newMockConfigSnapshot(8, 1)
 	agent := &kvProviderCore{agent: provider, snapshotProvider: &mockConfigSnapshotProvider{snapshot: snap}}
 
 	col := suite.collection("mock", "", "", agent)
@@ -209,7 +243,7 @@ func (suite *UnitTestSuite) TestScanFirstCreateFailsUnknownError() {
 		}, nil)
 	})
 
-	snap := &mockConfigSnapshot{numVbuckets: 4}
+	snap := newMockConfigSnapshot(4, 1)
 	agent := &kvProviderCore{agent: provider, snapshotProvider: &mockConfigSnapshotProvider{snapshot: snap}}
 
 	col := suite.collection("mock", "", "", agent)
@@ -266,7 +300,7 @@ func (suite *UnitTestSuite) TestScanSecondCreateFailsUnknownError() {
 		return
 	})
 
-	snap := &mockConfigSnapshot{numVbuckets: 4}
+	snap := newMockConfigSnapshot(4, 1)
 	agent := &kvProviderCore{agent: provider, snapshotProvider: &mockConfigSnapshotProvider{snapshot: snap}}
 
 	col := suite.collection("mock", "", "", agent)
@@ -374,7 +408,7 @@ func (suite *UnitTestSuite) TestScanNMV() {
 		return
 	})
 
-	snap := &mockConfigSnapshot{numVbuckets: 4}
+	snap := newMockConfigSnapshot(4, 1)
 	agent := &kvProviderCore{agent: provider, snapshotProvider: &mockConfigSnapshotProvider{snapshot: snap}}
 
 	col := suite.collection("mock", "", "", agent)
@@ -451,3 +485,93 @@ func makeRangeScanProvider(createCb func(args mock.Arguments)) *mockKvProviderCo
 
 	return provider
 }
+
+func (suite *UnitTestSuite) TestRangeScanLoadBalancer() {
+	vbucketServers := []int{0, 0, 1, 1, 2, 2}
+	serverToVbucketMap := make(map[int][]uint16)
+	for vbucket, server := range vbucketServers {
+		serverToVbucketMap[server] = append(serverToVbucketMap[server], uint16(vbucket))
+	}
+
+	suite.Run("selecting 3 vbuckets from 3-node cluster with equal number of vbuckets in each gives one vbucket from each node", func() {
+		balancer := newRangeScanLoadBalancer(serverToVbucketMap, 0)
+
+		var selectedNodes []int
+		for i := 0; i < 3; i++ {
+			vbucket, ok := balancer.selectVbucket()
+			suite.Require().True(ok)
+			suite.Require().Equal(vbucketServers[vbucket.id], vbucket.server)
+			suite.Assert().NotContains(selectedNodes, vbucket.server)
+			selectedNodes = append(selectedNodes, vbucket.server)
+		}
+	})
+
+	suite.Run("the selected vbucket should come from the least busy node", func() {
+		balancer := newRangeScanLoadBalancer(serverToVbucketMap, 0)
+
+		// Nodes 0 and 1 have one scan in progress each. Node 2 has no active scans.
+		balancer.scanStarting(rangeScanVbucket{id: 0, server: 0})
+		balancer.scanStarting(rangeScanVbucket{id: 2, server: 1})
+
+		vbucket, ok := balancer.selectVbucket()
+		suite.Require().True(ok)
+		suite.Require().Equal(vbucketServers[vbucket.id], vbucket.server)
+		suite.Assert().Equal(vbucket.server, 2)
+	})
+
+	suite.Run("when there are no retries each vbucket should be returned once", func() {
+		balancer := newRangeScanLoadBalancer(serverToVbucketMap, 0)
+
+		// select all vbuckets
+		ch := make(chan *uint16)
+		for i := 0; i <= len(vbucketServers); i++ {
+			go func() {
+				vbucket, ok := balancer.selectVbucket()
+				if ok {
+					ch <- &vbucket.id
+				} else {
+					ch <- nil
+					close(ch)
+				}
+			}()
+		}
+
+		var selectedVbuckets []*uint16
+		for i := 0; i < len(vbucketServers); i++ {
+			v := <-ch
+			suite.Require().NotNil(v)
+			suite.Assert().NotContains(selectedVbuckets, v)
+			selectedVbuckets = append(selectedVbuckets, v)
+		}
+		v := <-ch
+		suite.Assert().Nil(v)
+		suite.Assert().Empty(ch)
+	})
+
+	suite.Run("retried vbucket id will be returned twice", func() {
+		balancer := newRangeScanLoadBalancer(serverToVbucketMap, 0)
+
+		var retriedVbucket uint16 = 3
+		var selectedVbuckets []uint16
+
+		retried := false
+		for {
+			vbucket, ok := balancer.selectVbucket()
+			if !ok {
+				break
+			}
+			if vbucket.id == retriedVbucket {
+				if retried {
+					suite.Assert().Contains(selectedVbuckets, vbucket.id)
+				} else {
+					balancer.retryScan(vbucket)
+					suite.Assert().NotContains(selectedVbuckets, vbucket.id)
+				}
+				retried = true
+			} else {
+				suite.Assert().NotContains(selectedVbuckets, vbucket.id)
+			}
+			selectedVbuckets = append(selectedVbuckets, vbucket.id)
+		}
+	})
+}
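
Usage sketch (not part of the patch): the snippet below shows how the new Concurrency option is meant to be driven from application code, assuming the existing Collection.Scan and ScanResult iteration API; the connection string, credentials, and bucket name are placeholders only.

```go
package main

import (
	"log"
	"time"

	"github.com/couchbase/gocb/v2"
)

func main() {
	// Placeholder connection details - swap in real values for your deployment.
	cluster, err := gocb.Connect("couchbase://localhost", gocb.ClusterOptions{
		Username: "Administrator",
		Password: "password",
	})
	if err != nil {
		log.Fatal(err)
	}

	bucket := cluster.Bucket("travel-sample") // placeholder bucket name
	if err := bucket.WaitUntilReady(5*time.Second, nil); err != nil {
		log.Fatal(err)
	}
	collection := bucket.DefaultCollection()

	// A full range scan over the collection, with up to 4 partition scans
	// in flight at once instead of the previously hard-coded 1.
	result, err := collection.Scan(gocb.RangeScan{}, &gocb.ScanOptions{
		IDsOnly:     true,
		Concurrency: 4,
	})
	if err != nil {
		log.Fatal(err)
	}

	for item := result.Next(); item != nil; item = result.Next() {
		log.Printf("scanned key: %s", item.ID())
	}
	if err := result.Err(); err != nil {
		log.Fatal(err)
	}
}
```

With Concurrency greater than 1 the op manager starts that many scanning goroutines, and rangeScanLoadBalancer hands each idle goroutine a vbucket from the node with the fewest active scans, so concurrent partition scans spread across the cluster rather than piling onto a single server.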