Skip to content

Commit

Permalink
NIFI-10756 Generate error message when processor and/or controller
Browse files Browse the repository at this point in the history
service is unable to transition to start and/or enabled state

Signed-off-by: Mike Moser <mosermw@apache.org>
  • Loading branch information
NissimShiman authored and mosermw committed May 12, 2023
1 parent b23baac commit fd2138b
Show file tree
Hide file tree
Showing 2 changed files with 20 additions and 4 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -1594,7 +1594,7 @@ private void run(ScheduledExecutorService taskScheduler, long administrativeYiel
}

if (starting) { // will ensure that the Processor represented by this node can only be started once
initiateStart(taskScheduler, administrativeYieldMillis, timeoutMillis, processContextFactory, schedulingAgentCallback);
initiateStart(taskScheduler, administrativeYieldMillis, timeoutMillis, new AtomicLong(0), processContextFactory, schedulingAgentCallback);
} else {
final String procName = processorRef.get().getProcessor().toString();
procLog.warn("Cannot start {} because it is not currently stopped. Current state is {}", procName, currentState);
Expand Down Expand Up @@ -1711,7 +1711,7 @@ public void verifyCanTerminate() {


private void initiateStart(final ScheduledExecutorService taskScheduler, final long administrativeYieldMillis, final long timeoutMilis,
final Supplier<ProcessContext> processContextFactory, final SchedulingAgentCallback schedulingAgentCallback) {
AtomicLong startupAttemptCount, final Supplier<ProcessContext> processContextFactory, final SchedulingAgentCallback schedulingAgentCallback) {

final Processor processor = getProcessor();
final ComponentLog procLog = new SimpleProcessLogger(StandardProcessorNode.this.getIdentifier(), processor);
Expand All @@ -1733,8 +1733,15 @@ private void initiateStart(final ScheduledExecutorService taskScheduler, final l
if (validationStatus != ValidationStatus.VALID) {
LOG.debug("Cannot start {} because Processor is currently not valid; will try again after 5 seconds", StandardProcessorNode.this);

startupAttemptCount.incrementAndGet();
if (startupAttemptCount.get() == 240 || startupAttemptCount.get() % 7200 == 0) {
final ValidationState validationState = getValidationState();
procLog.error("Encountering difficulty starting. (Validation State is {}: {}). Will continue trying to start.",
validationState, validationState.getValidationErrors());
}

// re-initiate the entire process
final Runnable initiateStartTask = () -> initiateStart(taskScheduler, administrativeYieldMillis, timeoutMilis, processContextFactory, schedulingAgentCallback);
final Runnable initiateStartTask = () -> initiateStart(taskScheduler, administrativeYieldMillis, timeoutMilis, startupAttemptCount, processContextFactory, schedulingAgentCallback);
taskScheduler.schedule(initiateStartTask, 500, TimeUnit.MILLISECONDS);

schedulingAgentCallback.onTaskComplete();
Expand Down Expand Up @@ -1811,7 +1818,7 @@ private void initiateStart(final ScheduledExecutorService taskScheduler, final l
// make sure we only continue retry loop if STOP action wasn't initiated
if (scheduledState.get() != ScheduledState.STOPPING && scheduledState.get() != ScheduledState.RUN_ONCE) {
// re-initiate the entire process
final Runnable initiateStartTask = () -> initiateStart(taskScheduler, administrativeYieldMillis, timeoutMilis, processContextFactory, schedulingAgentCallback);
final Runnable initiateStartTask = () -> initiateStart(taskScheduler, administrativeYieldMillis, timeoutMilis, startupAttemptCount, processContextFactory, schedulingAgentCallback);
taskScheduler.schedule(initiateStartTask, administrativeYieldMillis, TimeUnit.MILLISECONDS);
} else {
completeStopAction();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -83,6 +83,7 @@
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.atomic.AtomicReference;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReadWriteLock;
Expand Down Expand Up @@ -574,6 +575,7 @@ public CompletableFuture<Void> enable(final ScheduledExecutorService scheduler,

final ControllerServiceProvider controllerServiceProvider = this.serviceProvider;
final StandardControllerServiceNode service = this;
AtomicLong enablingAttemptCount = new AtomicLong(0);
scheduler.execute(new Runnable() {
@Override
public void run() {
Expand All @@ -592,6 +594,13 @@ public void run() {
LOG.debug("Cannot enable {} because it is not currently valid. (Validation State is {}: {}). Will try again in 1 second",
StandardControllerServiceNode.this, validationState, validationState.getValidationErrors());

enablingAttemptCount.incrementAndGet();
if (enablingAttemptCount.get() == 120 || enablingAttemptCount.get() % 3600 == 0) {
final ComponentLog componentLog = new SimpleProcessLogger(getIdentifier(), StandardControllerServiceNode.this);
componentLog.error("Encountering difficulty enabling. (Validation State is {}: {}). Will continue trying to enable.",
validationState, validationState.getValidationErrors());
}

try {
scheduler.schedule(this, 1, TimeUnit.SECONDS);
} catch (RejectedExecutionException rejectedExecutionException) {
Expand Down

0 comments on commit fd2138b

Please sign in to comment.