Skip to content

Commit

Permalink
Properly guard upload (actions#2439)
Browse files Browse the repository at this point in the history
* Revert "Revert "Uploading step logs to Results as well  (actions#2422)" (actions#2437)"

This reverts commit 8c096ba.

* Properly guard the upload to results feature

* Delete skipped file if deletesource is true
  • Loading branch information
yacaovsnc authored Feb 17, 2023
1 parent 02c9d1c commit aaf02ab
Show file tree
Hide file tree
Showing 7 changed files with 370 additions and 59 deletions.
14 changes: 12 additions & 2 deletions src/Runner.Common/JobServer.cs
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,8 @@ public interface IJobServer : IRunnerService, IAsyncDisposable
Task<TaskLog> AppendLogContentAsync(Guid scopeIdentifier, string hubName, Guid planId, int logId, Stream uploadStream, CancellationToken cancellationToken);
Task AppendTimelineRecordFeedAsync(Guid scopeIdentifier, string hubName, Guid planId, Guid timelineId, Guid timelineRecordId, Guid stepId, IList<string> lines, long? startLine, CancellationToken cancellationToken);
Task<TaskAttachment> CreateAttachmentAsync(Guid scopeIdentifier, string hubName, Guid planId, Guid timelineId, Guid timelineRecordId, String type, String name, Stream uploadStream, CancellationToken cancellationToken);
Task CreateStepSymmaryAsync(string planId, string jobId, string stepId, string file, CancellationToken cancellationToken);
Task CreateStepSummaryAsync(string planId, string jobId, Guid stepId, string file, CancellationToken cancellationToken);
Task CreateResultsStepLogAsync(string planId, string jobId, Guid stepId, string file, bool finalize, bool firstBlock, long lineCount, CancellationToken cancellationToken);
Task<TaskLog> CreateLogAsync(Guid scopeIdentifier, string hubName, Guid planId, TaskLog log, CancellationToken cancellationToken);
Task<Timeline> CreateTimelineAsync(Guid scopeIdentifier, string hubName, Guid planId, Guid timelineId, CancellationToken cancellationToken);
Task<List<TimelineRecord>> UpdateTimelineRecordsAsync(Guid scopeIdentifier, string hubName, Guid planId, Guid timelineId, IEnumerable<TimelineRecord> records, CancellationToken cancellationToken);
Expand Down Expand Up @@ -316,7 +317,7 @@ public Task<TaskAttachment> CreateAttachmentAsync(Guid scopeIdentifier, string h
return _taskClient.CreateAttachmentAsync(scopeIdentifier, hubName, planId, timelineId, timelineRecordId, type, name, uploadStream, cancellationToken: cancellationToken);
}

public Task CreateStepSymmaryAsync(string planId, string jobId, string stepId, string file, CancellationToken cancellationToken)
public Task CreateStepSummaryAsync(string planId, string jobId, Guid stepId, string file, CancellationToken cancellationToken)
{
if (_resultsClient != null)
{
Expand All @@ -325,6 +326,15 @@ public Task CreateStepSymmaryAsync(string planId, string jobId, string stepId, s
throw new InvalidOperationException("Results client is not initialized.");
}

public Task CreateResultsStepLogAsync(string planId, string jobId, Guid stepId, string file, bool finalize, bool firstBlock, long lineCount, CancellationToken cancellationToken)
{
if (_resultsClient != null)
{
return _resultsClient.UploadResultsStepLogAsync(planId, jobId, stepId, file, finalize, firstBlock, lineCount, cancellationToken: cancellationToken);
}
throw new InvalidOperationException("Results client is not initialized.");
}


public Task<TaskLog> CreateLogAsync(Guid scopeIdentifier, string hubName, Guid planId, TaskLog log, CancellationToken cancellationToken)
{
Expand Down
135 changes: 100 additions & 35 deletions src/Runner.Common/JobServerQueue.cs
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@ public interface IJobServerQueue : IRunnerService, IThrottlingReporter
void Start(Pipelines.AgentJobRequestMessage jobRequest);
void QueueWebConsoleLine(Guid stepRecordId, string line, long? lineNumber = null);
void QueueFileUpload(Guid timelineId, Guid timelineRecordId, string type, string name, string path, bool deleteSource);
void QueueSummaryUpload(Guid stepRecordId, string name, string path, bool deleteSource);
void QueueResultsUpload(Guid timelineRecordId, string name, string path, string type, bool deleteSource, bool finalize, bool firstBlock, long totalLines);
void QueueTimelineRecordUpdate(Guid timelineId, TimelineRecord timelineRecord);
}

Expand All @@ -31,7 +31,7 @@ public sealed class JobServerQueue : RunnerService, IJobServerQueue
private static readonly TimeSpan _delayForWebConsoleLineDequeue = TimeSpan.FromMilliseconds(500);
private static readonly TimeSpan _delayForTimelineUpdateDequeue = TimeSpan.FromMilliseconds(500);
private static readonly TimeSpan _delayForFileUploadDequeue = TimeSpan.FromMilliseconds(1000);
private static readonly TimeSpan _delayForSummaryUploadDequeue = TimeSpan.FromMilliseconds(1000);
private static readonly TimeSpan _delayForResultsUploadDequeue = TimeSpan.FromMilliseconds(1000);

// Job message information
private Guid _scopeIdentifier;
Expand All @@ -46,7 +46,7 @@ public sealed class JobServerQueue : RunnerService, IJobServerQueue
// queue for file upload (log file or attachment)
private readonly ConcurrentQueue<UploadFileInfo> _fileUploadQueue = new();

private readonly ConcurrentQueue<SummaryUploadFileInfo> _summaryFileUploadQueue = new();
private readonly ConcurrentQueue<ResultsUploadFileInfo> _resultsFileUploadQueue = new();

// queue for timeline or timeline record update (one queue per timeline)
private readonly ConcurrentDictionary<Guid, ConcurrentQueue<TimelineRecord>> _timelineUpdateQueue = new();
Expand All @@ -60,7 +60,7 @@ public sealed class JobServerQueue : RunnerService, IJobServerQueue
// Task for each queue's dequeue process
private Task _webConsoleLineDequeueTask;
private Task _fileUploadDequeueTask;
private Task _summaryUploadDequeueTask;
private Task _resultsUploadDequeueTask;
private Task _timelineUpdateDequeueTask;

// common
Expand All @@ -84,6 +84,8 @@ public sealed class JobServerQueue : RunnerService, IJobServerQueue
private bool _webConsoleLineAggressiveDequeue = true;
private bool _firstConsoleOutputs = true;

private bool _resultsClientInitiated = false;

public override void Initialize(IHostContext hostContext)
{
base.Initialize(hostContext);
Expand All @@ -109,9 +111,9 @@ public void Start(Pipelines.AgentJobRequestMessage jobRequest)
{
Trace.Info("Initializing results client");
_jobServer.InitializeResultsClient(new Uri(resultsReceiverEndpoint), accessToken);
_resultsClientInitiated = true;
}


if (_queueInProcess)
{
Trace.Info("No-opt, all queue process tasks are running.");
Expand Down Expand Up @@ -140,12 +142,12 @@ public void Start(Pipelines.AgentJobRequestMessage jobRequest)
_fileUploadDequeueTask = ProcessFilesUploadQueueAsync();

Trace.Info("Start results file upload queue.");
_summaryUploadDequeueTask = ProcessSummaryUploadQueueAsync();
_resultsUploadDequeueTask = ProcessResultsUploadQueueAsync();

Trace.Info("Start process timeline update queue.");
_timelineUpdateDequeueTask = ProcessTimelinesUpdateQueueAsync();

_allDequeueTasks = new Task[] { _webConsoleLineDequeueTask, _fileUploadDequeueTask, _timelineUpdateDequeueTask, _summaryUploadDequeueTask };
_allDequeueTasks = new Task[] { _webConsoleLineDequeueTask, _fileUploadDequeueTask, _timelineUpdateDequeueTask, _resultsUploadDequeueTask };
_queueInProcess = true;
}

Expand Down Expand Up @@ -176,9 +178,9 @@ public async Task ShutdownAsync()
await ProcessFilesUploadQueueAsync(runOnce: true);
Trace.Info("File upload queue drained.");

Trace.Verbose("Draining results summary upload queue.");
await ProcessSummaryUploadQueueAsync(runOnce: true);
Trace.Info("Results summary upload queue drained.");
Trace.Verbose("Draining results upload queue.");
await ProcessResultsUploadQueueAsync(runOnce: true);
Trace.Info("Results upload queue drained.");

// ProcessTimelinesUpdateQueueAsync() will throw exception during shutdown
// if there is any timeline records that failed to update contains output variabls.
Expand Down Expand Up @@ -230,21 +232,49 @@ public void QueueFileUpload(Guid timelineId, Guid timelineRecordId, string type,
_fileUploadQueue.Enqueue(newFile);
}

public void QueueSummaryUpload(Guid stepRecordId, string name, string path, bool deleteSource)
public void QueueResultsUpload(Guid timelineRecordId, string name, string path, string type, bool deleteSource, bool finalize, bool firstBlock, long totalLines)
{
if (!_resultsClientInitiated)
{
Trace.Verbose("Skipping results upload");
try
{
if (deleteSource)
{
File.Delete(path);
}
}
catch (Exception ex)
{
Trace.Info("Catch exception during delete skipped results upload file.");
Trace.Error(ex);
}
return;
}

if (timelineRecordId == _jobTimelineRecordId && String.Equals(type, CoreAttachmentType.ResultsLog, StringComparison.Ordinal))
{
Trace.Verbose("Skipping job log {0} for record {1}", path, timelineRecordId);
return;
}

// all parameter not null, file path exist.
var newFile = new SummaryUploadFileInfo()
var newFile = new ResultsUploadFileInfo()
{
Name = name,
Path = path,
Type = type,
PlanId = _planId.ToString(),
JobId = _jobTimelineRecordId.ToString(),
StepId = stepRecordId.ToString(),
DeleteSource = deleteSource
RecordId = timelineRecordId,
DeleteSource = deleteSource,
Finalize = finalize,
FirstBlock = firstBlock,
TotalLines = totalLines,
};

Trace.Verbose("Enqueue results file upload queue: file '{0}' attach to job {1} step {2}", newFile.Path, _jobTimelineRecordId, stepRecordId);
_summaryFileUploadQueue.Enqueue(newFile);
Trace.Verbose("Enqueue results file upload queue: file '{0}' attach to job {1} step {2}", newFile.Path, _jobTimelineRecordId, timelineRecordId);
_resultsFileUploadQueue.Enqueue(newFile);
}

public void QueueTimelineRecordUpdate(Guid timelineId, TimelineRecord timelineRecord)
Expand Down Expand Up @@ -437,18 +467,18 @@ private async Task ProcessFilesUploadQueueAsync(bool runOnce = false)
}
}

private async Task ProcessSummaryUploadQueueAsync(bool runOnce = false)
private async Task ProcessResultsUploadQueueAsync(bool runOnce = false)
{
Trace.Info("Starting results-based upload queue...");

while (!_jobCompletionSource.Task.IsCompleted || runOnce)
{
List<SummaryUploadFileInfo> filesToUpload = new();
SummaryUploadFileInfo dequeueFile;
while (_summaryFileUploadQueue.TryDequeue(out dequeueFile))
List<ResultsUploadFileInfo> filesToUpload = new();
ResultsUploadFileInfo dequeueFile;
while (_resultsFileUploadQueue.TryDequeue(out dequeueFile))
{
filesToUpload.Add(dequeueFile);
// process at most 10 file upload.
// process at most 10 file uploads.
if (!runOnce && filesToUpload.Count > 10)
{
break;
Expand All @@ -459,19 +489,27 @@ private async Task ProcessSummaryUploadQueueAsync(bool runOnce = false)
{
if (runOnce)
{
Trace.Info($"Uploading {filesToUpload.Count} summary files in one shot through results service.");
Trace.Info($"Uploading {filesToUpload.Count} file(s) in one shot through results service.");
}

int errorCount = 0;
foreach (var file in filesToUpload)
{
try
{
await UploadSummaryFile(file);
if (String.Equals(file.Type, ChecksAttachmentType.StepSummary, StringComparison.OrdinalIgnoreCase))
{
await UploadSummaryFile(file);
}
else if (String.Equals(file.Type, CoreAttachmentType.ResultsLog, StringComparison.OrdinalIgnoreCase))
{
Trace.Info($"Got a step log file to send to results service.");
await UploadResultsStepLogFile(file);
}
}
catch (Exception ex)
{
var issue = new Issue() { Type = IssueType.Warning, Message = $"Caught exception during summary file upload to results. {ex.Message}" };
var issue = new Issue() { Type = IssueType.Warning, Message = $"Caught exception during file upload to results. {ex.Message}" };
issue.Data[Constants.Runner.InternalTelemetryIssueDataKey] = Constants.Runner.ResultsUploadFailure;

var telemetryRecord = new TimelineRecord()
Expand All @@ -481,16 +519,13 @@ private async Task ProcessSummaryUploadQueueAsync(bool runOnce = false)
telemetryRecord.Issues.Add(issue);
QueueTimelineRecordUpdate(_jobTimelineId, telemetryRecord);

Trace.Info("Catch exception during summary file upload to results, keep going since the process is best effort.");
Trace.Info("Catch exception during file upload to results, keep going since the process is best effort.");
Trace.Error(ex);
}
finally
{
errorCount++;
}
}

Trace.Info("Tried to upload {0} summary files to results, success rate: {1}/{0}.", filesToUpload.Count, filesToUpload.Count - errorCount);
Trace.Info("Tried to upload {0} file(s) to results, success rate: {1}/{0}.", filesToUpload.Count, filesToUpload.Count - errorCount);
}

if (runOnce)
Expand All @@ -499,7 +534,7 @@ private async Task ProcessSummaryUploadQueueAsync(bool runOnce = false)
}
else
{
await Task.Delay(_delayForSummaryUploadDequeue);
await Task.Delay(_delayForResultsUploadDequeue);
}
}
}
Expand Down Expand Up @@ -776,15 +811,14 @@ private async Task UploadFile(UploadFileInfo file)
}
}

private async Task UploadSummaryFile(SummaryUploadFileInfo file)
private async Task UploadSummaryFile(ResultsUploadFileInfo file)
{
bool uploadSucceed = false;
try
{
// Upload the step summary
Trace.Info($"Starting to upload summary file to results service {file.Name}, {file.Path}");
var cancellationTokenSource = new CancellationTokenSource();
await _jobServer.CreateStepSymmaryAsync(file.PlanId, file.JobId, file.StepId, file.Path, cancellationTokenSource.Token);
await _jobServer.CreateStepSummaryAsync(file.PlanId, file.JobId, file.RecordId, file.Path, CancellationToken.None);

uploadSucceed = true;
}
Expand All @@ -804,6 +838,33 @@ private async Task UploadSummaryFile(SummaryUploadFileInfo file)
}
}
}

private async Task UploadResultsStepLogFile(ResultsUploadFileInfo file)
{
bool uploadSucceed = false;
try
{
Trace.Info($"Starting upload of step log file to results service {file.Name}, {file.Path}");
await _jobServer.CreateResultsStepLogAsync(file.PlanId, file.JobId, file.RecordId, file.Path, file.Finalize, file.FirstBlock, file.TotalLines, CancellationToken.None);

uploadSucceed = true;
}
finally
{
if (uploadSucceed && file.DeleteSource)
{
try
{
File.Delete(file.Path);
}
catch (Exception ex)
{
Trace.Info("Exception encountered during deletion of a temporary file that was already successfully uploaded to results.");
Trace.Error(ex);
}
}
}
}
}

internal class PendingTimelineRecord
Expand All @@ -822,14 +883,18 @@ internal class UploadFileInfo
public bool DeleteSource { get; set; }
}

internal class SummaryUploadFileInfo
internal class ResultsUploadFileInfo
{
public string Name { get; set; }
public string Type { get; set; }
public string Path { get; set; }
public string PlanId { get; set; }
public string JobId { get; set; }
public string StepId { get; set; }
public Guid RecordId { get; set; }
public bool DeleteSource { get; set; }
public bool Finalize { get; set; }
public bool FirstBlock { get; set; }
public long TotalLines { get; set; }
}


Expand Down
Loading

0 comments on commit aaf02ab

Please sign in to comment.