Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add NATS source and sink connectors #578

Merged

Conversation

gbto
Copy link
Contributor

@gbto gbto commented Apr 1, 2024

Opening the PR to get some feedbacks. This is a refactored version of https://github.com/gbto/arroyo/tree/feat/add-nats-source-connector which I've been using to run 6 pipelines in development for now one month on arrow = { version = "0.9.0" }. The source only support consuming from NATS Streams and publishing to NATS Subjects. Both supports leverage connection profiles and tables configurations, with environment variable substitution for secrets.

The tests are currently not implemented yet, neither are features that were note absolutely required to get it to work in our context of our architecture (e.g. autocomplete, schema validation and registry support, different semantics or deduplication..,).

There still are a few flawed feature, in particular the way optional extra client configuration are parsed and the checkpointing in the sink connector. Comments on these 2 points would be really appreciated and implemented asap.

-- source configuration
create table demo_source (
    value text
    ) with (
    type = 'source',
    connector = 'nats',
    servers = 'nats-1:4222,nats-2:4222',
    'nats.stream' = 'demo-source,
    'auth.type' = 'credentials',
    'auth.username' = '{{ NATS_USER }}',
    'auth.password' = '{{ NATS_PASSWORD }}',
    format = 'json',
    'json.unstructured' = 'true'
);
-- sink configuration
create table demo_sink (
   value text
) with (
    type = 'sink',
    connector = 'nats',
    servers = 'nats-1:4222,nats-2:4222',
    'nats.subject' = 'demo.subject,
    'auth.type' = 'credentials',
    'auth.username' = '{{ NATS_USER }}',
    'auth.password' = '{{ NATS_PASSWORD }}',
    format = 'json',
    'json.unstructured' = 'true'
);

Copy link
Member

@mwylde mwylde left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is a great start! I was able to get the source and sink working with a few tweaks.

// TODO: Implement a full-fledge `NatsTester` struct for testing the client connection,
// the stream or subject existence and access permissions, the deserialization of messages
// for the specified format, the authentication to the schema registry (if any), etc.
let (tx, _rx) = tokio::sync::oneshot::channel();
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

For testing, you should use the tx that's passed in to the test method rather than creating a new one (see the redis connector for a simple example of this). So basically in line 141 change that argument to tx, remove this line, and 148-151, and it shuold work.

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thanks for pointing this out, looks like I introduced this when refactoring from 0.9 to 0.10. This is fixed although a real tester still should be implemented

) -> anyhow::Result<OperatorNode> {
Ok(match table.connector_type {
ConnectorType::Source { .. } => OperatorNode::from_source(Box::new(NatsSourceFunc {
stream: table.client_configs.get("nats.stream").cloned().unwrap(),
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is expecting that client_configs will contain a "nats.stream" key, but when creating through the web ui that won't necessary be there. Instead, you can use the stream argument for the source like ConnectorType::Source { stream }.

We also want to avoid unwraps (and other panicking constructs) in this code, and instead return an error when the data isn't what we expect.

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Done 👍

.unwrap(),
})),
ConnectorType::Sink { .. } => OperatorNode::from_operator(Box::new(NatsSinkFunc {
subject: table.client_configs.get("nats.subject").cloned().unwrap(),
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Similarly here, we probably want to use the subject in ConnectorType::Sink { subject }

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Done 👍

let servers_vec: Vec<ServerAddr> = servers_str.split(',').map(|s| s.parse().unwrap()).collect();
let client = async_nats::ConnectOptions::new()
.user_and_password(
client_config.get("nats.username").unwrap().to_string(),
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This currently panics if you don't specify a username and password; you might want to rework it so that the user_and_password method is only called if the username and password are Some values. For example

    let mut opts = async_nats::ConnectOptions::new();

    match (client_config.get("nats.username"), client_config.get("nats.password")) {
        (Some(username), Some(password)) => {
            opts = opts.user_and_password(username.clone(), password.clone());
        }
        _ => {}
    };
    
    
    let client = opts
        .connect(servers_vec)
        .await?;

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

That's modified as well. Should other authentication be added to have it merged into master?

}

async fn handle_checkpoint(&mut self, _: CheckpointBarrier, _ctx: &mut ArrowContext) {
// TODO: Is it necessary to insert any kind of checklpoint in the state for NATS sink?
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Short version: checkpointing in the sink isn't necessary for correctness; it's fine to just flush all in-progress messages.

Longer version: there are two main strategies to achieve at-least once in a sink. The first (and usually simpler one) is what you've done here: flush the messages before finishing the checkpoint; this ensures that we won't drop any data because the downstream system will have acknowledged our writes before we mark the checkpoint as complete. If we fail during this process, we recover from the previous checkpoint and will replay the data that potentially got dropped.

The other strategy is to checkpoint our in-progress data. The downside of this is that we potentially have to write a lot more data to state. The upside is that we don't depend on the downstream system to be available in order to checkpoint. So if nats is down for 30 seconds, we're able to continue processing without blocking checkpointing.

So there's no one right answer, and often sinks will offer both options depending on which tradeoff the user prefers.

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Ok very clear thanks for these explanations, I've left a TODO comment for future iterations then as it is not required for a version 1

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

minor nit: our connector images are square and grayscale — I can put one together for you if you don't want to bother with the conversion

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Updated, I hope the format is correct now

@gbto gbto force-pushed the feat/add-nats-source-sink-connectors branch 6 times, most recently from e3b147a to 7b130a4 Compare April 4, 2024 17:14
@gbto gbto force-pushed the feat/add-nats-source-sink-connectors branch from ab0dc57 to e4ac74f Compare April 15, 2024 18:49
@gbto gbto force-pushed the feat/add-nats-source-sink-connectors branch 4 times, most recently from bf4d794 to 176b4e8 Compare April 24, 2024 23:05
"filterSubjects": {
"title": "Filter subjects",
"type": "string",
"description": "A list of subjects that the consumer should filter on. If the list is empty, then no filtering is done.",
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'd specify the format for this, whether it's comma-separated or what. You can also use an array which will render a nice UI.

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Now rendered as an array. The enumerated items descriptions are set to undefined_n though, not sure how to fix that?

},
ack_wait: Duration::from_secs(ack_wait.clone() as u64),
description: description.clone(),
filter_subjects: filter_subjects
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It appears that this doesn't work properly if it's not set (i.e., "").

I get

Something went wrong instantiating the NATS consumer.: Error { kind: JetStream(Error { code: 400, err_code: ErrorCode(10139), description: Some("consumer filter in FilterSubjects cannot be empty") }), source: None } panic.file="crates/arroyo-connectors/src/nats/source/mod.rs" panic.line=281 panic.column=14

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think that's fixed, with single quotes at least you should be able to pass an empty string and define it as comma-separated list of strings

@gbto gbto force-pushed the feat/add-nats-source-sink-connectors branch 2 times, most recently from cd48aaf to 6cf9aaf Compare April 25, 2024 07:10
@gbto gbto force-pushed the feat/add-nats-source-sink-connectors branch from 6cf9aaf to 3c8d20c Compare April 25, 2024 07:12
@gbto gbto force-pushed the feat/add-nats-source-sink-connectors branch from 3c8d20c to 6ea15dd Compare April 25, 2024 07:33
@mwylde mwylde merged commit 2b5e394 into ArroyoSystems:master Apr 25, 2024
3 checks passed
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
Development

Successfully merging this pull request may close these issues.

3 participants