Skip to content

Commit

Permalink
Merge pull request #6 from BaCa2-project/http_server
Browse files Browse the repository at this point in the history
Broker HTTP server re-made
  • Loading branch information
ZyndramZM authored Oct 20, 2023
2 parents 0335d48 + 04b46d3 commit 4d30e12
Show file tree
Hide file tree
Showing 14 changed files with 509 additions and 420 deletions.
8 changes: 8 additions & 0 deletions broker/__main__.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,8 @@
from time import sleep

from broker.master import BrokerMaster
from settings import APP_SETTINGS, DB_STRING, SUBMITS_DIR

if __name__ == '__main__':
broker_instance = BrokerMaster(DB_STRING, SUBMITS_DIR, APP_SETTINGS['delete_records'])
print('BaCa2 broker is running')
4 changes: 1 addition & 3 deletions broker/builder.py
Original file line number Diff line number Diff line change
Expand Up @@ -61,12 +61,10 @@ def _generate_test_yaml(self):
'storage': '5G',
'workspace': '5G',
}
# TODO:
# 'url': callback_url,
}
}
if self.enable_shortcut:
test_yaml['satori'] = {
test_yaml['kolejka']['satori'] = {
'result': {
'execute_time_real': '/io/executor/run/real_time',
'execute_time_cpu': '/io/executor/run/cpu_time',
Expand Down
152 changes: 15 additions & 137 deletions broker/master.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,37 +4,41 @@

import requests as req
from pathlib import Path
from http.server import ThreadingHTTPServer, BaseHTTPRequestHandler
from threading import Thread, Lock

from db.connector import Connection
from .submit import TaskSubmit
from broker.server import BacaApiServer
from .server import KolejkaCommunicationManager, BrokerIOServer

from settings import KOLEJKA_SRC_DIR, APP_SETTINGS
from .timeout import TimeoutManager


class BrokerMaster:
def __init__(self,
db_string: str,
submits_dir: Path,
delete_records: bool = APP_SETTINGS['delete_records'],
threads: int = 2,
# server_address: tuple[str, int] = ('127.0.0.1', 15212)
threads: int = 2
):
self.connection = Connection(db_string)
self.delete_records = delete_records
self.submits_dir = submits_dir
self.threads = threads
self.submits = {}
self.submit_http_server = KolejkaCommunicationServer(APP_SETTINGS['server_ip'], APP_SETTINGS['server_port'])
self.submit_http_server.start_server()
self.api_server = BacaApiServer(self,
server_ip=APP_SETTINGS['server_ip'],
server_port=APP_SETTINGS['server_port'])
self.kolejka_manager = KolejkaCommunicationManager()
self.broker_server = BrokerIOServer(self.kolejka_manager, self)
self.timeout_manager = TimeoutManager()
self.verbose = APP_SETTINGS['verbose']
if self.verbose:
print('Broker master initialized')
print('Initial settings:')
for key, value in APP_SETTINGS.items():
print(f'\t{key}: {value}')

def __del__(self):
self.submit_http_server.stop_server()
self.broker_server.close_server()
self.timeout_manager.stop()


@staticmethod
def refresh_kolejka_src(add_executable_attr: bool = True):
Expand Down Expand Up @@ -81,129 +85,3 @@ def submit_results(self, submit_id: str, results):
def close_submit(self, submit_id: str):
if self.submits.get(submit_id) is not None:
del self.submits[submit_id]


class KolejkaCommunicationServer:
"""
Manages a http server that listens for updates from KOLEJKA system about submit records' statuses.
Provides methods for awaiting calls from KOLEJKA system.
"""

def __init__(self, host: str, port: int):
self.host = host
self.port = port
self.server = ThreadingHTTPServer2(self, (host, port), _KolejkaCommunicationHandler)
self.server_thread = Thread(target=self.server.serve_forever)
self.submit_dict: dict[str, Lock] = {}
self.integrity_lock = Lock() # for protection against data races

def __len__(self):
with self.integrity_lock:
length = len(self.submit_dict)
return length

def start_server(self) -> None:
"""Starts the HTTP server in a separate thread"""
assert not self.is_active
self.server_thread.start()

def stop_server(self) -> None:
"""Stops the HTTP server"""
assert self.is_active
self.server.shutdown()
self.server.server_close()

@property
def is_active(self) -> bool:
"""Returns True if the HTTP server is currently operational"""
return self.server_thread.is_alive()

def add_submit(self, submit_id: str) -> str:
# TODO: adding submit should return url for submit
"""
Adds a submit record to the local storage. Marks it as 'awaiting checking' for KOLEJKA system.
:raise ValueError: if the submit record is already in the local storage
"""
with self.integrity_lock:
if submit_id in self.submit_dict:
raise ValueError('Submit with id %s already registered.' % submit_id)
self.submit_dict[submit_id] = Lock()
self.submit_dict[submit_id].acquire()
return f'http://{self.host}:{self.port}/{submit_id}'

def release_submit(self, submit_id: str) -> None:
"""
Marks a submit record as 'checked'.
:raise KeyError: if the submit record is not present in the local storage
:raise ValueError: if the submit record has already been released
"""
with self.integrity_lock:
if self.submit_dict[submit_id].locked():
self.submit_dict[submit_id].release()
else:
raise ValueError('Submit with id %s has already been released.' % submit_id)

def delete_submit(self, submit_id: str) -> None:
"""
Removes a submit record from the local storage.
raise KeyError: if the submit record is not present in the local storage
"""
with self.integrity_lock:
del self.submit_dict[submit_id]

def await_submit(self, submit_id: str, timeout: float = -1) -> bool:
"""
Returns True if a record's status changes to 'checked' within 'timeout' seconds after
calling this method. If 'timeout' is a negative number waits indefinitely.
raise KeyError: if the submit record is not present in the local storage
"""
with self.integrity_lock:
lock = self.submit_dict[submit_id]
lock_acquired = lock.acquire(timeout=timeout)
if lock_acquired:
lock.release()
try:
self.delete_submit(submit_id)
except KeyError: # in case this method is called multiple times simultaneously for the same submit record
pass
return lock_acquired


class ThreadingHTTPServer2(ThreadingHTTPServer):
"""
Exactly the same thing as ThreadingHTTPServer but with an additional attribute 'manager'.
'manager' field stores KolejkaCommunicationServer instance so that the HTTP handler can invoke
KolejkaCommunicationServer methods.
"""

def __init__(self, manager: KolejkaCommunicationServer, *args, **kwargs):
self.manager = manager
super().__init__(*args, **kwargs)


class _KolejkaCommunicationHandler(BaseHTTPRequestHandler):
"""
HTTP handler class for communication with KOLEJKA system
"""

def __init__(self, request: bytes, client_address: tuple[str, int], server: ThreadingHTTPServer2):
super().__init__(request, client_address, server)
self.server: ThreadingHTTPServer2 = server

def do_POST(self):
"""Handles http requests."""
manager: KolejkaCommunicationServer = self.server.manager
submit_id = self.path
if not submit_id.isalnum():
return
try:
manager.release_submit(submit_id)
except (KeyError, ValueError):
pass

def log_message(self, format: str, *args) -> None:
pass
Loading

0 comments on commit 4d30e12

Please sign in to comment.