Skip to content

Commit

Permalink
Update abstra-lib
Browse files Browse the repository at this point in the history
  • Loading branch information
abstra-bot committed Nov 7, 2024
1 parent 3ce6cbd commit 1dd0b07
Show file tree
Hide file tree
Showing 273 changed files with 1,333 additions and 1,325 deletions.
6 changes: 3 additions & 3 deletions abstra_internals/cloud/server.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
from abstra_internals.cloud.application import CustomApplication
from abstra_internals.cloud.ghooks import GunicornOptionsBuilder
from abstra_internals.cloud.server_application import CustomApplication
from abstra_internals.cloud.server_hooks import GunicornOptionsBuilder
from abstra_internals.controllers.main import MainController
from abstra_internals.environment import DEFAULT_PORT
from abstra_internals.logger import AbstraLogger
Expand All @@ -11,7 +11,7 @@

def run():
AbstraLogger.init("cloud")
SettingsController.set_root_path(".") # TODO: use CWD
SettingsController.set_root_path(".")
SettingsController.set_server_port(DEFAULT_PORT)

controller = MainController(repositories=get_prodution_repositories())
Expand Down
File renamed without changes.
Original file line number Diff line number Diff line change
@@ -1,4 +1,3 @@
import datetime
import inspect
import signal

Expand All @@ -16,7 +15,6 @@
set_WORKER_UUID,
)
from abstra_internals.logger import AbstraLogger
from abstra_internals.repositories.execution_logs import LogEntry
from abstra_internals.utils import get_internal_id


Expand Down Expand Up @@ -51,38 +49,13 @@ def child_exit(self, server: Arbiter, worker: Worker):
worker_id = get_internal_id(worker, ensure=False)
app_id = get_internal_id(server, ensure=False)

killed_executions = (
self.main_controller.execution_repository.find_by_worker(
worker_id=worker_id,
status="running",
app_id=app_id,
)
)

text = f"[ABORTED] Worker exited with status ({status})"
err_msg = f"[ABORTED] Worker exited with status ({status})"
if status == signal.Signals.SIGKILL:
text += ": Server reached its memory limit or was replaced with a new version"

# update executions
for execution in killed_executions:
# Add log entry
err_log = LogEntry(
execution_id=execution.id,
created_at=datetime.datetime.now(),
payload={"text": text},
sequence=999999,
event="stderr",
)
self.main_controller.execution_logs_repository.save(err_log)
err_msg += ": Server reached its memory limit or was replaced with a new version"

# Update execution status
self.main_controller.execution_repository.set_failure_by_id(
execution_id=execution.id
)

self.main_controller.stage_run_repository.change_status(
execution.stage_run_id, "failed", execution.id
)
self.main_controller.child_exit(
app_id=app_id, worker_id=worker_id, err_msg=err_msg
)

except Exception as e:
AbstraLogger.capture_exception(e)
Expand All @@ -91,33 +64,10 @@ def child_exit(self, server: Arbiter, worker: Worker):
def on_exit(self, server: Arbiter):
try:
app_id = get_internal_id(server, ensure=False)

exited_execs = self.main_controller.execution_repository.find_by_app(
status="running",
app_id=app_id,
self.main_controller.self_exit(
app_id=app_id, err_msg="[ABORTED] Server exited"
)

# update executions
for execution in exited_execs:
# Add log entry
err_log = LogEntry(
execution_id=execution.id,
created_at=datetime.datetime.now(),
payload={"text": "[ABORTED] Server exited"},
sequence=999999,
event="stderr",
)
self.main_controller.execution_logs_repository.save(err_log)

# Update execution status
self.main_controller.execution_repository.set_failure_by_id(
execution_id=execution.id
)

self.main_controller.stage_run_repository.change_status(
execution.stage_run_id, "failed", execution.id
)

except Exception as e:
AbstraLogger.capture_exception(e)

Expand Down
30 changes: 30 additions & 0 deletions abstra_internals/cloud/worker.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,30 @@
from multiprocessing.forkserver import set_forkserver_preload

from abstra_internals.controllers import execution_process
from abstra_internals.controllers.execution_consumer import ExecutionConsumer
from abstra_internals.controllers.main import MainController
from abstra_internals.environment import RABBITMQ_CONNECTION_URI
from abstra_internals.logger import AbstraLogger
from abstra_internals.repositories.consumer import RabbitConsumer
from abstra_internals.repositories.factory import get_prodution_repositories
from abstra_internals.settings import SettingsController
from abstra_internals.signals import SignalHandlers


def run():
    """Start the cloud worker and hand a RabbitMQ consumer to ExecutionConsumer.

    Initialises the signal handlers and the "cloud" logger, sets the settings
    root path to ".", and then consumes messages until ``ExecutionConsumer``
    returns.

    Raises:
        Exception: if ``RABBITMQ_CONNECTION_URI`` is empty or unset.
    """
    SignalHandlers.init()
    AbstraLogger.init("cloud")
    SettingsController.set_root_path(".")

    rabbit_uri = RABBITMQ_CONNECTION_URI
    if not rabbit_uri:
        raise Exception("RABBITMQ_CONNECTION_URI not found")

    # Ask the fork server to import the execution module up front.
    preload_modules = [execution_process.__name__]
    set_forkserver_preload(preload_modules)

    main_controller = MainController(repositories=get_prodution_repositories())
    with RabbitConsumer(rabbit_uri) as consumer:
        # consumer.stop is registered as the SIGTERM callback.
        SignalHandlers.register_sigterm_callback(consumer.stop)
        ExecutionConsumer(consumer, main_controller)


if __name__ == "__main__":
run()
43 changes: 22 additions & 21 deletions abstra_internals/controllers/execution.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,4 @@
import json
from threading import Thread
from typing import Optional

from abstra_internals.controllers.execution_client import (
Expand All @@ -8,12 +7,9 @@
)
from abstra_internals.controllers.execution_target import ExecutionTarget
from abstra_internals.controllers.workflow_interface import IWorkflowEngine
from abstra_internals.entities.execution import Execution, RequestContext
from abstra_internals.entities.execution import Execution, PreExecution, RequestContext
from abstra_internals.repositories.factory import Repositories
from abstra_internals.repositories.project.project import (
ActionStage,
ProjectRepository,
)
from abstra_internals.repositories.project.project import ActionStage, ProjectRepository
from abstra_internals.settings import Settings
from abstra_internals.utils.dot_abstra import TEST_DATA_FILE

Expand Down Expand Up @@ -48,10 +44,23 @@ def __init__(
self.repositories = repositories
self.workflow_engine = workflow_engine

def submit(
    self,
    stage: ActionStage,
    request: Optional[RequestContext] = None,
    target_stage_run_id: Optional[str] = None,
):
    """Wrap the stage in a PreExecution and pass it to the producer.

    Args:
        stage: Action stage whose ``id`` is recorded in the pre-execution.
        request: Optional request context forwarded unchanged.
        target_stage_run_id: Optional stage-run id forwarded unchanged.

    Returns:
        Whatever ``self.repositories.producer.submit`` returns.
    """
    pre_execution = PreExecution(
        stage_id=stage.id,
        request=request,
        target_stage_run_id=target_stage_run_id,
    )
    return self.repositories.producer.submit(pre_execution)

def run(
self,
*,
wait=True,
stage: ActionStage,
client: Optional[ExecutionClient] = None,
request: Optional[RequestContext] = None,
Expand Down Expand Up @@ -85,22 +94,14 @@ def run(
client.handle_lock_failed(target_stage_run_id)
raise LockFailedException()

pthread = Thread(
target=ExecutionTarget,
kwargs=dict(
stage=stage,
client=client,
execution=execution,
repositories=self.repositories,
workflow_engine=self.workflow_engine,
),
name=f"{stage.title} - {execution.short_id}",
ExecutionTarget(
stage=stage,
client=client,
execution=execution,
repositories=self.repositories,
workflow_engine=self.workflow_engine,
)

pthread.start()
if wait:
pthread.join()

def test(
self,
stage: ActionStage,
Expand Down
99 changes: 99 additions & 0 deletions abstra_internals/controllers/execution_consumer.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,99 @@
from concurrent.futures import ThreadPoolExecutor
from uuid import uuid4

from abstra_internals.controllers.execution_process import ExecutionProcess
from abstra_internals.controllers.main import MainController
from abstra_internals.environment import QUEUE_CONCURRENCY
from abstra_internals.logger import AbstraLogger
from abstra_internals.repositories.consumer import Consumer, QueueMessage
from abstra_internals.settings import Settings


class StageNotFound(Exception):
    """Raised when a queued message names a stage id that resolves to no action."""


class NonCleanExit(Exception):
    """Raised when an execution child process ends with a non-zero exit code."""


def PreExecController(
    *,
    controller: MainController,
    consumer: Consumer,
    msg: QueueMessage,
    arbiter_uuid: str,
):
    """Run one queued pre-execution in its own child process.

    A fresh worker UUID is generated, the stage named by the message is
    looked up, and ``ExecutionProcess`` is started in a process created from
    the repositories' multiprocessing context. The call blocks until the
    child ends. On a clean exit the message is acknowledged through
    ``consumer.done_callback``.

    Any failure (unknown stage, non-zero exit code, or an error while
    spawning or acknowledging) is logged, reported to the controller via
    ``child_exit``, and then stops the consumer. Nothing is re-raised.

    Args:
        controller: Main controller used for the stage lookup and for the
            failure notification.
        consumer: Queue consumer that is acknowledged or stopped.
        msg: Queue message carrying the pre-execution to run.
        arbiter_uuid: Id of the arbiter that owns this worker.
    """
    worker_uuid = str(uuid4())
    head_id = worker_uuid.split("-")[0]

    try:
        mp_context = controller.repositories.mp_context.get_context()
        stage_id = msg.preexecution.stage_id
        stage = controller.get_action(stage_id)
        if stage is None:
            raise StageNotFound(f"Stage not found: {stage_id}")

        p = mp_context.Process(
            target=ExecutionProcess,
            kwargs=dict(
                stage=stage,
                controller=controller,
                worker_uuid=worker_uuid,
                arbiter_uuid=arbiter_uuid,
                root_path=Settings.root_path,
                request=msg.preexecution.request,
                environment=AbstraLogger.environment,
                target_stage_run_id=msg.preexecution.target_stage_run_id,
            ),
            name=f"Worker-{head_id}",
        )

        p.start()
        p.join()

        if p.exitcode != 0:
            err_msg = f"Worker exited with status ({p.exitcode})"
            # A negative exit code -N means the child was killed by signal N;
            # -9 is SIGKILL.
            if p.exitcode == -9:
                err_msg += ": Server reached its memory limit"

            raise NonCleanExit(err_msg)

        consumer.done_callback(msg)
    except Exception as e:
        AbstraLogger.error(f"[{head_id}] PreExecController ERROR: aborting consumer")
        AbstraLogger.capture_exception(e)
        try:
            # Use str(e), not e.args: interpolating the args tuple would put
            # "('...',)" in the stored message.
            controller.child_exit(
                app_id=arbiter_uuid, worker_id=worker_uuid, err_msg=f"[ABORTED] {e}"
            )
        except Exception as cleanup_error:
            # A failing notification must neither vanish nor block shutdown.
            AbstraLogger.capture_exception(cleanup_error)
        finally:
            consumer.stop()


class Arbiter:
    """Context manager that gives a consumer session its own UUID.

    Entering only logs. Exiting logs and calls ``controller.self_exit`` with
    this arbiter's UUID. Exceptions raised inside the ``with`` body are not
    suppressed.
    """

    def __init__(self, controller: MainController):
        self.uuid = str(uuid4())
        self.controller = controller

    @property
    def head_id(self) -> str:
        """Return the part of the UUID before its first dash."""
        first_segment, _, _ = self.uuid.partition("-")
        return first_segment

    def __enter__(self):
        AbstraLogger.debug(f"[{self.head_id}] ARBITER INIT")
        return self

    def __exit__(self, exc_type, exc_val, exc_tb):
        AbstraLogger.debug(f"[{self.head_id}] ARBITER EXIT")
        self.controller.self_exit(
            app_id=self.uuid,
            err_msg="[ABORTED] Worker exited",
        )


def ExecutionConsumer(consumer: Consumer, controller: MainController):
    """Dispatch every message from the consumer to a thread pool.

    Each message yielded by ``consumer.iter()`` is submitted to
    ``PreExecController`` on a pool capped at ``QUEUE_CONCURRENCY`` threads,
    all inside a single ``Arbiter`` session whose UUID is passed to every
    task.

    Args:
        consumer: Source of queue messages.
        controller: Main controller shared by all tasks.
    """
    with Arbiter(controller) as arbiter, ThreadPoolExecutor(
        max_workers=QUEUE_CONCURRENCY
    ) as pool:
        for queue_msg in consumer.iter():
            pool.submit(
                PreExecController,
                controller=controller,
                consumer=consumer,
                msg=queue_msg,
                arbiter_uuid=arbiter.uuid,
            )
49 changes: 49 additions & 0 deletions abstra_internals/controllers/execution_process.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,49 @@
from typing import Optional

from abstra_internals.controllers.execution import ExecutionController
from abstra_internals.controllers.main import MainController
from abstra_internals.entities.execution import RequestContext
from abstra_internals.environment import set_SERVER_UUID, set_WORKER_UUID
from abstra_internals.logger import AbstraLogger, Environment
from abstra_internals.repositories.project.project import ActionStage
from abstra_internals.settings import Settings
from abstra_internals.stdio_patcher import StdioPatcher


# Entry point of the child process: every argument has to be picklable so
# that multiprocessing can hand it over.
def ExecutionProcess(
    *,
    root_path: str,
    worker_uuid: str,
    arbiter_uuid: str,
    stage: ActionStage,
    controller: MainController,
    environment: Optional[Environment],
    request: Optional[RequestContext] = None,
    target_stage_run_id: Optional[str] = None,
):
    """Set up the child process and run a single stage execution.

    Applies the root path, initialises the logger, records the worker and
    server UUIDs, patches stdio, and then runs the stage through an
    ``ExecutionController``. Exceptions raised by the run are logged and
    captured, not propagated.

    Args:
        root_path: Root path applied to ``Settings``.
        worker_uuid: Id recorded via ``set_WORKER_UUID``; its first segment
            tags the log lines.
        arbiter_uuid: Id recorded via ``set_SERVER_UUID``.
        stage: Stage to execute.
        controller: Supplies the repositories and the workflow engine.
        environment: Value passed to ``AbstraLogger.init``.
        request: Optional request context forwarded to the run.
        target_stage_run_id: Optional stage-run id forwarded to the run.
    """
    Settings.set_root_path(root_path)
    AbstraLogger.init(environment)

    set_WORKER_UUID(worker_uuid)
    set_SERVER_UUID(arbiter_uuid)
    StdioPatcher.apply(controller)

    short_id, _, _ = worker_uuid.partition("-")
    AbstraLogger.debug(f"[{short_id}] WORKER INIT")

    try:
        execution_controller = ExecutionController(
            repositories=controller.repositories,
            workflow_engine=controller.workflow_engine,
        )
        execution_controller.run(
            stage=stage,
            request=request,
            target_stage_run_id=target_stage_run_id,
        )
    except Exception as error:
        AbstraLogger.error(f"[{short_id}] WORKER ERROR: {error}")
        AbstraLogger.capture_exception(error)
    finally:
        AbstraLogger.debug(f"[{short_id}] WORKER EXIT")
Loading

0 comments on commit 1dd0b07

Please sign in to comment.