Skip to content

Prevent concurrent access to local breaker in rerank #128162

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 5 commits into from
May 22, 2025

Conversation

dnhatn
Copy link
Member

@dnhatn dnhatn commented May 19, 2025

When an async operator receives a response, we can't create new blocks on the responding thread because multiple threads may adjust the local breaker simultaneously, leading to a data race. To address this, we can either use the global breaker or delay block creation in getOutput. While using the global block factory is simpler, I prefer the second option to use the local breaker when possible. Therefore, I opted to keep the results in the queue and create new blocks in getOutput. Our tests didn't catch this issue because: (1) only one block is created in the test, and (2) there is no delay when simulating the inference service.

Closes #127638
Closes #127051

@dnhatn dnhatn requested review from afoucret, nik9000 and ivancea May 19, 2025 22:11
@dnhatn dnhatn marked this pull request as ready for review May 19, 2025 22:11
@elasticsearchmachine
Copy link
Collaborator

Pinging @elastic/es-analytical-engine (Team:Analytics)

@elasticsearchmachine elasticsearchmachine added the Team:Analytics Meta label for analytical engine team (ESQL/Aggs/Geo) label May 19, 2025
@dnhatn dnhatn force-pushed the fix-rerank-operator branch from cac99e8 to 1840e7f Compare May 20, 2025 22:47
@dnhatn dnhatn force-pushed the fix-rerank-operator branch from 1840e7f to d89c1c5 Compare May 20, 2025 22:47
Copy link
Contributor

@ivancea ivancea left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

LGTM 👀

inferenceRunner.doInference(
buildInferenceRequest(inputPage),
ActionListener.wrap(
inferenceResponse -> outputListener.onResponse(buildOutput(inputPage, inferenceResponse)),
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

If it isn't safe to create blocks here (And therefore update the CB), why is it safe to read a block from transport like we do in LookupFromIndexOperator? I was wondering which rule we should follow in the AsyncOperators to avoid having this issue again

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

  • We use the global breaker not local breaker for reading blocks from an input stream.
  • I have added assertions in ce54583, which should consistently detect such misuse.

BlockFactory blockFactory = blockFactory();
final int overReservedBytes = between(0, 1024 * 1024);
final int maxOverReservedBytes = between(overReservedBytes, 1024 * 1024);
var localBreaker = new LocalCircuitBreaker(blockFactory.breaker(), overReservedBytes, maxOverReservedBytes);
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Should we wrap this in something that asserts that we're on the same thread every time?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

++ I pushed ce54583.

@dnhatn dnhatn requested a review from nik9000 May 21, 2025 19:13
@dnhatn
Copy link
Member Author

dnhatn commented May 22, 2025

@ivancea @nik9000 @afoucret Thanks for reviewing.

@dnhatn dnhatn merged commit 84865a1 into elastic:main May 22, 2025
18 checks passed
@dnhatn dnhatn deleted the fix-rerank-operator branch May 22, 2025 15:09
afoucret pushed a commit to afoucret/elasticsearch that referenced this pull request Jun 4, 2025
When an async operator receives a response, we can't create new blocks
on the responding thread because multiple threads may adjust the local
breaker simultaneously, leading to a data race. To address this, we can
either use the global breaker or delay block creation in getOutput.
While using the global block factory is simpler, I prefer the second
option to use the local breaker when possible. Therefore, I opted to
keep the results in the queue and create new blocks in getOutput. Our
tests didn't catch this issue because: (1) only one block is created in
the test, and (2) there is no delay when simulating the inference
service.

Closes elastic#127638
Closes elastic#127051
dnhatn added a commit to dnhatn/elasticsearch that referenced this pull request Jun 5, 2025
When an async operator receives a response, we can't create new blocks
on the responding thread because multiple threads may adjust the local
breaker simultaneously, leading to a data race. To address this, we can
either use the global breaker or delay block creation in getOutput.
While using the global block factory is simpler, I prefer the second
option to use the local breaker when possible. Therefore, I opted to
keep the results in the queue and create new blocks in getOutput. Our
tests didn't catch this issue because: (1) only one block is created in
the test, and (2) there is no delay when simulating the inference
service.

Closes elastic#127638
Closes elastic#127051
@dnhatn dnhatn added the v8.19.0 label Jun 5, 2025
dnhatn added a commit that referenced this pull request Jun 5, 2025
When an async operator receives a response, we can't create new blocks
on the responding thread because multiple threads may adjust the local
breaker simultaneously, leading to a data race. To address this, we can
either use the global breaker or delay block creation in getOutput.
While using the global block factory is simpler, I prefer the second
option to use the local breaker when possible. Therefore, I opted to
keep the results in the queue and create new blocks in getOutput. Our
tests didn't catch this issue because: (1) only one block is created in
the test, and (2) there is no delay when simulating the inference
service.

Closes #127638
Closes #127051
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
:Analytics/ES|QL AKA ESQL >non-issue Team:Analytics Meta label for analytical engine team (ESQL/Aggs/Geo) v8.19.0 v9.1.0
Projects
None yet
Development

Successfully merging this pull request may close these issues.

[CI] EsqlSpecIT test {rerank.Reranker using another sort order ASYNC} failing [CI] EsqlSpecIT test {rerank.Reranker before a limit ASYNC} failing
4 participants