Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
import { RequestClient, ModifiedResponse, PayloadValidationError } from '@segment/actions-core'
import { RequestClient, ModifiedResponse, PayloadValidationError, MultiStatusResponse } from '@segment/actions-core'
import { Settings } from './generated-types'
import { Payload } from './syncAudience/generated-types'
// eslint-disable-next-line no-restricted-syntax
Expand Down Expand Up @@ -62,8 +62,18 @@ export async function processPayload(input: ProcessPayloadInput) {
crmID = input.payloads[0].external_id
}

const multiStatusResponse = new MultiStatusResponse()
// mark all payloads as successful and then filter out payloads with invalid emails
for (let i = 0; i < input.payloads.length; i++) {
const payload = input.payloads[i]
multiStatusResponse.setSuccessResponseAtIndex(i, {
sent: { ...payload },
status: 200,
body: 'Successfully uploaded to S3'
})
}
// Get user emails from the payloads
const [usersFormatted, rowCount] = extractUsers(input.payloads)
const [usersFormatted, rowCount] = extractUsers(input.payloads, multiStatusResponse)

// Overwrite to Legacy Flow if feature flag is enabled
if (input.features && input.features[TTD_LEGACY_FLOW_FLAG_NAME]) {
Expand All @@ -72,43 +82,90 @@ export async function processPayload(input: ProcessPayloadInput) {
// -----------

if (input.payloads.length < TTD_MIN_RECORD_COUNT) {
throw new PayloadValidationError(
`received payload count below The Trade Desk's ingestion minimum. Expected: >=${TTD_MIN_RECORD_COUNT} actual: ${input.payloads.length}`
)
for (let i = 0; i < input.payloads.length; i++) {
multiStatusResponse.setErrorResponseAtIndex(i, {
status: 400,
errortype: 'PAYLOAD_VALIDATION_FAILED',
errormessage: `received payload count below The Trade Desk's ingestion minimum. Expected: >=${TTD_MIN_RECORD_COUNT} actual: ${input.payloads.length}`,
sent: { ...input.payloads[i] },
body: `received payload count below The Trade Desk's ingestion minimum. Expected: >=${TTD_MIN_RECORD_COUNT} actual: ${input.payloads.length}`
})
}
return multiStatusResponse
}

// Create a new TTD Drop Endpoint
const dropEndpoint = await getCRMDataDropEndpoint(input.request, input.settings, input.payloads[0], crmID)

// Upload CRM Data to Drop Endpoint
return uploadCRMDataToDropEndpoint(input.request, dropEndpoint, usersFormatted)
try {
// Create a new TTD Drop Endpoint
const dropEndpoint = await getCRMDataDropEndpoint(input.request, input.settings, input.payloads[0], crmID)

// Upload CRM Data to Drop Endpoint
await uploadCRMDataToDropEndpoint(input.request, dropEndpoint, usersFormatted)

return multiStatusResponse
} catch (error) {
for (let i = 0; i < input.payloads.length; i++) {
if (multiStatusResponse.isSuccessResponseAtIndex(i)) {
multiStatusResponse.setErrorResponseAtIndex(i, {
status: 500,
errortype: 'RETRYABLE_ERROR',
errormessage: `Failed to upload to The Trade Desk Drop Endpoint: ${(error as Error).message}`,
sent: { ...input.payloads[i] },
body: `The Trade Desk Drop Endpoint upload failed: ${(error as Error).message}`
})
}
}
return multiStatusResponse
}
} else {
//------------
// AWS FLOW
// -----------

// Send request to AWS to be processed
return sendEventToAWS({
TDDAuthToken: input.settings.auth_token,
AdvertiserId: input.settings.advertiser_id,
CrmDataId: crmID,
UsersFormatted: usersFormatted,
RowCount: rowCount,
DropOptions: {
PiiType: input.payloads[0].pii_type,
MergeMode: 'Replace',
RetentionEnabled: true
try {
// Send request to AWS to be processed
await sendEventToAWS({
TDDAuthToken: input.settings.auth_token,
AdvertiserId: input.settings.advertiser_id,
CrmDataId: crmID,
UsersFormatted: usersFormatted,
RowCount: rowCount,
DropOptions: {
PiiType: input.payloads[0].pii_type,
MergeMode: 'Replace',
RetentionEnabled: true
}
})
} catch (error) {
// Mark all remaining success payloads as failed if AWS upload fails
for (let i = 0; i < input.payloads.length; i++) {
if (multiStatusResponse.isSuccessResponseAtIndex(i)) {
multiStatusResponse.setErrorResponseAtIndex(i, {
status: 500,
errortype: 'RETRYABLE_ERROR',
errormessage: `Failed to upload to AWS: ${(error as Error).message}`,
sent: { ...input.payloads[i] },
body: `AWS upload failed: ${(error as Error).message}`
})
}
}
})
}
return multiStatusResponse
}
}

function extractUsers(payloads: Payload[]): [string, number] {
function extractUsers(payloads: Payload[], multiStatusResponse: MultiStatusResponse): [string, number] {
let users = ''
let rowCount = 0

payloads.forEach((payload: Payload) => {
payloads.forEach((payload: Payload, index: number) => {
if (!payload.email || !validateEmail(payload.email, payload.pii_type)) {
multiStatusResponse.setErrorResponseAtIndex(index, {
status: 400,
errortype: 'PAYLOAD_VALIDATION_FAILED',
errormessage: `Invalid email: ${payload.email}`,
sent: { ...payload },
body: `Invalid email: ${payload.email}`
})
return
}

Expand Down Expand Up @@ -196,7 +253,7 @@ async function getCRMDataDropEndpoint(request: RequestClient, settings: Settings

// Uploads CRM Data to Drop Endpoint (Legacy Flow)
async function uploadCRMDataToDropEndpoint(request: RequestClient, endpoint: string, users: string) {
return await request(endpoint, {
await request(endpoint, {
method: 'PUT',
headers: {
'Content-Type': 'text/plain'
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,16 @@ import nock from 'nock'
import { createTestEvent, createTestIntegration, SegmentEvent } from '@segment/actions-core'
import Destination from '../../index'

// Type for MultiStatus error response node
interface MultiStatusErrorNode {
status: number
errortype: string
errormessage: string
sent: any
body: string
errorreporter: string
}

import { TTD_LEGACY_FLOW_FLAG_NAME } from '../../functions'

import { getAWSCredentialsFromEKS, AWSCredentials } from '../../../../lib/AWS/sts'
Expand Down Expand Up @@ -103,24 +113,40 @@ describe('TheTradeDeskCrm.syncAudience', () => {
.get(/.*/)
.reply(200, { Segments: [], PagingToken: null })

await expect(
testDestination.testAction('syncAudience', {
event,
settings: {
advertiser_id: 'advertiser_id',
auth_token: 'test_token',
__segment_internal_engage_force_full_sync: true,
__segment_internal_engage_batch_sync: true
},
features: { 'actions-the-trade-desk-crm-legacy-flow': true },
useDefaultMappings: true,
mapping: {
name: 'test_audience',
region: 'US',
pii_type: 'Email'
}
})
).rejects.toThrow(`received payload count below The Trade Desk's ingestion minimum. Expected: >=1500 actual: 1`)
const response = await testDestination.testBatchAction('syncAudience', {
events: [event],
settings: {
advertiser_id: 'advertiser_id',
auth_token: 'test_token',
__segment_internal_engage_force_full_sync: true,
__segment_internal_engage_batch_sync: true
},
features: {
[TTD_LEGACY_FLOW_FLAG_NAME]: true
},
useDefaultMappings: true,
mapping: {
name: 'test_audience',
region: 'US',
pii_type: 'Email'
}
})

const multiStatusResponse = testDestination.results?.[0]?.multistatus

expect(multiStatusResponse).toBeDefined()
if (multiStatusResponse) {
expect(multiStatusResponse.length).toBe(1)
expect(multiStatusResponse[0].status).toBe(400)

// Type-safe access to error properties
const errorResponse = multiStatusResponse[0] as MultiStatusErrorNode
expect(errorResponse.errortype).toBe('PAYLOAD_VALIDATION_FAILED')
expect(errorResponse.errormessage).toContain('received payload count below')
}

// No HTTP requests should be made when validation fails early
expect(response.length).toBe(0)
})

it('should execute legacy flow if flagon override is defined', async () => {
Expand Down Expand Up @@ -231,6 +257,94 @@ describe('TheTradeDeskCrm.syncAudience', () => {
).rejects.toThrow(`No external_id found in payload.`)
})

it('should mark the payload with invalid email as failed in multistatus response', async () => {
const dropReferenceId = 'aabbcc5b01-c9c7-4000-9191-000000000000'
const dropEndpoint = `https://thetradedesk-crm-data.s3.us-east-1.amazonaws.com/data/advertiser/advertiser-id/drop/${dropReferenceId}/pii?X-Amz-Security-Token=token&X-Amz-Algorithm=AWS4-HMAC-SHA256&X-Amz-Date=date&X-Amz-SignedHeaders=host&X-Amz-Expires=3600&X-Amz-Credential=credentials&X-Amz-Signature=signature&`

nock(`https://api.thetradedesk.com/v3/crmdata/segment/advertiser_id/personas_test_audience`)
.post(/.*/, { PiiType: 'Email', MergeMode: 'Replace', RetentionEnabled: true })
.reply(200, { ReferenceId: dropReferenceId, Url: dropEndpoint })

nock(dropEndpoint).put(/.*/).reply(200)
const events: SegmentEvent[] = []
for (let index = 1; index <= 1500; index++) {
events.push(
createTestEvent({
event: 'Audience Entered',
type: 'track',
properties: {
audience_key: 'personas_test_audience'
},
context: {
device: {
advertisingId: '123'
},
traits: {
email: `testing${index}@testing.com`
},
personas: {
external_audience_id: 'external_audience_id'
}
}
})
)
}
events.push(
createTestEvent({
event: 'Audience Entered',
type: 'track',
properties: {
audience_key: 'personas_test_audience'
},
context: {
device: {
advertisingId: '123'
},
traits: {
email: `invalid-email-address`
},
personas: {
external_audience_id: 'external_audience_id'
}
}
})
)

const responses = await testDestination.testBatchAction('syncAudience', {
events,
settings: {
advertiser_id: 'advertiser_id',
auth_token: 'test_token',
__segment_internal_engage_force_full_sync: true,
__segment_internal_engage_batch_sync: true
},
features: {
[TTD_LEGACY_FLOW_FLAG_NAME]: true
},
useDefaultMappings: true,
mapping: {
name: 'test_audience',
region: 'US',
pii_type: 'Email'
}
})

expect(responses.length).toBe(2)
const multiStatusResponse = testDestination.results?.[0]?.multistatus
expect(multiStatusResponse).toBeDefined()
if (multiStatusResponse) {
const length = multiStatusResponse.length
expect(length).toBe(1501)
const invalidEmailResponse = multiStatusResponse[length - 1]
expect(invalidEmailResponse.status).toBe(400)

// Type-safe access to error properties
const errorResponse = invalidEmailResponse as MultiStatusErrorNode
expect(errorResponse.errortype).toBe('PAYLOAD_VALIDATION_FAILED')
expect(errorResponse.errormessage).toContain('Invalid email: invalid-email-address')
}
})

it('should not double hash an email that is already base64 encoded', async () => {
const dropReferenceId = 'aabbcc5b01-c9c7-4000-9191-000000000000'
const dropEndpoint = `https://thetradedesk-crm-data.s3.us-east-1.amazonaws.com/data/advertiser/advertiser-id/drop/${dropReferenceId}/pii?X-Amz-Security-Token=token&X-Amz-Algorithm=AWS4-HMAC-SHA256&X-Amz-Date=date&X-Amz-SignedHeaders=host&X-Amz-Expires=3600&X-Amz-Credential=credentials&X-Amz-Signature=signature&`
Expand Down
Loading