Skip to content

Commit d05d8b5

Browse files
authored
feat(scheduler): implement job metadata propagation (#162)
1 parent 9aa787e commit d05d8b5

File tree

2 files changed

+89
-8
lines changed

2 files changed

+89
-8
lines changed

quartz/scheduler.go

Lines changed: 61 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -66,6 +66,17 @@ type Scheduler interface {
6666
Stop()
6767
}
6868

69+
type dispatchedJob struct {
70+
ctx context.Context
71+
jobDetail *JobDetail
72+
}
73+
74+
type contextKey string
75+
76+
// JobMetadataContextKey is a context key used to store and retrieve [JobMetadata]
77+
// from the [context.Context] passed to a job's Execute method.
78+
const JobMetadataContextKey = contextKey("JobMetadata")
79+
6980
// StdScheduler implements the [Scheduler] interface.
7081
type StdScheduler struct {
7182
mtx sync.RWMutex
@@ -74,7 +85,7 @@ type StdScheduler struct {
7485
interrupt chan struct{}
7586
cancel context.CancelFunc
7687
feeder chan ScheduledJob
77-
dispatch chan ScheduledJob
88+
dispatch chan dispatchedJob
7889
started bool
7990

8091
queue JobQueue
@@ -126,6 +137,20 @@ type SchedulerConfig struct {
126137
// Adjust OutdatedThreshold to establish an acceptable delay time and
127138
// ensure regular job execution.
128139
MisfiredChan chan ScheduledJob
140+
141+
// JobMetadata, when set to true, enables the scheduler to enrich the context
142+
// passed to a job's Execute method with JobMetadata. This provides custom
143+
// Job implementations with access to additional contextual information
144+
// about the job.
145+
JobMetadata bool
146+
}
147+
148+
// JobMetadata provides read-only details about a specific job execution's scheduling
149+
// context. It is added to the context by the scheduler.
150+
type JobMetadata struct {
151+
// RunTime is the Unix timestamp at which the job was originally scheduled to
152+
// begin execution.
153+
RunTime int64
129154
}
130155

131156
// SchedulerOpt is a functional option type used to configure an [StdScheduler].
@@ -141,6 +166,23 @@ func WithBlockingExecution() SchedulerOpt {
141166
}
142167
}
143168

169+
// WithJobMetadata configures the scheduler to attach a [JobMetadata] struct to the
170+
// [context.Context] passed to each job's Execute method.
171+
//
172+
// To retrieve this metadata within a custom [Job] implementation:
173+
//
174+
// func (j *customJob) Execute(ctx context.Context) error {
175+
// md, ok := ctx.Value(quartz.JobMetadataContextKey).(quartz.JobMetadata)
176+
// // use md as needed
177+
// return nil
178+
// }
179+
func WithJobMetadata() SchedulerOpt {
180+
return func(c *StdScheduler) error {
181+
c.opts.JobMetadata = true
182+
return nil
183+
}
184+
}
185+
144186
// WithWorkerLimit configures the number of worker goroutines for concurrent job execution.
145187
// This option is only used when blocking execution is disabled. If blocking execution
146188
// is enabled, this setting will be ignored. The workerLimit must be non-negative.
@@ -246,7 +288,7 @@ func NewStdScheduler(opts ...SchedulerOpt) (Scheduler, error) {
246288
scheduler := &StdScheduler{
247289
interrupt: make(chan struct{}, 1),
248290
feeder: make(chan ScheduledJob),
249-
dispatch: make(chan ScheduledJob),
291+
dispatch: make(chan dispatchedJob),
250292
queue: NewJobQueue(),
251293
queueLocker: &sync.Mutex{},
252294
opts: config,
@@ -320,13 +362,13 @@ func (sched *StdScheduler) Start(ctx context.Context) {
320362
ctx, sched.cancel = context.WithCancel(ctx)
321363
go func() { <-ctx.Done(); sched.Stop() }()
322364

365+
// start worker pool if configured
366+
sched.startWorkers(ctx)
367+
323368
// start scheduler execution loop
324369
sched.wg.Add(1)
325370
go sched.startExecutionLoop(ctx)
326371

327-
// starts worker pool if configured
328-
sched.startWorkers(ctx)
329-
330372
sched.started = true
331373
}
332374

@@ -549,8 +591,8 @@ func (sched *StdScheduler) startWorkers(ctx context.Context) {
549591
select {
550592
case <-ctx.Done():
551593
return
552-
case scheduled := <-sched.dispatch:
553-
sched.executeWithRetries(ctx, scheduled.JobDetail())
594+
case dispatched := <-sched.dispatch:
595+
sched.executeWithRetries(dispatched.ctx, dispatched.jobDetail)
554596
}
555597
}
556598
}()
@@ -584,6 +626,14 @@ func (sched *StdScheduler) calculateNextTick() time.Duration {
584626
func (sched *StdScheduler) executeAndReschedule(ctx context.Context) {
585627
// fetch a job for processing
586628
scheduled, valid := sched.fetchAndReschedule()
629+
// attach job metadata to the context
630+
if sched.opts.JobMetadata {
631+
ctx = context.WithValue(
632+
ctx,
633+
JobMetadataContextKey,
634+
JobMetadata{RunTime: scheduled.NextRunTime()},
635+
)
636+
}
587637

588638
// execute the job
589639
if valid {
@@ -594,7 +644,10 @@ func (sched *StdScheduler) executeAndReschedule(ctx context.Context) {
594644
sched.executeWithRetries(ctx, scheduled.JobDetail())
595645
case sched.opts.WorkerLimit > 0:
596646
select {
597-
case sched.dispatch <- scheduled:
647+
case sched.dispatch <- dispatchedJob{
648+
ctx: ctx,
649+
jobDetail: scheduled.JobDetail(),
650+
}:
598651
case <-ctx.Done():
599652
return
600653
}

quartz/scheduler_test.go

Lines changed: 28 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -318,6 +318,34 @@ func TestScheduler_Cancel(t *testing.T) {
318318
}
319319
}
320320

321+
func TestScheduler_JobMetadata(t *testing.T) {
322+
t.Parallel()
323+
324+
contextJob := job.NewFunctionJob(func(ctx context.Context) (int32, error) {
325+
md, ok := ctx.Value(quartz.JobMetadataContextKey).(quartz.JobMetadata)
326+
assert.Equal(t, ok, true)
327+
328+
if md.RunTime <= 0 {
329+
t.Error("RunTime should be a valid timestamp")
330+
}
331+
fmt.Printf("Job metadata: %v\n", md)
332+
return 0, nil
333+
})
334+
335+
sched, err := quartz.NewStdScheduler(quartz.WithJobMetadata())
336+
assert.IsNil(t, err)
337+
338+
jobDetail := quartz.NewJobDetail(contextJob, quartz.NewJobKey("contextJob"))
339+
trigger := quartz.NewSimpleTrigger(2 * time.Millisecond)
340+
341+
err = sched.ScheduleJob(jobDetail, trigger)
342+
assert.IsNil(t, err)
343+
344+
sched.Start(context.Background())
345+
time.Sleep(5 * time.Millisecond)
346+
sched.Stop()
347+
}
348+
321349
func TestScheduler_JobWithRetries(t *testing.T) {
322350
t.Parallel()
323351

0 commit comments

Comments
 (0)