From aaf02ab34cf874e67c43fc418338db21679e5d52 Mon Sep 17 00:00:00 2001 From: Yang Cao Date: Fri, 17 Feb 2023 10:31:41 -0500 Subject: [PATCH] Properly guard upload (#2439) * Revert "Revert "Uploading step logs to Results as well (#2422)" (#2437)" This reverts commit 8c096baf49f579380d4e911c74b0b749729028ac. * Properly guard the upload to results feature * Delete skipped file if deletesource is true --- src/Runner.Common/JobServer.cs | 14 +- src/Runner.Common/JobServerQueue.cs | 135 ++++++++++++----- src/Runner.Common/Logging.cs | 63 ++++++-- src/Runner.Worker/ExecutionContext.cs | 7 +- src/Sdk/DTWebApi/WebApi/TaskAttachment.cs | 3 +- src/Sdk/WebApi/WebApi/Contracts.cs | 48 +++++++ src/Sdk/WebApi/WebApi/ResultsHttpClient.cs | 159 +++++++++++++++++++-- 7 files changed, 370 insertions(+), 59 deletions(-) diff --git a/src/Runner.Common/JobServer.cs b/src/Runner.Common/JobServer.cs index 8a6c4a6a76e..085cc84979d 100644 --- a/src/Runner.Common/JobServer.cs +++ b/src/Runner.Common/JobServer.cs @@ -30,7 +30,8 @@ public interface IJobServer : IRunnerService, IAsyncDisposable Task AppendLogContentAsync(Guid scopeIdentifier, string hubName, Guid planId, int logId, Stream uploadStream, CancellationToken cancellationToken); Task AppendTimelineRecordFeedAsync(Guid scopeIdentifier, string hubName, Guid planId, Guid timelineId, Guid timelineRecordId, Guid stepId, IList lines, long? startLine, CancellationToken cancellationToken); Task CreateAttachmentAsync(Guid scopeIdentifier, string hubName, Guid planId, Guid timelineId, Guid timelineRecordId, String type, String name, Stream uploadStream, CancellationToken cancellationToken); - Task CreateStepSymmaryAsync(string planId, string jobId, string stepId, string file, CancellationToken cancellationToken); + Task CreateStepSummaryAsync(string planId, string jobId, Guid stepId, string file, CancellationToken cancellationToken); + Task CreateResultsStepLogAsync(string planId, string jobId, Guid stepId, string file, bool finalize, bool firstBlock, long lineCount, CancellationToken cancellationToken); Task CreateLogAsync(Guid scopeIdentifier, string hubName, Guid planId, TaskLog log, CancellationToken cancellationToken); Task CreateTimelineAsync(Guid scopeIdentifier, string hubName, Guid planId, Guid timelineId, CancellationToken cancellationToken); Task> UpdateTimelineRecordsAsync(Guid scopeIdentifier, string hubName, Guid planId, Guid timelineId, IEnumerable records, CancellationToken cancellationToken); @@ -316,7 +317,7 @@ public Task CreateAttachmentAsync(Guid scopeIdentifier, string h return _taskClient.CreateAttachmentAsync(scopeIdentifier, hubName, planId, timelineId, timelineRecordId, type, name, uploadStream, cancellationToken: cancellationToken); } - public Task CreateStepSymmaryAsync(string planId, string jobId, string stepId, string file, CancellationToken cancellationToken) + public Task CreateStepSummaryAsync(string planId, string jobId, Guid stepId, string file, CancellationToken cancellationToken) { if (_resultsClient != null) { @@ -325,6 +326,15 @@ public Task CreateStepSymmaryAsync(string planId, string jobId, string stepId, s throw new InvalidOperationException("Results client is not initialized."); } + public Task CreateResultsStepLogAsync(string planId, string jobId, Guid stepId, string file, bool finalize, bool firstBlock, long lineCount, CancellationToken cancellationToken) + { + if (_resultsClient != null) + { + return _resultsClient.UploadResultsStepLogAsync(planId, jobId, stepId, file, finalize, firstBlock, lineCount, cancellationToken: cancellationToken); + } + throw new InvalidOperationException("Results client is not initialized."); + } + public Task CreateLogAsync(Guid scopeIdentifier, string hubName, Guid planId, TaskLog log, CancellationToken cancellationToken) { diff --git a/src/Runner.Common/JobServerQueue.cs b/src/Runner.Common/JobServerQueue.cs index 6440da73607..f5f5a5494fe 100644 --- a/src/Runner.Common/JobServerQueue.cs +++ b/src/Runner.Common/JobServerQueue.cs @@ -20,7 +20,7 @@ public interface IJobServerQueue : IRunnerService, IThrottlingReporter void Start(Pipelines.AgentJobRequestMessage jobRequest); void QueueWebConsoleLine(Guid stepRecordId, string line, long? lineNumber = null); void QueueFileUpload(Guid timelineId, Guid timelineRecordId, string type, string name, string path, bool deleteSource); - void QueueSummaryUpload(Guid stepRecordId, string name, string path, bool deleteSource); + void QueueResultsUpload(Guid timelineRecordId, string name, string path, string type, bool deleteSource, bool finalize, bool firstBlock, long totalLines); void QueueTimelineRecordUpdate(Guid timelineId, TimelineRecord timelineRecord); } @@ -31,7 +31,7 @@ public sealed class JobServerQueue : RunnerService, IJobServerQueue private static readonly TimeSpan _delayForWebConsoleLineDequeue = TimeSpan.FromMilliseconds(500); private static readonly TimeSpan _delayForTimelineUpdateDequeue = TimeSpan.FromMilliseconds(500); private static readonly TimeSpan _delayForFileUploadDequeue = TimeSpan.FromMilliseconds(1000); - private static readonly TimeSpan _delayForSummaryUploadDequeue = TimeSpan.FromMilliseconds(1000); + private static readonly TimeSpan _delayForResultsUploadDequeue = TimeSpan.FromMilliseconds(1000); // Job message information private Guid _scopeIdentifier; @@ -46,7 +46,7 @@ public sealed class JobServerQueue : RunnerService, IJobServerQueue // queue for file upload (log file or attachment) private readonly ConcurrentQueue _fileUploadQueue = new(); - private readonly ConcurrentQueue _summaryFileUploadQueue = new(); + private readonly ConcurrentQueue _resultsFileUploadQueue = new(); // queue for timeline or timeline record update (one queue per timeline) private readonly ConcurrentDictionary> _timelineUpdateQueue = new(); @@ -60,7 +60,7 @@ public sealed class JobServerQueue : RunnerService, IJobServerQueue // Task for each queue's dequeue process private Task _webConsoleLineDequeueTask; private Task _fileUploadDequeueTask; - private Task _summaryUploadDequeueTask; + private Task _resultsUploadDequeueTask; private Task _timelineUpdateDequeueTask; // common @@ -84,6 +84,8 @@ public sealed class JobServerQueue : RunnerService, IJobServerQueue private bool _webConsoleLineAggressiveDequeue = true; private bool _firstConsoleOutputs = true; + private bool _resultsClientInitiated = false; + public override void Initialize(IHostContext hostContext) { base.Initialize(hostContext); @@ -109,9 +111,9 @@ public void Start(Pipelines.AgentJobRequestMessage jobRequest) { Trace.Info("Initializing results client"); _jobServer.InitializeResultsClient(new Uri(resultsReceiverEndpoint), accessToken); + _resultsClientInitiated = true; } - if (_queueInProcess) { Trace.Info("No-opt, all queue process tasks are running."); @@ -140,12 +142,12 @@ public void Start(Pipelines.AgentJobRequestMessage jobRequest) _fileUploadDequeueTask = ProcessFilesUploadQueueAsync(); Trace.Info("Start results file upload queue."); - _summaryUploadDequeueTask = ProcessSummaryUploadQueueAsync(); + _resultsUploadDequeueTask = ProcessResultsUploadQueueAsync(); Trace.Info("Start process timeline update queue."); _timelineUpdateDequeueTask = ProcessTimelinesUpdateQueueAsync(); - _allDequeueTasks = new Task[] { _webConsoleLineDequeueTask, _fileUploadDequeueTask, _timelineUpdateDequeueTask, _summaryUploadDequeueTask }; + _allDequeueTasks = new Task[] { _webConsoleLineDequeueTask, _fileUploadDequeueTask, _timelineUpdateDequeueTask, _resultsUploadDequeueTask }; _queueInProcess = true; } @@ -176,9 +178,9 @@ public async Task ShutdownAsync() await ProcessFilesUploadQueueAsync(runOnce: true); Trace.Info("File upload queue drained."); - Trace.Verbose("Draining results summary upload queue."); - await ProcessSummaryUploadQueueAsync(runOnce: true); - Trace.Info("Results summary upload queue drained."); + Trace.Verbose("Draining results upload queue."); + await ProcessResultsUploadQueueAsync(runOnce: true); + Trace.Info("Results upload queue drained."); // ProcessTimelinesUpdateQueueAsync() will throw exception during shutdown // if there is any timeline records that failed to update contains output variabls. @@ -230,21 +232,49 @@ public void QueueFileUpload(Guid timelineId, Guid timelineRecordId, string type, _fileUploadQueue.Enqueue(newFile); } - public void QueueSummaryUpload(Guid stepRecordId, string name, string path, bool deleteSource) + public void QueueResultsUpload(Guid timelineRecordId, string name, string path, string type, bool deleteSource, bool finalize, bool firstBlock, long totalLines) { + if (!_resultsClientInitiated) + { + Trace.Verbose("Skipping results upload"); + try + { + if (deleteSource) + { + File.Delete(path); + } + } + catch (Exception ex) + { + Trace.Info("Catch exception during delete skipped results upload file."); + Trace.Error(ex); + } + return; + } + + if (timelineRecordId == _jobTimelineRecordId && String.Equals(type, CoreAttachmentType.ResultsLog, StringComparison.Ordinal)) + { + Trace.Verbose("Skipping job log {0} for record {1}", path, timelineRecordId); + return; + } + // all parameter not null, file path exist. - var newFile = new SummaryUploadFileInfo() + var newFile = new ResultsUploadFileInfo() { Name = name, Path = path, + Type = type, PlanId = _planId.ToString(), JobId = _jobTimelineRecordId.ToString(), - StepId = stepRecordId.ToString(), - DeleteSource = deleteSource + RecordId = timelineRecordId, + DeleteSource = deleteSource, + Finalize = finalize, + FirstBlock = firstBlock, + TotalLines = totalLines, }; - Trace.Verbose("Enqueue results file upload queue: file '{0}' attach to job {1} step {2}", newFile.Path, _jobTimelineRecordId, stepRecordId); - _summaryFileUploadQueue.Enqueue(newFile); + Trace.Verbose("Enqueue results file upload queue: file '{0}' attach to job {1} step {2}", newFile.Path, _jobTimelineRecordId, timelineRecordId); + _resultsFileUploadQueue.Enqueue(newFile); } public void QueueTimelineRecordUpdate(Guid timelineId, TimelineRecord timelineRecord) @@ -437,18 +467,18 @@ private async Task ProcessFilesUploadQueueAsync(bool runOnce = false) } } - private async Task ProcessSummaryUploadQueueAsync(bool runOnce = false) + private async Task ProcessResultsUploadQueueAsync(bool runOnce = false) { Trace.Info("Starting results-based upload queue..."); while (!_jobCompletionSource.Task.IsCompleted || runOnce) { - List filesToUpload = new(); - SummaryUploadFileInfo dequeueFile; - while (_summaryFileUploadQueue.TryDequeue(out dequeueFile)) + List filesToUpload = new(); + ResultsUploadFileInfo dequeueFile; + while (_resultsFileUploadQueue.TryDequeue(out dequeueFile)) { filesToUpload.Add(dequeueFile); - // process at most 10 file upload. + // process at most 10 file uploads. if (!runOnce && filesToUpload.Count > 10) { break; @@ -459,7 +489,7 @@ private async Task ProcessSummaryUploadQueueAsync(bool runOnce = false) { if (runOnce) { - Trace.Info($"Uploading {filesToUpload.Count} summary files in one shot through results service."); + Trace.Info($"Uploading {filesToUpload.Count} file(s) in one shot through results service."); } int errorCount = 0; @@ -467,11 +497,19 @@ private async Task ProcessSummaryUploadQueueAsync(bool runOnce = false) { try { - await UploadSummaryFile(file); + if (String.Equals(file.Type, ChecksAttachmentType.StepSummary, StringComparison.OrdinalIgnoreCase)) + { + await UploadSummaryFile(file); + } + else if (String.Equals(file.Type, CoreAttachmentType.ResultsLog, StringComparison.OrdinalIgnoreCase)) + { + Trace.Info($"Got a step log file to send to results service."); + await UploadResultsStepLogFile(file); + } } catch (Exception ex) { - var issue = new Issue() { Type = IssueType.Warning, Message = $"Caught exception during summary file upload to results. {ex.Message}" }; + var issue = new Issue() { Type = IssueType.Warning, Message = $"Caught exception during file upload to results. {ex.Message}" }; issue.Data[Constants.Runner.InternalTelemetryIssueDataKey] = Constants.Runner.ResultsUploadFailure; var telemetryRecord = new TimelineRecord() @@ -481,16 +519,13 @@ private async Task ProcessSummaryUploadQueueAsync(bool runOnce = false) telemetryRecord.Issues.Add(issue); QueueTimelineRecordUpdate(_jobTimelineId, telemetryRecord); - Trace.Info("Catch exception during summary file upload to results, keep going since the process is best effort."); + Trace.Info("Catch exception during file upload to results, keep going since the process is best effort."); Trace.Error(ex); - } - finally - { errorCount++; } } - Trace.Info("Tried to upload {0} summary files to results, success rate: {1}/{0}.", filesToUpload.Count, filesToUpload.Count - errorCount); + Trace.Info("Tried to upload {0} file(s) to results, success rate: {1}/{0}.", filesToUpload.Count, filesToUpload.Count - errorCount); } if (runOnce) @@ -499,7 +534,7 @@ private async Task ProcessSummaryUploadQueueAsync(bool runOnce = false) } else { - await Task.Delay(_delayForSummaryUploadDequeue); + await Task.Delay(_delayForResultsUploadDequeue); } } } @@ -776,15 +811,14 @@ private async Task UploadFile(UploadFileInfo file) } } - private async Task UploadSummaryFile(SummaryUploadFileInfo file) + private async Task UploadSummaryFile(ResultsUploadFileInfo file) { bool uploadSucceed = false; try { // Upload the step summary Trace.Info($"Starting to upload summary file to results service {file.Name}, {file.Path}"); - var cancellationTokenSource = new CancellationTokenSource(); - await _jobServer.CreateStepSymmaryAsync(file.PlanId, file.JobId, file.StepId, file.Path, cancellationTokenSource.Token); + await _jobServer.CreateStepSummaryAsync(file.PlanId, file.JobId, file.RecordId, file.Path, CancellationToken.None); uploadSucceed = true; } @@ -804,6 +838,33 @@ private async Task UploadSummaryFile(SummaryUploadFileInfo file) } } } + + private async Task UploadResultsStepLogFile(ResultsUploadFileInfo file) + { + bool uploadSucceed = false; + try + { + Trace.Info($"Starting upload of step log file to results service {file.Name}, {file.Path}"); + await _jobServer.CreateResultsStepLogAsync(file.PlanId, file.JobId, file.RecordId, file.Path, file.Finalize, file.FirstBlock, file.TotalLines, CancellationToken.None); + + uploadSucceed = true; + } + finally + { + if (uploadSucceed && file.DeleteSource) + { + try + { + File.Delete(file.Path); + } + catch (Exception ex) + { + Trace.Info("Exception encountered during deletion of a temporary file that was already successfully uploaded to results."); + Trace.Error(ex); + } + } + } + } } internal class PendingTimelineRecord @@ -822,14 +883,18 @@ internal class UploadFileInfo public bool DeleteSource { get; set; } } - internal class SummaryUploadFileInfo + internal class ResultsUploadFileInfo { public string Name { get; set; } + public string Type { get; set; } public string Path { get; set; } public string PlanId { get; set; } public string JobId { get; set; } - public string StepId { get; set; } + public Guid RecordId { get; set; } public bool DeleteSource { get; set; } + public bool Finalize { get; set; } + public bool FirstBlock { get; set; } + public long TotalLines { get; set; } } diff --git a/src/Runner.Common/Logging.cs b/src/Runner.Common/Logging.cs index 40be5cdcff0..0bc2a13f0e3 100644 --- a/src/Runner.Common/Logging.cs +++ b/src/Runner.Common/Logging.cs @@ -21,6 +21,12 @@ public class PagingLogger : RunnerService, IPagingLogger // 8 MB public const int PageSize = 8 * 1024 * 1024; + // For Results + public static string BlocksFolder = "blocks"; + + // 2 MB + public const int BlockSize = 2 * 1024 * 1024; + private Guid _timelineId; private Guid _timelineRecordId; private FileStream _pageData; @@ -32,6 +38,13 @@ public class PagingLogger : RunnerService, IPagingLogger private string _pagesFolder; private IJobServerQueue _jobServerQueue; + private string _resultsDataFileName; + private FileStream _resultsBlockData; + private StreamWriter _resultsBlockWriter; + private string _resultsBlockFolder; + private int _blockByteCount; + private int _blockCount; + public long TotalLines => _totalLines; public override void Initialize(IHostContext hostContext) @@ -39,8 +52,10 @@ public override void Initialize(IHostContext hostContext) base.Initialize(hostContext); _totalLines = 0; _pagesFolder = Path.Combine(hostContext.GetDirectory(WellKnownDirectory.Diag), PagingFolder); - _jobServerQueue = HostContext.GetService(); Directory.CreateDirectory(_pagesFolder); + _resultsBlockFolder = Path.Combine(hostContext.GetDirectory(WellKnownDirectory.Diag), BlocksFolder); + Directory.CreateDirectory(_resultsBlockFolder); + _jobServerQueue = HostContext.GetService(); } public void Setup(Guid timelineId, Guid timelineRecordId) @@ -60,11 +75,17 @@ public void Write(string message) // lazy creation on write if (_pageWriter == null) { - Create(); + NewPage(); + } + + if (_resultsBlockWriter == null) + { + NewBlock(); } string line = $"{DateTime.UtcNow.ToString("O")} {message}"; _pageWriter.WriteLine(line); + _resultsBlockWriter.WriteLine(line); _totalLines++; if (line.IndexOf('\n') != -1) @@ -78,21 +99,25 @@ public void Write(string message) } } - _byteCount += System.Text.Encoding.UTF8.GetByteCount(line); + var bytes = System.Text.Encoding.UTF8.GetByteCount(line); + _byteCount += bytes; + _blockByteCount += bytes; if (_byteCount >= PageSize) { NewPage(); } + + if (_blockByteCount >= BlockSize) + { + NewBlock(); + } + } public void End() { EndPage(); - } - - private void Create() - { - NewPage(); + EndBlock(true); } private void NewPage() @@ -117,5 +142,27 @@ private void EndPage() _jobServerQueue.QueueFileUpload(_timelineId, _timelineRecordId, "DistributedTask.Core.Log", "CustomToolLog", _dataFileName, true); } } + + private void NewBlock() + { + EndBlock(false); + _blockByteCount = 0; + _resultsDataFileName = Path.Combine(_resultsBlockFolder, $"{_timelineId}_{_timelineRecordId}.{++_blockCount}"); + _resultsBlockData = new FileStream(_resultsDataFileName, FileMode.CreateNew, FileAccess.ReadWrite, FileShare.ReadWrite); + _resultsBlockWriter = new StreamWriter(_resultsBlockData, System.Text.Encoding.UTF8); + } + + private void EndBlock(bool finalize) + { + if (_resultsBlockWriter != null) + { + _resultsBlockWriter.Flush(); + _resultsBlockData.Flush(); + _resultsBlockWriter.Dispose(); + _resultsBlockWriter = null; + _resultsBlockData = null; + _jobServerQueue.QueueResultsUpload(_timelineRecordId, "ResultsLog", _resultsDataFileName, "Results.Core.Log", deleteSource: true, finalize, firstBlock: _resultsDataFileName.EndsWith(".1"), totalLines: _totalLines); + } + } } } diff --git a/src/Runner.Worker/ExecutionContext.cs b/src/Runner.Worker/ExecutionContext.cs index d12cb8e343c..8d981c5492c 100644 --- a/src/Runner.Worker/ExecutionContext.cs +++ b/src/Runner.Worker/ExecutionContext.cs @@ -1,4 +1,4 @@ -using System; +using System; using System.Collections.Generic; using System.Globalization; using System.IO; @@ -81,7 +81,7 @@ public interface IExecutionContext : IRunnerService // logging long Write(string tag, string message); void QueueAttachFile(string type, string name, string filePath); - void QueueSummaryFile(string name, string filePath, Guid stepRecordId); + void QueueSummaryFile(string name, string filePath, Guid stepRecordId); // timeline record update methods void Start(string currentOperation = null); @@ -871,8 +871,7 @@ public void QueueSummaryFile(string name, string filePath, Guid stepRecordId) { throw new FileNotFoundException($"Can't upload (name:{name}) file: {filePath}. File does not exist."); } - - _jobServerQueue.QueueSummaryUpload(stepRecordId, name, filePath, deleteSource: false); + _jobServerQueue.QueueResultsUpload(stepRecordId, name, filePath, ChecksAttachmentType.StepSummary, deleteSource: false, finalize: true, firstBlock: true, totalLines: 0); } // Add OnMatcherChanged diff --git a/src/Sdk/DTWebApi/WebApi/TaskAttachment.cs b/src/Sdk/DTWebApi/WebApi/TaskAttachment.cs index 17027d1261a..1bb2a628edf 100644 --- a/src/Sdk/DTWebApi/WebApi/TaskAttachment.cs +++ b/src/Sdk/DTWebApi/WebApi/TaskAttachment.cs @@ -1,4 +1,4 @@ -using GitHub.Services.Common; +using GitHub.Services.Common; using GitHub.Services.WebApi; using System; using System.Runtime.Serialization; @@ -100,6 +100,7 @@ public class CoreAttachmentType public static readonly String Summary = "DistributedTask.Core.Summary"; public static readonly String FileAttachment = "DistributedTask.Core.FileAttachment"; public static readonly String DiagnosticLog = "DistributedTask.Core.DiagnosticLog"; + public static readonly String ResultsLog = "Results.Core.Log"; } [GenerateAllConstants] diff --git a/src/Sdk/WebApi/WebApi/Contracts.cs b/src/Sdk/WebApi/WebApi/Contracts.cs index d240cc1e230..bc6361d62bc 100644 --- a/src/Sdk/WebApi/WebApi/Contracts.cs +++ b/src/Sdk/WebApi/WebApi/Contracts.cs @@ -28,6 +28,30 @@ public class GetSignedStepSummaryURLResponse public string BlobStorageType; } + [DataContract] + [JsonObject(NamingStrategyType = typeof(SnakeCaseNamingStrategy))] + public class GetSignedStepLogsURLRequest + { + [DataMember] + public string WorkflowJobRunBackendId; + [DataMember] + public string WorkflowRunBackendId; + [DataMember] + public string StepBackendId; + } + + [DataContract] + [JsonObject(NamingStrategyType = typeof(SnakeCaseNamingStrategy))] + public class GetSignedStepLogsURLResponse + { + [DataMember] + public string LogsUrl; + [DataMember] + public long SoftSizeLimit; + [DataMember] + public string BlobStorageType; + } + [DataContract] [JsonObject(NamingStrategyType = typeof(SnakeCaseNamingStrategy))] public class StepSummaryMetadataCreate @@ -52,6 +76,30 @@ public class CreateStepSummaryMetadataResponse public bool Ok; } + [DataContract] + [JsonObject(NamingStrategyType = typeof(SnakeCaseNamingStrategy))] + public class StepLogsMetadataCreate + { + [DataMember] + public string StepBackendId; + [DataMember] + public string WorkflowRunBackendId; + [DataMember] + public string WorkflowJobRunBackendId; + [DataMember] + public string UploadedAt; + [DataMember] + public long LineCount; + } + + [DataContract] + [JsonObject(NamingStrategyType = typeof(SnakeCaseNamingStrategy))] + public class CreateStepLogsMetadataResponse + { + [DataMember] + public bool Ok; + } + public static class BlobStorageTypes { public static readonly string AzureBlobStorage = "BLOB_STORAGE_TYPE_AZURE"; diff --git a/src/Sdk/WebApi/WebApi/ResultsHttpClient.cs b/src/Sdk/WebApi/WebApi/ResultsHttpClient.cs index 77a733eaeba..009308b98d6 100644 --- a/src/Sdk/WebApi/WebApi/ResultsHttpClient.cs +++ b/src/Sdk/WebApi/WebApi/ResultsHttpClient.cs @@ -24,13 +24,13 @@ public ResultsHttpClient( m_formatter = new JsonMediaTypeFormatter(); } - public async Task GetStepSummaryUploadUrlAsync(string planId, string jobId, string stepId, CancellationToken cancellationToken) + public async Task GetStepSummaryUploadUrlAsync(string planId, string jobId, Guid stepId, CancellationToken cancellationToken) { var request = new GetSignedStepSummaryURLRequest() { - WorkflowJobRunBackendId= jobId, - WorkflowRunBackendId= planId, - StepBackendId= stepId + WorkflowJobRunBackendId = jobId, + WorkflowRunBackendId = planId, + StepBackendId = stepId.ToString() }; var stepSummaryUploadRequest = new Uri(m_resultsServiceUrl, "twirp/results.services.receiver.Receiver/GetStepSummarySignedBlobURL"); @@ -51,14 +51,41 @@ public async Task GetStepSummaryUploadUrlAsync( } } - private async Task StepSummaryUploadCompleteAsync(string planId, string jobId, string stepId, long size, CancellationToken cancellationToken) + public async Task GetStepLogUploadUrlAsync(string planId, string jobId, Guid stepId, CancellationToken cancellationToken) + { + var request = new GetSignedStepLogsURLRequest() + { + WorkflowJobRunBackendId = jobId, + WorkflowRunBackendId = planId, + StepBackendId = stepId.ToString(), + }; + + var stepLogsUploadRequest = new Uri(m_resultsServiceUrl, "twirp/results.services.receiver.Receiver/GetStepLogsSignedBlobURL"); + + using (HttpRequestMessage requestMessage = new HttpRequestMessage(HttpMethod.Post, stepLogsUploadRequest)) + { + requestMessage.Headers.Authorization = new AuthenticationHeaderValue("Bearer", m_token); + requestMessage.Headers.Accept.Add(MediaTypeWithQualityHeaderValue.Parse("application/json")); + + using (HttpContent content = new ObjectContent(request, m_formatter)) + { + requestMessage.Content = content; + using (var response = await SendAsync(requestMessage, HttpCompletionOption.ResponseContentRead, cancellationToken: cancellationToken)) + { + return await ReadJsonContentAsync(response, cancellationToken); + } + } + } + } + + private async Task StepSummaryUploadCompleteAsync(string planId, string jobId, Guid stepId, long size, CancellationToken cancellationToken) { var timestamp = DateTime.UtcNow.ToString("yyyy-MM-dd'T'HH:mm:ss.fffK"); var request = new StepSummaryMetadataCreate() { - WorkflowJobRunBackendId= jobId, - WorkflowRunBackendId= planId, - StepBackendId = stepId, + WorkflowJobRunBackendId = jobId, + WorkflowRunBackendId = planId, + StepBackendId = stepId.ToString(), Size = size, UploadedAt = timestamp }; @@ -85,6 +112,40 @@ private async Task StepSummaryUploadCompleteAsync(string planId, string jobId, s } } + private async Task StepLogUploadCompleteAsync(string planId, string jobId, Guid stepId, long lineCount, CancellationToken cancellationToken) + { + var timestamp = DateTime.UtcNow.ToString("yyyy-MM-dd'T'HH:mm:ss.fffK"); + var request = new StepLogsMetadataCreate() + { + WorkflowJobRunBackendId = jobId, + WorkflowRunBackendId = planId, + StepBackendId = stepId.ToString(), + UploadedAt = timestamp, + LineCount = lineCount, + }; + + var stepLogsUploadCompleteRequest = new Uri(m_resultsServiceUrl, "twirp/results.services.receiver.Receiver/CreateStepLogsMetadata"); + + using (HttpRequestMessage requestMessage = new HttpRequestMessage(HttpMethod.Post, stepLogsUploadCompleteRequest)) + { + requestMessage.Headers.Authorization = new AuthenticationHeaderValue("Bearer", m_token); + requestMessage.Headers.Accept.Add(MediaTypeWithQualityHeaderValue.Parse("application/json")); + + using (HttpContent content = new ObjectContent(request, m_formatter)) + { + requestMessage.Content = content; + using (var response = await SendAsync(requestMessage, HttpCompletionOption.ResponseContentRead, cancellationToken: cancellationToken)) + { + var jsonResponse = await ReadJsonContentAsync(response, cancellationToken); + if (!jsonResponse.Ok) + { + throw new Exception($"Failed to mark step log upload as complete, status code: {response.StatusCode}, ok: {jsonResponse.Ok}, timestamp: {timestamp}"); + } + } + } + } + } + private async Task UploadFileAsync(string url, string blobStorageType, FileStream file, CancellationToken cancellationToken) { // Upload the file to the url @@ -108,8 +169,55 @@ private async Task UploadFileAsync(string url, string blobS } } + private async Task CreateAppendFileAsync(string url, string blobStorageType, CancellationToken cancellationToken) + { + var request = new HttpRequestMessage(HttpMethod.Put, url) + { + Content = new StringContent("") + }; + if (blobStorageType == BlobStorageTypes.AzureBlobStorage) + { + request.Content.Headers.Add("x-ms-blob-type", "AppendBlob"); + request.Content.Headers.Add("Content-Length", "0"); + } + + using (var response = await SendAsync(request, HttpCompletionOption.ResponseHeadersRead, userState: null, cancellationToken)) + { + if (!response.IsSuccessStatusCode) + { + throw new Exception($"Failed to create append file, status code: {response.StatusCode}, reason: {response.ReasonPhrase}"); + } + return response; + } + } + + private async Task UploadAppendFileAsync(string url, string blobStorageType, FileStream file, bool finalize, long fileSize, CancellationToken cancellationToken) + { + var comp = finalize ? "&comp=appendblock&seal=true" : "&comp=appendblock"; + // Upload the file to the url + var request = new HttpRequestMessage(HttpMethod.Put, url + comp) + { + Content = new StreamContent(file) + }; + + if (blobStorageType == BlobStorageTypes.AzureBlobStorage) + { + request.Content.Headers.Add("Content-Length", fileSize.ToString()); + request.Content.Headers.Add("x-ms-blob-sealed", finalize.ToString()); + } + + using (var response = await SendAsync(request, HttpCompletionOption.ResponseHeadersRead, userState: null, cancellationToken)) + { + if (!response.IsSuccessStatusCode) + { + throw new Exception($"Failed to upload append file, status code: {response.StatusCode}, reason: {response.ReasonPhrase}, object: {response}, fileSize: {fileSize}"); + } + return response; + } + } + // Handle file upload for step summary - public async Task UploadStepSummaryAsync(string planId, string jobId, string stepId, string file, CancellationToken cancellationToken) + public async Task UploadStepSummaryAsync(string planId, string jobId, Guid stepId, string file, CancellationToken cancellationToken) { // Get the upload url var uploadUrlResponse = await GetStepSummaryUploadUrlAsync(planId, jobId, stepId, cancellationToken); @@ -135,6 +243,39 @@ public async Task UploadStepSummaryAsync(string planId, string jobId, string ste await StepSummaryUploadCompleteAsync(planId, jobId, stepId, fileSize, cancellationToken); } + // Handle file upload for step log + public async Task UploadResultsStepLogAsync(string planId, string jobId, Guid stepId, string file, bool finalize, bool firstBlock, long lineCount, CancellationToken cancellationToken) + { + // Get the upload url + var uploadUrlResponse = await GetStepLogUploadUrlAsync(planId, jobId, stepId, cancellationToken); + if (uploadUrlResponse == null || uploadUrlResponse.LogsUrl == null) + { + throw new Exception("Failed to get step log upload url"); + } + + // Do we want to throw an exception here or should we just be uploading/truncating the data + var fileSize = new FileInfo(file).Length; + + // Create the Append blob + if (firstBlock) + { + await CreateAppendFileAsync(uploadUrlResponse.LogsUrl, uploadUrlResponse.BlobStorageType, cancellationToken); + } + + // Upload content + using (var fileStream = new FileStream(file, FileMode.Open, FileAccess.Read, FileShare.Read, 4096, true)) + { + var response = await UploadAppendFileAsync(uploadUrlResponse.LogsUrl, uploadUrlResponse.BlobStorageType, fileStream, finalize, fileSize, cancellationToken); + } + + // Update metadata + if (finalize) + { + // Send step log upload complete message + await StepLogUploadCompleteAsync(planId, jobId, stepId, lineCount, cancellationToken); + } + } + private MediaTypeFormatter m_formatter; private Uri m_resultsServiceUrl; private string m_token;