chernser
Contributor

Summary

  • Fixes grouping by database when DB topic split is enabled
  • Fixes RecordConverter so that it records the correct database
  • Adds an integration test for the feature

Closes: #580

Checklist

Delete items not relevant to your PR:

  • Unit and integration tests covering the common scenarios were added
  • A human-readable description of the changes was provided to include in CHANGELOG
  • For significant changes, documentation in https://github.com/ClickHouse/clickhouse-docs was updated with further explanations or tutorials

@chernser chernser requested review from mzitnik and Copilot September 12, 2025 18:45
Contributor

@Copilot Copilot AI left a comment

Pull Request Overview

This PR fixes grouping by database when database topic splitting is enabled and ensures the correct database is recorded in the RecordConverter.

  • Fixes grouping logic in ProxySinkTask to group by database instead of topic when database topic splitting is enabled
  • Adds helper methods for creating/dropping databases and counting rows with specific database names
  • Includes an integration test to verify the database topic splitting functionality
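
To illustrate the grouping change described above, here is a minimal sketch, not the actual ProxySinkTask code. The class `DbTopicGrouping`, the `Rec` type, and the fallback to the full topic name when the separator is absent are all assumptions made for illustration.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.regex.Pattern;

// Hypothetical sketch: none of these names come from the connector's code base.
public class DbTopicGrouping {

    /** Simplified stand-in for a sink record: just a topic and a payload. */
    public static final class Rec {
        public final String topic;
        public final String value;

        public Rec(String topic, String value) {
            this.topic = topic;
            this.value = value;
        }
    }

    /** Returns the database prefix when splitting is enabled, else the topic itself. */
    public static String groupKey(String topic, boolean splitEnabled, String separator) {
        if (!splitEnabled) {
            return topic;
        }
        // Limit of 2 keeps everything after the first separator in the second part.
        String[] parts = topic.split(Pattern.quote(separator), 2);
        // Assumption: a topic without the separator falls back to the topic name.
        return parts.length == 2 ? parts[0] : topic;
    }

    /** Groups records by the key above, preserving first-seen key order. */
    public static Map<String, List<Rec>> group(List<Rec> records,
                                               boolean splitEnabled,
                                               String separator) {
        Map<String, List<Rec>> groups = new LinkedHashMap<>();
        for (Rec r : records) {
            String key = groupKey(r.topic, splitEnabled, separator);
            groups.computeIfAbsent(key, k -> new ArrayList<>()).add(r);
        }
        return groups;
    }
}
```

With splitting enabled and `.` as the separator, records from `tenant_a.events` and `tenant_a.users` land in a single group keyed `tenant_a`; with splitting disabled, each topic keeps its own group.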

Reviewed Changes

Copilot reviewed 5 out of 5 changed files in this pull request and generated 4 comments.

| File | Description |
| --- | --- |
| ClickHouseTestHelpers.java | Adds database creation/dropping utilities and proper resource cleanup for queries |
| RecordConvertorTest.java | Adds unit test for database topic splitting functionality |
| ClickHouseSinkTaskWithSchemaTest.java | Adds integration test for database topic splitting across multiple tenants |
| ClickHouseHelperClient.java | Updates table extraction methods to use correct database parameter |
| ProxySinkTask.java | Fixes grouping logic to use database as key when database topic splitting is enabled |
Comments suppressed due to low confidence (1)

src/main/java/com/clickhouse/kafka/connect/sink/db/helper/ClickHouseHelperClient.java:422

  • Missing proper indentation. The closing brace should be properly aligned with the rest of the code block.
                    table.addColumn(column);                }

import com.clickhouse.kafka.connect.sink.db.mapping.Type;
import com.google.gson.Gson;
import com.google.gson.reflect.TypeToken;
import jdk.dynalink.Operation;

Copilot AI Sep 12, 2025

This import is unused and appears to be accidentally added. The jdk.dynalink.Operation class is not used anywhere in this file.

Suggested change
import jdk.dynalink.Operation;

import com.clickhouse.client.api.query.QuerySettings;
import com.clickhouse.client.api.query.Records;
import com.clickhouse.data.ClickHouseFormat;
import com.clickhouse.kafka.connect.sink.ClickHouseSinkConfig;

Copilot AI Sep 12, 2025

This import is unused. The ClickHouseSinkConfig class is not referenced anywhere in this file.

Suggested change
import com.clickhouse.kafka.connect.sink.ClickHouseSinkConfig;

String[] parts = topic.split(Pattern.quote(dbTopicSeparatorChar));
String actualDatabase = parts[0];
String actualTopic = parts[1];
System.out.println("actual_topic: " + actualTopic);

Copilot AI Sep 12, 2025

Using System.out.println for debugging in tests is not recommended. Consider using a proper logger or removing this debug statement entirely.

Suggested change
System.out.println("actual_topic: " + actualTopic);
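
A related point on the snippet under review: it indexes `parts[1]` directly, which throws `ArrayIndexOutOfBoundsException` when the topic does not contain the separator. The sketch below shows one way to split with validation. The `TopicParts` class and its error message are hypothetical and not part of the connector.

```java
import java.util.regex.Pattern;

// Hypothetical helper for illustration; not taken from the connector.
public class TopicParts {
    public final String database;
    public final String table;

    private TopicParts(String database, String table) {
        this.database = database;
        this.table = table;
    }

    /**
     * Splits "database<separator>table" at the first separator only.
     * Rejects topics with a missing separator or an empty half.
     */
    public static TopicParts parse(String topic, String separator) {
        // Pattern.quote treats the separator literally; limit 2 stops after the first match.
        String[] parts = topic.split(Pattern.quote(separator), 2);
        if (parts.length != 2 || parts[0].isEmpty() || parts[1].isEmpty()) {
            throw new IllegalArgumentException(
                "Topic '" + topic + "' is not of the form <database>" + separator + "<table>");
        }
        return new TopicParts(parts[0], parts[1]);
    }
}
```

Failing with a descriptive `IllegalArgumentException` makes a malformed topic name easier to diagnose than an index error, and removes the need for a debug print.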

}
chst.stop();
// after the second insert we have exactly sr.size() records
assertEquals(sr.size(), ClickHouseTestHelpers.countRows(chc, topic));assertTrue(ClickHouseTestHelpers.validateRows(chc, topic, sr));

Copilot AI Sep 12, 2025

Missing line break between the two assertions. Both statements sit on a single line; they should be placed on separate lines for proper formatting and readability.

Suggested change
assertEquals(sr.size(), ClickHouseTestHelpers.countRows(chc, topic));assertTrue(ClickHouseTestHelpers.validateRows(chc, topic, sr));
assertEquals(sr.size(), ClickHouseTestHelpers.countRows(chc, topic));
assertTrue(ClickHouseTestHelpers.validateRows(chc, topic, sr));

@chernser chernser marked this pull request as draft September 13, 2025 05:46
Successfully merging this pull request may close these issues.

Multi-DB sink with enableDbTopicSplit fails to insert into existing tables