-
Notifications
You must be signed in to change notification settings - Fork 134
Idempotent CDC flow creation #3493
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
base: main
Are you sure you want to change the base?
Conversation
return nil, exceptions.NewInvalidArgumentApiError(errors.New("resync is not supported in the managed API")) | ||
} | ||
|
||
workflowID := cfg.FlowJobName + "-peerflow" |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
are we making wf IDs deterministic without the uuid suffix?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
FlowJobName contains pipe id which is a primary db key on the caller side, so 1 pipe = 1 workflow is what we want. Any UUID generation on PeerDB side would break this
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
One issue with having a new API is that we need to wait for all users to be up to date before switching the API. Instead if we add a new parameter, it would allow us to switch out of the box
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
FlowJobName contains pipe id which is a primary db key on the caller side, so 1 pipe = 1 workflow is what we want. Any UUID generation on PeerDB side would break this
For managed call yes, it makes sense, I meant to say for non-managed creation we still use uuids
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Might rename the existing endpoint then so the managed caller doesn't have to be deployed after
); err != nil { | ||
if idempotent { | ||
_, err = h.pool.Exec(ctx, | ||
`INSERT INTO flows (workflow_id, name, source_peer, destination_peer, config_proto, status, |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
you can't instead keep this as one code path & check if error is conflict?
also, can't use ON CONFLICT DO NOTHING
?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Thing is, name is not a key at all for flows, so no conflict. Idk how would it impact OSS users if we change that but there has been some duplication in the managed instances already.
In ClickPipes the flow name is always unique. Add a new endpoint that makes sure retrying with the same flow name always returns the same result, keying on the CDCFlowWorkflow id.
Concurrency is tested with toxiproxy, I am open to removing it if that's too much