Skip to content

Commit f4fcf13

Browse files
authored
Merge pull request #5358 from khushboobhatia01/bkhushboo/race_condition
Make update_execution() atomic
2 parents 8fdb7dc + ec52b89 commit f4fcf13

File tree

4 files changed

+85
-58
lines changed

4 files changed

+85
-58
lines changed

CHANGELOG.rst

+4
Original file line numberDiff line numberDiff line change
@@ -77,6 +77,10 @@ Fixed
7777
* Correct error reported when encrypted key value is reported, and another key value parameter that requires conversion is present. #5328
7878
Contributed by @amanda11, Ammeon Solutions
7979

80+
* Make ``update_execution()`` atomic by protecting the update with a coordination lock. Actions, like workflows, may have multiple
81+
concurrent updates to their execution state. This makes those updates safer, which should make the execution status more reliable. #5358
82+
83+
Contributed by @khushboobhatia01
8084

8185

8286
3.5.0 - June 23, 2021

st2actions/tests/unit/test_workflow_engine.py

+22-8
Original file line numberDiff line numberDiff line change
@@ -152,12 +152,8 @@ def test_process(self):
152152
lv_ac_db = lv_db_access.LiveAction.get_by_id(str(lv_ac_db.id))
153153
self.assertEqual(lv_ac_db.status, action_constants.LIVEACTION_STATUS_SUCCEEDED)
154154

155-
@mock.patch.object(
156-
coordination_service.NoOpDriver,
157-
"get_lock",
158-
mock.MagicMock(side_effect=coordination.ToozConnectionError("foobar")),
159-
)
160-
def test_process_error_handling(self):
155+
@mock.patch.object(coordination_service.NoOpDriver, "get_lock")
156+
def test_process_error_handling(self, mock_get_lock):
161157
expected_errors = [
162158
{
163159
"message": "Execution failed. See result for details.",
@@ -172,6 +168,7 @@ def test_process_error_handling(self):
172168
},
173169
]
174170

171+
mock_get_lock.side_effect = coordination_service.NoOpLock(name="noop")
175172
wf_meta = self.get_wf_fixture_meta_data(TEST_PACK_PATH, "sequential.yaml")
176173
lv_ac_db = lv_db_models.LiveActionDB(action=wf_meta["name"])
177174
lv_ac_db, ac_ex_db = action_service.request(lv_ac_db)
@@ -190,6 +187,15 @@ def test_process_error_handling(self):
190187
t1_ac_ex_db = ex_db_access.ActionExecution.query(
191188
task_execution=str(t1_ex_db.id)
192189
)[0]
190+
mock_get_lock.side_effect = [
191+
coordination.ToozConnectionError("foobar"),
192+
coordination.ToozConnectionError("foobar"),
193+
coordination.ToozConnectionError("foobar"),
194+
coordination.ToozConnectionError("foobar"),
195+
coordination.ToozConnectionError("foobar"),
196+
coordination_service.NoOpLock(name="noop"),
197+
coordination_service.NoOpLock(name="noop"),
198+
]
193199
workflows.get_engine().process(t1_ac_ex_db)
194200

195201
# Assert the task is marked as failed.
@@ -206,14 +212,14 @@ def test_process_error_handling(self):
206212
@mock.patch.object(
207213
coordination_service.NoOpDriver,
208214
"get_lock",
209-
mock.MagicMock(side_effect=coordination.ToozConnectionError("foobar")),
210215
)
211216
@mock.patch.object(
212217
workflows.WorkflowExecutionHandler,
213218
"fail_workflow_execution",
214219
mock.MagicMock(side_effect=Exception("Unexpected error.")),
215220
)
216-
def test_process_error_handling_has_error(self):
221+
def test_process_error_handling_has_error(self, mock_get_lock):
222+
mock_get_lock.side_effect = coordination_service.NoOpLock(name="noop")
217223
wf_meta = self.get_wf_fixture_meta_data(TEST_PACK_PATH, "sequential.yaml")
218224
lv_ac_db = lv_db_models.LiveActionDB(action=wf_meta["name"])
219225
lv_ac_db, ac_ex_db = action_service.request(lv_ac_db)
@@ -233,13 +239,21 @@ def test_process_error_handling_has_error(self):
233239
task_execution=str(t1_ex_db.id)
234240
)[0]
235241

242+
mock_get_lock.side_effect = [
243+
coordination.ToozConnectionError("foobar"),
244+
coordination.ToozConnectionError("foobar"),
245+
coordination.ToozConnectionError("foobar"),
246+
coordination.ToozConnectionError("foobar"),
247+
coordination.ToozConnectionError("foobar"),
248+
]
236249
self.assertRaisesRegexp(
237250
Exception, "Unexpected error.", workflows.get_engine().process, t1_ac_ex_db
238251
)
239252

240253
self.assertTrue(
241254
workflows.WorkflowExecutionHandler.fail_workflow_execution.called
242255
)
256+
mock_get_lock.side_effect = coordination_service.NoOpLock(name="noop")
243257

244258
# Since error handling failed, the workflow will have status of running.
245259
wf_ex_db = wf_db_access.WorkflowExecution.get_by_id(wf_ex_db.id)

st2common/st2common/services/executions.py

+36-32
Original file line numberDiff line numberDiff line change
@@ -48,6 +48,7 @@
4848
from st2common.models.db.execution import ActionExecutionDB
4949
from st2common.runners import utils as runners_utils
5050
from st2common.metrics.base import Timer
51+
from st2common.services import coordination
5152
from six.moves import range
5253

5354

@@ -195,39 +196,42 @@ def update_execution(liveaction_db, publish=True, set_result_size=False):
195196
"""
196197
execution = ActionExecution.get(liveaction__id=str(liveaction_db.id))
197198

198-
# Skip execution object update when action is already in completed state.
199-
if execution.status in action_constants.LIVEACTION_COMPLETED_STATES:
200-
LOG.debug(
201-
"[%s] Action is already in completed state: %s. Skipping execution update to state: %s."
202-
% (execution.id, execution.status, liveaction_db.status)
203-
)
204-
return execution
205-
206-
decomposed = _decompose_liveaction(liveaction_db)
207-
208-
kw = {}
209-
for k, v in six.iteritems(decomposed):
210-
kw["set__" + k] = v
211-
212-
if liveaction_db.status != execution.status:
213-
# Note: If the status changes we store this transition in the "log" attribute of action
214-
# execution
215-
kw["push__log"] = _create_execution_log_entry(liveaction_db.status)
216-
217-
if set_result_size:
218-
# Sadly with the current ORM abstraction there is no better way to achieve updating
219-
# result_size and we need to serialize the value again - luckily that operation is fast.
220-
# To put things into perspective - on 4 MB result dictionary it only takes 7 ms which is
221-
# negligible compared to other DB operations duration (and for smaller results it takes
222-
# in sub ms range).
223-
with Timer(key="action.executions.calculate_result_size"):
224-
result_size = len(
225-
ActionExecutionDB.result._serialize_field_value(liveaction_db.result)
199+
with coordination.get_coordinator().get_lock(liveaction_db.id):
200+
# Skip execution object update when action is already in completed state.
201+
if execution.status in action_constants.LIVEACTION_COMPLETED_STATES:
202+
LOG.debug(
203+
"[%s] Action is already in completed state: %s. Skipping execution update to state: %s."
204+
% (execution.id, execution.status, liveaction_db.status)
226205
)
227-
kw["set__result_size"] = result_size
228-
229-
execution = ActionExecution.update(execution, publish=publish, **kw)
230-
return execution
206+
return execution
207+
208+
decomposed = _decompose_liveaction(liveaction_db)
209+
210+
kw = {}
211+
for k, v in six.iteritems(decomposed):
212+
kw["set__" + k] = v
213+
214+
if liveaction_db.status != execution.status:
215+
# Note: If the status changes we store this transition in the "log" attribute of action
216+
# execution
217+
kw["push__log"] = _create_execution_log_entry(liveaction_db.status)
218+
219+
if set_result_size:
220+
# Sadly with the current ORM abstraction there is no better way to achieve updating
221+
# result_size and we need to serialize the value again - luckily that operation is fast.
222+
# To put things into perspective - on 4 MB result dictionary it only takes 7 ms which is
223+
# negligible compared to other DB operations duration (and for smaller results it takes
224+
# in sub ms range).
225+
with Timer(key="action.executions.calculate_result_size"):
226+
result_size = len(
227+
ActionExecutionDB.result._serialize_field_value(
228+
liveaction_db.result
229+
)
230+
)
231+
kw["set__result_size"] = result_size
232+
233+
execution = ActionExecution.update(execution, publish=publish, **kw)
234+
return execution
231235

232236

233237
def abandon_execution_if_incomplete(liveaction_id, publish=True):

st2common/tests/unit/services/test_workflow_service_retries.py

+23-18
Original file line numberDiff line numberDiff line change
@@ -134,18 +134,9 @@ def setUpClass(cls):
134134
for pack in PACKS:
135135
actions_registrar.register_from_pack(pack)
136136

137-
@mock.patch.object(
138-
coord_svc.NoOpDriver,
139-
"get_lock",
140-
mock.MagicMock(
141-
side_effect=[
142-
coordination.ToozConnectionError("foobar"),
143-
coordination.ToozConnectionError("fubar"),
144-
coord_svc.NoOpLock(name="noop"),
145-
]
146-
),
147-
)
148-
def test_recover_from_coordinator_connection_error(self):
137+
@mock.patch.object(coord_svc.NoOpDriver, "get_lock")
138+
def test_recover_from_coordinator_connection_error(self, mock_get_lock):
139+
mock_get_lock.side_effect = coord_svc.NoOpLock(name="noop")
149140
wf_meta = self.get_wf_fixture_meta_data(TEST_PACK_PATH, "sequential.yaml")
150141
lv_ac_db = lv_db_models.LiveActionDB(action=wf_meta["name"])
151142
lv_ac_db, ac_ex_db = ac_svc.request(lv_ac_db)
@@ -161,18 +152,25 @@ def test_recover_from_coordinator_connection_error(self):
161152
)[0]
162153
tk1_lv_ac_db = lv_db_access.LiveAction.get_by_id(tk1_ac_ex_db.liveaction["id"])
163154
self.assertEqual(tk1_lv_ac_db.status, ac_const.LIVEACTION_STATUS_SUCCEEDED)
155+
mock_get_lock.side_effect = [
156+
coordination.ToozConnectionError("foobar"),
157+
coordination.ToozConnectionError("foobar"),
158+
coord_svc.NoOpLock(name="noop"),
159+
coord_svc.NoOpLock(name="noop"),
160+
coord_svc.NoOpLock(name="noop"),
161+
coord_svc.NoOpLock(name="noop"),
162+
coord_svc.NoOpLock(name="noop"),
163+
]
164164
wf_svc.handle_action_execution_completion(tk1_ac_ex_db)
165165

166+
mock_get_lock.side_effect = coord_svc.NoOpLock(name="noop")
166167
# Workflow service should recover from retries and task1 should succeed.
167168
tk1_ex_db = wf_db_access.TaskExecution.get_by_id(tk1_ex_db.id)
168169
self.assertEqual(tk1_ex_db.status, wf_statuses.SUCCEEDED)
169170

170-
@mock.patch.object(
171-
coord_svc.NoOpDriver,
172-
"get_lock",
173-
mock.MagicMock(side_effect=coordination.ToozConnectionError("foobar")),
174-
)
175-
def test_retries_exhausted_from_coordinator_connection_error(self):
171+
@mock.patch.object(coord_svc.NoOpDriver, "get_lock")
172+
def test_retries_exhausted_from_coordinator_connection_error(self, mock_get_lock):
173+
mock_get_lock.side_effect = coord_svc.NoOpLock(name="noop")
176174
wf_meta = self.get_wf_fixture_meta_data(TEST_PACK_PATH, "sequential.yaml")
177175
lv_ac_db = lv_db_models.LiveActionDB(action=wf_meta["name"])
178176
lv_ac_db, ac_ex_db = ac_svc.request(lv_ac_db)
@@ -189,6 +187,13 @@ def test_retries_exhausted_from_coordinator_connection_error(self):
189187
tk1_lv_ac_db = lv_db_access.LiveAction.get_by_id(tk1_ac_ex_db.liveaction["id"])
190188
self.assertEqual(tk1_lv_ac_db.status, ac_const.LIVEACTION_STATUS_SUCCEEDED)
191189

190+
mock_get_lock.side_effect = [
191+
coordination.ToozConnectionError("foobar"),
192+
coordination.ToozConnectionError("foobar"),
193+
coordination.ToozConnectionError("foobar"),
194+
coordination.ToozConnectionError("foobar"),
195+
coordination.ToozConnectionError("foobar"),
196+
]
192197
# The connection error should raise if retries are exhausted.
193198
self.assertRaises(
194199
coordination.ToozConnectionError,

0 commit comments

Comments
 (0)