From fd6b1bc9ca285b15bd691e9bc4edb4387cbdf4d0 Mon Sep 17 00:00:00 2001 From: Annie Liang <64233642+xinlian12@users.noreply.github.com> Date: Tue, 30 Mar 2021 08:30:13 -0700 Subject: [PATCH] ChangeFeedFixForDuplicateProcessing (#20085) * Change feed fix for duplicate processing on same processor Co-authored-by: annie-mac Co-authored-by: annie-mac --- .../changefeed/PartitionSupervisor.java | 5 ++ .../PartitionControllerImpl.java | 80 ++++++++++--------- .../PartitionSupervisorImpl.java | 28 +++---- .../changefeed/implementation/WorkerTask.java | 36 ++++++--- .../cosmos/rx/ChangeFeedProcessorTest.java | 1 + 5 files changed, 84 insertions(+), 66 deletions(-) diff --git a/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/changefeed/PartitionSupervisor.java b/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/changefeed/PartitionSupervisor.java index ad47d1649e40..7e2e16ae173f 100644 --- a/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/changefeed/PartitionSupervisor.java +++ b/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/changefeed/PartitionSupervisor.java @@ -20,4 +20,9 @@ public interface PartitionSupervisor { * @return the inner exception if any, otherwise null. */ RuntimeException getResultException(); + + /** + * Close partition supervisor and cancel all internal jobs. + */ + void shutdown(); } diff --git a/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/changefeed/implementation/PartitionControllerImpl.java b/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/changefeed/implementation/PartitionControllerImpl.java index fa70ac98309d..7ce02d983e2b 100644 --- a/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/changefeed/implementation/PartitionControllerImpl.java +++ b/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/changefeed/implementation/PartitionControllerImpl.java @@ -109,56 +109,64 @@ private Mono loadLeases() { } private Mono removeLease(Lease lease) { - if (this.currentlyOwnedPartitions.get(lease.getLeaseToken()) != null) { + return Mono.just(this) + .flatMap(value -> { WorkerTask workerTask = this.currentlyOwnedPartitions.remove(lease.getLeaseToken()); - - if (workerTask.isRunning()) { - workerTask.interrupt(); + if (workerTask != null && workerTask.isRunning()) { + workerTask.cancelJob(); } - logger.info("Partition {}: released.", lease.getLeaseToken()); - } - - return this.leaseManager.release(lease) - .onErrorResume(e -> { - if (e instanceof LeaseLostException) { - logger.warn("Partition {}: lease already removed.", lease.getLeaseToken()); - } else { - logger.warn("Partition {}: failed to remove lease.", lease.getLeaseToken(), e); - } - return Mono.empty(); - }) - .doOnSuccess(aVoid -> { - logger.info("Partition {}: successfully removed lease.", lease.getLeaseToken()); - }); + return this.leaseManager.release(lease); + }) + .onErrorResume(e -> { + if (e instanceof LeaseLostException) { + logger.warn("Partition {}: lease already removed.", lease.getLeaseToken()); + } else { + logger.warn("Partition {}: failed to remove lease.", lease.getLeaseToken(), e); + } + + return Mono.empty(); + }) + .doOnSuccess(aVoid -> { + logger.info("Partition {}: successfully removed lease.", lease.getLeaseToken()); + }); } private WorkerTask processPartition(PartitionSupervisor partitionSupervisor, Lease lease) { - CancellationToken cancellationToken = this.shutdownCts.getToken(); - - WorkerTask partitionSupervisorTask = new WorkerTask(lease, () -> { - partitionSupervisor.run(cancellationToken) - .onErrorResume(throwable -> { - if (throwable instanceof PartitionSplitException) { - PartitionSplitException ex = (PartitionSplitException) throwable; - return this.handleSplit(lease, ex.getLastContinuation()); - } else if (throwable instanceof TaskCancelledException) { - logger.debug("Partition {}: processing canceled.", lease.getLeaseToken()); - } else { - logger.warn("Partition {}: processing failed.", lease.getLeaseToken(), throwable); - } + CancellationToken shutdownToken = this.shutdownCts.getToken(); - return Mono.empty(); - }) - .then(this.removeLease(lease)).subscribe(); - }); + WorkerTask partitionSupervisorTask = + new WorkerTask( + lease, + partitionSupervisor, + getWorkerJob(partitionSupervisor, lease, shutdownToken)); this.scheduler.schedule(partitionSupervisorTask); return partitionSupervisorTask; } + private Mono getWorkerJob( + PartitionSupervisor partitionSupervisor, + Lease lease, + CancellationToken shutdownToken) { + return partitionSupervisor.run(shutdownToken) + .onErrorResume(throwable -> { + if (throwable instanceof PartitionSplitException) { + PartitionSplitException ex = (PartitionSplitException) throwable; + return this.handleSplit(lease, ex.getLastContinuation()); + } else if (throwable instanceof TaskCancelledException) { + logger.debug("Partition {}: processing canceled.", lease.getLeaseToken()); + } else { + logger.warn("Partition {}: processing failed.", lease.getLeaseToken(), throwable); + } + + return Mono.empty(); + }) + .then(this.removeLease(lease)); + } + private Mono handleSplit(Lease lease, String lastContinuationToken) { lease.setContinuationToken(lastContinuationToken); return this.synchronizer.splitPartition(lease) diff --git a/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/changefeed/implementation/PartitionSupervisorImpl.java b/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/changefeed/implementation/PartitionSupervisorImpl.java index 3871baafeadd..31827d6dc0b4 100644 --- a/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/changefeed/implementation/PartitionSupervisorImpl.java +++ b/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/changefeed/implementation/PartitionSupervisorImpl.java @@ -20,20 +20,17 @@ import reactor.core.scheduler.Scheduler; import reactor.core.scheduler.Schedulers; -import java.io.Closeable; -import java.io.IOException; import java.time.Duration; /** * Implementation for {@link PartitionSupervisor}. */ -class PartitionSupervisorImpl implements PartitionSupervisor, Closeable { +class PartitionSupervisorImpl implements PartitionSupervisor { private final Lease lease; private final ChangeFeedObserver observer; private final PartitionProcessor processor; private final LeaseRenewer renewer; - private CancellationTokenSource renewerCancellation; - private CancellationTokenSource processorCancellation; + private CancellationTokenSource childShutdownCts; private volatile RuntimeException resultException; @@ -49,6 +46,8 @@ public PartitionSupervisorImpl(Lease lease, ChangeFeedObserver observer, Partiti if (scheduler == null) { this.scheduler = Schedulers.boundedElastic(); } + + this.childShutdownCts = new CancellationTokenSource(); } @Override @@ -59,14 +58,10 @@ public Mono run(CancellationToken shutdownToken) { this.observer.open(context); - this.processorCancellation = new CancellationTokenSource(); - - this.scheduler.schedule(() -> this.processor.run(this.processorCancellation.getToken()) + this.scheduler.schedule(() -> this.processor.run(this.childShutdownCts.getToken()) .subscribe()); - this.renewerCancellation = new CancellationTokenSource(); - - this.scheduler.schedule(() -> this.renewer.run(this.renewerCancellation.getToken()) + this.scheduler.schedule(() -> this.renewer.run(this.childShutdownCts.getToken()) .subscribe()); return Mono.just(this) @@ -81,8 +76,7 @@ private Mono afterRun(ChangeFeedObserverContext context, CancellationToken try { - this.processorCancellation.cancel(); - this.renewerCancellation.cancel(); + this.childShutdownCts.cancel(); if (this.processor.getResultException() != null) { throw this.processor.getResultException(); @@ -127,11 +121,9 @@ public RuntimeException getResultException() { } @Override - public void close() throws IOException { - if (this.processorCancellation != null) { - this.processorCancellation.close(); + public void shutdown() { + if (this.childShutdownCts != null) { + this.childShutdownCts.cancel(); } - - this.renewerCancellation.close(); } } diff --git a/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/changefeed/implementation/WorkerTask.java b/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/changefeed/implementation/WorkerTask.java index 611e2b2ccc81..f937168265f5 100644 --- a/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/changefeed/implementation/WorkerTask.java +++ b/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/changefeed/implementation/WorkerTask.java @@ -3,33 +3,45 @@ package com.azure.cosmos.implementation.changefeed.implementation; import com.azure.cosmos.implementation.changefeed.Lease; +import com.azure.cosmos.implementation.changefeed.PartitionSupervisor; import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import reactor.core.publisher.Mono; + +import java.util.concurrent.atomic.AtomicBoolean; /** * Worker task that executes in a separate thread. */ class WorkerTask extends Thread { private final Logger logger = LoggerFactory.getLogger(WorkerTask.class); - private boolean done = false; - private Runnable job; + private AtomicBoolean done; + private Mono job; private Lease lease; + private PartitionSupervisor partitionSupervisor; - WorkerTask(Lease lease, Runnable job) { + WorkerTask(Lease lease, PartitionSupervisor partitionSupervisor, Mono job) { this.lease = lease; this.job = job; + this.partitionSupervisor = partitionSupervisor; + done = new AtomicBoolean(false); } @Override public void run() { - try { - job.run(); - logger.info("Partition controller worker task {} has finished running.", lease.getLeaseToken()); - } finally { - logger.info("Partition controller worker task {} has exited.", lease.getLeaseToken()); - job = null; - this.done = true; - } + job + .doOnSuccess(avoid -> logger.info("Partition controller worker task {} has finished running.", lease.getLeaseToken())) + .doOnTerminate(() -> { + logger.info("Partition controller worker task {} has exited.", lease.getLeaseToken()); + job = null; + this.done.set(true); + }) + .subscribe(); + } + + public void cancelJob() { + this.partitionSupervisor.shutdown(); + this.interrupt(); } public Lease lease() { @@ -37,6 +49,6 @@ public Lease lease() { } public boolean isRunning() { - return !this.done; + return !this.done.get(); } } diff --git a/sdk/cosmos/azure-cosmos/src/test/java/com/azure/cosmos/rx/ChangeFeedProcessorTest.java b/sdk/cosmos/azure-cosmos/src/test/java/com/azure/cosmos/rx/ChangeFeedProcessorTest.java index 618d788c7b01..8aaa76b539e2 100644 --- a/sdk/cosmos/azure-cosmos/src/test/java/com/azure/cosmos/rx/ChangeFeedProcessorTest.java +++ b/sdk/cosmos/azure-cosmos/src/test/java/com/azure/cosmos/rx/ChangeFeedProcessorTest.java @@ -605,6 +605,7 @@ public void readFeedDocumentsAfterSplit() throws InterruptedException { int leaseCount = changeFeedProcessor.getCurrentState() .map(List::size).block(); assertThat(leaseCount > 1).as("Found %d leases", leaseCount).isTrue(); + assertThat(receivedDocuments.size()).isEqualTo(createdDocuments.size()); for (InternalObjectNode item : createdDocuments) { assertThat(receivedDocuments.containsKey(item.getId())).as("Document with getId: " + item.getId()).isTrue(); }