From b19b9462d847c510db7bd1f132bfa592cc7fb828 Mon Sep 17 00:00:00 2001 From: Yashwanth Anantharaju Date: Wed, 21 Feb 2024 12:04:13 -0500 Subject: [PATCH] handle broker run service exception handling (#3163) * handle run service exception handling * force fail always * format * format --- src/Runner.Listener/JobDispatcher.cs | 128 +++++++++++++-------------- 1 file changed, 63 insertions(+), 65 deletions(-) diff --git a/src/Runner.Listener/JobDispatcher.cs b/src/Runner.Listener/JobDispatcher.cs index 0efdaba243a..ef664936ea8 100644 --- a/src/Runner.Listener/JobDispatcher.cs +++ b/src/Runner.Listener/JobDispatcher.cs @@ -35,7 +35,7 @@ public interface IJobDispatcher : IRunnerService // This implementation of IJobDispatcher is not thread safe. // It is based on the fact that the current design of the runner is a dequeue // and processes one message from the message queue at a time. - // In addition, it only executes one job every time, + // In addition, it only executes one job every time, // and the server will not send another job while this one is still running. public sealed class JobDispatcher : RunnerService, IJobDispatcher { @@ -546,13 +546,27 @@ await processChannel.SendAsync( Trace.Info($"Return code {returnCode} indicate worker encounter an unhandled exception or app crash, attach worker stdout/stderr to JobRequest result."); var jobServer = await InitializeJobServerAsync(systemConnection); - await LogWorkerProcessUnhandledException(jobServer, message, detailInfo); - - // Go ahead to finish the job with result 'Failed' if the STDERR from worker is System.IO.IOException, since it typically means we are running out of disk space. 
- if (detailInfo.Contains(typeof(System.IO.IOException).ToString(), StringComparison.OrdinalIgnoreCase)) + var unhandledExceptionIssue = new Issue() { Type = IssueType.Error, Message = detailInfo }; + unhandledExceptionIssue.Data[Constants.Runner.InternalTelemetryIssueDataKey] = Constants.Runner.WorkerCrash; + switch (jobServer) { - Trace.Info($"Finish job with result 'Failed' due to IOException."); - await ForceFailJob(jobServer, message, detailInfo); + case IJobServer js: + { + await LogWorkerProcessUnhandledException(js, message, unhandledExceptionIssue); + // Go ahead to finish the job with result 'Failed' if the STDERR from worker is System.IO.IOException, since it typically means we are running out of disk space. + if (detailInfo.Contains(typeof(System.IO.IOException).ToString(), StringComparison.OrdinalIgnoreCase)) + { + Trace.Info($"Finish job with result 'Failed' due to IOException."); + await ForceFailJob(js, message); + } + + break; + } + case IRunServer rs: + await ForceFailJob(rs, message, unhandledExceptionIssue); + break; + default: + throw new NotSupportedException($"JobServer type '{jobServer.GetType().Name}' is not supported."); } } @@ -644,7 +658,7 @@ await processChannel.SendAsync( } } - // wait worker to exit + // wait worker to exit // if worker doesn't exit within timeout, then kill worker. 
completedTask = await Task.WhenAny(workerProcessTask, Task.Delay(-1, workerCancelTimeoutKillToken)); @@ -1131,86 +1145,70 @@ private async Task CompleteJobRequestAsync(int poolId, Pipelines.AgentJobRequest } // log an error issue to job level timeline record - private async Task LogWorkerProcessUnhandledException(IRunnerService server, Pipelines.AgentJobRequestMessage message, string detailInfo) + private async Task LogWorkerProcessUnhandledException(IJobServer jobServer, Pipelines.AgentJobRequestMessage message, Issue issue) { - if (server is IJobServer jobServer) + try { - try - { - var timeline = await jobServer.GetTimelineAsync(message.Plan.ScopeIdentifier, message.Plan.PlanType, message.Plan.PlanId, message.Timeline.Id, CancellationToken.None); - ArgUtil.NotNull(timeline, nameof(timeline)); + var timeline = await jobServer.GetTimelineAsync(message.Plan.ScopeIdentifier, message.Plan.PlanType, message.Plan.PlanId, message.Timeline.Id, CancellationToken.None); + ArgUtil.NotNull(timeline, nameof(timeline)); - TimelineRecord jobRecord = timeline.Records.FirstOrDefault(x => x.Id == message.JobId && x.RecordType == "Job"); - ArgUtil.NotNull(jobRecord, nameof(jobRecord)); + TimelineRecord jobRecord = timeline.Records.FirstOrDefault(x => x.Id == message.JobId && x.RecordType == "Job"); + ArgUtil.NotNull(jobRecord, nameof(jobRecord)); - var unhandledExceptionIssue = new Issue() { Type = IssueType.Error, Message = detailInfo }; - unhandledExceptionIssue.Data[Constants.Runner.InternalTelemetryIssueDataKey] = Constants.Runner.WorkerCrash; - jobRecord.ErrorCount++; - jobRecord.Issues.Add(unhandledExceptionIssue); - if (message.Variables.TryGetValue("DistributedTask.MarkJobAsFailedOnWorkerCrash", out var markJobAsFailedOnWorkerCrash) && - StringUtil.ConvertToBoolean(markJobAsFailedOnWorkerCrash?.Value)) - { - Trace.Info("Mark the job as failed since the worker crashed"); - jobRecord.Result = TaskResult.Failed; - // mark the job as completed so service will pickup the result 
- jobRecord.State = TimelineRecordState.Completed; - } + jobRecord.ErrorCount++; + jobRecord.Issues.Add(issue); - await jobServer.UpdateTimelineRecordsAsync(message.Plan.ScopeIdentifier, message.Plan.PlanType, message.Plan.PlanId, message.Timeline.Id, new TimelineRecord[] { jobRecord }, CancellationToken.None); - } - catch (Exception ex) + if (message.Variables.TryGetValue("DistributedTask.MarkJobAsFailedOnWorkerCrash", out var markJobAsFailedOnWorkerCrash) && + StringUtil.ConvertToBoolean(markJobAsFailedOnWorkerCrash?.Value)) { - Trace.Error("Fail to report unhandled exception from Runner.Worker process"); - Trace.Error(ex); + Trace.Info("Mark the job as failed since the worker crashed"); + jobRecord.Result = TaskResult.Failed; + // mark the job as completed so service will pickup the result + jobRecord.State = TimelineRecordState.Completed; } + + await jobServer.UpdateTimelineRecordsAsync(message.Plan.ScopeIdentifier, message.Plan.PlanType, message.Plan.PlanId, message.Timeline.Id, new TimelineRecord[] { jobRecord }, CancellationToken.None); } - else + catch (Exception ex) { - Trace.Info("Job server does not support handling unhandled exception yet, error message: {0}", detailInfo); - return; + Trace.Error("Fail to report unhandled exception from Runner.Worker process"); + Trace.Error(ex); } } // raise job completed event to fail the job. 
- private async Task ForceFailJob(IRunnerService server, Pipelines.AgentJobRequestMessage message, string detailInfo) + private async Task ForceFailJob(IJobServer jobServer, Pipelines.AgentJobRequestMessage message) { - if (server is IJobServer jobServer) + try { - try - { - var jobCompletedEvent = new JobCompletedEvent(message.RequestId, message.JobId, TaskResult.Failed); - await jobServer.RaisePlanEventAsync(message.Plan.ScopeIdentifier, message.Plan.PlanType, message.Plan.PlanId, jobCompletedEvent, CancellationToken.None); - } - catch (Exception ex) - { - Trace.Error("Fail to raise JobCompletedEvent back to service."); - Trace.Error(ex); - } + var jobCompletedEvent = new JobCompletedEvent(message.RequestId, message.JobId, TaskResult.Failed); + await jobServer.RaisePlanEventAsync(message.Plan.ScopeIdentifier, message.Plan.PlanType, message.Plan.PlanId, jobCompletedEvent, CancellationToken.None); } - else if (server is IRunServer runServer) + catch (Exception ex) { - try - { - var unhandledExceptionIssue = new Issue() { Type = IssueType.Error, Message = detailInfo }; - var unhandledAnnotation = unhandledExceptionIssue.ToAnnotation(); - var jobAnnotations = new List<Annotation>(); - if (unhandledAnnotation.HasValue) - { - jobAnnotations.Add(unhandledAnnotation.Value); - } + Trace.Error("Fail to raise JobCompletedEvent back to service."); + Trace.Error(ex); + } + } - await runServer.CompleteJobAsync(message.Plan.PlanId, message.JobId, TaskResult.Failed, outputs: null, stepResults: null, jobAnnotations: jobAnnotations, environmentUrl: null, CancellationToken.None); - } - catch (Exception ex) + private async Task ForceFailJob(IRunServer runServer, Pipelines.AgentJobRequestMessage message, Issue issue) + { + try + { + var annotation = issue.ToAnnotation(); + var jobAnnotations = new List<Annotation>(); + if (annotation.HasValue) { - Trace.Error("Fail to raise job completion back to service."); - Trace.Error(ex); + jobAnnotations.Add(annotation.Value); } + + await 
runServer.CompleteJobAsync(message.Plan.PlanId, message.JobId, TaskResult.Failed, outputs: null, stepResults: null, jobAnnotations: jobAnnotations, environmentUrl: null, CancellationToken.None); } - else + catch (Exception ex) { - throw new NotSupportedException($"Server type {server.GetType().FullName} is not supported."); + Trace.Error("Fail to raise job completion back to service."); + Trace.Error(ex); } }