Skip to content

Commit 17610b3

Browse files
authored
Add ActivitySource/OpenTelemetry support
1 parent 273b9b4 commit 17610b3

10 files changed

+208
-19
lines changed

src/WorkflowCore/Models/WorkflowOptions.cs

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -75,6 +75,11 @@ public void UseErrorRetryInterval(TimeSpan interval)
7575
ErrorRetryInterval = interval;
7676
}
7777

78+
public void UseIdleTime(TimeSpan interval)
79+
{
80+
IdleTime = interval;
81+
}
82+
7883
public void UseMaxConcurrentWorkflows(int maxConcurrentWorkflows)
7984
{
8085
MaxConcurrentWorkflows = maxConcurrentWorkflows;

src/WorkflowCore/Services/BackgroundTasks/EventConsumer.cs

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -43,6 +43,8 @@ protected override async Task ProcessItem(string itemId, CancellationToken cance
4343
{
4444
cancellationToken.ThrowIfCancellationRequested();
4545
var evt = await _eventRepository.GetEvent(itemId, cancellationToken);
46+
47+
WorkflowActivity.Enrich(evt);
4648
if (evt.IsProcessed)
4749
{
4850
_greylist.Add($"evt:{evt.Id}");

src/WorkflowCore/Services/BackgroundTasks/IndexConsumer.cs

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -32,6 +32,8 @@ protected override async Task ProcessItem(string itemId, CancellationToken cance
3232
try
3333
{
3434
var workflow = await FetchWorkflow(itemId);
35+
36+
WorkflowActivity.Enrich(workflow, "index");
3537
await _searchIndex.IndexWorkflow(workflow);
3638
lock (_errorCounts)
3739
{

src/WorkflowCore/Services/BackgroundTasks/QueueConsumer.cs

Lines changed: 17 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -1,10 +1,12 @@
11
using System;
22
using System.Collections.Generic;
3+
using System.Diagnostics;
34
using System.Linq;
45
using System.Threading;
56
using System.Threading.Tasks;
67
using ConcurrentCollections;
78
using Microsoft.Extensions.Logging;
9+
using OpenTelemetry.Trace;
810
using WorkflowCore.Interface;
911
using WorkflowCore.Models;
1012

@@ -61,6 +63,7 @@ private async Task Execute()
6163

6264
while (!cancelToken.IsCancellationRequested)
6365
{
66+
Activity activity = default;
6467
try
6568
{
6669
var activeCount = 0;
@@ -74,15 +77,19 @@ private async Task Execute()
7477
continue;
7578
}
7679

80+
activity = WorkflowActivity.StartConsume(Queue);
7781
var item = await QueueProvider.DequeueWork(Queue, cancelToken);
7882

7983
if (item == null)
8084
{
85+
activity?.Dispose();
8186
if (!QueueProvider.IsDequeueBlocking)
8287
await Task.Delay(Options.IdleTime, cancelToken);
8388
continue;
8489
}
8590

91+
activity?.EnrichWithDequeuedItem(item);
92+
8693
var hasTask = false;
8794
lock (_activeTasks)
8895
{
@@ -93,8 +100,9 @@ private async Task Execute()
93100
_secondPasses.Add(item);
94101
if (!EnableSecondPasses)
95102
await QueueProvider.QueueWork(item, Queue);
103+
activity?.Dispose();
96104
continue;
97-
}
105+
}
98106

99107
_secondPasses.TryRemove(item);
100108

@@ -103,14 +111,19 @@ private async Task Execute()
103111
{
104112
_activeTasks.Add(item, waitHandle);
105113
}
106-
var task = ExecuteItem(item, waitHandle);
114+
var task = ExecuteItem(item, waitHandle, activity);
107115
}
108116
catch (OperationCanceledException)
109117
{
110118
}
111119
catch (Exception ex)
112120
{
113121
Logger.LogError(ex, ex.Message);
122+
activity?.RecordException(ex);
123+
}
124+
finally
125+
{
126+
activity?.Dispose();
114127
}
115128
}
116129

@@ -124,7 +137,7 @@ private async Task Execute()
124137
handle.WaitOne();
125138
}
126139

127-
private async Task ExecuteItem(string itemId, EventWaitHandle waitHandle)
140+
private async Task ExecuteItem(string itemId, EventWaitHandle waitHandle, Activity activity)
128141
{
129142
try
130143
{
@@ -142,6 +155,7 @@ private async Task ExecuteItem(string itemId, EventWaitHandle waitHandle)
142155
catch (Exception ex)
143156
{
144157
Logger.LogError(default(EventId), ex, $"Error executing item {itemId} - {ex.Message}");
158+
activity?.RecordException(ex);
145159
}
146160
finally
147161
{

src/WorkflowCore/Services/BackgroundTasks/RunnablePoller.cs

Lines changed: 23 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,8 +1,10 @@
11
using System;
2+
using System.Diagnostics;
23
using System.Linq;
34
using System.Threading;
45
using System.Threading.Tasks;
56
using Microsoft.Extensions.Logging;
7+
using OpenTelemetry.Trace;
68
using WorkflowCore.Interface;
79
using WorkflowCore.Models;
810

@@ -57,13 +59,14 @@ private async void PollRunnables(object target)
5759

5860
private async Task PollWorkflows()
5961
{
62+
var activity = WorkflowActivity.StartPoll("workflows");
6063
try
6164
{
6265
if (await _lockProvider.AcquireLock("poll runnables", new CancellationToken()))
6366
{
6467
try
6568
{
66-
_logger.LogDebug("Polling for runnable workflows");
69+
_logger.LogDebug("Polling for runnable workflows");
6770

6871
var runnables = await _persistenceStore.GetRunnableInstances(_dateTimeProvider.Now);
6972
foreach (var item in runnables)
@@ -83,6 +86,7 @@ await _persistenceStore.ScheduleCommand(new ScheduledCommand()
8386
catch (Exception ex)
8487
{
8588
_logger.LogError(ex, ex.Message);
89+
activity?.RecordException(ex);
8690
}
8791
}
8892
if (_greylist.Contains($"wf:{item}"))
@@ -104,11 +108,17 @@ await _persistenceStore.ScheduleCommand(new ScheduledCommand()
104108
catch (Exception ex)
105109
{
106110
_logger.LogError(ex, ex.Message);
111+
activity?.RecordException(ex);
112+
}
113+
finally
114+
{
115+
activity?.Dispose();
107116
}
108117
}
109118

110119
private async Task PollEvents()
111120
{
121+
var activity = WorkflowActivity.StartPoll("events");
112122
try
113123
{
114124
if (await _lockProvider.AcquireLock("unprocessed events", new CancellationToken()))
@@ -135,6 +145,7 @@ await _persistenceStore.ScheduleCommand(new ScheduledCommand()
135145
catch (Exception ex)
136146
{
137147
_logger.LogError(ex, ex.Message);
148+
activity?.RecordException(ex);
138149
}
139150
}
140151
if (_greylist.Contains($"evt:{item}"))
@@ -156,11 +167,17 @@ await _persistenceStore.ScheduleCommand(new ScheduledCommand()
156167
catch (Exception ex)
157168
{
158169
_logger.LogError(ex, ex.Message);
170+
activity?.RecordException(ex);
171+
}
172+
finally
173+
{
174+
activity?.Dispose();
159175
}
160176
}
161177

162178
private async Task PollCommands()
163179
{
180+
var activity = WorkflowActivity.StartPoll("commands");
164181
try
165182
{
166183
if (!_persistenceStore.SupportsScheduledCommands)
@@ -193,6 +210,11 @@ await _persistenceStore.ProcessCommands(new DateTimeOffset(_dateTimeProvider.Utc
193210
catch (Exception ex)
194211
{
195212
_logger.LogError(ex, ex.Message);
213+
activity?.RecordException(ex);
214+
}
215+
finally
216+
{
217+
activity?.Dispose();
196218
}
197219
}
198220
}

src/WorkflowCore/Services/BackgroundTasks/WorkflowConsumer.cs

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,5 @@
11
using System;
2+
using System.Diagnostics;
23
using System.Threading;
34
using System.Threading.Tasks;
45
using Microsoft.Extensions.Logging;
@@ -43,6 +44,8 @@ protected override async Task ProcessItem(string itemId, CancellationToken cance
4344
{
4445
cancellationToken.ThrowIfCancellationRequested();
4546
workflow = await _persistenceStore.GetWorkflowInstance(itemId, cancellationToken);
47+
48+
WorkflowActivity.Enrich(workflow, "process");
4649
if (workflow.Status == WorkflowStatus.Runnable)
4750
{
4851
try
@@ -51,6 +54,7 @@ protected override async Task ProcessItem(string itemId, CancellationToken cance
5154
}
5255
finally
5356
{
57+
WorkflowActivity.Enrich(result);
5458
await _persistenceStore.PersistWorkflow(workflow, cancellationToken);
5559
await QueueProvider.QueueWork(itemId, QueueType.Index);
5660
_greylist.Remove($"wf:{itemId}");
Lines changed: 124 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,124 @@
1+
using System.Diagnostics;
2+
using OpenTelemetry.Trace;
3+
using WorkflowCore.Interface;
4+
using WorkflowCore.Models;
5+
6+
namespace WorkflowCore.Services
7+
{
8+
internal static class WorkflowActivity
9+
{
10+
private static readonly ActivitySource ActivitySource = new ActivitySource("WorkflowCore");
11+
12+
internal static Activity StartHost()
13+
{
14+
var activityName = "workflow start host";
15+
return ActivitySource.StartRootActivity(activityName, ActivityKind.Internal);
16+
}
17+
18+
internal static Activity StartConsume(QueueType queueType)
19+
{
20+
var activityName = $"workflow consume {GetQueueType(queueType)}";
21+
var activity = ActivitySource.StartRootActivity(activityName, ActivityKind.Consumer);
22+
23+
activity?.SetTag("workflow.queue", queueType);
24+
25+
return activity;
26+
}
27+
28+
29+
internal static Activity StartPoll(string type)
30+
{
31+
var activityName = $"workflow poll {type}";
32+
var activity = ActivitySource.StartRootActivity(activityName, ActivityKind.Client);
33+
34+
activity?.SetTag("workflow.poll", type);
35+
36+
return activity;
37+
}
38+
39+
internal static void Enrich(WorkflowInstance workflow, string action)
40+
{
41+
var activity = Activity.Current;
42+
if (activity != null)
43+
{
44+
activity.DisplayName = $"workflow {action} {workflow.WorkflowDefinitionId}";
45+
activity.SetTag("workflow.id", workflow.Id);
46+
activity.SetTag("workflow.definition", workflow.WorkflowDefinitionId);
47+
activity.SetTag("workflow.status", workflow.Status);
48+
}
49+
}
50+
51+
internal static void Enrich(WorkflowStep workflowStep)
52+
{
53+
var activity = Activity.Current;
54+
if (activity != null)
55+
{
56+
var stepName = string.IsNullOrEmpty(workflowStep.Name)
57+
? "inline"
58+
: workflowStep.Name;
59+
60+
activity.DisplayName += $" step {stepName}";
61+
activity.SetTag("workflow.step.id", workflowStep.Id);
62+
activity.SetTag("workflow.step.name", workflowStep.Name);
63+
activity.SetTag("workflow.step.type", workflowStep.BodyType.Name);
64+
}
65+
}
66+
67+
internal static void Enrich(WorkflowExecutorResult result)
68+
{
69+
var activity = Activity.Current;
70+
if (activity != null)
71+
{
72+
activity.SetTag("workflow.subscriptions.count", result.Subscriptions.Count);
73+
activity.SetTag("workflow.errors.count", result.Errors.Count);
74+
75+
if (result.Errors.Count > 0)
76+
{
77+
activity.SetStatus(Status.Error);
78+
activity.SetStatus(ActivityStatusCode.Error);
79+
}
80+
}
81+
}
82+
83+
internal static void Enrich(Event evt)
84+
{
85+
var activity = Activity.Current;
86+
if (activity != null)
87+
{
88+
activity.DisplayName = $"workflow process {evt.EventName}";
89+
activity.SetTag("workflow.event.id", evt.Id);
90+
activity.SetTag("workflow.event.name", evt.EventName);
91+
activity.SetTag("workflow.event.processed", evt.IsProcessed);
92+
}
93+
}
94+
95+
internal static void EnrichWithDequeuedItem(this Activity activity, string item)
96+
{
97+
if (activity != null)
98+
{
99+
activity.SetTag("workflow.queue.item", item);
100+
}
101+
}
102+
103+
private static Activity StartRootActivity(
104+
this ActivitySource activitySource,
105+
string name,
106+
ActivityKind kind)
107+
{
108+
Activity.Current = null;
109+
110+
return activitySource.StartActivity(name, kind);
111+
}
112+
113+
private static string GetQueueType(QueueType queueType)
114+
{
115+
switch (queueType)
116+
{
117+
case QueueType.Workflow: return "workflow";
118+
case QueueType.Event: return "event";
119+
case QueueType.Index: return "index";
120+
default: return "unknown";
121+
}
122+
}
123+
}
124+
}

src/WorkflowCore/Services/WorkflowExecutor.cs

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -73,6 +73,7 @@ public async Task<WorkflowExecutorResult> Execute(WorkflowInstance workflow, Can
7373
continue;
7474
}
7575

76+
WorkflowActivity.Enrich(step);
7677
try
7778
{
7879
if (!InitializeStep(workflow, step, wfResult, def, pointer))

0 commit comments

Comments
 (0)