A high-performance, modular ingestion pipeline for healthcare order and claim files. Built in Go, this system enables secure file uploads, schema validation, and reliable streaming to Kafka for downstream analytics and processing.
- Upload via Azure Blob Storage or SFTP
- Per-customer Avro schema validation
- Kafka publishing with rich metadata headers
- Configurable via PostgreSQL
- Structured logging and tracing (Datadog, OpenTelemetry)
- Audit trail for every file processed
- Local development with Docker Compose
```mermaid
flowchart TD
    Customer[Customer]
    Blob[Azure Blob Storage]
    EventGrid[Azure Event Grid]
    Validator[Go Validator Function]
    ConfigDB[Customer Config DB]
    SchemaRegistry[Confluent Schema Registry]
    Kafka[Kafka]
    Downstream[Downstream Services]
    Audit[Audit Logs / Metrics]

    Customer --> Blob
    Blob --> EventGrid
    EventGrid --> Validator
    Validator --> Blob
    Validator --> ConfigDB
    Validator --> SchemaRegistry
    Validator --> Kafka
    Validator --> Audit
    Kafka --> Downstream
```
- Azure Blob Storage: Each customer uploads files to their own container.
- Event Grid: Triggers the Go validator function on new file uploads.
- Go Validator Function: Downloads, validates, and streams records to Kafka.
- Confluent Schema Registry: Stores and serves Avro schemas.
- Kafka: Decouples ingestion from downstream consumers.
- PostgreSQL: Stores customer configs and audit logs.
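In rough terms, the validator's flow can be pictured as the sketch below. The interface and function names here (`BlobClient`, `ConfigStore`, `SchemaRegistry`, `Publisher`, `processFile`) are illustrative placeholders, not the identifiers used in `internal/services` or `internal/adapters`:

```go
// Illustrative sketch only; names are placeholders, not the repo's identifiers.
package sketch

import (
	"context"
	"fmt"
)

type BlobClient interface {
	Download(ctx context.Context, container, name string) ([]byte, error)
}

type ConfigStore interface {
	// SchemaSubjectFor returns the schema registry subject configured for a customer.
	SchemaSubjectFor(ctx context.Context, customerID string) (string, error)
}

type SchemaRegistry interface {
	// Latest returns the latest schema version and definition for a subject.
	Latest(ctx context.Context, subject string) (version int, schema string, err error)
}

type Publisher interface {
	Publish(ctx context.Context, topic string, value []byte, headers map[string]string) error
}

// processFile mirrors the pipeline end to end: download the blob, load the
// customer's config, fetch the Avro schema, validate and encode the records,
// then publish to Kafka with metadata headers.
func processFile(ctx context.Context, blob BlobClient, cfg ConfigStore, reg SchemaRegistry, pub Publisher,
	customerID, container, name string) error {

	raw, err := blob.Download(ctx, container, name)
	if err != nil {
		return fmt.Errorf("download %s/%s: %w", container, name, err)
	}

	subject, err := cfg.SchemaSubjectFor(ctx, customerID)
	if err != nil {
		return fmt.Errorf("load config for %s: %w", customerID, err)
	}

	version, _, err := reg.Latest(ctx, subject)
	if err != nil {
		return fmt.Errorf("fetch schema %s: %w", subject, err)
	}

	// CSV parsing and per-record Avro encoding elided in this sketch.
	_ = raw

	headers := map[string]string{
		"x-customer-id":    customerID,
		"x-schema-version": fmt.Sprint(version),
	}
	return pub.Publish(ctx, "orders", nil, headers) // "orders" is a placeholder topic
}
```

Keeping each dependency behind a small interface is presumably what lets the adapters in `internal/adapters` be swapped for Azurite, a local schema registry, and a local Kafka broker during development.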
- Docker & Docker Compose
- Azure CLI (for local blob upload)
- Go 1.24+ (for local builds)
- Clone the repo
- Start all services:

  ```sh
  make build
  make start
  ```

  This builds the validator, starts all containers, uploads a sample file, and prints logs.
- Run the tests:

  ```sh
  make test-small
  # or
  make test-large
  # or
  make test-all
  ```

- The consumer runs automatically and logs each message to the console after deserialization. Its source lives at `cmd/consumer/main.go`; the sketch below gives a rough picture of what it does.
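A minimal consumer that reads messages and logs their headers might look like this (the `confluent-kafka-go` client, topic name, and group ID are assumptions; Avro deserialization is covered separately in the notes further down):

```go
package main

import (
	"fmt"
	"log"

	"github.com/confluentinc/confluent-kafka-go/v2/kafka"
)

func main() {
	// Broker address comes from KAFKA_BOOTSTRAP_SERVERS in docker-compose.yml.
	c, err := kafka.NewConsumer(&kafka.ConfigMap{
		"bootstrap.servers": "localhost:9092",
		"group.id":          "local-debug-consumer", // placeholder group ID
		"auto.offset.reset": "earliest",
	})
	if err != nil {
		log.Fatalf("create consumer: %v", err)
	}
	defer c.Close()

	// "orders" is a placeholder; use whichever topic the validator publishes to.
	if err := c.SubscribeTopics([]string{"orders"}, nil); err != nil {
		log.Fatalf("subscribe: %v", err)
	}

	for {
		msg, err := c.ReadMessage(-1) // block until a message arrives
		if err != nil {
			log.Printf("read error: %v", err)
			continue
		}
		// Headers such as x-customer-id and x-schema-version travel with each record.
		for _, h := range msg.Headers {
			fmt.Printf("%s=%s ", h.Key, string(h.Value))
		}
		fmt.Printf("offset=%v value=%d bytes\n", msg.TopicPartition.Offset, len(msg.Value))
	}
}
```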
- `cmd/function/`: Main entrypoint for the validator service
- `cmd/consumer/`: Kafka Avro consumer for testing
- `internal/services/`: Core validation, conversion, and processing logic
- `internal/adapters/`: Integrations for storage, Kafka, schema registry, config
- `internal/domain/`: Domain models and error types
- `test-files/`: Test scripts, sample data, and schema registration
- `schemas/`: Avro schema definitions
Set environment variables (see `docker-compose.yml` for examples):

- `AzureWebJobsStorage`
- `DB_CONNECTION_STRING`
- `KAFKA_BOOTSTRAP_SERVERS`
- `ENVIRONMENT` (set to `local` for local dev)
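As an illustration of how these settings might be read at startup (the `mustEnv` helper is hypothetical, not a function from this repo):

```go
package main

import (
	"log"
	"os"
)

// mustEnv aborts startup when a required setting is missing.
func mustEnv(key string) string {
	v := os.Getenv(key)
	if v == "" {
		log.Fatalf("missing required environment variable %s", key)
	}
	return v
}

func main() {
	storage := mustEnv("AzureWebJobsStorage")
	dbConn := mustEnv("DB_CONNECTION_STRING")
	brokers := mustEnv("KAFKA_BOOTSTRAP_SERVERS")
	env := os.Getenv("ENVIRONMENT") // "local" for local dev against Azurite
	log.Printf("env=%s brokers=%s storage set=%t db set=%t", env, brokers, storage != "", dbConn != "")
}
```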
- The system expects CSV files to match the schema and validation profile for each customer.
- All Avro messages use the Confluent wire format (decoded in the sketch below).
- For local development, Azurite is used to emulate Azure Blob Storage.
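For reference, the Confluent wire format prefixes each Avro payload with a zero magic byte and a 4-byte big-endian schema ID. A decoding sketch, assuming the `riferrei/srclient` and `linkedin/goavro` libraries (the repo may use different clients):

```go
package wireformat

import (
	"encoding/binary"
	"fmt"

	"github.com/linkedin/goavro/v2"
	"github.com/riferrei/srclient"
)

// Decode unwraps a Confluent-framed Avro message: byte 0 is the magic byte,
// bytes 1-4 hold the schema ID (big-endian), and the rest is the Avro record.
func Decode(client *srclient.SchemaRegistryClient, payload []byte) (interface{}, error) {
	if len(payload) < 5 || payload[0] != 0 {
		return nil, fmt.Errorf("not a Confluent-framed message")
	}
	schemaID := binary.BigEndian.Uint32(payload[1:5])

	schema, err := client.GetSchema(int(schemaID))
	if err != nil {
		return nil, fmt.Errorf("fetch schema %d: %w", schemaID, err)
	}

	codec, err := goavro.NewCodec(schema.Schema())
	if err != nil {
		return nil, err
	}
	native, _, err := codec.NativeFromBinary(payload[5:])
	return native, err
}
```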
- Customer uploads a CSV to their Azure Blob container.
- Event Grid triggers the Go validator.
- Validator:
  - Loads customer config and Avro schema
  - Validates and encodes each record
  - Publishes to Kafka with headers (`x-customer-id`, `x-schema-version`, etc.); see the producer sketch after this list
  - Logs audit info to Postgres
- Downstream consumers read from Kafka topics.
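A producer-side sketch of that publish step, assuming `confluent-kafka-go` (the topic, header values, and payload variable are placeholders):

```go
package main

import (
	"log"

	"github.com/confluentinc/confluent-kafka-go/v2/kafka"
)

// avroPayload stands in for a Confluent-framed Avro record; real code builds it per CSV row.
var avroPayload = []byte{}

func main() {
	p, err := kafka.NewProducer(&kafka.ConfigMap{"bootstrap.servers": "localhost:9092"})
	if err != nil {
		log.Fatalf("create producer: %v", err)
	}
	defer p.Close()

	topic := "orders" // placeholder topic name
	msg := &kafka.Message{
		TopicPartition: kafka.TopicPartition{Topic: &topic, Partition: kafka.PartitionAny},
		Value:          avroPayload,
		Headers: []kafka.Header{
			{Key: "x-customer-id", Value: []byte("acme-health")},
			{Key: "x-schema-version", Value: []byte("3")},
		},
	}

	deliveries := make(chan kafka.Event, 1)
	if err := p.Produce(msg, deliveries); err != nil {
		log.Fatalf("produce: %v", err)
	}
	e := <-deliveries // wait for the broker acknowledgement
	if m, ok := e.(*kafka.Message); ok && m.TopicPartition.Error != nil {
		log.Fatalf("delivery failed: %v", m.TopicPartition.Error)
	}
}
```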
- Add new file types or schemas by updating the config DB and schema registry (see the registration sketch after this list).
- Add new downstream consumers by subscribing to Kafka topics.
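For instance, registering a new Avro schema version could look like the following sketch (assuming the `riferrei/srclient` client; the subject name and schema file are placeholders, and the repo's own registration scripts live under `test-files/`):

```go
package main

import (
	"log"
	"os"

	"github.com/riferrei/srclient"
)

func main() {
	// Registry URL is a placeholder for the local docker-compose setup.
	client := srclient.CreateSchemaRegistryClient("http://localhost:8081")

	// Subject name and schema file are placeholders for a new file type.
	schemaBytes, err := os.ReadFile("schemas/claims.avsc")
	if err != nil {
		log.Fatalf("read schema: %v", err)
	}

	schema, err := client.CreateSchema("claims-value", string(schemaBytes), srclient.Avro)
	if err != nil {
		log.Fatalf("register schema: %v", err)
	}
	log.Printf("registered schema id=%d version=%d", schema.ID(), schema.Version())
}
```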