airflow/api_connexion/endpoints/log_endpoint.py (9 changes: 6 additions & 3 deletions)
@@ -115,11 +115,14 @@ def get_log(
    # return_type would be either the above two or None
    logs: Any
    if return_type == "application/json" or return_type is None:  # default
-       logs, metadata = task_log_reader.read_log_chunks(ti, task_try_number, metadata)
-       logs = logs[0] if task_try_number is not None else logs
+       hosts, log_streams, metadata = task_log_reader.read_log_chunks(ti, task_try_number, metadata)
+       host = f"{hosts[0] or ''}\n"
+       logs = log_streams[0] if task_try_number is not None else log_streams
        # we must have token here, so we can safely ignore it
        token = URLSafeSerializer(key).dumps(metadata)  # type: ignore[assignment]
-       return logs_schema.dump(LogResponseObject(continuation_token=token, content=logs))
+       return logs_schema.dump(
+           LogResponseObject(continuation_token=token, content=host + "\n".join(log for log in logs))
+       )
    # text/plain. Stream
    logs = task_log_reader.read_log_stream(ti, task_try_number, metadata)
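
For readers following the refactor, a hedged sketch (not Airflow's actual code) of how the JSON branch consumes the new return shape for a specific try number, assuming read_log_chunks now yields a (hosts, log_streams, metadata) triple where hosts[0] may be None and log_streams[0] iterates over log lines:

# Hypothetical helper mirroring the diff above; names are illustrative.
def json_log_content(task_log_reader, ti, try_number, metadata):
    hosts, log_streams, metadata = task_log_reader.read_log_chunks(ti, try_number, metadata)
    host = f"{hosts[0] or ''}\n"  # serving-host prefix, blank when unknown
    # Join the lines of the first (requested-try) stream into one payload.
    content = host + "\n".join(line for line in log_streams[0])
    return content, metadata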

airflow/executors/base_executor.py (3 changes: 2 additions & 1 deletion)
@@ -52,6 +52,7 @@
from airflow.executors.executor_utils import ExecutorName
from airflow.models.taskinstance import TaskInstance
from airflow.models.taskinstancekey import TaskInstanceKey
+from airflow.utils.log.file_task_handler import _CompatibleLogSourceType

# Command to execute - list of strings
# the first element is always "airflow".
@@ -528,7 +529,7 @@ def execute_async(
"""
raise NotImplementedError()

def get_task_log(self, ti: TaskInstance, try_number: int) -> tuple[list[str], list[str]]:
def get_task_log(self, ti: TaskInstance, try_number: int) -> _CompatibleLogSourceType:
"""
Return the task logs.

airflow/jobs/scheduler_job_runner.py (15 changes: 8 additions & 7 deletions)
@@ -872,16 +872,17 @@ def _process_executor_events(self, executor: BaseExecutor, session: Session) ->

            task_log_reader = TaskLogReader()
            if task_log_reader.supports_read:
-               metadata: dict[str, Any] = {}
-               logs, metadata = task_log_reader.read_log_chunks(ti, ti.try_number, metadata)
-               if ti.hostname in dict(logs[0]):
-                   message = str(dict(logs[0])[ti.hostname]).replace("\\n", "\n")
+               hosts, log_streams, metadata = task_log_reader.read_log_chunks(
+                   ti, ti.try_number, None
+               )
+               if ti.hostname in hosts:
+                   message = str(next(iter(log_streams[0])))
                while metadata["end_of_log"] is False:
-                   logs, metadata = task_log_reader.read_log_chunks(
+                   hosts, log_streams, metadata = task_log_reader.read_log_chunks(

[Inline review thread anchored on this line]

Member: Since this is a big change, I am afraid to include it in 2.11! For example, in this case the return type of task_log_reader.read_log_chunks has changed.

Contributor: @kaxil it seems he is intending this to go to 3.0 only: #45914 (comment)

Contributor: Yeah, we should not spend time adding meaningful changes to 2.x.

Member Author:
> For example, in this case the return type of task_log_reader.read_log_chunks has changed.

I think changing the return type of read_log_chunks or other internal methods isn't a problem, since the user-facing interface is the REST API. As long as the REST API response remains consistent, users won't be affected or need to be aware of the refactor.

                        ti, ti.try_number - 1, metadata
                    )
-                   if ti.hostname in dict(logs[0]):
-                       message = message + str(dict(logs[0])[ti.hostname]).replace("\\n", "\n")
+                   if ti.hostname in hosts:
+                       message = message + str(next(iter(log_streams[0])))
            if span.is_recording():
                span.add_event(
                    name="task_log",
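Putting the scheduler-side changes together: the code now pages through read_log_chunks until the metadata cursor reports end_of_log, appending the first stream's content whenever it was served by this task instance's host. A self-contained, hedged sketch of that loop (the helper name is hypothetical; the call shapes follow the diff above):

def collect_task_log(task_log_reader, ti):
    """Concatenate log chunks for ti using the new (hosts, log_streams, metadata) shape."""
    message = ""
    # First call passes None; later calls pass back the metadata cursor.
    hosts, log_streams, metadata = task_log_reader.read_log_chunks(ti, ti.try_number, None)
    if ti.hostname in hosts:
        message += str(next(iter(log_streams[0])))
    while metadata["end_of_log"] is False:
        # Note: the diff pages subsequent chunks with try_number - 1,
        # mirroring the original scheduler code.
        hosts, log_streams, metadata = task_log_reader.read_log_chunks(
            ti, ti.try_number - 1, metadata
        )
        if ti.hostname in hosts:
            message += str(next(iter(log_streams[0])))
    return message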