From b6bd6181c59c8f700dad2b27e38db18a0259b380 Mon Sep 17 00:00:00 2001 From: Oskar Dudycz Date: Wed, 7 Aug 2024 11:53:41 +0200 Subject: [PATCH] Addec connection factory and extended transaction with connection information --- .../dumbo/src/core/connections/connection.ts | 68 ++++++++++++++++++- .../dumbo/src/core/connections/transaction.ts | 7 +- src/packages/dumbo/src/index.ts | 4 +- .../src/postgres/pg/connections/connection.ts | 38 ++++------- .../postgres/pg/connections/transaction.ts | 57 +++++++++------- ...ts => pongoClient.connections.int.spec.ts} | 18 ++++- src/packages/pongo/src/core/pongoClient.ts | 9 ++- 7 files changed, 144 insertions(+), 57 deletions(-) rename src/packages/pongo/src/core/{pongoClient.connections.e2e.spec.ts => pongoClient.connections.int.spec.ts} (77%) diff --git a/src/packages/dumbo/src/core/connections/connection.ts b/src/packages/dumbo/src/core/connections/connection.ts index d5e722c..c4021e7 100644 --- a/src/packages/dumbo/src/core/connections/connection.ts +++ b/src/packages/dumbo/src/core/connections/connection.ts @@ -1,5 +1,13 @@ -import type { WithSQLExecutor } from '../execute'; -import type { DatabaseTransactionFactory } from './transaction'; +import { + sqlExecutor, + type DbSQLExecutor, + type WithSQLExecutor, +} from '../execute'; +import { + transactionFactoryWithDbClient, + type DatabaseTransaction, + type DatabaseTransactionFactory, +} from './transaction'; export interface Connection< ConnectorType extends string = string, @@ -20,3 +28,59 @@ export interface ConnectionFactory< handle: (connection: ConnectionType) => Promise, ) => Promise; } + +export type CreateConnectionOptions< + ConnectorType extends string = string, + DbClient = unknown, + ConnectionType extends Connection = Connection< + ConnectorType, + DbClient + >, + Executor extends DbSQLExecutor = DbSQLExecutor, +> = { + type: ConnectorType; + connect: Promise; + close: (client: DbClient) => Promise; + initTransaction: ( + connection: () => ConnectionType, + ) => (client: Promise) => DatabaseTransaction; + executor: () => Executor; +}; + +export const createConnection = < + ConnectorType extends string = string, + DbClient = unknown, + ConnectionType extends Connection = Connection< + ConnectorType, + DbClient + >, + Executor extends DbSQLExecutor = DbSQLExecutor, +>( + options: CreateConnectionOptions< + ConnectorType, + DbClient, + ConnectionType, + Executor + >, +): ConnectionType => { + const { type, connect, close, initTransaction, executor } = options; + + let client: DbClient | null = null; + + const getClient = async () => client ?? (client = await connect); + + const connection: Connection = { + type: type, + open: getClient, + close: () => (client ? close(client) : Promise.resolve()), + ...transactionFactoryWithDbClient( + getClient, + initTransaction(() => typedConnection), + ), + execute: sqlExecutor(executor(), { connect: getClient }), + }; + + const typedConnection = connection as ConnectionType; + + return typedConnection; +}; diff --git a/src/packages/dumbo/src/core/connections/transaction.ts b/src/packages/dumbo/src/core/connections/transaction.ts index d68fcd0..758af1c 100644 --- a/src/packages/dumbo/src/core/connections/transaction.ts +++ b/src/packages/dumbo/src/core/connections/transaction.ts @@ -1,9 +1,12 @@ import type { WithSQLExecutor } from '../execute'; import { type Connection } from './connection'; -export interface DatabaseTransaction - extends WithSQLExecutor { +export interface DatabaseTransaction< + ConnectorType extends string = string, + DbClient = unknown, +> extends WithSQLExecutor { type: ConnectorType; + connection: Connection; begin: () => Promise; commit: () => Promise; rollback: (error?: unknown) => Promise; diff --git a/src/packages/dumbo/src/index.ts b/src/packages/dumbo/src/index.ts index 7fad63d..3d5c21d 100644 --- a/src/packages/dumbo/src/index.ts +++ b/src/packages/dumbo/src/index.ts @@ -14,10 +14,10 @@ export type PoolOptions = { connector?: ConnectorType; }; -export type DumboOptions = PoolOptions; +export type DumboOptions = PoolOptions & PostgresPoolOptions; export type Dumbo = PostgresPool; -export const connectionPool = ( +export const connectionPool = ( options: PoolOptionsType, ) => // TODO: this should have the pattern matching and verification diff --git a/src/packages/dumbo/src/postgres/pg/connections/connection.ts b/src/packages/dumbo/src/postgres/pg/connections/connection.ts index ea4d210..26d49e2 100644 --- a/src/packages/dumbo/src/postgres/pg/connections/connection.ts +++ b/src/packages/dumbo/src/postgres/pg/connections/connection.ts @@ -1,9 +1,5 @@ import pg from 'pg'; -import { - sqlExecutor, - transactionFactoryWithDbClient, - type Connection, -} from '../../../core'; +import { createConnection, type Connection } from '../../../core'; import { nodePostgresSQLExecutor } from '../execute'; import { nodePostgresTransaction } from './transaction'; @@ -45,17 +41,13 @@ export const nodePostgresClientConnection = ( ): NodePostgresClientConnection => { const { connect, close } = options; - let client: pg.Client | null = null; - - const getClient = async () => client ?? (client = await connect); - - return { + return createConnection({ type: NodePostgresConnectorType, - open: getClient, - close: () => (client ? close(client) : Promise.resolve()), - ...transactionFactoryWithDbClient(getClient, nodePostgresTransaction), - execute: sqlExecutor(nodePostgresSQLExecutor(), { connect: getClient }), - }; + connect, + close, + initTransaction: (connection) => nodePostgresTransaction(connection), + executor: nodePostgresSQLExecutor, + }); }; export const nodePostgresPoolClientConnection = ( @@ -63,17 +55,13 @@ export const nodePostgresPoolClientConnection = ( ): NodePostgresPoolClientConnection => { const { connect, close } = options; - let client: pg.PoolClient | null = null; - - const getClient = async () => client ?? (client = await connect); - - return { + return createConnection({ type: NodePostgresConnectorType, - open: getClient, - close: () => (client ? close(client) : Promise.resolve()), - ...transactionFactoryWithDbClient(getClient, nodePostgresTransaction), - execute: sqlExecutor(nodePostgresSQLExecutor(), { connect: getClient }), - }; + connect, + close, + initTransaction: (connection) => nodePostgresTransaction(connection), + executor: nodePostgresSQLExecutor, + }); }; export function nodePostgresConnection( diff --git a/src/packages/dumbo/src/postgres/pg/connections/transaction.ts b/src/packages/dumbo/src/postgres/pg/connections/transaction.ts index a68d9dc..33a3d5b 100644 --- a/src/packages/dumbo/src/postgres/pg/connections/transaction.ts +++ b/src/packages/dumbo/src/postgres/pg/connections/transaction.ts @@ -1,4 +1,8 @@ -import { sqlExecutor, type DatabaseTransaction } from '../../../core'; +import { + sqlExecutor, + type Connection, + type DatabaseTransaction, +} from '../../../core'; import { nodePostgresSQLExecutor } from '../execute'; import { NodePostgresConnectorType, @@ -9,29 +13,34 @@ import { export type NodePostgresTransaction = DatabaseTransaction; -export const nodePostgresTransaction = < - DbClient extends NodePostgresPoolOrClient = NodePostgresPoolOrClient, ->( - getClient: Promise, - options?: { close: (client: DbClient, error?: unknown) => Promise }, -): DatabaseTransaction => ({ - type: NodePostgresConnectorType, - begin: async () => { - const client = await getClient; - await client.query('BEGIN'); - }, - commit: async () => { - const client = await getClient; +export const nodePostgresTransaction = + ( + connection: () => Connection, + ) => + ( + getClient: Promise, + options?: { close: (client: DbClient, error?: unknown) => Promise }, + ): DatabaseTransaction => ({ + connection: connection(), + type: NodePostgresConnectorType, + begin: async () => { + const client = await getClient; + await client.query('BEGIN'); + }, + commit: async () => { + const client = await getClient; - await client.query('COMMIT'); + await client.query('COMMIT'); - if (options?.close) await options?.close(client); - }, - rollback: async (error?: unknown) => { - const client = await getClient; - await client.query('ROLLBACK'); + if (options?.close) await options?.close(client); + }, + rollback: async (error?: unknown) => { + const client = await getClient; + await client.query('ROLLBACK'); - if (options?.close) await options?.close(client, error); - }, - execute: sqlExecutor(nodePostgresSQLExecutor(), { connect: () => getClient }), -}); + if (options?.close) await options?.close(client, error); + }, + execute: sqlExecutor(nodePostgresSQLExecutor(), { + connect: () => getClient, + }), + }); diff --git a/src/packages/pongo/src/core/pongoClient.connections.e2e.spec.ts b/src/packages/pongo/src/core/pongoClient.connections.int.spec.ts similarity index 77% rename from src/packages/pongo/src/core/pongoClient.connections.e2e.spec.ts rename to src/packages/pongo/src/core/pongoClient.connections.int.spec.ts index e829df4..bbad940 100644 --- a/src/packages/pongo/src/core/pongoClient.connections.e2e.spec.ts +++ b/src/packages/pongo/src/core/pongoClient.connections.int.spec.ts @@ -1,4 +1,4 @@ -import { isNodePostgresNativePool } from '@event-driven-io/dumbo'; +import { dumbo, isNodePostgresNativePool } from '@event-driven-io/dumbo'; import { PostgreSqlContainer, type StartedPostgreSqlContainer, @@ -81,5 +81,21 @@ void describe('Pongo collection', () => { await client.end(); } }); + + void it('connects using existing connection client', async () => { + const pool = dumbo({ connectionString }); + + try { + await pool.withTransaction(async ({ connection }) => { + const pongo = pongoClient(connectionString, { connection }); + + const users = pongo.db().collection('connections'); + await users.insertOne({ name: randomUUID() }); + await users.insertOne({ name: randomUUID() }); + }); + } finally { + await pool.close(); + } + }); }); }); diff --git a/src/packages/pongo/src/core/pongoClient.ts b/src/packages/pongo/src/core/pongoClient.ts index 8272874..89cf03d 100644 --- a/src/packages/pongo/src/core/pongoClient.ts +++ b/src/packages/pongo/src/core/pongoClient.ts @@ -1,4 +1,7 @@ -import { NodePostgresConnectorType } from '@event-driven-io/dumbo'; +import { + NodePostgresConnectorType, + type NodePostgresConnection, +} from '@event-driven-io/dumbo'; import pg from 'pg'; import type { PostgresDbClientOptions } from '../postgres'; import { getPongoDb, type AllowedDbClientOptions } from './pongoDb'; @@ -29,6 +32,10 @@ export type NotPooledPongoOptions = | { client: pg.Client; pooled: false; + } + | { + connection: NodePostgresConnection; + pooled?: false; }; export type PongoClientOptions =