Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
89 commits
Select commit Hold shift + click to select a range
451fad0
Scheduled jobs wip
benjaminpetit Oct 10, 2025
4b0672f
Address comments
benjaminpetit Oct 14, 2025
e75606e
Add README
benjaminpetit Oct 14, 2025
0039ab1
Remove ShardId from interface
benjaminpetit Oct 14, 2025
ba4dc2c
Split extensions
benjaminpetit Oct 14, 2025
1df4b02
Add metadata
benjaminpetit Oct 16, 2025
cb4edde
Decouple extension
benjaminpetit Oct 16, 2025
6c5a85e
More robust test
benjaminpetit Oct 16, 2025
e8871b0
Move context
benjaminpetit Oct 17, 2025
cb39739
Retry wip
benjaminpetit Oct 17, 2025
b7f36dd
Listen to cluster changes
benjaminpetit Oct 23, 2025
2ee5990
wip
benjaminpetit Oct 23, 2025
5b28fce
Add metadata
benjaminpetit Oct 23, 2025
7dfcfd9
Add metadata test
benjaminpetit Oct 23, 2025
5cb8609
Add cancellation test
benjaminpetit Oct 23, 2025
5329033
Add comments
benjaminpetit Oct 23, 2025
45df0eb
Mark test as skippable
benjaminpetit Oct 24, 2025
281f800
Add logging to AzureStorageJobShardManager
benjaminpetit Oct 24, 2025
9be56be
Plumb job cancellation
benjaminpetit Oct 24, 2025
7194243
More consistent naming
benjaminpetit Oct 24, 2025
0643e8c
Fix retry
benjaminpetit Oct 24, 2025
4545d02
Add comments to InMemoryJobQueue and some unit tests
benjaminpetit Oct 24, 2025
3a8807c
Add tests and fix AzureStorageJobShardManager
benjaminpetit Oct 24, 2025
4248b9e
Enable basic parrallelism limit on shard processing
benjaminpetit Oct 24, 2025
72b9275
keep track of running shards
benjaminpetit Oct 24, 2025
79d6ac5
Add logs and comments to LocalScheduledJobManager
benjaminpetit Oct 24, 2025
b576c61
Merge remote-tracking branch 'dotnet/main' into wip/scheduled-jobs
benjaminpetit Oct 24, 2025
2008fcc
Add READMEs
benjaminpetit Oct 24, 2025
2964e3a
Add READMEs
benjaminpetit Oct 28, 2025
de69620
Update src/Orleans.ScheduledJobs/Hosting/ScheduledJobsOptions.cs
ReubenBond Oct 28, 2025
026caaa
Update src/Orleans.ScheduledJobs/InMemoryJobQueue.cs
ReubenBond Oct 28, 2025
e90f6b8
Apply suggestions from code review
benjaminpetit Oct 29, 2025
fc10282
Add Cancellation tokens to LocalScheduledJobManager
benjaminpetit Oct 30, 2025
8b8a416
Remove default values in JobShard
benjaminpetit Oct 30, 2025
d09874c
Switch from line-based format to netstring format for blob operations
benjaminpetit Oct 30, 2025
7ed5369
Refactor JobShard
benjaminpetit Oct 30, 2025
e3a021b
Add membership version in AzureStorageJobShard
benjaminpetit Oct 30, 2025
98fa1ca
Refactor storage operations to use Channel for single-threaded writes…
benjaminpetit Oct 30, 2025
2091124
Refactor job execution to run concurrently
benjaminpetit Oct 30, 2025
f829bd9
Split LocalScheduledJobManager into separate files for better readabi…
benjaminpetit Oct 30, 2025
616e488
Move ILocalScheduledJobManager interface to its own file
benjaminpetit Oct 30, 2025
c5d41da
Merge branch 'wip/scheduled-jobs' of https://github.com/benjaminpetit…
benjaminpetit Oct 30, 2025
cee9969
Refactor job scheduling methods to use TryScheduleJobAsync for improv…
benjaminpetit Oct 30, 2025
e412923
Very simplistic concurrency control on shard creation
benjaminpetit Oct 30, 2025
bedf374
Enhance IScheduledJobReceiverExtension with improved logging and docu…
benjaminpetit Oct 30, 2025
73cba74
Update ScheduledJobTests and InMemoryJobQueueTests to use UtcNow for …
benjaminpetit Oct 30, 2025
4ee03f3
Refactor shard management to simplify task handling and ensure proper…
benjaminpetit Oct 30, 2025
ede6b13
Refactor AssignJobShardsAsync to improve owner/creator status checks
benjaminpetit Oct 30, 2025
329f75d
Clarify shard start-time naming and logging; update defaults and proj…
benjaminpetit Oct 30, 2025
2bb5172
Fix netstring encoding/delimiter and format MembershipVersion invaria…
benjaminpetit Oct 30, 2025
9ca1cb4
Scale default MaxConcurrentJobsPerSilo by CPU count
benjaminpetit Oct 30, 2025
c440fae
Rename GetJobCount to GetJobCountAsync and IsComplete to IsAddingComp…
benjaminpetit Oct 30, 2025
c2f4c70
Return removal result from CancelJob and update tests
benjaminpetit Oct 30, 2025
4f68cc0
Return removal result from RemoveJobAsync and update cancel flow & tests
benjaminpetit Oct 30, 2025
d4c5cd8
Consolidate scheduling API: merge ScheduleJobWithMetadataAsync into S…
benjaminpetit Oct 30, 2025
043fffd
Add cross-grain scheduling test and SchedulerGrain; make ScheduleJobA…
benjaminpetit Oct 30, 2025
db44fad
Add job retry test and RetryTestGrain
benjaminpetit Oct 30, 2025
c5498d6
Extract scheduled job test logic into ScheduledJobTestsRunner and add…
benjaminpetit Oct 30, 2025
069fe77
Add Azure Blob Storage hosting integration for scheduled jobs
benjaminpetit Oct 30, 2025
c6e8cc8
Add AzureStorageScheduledJobTests to run scheduled job tests against …
benjaminpetit Oct 30, 2025
6a05384
Remove stuff
benjaminpetit Oct 30, 2025
f72d4c9
Remove stuff
benjaminpetit Oct 30, 2025
156659a
Add CancellationToken to IClusterMembershipService.Refresh and propag…
benjaminpetit Oct 31, 2025
560aae9
Add blob-prefix support to AzureStorageJobShardManager and update tests
benjaminpetit Oct 31, 2025
29a0dec
Replace TaskCanceledException with OperationCanceledException
benjaminpetit Oct 31, 2025
e23e1ac
Cancel pending storage operations on shutdown
benjaminpetit Oct 31, 2025
5b00a75
Use MembershipVersion.Value for blob metadata; rename/add InMemorySch…
benjaminpetit Oct 31, 2025
173921a
Rename LocalScheduledJobManager system target grain type to "job-mana…
benjaminpetit Oct 31, 2025
1413e15
Replace IScheduledJob interface with ScheduledJob concrete type
benjaminpetit Oct 31, 2025
10e0565
Fix shard bucketing, correct shard assignment window, and add options…
benjaminpetit Oct 31, 2025
7a5a526
Gracefully stop shard storage processor; fix caching and metadata par…
benjaminpetit Oct 31, 2025
7446b8d
Dispose newly-created shard on ownership conflict; remove unused blob…
benjaminpetit Oct 31, 2025
5199685
Remove unused Microsoft.CodeAnalysis using; clarify shard-too-new com…
benjaminpetit Oct 31, 2025
12d694e
Introduce NetstringEncoder utility and tests; use it in AzureStorageJ…
benjaminpetit Oct 31, 2025
0655f6a
Fix bad rename
benjaminpetit Oct 31, 2025
fc15a75
Replace NetstringEncoder with NetstringJsonSerializer; update usages …
benjaminpetit Oct 31, 2025
4ac0c41
Make NetstringJsonSerializer stream-based with pooled buffers; update…
benjaminpetit Nov 3, 2025
fc6ba4d
Batch Azure Storage append operations; add batching options and tests
benjaminpetit Nov 3, 2025
5b9ba88
Rename RegisterShard -> CreateShardAsync and simplify ownership behavior
benjaminpetit Nov 3, 2025
fa3eb54
Make InMemoryJobQueue synchronization and enumeration safer
benjaminpetit Nov 4, 2025
bc26774
Add periodic shard checker and shard activation buffer option
benjaminpetit Nov 4, 2025
46fe7d0
Centralize shard lifecycle in manager; add periodic shard checker and…
benjaminpetit Nov 4, 2025
8ba69f4
Unify StorageOperation completion handling and make shutdown cancellable
benjaminpetit Nov 4, 2025
e20e7a8
Remove Creator metadata and simplify ownership/ID handling; add blob-…
benjaminpetit Nov 4, 2025
9e42a9b
Add structured logging to AzureStorageJobShard and propagate ILoggerF…
benjaminpetit Nov 4, 2025
00a772e
Make InMemoryJobQueue internal, add InternalsVisibleTo
benjaminpetit Nov 4, 2025
e07538f
Drop explicit '= false' initializer for IsAddingCompleted property in…
benjaminpetit Nov 4, 2025
5c781dc
wip
benjaminpetit Nov 4, 2025
c0f937b
Split test into a runner for reusability; make them pass faster; make
benjaminpetit Nov 4, 2025
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 4 additions & 2 deletions Orleans.slnx
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@
<Project Path="src/Orleans.Persistence.Memory/Orleans.Persistence.Memory.csproj" />
<Project Path="src/Orleans.Reminders/Orleans.Reminders.csproj" />
<Project Path="src/Orleans.Runtime/Orleans.Runtime.csproj" />
<Project Path="src/Orleans.ScheduledJobs/Orleans.ScheduledJobs.csproj" Id="ab4cc723-b3cf-4c32-be4c-1a180a49797c" />
<Project Path="src/Orleans.Sdk/Orleans.Sdk.csproj" />
<Project Path="src/Orleans.Serialization.Abstractions/Orleans.Serialization.Abstractions.csproj" />
<Project Path="src/Orleans.Serialization.FSharp/Orleans.Serialization.FSharp.csproj" />
Expand Down Expand Up @@ -76,6 +77,7 @@
<Project Path="src/Azure/Orleans.Persistence.Cosmos/Orleans.Persistence.Cosmos.csproj" />
<Project Path="src/Azure/Orleans.Reminders.AzureStorage/Orleans.Reminders.AzureStorage.csproj" />
<Project Path="src/Azure/Orleans.Reminders.Cosmos/Orleans.Reminders.Cosmos.csproj" />
<Project Path="src/Azure/Orleans.ScheduledJobs.AzureStorage/Orleans.ScheduledJobs.AzureStorage.csproj" Id="5bbb1058-9ab2-44e9-b237-49f5b6bee151" />
<Project Path="src/Azure/Orleans.Streaming.AzureStorage/Orleans.Streaming.AzureStorage.csproj" />
<Project Path="src/Azure/Orleans.Streaming.EventHubs/Orleans.Streaming.EventHubs.csproj" />
<Project Path="src/Azure/Orleans.Transactions.AzureStorage/Orleans.Transactions.AzureStorage.csproj" />
Expand All @@ -84,7 +86,7 @@
<Project Path="src/Cassandra/Orleans.Clustering.Cassandra/Orleans.Clustering.Cassandra.csproj" />
</Folder>
<Folder Name="/src/Extensions/NATS/">
<Project Path="src\Orleans.Streaming.NATS\Orleans.Streaming.NATS.csproj" Type="Classic C#" />
<Project Path="src\Orleans.Streaming.NATS\Orleans.Streaming.NATS.csproj" />
</Folder>
<Folder Name="/src/Extensions/Redis/">
<Project Path="src/Redis/Orleans.Clustering.Redis/Orleans.Clustering.Redis.csproj" />
Expand Down Expand Up @@ -133,7 +135,7 @@
<Project Path="test/Extensions/TesterAdoNet/Tester.AdoNet.csproj" />
<Project Path="test/Extensions/TesterAzureUtils/Tester.AzureUtils.csproj" />
<Project Path="test/Extensions/TesterZooKeeperUtils/Tester.ZooKeeperUtils.csproj" />
<Project Path="test\Extensions\NATS.Tests\NATS.Tests.csproj" Type="Classic C#" />
<Project Path="test\Extensions\NATS.Tests\NATS.Tests.csproj" />
</Folder>
<Folder Name="/test/Grains/">
<Project Path="test/Grains/TestFSharp/TestFSharp.fsproj" />
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,109 @@
using System;
using Microsoft.Extensions.Logging;

namespace Orleans.ScheduledJobs.AzureStorage;

internal sealed partial class AzureStorageJobShard
{
[LoggerMessage(
Level = LogLevel.Information,
Message = "Initializing shard '{ShardId}' from Azure Storage blob"
)]
private static partial void LogInitializingShard(ILogger logger, string shardId);

[LoggerMessage(
Level = LogLevel.Information,
Message = "Shard '{ShardId}' initialized successfully. Loaded {JobCount} job(s) in {ElapsedMilliseconds}ms"
)]
private static partial void LogShardInitialized(ILogger logger, string shardId, int jobCount, long elapsedMilliseconds);

[LoggerMessage(
Level = LogLevel.Debug,
Message = "Adding job '{JobId}' (Name: '{JobName}') to shard '{ShardId}' with due time {DueTime}"
)]
private static partial void LogAddingJob(ILogger logger, string jobId, string jobName, string shardId, DateTimeOffset dueTime);

[LoggerMessage(
Level = LogLevel.Debug,
Message = "Removing job '{JobId}' from shard '{ShardId}'"
)]
private static partial void LogRemovingJob(ILogger logger, string jobId, string shardId);

[LoggerMessage(
Level = LogLevel.Debug,
Message = "Retrying job '{JobId}' in shard '{ShardId}' with new due time {NewDueTime}"
)]
private static partial void LogRetryingJob(ILogger logger, string jobId, string shardId, DateTimeOffset newDueTime);

[LoggerMessage(
Level = LogLevel.Trace,
Message = "Flushing batch of {OperationCount} job operation(s) to shard '{ShardId}'"
)]
private static partial void LogFlushingBatch(ILogger logger, int operationCount, string shardId);

[LoggerMessage(
Level = LogLevel.Debug,
Message = "Batch of {OperationCount} job operation(s) written to shard '{ShardId}' in {ElapsedMilliseconds}ms. Total committed blocks: {CommittedBlockCount}"
)]
private static partial void LogBatchWritten(ILogger logger, int operationCount, string shardId, long elapsedMilliseconds, int committedBlockCount);

[LoggerMessage(
Level = LogLevel.Trace,
Message = "Updating metadata for shard '{ShardId}'"
)]
private static partial void LogUpdatingMetadata(ILogger logger, string shardId);

[LoggerMessage(
Level = LogLevel.Debug,
Message = "Metadata updated for shard '{ShardId}'"
)]
private static partial void LogMetadataUpdated(ILogger logger, string shardId);

[LoggerMessage(
Level = LogLevel.Warning,
Message = "Shard '{ShardId}' has {CommittedBlockCount} committed blocks, approaching Azure Blob append limit of 50,000"
)]
private static partial void LogApproachingBlockLimit(ILogger logger, string shardId, int committedBlockCount);

[LoggerMessage(
Level = LogLevel.Warning,
Message = "Large batch detected for shard '{ShardId}': {OperationCount} operations (max configured: {MaxBatchSize})"
)]
private static partial void LogLargeBatch(ILogger logger, string shardId, int operationCount, int maxBatchSize);

[LoggerMessage(
Level = LogLevel.Error,
Message = "Error writing batch of {OperationCount} operation(s) to shard '{ShardId}'"
)]
private static partial void LogErrorWritingBatch(ILogger logger, Exception exception, int operationCount, string shardId);

[LoggerMessage(
Level = LogLevel.Error,
Message = "Error updating metadata for shard '{ShardId}'"
)]
private static partial void LogErrorUpdatingMetadata(ILogger logger, Exception exception, string shardId);

[LoggerMessage(
Level = LogLevel.Debug,
Message = "Stopping storage processor for shard '{ShardId}'"
)]
private static partial void LogStoppingProcessor(ILogger logger, string shardId);

[LoggerMessage(
Level = LogLevel.Information,
Message = "Storage processor stopped for shard '{ShardId}'"
)]
private static partial void LogProcessorStopped(ILogger logger, string shardId);

[LoggerMessage(
Level = LogLevel.Trace,
Message = "Processing storage operation queue for shard '{ShardId}'"
)]
private static partial void LogProcessingStorageQueue(ILogger logger, string shardId);

[LoggerMessage(
Level = LogLevel.Debug,
Message = "Waiting for additional operations to batch (current size: {CurrentSize}, min size: {MinSize}) for shard '{ShardId}'"
)]
private static partial void LogWaitingForBatch(ILogger logger, int currentSize, int minSize, string shardId);
}
Loading