Skip to content

Commit e740b44

Browse files
Refactor event.Blocker into job.Scheduler (#3125)
1 parent 2e72c7c commit e740b44

File tree

9 files changed

+675
-358
lines changed

9 files changed

+675
-358
lines changed

snow/engine/snowman/issuer.go

Lines changed: 14 additions & 29 deletions
Original file line numberDiff line numberDiff line change
@@ -10,45 +10,30 @@ import (
1010

1111
"github.com/ava-labs/avalanchego/ids"
1212
"github.com/ava-labs/avalanchego/snow/consensus/snowman"
13-
"github.com/ava-labs/avalanchego/utils/set"
13+
"github.com/ava-labs/avalanchego/snow/engine/snowman/job"
1414
)
1515

16+
var _ job.Job[ids.ID] = (*issuer)(nil)
17+
1618
// issuer issues [blk] into to consensus after its dependencies are met.
1719
type issuer struct {
1820
t *Transitive
1921
nodeID ids.NodeID // nodeID of the peer that provided this block
2022
blk snowman.Block
21-
issuedMetric prometheus.Counter
22-
abandoned bool
23-
deps set.Set[ids.ID]
2423
push bool
24+
issuedMetric prometheus.Counter
2525
}
2626

27-
func (i *issuer) Dependencies() set.Set[ids.ID] {
28-
return i.deps
29-
}
30-
31-
// Mark that a dependency has been met
32-
func (i *issuer) Fulfill(ctx context.Context, id ids.ID) {
33-
i.deps.Remove(id)
34-
i.Update(ctx)
35-
}
36-
37-
// Abandon the attempt to issue [i.block]
38-
func (i *issuer) Abandon(ctx context.Context, _ ids.ID) {
39-
if !i.abandoned {
40-
blkID := i.blk.ID()
41-
i.t.removeFromPending(i.blk)
42-
i.t.addToNonVerifieds(i.blk)
43-
i.t.blocked.Abandon(ctx, blkID)
27+
func (i *issuer) Execute(ctx context.Context, _ []ids.ID, abandoned []ids.ID) error {
28+
if len(abandoned) == 0 {
29+
// If the parent block wasn't abandoned, this block can be issued.
30+
return i.t.deliver(ctx, i.nodeID, i.blk, i.push, i.issuedMetric)
4431
}
45-
i.abandoned = true
46-
}
4732

48-
func (i *issuer) Update(ctx context.Context) {
49-
if i.abandoned || i.deps.Len() != 0 || i.t.errs.Errored() {
50-
return
51-
}
52-
// Issue the block into consensus
53-
i.t.errs.Add(i.t.deliver(ctx, i.nodeID, i.blk, i.push, i.issuedMetric))
33+
// If the parent block was abandoned, this block should be abandoned as
34+
// well.
35+
blkID := i.blk.ID()
36+
i.t.removeFromPending(i.blk)
37+
i.t.addToNonVerifieds(i.blk)
38+
return i.t.blocked.Abandon(ctx, blkID)
5439
}
Lines changed: 109 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,109 @@
1+
// Copyright (C) 2019-2024, Ava Labs, Inc. All rights reserved.
2+
// See the file LICENSE for licensing terms.
3+
4+
// Package job provides a Scheduler to manage and execute Jobs with
5+
// dependencies.
6+
package job
7+
8+
import "context"
9+
10+
// Job is a unit of work that can be executed based on the result of resolving
11+
// requested dependencies.
12+
type Job[T any] interface {
13+
Execute(ctx context.Context, fulfilled []T, abandoned []T) error
14+
}
15+
16+
type job[T comparable] struct {
17+
// Once all dependencies are resolved, the job will be executed.
18+
numUnresolved int
19+
fulfilled []T
20+
abandoned []T
21+
job Job[T]
22+
}
23+
24+
// Scheduler implements a dependency graph for jobs. Jobs can be registered with
25+
// dependencies, and once all dependencies are resolved, the job will be
26+
// executed.
27+
type Scheduler[T comparable] struct {
28+
// dependents maps a dependency to the jobs that depend on it.
29+
dependents map[T][]*job[T]
30+
}
31+
32+
func NewScheduler[T comparable]() *Scheduler[T] {
33+
return &Scheduler[T]{
34+
dependents: make(map[T][]*job[T]),
35+
}
36+
}
37+
38+
// Schedule a job to be executed once all of its dependencies are resolved. If a
39+
// job is scheduled with no dependencies, it's executed immediately.
40+
//
41+
// In order to prevent a memory leak, all dependencies must eventually either be
42+
// fulfilled or abandoned.
43+
//
44+
// While registering a job with duplicate dependencies is discouraged, it is
45+
// allowed.
46+
func (s *Scheduler[T]) Schedule(ctx context.Context, userJob Job[T], dependencies ...T) error {
47+
numUnresolved := len(dependencies)
48+
if numUnresolved == 0 {
49+
return userJob.Execute(ctx, nil, nil)
50+
}
51+
52+
j := &job[T]{
53+
numUnresolved: numUnresolved,
54+
job: userJob,
55+
}
56+
for _, d := range dependencies {
57+
s.dependents[d] = append(s.dependents[d], j)
58+
}
59+
return nil
60+
}
61+
62+
// NumDependencies returns the number of dependencies that jobs are currently
63+
// blocking on.
64+
func (s *Scheduler[_]) NumDependencies() int {
65+
return len(s.dependents)
66+
}
67+
68+
// Fulfill a dependency. If all dependencies for a job are resolved, the job
69+
// will be executed.
70+
//
71+
// It is safe to call the scheduler during the execution of a job.
72+
func (s *Scheduler[T]) Fulfill(ctx context.Context, dependency T) error {
73+
return s.resolveDependency(ctx, dependency, true)
74+
}
75+
76+
// Abandon a dependency. If all dependencies for a job are resolved, the job
77+
// will be executed.
78+
//
79+
// It is safe to call the scheduler during the execution of a job.
80+
func (s *Scheduler[T]) Abandon(ctx context.Context, dependency T) error {
81+
return s.resolveDependency(ctx, dependency, false)
82+
}
83+
84+
func (s *Scheduler[T]) resolveDependency(
85+
ctx context.Context,
86+
dependency T,
87+
fulfilled bool,
88+
) error {
89+
jobs := s.dependents[dependency]
90+
delete(s.dependents, dependency)
91+
92+
for _, job := range jobs {
93+
job.numUnresolved--
94+
if fulfilled {
95+
job.fulfilled = append(job.fulfilled, dependency)
96+
} else {
97+
job.abandoned = append(job.abandoned, dependency)
98+
}
99+
100+
if job.numUnresolved > 0 {
101+
continue
102+
}
103+
104+
if err := job.job.Execute(ctx, job.fulfilled, job.abandoned); err != nil {
105+
return err
106+
}
107+
}
108+
return nil
109+
}

0 commit comments

Comments
 (0)