IFC-1436 Make work pools configurable #6346

Draft: wants to merge 13 commits into base: develop
8 changes: 7 additions & 1 deletion backend/infrahub/cli/tasks.py
@@ -7,6 +7,7 @@
 from infrahub import config
 from infrahub.services.adapters.workflow.worker import WorkflowWorkerExecution
 from infrahub.tasks.dummy import DUMMY_FLOW, DummyInput
+from infrahub.workflows.constants import WorkerType, WorkflowType
 from infrahub.workflows.initialization import setup_task_manager
 from infrahub.workflows.models import WorkerPoolDefinition

@@ -43,7 +44,12 @@ async def execute(
     async with get_client(sync_client=False) as client:
         worker = WorkflowWorkerExecution()
         await DUMMY_FLOW.save(
-            client=client, work_pool=WorkerPoolDefinition(name="infrahub-worker", worker_type="infrahubasync")
+            client=client,
+            work_pool=WorkerPoolDefinition(
+                name="infrahub-worker",
+                workflow_type=WorkflowType.INTERNAL | WorkflowType.CORE | WorkflowType.USER,
+                worker_type=WorkerType.INFRAHUB_ASYNC,
+            ),
         )

         result = await worker.execute_workflow(
26 changes: 25 additions & 1 deletion backend/infrahub/config.py
@@ -25,6 +25,8 @@
 from infrahub.constants.database import DatabaseType
 from infrahub.exceptions import InitializationError, ProcessingError
+from infrahub.workflows.constants import WorkerType, WorkflowType
+from infrahub.workflows.models import WorkerPoolDefinition

 if TYPE_CHECKING:
     from infrahub.services.adapters.cache import InfrahubCache

@@ -377,7 +379,7 @@ class WorkflowSettings(BaseSettings):
     port: int | None = Field(default=None, ge=1, le=65535, description="Specified if running on a non default port.")
     tls_enabled: bool = Field(default=False, description="Indicates if TLS is enabled for the connection")
     driver: WorkflowDriver = WorkflowDriver.WORKER
-    default_worker_type: str = "infrahubasync"
+    default_worker_type: str = WorkerType.INFRAHUB_ASYNC
     extra_loggers: list[str] = Field(
         default_factory=list, description="A list of additional logger that will be captured during task execution."
     )

@@ -387,6 +389,20 @@
     worker_polling_interval: int = Field(
         default=2, ge=1, le=30, description="Specify how often the worker should poll the server for tasks (sec)"
     )
+    work_pools: list[WorkerPoolDefinition] = Field(
+        default=[
+            WorkerPoolDefinition(
+                name="infrahub-worker",
+                workflow_type=WorkflowType.INTERNAL | WorkflowType.CORE | WorkflowType.USER,
+                worker_type=WorkerType.INFRAHUB_ASYNC,
+                description="Default Pool for internal tasks",
+            )
+        ],
+        description="Definitions of work pools used to process tasks",
+    )
+    workflow_routing: dict[WorkflowType, WorkerPoolDefinition] = Field(
+        default_factory=dict, description="Define how workflows are assigned between work pools"
+    )

     @property
     def api_endpoint(self) -> str:

Review thread (anchored on the work_pools default above):

Collaborator: How would the configuration look when using environment variables if we want to configure multiple work_pools?

Collaborator: Also, I'm not sure having that as part of the configuration will work well; ideally, the worker pools should be pre-defined by the community and enterprise versions.

@gmazoyer (Contributor, Author) commented on May 2, 2025:

IIRC Pydantic is able to figure out these complex settings given a valid string representation of them. So in this case, it would be:

INFRAHUB_WORKFLOW_WORK_POOLS='[
  {
    "name": "infrahub-worker",
    "workflow_type": 7,
    "worker_type": "INFRAHUB_ASYNC",
    "description": "Default Pool for internal tasks"
  }
]'

The workflow type is more abstract as it's an int flag, but maybe this could be worked around by a validator for WorkerPoolDefinition.workflow_type.

The idea of making this a setting was that it should be easy to change, so we could have different values for community and enterprise without involving an actual code change.

@@ -397,6 +413,14 @@ def api_endpoint(self) -> str:
         url += "/api"
         return url

+    @model_validator(mode="after")
+    def setup_task_routing(self) -> Self:
+        for work_pool in self.work_pools:
+            for workflow_type in list(work_pool.workflow_type):
+                self.workflow_routing[workflow_type] = work_pool
+
+        return self


 class ApiSettings(BaseSettings):
     model_config = SettingsConfigDict(env_prefix="INFRAHUB_API_")
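A note on the setup_task_routing validator above: iterating a composite Flag value yields its individual members (supported for Flag instances since Python 3.11), which is how a single pool covering INTERNAL | CORE | USER fans out into one workflow_routing entry per workflow type. A minimal standalone reproduction, assuming the WorkflowType Flag from constants.py:

# Minimal reproduction of the routing fan-out (requires Python 3.11+
# for iteration over a composite Flag value).
from enum import Flag, auto


class WorkflowType(Flag):
    INTERNAL = auto()
    CORE = auto()
    USER = auto()


pool_name = "infrahub-worker"
pool_types = WorkflowType.INTERNAL | WorkflowType.CORE | WorkflowType.USER

routing: dict[WorkflowType, str] = {}
for workflow_type in list(pool_types):  # -> [INTERNAL, CORE, USER]
    routing[workflow_type] = pool_name

# Every workflow type now resolves to the single default pool, matching
# what setup_deployments later looks up via workflow_routing[workflow.type].
assert routing == {
    WorkflowType.INTERNAL: pool_name,
    WorkflowType.CORE: pool_name,
    WorkflowType.USER: pool_name,
}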
3 changes: 1 addition & 2 deletions backend/infrahub/workers/infrahub_async.py
@@ -141,8 +141,7 @@ async def run(
         entrypoint: str = configuration._related_objects["deployment"].entrypoint

         file_path, flow_name = entrypoint.split(":")
-        file_path.replace("/", ".")
-        module_path = file_path.replace("backend/", "").replace(".py", "").replace("/", ".")
+        module_path = file_path.removeprefix("backend/").removesuffix(".py").replace("/", ".")
         flow_func = load_flow_function(module_path=module_path, flow_name=flow_name)
         inject_service_parameter(func=flow_func, parameters=flow_run.parameters, service=self.service)
         flow_run_logger.debug("Validating parameters")
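Worth noting about this fix: the removed first line called str.replace without assigning the result, so it was a no-op on an immutable string, and the chained replace calls could also rewrite "backend/" or ".py" occurring mid-path rather than only at the edges. A quick sketch with a hypothetical entrypoint value:

# Hypothetical entrypoint string; the real value comes from the Prefect
# deployment configuration at runtime.
entrypoint = "backend/infrahub/tasks/dummy.py:dummy_flow"

file_path, flow_name = entrypoint.split(":")
# Strip the prefix and suffix only at the edges, then convert to a module path.
module_path = file_path.removeprefix("backend/").removesuffix(".py").replace("/", ".")

assert module_path == "infrahub.tasks.dummy"
assert flow_name == "dummy_flow"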
7 changes: 1 addition & 6 deletions backend/infrahub/workflows/catalogue.py
@@ -1,10 +1,7 @@
 import random

 from .constants import WorkflowTag, WorkflowType
-from .models import WorkerPoolDefinition, WorkflowDefinition
-
-INFRAHUB_WORKER_POOL = WorkerPoolDefinition(name="infrahub-worker", description="Default Pool for internal tasks")
-
+from .models import WorkflowDefinition

 ACTION_ADD_NODE_TO_GROUP = WorkflowDefinition(
     name="action-add-node-to-group",

@@ -529,8 +526,6 @@
 )


-worker_pools = [INFRAHUB_WORKER_POOL]
-
 workflows = [
     ACTION_ADD_NODE_TO_GROUP,
     ACTION_RUN_GENERATOR,
17 changes: 12 additions & 5 deletions backend/infrahub/workflows/constants.py
@@ -1,10 +1,12 @@
+from enum import Flag, auto
+
 from infrahub.utils import InfrahubStringEnum


-class WorkflowType(InfrahubStringEnum):
-    INTERNAL = "internal"
-    CORE = "core"
-    USER = "user"
+class WorkflowType(Flag):
+    INTERNAL = auto()
+    CORE = auto()
+    USER = auto()


 TAG_NAMESPACE = "infrahub.app"

@@ -19,5 +21,10 @@ class WorkflowTag(InfrahubStringEnum):
     def render(self, identifier: str | None = None) -> str:
         if identifier is None:
             return f"{TAG_NAMESPACE}/{self.value}"
-        rendered_value = str(self.value).format(identifier=identifier)
+        rendered_value = str(self.value).format(identifier=identifier.lower())
         return f"{TAG_NAMESPACE}/{rendered_value}"
+
+
+class WorkerType(InfrahubStringEnum):
+    INFRAHUB_ASYNC = "infrahubasync"
+    PROCESS = "process"
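Because WorkflowType is now a Flag rather than a string enum, a member's .value is an int bit rather than a lowercase string. That is why render() above now lowercases the identifier, and why get_tags in models.py below switches from self.type.value to self.type.name. A short sketch of the new semantics:

from enum import Flag, auto


class WorkflowType(Flag):
    INTERNAL = auto()  # value 1
    CORE = auto()      # value 2
    USER = auto()      # value 4


assert WorkflowType.CORE.value == 2        # no longer the string "core"
assert WorkflowType.CORE.name == "CORE"    # uppercase, hence identifier.lower() in render()

combined = WorkflowType.INTERNAL | WorkflowType.CORE | WorkflowType.USER
assert combined.value == 7                 # the "workflow_type": 7 from the review thread above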
9 changes: 3 additions & 6 deletions backend/infrahub/workflows/initialization.py
@@ -11,14 +11,14 @@
 from infrahub.trigger.models import TriggerType
 from infrahub.trigger.setup import setup_triggers

-from .catalogue import worker_pools, workflows
+from .catalogue import workflows
 from .models import TASK_RESULT_STORAGE_NAME


 @task(name="task-manager-setup-worker-pools", task_run_name="Setup Worker pools", cache_policy=NONE)  # type: ignore[arg-type]
 async def setup_worker_pools(client: PrefectClient) -> None:
     log = get_run_logger()
-    for worker in worker_pools:
+    for worker in config.SETTINGS.workflow.work_pools:
         wp = WorkPoolCreate(
             name=worker.name,
             type=worker.worker_type or config.SETTINGS.workflow.default_worker_type,

@@ -35,10 +35,7 @@ async def setup_worker_pools(client: PrefectClient) -> None:
 async def setup_deployments(client: PrefectClient) -> None:
     log = get_run_logger()
     for workflow in workflows:
-        # For now the workpool is hardcoded but
-        # later we need to make it dynamic to have a different worker based on the type of the workflow
-        work_pool = worker_pools[0]
-        await workflow.save(client=client, work_pool=work_pool)
+        await workflow.save(client=client, work_pool=config.SETTINGS.workflow.workflow_routing[workflow.type])
         log.info(f"Flow {workflow.name}, created successfully ... ")
3 changes: 2 additions & 1 deletion backend/infrahub/workflows/models.py
@@ -22,6 +22,7 @@

 class WorkerPoolDefinition(BaseModel):
     name: str
+    workflow_type: WorkflowType
     worker_type: str | None = None
     description: str = ""

@@ -70,7 +71,7 @@ def get_tags(self) -> list[str]:
         tags: list[str] = []
         if self.type != WorkflowType.INTERNAL:
             tags.append(TAG_NAMESPACE)
-            tags.append(WorkflowTag.WORKFLOWTYPE.render(identifier=self.type.value))
+            tags.append(WorkflowTag.WORKFLOWTYPE.render(identifier=self.type.name))
         tags += [tag.render() for tag in self.tags]
         return tags
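Putting the two changes together, tag rendering now goes through the member name. The sketch below is illustrative only: the actual template string behind WorkflowTag.WORKFLOWTYPE is not shown in this diff, so the value used here is an assumption:

from enum import Enum

TAG_NAMESPACE = "infrahub.app"


class WorkflowTag(str, Enum):
    # Assumed template; the real member value lives elsewhere in constants.py.
    WORKFLOWTYPE = "workflow-type/{identifier}"

    def render(self, identifier: str | None = None) -> str:
        if identifier is None:
            return f"{TAG_NAMESPACE}/{self.value}"
        rendered_value = str(self.value).format(identifier=identifier.lower())
        return f"{TAG_NAMESPACE}/{rendered_value}"


# get_tags now passes self.type.name (e.g. "USER"), which render() lowercases:
assert WorkflowTag.WORKFLOWTYPE.render(identifier="USER") == "infrahub.app/workflow-type/user"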
5 changes: 3 additions & 2 deletions backend/tests/functional/webhook/test_task.py
@@ -8,6 +8,7 @@
 from prefect.client.orchestration import PrefectClient, get_client
 from prefect.events.actions import RunDeployment

+from infrahub import config
 from infrahub.core.constants import InfrahubKind
 from infrahub.core.node import Node
 from infrahub.webhook.models import EventContext, WebhookTriggerDefinition

@@ -18,7 +19,7 @@
     delete_webhook_automation,
     webhook_process,
 )
-from infrahub.workflows.catalogue import WEBHOOK_PROCESS, worker_pools
+from infrahub.workflows.catalogue import WEBHOOK_PROCESS
 from infrahub.workflows.initialization import setup_worker_pools
 from tests.adapters.http import MemoryHTTP
 from tests.constants import TestKind

@@ -104,7 +105,7 @@
     @pytest.fixture(scope="class")
     async def webhook_deployment(self, db: InfrahubDatabase, prefect_client: PrefectClient) -> None:
         await setup_worker_pools(client=prefect_client)
-        await WEBHOOK_PROCESS.save(client=prefect_client, work_pool=worker_pools[0])
+        await WEBHOOK_PROCESS.save(client=prefect_client, work_pool=config.SETTINGS.workflow.work_pools[0])

     @pytest.fixture(scope="class")
     async def webhook1(self, db: InfrahubDatabase, initial_dataset: None, client: InfrahubClient) -> Node:
31 changes: 18 additions & 13 deletions backend/tests/helpers/test_worker.py
@@ -13,19 +13,24 @@
 from prefect.workers.base import BaseWorkerResult

 from infrahub.tasks.dummy import DUMMY_FLOW, DUMMY_FLOW_BROKEN
-from infrahub.workers.infrahub_async import (
-    InfrahubWorkerAsync,
-)
-from infrahub.workflows.catalogue import INFRAHUB_WORKER_POOL
+from infrahub.workers.infrahub_async import InfrahubWorkerAsync
+from infrahub.workflows.constants import WorkflowType
 from infrahub.workflows.initialization import setup_blocks
 from infrahub.workflows.models import WorkerPoolDefinition
-from tests.helpers.constants import (
-    PORT_PREFECT,
-)
+from tests.helpers.constants import PORT_PREFECT
 from tests.helpers.test_app import TestInfrahubApp
 from tests.helpers.utils import start_prefect_server_container


+@pytest.fixture(scope="session")
+def infrahubasync_worker() -> WorkerPoolDefinition:
+    return WorkerPoolDefinition(
+        name="infrahub-worker",
+        workflow_type=WorkflowType.INTERNAL | WorkflowType.CORE | WorkflowType.USER,
+        description="Default Pool for internal tasks",
+    )
+
+
 class TestWorkerInfrahubAsync(TestInfrahubApp):
     @classmethod
     async def wait_for_flow(

@@ -89,11 +94,9 @@ async def prefect_client(self, prefect_server: str) -> PrefectClient:
         return PrefectClient(api=prefect_server)

     @pytest.fixture(scope="class")
-    async def work_pool(self, prefect_client: PrefectClient) -> WorkPool:
+    async def work_pool(self, infrahubasync_worker: WorkerPoolDefinition, prefect_client: PrefectClient) -> WorkPool:
         wp = WorkPoolCreate(
-            name=INFRAHUB_WORKER_POOL.name,
-            type=InfrahubWorkerAsync.type,
-            description=INFRAHUB_WORKER_POOL.name,
+            name=infrahubasync_worker.name, type=InfrahubWorkerAsync.type, description=infrahubasync_worker.name
         )
         return await prefect_client.create_work_pool(work_pool=wp, overwrite=True)

@@ -102,9 +105,11 @@
         await setup_blocks()

     @pytest.fixture(scope="class")
-    async def dummy_flows_deployment(self, work_pool: WorkerPoolDefinition, prefect_client: PrefectClient) -> None:
+    async def dummy_flows_deployment(
+        self, work_pool: WorkerPoolDefinition, infrahubasync_worker: WorkerPoolDefinition, prefect_client: PrefectClient
+    ) -> None:
         for flow in [DUMMY_FLOW, DUMMY_FLOW_BROKEN]:
-            await flow.save(client=prefect_client, work_pool=INFRAHUB_WORKER_POOL)
+            await flow.save(client=prefect_client, work_pool=infrahubasync_worker)

     @pytest.fixture(scope="class")
     async def prefect_worker(
18 changes: 18 additions & 0 deletions backend/tests/integration/conftest.py
@@ -20,6 +20,8 @@
 from infrahub.core.utils import delete_all_nodes
 from infrahub.database import InfrahubDatabase
 from infrahub.utils import get_models_dir
+from infrahub.workflows.constants import WorkflowType
+from infrahub.workflows.models import WorkerPoolDefinition
 from tests.helpers.file_repo import FileRepo

@@ -135,3 +137,19 @@
 def prefect_test(prefect_test_fixture):
     with disable_run_logger():
         yield
+
+
+@pytest.fixture(scope="session")
+def infrahubasync_worker() -> WorkerPoolDefinition:
+    return WorkerPoolDefinition(
+        name="infrahub-worker",
+        workflow_type=WorkflowType.INTERNAL | WorkflowType.CORE | WorkflowType.USER,
+        description="Default Pool for internal tasks",
+    )
+
+
+@pytest.fixture(scope="session")
+def user_worker() -> WorkerPoolDefinition:
+    return WorkerPoolDefinition(
+        name="user-task-worker", workflow_type=WorkflowType.USER, description="Default Pool for user tasks"
+    )
@@ -1,7 +1,7 @@
 from prefect.client.orchestration import PrefectClient

-from infrahub.workflows.catalogue import INFRAHUB_WORKER_POOL
 from infrahub.workflows.initialization import setup_task_manager
+from infrahub.workflows.models import WorkerPoolDefinition
 from tests.helpers.test_worker import TestWorkerInfrahubAsync

 # @pytest.fixture

@@ -10,14 +10,14 @@


 class TestTaskManagerSetup(TestWorkerInfrahubAsync):
-    async def test_setup_task_manager(self, prefect_client: PrefectClient):
+    async def test_setup_task_manager(self, infrahubasync_worker: WorkerPoolDefinition, prefect_client: PrefectClient):
         await setup_task_manager()

-        response = await prefect_client.read_work_pool(INFRAHUB_WORKER_POOL.name)
+        response = await prefect_client.read_work_pool(infrahubasync_worker.name)
         assert response.type == "infrahubasync"

         # Setup the task manager a second time to validate that it's idempotent
         await setup_task_manager()

-        response = await prefect_client.read_work_pool(INFRAHUB_WORKER_POOL.name)
+        response = await prefect_client.read_work_pool(infrahubasync_worker.name)
         assert response.type == "infrahubasync"
@@ -9,9 +9,7 @@
 from infrahub.database import InfrahubDatabase
 from infrahub.services.adapters.workflow.worker import WorkflowWorkerExecution
 from infrahub.tasks.dummy import DUMMY_FLOW, DUMMY_FLOW_BROKEN, DummyInput, DummyOutput
-from infrahub.workers.infrahub_async import (
-    InfrahubWorkerAsync,
-)
+from infrahub.workers.infrahub_async import InfrahubWorkerAsync
 from tests.helpers.test_worker import TestWorkerInfrahubAsync
5 changes: 3 additions & 2 deletions backend/tests/unit/workflows/test_catalogue.py
@@ -5,8 +5,9 @@

 import pytest

+from infrahub import config
 from infrahub.workflows import catalogue
-from infrahub.workflows.catalogue import worker_pools, workflows
+from infrahub.workflows.catalogue import workflows

 if TYPE_CHECKING:
     from infrahub.workflows.models import WorkflowDefinition

@@ -37,7 +38,7 @@ def test_workflow_definition_flow_names() -> None:
 def test_workflows_sorted() -> None:
     workflow_names = sorted(name for name in dir(catalogue) if name.isupper())
     ordered_workflows = [getattr(catalogue, name) for name in workflow_names]
-    for worker_pool in worker_pools:
+    for worker_pool in config.SETTINGS.workflow.work_pools:
         if worker_pool in ordered_workflows:
             ordered_workflows.remove(worker_pool)
     assert ordered_workflows == workflows, "The list of workflows isn't sorted alphabetically"