Fix Unable to fetch logs from worker pod error in UI for k8s executor #28817
Conversation
Paging @dstandish
force-pushed from 752c046 to 14724ce
force-pushed from 05f0d7d to aa64a4f
force-pushed from b54cdcc to 6d1d744
Looks reasonable to me, but needs a rebase.
force-pushed from 6d1d744 to bb17bf5
o-nikolas left a comment:
lgtm, pending passing tests of course.
airflow/executors/base_executor.py (Outdated)
Wait, this bit doesn't make sense to me: if we already have the TI, why do we also need to supply the try number?
There is some explanation in the code for read:
airflow/airflow/utils/log/file_task_handler.py
Lines 376 to 410 in c266aac
def read(self, task_instance, try_number=None, metadata=None):
    """
    Read logs of given task instance from local machine.

    :param task_instance: task instance object
    :param try_number: task instance try_number to read logs from. If None
        it returns all logs separated by try_number
    :param metadata: log metadata, can be used for steaming log reading and auto-tailing.
    :return: a list of listed tuples which order log string by host
    """
    # Task instance increments its try number when it starts to run.
    # So the log for a particular task try will only show up when
    # try number gets incremented in DB, i.e logs produced the time
    # after cli run and before try_number + 1 in DB will not be displayed.
    if try_number is None:
        next_try = task_instance.next_try_number
        try_numbers = list(range(1, next_try))
    elif try_number < 1:
        logs = [
            [("default_host", f"Error fetching the logs. Try number {try_number} is invalid.")],
        ]
        return logs, [{"end_of_log": True}]
    else:
        try_numbers = [try_number]

    logs = [""] * len(try_numbers)
    metadata_array = [{}] * len(try_numbers)

    # subclasses implement _read and may not have log_type, which was added recently
    for i, try_number_element in enumerate(try_numbers):
        log, out_metadata = self._read(task_instance, try_number_element, metadata)
        # es_task_handler return logs grouped by host. wrap other handler returning log string
        # with default/ empty host so that UI can render the response in the same way
        logs[i] = log if self._read_grouped_logs() else [(task_instance.hostname, log)]
        metadata_array[i] = out_metadata
try_number is plumbed from read into _read, and should then have been passed on to the executors, but that was missed:
airflow/airflow/utils/log/file_task_handler.py
Lines 274 to 315 in c266aac
def _read(
    self,
    ti: TaskInstance,
    try_number: int,
    metadata: dict[str, Any] | None = None,
):
    """
    Template method that contains custom logic of reading
    logs given the try_number.

    :param ti: task instance record
    :param try_number: current try_number to read log from
    :param metadata: log metadata,
        can be used for steaming log reading and auto-tailing.
        Following attributes are used:
        log_pos: (absolute) Char position to which the log
            which was retrieved in previous calls, this
            part will be skipped and only following test
            returned to be added to tail.
    :return: log message as a string and metadata.
        Following attributes are used in metadata:
        end_of_log: Boolean, True if end of log is reached or False
            if further calls might get more log text.
            This is determined by the status of the TaskInstance
        log_pos: (absolute) Char position to which the log is retrieved
    """
    # Task instance here might be different from task instance when
    # initializing the handler. Thus explicitly getting log location
    # is needed to get correct log path.
    worker_log_rel_path = self._render_filename(ti, try_number)
    messages_list: list[str] = []
    remote_logs: list[str] = []
    running_logs: list[str] = []
    local_logs: list[str] = []
    executor_messages: list[str] = []
    executor_logs: list[str] = []
    served_logs: list[str] = []
    with suppress(NotImplementedError):
        remote_messages, remote_logs = self._read_remote_logs(ti, try_number, metadata)
        messages_list.extend(remote_messages)
    if ti.state == TaskInstanceState.RUNNING:
        response = self._executor_get_task_log(ti)
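For context, here is a minimal sketch of what the fix amounts to on the handler side: the try_number that _read already receives is threaded through to the executor call instead of being re-derived from the TaskInstance. This is an illustration only, not the actual patch; the helper name _executor_get_task_log comes from the excerpt above, while its exact signature and the ExecutorLoader lookup are assumptions.

from airflow.executors.executor_loader import ExecutorLoader

# Sketch only -- not the real diff.
def _executor_get_task_log(self, ti, try_number):
    executor = ExecutorLoader.get_default_executor()
    # Pass the try number the UI asked for, rather than letting the executor
    # fall back to ti.try_number (which may already point at a later try).
    return executor.get_task_log(ti=ti, try_number=try_number)

# ...and the call site inside _read() changes accordingly:
#     response = self._executor_get_task_log(ti, try_number)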
force-pushed from 190d348 to 60f7d12
ti.try_number was used for fetching the log from the k8s pod, which caused an incorrect log to be returned. Fixed by passing try_number from _read to the get_task_log method and using the try_number argument instead of ti.try_number when selecting the pod in the k8s executor.
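On the KubernetesExecutor side, the change boils down to building the pod selector from the try_number argument instead of ti.try_number. A hedged sketch, assuming the get_task_log(ti, try_number) signature described in the commit message; the import path and the selector helper's parameter names are assumptions, and the actual pod lookup and log streaming are elided.

from airflow.kubernetes.pod_generator import PodGenerator  # import path at the time of this PR; may differ in newer versions

# Sketch of the executor-side method only; not the real implementation.
def get_task_log(self, ti, try_number):
    selector = PodGenerator.build_selector_for_k8s_executor_pod(
        dag_id=ti.dag_id,        # assumed parameters for illustration
        task_id=ti.task_id,
        try_number=try_number,   # the fix: previously ti.try_number was used here
    )
    # ...find the worker pod via `selector` and read its log (omitted)...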
force-pushed from 60f7d12 to 7357b53
Anyone have more feedback for this one, or shall I merge it?
…pod-error-in-UI-for-k8s-executor
[Note: I have set delete_worker_pods to False so that the pod is not deleted after completion.]
For the k8s executor, while trying to view the task log from the UI, we are getting the following error.
I think the issue is that, while calling PodGenerator.build_selector_for_k8s_executor_pod, we are passing ti.try_number instead of the try_number that was passed to the _read method.
airflow/airflow/utils/log/file_task_handler.py
Line 167 in 3ececb2
airflow/airflow/utils/log/file_task_handler.py
Lines 215 to 222 in 3ececb2
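To make the mismatch concrete, here is a tiny self-contained illustration (no Airflow imports; the label names and the value ti.try_number reports after completion are assumptions made for the example): the completed worker pod is labelled with the try it actually ran, so a selector built from an advanced ti.try_number matches nothing and the UI cannot fetch the log, while the explicitly requested try_number finds the pod.

# Hypothetical labels on a completed KubernetesExecutor worker pod.
completed_pod_labels = {"dag_id": "my_dag", "task_id": "my_task", "try_number": "1"}

def selector_matches(labels, try_number):
    # Stand-in for matching a label selector against the pod's labels.
    return labels.get("try_number") == str(try_number)

requested_try = 1   # the try whose log the UI is asking for
ti_try_number = 2   # what ti.try_number may report once the task has finished

print(selector_matches(completed_pod_labels, ti_try_number))  # False -> no pod found, log fetch fails
print(selector_matches(completed_pod_labels, requested_try))  # True  -> correct worker pod located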