[Messages] Sync job #285

Open · wants to merge 3 commits into base: dev
10 changes: 10 additions & 0 deletions CHANGELOG.rst
@@ -2,6 +2,16 @@
Changelog
=========

Version 0.3.2
=============

This release fixes a synchronisation delay issue. The pending message job blocked
while waiting for the last pending messages in the queue to be processed, which
delayed the next pass over the pending messages collection, and therefore the
processing of new pending messages, by several hours. We removed the blocking
synchronisation point: the job now picks up new pending messages immediately and
skips the ones that are already being processed.

Version 0.3.1
=============

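As a minimal sketch of the pattern the changelog entry describes, assuming simplified, hypothetical names (the actual implementation is in src/aleph/jobs/process_pending_messages.py further down):

```python
import asyncio
from typing import Set, Tuple

# Hypothetical sketch: track identifiers of messages that are already being
# processed so a new pass over the queue can skip them instead of blocking.
in_flight: Set[Tuple] = set()

async def schedule_pending(pending_batch, handler, tasks: Set[asyncio.Task]) -> None:
    for pending in pending_batch:
        msg_id = (pending["item_hash"], pending["sender"])
        if msg_id in in_flight:
            # Already being processed by a previous pass: skip, don't wait.
            continue
        in_flight.add(msg_id)
        task = asyncio.create_task(handler(pending))
        # Release the identifier once the task finishes so the message can
        # be retried on a later pass if it is still pending.
        task.add_done_callback(lambda t, m=msg_id: in_flight.discard(m))
        tasks.add(task)
```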
2 changes: 1 addition & 1 deletion deployment/samples/docker-compose/docker-compose.yml
@@ -7,7 +7,7 @@ volumes:
services:
pyaleph:
restart: always
- image: alephim/pyaleph-node:v0.3.1
+ image: alephim/pyaleph-node:v0.3.2
command: --config /opt/pyaleph/config.yml --key-dir /opt/pyaleph/keys -v
ports:
- "127.0.0.1:8000:8000/tcp"
2 changes: 1 addition & 1 deletion deployment/samples/docker-monitoring/docker-compose.yml
@@ -9,7 +9,7 @@ volumes:
services:
pyaleph:
restart: always
- image: alephim/pyaleph-node:v0.3.1
+ image: alephim/pyaleph-node:v0.3.2
command: --config /opt/pyaleph/config.yml --key-dir /opt/pyaleph/keys -vv
ports:
- "127.0.0.1:8000:8000/tcp"
2 changes: 1 addition & 1 deletion docs/conf.py
@@ -293,5 +293,5 @@


rst_epilog = """
- .. |pyaleph_version| replace:: v0.3.1
+ .. |pyaleph_version| replace:: v0.3.2
"""
1 change: 1 addition & 0 deletions setup.cfg
@@ -95,6 +95,7 @@ testing =
pytest-aiohttp
pytest-asyncio
pytest-mock
+ types-pytz
types-requests
types-setuptools
nuls2 =
1 change: 1 addition & 0 deletions src/aleph/config.py
@@ -85,6 +85,7 @@ def get_defaults():
"gateway_port": 8080,
"id": None,
"alive_topic": "ALEPH_ALIVE",
"sync_topic": "ALEPH_SYNC",
"reconnect_delay": 60,
"peers": [
"/dnsaddr/api1.aleph.im/ipfs/12D3KooWNgogVS6o8fVsPdzh2FJpCdJJLVSgJT38XGE1BJoCerHx",
6 changes: 6 additions & 0 deletions src/aleph/jobs/__init__.py
@@ -4,6 +4,7 @@

from aleph.jobs.process_pending_messages import pending_messages_subprocess, retry_messages_task
from aleph.jobs.process_pending_txs import pending_txs_subprocess, handle_txs_task
+ from aleph.jobs.sync_unconfirmed_messages import sync_unconfirmed_messages_subprocess
from aleph.jobs.reconnect_ipfs import reconnect_ipfs_job

LOGGER = logging.getLogger("jobs")
@@ -32,8 +33,13 @@ def start_jobs(
target=pending_txs_subprocess,
args=(config_values, api_servers),
)
+ sync_unconfirmed_messages_process = Process(
+ target=sync_unconfirmed_messages_subprocess,
+ args=(config_values, api_servers),
+ )
p1.start()
p2.start()
+ sync_unconfirmed_messages_process.start()
else:
tasks.append(retry_messages_task(config=config, shared_stats=shared_stats))
tasks.append(handle_txs_task(config))
43 changes: 36 additions & 7 deletions src/aleph/jobs/process_pending_messages.py
@@ -72,6 +72,7 @@ async def process_message_job_results(
finished_tasks: Set[asyncio.Task],
task_message_dict: Dict[asyncio.Task, Dict],
shared_stats: Dict[str, Any],
+ processing_messages: Set[Tuple],
):
await process_job_results(
finished_tasks,
@@ -84,6 +85,9 @@
shared_stats["retry_messages_job_tasks"] -= 1
shared_stats["message_jobs"][message_type] -= 1

+ pending_message_id = get_pending_message_id(pending)
+ processing_messages.remove(pending_message_id)

del task_message_dict[message_task]


@@ -100,12 +104,26 @@ def validate_pending_message(pending: Dict):
)


+ def get_pending_message_id(pending_message: Dict) -> Tuple:
+ source = pending_message.get("source", {})
+ chain_name = source.get("chain_name", None)
+ height = source.get("height", None)
+
+ return (
+ pending_message["message"]["item_hash"],
+ pending_message["message"]["sender"],
+ chain_name,
+ height,
+ )


async def process_pending_messages(config: Config, shared_stats: Dict):
"""
Processes all the messages in the pending message queue.
"""

seen_ids: Dict[Tuple, int] = dict()
+ processing_messages = set()
find_params: Dict = {}

max_concurrent_tasks = config.aleph.jobs.pending_messages.max_concurrency.value
@@ -116,21 +134,30 @@ async def process_pending_messages(config: Config, shared_stats: Dict):
for message_type in MessageType:
shared_stats["message_jobs"][message_type] = 0

+ # Using a set is required as asyncio.wait takes and returns sets.
+ pending_tasks: Set[asyncio.Task] = set()
+ task_message_dict: Dict[asyncio.Task, Dict] = {}

while await PendingMessage.collection.count_documents(find_params):
- # Using a set is required as asyncio.wait takes and returns sets.
- pending_tasks: Set[asyncio.Task] = set()
- task_message_dict: Dict[asyncio.Task, Dict] = {}

async for pending in PendingMessage.collection.find(find_params).sort(
[("retries", ASCENDING), ("message.time", ASCENDING)]
).batch_size(max_concurrent_tasks):

+ # Check if the message is already processing
+ pending_message_id = get_pending_message_id(pending)
+ if pending_message_id in processing_messages:
+ # Skip the message, we're already processing it
+ continue
+
+ processing_messages.add(pending_message_id)

if len(pending_tasks) == max_concurrent_tasks:
finished_tasks, pending_tasks = await asyncio.wait(
pending_tasks, return_when=asyncio.FIRST_COMPLETED
)
await process_message_job_results(
- finished_tasks, task_message_dict, shared_stats
+ finished_tasks, task_message_dict, shared_stats, processing_messages
)

validate_pending_message(pending)
@@ -148,13 +175,15 @@ async def process_pending_messages(config: Config, shared_stats: Dict):
pending_tasks.add(message_task)
task_message_dict[message_task] = pending

- # Wait for the last tasks
+ # This synchronization point is required when a few pending messages remain.
+ # We wait for at least one task to finish; the remaining tasks will be collected
+ # on the next iterations of the loop.
if pending_tasks:
finished_tasks, _ = await asyncio.wait(
- pending_tasks, return_when=asyncio.ALL_COMPLETED
+ pending_tasks, return_when=asyncio.FIRST_COMPLETED
)
await process_message_job_results(
- finished_tasks, task_message_dict, shared_stats
+ finished_tasks, task_message_dict, shared_stats, processing_messages
)

# TODO: move this to a dedicated job and/or check unicity on insertion
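For context, the last hunk switches the final synchronisation point from asyncio.ALL_COMPLETED to asyncio.FIRST_COMPLETED. A small self-contained example of the difference (illustrative only, not project code):

```python
import asyncio

async def sleeper(seconds: float) -> float:
    await asyncio.sleep(seconds)
    return seconds

async def main() -> None:
    tasks = {asyncio.create_task(sleeper(s)) for s in (0.1, 5.0)}

    # FIRST_COMPLETED returns as soon as one task is done (~0.1 s here);
    # the slow task stays in `pending` and can be collected on a later pass.
    finished, pending = await asyncio.wait(
        tasks, return_when=asyncio.FIRST_COMPLETED
    )
    print(len(finished), len(pending))  # 1 1

    # ALL_COMPLETED (the previous behaviour) would block here until every
    # task is done, i.e. the full 5 s.
    await asyncio.wait(pending, return_when=asyncio.ALL_COMPLETED)

asyncio.run(main())
```

This is why the job no longer stalls for hours on the last few long-running messages: it waits for at most one task to finish before looping over the pending messages collection again.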