Skip to content

Commit

Permalink
ChangeFeedFixForDuplicateProcessing (Azure#20085)
Browse files Browse the repository at this point in the history
* Change feed fix for duplicate processing on same processor

Co-authored-by: annie-mac <annie-mac@MININT-ST2TT53.redmond.corp.microsoft.com>
Co-authored-by: annie-mac <annie-mac@TT-V-ROPILA-WS5.redmond.corp.microsoft.com>
  • Loading branch information
3 people authored Mar 30, 2021
1 parent e2bd563 commit fd6b1bc
Show file tree
Hide file tree
Showing 5 changed files with 84 additions and 66 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -20,4 +20,9 @@ public interface PartitionSupervisor {
* @return the inner exception if any, otherwise null.
*/
RuntimeException getResultException();

/**
* Closes the partition supervisor and cancels all of its internal jobs.
*/
void shutdown();
}
Original file line number Diff line number Diff line change
Expand Up @@ -109,56 +109,64 @@ private Mono<Void> loadLeases() {
}

private Mono<Void> removeLease(Lease lease) {
if (this.currentlyOwnedPartitions.get(lease.getLeaseToken()) != null) {
return Mono.just(this)
.flatMap(value -> {
WorkerTask workerTask = this.currentlyOwnedPartitions.remove(lease.getLeaseToken());

if (workerTask.isRunning()) {
workerTask.interrupt();
if (workerTask != null && workerTask.isRunning()) {
workerTask.cancelJob();
}

logger.info("Partition {}: released.", lease.getLeaseToken());
}

return this.leaseManager.release(lease)
.onErrorResume(e -> {
if (e instanceof LeaseLostException) {
logger.warn("Partition {}: lease already removed.", lease.getLeaseToken());
} else {
logger.warn("Partition {}: failed to remove lease.", lease.getLeaseToken(), e);
}

return Mono.empty();
})
.doOnSuccess(aVoid -> {
logger.info("Partition {}: successfully removed lease.", lease.getLeaseToken());
});
return this.leaseManager.release(lease);
})
.onErrorResume(e -> {
if (e instanceof LeaseLostException) {
logger.warn("Partition {}: lease already removed.", lease.getLeaseToken());
} else {
logger.warn("Partition {}: failed to remove lease.", lease.getLeaseToken(), e);
}

return Mono.empty();
})
.doOnSuccess(aVoid -> {
logger.info("Partition {}: successfully removed lease.", lease.getLeaseToken());
});
}

private WorkerTask processPartition(PartitionSupervisor partitionSupervisor, Lease lease) {
CancellationToken cancellationToken = this.shutdownCts.getToken();

WorkerTask partitionSupervisorTask = new WorkerTask(lease, () -> {
partitionSupervisor.run(cancellationToken)
.onErrorResume(throwable -> {
if (throwable instanceof PartitionSplitException) {
PartitionSplitException ex = (PartitionSplitException) throwable;
return this.handleSplit(lease, ex.getLastContinuation());
} else if (throwable instanceof TaskCancelledException) {
logger.debug("Partition {}: processing canceled.", lease.getLeaseToken());
} else {
logger.warn("Partition {}: processing failed.", lease.getLeaseToken(), throwable);
}
CancellationToken shutdownToken = this.shutdownCts.getToken();

return Mono.empty();
})
.then(this.removeLease(lease)).subscribe();
});
WorkerTask partitionSupervisorTask =
new WorkerTask(
lease,
partitionSupervisor,
getWorkerJob(partitionSupervisor, lease, shutdownToken));

this.scheduler.schedule(partitionSupervisorTask);

return partitionSupervisorTask;
}

/**
 * Builds the reactive job that drives one leased partition.
 *
 * <p>The job runs the given supervisor with the supplied shutdown token. If the supervisor
 * signals an error, the error is handled here instead of being propagated:
 * <ul>
 *   <li>{@link PartitionSplitException}: delegates to {@code handleSplit} with the last
 *       continuation carried by the exception;</li>
 *   <li>{@link TaskCancelledException}: logged at debug level;</li>
 *   <li>anything else: logged at warn level together with the throwable.</li>
 * </ul>
 * Once that pipeline completes, the {@code removeLease} pipeline for the lease is run.
 *
 * @param partitionSupervisor the supervisor whose {@code run} produces the processing pipeline.
 * @param lease the lease being processed; used for logging, split handling and lease removal.
 * @param shutdownToken the cancellation token passed to the supervisor.
 * @return a {@code Mono} that completes after the supervisor pipeline and the lease removal.
 */
private Mono<Void> getWorkerJob(
    PartitionSupervisor partitionSupervisor,
    Lease lease,
    CancellationToken shutdownToken) {
    Mono<Void> supervisorRun = partitionSupervisor.run(shutdownToken);

    Mono<Void> supervisorRunWithRecovery = supervisorRun.onErrorResume(error -> {
        if (error instanceof PartitionSplitException) {
            // A split is recoverable: continue from where the parent partition left off.
            String lastContinuation = ((PartitionSplitException) error).getLastContinuation();
            return this.handleSplit(lease, lastContinuation);
        }

        if (error instanceof TaskCancelledException) {
            logger.debug("Partition {}: processing canceled.", lease.getLeaseToken());
        } else {
            logger.warn("Partition {}: processing failed.", lease.getLeaseToken(), error);
        }

        // The failure has been logged; complete normally so the lease removal still runs.
        return Mono.empty();
    });

    return supervisorRunWithRecovery.then(this.removeLease(lease));
}

private Mono<Void> handleSplit(Lease lease, String lastContinuationToken) {
lease.setContinuationToken(lastContinuationToken);
return this.synchronizer.splitPartition(lease)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,20 +20,17 @@
import reactor.core.scheduler.Scheduler;
import reactor.core.scheduler.Schedulers;

import java.io.Closeable;
import java.io.IOException;
import java.time.Duration;

/**
* Implementation for {@link PartitionSupervisor}.
*/
class PartitionSupervisorImpl implements PartitionSupervisor, Closeable {
class PartitionSupervisorImpl implements PartitionSupervisor {
private final Lease lease;
private final ChangeFeedObserver observer;
private final PartitionProcessor processor;
private final LeaseRenewer renewer;
private CancellationTokenSource renewerCancellation;
private CancellationTokenSource processorCancellation;
private CancellationTokenSource childShutdownCts;

private volatile RuntimeException resultException;

Expand All @@ -49,6 +46,8 @@ public PartitionSupervisorImpl(Lease lease, ChangeFeedObserver observer, Partiti
if (scheduler == null) {
this.scheduler = Schedulers.boundedElastic();
}

this.childShutdownCts = new CancellationTokenSource();
}

@Override
Expand All @@ -59,14 +58,10 @@ public Mono<Void> run(CancellationToken shutdownToken) {

this.observer.open(context);

this.processorCancellation = new CancellationTokenSource();

this.scheduler.schedule(() -> this.processor.run(this.processorCancellation.getToken())
this.scheduler.schedule(() -> this.processor.run(this.childShutdownCts.getToken())
.subscribe());

this.renewerCancellation = new CancellationTokenSource();

this.scheduler.schedule(() -> this.renewer.run(this.renewerCancellation.getToken())
this.scheduler.schedule(() -> this.renewer.run(this.childShutdownCts.getToken())
.subscribe());

return Mono.just(this)
Expand All @@ -81,8 +76,7 @@ private Mono<Void> afterRun(ChangeFeedObserverContext context, CancellationToken

try {

this.processorCancellation.cancel();
this.renewerCancellation.cancel();
this.childShutdownCts.cancel();

if (this.processor.getResultException() != null) {
throw this.processor.getResultException();
Expand Down Expand Up @@ -127,11 +121,9 @@ public RuntimeException getResultException() {
}

@Override
public void close() throws IOException {
if (this.processorCancellation != null) {
this.processorCancellation.close();
public void shutdown() {
if (this.childShutdownCts != null) {
this.childShutdownCts.cancel();
}

this.renewerCancellation.close();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -3,40 +3,52 @@
package com.azure.cosmos.implementation.changefeed.implementation;

import com.azure.cosmos.implementation.changefeed.Lease;
import com.azure.cosmos.implementation.changefeed.PartitionSupervisor;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import reactor.core.publisher.Mono;

import java.util.concurrent.atomic.AtomicBoolean;

/**
* Worker task that executes in a separate thread.
*/
class WorkerTask extends Thread {
private final Logger logger = LoggerFactory.getLogger(WorkerTask.class);
private boolean done = false;
private Runnable job;
private AtomicBoolean done;
private Mono<Void> job;
private Lease lease;
private PartitionSupervisor partitionSupervisor;

WorkerTask(Lease lease, Runnable job) {
WorkerTask(Lease lease, PartitionSupervisor partitionSupervisor, Mono<Void> job) {
this.lease = lease;
this.job = job;
this.partitionSupervisor = partitionSupervisor;
done = new AtomicBoolean(false);
}

@Override
public void run() {
try {
job.run();
logger.info("Partition controller worker task {} has finished running.", lease.getLeaseToken());
} finally {
logger.info("Partition controller worker task {} has exited.", lease.getLeaseToken());
job = null;
this.done = true;
}
job
.doOnSuccess(avoid -> logger.info("Partition controller worker task {} has finished running.", lease.getLeaseToken()))
.doOnTerminate(() -> {
logger.info("Partition controller worker task {} has exited.", lease.getLeaseToken());
job = null;
this.done.set(true);
})
.subscribe();
}

public void cancelJob() {
this.partitionSupervisor.shutdown();
this.interrupt();
}

public Lease lease() {
return this.lease;
}

public boolean isRunning() {
return !this.done;
return !this.done.get();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -605,6 +605,7 @@ public void readFeedDocumentsAfterSplit() throws InterruptedException {
int leaseCount = changeFeedProcessor.getCurrentState() .map(List::size).block();
assertThat(leaseCount > 1).as("Found %d leases", leaseCount).isTrue();

assertThat(receivedDocuments.size()).isEqualTo(createdDocuments.size());
for (InternalObjectNode item : createdDocuments) {
assertThat(receivedDocuments.containsKey(item.getId())).as("Document with getId: " + item.getId()).isTrue();
}
Expand Down

0 comments on commit fd6b1bc

Please sign in to comment.