Skip to content

Commit 82bbb43

Browse files
lynnagaraandrewshie-sentry
authored andcommitted
support routing stale messages to lowpri topic (#82322)
The Sentry consumer configuration now supports an optional `stale_topic`. If it is set, all invalid messages rejected with the reason "stale" are routed to the stale topic. Raising stale-message exceptions is configured for the transactions consumer.
1 parent dfaa51b commit 82bbb43

File tree

6 files changed

+258
-19
lines changed

6 files changed

+258
-19
lines changed

src/sentry/conf/types/kafka_definition.py

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -86,6 +86,8 @@ class ConsumerDefinition(TypedDict, total=False):
8686
dlq_max_invalid_ratio: float | None
8787
dlq_max_consecutive_count: int | None
8888

89+
stale_topic: Topic
90+
8991

9092
def validate_consumer_definition(consumer_definition: ConsumerDefinition) -> None:
9193
if "dlq_topic" not in consumer_definition and (

src/sentry/consumers/__init__.py

Lines changed: 23 additions & 19 deletions
Original file line numberDiff line numberDiff line change
@@ -6,11 +6,10 @@
66

77
import click
88
from arroyo.backends.abstract import Consumer
9-
from arroyo.backends.kafka import KafkaProducer
109
from arroyo.backends.kafka.configuration import build_kafka_consumer_configuration
1110
from arroyo.backends.kafka.consumer import KafkaConsumer
1211
from arroyo.commit import ONCE_PER_SECOND
13-
from arroyo.dlq import DlqLimit, DlqPolicy, KafkaDlqProducer
12+
from arroyo.dlq import DlqLimit, DlqPolicy
1413
from arroyo.processing.processor import StreamProcessor
1514
from arroyo.processing.strategies import Healthcheck
1615
from arroyo.processing.strategies.abstract import ProcessingStrategy, ProcessingStrategyFactory
@@ -22,11 +21,12 @@
2221
Topic,
2322
validate_consumer_definition,
2423
)
24+
from sentry.consumers.dlq import DlqStaleMessagesStrategyFactoryWrapper, maybe_build_dlq_producer
2525
from sentry.consumers.validate_schema import ValidateSchema
2626
from sentry.eventstream.types import EventStreamEventType
2727
from sentry.ingest.types import ConsumerType
2828
from sentry.utils.imports import import_string
29-
from sentry.utils.kafka_config import get_kafka_producer_cluster_options, get_topic_definition
29+
from sentry.utils.kafka_config import get_topic_definition
3030

3131
logger = logging.getLogger(__name__)
3232

@@ -371,6 +371,7 @@ def ingest_transactions_options() -> list[click.Option]:
371371
"strategy_factory": "sentry.ingest.consumer.factory.IngestTransactionsStrategyFactory",
372372
"click_options": ingest_transactions_options(),
373373
"dlq_topic": Topic.INGEST_TRANSACTIONS_DLQ,
374+
"stale_topic": Topic.INGEST_TRANSACTIONS_DLQ,
374375
},
375376
"ingest-metrics": {
376377
"topic": Topic.INGEST_METRICS,
@@ -469,6 +470,8 @@ def get_stream_processor(
469470
synchronize_commit_group: str | None = None,
470471
healthcheck_file_path: str | None = None,
471472
enable_dlq: bool = True,
473+
# If set, messages above this age will be rerouted to the stale topic if one is configured
474+
stale_threshold_sec: int | None = None,
472475
enforce_schema: bool = False,
473476
group_instance_id: str | None = None,
474477
) -> StreamProcessor:
@@ -578,37 +581,38 @@ def build_consumer_config(group_id: str):
578581
consumer_topic.value, enforce_schema, strategy_factory
579582
)
580583

584+
if stale_threshold_sec:
585+
strategy_factory = DlqStaleMessagesStrategyFactoryWrapper(
586+
stale_threshold_sec, strategy_factory
587+
)
588+
581589
if healthcheck_file_path is not None:
582590
strategy_factory = HealthcheckStrategyFactoryWrapper(
583591
healthcheck_file_path, strategy_factory
584592
)
585593

586594
if enable_dlq and consumer_definition.get("dlq_topic"):
587-
try:
588-
dlq_topic = consumer_definition["dlq_topic"]
589-
except KeyError as e:
590-
raise click.BadParameter(
591-
f"Cannot enable DLQ for consumer: {consumer_name}, no DLQ topic has been defined for it"
592-
) from e
593-
try:
594-
dlq_topic_defn = get_topic_definition(dlq_topic)
595-
cluster_setting = dlq_topic_defn["cluster"]
596-
except ValueError as e:
597-
raise click.BadParameter(
598-
f"Cannot enable DLQ for consumer: {consumer_name}, DLQ topic {dlq_topic} is not configured in this environment"
599-
) from e
595+
dlq_topic = consumer_definition["dlq_topic"]
596+
else:
597+
dlq_topic = None
598+
599+
if stale_threshold_sec and consumer_definition.get("stale_topic"):
600+
stale_topic = consumer_definition["stale_topic"]
601+
else:
602+
stale_topic = None
600603

601-
producer_config = get_kafka_producer_cluster_options(cluster_setting)
602-
dlq_producer = KafkaProducer(producer_config)
604+
dlq_producer = maybe_build_dlq_producer(dlq_topic=dlq_topic, stale_topic=stale_topic)
603605

606+
if dlq_producer:
604607
dlq_policy = DlqPolicy(
605-
KafkaDlqProducer(dlq_producer, ArroyoTopic(dlq_topic_defn["real_topic_name"])),
608+
dlq_producer,
606609
DlqLimit(
607610
max_invalid_ratio=consumer_definition.get("dlq_max_invalid_ratio"),
608611
max_consecutive_count=consumer_definition.get("dlq_max_consecutive_count"),
609612
),
610613
None,
611614
)
615+
612616
else:
613617
dlq_policy = None
614618

src/sentry/consumers/dlq.py

Lines changed: 154 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,154 @@
1+
import logging
2+
import time
3+
from collections.abc import Mapping, MutableMapping
4+
from concurrent.futures import Future
5+
from datetime import datetime, timedelta, timezone
6+
from enum import Enum
7+
8+
from arroyo.backends.kafka import KafkaPayload, KafkaProducer
9+
from arroyo.dlq import InvalidMessage, KafkaDlqProducer
10+
from arroyo.processing.strategies.abstract import ProcessingStrategy, ProcessingStrategyFactory
11+
from arroyo.types import FILTERED_PAYLOAD, BrokerValue, Commit, FilteredPayload, Message, Partition
12+
from arroyo.types import Topic as ArroyoTopic
13+
from arroyo.types import Value
14+
15+
from sentry.conf.types.kafka_definition import Topic
16+
from sentry.utils.kafka_config import get_kafka_producer_cluster_options, get_topic_definition
17+
18+
logger = logging.getLogger(__name__)
19+
20+
21+
class RejectReason(Enum):
    """Reasons a message can be rejected; each value is the reason string attached to a rejection."""

    # Raised by the stale-message strategy when a message's timestamp is too old.
    STALE = "stale"
    # Used as the default when a rejection carries no reason.
    INVALID = "invalid"
24+
25+
26+
class MultipleDestinationDlqProducer(KafkaDlqProducer):
    """
    Produces to either the DLQ or stale message topic depending on the reason.

    ``producers`` maps each ``RejectReason`` to the producer that should receive
    messages rejected for that reason, or to ``None`` when no destination is
    configured for it.
    """

    def __init__(
        self,
        producers: Mapping[RejectReason, KafkaDlqProducer | None],
    ) -> None:
        # The base-class initializer is not called; this object only delegates
        # to the producers held in the mapping.
        self.producers = producers

    def produce(
        self,
        value: BrokerValue[KafkaPayload],
        reason: str | None = None,
    ) -> Future[BrokerValue[KafkaPayload]]:
        """
        Route ``value`` to the producer registered for ``reason``.

        A missing reason, or a reason string that is not a ``RejectReason``
        value, is treated as ``RejectReason.INVALID`` so the message still
        goes to the general DLQ instead of raising ``ValueError`` here.

        Returns the future from the selected producer. If no producer is
        configured for the reason, an error is logged and an already-resolved
        future holding ``value`` is returned.
        """
        try:
            reject_reason = RejectReason(reason) if reason else RejectReason.INVALID
        except ValueError:
            # Unrecognised reason string: fall back to the general DLQ.
            reject_reason = RejectReason.INVALID

        producer = self.producers.get(reject_reason)

        if producer:
            return producer.produce(value)

        # No DLQ producer configured for the reason.
        logger.error("No DLQ producer configured for reason %s", reason)
        future: Future[BrokerValue[KafkaPayload]] = Future()
        future.set_running_or_notify_cancel()
        future.set_result(value)
        return future
55+
56+
57+
def _get_dlq_producer(topic: Topic | None) -> KafkaDlqProducer | None:
58+
if topic is None:
59+
return None
60+
61+
topic_defn = get_topic_definition(topic)
62+
config = get_kafka_producer_cluster_options(topic_defn["cluster"])
63+
real_topic = topic_defn["real_topic_name"]
64+
return KafkaDlqProducer(KafkaProducer(config), ArroyoTopic(real_topic))
65+
66+
67+
def maybe_build_dlq_producer(
68+
dlq_topic: Topic | None,
69+
stale_topic: Topic | None,
70+
) -> MultipleDestinationDlqProducer | None:
71+
if dlq_topic is None and stale_topic is None:
72+
return None
73+
74+
producers = {
75+
RejectReason.INVALID: _get_dlq_producer(dlq_topic),
76+
RejectReason.STALE: _get_dlq_producer(stale_topic),
77+
}
78+
79+
return MultipleDestinationDlqProducer(producers)
80+
81+
82+
class DlqStaleMessages(ProcessingStrategy[KafkaPayload]):
    """
    Rejects messages whose broker timestamp is older than ``stale_threshold_sec``
    and forwards every other message to ``next_step``.

    A stale message is rejected by raising ``InvalidMessage`` with the "stale"
    reason. The next offset of each rejected message is remembered per
    partition and handed to ``next_step`` from ``poll`` as a filtered message.
    """

    def __init__(
        self,
        stale_threshold_sec: int,
        next_step: ProcessingStrategy[KafkaPayload | FilteredPayload],
    ) -> None:
        self.stale_threshold_sec = stale_threshold_sec
        self.next_step = next_step

        # A filtered message is created so we commit periodically if all are stale.
        self.last_forwarded_offsets = time.time()
        self.offsets_to_forward: MutableMapping[Partition, int] = {}

    def submit(self, message: Message[KafkaPayload]) -> None:
        """
        Forward ``message`` to the next step unless it is stale.

        Raises:
            InvalidMessage: with reason "stale" when the message carries a
                ``BrokerValue`` whose timestamp is older than the threshold.
        """
        min_accepted_timestamp = datetime.now(timezone.utc) - timedelta(
            seconds=self.stale_threshold_sec
        )

        if isinstance(message.value, BrokerValue):
            if message.value.timestamp < min_accepted_timestamp:
                self.offsets_to_forward[message.value.partition] = message.value.next_offset
                raise InvalidMessage(
                    message.value.partition, message.value.offset, reason=RejectReason.STALE.value
                )

        # If we get a valid message for a partition later, don't emit a filtered message for it.
        # The partition may have no pending stale offset (for example when only a
        # different partition has one), so pop with a default to avoid a KeyError.
        if self.offsets_to_forward:
            for partition in message.committable:
                self.offsets_to_forward.pop(partition, None)

        self.next_step.submit(message)

    def poll(self) -> None:
        """Poll the next step and, at most about once per second, forward pending stale offsets."""
        self.next_step.poll()

        # Ensure we commit frequently even if all messages are invalid
        if self.offsets_to_forward:
            if time.time() > self.last_forwarded_offsets + 1:
                filtered_message = Message(Value(FILTERED_PAYLOAD, self.offsets_to_forward))
                self.next_step.submit(filtered_message)
                # Only reset once the submit above has succeeded.
                self.offsets_to_forward = {}
                self.last_forwarded_offsets = time.time()

    def join(self, timeout: float | None = None) -> None:
        """Delegate to the next step."""
        self.next_step.join(timeout)

    def close(self) -> None:
        """Delegate to the next step."""
        self.next_step.close()

    def terminate(self) -> None:
        """Delegate to the next step."""
        self.next_step.terminate()
133+
134+
135+
class DlqStaleMessagesStrategyFactoryWrapper(ProcessingStrategyFactory[KafkaPayload]):
    """
    Factory wrapper that places a ``DlqStaleMessages`` step in front of the
    strategy built by the wrapped factory.

    Messages with a stale timestamp are rejected before they reach the rest of
    the pipeline. The ``InvalidMessage`` carries the "stale" reason so it can
    be routed to a separate stale topic.
    """

    def __init__(
        self,
        stale_threshold_sec: int,
        inner: ProcessingStrategyFactory[KafkaPayload | FilteredPayload],
    ) -> None:
        self.inner = inner
        self.stale_threshold_sec = stale_threshold_sec

    def create_with_partitions(
        self, commit: Commit, partitions: Mapping[Partition, int]
    ) -> ProcessingStrategy[KafkaPayload]:
        """Build the wrapped strategy and put the stale-message check ahead of it."""
        downstream = self.inner.create_with_partitions(commit, partitions)
        return DlqStaleMessages(self.stale_threshold_sec, downstream)

src/sentry/runner/commands/run.py

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -416,6 +416,11 @@ def cron(**options: Any) -> None:
416416
is_flag=True,
417417
default=True,
418418
)
419+
@click.option(
420+
"--stale-threshold-sec",
421+
type=click.IntRange(min=300),
422+
help="Routes stale messages to stale topic if provided. This feature is currently being tested, do not pass in production yet.",
423+
)
419424
@click.option(
420425
"--log-level",
421426
type=click.Choice(["debug", "info", "warning", "error", "critical"], case_sensitive=False),
@@ -500,6 +505,7 @@ def dev_consumer(consumer_names: tuple[str, ...]) -> None:
500505
synchronize_commit_group=None,
501506
synchronize_commit_log_topic=None,
502507
enable_dlq=False,
508+
stale_threshold_sec=None,
503509
healthcheck_file_path=None,
504510
enforce_schema=True,
505511
)

tests/sentry/consumers/__init__.py

Whitespace-only changes.

tests/sentry/consumers/test_dlq.py

Lines changed: 73 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,73 @@
1+
import time
2+
from datetime import datetime, timedelta, timezone
3+
from unittest.mock import Mock
4+
5+
import msgpack
6+
import pytest
7+
from arroyo.backends.kafka import KafkaPayload
8+
from arroyo.dlq import InvalidMessage
9+
from arroyo.types import BrokerValue, Message, Partition, Topic
10+
11+
from sentry.consumers.dlq import DlqStaleMessagesStrategyFactoryWrapper
12+
from sentry.testutils.pytest.fixtures import django_db_all
13+
14+
15+
def make_message(
    payload: bytes, partition: Partition, offset: int, timestamp: datetime | None = None
) -> Message:
    """
    Build a broker message wrapping ``payload`` at ``partition``/``offset``.

    When ``timestamp`` is omitted the current time in UTC is used. It must be
    timezone-aware because the strategy under test compares it against an
    aware UTC datetime, and comparing naive with aware raises ``TypeError``.
    """
    if timestamp is None:
        timestamp = datetime.now(timezone.utc)

    return Message(
        BrokerValue(
            KafkaPayload(None, payload, []),
            partition,
            offset,
            timestamp,
        )
    )
26+
27+
28+
@pytest.mark.parametrize("stale_threshold_sec", [300])
@django_db_all
def test_dlq_stale_messages(factories, stale_threshold_sec) -> None:
    """Messages aged 5 minutes or more are rejected as stale; newer ones reach the inner strategy."""
    org = factories.create_organization()
    proj = factories.create_project(organization=org)

    raw_event = {
        "type": "event",
        "project_id": proj.id,
        "payload": b"{}",
        "start_time": int(time.time()),
        "event_id": "aaa",
    }
    empty_event_payload = msgpack.packb(raw_event)

    partition = Partition(Topic("topic"), 0)
    base_offset = 10

    # The wrapped factory always hands back the same mock strategy so that
    # forwarded submissions can be counted at the end.
    inner_strategy_mock = Mock()
    inner_factory_mock = Mock()
    inner_factory_mock.create_with_partitions = Mock(return_value=inner_strategy_mock)

    strategy = DlqStaleMessagesStrategyFactoryWrapper(
        stale_threshold_sec=stale_threshold_sec,
        inner=inner_factory_mock,
    ).create_with_partitions(Mock(), Mock())

    # Submit messages from 10 minutes old down to 1 minute old.
    for minutes_old in range(10, 0, -1):
        message_offset = base_offset - minutes_old
        message = make_message(
            empty_event_payload,
            partition,
            message_offset,
            timestamp=datetime.now(timezone.utc) - timedelta(minutes=minutes_old),
        )

        if minutes_old >= 5:
            with pytest.raises(InvalidMessage) as exc_info:
                strategy.submit(message)

            assert exc_info.value.partition == partition
            assert exc_info.value.offset == message_offset
        else:
            strategy.submit(message)

    # Only the messages aged 4, 3, 2 and 1 minutes are forwarded.
    assert inner_strategy_mock.submit.call_count == 4

0 commit comments

Comments
 (0)