Skip to content

Commit

Permalink
server.py: improve error handling
Browse files Browse the repository at this point in the history
  • Loading branch information
rhaschke committed Oct 22, 2024
1 parent 98ba863 commit 7c02ae3
Show file tree
Hide file tree
Showing 4 changed files with 92 additions and 40 deletions.
2 changes: 1 addition & 1 deletion .github/workflows/interactive.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -97,4 +97,4 @@ jobs:
- name: Import build artifacts to reprepro server
uses: ./reprepro
with:
url: "${{ vars.DEPLOY_URL }}?&run_id=${{ github.run_id }}&arch=${{ inputs.ARCH || vars.ARCH || 'x64' }}"
url: "${{ vars.DEPLOY_URL }}?distro=${{ inputs.DEB_DISTRO || vars.DEB_DISTRO }}&run_id=${{ github.run_id }}&arch=${{ inputs.ARCH || vars.ARCH || 'x64' }}"
6 changes: 6 additions & 0 deletions reprepro/deploy.js
Original file line number Diff line number Diff line change
Expand Up @@ -12,3 +12,9 @@ evtSource.onmessage = (event) => {
else
console.log(event.data)
};

evtSource.onerror = (error) => {
core.warning(error.message);
evtSource.close();
process.exit(1);
}
2 changes: 1 addition & 1 deletion reprepro/import.sh
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@ fi
DISTRO="${DISTRO}-build"

if [ -n "$RUN_ID" ] ; then
echo "Fetching debs artifact from https://github.com/$REPO/actions/runs/$RUN_ID"
echo "Fetching artifact"
gh --repo "$REPO" run download --name debs --dir "$INCOMING_DIR" "$RUN_ID"
elif [ "$(ls -A "$INCOMING_DIR")" ]; then
echo "Importing existing files from incoming directory"
Expand Down
122 changes: 84 additions & 38 deletions reprepro/server.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,9 +3,12 @@
from sse_starlette.sse import EventSourceResponse
from fastapi import FastAPI, Request, Response
import asyncio
from enum import Enum
import os
import queue
import subprocess
import time
from threading import Thread
from pathlib import Path
import logging

Expand All @@ -21,59 +24,102 @@
app = FastAPI()


@app.get("/import")
def reprepro_import(request: Request, run_id: str = "", arch: str = "x64"):
class Status(Enum):
STARTED = 1
DOWNLOADING = 2
IMPORTING = 3


def process(q: queue.Queue, distro: str, repo: str, arch: str, run_id: str):
"""Process import request, writing http response to queue"""
global running
if running:
return Response(
content=f"Blocked by active import from: {running}",
media_type="text/plain",
status_code=503,
)
url = f"https://github.com/{repo}/actions/runs/{run_id}"
stamp = time.strftime("%Y-%m-%d %H:%M:%S")

env = os.environ.copy()
env["ARCH"] = arch
env["RUN_ID"] = run_id
env["REPO"] = repo = f"ubi_agni/ros-builder-action"
while running:
q.put(f"Blocked by active import from: {running}")
time.sleep(2)

running = f"https://github.com/{repo}/actions/runs/{run_id}"
if q.cancelled: # skip processing if client already disconnected
print(f"Cancelling import from {url}")
return

running = f"{url} started at {stamp}"

home = os.environ["HOME"]
log = open(f"{home}/import.log", "a", encoding="utf-8")
log.write(f"\n\n{stamp}\nImporting {arch} from {url}\n")
q.put(f"Importing {arch} from {url}")

# Run import script
env = os.environ.copy()
env.update(DISTRO=distro, ARCH=arch, RUN_ID=run_id, REPO=repo)
import_script = Path(__file__).parent / "import.sh"
process = subprocess.Popen(
p = subprocess.Popen(
[import_script.as_posix()],
stdout=subprocess.PIPE,
stderr=subprocess.STDOUT,
text=True,
env=env,
)

async def processor():
with open("import.log", "a", encoding="utf-8") as f:
stamp = time.strftime("%Y-%m-%d %H:%M:%S")
url = f"https://github.com/{repo}/actions/runs/{run_id}"
f.write(f"\n\n{stamp} Importing {arch} from {url}\n")
while True:
if output := p.stdout.readline():
log.write(output)
q.put(output[:-1] or " ") # strip newline and ensure non-empty string
elif p.poll() is not None: # process finished
if p.returncode != 0:
q.put(f"Failed with return code {p.returncode}")
break

while True:
if await request.is_disconnected():
pass # continue process and writing log file

output = process.stdout.readline()
if output:
f.write(output)
yield output[:-1]
q.put("") # empty string signals end of stream
running = None

elif process.poll() is not None:
break # process finished

await asyncio.sleep(1)

if process.returncode != 0:
output = f"Failed with return code {process.returncode}\n"
f.write(output)
yield output[:-1]
@app.get("/import")
def reprepro_import(request: Request, distro: str, run_id: str, arch: str = "x64"):
kwargs = dict(
repo="ubi-agni/ros-builder-action", distro=distro, run_id=run_id, arch=arch
)
q = queue.Queue()
q.cancelled = False
t = Thread(target=process, args=(q,), kwargs=kwargs, daemon=True)
t.start()

yield "" # signal end of stream
global running
running = None
async def processor():
status = Status.STARTED
size_cmd = "ls -1st /tmp/gh*.zip 2> /dev/null | head -n 1 | cut -d ' ' -f1"
size = -1
try:
while True:
try:
response = q.get_nowait()
if status == Status.STARTED and response.startswith("Fetching "):
status = Status.DOWNLOADING
elif status == Status.DOWNLOADING or status == Status.IMPORTING:
status = Status.IMPORTING
size = 0

yield response
if not response:
break
except queue.Empty:
if status == Status.DOWNLOADING:
newsize = int(subprocess.getoutput(size_cmd) or 0)
if newsize > size:
size = newsize
yield f"Downloading artifact: {size}"
else:
yield "Extracting artifact"
elif status == Status.IMPORTING:
size += 1
if size >= 10:
yield "Import stalled. Killing unzstd."
subprocess.run(["pkill", "unzstd"], check=False)

await asyncio.sleep(1)

except asyncio.CancelledError: # client disconnected
q.cancelled = True

return EventSourceResponse(processor())

0 comments on commit 7c02ae3

Please sign in to comment.