20 changes: 18 additions & 2 deletions airflow/jobs/scheduler_job.py
@@ -72,6 +72,8 @@
DR = models.DagRun
DM = models.DagModel

CALLBACK_SEND_BATCH_SIZE = 100


class DagFileProcessorProcess(AbstractDagFileProcessorProcess, LoggingMixin, MultiprocessingStartMethodMixin):
"""Runs DAG processing in a separate process using DagFileProcessor
@@ -1219,7 +1221,12 @@ def _process_executor_events(self, session: Session = None) -> int:
# Check state of finished tasks
filter_for_tis = TI.filter_for_tis(tis_with_right_state)
tis: List[TI] = session.query(TI).filter(filter_for_tis).options(selectinload('dag_model')).all()
for ti in tis:

# Send tasks in batches of 100, and call the agent's heartbeat()
# to process the multiprocessing pipe. This keeps the pipe from
# filling up, which would block pipe.send() and cause a deadlock
# when we have a lot of tasks to send.
for i, ti in enumerate(tis, 1):
try_number = ti_primary_key_to_try_number_map[ti.key.primary]
buffer_key = ti.key.with_try_number(try_number)
state, info = event_buffer.pop(buffer_key)
@@ -1245,6 +1252,9 @@

self.processor_agent.send_callback_to_execute(request)

if i % CALLBACK_SEND_BATCH_SIZE == 0:
Member:
Hmmm, I don't think this will help actually.

This block/line 1253 doesn't get called that often, so I suspect what is happening is that the processor_manager side of the pipe is full: trying to send even a single byte might block until the other end reads, but it can't read because it is also trying to write.

And trying to heartbeat every time before writing would be slow!

🤔
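
To illustrate the failure mode being described, here is a small standalone snippet (plain multiprocessing, not Airflow code) showing that Connection.send() blocks once the pipe's OS buffer fills up and nothing drains the other end:

```python
# Standalone illustration: a writer that nobody reads from eventually
# blocks inside send() once the OS pipe buffer is full.
import multiprocessing


def writer(conn):
    for _ in range(10_000):
        conn.send(b"x" * 1024)  # ~10 MB total, far more than the buffer holds
    conn.send("done")  # never reached while nothing drains the pipe


if __name__ == "__main__":
    parent_conn, child_conn = multiprocessing.Pipe()
    proc = multiprocessing.Process(target=writer, args=(child_conn,))
    proc.start()
    proc.join(timeout=2)
    print("writer finished?", not proc.is_alive())  # False: stuck in send()
    proc.terminate()
```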

Member:
One option could be for send_callback_to_execute to store the request in a queue internally in the ProcessorAgent, and only send inside heartbeat after first reading?
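
As a rough illustration of that idea (hypothetical names, not the actual ProcessorAgent API), the buffering could look something like this:

```python
# Hypothetical sketch: queue callback requests inside the agent and only
# write them to the pipe during heartbeat, after draining incoming messages.
import multiprocessing


class BufferingAgent:
    def __init__(self, conn):
        self._conn = conn
        self._pending = []  # requests not yet written to the pipe

    def send_callback_to_execute(self, request):
        # Queue instead of writing straight to the pipe, so a full pipe
        # cannot block the scheduler loop at this call site.
        self._pending.append(request)

    def heartbeat(self):
        # Read first, so the manager's writes are drained and it can make
        # progress, then flush whatever we have queued up.
        while self._conn.poll():
            self._conn.recv()
        while self._pending:
            self._conn.send(self._pending.pop(0))


if __name__ == "__main__":
    agent_end, manager_end = multiprocessing.Pipe()
    agent = BufferingAgent(agent_end)
    for i in range(3):
        agent.send_callback_to_execute({"callback": i})
    agent.heartbeat()  # the queued requests hit the pipe only here
    while manager_end.poll():
        print(manager_end.recv())
```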

Member:
(SLA callback requests also have this same problem.)

Member:
It may be easier to change the other side - https://github.com/apache/airflow/blob/master/airflow/utils/dag_processing.py#L698 - that is the only place that side sends anything.

Member:
Another option:

>>> import multiprocessing, fcntl, os
>>> p1, p2 = multiprocessing.Pipe()
>>> fcntl.fcntl(p1.fileno(), fcntl.F_SETFL, os.O_NONBLOCK | fcntl.fcntl(p1.fileno(), fcntl.F_GETFL))
>>> p1.send('a' * 1024000)  # Simulate something that would block
BlockingIOError: [Errno 11] Resource temporarily unavailable

We could set the socket in DagFileManager (the receiving side) to non-blocking; that way, if the send would fail, we could catch it, queue it there, and then go and poll again?

Member Author:
What do you mean by “queue it there, and then go and poll again”? Making the child connection non-blocking sounds great (I wondered about this briefly but couldn’t find a way), but the polling loop seems to have quite a few side effects before that line, so we can’t just put the thing back and try again later. Do you mean adding a queue (list) on DagFileProcessorManager to store DagParsingStat instances when they fail to send, and adding a block at the beginning of the poll loop to check and resend them first?
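
A rough sketch of that manager-side variant, assuming a hypothetical retry list and a simplified poll loop (none of these names are the real DagFileProcessorManager API), might look like this:

```python
# Illustrative sketch: make the sending end non-blocking and keep a retry
# queue of stats whose send() failed, flushing it at the start of each poll.
# A real implementation would also have to cope with partially written
# messages, which this sketch ignores.
import fcntl
import multiprocessing
import os


def set_nonblocking(conn):
    flags = fcntl.fcntl(conn.fileno(), fcntl.F_GETFL)
    fcntl.fcntl(conn.fileno(), fcntl.F_SETFL, flags | os.O_NONBLOCK)


class ManagerSide:
    def __init__(self, signal_conn):
        self._signal_conn = signal_conn
        self._pending_stats = []  # stats that could not be sent yet
        set_nonblocking(self._signal_conn)

    def _try_send(self, stat):
        try:
            self._signal_conn.send(stat)
        except BlockingIOError:
            # Pipe is full right now; keep the stat for the next loop.
            self._pending_stats.append(stat)

    def poll_once(self, new_stat):
        # Retry anything that failed on a previous iteration first.
        retries, self._pending_stats = self._pending_stats, []
        for stat in retries:
            self._try_send(stat)
        # ... normal parsing work would happen here ...
        self._try_send(new_stat)


if __name__ == "__main__":
    agent_end, manager_end = multiprocessing.Pipe()
    manager = ManagerSide(manager_end)
    manager.poll_once({"parsing_stat": 1})
    print(agent_end.recv())
```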

self.processor_agent.heartbeat()

return len(event_buffer)

def _execute(self) -> None:
@@ -1514,7 +1524,11 @@ def _do_scheduling(self, session) -> int:
for dag_id, execution_date in query:
active_runs_by_dag_id[dag_id].add(execution_date)

for dag_run in dag_runs:
# Send dag_runs in batches of 100, and call the agent's heartbeat()
# to process the multiprocessing pipe. This keeps the pipe from
# filling up, which would block pipe.send() and cause a deadlock
# when we have a lot of dags to send.
for i, dag_run in enumerate(dag_runs, 1):
# Use try_except to not stop the Scheduler when a Serialized DAG is not found
# This takes care of Dynamic DAGs especially
# SerializedDagNotFound should not happen here in the same loop because the DagRun would
@@ -1526,6 +1540,8 @@
except SerializedDagNotFound:
self.log.exception("DAG '%s' not found in serialized_dag table", dag_run.dag_id)
continue
if i % CALLBACK_SEND_BATCH_SIZE == 0:
self.processor_agent.heartbeat()

guard.commit()
