Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[3006.x] Fix parallel state execution with Salt-SSH #66517

Merged
merged 2 commits into from
Jul 18, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions changelog/66514.fixed.md
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
Fixed parallel state execution with Salt-SSH
21 changes: 19 additions & 2 deletions salt/state.py
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,7 @@
import salt.utils.files
import salt.utils.hashutils
import salt.utils.immutabletypes as immutabletypes
import salt.utils.jid
import salt.utils.msgpack
import salt.utils.platform
import salt.utils.process
Expand Down Expand Up @@ -757,7 +758,21 @@ def __init__(
loader="states",
initial_pillar=None,
file_client=None,
__invocation_id=None,
):
"""
When instantiating an object of this class, do not pass
``__invocation_id``. It is an internal field for tracking
parallel executions where no jid is available (Salt-SSH) and
only exposed as an init argument to work on spawning platforms.
"""
if jid is not None:
__invocation_id = jid
dwoz marked this conversation as resolved.
Show resolved Hide resolved
if __invocation_id is None:
# For salt-ssh parallel states, we need a unique identifier
# for a single execution. self.jid should not be set there
# since it's used for other purposes as well.
__invocation_id = salt.utils.jid.gen_jid(opts)
self._init_kwargs = {
"opts": opts,
"pillar_override": pillar_override,
Expand All @@ -768,6 +783,7 @@ def __init__(
"mocked": mocked,
"loader": loader,
"initial_pillar": initial_pillar,
"__invocation_id": __invocation_id,
}
self.states_loader = loader
if "grains" not in opts:
Expand Down Expand Up @@ -814,6 +830,7 @@ def __init__(
self.pre = {}
self.__run_num = 0
self.jid = jid
self.invocation_id = __invocation_id
self.instance_id = str(id(self))
self.inject_globals = {}
self.mocked = mocked
Expand Down Expand Up @@ -2236,7 +2253,7 @@ def _call_parallel_target(cls, instance, init_kwargs, name, cdata, low):
]
)

troot = os.path.join(instance.opts["cachedir"], instance.jid)
troot = os.path.join(instance.opts["cachedir"], instance.invocation_id)
tfile = os.path.join(troot, salt.utils.hashutils.sha1_digest(tag))
if not os.path.isdir(troot):
try:
Expand Down Expand Up @@ -2820,7 +2837,7 @@ def reconcile_procs(self, running):
if not proc.is_alive():
ret_cache = os.path.join(
self.opts["cachedir"],
self.jid,
self.invocation_id,
salt.utils.hashutils.sha1_digest(tag),
)
if not os.path.isfile(ret_cache):
Expand Down
61 changes: 61 additions & 0 deletions tests/pytests/integration/ssh/state/test_parallel.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,61 @@
"""
Verify salt-ssh states support ``parallel``.
"""

import pytest

pytestmark = [
pytest.mark.skip_on_windows(reason="salt-ssh not available on Windows"),
pytest.mark.slow_test,
]


@pytest.fixture(scope="module", autouse=True)
def state_tree_parallel(base_env_state_tree_root_dir):
top_file = """
base:
'localhost':
- parallel
'127.0.0.1':
- parallel
"""
state_file = """
{%- for i in range(5) %}
This runs in parallel {{ i }}:
cmd.run:
- name: sleep 0.{{ i }}
- parallel: true
{%- endfor %}
"""
top_tempfile = pytest.helpers.temp_file(
"top.sls", top_file, base_env_state_tree_root_dir
)
state_tempfile = pytest.helpers.temp_file(
"parallel.sls", state_file, base_env_state_tree_root_dir
)
with top_tempfile, state_tempfile:
yield


@pytest.mark.parametrize(
"args",
(
pytest.param(("state.sls", "parallel"), id="sls"),
pytest.param(("state.highstate",), id="highstate"),
pytest.param(("state.top", "top.sls"), id="top"),
),
)
def test_it(salt_ssh_cli, args):
"""
Ensure states with ``parallel: true`` do not cause a crash.
This does not check that they were actually run in parallel
since that would result either in a long-running or flaky test.
"""
ret = salt_ssh_cli.run(*args)
assert ret.returncode == 0
assert isinstance(ret.data, dict)
for i in range(5):
key = f"cmd_|-This runs in parallel {i}_|-sleep 0.{i}_|-run"
assert key in ret.data
assert "pid" in ret.data[key]["changes"]
assert ret.data[key]["changes"]["retcode"] == 0
Loading