Skip to content

allow editing initial load settings #3160

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 5 commits into from
Jul 9, 2025
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
30 changes: 30 additions & 0 deletions flow/workflows/cdc_flow.go
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,11 @@ type CDCFlowWorkflowState struct {
// Current signalled state of the peer flow.
ActiveSignal model.CDCFlowSignal
CurrentFlowStatus protos.FlowStatus

// Initial load settings
SnapshotNumRowsPerPartition uint32
SnapshotMaxParallelWorkers uint32
SnapshotNumTablesInParallel uint32
}

// returns a new empty PeerFlowState
Expand All @@ -53,6 +58,9 @@ func NewCDCFlowWorkflowState(ctx workflow.Context, logger log.Logger, cfg *proto
TableMappings: tableMappings,
NumberOfSyncs: 0,
},
SnapshotNumRowsPerPartition: cfg.SnapshotNumRowsPerPartition,
SnapshotMaxParallelWorkers: cfg.SnapshotMaxParallelWorkers,
SnapshotNumTablesInParallel: cfg.SnapshotNumTablesInParallel,
}
syncStatusToCatalog(ctx, workflow.GetLogger(ctx), state.CurrentFlowStatus)
return &state
Expand Down Expand Up @@ -98,6 +106,9 @@ func updateFlowConfigWithLatestSettings(
cloneCfg.MaxBatchSize = state.SyncFlowOptions.BatchSize
cloneCfg.IdleTimeoutSeconds = state.SyncFlowOptions.IdleTimeoutSeconds
cloneCfg.TableMappings = state.SyncFlowOptions.TableMappings
cloneCfg.SnapshotNumRowsPerPartition = state.SnapshotNumRowsPerPartition
cloneCfg.SnapshotMaxParallelWorkers = state.SnapshotMaxParallelWorkers
cloneCfg.SnapshotNumTablesInParallel = state.SnapshotNumTablesInParallel
return cloneCfg
}

Expand Down Expand Up @@ -156,6 +167,15 @@ func processCDCFlowConfigUpdate(
}
maps.Copy(cfg.Env, flowConfigUpdate.UpdatedEnv)
}
if flowConfigUpdate.SnapshotNumRowsPerPartition > 0 {
state.SnapshotNumRowsPerPartition = flowConfigUpdate.SnapshotNumRowsPerPartition
}
if flowConfigUpdate.SnapshotMaxParallelWorkers > 0 {
state.SnapshotMaxParallelWorkers = flowConfigUpdate.SnapshotMaxParallelWorkers
}
if flowConfigUpdate.SnapshotNumTablesInParallel > 0 {
state.SnapshotNumTablesInParallel = flowConfigUpdate.SnapshotNumTablesInParallel
}

tablesAreAdded := len(flowConfigUpdate.AdditionalTables) > 0
tablesAreRemoved := len(flowConfigUpdate.RemovedTables) > 0
Expand Down Expand Up @@ -224,6 +244,9 @@ func processTableAdditions(
additionalTablesCfg.InitialSnapshotOnly = true
additionalTablesCfg.TableMappings = flowConfigUpdate.AdditionalTables
additionalTablesCfg.Resync = false
additionalTablesCfg.SnapshotNumRowsPerPartition = state.SnapshotNumRowsPerPartition
additionalTablesCfg.SnapshotMaxParallelWorkers = state.SnapshotMaxParallelWorkers
additionalTablesCfg.SnapshotNumTablesInParallel = state.SnapshotNumTablesInParallel

addTablesSelector := workflow.NewNamedSelector(ctx, "AddTables")
addTablesSelector.AddReceive(ctx.Done(), func(_ workflow.ReceiveChannel, _ bool) {})
Expand Down Expand Up @@ -390,6 +413,9 @@ func addCdcPropertiesSignalListener(
slog.Any("RemovedTables", cdcConfigUpdate.RemovedTables),
slog.Int("NumberOfSyncs", int(state.SyncFlowOptions.NumberOfSyncs)),
slog.Any("UpdatedEnv", cdcConfigUpdate.UpdatedEnv),
slog.Uint64("SnapshotNumRowsPerPartition", uint64(cdcConfigUpdate.SnapshotNumRowsPerPartition)),
slog.Uint64("SnapshotMaxParallelWorkers", uint64(cdcConfigUpdate.SnapshotMaxParallelWorkers)),
slog.Uint64("SnapshotNumTablesInParallel", uint64(cdcConfigUpdate.SnapshotNumTablesInParallel)),
)
})
}
Expand Down Expand Up @@ -611,6 +637,10 @@ func CDCFlowWorkflow(
}

snapshotFlowCtx := workflow.WithChildOptions(ctx, childSnapshotFlowOpts)
// now snapshot parameters are also part of the state, but until we finish snapshot they wouldn't be modifiable.
// so we can use the same cfg for snapshot flow, and then rely on being state being saved to catalog
// during any operation that triggers another snapshot (INCLUDING add tables).
// this could fail for very weird Temporal resets
snapshotFlowFuture := workflow.ExecuteChildWorkflow(snapshotFlowCtx, SnapshotFlowWorkflow, cfg)
var snapshotDone bool
var snapshotError error
Expand Down
3 changes: 3 additions & 0 deletions protos/flow.proto
Original file line number Diff line number Diff line change
Expand Up @@ -422,6 +422,9 @@ message CDCFlowConfigUpdate {
repeated TableMapping removed_tables = 5;
// updates keys in the env map, existing keys left unchanged
map<string, string> updated_env = 6;
uint32 snapshot_num_rows_per_partition = 7;
uint32 snapshot_max_parallel_workers = 8;
uint32 snapshot_num_tables_in_parallel = 9;
}

message QRepFlowConfigUpdate {
Expand Down
109 changes: 106 additions & 3 deletions ui/app/mirrors/[mirrorId]/edit/page.tsx
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,12 @@ type EditMirrorProps = {
export default function EditMirror({ params: { mirrorId } }: EditMirrorProps) {
const defaultBatchSize = blankCDCSetting.maxBatchSize;
const defaultIdleTimeout = blankCDCSetting.idleTimeoutSeconds;
const defaultSnapshotNumRowsPerPartition =
blankCDCSetting.snapshotNumRowsPerPartition;
const defaultSnapshotMaxParallelWorkers =
blankCDCSetting.snapshotMaxParallelWorkers;
const defaultSnapshotNumTablesInParallel =
blankCDCSetting.snapshotNumTablesInParallel;

const [rows, setRows] = useState<TableMapRow[]>([]);
const [loading, setLoading] = useState(false);
Expand All @@ -48,6 +54,9 @@ export default function EditMirror({ params: { mirrorId } }: EditMirrorProps) {
removedTables: [],
numberOfSyncs: 0,
updatedEnv: {},
snapshotNumRowsPerPartition: defaultSnapshotNumRowsPerPartition,
snapshotMaxParallelWorkers: defaultSnapshotMaxParallelWorkers,
snapshotNumTablesInParallel: defaultSnapshotNumTablesInParallel,
});
const { push } = useRouter();

Expand All @@ -66,8 +75,24 @@ export default function EditMirror({ params: { mirrorId } }: EditMirrorProps) {
removedTables: [],
numberOfSyncs: 0,
updatedEnv: {},
snapshotNumRowsPerPartition:
(res as MirrorStatusResponse).cdcStatus?.config
?.snapshotNumRowsPerPartition || defaultSnapshotNumRowsPerPartition,
snapshotMaxParallelWorkers:
(res as MirrorStatusResponse).cdcStatus?.config
?.snapshotMaxParallelWorkers || defaultSnapshotMaxParallelWorkers,
snapshotNumTablesInParallel:
(res as MirrorStatusResponse).cdcStatus?.config
?.snapshotNumTablesInParallel || defaultSnapshotNumTablesInParallel,
});
}, [mirrorId, defaultBatchSize, defaultIdleTimeout]);
}, [
mirrorId,
defaultBatchSize,
defaultIdleTimeout,
defaultSnapshotNumRowsPerPartition,
defaultSnapshotMaxParallelWorkers,
defaultSnapshotNumTablesInParallel,
]);

useEffect(() => {
fetchStateAndUpdateDeps();
Expand Down Expand Up @@ -156,7 +181,7 @@ export default function EditMirror({ params: { mirrorId } }: EditMirrorProps) {
batchSize: e.target.valueAsNumber,
})
}
defaultValue={config.batchSize}
value={config.batchSize}
/>
</div>
}
Expand All @@ -182,7 +207,85 @@ export default function EditMirror({ params: { mirrorId } }: EditMirrorProps) {
idleTimeout: e.target.valueAsNumber,
})
}
defaultValue={config.idleTimeout}
value={config.idleTimeout}
/>
</div>
}
/>

<RowWithTextField
key={3}
label={<Label>{'Snapshot Rows Per Partition'} </Label>}
action={
<div
style={{
display: 'flex',
flexDirection: 'row',
alignItems: 'center',
}}
>
<TextField
variant='simple'
type={'number'}
onChange={(e: React.ChangeEvent<HTMLInputElement>) =>
setConfig({
...config,
snapshotNumRowsPerPartition: e.target.valueAsNumber,
})
}
value={config.snapshotNumRowsPerPartition}
/>
</div>
}
/>

<RowWithTextField
key={4}
label={<Label>{'Snapshot Max Parallel Workers'} </Label>}
action={
<div
style={{
display: 'flex',
flexDirection: 'row',
alignItems: 'center',
}}
>
<TextField
variant='simple'
type={'number'}
onChange={(e: React.ChangeEvent<HTMLInputElement>) =>
setConfig({
...config,
snapshotMaxParallelWorkers: e.target.valueAsNumber,
})
}
value={config.snapshotMaxParallelWorkers}
/>
</div>
}
/>

<RowWithTextField
key={5}
label={<Label>{'Snapshot Tables In Parallel'} </Label>}
action={
<div
style={{
display: 'flex',
flexDirection: 'row',
alignItems: 'center',
}}
>
<TextField
variant='simple'
type={'number'}
onChange={(e: React.ChangeEvent<HTMLInputElement>) =>
setConfig({
...config,
snapshotNumTablesInParallel: e.target.valueAsNumber,
})
}
value={config.snapshotNumTablesInParallel}
/>
</div>
}
Expand Down
Loading