-
Notifications
You must be signed in to change notification settings - Fork 51
fixed splitting records by database from topic name #588
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
base: main
Are you sure you want to change the base?
Conversation
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Pull Request Overview
This PR fixes grouping by database when database topic splitting is enabled and ensures the correct database is recorded in the RecordConverter.
- Fixes grouping logic in ProxySinkTask to group by database instead of topic when database topic splitting is enabled
- Adds helper methods for creating/dropping databases and counting rows with specific database names
- Includes an integration test to verify the database topic splitting functionality
Reviewed Changes
Copilot reviewed 5 out of 5 changed files in this pull request and generated 4 comments.
Show a summary per file
File | Description |
---|---|
ClickHouseTestHelpers.java | Adds database creation/dropping utilities and proper resource cleanup for queries |
RecordConvertorTest.java | Adds unit test for database topic splitting functionality |
ClickHouseSinkTaskWithSchemaTest.java | Adds integration test for database topic splitting across multiple tenants |
ClickHouseHelperClient.java | Updates table extraction methods to use correct database parameter |
ProxySinkTask.java | Fixes grouping logic to use database as key when database topic splitting is enabled |
Comments suppressed due to low confidence (1)
src/main/java/com/clickhouse/kafka/connect/sink/db/helper/ClickHouseHelperClient.java:422
- Missing proper indentation. The closing brace should be properly aligned with the rest of the code block.
table.addColumn(column); }
Tip: Customize your code reviews with copilot-instructions.md. Create the file or learn how to get started.
import com.clickhouse.kafka.connect.sink.db.mapping.Type; | ||
import com.google.gson.Gson; | ||
import com.google.gson.reflect.TypeToken; | ||
import jdk.dynalink.Operation; |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This import is unused and appears to be accidentally added. The jdk.dynalink.Operation
class is not used anywhere in this file.
import jdk.dynalink.Operation; |
Copilot uses AI. Check for mistakes.
import com.clickhouse.client.api.query.QuerySettings; | ||
import com.clickhouse.client.api.query.Records; | ||
import com.clickhouse.data.ClickHouseFormat; | ||
import com.clickhouse.kafka.connect.sink.ClickHouseSinkConfig; |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This import is unused. The ClickHouseSinkConfig
class is not referenced anywhere in this file.
import com.clickhouse.kafka.connect.sink.ClickHouseSinkConfig; |
Copilot uses AI. Check for mistakes.
String[] parts = topic.split(Pattern.quote(dbTopicSeparatorChar)); | ||
String actualDatabase = parts[0]; | ||
String actualTopic = parts[1]; | ||
System.out.println("actual_topic: " + actualTopic); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Using System.out.println
for debugging in tests is not recommended. Consider using a proper logger or removing this debug statement entirely.
System.out.println("actual_topic: " + actualTopic); |
Copilot uses AI. Check for mistakes.
} | ||
chst.stop(); | ||
// after the second insert we have exactly sr.size() records | ||
assertEquals(sr.size(), ClickHouseTestHelpers.countRows(chc, topic));assertTrue(ClickHouseTestHelpers.validateRows(chc, topic, sr)); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Missing semicolon or line break between the two assertions. This should be split into two separate statements for proper formatting and readability.
assertEquals(sr.size(), ClickHouseTestHelpers.countRows(chc, topic));assertTrue(ClickHouseTestHelpers.validateRows(chc, topic, sr)); | |
assertEquals(sr.size(), ClickHouseTestHelpers.countRows(chc, topic)); | |
assertTrue(ClickHouseTestHelpers.validateRows(chc, topic, sr)); |
Copilot uses AI. Check for mistakes.
Summary
Closes: #580
Checklist
Delete items not relevant to your PR: