Skip to content
Merged
Original file line number Diff line number Diff line change
Expand Up @@ -232,41 +232,46 @@ public void discardEventsOfPipe(final String pipeNameToDrop, int regionId) {
// Try to remove the events as much as possible
inputPendingQueue.discardEventsOfPipe(pipeNameToDrop, regionId);

// synchronized to use the lastEvent & lastExceptionEvent
synchronized (this) {
// Here we discard the last event, and re-submit the pipe task to avoid that the pipe task has
// stopped submission but will not be stopped by critical exceptions, because when it acquires
// lock, the pipe is already dropped, thus it will do nothing.
// Note that since we use a new thread to stop all the pipes, we will not encounter deadlock
// here. Or else we will.
if (lastEvent instanceof EnrichedEvent
&& pipeNameToDrop.equals(((EnrichedEvent) lastEvent).getPipeName())
&& regionId == ((EnrichedEvent) lastEvent).getRegionId()) {
// Do not clear last event's reference count because it may be on transferring
lastEvent = null;
// Submit self to avoid that the lastEvent has been retried "max times" times and has
// stopped executing.
// 1. If the last event is still on execution, or submitted by the previous "onSuccess" or
// "onFailure", the "submitSelf" cause nothing.
// 2. If the last event is waiting the instance lock to call "onSuccess", then the callback
// method will skip this turn of submission.
// 3. If the last event is waiting to call "onFailure", then it will be ignored because the
// last event has been set to null.
// 4. If the last event has called "onFailure" and caused the subtask to stop submission,
// it's submitted here and the "report" will wait for the "drop pipe" lock to stop all
// the pipes with critical exceptions. As illustrated above, the "report" will do
// nothing.
submitSelf();
}
highPriorityLockTaskCount.incrementAndGet();
try {
// synchronized to use the lastEvent & lastExceptionEvent
synchronized (this) {
// Here we discard the last event and re-submit the pipe task. This avoids the case where the
// pipe task has stopped submitting itself but will never be stopped by a critical exception,
// because by the time it acquires the lock the pipe has already been dropped, so it does
// nothing. Note that a separate thread is used to stop all the pipes, so no deadlock occurs
// here; otherwise it would.
if (lastEvent instanceof EnrichedEvent
&& pipeNameToDrop.equals(((EnrichedEvent) lastEvent).getPipeName())
&& regionId == ((EnrichedEvent) lastEvent).getRegionId()) {
// Do not clear the last event's reference counts because it may be on transferring
lastEvent = null;
// Submit self in case the lastEvent has already been retried the maximum number of times
// and has stopped executing.
// 1. If the last event is still executing, or was submitted by the previous "onSuccess" or
// "onFailure", then "submitSelf" has no effect.
// 2. If the last event is waiting for the instance lock to call "onSuccess", then the
// callback method will skip this turn of submission.
// 3. If the last event is waiting to call "onFailure", then it will be ignored because
// the last event has been set to null.
// 4. If the last event has called "onFailure" and caused the subtask to stop submission,
// it is submitted here, and the "report" will wait for the "drop pipe" lock to stop all
// the pipes with critical exceptions. As illustrated above, the "report" will do
// nothing.
submitSelf();
}

// We only clear the lastEvent's reference count when it's already on failure. Namely, we
// clear the lastExceptionEvent. It's safe to potentially clear it twice because we have the
// "nonnull" detection.
if (lastExceptionEvent instanceof EnrichedEvent
&& pipeNameToDrop.equals(((EnrichedEvent) lastExceptionEvent).getPipeName())
&& regionId == ((EnrichedEvent) lastExceptionEvent).getRegionId()) {
clearReferenceCountAndReleaseLastExceptionEvent();
// We only clear the lastEvent's reference counts when it's already on failure. Namely, we
// clear the lastExceptionEvent. It's safe to potentially clear it twice because we have the
// "nonnull" detection.
if (lastExceptionEvent instanceof EnrichedEvent
&& pipeNameToDrop.equals(((EnrichedEvent) lastExceptionEvent).getPipeName())
&& regionId == ((EnrichedEvent) lastExceptionEvent).getRegionId()) {
clearReferenceCountAndReleaseLastExceptionEvent();
}
}
} finally {
highPriorityLockTaskCount.decrementAndGet();
}

if (outputPipeConnector instanceof IoTDBConnector) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -34,11 +34,16 @@
import org.slf4j.LoggerFactory;

import java.util.concurrent.ExecutorService;
import java.util.concurrent.atomic.AtomicLong;

public abstract class PipeAbstractConnectorSubtask extends PipeReportableSubtask {

private static final Logger LOGGER = LoggerFactory.getLogger(PipeAbstractConnectorSubtask.class);

// Number of high-priority tasks that are currently about to enter, or are inside, a block
// synchronized on this object. A high-priority caller (e.g. discardEventsOfPipe) increments
// the counter before its synchronized section and decrements it in a finally block afterwards.
// The low-priority callbacks (onSuccess / onFailure) poll this counter for a bounded time
// before taking the same lock, so that high-priority tasks can obtain the lock first.
protected final AtomicLong highPriorityLockTaskCount = new AtomicLong(0);

// For output (transfer events to the target system in connector)
protected PipeConnector outputPipeConnector;

Expand Down Expand Up @@ -70,67 +75,76 @@ public void bindExecutors(
}

@Override
public synchronized void onSuccess(final Boolean hasAtLeastOneEventProcessed) {
isSubmitted = false;
public void onSuccess(final Boolean hasAtLeastOneEventProcessed) {
preScheduleLowPriorityTask(100);

synchronized (this) {
isSubmitted = false;

super.onSuccess(hasAtLeastOneEventProcessed);
super.onSuccess(hasAtLeastOneEventProcessed);
}
}

@Override
public synchronized void onFailure(final Throwable throwable) {
isSubmitted = false;
public void onFailure(final Throwable throwable) {
preScheduleLowPriorityTask(100);

if (isClosed.get()) {
LOGGER.info(
"onFailure in pipe transfer, ignored because the connector subtask is dropped.",
throwable);
clearReferenceCountAndReleaseLastEvent(null);
return;
}
synchronized (this) {
isSubmitted = false;

// We assume that the event is cleared as the "lastEvent" in processor subtask and reaches the
// connector subtask. Then, it may fail because of released resource and block the other pipes
// using the same connector. We simply discard it.
if (lastExceptionEvent instanceof EnrichedEvent
&& ((EnrichedEvent) lastExceptionEvent).isReleased()) {
LOGGER.info(
"onFailure in pipe transfer, ignored because the failure event is released.", throwable);
submitSelf();
return;
}
if (isClosed.get()) {
LOGGER.info(
"onFailure in pipe transfer, ignored because the connector subtask is dropped.",
throwable);
clearReferenceCountAndReleaseLastEvent(null);
return;
}

// If lastExceptionEvent != lastEvent, it indicates that the lastEvent's reference has been
// changed because the pipe of it has been dropped. In that case, we just discard the event.
if (lastEvent != lastExceptionEvent) {
LOGGER.info(
"onFailure in pipe transfer, ignored because the failure event's pipe is dropped.",
throwable);
clearReferenceCountAndReleaseLastExceptionEvent();
submitSelf();
return;
}
// We assume that the event is cleared as the "lastEvent" in processor subtask and reaches the
// connector subtask. Then, it may fail because of released resource and block the other pipes
// using the same connector. We simply discard it.
if (lastExceptionEvent instanceof EnrichedEvent
&& ((EnrichedEvent) lastExceptionEvent).isReleased()) {
LOGGER.info(
"onFailure in pipe transfer, ignored because the failure event is released.",
throwable);
submitSelf();
return;
}

if (throwable instanceof PipeConnectionException) {
// Retry to connect to the target system if the connection is broken
// We should reconstruct the client before re-submit the subtask
if (onPipeConnectionException(throwable)) {
// return if the pipe task should be stopped
// If lastExceptionEvent != lastEvent, it indicates that the lastEvent's reference has been
// changed because the pipe of it has been dropped. In that case, we just discard the event.
if (lastEvent != lastExceptionEvent) {
LOGGER.info(
"onFailure in pipe transfer, ignored because the failure event's pipe is dropped.",
throwable);
clearReferenceCountAndReleaseLastExceptionEvent();
submitSelf();
return;
}
}

// Handle exceptions if any available clients exist
// Notice that the PipeRuntimeConnectorCriticalException must be thrown here
// because the upper layer relies on this to stop all the related pipe tasks
// Other exceptions may cause the subtask to stop forever and can not be restarted
if (throwable instanceof PipeRuntimeConnectorCriticalException) {
super.onFailure(throwable);
} else {
// Print stack trace for better debugging
LOGGER.warn(
"A non PipeRuntimeConnectorCriticalException occurred, will throw a PipeRuntimeConnectorCriticalException.",
throwable);
super.onFailure(new PipeRuntimeConnectorCriticalException(throwable.getMessage()));
if (throwable instanceof PipeConnectionException) {
// Retry to connect to the target system if the connection is broken
// We should reconstruct the client before re-submit the subtask
if (onPipeConnectionException(throwable)) {
// return if the pipe task should be stopped
return;
}
}

// Handle exceptions if any available clients exist
// Notice that the PipeRuntimeConnectorCriticalException must be thrown here
// because the upper layer relies on this to stop all the related pipe tasks
// Other exceptions may cause the subtask to stop forever and can not be restarted
if (throwable instanceof PipeRuntimeConnectorCriticalException) {
super.onFailure(throwable);
} else {
// Print stack trace for better debugging
LOGGER.warn(
"A non PipeRuntimeConnectorCriticalException occurred, will throw a PipeRuntimeConnectorCriticalException.",
throwable);
super.onFailure(new PipeRuntimeConnectorCriticalException(throwable.getMessage()));
}
}
}

Expand Down Expand Up @@ -238,4 +252,17 @@ protected synchronized void clearReferenceCountAndReleaseLastExceptionEvent() {
lastExceptionEvent = null;
}
}

/**
 * Yields to high-priority lock holders before a low-priority callback takes the object lock.
 *
 * <p>Polls {@code highPriorityLockTaskCount} and sleeps 10 ms between polls until the counter
 * reads zero or {@code maxRetries} sleeps have been performed, whichever comes first. The wait
 * is best-effort: when the retries are exhausted the method simply returns and the caller
 * proceeds.
 *
 * <p>If the thread is interrupted while sleeping, the interrupt flag is restored, a warning is
 * logged and the wait ends immediately.
 *
 * @param maxRetries maximum number of 10 ms sleeps; a value of zero or less means no waiting
 */
private void preScheduleLowPriorityTask(int maxRetries) {
  int attemptsLeft = maxRetries;
  while (true) {
    // No high-priority task is waiting for or holding the lock: nothing to yield to.
    if (highPriorityLockTaskCount.get() == 0L) {
      return;
    }
    // Retry budget exhausted: give up waiting and let the caller continue.
    if (attemptsLeft <= 0) {
      return;
    }
    attemptsLeft--;

    try {
      // Short pause between polls instead of busy-spinning on the counter
      Thread.sleep(10);
    } catch (InterruptedException interrupted) {
      // Preserve the interrupt status for the code further up the stack
      Thread.currentThread().interrupt();
      LOGGER.warn("Interrupted while waiting for the high priority lock task.", interrupted);
      return;
    }
  }
}
}
Loading