Skip to content

Commit

Permalink
Expand work_queue table to accommodate work pools (#8264)
Browse files Browse the repository at this point in the history
Co-authored-by: Alexander Streed <alex.s@prefect.io>
  • Loading branch information
cicdw and desertaxle authored Feb 1, 2023
1 parent 87c061e commit 839cc3e
Show file tree
Hide file tree
Showing 40 changed files with 1,382 additions and 1,101 deletions.
42 changes: 14 additions & 28 deletions src/prefect/agent.py
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@
from prefect.infrastructure import Infrastructure, InfrastructureResult, Process
from prefect.logging import get_logger
from prefect.orion import schemas
from prefect.orion.schemas.core import BlockDocument, FlowRun, WorkPoolQueue, WorkQueue
from prefect.orion.schemas.core import BlockDocument, FlowRun, WorkQueue
from prefect.orion.schemas.filters import (
FlowRunFilter,
FlowRunFilterId,
Expand Down Expand Up @@ -92,10 +92,10 @@ def __init__(
async def update_matched_agent_work_queues(self):
if self.work_queue_prefix:
if self.work_pool_name:
matched_queues = await self.client.read_work_pool_queues(
matched_queues = await self.client.read_work_queues(
work_pool_name=self.work_pool_name,
work_pool_queue_filter=schemas.filters.WorkPoolQueueFilter(
name=schemas.filters.WorkPoolQueueFilterName(
work_queue_filter=schemas.filters.WorkQueueFilter(
name=schemas.filters.WorkQueueFilterName(
startswith_=self.work_queue_prefix
)
),
Expand All @@ -118,7 +118,7 @@ async def update_matched_agent_work_queues(self):
)
self.work_queues = matched_queues

async def get_work_queues(self) -> AsyncIterator[Union[WorkQueue, WorkPoolQueue]]:
async def get_work_queues(self) -> AsyncIterator[WorkQueue]:
"""
Loads the work queue objects corresponding to the agent's target work
queues. If any of them don't exist, they are created.
Expand All @@ -140,31 +140,24 @@ async def get_work_queues(self) -> AsyncIterator[Union[WorkQueue, WorkPoolQueue]

for name in self.work_queues:
try:
if self.work_pool_name:
work_queue = await self.client.read_work_pool_queue(
work_pool_name=self.work_pool_name, work_pool_queue_name=name
)
else:
work_queue = await self.client.read_work_queue_by_name(name)
work_queue = await self.client.read_work_queue_by_name(
work_pool_name=self.work_pool_name, name=name
)
except ObjectNotFound:

# if the work queue wasn't found, create it
if not self.work_queue_prefix:
# do not attempt to create work queues if the agent is polling for
# queues using a regex
try:
work_queue = await self.client.create_work_queue(
work_pool_name=self.work_pool_name, name=name
)
if self.work_pool_name:
work_queue = await self.client.create_work_pool_queue(
work_pool_name=self.work_pool_name,
work_pool_queue=schemas.actions.WorkPoolQueueCreate(
name=name
),
)
self.logger.info(
f"Created work queue {name!r} in work pool {self.work_pool_name!r}."
)
else:
work_queue = await self.client.create_work_queue(name=name)
self.logger.info(f"Created work queue '{name}'.")

# if creating it raises an exception, it was probably just
Expand Down Expand Up @@ -195,9 +188,9 @@ async def get_and_submit_flow_runs(self) -> List[FlowRun]:
submittable_runs: List[FlowRun] = []

if self.work_pool_name:
responses = await self.client.get_scheduled_flow_runs_for_work_pool_queues(
responses = await self.client.get_scheduled_flow_runs_for_work_pool(
work_pool_name=self.work_pool_name,
work_pool_queue_names=[wq.name async for wq in self.get_work_queues()],
work_queue_names=[wq.name async for wq in self.get_work_queues()],
scheduled_before=before,
)
submittable_runs.extend([response.flow_run for response in responses])
Expand All @@ -214,14 +207,7 @@ async def get_and_submit_flow_runs(self) -> List[FlowRun]:

else:
try:
if isinstance(work_queue, WorkPoolQueue):
responses = await self.client.get_scheduled_flow_runs_for_work_pool_queues(
work_pool_name=self.work_pool_name,
work_pool_queue_names=[work_queue.name],
scheduled_before=before,
)
queue_runs = [response.flow_run for response in responses]
else:
if not self.work_pool_name:
queue_runs = await self.client.get_runs_in_work_queue(
id=work_queue.id, limit=10, scheduled_before=before
)
Expand Down
14 changes: 7 additions & 7 deletions src/prefect/cli/deployment.py
Original file line number Diff line number Diff line change
Expand Up @@ -602,22 +602,22 @@ async def apply(
f"View Deployment in UI: {PREFECT_UI_URL.value()}/deployments/deployment/{deployment_id}"
)

if deployment.work_queue_name is not None:
if deployment.work_pool_name is not None:
app.console.print(
"\nTo execute flow runs from this deployment, start an agent "
f"that pulls work from the {deployment.work_queue_name!r} work queue:"
f"that pulls work from the {deployment.work_pool_name!r} work pool:"
)
app.console.print(
f"$ prefect agent start -q {deployment.work_queue_name!r}", style="blue"
f"$ prefect agent start -p {deployment.work_pool_name!r}",
style="blue",
)
elif deployment.work_pool_name is not None:
elif deployment.work_queue_name is not None:
app.console.print(
"\nTo execute flow runs from this deployment, start an agent "
f"that pulls work from the {deployment.work_pool_name!r} work pool:"
f"that pulls work from the {deployment.work_queue_name!r} work queue:"
)
app.console.print(
f"$ prefect agent start --pool {deployment.work_pool_name!r}",
style="blue",
f"$ prefect agent start -q {deployment.work_queue_name!r}", style="blue"
)
else:
app.console.print(
Expand Down
Loading

0 comments on commit 839cc3e

Please sign in to comment.