@@ -760,10 +760,21 @@ export class BatchTriggerV3Service extends BaseService {
760
760
} ) ) ;
761
761
762
762
let workingIndex = currentIndex ;
763
+ let expectedCount = 0 ;
763
764
764
765
for ( const item of itemsToProcess ) {
765
766
try {
766
- await this . #processBatchTaskRunItem( batch , environment , item , workingIndex , options ) ;
767
+ const created = await this . #processBatchTaskRunItem(
768
+ batch ,
769
+ environment ,
770
+ item ,
771
+ workingIndex ,
772
+ options
773
+ ) ;
774
+
775
+ if ( created ) {
776
+ expectedCount ++ ;
777
+ }
767
778
768
779
workingIndex ++ ;
769
780
} catch ( error ) {
@@ -780,6 +791,17 @@ export class BatchTriggerV3Service extends BaseService {
780
791
}
781
792
}
782
793
794
+ if ( expectedCount > 0 ) {
795
+ await this . _prisma . batchTaskRun . update ( {
796
+ where : { id : batch . id } ,
797
+ data : {
798
+ expectedCount : {
799
+ increment : expectedCount ,
800
+ } ,
801
+ } ,
802
+ } ) ;
803
+ }
804
+
783
805
return { workingIndex } ;
784
806
}
785
807
@@ -825,21 +847,15 @@ export class BatchTriggerV3Service extends BaseService {
825
847
826
848
if ( ! result . isCached ) {
827
849
try {
828
- await $transaction ( this . _prisma , async ( tx ) => {
829
- // [batchTaskRunId, taskRunId] is a unique index
830
- await tx . batchTaskRunItem . create ( {
831
- data : {
832
- batchTaskRunId : batch . id ,
833
- taskRunId : result . run . id ,
834
- status : batchTaskRunItemStatusForRunStatus ( result . run . status ) ,
835
- } ,
836
- } ) ;
837
-
838
- await tx . batchTaskRun . update ( {
839
- where : { id : batch . id } ,
840
- data : { expectedCount : { increment : 1 } } ,
841
- } ) ;
850
+ await this . _prisma . batchTaskRunItem . create ( {
851
+ data : {
852
+ batchTaskRunId : batch . id ,
853
+ taskRunId : result . run . id ,
854
+ status : batchTaskRunItemStatusForRunStatus ( result . run . status ) ,
855
+ } ,
842
856
} ) ;
857
+
858
+ return true ;
843
859
} catch ( error ) {
844
860
if ( isUniqueConstraintError ( error , [ "batchTaskRunId" , "taskRunId" ] ) ) {
845
861
// This means there is already a batchTaskRunItem for this batch and taskRun
@@ -852,12 +868,14 @@ export class BatchTriggerV3Service extends BaseService {
852
868
}
853
869
) ;
854
870
855
- return ;
871
+ return false ;
856
872
}
857
873
858
874
throw error ;
859
875
}
860
876
}
877
+
878
+ return false ;
861
879
}
862
880
863
881
async #enqueueBatchTaskRun( options : BatchProcessingOptions , tx ?: PrismaClientOrTransaction ) {
@@ -907,62 +925,69 @@ export async function completeBatchTaskRunItemV3(
907
925
scheduleResumeOnComplete = false ,
908
926
taskRunAttemptId ?: string
909
927
) {
910
- await $transaction ( tx , "completeBatchTaskRunItemV3" , async ( tx , span ) => {
911
- span ?. setAttribute ( "batch_id" , batchTaskRunId ) ;
912
-
913
- // Update the item to complete
914
- const updated = await tx . batchTaskRunItem . updateMany ( {
915
- where : {
916
- id : itemId ,
917
- status : "PENDING" ,
918
- } ,
919
- data : {
920
- status : "COMPLETED" ,
921
- taskRunAttemptId,
922
- } ,
923
- } ) ;
924
-
925
- if ( updated . count === 0 ) {
926
- return ;
927
- }
928
-
929
- const updatedBatchRun = await tx . batchTaskRun . update ( {
930
- where : {
931
- id : batchTaskRunId ,
932
- } ,
933
- data : {
934
- completedCount : {
935
- increment : 1 ,
928
+ await $transaction (
929
+ tx ,
930
+ "completeBatchTaskRunItemV3" ,
931
+ async ( tx , span ) => {
932
+ span ?. setAttribute ( "batch_id" , batchTaskRunId ) ;
933
+
934
+ // Update the item to complete
935
+ const updated = await tx . batchTaskRunItem . updateMany ( {
936
+ where : {
937
+ id : itemId ,
938
+ status : "PENDING" ,
936
939
} ,
937
- } ,
938
- select : {
939
- sealed : true ,
940
- status : true ,
941
- completedCount : true ,
942
- expectedCount : true ,
943
- dependentTaskAttemptId : true ,
944
- } ,
945
- } ) ;
940
+ data : {
941
+ status : "COMPLETED" ,
942
+ taskRunAttemptId ,
943
+ } ,
944
+ } ) ;
945
+
946
+ if ( updated . count === 0 ) {
947
+ return ;
948
+ }
946
949
947
- if (
948
- updatedBatchRun . status === "PENDING" &&
949
- updatedBatchRun . completedCount === updatedBatchRun . expectedCount &&
950
- updatedBatchRun . sealed
951
- ) {
952
- await tx . batchTaskRun . update ( {
950
+ const updatedBatchRun = await tx . batchTaskRun . update ( {
953
951
where : {
954
952
id : batchTaskRunId ,
955
953
} ,
956
954
data : {
957
- status : "COMPLETED" ,
958
- completedAt : new Date ( ) ,
955
+ completedCount : {
956
+ increment : 1 ,
957
+ } ,
958
+ } ,
959
+ select : {
960
+ sealed : true ,
961
+ status : true ,
962
+ completedCount : true ,
963
+ expectedCount : true ,
964
+ dependentTaskAttemptId : true ,
959
965
} ,
960
966
} ) ;
961
967
962
- // We only need to resume the batch if it has a dependent task attempt ID
963
- if ( scheduleResumeOnComplete && updatedBatchRun . dependentTaskAttemptId ) {
964
- await ResumeBatchRunService . enqueue ( batchTaskRunId , true , tx ) ;
968
+ if (
969
+ updatedBatchRun . status === "PENDING" &&
970
+ updatedBatchRun . completedCount === updatedBatchRun . expectedCount &&
971
+ updatedBatchRun . sealed
972
+ ) {
973
+ await tx . batchTaskRun . update ( {
974
+ where : {
975
+ id : batchTaskRunId ,
976
+ } ,
977
+ data : {
978
+ status : "COMPLETED" ,
979
+ completedAt : new Date ( ) ,
980
+ } ,
981
+ } ) ;
982
+
983
+ // We only need to resume the batch if it has a dependent task attempt ID
984
+ if ( scheduleResumeOnComplete && updatedBatchRun . dependentTaskAttemptId ) {
985
+ await ResumeBatchRunService . enqueue ( batchTaskRunId , true , tx ) ;
986
+ }
965
987
}
988
+ } ,
989
+ {
990
+ timeout : 10000 ,
966
991
}
967
- } ) ;
992
+ ) ;
968
993
}
0 commit comments