Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

lightning: fix generating region job is serialized #43123

Merged
merged 6 commits into from
Apr 19, 2023
Merged
Changes from 4 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
47 changes: 31 additions & 16 deletions br/pkg/lightning/backend/local/local.go
Original file line number Diff line number Diff line change
Expand Up @@ -393,6 +393,8 @@ type BackendConfig struct {
MaxConnPerStore int
// compress type when write or ingest into tikv
ConnCompressType config.CompressionType
// concurrency of generateJobForRange.
RangeConcurrency int
// number of import(write & ingest) workers
WorkerConcurrency int
KVWriteBatchSize int
Expand Down Expand Up @@ -425,6 +427,7 @@ func NewBackendConfig(cfg *config.Config, maxOpenFiles int, keyspaceName string)
LocalStoreDir: cfg.TikvImporter.SortedKVDir,
MaxConnPerStore: cfg.TikvImporter.RangeConcurrency,
ConnCompressType: cfg.TikvImporter.CompressKVPairs,
RangeConcurrency: cfg.TikvImporter.RangeConcurrency,
WorkerConcurrency: cfg.TikvImporter.RangeConcurrency * 2,
KVWriteBatchSize: cfg.TikvImporter.SendKVPairs,
CheckpointEnabled: cfg.Checkpoint.Enable,
Expand Down Expand Up @@ -1129,26 +1132,38 @@ func (local *Backend) generateAndSendJob(
}
logger.Debug("the ranges length write to tikv", zap.Int("length", len(jobRanges)))

for _, r := range jobRanges {
jobs, err := local.generateJobForRange(ctx, engine, r, regionSplitSize, regionSplitKeys)
if err != nil {
return err
}
for _, job := range jobs {
jobWg.Add(1)
eg, egCtx := errgroup.WithContext(ctx)
eg.SetLimit(local.RangeConcurrency)
for _, jobRange := range jobRanges {
r := jobRange
eg.Go(func() error {
select {
case <-ctx.Done():
// this job is not put into jobToWorkerCh
jobWg.Done()
// if the context is canceled, it means worker has error, the first error can be
// found by worker's error group LATER. if this function returns an error it will
// seize the "first error".
case <-egCtx.Done():
return nil
case jobToWorkerCh <- job:
default:
}
}

jobs, err := local.generateJobForRange(egCtx, engine, r, regionSplitSize, regionSplitKeys)
if err != nil {
return err
}
for _, job := range jobs {
jobWg.Add(1)
select {
case <-egCtx.Done():
// this job is not put into jobToWorkerCh
jobWg.Done()
// if the context is canceled, it means worker has error, the first error can be
// found by worker's error group LATER. if this function returns an error it will
// seize the "first error".
return nil
case jobToWorkerCh <- job:
}
}
return nil
})
}
return nil
return eg.Wait()
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

There is a simper concurrency pattern:

eg, egCtx := errgroup.WithContext(ctx)
eg.SetLimit(local.RangeConcurrency)
for _, r := range jobRanges {
    r := r
    eg.Go(func() error {
         ....
     }
}
eg.Wait()

}

// fakeRegionJobs is used in test , the injected job can be found by (startKey, endKey).
Expand Down