-
Notifications
You must be signed in to change notification settings - Fork 25.3k
Prevent concurrent access to local breaker in rerank #128162
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Conversation
Pinging @elastic/es-analytical-engine (Team:Analytics) |
cac99e8
to
1840e7f
Compare
1840e7f
to
d89c1c5
Compare
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
LGTM 👀
inferenceRunner.doInference( | ||
buildInferenceRequest(inputPage), | ||
ActionListener.wrap( | ||
inferenceResponse -> outputListener.onResponse(buildOutput(inputPage, inferenceResponse)), |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
If it isn't safe to create blocks here (And therefore update the CB), why is it safe to read a block from transport like we do in LookupFromIndexOperator
? I was wondering which rule we should follow in the AsyncOperator
s to avoid having this issue again
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
- We use the global breaker not local breaker for reading blocks from an input stream.
- I have added assertions in ce54583, which should consistently detect such misuse.
BlockFactory blockFactory = blockFactory(); | ||
final int overReservedBytes = between(0, 1024 * 1024); | ||
final int maxOverReservedBytes = between(overReservedBytes, 1024 * 1024); | ||
var localBreaker = new LocalCircuitBreaker(blockFactory.breaker(), overReservedBytes, maxOverReservedBytes); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Should we wrap this in something that assert
s that we're on the same thread every time?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
++ I pushed ce54583.
When an async operator receives a response, we can't create new blocks on the responding thread because multiple threads may adjust the local breaker simultaneously, leading to a data race. To address this, we can either use the global breaker or delay block creation in getOutput. While using the global block factory is simpler, I prefer the second option to use the local breaker when possible. Therefore, I opted to keep the results in the queue and create new blocks in getOutput. Our tests didn't catch this issue because: (1) only one block is created in the test, and (2) there is no delay when simulating the inference service. Closes elastic#127638 Closes elastic#127051
When an async operator receives a response, we can't create new blocks on the responding thread because multiple threads may adjust the local breaker simultaneously, leading to a data race. To address this, we can either use the global breaker or delay block creation in getOutput. While using the global block factory is simpler, I prefer the second option to use the local breaker when possible. Therefore, I opted to keep the results in the queue and create new blocks in getOutput. Our tests didn't catch this issue because: (1) only one block is created in the test, and (2) there is no delay when simulating the inference service. Closes elastic#127638 Closes elastic#127051
When an async operator receives a response, we can't create new blocks on the responding thread because multiple threads may adjust the local breaker simultaneously, leading to a data race. To address this, we can either use the global breaker or delay block creation in getOutput. While using the global block factory is simpler, I prefer the second option to use the local breaker when possible. Therefore, I opted to keep the results in the queue and create new blocks in getOutput. Our tests didn't catch this issue because: (1) only one block is created in the test, and (2) there is no delay when simulating the inference service. Closes #127638 Closes #127051
When an async operator receives a response, we can't create new blocks on the responding thread because multiple threads may adjust the local breaker simultaneously, leading to a data race. To address this, we can either use the global breaker or delay block creation in getOutput. While using the global block factory is simpler, I prefer the second option to use the local breaker when possible. Therefore, I opted to keep the results in the queue and create new blocks in getOutput. Our tests didn't catch this issue because: (1) only one block is created in the test, and (2) there is no delay when simulating the inference service.
Closes #127638
Closes #127051