Skip to content

MLE-14416 Added info logging for start/stop in DMSDK batchers. #1665

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 1 commit into from
May 31, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -17,11 +17,16 @@

import com.marklogic.client.DatabaseClient;
import com.marklogic.client.datamovement.*;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.util.*;
import java.util.concurrent.atomic.AtomicBoolean;

public abstract class BatcherImpl implements Batcher {

private final Logger logger = LoggerFactory.getLogger(getClass());

private String jobName = "unnamed";
private String jobId = null;
private int batchSize = 100;
Expand All @@ -31,6 +36,7 @@ public abstract class BatcherImpl implements Batcher {
private JobTicket jobTicket;
private Calendar jobStartTime;
private Calendar jobEndTime;

private final AtomicBoolean stopped = new AtomicBoolean(false);
private final AtomicBoolean started = new AtomicBoolean(false);

Expand Down Expand Up @@ -136,19 +142,32 @@ void setJobEndTime() {
this.jobEndTime = Calendar.getInstance();
}

AtomicBoolean getStarted() {
return this.started;
}
@Override
public boolean isStarted() {
return started.get();
}

@Override
public boolean isStopped() {
return stopped.get();
}
AtomicBoolean getStopped() {
return this.stopped;

final void setStartedToTrue() {
logger.info("Setting 'started' to true.");
this.started.set(true);
}

final void setStoppedToTrue() {
logger.info("Setting 'stopped' to true.");
this.stopped.set(true);
}

final boolean isStoppedTrue() {
// This method is necessary as calling "isStopped()" results in different behavior in QueryBatcherImpl, where
// that method has been overridden to inspect the thread pool status instead. It's not clear why that was done,
// so this preserves the existing behavior where the value of `stopped` is check in multiple places (it would seem
// that in all of those places, calling "isStopped()" would be preferable).
return this.stopped.get() == true;
}

protected DataMovementManagerImpl getMoveMgr() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -78,12 +78,14 @@ public JobReport getJobReport(JobTicket ticket) {
@Override
public void stopJob(JobTicket ticket) {
if ( ticket == null ) throw new IllegalArgumentException("ticket must not be null");
logger.info("Stopping {} job with ID: {}", ticket.getJobType(), ticket.getJobId());
service.stopJob(ticket, activeJobs);
}

@Override
public void stopJob(Batcher batcher) {
if ( batcher == null ) throw new IllegalArgumentException("batcher must not be null");
logger.info("Stopping batcher; job name: {}; job ID: {}", batcher.getJobName(), batcher.getJobId());
service.stopJob(batcher, activeJobs);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -445,7 +445,7 @@ public synchronized void start(JobTicket ticket) {
urisReadyListener.initializeListener(this);
}
super.setJobStartTime();
super.getStarted().set(true);
setStartedToTrue();
if(this.maxBatches < Long.MAX_VALUE) {
setMaxUris(getMaxBatches());
}
Expand Down Expand Up @@ -720,7 +720,7 @@ private class QueryTask implements Runnable {

public void run() {
// don't proceed if this job is stopped (because dataMovementManager.stopJob was called)
if (batcher.getStopped().get() == true) {
if (batcher.isStoppedTrue()) {
logger.warn("Cancelling task to query forest '{}' forestBatchNum {} with start {} after the job is stopped",
forest.getForestName(), forestBatchNum, start);
return;
Expand Down Expand Up @@ -906,7 +906,7 @@ private void processDocs(QueryBatchImpl batch) {
}

private void launchNextTask() {
if (batcher.getStopped().get() == true ) {
if (batcher.isStoppedTrue()) {
// we're stopping, so don't do anything more
return;
}
Expand Down Expand Up @@ -1059,7 +1059,7 @@ private void startIterating() {

@Override
public void stop() {
super.getStopped().set(true);
setStoppedToTrue();
if ( threadPool != null ) threadPool.shutdownNow();
super.setJobEndTime();
if ( query != null ) {
Expand Down Expand Up @@ -1102,7 +1102,7 @@ private void closeAllListeners() {
}

protected void finalize() {
if (this.getStopped().get() == false ) {
if (!isStoppedTrue()) {
logger.warn("QueryBatcher instance \"{}\" was never cleanly stopped. You should call dataMovementManager.stopJob.",
getJobName());
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -350,14 +350,14 @@ private void requireStarted(String msg) {

@Override
public void stop() {
if (super.getStopped().get()) return;
super.getStopped().set(true);
if (isStoppedTrue()) return;
setStoppedToTrue();
if (threadPool != null) threadPool.shutdownNow();
super.setJobEndTime();
}
private void orderlyStop() {
if (super.getStopped().get()) return;
super.getStopped().set(true);
if (isStoppedTrue()) return;
setStoppedToTrue();
if (threadPool != null) threadPool.shutdown();
super.setJobEndTime();
}
Expand Down Expand Up @@ -409,7 +409,7 @@ public synchronized void start(JobTicket ticket) {

super.setJobTicket(ticket);
super.setJobStartTime();
super.getStarted().set(true);
setStartedToTrue();

for (int i=0; i<super.getThreadCount(); i++) {
ContentHandle<T> threadHandle = rowsHandle.newHandle();
Expand Down Expand Up @@ -528,7 +528,7 @@ private boolean readRows(RowBatchCallable<T> callable) {
private boolean shouldRequestBatch(RowBatchFailureEventImpl requestEvent, int batchRetries) {
if (batchRetries == 0) return true; // first request
if (requestEvent == null) return false; // request succeeded
if (super.getStopped().get()) return false; // stopped
if (isStoppedTrue()) return false; // stopped
// whether to retry request
return (requestEvent.getDisposition() == RowBatchFailureListener.BatchFailureDisposition.RETRY &&
batchRetries < requestEvent.getMaxRetries());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -190,7 +190,7 @@ public void initialize() {
logger.debug("batchSize={}", getBatchSize());
}
super.setJobStartTime();
super.getStarted().set(true);
setStartedToTrue();
}
}

Expand Down Expand Up @@ -459,7 +459,7 @@ public void start(JobTicket ticket) {
@Override
public void stop() {
super.setJobEndTime();
super.getStopped().set(true);
setStoppedToTrue();
if ( threadPool != null ) threadPool.shutdownNow();
closeAllListeners();
}
Expand Down