Batch send to not overload multiprocessing pipe #15037
Conversation
This breaks the loops sending callbacks to the multiprocessing pipe into 100-size batches, and calls DagFileProcessorAgent.heartbeat() to consume the pipe between batches. This prevents the pipe from becoming full, which would make Pipe.send() block and deadlock the process.
The review comments below refer to these lines from the diff:

    self.processor_agent.send_callback_to_execute(request)
    ...
    if i % CALLBACK_SEND_BATCH_SIZE == 0:
Hmm, I don't think this will help actually.

This block (line 1253) doesn't get called that often, so I suspect what is happening is that the processor_manager side of the pipe is full: trying to send even a single byte might block until the other end reads, but it can't read because it is also trying to write.

And trying to heartbeat every time before writing would be slow!

🤔
One option could be for send_callback_to_execute to store the request in an internal queue on the ProcessorAgent, and only send inside heartbeat, after first reading?
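If I understand the suggestion, it might look something like this rough sketch (hypothetical: the real DagFileProcessorAgent differs, and the `_callback_queue` attribute and method bodies here are illustrative only):

```python
from collections import deque

class DagFileProcessorAgent:
    """Illustrative sketch only -- not the real Airflow class."""

    def __init__(self, parent_signal_conn):
        # Parent end of the multiprocessing pipe to the manager process.
        self._parent_signal_conn = parent_signal_conn
        # Hypothetical internal buffer for callbacks awaiting a send.
        self._callback_queue = deque()

    def send_callback_to_execute(self, request):
        # Never write to the pipe directly; just remember the request.
        self._callback_queue.append(request)

    def heartbeat(self):
        # Drain incoming messages first, so the manager side is not stuck
        # mid-send() on a full pipe while we try to write ourselves.
        while self._parent_signal_conn.poll():
            message = self._parent_signal_conn.recv()
            # ... handle DagParsingStat / other messages here ...
        # Only then flush our queued callbacks into the pipe.
        while self._callback_queue:
            self._parent_signal_conn.send(self._callback_queue.popleft())
```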
(SLA callback requests also have this same problem.)
It may be easier to change the other side: https://github.com/apache/airflow/blob/master/airflow/utils/dag_processing.py#L698 is the only place that side sends anything.
Another option:
>>> import multiprocessing, os, fcntl
>>> p1, p2 = multiprocessing.Pipe()
>>> fcntl.fcntl(p1.fileno(), fcntl.F_SETFL, os.O_NONBLOCK | fcntl.fcntl(p1.fileno(), fcntl.F_GETFL))
>>> p1.send('a' * 1024000)  # Simulate something that would block
BlockingIOError: [Errno 11] Resource temporarily unavailable

We could set the socket in DagFileManager (the receiving side) to non-blocking; that way, if the send would fail we could catch it, queue it there, and then go and poll again?
What do you mean by “queue it there, and then go and poll again”? Making the child connection non-blocking sounds great (I wondered about this briefly but couldn’t find how), but the polling loop seems to have quite a few side effects before that line, so we can’t just put the thing back and try again later. Do you mean adding a queue (list) on DagFileProcessorManager to store DagParsingStat instances when they fail to send, and a block at the beginning of the poll loop to check and resend them first?
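Something like this hypothetical sketch, perhaps (DagFileProcessorManager and DagParsingStat are the real names; `_try_send`, `_unsent_stats`, and the loop structure are illustrative only):

```python
class DagFileProcessorManager:
    """Illustrative sketch only -- not the real Airflow class."""

    def __init__(self, signal_conn):
        # Child end of the pipe, assumed already set non-blocking via fcntl.
        self._signal_conn = signal_conn
        # Hypothetical retry buffer for stats that failed to send.
        self._unsent_stats = []

    def _try_send(self, stat):
        try:
            self._signal_conn.send(stat)
        except BlockingIOError:
            # Pipe is full because the agent is not reading right now; keep
            # the stat and retry next iteration rather than blocking here.
            self._unsent_stats.append(stat)

    def _run_parsing_loop(self):
        while True:
            # Resend anything that previously failed before the loop's
            # other side effects run again.
            pending, self._unsent_stats = self._unsent_stats, []
            for stat in pending:
                self._try_send(stat)
            ...  # normal poll/parse work, ending with self._try_send(new_stat)
```

(One caveat worth checking: if I read CPython's Connection implementation right, send() writes a length header before the pickled payload, so a partially completed non-blocking write could corrupt the stream framing; a real implementation would need to confirm sends of this size are atomic or handle partial writes.)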
This breaks the loops sending callbacks to the multiprocessing pipe into size-100 batches, and calls DagFileProcessorAgent.heartbeat() (which would call Pipe.recv()) after each batch to consume the pipe. This prevents the pipe from becoming full, which would make Pipe.send() block and deadlock the process. Pipe.send() is called in two code paths, (interestingly) represented exactly by the two py-spy traces available in #7935.

The way I do this is pretty naive, but it represents the direction in which I think the issue should be resolved. I don't really understand what the database calls in _do_scheduling and _process_executor_events do, and therefore have no idea whether it's OK to call self.processor_agent.heartbeat() interleaved with those database calls (previously heartbeat() was only called after all those database calls were done).

Resolves #7935? There's actually another separate issue described in it regarding the Redis worker being deadlocked. But this is no longer an issue according to @ashb (#7935 (comment)), and indeed all reports on that one are against 1.10.x, so I'm putting it off (honestly I'm not sure how that one should be handled; it may need to involve some Redis internals).
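For reference, a minimal sketch of the batched loop described above (CALLBACK_SEND_BATCH_SIZE and the processor_agent calls appear in this PR's diff; the helper name and surrounding loop here are hypothetical simplifications):

```python
CALLBACK_SEND_BATCH_SIZE = 100

def _send_callbacks_in_batches(self, callback_requests):
    # Hypothetical helper: send callbacks in batches, heartbeating the
    # agent between batches so it drains the pipe (via Pipe.recv())
    # before Pipe.send() can fill it and block.
    for i, request in enumerate(callback_requests, start=1):
        self.processor_agent.send_callback_to_execute(request)
        if i % CALLBACK_SEND_BATCH_SIZE == 0:
            self.processor_agent.heartbeat()
```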
- Read the Pull Request Guidelines for more information.
- In case of fundamental code change, Airflow Improvement Proposal (AIP) is needed.
- In case of a new dependency, check compliance with the ASF 3rd Party License Policy.
- In case of backwards incompatible changes please leave a note in UPDATING.md.