Skip to content

Commit

Permalink
Addec connection factory and extended transaction with connection inf…
Browse files Browse the repository at this point in the history
…ormation
  • Loading branch information
oskardudycz committed Aug 7, 2024
1 parent 87c058e commit b6bd618
Show file tree
Hide file tree
Showing 7 changed files with 144 additions and 57 deletions.
68 changes: 66 additions & 2 deletions src/packages/dumbo/src/core/connections/connection.ts
Original file line number Diff line number Diff line change
@@ -1,5 +1,13 @@
import type { WithSQLExecutor } from '../execute';
import type { DatabaseTransactionFactory } from './transaction';
import {
sqlExecutor,
type DbSQLExecutor,
type WithSQLExecutor,
} from '../execute';
import {
transactionFactoryWithDbClient,
type DatabaseTransaction,
type DatabaseTransactionFactory,
} from './transaction';

export interface Connection<
ConnectorType extends string = string,
Expand All @@ -20,3 +28,59 @@ export interface ConnectionFactory<
handle: (connection: ConnectionType) => Promise<Result>,
) => Promise<Result>;
}

export type CreateConnectionOptions<
ConnectorType extends string = string,
DbClient = unknown,
ConnectionType extends Connection<ConnectorType, DbClient> = Connection<
ConnectorType,
DbClient
>,
Executor extends DbSQLExecutor = DbSQLExecutor,
> = {
type: ConnectorType;
connect: Promise<DbClient>;
close: (client: DbClient) => Promise<void>;
initTransaction: (
connection: () => ConnectionType,
) => (client: Promise<DbClient>) => DatabaseTransaction<ConnectorType>;
executor: () => Executor;
};

export const createConnection = <
ConnectorType extends string = string,
DbClient = unknown,
ConnectionType extends Connection<ConnectorType, DbClient> = Connection<
ConnectorType,
DbClient
>,
Executor extends DbSQLExecutor = DbSQLExecutor,
>(
options: CreateConnectionOptions<
ConnectorType,
DbClient,
ConnectionType,
Executor
>,
): ConnectionType => {
const { type, connect, close, initTransaction, executor } = options;

let client: DbClient | null = null;

const getClient = async () => client ?? (client = await connect);

const connection: Connection<ConnectorType, DbClient> = {
type: type,
open: getClient,
close: () => (client ? close(client) : Promise.resolve()),
...transactionFactoryWithDbClient(
getClient,
initTransaction(() => typedConnection),
),
execute: sqlExecutor(executor(), { connect: getClient }),
};

const typedConnection = connection as ConnectionType;

return typedConnection;
};
7 changes: 5 additions & 2 deletions src/packages/dumbo/src/core/connections/transaction.ts
Original file line number Diff line number Diff line change
@@ -1,9 +1,12 @@
import type { WithSQLExecutor } from '../execute';
import { type Connection } from './connection';

export interface DatabaseTransaction<ConnectorType extends string = string>
extends WithSQLExecutor {
export interface DatabaseTransaction<
ConnectorType extends string = string,
DbClient = unknown,
> extends WithSQLExecutor {
type: ConnectorType;
connection: Connection<ConnectorType, DbClient>;
begin: () => Promise<void>;
commit: () => Promise<void>;
rollback: (error?: unknown) => Promise<void>;
Expand Down
4 changes: 2 additions & 2 deletions src/packages/dumbo/src/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -14,10 +14,10 @@ export type PoolOptions = {
connector?: ConnectorType;
};

export type DumboOptions = PoolOptions;
export type DumboOptions = PoolOptions & PostgresPoolOptions;
export type Dumbo = PostgresPool;

export const connectionPool = <PoolOptionsType extends PoolOptions>(
export const connectionPool = <PoolOptionsType extends DumboOptions>(
options: PoolOptionsType,
) =>
// TODO: this should have the pattern matching and verification
Expand Down
38 changes: 13 additions & 25 deletions src/packages/dumbo/src/postgres/pg/connections/connection.ts
Original file line number Diff line number Diff line change
@@ -1,9 +1,5 @@
import pg from 'pg';
import {
sqlExecutor,
transactionFactoryWithDbClient,
type Connection,
} from '../../../core';
import { createConnection, type Connection } from '../../../core';
import { nodePostgresSQLExecutor } from '../execute';
import { nodePostgresTransaction } from './transaction';

Expand Down Expand Up @@ -45,35 +41,27 @@ export const nodePostgresClientConnection = (
): NodePostgresClientConnection => {
const { connect, close } = options;

let client: pg.Client | null = null;

const getClient = async () => client ?? (client = await connect);

return {
return createConnection({
type: NodePostgresConnectorType,
open: getClient,
close: () => (client ? close(client) : Promise.resolve()),
...transactionFactoryWithDbClient(getClient, nodePostgresTransaction),
execute: sqlExecutor(nodePostgresSQLExecutor(), { connect: getClient }),
};
connect,
close,
initTransaction: (connection) => nodePostgresTransaction(connection),
executor: nodePostgresSQLExecutor,
});
};

export const nodePostgresPoolClientConnection = (
options: NodePostgresPoolClientOptions,
): NodePostgresPoolClientConnection => {
const { connect, close } = options;

let client: pg.PoolClient | null = null;

const getClient = async () => client ?? (client = await connect);

return {
return createConnection({
type: NodePostgresConnectorType,
open: getClient,
close: () => (client ? close(client) : Promise.resolve()),
...transactionFactoryWithDbClient(getClient, nodePostgresTransaction),
execute: sqlExecutor(nodePostgresSQLExecutor(), { connect: getClient }),
};
connect,
close,
initTransaction: (connection) => nodePostgresTransaction(connection),
executor: nodePostgresSQLExecutor,
});
};

export function nodePostgresConnection(
Expand Down
57 changes: 33 additions & 24 deletions src/packages/dumbo/src/postgres/pg/connections/transaction.ts
Original file line number Diff line number Diff line change
@@ -1,4 +1,8 @@
import { sqlExecutor, type DatabaseTransaction } from '../../../core';
import {
sqlExecutor,
type Connection,
type DatabaseTransaction,
} from '../../../core';
import { nodePostgresSQLExecutor } from '../execute';
import {
NodePostgresConnectorType,
Expand All @@ -9,29 +13,34 @@ import {
export type NodePostgresTransaction =
DatabaseTransaction<NodePostgresConnector>;

export const nodePostgresTransaction = <
DbClient extends NodePostgresPoolOrClient = NodePostgresPoolOrClient,
>(
getClient: Promise<DbClient>,
options?: { close: (client: DbClient, error?: unknown) => Promise<void> },
): DatabaseTransaction<NodePostgresConnector> => ({
type: NodePostgresConnectorType,
begin: async () => {
const client = await getClient;
await client.query('BEGIN');
},
commit: async () => {
const client = await getClient;
export const nodePostgresTransaction =
<DbClient extends NodePostgresPoolOrClient = NodePostgresPoolOrClient>(
connection: () => Connection<NodePostgresConnector, DbClient>,
) =>
(
getClient: Promise<DbClient>,
options?: { close: (client: DbClient, error?: unknown) => Promise<void> },
): DatabaseTransaction<NodePostgresConnector> => ({
connection: connection(),
type: NodePostgresConnectorType,
begin: async () => {
const client = await getClient;
await client.query('BEGIN');
},
commit: async () => {
const client = await getClient;

await client.query('COMMIT');
await client.query('COMMIT');

if (options?.close) await options?.close(client);
},
rollback: async (error?: unknown) => {
const client = await getClient;
await client.query('ROLLBACK');
if (options?.close) await options?.close(client);
},
rollback: async (error?: unknown) => {
const client = await getClient;
await client.query('ROLLBACK');

if (options?.close) await options?.close(client, error);
},
execute: sqlExecutor(nodePostgresSQLExecutor(), { connect: () => getClient }),
});
if (options?.close) await options?.close(client, error);
},
execute: sqlExecutor(nodePostgresSQLExecutor(), {
connect: () => getClient,
}),
});
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
import { isNodePostgresNativePool } from '@event-driven-io/dumbo';
import { dumbo, isNodePostgresNativePool } from '@event-driven-io/dumbo';
import {
PostgreSqlContainer,
type StartedPostgreSqlContainer,
Expand Down Expand Up @@ -81,5 +81,21 @@ void describe('Pongo collection', () => {
await client.end();
}
});

void it('connects using existing connection client', async () => {
const pool = dumbo({ connectionString });

try {
await pool.withTransaction(async ({ connection }) => {
const pongo = pongoClient(connectionString, { connection });

const users = pongo.db().collection<User>('connections');
await users.insertOne({ name: randomUUID() });
await users.insertOne({ name: randomUUID() });
});
} finally {
await pool.close();
}
});
});
});
9 changes: 8 additions & 1 deletion src/packages/pongo/src/core/pongoClient.ts
Original file line number Diff line number Diff line change
@@ -1,4 +1,7 @@
import { NodePostgresConnectorType } from '@event-driven-io/dumbo';
import {
NodePostgresConnectorType,
type NodePostgresConnection,
} from '@event-driven-io/dumbo';
import pg from 'pg';
import type { PostgresDbClientOptions } from '../postgres';
import { getPongoDb, type AllowedDbClientOptions } from './pongoDb';
Expand Down Expand Up @@ -29,6 +32,10 @@ export type NotPooledPongoOptions =
| {
client: pg.Client;
pooled: false;
}
| {
connection: NodePostgresConnection;
pooled?: false;
};

export type PongoClientOptions =
Expand Down

0 comments on commit b6bd618

Please sign in to comment.