Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

🔧 Modify SSE Manager env var defaults, and make setting of locks configurable #999

Merged
merged 11 commits into from
Aug 29, 2024
4 changes: 4 additions & 0 deletions .vscode/settings.json
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@
"autouse",
"basicmessage",
"basicmessages",
"camelcase",
"capi",
"cloudagent",
"cloudapi",
Expand Down Expand Up @@ -59,6 +60,8 @@
"prover",
"psubscribe",
"pydantic",
"PYTHONPATH",
"reqs",
"revoc",
"rpush",
"rrid",
Expand All @@ -71,6 +74,7 @@
"starlette",
"subwallet",
"subwallets",
"teardown",
"tran",
"trustping",
"trustregistry",
Expand Down
1 change: 1 addition & 0 deletions app/tests/e2e/issuer/test_save_exchange_record.py
Original file line number Diff line number Diff line change
Expand Up @@ -198,6 +198,7 @@ async def test_get_cred_exchange_records(
},
)

await asyncio.sleep(0.5) # short sleep to allow records to update
faber_records = (await faber_client.get(CREDENTIALS_BASE_PATH)).json()

faber_cred_ex_response = (
Expand Down
6 changes: 5 additions & 1 deletion app/tests/util/webhooks.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@
from app.tests.util.sse_listener import SseListener, SseListenerTimeout
from app.util.tenants import get_wallet_id_from_b64encoded_jwt
from shared import RichAsyncClient
from shared.constants import MAX_EVENT_AGE_SECONDS
from shared.log_config import get_logger
from shared.models.webhook_events import CloudApiTopics

Expand Down Expand Up @@ -45,7 +46,10 @@ async def check_webhook_state(
attempt = 0

while not event and attempt < max_tries:
look_back_duration = min(30, look_back + attempt * max_duration)
look_back_duration = min(
MAX_EVENT_AGE_SECONDS,
look_back + attempt * max_duration,
)
try:
if filter_map:
# Assuming that filter_map contains 1 key-value pair
Expand Down
2 changes: 1 addition & 1 deletion endorser/services/endorsement_processor.py
Original file line number Diff line number Diff line change
Expand Up @@ -138,7 +138,7 @@ async def _process_endorsement_requests(self) -> NoReturn:
while True:
try:
batch_keys = self.redis_service.scan_keys(
match_pattern=f"{self.endorse_prefix}:*", count=10000
match_pattern=f"{self.endorse_prefix}:*", count=5000
)
if batch_keys:
attempts_without_events = 0 # Reset the counter
Expand Down
2 changes: 1 addition & 1 deletion scripts/k6/libs/functions.js
Original file line number Diff line number Diff line change
Expand Up @@ -91,7 +91,7 @@ export function getTrustRegistryActor(walletName) {
};

const response = http.get(url);
// console.log(`Respone: ${response}`)
// console.log(`Response: ${response}`)
if (response.status === 200) {
// Request was successful
// console.log(`Issuer found for actor_name ${walletName}`);
Expand Down
6 changes: 3 additions & 3 deletions scripts/k6/scenarios/revoke-credentials.js
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@ import {
deleteTenant,
getCredentialDefinitionId,
getWalletIdByWalletName,
revokeCredential,
revokeCredentialAutoPublish,
} from "../libs/functions.js";
import { createIssuerIfNotExists } from "../libs/issuerUtils.js";
import { createSchemaIfNotExists } from "../libs/schemaUtils.js";
Expand Down Expand Up @@ -128,9 +128,9 @@ export default function (data) {

const issuerIndex = __ITER % numIssuers;
const issuer = issuers[issuerIndex];
const revokeCredentialResponse = revokeCredentialAutoPubish(issuer.accessToken, id.credential_exchange_id);
const revokeCredentialResponse = revokeCredentialAutoPublish(issuer.accessToken, id.credential_exchange_id);
check(revokeCredentialResponse, {
"Credential revoked sucessfully": (r) => {
"Credential revoked successfully": (r) => {
if (r.status !== 200) {
throw new Error(`Unexpected response while revoking credential: ${r.response}`);
}
Expand Down
7 changes: 4 additions & 3 deletions shared/constants.py
Original file line number Diff line number Diff line change
Expand Up @@ -49,10 +49,11 @@
LEDGER_REGISTRATION_URL = os.getenv("LEDGER_REGISTRATION_URL", f"{url}:9000/register")

# Sse manager
MAX_EVENT_AGE_SECONDS = float(os.getenv("MAX_EVENT_AGE_SECONDS", "30"))
MAX_QUEUE_SIZE = int(os.getenv("MAX_QUEUE_SIZE", "200"))
QUEUE_CLEANUP_PERIOD = int(os.getenv("QUEUE_CLEANUP_PERIOD", "60"))
MAX_EVENT_AGE_SECONDS = float(os.getenv("MAX_EVENT_AGE_SECONDS", "10"))
MAX_QUEUE_SIZE = int(os.getenv("MAX_QUEUE_SIZE", "2000"))
QUEUE_CLEANUP_PERIOD = int(os.getenv("QUEUE_CLEANUP_PERIOD", "30"))
CLIENT_QUEUE_POLL_PERIOD = float(os.getenv("CLIENT_QUEUE_POLL_PERIOD", "0.2"))
SET_LOCKS = bool(os.getenv("SET_LOCKS", ""))

# Sse
SSE_TIMEOUT = int(
Expand Down
74 changes: 42 additions & 32 deletions webhooks/services/acapy_events_processor.py
Original file line number Diff line number Diff line change
@@ -1,13 +1,13 @@
import asyncio
import datetime
import sys
from typing import Any, Dict, List, NoReturn
from typing import Any, Dict, List, NoReturn, Optional
from uuid import uuid4

import orjson

from shared import APIRouter
from shared.constants import GOVERNANCE_LABEL
from shared.constants import GOVERNANCE_LABEL, SET_LOCKS
from shared.log_config import get_logger
from shared.models.endorsement import (
obfuscate_primary_data_in_payload,
Expand Down Expand Up @@ -146,7 +146,7 @@ async def _process_incoming_events(self) -> NoReturn:
while True:
try:
batch_event_keys = self.redis_service.scan_keys(
match_pattern=self.acapy_redis_prefix, count=10000
match_pattern=self.acapy_redis_prefix, count=5000
)
if batch_event_keys:
attempts_without_events = 0 # Reset the counter
Expand Down Expand Up @@ -189,40 +189,50 @@ def _attempt_process_list_events(self, list_key: str) -> None:
list_key: The Redis key of the list to process.
"""
lock_key = f"lock:{list_key}"
extend_lock_task = None

lock_duration = 2000 # milliseconds
if SET_LOCKS:
lock_duration_ms = 3000 # milliseconds

if self.redis_service.set_lock(lock_key, px=lock_duration):
try:
# Start a background task to extend the lock periodically
# This is just to ensure that on the off chance that 2000ms isn't enough to process all the
# events in the list, we want to avoid replicas processing the same webhook event twice
extend_lock_task = self.redis_service.extend_lock_task(
lock_key, interval=datetime.timedelta(milliseconds=lock_duration)
if self.redis_service.set_lock(lock_key, px=lock_duration_ms):
logger.debug("Successfully set lock key for list index: {}", lock_key)
else:
logger.debug(
"Event {} is currently being processed by another instance.",
list_key,
)
return

self._process_list_events(list_key)
except Exception as e: # pylint: disable=W0718
# if this particular event is unprocessable, we should remove it from the inputs, to avoid deadlocking
logger.error("Processing {} raised an exception: {}", list_key, e)
self._handle_unprocessable_event(list_key, e)
finally:
# Cancel the lock extension task if it's still running
if extend_lock_task:
extend_lock_task.cancel()

# Delete lock after processing list, whether it completed or errored:
if self.redis_service.delete_key(lock_key):
logger.debug("Deleted lock key: {}", lock_key)
else:
logger.warning(
"Could not delete lock key: {}. Perhaps it expired?", lock_key
)
else:
logger.debug(
"Event {} is currently being processed by another instance.", list_key
# Start a background task to extend the lock periodically
# This is just to ensure that on the off chance that 3000ms isn't enough to process all the
# events in the list, we want to avoid replicas processing the same webhook event twice
extend_lock_task = self.redis_service.extend_lock_task(
lock_key, interval=datetime.timedelta(milliseconds=lock_duration_ms)
)
else:
extend_lock_task = None

try:
self._process_list_events(list_key)
except Exception as e: # pylint: disable=W0718
# if this particular event is unprocessable, we should remove it from the inputs, to avoid deadlocking
logger.error("Processing {} raised an exception: {}", list_key, e)
self._handle_unprocessable_event(list_key, e)
finally:
if SET_LOCKS:
self._cleanup_lock(lock_key, extend_lock_task)

return

def _cleanup_lock(
self, lock_key: str, extend_lock_task: Optional[asyncio.Task]
) -> None:
if extend_lock_task:
extend_lock_task.cancel()

if not self.redis_service.delete_key(lock_key):
logger.warning("Could not delete lock: {}. Perhaps it expired?", lock_key)
else:
logger.trace("Deleted lock key: {}", lock_key)

def _process_list_events(self, list_key) -> None:
"""
Expand Down
2 changes: 1 addition & 1 deletion webhooks/services/webhooks_redis_service.py
Original file line number Diff line number Diff line change
Expand Up @@ -283,7 +283,7 @@ def get_all_cloudapi_wallet_ids(self) -> List[str]:
next_cursor, keys = self.redis.scan(
cursor=cursor,
match=f"{self.cloudapi_redis_prefix}:*",
count=10000,
count=5000,
target_nodes=RedisCluster.PRIMARIES,
)
if keys:
Expand Down
10 changes: 5 additions & 5 deletions webhooks/tests/routes/test_sse.py
Original file line number Diff line number Diff line change
Expand Up @@ -332,7 +332,7 @@ async def test_sse_event_stream_generator_wallet_id_topic_field_desired_state(

@pytest.mark.anyio
@pytest.mark.parametrize("group_id", [None, "correct_group", "wrong_group"])
@pytest.mark.parametrize("look_back", [0.0, 15.0])
@pytest.mark.parametrize("look_back", [0.0, MAX_EVENT_AGE_SECONDS / 2])
async def test_sse_subscribe_wallet(
sse_manager_mock, # pylint: disable=redefined-outer-name
group_id,
Expand Down Expand Up @@ -390,7 +390,7 @@ async def test_sse_subscribe_wallet(

@pytest.mark.anyio
@pytest.mark.parametrize("group_id", [None, "correct_group", "wrong_group"])
@pytest.mark.parametrize("look_back", [0.0, 15.0])
@pytest.mark.parametrize("look_back", [0.0, MAX_EVENT_AGE_SECONDS / 2])
async def test_sse_subscribe_wallet_topic(
sse_manager_mock, # pylint: disable=redefined-outer-name
group_id,
Expand Down Expand Up @@ -451,7 +451,7 @@ async def test_sse_subscribe_wallet_topic(

@pytest.mark.anyio
@pytest.mark.parametrize("group_id", [None, "correct_group", "wrong_group"])
@pytest.mark.parametrize("look_back", [0.0, 15.0])
@pytest.mark.parametrize("look_back", [0.0, MAX_EVENT_AGE_SECONDS / 2])
async def test_sse_subscribe_event_with_state(
sse_manager_mock, # pylint: disable=redefined-outer-name
group_id,
Expand Down Expand Up @@ -515,7 +515,7 @@ async def test_sse_subscribe_event_with_state(

@pytest.mark.anyio
@pytest.mark.parametrize("group_id", [None, "correct_group", "wrong_group"])
@pytest.mark.parametrize("look_back", [0.0, 15.0])
@pytest.mark.parametrize("look_back", [0.0, MAX_EVENT_AGE_SECONDS / 2])
async def test_sse_subscribe_stream_with_fields(
sse_manager_mock, # pylint: disable=redefined-outer-name
group_id,
Expand Down Expand Up @@ -582,7 +582,7 @@ async def test_sse_subscribe_stream_with_fields(

@pytest.mark.anyio
@pytest.mark.parametrize("group_id", [None, "correct_group", "wrong_group"])
@pytest.mark.parametrize("look_back", [0.0, 15.0])
@pytest.mark.parametrize("look_back", [0.0, MAX_EVENT_AGE_SECONDS / 2])
async def test_sse_subscribe_event_with_field_and_state(
sse_manager_mock, # pylint: disable=redefined-outer-name
group_id,
Expand Down
13 changes: 8 additions & 5 deletions webhooks/tests/services/test_acapy_events_processor.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@
from redis.client import PubSubWorkerThread
from redis.cluster import ClusterPubSub

from shared.constants import GOVERNANCE_LABEL
from shared.constants import GOVERNANCE_LABEL, SET_LOCKS
from webhooks.services.acapy_events_processor import AcaPyEventsProcessor

# pylint: disable=redefined-outer-name
Expand Down Expand Up @@ -163,11 +163,14 @@ async def test_attempt_process_list_events(acapy_events_processor_mock):

acapy_events_processor_mock._attempt_process_list_events(event_key)

acapy_events_processor_mock.redis_service.set_lock.assert_called_with(
lock_key, px=2000
)
acapy_events_processor_mock._process_list_events.assert_called_with(event_key)
acapy_events_processor_mock.redis_service.delete_key.assert_called_with(lock_key)
if SET_LOCKS:
acapy_events_processor_mock.redis_service.set_lock.assert_called_with(
lock_key, px=3000
)
acapy_events_processor_mock.redis_service.delete_key.assert_called_with(
lock_key
)


@pytest.mark.anyio
Expand Down
Loading