Skip to content

Commit

Permalink
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
feat(cli): add --follow flag to logs command (reanahub#731)
Browse files Browse the repository at this point in the history
jlemesh committed Oct 17, 2024
1 parent 80d88dc commit 9ba012f
Showing 5 changed files with 391 additions and 81 deletions.
1 change: 1 addition & 0 deletions AUTHORS.md
Original file line number Diff line number Diff line change
@@ -15,6 +15,7 @@ The list of contributors in alphabetical order:
- [Giuseppe Steduto](https://orcid.org/0009-0002-1258-8553)
- [Harri Hirvonsalo](https://orcid.org/0000-0002-5503-510X)
- [Jan Okraska](https://orcid.org/0000-0002-1416-3244)
- [Jelizaveta Lemeševa](https://orcid.org/0009-0003-6606-9270)
- [Leticia Wanderley](https://orcid.org/0000-0003-4649-6630)
- [Marco Donadoni](https://orcid.org/0000-0003-2922-5505)
- [Marco Vidal](https://orcid.org/0000-0002-9363-4971)
114 changes: 114 additions & 0 deletions reana_client/cli/utils.py
Original file line number Diff line number Diff line change
@@ -12,6 +12,7 @@
import os
import shlex
import sys
import time
from typing import Callable, NoReturn, Optional, List, Tuple, Union

import click
@@ -24,6 +25,8 @@
RUN_STATUSES,
JOB_STATUS_TO_MSG_COLOR,
JSON,
CLI_LOGS_FOLLOW_MIN_INTERVAL,
CLI_LOGS_FOLLOW_DEFAULT_INTERVAL,
)
from reana_client.printer import display_message
from reana_client.utils import workflow_uuid_or_name
@@ -409,3 +412,114 @@ def output_user_friendly_logs(workflow_logs, steps):
f"Step {job_name_or_id} emitted no logs.",
msg_type="info",
)


def retrieve_workflow_logs(
    workflow,
    access_token,
    json_format,
    filters,
    steps,
    chosen_filters,
    available_filters,
    page=None,
    size=None,
):  # noqa: D301
    """Retrieve workflow logs once and print them.

    :param workflow: Workflow to fetch the logs of; passed as is to
        ``get_workflow_logs``.
    :param access_token: Access token passed as is to ``get_workflow_logs``.
    :param json_format: If true, print the (filtered) logs as indented JSON
        and terminate the process with exit code 0.
    :param filters: Raw ``--filter`` values; only their presence is checked
        here, the parsed values come from ``chosen_filters``.
    :param steps: Step names to restrict the job logs to. Duplicates are
        removed; an empty value means "all steps".
    :param chosen_filters: Mapping of filter name to the requested value.
    :param available_filters: Mapping of filter name to the job log field
        the filter is compared against.
    :param page: Pagination page, forwarded to ``get_workflow_logs``.
    :param size: Pagination size, forwarded to ``get_workflow_logs``.
    :raises SystemExit: After printing the logs when ``json_format`` is set.
    """
    from reana_client.api.client import get_workflow_logs

    # Deduplicate once, keeping the order in which the steps were requested
    # so that the output does not depend on set iteration order.
    unique_steps = list(dict.fromkeys(steps)) if steps else None

    response = get_workflow_logs(
        workflow,
        access_token,
        steps=unique_steps,
        page=page,
        size=size,
    )
    workflow_logs = json.loads(response["logs"])

    if filters:
        job_logs = workflow_logs.get("job_logs") or {}
        for key, value in chosen_filters.items():
            field = available_filters[key]
            # Collect the ids first: the dict cannot shrink while iterated.
            unwanted_job_ids = [
                job_id for job_id, job in job_logs.items() if job[field] != value
            ]
            for job_id in unwanted_job_ids:
                del job_logs[job_id]

    if json_format:
        display_message(json.dumps(workflow_logs, indent=2))
        sys.exit(0)

    # ``output_user_friendly_logs`` lives in this very module, so it is
    # called directly instead of being re-imported.
    output_user_friendly_logs(workflow_logs, unique_steps)


def _unseen_log_lines(previous_logs, current_logs):
    """Return the lines of ``current_logs`` not yet covered by ``previous_logs``.

    The two texts are compared line by line from the start; everything in
    ``current_logs`` after the common leading lines is returned. Unlike a
    membership test, this keeps new lines whose text repeats an earlier line.
    """
    previous_lines = previous_logs.splitlines()
    current_lines = current_logs.splitlines()

    shared = 0
    for old_line, new_line in zip(previous_lines, current_lines):
        if old_line != new_line:
            break
        shared += 1
    return current_lines[shared:]


def follow_workflow_logs(
    workflow,
    access_token,
    interval,
    steps,
):  # noqa: D301
    """Continuously poll for workflow or job logs.

    Polls ``get_workflow_logs`` every ``interval`` seconds and prints only the
    log lines that were not printed by the previous poll. Polling stops when
    the followed workflow/job reaches a final status or when the server
    reports that live logs are not enabled.

    :param workflow: Workflow to follow; passed as is to the API client.
    :param access_token: Access token passed as is to the API client.
    :param interval: Seconds to sleep between polls. Values below
        ``CLI_LOGS_FOLLOW_MIN_INTERVAL`` are replaced by
        ``CLI_LOGS_FOLLOW_DEFAULT_INTERVAL`` with a warning.
    :param steps: Step names; only the first one is followed. If empty, the
        workflow engine logs are followed instead.
    :raises Exception: If a step was requested but the response contains no
        job logs.
    """
    from reana_client.api.client import get_workflow_logs, get_workflow_status

    steps = steps or []
    if len(steps) > 1:
        display_message(
            "Only one step can be followed at a time, ignoring additional steps.",
            "warning",
        )
    if interval < CLI_LOGS_FOLLOW_MIN_INTERVAL:
        interval = CLI_LOGS_FOLLOW_DEFAULT_INTERVAL
        display_message(
            f"Interval should be an integer greater than or equal to {CLI_LOGS_FOLLOW_MIN_INTERVAL}, "
            f"resetting to default ({CLI_LOGS_FOLLOW_DEFAULT_INTERVAL} s).",
            "warning",
        )
    step = steps[0] if steps else None
    final_statuses = ("finished", "failed", "stopped", "deleted")

    previous_logs = ""

    while True:
        response = get_workflow_logs(
            workflow,
            access_token,
            steps=[step] if step else None,
        )
        # A missing or falsy flag is treated as "live logs not available".
        if not response.get("live_logs_enabled", False):
            display_message(
                "Live logs are not enabled, please rerun the command without the --follow flag.",
                "error",
            )
            return

        json_response = json.loads(response["logs"])

        if step:
            jobs = json_response["job_logs"]

            if not jobs:
                raise Exception(f"Step data not found: {step}")

            job = next(iter(jobs.values()))  # get values of the first job
            logs = job["logs"]
            status = job["status"]
        else:
            logs = json_response["workflow_logs"]
            status = get_workflow_status(workflow, access_token).get("status")

        # NOTE(review): presumably logs may be null before anything was
        # emitted; normalise so that the line comparison never fails.
        logs = logs or ""

        diff = "\n".join(_unseen_log_lines(previous_logs, logs))
        if diff not in ("", "\n"):
            display_message(diff)

        if status in final_statuses:
            subject = "Job" if step else "Workflow"
            display_message(
                f"{subject} has completed, you might want to rerun the command without the --follow flag.",
                "info",
            )
            return
        previous_logs = logs
        time.sleep(interval)
176 changes: 95 additions & 81 deletions reana_client/cli/workflow.py
Original file line number Diff line number Diff line change
@@ -31,8 +31,15 @@
key_value_to_dict,
parse_filter_parameters,
requires_environments,
retrieve_workflow_logs,
follow_workflow_logs,
)
from reana_client.config import (
ERROR_MESSAGES,
RUN_STATUSES,
TIMECHECK,
CLI_LOGS_FOLLOW_DEFAULT_INTERVAL,
)
from reana_client.config import ERROR_MESSAGES, RUN_STATUSES, TIMECHECK
from reana_client.printer import display_message
from reana_client.utils import (
get_reana_yaml_file_path,
@@ -886,6 +893,20 @@ def add_verbose_data_from_response(response, verbose_headers, headers, data):
multiple=True,
help="Filter job logs to include only those steps that match certain filtering criteria. Use --filter name=value pairs. Available filters are compute_backend, docker_img, status and step.",
)
@click.option(
"--follow",
"follow",
is_flag=True,
default=False,
help="Follow the logs of a running workflow or job (similar to tail -f).",
)
@click.option(
"-i",
"--interval",
"interval",
default=CLI_LOGS_FOLLOW_DEFAULT_INTERVAL,
help=f"Sleep time in seconds between log polling if log following is enabled. [default={CLI_LOGS_FOLLOW_DEFAULT_INTERVAL}]",
)
@add_pagination_options
@check_connection
@click.pass_context
@@ -894,22 +915,32 @@ def workflow_logs(
workflow,
access_token,
json_format,
steps=None,
follow,
interval,
filters=None,
page=None,
size=None,
): # noqa: D301
"""Get workflow logs.
The ``logs`` command allows to retrieve logs of running workflow. Note that
only finished steps of the workflow are returned, the logs of the currently
processed step is not returned until it is finished.
The ``logs`` command allows to retrieve logs of a running workflow.
Either retrieve logs and print the result or follow the logs of a running workflow/job.
Examples:\n
\t $ reana-client logs -w myanalysis.42
\t $ reana-client logs -w myanalysis.42 -s 1st_step
\t $ reana-client logs -w myanalysis.42\n
\t $ reana-client logs -w myanalysis.42 --json\n
\t $ reana-client logs -w myanalysis.42 --filter status=running\n
\t $ reana-client logs -w myanalysis.42 --filter step=myfit --follow\n
"""
from reana_client.api.client import get_workflow_logs
logging.debug("command: {}".format(ctx.command_path.replace(" ", ".")))
for p in ctx.params:
logging.debug("{param}: {value}".format(param=p, value=ctx.params[p]))

if json_format and follow:
display_message(
"Ignoring --json as it cannot be used together with --follow.",
msg_type="warning",
)

available_filters = {
"step": "job_name",
@@ -920,90 +951,73 @@ def workflow_logs(
steps = []
chosen_filters = dict()

logging.debug("command: {}".format(ctx.command_path.replace(" ", ".")))
for p in ctx.params:
logging.debug("{param}: {value}".format(param=p, value=ctx.params[p]))
if workflow:
if filters:
try:
for f in filters:
key, value = f.split("=")
if key not in available_filters:
if filters:
try:
for f in filters:
key, value = f.split("=")
if key not in available_filters:
display_message(
"Filter '{}' is not valid.\n"
"Available filters are '{}'.".format(
key,
"' '".join(sorted(available_filters.keys())),
),
msg_type="error",
)
sys.exit(1)
elif key == "step":
steps.append(value)
else:
# Case insensitive for compute backends
if (
key == "compute_backend"
and value.lower() in REANA_COMPUTE_BACKENDS
):
value = REANA_COMPUTE_BACKENDS[value.lower()]
elif key == "status" and value not in RUN_STATUSES:
display_message(
"Filter '{}' is not valid.\n"
"Available filters are '{}'.".format(
key,
"' '".join(sorted(available_filters.keys())),
),
"Input status value {} is not valid. ".format(value),
msg_type="error",
)
),
sys.exit(1)
elif key == "step":
steps.append(value)
else:
# Case insensitive for compute backends
if (
key == "compute_backend"
and value.lower() in REANA_COMPUTE_BACKENDS
):
value = REANA_COMPUTE_BACKENDS[value.lower()]
elif key == "status" and value not in RUN_STATUSES:
display_message(
"Input status value {} is not valid. ".format(value),
msg_type="error",
),
sys.exit(1)
chosen_filters[key] = value
except Exception as e:
logging.debug(traceback.format_exc())
logging.debug(str(e))
display_message(
"Please provide complete --filter name=value pairs, "
"for example --filter status=running.\n"
"Available filters are '{}'.".format(
"' '".join(sorted(available_filters.keys()))
),
msg_type="error",
)
sys.exit(1)
try:
response = get_workflow_logs(
workflow,
access_token,
steps=None if not steps else list(set(steps)),
page=page,
size=size,
)
workflow_logs = json.loads(response["logs"])
if filters:
for key, value in chosen_filters.items():
unwanted_steps = [
k
for k, v in workflow_logs["job_logs"].items()
if v[available_filters[key]] != value
]
for job_id in unwanted_steps:
del workflow_logs["job_logs"][job_id]

if json_format:
display_message(json.dumps(workflow_logs, indent=2))
sys.exit(0)
else:
from reana_client.cli.utils import output_user_friendly_logs

output_user_friendly_logs(
workflow_logs, None if not steps else list(set(steps))
)
chosen_filters[key] = value
except Exception as e:
logging.debug(traceback.format_exc())
logging.debug(str(e))
display_message(
"Cannot retrieve the logs of a workflow {}: \n"
"{}".format(workflow, str(e)),
"Please provide complete --filter name=value pairs, "
"for example --filter status=running.\n"
"Available filters are '{}'.".format(
"' '".join(sorted(available_filters.keys()))
),
msg_type="error",
)
sys.exit(1)

try:
if follow:
follow_workflow_logs(workflow, access_token, interval, steps)
else:
retrieve_workflow_logs(
workflow,
access_token,
json_format,
filters,
steps,
chosen_filters,
available_filters,
page,
size,
)
except Exception as e:
logging.debug(traceback.format_exc())
logging.debug(str(e))
display_message(
"Cannot retrieve logs for workflow {}: \n{}".format(workflow, str(e)),
msg_type="error",
)
sys.exit(1)


@workflow_execution_group.command("validate")
@click.option(
6 changes: 6 additions & 0 deletions reana_client/config.py
Original file line number Diff line number Diff line change
@@ -77,3 +77,9 @@

STD_OUTPUT_CHAR = "-"
"""Character used to refer to the standard output."""

# Polling intervals used when following logs. An interval below the minimum
# is not rejected: ``follow_workflow_logs`` shows a warning and falls back to
# the default interval instead.
CLI_LOGS_FOLLOW_MIN_INTERVAL = 1
"""Minimum interval between log requests in seconds."""

CLI_LOGS_FOLLOW_DEFAULT_INTERVAL = 10
"""Default interval between log requests in seconds."""
175 changes: 175 additions & 0 deletions tests/test_cli_workflows.py
Original file line number Diff line number Diff line change
@@ -8,6 +8,7 @@

"""REANA client workflow tests."""

import copy
import json
import sys
from typing import List
@@ -21,6 +22,7 @@
from reana_client.cli import cli
from reana_client.config import RUN_STATUSES
from reana_client.utils import get_workflow_status_change_msg
from reana_commons.api_client import BaseAPIClient
from reana_commons.config import INTERACTIVE_SESSION_TYPES


@@ -940,6 +942,179 @@ def test_get_workflow_status_ok():
assert json_response[0]["name"] in response["name"]


def test_get_workflow_logs():
    """Test that ``logs --json`` prints the workflow logs returned by the server."""
    response = {
        "logs": '{"workflow_logs": "workflow logs test"}',
        "user": "00000000-0000-0000-0000-000000000000",
        "workflow_id": "26a55924-83c9-493b-841b-8fd7629e25c9",
        "workflow_name": "helloworld-serial-kubernetes0.3",
    }
    env = {"REANA_SERVER_URL": "localhost"}
    mock_http_response = Mock()
    mock_http_response.status_code = 200
    reana_token = "000000"
    runner = CliRunner(env=env)
    with runner.isolation():
        with patch(
            "reana_client.api.client.current_rs_api_client",
            make_mock_api_client("reana-server")(response, mock_http_response),
        ):
            result = runner.invoke(
                cli,
                ["logs", "-t", reana_token, "--json", "-w", response["workflow_name"]],
            )
            json_response = json.loads(result.output)
            assert result.exit_code == 0
            assert isinstance(json_response, dict)
            # Equality, not substring containment: an empty or truncated
            # value must not satisfy the check.
            assert json_response["workflow_logs"] == "workflow logs test"


def test_follow_job_logs():
    """Test that ``logs --follow`` prints only new job log lines on each poll.

    The mocked server answers twice: first with a running job, then with the
    same job stopped and one additional log line.
    """
    logs = {
        "workflow_logs": "workflow logs test",
        "job_logs": {
            "job_id": {
                "workflow_uuid": "26a55924-83c9-493b-841b-8fd7629e25c9",
                "job_name": "hello1",
                "compute_backend": "Kubernetes",
                "backend_job_id": "reana-run-job-42532a36-4a41-4acf-a3b0-d61655030f43",
                "docker_img": "docker.io/library/python:3.8-slim",
                "cmd": "python",
                "status": "running",
                "logs": "job test logs\n",
                "started_at": "2024-09-26T09:02:36",
                "finished_at": None,
            }
        },
    }
    logs_next = copy.deepcopy(logs)
    logs_next["job_logs"]["job_id"]["status"] = "stopped"
    logs_next["job_logs"]["job_id"]["logs"] = "job test logs\nmore job logs\n"

    response = {
        "logs": json.dumps(logs),
        "user": "00000000-0000-0000-0000-000000000000",
        "workflow_id": "26a55924-83c9-493b-841b-8fd7629e25c9",
        "workflow_name": "helloworld-serial-kubernetes0.3",
        "live_logs_enabled": True,
    }
    response_next = copy.deepcopy(response)
    response_next["logs"] = json.dumps(logs_next)

    env = {"REANA_SERVER_URL": "localhost"}
    mock_http_response = Mock()
    mock_http_response.status_code = 200
    reana_token = "000000"
    runner = CliRunner(env=env)

    # One queued result per expected poll of the logs endpoint.
    mock_http_client, mock_result = Mock(), Mock()
    mock_result.result.side_effect = [
        (response, mock_http_response),
        (response_next, mock_http_response),
    ]
    mock_http_client.request.return_value = mock_result

    with runner.isolation():
        # Sleeping between polls is patched out so the test does not wait
        # for the polling interval.
        with patch(
            "reana_client.api.client.current_rs_api_client",
            BaseAPIClient("reana-server", http_client=mock_http_client)._client,
        ), patch("reana_client.cli.utils.time.sleep"):
            result = runner.invoke(
                cli,
                [
                    "logs",
                    "-t",
                    reana_token,
                    "--follow",
                    "-i",
                    "1",  # command-line arguments are strings
                    "-w",
                    "helloworld-serial-kubernetes0.3",
                    "--filter",
                    "step=hello1",
                ],
            )
            assert result.exit_code == 0
            assert result.output == (
                "job test logs\n"
                "more job logs\n"
                "==> Job has completed, you might want to rerun the command without the --follow flag.\n"
            )


def test_follow_live_logs_disabled():
    """Test ``logs --follow`` when the server does not report live logs.

    The mocked response has no ``live_logs_enabled`` key, so the command is
    expected to print an error and stop after a single request.
    """
    logs = {
        "workflow_logs": "",
        "job_logs": {
            "job_id": {
                "workflow_uuid": "26a55924-83c9-493b-841b-8fd7629e25c9",
                "job_name": "hello1",
                "compute_backend": "Kubernetes",
                "backend_job_id": "reana-run-job-42532a36-4a41-4acf-a3b0-d61655030f43",
                "docker_img": "docker.io/library/python:3.8-slim",
                "cmd": "python",
                "status": "running",
                "logs": "",
                "started_at": "2024-09-26T09:02:36",
                "finished_at": None,
            }
        },
    }

    response = {
        "logs": json.dumps(logs),
        "user": "00000000-0000-0000-0000-000000000000",
        "workflow_id": "26a55924-83c9-493b-841b-8fd7629e25c9",
        "workflow_name": "helloworld-serial-kubernetes0.3",
    }

    env = {"REANA_SERVER_URL": "localhost"}
    mock_http_response = Mock()
    mock_http_response.status_code = 200
    reana_token = "000000"
    runner = CliRunner(env=env)

    # Only one result is queued: a second poll would exhaust the mock.
    mock_http_client, mock_result = Mock(), Mock()
    mock_result.result.side_effect = [
        (response, mock_http_response),
    ]
    mock_http_client.request.return_value = mock_result

    with runner.isolation():
        with patch(
            "reana_client.api.client.current_rs_api_client",
            BaseAPIClient("reana-server", http_client=mock_http_client)._client,
        ):
            result = runner.invoke(
                cli,
                [
                    "logs",
                    "-t",
                    reana_token,
                    "--follow",
                    "-i",
                    "1",  # command-line arguments are strings
                    "-w",
                    "helloworld-serial-kubernetes0.3",
                    "--filter",
                    "step=hello1",
                ],
            )
            assert result.exit_code == 0
            assert result.output == (
                "==> ERROR: Live logs are not enabled, please rerun the command without the --follow flag.\n"
            )


@patch("reana_client.cli.workflow.workflow_create")
@patch("reana_client.cli.workflow.upload_files")
@patch("reana_client.cli.workflow.workflow_start")

0 comments on commit 9ba012f

Please sign in to comment.