Add NATS source and sink connectors #578
This is a great start! I was able to get the source and sink working with a few tweaks.
// TODO: Implement a full-fledged `NatsTester` struct for testing the client connection,
// the stream or subject existence and access permissions, the deserialization of messages
// for the specified format, the authentication to the schema registry (if any), etc.
let (tx, _rx) = tokio::sync::oneshot::channel();
For testing, you should use the tx that's passed in to the test method rather than creating a new one (see the redis connector for a simple example of this). So basically, in line 141 change that argument to tx, remove this line and lines 148-151, and it should work.
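To illustrate why the freshly created channel is a problem, here is a minimal sketch using the standard library's `std::sync::mpsc` as a stand-in for the channel type the connector actually uses. `TestMessage`, `test_connection_buggy`, and `test_connection_fixed` are hypothetical names for this example only, not part of Arroyo's API.

```rust
use std::sync::mpsc::{channel, Sender};

/// Hypothetical result type for this sketch only.
#[derive(Debug, PartialEq)]
pub struct TestMessage {
    pub error: bool,
    pub message: String,
}

/// Buggy variant: ignores the caller's sender and reports on a new channel,
/// so the caller's receiver never sees the result.
pub fn test_connection_buggy(_tx: Sender<TestMessage>) {
    let (tx, _rx) = channel();
    let _ = tx.send(TestMessage {
        error: false,
        message: "connection ok".to_string(),
    });
}

/// Fixed variant: reports on the sender that was passed in.
pub fn test_connection_fixed(tx: Sender<TestMessage>) {
    let _ = tx.send(TestMessage {
        error: false,
        message: "connection ok".to_string(),
    });
}
```

With the buggy variant, the caller's receiver only observes a disconnected channel; with the fixed variant, it receives the test result.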
Thanks for pointing this out, looks like I introduced this when refactoring from 0.9 to 0.10. This is fixed, although a real tester should still be implemented.
) -> anyhow::Result<OperatorNode> {
Ok(match table.connector_type {
ConnectorType::Source { .. } => OperatorNode::from_source(Box::new(NatsSourceFunc {
stream: table.client_configs.get("nats.stream").cloned().unwrap(),
This is expecting that client_configs will contain a "nats.stream" key, but when creating through the web UI that won't necessarily be there. Instead, you can use the stream argument for the source, like ConnectorType::Source { stream }.
We also want to avoid unwraps (and other panicking constructs) in this code, and instead return an error when the data isn't what we expect.
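As a rough illustration of the second point, a lookup helper can turn a missing key into an error value instead of a panic. This is only a sketch: `MissingConfig` and `required` are hypothetical names, and the real code would more likely return an `anyhow::Result` as in the function signature above.

```rust
use std::collections::HashMap;
use std::fmt;

/// Hypothetical error type for this sketch only.
#[derive(Debug, PartialEq)]
pub struct MissingConfig(pub &'static str);

impl fmt::Display for MissingConfig {
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        write!(f, "missing required NATS option '{}'", self.0)
    }
}

impl std::error::Error for MissingConfig {}

/// Look up a required key, returning an error rather than panicking
/// when the key is absent.
pub fn required(
    configs: &HashMap<String, String>,
    key: &'static str,
) -> Result<String, MissingConfig> {
    configs.get(key).cloned().ok_or(MissingConfig(key))
}
```

A caller can then propagate the failure with `?` and surface a readable message to the user instead of crashing the worker.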
Done 👍
.unwrap(),
})),
ConnectorType::Sink { .. } => OperatorNode::from_operator(Box::new(NatsSinkFunc {
subject: table.client_configs.get("nats.subject").cloned().unwrap(),
Similarly here, we probably want to use the subject in ConnectorType::Sink { subject }
Done 👍
let servers_vec: Vec<ServerAddr> = servers_str.split(',').map(|s| s.parse().unwrap()).collect();
let client = async_nats::ConnectOptions::new()
.user_and_password(
client_config.get("nats.username").unwrap().to_string(),
This currently panics if you don't specify a username and password; you might want to rework it so that the user_and_password method is only called if the username and password are both Some values. For example:
let mut opts = async_nats::ConnectOptions::new();
match (client_config.get("nats.username"), client_config.get("nats.password")) {
(Some(username), Some(password)) => {
opts = opts.user_and_password(username.clone(), password.clone());
}
_ => {}
};
let client = opts
.connect(servers_vec)
.await?;
That's modified as well. Should other authentication methods be added before this can be merged into master?
}

async fn handle_checkpoint(&mut self, _: CheckpointBarrier, _ctx: &mut ArrowContext) {
// TODO: Is it necessary to insert any kind of checkpoint in the state for NATS sink?
Short version: checkpointing in the sink isn't necessary for correctness; it's fine to just flush all in-progress messages.
Longer version: there are two main strategies to achieve at-least-once delivery in a sink. The first (and usually simpler one) is what you've done here: flush the messages before finishing the checkpoint; this ensures that we won't drop any data because the downstream system will have acknowledged our writes before we mark the checkpoint as complete. If we fail during this process, we recover from the previous checkpoint and will replay the data that potentially got dropped.
The other strategy is to checkpoint our in-progress data. The downside of this is that we potentially have to write a lot more data to state. The upside is that we don't depend on the downstream system being available in order to checkpoint. So if NATS is down for 30 seconds, we're able to continue processing without blocking checkpointing.
So there's no one right answer, and often sinks will offer both options depending on which tradeoff the user prefers.
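The two strategies described above can be sketched with a toy, synchronous sink. Everything here is hypothetical and simplified for illustration (`Downstream`, `FlakyDownstream`, `CheckpointMode`, `BufferingSink`); it does not use Arroyo's operator traits or the async_nats client.

```rust
use std::collections::VecDeque;

/// Hypothetical stand-in for the downstream system (e.g. a NATS subject).
pub trait Downstream {
    /// Returns Err when the downstream system is unavailable.
    fn publish(&mut self, msg: &str) -> Result<(), String>;
}

/// In-memory downstream that can be switched on and off.
pub struct FlakyDownstream {
    pub up: bool,
    pub delivered: Vec<String>,
}

impl Downstream for FlakyDownstream {
    fn publish(&mut self, msg: &str) -> Result<(), String> {
        if !self.up {
            return Err("downstream unavailable".to_string());
        }
        self.delivered.push(msg.to_string());
        Ok(())
    }
}

#[derive(Clone, Copy)]
pub enum CheckpointMode {
    /// Strategy 1: all writes must be acknowledged before the checkpoint completes.
    FlushOnCheckpoint,
    /// Strategy 2: unsent messages are stored as part of the checkpoint state.
    CheckpointBuffer,
}

pub struct BufferingSink<D: Downstream> {
    pub downstream: D,
    pub mode: CheckpointMode,
    pub pending: VecDeque<String>,
}

impl<D: Downstream> BufferingSink<D> {
    pub fn new(downstream: D, mode: CheckpointMode) -> Self {
        Self { downstream, mode, pending: VecDeque::new() }
    }

    pub fn write(&mut self, msg: &str) {
        self.pending.push_back(msg.to_string());
    }

    /// Deliver buffered messages in order, stopping at the first failure.
    pub fn flush(&mut self) -> Result<(), String> {
        while let Some(msg) = self.pending.front().cloned() {
            self.downstream.publish(&msg)?;
            self.pending.pop_front();
        }
        Ok(())
    }

    /// Returns the state to persist with the checkpoint, or an error
    /// if the checkpoint cannot complete yet.
    pub fn handle_checkpoint(&mut self) -> Result<Vec<String>, String> {
        match self.mode {
            CheckpointMode::FlushOnCheckpoint => {
                // The checkpoint fails (or blocks, in a real system) while downstream is down.
                self.flush()?;
                Ok(Vec::new())
            }
            CheckpointMode::CheckpointBuffer => {
                // Best-effort flush; whatever remains is written to state instead.
                let _ = self.flush();
                Ok(self.pending.iter().cloned().collect())
            }
        }
    }
}
```

In the first mode the checkpoint depends on the downstream system but stores nothing; in the second it always completes but the stored state grows with the backlog.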
OK, very clear, thanks for the explanation. I've left a TODO comment for future iterations then, as it is not required for a version 1.
minor nit: our connector images are square and grayscale — I can put one together for you if you don't want to bother with the conversion
Updated, I hope the format is correct now
"filterSubjects": { | ||
"title": "Filter subjects", | ||
"type": "string", | ||
"description": "A list of subjects that the consumer should filter on. If the list is empty, then no filtering is done.", |
I'd specify the format for this, whether it's comma-separated or what. You can also use an array which will render a nice UI.
Now rendered as an array. The descriptions of the enumerated items are set to undefined_n though; not sure how to fix that?
},
ack_wait: Duration::from_secs(ack_wait.clone() as u64),
description: description.clone(),
filter_subjects: filter_subjects
It appears that this doesn't work properly if it's not set (i.e., ""). I get:
Something went wrong instantiating the NATS consumer.: Error { kind: JetStream(Error { code: 400, err_code: ErrorCode(10139), description: Some("consumer filter in FilterSubjects cannot be empty") }), source: None } panic.file="crates/arroyo-connectors/src/nats/source/mod.rs" panic.line=281 panic.column=14
I think that's fixed. With single quotes at least, you should be able to pass an empty string, and the value is now defined as a comma-separated list of strings.
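One way to handle the empty-string case from the error above is to drop empty entries while splitting, so that an unset value yields an empty list rather than a list containing one empty subject. This is a sketch, not the code from the PR; `parse_filter_subjects` is a hypothetical helper name.

```rust
/// Split a comma-separated list of subjects, trimming whitespace and
/// dropping empty entries, so that "" yields an empty Vec (no filtering).
pub fn parse_filter_subjects(raw: &str) -> Vec<String> {
    raw.split(',')
        .map(str::trim)
        .filter(|s| !s.is_empty())
        .map(str::to_string)
        .collect()
}
```

The resulting vector could then be assigned to the consumer's filter_subjects field, assuming that field accepts a list of strings.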
Opening the PR to get some feedback. This is a refactored version of https://github.com/gbto/arroyo/tree/feat/add-nats-source-connector, which I've been using to run 6 pipelines in development for a month now on arrow = { version = "0.9.0" }. The source only supports consuming from NATS Streams, and the sink only supports publishing to NATS Subjects. Both leverage connection profiles and table configurations, with environment variable substitution for secrets. Tests are not implemented yet, and neither are features that were not strictly required to get it working in the context of our architecture (e.g. autocomplete, schema validation and registry support, different semantics, or deduplication).
There are still a few flawed features, in particular the way optional extra client configurations are parsed and the checkpointing in the sink connector. Comments on these two points would be really appreciated and implemented ASAP.