Skip to content

Commit ee49219

Browse files
ChristoGrabmaxi297
andauthored
fix: handle backoff_strategies in CompositeErrorHandler (#225)
Co-authored-by: maxi297 <maxime@airbyte.io>
1 parent 14375fe commit ee49219

File tree

3 files changed

+132
-0
lines changed

3 files changed

+132
-0
lines changed

airbyte_cdk/sources/declarative/requesters/error_handlers/composite_error_handler.py

Lines changed: 22 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -8,6 +8,7 @@
88
import requests
99

1010
from airbyte_cdk.sources.streams.http.error_handlers import ErrorHandler
11+
from airbyte_cdk.sources.streams.http.error_handlers.backoff_strategy import BackoffStrategy
1112
from airbyte_cdk.sources.streams.http.error_handlers.response_models import (
1213
ErrorResolution,
1314
ResponseAction,
@@ -77,3 +78,24 @@ def interpret_response(
7778
return matched_error_resolution
7879

7980
return create_fallback_error_resolution(response_or_exception)
81+
82+
@property
83+
def backoff_strategies(self) -> Optional[List[BackoffStrategy]]:
84+
"""
85+
Combines backoff strategies from all child error handlers into a single flattened list.
86+
87+
When used with HttpRequester, note the following behavior:
88+
- In HttpRequester.__post_init__, the entire list of backoff strategies is assigned to the error handler
89+
- However, the error handler's backoff_time() method only ever uses the first non-None strategy in the list
90+
- This means that if any backoff strategies are present, the first non-None strategy becomes the default
91+
- This applies to both user-defined response filters and errors from DEFAULT_ERROR_MAPPING
92+
- The list structure is not used to map different strategies to different error conditions
93+
- Therefore, subsequent strategies in the list will not be used
94+
95+
Returns None if no handlers have strategies defined, which will result in HttpRequester using its default backoff strategy.
96+
"""
97+
all_strategies = []
98+
for handler in self.error_handlers:
99+
if hasattr(handler, "backoff_strategies") and handler.backoff_strategies:
100+
all_strategies.extend(handler.backoff_strategies)
101+
return all_strategies if all_strategies else None

unit_tests/sources/declarative/requesters/error_handlers/test_composite_error_handler.py

Lines changed: 77 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -9,6 +9,9 @@
99

1010
from airbyte_cdk.models import FailureType
1111
from airbyte_cdk.sources.declarative.requesters.error_handlers import HttpResponseFilter
12+
from airbyte_cdk.sources.declarative.requesters.error_handlers.backoff_strategies import (
13+
ConstantBackoffStrategy,
14+
)
1215
from airbyte_cdk.sources.declarative.requesters.error_handlers.composite_error_handler import (
1316
CompositeErrorHandler,
1417
)
@@ -272,3 +275,77 @@ def test_max_time_is_max_of_underlying_handlers(test_name, max_times, expected_m
272275

273276
max_time = composite_error_handler.max_time
274277
assert max_time == expected_max_time
278+
279+
280+
@pytest.mark.parametrize(
281+
"test_name, handler_strategies, expected_strategies",
282+
[
283+
("test_empty_strategies", [None, None], None),
284+
(
285+
"test_single_handler_with_strategy",
286+
[[ConstantBackoffStrategy(5, {}, {})], None],
287+
[ConstantBackoffStrategy(5, {}, {})],
288+
),
289+
(
290+
"test_multiple_handlers_with_strategies",
291+
[[ConstantBackoffStrategy(5, {}, {})], [ConstantBackoffStrategy(10, {}, {})]],
292+
[ConstantBackoffStrategy(5, {}, {}), ConstantBackoffStrategy(10, {}, {})],
293+
),
294+
(
295+
"test_some_handlers_without_strategies",
296+
[[ConstantBackoffStrategy(5, {}, {})], None, [ConstantBackoffStrategy(10, {}, {})]],
297+
[ConstantBackoffStrategy(5, {}, {}), ConstantBackoffStrategy(10, {}, {})],
298+
),
299+
],
300+
)
301+
def test_composite_error_handler_backoff_strategies(
302+
test_name, handler_strategies, expected_strategies
303+
):
304+
parameters = {}
305+
config = {}
306+
307+
error_handlers = [
308+
DefaultErrorHandler(backoff_strategies=strategies, parameters=parameters, config=config)
309+
for strategies in handler_strategies
310+
]
311+
312+
composite_handler = CompositeErrorHandler(error_handlers=error_handlers, parameters=parameters)
313+
314+
assert composite_handler.backoff_strategies == expected_strategies
315+
316+
317+
def test_composite_error_handler_always_uses_first_strategy():
318+
first_handler = DefaultErrorHandler(
319+
backoff_strategies=[ConstantBackoffStrategy(5, {}, {})],
320+
parameters={},
321+
config={},
322+
response_filters=[
323+
HttpResponseFilter(
324+
action=ResponseAction.RETRY, http_codes={429}, config={}, parameters={}
325+
)
326+
],
327+
)
328+
second_handler = DefaultErrorHandler(
329+
backoff_strategies=[ConstantBackoffStrategy(10, {}, {})],
330+
parameters={},
331+
config={},
332+
response_filters=[
333+
HttpResponseFilter(
334+
action=ResponseAction.RETRY, http_codes={500}, config={}, parameters={}
335+
)
336+
],
337+
)
338+
339+
composite_handler = CompositeErrorHandler(
340+
error_handlers=[first_handler, second_handler], parameters={}
341+
)
342+
343+
# Test that even for a 500 error (which matches second handler's filter),
344+
# we still get both strategies with first handler's coming first
345+
response_mock = create_response(500)
346+
assert first_handler.backoff_strategies[0].backoff_time(response_mock, 1) == 5
347+
348+
# Verify we get both strategies in the composite handler
349+
assert len(composite_handler.backoff_strategies) == 2
350+
assert isinstance(composite_handler.backoff_strategies[0], ConstantBackoffStrategy)
351+
assert composite_handler.backoff_strategies[1], ConstantBackoffStrategy

unit_tests/sources/declarative/requesters/test_http_requester.py

Lines changed: 33 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -13,6 +13,7 @@
1313

1414
from airbyte_cdk.sources.declarative.auth.declarative_authenticator import DeclarativeAuthenticator
1515
from airbyte_cdk.sources.declarative.interpolation.interpolated_string import InterpolatedString
16+
from airbyte_cdk.sources.declarative.requesters.error_handlers import HttpResponseFilter
1617
from airbyte_cdk.sources.declarative.requesters.error_handlers.backoff_strategies import (
1718
ConstantBackoffStrategy,
1819
ExponentialBackoffStrategy,
@@ -26,6 +27,7 @@
2627
InterpolatedRequestOptionsProvider,
2728
)
2829
from airbyte_cdk.sources.message import MessageRepository
30+
from airbyte_cdk.sources.streams.http.error_handlers.response_models import ResponseAction
2931
from airbyte_cdk.sources.streams.http.exceptions import (
3032
RequestBodyException,
3133
UserDefinedBackoffException,
@@ -901,3 +903,34 @@ def test_request_attempt_count_with_exponential_backoff_strategy(http_requester_
901903
http_requester._http_client._request_attempt_count.get(request_mock)
902904
== http_requester._http_client._max_retries + 1
903905
)
906+
907+
908+
@pytest.mark.usefixtures("mock_sleep")
909+
def test_backoff_strategy_from_manifest_is_respected(http_requester_factory: Any) -> None:
910+
backoff_strategy = ConstantBackoffStrategy(
911+
parameters={}, config={}, backoff_time_in_seconds=0.1
912+
)
913+
error_handler = DefaultErrorHandler(
914+
parameters={}, config={}, max_retries=1, backoff_strategies=[backoff_strategy]
915+
)
916+
917+
request_mock = MagicMock(spec=requests.PreparedRequest)
918+
request_mock.headers = {}
919+
request_mock.url = "https://orksy.com/orks_rule_humies_drule"
920+
request_mock.method = "GET"
921+
request_mock.body = {}
922+
923+
http_requester = http_requester_factory(error_handler=error_handler)
924+
http_requester._http_client._session.send = MagicMock()
925+
926+
response = requests.Response()
927+
response.status_code = 500
928+
http_requester._http_client._session.send.return_value = response
929+
930+
with pytest.raises(UserDefinedBackoffException):
931+
http_requester._http_client._send_with_retry(request=request_mock, request_kwargs={})
932+
933+
assert (
934+
http_requester._http_client._request_attempt_count.get(request_mock)
935+
== http_requester._http_client._max_retries + 1
936+
)

0 commit comments

Comments
 (0)