tests: Wait events per consumer
thibault-p authored and AymericDu committed Feb 28, 2025
1 parent d323ccc · commit c559077
Showing 1 changed file with 19 additions and 13 deletions.
tests/utils.py: 19 additions & 13 deletions
@@ -35,6 +35,7 @@
 from oio.common.http_urllib3 import get_pool_manager
 from oio.common.json import json as jsonlib
 from oio.common.kafka import DEFAULT_PRESERVED_TOPIC, KafkaConsumer
+from oio.common.logger import get_logger
 from oio.common.storage_method import STORAGE_METHODS
 from oio.event.beanstalk import Beanstalk, ResponseError
 from oio.event.evob import Event
@@ -236,6 +237,8 @@ def setUpClass(cls):
         cls._cls_ns = cls._cls_conf["namespace"]
         cls._cls_uri = "http://" + cls._cls_conf["proxy"]

+        cls._cls_logger = get_logger(cls._cls_conf)
+
         cls._consumers = []
         cls._cls_kafka_consumer = cls._register_consumer()
@@ -296,7 +299,7 @@ def setUp(self):
         self._deregister_at_teardown = []

         self._cached_events = {}
-        self._used_events = set()
+        self._used_events = {}

         for consumer in self._consumers:
             partitions = self._wait_kafka_partition_assignment(kafka_consumer=consumer)
@@ -871,35 +874,38 @@ def wait_for_event(
         if not kafka_consumer:
             kafka_consumer = self._cls_kafka_consumer

+        cached_events = self._cached_events.setdefault(kafka_consumer, {})
+        used_events = self._used_events.setdefault(kafka_consumer, set())
+
         def match_event(key, event):
             if types and event.event_type not in types:
-                logging.debug("ignore event %s (event mismatch)", event)
+                self.logger.debug("ignore event %s (event mismatch)", event)
                 return False
             if reqid and event.reqid != reqid:
-                logging.info("ignore event %s (request_id mismatch)", event)
+                self.logger.info("ignore event %s (request_id mismatch)", event)
                 return False
             if svcid and event.svcid != svcid:
-                logging.info("ignore event %s (service_id mismatch)", event)
+                self.logger.info("ignore event %s (service_id mismatch)", event)
                 return False
             if fields and any(fields[k] != event.url.get(k) for k in fields):
-                logging.info("ignore event %s (filter mismatch)", event)
+                self.logger.info("ignore event %s (filter mismatch)", event)
                 return False
             if origin and event.origin != origin:
-                logging.info("ignore event %s (origin mismatch)", event)
+                self.logger.info("ignore event %s (origin mismatch)", event)
                 return False
             if data_fields and any(
                 data_fields[k] != event.data.get(k) for k in data_fields
             ):
-                logging.info("ignore event %s (data_fields mismatch)", event)
+                self.logger.info("ignore event %s (data_fields mismatch)", event)
                 return False

-            logging.info("event %s", event)
-            self._used_events.add(key)
+            self.logger.info("event %s", event)
+            used_events.add(key)
             return True

         # Check if event is already present
-        for key, event in self._cached_events.items():
-            if key in self._used_events:
+        for key, event in cached_events.items():
+            if key in used_events:
                 continue
             if match_event(key, event):
                 return event
@@ -921,12 +927,12 @@ def match_event(key, event):
             event_obj.job_id = event.offset()

             # Add to cache
-            self._cached_events[event_key] = event_obj
+            cached_events[event_key] = event_obj

             if match_event(event_key, event_obj):
                 return event_obj

-        logging.warning(
+        self._cls_logger.warning(
             "wait_for_kafka_event(reqid=%s, types=%s, svcid=%s, fields=%s,"
             " timeout=%s) reached its timeout",
             reqid,
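The heart of the change is in wait_for_event: instead of one shared cache dict and one shared "used" set, each Kafka consumer lazily gets its own pair via dict.setdefault. Below is a minimal, self-contained sketch of that pattern; EventWaiter, remember() and take_first_match() are illustrative names, not part of the oio test utilities.

# Sketch of the per-consumer caching pattern applied by this commit.
# Names here are hypothetical; only the setdefault pattern mirrors the diff.

class EventWaiter:
    def __init__(self):
        # One event cache and one "already matched" key set per consumer,
        # created lazily, so tests driving several consumers do not
        # consume each other's events.
        self._cached_events = {}  # consumer -> {event_key: event}
        self._used_events = {}    # consumer -> set of event_keys

    def remember(self, consumer, key, event):
        self._cached_events.setdefault(consumer, {})[key] = event

    def take_first_match(self, consumer, predicate):
        cached = self._cached_events.setdefault(consumer, {})
        used = self._used_events.setdefault(consumer, set())
        for key, event in cached.items():
            if key in used:
                continue  # already returned by an earlier wait
            if predicate(event):
                used.add(key)  # never hand out the same event twice
                return event
        return None

waiter = EventWaiter()
waiter.remember("consumer-a", ("topic", 0, 1), {"event": "storage.content.new"})
waiter.remember("consumer-b", ("topic", 0, 1), {"event": "storage.content.new"})
# Each consumer sees, and uses up, only its own cached events:
assert waiter.take_first_match("consumer-a", lambda ev: True) is not None
assert waiter.take_first_match("consumer-a", lambda ev: True) is None
assert waiter.take_first_match("consumer-b", lambda ev: True) is not None

With the previous single shared set, an event key such as ("topic", 0, 1) marked as used through one consumer would have hidden the other consumer's event carrying the same key, which is presumably what the per-consumer split fixes.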