Skip to content
This repository has been archived by the owner on Mar 10, 2024. It is now read-only.

Commit

Permalink
fix: pg error handling
Browse files Browse the repository at this point in the history
  • Loading branch information
lucasmarshall committed Dec 12, 2023
1 parent 7d3c0e8 commit da47932
Show file tree
Hide file tree
Showing 3 changed files with 123 additions and 65 deletions.
12 changes: 11 additions & 1 deletion apps/api/services/managed_data_service.ts
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ import {
getPgPool,
getSchemaName,
isForward,
logger,
MAX_PAGE_SIZE,
} from '@supaglue/core/lib';
import { getCategoryForProvider } from '@supaglue/core/remotes';
Expand All @@ -33,6 +34,10 @@ import type {
import type { ObjectType } from '@supaglue/types/sync';
import type { Pool, PoolClient } from 'pg';

const clientErrorListener = (err: Error) => {
logger.error({ err }, 'Postgres client error');
};

export class ManagedDataService {
#pgPool: Pool;
#syncService: SyncService;
Expand All @@ -44,7 +49,11 @@ export class ManagedDataService {
}

async #getClient(): Promise<PoolClient> {
return await this.#pgPool.connect();
const client = await this.#pgPool.connect();

client.on('error', clientErrorListener);

return client;
}

async #getRecords<T extends Record<string, unknown>>(
Expand Down Expand Up @@ -117,6 +126,7 @@ export class ManagedDataService {
cursor
);
} finally {
client.removeListener('error', clientErrorListener);
client.release();
}
}
Expand Down
167 changes: 104 additions & 63 deletions packages/core/destination_writers/postgres.ts
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,10 @@ import { BaseDestinationWriter } from './base';
import type { ObjectType } from './postgres_impl';
import { kCustomObject, PostgresDestinationWriterImpl } from './postgres_impl';

const clientErrorListener = (err: Error) => {
logger.error({ err }, 'Postgres client error');
};

export class PostgresDestinationWriter extends BaseDestinationWriter {
readonly #destination: DestinationUnsafe<'postgres'>;
#writerImpl: PostgresDestinationWriterImpl;
Expand All @@ -41,7 +45,16 @@ export class PostgresDestinationWriter extends BaseDestinationWriter {
max: 20,
ssl: getSsl(this.#destination.config),
});
return await pool.connect();

pool.on('error', (err) => {
logger.error({ err }, 'Postgres pool error');
});

const client = await pool.connect();

client.on('error', clientErrorListener);

return client;
}

#getSchema(connectionSyncConfig?: ConnectionSyncConfig): string {
Expand All @@ -64,15 +77,20 @@ export class PostgresDestinationWriter extends BaseDestinationWriter {

const client = await this.#getClient();

return await this.#writerImpl.upsertCommonObjectRecordImpl(
client,
connection,
commonObjectType,
record,
schema,
table,
async () => await this.#setupCommonObjectTable(client, schema, table, category, commonObjectType)
);
try {
return await this.#writerImpl.upsertCommonObjectRecordImpl(
client,
connection,
commonObjectType,
record,
schema,
table,
async () => await this.#setupCommonObjectTable(client, schema, table, category, commonObjectType)
);
} finally {
client.removeListener('error', clientErrorListener);
client.release();
}
}

public override async upsertCommonObjectRecord<P extends ProviderCategory, T extends CommonObjectTypeForCategory<P>>(
Expand All @@ -87,15 +105,20 @@ export class PostgresDestinationWriter extends BaseDestinationWriter {
const schema = this.#getSchema(connectionSyncConfig);
const table = getCommonObjectTableName(category, commonObjectType);
const client = await this.#getClient();
return await this.#writerImpl.upsertCommonObjectRecordImpl(
client,
connection,
commonObjectType,
record,
schema,
table,
async () => this.#setupCommonObjectTable(client, schema, table, category, commonObjectType)
);
try {
return await this.#writerImpl.upsertCommonObjectRecordImpl(
client,
connection,
commonObjectType,
record,
schema,
table,
async () => this.#setupCommonObjectTable(client, schema, table, category, commonObjectType)
);
} finally {
client.removeListener('error', clientErrorListener);
client.release();
}
}

async #setupCommonObjectTable(
Expand Down Expand Up @@ -133,26 +156,32 @@ export class PostgresDestinationWriter extends BaseDestinationWriter {
const schema = this.#getSchema(connectionSyncConfig);
const table = getCommonObjectTableName(category, commonObjectType);
const client = await this.#getClient();
return await this.#writerImpl.writeCommonObjectRecordsImpl(
client,
connection,
commonObjectType,
inputStream,
heartbeat,
diffAndDeleteRecords,
schema,
table,
async () => {
await this.#setupCommonObjectTable(
client,
schema,
table,
category,
commonObjectType,
/* alsoCreateTempTable */ true
);
}
);

try {
return await this.#writerImpl.writeCommonObjectRecordsImpl(
client,
connection,
commonObjectType,
inputStream,
heartbeat,
diffAndDeleteRecords,
schema,
table,
async () => {
await this.#setupCommonObjectTable(
client,
schema,
table,
category,
commonObjectType,
/* alsoCreateTempTable */ true
);
}
);
} finally {
client.removeListener('error', clientErrorListener);
client.release();
}
}

public override async writeObjectRecords(
Expand Down Expand Up @@ -272,16 +301,22 @@ export class PostgresDestinationWriter extends BaseDestinationWriter {
): Promise<void> {
const schema = this.#getSchema(connection.connectionSyncConfig);
const client = await this.#getClient();
await this.#writerImpl.upsertRecordImpl(
client,
connection,
schema,
table,
record,
async () =>
await this.#setupObjectOrEntityTable(client, table, schema, /* alsoCreateTempTable */ false, objectType),
objectType
);

try {
await this.#writerImpl.upsertRecordImpl(
client,
connection,
schema,
table,
record,
async () =>
await this.#setupObjectOrEntityTable(client, table, schema, /* alsoCreateTempTable */ false, objectType),
objectType
);
} finally {
client.removeListener('error', clientErrorListener);
client.release();
}
}

async #writeRecords(
Expand All @@ -295,20 +330,26 @@ export class PostgresDestinationWriter extends BaseDestinationWriter {
): Promise<WriteObjectRecordsResult> {
const schema = this.#getSchema(connection.connectionSyncConfig);
const client = await this.#getClient();
return await this.#writerImpl.writeRecordsImpl(
client,
connection,
schema,
table,
inputStream,
heartbeat,
childLogger,
diffAndDeleteRecords,
async () => {
await this.#setupObjectOrEntityTable(client, table, schema, /* alsoCreateTempTable */ true, objectType);
},
objectType
);

try {
return await this.#writerImpl.writeRecordsImpl(
client,
connection,
schema,
table,
inputStream,
heartbeat,
childLogger,
diffAndDeleteRecords,
async () => {
await this.#setupObjectOrEntityTable(client, table, schema, /* alsoCreateTempTable */ true, objectType);
},
objectType
);
} finally {
client.removeListener('error', clientErrorListener);
client.release();
}
}
}

Expand Down
9 changes: 8 additions & 1 deletion packages/core/lib/postgres.ts
Original file line number Diff line number Diff line change
Expand Up @@ -21,14 +21,21 @@ import {
keysOfSnakecasedSequenceStepWithTenant,
keysOfSnakecasedSequenceWithTenant,
} from '../keys/engagement';
import { logger } from './logger';

export const getPgPool = (connectionString: string): Pool => {
const connectionConfig = parse(connectionString) as PoolConfig;

return new Pool({
const pool = new Pool({
max: 5,
...connectionConfig,
});

pool.on('error', (err) => {
logger.error({ err }, 'Postgres pool error');
});

return pool;
};

export function sanitizeForPostgres(tableName: string): string {
Expand Down

0 comments on commit da47932

Please sign in to comment.