Skip to content

feat(connectors): Extend JSON field transformations in connectors run…#1863

Merged
numinnex merged 9 commits into
apache:masterfrom
jadireddi:iggy-1848
Jun 16, 2025
Merged

feat(connectors): Extend JSON field transformations in connectors run…#1863
numinnex merged 9 commits into
apache:masterfrom
jadireddi:iggy-1848

Conversation

@jadireddi
Copy link
Copy Markdown
Contributor

@jadireddi jadireddi commented Jun 10, 2025

Fixes: #1848

This PR introduces a message transformation system for connectors, enabling modification of JSON messages. The implementation provides four essential transforms with one having advanced pattern matching capabilities.

Transforms Added

AddFields Transform

  • Dynamically adds fields to JSON messages
  • Static values (strings, numbers, booleans, objects)
  • Computed values (timestamps, UUIDs, datetime strings)
  • Support for multiple timestamp formats (nanos, micros, millis, seconds)
  • UUID v4 and v7 generation

DeleteFields Transform

  • Removes specified fields from JSON messages
  • Simple field name-based deletion
  • Ignores non-existent fields

UpdateFields Transform

  • Conditionally updates existing fields in JSON messages
  • Conditional logic: Always, KeyExists, KeyNotExists
  • Static and computed value updates
  • Field creation when conditions are met
  • Same computed value support as AddFields

**FilterFields Transform **

  • Filtering based on complex key-value patterns
  • Key pattern matching (exact, prefix, suffix, contains, regex)
  • Value pattern matching (equality, type checks, numeric comparisons, regex)
  • Combined key and value pattern matching
  • Include or exclude matching behavior

Copy link
Copy Markdown
Contributor

@hubcio hubcio left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

overall i think this is correct direction but please create also separate module for json and move everything relevant to this module

Comment thread core/connectors/sdk/src/transforms/test_utils.rs Outdated
Comment thread core/connectors/sdk/src/transforms/add_fields.rs
@spetz
Copy link
Copy Markdown
Contributor

spetz commented Jun 13, 2025

@jadireddi thank you for the contribution, it looks good! As @hubcio mentioned, it'd make sense to create a json subdirectory (module) under existing sdk/transforms, where all these JSON transformations related files could be placed. And then, for the already existing ones, we could implement the internal function to be invoked per specific payload format. Please check the following example below:

sdk/transforms/add_fields.rs

use super::{Transform, TransformType};
use crate::{DecodedMessage, Error, Payload, TopicMetadata};
use serde::{Deserialize, Serialize};
use strum_macros::{Display, IntoStaticStr};

#[derive(Debug, Serialize, Deserialize)]
pub struct AddFieldsConfig {
    fields: Vec<Field>,
}

pub struct AddFields {
    pub fields: Vec<Field>,
}

#[derive(Debug, Serialize, Deserialize)]
pub struct Field {
    pub key: String,
    pub value: FieldValue,
}

#[derive(Debug, Clone, PartialEq, Serialize, Deserialize)]
#[serde(rename_all = "snake_case")]
pub enum FieldValue {
    Static(simd_json::OwnedValue),
    Computed(ComputedValue),
}

#[derive(Debug, Copy, Clone, Eq, PartialEq, Serialize, Deserialize, Display, IntoStaticStr)]
#[serde(rename_all = "snake_case")]
pub enum ComputedValue {
    #[strum(to_string = "date_time")]
    DateTime,
    #[strum(to_string = "timestamp_nanos")]
    TimestampNanos,
    #[strum(to_string = "timestamp_micros")]
    TimestampMicros,
    #[strum(to_string = "timestamp_millis")]
    TimestampMillis,
    #[strum(to_string = "timestamp_seconds")]
    TimestampSeconds,
    #[strum(to_string = "uuid_v4")]
    UuidV4,
    #[strum(to_string = "uuid_v7")]
    UuidV7,
}

impl AddFields {
    pub fn new(config: AddFieldsConfig) -> Self {
        Self {
            fields: config.fields,
        }
    }
}

impl Transform for AddFields {
    fn r#type(&self) -> TransformType {
        TransformType::AddFields
    }

    fn transform(
        &self,
        metadata: &TopicMetadata,
        message: DecodedMessage,
    ) -> Result<Option<DecodedMessage>, Error> {
        if self.fields.is_empty() {
            return Ok(Some(message));
        }

        match &message.payload {
            Payload::Json(_) => self.transform_json(metadata, message),
            _ => Ok(Some(message)),
        }
    }
}

And then the internal json module under sdk/transforms/json with the related transformations e.g.

sdk/transforms/json/add_fields.rs

use simd_json::OwnedValue;

use crate::{
    DecodedMessage, Error, Payload, TopicMetadata,
    transforms::add_fields::{AddFields, ComputedValue, FieldValue},
};

impl AddFields {
    pub fn transform_json(
        &self,
        metadata: &TopicMetadata,
        mut message: DecodedMessage,
    ) -> Result<Option<DecodedMessage>, Error> {
        let Payload::Json(OwnedValue::Object(ref mut map)) = message.payload else {
            return Ok(Some(message));
        };

        for field in &self.fields {
            match &field.value {
                FieldValue::Static(value) => map.insert(field.key.clone(), value.clone()),
                FieldValue::Computed(value) => match value {
                    ComputedValue::DateTime => {
                        map.insert(field.key.clone(), chrono::Utc::now().to_rfc3339().into())
                    }
                    ComputedValue::TimestampMillis => map.insert(
                        field.key.clone(),
                        chrono::Utc::now().timestamp_millis().into(),
                    ),
                    ComputedValue::TimestampMicros => map.insert(
                        field.key.clone(),
                        chrono::Utc::now().timestamp_micros().into(),
                    ),
                    ComputedValue::TimestampNanos => map.insert(
                        field.key.clone(),
                        chrono::Utc::now().timestamp_nanos_opt().into(),
                    ),
                    ComputedValue::TimestampSeconds => {
                        map.insert(field.key.clone(), chrono::Utc::now().timestamp().into())
                    }
                    ComputedValue::UuidV4 => {
                        map.insert(field.key.clone(), uuid::Uuid::new_v4().to_string().into())
                    }
                    ComputedValue::UuidV7 => {
                        map.insert(field.key.clone(), uuid::Uuid::now_v7().to_string().into())
                    }
                },
            };
        }

        Ok(Some(message))
    }
}

By following this approach we could simply add more transforms for the different formats and keep things separated from each other (by simply exposing a dedicated function to be called in the match at the top level).

@jadireddi
Copy link
Copy Markdown
Contributor Author

Thank you, @hubcio and @spetz, for the review. I’ll update the code according to your feedback.

I first split FilterField and UpdateField into independent transformations. Later I noticed that Add, Update, Delete, and Filter shared a lot of common code, so I moved that common code to common.rs. I will keep each Field.rs file as independent transformation.

@jadireddi jadireddi marked this pull request as ready for review June 15, 2025 11:59
@spetz spetz added rust Pull requests that update Rust code connectors Connectors runtime labels Jun 16, 2025
Comment thread core/connectors/sdk/src/transforms/add_fields.rs
Comment thread core/connectors/sdk/src/transforms/update_fields.rs Outdated
@spetz
Copy link
Copy Markdown
Contributor

spetz commented Jun 16, 2025

Thank you for the improvements! Let's make the mentioned parts reusable (if applicable), and then we could merge it :)

Comment thread core/connectors/sdk/src/transforms/mod.rs Outdated
Comment thread core/connectors/sdk/src/transforms/json/update_fields.rs Outdated
Comment thread core/connectors/sdk/src/transforms/json/update_fields.rs Outdated
Copy link
Copy Markdown
Contributor

@spetz spetz left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

LGTM, thank you!

@numinnex numinnex merged commit 9cad36a into apache:master Jun 16, 2025
25 checks passed
@jadireddi jadireddi deleted the iggy-1848 branch June 16, 2025 16:23
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

connectors Connectors runtime rust Pull requests that update Rust code

Projects

None yet

Development

Successfully merging this pull request may close these issues.

Extend JSON field transformations in connectors runtime

5 participants