Skip to content

Commit

Permalink
Refactor logic to drain queues after each task (#4213)
Browse files Browse the repository at this point in the history
* move draining queues before calling method "Complete"

* Revert "BUG 1972388: vsbuild task in YAML build pipeline hangs forever in ADO even though the task has already logged completion (#3979)"

This reverts commit b3cf2c0.

* implemented logic to drain web console and timeline queues after each task

* corrected typo

Co-authored-by: Konstantin Tyukalov <52399739+KonstantinTyukalov@users.noreply.github.com>

* renamed variable "drain" to "shouldDrain"

---------

Co-authored-by: Konstantin Tyukalov <52399739+KonstantinTyukalov@users.noreply.github.com>
Co-authored-by: Kirill Ivlev <102740624+kirill-ivlev@users.noreply.github.com>
  • Loading branch information
3 people committed Apr 5, 2023
1 parent 4e4f00c commit 857cbef
Show file tree
Hide file tree
Showing 3 changed files with 51 additions and 53 deletions.
6 changes: 6 additions & 0 deletions src/Agent.Worker/ExecutionContext.cs
Original file line number Diff line number Diff line change
Expand Up @@ -295,6 +295,12 @@ public TaskResult Complete(TaskResult? result = null, string currentOperation =
this.Warning(StringUtil.Loc("TotalThrottlingDelay", TimeSpan.FromMilliseconds(_totalThrottlingDelayInMilliseconds).TotalSeconds));
}

if (!AgentKnobs.DisableDrainQueuesAfterTask.GetValue(this).AsBoolean())
{
_jobServerQueue.ForceDrainWebConsoleQueue = true;
_jobServerQueue.ForceDrainTimelineQueue = true;
}

_record.CurrentOperation = currentOperation ?? _record.CurrentOperation;
_record.ResultCode = resultCode ?? _record.ResultCode;
_record.FinishTime = DateTime.UtcNow;
Expand Down
20 changes: 0 additions & 20 deletions src/Agent.Worker/StepsRunner.cs
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,6 @@
using System.Threading.Tasks;
using Microsoft.TeamFoundation.DistributedTask.Expressions;
using Pipelines = Microsoft.TeamFoundation.DistributedTask.Pipelines;
using Microsoft.VisualStudio.Services.CircuitBreaker;
using Agent.Sdk.Knob;

namespace Microsoft.VisualStudio.Services.Agent.Worker
Expand All @@ -35,10 +34,6 @@ public interface IStepsRunner : IAgentService

public sealed class StepsRunner : AgentService, IStepsRunner
{
private IJobServerQueue _jobServerQueue;

private IJobServerQueue JobServerQueue => _jobServerQueue ??= HostContext.GetService<IJobServerQueue>();

// StepsRunner should never throw exception to caller
public async Task RunAsync(IExecutionContext jobContext, IList<IStep> steps)
{
Expand Down Expand Up @@ -309,21 +304,6 @@ private async Task RunStepAsync(IStep step, CancellationToken jobCancellationTok
// Complete the step context.
step.ExecutionContext.Section(StringUtil.Loc("StepFinishing", step.DisplayName));
step.ExecutionContext.Complete();

if (!AgentKnobs.DisableDrainQueuesAfterTask.GetValue(step.ExecutionContext).AsBoolean())
{
try
{
// We need to drain the queues after each task in case a lot of items
// have accumulated, since a large backlog can cause UI hangs.
await JobServerQueue.DrainQueues();
}
catch (Exception ex)
{
Trace.Error($"Error has occurred while draining queues, it can cause some UI glitches but it doesn't affect a pipeline execution itself: {ex}");
step.ExecutionContext.Error(ex);
}
}
}

private async Task SwitchToUtf8Codepage(IStep step)
Expand Down
78 changes: 45 additions & 33 deletions src/Microsoft.VisualStudio.Services.Agent/JobServerQueue.cs
Original file line number Diff line number Diff line change
Expand Up @@ -18,9 +18,10 @@ namespace Microsoft.VisualStudio.Services.Agent
[System.Diagnostics.CodeAnalysis.SuppressMessage("Microsoft.Naming", "CA1711: Identifiers should not have incorrect suffix")]
public interface IJobServerQueue : IAgentService, IThrottlingReporter
{
bool ForceDrainWebConsoleQueue { get; set; }
bool ForceDrainTimelineQueue { get; set; }
event EventHandler<ThrottlingEventArgs> JobServerQueueThrottling;
Task ShutdownAsync();
Task DrainQueues();
void Start(Pipelines.AgentJobRequestMessage jobRequest);
void QueueWebConsoleLine(Guid stepRecordId, string line, long lineNumber);
void QueueFileUpload(Guid timelineId, Guid timelineRecordId, string type, string name, string path, bool deleteSource);
Expand Down Expand Up @@ -86,34 +87,15 @@ public sealed class JobServerQueue : AgentService, IJobServerQueue
private bool _writeToBlobStoreAttachments = false;
private bool _debugMode = false;

/// <summary>
/// Requests a forced drain of the web console line queue. The web console processing loop
/// copies this flag into a local at the start of a pass and resets it to false.
/// </summary>
public bool ForceDrainWebConsoleQueue { get; set; }
/// <summary>
/// Requests a forced drain of the timeline update queue. The timeline processing loop
/// reads this flag at the start of a pass and resets it to false itself.
/// </summary>
public bool ForceDrainTimelineQueue { get; set; }

/// <summary>
/// Initializes the service through the base class, then resolves the
/// <see cref="IJobServer"/> service from the host context and stores it in <c>_jobServer</c>.
/// </summary>
/// <param name="hostContext">Host context used to look up the <see cref="IJobServer"/> service.</param>
public override void Initialize(IHostContext hostContext)
{
base.Initialize(hostContext);
_jobServer = hostContext.GetService<IJobServer>();
}

/// <summary>
/// Drains the web console line, file upload, and timeline update queues, in that order,
/// by awaiting each queue processor once with <c>runOnce: true</c>.
/// </summary>
/// <returns>A task that completes after all three processors have finished their single pass.</returns>
public async Task DrainQueues()
{
// Drain the queue
// ProcessWebConsoleLinesQueueAsync() will never throw an exception; live console update is always best effort.
Trace.Verbose("Draining web console line queue.");
await ProcessWebConsoleLinesQueueAsync(runOnce: true);
Trace.Info("Web console line queue drained.");

// ProcessFilesUploadQueueAsync() will never throw an exception; log file upload is always best effort.
Trace.Verbose("Draining file upload queue.");
await ProcessFilesUploadQueueAsync(runOnce: true);
Trace.Info("File upload queue drained.");

// ProcessTimelinesUpdateQueueAsync() will throw an exception during shutdown
// if any timeline record that failed to update contains output variables.
Trace.Verbose("Draining timeline update queue.");
await ProcessTimelinesUpdateQueueAsync(runOnce: true);
Trace.Info("Timeline update queue drained.");

Trace.Info("All queues are drained.");
}

public void Start(Pipelines.AgentJobRequestMessage jobRequest)
{
Trace.Entering();
Expand Down Expand Up @@ -192,7 +174,24 @@ public async Task ShutdownAsync()
_queueInProcess = false;
Trace.Info("All queue process task stopped.");

await DrainQueues();
// Drain the queue
// ProcessWebConsoleLinesQueueAsync() will never throw exception, live console update is always best effort.
Trace.Verbose("Draining web console line queue.");
await ProcessWebConsoleLinesQueueAsync(runOnce: true);
Trace.Info("Web console line queue drained.");

// ProcessFilesUploadQueueAsync() will never throw exception, log file upload is always best effort.
Trace.Verbose("Draining file upload queue.");
await ProcessFilesUploadQueueAsync(runOnce: true);
Trace.Info("File upload queue drained.");

// ProcessTimelinesUpdateQueueAsync() will throw an exception during shutdown
// if any timeline record that failed to update contains output variables.
Trace.Verbose("Draining timeline update queue.");
await ProcessTimelinesUpdateQueueAsync(runOnce: true);
Trace.Info("Timeline update queue drained.");

Trace.Info("All queue process tasks have been stopped, and all queues are drained.");
}

public void QueueWebConsoleLine(Guid stepRecordId, string line, long lineNumber)
Expand Down Expand Up @@ -248,6 +247,12 @@ private async Task ProcessWebConsoleLinesQueueAsync(bool runOnce = false)
{
while (!_jobCompletionSource.Task.IsCompleted || runOnce)
{
bool shouldDrain = ForceDrainWebConsoleQueue;
if (ForceDrainWebConsoleQueue)
{
ForceDrainWebConsoleQueue = false;
}

if (_webConsoleLineAggressiveDequeue && ++_webConsoleLineAggressiveDequeueCount > _webConsoleLineAggressiveDequeueLimit)
{
Trace.Info("Stop aggressive process web console line queue.");
Expand Down Expand Up @@ -279,7 +284,7 @@ private async Task ProcessWebConsoleLinesQueueAsync(bool runOnce = false)
// process at most about 500 lines of web console line during regular timer dequeue task.
// Send the first line of output to the customer right away
// It might take a while to reach 500 line outputs, which would cause delays before customers see the first line
if ((!runOnce && linesCounter > 500) || _firstConsoleOutputs)
if ((!runOnce && !shouldDrain && linesCounter > 500) || _firstConsoleOutputs)
{
break;
}
Expand Down Expand Up @@ -314,7 +319,7 @@ private async Task ProcessWebConsoleLinesQueueAsync(bool runOnce = false)
// We batch and produce 500 lines of web console output every 500ms
// If a customer's task produces a massive amount of output, then the last queue drain run might take forever.
// So we will only upload the last 200 lines of each step from all buffered web console lines.
if (runOnce && batchedLines.Count > 2)
if ((runOnce || shouldDrain) && batchedLines.Count > 2)
{
Trace.Info($"Skip {batchedLines.Count - 2} batches web console lines for last run");
batchedLines = batchedLines.TakeLast(2).ToList();
Expand Down Expand Up @@ -430,6 +435,8 @@ private async Task ProcessTimelinesUpdateQueueAsync(bool runOnce = false)
{
while (!_jobCompletionSource.Task.IsCompleted || runOnce)
{
bool shouldDrain = ForceDrainTimelineQueue;

List<PendingTimelineRecord> pendingUpdates = new List<PendingTimelineRecord>();
foreach (var timeline in _allTimelines)
{
Expand All @@ -442,7 +449,7 @@ private async Task ProcessTimelinesUpdateQueueAsync(bool runOnce = false)
{
records.Add(record);
// process at most 25 timeline records update for each timeline.
if (!runOnce && records.Count > 25)
if (!runOnce && !shouldDrain && records.Count > 25)
{
break;
}
Expand Down Expand Up @@ -514,7 +521,7 @@ private async Task ProcessTimelinesUpdateQueueAsync(bool runOnce = false)
}
}

if (runOnce)
if (runOnce || shouldDrain)
{
// continue process timeline records update,
// we might have more records need update,
Expand All @@ -535,14 +542,19 @@ private async Task ProcessTimelinesUpdateQueueAsync(bool runOnce = false)
}
else
{
break;
if (ForceDrainTimelineQueue)
{
ForceDrainTimelineQueue = false;
}
if (runOnce)
{
break;
}
}
}
}
else
{
await Task.Delay(_delayForTimelineUpdateDequeue);
}

await Task.Delay(_delayForTimelineUpdateDequeue);
}
}

Expand Down

0 comments on commit 857cbef

Please sign in to comment.