-
Notifications
You must be signed in to change notification settings - Fork 138
feat(connectors): add connectors runtime #1826
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Conversation
b5f77c1
to
39d9249
Compare
6e46f18
to
50ddf32
Compare
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Pull Request Overview
Adds the Rust SDK and runtime for dynamically loaded source and sink connectors, supporting custom plugins via C FFI and per-field data transformations.
- Introduces text, raw, and JSON decoders in the SDK
- Implements runtime logic to load, initialize, and manage source and sink plugins with transform support
- Provides example data producer, Docker setup, and documentation
Reviewed Changes
Copilot reviewed 38 out of 38 changed files in this pull request and generated 1 comment.
Show a summary per file
File | Description |
---|---|
core/connectors/sdk/src/decoders/text.rs | New text stream decoder implementation |
core/connectors/sdk/src/decoders/raw.rs | New raw stream decoder implementation |
core/connectors/sdk/src/decoders/json.rs | New JSON stream decoder using simd_json |
core/connectors/sdk/src/decoders/mod.rs | Expose decoder modules |
core/connectors/sdk/Cargo.toml | SDK crate manifest |
core/connectors/runtime/src/transform.rs | Load and filter transforms from config |
core/connectors/runtime/src/source.rs | Runtime logic for loading and handling source plugins |
core/connectors/runtime/src/sink.rs | Runtime logic for loading and handling sink plugins |
core/connectors/runtime/src/main.rs | Runtime entrypoint and plugin orchestration |
core/connectors/runtime/src/configs.rs | Runtime configuration definitions |
core/connectors/runtime/src/error.rs | Runtime error enum |
core/connectors/runtime/config.toml | Sample runtime configuration |
core/connectors/runtime/Cargo.toml | Runtime crate manifest |
core/connectors/docker-compose.yml | Docker compose for example services |
core/connectors/data_producer/src/main.rs | Example data producer application |
core/connectors/data_producer/Cargo.toml | Data producer crate manifest |
core/connectors/README.md | Connectors documentation and quick start guide |
README.md | Monorepo updates adding connectors section |
DEPENDENCIES.md | Added new connector dependencies |
Cargo.toml | Workspace members and dependency updates |
Comments suppressed due to low confidence (3)
core/connectors/runtime/src/transform.rs:31
- [nitpick] Using
unwrap_or_default()
on deserialization will silently ignore parsing errors. Consider handling errors explicitly and logging them before defaulting to avoid hidden misconfigurations.
let shared_config = serde_json::from_value::<SharedTransformConfig>(transform_config.clone()).unwrap_or_default();
core/connectors/runtime/src/source.rs:271
- [nitpick] Using
eprintln!
bypasses the structured logging system. Replace with atracing::error!
call to keep logs consistent with the rest of the runtime.
eprintln!("Failed to deserialize produced messages for source connector with ID: {plugin_id}");
core/connectors/runtime/config.toml:73
- There's a typo in the table name
ransforms
; it should betransforms
to match the rest of the configuration structure.
[[sinks.quickwit.ransforms.add_fields.fields]]
75f90df
to
30134c0
Compare
e94f709
to
432ff1a
Compare
Add the connector runtime, allowing to build the custom plugins in Rust (
sinks
andsources
), which will be loaded during the runtime start, and invoked when either consuming the batch of messages (sink
), or producing them (source
) via exposed C FFI to ensure the maximum efficiency and performance. Moreover, it's possible to make use of the data transformations (per-field, depending on the supported payload format).This is the very first version of the runtime, yet the overall framework along with the exposed traits, should allow for the smooth implementation of the custom plugins and data transformations.