Skip to content

Commit 3e957f2

Browse files
authored
fallback to full table partitions for Timescale compressed hypertables (#3318)
1 parent 24ae9ff commit 3e957f2

File tree

8 files changed

+130
-52
lines changed

8 files changed

+130
-52
lines changed

flow/activities/snapshot_activity.go

Lines changed: 23 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -23,8 +23,7 @@ type SlotSnapshotState struct {
2323
}
2424

2525
type TxSnapshotState struct {
26-
SnapshotName string
27-
SupportsTIDScans bool
26+
SnapshotName string
2827
}
2928

3029
type SnapshotActivity struct {
@@ -93,9 +92,8 @@ func (a *SnapshotActivity) SetupReplication(
9392
a.Alerter.LogFlowInfo(ctx, config.FlowJobName, "Replication slot and publication setup complete")
9493

9594
return &protos.SetupReplicationOutput{
96-
SlotName: slotInfo.SlotName,
97-
SnapshotName: slotInfo.SnapshotName,
98-
SupportsTidScans: slotInfo.SupportsTIDScans,
95+
SlotName: slotInfo.SlotName,
96+
SnapshotName: slotInfo.SnapshotName,
9997
}, nil
10098
}
10199

@@ -118,8 +116,7 @@ func (a *SnapshotActivity) MaintainTx(ctx context.Context, sessionID string, pee
118116
a.SnapshotStatesMutex.Lock()
119117
if exportSnapshotOutput != nil {
120118
a.TxSnapshotStates[sessionID] = TxSnapshotState{
121-
SnapshotName: exportSnapshotOutput.SnapshotName,
122-
SupportsTIDScans: exportSnapshotOutput.SupportsTidScans,
119+
SnapshotName: exportSnapshotOutput.SnapshotName,
123120
}
124121
} else {
125122
a.TxSnapshotStates[sessionID] = TxSnapshotState{}
@@ -169,3 +166,22 @@ func (a *SnapshotActivity) LoadTableSchema(
169166
) (*protos.TableSchema, error) {
170167
return internal.LoadTableSchemaFromCatalog(ctx, a.CatalogPool, flowName, tableName)
171168
}
169+
170+
func (a *SnapshotActivity) GetDefaultPartitionKeyForTables(
171+
ctx context.Context,
172+
input *protos.FlowConnectionConfigs,
173+
) (*protos.GetDefaultPartitionKeyForTablesOutput, error) {
174+
connector, err := connectors.GetByNameAs[connectors.QRepPullConnectorCore](ctx, nil, a.CatalogPool, input.SourceName)
175+
if err != nil {
176+
return nil, a.Alerter.LogFlowError(ctx, input.FlowJobName, fmt.Errorf("failed to get connector: %w", err))
177+
}
178+
179+
output, err := connector.GetDefaultPartitionKeyForTables(ctx, &protos.GetDefaultPartitionKeyForTablesInput{
180+
TableMappings: input.TableMappings,
181+
})
182+
if err != nil {
183+
return nil, a.Alerter.LogFlowError(ctx, input.FlowJobName, fmt.Errorf("failed to check if tables can parallel load: %w", err))
184+
}
185+
186+
return output, nil
187+
}

flow/connectors/core.go

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -225,6 +225,9 @@ type QRepPullConnectorCore interface {
225225

226226
// GetQRepPartitions returns the partitions for a given table that haven't been synced yet.
227227
GetQRepPartitions(ctx context.Context, config *protos.QRepConfig, last *protos.QRepPartition) ([]*protos.QRepPartition, error)
228+
229+
GetDefaultPartitionKeyForTables(ctx context.Context,
230+
input *protos.GetDefaultPartitionKeyForTablesInput) (*protos.GetDefaultPartitionKeyForTablesOutput, error)
228231
}
229232

230233
type QRepPullConnector interface {

flow/connectors/mongo/qrep.go

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -135,6 +135,15 @@ func (c *MongoConnector) GetQRepPartitions(
135135
return partitions, nil
136136
}
137137

138+
func (c *MongoConnector) GetDefaultPartitionKeyForTables(
139+
ctx context.Context,
140+
input *protos.GetDefaultPartitionKeyForTablesInput,
141+
) (*protos.GetDefaultPartitionKeyForTablesOutput, error) {
142+
return &protos.GetDefaultPartitionKeyForTablesOutput{
143+
TableDefaultPartitionKeyMapping: nil,
144+
}, nil
145+
}
146+
138147
func (c *MongoConnector) PullQRepRecords(
139148
ctx context.Context,
140149
otelManager *otel_metrics.OtelManager,

flow/connectors/mysql/qrep.go

Lines changed: 9 additions & 23 deletions
Original file line numberDiff line numberDiff line change
@@ -22,29 +22,6 @@ import (
2222
"github.com/PeerDB-io/peerdb/flow/shared/types"
2323
)
2424

25-
func (c *MySqlConnector) GetDataTypeOfWatermarkColumn(
26-
ctx context.Context,
27-
watermarkTableName string,
28-
watermarkColumn string,
29-
) (types.QValueKind, byte, error) {
30-
if watermarkColumn == "" {
31-
return "", 0, errors.New("watermark column is not specified in the config")
32-
}
33-
34-
query := fmt.Sprintf("SELECT `%s` FROM %s LIMIT 0", watermarkColumn, watermarkTableName)
35-
rs, err := c.Execute(ctx, query)
36-
if err != nil {
37-
return "", 0, fmt.Errorf("failed to execute query for watermark column type: %w", err)
38-
}
39-
40-
if len(rs.Fields) == 0 {
41-
return "", 0, fmt.Errorf("no fields returned from select query: %s", query)
42-
}
43-
44-
qk, err := qkindFromMysql(rs.Fields[0])
45-
return qk, rs.Fields[0].Type, err
46-
}
47-
4825
func (c *MySqlConnector) GetQRepPartitions(
4926
ctx context.Context,
5027
config *protos.QRepConfig,
@@ -153,6 +130,15 @@ func (c *MySqlConnector) GetQRepPartitions(
153130
return partitionHelper.GetPartitions(), nil
154131
}
155132

133+
func (c *MySqlConnector) GetDefaultPartitionKeyForTables(
134+
ctx context.Context,
135+
input *protos.GetDefaultPartitionKeyForTablesInput,
136+
) (*protos.GetDefaultPartitionKeyForTablesOutput, error) {
137+
return &protos.GetDefaultPartitionKeyForTablesOutput{
138+
TableDefaultPartitionKeyMapping: nil,
139+
}, nil
140+
}
141+
156142
func (c *MySqlConnector) PullQRepRecords(
157143
ctx context.Context,
158144
otelManager *otel_metrics.OtelManager,

flow/connectors/postgres/postgres.go

Lines changed: 2 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -1200,18 +1200,12 @@ func (c *PostgresConnector) EnsurePullability(
12001200
}
12011201

12021202
func (c *PostgresConnector) ExportTxSnapshot(ctx context.Context, env map[string]string) (*protos.ExportTxSnapshotOutput, any, error) {
1203-
pgversion, err := c.MajorVersion(ctx)
1204-
if err != nil {
1205-
return nil, nil, fmt.Errorf("[export-snapshot] error getting PG version: %w", err)
1206-
}
1207-
12081203
skipSnapshotExport, err := internal.PeerDBSkipSnapshotExport(ctx, env)
12091204
if err != nil {
12101205
c.logger.Error("failed to check PEERDB_SKIP_SNAPSHOT_EXPORT, proceeding with export snapshot", slog.Any("error", err))
12111206
} else if skipSnapshotExport {
12121207
return &protos.ExportTxSnapshotOutput{
1213-
SnapshotName: "",
1214-
SupportsTidScans: pgversion >= shared.POSTGRES_13,
1208+
SnapshotName: "",
12151209
}, nil, err
12161210
}
12171211

@@ -1242,8 +1236,7 @@ func (c *PostgresConnector) ExportTxSnapshot(ctx context.Context, env map[string
12421236
needRollback = false
12431237

12441238
return &protos.ExportTxSnapshotOutput{
1245-
SnapshotName: snapshotName,
1246-
SupportsTidScans: pgversion >= shared.POSTGRES_13,
1239+
SnapshotName: snapshotName,
12471240
}, tx, err
12481241
}
12491242

flow/connectors/postgres/qrep.go

Lines changed: 61 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -69,6 +69,67 @@ func (c *PostgresConnector) GetQRepPartitions(
6969
return c.getNumRowsPartitions(ctx, getPartitionsTx, config, last)
7070
}
7171

72+
func (c *PostgresConnector) GetDefaultPartitionKeyForTables(
73+
ctx context.Context,
74+
input *protos.GetDefaultPartitionKeyForTablesInput,
75+
) (*protos.GetDefaultPartitionKeyForTablesOutput, error) {
76+
c.logger.Info("Evaluating if tables can perform parallel load")
77+
78+
output := &protos.GetDefaultPartitionKeyForTablesOutput{
79+
TableDefaultPartitionKeyMapping: make(map[string]string, len(input.TableMappings)),
80+
}
81+
82+
pgVersion, err := shared.GetMajorVersion(ctx, c.conn)
83+
if err != nil {
84+
return nil, fmt.Errorf("failed to determine server version: %w", err)
85+
}
86+
supportsTidScans := pgVersion >= shared.POSTGRES_14
87+
88+
if supportsTidScans {
89+
for _, tm := range input.TableMappings {
90+
output.TableDefaultPartitionKeyMapping[tm.SourceTableIdentifier] = "ctid"
91+
}
92+
}
93+
94+
if !supportsTidScans {
95+
// older versions fall back to full table partitions anyway; nothing more to do
96+
c.logger.Warn("Postgres version does not support TID scans, falling back to full table partitions")
97+
return output, nil
98+
}
99+
100+
var hasTimescale bool
101+
if err := c.conn.QueryRow(ctx,
102+
"SELECT EXISTS (SELECT 1 FROM pg_extension WHERE extname = 'timescaledb')").Scan(&hasTimescale); err != nil {
103+
return nil, fmt.Errorf("failed to check for timescaledb extension: %w", err)
104+
}
105+
if !hasTimescale {
106+
return output, nil
107+
}
108+
109+
// compressed hypertables cannot do ctid scans, so disable for them
110+
// NOTE: it appears that the hypercore "TAM" may give us access to ctid scans, but that's to be removed in Timescale 2.22
111+
rows, err := c.conn.Query(ctx, `SELECT DISTINCT hypertable_schema, hypertable_name
112+
FROM timescaledb_information.chunks
113+
WHERE is_compressed='t';`)
114+
if err != nil {
115+
return nil, fmt.Errorf("query compressed hypertables: %w", err)
116+
}
117+
var schema, name string
118+
if _, err := pgx.ForEachRow(rows, []any{&schema, &name}, func() error {
119+
if _, ok := output.TableDefaultPartitionKeyMapping[fmt.Sprintf("%s.%s", schema, name)]; ok {
120+
table := fmt.Sprintf("%s.%s", schema, name)
121+
delete(output.TableDefaultPartitionKeyMapping, table)
122+
c.logger.Warn("table is a compressed hypertable, falling back to full table partition",
123+
slog.String("table", table))
124+
}
125+
return nil
126+
}); err != nil {
127+
return nil, fmt.Errorf("failed to get compressed hypertables: %w", err)
128+
}
129+
130+
return output, nil
131+
}
132+
72133
func (c *PostgresConnector) setTransactionSnapshot(ctx context.Context, tx pgx.Tx, snapshot string) error {
73134
if snapshot != "" {
74135
if _, err := tx.Exec(ctx, "SET TRANSACTION SNAPSHOT "+utils.QuoteLiteral(snapshot)); err != nil {

flow/workflows/snapshot_flow.go

Lines changed: 13 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -244,7 +244,6 @@ func (s *SnapshotFlowExecution) cloneTables(
244244
snapshotType snapshotType,
245245
slotName string,
246246
snapshotName string,
247-
supportsTIDScans bool,
248247
maxParallelClones int,
249248
) error {
250249
if snapshotType == SNAPSHOT_TYPE_SLOT {
@@ -253,14 +252,21 @@ func (s *SnapshotFlowExecution) cloneTables(
253252
s.logger.Info("cloning tables in tx snapshot mode", slog.String("snapshot", snapshotName))
254253
}
255254

256-
boundSelector := shared.NewBoundSelector(ctx, "CloneTablesSelector", maxParallelClones)
255+
getParallelLoadKeyForTablesCtx := workflow.WithActivityOptions(ctx, workflow.ActivityOptions{
256+
StartToCloseTimeout: 10 * time.Minute,
257+
RetryPolicy: &temporal.RetryPolicy{
258+
InitialInterval: 1 * time.Minute,
259+
},
260+
})
257261

258-
defaultPartitionCol := "ctid"
259-
if !supportsTIDScans {
260-
s.logger.Info("Postgres version too old for TID scans, might use full table partitions!")
261-
defaultPartitionCol = ""
262+
var res *protos.GetDefaultPartitionKeyForTablesOutput
263+
if err := workflow.ExecuteActivity(getParallelLoadKeyForTablesCtx,
264+
snapshot.GetDefaultPartitionKeyForTables, s.config).Get(ctx, &res); err != nil {
265+
return fmt.Errorf("failed to get default partition keys for tables: %w", err)
262266
}
263267

268+
boundSelector := shared.NewBoundSelector(ctx, "CloneTablesSelector", maxParallelClones)
269+
264270
for _, v := range s.config.TableMappings {
265271
source := v.SourceTableIdentifier
266272
destination := v.DestinationTableIdentifier
@@ -269,7 +275,7 @@ func (s *SnapshotFlowExecution) cloneTables(
269275
slog.String("snapshotName", snapshotName),
270276
)
271277
if v.PartitionKey == "" {
272-
v.PartitionKey = defaultPartitionCol
278+
v.PartitionKey = res.TableDefaultPartitionKeyMapping[source]
273279
}
274280
if err := s.cloneTable(ctx, boundSelector, snapshotName, v); err != nil {
275281
s.logger.Error("failed to start clone child workflow", slog.Any("error", err))
@@ -304,19 +310,16 @@ func (s *SnapshotFlowExecution) cloneTablesWithSlot(
304310
}()
305311
var slotName string
306312
var snapshotName string
307-
var supportsTidScans bool
308313
if slotInfo != nil {
309314
slotName = slotInfo.SlotName
310315
snapshotName = slotInfo.SnapshotName
311-
supportsTidScans = slotInfo.SupportsTidScans
312316
}
313317

314318
s.logger.Info("cloning tables in parallel", slog.Int("parallelism", numTablesInParallel))
315319
if err := s.cloneTables(ctx,
316320
SNAPSHOT_TYPE_SLOT,
317321
slotName,
318322
snapshotName,
319-
supportsTidScans,
320323
numTablesInParallel,
321324
); err != nil {
322325
s.logger.Error("failed to clone tables", slog.Any("error", err))
@@ -411,7 +414,6 @@ func SnapshotFlowWorkflow(
411414
SNAPSHOT_TYPE_TX,
412415
"",
413416
txnSnapshotState.SnapshotName,
414-
txnSnapshotState.SupportsTIDScans,
415417
numTablesInParallel,
416418
); err != nil {
417419
return fmt.Errorf("failed to clone tables: %w", err)

protos/flow.proto

Lines changed: 10 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -156,9 +156,9 @@ message SetupReplicationInput {
156156
}
157157

158158
message SetupReplicationOutput {
159+
reserved 3;
159160
string slot_name = 1;
160161
string snapshot_name = 2;
161-
bool supports_tid_scans = 3;
162162
}
163163

164164
message CreateRawTableInput {
@@ -460,8 +460,8 @@ message IsQRepPartitionSyncedInput {
460460
}
461461

462462
message ExportTxSnapshotOutput {
463+
reserved 2;
463464
string snapshot_name = 1;
464-
bool supports_tid_scans = 2;
465465
}
466466

467467
enum DynconfValueType {
@@ -560,3 +560,11 @@ message FlowContextMetadata{
560560
message AdditionalContextMetadata{
561561
FlowOperation operation = 1;
562562
}
563+
564+
message GetDefaultPartitionKeyForTablesInput {
565+
repeated TableMapping table_mappings = 1;
566+
}
567+
568+
message GetDefaultPartitionKeyForTablesOutput {
569+
map<string, string> table_default_partition_key_mapping = 1;
570+
}

0 commit comments

Comments
 (0)