A custom Kafka Connect Single Message Transformation (SMT) that converts plain JSON messages (as produced by Tinybird sinks) into Struct + Schema records required by JDBC Sink connectors and other schema-based consumers.
This enables seamless data ingestion from Tinybird → Kafka → Postgres/MySQL/etc. without requiring Schema Registry, Kafka Streams, or format conversions upstream.
flowchart LR
    subgraph Tinybird
        TB["Pipe Sink - JSON Output"]
    end
    subgraph Kafka["Kafka Cluster"]
        T["cdp_dashboard_metrics_total_sink<br/>- plain JSON"]
    end
    subgraph Connect["Kafka Connect"]
        JC["JsonConverter<br/>(schemas.enable=false)"]
        SMT["TinybirdJsonToStruct<br/>(Custom SMT)"]
        JDBC["JdbcSinkConnector"]
    end
    subgraph DB["PostgreSQL"]
        PG["dashboardMetricsTotalSnapshot<br/>(upsert on id='snapshot')"]
    end
    TB --> T
    T --> JC --> SMT --> JDBC --> PG
Tinybird publishes schemaless JSON into Kafka. But JDBC Sink and other schema-aware connectors require a typed schema.
➡️ Without a schema, JDBC Sink fails to insert into the database
➡️ This SMT injects the schema dynamically inside Connect, with no changes required in Tinybird
Converts schemaless JSON Map into Connect Struct
Dynamically builds a schema from connector configuration
Supports the following field types:
- int64
- int32
- string
- boolean
- float64
- float32
- timestamp
No Schema Registry required
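To make the field-mapping idea concrete, here is a minimal Java sketch. It is not the SMT's actual source: it uses only the JDK, with plain maps standing in for Connect's Schema and Struct. The names `FieldSpecSketch`, `parseFields`, `coerce`, and `toTyped` are hypothetical, and the timestamp handling (ISO-8601 strings or epoch milliseconds) is an assumption about the input.

```java
import java.time.Instant;
import java.util.LinkedHashMap;
import java.util.Map;

// Hypothetical stand-in for the SMT's field handling. It uses only the JDK,
// not the Kafka Connect API, so it can be run on its own.
final class FieldSpecSketch {

    // Parses "name:type,name:type" into an ordered name -> type map.
    static Map<String, String> parseFields(String spec) {
        Map<String, String> fields = new LinkedHashMap<>();
        for (String entry : spec.split(",")) {
            String trimmed = entry.trim();
            if (trimmed.isEmpty()) {
                continue;
            }
            int colon = trimmed.indexOf(':');
            if (colon <= 0 || colon == trimmed.length() - 1) {
                throw new IllegalArgumentException("Expected name:type, got: " + trimmed);
            }
            fields.put(trimmed.substring(0, colon).trim(),
                       trimmed.substring(colon + 1).trim().toLowerCase());
        }
        return fields;
    }

    // Coerces one decoded JSON value to the Java type matching the declared type.
    static Object coerce(Object value, String type) {
        if (value == null) {
            return null;
        }
        switch (type) {
            case "int64":   return ((Number) value).longValue();
            case "int32":   return ((Number) value).intValue();
            case "float64": return ((Number) value).doubleValue();
            case "float32": return ((Number) value).floatValue();
            case "boolean": return (Boolean) value;
            case "string":  return value.toString();
            case "timestamp":
                // Assumption: timestamps arrive as ISO-8601 strings or epoch millis.
                return value instanceof Number
                        ? Instant.ofEpochMilli(((Number) value).longValue())
                        : Instant.parse(value.toString());
            default:
                throw new IllegalArgumentException("Unsupported type: " + type);
        }
    }

    // Builds the typed record; fields missing from the input become null.
    static Map<String, Object> toTyped(Map<String, Object> json, Map<String, String> fields) {
        Map<String, Object> out = new LinkedHashMap<>();
        for (Map.Entry<String, String> field : fields.entrySet()) {
            out.put(field.getKey(), coerce(json.get(field.getKey()), field.getValue()));
        }
        return out;
    }

    public static void main(String[] args) {
        Map<String, String> fields = parseFields("membersTotal:int64, updatedAt:timestamp");
        Map<String, Object> json = new LinkedHashMap<>();
        json.put("membersTotal", 42); // JSON parsers often yield Integer for small numbers
        json.put("updatedAt", "2024-01-01T00:00:00Z");
        System.out.println(toTyped(json, fields));
    }
}
```

In this sketch, fields that are not declared in the spec are dropped and declared fields that are absent become null. Whether the real SMT behaves the same way is not stated here, so treat that as an open design choice.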
Build the JAR:
mvn clean package
This generates a JAR file at:
target/tinybird-smt-1.0.1.jar
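Before copying the JAR anywhere, it can help to confirm that it actually contains the transform class. The following is a small sketch using only the JDK's `java.util.jar` API. The class `JarCheckSketch` is hypothetical, and it assumes the transform is packaged under the fully qualified name used in the connector configuration (`com.crowddev.connect.transform.TinybirdJsonToStruct`).

```java
import java.io.IOException;
import java.util.jar.JarFile;

// Hypothetical helper: checks that a built JAR contains a given class.
final class JarCheckSketch {

    // True when the JAR has a .class entry for the fully qualified class name.
    static boolean containsClass(String jarPath, String className) throws IOException {
        String entry = className.replace('.', '/') + ".class";
        try (JarFile jar = new JarFile(jarPath)) {
            return jar.getJarEntry(entry) != null;
        }
    }

    public static void main(String[] args) throws IOException {
        // Defaults to the build output path; pass another path as the first argument.
        String jarPath = args.length > 0 ? args[0] : "target/tinybird-smt-1.0.1.jar";
        String cls = "com.crowddev.connect.transform.TinybirdJsonToStruct";
        boolean found = containsClass(jarPath, cls);
        System.out.println(cls + (found ? " found in " : " missing from ") + jarPath);
    }
}
```

Saved as `JarCheckSketch.java`, this can be run from the project root with `java JarCheckSketch.java` on Java 11 or later.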
Once you have the JAR file, place it in the Docker folder:
crowd.dev/scripts/scaffold/kafka-connect/tmp/custom-plugins
Update your Dockerfile to use the new version, for example by adding the following line:
COPY tmp/custom-plugins/tinybird-smt-1.0.1.jar /usr/share/java/tinybird-smt-1.0.1.jar
You can now build, tag, and push your Docker image:
docker buildx build --platform linux/amd64 -t sjc.ocir.io/axbydjxa5zuh/kafka-connect:1765363311 --push .
Once you have the new image, update the kafka-connect deployment accordingly at:
k8s/infra/kafka-connect.yaml
# JSON input from Tinybird
value.converter=org.apache.kafka.connect.json.JsonConverter
value.converter.schemas.enable=false
# Apply Tinybird SMT
transforms=AddSchema
transforms.AddSchema.type=com.crowddev.connect.transform.TinybirdJsonToStruct
# Schema name in Kafka Connect
transforms.AddSchema.schema.name=dashboardMetricsTotalSnapshot
# Field mapping
transforms.AddSchema.fields=\
activitiesTotal:int64,\
activitiesLast30Days:int64,\
organizationsTotal:int64,\
organizationsLast30Days:int64,\
membersTotal:int64,\
membersLast30Days:int64,\
updatedAt:timestamp
# Static PK for upsert
transforms.AddSchema.pk.field=id
transforms.AddSchema.pk.value=snapshot
# JDBC Sink UPSERT
insert.mode=upsert
pk.mode=record_value
pk.fields=id
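The static primary key is what lets every message overwrite the same row. The sketch below illustrates that idea with plain JDK maps. It is an assumption-laden model, not the connector's code: `StaticPkSketch` and `withStaticPk` are hypothetical names, a `Map` keyed by `id` stands in for the Postgres table, and it assumes the configured constant wins if the incoming record already has a field with the same name.

```java
import java.util.LinkedHashMap;
import java.util.Map;

// Hypothetical model of "static PK + upsert" using only JDK collections.
final class StaticPkSketch {

    // Returns a copy of the record with the constant key field added.
    // Assumption: the configured value overrides any existing field of that name.
    static Map<String, Object> withStaticPk(Map<String, Object> value, String pkField, Object pkValue) {
        Map<String, Object> out = new LinkedHashMap<>(value);
        out.put(pkField, pkValue);
        return out;
    }

    public static void main(String[] args) {
        // Stand-in for the target table, keyed by primary key.
        Map<Object, Map<String, Object>> table = new LinkedHashMap<>();

        for (long total : new long[] {10L, 12L}) {
            Map<String, Object> message = new LinkedHashMap<>();
            message.put("membersTotal", total);

            Map<String, Object> row = withStaticPk(message, "id", "snapshot");
            // Upsert semantics: a row with the same key replaces the previous one.
            table.put(row.get("id"), row);
        }

        System.out.println(table.size() + " row(s): " + table);
    }
}
```

Because both messages carry `id=snapshot`, the table ends up with a single row holding the latest values, which matches the "upsert on id='snapshot'" behavior shown in the diagram.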
- Java 17
- Maven
mvn clean package
Output:
target/tinybird-smt-1.0.1.jar