Skip to content
Open
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
46 changes: 23 additions & 23 deletions internal/matcher/match.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,28 +28,30 @@ func Match(ctx context.Context, ir *claircore.IndexReport, matchers []driver.Mat
// a channel where concurrent controllers will deliver vulnerabilities affecting a package.
// maps a package id to a list of vulnerabilities.
ctrlC := make(chan map[string][]*claircore.Vulnerability, 1024)
// a channel where controller errors will be reported
errorC := make(chan error, 1024)
// fan out all controllers, write their output to ctrlC, close ctrlC once all writers finish
g, ctx := errgroup.WithContext(ctx)
for _, m := range matchers {
mm := m
g.Go(func() error {
mc := NewController(mm, store)
vulns, err := mc.Match(ctx, records)
if err != nil {
return err
}
// in event of slow reader go routines will block
// if one function fails the context is cancelled and the rest will exit
// without needing to add to channel and be processed
select {
case ctrlC <- vulns:
case <-ctx.Done():
return ctx.Err()
}
return nil
})
}
go func() {
defer close(ctrlC)
var g errgroup.Group
for _, m := range matchers {
mm := m
g.Go(func() error {
mc := NewController(mm, store)
vulns, err := mc.Match(ctx, records)
if err != nil {
return err
}
// in event of slow reader go routines will block
ctrlC <- vulns
return nil
})
}
if err := g.Wait(); err != nil {
errorC <- err
}
g.Wait()
close(ctrlC)
}()
// loop ranges until ctrlC is closed and fully drained, ctrlC is guaranteed to close
for vulnsByPackage := range ctrlC {
Expand All @@ -60,10 +62,8 @@ func Match(ctx context.Context, ir *claircore.IndexReport, matchers []driver.Mat
}
}
}
select {
case err := <-errorC:
if err := g.Wait(); err != nil {
return nil, err
default:
}
return vr, nil
}