[TOOLS] Monitor training abnormalities and send reminders #574

71 changes: 71 additions & 0 deletions tools/monitor/README.md
@@ -0,0 +1,71 @@
# Training Log Monitor

This tool monitors training logs on a remote server, checks them for anomalies, and sends alerts via email or a Feishu robot. It aims to surface problems during training in a timely manner, such as training that has stalled or slowed down.

## Notes

For email reminders:
The program prompts for your email password at runtime, so run it in a secure environment to avoid leaking the password.

For Feishu robot reminders:
The program prompts for the Feishu robot webhook URL at runtime, so run it in a secure environment to avoid leaking the URL. For setup instructions, see https://open.feishu.cn/document/client-docs/bot-v3/add-custom-bot and set the keyword to "monitor".

Anomaly detection relies on statistical analysis of historical training data. Please observe the logs manually for a while to make sure that at least the first 10 iterations are normal.
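As a rough illustration of one of the statistical checks (a simplified sketch, not the exact implementation in `monitor.py`), the slowdown detection compares the latest per-iteration time against the average of the earlier iterations:

```python
def is_slowing_down(elapsed_ms_history, factor=2.0):
    """Return True if the latest iteration time exceeds `factor` times
    the average of all earlier iterations (sketch of the slowdown check)."""
    if len(elapsed_ms_history) < 2:
        return False  # not enough history to compare against
    *history, last = elapsed_ms_history
    average = sum(history) / len(history)
    return last > factor * average

# Iterations ran at ~1000 ms each, then the last one took 2500 ms.
print(is_slowing_down([1000.0, 1010.0, 990.0, 2500.0]))  # True
```

This is why the first iterations must be normal: they form the baseline average that later iterations are compared against.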

## Features

- Monitors a remote log file for training status.
- Sends alerts describing the detected anomaly based on log analysis results, with recent log entries included for context.
- Configurable check interval.
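For illustration, here is a simplified version of the line parsing (the full regex in `monitor.py` extracts many more fields; the sample log line below is hypothetical):

```python
import re

# Subset of the fields matched by monitor.py's full pattern.
PATTERN = (
    r"\[(?P<timestamp>\d{4}-\d{2}-\d{2} \d{2}:\d{2}:\d{2})\]\s*"
    r"iteration\s+(?P<iteration>\d+\s*/\s*\d+)\s+\|\s*"
    r"elapsed time per iteration \(ms\):\s+(?P<elapsed>\d+\.\d+)"
)

line = "[2024-01-01 12:00:00] iteration 5/100 | elapsed time per iteration (ms): 1234.5 | ..."
match = re.search(PATTERN, line)
if match:
    print(match.group("iteration"), match.group("elapsed"))  # 5/100 1234.5
```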

## Run

The monitoring program runs on the user's local machine.

## Prerequisites

Before running the script, ensure you have a password-free SSH login to the remote host.


## Installation

```bash
git clone https://github.com/FlagOpen/FlagScale.git
cd FlagScale/tools/monitor
pip install -r requirements.txt
```

## Configuration

1. For email:

   Modify the provided example configuration file [config-email.yaml](config-email.yaml) to set actual values.

2. For Feishu robot:

   Modify the provided example configuration file [config-feishu.yaml](config-feishu.yaml) to set actual values.

## Usage

1. For Email:

```bash
python monitor.py --notice email
```

You will then be prompted to enter your source email's password.

2. For Feishu robot:

```bash
python monitor.py --notice feishu
```

You will then be prompted to enter the Feishu robot URL.
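When an anomaly is detected, the Feishu path posts a JSON "post" card to the webhook URL. Roughly (payload shape as built in `send_feishu`; the message text here is illustrative):

```python
import json

def build_feishu_payload(message, body):
    """Build the post-card payload sent to the Feishu webhook."""
    return {
        "msg_type": "post",
        "content": {
            "post": {
                "zh_cn": {
                    "title": "monitor: " + message,
                    "content": [[{"tag": "text", "text": body}]],
                }
            }
        },
    }

# This JSON string is POSTed to the robot URL with Content-Type: application/json.
payload = json.dumps(build_feishu_payload("Training appears to be stuck.", "recent log entries..."))
```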

## Next steps

We plan to add more monitoring capabilities, including:
- Notification when training ends.
- Communication-group-based monitoring.
- Monitoring of hardware utilization anomalies.
- Further items driven by user needs.
23 changes: 23 additions & 0 deletions tools/monitor/config-email.yaml
@@ -0,0 +1,23 @@
# Target email address for receiving alerts
target_email: example_alert@domain.com # This is the email address that will receive alerts

# SMTP server for sending emails; check your email provider's settings for this value
smtp_server: smtp.example.com # This is the SMTP server used for sending emails

# Email address used to send alerts
source_email: example_sender@domain.com # This is the email address used to send alerts

# Remote host IP address for accessing log files
remote_host: 192.0.2.1 # This IP address identifies the remote host where logs are stored

# Username for SSH login to the remote host
remote_user: example_user # This is the username used to log in to the remote host via SSH

# Port number used for SSH access
remote_port: 22 # This is the standard SSH port used to connect to the remote host

# Path to the log file on the remote host
remote_log_path: /path/example_log_file.log

# Interval in seconds for checking the logs
check_interval: 1200
14 changes: 14 additions & 0 deletions tools/monitor/config-feishu.yaml
@@ -0,0 +1,14 @@
# Remote host IP address for accessing log files
remote_host: 192.0.2.1 # This IP address or host name identifies the remote host where logs are stored

# Username for SSH login to the remote host
remote_user: example_user # This is the username used to log in to the remote host via SSH

# Port number used for SSH access
remote_port: 22 # This is the standard SSH port used to connect to the remote host

# Path to the log file on the remote host
remote_log_path: /path/example_log_file.log

# Interval in seconds for checking the logs
check_interval: 1200
253 changes: 253 additions & 0 deletions tools/monitor/monitor.py
@@ -0,0 +1,253 @@
import argparse
import json
import re
import smtplib
import sys
import time
from datetime import datetime
from email.mime.text import MIMEText
from getpass import getpass

import paramiko
import requests
import yaml


def read_config(config_file):
    """Read and parse configuration from a YAML file."""
    try:
        with open(config_file, "r") as file:
            config = yaml.safe_load(file)
        print("Loaded configuration:")
        print(yaml.dump(config))
        return config
    except FileNotFoundError:
        print(f"{datetime.now()} - Configuration file not found.")
        sys.exit(1)
    except yaml.YAMLError as e:
        print(f"{datetime.now()} - Error parsing configuration file: {e}")
        sys.exit(1)
    except Exception as e:
        print(f"{datetime.now()} - Error reading configuration file: {e}")
        sys.exit(1)


def read_logs(remote_host, remote_user, remote_port, remote_log_path):
    """Connect to the remote host and read the log file over SSH/SFTP."""
    print(f"{datetime.now()} - Connecting to remote host...")
    try:
        ssh = paramiko.SSHClient()
        ssh.set_missing_host_key_policy(paramiko.AutoAddPolicy())
        ssh.connect(hostname=remote_host, username=remote_user, port=remote_port)

        sftp = ssh.open_sftp()
        with sftp.open(remote_log_path, "r") as remote_file:
            print("Loading log file content...")
            contents = remote_file.readlines()
            print("Loading finished")

        sftp.close()
        ssh.close()

        return contents
    except Exception as e:
        print(f"{datetime.now()} - Error reading remote log file: {e}")
        sys.exit(1)


def parse_logs(log_lines):
    """Parse log file contents using regex pattern matching."""
    print(f"{datetime.now()} - Parsing log content...")

    pattern = (
        r"\[+(?P<timestamp>\d{4}-\d{2}-\d{2} \d{2}:\d{2}:\d{2})\]+\s*"
        r"iteration\s+(?P<iteration>\d+\s*/\s*\d+)\s+\|\s*"
        r"consumed samples:\s+(?P<consumed_samples>\d+)\s+\|\s*"
        r"elapsed time per iteration \(ms\):\s+(?P<elapsed_time_per_iteration_ms>\d+\.\d+)\s+\|\s*"
        r"throughput per GPU \(TFLOP/s/GPU\):\s+(?P<throughput_per_GPU_TFLOPs_per_GPU>\d+\.\d+)\s+\|\s*"
        r"learning rate:\s+(?P<learning_rate>\d+\.?\d*E?-?\d*)\s+\|\s*"
        r"global batch size:\s+(?P<global_batch_size>\d+)\s+\|\s*"
        r"lm loss:\s+(?P<lm_loss>\d+\.\d+E?[-\+]?\d*)\s+\|\s*"
        r"load_balancing_loss:\s+(?P<load_balancing_loss>\d+\.\d+E?[-\+]?\d*)\s+\|\s*"
        r"loss scale:\s+(?P<loss_scale>\d+\.\d+)\s+\|\s*"
        r"grad norm:\s+(?P<grad_norm>\d+\.\d+)\s+\|\s*"
        r"num zeros:\s+(?P<num_zeros>[\d.]+)\s+\|\s*"
        r"params norm:\s+(?P<params_norm>\d+\.\d+)\s+\|\s*"
        r"number of skipped iterations:\s+(?P<number_of_skipped_iterations>\d+)\s+\|\s*"
        r"number of nan iterations:\s+(?P<number_of_nan_iterations>\d+)\s*"
    )

    lines_to_keep = []
    for line in log_lines:
        match = re.search(pattern, line)
        if match:
            parsed_data = match.groupdict()
            # A restarted run begins again at iteration 1; discard older entries.
            if parsed_data["iteration"].split("/")[0].strip() == "1":
                lines_to_keep = []
            lines_to_keep.append(parsed_data)
    return lines_to_keep


def check_logs(logs):
    """Analyze log entries to detect training issues."""
    print(f"{datetime.now()} - Checking log content...")
    if not logs:
        return 0, "No parsed log entries yet.", []
    recent_logs = logs[-10:]
    last_log = logs[-1]
    # Note: assumes log timestamps are in the machine's local timezone.
    last_timestamp = datetime.strptime(last_log["timestamp"], "%Y-%m-%d %H:%M:%S")
    last_elapsed = float(last_log["elapsed_time_per_iteration_ms"])
    now = datetime.now()
    time_delta_ms = (now - last_timestamp).total_seconds() * 1000

    loss_abnormal = False
    min_loss = float("inf")
    max_loss = float("-inf")
    for _log in logs:
        cur_loss = float(_log["lm_loss"])
        min_loss = min(min_loss, cur_loss)
        max_loss = max(max_loss, cur_loss)
        if cur_loss <= 0 or max_loss > min_loss + 5:
            loss_abnormal = True

    if time_delta_ms > 3 * last_elapsed:
        status_code = 1
        message = "Training appears to be stuck."
    elif loss_abnormal:
        status_code = 3
        message = "Model loss is zero, negative, or shows an abnormal spike; training issues suspected."
    elif historical_elapsed := [
        float(entry["elapsed_time_per_iteration_ms"]) for entry in logs[:-1]
    ]:
        average_elapsed = sum(historical_elapsed) / len(historical_elapsed)
        if last_elapsed > 2 * average_elapsed:
            status_code = 2
            message = "Training appears to be slowing down."
        else:
            status_code = 0
            message = "Training is normal."
    else:
        status_code = 0
        message = "Training is normal."

    return status_code, message, recent_logs


def send_email(
    smtp_server, source_email, source_email_password, target_email, subject, body
):
    """Send an email notification using SMTP."""
    print(f"{datetime.now()} - Sending email: {subject}")
    try:
        msg = MIMEText(body)
        msg["Subject"] = subject
        msg["From"] = source_email
        msg["To"] = target_email

        # Plain SMTP on port 25; use smtplib.SMTP_SSL or starttls() if your server requires it.
        server = smtplib.SMTP(smtp_server, 25)
        server.login(source_email, source_email_password)
        server.sendmail(source_email, [target_email], msg.as_string())
        server.quit()
        print(f"{datetime.now()} - Email sent successfully.")
    except Exception as e:
        print(f"{datetime.now()} - Error sending email: {e}")


def send_feishu(robot_url, message, body):
    """Send a message to the Feishu robot via its webhook URL."""
    print(f"{datetime.now()} - Sending to Feishu robot: {message}")
    card_json = json.dumps(
        {
            "msg_type": "post",
            "content": {
                "post": {
                    "zh_cn": {
                        "title": "monitor: " + message,
                        "content": [[{"tag": "text", "text": body}]],
                    }
                }
            },
        }
    )
    headers = {"Content-Type": "application/json"}
    try:
        response = requests.post(url=robot_url, data=card_json, headers=headers)
        response.raise_for_status()
    except requests.exceptions.RequestException as e:
        print(f"Failed to send message: {e}")


def main():
    """Parse command line arguments and run the monitoring loop."""
    parser = argparse.ArgumentParser(description="Training Monitor")
    parser.add_argument(
        "--notice",
        type=str,
        choices=["email", "feishu"],
        required=True,
        help="Notification method (email/feishu)",
    )
    args = parser.parse_args()

    if args.notice == "email":
        # Load configuration from YAML file
        config = read_config("config-email.yaml")

        # Email notification configuration
        smtp_server = config["smtp_server"]
        source_email = config["source_email"]
        target_email = config["target_email"]
        check_interval = int(config["check_interval"])

        # Securely get the email password (not echoed to the terminal)
        source_email_password = getpass(f"Enter the password for {source_email}: ")
    elif args.notice == "feishu":
        # Load configuration from YAML file
        config = read_config("config-feishu.yaml")

        check_interval = int(config["check_interval"])

        # The webhook URL is a secret; read it without echoing
        robot_url = getpass("Enter the URL for the Feishu robot: ")

    while True:
        logs = read_logs(
            config["remote_host"],
            config["remote_user"],
            int(config["remote_port"]),
            config["remote_log_path"],
        )
        useful_logs = parse_logs(logs)
        status_code, message, recent_logs = check_logs(useful_logs)
        recent_logs_str = json.dumps(recent_logs, indent=4)

        if status_code != 0:
            if args.notice == "email":
                send_email(
                    smtp_server,
                    source_email,
                    source_email_password,
                    target_email,
                    message,
                    recent_logs_str,
                )
            if args.notice == "feishu":
                send_feishu(robot_url, message, recent_logs_str)

        print(f"Waiting {check_interval} seconds for next check...")
        time.sleep(check_interval)


if __name__ == "__main__":
    main()
4 changes: 4 additions & 0 deletions tools/monitor/requirements.txt
@@ -0,0 +1,4 @@
paramiko==3.5.1
pytz==2025.2
PyYAML==6.0.2
Requests==2.32.3