
Durable Jobs (aka Reminders v2)#9717

Merged
ReubenBond merged 89 commits into dotnet:main from benjaminpetit:wip/scheduled-jobs
Nov 4, 2025

Conversation

@benjaminpetit
Member

@benjaminpetit benjaminpetit commented Oct 10, 2025

This is a work-in-progress and lays the groundwork for scalable, distributed job scheduling across Orleans silos.

Closes: #9718


@ReubenBond ReubenBond requested a review from Copilot October 10, 2025 17:27
Contributor

Copilot AI left a comment


Pull Request Overview

This PR introduces a new scheduled jobs system (Reminders v2) that provides scalable, distributed job scheduling across Orleans silos. The implementation includes both in-memory and Azure Storage-backed job shard managers, allowing jobs to be scheduled for future execution and distributed across the cluster.

Key changes:

  • New Orleans.ScheduledJobs core library with job scheduling, shard management, and execution capabilities
  • Azure Storage provider for persistent job storage and shard coordination
  • Test infrastructure and integration with the Orleans testing framework

Reviewed Changes

Copilot reviewed 25 out of 25 changed files in this pull request and generated 5 comments.

File | Description
src/Orleans.ScheduledJobs/* | Core scheduled jobs library with job models, shard management, and local job execution
src/Azure/Orleans.ScheduledJobs.AzureStorage/* | Azure Storage implementation for persistent job shards and coordination
test/Grains/TestGrains/ScheduledJobGrain.cs | Test grain implementing the scheduled job receiver interface
test/DefaultCluster.Tests/ScheduledJobTests.cs | Integration tests for scheduled job functionality
test/Extensions/TesterAzureUtils/ScheduledJobs/Class1.cs | Azure Storage job shard manager tests

@zeinali-ali

Hi guys
I'm just curious: why is LocalScheduledJobManager not implemented using a GrainService, like LocalReminderService?

@ReubenBond
Member

ReubenBond commented Oct 18, 2025

@zeinali-ali, in short we went with a SystemTarget over a GrainService because we want to avoid any cross-host RPC and we want to maximize batching of storage ops. My longer answer below elaborates.

GrainService uses a consistent hash ring to distribute grain responsibility across hosts. When a grain queries its scheduled reminders, it calls a designated location within the cluster. This location is often remote, resulting in frequent cross-host calls. This is fine for grains that infrequently create and remove reminders, but it is inefficient when nearly every request involves scheduling reminders that are removed before the grain deactivates.

The current hash-based reminder system's storage layout in the most commonly used providers is optimized for range queries by grain hash code rather than by due time, and silos currently load reminders regardless of their due date. That layout also results in many individual storage operations, since the reminder service cannot assume a specific physical partitioning mechanism for the underlying storage. Some providers might be able to batch requests, but the most common ones cannot (and batching may be complex at this layer anyway).

There is high temporal locality for due times in most applications: reminders are typically scheduled at fixed offsets from the current time. Periods are generally very short, since Reminders does not guarantee that a given reminder tick will fire if there is a transient failure. A very common pattern for apps is to set a reminder for the near future (1 min) and then set a short period (1 min) to ensure it quickly fires in case it was missed. The grain then deletes the reminder once it has completed the request. Some applications set reminders for almost every request to ensure that some work is driven to completion, tolerating faults.
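That pattern can be sketched with the existing reminder API. This is an illustrative grain, not code from this PR; WorkGrain and DriveWorkToCompletionAsync are hypothetical names:

```csharp
// Sketch of the common pattern described above (hypothetical grain, not from this PR).
// The grain schedules a near-future reminder with a short period as a safety net,
// then deletes the reminder once the work has been driven to completion.
public class WorkGrain : Grain, IRemindable
{
    public async Task StartWorkAsync()
    {
        // Due in 1 minute; re-fires every minute in case a tick was missed.
        await this.RegisterOrUpdateReminder(
            "complete-work",
            dueTime: TimeSpan.FromMinutes(1),
            period: TimeSpan.FromMinutes(1));
    }

    public async Task ReceiveReminder(string reminderName, TickStatus status)
    {
        await DriveWorkToCompletionAsync();

        // Work is done: remove the reminder so it stops firing.
        var reminder = await this.GetReminder(reminderName);
        if (reminder is not null)
        {
            await this.UnregisterReminder(reminder);
        }
    }

    // Placeholder for the application's actual work.
    private Task DriveWorkToCompletionAsync() => Task.CompletedTask;
}
```

Under the current GrainService-based design, both the register and the unregister above typically involve a cross-host call plus an individual storage operation, which is the overhead the new design targets.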

The new system is designed to optimize for the above scenario, in which reminders are:

  • Created frequently (e.g., per-request)
  • Typically deleted shortly after creation
  • Fired at least once, ideally very shortly after their due time
  • Only loaded from storage when they are needed (about to become due)

The design optimizes for this by:

  • Organizing storage into time-bucketed 'shards', where shards can be created on-demand by any given silo in the cluster.
  • Decoupling the grain from the host managing its scheduled tasks, so a grain can always schedule a task without issuing cross-host RPC, and the local host can batch storage operations into those time-bucketed shards.
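The time-bucketing idea can be illustrated as follows (a sketch with hypothetical names, not the PR's actual types): jobs with nearby due times map to the same fixed-width shard window, so writes batch naturally and a silo only loads shards whose window is about to become due.

```csharp
// Illustrative sketch of time-bucketed sharding (names are hypothetical).
public static class ShardBucketing
{
    // Each shard covers a fixed window, e.g. one minute.
    private static readonly TimeSpan ShardWidth = TimeSpan.FromMinutes(1);

    // Map a job's due time to the start of its shard's window.
    // Jobs due within the same window land in the same shard.
    public static DateTimeOffset GetShardStart(DateTimeOffset dueTime)
    {
        var ticks = dueTime.UtcTicks - (dueTime.UtcTicks % ShardWidth.Ticks);
        return new DateTimeOffset(ticks, TimeSpan.Zero);
    }

    // Only shards whose window has started need to be loaded from storage.
    public static bool IsDue(DateTimeOffset shardStart, DateTimeOffset now)
        => shardStart <= now;
}
```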

In doing so, we made at least one trade-off:

  • Reminders cannot be queried on a per-grain basis.

We consider this trade-off acceptable since there needs to be some reminder-specific state stored alongside the grain state for exactly-once processing anyway (i.e., to indicate whether the reminder has been processed), and therefore the list of reminders can be stored in the grain's regular state, too (assuming querying is needed).


public override async Task<IJobShard> RegisterShard(SiloAddress siloAddress, DateTimeOffset minDueTime, DateTimeOffset maxDueTime, IDictionary<string, string> metadata, bool assignToCreator = true, CancellationToken cancellationToken = default)
Member


We should add tests to the test kit to ensure that behavior of RegisterShard / AssignShards is consistent across providers.


public override async Task<List<IJobShard>> AssignJobShardsAsync(SiloAddress siloAddress, DateTimeOffset maxShardStartTime, CancellationToken cancellationToken = default)
Member


This should be minimally greedy, like LeaseBasedQueueBalancer is, i.e., take up to 1 more shard than our fair share based on current membership.
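A policy along those lines could cap acquisitions at fair share plus one. A sketch (hypothetical helper, not the PR's or LeaseBasedQueueBalancer's actual code):

```csharp
// Sketch: cap the number of shards a silo will take at (fair share + 1),
// mirroring the minimally-greedy behavior of LeaseBasedQueueBalancer.
public static class ShardFairShare
{
    public static int MaxShardsToAcquire(int totalShards, int activeSilos, int currentlyHeld)
    {
        if (activeSilos <= 0) return 0;

        // Ceiling division: this silo's fair share of the outstanding shards.
        var fairShare = (totalShards + activeSilos - 1) / activeSilos;

        // Take at most one more than our fair share, minus what we already hold.
        return Math.Max(0, fairShare + 1 - currentlyHeld);
    }
}
```

For example, with 10 shards and 3 active silos, the fair share is 4, so a silo holding nothing would take at most 5; one already holding 5 would take none.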

{
await InitializeIfNeeded(cancellationToken);
LogAssigningShards(_logger, siloAddress, maxShardStartTime, _containerName);

Member


Rebalancing

  • We need to de-assign/rebalance shards if we have too many to avoid skew after a new deployment or upgrade.

Concurrent shard limit

  • We should limit the number of concurrently assigned shards per silo to prevent memory exhaustion.

Shard assignment slow start

  • We should consider performing a slow start for shard assignment, only reading a number of shards based on how long the silo has been up. This is important for disaster recovery scenarios, as we have seen with Azure ML.

Concurrent job slow start

  • We should gradually increase job concurrency (semaphore.Release) during startup until we hit our target. This helps to avoid starvation issues which can happen before things have warmed up (caches, connection pools, thread pool sizing, etc).
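The job slow-start idea could look something like this (a sketch under assumed names; SlowStartConcurrencyLimiter is hypothetical, not the PR's code):

```csharp
// Sketch of a job-concurrency slow start (hypothetical; not the PR's code).
// Start with a small number of semaphore permits and release more over time
// until the target concurrency is reached, giving caches, connection pools,
// and the thread pool time to warm up.
public sealed class SlowStartConcurrencyLimiter
{
    private readonly SemaphoreSlim _semaphore;
    private readonly int _targetLimit;
    private int _currentLimit;

    public SlowStartConcurrencyLimiter(int initialLimit, int targetLimit)
    {
        // Assumes 0 < initialLimit <= targetLimit.
        _currentLimit = initialLimit;
        _targetLimit = targetLimit;
        _semaphore = new SemaphoreSlim(initialLimit, targetLimit);
    }

    // Job workers acquire a permit before running and release it when done.
    public Task WaitAsync(CancellationToken ct) => _semaphore.WaitAsync(ct);
    public void Release() => _semaphore.Release();

    // Called periodically from a single timer to ramp up toward the target.
    public void RampUp(int step = 1)
    {
        var toAdd = Math.Min(step, _targetLimit - _currentLimit);
        if (toAdd > 0)
        {
            _currentLimit += toAdd;
            _semaphore.Release(toAdd);
        }
    }
}
```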

Member


This is the kind of thing we need before this is production ready, but the PR can be merged beforehand as long as we create tracking issues.

public IDictionary<string, string>? Metadata { get; protected set; }

/// <inheritdoc/>
public bool IsAddingCompleted { get; protected set; } = false;
Contributor


Suggested change:
- public bool IsAddingCompleted { get; protected set; } = false;
+ public bool IsAddingCompleted { get; protected set; }

public async IAsyncEnumerator<IScheduledJobContext> GetAsyncEnumerator(CancellationToken cancellationToken = default)
{
var timer = new PeriodicTimer(TimeSpan.FromSeconds(1));
while (true)
Contributor


I might be wrong here, but why are we processing 1 job/s here? If this is going to be the backbone of DurableTask, I can see quite a few scheduled jobs being due within a single second in high-throughput scenarios. If a bucket contains 60 jobs due at 10:00:00, the last job wouldn't be dispatched until a minute later.

I think we should dequeue the entire bucket as a single unit, and process all jobs within that bucket in a tight loop, yielding them immediately for execution. We should also support cancellation to interrupt the processing of a large bucket.

Something like this:

public async IAsyncEnumerator<IScheduledJobContext> GetAsyncEnumerator1(CancellationToken cancellationToken = default)
{
    using var timer = new PeriodicTimer(TimeSpan.FromSeconds(1));

    while (!cancellationToken.IsCancellationRequested)
    {
        JobBucket? dueBucket = null;

        lock (_syncLock)
        {
            if (_queue.TryPeek(out var nextBucket, out var dueTime) && dueTime <= DateTimeOffset.UtcNow)
            {
                _queue.Dequeue();
                _buckets.Remove(nextBucket.DueTime);

                dueBucket = nextBucket;
            }
        }

        if (dueBucket != null)
        {
            foreach (var (job, dequeueCount) in dueBucket.Jobs)
            {
                lock (_syncLock)
                {
                    _jobsIdToBucket.Remove(job.Id);
                }

                yield return new ScheduledJobContext(job, Guid.NewGuid().ToString(), dequeueCount + 1);

                if (cancellationToken.IsCancellationRequested)
                {
                    yield break;
                }
            }
        }
        else
        {
            lock (_syncLock)
            {
                if (_isComplete && _queue.Count == 0)
                {
                    yield break;
                }
            }

            try
            {
                await timer.WaitForNextTickAsync(cancellationToken).ConfigureAwait(false);
            }
            catch (OperationCanceledException)
            {
                break;
            }
        }
    }
}

cc @ReubenBond @benjaminpetit

Member


What makes the current implementation process 1 job/sec? I see it only waiting on the timer if it didn't find a job to process.

Member Author


Yes, that's true: we shouldn't wait 1s if we removed an empty bucket; we should only wait if there was no bucket at all.

Contributor


Yes, that's true: we shouldn't wait 1s if we removed an empty bucket; we should only wait if there was no bucket at all.

Exactly, it also keeps the lock contention at a minimum.

@ReubenBond ReubenBond changed the title Scheduled jobs (aka Reminders v2) Durable Jobs (aka Reminders v2) Nov 4, 2025
@ReubenBond ReubenBond mentioned this pull request Nov 4, 2025
@ReubenBond ReubenBond added this pull request to the merge queue Nov 4, 2025
@ReubenBond
Member

I opened #9750 with follow-up items.

Merged via the queue into dotnet:main with commit 7996989 Nov 4, 2025
29 checks passed
@github-actions github-actions bot locked and limited conversation to collaborators Dec 6, 2025