
Tinybird SMT for Kafka Connect: a lightweight custom Single Message Transformation (SMT) for Kafka Connect that maps schemaless JSON messages produced by Tinybird into Kafka Connect Struct records with an explicit schema. This allows JDBC Sink connectors to correctly upsert data into databases such as PostgreSQL.


CrowdDotDev/tinybird-append-schema-smt


Tinybird SMT for Kafka Connect

A custom Kafka Connect Single Message Transformation (SMT) that converts plain JSON messages (as produced by Tinybird sinks) into Struct + Schema records required by JDBC Sink connectors and other schema-based consumers.

This enables seamless data ingestion from Tinybird → Kafka → Postgres/MySQL/etc. without requiring Schema Registry, Kafka Streams, or format conversions upstream.


🧩 Architecture

flowchart LR
  subgraph Tinybird
    TB["Pipe Sink - JSON Output"]
  end

  subgraph Kafka["Kafka Cluster"]
    T["cdp_dashboard_metrics_total_sink
- plain JSON"]
  end

  subgraph Connect["Kafka Connect"]
    JC["JsonConverter
(schemas.enable=false)"]
    SMT["TinybirdJsonToStruct
(Custom SMT)"]
    JDBC["JdbcSinkConnector"]
  end

  subgraph DB["PostgreSQL"]
    PG["dashboardMetricsTotalSnapshot
(upsert on id='snapshot')"]
  end

  TB --> T
  T --> JC --> SMT --> JDBC --> PG
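
On PostgreSQL, an upsert keyed on id usually takes the form of an INSERT ... ON CONFLICT ... DO UPDATE statement, so every message carrying id='snapshot' overwrites the same row instead of adding a new one. The Java sketch below only approximates the shape of such a statement to make that behaviour concrete. UpsertSqlSketch and buildUpsert are hypothetical names, and the exact SQL generated by the JDBC Sink connector (quoting, table naming, column order) depends on its dialect and configuration.

```java
import java.util.List;
import java.util.stream.Collectors;

// Hypothetical helper: approximates the shape of a PostgreSQL upsert for a
// record keyed on a single column. This is NOT the JDBC Sink connector's code.
final class UpsertSqlSketch {

    // Quote identifiers so camelCase names such as activitiesTotal keep their case.
    static String quote(String identifier) {
        return "\"" + identifier.replace("\"", "\"\"") + "\"";
    }

    static String buildUpsert(String table, String pk, List<String> columns) {
        String cols = columns.stream().map(UpsertSqlSketch::quote).collect(Collectors.joining(","));
        String params = columns.stream().map(c -> "?").collect(Collectors.joining(","));
        // On a key conflict, every non-key column takes the incoming (EXCLUDED) value.
        String updates = columns.stream()
                .filter(c -> !c.equals(pk))
                .map(c -> quote(c) + "=EXCLUDED." + quote(c))
                .collect(Collectors.joining(","));
        return "INSERT INTO " + quote(table) + " (" + cols + ") VALUES (" + params + ")"
                + " ON CONFLICT (" + quote(pk) + ") DO UPDATE SET " + updates;
    }

    public static void main(String[] args) {
        System.out.println(buildUpsert("dashboardMetricsTotalSnapshot", "id",
                List.of("id", "activitiesTotal", "updatedAt")));
    }
}
```

Running main prints one statement that inserts id, activitiesTotal and updatedAt and, when a row with the same id already exists, overwrites the two non-key columns.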

✨ Why this exists

Tinybird publishes schemaless JSON to Kafka, but the JDBC Sink and other schema-aware connectors require a typed schema.

➡️ Without a schema, the JDBC Sink fails to insert rows into the database.
➡️ This SMT injects the schema dynamically inside Kafka Connect, so no changes are required in Tinybird.
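
The schema matters because a JDBC sink derives SQL column types from it, for example when auto-creating or evolving the target table. As a rough illustration, assuming the Confluent JDBC Sink connector's PostgreSQL dialect and assuming the SMT's timestamp type maps to Kafka Connect's Timestamp logical type, the supported field types would correspond approximately to the column types below. PostgresTypeSketch is a hypothetical helper, not part of this repository.

```java
import java.util.Map;

// Hypothetical lookup: declared SMT field type -> PostgreSQL column type.
// Approximates (does not reproduce) the Confluent JDBC PostgreSQL dialect mapping.
final class PostgresTypeSketch {
    private static final Map<String, String> SQL_TYPES = Map.of(
            "int64", "BIGINT",
            "int32", "INT",
            "string", "TEXT",
            "boolean", "BOOLEAN",
            "float64", "DOUBLE PRECISION",
            "float32", "REAL",
            "timestamp", "TIMESTAMP"); // assumes Connect's Timestamp logical type

    static String sqlTypeFor(String declaredType) {
        String sqlType = SQL_TYPES.get(declaredType);
        if (sqlType == null) {
            // Without a declared type there is nothing to derive a column type from.
            throw new IllegalArgumentException("Unsupported type: " + declaredType);
        }
        return sqlType;
    }
}
```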

🔧 Features

Converts a schemaless JSON Map into a Connect Struct

Dynamically builds a schema from the connector configuration

Supported types:

  • int64
  • int32
  • string
  • boolean
  • float64
  • float32
  • timestamp

No Schema Registry required
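
The sketch below shows one way the listed types could be applied to incoming values. It assumes that JsonConverter with schemas.enable=false hands the SMT a Map in which integral JSON numbers are Long and fractional ones are Double, and that Tinybird timestamps arrive as epoch milliseconds, as ISO-8601 instants, or as strings like 2024-05-01 12:30:00 interpreted as UTC. Those formats are assumptions, as are the names SmtFieldType and ValueCoercer. Kafka Connect represents its Timestamp logical type as java.util.Date, which is why timestamps are converted to Date here.

```java
import java.time.Instant;
import java.time.LocalDateTime;
import java.time.ZoneOffset;
import java.time.format.DateTimeFormatter;
import java.time.format.DateTimeParseException;
import java.util.Date;
import java.util.Locale;

// Hypothetical enum mirroring the type names accepted in the "fields" property.
enum SmtFieldType {
    INT64, INT32, STRING, BOOLEAN, FLOAT64, FLOAT32, TIMESTAMP;

    // "int64" -> INT64; unknown names raise IllegalArgumentException (from valueOf).
    static SmtFieldType fromConfig(String name) {
        return valueOf(name.trim().toUpperCase(Locale.ROOT));
    }
}

// Hypothetical coercion step: schemaless JSON value -> Java type Connect expects.
final class ValueCoercer {
    // Assumed wire format for string timestamps without a zone, read as UTC.
    private static final DateTimeFormatter SQL_STYLE = DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss");

    static Object coerce(Object raw, SmtFieldType type) {
        if (raw == null) {
            return null; // absent or null values stay null (the field must be optional)
        }
        switch (type) {
            case INT64: return ((Number) raw).longValue();
            case INT32: return Math.toIntExact(((Number) raw).longValue()); // fails on overflow
            case FLOAT64: return ((Number) raw).doubleValue();
            case FLOAT32: return ((Number) raw).floatValue();
            case BOOLEAN: return (Boolean) raw;
            case STRING: return raw.toString();
            case TIMESTAMP: return toDate(raw);
            default: throw new IllegalStateException("Unhandled type " + type);
        }
    }

    private static Date toDate(Object raw) {
        if (raw instanceof Number) {
            return new Date(((Number) raw).longValue()); // epoch milliseconds
        }
        String text = raw.toString();
        try {
            return Date.from(Instant.parse(text)); // e.g. 2024-05-01T12:30:00Z
        } catch (DateTimeParseException e) {
            return Date.from(LocalDateTime.parse(text, SQL_STYLE).toInstant(ZoneOffset.UTC));
        }
    }
}
```

Narrowing with Math.toIntExact makes an out-of-range int32 value fail loudly instead of silently wrapping around.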

📦 Installation

Build the JAR:

mvn clean package

This generates a JAR file at:

target/tinybird-smt-1.0.1.jar

Once you have the JAR file, place it in the Docker folder:

crowd.dev/scripts/scaffold/kafka-connect/tmp/custom-plugins

Update your Dockerfile to use the new version, for example by adding the following line:

COPY tmp/custom-plugins/tinybird-smt-1.0.1.jar /usr/share/java/tinybird-smt-1.0.1.jar

You can now build, tag, and push your Docker image:

docker buildx build --platform linux/amd64 -t sjc.ocir.io/axbydjxa5zuh/kafka-connect:1765363311 --push .

Once the new image is available, update the kafka-connect deployment accordingly at:

k8s/infra/kafka-connect.yaml

⚙️ Configuration Example (JDBC Sink)

# JSON input from Tinybird
value.converter=org.apache.kafka.connect.json.JsonConverter
value.converter.schemas.enable=false

# Apply Tinybird SMT
transforms=AddSchema
transforms.AddSchema.type=com.crowddev.connect.transform.TinybirdJsonToStruct

# Schema name in Kafka Connect
transforms.AddSchema.schema.name=dashboardMetricsTotalSnapshot

# Field mapping
transforms.AddSchema.fields=\
  activitiesTotal:int64,\
  activitiesLast30Days:int64,\
  organizationsTotal:int64,\
  organizationsLast30Days:int64,\
  membersTotal:int64,\
  membersLast30Days:int64,\
  updatedAt:timestamp

# Static PK for upsert
transforms.AddSchema.pk.field=id
transforms.AddSchema.pk.value=snapshot

# JDBC Sink UPSERT
insert.mode=upsert
pk.mode=record_value
pk.fields=id
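
Written as a .properties file, the fields value above spans several lines joined with trailing backslashes. The Java sketch below uses java.util.Properties to show that such a value is read back as one comma-separated string, and how it could then be split into an ordered column list with the static primary key first. FieldConfigSketch is a hypothetical name, treating the key column as a string is an assumption based on pk.value=snapshot, and this is not necessarily how the SMT itself parses its configuration.

```java
import java.io.IOException;
import java.io.StringReader;
import java.io.UncheckedIOException;
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.Objects;
import java.util.Properties;

// Hypothetical sketch: read the transform settings and derive the column order.
final class FieldConfigSketch {
    private static final String PREFIX = "transforms.AddSchema.";

    static Properties load(String text) {
        Properties props = new Properties();
        try {
            // Properties.load joins lines ending in '\' and discards the
            // leading whitespace of each continuation line.
            props.load(new StringReader(text));
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        return props;
    }

    // "a:int64,b:timestamp" -> {a=int64, b=timestamp}, preserving declaration order.
    static Map<String, String> parseFields(String spec) {
        Map<String, String> fields = new LinkedHashMap<>();
        for (String entry : spec.split(",")) {
            String[] parts = entry.trim().split(":");
            if (parts.length != 2) {
                throw new IllegalArgumentException("Expected name:type but got '" + entry + "'");
            }
            fields.put(parts[0].trim(), parts[1].trim());
        }
        return fields;
    }

    // Static primary key column first (assumed string), then the declared fields.
    static Map<String, String> columns(Properties props) {
        String pkField = Objects.requireNonNull(props.getProperty(PREFIX + "pk.field"), "pk.field is not set");
        String spec = Objects.requireNonNull(props.getProperty(PREFIX + "fields"), "fields is not set");
        Map<String, String> columns = new LinkedHashMap<>();
        columns.put(pkField, "string");
        columns.putAll(parseFields(spec));
        return columns;
    }
}
```

When the connector is registered through the Kafka Connect REST API instead, the configuration is a JSON object and fields is simply written as a single string value.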

🏗️ Development

Requirements:

  • Java 17
  • Maven

Build:

mvn clean package

Output:

target/tinybird-smt-1.0.1.jar
