Skip to content

Commit e777b89

Browse files
committed
pg: detect when partitioning fails due to timescale compressed tables, & fallback to not using ctid
1 parent 3624b6b commit e777b89

File tree

1 file changed

+16
-2
lines changed

1 file changed

+16
-2
lines changed

flow/connectors/postgres/qrep.go

Lines changed: 16 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -3,6 +3,7 @@ package connpostgres
33
import (
44
"bytes"
55
"context"
6+
"errors"
67
"fmt"
78
"log/slog"
89
"strconv"
@@ -13,6 +14,7 @@ import (
1314
"github.com/google/uuid"
1415
"github.com/jackc/pgerrcode"
1516
"github.com/jackc/pgx/v5"
17+
"github.com/jackc/pgx/v5/pgconn"
1618
"github.com/jackc/pgx/v5/pgtype"
1719
"go.temporal.io/sdk/log"
1820
"google.golang.org/protobuf/encoding/protojson"
@@ -45,7 +47,7 @@ func (c *PostgresConnector) GetQRepPartitions(
4547
// if no watermark column is specified, return a single partition
4648
return []*protos.QRepPartition{
4749
{
48-
PartitionId: uuid.New().String(),
50+
PartitionId: uuid.NewString(),
4951
FullTablePartition: true,
5052
Range: nil,
5153
},
@@ -101,7 +103,7 @@ func (c *PostgresConnector) getNumRowsPartitions(
101103
// Query to get the total number of rows in the table
102104
countQuery := fmt.Sprintf(`SELECT COUNT(*) FROM %s %s`, parsedWatermarkTable.String(), whereClause)
103105
var row pgx.Row
104-
var minVal any = nil
106+
var minVal any
105107
if last != nil && last.Range != nil {
106108
switch lastRange := last.Range.Range.(type) {
107109
case *protos.PartitionRange_IntRange:
@@ -186,6 +188,18 @@ func (c *PostgresConnector) getNumRowsPartitions(
186188
}
187189

188190
if err := rows.Err(); err != nil {
191+
var pgErr *pgconn.PgError
192+
if config.WatermarkColumn == "ctid" && errors.As(err, &pgErr) && pgErr.Code == "XX000" && pgErr.Message == "transparent decompression only supports tableoid system column" {
193+
// timescale, fallback to full table partition
194+
return []*protos.QRepPartition{
195+
{
196+
PartitionId: uuid.NewString(),
197+
FullTablePartition: true,
198+
Range: nil,
199+
},
200+
}, nil
201+
}
202+
189203
return nil, fmt.Errorf("failed to read rows: %w", err)
190204
}
191205

0 commit comments

Comments
 (0)