Skip to content

Commit 4be8e4a

Browse files
authored
Correlate client and orchestrator functions for DF .NET Isolated Distributed Tracing (#2998)
1 parent 85d4c87 commit 4be8e4a

File tree

15 files changed

+1191
-24
lines changed

15 files changed

+1191
-24
lines changed
Lines changed: 37 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,37 @@
1+
// Copyright (c) .NET Foundation. All rights reserved.
2+
// Licensed under the MIT License. See LICENSE in the project root for license information.
3+
#nullable enable
4+
5+
namespace Microsoft.Azure.WebJobs.Extensions.DurableTask.Correlation
6+
{
7+
internal static class Schema
8+
{
9+
internal static class Task
10+
{
11+
internal const string Type = "durabletask.type";
12+
internal const string Name = "durabletask.task.name";
13+
internal const string Version = "durabletask.task.version";
14+
internal const string InstanceId = "durabletask.task.instance_id";
15+
internal const string ExecutionId = "durabletask.task.execution_id";
16+
internal const string Status = "durabletask.task.status";
17+
internal const string TaskId = "durabletask.task.task_id";
18+
internal const string EventTargetInstanceId = "durabletask.event.target_instance_id";
19+
internal const string FireAt = "durabletask.fire_at";
20+
}
21+
22+
internal static class Status
23+
{
24+
internal const string Code = "otel.status_code";
25+
internal const string Description = "otel.status_description";
26+
}
27+
28+
internal static class SpanNames
29+
{
30+
internal static string CreateOrchestration(string name, string? version)
31+
=> FormatName(TraceActivityConstants.CreateOrchestration, name, version);
32+
33+
private static string FormatName(string prefix, string name, string? version)
34+
=> string.IsNullOrEmpty(version) ? $"{prefix}:{name}" : $"{prefix}:{name}@{version}";
35+
}
36+
}
37+
}

src/WebJobs.Extensions.DurableTask/Correlation/TelemetryActivator.cs

Lines changed: 18 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -36,16 +36,28 @@ public TelemetryActivator(IOptions<DurableTaskOptions> options, INameResolver na
3636

3737
internal IAsyncDisposable TelemetryModule { get; set; }
3838

39+
internal IAsyncDisposable WebJobsTelemetryModule { get; set; }
40+
3941
/// <inheritdoc/>
4042
public ValueTask DisposeAsync()
4143
{
42-
return this.TelemetryModule?.DisposeAsync() ?? default;
44+
if (this.TelemetryModule != null)
45+
{
46+
this.TelemetryModule.DisposeAsync();
47+
}
48+
49+
if (this.WebJobsTelemetryModule != null)
50+
{
51+
this.WebJobsTelemetryModule.DisposeAsync();
52+
}
53+
54+
return default;
4355
}
4456

4557
/// <inheritdoc/>
4658
public void Dispose()
4759
{
48-
this.TelemetryModule?.DisposeAsync().AsTask().GetAwaiter().GetResult();
60+
this.DisposeAsync().AsTask().GetAwaiter().GetResult();
4961
}
5062

5163
/// <summary>
@@ -65,6 +77,10 @@ public void Initialize(TelemetryConfiguration configuration)
6577
DurableTelemetryModule module = new DurableTelemetryModule();
6678
module.Initialize(configuration);
6779
this.TelemetryModule = module;
80+
81+
WebJobsTelemetryModule webJobsModule = new WebJobsTelemetryModule();
82+
webJobsModule.Initialize(configuration);
83+
this.WebJobsTelemetryModule = webJobsModule;
6884
}
6985
else
7086
{
Lines changed: 17 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,17 @@
1+
// Copyright (c) .NET Foundation. All rights reserved.
2+
// Licensed under the MIT License. See LICENSE in the project root for license information.
3+
4+
namespace Microsoft.Azure.WebJobs.Extensions.DurableTask.Correlation
5+
{
6+
internal class TraceActivityConstants
7+
{
8+
public const string Client = "client";
9+
public const string Orchestration = "orchestration";
10+
public const string Activity = "activity";
11+
public const string Event = "event";
12+
public const string Timer = "timer";
13+
14+
public const string CreateOrchestration = "create_orchestration";
15+
public const string OrchestrationEvent = "orchestration_event";
16+
}
17+
}
Lines changed: 127 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,127 @@
1+
// Copyright (c) .NET Foundation. All rights reserved.
2+
// Licensed under the MIT License. See LICENSE in the project root for license information.
3+
4+
using System;
5+
using System.Collections.Generic;
6+
using System.Diagnostics;
7+
using System.Threading;
8+
using System.Threading.Tasks;
9+
using Microsoft.ApplicationInsights;
10+
using Microsoft.ApplicationInsights.DataContracts;
11+
using Microsoft.ApplicationInsights.Extensibility;
12+
using Microsoft.ApplicationInsights.Extensibility.Implementation;
13+
14+
namespace Microsoft.Azure.WebJobs.Extensions.DurableTask.Correlation
15+
{
16+
internal class WebJobsTelemetryModule : ITelemetryModule, IAsyncDisposable
17+
{
18+
private TelemetryClient telemetryClient;
19+
private ActivityListener listener;
20+
21+
/// <inheritdoc/>
22+
public void Initialize(TelemetryConfiguration configuration)
23+
{
24+
this.telemetryClient = new TelemetryClient(configuration);
25+
26+
// ActivitySamplingResult.AllData means that the ActivityListener is going to collect all of the data
27+
// for any Activity that's sent to the "DurableTask" source. It isn't going to exclude any data.
28+
this.listener = new ActivityListener
29+
{
30+
ShouldListenTo = source => source.Name.StartsWith("WebJobs.Extensions.DurableTask"),
31+
ActivityStopped = this.OnEndActivity,
32+
Sample = (ref ActivityCreationOptions<ActivityContext> _) => ActivitySamplingResult.AllData,
33+
SampleUsingParentId = (ref ActivityCreationOptions<string> _) => ActivitySamplingResult.AllData,
34+
};
35+
36+
ActivitySource.AddActivityListener(this.listener);
37+
}
38+
39+
private void OnEndActivity(Activity activity)
40+
{
41+
if (!activity.IsAllDataRequested)
42+
{
43+
return;
44+
}
45+
46+
OperationTelemetry telemetry = CreateTelemetry(activity);
47+
this.telemetryClient.Track(telemetry);
48+
}
49+
50+
private static OperationTelemetry CreateTelemetry(Activity activity)
51+
{
52+
OperationTelemetry telemetry;
53+
ActivityStatusCode status = activity.Status;
54+
switch (activity.Kind)
55+
{
56+
case ActivityKind.Consumer or ActivityKind.Server:
57+
RequestTelemetry request = CreateTelemetryCore<RequestTelemetry>(activity);
58+
request.Success = status != ActivityStatusCode.Error;
59+
60+
if (string.IsNullOrEmpty(request.ResponseCode))
61+
{
62+
request.ResponseCode = (bool)request.Success ? "200" : "500";
63+
}
64+
65+
telemetry = request;
66+
break;
67+
default:
68+
DependencyTelemetry dependency = CreateTelemetryCore<DependencyTelemetry>(activity);
69+
dependency.Success = status != ActivityStatusCode.Error;
70+
dependency.Type = activity.Kind is ActivityKind.Internal ? "InProc" : "DurableTask";
71+
telemetry = dependency;
72+
break;
73+
}
74+
75+
// telemetry.Properties["otel.status_description"] = description;
76+
return telemetry;
77+
}
78+
79+
private static T CreateTelemetryCore<T>(Activity activity)
80+
where T : OperationTelemetry, new()
81+
{
82+
T telemetry = new ()
83+
{
84+
Name = activity.DisplayName,
85+
Id = activity.SpanId.ToString(),
86+
Timestamp = activity.StartTimeUtc,
87+
Duration = activity.Duration,
88+
};
89+
90+
telemetry.Context.Operation.Id = activity.RootId;
91+
ActivitySpanId parentId = activity.ParentSpanId;
92+
if (parentId != default)
93+
{
94+
telemetry.Context.Operation.ParentId = parentId.ToString();
95+
}
96+
97+
foreach (KeyValuePair<string, string> item in activity.Baggage)
98+
{
99+
telemetry.Properties[item.Key] = item.Value;
100+
}
101+
102+
foreach (KeyValuePair<string, object> item in activity.TagObjects)
103+
{
104+
telemetry.Properties[item.Key] = item.Value.ToString();
105+
}
106+
107+
return telemetry;
108+
}
109+
110+
async ValueTask IAsyncDisposable.DisposeAsync()
111+
{
112+
this.listener?.Dispose();
113+
if (this.telemetryClient != null)
114+
{
115+
using CancellationTokenSource cts = new (millisecondsDelay: 5000);
116+
try
117+
{
118+
await this.telemetryClient.FlushAsync(cts.Token);
119+
}
120+
catch
121+
{
122+
// Ignore for now; potentially log this in the future.
123+
}
124+
}
125+
}
126+
}
127+
}
Lines changed: 24 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,24 @@
1+
# WebJobs Extension Protobuf
2+
3+
This directory contains the protobuf definitions for the WebJobs Extension SDK, which are used to generate the C# source code for the gRPC service contracts. The official protobuf definitions are maintained in the [Durable Task Protobuf repository](https://github.com/microsoft/durabletask-protobuf).
4+
5+
## Updating the Protobuf Definitions
6+
7+
To update the protobuf definitions in this directory, follow these steps:
8+
9+
1. Make sure you have [PowerShell](https://learn.microsoft.com/powershell/scripting/install/installing-powershell) installed on your machine.
10+
2. Run the following command to download the latest protobuf definitions from the Durable Task Protobuf repository:
11+
12+
```powershell
13+
.\refresh-protos.ps1
14+
```
15+
16+
This script will download the latest protobuf definitions from the `https://github.com/microsoft/durabletask-protobuf` repository and copy them to this directory.
17+
18+
By default, the latest versions of the protobufs are downloaded from the `main` branch. To specify an alternative branch, use the `-branch` parameter:
19+
20+
```powershell
21+
.\refresh-protos.ps1 -branch <branch-name>
22+
```
23+
24+
The `versions.txt` file in this directory contains the list of protobuf files and their commit hashes that were last downloaded. It is updated automatically by the `refresh-protos.ps1` script.

0 commit comments

Comments
 (0)