Skip to content

Commit 90250f2

Browse files
authored
feat: add analytics bucket sharding (#796)
1 parent b9acc7c commit 90250f2

Some content is hidden

Large commits have some content hidden by default. Use the search box below to find content that may be hidden.

47 files changed

+2592
-593
lines changed
Lines changed: 39 additions & 0 deletions
Original file line numberDiff line numberDiff line change
DO $$
DECLARE
    -- Shard keys configured for this deployment.
    -- fix: the previous default '[]::text[]' is NOT a valid text[] input
    -- literal — casting it raises "malformed array literal" whenever the
    -- setting is unset. '{}' is the empty-array literal.
    iceberg_shards text[] = COALESCE(current_setting('storage.iceberg_shards', true), '{}')::text[];
    -- Shard assigned to pre-existing tables during backfill ('' when unset).
    iceberg_default_shard text = COALESCE(current_setting('storage.iceberg_default_shard', true), '')::text;
    i_shard_key text;
BEGIN

    ALTER TABLE iceberg_namespaces ADD COLUMN IF NOT EXISTS metadata JSONB NOT NULL DEFAULT '{}';

    ALTER TABLE iceberg_tables ADD COLUMN IF NOT EXISTS remote_table_id TEXT NULL;
    ALTER TABLE iceberg_tables ADD COLUMN IF NOT EXISTS shard_key TEXT NULL;
    ALTER TABLE iceberg_tables ADD COLUMN IF NOT EXISTS shard_id bigint NULL;

    -- Only allow deleting namespaces if empty: recreate the FK with
    -- ON DELETE RESTRICT so a namespace with tables cannot be removed.
    -- fix: the DROP CONSTRAINT statement was duplicated verbatim.
    ALTER TABLE iceberg_tables DROP CONSTRAINT IF EXISTS iceberg_tables_namespace_id_fkey;

    ALTER TABLE iceberg_tables
        ADD CONSTRAINT iceberg_tables_namespace_id_fkey
        FOREIGN KEY (namespace_id)
        REFERENCES iceberg_namespaces(id) ON DELETE RESTRICT;

    -- fix: array_length(arr, 1) is NULL (not 0) for an empty array, so the
    -- original "= 0" comparison could never be true; COALESCE makes the
    -- early-return actually fire when no shards are configured.
    IF COALESCE(array_length(iceberg_shards, 1), 0) = 0 THEN
        RETURN;
    END IF;

    -- Register each configured shard; idempotent via the (kind, shard_key)
    -- conflict target.
    FOREACH i_shard_key IN ARRAY iceberg_shards
    LOOP
        INSERT INTO shard (kind, shard_key, capacity) VALUES ('iceberg-table', i_shard_key, 10000)
        ON CONFLICT (kind, shard_key) DO NOTHING;
    END LOOP;

    -- Backfill: point every unassigned table at the default shard.
    UPDATE iceberg_tables
    SET shard_id = (
        SELECT id FROM shard WHERE kind = 'iceberg-table' AND shard_key = iceberg_default_shard LIMIT 1
    ), shard_key = iceberg_default_shard
    WHERE shard_id IS NULL;
END
$$;
Lines changed: 57 additions & 0 deletions
Original file line numberDiff line numberDiff line change
-- postgres-migrations disable-transaction
DO $$
BEGIN
    -- Drop old bucket-scoped indexes; recreated below keyed on catalog_id.
    DROP INDEX IF EXISTS idx_iceberg_namespaces_bucket_id;
    DROP INDEX IF EXISTS idx_iceberg_tables_tenant_namespace_id;
    DROP INDEX IF EXISTS idx_iceberg_tables_tenant_location;
    DROP INDEX IF EXISTS idx_iceberg_tables_location;

    -- remove primary key on iceberg_catalogs id: the old text id becomes
    -- "name" and a surrogate uuid primary key is added below.
    ALTER TABLE iceberg_catalogs DROP CONSTRAINT IF EXISTS iceberg_catalogs_pkey;

    IF NOT EXISTS (SELECT 1 FROM information_schema.columns WHERE table_name = 'iceberg_catalogs' AND column_name = 'name') THEN
        ALTER TABLE iceberg_catalogs RENAME COLUMN id TO name;
    END IF;

    ALTER TABLE iceberg_catalogs ADD COLUMN IF NOT EXISTS id uuid NOT NULL DEFAULT gen_random_uuid() PRIMARY KEY;
    ALTER TABLE iceberg_catalogs ADD COLUMN IF NOT EXISTS deleted_at timestamptz NULL;

    -- Allow only one *active* (not soft-deleted) catalog per (tenant, name).
    -- fix: this index was created NON-unique despite its name, and the very
    -- same UNIQUE definition was then created a second time below as
    -- iceberg_catalogs_name_deleted_at_idx; collapsed into this one UNIQUE
    -- partial index. NOTE(review): databases that already ran the old
    -- migration keep the previously created indexes — IF NOT EXISTS skips
    -- them; confirm whether a follow-up cleanup is needed there.
    CREATE UNIQUE INDEX IF NOT EXISTS iceberg_catalogs_unique_name_idx
    ON iceberg_catalogs (tenant_id, name) WHERE deleted_at IS NULL;

    IF NOT EXISTS (SELECT 1 FROM information_schema.columns WHERE table_name = 'iceberg_namespaces' AND column_name = 'bucket_name') THEN
        ALTER TABLE iceberg_namespaces RENAME COLUMN bucket_id to bucket_name;
    END IF;

    IF NOT EXISTS (SELECT 1 FROM information_schema.columns WHERE table_name = 'iceberg_tables' AND column_name = 'bucket_name') THEN
        ALTER TABLE iceberg_tables RENAME COLUMN bucket_id to bucket_name;
    END IF;

    -- Link namespaces/tables to the new uuid catalog id; nullable at first so
    -- the backfill below can populate it, then tightened to NOT NULL.
    ALTER TABLE iceberg_namespaces ADD COLUMN IF NOT EXISTS catalog_id uuid NULL REFERENCES iceberg_catalogs(id) ON DELETE CASCADE ON UPDATE CASCADE;
    ALTER TABLE iceberg_tables ADD COLUMN IF NOT EXISTS catalog_id uuid NULL REFERENCES iceberg_catalogs(id) ON DELETE CASCADE ON UPDATE CASCADE;

    CREATE UNIQUE INDEX IF NOT EXISTS idx_iceberg_namespaces_bucket_id ON iceberg_namespaces (tenant_id, catalog_id, name);
    CREATE UNIQUE INDEX IF NOT EXISTS idx_iceberg_tables_tenant_namespace_id ON iceberg_tables (tenant_id, namespace_id, catalog_id, name);
    CREATE UNIQUE INDEX IF NOT EXISTS idx_iceberg_tables_tenant_location ON iceberg_tables (tenant_id, location);
    CREATE UNIQUE INDEX IF NOT EXISTS idx_iceberg_tables_location ON iceberg_tables (location);

    -- Backfill catalog_id for existing namespaces and tables by matching the
    -- catalog name against the legacy bucket name.
    UPDATE iceberg_tables it
    SET catalog_id = c.id
    FROM iceberg_catalogs c
    WHERE c.name = it.bucket_name;

    UPDATE iceberg_namespaces iname
    SET catalog_id = c.id
    FROM iceberg_catalogs c
    WHERE c.name = iname.bucket_name;

    ALTER TABLE iceberg_namespaces ALTER COLUMN catalog_id SET NOT NULL;
    ALTER TABLE iceberg_tables ALTER COLUMN catalog_id SET NOT NULL;
END
$$;
Lines changed: 121 additions & 0 deletions
Original file line numberDiff line numberDiff line change
DO $$
DECLARE
    -- Partition table names of existing queues using the exactly_once policy.
    partition_queue_ids text[];
    i_partition_id text;
BEGIN

    -- check if a schema with name pgboss_v10 exists; nothing to do on
    -- installs that never ran pg-boss v10.
    IF NOT EXISTS (SELECT 1 FROM pg_namespace WHERE nspname = 'pgboss_v10') THEN
        RETURN;
    END IF;

    -- Create or replace function to archive exactly_once jobs: when a job with
    -- policy 'exactly_once' reaches a terminal state, copy it into
    -- pgboss_v10.archive (keep_until extended by 30 days) and delete it from
    -- the live job partition so the singleton_key slot frees up.
    CREATE OR REPLACE FUNCTION pgboss_v10.archive_exactly_once_job()
    RETURNS TRIGGER AS
    $trigger$
    BEGIN
        IF NEW.policy = 'exactly_once' AND NEW.state IN ('completed', 'failed', 'cancelled') THEN
            INSERT INTO pgboss_v10.archive (
                id, name, priority, data, state, retry_limit, retry_count, retry_delay, retry_backoff,
                start_after, started_on, singleton_key, singleton_on, expire_in, created_on, completed_on,
                keep_until, output, dead_letter, policy
            )
            VALUES (
                NEW.id, NEW.name, NEW.priority, NEW.data, NEW.state, NEW.retry_limit, NEW.retry_count,
                NEW.retry_delay, NEW.retry_backoff, NEW.start_after, NEW.started_on, NEW.singleton_key,
                NEW.singleton_on, NEW.expire_in, NEW.created_on, NEW.completed_on, NEW.keep_until + INTERVAL '30 days',
                NEW.output, NEW.dead_letter, NEW.policy
            )
            ON CONFLICT DO NOTHING;

            DELETE FROM pgboss_v10.job WHERE id = NEW.id;
        END IF;
        RETURN NEW;
    END;
    $trigger$
    LANGUAGE plpgsql;

    -- Redefine pg-boss's create_queue so that queues created from now on get
    -- the exactly_once unique index (_i6) and the archive triggers at
    -- creation time, in addition to pg-boss's standard indexes (_i1.._i5).
    CREATE OR REPLACE FUNCTION pgboss_v10.create_queue(queue_name text, options json)
    RETURNS VOID AS
    $f$
    DECLARE
        -- Partition table name derived deterministically from the queue name.
        table_name varchar := 'j' || encode(sha224(queue_name::bytea), 'hex');
        queue_created_on timestamptz;
    BEGIN
        WITH q as (
        INSERT INTO pgboss_v10.queue (
            name,
            policy,
            retry_limit,
            retry_delay,
            retry_backoff,
            expire_seconds,
            retention_minutes,
            dead_letter,
            partition_name
        )
        VALUES (
            queue_name,
            options->>'policy',
            (options->>'retryLimit')::int,
            (options->>'retryDelay')::int,
            (options->>'retryBackoff')::bool,
            (options->>'expireInSeconds')::int,
            (options->>'retentionMinutes')::int,
            options->>'deadLetter',
            table_name
        )
        ON CONFLICT DO NOTHING
        RETURNING created_on
        )
        SELECT created_on into queue_created_on from q;

        -- Queue already existed (ON CONFLICT DO NOTHING returned no row):
        -- its partition table and indexes are already in place.
        IF queue_created_on IS NULL THEN
            RETURN;
        END IF;

        EXECUTE format('CREATE TABLE pgboss_v10.%I (LIKE pgboss_v10.job INCLUDING DEFAULTS)', table_name);

        EXECUTE format('ALTER TABLE pgboss_v10.%1$I ADD PRIMARY KEY (name, id)', table_name);
        EXECUTE format('ALTER TABLE pgboss_v10.%1$I ADD CONSTRAINT q_fkey FOREIGN KEY (name) REFERENCES pgboss_v10.queue (name) ON DELETE RESTRICT DEFERRABLE INITIALLY DEFERRED', table_name);
        EXECUTE format('ALTER TABLE pgboss_v10.%1$I ADD CONSTRAINT dlq_fkey FOREIGN KEY (dead_letter) REFERENCES pgboss_v10.queue (name) ON DELETE RESTRICT DEFERRABLE INITIALLY DEFERRED', table_name);
        EXECUTE format('CREATE UNIQUE INDEX %1$s_i1 ON pgboss_v10.%1$I (name, COALESCE(singleton_key, '''')) WHERE state = ''created'' AND policy = ''short''', table_name);
        EXECUTE format('CREATE UNIQUE INDEX %1$s_i2 ON pgboss_v10.%1$I (name, COALESCE(singleton_key, '''')) WHERE state = ''active'' AND policy = ''singleton''', table_name);
        EXECUTE format('CREATE UNIQUE INDEX %1$s_i3 ON pgboss_v10.%1$I (name, state, COALESCE(singleton_key, '''')) WHERE state <= ''active'' AND policy = ''stately''', table_name);
        EXECUTE format('CREATE UNIQUE INDEX %1$s_i4 ON pgboss_v10.%1$I (name, singleton_on, COALESCE(singleton_key, '''')) WHERE state <> ''cancelled'' AND singleton_on IS NOT NULL', table_name);
        EXECUTE format('CREATE INDEX %1$s_i5 ON pgboss_v10.%1$I (name, start_after) INCLUDE (priority, created_on, id) WHERE state < ''active''', table_name);
        -- _i6: at most one non-terminal exactly_once job per singleton_key.
        EXECUTE format('CREATE UNIQUE INDEX %1$s_i6 ON pgboss_v10.%1$I (name, COALESCE(singleton_key, '''')) WHERE state <= ''active'' AND policy = ''exactly_once''', table_name);

        EXECUTE format('ALTER TABLE pgboss_v10.%I ADD CONSTRAINT cjc CHECK (name=%L)', table_name, queue_name);
        EXECUTE format('ALTER TABLE pgboss_v10.job ATTACH PARTITION pgboss_v10.%I FOR VALUES IN (%L)', table_name, queue_name);

        -- create a function trigger to archive the job when it's exactly_once
        -- policy and the state is either completed, failed or cancelled

        EXECUTE format('CREATE TRIGGER archive_exactly_once_trigger_insert AFTER INSERT ON pgboss_v10.%I FOR EACH ROW EXECUTE FUNCTION pgboss_v10.archive_exactly_once_job()', table_name);
        EXECUTE format('CREATE TRIGGER archive_exactly_once_trigger_update AFTER UPDATE ON pgboss_v10.%I FOR EACH ROW EXECUTE FUNCTION pgboss_v10.archive_exactly_once_job()', table_name);
    END;
    $f$
    LANGUAGE plpgsql;



    -- Recreate function with correct index type: retrofit _i6 and the archive
    -- triggers onto partitions of queues that already exist.
    SELECT array_agg(partition_name) from pgboss_v10.queue
    WHERE policy = 'exactly_once'
    INTO partition_queue_ids;

    -- NOTE(review): array_agg over zero rows yields NULL, and
    -- array_length(NULL, 1) is NULL, so this condition is never true and the
    -- early return never fires. Harmless in practice because unnest(NULL)
    -- below produces zero rows — but confirm the intent.
    IF array_length(partition_queue_ids, 1) = 0 THEN
        RETURN;
    END IF;

    FOR i_partition_id IN SELECT unnest(partition_queue_ids)
    LOOP
        -- Drop and recreate _i6 with the corrected definition; partition
        -- names are hex-derived (see create_queue), so the unquoted %1$s in
        -- DROP INDEX is safe here.
        EXECUTE format('DROP INDEX IF EXISTS pgboss_v10.%1$s_i6', i_partition_id);
        EXECUTE format('CREATE UNIQUE INDEX IF NOT EXISTS %1$s_i6 ON pgboss_v10.%1$I (name, COALESCE(singleton_key, '''')) WHERE state <= ''active'' AND policy = ''exactly_once''', i_partition_id);
        -- Only add the triggers once (CREATE TRIGGER has no IF NOT EXISTS).
        IF NOT EXISTS (SELECT 1 FROM pg_trigger WHERE tgname = 'archive_exactly_once_trigger_insert' AND tgrelid = ('pgboss_v10.' || i_partition_id)::regclass) THEN
            EXECUTE format('CREATE TRIGGER archive_exactly_once_trigger_insert AFTER INSERT ON pgboss_v10.%I FOR EACH ROW EXECUTE FUNCTION pgboss_v10.archive_exactly_once_job()', i_partition_id);
            EXECUTE format('CREATE TRIGGER archive_exactly_once_trigger_update AFTER UPDATE ON pgboss_v10.%I FOR EACH ROW EXECUTE FUNCTION pgboss_v10.archive_exactly_once_job()', i_partition_id);
        END IF;
    END LOOP;
END;
$$;
Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
-- Tracks event upgrades that have already been applied; the UNIQUE
-- constraint on event_id makes each upgrade recordable exactly once.
CREATE TABLE IF NOT EXISTS event_upgrades (
    id         SERIAL PRIMARY KEY,
    event_id   text NOT NULL UNIQUE,
    created_at TIMESTAMPTZ NOT NULL DEFAULT NOW()
);
Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
-- Drop the old blanket uniqueness constraint and the previous slot index;
-- uniqueness is re-established tenant-scoped below.
ALTER TABLE shard_reservation
DROP CONSTRAINT IF EXISTS shard_reservation_kind_resource_id_key CASCADE;

DROP INDEX IF EXISTS shard_reservation_active_slot_idx;

-- At most one reservation per (tenant, kind, resource).
-- NOTE(review): the original comments described this as a *partial* unique
-- index limited to confirmed reservations, but no WHERE clause is present,
-- so it applies to ALL reservations regardless of state — confirm whether a
-- confirmed-only predicate was intended.
CREATE UNIQUE INDEX IF NOT EXISTS shard_reservation_kind_resource_confirmed_idx
ON shard_reservation (tenant_id, kind, resource_id);
Lines changed: 16 additions & 0 deletions
Original file line numberDiff line numberDiff line change
-- Single-tenant counterpart of the multitenant iceberg sharding migration:
-- adds the same metadata/remote_table_id/shard columns, but skips the shard
-- registry backfill (there is no shard table in single-tenant mode).
DO $$
DECLARE
    -- storage.multitenant is unset in single-tenant deployments, hence the
    -- 'false' fallback.
    is_multitenant bool = COALESCE(current_setting('storage.multitenant', true), 'false')::boolean;
BEGIN

    -- Multitenant deployments get these columns from their own migration.
    IF is_multitenant THEN
        RETURN;
    END IF;

    ALTER TABLE iceberg_namespaces ADD COLUMN IF NOT EXISTS metadata JSONB NOT NULL DEFAULT '{}';
    ALTER TABLE iceberg_tables ADD COLUMN IF NOT EXISTS remote_table_id TEXT NULL;

    ALTER TABLE iceberg_tables ADD COLUMN IF NOT EXISTS shard_key TEXT NULL;
    -- NOTE(review): the multitenant migration declares shard_id as *bigint*
    -- (populated from shard.id); here it is TEXT. Confirm whether this type
    -- divergence is intentional for single-tenant mode.
    ALTER TABLE iceberg_tables ADD COLUMN IF NOT EXISTS shard_id TEXT NULL;
END
$$;
Lines changed: 73 additions & 0 deletions
Original file line numberDiff line numberDiff line change
DO $$
DECLARE
    is_multitenant bool = COALESCE(current_setting('storage.multitenant', true), 'false')::boolean;
    -- Dynamically built statement to drop the current PK of buckets_analytics
    -- (its name is not fixed, so it is looked up from information_schema).
    drop_constraint_sql text;
BEGIN

    -- fix: removed the redundant leading `IF is_multitenant = false` block
    -- that dropped iceberg_namespaces_bucket_id_fkey /
    -- iceberg_tables_bucket_id_fkey — the identical IF EXISTS drops are
    -- executed below on the same (single-tenant) path.

    -- remove primary key on buckets_analytics id: the old text id becomes
    -- "name" and a surrogate uuid primary key is added below.
    SELECT concat('ALTER TABLE storage.buckets_analytics DROP CONSTRAINT ', constraint_name)
    INTO drop_constraint_sql
    FROM information_schema.table_constraints
    WHERE table_schema = 'storage'
      AND table_name = 'buckets_analytics'
      AND constraint_type = 'PRIMARY KEY';

    -- fix: EXECUTE of a NULL command string raises an error; guard for the
    -- case where no primary key exists (e.g. a partially applied run).
    IF drop_constraint_sql IS NOT NULL THEN
        EXECUTE drop_constraint_sql;
    END IF;

    -- fix: added table_schema filter for consistency with the PK lookup
    -- above, so a same-named table in another schema cannot skew the check.
    IF NOT EXISTS (SELECT 1 FROM information_schema.columns WHERE table_schema = 'storage' AND table_name = 'buckets_analytics' AND column_name = 'name') THEN
        ALTER TABLE storage.buckets_analytics RENAME COLUMN id TO name;
    END IF;

    ALTER TABLE storage.buckets_analytics ADD COLUMN IF NOT EXISTS id uuid NOT NULL DEFAULT gen_random_uuid() PRIMARY KEY;
    ALTER TABLE storage.buckets_analytics ADD COLUMN IF NOT EXISTS deleted_at timestamptz NULL;

    -- Only one active (not soft-deleted) analytics bucket per name.
    CREATE UNIQUE INDEX IF NOT EXISTS buckets_analytics_unique_name_idx
    ON storage.buckets_analytics (name) WHERE deleted_at IS NULL;

    -- Everything below is single-tenant only: the multitenant catalog tables
    -- are migrated by their own (tenant-schema) migration.
    IF is_multitenant THEN
        RETURN;
    END IF;

    DROP INDEX IF EXISTS idx_iceberg_namespaces_bucket_id;
    DROP INDEX IF EXISTS idx_iceberg_tables_namespace_id;

    -- remove constraint on iceberg_namespaces bucket_id
    ALTER TABLE storage.iceberg_namespaces DROP CONSTRAINT IF EXISTS iceberg_namespaces_bucket_id_fkey;
    -- remove constraint on iceberg_tables bucket_id
    ALTER TABLE storage.iceberg_tables DROP CONSTRAINT IF EXISTS iceberg_tables_bucket_id_fkey;

    IF NOT EXISTS (SELECT 1 FROM information_schema.columns WHERE table_schema = 'storage' AND table_name = 'iceberg_namespaces' AND column_name = 'bucket_name') THEN
        ALTER TABLE storage.iceberg_namespaces RENAME COLUMN bucket_id to bucket_name;
    END IF;

    IF NOT EXISTS (SELECT 1 FROM information_schema.columns WHERE table_schema = 'storage' AND table_name = 'iceberg_tables' AND column_name = 'bucket_name') THEN
        ALTER TABLE storage.iceberg_tables RENAME COLUMN bucket_id to bucket_name;
    END IF;

    -- Link namespaces/tables to the uuid bucket id; nullable so the backfill
    -- below can populate it, then tightened to NOT NULL.
    ALTER TABLE storage.iceberg_namespaces ADD COLUMN IF NOT EXISTS catalog_id uuid NULL REFERENCES storage.buckets_analytics(id) ON DELETE CASCADE;
    ALTER TABLE storage.iceberg_tables ADD COLUMN IF NOT EXISTS catalog_id uuid NULL REFERENCES storage.buckets_analytics(id) ON DELETE CASCADE;

    CREATE UNIQUE INDEX IF NOT EXISTS idx_iceberg_namespaces_bucket_id ON storage.iceberg_namespaces (catalog_id, name);
    CREATE UNIQUE INDEX IF NOT EXISTS idx_iceberg_tables_namespace_id ON storage.iceberg_tables (catalog_id, namespace_id, name);
    CREATE UNIQUE INDEX IF NOT EXISTS idx_iceberg_tables_location ON storage.iceberg_tables (location);

    -- Backfill catalog_id for existing namespaces and tables by matching the
    -- analytics bucket name against the legacy bucket name.
    UPDATE storage.iceberg_tables it
    SET catalog_id = c.id
    FROM storage.buckets_analytics c
    WHERE c.name = it.bucket_name;

    UPDATE storage.iceberg_namespaces iname
    SET catalog_id = c.id
    FROM storage.buckets_analytics c
    WHERE c.name = iname.bucket_name;

    ALTER TABLE storage.iceberg_namespaces ALTER COLUMN catalog_id SET NOT NULL;
    ALTER TABLE storage.iceberg_tables ALTER COLUMN catalog_id SET NOT NULL;
END
$$;

src/config.ts

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -178,6 +178,7 @@ type StorageConfigType = {
178178

179179
icebergEnabled: boolean
180180
icebergWarehouse: string
181+
icebergShards: string[]
181182
icebergCatalogUrl: string
182183
icebergCatalogAuthType: IcebergCatalogAuthType
183184
icebergCatalogToken?: string
@@ -515,6 +516,7 @@ export function getConfig(options?: { reload?: boolean }): StorageConfigType {
515516

516517
icebergEnabled: getOptionalConfigFromEnv('ICEBERG_ENABLED') === 'true',
517518
icebergWarehouse: getOptionalConfigFromEnv('ICEBERG_WAREHOUSE') || '',
519+
icebergShards: getOptionalConfigFromEnv('ICEBERG_SHARDS')?.trim().split(',') || [],
518520
icebergCatalogUrl:
519521
getOptionalConfigFromEnv('ICEBERG_CATALOG_URL') ||
520522
`https://s3tables.ap-southeast-1.amazonaws.com/iceberg/v1`,

src/http/plugins/iceberg.ts

Lines changed: 7 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -5,6 +5,7 @@ import { getTenantConfig, multitenantKnex } from '@internal/database'
55
import { getCatalogAuthStrategy, TenantAwareRestCatalog } from '@storage/protocols/iceberg/catalog'
66
import { getConfig } from '../../config'
77
import { ICEBERG_BUCKET_RESERVED_SUFFIX } from '@storage/limits'
8+
import { KnexShardStoreFactory, ShardCatalog, SingleShard } from '@internal/sharding'
89

910
declare module 'fastify' {
1011
interface FastifyRequest {
@@ -48,8 +49,13 @@ export const icebergRestCatalog = fastifyPlugin(async function (fastify: Fastify
4849
tenantId: req.tenantId,
4950
limits: limits,
5051
restCatalogUrl: icebergCatalogUrl,
51-
warehouse: icebergWarehouse,
5252
auth: catalogAuthType,
53+
sharding: isMultitenant
54+
? new ShardCatalog(new KnexShardStoreFactory(multitenantKnex))
55+
: new SingleShard({
56+
shardKey: icebergWarehouse,
57+
capacity: 10000,
58+
}),
5359
metastore: new KnexMetastore(isMultitenant ? multitenantKnex : req.db.pool.acquire(), {
5460
multiTenant: isMultitenant,
5561
schema: isMultitenant ? 'public' : 'storage',

0 commit comments

Comments
 (0)