Skip to content

Commit a091c2a

Browse files
Added telemetry injection point for Ask<T> (#5297)
Looking for a way to help trace timeouts inside `Ask<T>` operations - needed some way to tap into the `TaskCompletionSource` and to create an active `ISpan` before the operation begins.
1 parent 01c35a9 commit a091c2a

File tree

7 files changed

+37
-7
lines changed

7 files changed

+37
-7
lines changed

src/contrib/cluster/Akka.Cluster.Metrics/ClusterMetricsStrategy.cs

-1
Original file line numberDiff line numberDiff line change
@@ -8,7 +8,6 @@
88
using System;
99
using Akka.Actor;
1010
using Akka.Configuration;
11-
using Akka.Configuration;
1211

1312
namespace Akka.Cluster.Metrics
1413
{

src/core/Akka.API.Tests/CoreAPISpec.ApproveCore.approved.txt

+3
Original file line numberDiff line numberDiff line change
@@ -968,6 +968,8 @@ namespace Akka.Actor
968968
Akka.Actor.IInternalActorRef TempContainer { get; }
969969
System.Threading.Tasks.Task TerminationTask { get; }
970970
Akka.Actor.IInternalActorRef ActorOf(Akka.Actor.Internal.ActorSystemImpl system, Akka.Actor.Props props, Akka.Actor.IInternalActorRef supervisor, Akka.Actor.ActorPath path, bool systemService, Akka.Actor.Deploy deploy, bool lookupDeploy, bool async);
971+
[Akka.Annotations.InternalApiAttribute()]
972+
Akka.Actor.FutureActorRef<T> CreateFutureRef<T>(System.Threading.Tasks.TaskCompletionSource<T> tcs);
971973
Akka.Actor.Address GetExternalAddressFor(Akka.Actor.Address address);
972974
void Init(Akka.Actor.Internal.ActorSystemImpl system);
973975
void RegisterTempActor(Akka.Actor.IInternalActorRef actorRef, Akka.Actor.ActorPath path);
@@ -1279,6 +1281,7 @@ namespace Akka.Actor
12791281
public Akka.Actor.IInternalActorRef TempContainer { get; }
12801282
public System.Threading.Tasks.Task TerminationTask { get; }
12811283
public Akka.Actor.IInternalActorRef ActorOf(Akka.Actor.Internal.ActorSystemImpl system, Akka.Actor.Props props, Akka.Actor.IInternalActorRef supervisor, Akka.Actor.ActorPath path, bool systemService, Akka.Actor.Deploy deploy, bool lookupDeploy, bool async) { }
1284+
public Akka.Actor.FutureActorRef<T> CreateFutureRef<T>(System.Threading.Tasks.TaskCompletionSource<T> tcs) { }
12821285
public Akka.Actor.Address GetExternalAddressFor(Akka.Actor.Address address) { }
12831286
public void Init(Akka.Actor.Internal.ActorSystemImpl system) { }
12841287
public void RegisterExtraName(string name, Akka.Actor.IInternalActorRef actor) { }

src/core/Akka.API.Tests/CoreAPISpec.ApproveRemote.approved.txt

+1
Original file line numberDiff line numberDiff line change
@@ -202,6 +202,7 @@ namespace Akka.Remote
202202
public System.Threading.Tasks.Task TerminationTask { get; }
203203
public Akka.Remote.RemoteTransport Transport { get; }
204204
public Akka.Actor.IInternalActorRef ActorOf(Akka.Actor.Internal.ActorSystemImpl system, Akka.Actor.Props props, Akka.Actor.IInternalActorRef supervisor, Akka.Actor.ActorPath path, bool systemService, Akka.Actor.Deploy deploy, bool lookupDeploy, bool async) { }
205+
public Akka.Actor.FutureActorRef<T> CreateFutureRef<T>(System.Threading.Tasks.TaskCompletionSource<T> tcs) { }
205206
protected virtual Akka.Actor.IActorRef CreateRemoteDeploymentWatcher(Akka.Actor.Internal.ActorSystemImpl system) { }
206207
protected virtual Akka.Actor.IInternalActorRef CreateRemoteRef(Akka.Actor.ActorPath actorPath, Akka.Actor.Address localAddress) { }
207208
protected virtual Akka.Actor.IInternalActorRef CreateRemoteRef(Akka.Actor.Props props, Akka.Actor.IInternalActorRef supervisor, Akka.Actor.Address localAddress, Akka.Actor.ActorPath rpath, Akka.Actor.Deploy deployment) { }

src/core/Akka.Cluster/ClusterDaemon.cs

+3-2
Original file line numberDiff line numberDiff line change
@@ -1561,8 +1561,9 @@ public void StopSeedNodeProcess()
15611561
/// Received `Join` message and replies with `Welcome` message, containing
15621562
/// current gossip state, including the new joining member.
15631563
/// </summary>
1564-
/// <param name="node">TBD</param>
1565-
/// <param name="roles">TBD</param>
1564+
/// <param name="node">The unique address of the joining node.</param>
1565+
/// <param name="roles">The roles, if any, of the joining node.</param>
1566+
/// <param name="appVersion">The software version of the joining node.</param>
15661567
public void Joining(UniqueAddress node, ImmutableHashSet<string> roles, AppVersion appVersion)
15671568
{
15681569
var selfStatus = LatestGossip.GetMember(SelfUniqueAddress).Status;

src/core/Akka.Remote/RemoteActorRefProvider.cs

+6
Original file line numberDiff line numberDiff line change
@@ -235,6 +235,12 @@ public void UnregisterTempActor(ActorPath path)
235235
_local.UnregisterTempActor(path);
236236
}
237237

238+
/// <inheritdoc/>
239+
public FutureActorRef<T> CreateFutureRef<T>(TaskCompletionSource<T> tcs)
240+
{
241+
return _local.CreateFutureRef(tcs);
242+
}
243+
238244
private IActorRef _remotingTerminator;
239245
private IActorRef _remoteWatcher;
240246

src/core/Akka/Actor/ActorRefProvider.cs

+22
Original file line numberDiff line numberDiff line change
@@ -91,6 +91,18 @@ public interface IActorRefProvider
9191
/// <param name="path">A path returned by <see cref="TempPath"/>. Do NOT pass in any other path!</param>
9292
void UnregisterTempActor(ActorPath path);
9393

94+
/// <summary>
95+
/// Automatically generates a <see cref="FutureActorRef{T}"/> with a temporary path.
96+
/// </summary>
97+
/// <remarks>
98+
/// Does not call <see cref="RegisterTempActor"/> or <see cref="UnregisterTempActor"/>.
99+
/// </remarks>
100+
/// <param name="tcs">A typed <see cref="TaskCompletionSource{T}"/></param>
101+
/// <typeparam name="T">The type of output this <see cref="FutureActorRef{T}"/> expects.</typeparam>
102+
/// <returns>A new, single-use <see cref="FutureActorRef{T}"/> instance.</returns>
103+
[InternalApi]
104+
FutureActorRef<T> CreateFutureRef<T>(TaskCompletionSource<T> tcs);
105+
94106
/// <summary>
95107
/// Actor factory with create-only semantics: will create an actor as
96108
/// described by <paramref name="props"/> with the given <paramref name="supervisor"/> and <paramref name="path"/> (may be different
@@ -386,6 +398,16 @@ public void UnregisterTempActor(ActorPath path)
386398
_tempContainer.RemoveChild(path.Name);
387399
}
388400

401+
/// <inheritdoc cref="IActorRefProvider.CreateFutureRef{T}"/>
402+
public FutureActorRef<T> CreateFutureRef<T>(TaskCompletionSource<T> tcs)
403+
{
404+
//create a new tempcontainer path
405+
var path = TempPath();
406+
407+
var future = new FutureActorRef<T>(tcs, path, this);
408+
return future;
409+
}
410+
389411
/// <summary>
390412
/// Initializes the ActorRefProvider
391413
/// </summary>

src/core/Akka/Actor/Futures.cs

+2-4
Original file line numberDiff line numberDiff line change
@@ -148,10 +148,8 @@ public static Task<T> Ask<T>(this ICanTell self, Func<IActorRef, object> message
148148
ctr2 = cancellationToken.Register(() => result.TrySetCanceled());
149149
}
150150

151-
//create a new tempcontainer path
152-
var path = provider.TempPath();
153-
154-
var future = new FutureActorRef<T>(result, path, provider);
151+
var future = provider.CreateFutureRef(result);
152+
var path = future.Path;
155153

156154
//The future actor needs to be unregistered in the temp container
157155
_ = result.Task.ContinueWith(t =>

0 commit comments

Comments
 (0)