-
Notifications
You must be signed in to change notification settings - Fork 16.4k
Batch send to not overload multiprocessing pipe #15037
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Closed
Closed
Changes from all commits
Commits
File filter
Filter by extension
Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
There are no files selected for viewing
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Add this suggestion to a batch that can be applied as a single commit.
This suggestion is invalid because no changes were made to the code.
Suggestions cannot be applied while the pull request is closed.
Suggestions cannot be applied while viewing a subset of changes.
Only one suggestion per line can be applied in a batch.
Add this suggestion to a batch that can be applied as a single commit.
Applying suggestions on deleted lines is not supported.
You must change the existing code in this line in order to create a valid suggestion.
Outdated suggestions cannot be applied.
This suggestion has been applied or marked resolved.
Suggestions cannot be applied from pending reviews.
Suggestions cannot be applied on multi-line comments.
Suggestions cannot be applied while the pull request is queued to merge.
Suggestion cannot be applied right now. Please check back later.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Hmmm, I don't think this will help actually.
This block/line 1253 doesn't get called that often, so I suspect what is happening is that the processor_manager side of the pipe is full, so trying to send even a single byte might block until the other end reads, but it can't read because it is also trying to write.
And trying to heartbeat every time before writing would be slow!.
🤔
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
One option could be for
send_callback_to_executeto store in a queue internally in the ProcessorAgent, and only send inside heartbeat after first reading?There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
(SLA callback requests also have this same problem.)
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
It may be easier to change the other side - https://github.com/apache/airflow/blob/master/airflow/utils/dag_processing.py#L698 that is the only place that side sends anything.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Another option:
We could set the socket in DagFileManager (the receiving side) to non-blocking, that way if the send would fail we could catch it, and queue it there, and then go and poll again.?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
What do you mean by “queue it there, and then go and poll again”? Making the child connection non-blocking sounds great (I wondered briefly but couldn’t find found), but the polling loop seems to have quite a few side effects before that line, so we can’t just put the thing back and try again later. Does it mean to add a queue (list) on
DagFileProcessorManagerto storeDagParsingStatinstances when they fail to send, and add a block in the beginning of the poll loop to check and resend them first?