Skip to content

Commit

Permalink
backport deep merge logic for env (#15363)
Browse files Browse the repository at this point in the history
  • Loading branch information
zzstoatzz authored Sep 12, 2024
1 parent de06752 commit e0267b4
Show file tree
Hide file tree
Showing 3 changed files with 77 additions and 9 deletions.
2 changes: 1 addition & 1 deletion src/prefect/server/models/workers.py
Original file line number Diff line number Diff line change
Expand Up @@ -193,7 +193,7 @@ async def update_work_pool(
[UUID, pendulum.DateTime, "ORMWorkPool", "ORMWorkPool"],
Awaitable[None],
]
],
] = None,
) -> bool:
"""
Update a WorkPool by id.
Expand Down
6 changes: 6 additions & 0 deletions src/prefect/workers/base.py
Original file line number Diff line number Diff line change
Expand Up @@ -139,6 +139,12 @@ async def from_template_and_values(
)
variables.update(values)

# deep merge `env`
if isinstance(job_config.get("env"), dict) and (
hardcoded_env := variables.get("env")
):
job_config["env"] = {**hardcoded_env, **job_config.get("env", {})}

populated_configuration = apply_values(template=job_config, values=variables)
populated_configuration = await resolve_block_document_references(
template=populated_configuration, client=client
Expand Down
78 changes: 70 additions & 8 deletions tests/workers/test_base_worker.py
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@
)
from prefect.flows import flow
from prefect.server import models
from prefect.server.schemas.actions import WorkPoolUpdate as ServerWorkPoolUpdate
from prefect.server.schemas.core import Deployment, Flow
from prefect.server.schemas.responses import DeploymentResponse
from prefect.server.schemas.states import StateType
Expand Down Expand Up @@ -2075,33 +2076,94 @@ async def test_worker_last_polled_health_check(
pendulum.set_test_now()


async def test_env_merge_logic_is_deep(prefect_client, session, flow):
@pytest.mark.parametrize(
"work_pool_env, deployment_env, flow_run_env, expected_env",
[
(
{},
{"test-var": "foo"},
{"another-var": "boo"},
{"test-var": "foo", "another-var": "boo"},
),
(
{"A": "1", "B": "2"},
{"C": "3", "D": "4"},
{},
{"A": "1", "B": "2", "C": "3", "D": "4"},
),
(
{"A": "1", "B": "2"},
{"C": "42"},
{"C": "3", "D": "4"},
{"A": "1", "B": "2", "C": "3", "D": "4"},
),
(
{"A": "1", "B": "2"},
{"B": ""}, # will be treated as unset and not apply
{},
{"A": "1", "B": "2"},
),
],
ids=[
"flow_run_into_deployment",
"deployment_into_work_pool",
"flow_run_into_work_pool",
"try_overwrite_with_empty_str",
],
)
async def test_env_merge_logic_is_deep(
prefect_client,
session,
flow,
work_pool,
work_pool_env,
deployment_env,
flow_run_env,
expected_env,
):
if work_pool_env:
await models.workers.update_work_pool(
session=session,
work_pool_id=work_pool.id,
work_pool=ServerWorkPoolUpdate(
base_job_template={
"job_configuration": {"env": work_pool_env},
"variables": {"properties": {"env": {"type": "object"}}},
},
description="test",
is_paused=False,
concurrency_limit=None,
),
)
await session.commit()

deployment = await models.deployments.create_deployment(
session=session,
deployment=Deployment(
name="env-testing",
tags=["test"],
flow_id=flow.id,
schedule=None,
path="./subdir",
entrypoint="/file.py:flow",
parameter_openapi_schema=None,
job_variables={"env": {"test-var": "foo"}},
parameter_openapi_schema={},
job_variables={"env": deployment_env},
work_queue_id=work_pool.default_queue_id,
),
)
await session.commit()

flow_run = await prefect_client.create_flow_run_from_deployment(
deployment.id,
state=Pending(),
job_variables={"env": {"another-var": "boo"}},
job_variables={"env": flow_run_env},
)

async with WorkerTestImpl(
name="test",
work_pool_name="test-work-pool",
work_pool_name=work_pool.name if work_pool_env else "test-work-pool",
) as worker:
await worker.sync_with_backend()
config = await worker._get_configuration(flow_run)

assert config.env["test-var"] == "foo"
assert config.env["another-var"] == "boo"
for key, value in expected_env.items():
assert config.env[key] == value

0 comments on commit e0267b4

Please sign in to comment.