Skip to content

Commit

Permalink
BRK-17 Sending callback to BaCa2
Browse files Browse the repository at this point in the history
  • Loading branch information
ZyndramZM committed Sep 18, 2023
1 parent d72c5d0 commit 29dc07e
Show file tree
Hide file tree
Showing 2 changed files with 41 additions and 21 deletions.
52 changes: 33 additions & 19 deletions broker/submit.py
Original file line number Diff line number Diff line change
Expand Up @@ -57,11 +57,15 @@ class SubmitState(Enum):
SAVING = 6
#: Judging process ended successfully.
DONE = 200
#: Submit results successfully sent to BaCa2
RETURNED = 201


class TaskSubmit(Thread):
class BaCa2CommunicationError(Exception):
pass

class JudgingError(Exception):
pass

TIMEOUT_STEP = 3

def __init__(self,
Expand Down Expand Up @@ -146,16 +150,12 @@ def set_submit_update(self, status: SubmitState):
self._submit_update_lock.acquire()
self.sets_statuses[status] += 1
self._submit_update_lock.release()
if self.sets_statuses[status] == len(self.sets):
if self.sets_statuses[status] == len(self.sets) and status not in (SubmitState.DONE, SubmitState.SAVING):
self._change_state(status)

def close_set_submit(self, set_name: str, results: SetResult):
with self._gather_results_lock:
self.results[set_name] = results
if len(self.results) == len(self.sets):
self._change_state(SubmitState.DONE)
# TODO: verify
self._send_to_baca(BACA_URL, BACA_PASSWORD)

def _call_for_update(self, success: bool, msg: str = None):
pass
Expand All @@ -181,18 +181,23 @@ def _check_build(self):
# TODO: Consult package checking
return True

def _check_results(self):
if self.state != SubmitState.SAVING:
raise self.JudgingError(f"Submit state has to be 'SAVING' not '{self.state.name}'.")
for s in self.sets:
if s.state != SubmitState.DONE:
raise self.JudgingError(f"SetSubmit state has to be 'DONE' not '{s.state.name}'.")
return True

def _send_to_baca(self, baca_url: str, password: str) -> None:
if self.state != SubmitState.DONE:
raise ValueError(f"Submit state has to be 'DONE' not '{repr(self.state)}'.")
message = BrokerToBaca(
pass_hash=make_hash(password, self.submit_id),
submit_id=self.submit_id,
results=deepcopy(self.results) # TODO: verify if deepcopy is appropriate here
)
r = requests.post(url=f'{baca_url}/result/{self.submit_id}', json=message.serialize())
if r.status_code != 200:
raise ConnectionError(f"Results for TaskSubmit with id {self.submit_id} could not be send.")
self._change_state(SubmitState.RETURNED)
raise self.BaCa2CommunicationError(f"Results for TaskSubmit with id {self.submit_id} could not be send.")

def process(self):
self._change_state(SubmitState.AWAITING_PREPROC)
Expand All @@ -207,16 +212,24 @@ def process(self):
self._change_state(SubmitState.CANCELED)
self._call_for_update(success=False, msg='Package check error')
return

self._change_state(SubmitState.SENDING)
for s in self.sets:
s.start()

for s in self.sets:
s.join()

self._change_state(SubmitState.SAVING)
self._check_results()
self._send_to_baca(BACA_URL, BACA_PASSWORD)

self._change_state(SubmitState.DONE)

def run(self):
try:
self.process()
except Exception as e:
self._change_state(SubmitState.ERROR, str(e))
self._change_state(SubmitState.ERROR, f'{e.__class__.__name__}: {e}\n\n{e.__traceback__}')


class SetSubmit(Thread):
Expand Down Expand Up @@ -249,6 +262,7 @@ def __init__(self,
self.submit_id, self.set_name, self.state)
self.task_dir = self.task_submit.task_submit_dir / f'{self.set_name}.task'
self.result_dir = self.task_submit.task_submit_dir / f'{self.set_name}.result'
self.results = None

if sys.platform.startswith('win'):
self.python_call = 'py'
Expand All @@ -264,7 +278,6 @@ def _call_for_update(self):
self.task_submit.set_submit_update(self.state)

def _change_state(self, state: SubmitState, error_msg: str = None):
# TODO: Add state monitoring for parent (task) submit
self.state = state
if state == SubmitState.DONE and self.master.delete_records:
self._conn.exec("DELETE FROM set_submit_records WHERE submit_id=? AND set_name=?",
Expand Down Expand Up @@ -347,7 +360,7 @@ def _parse_results(self) -> SetResult:
# TODO: add assertion about status
results_yaml = self.result_dir / 'results' / 'results.yaml'
with open(results_yaml) as f:
content: dict = yaml.load(f, get_loader())
content: dict = yaml.load(f, Loader=get_loader())
tests = {}
for key, val in content.items():
satori = val['satori']
Expand All @@ -359,7 +372,7 @@ def _parse_results(self) -> SetResult:
runtime_memory=int(satori['execute_memory'][:-1])
)
tests[key] = tmp
return SetResult(name=self.name, tests=tests)
return SetResult(name=self.set_name, tests=tests)

def process(self):
self._change_state(SubmitState.SENDING)
Expand All @@ -370,15 +383,16 @@ def process(self):
self._await_results(timeout=60, active_wait=True)

self._change_state(SubmitState.SAVING)
results = self._parse_results()
self.results = self._parse_results()

self.task_submit.close_set_submit(self.set_name, self.results)

self._change_state(SubmitState.DONE)
self.task_submit.close_set_submit(self.set_name, results)


def run(self):
try:
self.process()
except Exception as e:
self._change_state(SubmitState.ERROR, str(e))
self._change_state(SubmitState.ERROR, f'{e.__class__.__name__}: {e}\n\n{e.__traceback__}')
# TODO: error handling for BaCa2 srv
10 changes: 8 additions & 2 deletions broker/yaml_tags.py
Original file line number Diff line number Diff line change
Expand Up @@ -33,12 +33,18 @@ def file_representer(dumper: yaml.SafeDumper, file: File):
return dumper.represent_scalar('!file', file.path)


def file_constructor(loader: yaml.SafeLoader, node: yaml.ScalarNode) -> File:
return File(loader.construct_scalar(node))


def get_dumper():
dumper = yaml.SafeDumper
# dumper.add_representer(Include, Include.to_yaml)
dumper.add_representer(File, file_representer)
return dumper


def get_loader():
pass
#TODO: add loader for !file tag
loader = yaml.SafeLoader
loader.add_constructor('!file', file_constructor)
return loader

0 comments on commit 29dc07e

Please sign in to comment.