Skip to content

Commit 31c40e2

Browse files
authored
further simplify mysql partitioning (#3223)
current partitioning query was still slow, & I'm not sure why we were doing so much for a simple idea, so splitting up range into go code, only asking mysql for count/min/max this has been shown to perform much better on real world workloads
1 parent 0300055 commit 31c40e2

File tree

2 files changed

+64
-129
lines changed

2 files changed

+64
-129
lines changed

flow/connectors/mysql/qrep.go

Lines changed: 29 additions & 128 deletions
Original file line numberDiff line numberDiff line change
@@ -79,9 +79,11 @@ func (c *MySqlConnector) GetQRepPartitions(
7979
}
8080

8181
// Query to get the total number of rows in the table
82-
countQuery := fmt.Sprintf("SELECT COUNT(*) FROM %s %s", parsedWatermarkTable.MySQL(), whereClause)
82+
countQuery := fmt.Sprintf("SELECT COUNT(*),MIN(%[2]s),MAX(%[2]s) FROM %[1]s %[3]s",
83+
parsedWatermarkTable.MySQL(), config.WatermarkColumn, whereClause)
8384
var minVal any
8485
var totalRows int64
86+
var rs *mysql.Result
8587
if last != nil && last.Range != nil {
8688
switch lastRange := last.Range.Range.(type) {
8789
case *protos.PartitionRange_IntRange:
@@ -91,27 +93,20 @@ func (c *MySqlConnector) GetQRepPartitions(
9193
case *protos.PartitionRange_TimestampRange:
9294
minVal = lastRange.TimestampRange.End.AsTime().String()
9395
}
94-
c.logger.Info(fmt.Sprintf("count query: %s - minVal: %v", countQuery, minVal))
95-
96-
rs, err := c.Execute(ctx, countQuery, minVal)
97-
if err != nil {
98-
return nil, err
99-
}
96+
c.logger.Info("querying count", slog.String("query", countQuery), slog.Any("minVal", minVal))
10097

101-
totalRows, err = rs.GetInt(0, 0)
102-
if err != nil {
103-
return nil, fmt.Errorf("failed to query for total rows: %w", err)
104-
}
98+
rs, err = c.Execute(ctx, countQuery, minVal)
10599
} else {
106-
rs, err := c.Execute(ctx, countQuery)
107-
if err != nil {
108-
return nil, err
109-
}
100+
rs, err = c.Execute(ctx, countQuery)
101+
}
102+
if err != nil {
103+
return nil, err
104+
}
105+
defer rs.Close()
110106

111-
totalRows, err = rs.GetInt(0, 0)
112-
if err != nil {
113-
return nil, fmt.Errorf("failed to query for total rows: %w", err)
114-
}
107+
totalRows, err = rs.GetInt(0, 0)
108+
if err != nil {
109+
return nil, fmt.Errorf("failed to query for total rows: %w", err)
115110
}
116111

117112
if totalRows == 0 {
@@ -125,120 +120,26 @@ func (c *MySqlConnector) GetQRepPartitions(
125120
numPartitions++
126121
}
127122

128-
watermarkQKind, watermarkMyType, err := c.GetDataTypeOfWatermarkColumn(ctx, parsedWatermarkTable.MySQL(), config.WatermarkColumn)
123+
watermarkMyType := rs.Fields[1].Type
124+
watermarkQKind, err := qkindFromMysql(rs.Fields[1])
129125
if err != nil {
130-
return nil, fmt.Errorf("failed to get data type of watermark column %s: %w", config.WatermarkColumn, err)
126+
return nil, fmt.Errorf("failed to convert mysql type to qvaluekind: %w", err)
131127
}
132128

133-
c.logger.Info(fmt.Sprintf("total rows: %d, num partitions: %d, num rows per partition: %d",
134-
totalRows, numPartitions, numRowsPerPartition))
135-
var rs *mysql.Result
136-
137-
switch watermarkQKind {
138-
case types.QValueKindInt8, types.QValueKindInt16, types.QValueKindInt32, types.QValueKindInt64,
139-
types.QValueKindUInt8, types.QValueKindUInt16, types.QValueKindUInt32, types.QValueKindUInt64:
140-
if minVal != nil {
141-
partitionsQuery := fmt.Sprintf(
142-
`WITH stats AS (
143-
SELECT MIN(%[2]s) AS min_watermark,
144-
1.0 * (MAX(%[2]s) - MIN(%[2]s)) / (%[1]d) AS range_size
145-
FROM %[3]s WHERE %[2]s > ?
146-
)
147-
SELECT FLOOR((w.%[2]s - s.min_watermark) / s.range_size) AS bucket,
148-
MIN(w.%[2]s) AS start, MAX(w.%[2]s) AS end
149-
FROM %[3]s AS w
150-
CROSS JOIN stats AS s
151-
WHERE w.%[2]s > ?
152-
GROUP BY bucket
153-
ORDER BY start;`,
154-
numPartitions,
155-
quotedWatermarkColumn,
156-
parsedWatermarkTable.MySQL(),
157-
)
158-
c.logger.Info("partitions query", slog.String("query", partitionsQuery), slog.Any("minVal", minVal))
159-
rs, err = c.Execute(ctx, partitionsQuery, minVal, minVal)
160-
} else {
161-
partitionsQuery := fmt.Sprintf(
162-
`WITH stats AS (
163-
SELECT MIN(%[2]s) AS min_watermark,
164-
1.0 * (MAX(%[2]s) - MIN(%[2]s)) / (%[1]d) AS range_size
165-
FROM %[3]s
166-
)
167-
SELECT FLOOR((w.%[2]s - s.min_watermark) / s.range_size) AS bucket,
168-
MIN(w.%[2]s) AS start, MAX(w.%[2]s) AS end
169-
FROM %[3]s AS w
170-
CROSS JOIN stats AS s
171-
GROUP BY bucket
172-
ORDER BY start;`,
173-
numPartitions,
174-
quotedWatermarkColumn,
175-
parsedWatermarkTable.MySQL(),
176-
)
177-
c.logger.Info("partitions query", slog.String("query", partitionsQuery))
178-
rs, err = c.Execute(ctx, partitionsQuery)
179-
}
180-
if err != nil {
181-
return nil, fmt.Errorf("failed to query for partitions: %w", err)
182-
}
183-
case types.QValueKindTimestamp, types.QValueKindTimestampTZ:
184-
if minVal != nil {
185-
partitionsQuery := fmt.Sprintf(
186-
`WITH stats AS (
187-
SELECT MIN(%[2]s) AS min_watermark,
188-
1.0 * (TIMESTAMPDIFF(MICROSECOND, MAX(%[2]s), MIN(%[2]s)) / (%[1]d)) AS range_size
189-
FROM %[3]s WHERE %[2]s > ?
190-
)
191-
SELECT FLOOR(TIMESTAMPDIFF(MICROSECOND, w.%[2]s, s.min_watermark) / s.range_size) AS bucket,
192-
MIN(w.%[2]s) AS start, MAX(w.%[2]s) AS end
193-
FROM %[3]s AS w
194-
CROSS JOIN stats AS s
195-
WHERE w.%[2]s > ?
196-
GROUP BY bucket
197-
ORDER BY start;`,
198-
numPartitions,
199-
quotedWatermarkColumn,
200-
parsedWatermarkTable.MySQL(),
201-
)
202-
c.logger.Info("partitions query", slog.String("query", partitionsQuery), slog.Any("minVal", minVal))
203-
rs, err = c.Execute(ctx, partitionsQuery, minVal)
204-
} else {
205-
partitionsQuery := fmt.Sprintf(
206-
`WITH stats AS (
207-
SELECT MIN(%[2]s) AS min_watermark,
208-
1.0 * (TIMESTAMPDIFF(MICROSECOND, MAX(%[2]s), MIN(%[2]s)) / (%[1]d)) AS range_size
209-
FROM %[3]s
210-
)
211-
SELECT FLOOR(TIMESTAMPDIFF(MICROSECOND, w.%[2]s, s.min_watermark) / s.range_size) AS bucket,
212-
MIN(w.%[2]s) AS start, MAX(w.%[2]s) AS end
213-
FROM %[3]s AS w
214-
CROSS JOIN stats AS s
215-
GROUP BY bucket
216-
ORDER BY start;`,
217-
numPartitions,
218-
quotedWatermarkColumn,
219-
parsedWatermarkTable.MySQL(),
220-
)
221-
c.logger.Info("partitions query", slog.String("query", partitionsQuery))
222-
rs, err = c.Execute(ctx, partitionsQuery)
223-
}
224-
if err != nil {
225-
return nil, fmt.Errorf("failed to query for partitions: %w", err)
226-
}
227-
}
129+
c.logger.Info("queried info for partitioning",
130+
slog.Int64("totalRows", totalRows), slog.Int64("numPartitions", numPartitions), slog.Int64("rowsPerPartition", numRowsPerPartition))
228131

229132
partitionHelper := utils.NewPartitionHelper(c.logger)
230-
for _, row := range rs.Values {
231-
val1, err := QValueFromMysqlFieldValue(watermarkQKind, watermarkMyType, row[1])
232-
if err != nil {
233-
return nil, err
234-
}
235-
val2, err := QValueFromMysqlFieldValue(watermarkQKind, watermarkMyType, row[2])
236-
if err != nil {
237-
return nil, err
238-
}
239-
if err := partitionHelper.AddPartition(val1.Value(), val2.Value()); err != nil {
240-
return nil, fmt.Errorf("failed to add partition: %w", err)
241-
}
133+
val1, err := QValueFromMysqlFieldValue(watermarkQKind, watermarkMyType, rs.Values[0][1])
134+
if err != nil {
135+
return nil, fmt.Errorf("failed to convert partition minimum to qvalue: %w", err)
136+
}
137+
val2, err := QValueFromMysqlFieldValue(watermarkQKind, watermarkMyType, rs.Values[0][2])
138+
if err != nil {
139+
return nil, fmt.Errorf("fialed to convert partition maximum to qvalue: %w", err)
140+
}
141+
if err := partitionHelper.AddPartitionsWithRange(val1.Value(), val2.Value(), numPartitions); err != nil {
142+
return nil, fmt.Errorf("failed to add partitions: %w", err)
242143
}
243144

244145
return partitionHelper.GetPartitions(), nil

flow/connectors/utils/partition.go

Lines changed: 35 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -14,6 +14,7 @@ import (
1414
"google.golang.org/protobuf/types/known/timestamppb"
1515

1616
"github.com/PeerDB-io/peerdb/flow/generated/protos"
17+
"github.com/PeerDB-io/peerdb/flow/shared"
1718
)
1819

1920
type PartitionRangeType string
@@ -296,6 +297,40 @@ func (p *PartitionHelper) AddPartition(start any, end any) error {
296297
return nil
297298
}
298299

300+
func (p *PartitionHelper) AddPartitionsWithRange(start any, end any, numPartitions int64) error {
301+
partition, err := p.getPartitionForStartAndEnd(start, end)
302+
if err != nil {
303+
return err
304+
}
305+
306+
switch r := partition.Range.Range.(type) {
307+
case *protos.PartitionRange_IntRange:
308+
size := shared.DivCeil(r.IntRange.End-r.IntRange.Start, numPartitions)
309+
for i := range numPartitions {
310+
if err := p.AddPartition(r.IntRange.Start+size*i, min(r.IntRange.Start+size*(i+1), r.IntRange.End)); err != nil {
311+
return err
312+
}
313+
}
314+
case *protos.PartitionRange_UintRange:
315+
size := shared.DivCeil(r.UintRange.End-r.UintRange.Start, uint64(numPartitions))
316+
for i := range uint64(numPartitions) {
317+
if err := p.AddPartition(r.UintRange.Start+size*i, min(r.UintRange.Start+size*(i+1), r.UintRange.End)); err != nil {
318+
return err
319+
}
320+
}
321+
case *protos.PartitionRange_TimestampRange:
322+
tstart := r.TimestampRange.Start.AsTime().UnixMicro()
323+
tend := r.TimestampRange.End.AsTime().UnixMicro()
324+
size := shared.DivCeil(tend-tstart, numPartitions)
325+
for i := range numPartitions {
326+
if err := p.AddPartition(time.UnixMicro(tstart+size*i), time.UnixMicro(min(tstart+size*(i+1), tend))); err != nil {
327+
return err
328+
}
329+
}
330+
}
331+
return nil
332+
}
333+
299334
func (p *PartitionHelper) getPartitionForStartAndEnd(start any, end any) (*protos.QRepPartition, error) {
300335
if start == nil || end == nil {
301336
return nil, nil
@@ -326,7 +361,6 @@ func (p *PartitionHelper) getPartitionForStartAndEnd(start any, end any) (*proto
326361
default:
327362
return nil, fmt.Errorf("unsupported type: %T", v)
328363
}
329-
return nil, nil
330364
}
331365

332366
func (p *PartitionHelper) updatePartitionHelper(partition *protos.QRepPartition) error {

0 commit comments

Comments
 (0)