
Commit 13b57c4

support routing stale messages to lowpri topic
The Sentry consumer configuration now supports an optional `stale_topic`. If it is set, invalid messages rejected with reason "stale" are routed to the stale topic rather than the regular DLQ. Raising stale-message exceptions is configured for the transactions consumer.
Parent: da1e5af
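
To make the behavior concrete, the check that decides whether a message gets rerouted can be sketched as follows (a minimal illustration, not code from this commit): a message counts as stale when its broker timestamp is older than the configured threshold.

from datetime import datetime, timedelta, timezone


def is_stale(message_timestamp: datetime, stale_threshold_sec: int) -> bool:
    # Mirrors the per-message check performed by the new DlqStaleMessages strategy.
    min_accepted = datetime.now(timezone.utc) - timedelta(seconds=stale_threshold_sec)
    return message_timestamp.astimezone(timezone.utc) < min_accepted


# With a 300 second threshold, an hour-old message would be rerouted.
print(is_stale(datetime.now(timezone.utc) - timedelta(hours=1), 300))  # True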

3 files changed: +149, -19 lines

src/sentry/consumers/__init__.py (22 additions, 19 deletions)
@@ -6,11 +6,10 @@
 
 import click
 from arroyo.backends.abstract import Consumer
-from arroyo.backends.kafka import KafkaProducer
 from arroyo.backends.kafka.configuration import build_kafka_consumer_configuration
 from arroyo.backends.kafka.consumer import KafkaConsumer
 from arroyo.commit import ONCE_PER_SECOND
-from arroyo.dlq import DlqLimit, DlqPolicy, KafkaDlqProducer
+from arroyo.dlq import DlqLimit, DlqPolicy
 from arroyo.processing.processor import StreamProcessor
 from arroyo.processing.strategies import Healthcheck
 from arroyo.processing.strategies.abstract import ProcessingStrategy, ProcessingStrategyFactory
@@ -22,11 +21,12 @@
     Topic,
     validate_consumer_definition,
 )
+from sentry.consumers.dlq import DlqStaleMessagesStrategyFactoryWrapper, build_dlq_producer
 from sentry.consumers.validate_schema import ValidateSchema
 from sentry.eventstream.types import EventStreamEventType
 from sentry.ingest.types import ConsumerType
 from sentry.utils.imports import import_string
-from sentry.utils.kafka_config import get_kafka_producer_cluster_options, get_topic_definition
+from sentry.utils.kafka_config import get_topic_definition
 
 logger = logging.getLogger(__name__)
 
@@ -339,6 +339,7 @@ def ingest_transactions_options() -> list[click.Option]:
         "strategy_factory": "sentry.ingest.consumer.factory.IngestTransactionsStrategyFactory",
         "click_options": ingest_transactions_options(),
         "dlq_topic": Topic.INGEST_TRANSACTIONS_DLQ,
+        "stale_topic": Topic.INGEST_TRANSACTIONS_DLQ,
     },
     "ingest-metrics": {
         "topic": Topic.INGEST_METRICS,
@@ -437,6 +438,8 @@ def get_stream_processor(
     synchronize_commit_group: str | None = None,
     healthcheck_file_path: str | None = None,
     enable_dlq: bool = True,
+    # If set, messages above this age will be rerouted to the stale topic if one is configured
+    stale_threshold_sec: int | None = None,
     enforce_schema: bool = False,
     group_instance_id: str | None = None,
 ) -> StreamProcessor:
@@ -546,31 +549,31 @@ def build_consumer_config(group_id: str):
         consumer_topic.value, enforce_schema, strategy_factory
     )
 
+    if stale_threshold_sec:
+        strategy_factory = DlqStaleMessagesStrategyFactoryWrapper(
+            stale_threshold_sec, strategy_factory
+        )
+
     if healthcheck_file_path is not None:
         strategy_factory = HealthcheckStrategyFactoryWrapper(
             healthcheck_file_path, strategy_factory
        )
 
     if enable_dlq and consumer_definition.get("dlq_topic"):
-        try:
-            dlq_topic = consumer_definition["dlq_topic"]
-        except KeyError as e:
-            raise click.BadParameter(
-                f"Cannot enable DLQ for consumer: {consumer_name}, no DLQ topic has been defined for it"
-            ) from e
-        try:
-            dlq_topic_defn = get_topic_definition(dlq_topic)
-            cluster_setting = dlq_topic_defn["cluster"]
-        except ValueError as e:
-            raise click.BadParameter(
-                f"Cannot enable DLQ for consumer: {consumer_name}, DLQ topic {dlq_topic} is not configured in this environment"
-            ) from e
+        dlq_topic = consumer_definition["dlq_topic"]
+    else:
+        dlq_topic = None
+
+    if stale_threshold_sec and consumer_definition.get("stale_topic"):
+        stale_topic = consumer_definition["stale_topic"]
+    else:
+        stale_topic = None
 
-        producer_config = get_kafka_producer_cluster_options(cluster_setting)
-        dlq_producer = KafkaProducer(producer_config)
+    if enable_dlq or stale_threshold_sec:
+        dlq_producer = build_dlq_producer(dlq_topic=dlq_topic, stale_topic=stale_topic)
 
         dlq_policy = DlqPolicy(
-            KafkaDlqProducer(dlq_producer, ArroyoTopic(dlq_topic_defn["real_topic_name"])),
+            dlq_producer,
            DlqLimit(
                max_invalid_ratio=consumer_definition.get("dlq_max_invalid_ratio"),
                max_consecutive_count=consumer_definition.get("dlq_max_consecutive_count"),

src/sentry/consumers/dlq.py (new file, 121 additions)

import time
from collections.abc import Mapping, MutableMapping
from concurrent.futures import Future
from datetime import datetime, timedelta, timezone
from enum import Enum

from arroyo.backends.kafka import KafkaPayload, KafkaProducer
from arroyo.dlq import InvalidMessage, KafkaDlqProducer
from arroyo.processing.strategies.abstract import ProcessingStrategy, ProcessingStrategyFactory
from arroyo.types import FILTERED_PAYLOAD, BrokerValue, Message, Partition
from arroyo.types import Topic as ArroyoTopic
from arroyo.types import Value

from sentry.conf.types.kafka_definition import Topic
from sentry.utils.kafka_config import get_kafka_producer_cluster_options, get_topic_definition


class RejectReason(Enum):
    STALE = "stale"
    INVALID = "invalid"


class MultipleDestinationDlqProducer(KafkaDlqProducer):
    """
    Produces to either the DLQ or stale message topic depending on the reason.
    """

    def __init__(self, producers: Mapping[RejectReason, KafkaDlqProducer | None]) -> None:
        self.producers = producers

    def produce(
        self, value: BrokerValue[KafkaPayload], reason: str | None = None
    ) -> Future[BrokerValue[KafkaPayload]]:
        # Stale rejections go to the stale topic; everything else goes to the DLQ.
        key = RejectReason.STALE if reason == RejectReason.STALE.value else RejectReason.INVALID
        producer = self.producers[key]
        assert producer is not None, f"No DLQ producer configured for reason: {reason}"
        return producer.produce(value)


def _get_dlq_producer(topic: Topic | None) -> KafkaDlqProducer | None:
    if topic is None:
        return None

    topic_defn = get_topic_definition(topic)
    config = get_kafka_producer_cluster_options(topic_defn["cluster"])
    real_topic = topic_defn["real_topic_name"]
    return KafkaDlqProducer(KafkaProducer(config), ArroyoTopic(real_topic))


def build_dlq_producer(
    dlq_topic: Topic | None, stale_topic: Topic | None
) -> MultipleDestinationDlqProducer | None:
    if dlq_topic is None and stale_topic is None:
        return None

    producers = {
        RejectReason.INVALID: _get_dlq_producer(dlq_topic),
        RejectReason.STALE: _get_dlq_producer(stale_topic),
    }

    return MultipleDestinationDlqProducer(producers)


class DlqStaleMessages(ProcessingStrategy):
    def __init__(
        self, stale_threshold_sec: int, next_step: ProcessingStrategy[KafkaPayload]
    ) -> None:
        self.stale_threshold_sec = stale_threshold_sec
        self.next_step = next_step

        # A filtered message is forwarded periodically so offsets still get
        # committed even if every incoming message is stale.
        self.last_forwarded_offsets = time.time()
        self.offsets_to_forward: MutableMapping[Partition, int] = {}

    def submit(self, message: Message[KafkaPayload]) -> None:
        min_accepted_timestamp = datetime.now(timezone.utc) - timedelta(
            seconds=self.stale_threshold_sec
        )

        if isinstance(message.value, BrokerValue):
            message_timestamp = message.value.timestamp.astimezone(timezone.utc)
            if message_timestamp < min_accepted_timestamp:
                # Record the offset so it can still be committed, then reject the
                # message with the "stale" reason so the DLQ policy reroutes it.
                self.offsets_to_forward[message.value.partition] = message.value.next_offset
                raise InvalidMessage(
                    message.value.partition, message.value.offset, reason=RejectReason.STALE.value
                )

        self.next_step.submit(message)

    def poll(self) -> None:
        self.next_step.poll()

        # Periodically forward a filtered message carrying the stale offsets so
        # they are committed even when no valid messages arrive.
        if self.offsets_to_forward and time.time() > self.last_forwarded_offsets + 1:
            filtered_message = Message(Value(FILTERED_PAYLOAD, self.offsets_to_forward))
            self.offsets_to_forward = {}
            self.last_forwarded_offsets = time.time()
            self.next_step.submit(filtered_message)

    def join(self, timeout: float | None = None) -> None:
        self.next_step.join(timeout)

    def close(self) -> None:
        self.next_step.close()

    def terminate(self) -> None:
        self.next_step.terminate()


class DlqStaleMessagesStrategyFactoryWrapper(ProcessingStrategyFactory):
    """
    Wrapper used to DLQ messages with a stale timestamp before they are passed to
    the rest of the pipeline. The InvalidMessage is raised with a "stale" reason
    so it can be routed to a separate stale topic.
    """

    def __init__(self, stale_threshold_sec: int, inner: ProcessingStrategyFactory) -> None:
        self.stale_threshold_sec = stale_threshold_sec
        self.inner = inner

    def create_with_partitions(self, commit, partitions) -> ProcessingStrategy:
        rv = self.inner.create_with_partitions(commit, partitions)
        return DlqStaleMessages(self.stale_threshold_sec, rv)

src/sentry/runner/commands/run.py (6 additions)

@@ -416,6 +416,11 @@ def cron(**options: Any) -> None:
     is_flag=True,
     default=True,
 )
+@click.option(
+    "--stale-threshold-sec",
+    type=click.IntRange(min=300),
+    help="Routes stale messages to stale topic if provided.",
+)
 @click.option(
     "--log-level",
     type=click.Choice(["debug", "info", "warning", "error", "critical"], case_sensitive=False),
@@ -500,6 +505,7 @@ def dev_consumer(consumer_names: tuple[str, ...]) -> None:
         synchronize_commit_group=None,
         synchronize_commit_log_topic=None,
         enable_dlq=False,
+        stale_threshold_sec=None,
         healthcheck_file_path=None,
         enforce_schema=True,
     )
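
The new flag is validated at parse time: click.IntRange(min=300) rejects thresholds under five minutes. A small illustration of that constraint (not part of the commit):

import click

threshold = click.IntRange(min=300)

print(threshold.convert("600", None, None))  # 600

try:
    threshold.convert("60", None, None)  # below the minimum, rejected by click
except click.UsageError as exc:
    print("rejected:", exc)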
