Refactor ForEachProcessor to use iteration instead of recursion #51104

Merged
merged 5 commits into from
Jan 21, 2020

probakowski
Contributor

This change makes ForEachProcessor iterative while keeping it non-blocking.
For non-async processors we use a single for loop and no recursion at all.
For async processors we continue the work on either the current thread or the thread
started by the downstream processor, whichever finishes second (usually the processor thread).
Everything is synchronized by a single atomic variable.

Relates #50514
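
As an illustration of the description above, here is a minimal, self-contained sketch of the pattern. It is not the actual ForEachProcessor code: `ValueProcessor` is a hypothetical stand-in for the inner ingest processor, and the handler receives the collected list rather than an IngestDocument. Only `innerExecute`, `shouldContinueHere`, `nextIndex`, `values`, `newValues` and `handler` are names taken from the diff in this PR.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.function.BiConsumer;

final class IterativeForEachSketch {

    /** Hypothetical stand-in for an ingest processor; it may invoke the callback on any thread. */
    interface ValueProcessor {
        void execute(Object value, BiConsumer<Object, Exception> callback);
    }

    static void innerExecute(int index, List<?> values, List<Object> newValues,
                             ValueProcessor processor,
                             BiConsumer<List<Object>, Exception> handler) {
        for (; index < values.size(); index++) {
            // One flag per value: the loop thread and the callback each flip it once.
            AtomicBoolean shouldContinueHere = new AtomicBoolean();
            int nextIndex = index + 1;
            processor.execute(values.get(index), (result, e) -> {
                if (e != null) {
                    // Report the failure; the flag stays unset, so the loop thread stops too.
                    handler.accept(null, e);
                    return;
                }
                newValues.add(result);
                if (shouldContinueHere.getAndSet(true)) {
                    // The loop thread got to the flag first and returned, so resume here.
                    innerExecute(nextIndex, values, newValues, processor, handler);
                }
            });
            if (shouldContinueHere.getAndSet(true) == false) {
                // The callback has not flipped the flag yet; it will resume the work (or it failed).
                return;
            }
            // The callback already ran, so simply move on to the next value in this loop.
        }
        handler.accept(newValues, null);
    }

    public static void main(String[] args) {
        List<Object> doubled = new ArrayList<>();
        // A synchronous processor: the callback runs before execute(...) returns.
        ValueProcessor sync = (v, cb) -> cb.accept(((Integer) v) * 2, null);
        innerExecute(0, List.of(1, 2, 3), doubled, sync,
            (result, e) -> System.out.println(result)); // prints [2, 4, 6]
    }
}
```

With a synchronous processor the callback always flips the flag first, so the whole list is handled by the for loop and the stack does not grow with the list size. With an asynchronous one (for example a processor that calls back from `CompletableFuture.runAsync`), the loop thread usually returns after one value and the callback thread picks up the next index.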

@probakowski probakowski added :Data Management/Ingest Node Execution or management of Ingest Pipelines including GeoIP >refactoring v8.0.0 v7.7.0 labels Jan 16, 2020
@elasticmachine
Collaborator

Pinging @elastic/es-core-features (:Core/Features/Ingest)

Member

@martijnvg martijnvg left a comment

This is a much cleaner and simpler approach 👍
It did take a while before I understood the change, but I think it is neat.
I left a few comments, but they are just for my own understanding.

}
}

void innerExecute(int index, List<?> values, List<Object> newValues, IngestDocument document,
                  BiConsumer<IngestDocument, Exception> handler) {
    for (; index < values.size(); index++) {
        AtomicBoolean shouldContinueHere = new AtomicBoolean();
Member

@martijnvg martijnvg Jan 16, 2020

I like this approach. If the inner processor goes async, whichever thread sets this atomic boolean last continues with the next value.

If the inner processor doesn't go async, the current thread makes the second getAndSet(...) call and continues with the next value.

I do wonder if this variable can be renamed to describe this better.
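
The hand-off described in this comment can be checked in isolation. The sketch below is an assumption-laden toy, not code from the PR: `HandoffSketch` and `continuations` are hypothetical names, and a plain `Thread` stands in for the thread started by the inner processor. Two parties each call `getAndSet(true)` once on a fresh flag; the first caller reads `false` and the second reads `true`, so exactly one of them continues regardless of ordering.

```java
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;

final class HandoffSketch {

    /** Races a "callback" thread against the calling thread; returns how many of them saw true. */
    static int continuations() {
        AtomicBoolean shouldContinueHere = new AtomicBoolean();
        AtomicInteger continued = new AtomicInteger();
        // Each party arrives exactly once; only the one that arrives second reads true.
        Runnable arrive = () -> {
            if (shouldContinueHere.getAndSet(true)) {
                continued.incrementAndGet();
            }
        };
        Thread callbackThread = new Thread(arrive);
        callbackThread.start();
        arrive.run(); // the "current thread" arrives as well
        try {
            callbackThread.join();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }
        return continued.get();
    }
}
```

Calling `HandoffSketch.continuations()` repeatedly should return 1 every time, which is the property the loop relies on: a value is never continued twice and never dropped.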

Member

I wonder about this scenario:

  1. The current thread sets shouldContinueHere first.
  2. The next value is handled and also manages to set shouldContinueHere first.
  3. Both async calls manage to return and update newValues simultaneously.

Is this scenario possible? And if so, should we have protection against it?

Contributor Author

If the current thread hits shouldContinueHere first, it sees false and ends the loop (returns).
If it hits it second, the async call has already finished and the other thread has already updated newValues.
As you mentioned in your first comment, only the thread that checks shouldContinueHere last actually proceeds.
Does that answer your question, or did I misunderstand the scenario?

Member

@martijnvg martijnvg Jan 17, 2020

Thanks for clarifying. I misinterpreted the if statement on line 98 when looking at it this morning.
It is not possible that multiple values from the list are handled concurrently.

if (e != null || result == null) {
    handler.accept(result, e);
} else if (shouldContinueHere.getAndSet(true)) {
    innerExecute(nextIndex, values, newValues, document, handler);
Member

Recursion does happen in this case, but we're not risking a stack overflow (SO) here, because it would always be a different thread that gets here.

Member

@martijnvg martijnvg left a comment

LGTM

Contributor

@jakelandis jakelandis left a comment

LGTM and nice work!

@probakowski probakowski merged commit a4da1c9 into elastic:master Jan 21, 2020
@probakowski probakowski deleted the iterative-foreach branch January 21, 2020 20:43
probakowski added a commit that referenced this pull request Jan 22, 2020
…#51104) (#51322)

* Refactor ForEachProcessor to use iteration instead of recursion (#51104)

* Update IngestCommonPlugin.java