feat(connectors): Extend JSON field transformations in connectors run…#1863
hubcio
Overall, I think this is the correct direction, but please also create a separate module for JSON and move everything relevant into it.
@jadireddi thank you for the contribution, it looks good! As @hubcio mentioned, it'd make sense to create a
use super::{Transform, TransformType};
use crate::{DecodedMessage, Error, Payload, TopicMetadata};
use serde::{Deserialize, Serialize};
use strum_macros::{Display, IntoStaticStr};

#[derive(Debug, Serialize, Deserialize)]
pub struct AddFieldsConfig {
    fields: Vec<Field>,
}

pub struct AddFields {
    pub fields: Vec<Field>,
}

#[derive(Debug, Serialize, Deserialize)]
pub struct Field {
    pub key: String,
    pub value: FieldValue,
}

#[derive(Debug, Clone, PartialEq, Serialize, Deserialize)]
#[serde(rename_all = "snake_case")]
pub enum FieldValue {
    Static(simd_json::OwnedValue),
    Computed(ComputedValue),
}

#[derive(Debug, Copy, Clone, Eq, PartialEq, Serialize, Deserialize, Display, IntoStaticStr)]
#[serde(rename_all = "snake_case")]
pub enum ComputedValue {
    #[strum(to_string = "date_time")]
    DateTime,
    #[strum(to_string = "timestamp_nanos")]
    TimestampNanos,
    #[strum(to_string = "timestamp_micros")]
    TimestampMicros,
    #[strum(to_string = "timestamp_millis")]
    TimestampMillis,
    #[strum(to_string = "timestamp_seconds")]
    TimestampSeconds,
    #[strum(to_string = "uuid_v4")]
    UuidV4,
    #[strum(to_string = "uuid_v7")]
    UuidV7,
}

impl AddFields {
    pub fn new(config: AddFieldsConfig) -> Self {
        Self {
            fields: config.fields,
        }
    }
}

impl Transform for AddFields {
    fn r#type(&self) -> TransformType {
        TransformType::AddFields
    }

    fn transform(
        &self,
        metadata: &TopicMetadata,
        message: DecodedMessage,
    ) -> Result<Option<DecodedMessage>, Error> {
        if self.fields.is_empty() {
            return Ok(Some(message));
        }

        match &message.payload {
            Payload::Json(_) => self.transform_json(metadata, message),
            _ => Ok(Some(message)),
        }
    }
}

And then the internal
use simd_json::OwnedValue;

use crate::{
    DecodedMessage, Error, Payload, TopicMetadata,
    transforms::add_fields::{AddFields, ComputedValue, FieldValue},
};

impl AddFields {
    pub fn transform_json(
        &self,
        metadata: &TopicMetadata,
        mut message: DecodedMessage,
    ) -> Result<Option<DecodedMessage>, Error> {
        let Payload::Json(OwnedValue::Object(ref mut map)) = message.payload else {
            return Ok(Some(message));
        };

        for field in &self.fields {
            match &field.value {
                FieldValue::Static(value) => map.insert(field.key.clone(), value.clone()),
                FieldValue::Computed(value) => match value {
                    ComputedValue::DateTime => {
                        map.insert(field.key.clone(), chrono::Utc::now().to_rfc3339().into())
                    }
                    ComputedValue::TimestampMillis => map.insert(
                        field.key.clone(),
                        chrono::Utc::now().timestamp_millis().into(),
                    ),
                    ComputedValue::TimestampMicros => map.insert(
                        field.key.clone(),
                        chrono::Utc::now().timestamp_micros().into(),
                    ),
                    ComputedValue::TimestampNanos => map.insert(
                        field.key.clone(),
                        chrono::Utc::now().timestamp_nanos_opt().into(),
                    ),
                    ComputedValue::TimestampSeconds => {
                        map.insert(field.key.clone(), chrono::Utc::now().timestamp().into())
                    }
                    ComputedValue::UuidV4 => {
                        map.insert(field.key.clone(), uuid::Uuid::new_v4().to_string().into())
                    }
                    ComputedValue::UuidV7 => {
                        map.insert(field.key.clone(), uuid::Uuid::now_v7().to_string().into())
                    }
                },
            };
        }

        Ok(Some(message))
    }
}

By following this approach we could simply add more transforms for the different formats and keep things separated from each other (by simply exposing a dedicated function to be called in the match at the top level).
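To make the suggested layout concrete, here is a minimal, std-only sketch of how a second format could plug into the top-level match. Everything in it is a simplified stand-in rather than the actual SDK: the `Payload` variants, the string-only field values, and the `transform_text` function are hypothetical and exist only to illustrate the dispatch pattern.

```rust
use std::collections::BTreeMap;

// Hypothetical, simplified stand-ins for the SDK types (assumption: the real
// `Payload`, `DecodedMessage` and `AddFields` carry more data than this).
#[derive(Debug, Clone, PartialEq)]
enum Payload {
    Json(BTreeMap<String, String>),
    Text(String),
    Raw(Vec<u8>),
}

#[derive(Debug, Clone, PartialEq)]
struct DecodedMessage {
    payload: Payload,
}

struct AddFields {
    fields: Vec<(String, String)>,
}

impl AddFields {
    // Top-level entry point: it only selects the format-specific function.
    fn transform(&self, message: DecodedMessage) -> Option<DecodedMessage> {
        if self.fields.is_empty() {
            return Some(message);
        }
        match &message.payload {
            Payload::Json(_) => self.transform_json(message),
            Payload::Text(_) => self.transform_text(message),
            // Formats without a dedicated function pass through unchanged.
            _ => Some(message),
        }
    }

    // Would live in a JSON-specific module.
    fn transform_json(&self, mut message: DecodedMessage) -> Option<DecodedMessage> {
        if let Payload::Json(map) = &mut message.payload {
            for (key, value) in &self.fields {
                map.insert(key.clone(), value.clone());
            }
        }
        Some(message)
    }

    // Hypothetical text-format counterpart, kept separate from the JSON logic.
    fn transform_text(&self, mut message: DecodedMessage) -> Option<DecodedMessage> {
        if let Payload::Text(text) = &mut message.payload {
            for (key, value) in &self.fields {
                text.push_str(&format!(" {key}={value}"));
            }
        }
        Some(message)
    }
}
```

With this shape, supporting another format would mean adding one match arm and one dedicated function, without touching the existing JSON code path.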
Thank you, @hubcio and @spetz, for the review. I’ll update the code according to your feedback. I first split FilterField and UpdateField into independent transformations. Later I noticed that Add, Update, Delete, and Filter shared a lot of common code, so I moved that common code to common.rs. I will keep each Field.rs file as an independent transformation.
Thank you for the improvements! Let's make the mentioned parts reusable (if applicable), and then we could merge it :)
Fixes: #1848
This PR introduces a message transformation system for connectors that enables modification of JSON messages. The implementation provides four transforms, one of which has advanced pattern-matching capabilities.
Transforms Added
AddFields Transform
DeleteFields Transform
UpdateFields Transform
FilterFields Transform