Skip to content

Commit

Permalink
[SDK] Get Kubernetes Events for Job (kubeflow#1975)
Browse files Browse the repository at this point in the history
* [SDK] Get Kubernetes Events for Job

* Move class at the top

* Add object kind to event message

* Add example for events
  • Loading branch information
andreyvelich authored Jan 11, 2024
1 parent 006dda4 commit 778f555
Show file tree
Hide file tree
Showing 6 changed files with 191 additions and 47 deletions.
1 change: 1 addition & 0 deletions .flake8
Original file line number Diff line number Diff line change
@@ -1,2 +1,3 @@
[flake8]
max-line-length = 100
extend-ignore = W503
8 changes: 8 additions & 0 deletions manifests/overlays/kubeflow/kubeflow-training-roles.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -57,6 +57,14 @@ rules:
- get
- list
- watch
- apiGroups:
- ""
resources:
- events
verbs:
- get
- list
- watch

---
apiVersion: rbac.authorization.k8s.io/v1
Expand Down
179 changes: 141 additions & 38 deletions sdk/python/kubeflow/training/api/training_client.py
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@
import logging
import time
import json
from typing import Optional, Callable, List, Dict, Any, Set, Union
from typing import Optional, Callable, Tuple, List, Dict, Any, Set, Union
import queue
from kubernetes import client, config, watch

Expand Down Expand Up @@ -855,16 +855,16 @@ def wait_for_job_conditions(
{expected_conditions}"
)

def get_job_pod_names(
def get_job_pods(
self,
name: str,
namespace: Optional[str] = None,
is_master: bool = False,
replica_type: Optional[str] = None,
replica_index: Optional[int] = None,
timeout: int = constants.DEFAULT_TIMEOUT,
) -> List[str]:
"""Get pod names for the Training Job.
) -> List[models.V1Pod]:
"""Get pods for the Training Job.
Args:
name: Name for the Job.
Expand All @@ -889,7 +889,7 @@ def get_job_pod_names(
timeout: Kubernetes API server timeout in seconds to execute the request.
Returns:
list[str]: List of the Job pod names.
list[V1Pod]: List of the Job pods.
Raises:
ValueError: Job replica type is invalid.
Expand Down Expand Up @@ -933,23 +933,75 @@ def get_job_pod_names(
if replica_index is not None:
label_selector += f",{constants.REPLICA_INDEX_LABEL}={replica_index}"

# List Training Job pods.
pods = []
# Return list of Training Job pods.
try:
thread = self.core_api.list_namespaced_pod(
namespace,
label_selector=label_selector,
async_req=True,
)
response = thread.get(timeout)
return thread.get(timeout).items
except multiprocessing.TimeoutError:
raise TimeoutError(f"Timeout to list pods for Job: {namespace}/{name}")
except Exception:
raise RuntimeError(f"Failed to list pods for Job: {namespace}/{name}")

for pod in response.items:
pods.append(pod.metadata.name)
return pods
def get_job_pod_names(
    self,
    name: str,
    namespace: Optional[str] = None,
    is_master: bool = False,
    replica_type: Optional[str] = None,
    replica_index: Optional[int] = None,
    timeout: int = constants.DEFAULT_TIMEOUT,
) -> List[str]:
    """Return the names of the pods that belong to the Training Job.

    This is a thin wrapper around `get_job_pods` that keeps only
    `metadata.name` of every returned pod.

    Args:
        name: Name for the Job.
        namespace: Namespace for the Job. By default namespace is taken from
            `TrainingClient` object.
        is_master: Whether to get pods only with the label
            `training.kubeflow.org/job-role: master`.
        replica_type: Type of the Job replica.
            For TFJob one of `Chief`, `PS`, or `worker`.

            For PyTorchJob one of `master` or `worker`.

            For MXJob one of `scheduler`, `server`, or `worker`.

            For XGBoostJob one of `master` or `worker`.

            For MPIJob one of `launcher` or `worker`.

            For PaddleJob one of `master` or `worker`.
        replica_index: Index for the Job replica.
        timeout: Kubernetes API server timeout in seconds to execute the request.

    Returns:
        list[str]: List of the Job pod names.

    Raises:
        ValueError: Job replica type is invalid.
        TimeoutError: Timeout to get Job pods.
        RuntimeError: Failed to get Job pods.
    """

    # Fall back to the client's namespace when the caller gives none.
    if not namespace:
        namespace = self.namespace

    job_pods = self.get_job_pods(
        name=name,
        namespace=namespace,
        is_master=is_master,
        replica_type=replica_type,
        replica_index=replica_index,
        timeout=timeout,
    )
    return [job_pod.metadata.name for job_pod in job_pods]

def get_job_logs(
self,
Expand All @@ -961,7 +1013,8 @@ def get_job_logs(
replica_index: Optional[int] = None,
follow: bool = False,
timeout: int = constants.DEFAULT_TIMEOUT,
) -> Dict[str, str]:
verbose: bool = False,
) -> Tuple[Dict[str, str], Dict[str, List[str]]]:
"""Get the logs for every Training Job pod. By default it returns logs from
the `master` pod. Logs are returned in this format: { "pod-name": "Log data" }.
Expand Down Expand Up @@ -990,21 +1043,35 @@ def get_job_logs(
follow: Whether to follow the log stream of the pod and print logs to StdOut.
timeout: Optional, Kubernetes API server timeout in seconds
to execute the request.
verbose: Whether to get Kubernetes events for Job and corresponding pods.
Returns:
Dict[str, str]: A dictionary in which the keys are pod names and the
values are the corresponding logs.
Dict[str, str]: A dictionary in which the keys are object kind and name, and the
values are list of the corresponding Kubernetes events with their timestamps. This
value is returned only if `verbose = True`. For example:
```json
{
"PyTorchJob train-mnist": [
"2024-01-05 22:58:20 Created pod: train-mnist-worker-0"
],
"Pod train-mnist-worker-0": [
"2024-01-05 22:58:20 Created container init-pytorch"
]
}
```
Raises:
ValueError: Job replica type is invalid.
TimeoutError: Timeout to get Job pods.
RuntimeError: Failed to get Job pods.
TimeoutError: Timeout to get Job or Job's pods
RuntimeError: Failed to get Job or Job's pods.
"""

namespace = namespace or self.namespace
job_kind = job_kind or self.job_kind

pods = self.get_job_pod_names(
pods = self.get_job_pods(
name=name,
namespace=namespace,
is_master=is_master,
Expand All @@ -1014,17 +1081,22 @@ def get_job_logs(
)

logs_dict = {}
events_dict = {}
if pods and follow:
log_streams = []
for pod in pods:
log_streams.append(
watch.Watch().stream(
self.core_api.read_namespaced_pod_log,
name=pod,
namespace=namespace,
container=constants.JOB_PARAMETERS[job_kind]["container"],
if (
pod.status is not None
and pod.status.phase != constants.POD_PHASE_PENDING
):
log_streams.append(
watch.Watch().stream(
self.core_api.read_namespaced_pod_log,
name=pod.metadata.name,
namespace=namespace,
container=constants.JOB_PARAMETERS[job_kind]["container"],
)
)
)
finished = [False for _ in log_streams]

# Create thread and queue per stream, for non-blocking iteration
Expand All @@ -1034,7 +1106,7 @@ def get_job_logs(
while True:
for index, log_queue in enumerate(log_queue_pool):
if all(finished):
return logs_dict
break
if finished[index]:
continue
# grouping the every 50 log lines of the same pod
Expand All @@ -1046,27 +1118,58 @@ def get_job_logs(
break

# Print logs to the StdOut
print(f"[Pod {pods[index]}]: {logline}")
print(f"[Pod {pods[index].metadata.name}]: {logline}")
# Add logs to the results dict.
if pods[index] not in logs_dict:
logs_dict[pods[index]] = logline
if pods[index].metadata.name not in logs_dict:
logs_dict[pods[index].metadata.name] = logline
else:
logs_dict[pods[index]] += logline
logs_dict[pods[index].metadata.name] += logline
except queue.Empty:
break
if all(finished):
break
elif pods:
for pod in pods:
try:
pod_logs = self.core_api.read_namespaced_pod_log(
pod,
namespace,
container=constants.JOB_PARAMETERS[job_kind]["container"],
)
logs_dict[pod] = pod_logs
except Exception:
raise RuntimeError(f"Failed to read logs for pod {namespace}/{pod}")

return logs_dict
if (
pod.status is not None
and pod.status.phase != constants.POD_PHASE_PENDING
):
try:
pod_logs = self.core_api.read_namespaced_pod_log(
name=pod.metadata.name,
namespace=namespace,
container=constants.JOB_PARAMETERS[job_kind]["container"],
)
logs_dict[pod.metadata.name] = pod_logs
except Exception:
raise RuntimeError(
f"Failed to read logs for pod {namespace}/{pod.metadata.name}"
)
# If verbose is set, return Kubernetes events for Job and pods.
if verbose:
job = self.get_job(name=name, namespace=namespace)
events = self.core_api.list_namespaced_event(namespace=namespace)

# Get events for the Job and Job's pods.
for event in events.items:
utils.add_event_to_dict(
events_dict=events_dict,
event=event,
object_kind=job_kind,
object_name=name,
object_creation_timestamp=job.metadata.creation_timestamp,
)
if pods:
for pod in pods:
utils.add_event_to_dict(
events_dict=events_dict,
event=event,
object_kind=constants.POD_KIND,
object_name=pod.metadata.name,
object_creation_timestamp=pod.metadata.creation_timestamp,
)

return logs_dict, events_dict

def update_job(
self,
Expand Down
7 changes: 7 additions & 0 deletions sdk/python/kubeflow/training/constants/constants.py
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,13 @@
VERSION = "v1"
API_VERSION = f"{GROUP}/{VERSION}"

# Kubernetes object kind for pods; used to match events whose involved object is a pod.
POD_KIND = "Pod"

# Pod phase value for pods that are still pending; log reading skips pods in this phase.
POD_PHASE_PENDING = "Pending"


# Training Job conditions.
JOB_CONDITION_CREATED = "Created"
JOB_CONDITION_RUNNING = "Running"
Expand Down
41 changes: 34 additions & 7 deletions sdk/python/kubeflow/training/utils/utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@
# See the License for the specific language governing permissions and
# limitations under the License.

from datetime import datetime
import os
import logging
import textwrap
Expand Down Expand Up @@ -53,6 +54,15 @@ def __init__(self, obj):
self.data = json.dumps(obj)


class SetEncoder(json.JSONEncoder):
    """JSON encoder that additionally handles sets and classes.

    Sets are emitted as JSON arrays and classes (instances of `type`) as
    their `__name__`; everything else is delegated to `json.JSONEncoder`.
    """

    def default(self, obj):
        """Return a JSON-serializable replacement for `obj`."""
        if isinstance(obj, set):
            converted = list(obj)
        elif isinstance(obj, type):
            converted = obj.__name__
        else:
            # The base implementation raises TypeError for unsupported objects.
            converted = json.JSONEncoder.default(self, obj)
        return converted


def is_running_in_k8s():
    """Return True if the Kubernetes secrets directory exists on this filesystem."""
    k8s_secrets_dir = "/var/run/secrets/kubernetes.io/"
    return os.path.isdir(k8s_secrets_dir)

Expand Down Expand Up @@ -368,10 +378,27 @@ def get_pvc_spec(
return pvc_spec


class SetEncoder(json.JSONEncoder):
def default(self, obj):
if isinstance(obj, set):
return list(obj)
if isinstance(obj, type):
return obj.__name__
return json.JSONEncoder.default(self, obj)
def add_event_to_dict(
    events_dict: Dict[str, List[str]],
    event: "models.CoreV1Event",
    object_kind: str,
    object_name: str,
    object_creation_timestamp: datetime,
) -> Dict[str, List[str]]:
    """Add a Kubernetes event to the dict if it belongs to the given object.

    The event is recorded only when its involved object has the given kind and
    name, and the event was created at or after the object's creation
    timestamp. The dict is updated in place and has this format:
    ```
    {"<Object Kind> <Object Name>": ["<Event Timestamp> <Event Message>", ...]}
    ```

    Args:
        events_dict: Dict to update. Keys are "<Object Kind> <Object Name>" and
            values are lists of formatted event messages.
        event: Kubernetes event to inspect.
        object_kind: Kind of the object the event must refer to.
        object_name: Name of the object the event must refer to.
        object_creation_timestamp: Creation timestamp of the object. Events
            created before it are ignored.

    Returns:
        The same `events_dict` object that was passed in.
    """
    involved_object = event.involved_object
    if (
        involved_object.kind == object_kind
        and involved_object.name == object_name
        and event.metadata.creation_timestamp >= object_creation_timestamp
    ):
        event_time = event.metadata.creation_timestamp.strftime("%Y-%m-%d %H:%M:%S")
        # Look up the same "<kind> <name>" key that is used for storing. Checking
        # the bare object name never matched, so earlier events were overwritten.
        key = f"{object_kind} {object_name}"
        events_dict.setdefault(key, []).append(f"{event_time} {event.message}")
    return events_dict
2 changes: 0 additions & 2 deletions sdk/python/setup.py
Original file line number Diff line number Diff line change
Expand Up @@ -48,8 +48,6 @@
"Intended Audience :: Developers",
"Intended Audience :: Education",
"Intended Audience :: Science/Research",
"Programming Language :: Python :: 3",
"Programming Language :: Python :: 3 :: Only",
"Programming Language :: Python :: 3.7",
"Programming Language :: Python :: 3.8",
"Programming Language :: Python :: 3.9",
Expand Down

0 comments on commit 778f555

Please sign in to comment.