Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -436,6 +436,48 @@ public void testRemoveDanglingTxnWithOpenTxnOnSourceAndDanglingTxnOnDR() throws
}
}

@Test
public void testClearDanglingTxnRunsOnlyAfterFinalIncrementalRound() throws Throwable {
  // Cap the number of load tasks per round to 1 so the incremental load is forced to run
  // over many rounds; the dangling-txn cleanup must only be scheduled in the final round.
  // NOTE: use .varname explicitly for both keys — relying on the ConfVars enum's toString()
  // for the first key was inconsistent with the second and is implicit behavior.
  List<String> withClauseList = Arrays.asList(
      "'" + HiveConf.ConfVars.HIVE_REPL_CLEAR_DANGLING_TXNS_ON_TARGET.varname + "'='true'",
      "'" + HiveConf.ConfVars.REPL_APPROX_MAX_LOAD_TASKS.varname + "'='1'");
  // Each execution of this statement inserts 3 rows across 3 partitions.
  String insertStmt = "insert into sales_transactional partition(country) values "
      + "(102, 'Phone', 800.00, '2026-02-11 11:30:00', 'Canada'),"
      + "(103, 'Tablet', 450.00, '2026-02-11 12:15:00', 'USA'),"
      + "(104, 'Monitor', 300.00, '2026-02-11 14:00:00', 'UK')";

  // Bootstrap: create the ACID table and load 2 x 3 = 6 rows on the source.
  primary.run("use " + primaryDbName)
      .run("create table sales_transactional (sale_id int, product_name string, amount decimal(10,2), "
          + "sale_date timestamp) partitioned by (country string) stored as orc "
          + "tblproperties (\"transactional\"=\"true\")")
      .run(insertStmt)
      .run(insertStmt);

  primary.dump(primaryDbName, withClauseList);
  replica.load(replicatedDbName, primaryDbName, withClauseList)
      .run("use " + replicatedDbName)
      .run("select count(*) from sales_transactional")
      .verifyResult("6");

  // Generate enough events (12 inserts, a truncate, then 5 more inserts) that, with
  // REPL_APPROX_MAX_LOAD_TASKS=1, the incremental load needs multiple rounds. If the
  // cleanup ran in an intermediate round it would break write-id replay for later rounds.
  primary.run("use " + primaryDbName);
  for (int i = 0; i < 12; i++) {
    primary.run(insertStmt);
  }
  primary.run("truncate table sales_transactional");
  for (int i = 0; i < 5; i++) {
    primary.run(insertStmt);
  }

  WarehouseInstance.Tuple incrementalDump = primary.dump(primaryDbName, withClauseList);

  // After the full incremental load the replica must be at the dump's replication id and
  // hold exactly the 5 x 3 = 15 rows inserted after the truncate.
  replica.load(replicatedDbName, primaryDbName, withClauseList)
      .run("use " + replicatedDbName)
      .run("repl status " + replicatedDbName)
      .verifyResult(incrementalDump.lastReplicationId)
      .run("select count(*) from sales_transactional")
      .verifyResult("15");
}


private List<Long> getOpenTxnCountFromDump(FileSystem fs, Path openTxnDumpPath) throws IOException {
List<Long> openTxnIds = new ArrayList<>();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -963,15 +963,20 @@ private int executeIncrementalLoad(long loadStartTime) throws Exception {
((IncrementalLoadLogger)work.incrementalLoadTasksBuilder().getReplLogger()).initiateEventTimestamp(currentTimestamp);
LOG.info("REPL_INCREMENTAL_LOAD stage duration : {} ms", currentTimestamp - loadStartTime);

if (conf.getBoolVar(HiveConf.ConfVars.HIVE_REPL_CLEAR_DANGLING_TXNS_ON_TARGET)) {

// Clear dangling transactions only once all incremental work for this dump is exhausted.
// Running this in intermediate rounds can remove source->target txn mappings that later
// rounds still depend on for write-id replay.
boolean hasPendingIncrementalWork = builder.hasMoreWork() || work.hasBootstrapLoadTasks();
if (conf.getBoolVar(HiveConf.ConfVars.HIVE_REPL_CLEAR_DANGLING_TXNS_ON_TARGET) && !hasPendingIncrementalWork) {
ClearDanglingTxnWork clearDanglingTxnWork = new ClearDanglingTxnWork(work.getDumpDirectory(), targetDb.getName());
Task<ClearDanglingTxnWork> clearDanglingTxnTaskTask = TaskFactory.get(clearDanglingTxnWork, conf);
if (childTasks.isEmpty()) {
childTasks.add(clearDanglingTxnTaskTask);
} else {
DAGTraversal.traverse(childTasks, new AddDependencyToLeaves(Collections.singletonList(clearDanglingTxnTaskTask)));
}
} else if (conf.getBoolVar(HiveConf.ConfVars.HIVE_REPL_CLEAR_DANGLING_TXNS_ON_TARGET)) {
LOG.info("Skipping dangling transaction cleanup in this iteration as incremental load has pending work.");
}

return 0;
Expand Down