Multi-DB sink with enableDbTopicSplit fails to insert into existing tables #580

@monometa

Description

Describe the bug

When using enableDbTopicSplit=true with dbTopicSplitChar="__", the connector fails to insert into existing tables if more than one tenant/topic is configured.

Each tenant has its own schema (database) in ClickHouse Cloud, containing an identically defined table named telemetry.
Example: tenant_A.telemetry and tenant_B.telemetry.

With a single topic/tenant, everything works. Once a second tenant is added, the connector either crashes (suppressTableExistenceException=false) or silently drops records (suppressTableExistenceException=true), even though both tables exist and are queryable.

If the global database setting is tenant_A, the connector fails on tenant_B, reporting that the table does not exist.
If the global database setting is tenant_B, it fails on tenant_A.
So only one tenant works at a time, never both together.

Steps to reproduce

  1. Create two databases in ClickHouse: tenant_A and tenant_B.
  2. In both databases create a table telemetry, e.g.:
    CREATE TABLE tenant_A.telemetry
    (
        event_id String,
        event_date String
    )
    ENGINE = MergeTree
    ORDER BY event_date;
    
    CREATE TABLE tenant_B.telemetry
    (
        event_id String,
        event_date String
    )
    ENGINE = MergeTree
    ORDER BY event_date;
  3. Configure the connector with enableDbTopicSplit=true, dbTopicSplitChar="__", and topics:
    tenant_A__telemetry, tenant_B__telemetry
    
  4. Produce messages to both topics.
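To extend the reproduction to more tenants, the per-tenant DDL and the topics value can be generated instead of written by hand. This is a minimal sketch assuming the <database>__<table> naming used above; the helper names (create_database_ddl, create_table_ddl, topics_value) are hypothetical and not part of the connector.

```python
# Hypothetical helpers that generate the reproduction setup for any number of
# tenants; nothing here is part of ClickHouse Kafka Connect itself.
SPLIT_CHAR = "__"   # same value as dbTopicSplitChar in the connector config
TABLE = "telemetry"


def create_database_ddl(database: str) -> str:
    """Step 1: one database per tenant."""
    return f"CREATE DATABASE IF NOT EXISTS {database};"


def create_table_ddl(database: str, table: str = TABLE) -> str:
    """Step 2: the identical telemetry table inside each tenant database."""
    return (
        f"CREATE TABLE {database}.{table}\n"
        "(\n"
        "    event_id String,\n"
        "    event_date String\n"
        ")\n"
        "ENGINE = MergeTree\n"
        "ORDER BY event_date;"
    )


def topics_value(databases: list[str], table: str = TABLE) -> str:
    """Step 3: comma-separated 'topics' value, one <database>__<table> topic per tenant."""
    return ",".join(f"{db}{SPLIT_CHAR}{table}" for db in databases)


if __name__ == "__main__":
    tenants = ["tenant_A", "tenant_B"]
    for db in tenants:
        print(create_database_ddl(db))
        print(create_table_ddl(db))
    print(topics_value(tenants))  # tenant_A__telemetry,tenant_B__telemetry
```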

Expected behaviour

The connector should recognize tenant_A__telemetry as db=tenant_A, table=telemetry and
tenant_B__telemetry as db=tenant_B, table=telemetry.
Records should be inserted into the respective tables without errors or silent drops.
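The expected mapping can be illustrated with a small sketch. It assumes dbTopicSplitChar occurs exactly once in each topic name; the functions split_topic and qualified_name are hypothetical and are not taken from the connector's source.

```python
# Hypothetical sketch of the expected topic -> (database, table) mapping when
# enableDbTopicSplit=true; not taken from the connector's source code.

def split_topic(topic: str, split_char: str = "__") -> tuple[str, str]:
    """Split '<database><split_char><table>' into (database, table).

    Assumes split_char occurs exactly once, so the split is unambiguous.
    """
    parts = topic.split(split_char)
    if len(parts) != 2 or not all(parts):
        raise ValueError(f"{topic!r} is not of the form <database>{split_char}<table>")
    return parts[0], parts[1]


def qualified_name(topic: str, split_char: str = "__") -> str:
    """Render the target as `db`.`table`, the same form used in the error log."""
    database, table = split_topic(topic, split_char)
    return f"`{database}`.`{table}`"


if __name__ == "__main__":
    for topic in ["tenant_A__telemetry", "tenant_B__telemetry"]:
        print(topic, "->", qualified_name(topic))
    # tenant_A__telemetry -> `tenant_A`.`telemetry`
    # tenant_B__telemetry -> `tenant_B`.`telemetry`
```

In this expected mapping, each record's database is derived from its own topic name, independently of the global database setting.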

Error log

With suppressTableExistenceException=false:

Caused by: java.lang.RuntimeException: Table `tenant_A`.`telemetry` does not exist
...
Caused by: java.lang.RuntimeException: Table `tenant_B`.`telemetry` does not exist

With suppressTableExistenceException=true:

WARN Table [`tenant_A`.`telemetry`] does not exist, but error was suppressed.
INFO Put records: 2

But no records appear in the table.

Configuration

Environment

  • Kafka-Connect version: 1.3.1 (ClickHouse Kafka Connect)
  • Kafka Connect configuration:
    {
      "connector.class": "com.clickhouse.kafka.connect.ClickHouseSinkConnector",
      "hostname": "<clickhouse-cloud-host>",
      "port": "8443",
      "ssl": "true",
      "username": "default",
      "password": "*****",
      "database": "tenant_A",
      "enableDbTopicSplit": "true",
      "dbTopicSplitChar": "__",
      "topics": "tenant_A__telemetry,tenant_B__telemetry",
      "tasks.max": "1",
      "value.converter": "org.apache.kafka.connect.json.JsonConverter",
      "value.converter.schemas.enable": "false",
      "key.converter": "org.apache.kafka.connect.storage.StringConverter"
    }
  • Kafka version: AWS MSK (MSK IAM authentication)
  • Kafka environment: AWS MSK + MSK Connect (3.7.x)
  • OS: AWS MSK managed connector worker environment
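The description above says the tenant that does not match the global database setting is the one that fails. The following minimal sketch applies that observation to the relevant subset of the configuration, assuming the <database>__<table> topic naming. It only predicts which topics are reported to fail and does not inspect the connector; topics_outside_default_db is a hypothetical helper.

```python
import json

# Relevant subset of the connector configuration from this report.
CONFIG = json.loads("""
{
  "database": "tenant_A",
  "enableDbTopicSplit": "true",
  "dbTopicSplitChar": "__",
  "topics": "tenant_A__telemetry,tenant_B__telemetry"
}
""")


def topics_outside_default_db(config: dict) -> list[str]:
    """Return topics whose database prefix differs from the global 'database' setting."""
    if config.get("enableDbTopicSplit", "false").lower() != "true":
        return []
    split_char = config["dbTopicSplitChar"]
    default_db = config["database"]
    flagged = []
    for topic in (t.strip() for t in config["topics"].split(",")):
        database = topic.split(split_char, 1)[0]  # prefix before the first split char
        if database != default_db:
            flagged.append(topic)
    return flagged


if __name__ == "__main__":
    print(topics_outside_default_db(CONFIG))  # ['tenant_B__telemetry']
```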

ClickHouse server

  • ClickHouse Server version: 25.6.2.5971 (ClickHouse Cloud)
  • ClickHouse Server non-default settings, if any: none
  • CREATE TABLE statements for tables involved: (see above in Steps to reproduce)
  • Sample data for these tables:
    {"event_id": "12345", "event_date": "2023-10-01"}
    {"event_id": "67890", "event_date": "2023-10-02"}
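With value.converter.schemas.enable set to false, JsonConverter passes each value to the sink as schemaless JSON, so the field names in the sample data have to line up with the telemetry columns. The sketch below checks the two sample records against the columns from the CREATE TABLE statements (both String). It is an illustrative check under that assumption, not something the connector runs, and matches_telemetry is a hypothetical name.

```python
import json

# Columns of <tenant>.telemetry from the CREATE TABLE statements (both String).
TELEMETRY_COLUMNS = {"event_id": str, "event_date": str}

SAMPLE_RECORDS = [
    '{"event_id": "12345", "event_date": "2023-10-01"}',
    '{"event_id": "67890", "event_date": "2023-10-02"}',
]


def matches_telemetry(raw: str) -> bool:
    """True if the JSON object has exactly the telemetry columns with string values."""
    record = json.loads(raw)
    return (
        isinstance(record, dict)
        and set(record) == set(TELEMETRY_COLUMNS)
        and all(isinstance(record[name], typ) for name, typ in TELEMETRY_COLUMNS.items())
    )


if __name__ == "__main__":
    for raw in SAMPLE_RECORDS:
        print(matches_telemetry(raw), raw)
```

Both sample records pass this check.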
    

Metadata

  • Assignees: No one assigned
  • Labels: bug (Something isn't working)
  • Type: No type
  • Projects: No projects
  • Milestone: No milestone
  • Relationships: None yet
  • Development: No branches or pull requests
