Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
140 changes: 140 additions & 0 deletions sdk/python/kfp/client/client.py
Original file line number Diff line number Diff line change
Expand Up @@ -1360,6 +1360,146 @@ def get_run(self, run_id: str) -> kfp_server_api.V2beta1Run:
"""
return self._run_api.run_service_get_run(run_id=run_id)

def get_logs(
self,
run_id: str,
component_name: Optional[str] = None,
) -> Union[str, Dict[str, str]]:
"""Downloads logs from a pipeline run's components via artifacts.

This method retrieves logs that are automatically uploaded as artifacts
by the KFP launcher, making them available even after pod deletion.

Args:
run_id: ID of the pipeline run.
component_name: Optional name of specific component to get logs from.
If None, returns logs from all components.

Returns:
If component_name is specified: string containing the component logs.
If component_name is None: dictionary mapping component names to their logs.

Raises:
ValueError: If the run_id doesn't exist or component_name is not found.
RuntimeError: If there's an error retrieving artifacts.

Example:
::

client = kfp.Client()
# Get logs from specific component
logs = client.get_logs(
run_id='5d08dd3d-58d9-4d02-9ff2-f2cee8dbfda8',
component_name='foo2'
)
print(logs)

# Get logs from all components
all_logs = client.get_logs(
run_id='5d08dd3d-58d9-4d02-9ff2-f2cee8dbfda8'
)
for component, log_text in all_logs.items():
print(f"Component {component}: {log_text[:100]}...")
"""
# Get run details
run = self.get_run(run_id=run_id)
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'd rather this leverage the log artifacts automatically uploaded by the KFP launcher. This has the benefit of still working even if the pod gets deleted and it doesn't require the user to have access to the Kubernetes cluster itself.

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

done.

if run is None:
raise ValueError(f"Run with ID '{run_id}' not found.")

# Check if run has task details
if not run.run_details or not run.run_details.task_details:
raise ValueError(f"No task details found for run '{run_id}'. "
'The run may not have started yet.')

logs_dict = {}

# Iterate through all task details to get logs
for task_detail in run.run_details.task_details:
# Get component/task name
task_name = (
task_detail.display_name or task_detail.name or
task_detail.task_id)

# Skip if we're looking for a specific component and this isn't it
if component_name and task_name != component_name:
continue

# Get node_id from task detail
node_id = task_detail.id or task_detail.task_id
if not node_id:
continue

# Try to read the main.log artifact
try:
log_content = self._read_artifact(
run_id=run_id, node_id=node_id, artifact_name='main.log')
logs_dict[task_name] = log_content
except Exception as e:
logs_dict[task_name] = (f'Error retrieving logs: {str(e)}')

# Handle return value
if not logs_dict:
if component_name:
raise ValueError(
f"Component '{component_name}' not found in run '{run_id}'."
)
else:
raise ValueError(f"No logs found for run '{run_id}'. "
'The run may not have any completed tasks.')

if component_name:
if component_name in logs_dict:
return logs_dict[component_name]
else:
available_components = ', '.join(logs_dict.keys())
raise ValueError(
f"Component '{component_name}' not found in run '{run_id}'. "
f'Available components: {available_components}')

return logs_dict

def _read_artifact(
self,
run_id: str,
node_id: str,
artifact_name: str,
) -> str:
"""Reads artifact content from KFP backend.

Args:
run_id: The run ID.
node_id: The node/task ID.
artifact_name: Name of the artifact (e.g., 'main.log').

Returns:
Artifact content as string.

Raises:
RuntimeError: If artifact cannot be retrieved.
"""
try:
from kfp_server_api.models import ReadArtifactRequest
except ImportError:
raise ImportError(
'kfp_server_api package is required to read artifacts.')

# Construct the artifact request
artifact_request = ReadArtifactRequest(
run_id=run_id, node_id=node_id, artifact_name=artifact_name)

try:
# Use the API client to read the artifact
response = self._run_api.run_service_read_artifact(artifact_request)

# Decode bytes to string
if isinstance(response.data, bytes):
return response.data.decode('utf-8')
return str(response.data)
except Exception as e:
raise RuntimeError(
f"Failed to read artifact '{artifact_name}' from node "
f"'{node_id}': {str(e)}")

def wait_for_run_completion(
self,
run_id: str,
Expand Down
152 changes: 150 additions & 2 deletions sdk/python/kfp/client/client_test.py
Original file line number Diff line number Diff line change
Expand Up @@ -548,5 +548,153 @@ def pipeline_test_upload_without_name(boolean: bool = True):
self.assertEqual(result, expected_result)


if __name__ == '__main__':
unittest.main()
class TestGetLogs(parameterized.TestCase):

def setUp(self):
self.client = client.Client(namespace='ns1')

def test_get_logs_method_exists(self):
"""Verify get_logs method exists on Client."""
self.assertTrue(hasattr(self.client, 'get_logs'))
self.assertTrue(callable(self.client.get_logs))

def test_get_logs_raises_on_missing_run(self):
"""Test get_logs raises ValueError for non-existent run."""
with patch.object(self.client, 'get_run', return_value=None):
with self.assertRaisesRegex(
ValueError, "Run with ID 'nonexistent-run' not found."):
self.client.get_logs(run_id='nonexistent-run')

def test_get_logs_raises_on_missing_task_details(self):
"""Test get_logs raises ValueError when task details missing."""
mock_run = Mock()
mock_run.run_id = 'test-run-123'
mock_run.run_details = None

with patch.object(self.client, 'get_run', return_value=mock_run):
with self.assertRaisesRegex(ValueError,
"No task details found for run"):
self.client.get_logs(run_id='test-run-123')

def test_get_logs_success_single_component(self):
"""Test get_logs successfully retrieves logs for single component."""
# Mock task detail
mock_task = Mock()
mock_task.display_name = 'test-component'
mock_task.id = 'node-123'

mock_run_details = Mock()
mock_run_details.task_details = [mock_task]

mock_run = Mock()
mock_run.run_id = 'test-run-123'
mock_run.run_details = mock_run_details

# Mock artifact response
mock_response = Mock()
mock_response.data = b'Component logs here'

with patch.object(self.client, 'get_run', return_value=mock_run):
with patch.object(
self.client._run_api,
'run_service_read_artifact',
return_value=mock_response):

result = self.client.get_logs(
run_id='test-run-123', component_name='test-component')

self.assertEqual(result, 'Component logs here')

def test_get_logs_success_all_components(self):
"""Test get_logs successfully retrieves logs for all components."""
# Mock multiple tasks
mock_task1 = Mock()
mock_task1.display_name = 'component-1'
mock_task1.id = 'node-1'

mock_task2 = Mock()
mock_task2.display_name = 'component-2'
mock_task2.id = 'node-2'

mock_run_details = Mock()
mock_run_details.task_details = [mock_task1, mock_task2]

mock_run = Mock()
mock_run.run_id = 'test-run-123'
mock_run.run_details = mock_run_details

# Mock artifact responses
mock_response1 = Mock()
mock_response1.data = b'logs from component-1'

mock_response2 = Mock()
mock_response2.data = b'logs from component-2'

with patch.object(self.client, 'get_run', return_value=mock_run):
with patch.object(
self.client._run_api,
'run_service_read_artifact',
side_effect=[mock_response1, mock_response2]):

result = self.client.get_logs(run_id='test-run-123')

self.assertIsInstance(result, dict)
self.assertEqual(len(result), 2)
self.assertEqual(result['component-1'], 'logs from component-1')
self.assertEqual(result['component-2'], 'logs from component-2')

def test_get_logs_component_not_found(self):
"""Test get_logs raises ValueError when component not found."""
mock_task = Mock()
mock_task.display_name = 'other-component'
mock_task.id = 'node-1'

mock_run_details = Mock()
mock_run_details.task_details = [mock_task]

mock_run = Mock()
mock_run.run_id = 'test-run-123'
mock_run.run_details = mock_run_details

mock_response = Mock()
mock_response.data = b'logs'

with patch.object(self.client, 'get_run', return_value=mock_run):
with patch.object(
self.client._run_api,
'run_service_read_artifact',
return_value=mock_response):

with self.assertRaisesRegex(
ValueError, "Component 'test-component' not found"):
self.client.get_logs(
run_id='test-run-123', component_name='test-component')

def test_read_artifact_success(self):
"""Test _read_artifact successfully reads artifact."""
mock_response = Mock()
mock_response.data = b'artifact content'

with patch.object(
self.client._run_api,
'run_service_read_artifact',
return_value=mock_response):

result = self.client._read_artifact(
run_id='run-123', node_id='node-123', artifact_name='main.log')

self.assertEqual(result, 'artifact content')

def test_read_artifact_failure(self):
"""Test _read_artifact raises RuntimeError on failure."""
with patch.object(
self.client._run_api,
'run_service_read_artifact',
side_effect=Exception('API Error')):

with self.assertRaisesRegex(RuntimeError,
"Failed to read artifact"):
self.client._read_artifact(
run_id='run-123',
node_id='node-123',
artifact_name='main.log')