Skip to content

Commit c9b762c

Browse files
committed
CoalescingMergeTree
1 parent 1e94bcd commit c9b762c

File tree

5 files changed

+56
-18
lines changed

5 files changed

+56
-18
lines changed

flow/connectors/clickhouse/normalize.go

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -118,6 +118,16 @@ func (c *ClickHouseConnector) generateCreateTableSQLForNormalizedTable(
118118
} else {
119119
engine = "MergeTree()"
120120
}
121+
case protos.TableEngine_CH_ENGINE_COALESCING_MERGE_TREE:
122+
if c.config.Replicated {
123+
engine = fmt.Sprintf(
124+
"ReplicatedCoalescingMergeTree('%s%s','{replica}')",
125+
zooPathPrefix,
126+
peerdb_clickhouse.EscapeStr(tableIdentifier),
127+
)
128+
} else {
129+
engine = "CoalescingMergeTree()"
130+
}
121131
case protos.TableEngine_CH_ENGINE_NULL:
122132
engine = "Null"
123133
}

flow/e2e/clickhouse/peer_flow_ch_test.go

Lines changed: 38 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -2102,6 +2102,44 @@ func (s ClickHouseSuite) Test_NullEngine() {
21022102
e2e.RequireEnvCanceled(s.t, env)
21032103
}
21042104

2105+
func (s ClickHouseSuite) Test_CoalescingEngine() {
2106+
if _, ok := s.source.(*e2e.PostgresSource); !ok {
2107+
s.t.Skip("relies on random_string UDF")
2108+
}
2109+
2110+
srcTableName := "test_coalescing"
2111+
srcFullName := s.attachSchemaSuffix(srcTableName)
2112+
dstTableName := "test_coalescing"
2113+
2114+
require.NoError(s.t, s.source.Exec(s.t.Context(),
2115+
fmt.Sprintf(`CREATE TABLE IF NOT EXISTS %s (id INT PRIMARY KEY, num INT, val TEXT)`, srcFullName)))
2116+
2117+
connectionGen := e2e.FlowConnectionGenerationConfig{
2118+
FlowJobName: s.attachSuffix("clickhouse_nullengine"),
2119+
TableMappings: []*protos.TableMapping{{
2120+
SourceTableIdentifier: srcFullName,
2121+
DestinationTableIdentifier: dstTableName,
2122+
Engine: protos.TableEngine_CH_ENGINE_COALESCING_MERGE_TREE,
2123+
ShardingKey: "id",
2124+
}},
2125+
Destination: s.Peer().Name,
2126+
}
2127+
flowConnConfig := connectionGen.GenerateFlowConnectionConfigs(s)
2128+
tc := e2e.NewTemporalClient(s.t)
2129+
env := e2e.ExecutePeerflow(s.t.Context(), tc, peerflow.CDCFlowWorkflow, flowConnConfig, nil)
2130+
e2e.SetupCDCFlowStatusQuery(s.t, env, flowConnConfig)
2131+
2132+
// test toast
2133+
require.NoError(s.t, s.source.Exec(s.t.Context(),
2134+
fmt.Sprintf(`INSERT INTO %s (id,num,val) VALUES (0,0,random_string(9000))`, srcFullName)))
2135+
e2e.EnvWaitForEqualTablesWithNames(env, s, "insert", srcTableName, dstTableName, "id,num,val")
2136+
require.NoError(s.t, s.source.Exec(s.t.Context(), fmt.Sprintf(`UPDATE %s SET num = 1`, srcFullName)))
2137+
e2e.EnvWaitForEqualTablesWithNames(env, s, "update", srcTableName, dstTableName, "id,num,val")
2138+
2139+
env.Cancel(s.t.Context())
2140+
e2e.RequireEnvCanceled(s.t, env)
2141+
}
2142+
21052143
func (s ClickHouseSuite) Test_Partition_Key_Integer() {
21062144
srcTableName := "test_partition_key_integer"
21072145
srcFullName := s.attachSchemaSuffix(srcTableName)

flow/e2e/postgres/peer_flow_pg_test.go

Lines changed: 6 additions & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -304,7 +304,6 @@ func (s PeerFlowE2ETestSuitePG) Test_Composite_PKey_Toast_1_PG() {
304304
tc := e2e.NewTemporalClient(s.t)
305305

306306
srcTableName := s.attachSchemaSuffix("test_cpkey_toast1")
307-
randomString := s.attachSchemaSuffix("random_string")
308307
dstTableName := s.attachSchemaSuffix("test_cpkey_toast1_dst")
309308

310309
_, err := s.Conn().Exec(s.t.Context(), fmt.Sprintf(`
@@ -315,11 +314,7 @@ func (s PeerFlowE2ETestSuitePG) Test_Composite_PKey_Toast_1_PG() {
315314
t TEXT,
316315
t2 TEXT,
317316
PRIMARY KEY(id,t)
318-
);CREATE OR REPLACE FUNCTION %s( int ) RETURNS TEXT as $$
319-
SELECT string_agg(substring('0123456789bcdfghjkmnpqrstvwxyz',
320-
round(random() * 30)::integer, 1), '') FROM generate_series(1, $1);
321-
$$ language sql;
322-
`, srcTableName, randomString))
317+
);`, srcTableName))
323318
require.NoError(s.t, err)
324319

325320
connectionGen := e2e.FlowConnectionGenerationConfig{
@@ -341,9 +336,8 @@ func (s PeerFlowE2ETestSuitePG) Test_Composite_PKey_Toast_1_PG() {
341336
// insert 10 rows into the source table
342337
for i := range 10 {
343338
testValue := fmt.Sprintf("test_value_%d", i)
344-
_, err = rowsTx.Exec(s.t.Context(), fmt.Sprintf(`
345-
INSERT INTO %s(c2,t,t2) VALUES ($1,$2,%s(9000))
346-
`, srcTableName, randomString), i, testValue)
339+
_, err = rowsTx.Exec(s.t.Context(), fmt.Sprintf(
340+
`INSERT INTO %s(c2,t,t2) VALUES ($1,$2,random_string(9000))`, srcTableName), i, testValue)
347341
e2e.EnvNoError(s.t, env, err)
348342
}
349343
s.t.Log("Inserted 10 rows into the source table")
@@ -368,7 +362,6 @@ func (s PeerFlowE2ETestSuitePG) Test_Composite_PKey_Toast_2_PG() {
368362
tc := e2e.NewTemporalClient(s.t)
369363

370364
srcTableName := s.attachSchemaSuffix("test_cpkey_toast2")
371-
randomString := s.attachSchemaSuffix("random_string")
372365
dstTableName := s.attachSchemaSuffix("test_cpkey_toast2_dst")
373366

374367
_, err := s.Conn().Exec(s.t.Context(), fmt.Sprintf(`
@@ -379,11 +372,7 @@ func (s PeerFlowE2ETestSuitePG) Test_Composite_PKey_Toast_2_PG() {
379372
t TEXT,
380373
t2 TEXT,
381374
PRIMARY KEY(id,t)
382-
);CREATE OR REPLACE FUNCTION %s( int ) RETURNS TEXT as $$
383-
SELECT string_agg(substring('0123456789bcdfghjkmnpqrstvwxyz',
384-
round(random() * 30)::integer, 1), '') FROM generate_series(1, $1);
385-
$$ language sql;
386-
`, srcTableName, randomString))
375+
);`, srcTableName))
387376
require.NoError(s.t, err)
388377

389378
connectionGen := e2e.FlowConnectionGenerationConfig{
@@ -403,9 +392,8 @@ func (s PeerFlowE2ETestSuitePG) Test_Composite_PKey_Toast_2_PG() {
403392
// insert 10 rows into the source table
404393
for i := range 10 {
405394
testValue := fmt.Sprintf("test_value_%d", i)
406-
_, err = s.Conn().Exec(s.t.Context(), fmt.Sprintf(`
407-
INSERT INTO %s(c2,t,t2) VALUES ($1,$2,%s(9000))
408-
`, srcTableName, randomString), i, testValue)
395+
_, err = s.Conn().Exec(s.t.Context(), fmt.Sprintf(
396+
`INSERT INTO %s(c2,t,t2) VALUES ($1,$2,random_string(9000))`, srcTableName), i, testValue)
409397
e2e.EnvNoError(s.t, env, err)
410398
}
411399
s.t.Log("Inserted 10 rows into the source table")

protos/flow.proto

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -263,6 +263,7 @@ enum TableEngine {
263263
CH_ENGINE_NULL = 2;
264264
CH_ENGINE_REPLICATED_REPLACING_MERGE_TREE = 3;
265265
CH_ENGINE_REPLICATED_MERGE_TREE = 4;
266+
CH_ENGINE_COALESCING_MERGE_TREE = 5;
266267
}
267268

268269
// protos for qrep

ui/app/mirrors/create/cdc/schemabox.tsx

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -257,6 +257,7 @@ export default function SchemaBox({
257257
const engineOptions = [
258258
{ value: 'CH_ENGINE_REPLACING_MERGE_TREE', label: 'ReplacingMergeTree' },
259259
{ value: 'CH_ENGINE_MERGE_TREE', label: 'MergeTree' },
260+
{ value: 'CH_ENGINE_COALESCING_MERGE_TREE', label: 'CoalescingMergeTree' },
260261
{ value: 'CH_ENGINE_NULL', label: 'Null' },
261262
];
262263

0 commit comments

Comments
 (0)