Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
@@ -0,0 +1,47 @@
-- Create a custom schema to test schema-aware replication
create schema if not exists "private";

create table "private"."humans" (
"passportId" text primary key,
"firstName" text not null,
"lastName" text not null,
"age" integer,

"_deleted" boolean DEFAULT false NOT NULL,
"_modified" timestamp with time zone DEFAULT now() NOT NULL
);

-- auto-update the _modified timestamp
CREATE TRIGGER update_modified_datetime BEFORE UPDATE ON private.humans FOR EACH ROW
EXECUTE FUNCTION extensions.moddatetime('_modified');

-- add a table to the publication so we can subscribe to changes
alter publication supabase_realtime add table "private"."humans";

grant usage on schema "private" to "anon";
grant usage on schema "private" to "authenticated";
grant usage on schema "private" to "service_role";

grant delete on table "private"."humans" to "anon";
grant insert on table "private"."humans" to "anon";
grant references on table "private"."humans" to "anon";
grant select on table "private"."humans" to "anon";
grant trigger on table "private"."humans" to "anon";
grant truncate on table "private"."humans" to "anon";
grant update on table "private"."humans" to "anon";

grant delete on table "private"."humans" to "authenticated";
grant insert on table "private"."humans" to "authenticated";
grant references on table "private"."humans" to "authenticated";
grant select on table "private"."humans" to "authenticated";
grant trigger on table "private"."humans" to "authenticated";
grant truncate on table "private"."humans" to "authenticated";
grant update on table "private"."humans" to "authenticated";

grant delete on table "private"."humans" to "service_role";
grant insert on table "private"."humans" to "service_role";
grant references on table "private"."humans" to "service_role";
grant select on table "private"."humans" to "service_role";
grant trigger on table "private"."humans" to "service_role";
grant truncate on table "private"."humans" to "service_role";
grant update on table "private"."humans" to "service_role";
1 change: 1 addition & 0 deletions docs-src/docs/releases/17.0.0.md
Original file line number Diff line number Diff line change
Expand Up @@ -101,6 +101,7 @@ To improve vibe-coding when working with RxDB directly we:
- **FIX** add event guard to count and findByIds in _execOverDatabase [#7864](https://github.com/pubkey/rxdb/pull/7864)
- **FIX**: memory leak in migratePromise() [#7787](https://github.com/pubkey/rxdb/pull/7787)
- **FIX** `exclusiveMinimum` and `exclusiveMaximum` TypeScript types corrected from `boolean` to `number` to match JSON Schema Draft 6+ [#7962](https://github.com/pubkey/rxdb/pull/7962)
- **ADD** `schemaName` option to the [Supabase Replication Plugin](../replication-supabase.md) to replicate tables from non-public Postgres schemas [#7963](https://github.com/pubkey/rxdb/pull/7963)
- **ADD** `waitBeforePersist` option to `ReplicationPushOptions` to delay upstream persistence cycles, enabling write batching across collections and CPU-idle deferral [#7872](https://github.com/pubkey/rxdb/issues/7872)
- **ADD** enforce maximum length for indexes and primary keys (`maxLength: 2048`)
- **CHANGE** `final` schema fields no longer need to be marked as `required`
Expand Down
44 changes: 44 additions & 0 deletions docs-src/docs/replication-supabase.md
Original file line number Diff line number Diff line change
Expand Up @@ -223,6 +223,50 @@ Supabase returns `null` for nullable columns, but in RxDB you often model those
:::


## Using a Custom Postgres Schema

By default, the plugin targets the `public` schema. If your tables live in a different Postgres schema, pass `schemaName` to `replicateSupabase`:

```ts
const replication = replicateSupabase({
tableName: 'humans',
schemaName: 'private', // default is "public"
client: supabase,
collection: db.humans,
replicationIdentifier: 'humans-private-schema',
pull: { batchSize: 50 },
push: { batchSize: 50 },
});
```

You also need to:
- Create the table inside that schema and grant the required roles access to both the schema and the table.
- Add the table to the `supabase_realtime` publication if you use live replication.

Example SQL to set up a `private` schema with a `humans` table:

```sql
create schema if not exists "private";

create table "private"."humans" (
"passportId" text primary key,
"firstName" text not null,
"lastName" text not null,
"age" integer,
"_deleted" boolean DEFAULT false NOT NULL,
"_modified" timestamp with time zone DEFAULT now() NOT NULL
);

CREATE TRIGGER update_modified_datetime BEFORE UPDATE ON private.humans FOR EACH ROW
EXECUTE FUNCTION extensions.moddatetime('_modified');

alter publication supabase_realtime add table "private"."humans";

grant usage on schema "private" to "anon";
grant select, insert, update, delete on table "private"."humans" to "anon";
```


## Using Joins

You can use the `pull.queryBuilder` to use joins and also pull data from related tables.
Expand Down
21 changes: 16 additions & 5 deletions src/plugins/replication-supabase/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -58,9 +58,20 @@ export function replicateSupabase<RxDocType>(
// set defaults
options.waitForLeadership = typeof options.waitForLeadership === 'undefined' ? true : options.waitForLeadership;
options.live = typeof options.live === 'undefined' ? true : options.live;
const schemaName = typeof options.schemaName === 'undefined' ? 'public' : options.schemaName;
const modifiedField = options.modifiedField ? options.modifiedField : DEFAULT_MODIFIED_FIELD;
const deletedField = options.deletedField ? options.deletedField : DEFAULT_DELETED_FIELD;

/**
* For the default 'public' schema we use the client directly
* to avoid sending an explicit Accept-Profile header, which is
* not needed and can cause "operation was canceled" errors in
* some PostgREST setups.
*/
const schemaClient = schemaName === 'public'
? options.client
: options.client.schema(schemaName);

const pullStream$: Subject<RxReplicationPullStreamItem<RxDocType, SupabaseCheckpoint>> = new Subject();
let replicationPrimitivesPull: ReplicationPullOptions<RxDocType, SupabaseCheckpoint> | undefined;

Expand All @@ -86,7 +97,7 @@ export function replicateSupabase<RxDocType>(
return doc;
}
async function fetchById(id: string): Promise<WithDeleted<RxDocType>> {
const { data, error } = await options.client
const { data, error } = await schemaClient
.from(options.tableName)
.select()
.eq(primaryPath, id)
Expand All @@ -103,7 +114,7 @@ export function replicateSupabase<RxDocType>(
lastPulledCheckpoint: SupabaseCheckpoint | undefined,
batchSize: number
) {
let query = options.client
let query = schemaClient
.from(options.tableName)
.select('*');

Expand Down Expand Up @@ -169,7 +180,7 @@ export function replicateSupabase<RxDocType>(
) {
async function insertOrReturnConflict(doc: WithDeleted<RxDocType>): Promise<WithDeleted<RxDocType> | undefined> {
const id = (doc as any)[primaryPath];
const { error } = await options.client.from(options.tableName).insert(doc)
const { error } = await schemaClient.from(options.tableName).insert(doc)
if (!error) {
return;
} else if (error.code == POSTGRES_INSERT_CONFLICT_CODE) {
Expand Down Expand Up @@ -197,7 +208,7 @@ export function replicateSupabase<RxDocType>(
// modified field will be set server-side
delete toRow[modifiedField];

let query = options.client
let query = schemaClient
.from(options.tableName)
.update(toRow);

Expand Down Expand Up @@ -262,7 +273,7 @@ export function replicateSupabase<RxDocType>(
.channel('realtime:' + options.tableName)
.on(
'postgres_changes',
{ event: '*', schema: 'public', table: options.tableName },
{ event: '*', schema: schemaName, table: options.tableName },
(payload) => {
/**
* We assume soft-deletes in supabase
Expand Down
5 changes: 5 additions & 0 deletions src/plugins/replication-supabase/types.ts
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,11 @@ export type SyncOptionsSupabase<RxDocType> = Omit<
client: SupabaseClient;
tableName: string;

/**
* The Postgres schema to use. Default: "public"
*/
schemaName?: string;

/**
* Modified field, default "_modified"
*/
Expand Down
94 changes: 94 additions & 0 deletions test/replication-supabase.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -535,6 +535,100 @@ describe('replication-supabase.test.ts', function () {
});
});

describe('schemaName', () => {
const privateSchemaTableName = 'humans';
const privateSchemaName = 'private';

async function getPrivateSchemaServerState(): Promise<WithDeleted<TestDocType>[]> {
const { data, error } = await supabase
.schema(privateSchemaName)
.from(privateSchemaTableName)
.select('*');
if (error) {
throw error;
}
return data;
}
async function cleanUpPrivateSchemaServer() {
const { error } = await supabase
.schema(privateSchemaName)
.from(privateSchemaTableName)
.delete()
.neq(primaryPath, 0);
if (error) {
throw error;
}
}
it('should push and pull documents using a custom schemaName', async () => {
await cleanUpPrivateSchemaServer();

const collection = await humansCollection.createPrimary(5, undefined, false);

const replicationState = replicateSupabase<TestDocType>({
tableName: privateSchemaTableName,
schemaName: privateSchemaName,
client: supabase,
replicationIdentifier: randomToken(10),
collection,
live: false,
pull: {
batchSize,
modifier: d => {
if (!d.age) {
delete d.age;
}
return d;
}
},
push: {
batchSize
}
});
ensureReplicationHasNoErrors(replicationState);
await replicationState.awaitInitialReplication();
await replicationState.awaitInSync();
await replicationState.cancel();

const serverState = await getPrivateSchemaServerState();
assert.strictEqual(serverState.length, 5, 'must have pushed all docs to the custom schema');

// also verify that the public schema was not touched
const publicServerState = await getServerState();
assert.strictEqual(publicServerState.length, 0, 'public schema must be empty');

// pull from the custom schema into a fresh collection
const collection2 = await humansCollection.createPrimary(0, undefined, false);
const replicationState2 = replicateSupabase<TestDocType>({
tableName: privateSchemaTableName,
schemaName: privateSchemaName,
client: supabase,
replicationIdentifier: randomToken(10),
collection: collection2,
live: false,
pull: {
batchSize,
modifier: d => {
if (!d.age) {
delete d.age;
}
return d;
}
}
});
ensureReplicationHasNoErrors(replicationState2);
await replicationState2.awaitInitialReplication();
await replicationState2.awaitInSync();
await replicationState2.cancel();

const docs2 = await collection2.find().exec();
assert.strictEqual(docs2.length, 5, 'must have pulled all docs from the custom schema');

await collection.database.close();
await collection2.database.close();
await cleanUpPrivateSchemaServer();
});
});

describe('issues', () => {
it('#7513 push.modifier is never applied', async () => {
await cleanUpServer();
Expand Down
Loading