From 5bd552d17d71ded1ec5f01caefa404bd995ca9d9 Mon Sep 17 00:00:00 2001 From: thanhhaudev Date: Thu, 6 Mar 2025 08:57:49 +0700 Subject: [PATCH] perf: improve concurrency in commit fetching logic --- pkg/container/container.go | 47 ++++++++++++++++++++++++++++---------- 1 file changed, 35 insertions(+), 12 deletions(-) diff --git a/pkg/container/container.go b/pkg/container/container.go index 1e296c2..da26647 100644 --- a/pkg/container/container.go +++ b/pkg/container/container.go @@ -6,6 +6,7 @@ import ( "log" "os" "strings" + "sync" "github.com/thanhhaudev/github-stats/pkg/clock" "github.com/thanhhaudev/github-stats/pkg/github" @@ -166,6 +167,8 @@ func (d *DataContainer) InitCommits(ctx context.Context) error { errChan := make(chan error, repoCount) commitChan := make(chan []github.Commit, repoCount) seenOIDs := make(map[string]bool) + var mu sync.Mutex + mask := func(input string) string { length := len(input) if length <= 2 { @@ -184,13 +187,17 @@ func (d *DataContainer) InitCommits(ctx context.Context) error { if repoCount == 1 { return "repository" } - return "repositories" }()) } + var wg sync.WaitGroup + semaphore := make(chan struct{}, 5) // Limit to 5 concurrent goroutines + for _, repo := range d.Data.Repositories { + wg.Add(1) go func(repo github.Repository) { + defer wg.Done() if fetchAllBranches { if !hiddenRepoInfo { d.Logger.Println("Fetching commits from all branches of repository:", mask(repo.Name)) @@ -202,17 +209,30 @@ func (d *DataContainer) InitCommits(ctx context.Context) error { return } + var branchWg sync.WaitGroup var allCommits []github.Commit for _, branch := range branches { - commits, err := d.ClientManager.GetCommits(ctx, repo.Owner.Login, repo.Name, d.Data.Viewer.ID, fmt.Sprintf("refs/heads/%s", branch.Name), commitPerQuery) - if err != nil { - errChan <- err - return - } - - allCommits = append(allCommits, commits...) + branchWg.Add(1) + semaphore <- struct{}{} // Acquire a slot + go func(branch github.Branch) { + defer branchWg.Done() + defer func() { <-semaphore }() // Release the slot + commits, err := d.ClientManager.GetCommits(ctx, repo.Owner.Login, repo.Name, d.Data.Viewer.ID, fmt.Sprintf("refs/heads/%s", branch.Name), commitPerQuery) + if err != nil { + errChan <- err + return + } + + mu.Lock() + allCommits = append(allCommits, commits...) + if !hiddenRepoInfo { + log.Printf("Fetched %d commits from branch %s of repository %s", len(commits), mask(branch.Name), mask(repo.Name)) + } + mu.Unlock() + }(branch) } + branchWg.Wait() commitChan <- allCommits } else { if !hiddenRepoInfo { @@ -238,14 +258,18 @@ func (d *DataContainer) InitCommits(ctx context.Context) error { }(repo) } - for i := 0; i < len(d.Data.Repositories); i++ { + go func() { + wg.Wait() + close(commitChan) + close(errChan) + }() + + for i := 0; i < repoCount; i++ { if err := <-errChan; err != nil { return err } } - close(commitChan) // Close the channel to signal that all commits have been fetched - // Deduplicate commits for commits := range commitChan { for _, commit := range commits { @@ -258,7 +282,6 @@ func (d *DataContainer) InitCommits(ctx context.Context) error { } d.Logger.Println("Fetched commits successfully") - return nil }