Release/1.23.0 (#143)
This introduces several changes to the way `servicelayer` exposes a RabbitMQ-based implementation for Aleph tasks. 

### Objects

**Task**

A data class that contains information about a single Aleph task, along with methods to track how many times the task has been retried.

```
task_id: str
job_id: str
delivery_tag: str
operation: str
context: dict
payload: dict
priority: int
collection_id: Optional[str] = None
```
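
For illustration, constructing a `Task` might look like this (the field values are invented, and the retry-tracking methods are omitted since their names are not shown in this summary):

```
task = Task(
    task_id="a1b2c3",
    job_id="",
    delivery_tag="1",
    operation="ingest",
    context={},
    payload={"content_hash": "f1d2d2f9"},
    priority=5,
    collection_id="42",
)
```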

**Dataset**

An object that keeps track of the status of currently running tasks by populating Redis. One `Dataset` instance corresponds to one `collection_id`. This is also the object that the `/status/` API ends up querying to populate its response.

Redis keys used by the `Dataset` object:

- `tq:qdatasets`: set of all `collection_id`s of active datasets (a dataset is considered active when it has either running or pending tasks)
- `tq:qdj:<dataset>:taskretry:<task_id>`: the number of times `task_id` was retried

All of the following keys refer to `task_id`s or task statistics for a given dataset (`collection_id`); a sketch of how they might be queried follows the list:

- `tq:qdj:<dataset>:finished`: number of tasks that have been marked as "Done" and for which an acknowledgement has also been sent by the Worker over RabbitMQ.
- `tq:qdj:<dataset>:running`: set of all `task_id`s of tasks currently running. A "Running" task is one that has been checked out and is being processed by a worker.
- `tq:qdj:<dataset>:pending`: set of all `task_id`s of tasks currently pending. A "Pending" task has been added to a RabbitMQ queue (via a `basic_publish` call) by a producer (an API call, a UI action, etc.).
- `tq:qdj:<dataset>:start`: the UTC timestamp when **either** the first `task_id` was added to a RabbitMQ queue (the first Pending task) **or** the first `task_id` was checked out (the first Running task). The `start` key is updated when the first task is handed to a Worker.
- `tq:qdj:<dataset>:last_update`: the UTC timestamp of the latest change to the state of tasks for a given `collection_id`. This is set whenever a task becomes Pending, Running, or Done, or when a task is canceled.
- `tq:qds:<dataset>:<stage>`: a set of all `task_id`s that are either running or pending, for a certain stage.
- `tq:qds:<dataset>:<stage>:finished`: number of tasks that have been marked as "Done" for a certain stage. 
- `tq:qds:<dataset>:<stage>:running`: set of all `task_id`s of tasks currently running for a certain stage.
- `tq:qds:<dataset>:<stage>:pending`: set of all `task_id`s of tasks currently pending for a certain stage.
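
Illustrative only: a minimal sketch of how these keys might be queried with the `redis` Python client to summarize a dataset's state. The helper name is hypothetical; the key layout follows the list above.

```
import redis

r = redis.Redis()

def dataset_summary(dataset: str) -> dict:
    # counts come straight from the keys documented above
    return {
        "finished": int(r.get(f"tq:qdj:{dataset}:finished") or 0),
        "running": r.scard(f"tq:qdj:{dataset}:running"),
        "pending": r.scard(f"tq:qdj:{dataset}:pending"),
        "start": r.get(f"tq:qdj:{dataset}:start"),
        "last_update": r.get(f"tq:qdj:{dataset}:last_update"),
    }
```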

**Worker**

The parent class of all workers used in Aleph: the Aleph worker and the `ingest-file` worker. It handles consuming tasks from RabbitMQ queues and sending acknowledgements when tasks are completed. The `dispatch_task` method is implemented by each child class.
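
A minimal sketch of what a child class might look like, assuming the interface described above (the import path and the class and method names below are assumptions, not the exact `servicelayer` API):

```
from servicelayer.taskqueue import Task, Worker  # module path assumed

class IngestWorker(Worker):
    def dispatch_task(self, task: Task) -> Task:
        # route the task to a handler based on its operation
        if task.operation == "ingest":
            self.handle_ingest(task)  # hypothetical handler method
        return task

    def handle_ingest(self, task: Task) -> None:
        ...  # actual ingest logic lives in the child project
```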

### Changes from the initial RabbitMQ implementation

- implemented **priorities** for tasks. Each task gets assigned a random priority. The Producer will also reserve a maximum priority for tasks that need to be queued and executed urgently; this maximum-priority implementation will exist outside of `servicelayer`. (A publishing sketch follows this list.)
- added **Redis keys**: `tq:qds:<dataset>:<stage>` (`stage_key`), `tq:qds:<dataset>:<stage>:finished`, `tq:qds:<dataset>:<stage>:running`, `tq:qds:<dataset>:<stage>:pending` and added code to `get_status` to expose a break-down of tasks per stage. **The `job_id` key is set to `null` since jobs are no longer relevant. The key was preserved in the JSON in order to not introduce breaking changes.**
- `get_rabbitmq_connection` has been refactored: it will now **establish a new RabbitMQ connection** if the existing one was closed (see the sketch below).
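
As a rough sketch of the priority mechanics, assuming `pika` and RabbitMQ's `x-max-priority` queue argument (the queue name and payload here are hypothetical), a producer might publish a task like this:

```
import json
import random

import pika

connection = pika.BlockingConnection(pika.ConnectionParameters("localhost"))
channel = connection.channel()
# the queue must declare a maximum priority for priorities to take effect;
# RABBITMQ_MAX_PRIORITY is 10 (see servicelayer/settings.py below)
channel.queue_declare(
    queue="tasks", durable=True, arguments={"x-max-priority": 10}
)
channel.basic_publish(
    exchange="",
    routing_key="tasks",
    body=json.dumps({"task_id": "a1b2c3", "operation": "ingest"}),
    properties=pika.BasicProperties(priority=random.randint(1, 9)),
)
```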
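
And a hedged sketch of the connection-reuse pattern (names are illustrative, not the actual `servicelayer` implementation):

```
import pika

_connection = None

def get_rabbitmq_connection():
    global _connection
    # reuse the cached connection; open a fresh one if it was closed
    if _connection is None or _connection.is_closed:
        _connection = pika.BlockingConnection(
            pika.ConnectionParameters(host="localhost")
        )
    return _connection
```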

### Other changes

The **status API JSON response** has been modified, introducing a **breaking change**: the **jobs** key has been removed from the JSON. The JSON now contains the total number of running, pending, and finished tasks, as well as a break-down of these tasks **per stage**.
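
For orientation, the response might now be shaped roughly like this — a hedged sketch based on the description above, written as a Python dict; the exact field names may differ, and note that `job_id` is preserved but always `null`:

```
status = {
    "finished": 4,
    "running": 2,
    "pending": 4,
    "stages": [
        {"job_id": None, "stage": "ingest", "finished": 3, "running": 1, "pending": 2},
        {"job_id": None, "stage": "index", "finished": 1, "running": 1, "pending": 2},
    ],
}
```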
stchris authored Oct 8, 2024
1 parent d242b89 commit 267f63d
Showing 14 changed files with 1,142 additions and 182 deletions.
16 changes: 15 additions & 1 deletion .bumpversion.cfg
@@ -1,7 +1,21 @@
 [bumpversion]
-current_version = 1.22.2
+current_version = 1.23.0-rc36
 commit = True
 tag = True
+parse = (?P<major>\d+)\.(?P<minor>\d+)\.(?P<patch>\d+)([-](?P<release>(pre|rc))(?P<build>\d+))?
+serialize =
+	{major}.{minor}.{patch}-{release}{build}
+	{major}.{minor}.{patch}
+
+[bumpversion:part:release]
+optional_value = prod
+first_value = rc
+values =
+	rc
+	prod
+
+[bumpversion:part:build]
+first_value = 1
+
 [bumpversion:file:setup.py]
 search = version="{current_version}"
10 changes: 7 additions & 3 deletions Makefile
@@ -4,7 +4,7 @@ all: clean install test
 .PHONY: build
 
 build-docker:
-	docker-compose build --no-rm --parallel
+	docker compose build --no-rm --parallel
 
 install:
 	pip install -q -e .
@@ -15,11 +15,15 @@ dev:
 	python3 -m pip install -q -r requirements-dev.txt
 
 test:
-	docker compose run --rm shell make test-local
+	docker compose run --rm shell pytest --cov=servicelayer
+	@echo "⚠️ you might notice a warning about a fairy from SQLAlchemy"
+	@echo "this is fixed in a newer release -- see https://github.com/sqlalchemy/sqlalchemy/issues/10414"
+	@echo "we are ignoring this for now"
 
 test-local:
 	pytest --cov=servicelayer
 
+
 lint:
 	ruff check .
@@ -30,7 +34,7 @@ format-check:
 	black --check .
 
 shell:
-	docker-compose run --rm shell
+	docker compose run --rm shell
 
 build:
 	python3 setup.py sdist bdist_wheel
3 changes: 3 additions & 0 deletions docker-compose.yaml
@@ -3,6 +3,9 @@ version: "3.2"
 services:
   rabbitmq:
     image: rabbitmq:3.9-management-alpine
+    ports:
+      - '127.0.0.1:5673:5672'
+      - '127.0.0.1:15673:15672'
 
   shell:
     build:
3 changes: 2 additions & 1 deletion requirements-dev.txt
@@ -5,6 +5,7 @@ pytest-env==1.1.3
 pytest-cov==5.0.0
 pytest-mock==3.14.0
 wheel==0.43.0
+time-machine==2.14.1
 twine==4.0.2
 moto==4.2.14
-boto3==1.34.32
+boto3==1.34.32
12 changes: 6 additions & 6 deletions ruff.toml
@@ -1,10 +1,10 @@
 # Enable pycodestyle (`E`) and Pyflakes (`F`) codes by default.
-select = ["E", "F"]
-ignore = []
+lint.select = ["E", "F"]
+lint.ignore = []
 
 # Allow autofix for all enabled rules (when `--fix`) is provided.
-fixable = ["A", "B", "C", "D", "E", "F", "G", "I", "N", "Q", "S", "T", "W", "ANN", "ARG", "BLE", "COM", "DJ", "DTZ", "EM", "ERA", "EXE", "FBT", "ICN", "INP", "ISC", "NPY", "PD", "PGH", "PIE", "PL", "PT", "PTH", "PYI", "RET", "RSE", "RUF", "SIM", "SLF", "TCH", "TID", "TRY", "UP", "YTT"]
-unfixable = []
+lint.fixable = ["A", "B", "C", "D", "E", "F", "G", "I", "N", "Q", "S", "T", "W", "ANN", "ARG", "BLE", "COM", "DJ", "DTZ", "EM", "ERA", "EXE", "FBT", "ICN", "INP", "ISC", "NPY", "PD", "PGH", "PIE", "PL", "PT", "PTH", "PYI", "RET", "RSE", "RUF", "SIM", "SLF", "TCH", "TID", "TRY", "UP", "YTT"]
+lint.unfixable = []
 
 # Exclude a variety of commonly ignored directories.
 exclude = [
@@ -35,10 +35,10 @@ exclude = [
 line-length = 88
 
 # Allow unused variables when underscore-prefixed.
-dummy-variable-rgx = "^(_+|(_+[a-zA-Z0-9_]*[a-zA-Z0-9]+?))$"
+lint.dummy-variable-rgx = "^(_+|(_+[a-zA-Z0-9_]*[a-zA-Z0-9]+?))$"
 
 target-version = "py38"
 
-[mccabe]
+[lint.mccabe]
 # Unlike Flake8, default to a complexity level of 10.
 max-complexity = 10
3 changes: 1 addition & 2 deletions servicelayer/__init__.py
@@ -1,7 +1,6 @@
 import logging
 
-__version__ = "1.22.2"
-
+__version__ = "1.23.0-rc36"
 
 logging.getLogger("boto3").setLevel(logging.WARNING)
 logging.getLogger("botocore").setLevel(logging.WARNING)
2 changes: 1 addition & 1 deletion servicelayer/logs.py
@@ -86,7 +86,7 @@ def apply_task_context(task, **kwargs):
         start_time=time.time(),
         trace_id=str(uuid.uuid4()),
         retry=unpack_int(task.context.get("retries")),
-        **kwargs
+        **kwargs,
     )
56 changes: 56 additions & 0 deletions servicelayer/metrics.py
@@ -0,0 +1,56 @@
+from prometheus_client import (
+    Counter,
+    Histogram,
+    REGISTRY,
+    GC_COLLECTOR,
+    PROCESS_COLLECTOR,
+)
+
+# These definitions should be moved as close to the place
+# where they are used as possible. However, since we
+# support both a homebrewed Worker and one based on
+# RabbitMQ, these definitions would come into conflict.
+
+REGISTRY.unregister(GC_COLLECTOR)
+REGISTRY.unregister(PROCESS_COLLECTOR)
+
+TASKS_STARTED = Counter(
+    "servicelayer_tasks_started_total",
+    "Number of tasks that a worker started processing",
+    ["stage"],
+)
+
+TASKS_SUCCEEDED = Counter(
+    "servicelayer_tasks_succeeded_total",
+    "Number of successfully processed tasks",
+    ["stage", "retries"],
+)
+
+TASKS_FAILED = Counter(
+    "servicelayer_tasks_failed_total",
+    "Number of failed tasks",
+    ["stage", "retries", "failed_permanently"],
+)
+
+TASK_DURATION = Histogram(
+    "servicelayer_task_duration_seconds",
+    "Task duration in seconds",
+    ["stage"],
+    # The bucket sizes are a rough guess right now, we might want to adjust
+    # them later based on observed runtimes
+    buckets=[
+        0.25,
+        0.5,
+        1,
+        5,
+        15,
+        30,
+        60,
+        60 * 15,
+        60 * 30,
+        60 * 60,
+        60 * 60 * 2,
+        60 * 60 * 6,
+        60 * 60 * 24,
+    ],
+)
4 changes: 1 addition & 3 deletions servicelayer/settings.py
@@ -41,9 +41,7 @@
 RABBITMQ_BLOCKED_CONNECTION_TIMEOUT = env.to_int(
     "RABBITMQ_BLOCKED_CONNECTION_TIMEOUT", 300
 )
-QUEUE_ALEPH = "aleph_queue"
-QUEUE_INGEST = "ingest_queue"
-QUEUE_INDEX = "index_queue"
+RABBITMQ_MAX_PRIORITY = 10
 
 # Sentry
 SENTRY_DSN = env.get("SENTRY_DSN")