Skip to content

Commit 0e24a43

Browse files
authored
feat: add workflow client updater for updating workflow client (#258)
1 parent 1f6676b commit 0e24a43

File tree

8 files changed

+287
-80
lines changed

8 files changed

+287
-80
lines changed

src/Temporalio.Extensions.Hosting/README.md

Lines changed: 45 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -99,4 +99,48 @@ start instead of expecting a `ITemporalClient` to be present on the service coll
9999
Some users may prefer to manually create the `TemporalWorker` without using host support, but still make their
100100
activities created via the service provider. `CreateTemporalActivityDefinitions` extension methods are present on
101101
`IServiceProvider` that will return a collection of `ActivityDefinition` instances for each activity on the type. These
102-
can be added to the `TemporalWorkerOptions` directly.
102+
can be added to the `TemporalWorkerOptions` directly.
103+
104+
## Worker Client Refresh
105+
106+
Some users may need to update the worker's connection to Temporal. It's desirable to do this without stopping the worker entirely, as that will evict the sticky workflow cache.
107+
108+
This can be done by using the `IWorkerClientUpdater`.
109+
110+
```csharp
111+
using Temporalio.Extensions.Hosting;
112+
113+
var builder = Host.CreateApplicationBuilder(args);
114+
115+
// Register a worker client updater.
116+
builder.Services.AddSingleton<TemporalWorkerClientUpdater>();
117+
118+
// Add a hosted Temporal worker which returns a builder to add activities and workflows, along with the worker client updater.
119+
builder.Services.
120+
AddHostedTemporalWorker(
121+
"my-temporal-host:7233",
122+
"my-namespace",
123+
"my-task-queue").
124+
AddScopedActivities<MyActivityClass>().
125+
AddWorkflow<MyWorkflow>().
126+
ConfigureOptions().
127+
Configure<TemporalWorkerClientUpdater>((options, workerClientUpdater) => options.WorkerClientUpdater = workerClientUpdater);
128+
129+
var host = builder.Build();
130+
131+
// You can have a BackgroundService periodically refresh the worker client like this.
132+
TemporalWorkerClientUpdater workerClientUpdater = host.Services.GetRequiredService<TemporalWorkerClientUpdater>();
133+
134+
// Can update the TLS options if you need.
135+
TemporalClientConnectOptions clientConnectOptions = new("my-other-temporal-host:7233")
136+
{
137+
Namespace = "default"
138+
};
139+
140+
ITemporalClient updatedClient = await TemporalClient.ConnectAsync(clientConnectOptions).ConfigureAwait(false);
141+
142+
workerClientUpdater.UpdateClient(updatedClient);
143+
144+
// Make sure you use RunAsync and not Run, see https://github.com/temporalio/sdk-dotnet/issues/220
145+
await host.RunAsync();
146+
```
Lines changed: 48 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,48 @@
1+
using System;
2+
using Temporalio.Worker;
3+
4+
namespace Temporalio.Extensions.Hosting
5+
{
6+
/// <summary>
7+
/// Notification hub that can be used to push Temporal worker client updates to subscribing Temporal workers.
8+
/// </summary>
9+
public class TemporalWorkerClientUpdater
10+
{
11+
private readonly object clientLock = new();
12+
13+
private event EventHandler<IWorkerClient>? OnClientUpdatedEvent;
14+
15+
/// <summary>
16+
/// Dispatches a notification to all subscribers that a new worker client should be used.
17+
/// </summary>
18+
/// <param name="client">The new <see cref="IWorkerClient"/> that should be pushed out to all subscribing workers.</param>
19+
public void UpdateClient(IWorkerClient client)
20+
{
21+
OnClientUpdatedEvent?.Invoke(this, client);
22+
}
23+
24+
/// <summary>
25+
/// Adds a new subscriber that will be notified when a new worker client should be used.
26+
/// </summary>
27+
/// <param name="eventHandler">The event handler to add to the event listeners.</param>
28+
internal void Subscribe(EventHandler<IWorkerClient> eventHandler)
29+
{
30+
lock (clientLock)
31+
{
32+
OnClientUpdatedEvent += eventHandler;
33+
}
34+
}
35+
36+
/// <summary>
37+
/// Removes an existing subscriber from receiving notifications when a new worker client should be used.
38+
/// </summary>
39+
/// <param name="eventHandler">The event handler to remove from the event listeners.</param>
40+
internal void Unsubscribe(EventHandler<IWorkerClient> eventHandler)
41+
{
42+
lock (clientLock)
43+
{
44+
OnClientUpdatedEvent -= eventHandler;
45+
}
46+
}
47+
}
48+
}

src/Temporalio.Extensions.Hosting/TemporalWorkerService.cs

Lines changed: 34 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -15,10 +15,11 @@ namespace Temporalio.Extensions.Hosting
1515
/// </summary>
1616
public class TemporalWorkerService : BackgroundService
1717
{
18-
// These two are mutually exclusive
18+
// These two (newClientOptions and existingClient) are mutually exclusive
1919
private readonly TemporalClientConnectOptions? newClientOptions;
2020
private readonly ITemporalClient? existingClient;
2121
private readonly TemporalWorkerOptions workerOptions;
22+
private readonly TemporalWorkerClientUpdater? workerClientUpdater;
2223

2324
/// <summary>
2425
/// Initializes a new instance of the <see cref="TemporalWorkerService"/> class using
@@ -30,8 +31,11 @@ public class TemporalWorkerService : BackgroundService
3031
/// <param name="options">Options to use to create the worker service.</param>
3132
public TemporalWorkerService(TemporalWorkerServiceOptions options)
3233
{
33-
newClientOptions = options.ClientOptions ?? throw new ArgumentException(
34-
"Client options is required", nameof(options));
34+
if (options.ClientOptions == null)
35+
{
36+
throw new ArgumentException("Client options is required", nameof(options));
37+
}
38+
3539
workerOptions = options;
3640
}
3741

@@ -156,6 +160,11 @@ public TemporalWorkerService(
156160
if (newClientOptions != null && workerOptions.LoggerFactory != null)
157161
{
158162
newClientOptions.LoggerFactory = workerOptions.LoggerFactory;
163+
}
164+
165+
if (options.WorkerClientUpdater != null)
166+
{
167+
this.workerClientUpdater = options.WorkerClientUpdater;
159168
}
160169
}
161170

@@ -166,7 +175,28 @@ protected override async Task ExecuteAsync(CancellationToken stoppingToken)
166175
// Call connect just in case it was a lazy client (no-op if already connected)
167176
await client.Connection.ConnectAsync().ConfigureAwait(false);
168177
using var worker = new TemporalWorker(client, workerOptions);
169-
await worker.ExecuteAsync(stoppingToken).ConfigureAwait(false);
178+
179+
if (workerClientUpdater != null)
180+
{
181+
void SubscribeToClientUpdates(object? sender, IWorkerClient updatedClient)
182+
{
183+
worker!.Client = updatedClient;
184+
}
185+
186+
try
187+
{
188+
workerClientUpdater.Subscribe(SubscribeToClientUpdates);
189+
await worker.ExecuteAsync(stoppingToken).ConfigureAwait(false);
190+
}
191+
finally
192+
{
193+
workerClientUpdater.Unsubscribe(SubscribeToClientUpdates);
194+
}
195+
}
196+
else
197+
{
198+
await worker.ExecuteAsync(stoppingToken).ConfigureAwait(false);
199+
}
170200
}
171201
}
172202
}

src/Temporalio.Extensions.Hosting/TemporalWorkerServiceOptions.cs

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -31,6 +31,12 @@ public TemporalWorkerServiceOptions(string taskQueue)
3131
/// </summary>
3232
public TemporalClientConnectOptions? ClientOptions { get; set; }
3333

34+
/// <summary>
35+
/// Gets or sets the <see cref="TemporalWorkerClientUpdater"/> that can be used to push Temporal worker client updates to the underlying <see cref="TemporalWorker"/>.
36+
/// If not set, the worker service will not be updateable with a new Temporal worker client.
37+
/// </summary>
38+
public TemporalWorkerClientUpdater? WorkerClientUpdater { get; set; }
39+
3440
/// <inheritdoc />
3541
public override object Clone()
3642
{
@@ -39,6 +45,7 @@ public override object Clone()
3945
{
4046
options.ClientOptions = (TemporalClientConnectOptions)ClientOptions.Clone();
4147
}
48+
4249
return options;
4350
}
4451

src/Temporalio/Worker/TemporalWorker.cs

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -108,7 +108,7 @@ public TemporalWorker(IWorkerClient client, TemporalWorkerOptions options)
108108
/// </summary>
109109
/// <remarks>
110110
/// When this property is set, it actually replaces the underlying client that is being used
111-
/// by the worker. This means the next calls by the worker to Temporal (e.g. responding
111+
/// by the worker. This means subsequent calls by the worker to Temporal (e.g. responding
112112
/// task completion, activity heartbeat, etc) will be on this new client, but outstanding
113113
/// calls will not be immediately interrupted.
114114
/// </remarks>
@@ -255,6 +255,7 @@ protected virtual void Dispose(bool disposing)
255255
{
256256
activityWorker?.Dispose();
257257
BridgeWorker.Dispose();
258+
258259
// Remove task tracing if not disabled and there are workflows present
259260
if (workflowTracingEventListenerEnabled)
260261
{

tests/Temporalio.Tests/AssertMore.cs

Lines changed: 56 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,11 +1,67 @@
11
using System.Text;
22
using System.Text.Json;
3+
using Temporalio.Api.History.V1;
4+
using Temporalio.Client;
35
using Xunit;
46

57
namespace Temporalio.Tests
68
{
79
public static class AssertMore
810
{
11+
public static Task TaskFailureEventuallyAsync(WorkflowHandle handle, Action<WorkflowTaskFailedEventAttributes> assert)
12+
{
13+
return AssertMore.EventuallyAsync(async () =>
14+
{
15+
WorkflowTaskFailedEventAttributes? attrs = null;
16+
await foreach (var evt in handle.FetchHistoryEventsAsync())
17+
{
18+
if (evt.WorkflowTaskFailedEventAttributes != null)
19+
{
20+
attrs = evt.WorkflowTaskFailedEventAttributes;
21+
}
22+
}
23+
Assert.NotNull(attrs);
24+
assert(attrs!);
25+
});
26+
}
27+
28+
public static Task StartedEventuallyAsync(WorkflowHandle handle)
29+
{
30+
return HasEventEventuallyAsync(handle, e => e.WorkflowExecutionStartedEventAttributes != null);
31+
}
32+
33+
public static async Task ChildStartedEventuallyAsync(WorkflowHandle handle)
34+
{
35+
// Wait for started
36+
string? childId = null;
37+
await HasEventEventuallyAsync(
38+
handle,
39+
e =>
40+
{
41+
childId = e.ChildWorkflowExecutionStartedEventAttributes?.WorkflowExecution?.WorkflowId;
42+
return childId != null;
43+
});
44+
// Check that a workflow task has completed proving child has really started
45+
await HasEventEventuallyAsync(
46+
handle.Client.GetWorkflowHandle(childId!),
47+
e => e.WorkflowTaskCompletedEventAttributes != null);
48+
}
49+
50+
public static Task HasEventEventuallyAsync(WorkflowHandle handle, Func<HistoryEvent, bool> predicate)
51+
{
52+
return EventuallyAsync(async () =>
53+
{
54+
await foreach (var evt in handle.FetchHistoryEventsAsync())
55+
{
56+
if (predicate(evt))
57+
{
58+
return;
59+
}
60+
}
61+
Assert.Fail("Event not found");
62+
});
63+
}
64+
965
public static Task EventuallyAsync(
1066
Func<Task> func, TimeSpan? interval = null, int iterations = 15) =>
1167
EventuallyAsync(

tests/Temporalio.Tests/Extensions/Hosting/TemporalWorkerServiceTests.cs

Lines changed: 83 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -79,11 +79,11 @@ public async Task TemporalWorkerService_ExecuteAsync_SimpleWorker()
7979
// Add the rest of the services
8080
services.
8181
AddSingleton<ILoggerFactory>(loggerFactory).
82-
AddScoped<DatabaseClient>().
83-
// We are also adding the DB client as a keyed service to demonstrate keyed service
84-
// support for our DI logic. This used to break because newer DI library versions
85-
// disallowed accessing certain properties on keyed services which we access
86-
// internally for dupe checks.
82+
AddScoped<DatabaseClient>().
83+
// We are also adding the DB client as a keyed service to demonstrate keyed service
84+
// support for our DI logic. This used to break because newer DI library versions
85+
// disallowed accessing certain properties on keyed services which we access
86+
// internally for dupe checks.
8787
AddKeyedScoped<DatabaseClient>("client-keyed").
8888
AddHostedTemporalWorker(taskQueue).
8989
AddScopedActivities<DatabaseActivities>().
@@ -206,6 +206,84 @@ public async Task TemporalWorkerService_ExecuteAsync_MultipleWorkers()
206206
["scoped-other2"] = $"tq: {taskQueue2}, counter: 6",
207207
},
208208
result);
209+
}
210+
211+
[Workflow]
212+
public class TickingWorkflow
213+
{
214+
[WorkflowRun]
215+
public async Task RunAsync()
216+
{
217+
// Just tick every 100ms for 10s
218+
for (var i = 0; i < 100; i++)
219+
{
220+
await Workflow.DelayAsync(100);
221+
}
222+
}
223+
}
224+
225+
[Fact]
226+
public async Task TemporalWorkerService_WorkerClientReplacement_UsesNewClient()
227+
{
228+
// We are going to start a second ephemeral server and then replace the client. So we will
229+
// start a no-cache ticking workflow with the current client and confirm it has accomplished
230+
// at least one task. Then we will start another on the other client, and confirm it gets
231+
// started too. Then we will terminate both. We have to use a ticking workflow with only one
232+
// poller to force a quick re-poll to recognize our client change quickly (as opposed to
233+
// just waiting the minute for poll timeout).
234+
await using var otherEnv = await Temporalio.Testing.WorkflowEnvironment.StartLocalAsync();
235+
236+
// Start both workflows on different servers
237+
var taskQueue = $"tq-{Guid.NewGuid()}";
238+
var handle1 = await Client.StartWorkflowAsync(
239+
(TickingWorkflow wf) => wf.RunAsync(),
240+
new(id: $"workflow-{Guid.NewGuid()}", taskQueue));
241+
var handle2 = await otherEnv.Client.StartWorkflowAsync(
242+
(TickingWorkflow wf) => wf.RunAsync(),
243+
new(id: $"workflow-{Guid.NewGuid()}", taskQueue));
244+
245+
var bld = Host.CreateApplicationBuilder();
246+
247+
TemporalWorkerClientUpdater workerClientUpdater = new TemporalWorkerClientUpdater();
248+
249+
// Register the worker client updater.
250+
bld.Services.AddSingleton<TemporalWorkerClientUpdater>(workerClientUpdater);
251+
252+
// Add the first worker with the workflow and client already DI'd, and add the worker client updater.
253+
bld.Services.
254+
AddSingleton(Client).
255+
AddHostedTemporalWorker(taskQueue).
256+
AddWorkflow<TickingWorkflow>()
257+
.ConfigureOptions()
258+
.Configure<TemporalWorkerClientUpdater>((options, updater) =>
259+
{
260+
options.WorkerClientUpdater = updater;
261+
options.MaxCachedWorkflows = 0;
262+
options.MaxConcurrentWorkflowTaskPolls = 1;
263+
});
264+
265+
// Start the host
266+
using var tokenSource = new CancellationTokenSource();
267+
using var host = bld.Build();
268+
var hostTask = Task.Run(() => host.RunAsync(tokenSource.Token));
269+
270+
// Confirm the first ticking workflow has completed a task but not the second workflow
271+
await AssertMore.HasEventEventuallyAsync(handle1, e => e.WorkflowTaskCompletedEventAttributes != null);
272+
await foreach (var evt in handle2.FetchHistoryEventsAsync())
273+
{
274+
Assert.Null(evt.WorkflowTaskCompletedEventAttributes);
275+
}
276+
277+
// Now replace the client, which should be used fairly quickly because we should have
278+
// timer-done poll completions every 100ms
279+
workerClientUpdater.UpdateClient(otherEnv.Client);
280+
281+
// Now confirm the other workflow has started
282+
await AssertMore.HasEventEventuallyAsync(handle2, e => e.WorkflowTaskCompletedEventAttributes != null);
283+
284+
// Terminate both
285+
await handle1.TerminateAsync();
286+
await handle2.TerminateAsync();
209287
}
210288

211289
[Workflow("Workflow")]

0 commit comments

Comments
 (0)