Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
238 changes: 238 additions & 0 deletions .github/scripts/runpod_api.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,238 @@
import os
import json
import time
import requests
import sys
import argparse
import subprocess


def parse_arguments():
    """Parse the command line options that control the RunPod test run."""
    parser = argparse.ArgumentParser(description='Run tests on RunPod GPU')
    parser.add_argument('--gpu-type', type=str, help='GPU type to use')
    parser.add_argument('--gpu-count', type=int, default=1,
                        help='Number of GPUs to use')
    parser.add_argument('--test-command', type=str, help='Test command to run')
    parser.add_argument('--disk-size', type=int, default=20,
                        help='Container disk size in GB (default: 20)')
    parser.add_argument('--volume-size', type=int, default=20,
                        help='Persistent volume size in GB (default: 20)')
    parser.add_argument(
        '--image', type=str,
        default='runpod/pytorch:2.4.0-py3.11-cuda12.4.1-devel-ubuntu22.04',
        help='Docker image to use')
    return parser.parse_args()


# Module-level configuration, resolved once at import time.
args = parse_arguments()  # CLI options: gpu type/count, test command, sizes, image

# Required environment variables — a missing one fails fast with KeyError.
API_KEY = os.environ['RUNPOD_API_KEY']  # RunPod REST API bearer token
GITHUB_SHA = os.environ['GITHUB_SHA']  # commit under test (forwarded to the pod env)
GITHUB_REF = os.environ.get('GITHUB_REF', 'unknown')  # optional branch/tag ref
# NOTE(review): GITHUB_REPOSITORY is read but never used below — confirm it
# is still needed before removing.
GITHUB_REPOSITORY = os.environ['GITHUB_REPOSITORY']
RUN_ID = os.environ['GITHUB_RUN_ID']  # makes the pod name unique per workflow run

# RunPod REST API base endpoint and the headers shared by every request.
PODS_API = "https://rest.runpod.io/v1/pods"
HEADERS = {
    "Content-Type": "application/json",
    "Authorization": f"Bearer {API_KEY}"
}


def create_pod():
    """Create a RunPod instance and return its pod id.

    Builds the pod spec from the parsed CLI arguments and the GitHub
    environment, POSTs it to the RunPod REST API, and returns the id of
    the newly created pod.

    Returns:
        str: the pod id assigned by RunPod.

    Raises:
        RuntimeError: if the API rejects the request (non-2xx status),
            so the caller sees a clear failure instead of a bare
            KeyError on the missing "id" field.
    """
    print(f"Creating RunPod instance with GPU: {args.gpu_type}...")
    payload = {
        "name": f"fastvideo-github-test-{RUN_ID}",
        "containerDiskInGb": args.disk_size,
        "volumeInGb": args.volume_size,
        "env": {
            "GITHUB_SHA": GITHUB_SHA,
            "GITHUB_REF": GITHUB_REF
        },
        "gpuTypeIds": [args.gpu_type],
        "gpuCount": args.gpu_count,
        "imageName": args.image
    }

    response = requests.post(PODS_API, headers=HEADERS, json=payload)
    response_data = response.json()
    # Always echo the API response (it carries the error body on failure).
    print(f"Response: {json.dumps(response_data, indent=2)}")

    # Fail fast with an explicit error rather than crashing later on
    # response_data["id"] when pod creation was rejected.
    if not response.ok:
        raise RuntimeError(
            f"Pod creation failed with HTTP {response.status_code}")

    return response_data["id"]


def wait_for_pod(pod_id):
    """Wait for pod to be in RUNNING state and fully ready with SSH access"""
    print("Waiting for RunPod to be ready...")

    # Phase 1: poll until the pod reports RUNNING (up to 10 tries, 2s apart).
    for attempt in range(10):
        pod_data = requests.get(f"{PODS_API}/{pod_id}", headers=HEADERS).json()
        status = pod_data["desiredStatus"]
        if status == "RUNNING":
            print("RunPod is running! Now waiting for ports to be assigned...")
            break
        print(f"Current status: {status}, waiting... (attempt {attempt+1}/10)")
        time.sleep(2)
    else:
        # Loop exhausted without ever seeing RUNNING.
        raise TimeoutError("Timed out waiting for RunPod to reach RUNNING state")

    # Phase 2: poll until a public IP and an SSH (port 22) mapping exist
    # (up to 6 tries, 10s apart).
    for attempt in range(6):
        pod_data = requests.get(f"{PODS_API}/{pod_id}", headers=HEADERS).json()
        port_mappings = pod_data.get("portMappings")
        ssh_mapped = port_mappings is not None and "22" in port_mappings
        if ssh_mapped and pod_data.get("publicIp", "") != "":
            print("RunPod is ready with SSH access!")
            print(f"SSH IP: {pod_data['publicIp']}")
            print(f"SSH Port: {port_mappings['22']}")
            break
        print(f"Waiting for SSH port and public IP to be available... (attempt {attempt+1}/6)")
        time.sleep(10)
    else:
        raise TimeoutError("Timed out waiting for RunPod SSH access")


def execute_command(pod_id):
    """Run the configured test command on the pod over SSH.

    Tars up the current working tree, scps it to the pod, bootstraps a
    conda environment remotely, runs ``args.test_command`` from the repo
    root, and streams its combined stdout/stderr back to this process.

    Returns:
        dict with keys ``success`` (bool), ``stdout`` (str), ``stderr``
        (always "" — the remote stderr is merged into stdout), and
        either ``return_code`` (int) or ``error`` (str, only present
        when the SSH invocation itself failed).
    """
    print(f"Running command: {args.test_command}")

    # Look up the SSH endpoint assigned by RunPod; wait_for_pod() has
    # already ensured publicIp and port 22 are populated.
    response = requests.get(f"{PODS_API}/{pod_id}", headers=HEADERS)
    pod_data = response.json()
    ssh_ip = pod_data["publicIp"]
    ssh_port = pod_data["portMappings"]["22"]

    # Package the checked-out repository and copy it to the pod.
    repo_dir = os.path.abspath(os.getcwd())
    repo_name = os.path.basename(repo_dir)
    tarball = "/tmp/repo.tar.gz"

    print(f"Copying repository from {repo_dir} to RunPod...")

    try:
        subprocess.run([
            "tar", "-czf", tarball, "-C",
            os.path.dirname(repo_dir), repo_name
        ], check=True)

        subprocess.run([
            "scp", "-o", "StrictHostKeyChecking=no", "-o",
            "UserKnownHostsFile=/dev/null", "-P",
            str(ssh_port), tarball, f"root@{ssh_ip}:/tmp/"
        ], check=True)
    finally:
        # Don't leave the (potentially large) tarball behind on the runner.
        if os.path.exists(tarball):
            os.remove(tarball)

    # Remote bootstrap: install miniconda, create the test env, unpack the
    # repo, then run the requested test command from the repo root.
    # (Extraction creates /workspace/{repo_name} itself — no pre-mkdir needed.)
    setup_steps = [
        "cd /workspace",
        "wget -q https://repo.anaconda.com/miniconda/Miniconda3-latest-Linux-x86_64.sh",
        "bash Miniconda3-latest-Linux-x86_64.sh -b -p $HOME/miniconda3",
        "source $HOME/miniconda3/bin/activate",
        "conda create --name venv python=3.10.0 -y", "conda activate venv",
        "tar -xzf /tmp/repo.tar.gz --no-same-owner -C /workspace/",
        f"cd /workspace/{repo_name}", args.test_command
    ]
    remote_command = " && ".join(setup_steps)

    ssh_command = [
        "ssh", "-o", "StrictHostKeyChecking=no", "-o",
        "UserKnownHostsFile=/dev/null", "-p",
        str(ssh_port), f"root@{ssh_ip}", remote_command
    ]

    print(f"Connecting to {ssh_ip}:{ssh_port}...")

    try:
        # Merge stderr into stdout and stream line-by-line so the GitHub
        # Actions log shows test output live instead of all at once.
        process = subprocess.Popen(ssh_command,
                                   stdout=subprocess.PIPE,
                                   stderr=subprocess.STDOUT,
                                   universal_newlines=True,
                                   bufsize=1)

        stdout_lines = []

        print("Command output:")

        for line in iter(process.stdout.readline, ''):
            print(line.strip())
            stdout_lines.append(line)

        process.wait()

        return_code = process.returncode
        success = return_code == 0

        if success:
            print("Command executed successfully")
        else:
            print(f"Command failed with exit code {return_code}")

        return {
            "success": success,
            "return_code": return_code,
            "stdout": "".join(stdout_lines),
            "stderr": ""
        }

    except Exception as e:
        print(f"Error executing SSH command: {str(e)}")
        return {"success": False, "error": str(e), "stdout": "", "stderr": ""}


def terminate_pod(pod_id):
    """Terminate the pod, warning (not raising) if the API call fails.

    This runs inside main()'s ``finally`` block, so raising here would
    mask the original test failure; a leaked pod is instead reported
    loudly so it can be cleaned up by hand.
    """
    print("Terminating RunPod...")
    response = requests.delete(f"{PODS_API}/{pod_id}", headers=HEADERS)
    if response.ok:
        print("RunPod terminated")
    else:
        # Previously this printed "terminated" unconditionally, hiding
        # failed deletions (and the billing they incur).
        print(f"WARNING: failed to terminate pod {pod_id} "
              f"(HTTP {response.status_code}) - terminate it manually")


def main():
    """Drive the full lifecycle: create a pod, wait for it, run the tests,
    and always terminate the pod afterwards."""
    pod_id = None
    try:
        pod_id = create_pod()
        wait_for_pod(pod_id)
        outcome = execute_command(pod_id)

        # Guard clauses: an SSH-level error, then a test failure.
        error = outcome.get("error")
        if error is not None:
            print(f"Error executing command: {error}")
            sys.exit(1)

        if not outcome.get("success", False):
            print(
                "Tests failed - check the output above for details on which tests failed"
            )
            sys.exit(1)

    finally:
        # Terminate even on exceptions so we never leak a billed GPU pod.
        if pod_id:
            terminate_pod(pod_id)


if __name__ == "__main__":
    main()
53 changes: 53 additions & 0 deletions .github/workflows/pr-test.yml
Original file line number Diff line number Diff line change
@@ -0,0 +1,53 @@
name: PR Test

on:
  push:
    branches: [main]
  pull_request:
    branches: [main]
    types: [opened, ready_for_review, synchronize, reopened]

# Cancel any in-flight run for the same ref when new commits arrive,
# so only the latest push is tested (and only one pod is billed).
concurrency:
  group: pr-test-${{ github.ref }}
  cancel-in-progress: true

jobs:
  unit-test:
    # Skip draft PRs; run once the PR is ready for review.
    if: github.event.pull_request.draft == false
    runs-on: ubuntu-latest
    environment: runpod-runners
    steps:
      - name: Checkout code
        uses: actions/checkout@v4

      - name: Set up Python
        uses: actions/setup-python@v4
        with:
          python-version: "3.10"

      # Install the private key used by runpod_api.py to SSH/scp into the pod.
      - name: Set up SSH key
        run: |
          mkdir -p ~/.ssh
          echo "${{ secrets.RUNPOD_PRIVATE_KEY }}" > ~/.ssh/id_rsa
          chmod 600 ~/.ssh/id_rsa
          ssh-keygen -y -f ~/.ssh/id_rsa > ~/.ssh/id_rsa.pub

      - name: Install dependencies
        run: pip install requests

      # Provision a RunPod GPU pod, copy the repo over, and run the test
      # suite there; the script terminates the pod when finished.
      - name: Run tests on RunPod
        env:
          RUNPOD_API_KEY: ${{ secrets.RUNPOD_API_KEY }}
          GITHUB_SHA: ${{ github.sha }}
          GITHUB_REF: ${{ github.ref }}
          GITHUB_REPOSITORY: ${{ github.repository }}
          GITHUB_RUN_ID: ${{ github.run_id }}
        run: >-
          python .github/scripts/runpod_api.py
          --gpu-type "NVIDIA A40"
          --gpu-count 1
          --volume-size 100
          --test-command "./env_setup.sh &&
          pip install vllm &&
          pip install pytest &&
          pytest ./fastvideo/v1/tests/ --ignore=./fastvideo/v1/tests/old_tests/ -s"
24 changes: 24 additions & 0 deletions fastvideo/v1/tests/conftest.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,24 @@
import pytest
import torch.distributed as dist
from fastvideo.v1.distributed import init_distributed_environment, initialize_model_parallel, destroy_model_parallel

@pytest.fixture(scope="function")
def distributed_setup():
    """
    Fixture to set up and tear down the distributed environment for tests.

    This ensures proper cleanup even if tests fail.
    """
    # Single-process "distributed" setup: world size 1, rank 0, NCCL backend.
    # NOTE(review): "env://" init normally requires MASTER_ADDR/MASTER_PORT in
    # the environment — presumably provided by the harness or defaults inside
    # init_distributed_environment; confirm against that helper.
    init_distributed_environment(world_size=1,
                                 rank=0,
                                 distributed_init_method="env://",
                                 local_rank=0,
                                 backend="nccl")
    initialize_model_parallel(tensor_model_parallel_size=1,
                              sequence_model_parallel_size=1,
                              backend="nccl")
    yield

    # Teardown runs even when the test body raised. Only tear down when the
    # process group is still alive (a test may have destroyed it itself).
    if dist.is_initialized():
        destroy_model_parallel()
        dist.destroy_process_group()
Loading
Loading