Durable Jobs (aka Reminders v2) #9717
Conversation
Pull Request Overview
This PR introduces a new scheduled jobs system (Reminders v2) that provides scalable, distributed job scheduling across Orleans silos. The implementation includes both in-memory and Azure Storage-backed job shard managers, allowing jobs to be scheduled for future execution and distributed across the cluster.
Key changes:
- New Orleans.ScheduledJobs core library with job scheduling, shard management, and execution capabilities
- Azure Storage provider for persistent job storage and shard coordination
- Test infrastructure and integration with the Orleans testing framework
Reviewed Changes
Copilot reviewed 25 out of 25 changed files in this pull request and generated 5 comments.
| File | Description |
|---|---|
| src/Orleans.ScheduledJobs/* | Core scheduled jobs library with job models, shard management, and local job execution |
| src/Azure/Orleans.ScheduledJobs.AzureStorage/* | Azure Storage implementation for persistent job shards and coordination |
| test/Grains/TestGrains/ScheduledJobGrain.cs | Test grain implementing scheduled job receiver interface |
| test/DefaultCluster.Tests/ScheduledJobTests.cs | Integration tests for scheduled job functionality |
| test/Extensions/TesterAzureUtils/ScheduledJobs/Class1.cs | Azure Storage job shard manager tests |
Hi guys
@zeinali-ali, in short we went with a new design rather than extending the existing reminder system, for the reasons below.

The current hash-based reminder system's storage layout in the most commonly used providers is optimized for range queries by grain hash code rather than by due time, and silos currently load reminders regardless of their due date. That layout also results in many individual storage operations, since the reminder service cannot assume a specific physical partitioning mechanism for the underlying storage. Some providers might be able to batch requests, but the most common ones cannot (and batching may be complex at this layer anyway).

There is high temporal locality for due times in most applications: reminders are typically scheduled at fixed offsets from the current time. Periods are generally very short, since Reminders does not guarantee that a given reminder tick will fire if there is a transient failure. A very common pattern is to set a reminder for the near future (1 min) with a short period (1 min) to ensure it quickly fires again in case it was missed; the grain then deletes the reminder once it has completed the request. Some applications set reminders for almost every request to ensure that some work is driven to completion, tolerating faults.

The new system is designed to optimize for this scenario, in which reminders are scheduled for the near future, fire once or a few times, and are then deleted. The design optimizes for this by organizing jobs by due time, so that silos only need to load work that is due soon. In doing so, we made at least one trade-off: the system does not support efficiently enumerating the reminders registered for a given grain. We consider this trade-off acceptable since there needs to be some reminder-specific state stored alongside the grain state for exactly-once processing anyway (i.e., to indicate that the reminder has or has not been processed), and therefore the list of reminders can be stored in the grain's regular state, too (assuming querying is needed).
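To make that pattern concrete, here is a rough sketch of the "due soon, short period, delete on completion" usage written against the existing Orleans reminder API; the grain, interface, and method names are made up for illustration and are not part of this PR:

using System;
using System.Threading.Tasks;
using Orleans;
using Orleans.Runtime;

// Hypothetical grain illustrating the common pattern described above:
// register a reminder due soon with a short period so a missed tick is
// retried, and unregister it once the work has completed.
public interface IOrderProcessingGrain : IGrainWithGuidKey
{
    Task EnsureProcessingScheduled();
}

public class OrderProcessingGrain : Grain, IOrderProcessingGrain, IRemindable
{
    private const string ReminderName = "process-order";

    public Task EnsureProcessingScheduled()
        // Due in ~1 minute, repeating every minute in case a tick is missed.
        => this.RegisterOrUpdateReminder(ReminderName, TimeSpan.FromMinutes(1), TimeSpan.FromMinutes(1));

    public async Task ReceiveReminder(string reminderName, TickStatus status)
    {
        if (reminderName != ReminderName) return;

        await ProcessOrderAsync(); // idempotent application work (placeholder)

        // Work is complete: delete the reminder so it stops firing.
        var reminder = await this.GetReminder(ReminderName);
        if (reminder is not null)
        {
            await this.UnregisterReminder(reminder);
        }
    }

    private Task ProcessOrderAsync() => Task.CompletedTask; // placeholder for real work
}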
public override async Task<IJobShard> RegisterShard(SiloAddress siloAddress, DateTimeOffset minDueTime, DateTimeOffset maxDueTime, IDictionary<string, string> metadata, bool assignToCreator = true, CancellationToken cancellationToken = default)
We should add tests to the test kit to ensure that the behavior of RegisterShard / AssignShards is consistent across providers.
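For illustration, such a provider-agnostic conformance test could look roughly like this; the JobShardManager base type, the CreateManagerAsync factory, and the Id property on IJobShard are assumptions for the sketch, not taken from this PR:

using System;
using System.Collections.Generic;
using System.Net;
using System.Threading.Tasks;
using Orleans.Runtime;
using Xunit;

// Sketch of a shared conformance test base class. Each provider (in-memory,
// Azure Storage, ...) would derive from it and supply its own factory.
public abstract class JobShardManagerConformanceTests
{
    // Assumed factory; the real test kit would construct the provider under test here.
    protected abstract Task<JobShardManager> CreateManagerAsync();

    [Fact]
    public async Task RegisteredShard_IsReturnedByAssignJobShards()
    {
        var manager = await CreateManagerAsync();
        var silo = SiloAddress.New(new IPEndPoint(IPAddress.Loopback, 11111), 1);
        var now = DateTimeOffset.UtcNow;

        var shard = await manager.RegisterShard(
            silo, minDueTime: now, maxDueTime: now.AddMinutes(5),
            metadata: new Dictionary<string, string>(), assignToCreator: false);

        var assigned = await manager.AssignJobShardsAsync(silo, maxShardStartTime: now.AddMinutes(10));

        // Assumes IJobShard exposes an Id that identifies the shard across calls.
        Assert.Contains(assigned, s => s.Id == shard.Id);
    }
}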
public override async Task<List<IJobShard>> AssignJobShardsAsync(SiloAddress siloAddress, DateTimeOffset maxShardStartTime, CancellationToken cancellationToken = default)
This should be minimally greedy, like LeaseBasedQueueBalancer is, i.e., take up to 1 more shard than our fair share based on current membership.
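For reference, a minimal sketch of that "fair share + 1" cap; the inputs (total shard count, active silo count, shards already owned) are assumed to come from membership and the shard manager, and none of these names are from the PR:

using System;

static class ShardAssignmentMath
{
    // Minimally greedy cap, mirroring LeaseBasedQueueBalancer: a silo takes at
    // most one shard more than its fair share of the total, minus what it already owns.
    public static int MaxShardsToTake(int totalShards, int activeSilos, int currentlyOwned)
    {
        if (activeSilos <= 0 || totalShards <= 0) return 0;

        var fairShare = (int)Math.Ceiling((double)totalShards / activeSilos);
        var target = fairShare + 1;                // up to 1 more than our fair share
        return Math.Max(0, target - currentlyOwned);
    }
}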
{
    await InitializeIfNeeded(cancellationToken);
    LogAssigningShards(_logger, siloAddress, maxShardStartTime, _containerName);
Rebalancing
- We need to de-assign/rebalance shards if we have too many to avoid skew after a new deployment or upgrade.
Concurrent shard limit
- We should limit the number of concurrently assigned shards per silo to prevent memory exhaustion.
Shard assignment slow start
- We should consider performing a slow start for shard assignment, only reading a number of shards based on how long the silo has been up (see the sketch after this list). This is important for disaster recovery scenarios, as we have seen with Azure ML.
Concurrent job slow start
- We should gradually increase job concurrency (semaphore.Release) during startup until we hit our target. This helps to avoid starvation issues which can happen before things have warmed up (caches, connection pools, thread pool sizing, etc).
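A rough sketch of the uptime-based slow start mentioned above; the ramp-up window, the cap, and the method name are illustrative assumptions, not code from this PR:

using System;

static class ShardSlowStart
{
    // The longer the silo has been up, the more shards it may take in one
    // assignment round, up to the configured hard cap.
    public static int ShardLimit(TimeSpan siloUptime, int hardCap, TimeSpan? rampUp = null)
    {
        var window = rampUp ?? TimeSpan.FromMinutes(5);    // assumed ramp-up window
        if (siloUptime >= window) return hardCap;

        var fraction = siloUptime.TotalSeconds / window.TotalSeconds;
        return Math.Max(1, (int)(hardCap * fraction));     // always allow at least one shard
    }
}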
This is the kind of thing we need before this is production-ready, but the PR can be merged beforehand as long as we create tracking issues.
public IDictionary<string, string>? Metadata { get; protected set; }

/// <inheritdoc/>
public bool IsAddingCompleted { get; protected set; } = false;
Suggested change: drop the redundant initializer (false is already the default for bool):
- public bool IsAddingCompleted { get; protected set; } = false;
+ public bool IsAddingCompleted { get; protected set; }
public async IAsyncEnumerator<IScheduledJobContext> GetAsyncEnumerator(CancellationToken cancellationToken = default)
{
    var timer = new PeriodicTimer(TimeSpan.FromSeconds(1));
    while (true)
I might be wrong here, but why are we processing 1 job/s here? If this is going to be the backbone of DurableTask, I can see quite a few scheduled jobs coming due within a second in high-throughput scenarios. If a bucket contains 60 jobs due at 10:00:00, the last job wouldn't be dispatched until 1 min later.
I think we should dequeue the entire bucket as a single unit, and process all jobs within that bucket in a tight loop, yielding them immediately for execution. We should also support cancellation to interrupt the processing of a large bucket.
Something like this:
public async IAsyncEnumerator<IScheduledJobContext> GetAsyncEnumerator1(CancellationToken cancellationToken = default)
{
    using var timer = new PeriodicTimer(TimeSpan.FromSeconds(1));
    while (!cancellationToken.IsCancellationRequested)
    {
        // Dequeue the next due bucket (if any) while holding the lock.
        JobBucket? dueBucket = null;
        lock (_syncLock)
        {
            if (_queue.TryPeek(out var nextBucket, out var dueTime) && dueTime <= DateTimeOffset.UtcNow)
            {
                _queue.Dequeue();
                _buckets.Remove(nextBucket.DueTime);
                dueBucket = nextBucket;
            }
        }

        if (dueBucket != null)
        {
            // Yield every job in the bucket in a tight loop, without waiting on the timer.
            foreach (var (job, dequeueCount) in dueBucket.Jobs)
            {
                lock (_syncLock)
                {
                    _jobsIdToBucket.Remove(job.Id);
                }

                yield return new ScheduledJobContext(job, Guid.NewGuid().ToString(), dequeueCount + 1);

                if (cancellationToken.IsCancellationRequested)
                {
                    yield break;
                }
            }
        }
        else
        {
            // No due bucket: exit if the shard is complete and drained, otherwise wait for the next tick.
            lock (_syncLock)
            {
                if (_isComplete && _queue.Count == 0)
                {
                    yield break;
                }
            }

            try
            {
                await timer.WaitForNextTickAsync(cancellationToken).ConfigureAwait(false);
            }
            catch (OperationCanceledException)
            {
                break;
            }
        }
    }
}
What makes the current implementation process 1 job/sec? I see it only waiting on the timer if it didn't find a job to process.
Yes, that's true; we shouldn't wait 1s if we removed an empty bucket, we should wait only if we didn't have any bucket instead.
Exactly, it also keeps the lock contention at a minimum.
I opened #9750 with follow-up items.
This is a work-in-progress and lays the groundwork for scalable, distributed job scheduling across Orleans silos.
Closes: #9718