
HTTPOperator should have the possibility to retry at the page level using the http hook run_with_advanced_retry #39247

Closed
1 of 2 tasks
lopezvit opened this issue Apr 25, 2024 · 3 comments · Fixed by #40086

Comments

@lopezvit

Description

It should be possible for the HttpOperator to retry individual requests without failing the whole task, since when pagination is used, a single page can fail.

Use case/motivation

I'm requesting all the pages from an API endpoint (TeamTailor JobPosts), 438 pages in total.
Sometimes, while going through these pages, the server returns an HTTP 500 error.
If just that page were retried with exponential backoff, the whole task would probably succeed; retrying the whole task re-requests every page and may hit the same error again.
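To illustrate the difference, here is a minimal, dependency-free sketch of page-level retrying with exponential backoff. `fetch_all_pages`, `fetch_page`, `TransientError` and the integer page cursor are hypothetical names chosen for this example, not Airflow APIs; the point is only that each page gets its own retry budget, so a transient 500 on one page costs a few retries of that page rather than a rerun of all 438.

```python
import time
from typing import Callable, List, Optional


class TransientError(Exception):
    """Hypothetical stand-in for a retryable failure such as an HTTP 500."""


def fetch_all_pages(
    fetch_page: Callable[[int], dict],
    max_attempts: int = 5,
    base_delay: float = 1.0,
    sleep: Callable[[float], None] = time.sleep,
) -> List[list]:
    """Fetch pages 1, 2, ... until a page reports no next page.

    Each page is retried on its own with exponential backoff
    (base_delay, 2 * base_delay, 4 * base_delay, ...). The run only fails
    if a single page exhausts max_attempts; earlier pages are never refetched.
    """
    if max_attempts < 1:
        raise ValueError("max_attempts must be at least 1")
    pages: List[list] = []
    page: Optional[int] = 1
    while page is not None:
        for attempt in range(max_attempts):
            try:
                result = fetch_page(page)
                break  # this page succeeded; move on to the next one
            except TransientError:
                if attempt == max_attempts - 1:
                    raise  # out of retries for this page only
                sleep(base_delay * 2 ** attempt)
        pages.append(result["items"])
        page = result.get("next_page")
    return pages


# Demo with a fake 3-page API whose page 2 fails twice before succeeding.
failures_left = {2: 2}


def fake_fetch(page: int) -> dict:
    if failures_left.get(page, 0) > 0:
        failures_left[page] -= 1
        raise TransientError(f"HTTP 500 on page {page}")
    return {"items": [page], "next_page": page + 1 if page < 3 else None}


delays: List[float] = []
all_pages = fetch_all_pages(fake_fetch, sleep=delays.append)
print(all_pages)  # [[1], [2], [3]]
print(delays)  # [1.0, 2.0]: both backoffs were spent on page 2 only
```

In Airflow, `HttpHook.run_with_advanced_retry` can play the role of the inner loop: it wraps a single `run` call in `tenacity.Retrying(**_retry_args)`, so each call starts with a fresh retry budget.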

Related issues

No response

Are you willing to submit a PR?

  • Yes I am willing to submit a PR!

Code of Conduct

@lopezvit lopezvit added kind:feature Feature Requests needs-triage label for new issues that we didn't triage yet labels Apr 25, 2024
@potiuk potiuk added good first issue and removed needs-triage label for new issues that we didn't triage yet labels Apr 25, 2024
@lopezvit
Author

I have created this extended version, and it seems to work correctly. Sorry, I don't have time to create a PR:

from __future__ import annotations

from typing import Any

from requests import Response

from airflow.providers.http.operators.http import HttpOperator
from airflow.utils.context import Context


class RetryHttpOperator(HttpOperator):
    """HttpOperator that retries each request, including every paginated page, on its own.

    ``retry_args`` is forwarded to ``HttpHook.run_with_advanced_retry`` (i.e. to
    ``tenacity.Retrying``). When it is empty or ``None``, the operator behaves like the
    stock ``HttpOperator``. Only synchronous execution is changed; the deferrable path is not.
    """

    def __init__(self, *, retry_args: dict[str, Any] | None = None, **kwargs: Any) -> None:
        # All other arguments (endpoint, method, pagination_function, ...) go to HttpOperator unchanged.
        super().__init__(**kwargs)
        self._retry_args = retry_args

    def _run_request(self, *args: Any, **kwargs: Any) -> Response:
        # Retry just this one request if retry_args were given; otherwise make a single attempt.
        if self._retry_args:
            return self.hook.run_with_advanced_retry(self._retry_args, *args, **kwargs)
        return self.hook.run(*args, **kwargs)

    def execute_sync(self, context: Context) -> Any:
        self.log.info("Calling HTTP method")
        response = self._run_request(self.endpoint, self.data, self.headers, self.extra_options)
        response = self.paginate_sync(response=response)
        return self.process_response(context=context, response=response)

    def paginate_sync(self, response: Response) -> Response | list[Response]:
        if not self.pagination_function:
            return response

        all_responses = [response]
        while True:
            next_page_params = self.pagination_function(response)
            if not next_page_params:
                break
            # Each page gets its own retry budget, so a transient error on one page
            # is retried instead of failing the whole task.
            response = self._run_request(**self._merge_next_page_parameters(next_page_params))
            all_responses.append(response)
        return all_responses
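To exercise the page-level retries, the operator also needs a `pagination_function`: HttpOperator calls it with the previous `Response` and keeps requesting pages until it returns a falsy value, merging the returned `endpoint`/`data`/`headers`/`extra_options` into the next request. The sketch below is hypothetical: `next_page_params` is an illustrative name, and it assumes a JSON:API-style body that exposes the next page's absolute URL under `links.next` (not verified against the TeamTailor API) and an HTTP connection whose host is the API root, so the URL's path plus query string can serve as the next `endpoint`.

```python
from typing import Any, Optional
from urllib.parse import urlsplit


def next_page_params(response: Any) -> Optional[dict]:
    """Hypothetical pagination_function for a JSON:API-style endpoint.

    Assumes the body carries the next page's absolute URL in links.next and
    that the HTTP connection's host is the API root, so path + query can be
    used as the next endpoint. Returning None tells HttpOperator to stop.
    """
    next_url = (response.json().get("links") or {}).get("next")
    if not next_url:
        return None  # last page reached
    parts = urlsplit(next_url)
    # Keep only path and query; the connection supplies scheme and host.
    endpoint = parts.path + (f"?{parts.query}" if parts.query else "")
    return {"endpoint": endpoint}
```

It could then be passed as `RetryHttpOperator(..., method="GET", pagination_function=next_page_params, retry_args=...)` (HttpOperator defaults to POST), where `retry_args` holds `tenacity.Retrying` keyword arguments such as `wait=tenacity.wait_exponential()`, `stop=tenacity.stop_after_attempt(5)` and `retry=tenacity.retry_if_exception_type(Exception)`, in the style of the example in the `run_with_advanced_retry` docstring.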

@boraberke
Contributor

Hi, I can work on this issue. Could you please assign me?

@lopezvit
Author

Sorry, I'm not a collaborator, so I cannot assign anybody... maybe @eladkal could?


4 participants