Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
238 changes: 238 additions & 0 deletions .github/scripts/runpod_api.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,238 @@
import os
import json
import time
import requests
import sys
import argparse
import subprocess


def parse_arguments(argv=None):
    """Parse the command line options for a RunPod test run.

    Args:
        argv: Optional list of argument strings. When ``None`` (the default)
            argparse reads ``sys.argv[1:]``, so existing no-argument callers
            behave as before.

    Returns:
        argparse.Namespace with ``gpu_type``, ``gpu_count``, ``test_command``,
        ``disk_size``, ``volume_size`` and ``image``.

    Raises:
        SystemExit: raised by argparse (status 2) when an option is malformed
            or a required option is missing.
    """
    parser = argparse.ArgumentParser(description='Run tests on RunPod GPU')
    # --gpu-type and --test-command are required: without them the create
    # request would carry [None] as GPU type, and the remote command line
    # could not be joined. Failing here avoids starting a pod for nothing.
    parser.add_argument('--gpu-type',
                        type=str,
                        required=True,
                        help='GPU type to use')
    parser.add_argument('--gpu-count',
                        type=int,
                        help='Number of GPUs to use',
                        default=1)
    parser.add_argument('--test-command',
                        type=str,
                        required=True,
                        help='Test command to run')
    parser.add_argument('--disk-size',
                        type=int,
                        default=20,
                        help='Container disk size in GB (default: 20)')
    parser.add_argument('--volume-size',
                        type=int,
                        default=20,
                        help='Persistent volume size in GB (default: 20)')
    parser.add_argument(
        '--image',
        type=str,
        default='runpod/pytorch:2.4.0-py3.11-cuda12.4.1-devel-ubuntu22.04',
        help='Docker image to use')
    return parser.parse_args(argv)


# Command line options are parsed at import time, so importing this module
# reads the process arguments.
args = parse_arguments()
# Required environment variables; a missing one raises KeyError at import.
API_KEY = os.environ['RUNPOD_API_KEY']
RUN_ID = os.environ['GITHUB_RUN_ID']
JOB_ID = os.environ['JOB_ID']
# Base URL of the RunPod REST pods endpoint used by every request below.
PODS_API = "https://rest.runpod.io/v1/pods"
# Headers sent with each API request: JSON content type and bearer-token auth.
HEADERS = {
    "Content-Type": "application/json",
    "Authorization": f"Bearer {API_KEY}"
}


def create_pod(timeout=60):
    """Create a RunPod instance and return its id.

    The pod is named ``fastvideo-{JOB_ID}-{RUN_ID}`` and sized from the parsed
    command line options (GPU type/count, disk and volume size, image).

    Args:
        timeout: Seconds to wait for the API to answer. ``requests`` has no
            default timeout, so without one a stalled connection would block
            the job indefinitely.

    Returns:
        The ``id`` field of the created pod.

    Raises:
        requests.exceptions.RequestException: on network failure, timeout, or
            a non-2xx HTTP status.
        RuntimeError: if the API answered successfully but without a pod id.
    """
    print(f"Creating RunPod instance with GPU: {args.gpu_type}...")
    payload = {
        "name": f"fastvideo-{JOB_ID}-{RUN_ID}",
        "containerDiskInGb": args.disk_size,
        "volumeInGb": args.volume_size,
        "gpuTypeIds": [args.gpu_type],
        "gpuCount": args.gpu_count,
        "imageName": args.image
    }

    response = requests.post(PODS_API,
                             headers=HEADERS,
                             json=payload,
                             timeout=timeout)

    # Log whatever the API sent back before validating it, so a rejected
    # request is still visible in the job output.
    try:
        response_data = response.json()
    except ValueError:
        response_data = None
        print(f"Response (HTTP {response.status_code}, not JSON): "
              f"{response.text}")
    else:
        print(f"Response: {json.dumps(response_data, indent=2)}")

    response.raise_for_status()

    pod_id = response_data.get("id") if isinstance(response_data,
                                                   dict) else None
    if not pod_id:
        raise RuntimeError("RunPod API response did not include a pod id")
    return pod_id


def _fetch_pod(pod_id, timeout):
    """Return the pod description as a dict, or {} if the reply is unusable."""
    response = requests.get(f"{PODS_API}/{pod_id}",
                            headers=HEADERS,
                            timeout=timeout)
    try:
        pod_data = response.json()
    except ValueError:
        return {}
    return pod_data if isinstance(pod_data, dict) else {}


def wait_for_pod(pod_id,
                 running_attempts=10,
                 running_interval=2,
                 ssh_attempts=6,
                 ssh_interval=10,
                 request_timeout=30):
    """Wait for the pod to be RUNNING and reachable over SSH.

    Polls the pod in two phases: first until ``desiredStatus`` is
    ``"RUNNING"``, then until a mapping for port ``"22"`` and a non-empty
    ``publicIp`` are both reported.

    Args:
        pod_id: Id of the pod to poll.
        running_attempts: Maximum polls while waiting for RUNNING.
        running_interval: Seconds to sleep between RUNNING polls.
        ssh_attempts: Maximum polls while waiting for SSH details.
        ssh_interval: Seconds to sleep between SSH polls.
        request_timeout: Per-request timeout in seconds.

    Raises:
        TimeoutError: if either phase runs out of attempts.
        requests.exceptions.RequestException: on network failure or timeout.
    """
    print("Waiting for RunPod to be ready...")

    # First wait for RUNNING status
    for attempt in range(1, running_attempts + 1):
        # .get() keeps polling when the API returns an error payload that
        # has no "desiredStatus" key instead of dying with KeyError.
        status = _fetch_pod(pod_id, request_timeout).get("desiredStatus")

        if status == "RUNNING":
            print("RunPod is running! Now waiting for ports to be assigned...")
            break

        print(
            f"Current status: {status}, waiting... (attempt {attempt}/{running_attempts})"
        )
        time.sleep(running_interval)
    else:
        raise TimeoutError(
            "Timed out waiting for RunPod to reach RUNNING state")

    # Wait for ports to be assigned
    for attempt in range(1, ssh_attempts + 1):
        pod_data = _fetch_pod(pod_id, request_timeout)
        port_mappings = pod_data.get("portMappings") or {}
        # Truthiness check: a JSON null publicIp must count as "not ready",
        # which a plain `!= ""` comparison would let through.
        public_ip = pod_data.get("publicIp") or ""

        if "22" in port_mappings and public_ip:
            print("RunPod is ready with SSH access!")
            print(f"SSH IP: {public_ip}")
            print(f"SSH Port: {port_mappings['22']}")
            break

        print(
            f"Waiting for SSH port and public IP to be available... (attempt {attempt}/{ssh_attempts})"
        )
        time.sleep(ssh_interval)
    else:
        raise TimeoutError("Timed out waiting for RunPod SSH access")


def execute_command(pod_id, request_timeout=30):
    """Copy the repository to the pod and run the test command over SSH.

    Steps: look up the pod's public IP and SSH port, tar the current working
    directory to ``/tmp/repo.tar.gz``, ``scp`` it to the pod, then run one
    ``&&``-chained remote command that installs Miniconda, creates a Python
    3.10 environment, unpacks the repository under ``/workspace`` and runs
    ``args.test_command``. Remote output is echoed line by line as it arrives.

    Args:
        pod_id: Id of the pod to run on.
        request_timeout: Timeout in seconds for the pod lookup request.

    Returns:
        dict: ``{"success", "return_code", "stdout", "stderr"}`` when the ssh
        process ran (``stderr`` is always ``""`` because it is merged into
        ``stdout``), or ``{"success": False, "error", "stdout", "stderr"}``
        when starting or reading the ssh process raised.

    Raises:
        KeyError: if the pod has no ``publicIp`` or no port ``"22"`` mapping.
        subprocess.CalledProcessError: if the local ``tar`` or ``scp`` fails.
        requests.exceptions.RequestException: if the pod lookup fails.
    """
    print(f"Running command: {args.test_command}")

    response = requests.get(f"{PODS_API}/{pod_id}",
                            headers=HEADERS,
                            timeout=request_timeout)
    pod_data = response.json()
    ssh_ip = pod_data["publicIp"]
    ssh_port = pod_data["portMappings"]["22"]

    # Options shared by scp and ssh. Host key checking is disabled and no
    # known_hosts file is used; keep-alives stop long runs from being dropped.
    connection_options = [
        "-o", "StrictHostKeyChecking=no",
        "-o", "UserKnownHostsFile=/dev/null",
        "-o", "ServerAliveInterval=60",
        "-o", "ServerAliveCountMax=10",
    ]

    # Copy the repository to the pod using scp
    repo_dir = os.path.abspath(os.getcwd())
    repo_name = os.path.basename(repo_dir)

    print(f"Copying repository from {repo_dir} to RunPod...")

    # Archive from the parent directory so the tarball unpacks into a single
    # top-level directory named after the repository.
    tar_command = [
        "tar", "-czf", "/tmp/repo.tar.gz", "-C",
        os.path.dirname(repo_dir), repo_name
    ]
    subprocess.run(tar_command, check=True)

    # Copy the tarball to the pod (scp takes the port as -P, ssh as -p)
    scp_command = [
        "scp", *connection_options, "-P",
        str(ssh_port), "/tmp/repo.tar.gz", f"root@{ssh_ip}:/tmp/"
    ]
    subprocess.run(scp_command, check=True)

    setup_steps = [
        "cd /workspace",
        "wget -q https://repo.anaconda.com/miniconda/Miniconda3-latest-Linux-x86_64.sh",
        "bash Miniconda3-latest-Linux-x86_64.sh -b -p $HOME/miniconda3",
        "source $HOME/miniconda3/bin/activate",
        "conda create --name venv python=3.10.0 -y", "conda activate venv",
        "mkdir -p /workspace/repo",
        "tar -xzf /tmp/repo.tar.gz --no-same-owner -C /workspace/",
        f"cd /workspace/{repo_name}", args.test_command
    ]
    # Chained with && so the first failing step aborts the rest and its exit
    # status becomes the ssh exit status.
    remote_command = " && ".join(setup_steps)

    ssh_command = [
        "ssh", *connection_options, "-p",
        str(ssh_port), f"root@{ssh_ip}", remote_command
    ]

    print(f"Connecting to {ssh_ip}:{ssh_port}...")

    try:
        stdout_lines = []

        # The context manager closes the pipe and waits for the process even
        # if reading fails. errors="replace" keeps undecodable bytes in the
        # remote output from aborting the stream.
        with subprocess.Popen(ssh_command,
                              stdout=subprocess.PIPE,
                              stderr=subprocess.STDOUT,
                              universal_newlines=True,
                              errors="replace",
                              bufsize=1) as process:
            print("Command output:")

            for line in process.stdout:
                # Trim only the trailing newline/whitespace: leading
                # indentation is meaningful in tracebacks and test reports.
                print(line.rstrip())
                stdout_lines.append(line)

        return_code = process.returncode
        success = return_code == 0

        if success:
            print("Command executed successfully")
        else:
            print(f"Command failed with exit code {return_code}")

        return {
            "success": success,
            "return_code": return_code,
            "stdout": "".join(stdout_lines),
            "stderr": ""
        }

    except Exception as e:
        # Boundary handler: any failure to start or read the ssh process is
        # reported to the caller through the "error" key rather than raised.
        print(f"Error executing SSH command: {str(e)}")
        return {"success": False, "error": str(e), "stdout": "", "stderr": ""}


def terminate_pod(pod_id, timeout=30):
    """Terminate the pod, reporting (not raising) any failure.

    This is called from a ``finally`` block, so it never raises on a request
    failure: an exception here would replace whatever error the test run
    itself produced.

    Args:
        pod_id: Id of the pod to delete.
        timeout: Seconds to wait for the API to answer.

    Returns:
        bool: True if the API accepted the deletion, False otherwise.
    """
    print("Terminating RunPod...")
    try:
        response = requests.delete(f"{PODS_API}/{pod_id}",
                                   headers=HEADERS,
                                   timeout=timeout)
    except requests.exceptions.RequestException as e:
        print(f"Failed to terminate pod {pod_id}: {e}")
        return False

    # Only claim success when the API actually accepted the request.
    if not response.ok:
        print(f"Failed to terminate pod {pod_id}: "
              f"HTTP {response.status_code} {response.text}")
        return False

    print(f"Terminated pod {pod_id}")
    return True


def main():
    """Run the test command on a fresh pod and always clean the pod up.

    Exits with status 1 when the command could not be executed or when it
    finished unsuccessfully. The pod, if one was created, is terminated on
    every path.
    """
    pod_id = None
    try:
        pod_id = create_pod()
        wait_for_pod(pod_id)
        outcome = execute_command(pod_id)

        # Work out the failure message first, then report and exit once.
        failure = None
        if outcome.get("error") is not None:
            failure = f"Error executing command: {outcome['error']}"
        elif not outcome.get("success", False):
            failure = "Tests failed - check the output above for details on which tests failed"

        if failure is not None:
            print(failure)
            sys.exit(1)
    finally:
        # pod_id stays None when creation itself failed; nothing to delete.
        if pod_id:
            terminate_pod(pod_id)


if __name__ == "__main__":
    main()
89 changes: 89 additions & 0 deletions .github/scripts/runpod_cleanup.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,89 @@
import json
import os
import sys
import requests
import uuid

# The API key is mandatory: a missing variable raises KeyError at import.
API_KEY = os.environ['RUNPOD_API_KEY']

# The run id is optional; a random UUID string stands in when it is unset.
try:
    RUN_ID = os.environ['GITHUB_RUN_ID']
except KeyError:
    RUN_ID = str(uuid.uuid4())

# RunPod REST pods endpoint and the headers sent with every request to it.
PODS_API = "https://rest.runpod.io/v1/pods"
HEADERS = {
    "Content-Type": "application/json",
    "Authorization": f"Bearer {API_KEY}",
}


def get_job_ids():
    """Return the list of job ids stored as JSON in the ``JOB_IDS`` variable.

    Returns:
        list: The decoded JSON list.

    Raises:
        SystemExit: with status 1 (after printing the reason) when
            ``JOB_IDS`` is unset or empty, is not valid JSON, or decodes to
            something other than a list.
    """
    job_ids_str = os.environ.get('JOB_IDS')
    # json.loads(None) raises TypeError, which the JSONDecodeError handler
    # below would not catch, so reject a missing value explicitly.
    if not job_ids_str:
        print("Error: JOB_IDS environment variable is not set or empty.")
        sys.exit(1)

    try:
        job_ids = json.loads(job_ids_str)
    except json.JSONDecodeError as e:
        print(f"Error parsing JOB_IDS: {e}")
        sys.exit(1)

    if not isinstance(job_ids, list):
        print("Error: JOB_IDS is not a list.")
        sys.exit(1)
    return job_ids


def cleanup_pods(timeout=30):
    """Find and terminate the RunPod instances created by this workflow run.

    A pod is selected when its name contains ``{job_id}-{RUN_ID}`` for one of
    the job ids. The job ids come from ``JOB_ID`` (single job) when that
    variable is set, otherwise from the JSON list in ``JOB_IDS``.

    Every matching pod is attempted even if an earlier deletion fails, so one
    bad request cannot leave the remaining pods running.

    Args:
        timeout: Per-request timeout in seconds.

    Raises:
        SystemExit: with status 1 when the pod list cannot be retrieved or
            when at least one matching pod could not be terminated.
    """
    print(f"Run ID: {RUN_ID}")

    single_job_id = os.environ.get('JOB_ID')

    if single_job_id:
        job_ids = [single_job_id]
        print(f"Job ID: {single_job_id}")
    else:
        job_ids = get_job_ids()
        print(f"Job IDs: {job_ids}")

    # Get all pods associated with RunPod API_KEY
    try:
        response = requests.get(PODS_API, headers=HEADERS, timeout=timeout)
        response.raise_for_status()
        pods = response.json()
    except (requests.exceptions.RequestException, ValueError) as e:
        print(f"Error getting pods: {e}")
        sys.exit(1)

    if not isinstance(pods, list):
        print(f"Error getting pods: unexpected response {pods!r}")
        sys.exit(1)

    # The name fragments are the same for every pod, so build them once.
    name_fragments = [f"{job_id}-{RUN_ID}" for job_id in job_ids]

    # Find and terminate pods created by this workflow run
    terminated_pods = []
    failed_pods = []
    for pod in pods:
        # `or ""` also covers a pod whose name is present but null.
        pod_name = pod.get("name") or ""
        pod_id = pod.get("id")

        # Check if this pod was created by one of our jobs
        if not any(fragment in pod_name for fragment in name_fragments):
            continue

        print(f"Found pod: {pod_id} ({pod_name})")
        try:
            print(f"Terminating pod {pod_id}...")
            term_response = requests.delete(f"{PODS_API}/{pod_id}",
                                            headers=HEADERS,
                                            timeout=timeout)
            term_response.raise_for_status()
        except requests.exceptions.RequestException as e:
            # Record the failure and keep going; exiting here would leave
            # the remaining matching pods running.
            print(f"Error terminating pod {pod_id}: {e}")
            failed_pods.append(pod_id)
        else:
            terminated_pods.append(pod_id)
            print(f"Successfully terminated pod {pod_id}")

    if terminated_pods:
        if single_job_id:
            print(f"Terminated pod: {terminated_pods[0]}")
        else:
            print(f"Terminated {len(terminated_pods)} pods: {terminated_pods}")
    elif not failed_pods:
        if single_job_id:
            print(f"No pod found matching pattern: {single_job_id}-{RUN_ID}")
        else:
            print("No pods found to terminate.")

    if failed_pods:
        print(f"Failed to terminate {len(failed_pods)} pods: {failed_pods}")
        sys.exit(1)


def main():
    """Script entry point: run the pod cleanup once."""
    cleanup_pods()


# Run the cleanup only when executed as a script, not when imported.
if __name__ == "__main__":
    main()
2 changes: 1 addition & 1 deletion .github/workflows/codespell.yml
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,7 @@ jobs:
- name: Install dependencies
run: |
python -m pip install --upgrade pip
pip install -r requirements-lint.txt
pip install -e ".[lint]"
- name: Spelling check with codespell
run: |
# Refer to the above environment variable here
Expand Down
Loading
Loading