From 6a430102b4b5658f8a6ded8368f3c83b111d8f94 Mon Sep 17 00:00:00 2001 From: Till Prochaska <1512805+tillprochaska@users.noreply.github.com> Date: Sat, 8 Jun 2024 18:08:19 +0200 Subject: [PATCH] Fetch results of evening voting sessions MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit While on most plenary days, there is only one voting session around midday, on some days there is another sesssion in the evening, usually around 17:00. The vote results of the evening sessions are appended to the same source document that also contains the results of the midday votes. Currently, we only run the `RCVListPipeline` between 12:00 and 15:00 until we’ve been able to fetch vote results successfully. Once we’ve fetched the results, we do not attempt to fetch them again. That means that so far we did not (automatically) fetch results of the evening voting session. In addition to the current behavior, this change tries to fetch vote results between 17:00 and 20:00, until we’ve been able to fetch them successfully a second time. This is only the first part of the solution, as we also need to check that we only stop scraping vote results once we’ve been able to fetch updated results (e.g. by storing a hash of the source data for every successful pipeline run). --- backend/howtheyvote/worker/__init__.py | 51 +++++++++++++++++--------- backend/howtheyvote/worker/worker.py | 19 ++++++++++ backend/tests/worker/test_worker.py | 44 +++++++++++++++++++++- 3 files changed, 95 insertions(+), 19 deletions(-) diff --git a/backend/howtheyvote/worker/__init__.py b/backend/howtheyvote/worker/__init__.py index f884180c0..fe614f5c3 100644 --- a/backend/howtheyvote/worker/__init__.py +++ b/backend/howtheyvote/worker/__init__.py @@ -1,6 +1,6 @@ import datetime -from sqlalchemy import exists, func, select +from sqlalchemy import select from structlog import get_logger from .. import config @@ -8,12 +8,12 @@ from ..models import PipelineRun, PipelineRunResult, PlenarySession from ..pipelines import MembersPipeline, PressPipeline, RCVListPipeline, SessionsPipeline from ..query import session_is_current_at -from .worker import SkipPipelineError, Weekday, Worker +from .worker import SkipPipelineError, Weekday, Worker, pipeline_ran_successfully log = get_logger(__name__) -def op_rcv() -> None: +def op_rcv_midday() -> None: """Checks if there is a current plenary session and, if yes, fetches the latest roll-call vote results.""" today = datetime.date.today() @@ -21,7 +21,25 @@ def op_rcv() -> None: if not _is_session_day(today): raise SkipPipelineError() - if _ran_successfully(RCVListPipeline, today): + if pipeline_ran_successfully(RCVListPipeline, today): + raise SkipPipelineError() + + pipeline = RCVListPipeline(term=config.CURRENT_TERM, date=today) + pipeline.run() + + +def op_rcv_evening() -> None: + """While on most plenary days, there’s only one voting session around midday, on some days + there is another sesssion in the evening, usually around 17:00. The vote results of the + evening sessions are appended to the same source document that also contains the results + of the midday votes. This method fetches the latest roll-call vote results, even if they + have been fetched successfully earlier on the same day.""" + today = datetime.date.today() + + if not _is_session_day(today): + raise SkipPipelineError() + + if pipeline_ran_successfully(RCVListPipeline, today, count=2): raise SkipPipelineError() pipeline = RCVListPipeline(term=config.CURRENT_TERM, date=today) @@ -59,19 +77,6 @@ def _is_session_day(date: datetime.date) -> bool: return session is not None -def _ran_successfully(pipeline: type[object], date: datetime.date) -> bool: - """Check if a given pipeline has been run successfully on a given day.""" - query = ( - exists() - .where(PipelineRun.pipeline == pipeline.__name__) - .where(func.date(PipelineRun.started_at) == func.date(date)) - .where(PipelineRun.result == PipelineRunResult.SUCCESS) - .select() - ) - - return bool(Session.execute(query).scalar()) - - worker = Worker() # Mon at 04:00 @@ -94,7 +99,7 @@ def _ran_successfully(pipeline: type[object], date: datetime.date) -> bool: # Mon-Thu between 12:00 and 15:00, every 10 mins worker.schedule( - op_rcv, + op_rcv_midday, name=RCVListPipeline.__name__, weekdays={Weekday.MON, Weekday.TUE, Weekday.WED, Weekday.THU}, hours=range(12, 15), @@ -102,6 +107,16 @@ def _ran_successfully(pipeline: type[object], date: datetime.date) -> bool: tz=config.TIMEZONE, ) +# Mon-Thu between 17:00 and 20:00, every 10 mins +worker.schedule( + op_rcv_evening, + name=RCVListPipeline.__name__, + weekdays={Weekday.MON, Weekday.TUE, Weekday.WED, Weekday.THU}, + hours=range(17, 20), + minutes=range(0, 60, 10), + tz=config.TIMEZONE, +) + # Mon-Thu, between 13:00 and 20:00, every 30 mins worker.schedule( op_press, diff --git a/backend/howtheyvote/worker/worker.py b/backend/howtheyvote/worker/worker.py index 199dcb28c..df8f2ffe2 100644 --- a/backend/howtheyvote/worker/worker.py +++ b/backend/howtheyvote/worker/worker.py @@ -9,6 +9,7 @@ from prometheus_client import Counter, Gauge, Histogram from prometheus_client import start_http_server as start_metrics_server from schedule import Scheduler +from sqlalchemy import func, select from structlog import get_logger from .. import config @@ -58,6 +59,24 @@ class Weekday(enum.Enum): Handler = Callable[..., Any] +def pipeline_ran_successfully( + pipeline: type[object], + date: datetime.date, + count: int = 1, +) -> bool: + """Check if a given pipeline has been run successfully on a given day.""" + query = ( + select(func.count()) + .select_from(PipelineRun) + .where(PipelineRun.pipeline == pipeline.__name__) + .where(func.date(PipelineRun.started_at) == func.date(date)) + .where(PipelineRun.result == PipelineRunResult.SUCCESS) + ) + result = Session.execute(query).scalar() or 0 + + return result >= count + + class Worker: """Running a worker starts a long-running process that executes data pipelines in regular intervals and stores the result of the pipeline runs in the database.""" diff --git a/backend/tests/worker/test_worker.py b/backend/tests/worker/test_worker.py index b98a759db..dd924e24b 100644 --- a/backend/tests/worker/test_worker.py +++ b/backend/tests/worker/test_worker.py @@ -5,7 +5,7 @@ from howtheyvote.models import PipelineRun, PipelineRunResult from howtheyvote.pipelines import DataUnavailableError, PipelineError -from howtheyvote.worker.worker import Weekday, Worker +from howtheyvote.worker.worker import Weekday, Worker, pipeline_ran_successfully def get_handler(): @@ -174,3 +174,45 @@ def pipeline_error(): assert runs[1].pipeline == "pipeline_error" assert runs[1].result == PipelineRunResult.FAILURE + + +def test_pipeline_ran_successfully(db_session): + class TestPipeline: + pass + + now = datetime.datetime.now() + today = now.date() + + run = PipelineRun( + started_at=now, + finished_at=now, + pipeline=TestPipeline.__name__, + result=PipelineRunResult.FAILURE, + ) + db_session.add(run) + db_session.commit() + + assert pipeline_ran_successfully(TestPipeline, today) is False + + run = PipelineRun( + started_at=now, + finished_at=now, + pipeline=TestPipeline.__name__, + result=PipelineRunResult.SUCCESS, + ) + db_session.add(run) + db_session.commit() + + assert pipeline_ran_successfully(TestPipeline, today) is True + assert pipeline_ran_successfully(TestPipeline, today, count=2) is False + + run = PipelineRun( + started_at=now, + finished_at=now, + pipeline=TestPipeline.__name__, + result=PipelineRunResult.SUCCESS, + ) + db_session.add(run) + db_session.commit() + + assert pipeline_ran_successfully(TestPipeline, today, count=2) is True