Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Metrics: Use dedicated thread instead of the thread pool for the defa… #1028

Merged
merged 5 commits into from
Dec 28, 2018
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ This changelog will be used to generate documentation on [release notes page](ht
## Version 2.9.0-beta3
- [Flatten IExtension and Unknown ITelemetry implementations for Rich Payload Event Source consumption](https://github.com/Microsoft/ApplicationInsights-dotnet/pull/1017)
- [Fix: Start/StopOperation with W3C distributed tracing enabled does not track telemetry](https://github.com/Microsoft/ApplicationInsights-dotnet/pull/1031)
- [Fix: Do not run metric aggregation on a thread pool thread](https://github.com/Microsoft/ApplicationInsights-dotnet/pull/1028)

## Version 2.9.0-beta2
- [Remove unused reference to System.Web.Extensions](https://github.com/Microsoft/ApplicationInsights-dotnet/pull/956)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,26 +16,25 @@ internal class DefaultAggregationPeriodCycle
[System.Diagnostics.CodeAnalysis.SuppressMessage("Naming Rules", "SA1310: C# Field must not contain an underscore", Justification = "By design: Structured name.")]
private const int RunningState_Stopped = 2;

private readonly Action workerMethod;

private readonly MetricAggregationManager aggregationManager;
private readonly MetricManager metricManager;

private readonly TaskCompletionSource<bool> workerTaskCompletionControl;

private int runningState;
private Task workerTask;
private Thread aggregationThread;
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Is there any value in making this a private static Thread?

I think there is a scenario where a customer is creating multiple TelemetryConfigurations, which would create multiple of these internal classes each with a private thread. Please consider if there is any value in making this thread globally unique.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes, you could end up with several instances of DefaultAggregationPeriodCycle. Not only if you have several TelemetryConfigurations, but also if you have explicitly opted into aggregating at the scope of a telemetry client.
However, a shared thread would require shared cancellation and waiting logic, which is non-trivial. For a bug fix we should always err on the side of a minimal change. Here such change is to get off the thread pool.

But yes, now that we create threads explicitly, we can consider using one shared thread for all instances of DefaultAggregationPeriodCycle in the future. It would lighten the resource load.


public DefaultAggregationPeriodCycle(MetricAggregationManager aggregationManager, MetricManager metricManager)
{
Util.ValidateNotNull(aggregationManager, nameof(aggregationManager));
Util.ValidateNotNull(metricManager, nameof(metricManager));

this.workerMethod = this.Run;

this.aggregationManager = aggregationManager;
this.metricManager = metricManager;

this.runningState = RunningState_NotStarted;
this.workerTask = null;
this.workerTaskCompletionControl = new TaskCompletionSource<bool>();
this.aggregationThread = null;
}

~DefaultAggregationPeriodCycle()
Expand All @@ -52,21 +51,28 @@ public bool Start()
return false; // Was already running or stopped.
}

this.workerTask = Task.Run(this.workerMethod)
.ContinueWith(
(t) => { this.workerTask = null; },
TaskContinuationOptions.ExecuteSynchronously);
// We create a new thread rather than using the thread pool.
// This is because inside of the main loop in the Run() method we use a synchronous wait.
// The reason for that is to prevent aggregation from being affected by potential thread pool starvation.
// As a result, Run() is a very long running task that occupies a thread forever.
// If we were to schedule Run() on the thread pool it would be possible that the thread chosen by the
// pool had run user code before. Such user code may be doing an asynchronous wait scheduled to
// continue on the same thread(e.g. this can occur when using a custom synchronization context or a
// custom task scheduler). If such case the waiting user code will never continue.
// By creating our own thread, we guarantee no interactions with potentially incorrectly written async user code.
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can I suggest we just say:

// We create a dedicated thread for aggregation. This is to prevent aggregation
// from being affected by potential thread pool starvation.

The rest is about the old implementation that is (IMO) best left unsaid because that implementation contributed to the very thread pool starvation we're trying to avoid! In addition, I think the "had run user code before" and "may be doing an asynchronous wait" parts are misleading and irrelevant to the crux of the problem which was the .GetAwaiter().GetResult() piece.


this.aggregationThread = new Thread(this.Run);
this.aggregationThread.Name = nameof(DefaultAggregationPeriodCycle);

this.aggregationThread.Start();
return true;
}

public Task StopAsync()
{
Interlocked.Exchange(ref this.runningState, RunningState_Stopped);

// Benign race on being called very soon after start. Will miss a cycle but eventually complete correctly.

Task workerTask = this.workerTask;
return workerTask ?? Task.FromResult(true);
return this.workerTaskCompletionControl.Task;
}

public void FetchAndTrackMetrics()
Expand Down Expand Up @@ -139,11 +145,13 @@ private void Run()
DateTimeOffset now = DateTimeOffset.Now;
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Not part of this change, I know, but please use UtcNow instead of Now (also in GetNextCycleTargetTime)

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

You are right. If I recall correctly, there may have been purpose to it - take a local TS so that the serializer can choose to UTC it or not. But thinking about it today - I agree that using UTC would be better - and if the serializer wanted to really do something local, it would do that consistently for all telemetry. Please feel free to file an issue.

TimeSpan waitPeriod = GetNextCycleTargetTime(now) - now;

Task.Delay(waitPeriod).ConfigureAwait(continueOnCapturedContext: false).GetAwaiter().GetResult();
Thread.Sleep(waitPeriod);
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think you want a cancelable wait. One way is to create a CancellationToken, get its WaitHandle and call WaitOne with the TimeSpan. The CancellationToken takes the place of the shouldBeRunning bool.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why would we want a cancelable wait at this point?

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

For fast and graceful shutdown. Maybe I misunderstood the point of StopAsync, but I assume it's to be called at shutdown and the expectation is that shutdown is fast.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Got it. Right now we do not do another data dump on a cancelled cycle, but you are right: it would be a good improvement. On the other hand, dealing with WaitHandles here would mean dealing with Disposables. If we go there, I would prefer to do it as a separate change from addressing the issue at hand. Before the proposed change, this wait was not cancelable either.
Thoughts?

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

ok


int shouldBeRunning = Volatile.Read(ref this.runningState);
if (shouldBeRunning != RunningState_Running)
{
this.aggregationThread = null;
this.workerTaskCompletionControl.TrySetResult(true);
return;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,7 @@
<ItemGroup Condition="'$(TargetFramework)' == 'netstandard1.3'">
<PackageReference Include="System.Diagnostics.StackTrace" Version="4.3.0" />
<PackageReference Include="System.Net.Requests" Version="4.3.0" />
<PackageReference Include="System.Threading.Thread" Version="4.3.0" />
</ItemGroup>

<ItemGroup>
Expand Down