Skip to content

Commit

Permalink
Exponential Backoff Not Functioning in BaseSensorOperator Reschedule …
Browse files Browse the repository at this point in the history
…Mode (#39823)

* Fix: Increment try_number/poke_count in BaseSensorOperator for correct exponential backoff in reschedule mode.

* exponential backoff handling for reschedule mode

---------

Co-authored-by: sanket2000 <email@example.com>
  • Loading branch information
sanket2000 and sanket2000 authored Jun 16, 2024
1 parent 60c2d36 commit 841b28c
Show file tree
Hide file tree
Showing 3 changed files with 97 additions and 0 deletions.
31 changes: 31 additions & 0 deletions airflow/sensors/base.py
Original file line number Diff line number Diff line change
Expand Up @@ -344,6 +344,37 @@ def _get_next_poke_interval(
if not self.exponential_backoff:
return self.poke_interval

if self.reschedule:
# Calculate elapsed time since the sensor started
elapsed_time = run_duration()

# Initialize variables for the simulation
cumulative_time: float = 0.0
estimated_poke_count: int = 0

while cumulative_time <= elapsed_time:
estimated_poke_count += 1
# Calculate min_backoff for the current try number
min_backoff = max(int(self.poke_interval * (2 ** (estimated_poke_count - 2))), 1)

# Calculate the jitter
run_hash = int(
hashlib.sha1(
f"{self.dag_id}#{self.task_id}#{started_at}#{estimated_poke_count}".encode()
).hexdigest(),
16,
)
modded_hash = min_backoff + run_hash % min_backoff

# Calculate the jitter, which is used to prevent multiple sensors simultaneously poking
interval_with_jitter = min(modded_hash, timedelta.max.total_seconds() - 1)

# Add the interval to the cumulative time
cumulative_time += interval_with_jitter

# Now we have an estimated_poke_count based on the elapsed time
poke_count = estimated_poke_count or poke_count

# The value of min_backoff should always be greater than or equal to 1.
min_backoff = max(int(self.poke_interval * (2 ** (poke_count - 2))), 1)

Expand Down
1 change: 1 addition & 0 deletions newsfragments/39823.bugfix.rst
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
Fixed exponential backoff for ``BaseSensorOperator`` in reschedule mode by estimating the poke count from ``run_duration``; previously, such sensors were rescheduled at a fixed interval.
65 changes: 65 additions & 0 deletions tests/sensors/test_base.py
Original file line number Diff line number Diff line change
Expand Up @@ -427,6 +427,71 @@ def _get_tis():
assert sensor_ti.state == State.SUCCESS
assert dummy_ti.state == State.NONE

def test_ok_with_reschedule_and_exponential_backoff(
    self, make_sensor, time_machine, task_reschedules_for_ti, session
):
    """Check that a reschedule-mode sensor with exponential backoff waits longer each time.

    The sensor's ``poke`` is mocked to return ``False`` ten times and then
    ``True``. After every unsuccessful run the test asserts that:

    * the sensor task instance is ``UP_FOR_RESCHEDULE``;
    * exactly one more row exists in the task-reschedule records;
    * the interval reported by ``_get_next_poke_interval`` is strictly
      larger than the previous one;
    * the newest reschedule record starts at the frozen "now" and is due
      one interval later.

    The final run (poke returns ``True``) must leave the sensor in ``SUCCESS``
    while the downstream dummy task is still untouched.
    """
    sensor, dr = make_sensor(
        return_value=None,
        poke_interval=10,
        timeout=36000,
        mode="reschedule",
        exponential_backoff=True,
    )

    def _sensor_and_dummy():
        # Re-read both task instances from the DAG run: (sensor, dummy).
        tis = dr.get_task_instances(session=session)
        assert len(tis) == 2
        sensor_instance = next(ti for ti in tis if ti.task_id == SENSOR_OP)
        dummy_instance = next(ti for ti in tis if ti.task_id == DUMMY_OP)
        return sensor_instance, dummy_instance

    failing_pokes = 10
    poke_results = [False] * failing_pokes
    poke_results.append(True)
    sensor.poke = Mock(side_effect=poke_results)

    # Freeze the clock at the moment the sensor "starts"; time only moves
    # when the test shifts it explicitly.
    task_start_date = timezone.utcnow()
    time_machine.move_to(task_start_date, tick=False)
    now = task_start_date

    def run_duration():
        # Seconds elapsed on the frozen clock since the sensor started.
        return (timezone.utcnow() - task_start_date).total_seconds()

    wait_seconds = 0

    sensor_ti, dummy_ti = _sensor_and_dummy()
    assert dummy_ti.state == State.NONE
    assert sensor_ti.state == State.NONE

    # ordinarily the scheduler does this
    sensor_ti.state = State.SCHEDULED
    sensor_ti.try_number += 1  # first TI run
    session.commit()

    # Every one of these runs pokes False and must be rescheduled.
    for attempt in range(1, failing_pokes + 1):
        # Jump forward by the interval computed on the previous iteration.
        now += timedelta(seconds=wait_seconds)
        time_machine.coordinates.shift(wait_seconds)
        self._run(sensor)

        sensor_ti, dummy_ti = _sensor_and_dummy()
        assert sensor_ti.state == State.UP_FOR_RESCHEDULE

        # One additional row in the task_reschedule table per failed poke.
        reschedule_rows = task_reschedules_for_ti(sensor_ti)
        assert len(reschedule_rows) == attempt

        previous_wait = wait_seconds
        wait_seconds = sensor._get_next_poke_interval(task_start_date, run_duration, attempt)
        assert previous_wait < wait_seconds  # actual test

        latest_row = reschedule_rows[-1]
        assert latest_row.start_date == now
        assert latest_row.reschedule_date == now + timedelta(seconds=wait_seconds)
        assert dummy_ti.state == State.NONE

    # The last poke returns True, so the task succeeds.
    now += timedelta(seconds=wait_seconds)
    time_machine.coordinates.shift(wait_seconds)
    self._run(sensor)

    sensor_ti, dummy_ti = _sensor_and_dummy()
    assert sensor_ti.state == State.SUCCESS
    assert dummy_ti.state == State.NONE

@pytest.mark.parametrize("mode", ["poke", "reschedule"])
def test_should_include_ready_to_reschedule_dep(self, mode):
sensor = DummySensor(task_id="a", return_value=True, mode=mode)
Expand Down

0 comments on commit 841b28c

Please sign in to comment.