1- /**
2- * Abstract class for batch processors
3- */
4- import {
1+ import type {
52 BaseRecord ,
63 BatchProcessingOptions ,
74 EventSourceDataClassTypes ,
85 FailureResponse ,
96 ResultType ,
107 SuccessResponse ,
11- } from '.' ;
8+ } from './types ' ;
129
10+ /**
11+ * Abstract class for batch processors.
12+ */
1313abstract class BasePartialProcessor {
1414 public exceptions : Error [ ] ;
1515
@@ -34,6 +34,40 @@ abstract class BasePartialProcessor {
3434 this . handler = new Function ( ) ;
3535 }
3636
37+ /**
38+ * Call instance's handler for each record
39+ * @returns List of processed records
40+ */
41+ public async asyncProcess ( ) : Promise < ( SuccessResponse | FailureResponse ) [ ] > {
42+ /**
43+ * If this is an sync processor, user should have called process instead,
44+ * so we call the method early to throw the error early thus failing fast.
45+ */
46+ if ( this . constructor . name === 'BatchProcessor' ) {
47+ await this . asyncProcessRecord ( this . records [ 0 ] ) ;
48+ }
49+ this . prepare ( ) ;
50+
51+ const processingPromises : Promise < SuccessResponse | FailureResponse > [ ] =
52+ this . records . map ( ( record ) => this . asyncProcessRecord ( record ) ) ;
53+
54+ const processedRecords : ( SuccessResponse | FailureResponse ) [ ] =
55+ await Promise . all ( processingPromises ) ;
56+
57+ this . clean ( ) ;
58+
59+ return processedRecords ;
60+ }
61+
62+ /**
63+ * Process a record with an asyncronous handler
64+ *
65+ * @param record Record to be processed
66+ */
67+ public abstract asyncProcessRecord (
68+ record : BaseRecord
69+ ) : Promise < SuccessResponse | FailureResponse > ;
70+
3771 /**
3872 * Clean class instance after processing
3973 */
@@ -50,7 +84,6 @@ abstract class BasePartialProcessor {
5084 exception : Error
5185 ) : FailureResponse {
5286 const entry : FailureResponse = [ 'fail' , exception . message , record ] ;
53- // console.debug('Record processing exception: ' + exception.message);
5487 this . exceptions . push ( exception ) ;
5588 this . failureMessages . push ( record ) ;
5689
@@ -66,12 +99,19 @@ abstract class BasePartialProcessor {
6699 * Call instance's handler for each record
67100 * @returns List of processed records
68101 */
69- public async process ( ) : Promise < ( SuccessResponse | FailureResponse ) [ ] > {
102+ public process ( ) : ( SuccessResponse | FailureResponse ) [ ] {
103+ /**
104+ * If this is an async processor, user should have called processAsync instead,
105+ * so we call the method early to throw the error early thus failing fast.
106+ */
107+ if ( this . constructor . name === 'AsyncBatchProcessor' ) {
108+ this . processRecord ( this . records [ 0 ] ) ;
109+ }
70110 this . prepare ( ) ;
71111
72112 const processedRecords : ( SuccessResponse | FailureResponse ) [ ] = [ ] ;
73113 for ( const record of this . records ) {
74- processedRecords . push ( await this . processRecord ( record ) ) ;
114+ processedRecords . push ( this . processRecord ( record ) ) ;
75115 }
76116
77117 this . clean ( ) ;
@@ -85,7 +125,7 @@ abstract class BasePartialProcessor {
85125 */
86126 public abstract processRecord (
87127 record : BaseRecord
88- ) : Promise < SuccessResponse | FailureResponse > ;
128+ ) : SuccessResponse | FailureResponse ;
89129
90130 /**
91131 * Set class instance attributes before execution
0 commit comments