-
Notifications
You must be signed in to change notification settings - Fork 287
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Metrics: Use dedicated thread instead of the thread pool for the defa… #1028
Changes from all commits
6e1ca5e
7db2ee1
a315765
2d67a61
acd6c18
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -16,26 +16,25 @@ internal class DefaultAggregationPeriodCycle | |
[System.Diagnostics.CodeAnalysis.SuppressMessage("Naming Rules", "SA1310: C# Field must not contain an underscore", Justification = "By design: Structured name.")] | ||
private const int RunningState_Stopped = 2; | ||
|
||
private readonly Action workerMethod; | ||
|
||
private readonly MetricAggregationManager aggregationManager; | ||
private readonly MetricManager metricManager; | ||
|
||
private readonly TaskCompletionSource<bool> workerTaskCompletionControl; | ||
|
||
private int runningState; | ||
private Task workerTask; | ||
private Thread aggregationThread; | ||
|
||
public DefaultAggregationPeriodCycle(MetricAggregationManager aggregationManager, MetricManager metricManager) | ||
{ | ||
Util.ValidateNotNull(aggregationManager, nameof(aggregationManager)); | ||
Util.ValidateNotNull(metricManager, nameof(metricManager)); | ||
|
||
this.workerMethod = this.Run; | ||
|
||
this.aggregationManager = aggregationManager; | ||
this.metricManager = metricManager; | ||
|
||
this.runningState = RunningState_NotStarted; | ||
this.workerTask = null; | ||
this.workerTaskCompletionControl = new TaskCompletionSource<bool>(); | ||
this.aggregationThread = null; | ||
} | ||
|
||
~DefaultAggregationPeriodCycle() | ||
|
@@ -52,21 +51,28 @@ public bool Start() | |
return false; // Was already running or stopped. | ||
} | ||
|
||
this.workerTask = Task.Run(this.workerMethod) | ||
.ContinueWith( | ||
(t) => { this.workerTask = null; }, | ||
TaskContinuationOptions.ExecuteSynchronously); | ||
// We create a new thread rather than using the thread pool. | ||
// This is because inside of the main loop in the Run() method we use a synchronous wait. | ||
// The reason for that is to prevent aggregation from being affected by potential thread pool starvation. | ||
// As a result, Run() is a very long running task that occupies a thread forever. | ||
// If we were to schedule Run() on the thread pool it would be possible that the thread chosen by the | ||
// pool had run user code before. Such user code may be doing an asynchronous wait scheduled to | ||
// continue on the same thread(e.g. this can occur when using a custom synchronization context or a | ||
// custom task scheduler). If such case the waiting user code will never continue. | ||
// By creating our own thread, we guarantee no interactions with potentially incorrectly written async user code. | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Can I suggest we just say: // We create a dedicated thread for aggregation. This is to prevent aggregation
// from being affected by potential thread pool starvation. The rest is about the old implementation that is (IMO) best left unsaid because that implementation contributed to the very thread pool starvation we're trying to avoid! In addition, I think the "had run user code before" and "may be doing an asynchronous wait" parts are misleading and irrelevant to the crux of the problem which was the |
||
|
||
this.aggregationThread = new Thread(this.Run); | ||
this.aggregationThread.Name = nameof(DefaultAggregationPeriodCycle); | ||
|
||
this.aggregationThread.Start(); | ||
return true; | ||
} | ||
|
||
public Task StopAsync() | ||
{ | ||
Interlocked.Exchange(ref this.runningState, RunningState_Stopped); | ||
|
||
// Benign race on being called very soon after start. Will miss a cycle but eventually complete correctly. | ||
|
||
Task workerTask = this.workerTask; | ||
return workerTask ?? Task.FromResult(true); | ||
return this.workerTaskCompletionControl.Task; | ||
} | ||
|
||
public void FetchAndTrackMetrics() | ||
|
@@ -139,11 +145,13 @@ private void Run() | |
DateTimeOffset now = DateTimeOffset.Now; | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Not part of this change, I know, but please use There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. You are right. If I recall correctly, there may have been purpose to it - take a local TS so that the serializer can choose to UTC it or not. But thinking about it today - I agree that using UTC would be better - and if the serializer wanted to really do something local, it would do that consistently for all telemetry. Please feel free to file an issue. |
||
TimeSpan waitPeriod = GetNextCycleTargetTime(now) - now; | ||
|
||
Task.Delay(waitPeriod).ConfigureAwait(continueOnCapturedContext: false).GetAwaiter().GetResult(); | ||
Thread.Sleep(waitPeriod); | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I think you want a cancelable wait. One way is to create a CancellationToken, get its WaitHandle and call WaitOne with the TimeSpan. The CancellationToken takes the place of the There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Why would we want a cancelable wait at this point? There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. For fast and graceful shutdown. Maybe I misunderstood the point of StopAsync, but I assume it's to be called at shutdown and the expectation is that shutdown is fast. There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Got it. Right now we do not do another data dump on a cancelled cycle, but you are right: it would be a good improvement. On the other hand, dealing with WaitHandles here would mean dealing with Disposables. If we go there, I would prefer to do it as a separate change from addressing the issue at hand. Before the proposed change, this wait was not cancelable either. There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. ok |
||
|
||
int shouldBeRunning = Volatile.Read(ref this.runningState); | ||
if (shouldBeRunning != RunningState_Running) | ||
{ | ||
this.aggregationThread = null; | ||
this.workerTaskCompletionControl.TrySetResult(true); | ||
return; | ||
} | ||
|
||
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Is there any value in making this a private static Thread?
I think there is a scenario where a customer is creating multiple TelemetryConfigurations, which would create multiple of these internal classes each with a private thread. Please consider if there is any value in making this thread globally unique.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Yes, you could end up with several instances of DefaultAggregationPeriodCycle. Not only if you have several TelemetryConfigurations, but also if you have explicitly opted into aggregating at the scope of a telemetry client.
However, a shared thread would require shared cancellation and waiting logic, which is non-trivial. For a bug fix we should always err on the side of a minimal change. Here such change is to get off the thread pool.
But yes, now that we create threads explicitly, we can consider using one shared thread for all instances of DefaultAggregationPeriodCycle in the future. It would lighten the resource load.