Refactor ForEachProcessor to use iteration instead of recursion #51104
This change makes ForEachProcessor iterative while keeping it non-blocking. For non-async processors we use a single for loop and no recursion at all. For async processors we continue the work on either the current thread or the thread started by the downstream processor, whichever finishes second (usually the processor thread). Everything is synchronized by a single atomic variable. Relates elastic#50514
Pinging @elastic/es-core-features (:Core/Features/Ingest)
This is a much cleaner and simpler approach 👍
It did take a while before I understood the change, but I think it is neat.
I left a few comments, but that is just for my own understanding.
        }
    }

    void innerExecute(int index, List<?> values, List<Object> newValues, IngestDocument document,
                      BiConsumer<IngestDocument, Exception> handler) {
        for (; index < values.size(); index++) {
            AtomicBoolean shouldContinueHere = new AtomicBoolean();
I like this approach. If the inner processor goes async, whichever thread sets this atomic boolean last continues with the next value.
If the inner processor doesn't go async, then on the second getAndSet(...) the current thread continues with the next value.
I do wonder if this variable could be renamed to describe this better.
I wonder about this scenario:
- The current thread sets shouldContinueHere first.
- The next value is handled and also manages to set shouldContinueHere first.
- Both async calls manage to return and update newValues simultaneously.
Is this scenario possible? And if so, should we have protection against it?
If the current thread hits shouldContinueHere first, it sees false and ends the loop (returns).
If it hits it second, that means the async call has already finished and the other thread has already updated newValues.
As you mentioned in your first comment, only the thread that checks shouldContinueHere last actually proceeds.
Does that answer your question, or did I misunderstand the scenario?
Thanks for clarifying. I misinterpreted the if statement on line 98 when looking at it this morning.
It is not possible that multiple values from the list are handled concurrently.
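The hand-off rule discussed above (only the thread that touches the flag last proceeds) can be checked in isolation. The following is a minimal sketch, not code from this PR: the class `HandoffDemo` and the method `raceOnce` are hypothetical names, and the two "parties" stand in for the loop thread and the processor's callback thread.

```java
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;

final class HandoffDemo {

    /** Runs one race between two parties and returns how many of them were told to continue. */
    static int raceOnce() {
        AtomicBoolean shouldContinueHere = new AtomicBoolean(); // initially false
        AtomicInteger continued = new AtomicInteger();

        // Each party calls getAndSet(true) exactly once. The first caller reads
        // false and steps aside; the second caller reads true and carries on.
        Runnable party = () -> {
            if (shouldContinueHere.getAndSet(true)) {
                continued.incrementAndGet();
            }
        };

        Thread callbackThread = new Thread(party); // stands in for the async callback
        callbackThread.start();
        party.run();                               // stands in for the loop thread
        try {
            callbackThread.join();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }
        return continued.get();
    }

    public static void main(String[] args) {
        for (int i = 0; i < 1000; i++) {
            if (raceOnce() != 1) {
                throw new AssertionError("expected exactly one party to continue");
            }
        }
        System.out.println("exactly one party continued in every run");
    }
}
```

Because `getAndSet` is atomic, exactly one caller observes `true` regardless of scheduling, which is why two list values are never processed concurrently.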
                if (e != null || result == null) {
                    handler.accept(result, e);
                } else if (shouldContinueHere.getAndSet(true)) {
                    innerExecute(nextIndex, values, newValues, document, handler);
In this case recursion does happen, but we're not risking a stack overflow here, because it is always a different thread that gets here.
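To make the stack-depth argument concrete, here is a simplified, self-contained sketch of the same loop shape. It is an illustration under assumptions, not the PR's implementation: `ElementProcessor`, `runBlocking`, and `demo` are hypothetical helpers, `IngestDocument` is replaced by a plain list, and the `result == null` branch is omitted.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.BiConsumer;

final class IterativeForEachSketch {

    /** Hypothetical stand-in for an inner processor; it may invoke the callback on any thread. */
    interface ElementProcessor {
        void execute(Object value, BiConsumer<Object, Exception> callback);
    }

    static void innerExecute(int index, List<?> values, List<Object> newValues,
                             ElementProcessor processor, BiConsumer<List<Object>, Exception> handler) {
        for (; index < values.size(); index++) {
            AtomicBoolean shouldContinueHere = new AtomicBoolean();
            int nextIndex = index + 1;
            processor.execute(values.get(index), (result, e) -> {
                if (e != null) {
                    handler.accept(null, e);
                    return;
                }
                newValues.add(result);
                if (shouldContinueHere.getAndSet(true)) {
                    // The loop thread already returned, so resume on the callback thread.
                    // Its stack does not contain the earlier iterations.
                    innerExecute(nextIndex, values, newValues, processor, handler);
                }
            });
            if (shouldContinueHere.getAndSet(true) == false) {
                // The callback has not run yet; it will resume the loop when it does.
                return;
            }
            // The callback already ran inline: keep looping, with no recursion.
        }
        handler.accept(newValues, null);
    }

    /** Test helper (assumption): blocks the caller until the handler has fired. */
    static List<Object> runBlocking(List<?> values, ElementProcessor processor) {
        CountDownLatch done = new CountDownLatch(1);
        AtomicReference<List<Object>> output = new AtomicReference<>();
        AtomicReference<Exception> failure = new AtomicReference<>();
        innerExecute(0, values, new ArrayList<>(values.size()), processor, (result, e) -> {
            output.set(result);
            failure.set(e);
            done.countDown();
        });
        try {
            if (!done.await(30, TimeUnit.SECONDS)) {
                throw new IllegalStateException("timed out");
            }
        } catch (InterruptedException ie) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(ie);
        }
        if (failure.get() != null) {
            throw new IllegalStateException(failure.get());
        }
        return output.get();
    }

    /** Increments 0..size-1, either inline or via a single-threaded executor. */
    static List<Object> demo(int size, boolean async) {
        List<Integer> input = new ArrayList<>(size);
        for (int i = 0; i < size; i++) {
            input.add(i);
        }
        ExecutorService pool = Executors.newSingleThreadExecutor();
        try {
            ElementProcessor processor = async
                ? (value, callback) -> pool.execute(() -> callback.accept((Integer) value + 1, null))
                : (value, callback) -> callback.accept((Integer) value + 1, null);
            return runBlocking(input, processor);
        } finally {
            pool.shutdown();
        }
    }

    public static void main(String[] args) {
        System.out.println("sync size:  " + demo(200_000, false).size());
        System.out.println("async size: " + demo(200_000, true).size());
    }
}
```

In the synchronous case the callback always reaches the flag first, so the loop thread simply moves on to the next index. In the asynchronous case each recursive call starts from a callback frame and returns as soon as the next element has been dispatched, so the stack depth stays constant even for long lists.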
LGTM
LGTM and nice work!
…#51104) (#51322)
* Refactor ForEachProcessor to use iteration instead of recursion (#51104)
* Update IngestCommonPlugin.java
This change makes ForEachProcessor iterative and still non-blocking.
In case of non-async processors we use a single for loop and no recursion at all.
In case of async processors we continue work on either the current thread or the thread
started by the downstream processor, whichever finishes second (usually the processor thread).
Everything is synchronized by a single atomic variable.
Relates #50514