22using Microsoft . Extensions . Logging ;
33using Polly ;
44namespace Microsoft . OneFuzz . Service ;
5+ using System . Net ;
56
67public interface IJobResultOperations : IOrm < JobResult > {
78
8- Async . Task < JobResult ? > GetJobResult ( Guid jobId ) ;
9- Async . Task < OneFuzzResultVoid > CreateOrUpdate ( Guid jobId , JobResultType resultType , Dictionary < string , double > resultValue ) ;
9+ Async . Task < JobResult ? > GetJobResult ( Guid jobId , Guid taskId , Guid machineId , string metricType ) ;
10+ Async . Task < OneFuzzResultVoid > CreateOrUpdate ( Guid jobId , Guid taskId , Guid machineId , DateTime createdAt , double version , string resultType , Dictionary < string , double > resultValue ) ;
1011
1112}
1213public class JobResultOperations : Orm < JobResult > , IJobResultOperations {
1314
15+ const string COVERAGE_DATA = "CoverageData" ;
16+ const string RUNTIME_STATS = "RuntimeStats" ;
17+
1418 public JobResultOperations ( ILogger < JobResultOperations > log , IOnefuzzContext context )
1519 : base ( log , context ) {
1620 }
1721
18- public async Async . Task < JobResult ? > GetJobResult ( Guid jobId ) {
19- return await SearchByPartitionKeys ( new [ ] { jobId . ToString ( ) } ) . SingleOrDefaultAsync ( ) ;
22+ public async Async . Task < JobResult ? > GetJobResult ( Guid jobId , Guid taskId , Guid machineId , string metricType ) {
23+ var data = QueryAsync ( Query . SingleEntity ( jobId . ToString ( ) , string . Concat ( taskId , "-" , machineId , "-" , metricType ) ) ) ;
24+ return await data . FirstOrDefaultAsync ( ) ;
2025 }
2126
22- private JobResult UpdateResult ( JobResult result , JobResultType type , Dictionary < string , double > resultValue ) {
23-
24- var newResult = result ;
25- double newValue ;
26- switch ( type ) {
27- case JobResultType . NewCrashingInput :
28- newValue = result . NewCrashingInput + resultValue [ "count" ] ;
29- newResult = result with { NewCrashingInput = newValue } ;
30- break ;
31- case JobResultType . NewReport :
32- newValue = result . NewReport + resultValue [ "count" ] ;
33- newResult = result with { NewReport = newValue } ;
34- break ;
35- case JobResultType . NewUniqueReport :
36- newValue = result . NewUniqueReport + resultValue [ "count" ] ;
37- newResult = result with { NewUniqueReport = newValue } ;
38- break ;
39- case JobResultType . NewRegressionReport :
40- newValue = result . NewRegressionReport + resultValue [ "count" ] ;
41- newResult = result with { NewRegressionReport = newValue } ;
42- break ;
43- case JobResultType . NewCrashDump :
44- newValue = result . NewCrashDump + resultValue [ "count" ] ;
45- newResult = result with { NewCrashDump = newValue } ;
46- break ;
47- case JobResultType . CoverageData :
48- double newCovered = resultValue [ "covered" ] ;
49- double newTotalCovered = resultValue [ "features" ] ;
50- double newCoverageRate = resultValue [ "rate" ] ;
51- newResult = result with { InstructionsCovered = newCovered , TotalInstructions = newTotalCovered , CoverageRate = newCoverageRate } ;
52- break ;
53- case JobResultType . RuntimeStats :
54- double newTotalIterations = resultValue [ "total_count" ] ;
55- newResult = result with { IterationCount = newTotalIterations } ;
56- break ;
57- default :
58- _logTracer . LogWarning ( $ "Invalid Field { type } .") ;
59- break ;
60- }
61- _logTracer . LogInformation ( $ "Attempting to log new result: { newResult } ") ;
62- return newResult ;
63- }
64-
65- private async Async . Task < bool > TryUpdate ( Job job , JobResultType resultType , Dictionary < string , double > resultValue ) {
27+ private async Async . Task < bool > TryUpdate ( Job job , Guid taskId , Guid machineId , DateTime createdAt , double version , string resultType , Dictionary < string , double > resultValue ) {
6628 var jobId = job . JobId ;
29+ var taskIdMachineIdMetric = string . Concat ( taskId , "-" , machineId , "-" , resultType ) ;
6730
68- var jobResult = await GetJobResult ( jobId ) ;
69-
70- if ( jobResult == null ) {
71- _logTracer . LogInformation ( "Creating new JobResult for Job {JobId}" , jobId ) ;
72-
73- var entry = new JobResult ( JobId : jobId , Project : job . Config . Project , Name : job . Config . Name ) ;
31+ var oldEntry = await GetJobResult ( jobId , taskId , machineId , resultType ) ;
7432
75- jobResult = UpdateResult ( entry , resultType , resultValue ) ;
76-
77- var r = await Insert ( jobResult ) ;
78- if ( ! r . IsOk ) {
79- throw new InvalidOperationException ( $ "failed to insert job result { jobResult . JobId } ") ;
33+ if ( oldEntry == null ) {
34+ _logTracer . LogInformation ( $ "attempt to insert new job result { taskId } and taskId+machineId+metricType { taskIdMachineIdMetric } ") ;
35+ var newEntry = new JobResult ( JobId : jobId , TaskIdMachineIdMetric : taskIdMachineIdMetric , TaskId : taskId , MachineId : machineId , CreatedAt : createdAt , Project : job . Config . Project , Name : job . Config . Name , resultType , Version : version , resultValue ) ;
36+ var result = await Insert ( newEntry ) ;
37+ if ( ! result . IsOk ) {
38+ throw new InvalidOperationException ( $ "failed to insert job result with taskId { taskId } and taskId+machineId+metricType { taskIdMachineIdMetric } ") ;
8039 }
81- _logTracer . LogInformation ( "created job result {JobId}" , jobResult . JobId ) ;
82- } else {
83- _logTracer . LogInformation ( "Updating existing JobResult entry for Job {JobId}" , jobId ) ;
84-
85- jobResult = UpdateResult ( jobResult , resultType , resultValue ) ;
40+ return true ;
41+ }
8642
87- var r = await Update ( jobResult ) ;
88- if ( ! r . IsOk ) {
89- throw new InvalidOperationException ( $ "failed to insert job result { jobResult . JobId } ") ;
90- }
91- _logTracer . LogInformation ( "updated job result {JobId}" , jobResult . JobId ) ;
43+ ResultVoid < ( HttpStatusCode Status , string Reason ) > r ;
44+ switch ( resultType ) {
45+ case COVERAGE_DATA :
46+ case RUNTIME_STATS :
47+ if ( oldEntry . CreatedAt < createdAt ) {
48+ oldEntry = oldEntry with { CreatedAt = createdAt , MetricValue = resultValue } ;
49+ r = await Update ( oldEntry ) ;
50+ if ( ! r . IsOk ) {
51+ throw new InvalidOperationException ( $ "failed to replace job result with taskId { taskId } and machineId+metricType { taskIdMachineIdMetric } ") ;
52+ }
53+ } else {
54+ _logTracer . LogInformation ( $ "received an out-of-date metric. skipping.") ;
55+ }
56+ break ;
57+ default :
58+ _logTracer . LogInformation ( $ "attempt to update job result { taskId } and taskId+machineId+metricType { taskIdMachineIdMetric } ") ;
59+ oldEntry . MetricValue [ "count" ] ++ ;
60+ oldEntry = oldEntry with { MetricValue = oldEntry . MetricValue } ;
61+ r = await Update ( oldEntry ) ;
62+ if ( ! r . IsOk ) {
63+ throw new InvalidOperationException ( $ "failed to update job result with taskId { taskId } and machineId+metricType { taskIdMachineIdMetric } ") ;
64+ }
65+ break ;
9266 }
9367
68+
9469 return true ;
70+
9571 }
9672
97- public async Async . Task < OneFuzzResultVoid > CreateOrUpdate ( Guid jobId , JobResultType resultType , Dictionary < string , double > resultValue ) {
73+ public async Async . Task < OneFuzzResultVoid > CreateOrUpdate ( Guid jobId , Guid taskId , Guid machineId , DateTime createdAt , double version , string resultType , Dictionary < string , double > resultValue ) {
9874
9975 var job = await _context . JobOperations . Get ( jobId ) ;
10076 if ( job == null ) {
@@ -106,7 +82,7 @@ public async Async.Task<OneFuzzResultVoid> CreateOrUpdate(Guid jobId, JobResultT
10682 _logTracer . LogInformation ( "attempt to update job result {JobId}" , job . JobId ) ;
10783 var policy = Policy . Handle < InvalidOperationException > ( ) . WaitAndRetryAsync ( 50 , _ => new TimeSpan ( 0 , 0 , 5 ) ) ;
10884 await policy . ExecuteAsync ( async ( ) => {
109- success = await TryUpdate ( job , resultType , resultValue ) ;
85+ success = await TryUpdate ( job , taskId , machineId , createdAt , version , resultType , resultValue ) ;
11086 _logTracer . LogInformation ( "attempt {success}" , success ) ;
11187 } ) ;
11288 return OneFuzzResultVoid . Ok ;
0 commit comments