Skip to content

Commit 52082bc

Browse files
authored
HBASE-28850 Only return from ReplicationSink.replicationEntries while all background tasks are finished (#6263)
Signed-off-by: Andrew Purtell <apurtell@apache.org>
1 parent 2531f93 commit 52082bc

File tree

1 file changed

+19
-2
lines changed

1 file changed

+19
-2
lines changed

hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSink.java

Lines changed: 19 additions & 2 deletions
Original file line number | Diff line number | Diff line change
@@ -492,16 +492,33 @@ private void batch(TableName tableName, Collection<List<Row>> allRows, int batch
492492
}
493493
futures.addAll(batchRows.stream().map(table::batchAll).collect(Collectors.toList()));
494494
}
495+
// Here we will always wait until all futures are finished, even if there are failures when
496+
// getting from a future in the middle. This is because this method may be called in a rpc call,
497+
// so the batch operations may reference some off-heap cells (through CellScanner). If we return
498+
// earlier here, the rpc call may be finished and they will release the off heap cells before
499+
// some of the batch operations finish, and then cause corrupt data or even crash the region
500+
// server. See HBASE-28584 and HBASE-28850 for more details.
501+
IOException error = null;
495502
for (Future<?> future : futures) {
496503
try {
497504
FutureUtils.get(future);
498505
} catch (RetriesExhaustedException e) {
506+
IOException ioe;
499507
if (e.getCause() instanceof TableNotFoundException) {
500-
throw new TableNotFoundException("'" + tableName + "'");
508+
ioe = new TableNotFoundException("'" + tableName + "'");
509+
} else {
510+
ioe = e;
511+
}
512+
if (error == null) {
513+
error = ioe;
514+
} else {
515+
error.addSuppressed(ioe);
501516
}
502-
throw e;
503517
}
504518
}
519+
if (error != null) {
520+
throw error;
521+
}
505522
}
506523

507524
private AsyncClusterConnection getConnection() throws IOException {

0 commit comments

Comments
 (0)