Skip to content

Conversation

ilidemi
Copy link
Contributor

@ilidemi ilidemi commented Sep 17, 2025

In ClickPipes the flow name is always unique. Add a new endpoint that makes sure retrying with the same flow name always returns the same result, keying on the CDCFlowWorkflow id.

Concurrency is tested with toxiproxy, I am open to removing it if that's too much

return nil, exceptions.NewInvalidArgumentApiError(errors.New("resync is not supported in the managed API"))
}

workflowID := cfg.FlowJobName + "-peerflow"
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

are we making wf IDs deterministic without the uuid suffix?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

FlowJobName contains pipe id which is a primary db key on the caller side, so 1 pipe = 1 workflow is what we want. Any UUID generation on PeerDB side would break this

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

One issue with having a new API is that we need to wait for all users to be up to date before switching the API. Instead if we add a new parameter, it would allow us to switch out of the box

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

FlowJobName contains pipe id which is a primary db key on the caller side, so 1 pipe = 1 workflow is what we want. Any UUID generation on PeerDB side would break this

For managed call yes, it makes sense, I meant to say for non-managed creation we still use uuids

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Might rename the existing endpoint then so the managed caller doesn't have to be deployed after

); err != nil {
if idempotent {
_, err = h.pool.Exec(ctx,
`INSERT INTO flows (workflow_id, name, source_peer, destination_peer, config_proto, status,
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

you can't instead keep this as one code path & check if error is conflict?

also, can't use ON CONFLICT DO NOTHING?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thing is, name is not a key at all for flows, so no conflict. Idk how would it impact OSS users if we change that but there has been some duplication in the managed instances already.

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
Development

Successfully merging this pull request may close these issues.

3 participants