Skip to content

Commit

Permalink
Merge pull request #1619 from PrefectHQ/robust-scheduler
Browse files Browse the repository at this point in the history
Make scheduler robust to exceptions
  • Loading branch information
zangell44 authored Apr 12, 2022
2 parents eadf41d + 69722d8 commit 0243aca
Showing 1 changed file with 29 additions and 20 deletions.
49 changes: 29 additions & 20 deletions src/prefect/orion/services/scheduler.py
Original file line number Diff line number Diff line change
Expand Up @@ -62,9 +62,9 @@ async def run_once(self, db: OrionDBInterface):

session = await db.session()
async with session:
async with session.begin():
last_id = None
while True:
last_id = None
while True:
async with session.begin():
query = self._get_select_deployments_to_schedule_query()

# use cursor based pagination
Expand All @@ -75,31 +75,40 @@ async def run_once(self, db: OrionDBInterface):
deployment_ids = result.scalars().unique().all()

# collect runs across all deployments
all_runs = []
runs_to_insert = []
for deployment_id in deployment_ids:
runs = await self._generate_scheduled_flow_runs(
session=session,
deployment_id=deployment_id,
start_time=now,
end_time=now + self.max_scheduled_time,
max_runs=self.max_runs,
)
all_runs.extend(runs)
# guard against erroneously configured schedules
try:
runs_to_insert.extend(
await self._generate_scheduled_flow_runs(
session=session,
deployment_id=deployment_id,
start_time=now,
end_time=now + self.max_scheduled_time,
max_runs=self.max_runs,
)
)
except Exception as exc:
self.logger.error(
f"Error scheduling deployment {deployment_id!r}.",
exc_info=True,
)

# bulk insert the runs based on batch size setting
for batch in batched_iterable(all_runs, self.insert_batch_size):
for batch in batched_iterable(
runs_to_insert, self.insert_batch_size
):
inserted_runs = await self._insert_scheduled_flow_runs(
session=session, runs=batch
)
await session.flush()
total_inserted_runs += len(inserted_runs)

# if no deployments were found, exit the loop
if len(deployment_ids) < self.deployment_batch_size:
break
else:
# record the last deployment ID
last_id = deployment_ids[-1]
# if no deployments were found, exit the loop
if len(deployment_ids) < self.deployment_batch_size:
break
else:
# record the last deployment ID
last_id = deployment_ids[-1]

self.logger.info(f"Scheduled {total_inserted_runs} runs.")

Expand Down

0 comments on commit 0243aca

Please sign in to comment.