diff --git a/packages/core/destination_writers/postgres_impl.ts b/packages/core/destination_writers/postgres_impl.ts
index 8c1eecb75..a9113f14d 100644
--- a/packages/core/destination_writers/postgres_impl.ts
+++ b/packages/core/destination_writers/postgres_impl.ts
@@ -112,8 +112,6 @@ DO UPDATE SET (${columnsToUpdateStr}) = (${excludedColumnsToUpdateStr})`,
     } catch (err) {
       childLogger.error({ err }, 'Error upserting common object record');
       throw err;
-    } finally {
-      client.release();
     }
   }

@@ -132,106 +130,105 @@ DO UPDATE SET (${columnsToUpdateStr}) = (${excludedColumnsToUpdateStr})`,
     const qualifiedTable = `"${schema}".${table}`;
     const tempTable = `temp_${table}`;

-    try {
-      await setup();
-
-      const columns = getColumnsForCommonObject(category, commonObjectType);
-      const columnsToUpdate = columns.filter(
-        (c) =>
-          c !== '_supaglue_application_id' &&
-          c !== '_supaglue_provider_name' &&
-          c !== '_supaglue_customer_id' &&
-          c !== 'id'
-      );
-
-      // Output
-      const stream = client.query(
-        copyFrom(`COPY ${tempTable} (${columns.join(',')}) FROM STDIN WITH (DELIMITER ',', FORMAT CSV)`)
-      );
-
-      // Input
-      const stringifier = stringify({
-        columns,
-        cast: {
-          boolean: (value: boolean) => value.toString(),
-          object: (value: object) => jsonStringifyWithoutNullChars(value),
-          date: (value: Date) => value.toISOString(),
-          string: (value: string) => stripNullCharsFromString(value),
-        },
-        quoted: true,
-      });
-
-      const mapper = getSnakecasedKeysMapper(category, commonObjectType);
-
-      // Keep track of stuff
-      let tempTableRowCount = 0;
-      let maxLastModifiedAt: Date | null = null;
-
-      childLogger.info('Importing common object records into temp table [IN PROGRESS]');
-      await pipeline(
-        inputStream,
-        new Transform({
-          objectMode: true,
-          transform: (chunk, encoding, callback) => {
-            try {
-              const { record, emittedAt } = chunk;
-              const unifiedData = mapper(record);
-              const mappedRecord = {
-                _supaglue_application_id: applicationId,
-                _supaglue_provider_name: providerName,
-                _supaglue_customer_id: customerId,
-                _supaglue_emitted_at: emittedAt,
-                _supaglue_unified_data: omit(unifiedData, ['raw_data']),
-                ...unifiedData,
-              };
-
-              ++tempTableRowCount;
-
-              // Update the max lastModifiedAt
-              const { lastModifiedAt } = record;
-              if (lastModifiedAt && (!maxLastModifiedAt || lastModifiedAt > maxLastModifiedAt)) {
-                maxLastModifiedAt = lastModifiedAt;
-              }
-
-              callback(null, mappedRecord);
-            } catch (e: any) {
-              return callback(e);
+    await setup();
+
+    const columns = getColumnsForCommonObject(category, commonObjectType);
+    const columnsToUpdate = columns.filter(
+      (c) =>
+        c !== '_supaglue_application_id' &&
+        c !== '_supaglue_provider_name' &&
+        c !== '_supaglue_customer_id' &&
+        c !== 'id'
+    );
+
+    // Output
+    const stream = client.query(
+      copyFrom(`COPY ${tempTable} (${columns.join(',')}) FROM STDIN WITH (DELIMITER ',', FORMAT CSV)`)
+    );
+
+    // Input
+    const stringifier = stringify({
+      columns,
+      cast: {
+        boolean: (value: boolean) => value.toString(),
+        object: (value: object) => jsonStringifyWithoutNullChars(value),
+        date: (value: Date) => value.toISOString(),
+        string: (value: string) => stripNullCharsFromString(value),
+      },
+      quoted: true,
+    });
+
+    const mapper = getSnakecasedKeysMapper(category, commonObjectType);
+
+    // Keep track of stuff
+    let tempTableRowCount = 0;
+    let maxLastModifiedAt: Date | null = null;
+
+    childLogger.info('Importing common object records into temp table [IN PROGRESS]');
+    await pipeline(
+      inputStream,
+      new Transform({
+        objectMode: true,
+        transform: (chunk, encoding, callback) => {
+          try {
+            const { record, emittedAt } = chunk;
+            const unifiedData = mapper(record);
+            const mappedRecord = {
+              _supaglue_application_id: applicationId,
+              _supaglue_provider_name: providerName,
+              _supaglue_customer_id: customerId,
+              _supaglue_emitted_at: emittedAt,
+              _supaglue_unified_data: omit(unifiedData, ['raw_data']),
+              ...unifiedData,
+            };
+
+            ++tempTableRowCount;
+
+            // Update the max lastModifiedAt
+            const { lastModifiedAt } = record;
+            if (lastModifiedAt && (!maxLastModifiedAt || lastModifiedAt > maxLastModifiedAt)) {
+              maxLastModifiedAt = lastModifiedAt;
             }
-          },
-        }),
-        stringifier,
-        stream
-      );
-      childLogger.info('Importing common object records into temp table [COMPLETED]');

-      // Copy from deduped temp table
-      const columnsToUpdateStr = columnsToUpdate.join(',');
-      const excludedColumnsToUpdateStr = columnsToUpdate.map((column) => `EXCLUDED.${column}`).join(',');
-
-      // Paginate
-      const batchSize = 10000;
-      for (let offset = 0; offset < tempTableRowCount; offset += batchSize) {
-        childLogger.info({ offset }, 'Copying from deduped temp table to main table [IN PROGRESS]');
-        // IMPORTANT: we need to use DISTINCT ON because we may have multiple records with the same id
-        // For example, hubspot will return the same record twice when querying for `archived: true` if
-        // the record was archived, restored, and archived again.
-        // TODO: This may have performance implications. We should look into this later.
-        // https://github.com/supaglue-labs/supaglue/issues/497
-        await client.query(`INSERT INTO ${qualifiedTable} (${columns.join(',')})
+            callback(null, mappedRecord);
+          } catch (e: any) {
+            return callback(e);
+          }
+        },
+      }),
+      stringifier,
+      stream
+    );
+    childLogger.info('Importing common object records into temp table [COMPLETED]');
+
+    // Copy from deduped temp table
+    const columnsToUpdateStr = columnsToUpdate.join(',');
+    const excludedColumnsToUpdateStr = columnsToUpdate.map((column) => `EXCLUDED.${column}`).join(',');
+
+    // Paginate
+    const batchSize = 10000;
+    for (let offset = 0; offset < tempTableRowCount; offset += batchSize) {
+      childLogger.info({ offset }, 'Copying from deduped temp table to main table [IN PROGRESS]');
+      // IMPORTANT: we need to use DISTINCT ON because we may have multiple records with the same id
+      // For example, hubspot will return the same record twice when querying for `archived: true` if
+      // the record was archived, restored, and archived again.
+      // TODO: This may have performance implications. We should look into this later.
+      // https://github.com/supaglue-labs/supaglue/issues/497
+      await client.query(`INSERT INTO ${qualifiedTable} (${columns.join(',')})
 SELECT DISTINCT ON (id) ${columns.join(
-        ','
-      )} FROM ${tempTable} ORDER BY id ASC, last_modified_at DESC OFFSET ${offset} LIMIT ${batchSize}
+      ','
+    )} FROM ${tempTable} ORDER BY id ASC, last_modified_at DESC OFFSET ${offset} LIMIT ${batchSize}
 ON CONFLICT (_supaglue_application_id, _supaglue_provider_name, _supaglue_customer_id, id)
 DO UPDATE SET (${columnsToUpdateStr}) = (${excludedColumnsToUpdateStr})`);
-        childLogger.info({ offset }, 'Copying from deduped temp table to main table [COMPLETED]');
-        heartbeat();
-      }
+      childLogger.info({ offset }, 'Copying from deduped temp table to main table [COMPLETED]');
+      heartbeat();
+    }

-      childLogger.info('Copying from deduped temp table to main table [COMPLETED]');
+    childLogger.info('Copying from deduped temp table to main table [COMPLETED]');

-      if (diffAndDeleteRecords) {
-        childLogger.info('Marking rows as deleted [IN PROGRESS]');
-        await client.query(`
+    if (diffAndDeleteRecords) {
+      childLogger.info('Marking rows as deleted [IN PROGRESS]');
+      await client.query(`
 UPDATE ${qualifiedTable} AS destination
 SET is_deleted = TRUE
 WHERE
@@ -244,19 +241,16 @@ DO UPDATE SET (${columnsToUpdateStr}) = (${excludedColumnsToUpdateStr})`);
     WHERE temp.id = destination.id
   );
 `);
-        childLogger.info('Marking rows as deleted [COMPLETED]');
-        heartbeat();
-      }
+      childLogger.info('Marking rows as deleted [COMPLETED]');
+      heartbeat();
+    }

-      // We don't drop deduped temp table here because we're closing the connection here anyway.
+    // We don't drop deduped temp table here because we're closing the connection here anyway.

-      return {
-        maxLastModifiedAt,
-        numRecords: tempTableRowCount, // TODO: not quite accurate (because there can be duplicates) but good enough
-      };
-    } finally {
-      client.release();
-    }
+    return {
+      maxLastModifiedAt,
+      numRecords: tempTableRowCount, // TODO: not quite accurate (because there can be duplicates) but good enough
+    };
   }

   async upsertRecordImpl(
@@ -327,8 +321,6 @@ DO UPDATE SET (${columnsToUpdateStr}) = (${excludedColumnsToUpdateStr})`,
     } catch (err) {
       childLogger.error({ err }, 'Error upserting record');
       throw err;
-    } finally {
-      client.release();
     }
   }

@@ -348,125 +340,124 @@ DO UPDATE SET (${columnsToUpdateStr}) = (${excludedColumnsToUpdateStr})`,
     const qualifiedTable = `"${schema}".${table}`;
     const tempTable = `"temp_${table}"`;

-    try {
-      await setup();
-
-      // TODO: Make this type-safe
-      const columns = [
-        '_supaglue_application_id',
-        '_supaglue_provider_name',
-        '_supaglue_customer_id',
-        '_supaglue_id',
-        '_supaglue_emitted_at',
-        '_supaglue_last_modified_at',
-        '_supaglue_is_deleted',
-        '_supaglue_raw_data',
-        // This is used to support entities + schemas. We should write empty object otherwise (e.g. for managed destinations).
-        '_supaglue_mapped_data',
-        ...(objectType === 'custom' ? ['_supaglue_object_name'] : []),
-      ];
-      const columnsToUpdate = columns.filter(
-        (c) =>
-          c !== '_supaglue_application_id' &&
-          c !== '_supaglue_provider_name' &&
-          c !== '_supaglue_customer_id' &&
-          c !== '_supaglue_id' &&
-          c !== '_supaglue_object_name'
-      );
-
-      // Output
-      const stream = client.query(
-        copyFrom(`COPY ${tempTable} (${columns.join(',')}) FROM STDIN WITH (DELIMITER ',', FORMAT CSV)`)
-      );
-
-      // Input
-      const stringifier = stringify({
-        columns,
-        cast: {
-          boolean: (value: boolean) => value.toString(),
-          object: (value: object) => jsonStringifyWithoutNullChars(value),
-          date: (value: Date) => value.toISOString(),
-          string: (value: string) => stripNullCharsFromString(value),
-        },
-        quoted: true,
-      });
-
-      // Keep track of stuff
-      let tempTableRowCount = 0;
-      let maxLastModifiedAt: Date | null = null;
-
-      childLogger.info('Importing raw records into temp table [IN PROGRESS]');
-      await pipeline(
-        inputStream,
-        new Transform({
-          objectMode: true,
-          transform: (record: MappedListedObjectRecord, encoding, callback) => {
-            try {
-              const mappedRecord = {
-                _supaglue_application_id: applicationId,
-                _supaglue_provider_name: providerName,
-                _supaglue_customer_id: customerId,
-                _supaglue_id: record.id,
-                _supaglue_emitted_at: record.emittedAt,
-                _supaglue_last_modified_at: record.lastModifiedAt,
-                _supaglue_is_deleted: record.isDeleted,
-                _supaglue_raw_data: record.rawData,
-                _supaglue_mapped_data: {},
-                _supaglue_object_name: objectName,
-              };
-
-              ++tempTableRowCount;
-
-              // Update the max lastModifiedAt
-              const { lastModifiedAt } = record;
-              if (lastModifiedAt && (!maxLastModifiedAt || lastModifiedAt > maxLastModifiedAt)) {
-                maxLastModifiedAt = lastModifiedAt;
-              }
-
-              callback(null, mappedRecord);
-            } catch (e: any) {
-              return callback(e);
+    await setup();
+
+    // TODO: Make this type-safe
+    const columns = [
+      '_supaglue_application_id',
+      '_supaglue_provider_name',
+      '_supaglue_customer_id',
+      '_supaglue_id',
+      '_supaglue_emitted_at',
+      '_supaglue_last_modified_at',
+      '_supaglue_is_deleted',
+      '_supaglue_raw_data',
+      // This is used to support entities + schemas. We should write empty object otherwise (e.g. for managed destinations).
+      '_supaglue_mapped_data',
+      ...(objectType === 'custom' ? ['_supaglue_object_name'] : []),
+    ];
+    const columnsToUpdate = columns.filter(
+      (c) =>
+        c !== '_supaglue_application_id' &&
+        c !== '_supaglue_provider_name' &&
+        c !== '_supaglue_customer_id' &&
+        c !== '_supaglue_id' &&
+        c !== '_supaglue_object_name'
+    );
+
+    // Output
+    const stream = client.query(
+      copyFrom(`COPY ${tempTable} (${columns.join(',')}) FROM STDIN WITH (DELIMITER ',', FORMAT CSV)`)
+    );
+
+    // Input
+    const stringifier = stringify({
+      columns,
+      cast: {
+        boolean: (value: boolean) => value.toString(),
+        object: (value: object) => jsonStringifyWithoutNullChars(value),
+        date: (value: Date) => value.toISOString(),
+        string: (value: string) => stripNullCharsFromString(value),
+      },
+      quoted: true,
+    });
+
+    // Keep track of stuff
+    let tempTableRowCount = 0;
+    let maxLastModifiedAt: Date | null = null;
+
+    childLogger.info('Importing raw records into temp table [IN PROGRESS]');
+    await pipeline(
+      inputStream,
+      new Transform({
+        objectMode: true,
+        transform: (record: MappedListedObjectRecord, encoding, callback) => {
+          try {
+            const mappedRecord = {
+              _supaglue_application_id: applicationId,
+              _supaglue_provider_name: providerName,
+              _supaglue_customer_id: customerId,
+              _supaglue_id: record.id,
+              _supaglue_emitted_at: record.emittedAt,
+              _supaglue_last_modified_at: record.lastModifiedAt,
+              _supaglue_is_deleted: record.isDeleted,
+              _supaglue_raw_data: record.rawData,
+              _supaglue_mapped_data: {},
+              _supaglue_object_name: objectName,
+            };
+
+            ++tempTableRowCount;
+
+            // Update the max lastModifiedAt
+            const { lastModifiedAt } = record;
+            if (lastModifiedAt && (!maxLastModifiedAt || lastModifiedAt > maxLastModifiedAt)) {
+              maxLastModifiedAt = lastModifiedAt;
            }
-          },
-        }),
-        stringifier,
-        stream
-      );
-      childLogger.info('Importing raw records into temp table [COMPLETED]');
-
-      heartbeat();
-      // Copy from deduped temp table
-      const columnsToUpdateStr = columnsToUpdate.join(',');
-      const excludedColumnsToUpdateStr = columnsToUpdate.map((column) => `EXCLUDED.${column}`).join(',');
-      const maybeObjectNameColumn = objectType === 'custom' ? ', _supaglue_object_name' : '';
-      // Paginate
-      const batchSize = 10000;
-      for (let offset = 0; offset < tempTableRowCount; offset += batchSize) {
-        childLogger.info({ offset }, 'Copying from deduped temp table to main table [IN PROGRESS]');
-        // IMPORTANT: we need to use DISTINCT ON because we may have multiple records with the same id
-        // For example, hubspot will return the same record twice when querying for `archived: true` if
-        // the record was archived, restored, and archived again.
-        // TODO: This may have performance implications. We should look into this later.
-        // https://github.com/supaglue-labs/supaglue/issues/497
-        const supaglueId = `_supaglue_id${maybeObjectNameColumn}`;
-        await client.query(`INSERT INTO ${qualifiedTable} (${columns.join(',')})
+            callback(null, mappedRecord);
+          } catch (e: any) {
+            return callback(e);
+          }
+        },
+      }),
+      stringifier,
+      stream
+    );
+    childLogger.info('Importing raw records into temp table [COMPLETED]');
+
+    heartbeat();
+
+    // Copy from deduped temp table
+    const columnsToUpdateStr = columnsToUpdate.join(',');
+    const excludedColumnsToUpdateStr = columnsToUpdate.map((column) => `EXCLUDED.${column}`).join(',');
+    const maybeObjectNameColumn = objectType === 'custom' ? ', _supaglue_object_name' : '';
+    // Paginate
+    const batchSize = 10000;
+    for (let offset = 0; offset < tempTableRowCount; offset += batchSize) {
+      childLogger.info({ offset }, 'Copying from deduped temp table to main table [IN PROGRESS]');
+      // IMPORTANT: we need to use DISTINCT ON because we may have multiple records with the same id
+      // For example, hubspot will return the same record twice when querying for `archived: true` if
+      // the record was archived, restored, and archived again.
+      // TODO: This may have performance implications. We should look into this later.
+      // https://github.com/supaglue-labs/supaglue/issues/497
+      const supaglueId = `_supaglue_id${maybeObjectNameColumn}`;
+      await client.query(`INSERT INTO ${qualifiedTable} (${columns.join(',')})
 SELECT DISTINCT ON (${supaglueId}) ${columns.join(
-        ','
-      )} FROM ${tempTable} ORDER BY ${supaglueId} ASC, _supaglue_last_modified_at DESC OFFSET ${offset} limit ${batchSize}
+      ','
+    )} FROM ${tempTable} ORDER BY ${supaglueId} ASC, _supaglue_last_modified_at DESC OFFSET ${offset} limit ${batchSize}
 ON CONFLICT (_supaglue_application_id, _supaglue_provider_name, _supaglue_customer_id, ${supaglueId})
 DO UPDATE SET (${columnsToUpdateStr}) = (${excludedColumnsToUpdateStr})`);
-        childLogger.info({ offset }, 'Copying from deduped temp table to main table [COMPLETED]');
-        heartbeat();
-      }
+      childLogger.info({ offset }, 'Copying from deduped temp table to main table [COMPLETED]');
+      heartbeat();
+    }

-      childLogger.info('Copying from deduped temp table to main table [COMPLETED]');
+    childLogger.info('Copying from deduped temp table to main table [COMPLETED]');

-      // TODO: Watch for sql-injection attack around custom object name.... we should probably use a better
-      // sql library so we are not templating raw SQL strings
-      if (shouldDeleteRecords(diffAndDeleteRecords, providerName)) {
-        childLogger.info('Marking rows as deleted [IN PROGRESS]');
-        await client.query(`
+    // TODO: Watch for sql-injection attack around custom object name.... we should probably use a better
+    // sql library so we are not templating raw SQL strings
+    if (shouldDeleteRecords(diffAndDeleteRecords, providerName)) {
+      childLogger.info('Marking rows as deleted [IN PROGRESS]');
+      await client.query(`
 UPDATE ${qualifiedTable} AS destination
 SET _supaglue_is_deleted = TRUE
 WHERE
@@ -478,22 +469,19 @@ DO UPDATE SET (${columnsToUpdateStr}) = (${excludedColumnsToUpdateStr})`);
     SELECT 1 FROM ${tempTable} AS temp
     WHERE temp._supaglue_id = destination._supaglue_id
-
+  );
 `);
-        childLogger.info('Marking rows as deleted [COMPLETED]');
-        heartbeat();
-      }
+      childLogger.info('Marking rows as deleted [COMPLETED]');
+      heartbeat();
+    }

-      // We don't drop deduped temp table here because we're closing the connection here anyway.
+    // We don't drop deduped temp table here because we're closing the connection here anyway.

-      return {
-        maxLastModifiedAt,
-        numRecords: tempTableRowCount, // TODO: not quite accurate (because there can be duplicates) but good enough
-      };
-    } finally {
-      client.release();
-    }
+    return {
+      maxLastModifiedAt,
+      numRecords: tempTableRowCount, // TODO: not quite accurate (because there can be duplicates) but good enough
+    };
   }
 }
diff --git a/packages/core/destination_writers/supaglue.ts b/packages/core/destination_writers/supaglue.ts
index b05a45fa1..e5370aa67 100644
--- a/packages/core/destination_writers/supaglue.ts
+++ b/packages/core/destination_writers/supaglue.ts
@@ -25,6 +25,10 @@ import { BaseDestinationWriter } from './base';
 import type { ObjectType } from './postgres_impl';
 import { kCustomObject, PostgresDestinationWriterImpl } from './postgres_impl';

+const clientErrorListener = (err: Error) => {
+  logger.error({ err }, 'Postgres client error');
+};
+
 async function createPartitionIfNotExists(
   client: PoolClient,
   schema: string,
@@ -50,7 +54,9 @@ export class SupaglueDestinationWriter extends BaseDestinationWriter {
     this.#pgPool = getPgPool(process.env.SUPAGLUE_MANAGED_DATABASE_URL!);
   }
   async #getClient(): Promise<PoolClient> {
-    return await this.#pgPool.connect();
+    const client = await this.#pgPool.connect();
+    client.on('error', clientErrorListener);
+    return client;
   }

   async #setupCommonObjectTable(
@@ -93,24 +99,29 @@ export class SupaglueDestinationWriter extends BaseDestinationWriter {
     const table = getCommonObjectTableName(connection.category, commonObjectType as CommonObjectType);
     const client = await this.#getClient();

-    return await this.#writerImpl.upsertCommonObjectRecordImpl(
-      client,
-      connection,
-      commonObjectType,
-      record,
-      schema,
-      table,
-      async () => {
-        await this.#setupCommonObjectTable(
-          client,
-          schema,
-          table,
-          connection.category,
-          commonObjectType as CommonObjectType,
-          connection.customerId
-        );
-      }
-    );
+    try {
+      return await this.#writerImpl.upsertCommonObjectRecordImpl(
+        client,
+        connection,
+        commonObjectType,
+        record,
+        schema,
+        table,
+        async () => {
+          await this.#setupCommonObjectTable(
+            client,
+            schema,
+            table,
+            connection.category,
+            commonObjectType as CommonObjectType,
+            connection.customerId
+          );
+        }
+      );
+    } finally {
+      client.removeListener('error', clientErrorListener);
+      client.release();
+    }
   }

   public override async writeCommonObjectRecords(
@@ -126,27 +137,32 @@ export class SupaglueDestinationWriter extends BaseDestinationWriter {

     const client = await this.#getClient();

-    return await this.#writerImpl.writeCommonObjectRecordsImpl(
-      client,
-      connection,
-      commonObjectType,
-      inputStream,
-      heartbeat,
-      diffAndDeleteRecords,
-      schema,
-      table,
-      async () => {
-        await this.#setupCommonObjectTable(
-          client,
-          schema,
-          table,
-          category,
-          commonObjectType,
-          customerId,
-          /* alsoCreateTempTable */ true
-        );
-      }
-    );
+    try {
+      return await this.#writerImpl.writeCommonObjectRecordsImpl(
+        client,
+        connection,
+        commonObjectType,
+        inputStream,
+        heartbeat,
+        diffAndDeleteRecords,
+        schema,
+        table,
+        async () => {
+          await this.#setupCommonObjectTable(
+            client,
+            schema,
+            table,
+            category,
+            commonObjectType,
+            customerId,
+            /* alsoCreateTempTable */ true
+          );
+        }
+      );
+    } finally {
+      client.removeListener('error', clientErrorListener);
+      client.release();
+    }
   }

   public override async writeObjectRecords(
@@ -204,16 +220,21 @@ export class SupaglueDestinationWriter extends BaseDestinationWriter {
     const schema = getSchemaName(connection.applicationId);
     const client = await this.#getClient();

-    return await this.#writerImpl.upsertRecordImpl(
-      client,
-      connection,
-      schema,
-      table,
-      record,
-      async () =>
-        await this.#setupStandardOrCustomObjectTable(client, schema, table, connection.customerId, false, objectType),
-      objectType
-    );
+    try {
+      return await this.#writerImpl.upsertRecordImpl(
+        client,
+        connection,
+        schema,
+        table,
+        record,
+        async () =>
+          await this.#setupStandardOrCustomObjectTable(client, schema, table, connection.customerId, false, objectType),
+        objectType
+      );
+    } finally {
+      client.removeListener('error', clientErrorListener);
+      client.release();
+    }
   }

   public override async writeEntityRecords(
@@ -283,27 +304,32 @@ export class SupaglueDestinationWriter extends BaseDestinationWriter {
     const schema = getSchemaName(connection.applicationId);
     const client = await this.#getClient();

-    return await this.#writerImpl.writeRecordsImpl(
-      client,
-      connection,
-      schema,
-      table,
-      inputStream,
-      heartbeat,
-      childLogger,
-      diffAndDeleteRecords,
-      async () => {
-        await this.#setupStandardOrCustomObjectTable(
-          client,
-          schema,
-          table,
-          connection.customerId,
-          /* alsoCreateTempTable */ true,
-          objectType
-        );
-      },
-      objectType
-    );
+    try {
+      return await this.#writerImpl.writeRecordsImpl(
+        client,
+        connection,
+        schema,
+        table,
+        inputStream,
+        heartbeat,
+        childLogger,
+        diffAndDeleteRecords,
+        async () => {
+          await this.#setupStandardOrCustomObjectTable(
+            client,
+            schema,
+            table,
+            connection.customerId,
+            /* alsoCreateTempTable */ true,
+            objectType
+          );
+        },
+        objectType
+      );
+    } finally {
+      client.removeListener('error', clientErrorListener);
+      client.release();
+    }
   }
 }