1- import { RequestClient , ModifiedResponse , PayloadValidationError } from '@segment/actions-core'
1+ import { RequestClient , ModifiedResponse , PayloadValidationError , MultiStatusResponse } from '@segment/actions-core'
22import { Settings } from './generated-types'
33import { Payload } from './syncAudience/generated-types'
44// eslint-disable-next-line no-restricted-syntax
@@ -62,8 +62,18 @@ export async function processPayload(input: ProcessPayloadInput) {
6262 crmID = input . payloads [ 0 ] . external_id
6363 }
6464
65+ const multiStatusResponse = new MultiStatusResponse ( )
66+ // mark all payloads as successful and then filter out payloads with invalid emails
67+ for ( let i = 0 ; i < input . payloads . length ; i ++ ) {
68+ const payload = input . payloads [ i ]
69+ multiStatusResponse . setSuccessResponseAtIndex ( i , {
70+ sent : { ...payload } ,
71+ status : 200 ,
72+ body : 'Successfully uploaded to S3'
73+ } )
74+ }
6575 // Get user emails from the payloads
66- const [ usersFormatted , rowCount ] = extractUsers ( input . payloads )
76+ const [ usersFormatted , rowCount ] = extractUsers ( input . payloads , multiStatusResponse )
6777
6878 // Overwrite to Legacy Flow if feature flag is enabled
6979 if ( input . features && input . features [ TTD_LEGACY_FLOW_FLAG_NAME ] ) {
@@ -72,43 +82,90 @@ export async function processPayload(input: ProcessPayloadInput) {
7282 // -----------
7383
7484 if ( input . payloads . length < TTD_MIN_RECORD_COUNT ) {
75- throw new PayloadValidationError (
76- `received payload count below The Trade Desk's ingestion minimum. Expected: >=${ TTD_MIN_RECORD_COUNT } actual: ${ input . payloads . length } `
77- )
85+ for ( let i = 0 ; i < input . payloads . length ; i ++ ) {
86+ multiStatusResponse . setErrorResponseAtIndex ( i , {
87+ status : 400 ,
88+ errortype : 'PAYLOAD_VALIDATION_FAILED' ,
89+ errormessage : `received payload count below The Trade Desk's ingestion minimum. Expected: >=${ TTD_MIN_RECORD_COUNT } actual: ${ input . payloads . length } ` ,
90+ sent : { ...input . payloads [ i ] } ,
91+ body : `received payload count below The Trade Desk's ingestion minimum. Expected: >=${ TTD_MIN_RECORD_COUNT } actual: ${ input . payloads . length } `
92+ } )
93+ }
94+ return multiStatusResponse
7895 }
7996
80- // Create a new TTD Drop Endpoint
81- const dropEndpoint = await getCRMDataDropEndpoint ( input . request , input . settings , input . payloads [ 0 ] , crmID )
82-
83- // Upload CRM Data to Drop Endpoint
84- return uploadCRMDataToDropEndpoint ( input . request , dropEndpoint , usersFormatted )
97+ try {
98+ // Create a new TTD Drop Endpoint
99+ const dropEndpoint = await getCRMDataDropEndpoint ( input . request , input . settings , input . payloads [ 0 ] , crmID )
100+
101+ // Upload CRM Data to Drop Endpoint
102+ await uploadCRMDataToDropEndpoint ( input . request , dropEndpoint , usersFormatted )
103+
104+ return multiStatusResponse
105+ } catch ( error ) {
106+ for ( let i = 0 ; i < input . payloads . length ; i ++ ) {
107+ if ( multiStatusResponse . isSuccessResponseAtIndex ( i ) ) {
108+ multiStatusResponse . setErrorResponseAtIndex ( i , {
109+ status : 500 ,
110+ errortype : 'RETRYABLE_ERROR' ,
111+ errormessage : `Failed to upload to The Trade Desk Drop Endpoint: ${ ( error as Error ) . message } ` ,
112+ sent : { ...input . payloads [ i ] } ,
113+ body : `The Trade Desk Drop Endpoint upload failed: ${ ( error as Error ) . message } `
114+ } )
115+ }
116+ }
117+ return multiStatusResponse
118+ }
85119 } else {
86120 //------------
87121 // AWS FLOW
88122 // -----------
89123
90- // Send request to AWS to be processed
91- return sendEventToAWS ( {
92- TDDAuthToken : input . settings . auth_token ,
93- AdvertiserId : input . settings . advertiser_id ,
94- CrmDataId : crmID ,
95- UsersFormatted : usersFormatted ,
96- RowCount : rowCount ,
97- DropOptions : {
98- PiiType : input . payloads [ 0 ] . pii_type ,
99- MergeMode : 'Replace' ,
100- RetentionEnabled : true
124+ try {
125+ // Send request to AWS to be processed
126+ await sendEventToAWS ( {
127+ TDDAuthToken : input . settings . auth_token ,
128+ AdvertiserId : input . settings . advertiser_id ,
129+ CrmDataId : crmID ,
130+ UsersFormatted : usersFormatted ,
131+ RowCount : rowCount ,
132+ DropOptions : {
133+ PiiType : input . payloads [ 0 ] . pii_type ,
134+ MergeMode : 'Replace' ,
135+ RetentionEnabled : true
136+ }
137+ } )
138+ } catch ( error ) {
139+ // Mark all remaining success payloads as failed if AWS upload fails
140+ for ( let i = 0 ; i < input . payloads . length ; i ++ ) {
141+ if ( multiStatusResponse . isSuccessResponseAtIndex ( i ) ) {
142+ multiStatusResponse . setErrorResponseAtIndex ( i , {
143+ status : 500 ,
144+ errortype : 'RETRYABLE_ERROR' ,
145+ errormessage : `Failed to upload to AWS: ${ ( error as Error ) . message } ` ,
146+ sent : { ...input . payloads [ i ] } ,
147+ body : `AWS upload failed: ${ ( error as Error ) . message } `
148+ } )
149+ }
101150 }
102- } )
151+ }
152+ return multiStatusResponse
103153 }
104154}
105155
106- function extractUsers ( payloads : Payload [ ] ) : [ string , number ] {
156+ function extractUsers ( payloads : Payload [ ] , multiStatusResponse : MultiStatusResponse ) : [ string , number ] {
107157 let users = ''
108158 let rowCount = 0
109159
110- payloads . forEach ( ( payload : Payload ) => {
160+ payloads . forEach ( ( payload : Payload , index : number ) => {
111161 if ( ! payload . email || ! validateEmail ( payload . email , payload . pii_type ) ) {
162+ multiStatusResponse . setErrorResponseAtIndex ( index , {
163+ status : 400 ,
164+ errortype : 'PAYLOAD_VALIDATION_FAILED' ,
165+ errormessage : `Invalid email: ${ payload . email } ` ,
166+ sent : { ...payload } ,
167+ body : `Invalid email: ${ payload . email } `
168+ } )
112169 return
113170 }
114171
@@ -196,7 +253,7 @@ async function getCRMDataDropEndpoint(request: RequestClient, settings: Settings
196253
197254// Uploads CRM Data to Drop Endpoint (Legacy Flow)
198255async function uploadCRMDataToDropEndpoint ( request : RequestClient , endpoint : string , users : string ) {
199- return await request ( endpoint , {
256+ await request ( endpoint , {
200257 method : 'PUT' ,
201258 headers : {
202259 'Content-Type' : 'text/plain'
0 commit comments