Skip to content

Commit

Permalink
[FLINK-16057][task] Optimize ContinuousFileReaderOperator
Browse files Browse the repository at this point in the history
The current approach of re-enqueueing mails creates an overhead
visible in benchmarks. This change eliminates unnecessary
re-enqueueing of mails by checking mailboxExecutor.isIdle.
  • Loading branch information
rkhachatryan authored and pnowojski committed Jun 2, 2020
1 parent 78b4e3d commit 1a69cb9
Show file tree
Hide file tree
Showing 4 changed files with 38 additions and 13 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,7 @@
import org.apache.flink.streaming.api.watermark.Watermark;
import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
import org.apache.flink.streaming.runtime.tasks.ProcessingTimeService;
import org.apache.flink.streaming.runtime.tasks.mailbox.MailboxExecutorImpl;
import org.apache.flink.util.ExceptionUtils;
import org.apache.flink.util.Preconditions;
import org.apache.flink.util.function.RunnableWithException;
Expand Down Expand Up @@ -205,7 +206,7 @@ public void onNoMoreData(ContinuousFileReaderOperator<?, ?> op) {

private transient InputFormat<OUT, ? super T> format;
private TypeSerializer<OUT> serializer;
private transient MailboxExecutor executor;
private transient MailboxExecutorImpl executor;
private transient OUT reusedRecord;
private transient SourceFunction.SourceContext<OUT> sourceContext;
private transient ListState<T> checkpointedState;
Expand Down Expand Up @@ -233,7 +234,7 @@ public void onNoMoreData(ContinuousFileReaderOperator<?, ?> op) {

this.format = checkNotNull(format);
this.processingTimeService = checkNotNull(processingTimeService);
this.executor = checkNotNull(mailboxExecutor);
this.executor = (MailboxExecutorImpl) checkNotNull(mailboxExecutor);
}

@Override
Expand Down Expand Up @@ -311,17 +312,19 @@ private void enqueueProcessRecord() {
}

/**
 * Reads and collects records from the current input format, then either finishes the split
 * via {@code onSplitProcessed()} or schedules further processing via
 * {@code enqueueProcessRecord()}.
 *
 * <p>NOTE(review): this listing looks like a rendered diff without +/- markers, so the
 * pre-change body and the post-change do/while body appear together. Confirm the exact
 * statement sequence against the real file before relying on it.
 *
 * @throws IOException declared by this method; presumably raised while reading — confirm
 */
private void processRecord() throws IOException {
if (!state.prepareToProcessRecord(this)) {
return;
}
do {
// Stop when the current state's prepareToProcessRecord returns false.
if (!state.prepareToProcessRecord(this)) {
return;
}

readAndCollectRecord();
readAndCollectRecord();

if (format.reachedEnd()) {
onSplitProcessed();
} else {
enqueueProcessRecord();
}
// End of the split: hand over to onSplitProcessed and leave without re-enqueueing.
if (format.reachedEnd()) {
onSplitProcessed();
return;
}
// Keep reading in place while the executor reports idle; per the commit message this avoids
// the overhead of re-enqueueing a mail for every record.
} while (executor.isIdle()); // todo: consider moving this loop into MailboxProcessor (return boolean "re-execute" from enqueued action)
// The executor is no longer idle: continue later through a new mail.
enqueueProcessRecord();
}

private void onSplitProcessed() throws IOException {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -43,10 +43,22 @@ public final class MailboxExecutorImpl implements MailboxExecutor {

private final StreamTaskActionExecutor actionExecutor;

private final MailboxProcessor mailboxProcessor;

/**
 * Creates an executor that is not associated with a {@code MailboxProcessor}: delegates to the
 * four-argument constructor, passing {@code null} as the processor.
 */
public MailboxExecutorImpl(@Nonnull TaskMailbox mailbox, int priority, StreamTaskActionExecutor actionExecutor) {
this(mailbox, priority, actionExecutor, null);
}

/**
 * Creates an executor for the given mailbox, optionally tied to a mailbox processor.
 *
 * @param mailbox the mailbox stored by this executor
 * @param priority the priority stored by this executor
 * @param actionExecutor the action executor; must not be {@code null}
 * @param mailboxProcessor the processor stored by this executor; {@code null} when this
 *     constructor is reached through the three-argument constructor
 */
public MailboxExecutorImpl(
        @Nonnull TaskMailbox mailbox,
        int priority,
        StreamTaskActionExecutor actionExecutor,
        MailboxProcessor mailboxProcessor) {
    // Validate first; the remaining assignments are independent of each other.
    this.actionExecutor = Preconditions.checkNotNull(actionExecutor);
    this.mailboxProcessor = mailboxProcessor;
    this.priority = priority;
    this.mailbox = mailbox;
}

/**
 * Reports whether this executor is idle.
 *
 * <p>The executor is idle when the mailbox loop is not running, or when the default action is
 * unavailable, the mailbox holds no mail, and the mailbox state is still accepting mails.
 *
 * @return {@code true} if one of the conditions above holds, {@code false} otherwise
 * @throws IllegalStateException if this executor was created without a mailbox processor
 */
public boolean isIdle() {
    // The three-argument constructor leaves mailboxProcessor null; fail with a descriptive
    // message instead of a bare NullPointerException.
    Preconditions.checkState(
        mailboxProcessor != null,
        "isIdle() requires a MailboxProcessor, but this executor was created without one");

    if (!mailboxProcessor.isMailboxLoopRunning()) {
        return true;
    }
    return mailboxProcessor.isDefaultActionUnavailable()
        && !mailbox.hasMail()
        && mailbox.getState().isAcceptingMails();
}

@Override
Expand Down Expand Up @@ -85,4 +97,5 @@ public boolean tryYield() {
return false;
}
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -116,7 +116,7 @@ public MailboxExecutor getMainMailboxExecutor() {
* @param priority the priority of the {@link MailboxExecutor}.
*/
public MailboxExecutor getMailboxExecutor(int priority) {
// NOTE(review): the two return lines below look like the before/after pair of a rendered
// diff — confirm which one the real file contains.
return new MailboxExecutorImpl(mailbox, priority, actionExecutor);
// Four-argument form: additionally passes this object as the executor's mailbox processor.
return new MailboxExecutorImpl(mailbox, priority, actionExecutor, this);
}

public void initMetric(TaskMetricGroup metricGroup) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -173,7 +173,16 @@ public interface TaskMailbox {
* This enum represents the states of the mailbox lifecycle.
*/
enum State {
// NOTE(review): the next two lines look like the before/after pair of a rendered diff —
// confirm which one the real file contains.
OPEN, QUIESCED, CLOSED
OPEN(true), QUIESCED(false), CLOSED(false);
// Per-constant flag: true for OPEN, false for QUIESCED and CLOSED.
private final boolean acceptingMails;

State(boolean acceptingMails) {
this.acceptingMails = acceptingMails;
}

/**
 * Returns the flag this constant was constructed with ({@code true} only for OPEN).
 * Presumably it indicates whether a mailbox in this state takes new mails — confirm.
 */
public boolean isAcceptingMails() {
return acceptingMails;
}
}

/**
Expand Down

0 comments on commit 1a69cb9

Please sign in to comment.