1
+ import logging
1
2
import time
2
- from collections .abc import Callable , Mapping , MutableMapping
3
+ from collections .abc import Mapping , MutableMapping
3
4
from concurrent .futures import Future
4
5
from datetime import datetime , timedelta , timezone
5
6
from enum import Enum
6
7
7
8
from arroyo .backends .kafka import KafkaPayload , KafkaProducer
8
9
from arroyo .dlq import InvalidMessage , KafkaDlqProducer
9
10
from arroyo .processing .strategies .abstract import ProcessingStrategy , ProcessingStrategyFactory
10
- from arroyo .types import FILTERED_PAYLOAD , BrokerValue , Message , Partition
11
+ from arroyo .types import FILTERED_PAYLOAD , BrokerValue , Commit , FilteredPayload , Message , Partition
11
12
from arroyo .types import Topic as ArroyoTopic
12
13
from arroyo .types import Value
13
14
14
15
from sentry .conf .types .kafka_definition import Topic
15
16
from sentry .utils .kafka_config import get_kafka_producer_cluster_options , get_topic_definition
16
17
18
+ logger = logging .getLogger (__name__ )
19
+
17
20
18
21
class RejectReason (Enum ):
19
22
STALE = "stale"
@@ -27,16 +30,30 @@ class MultipleDestinationDlqProducer(KafkaDlqProducer):
27
30
28
31
def __init__ (
29
32
self ,
30
- producers : Mapping [RejectReason , KafkaDlqProducer ],
31
- topic_selector : Callable [[BrokerValue [KafkaPayload ], str ], RejectReason ],
33
+ producers : Mapping [RejectReason , KafkaDlqProducer | None ],
32
34
) -> None :
33
35
self .producers = producers
34
- self .topic_selector = topic_selector
35
36
36
37
def produce (
37
- self , value : BrokerValue [KafkaPayload ], reason : str
38
+ self ,
39
+ value : BrokerValue [KafkaPayload ],
40
+ reason : str | None = None ,
38
41
) -> Future [BrokerValue [KafkaPayload ]]:
39
- return self .producers [self .topic_selector (value , reason )].produce (value )
42
+ try :
43
+ reject_reason = RejectReason (reason )
44
+ producer = self .producers .get (reject_reason )
45
+ except ValueError :
46
+ producer = None
47
+
48
+ if producer :
49
+ return producer .produce (value )
50
+ else :
51
+ # No DLQ producer configured for the reason.
52
+ logger .error ("No DLQ producer configured for reason %s" , reason )
53
+ future : Future [BrokerValue [KafkaPayload ]] = Future ()
54
+ future .set_running_or_notify_cancel ()
55
+ future .set_result (value )
56
+ return future
40
57
41
58
42
59
def _get_dlq_producer (topic : Topic | None ) -> KafkaDlqProducer | None :
@@ -50,7 +67,8 @@ def _get_dlq_producer(topic: Topic | None) -> KafkaDlqProducer | None:
50
67
51
68
52
69
def build_dlq_producer (
53
- dlq_topic : Topic | None , stale_topic : Topic | None
70
+ dlq_topic : Topic | None ,
71
+ stale_topic : Topic | None ,
54
72
) -> MultipleDestinationDlqProducer | None :
55
73
if dlq_topic is None and stale_topic is None :
56
74
return None
@@ -63,9 +81,11 @@ def build_dlq_producer(
63
81
return MultipleDestinationDlqProducer (producers )
64
82
65
83
66
- class DlqStaleMessages (ProcessingStrategy ):
84
+ class DlqStaleMessages (ProcessingStrategy [ KafkaPayload ] ):
67
85
def __init__ (
68
- self , stale_threshold_sec : int , next_step : ProcessingStrategy [KafkaPayload ]
86
+ self ,
87
+ stale_threshold_sec : int ,
88
+ next_step : ProcessingStrategy [KafkaPayload | FilteredPayload ],
69
89
) -> None :
70
90
self .stale_threshold_sec = stale_threshold_sec
71
91
self .next_step = next_step
@@ -80,17 +100,18 @@ def submit(self, message: Message[KafkaPayload]) -> None:
80
100
)
81
101
82
102
if isinstance (message .value , BrokerValue ):
83
- message_timestamp = message .timestamp .astimezone (timezone .utc )
84
- if message_timestamp < min_accepted_timestamp :
85
- self .offsets_to_forward [message .value .partition , message .value .next_offset ]
103
+ if message .value .timestamp < min_accepted_timestamp :
104
+ self .offsets_to_forward [message .value .partition ] = message .value .next_offset
86
105
raise InvalidMessage (
87
- message .value .partition , message .value .offset , RejectReason .STALE .value
106
+ message .value .partition , message .value .offset , reason = RejectReason .STALE .value
88
107
)
89
108
90
109
if self .offsets_to_forward and time .time () > self .last_forwarded_offsets + 1 :
91
- message = Message (Value (FILTERED_PAYLOAD ) , self .offsets_to_forward )
110
+ filtered_message = Message (Value (FILTERED_PAYLOAD , self .offsets_to_forward ) )
92
111
self .offsets_to_forward = {}
93
- self .next_step .submit (message )
112
+ self .next_step .submit (filtered_message )
113
+
114
+ self .next_step .submit (message )
94
115
95
116
    def poll(self) -> None:
        """Forward the periodic poll tick to the wrapped strategy."""
        self.next_step.poll()
@@ -105,17 +126,23 @@ def terminate(self) -> None:
105
126
self .next_step .terminate ()
106
127
107
128
108
class DlqStaleMessagesStrategyFactoryWrapper(ProcessingStrategyFactory[KafkaPayload]):
    """Factory wrapper that DLQs stale messages before the wrapped pipeline.

    A message with a stale timestamp is rejected via ``InvalidMessage`` with
    the "stale" reason so it can be routed to a separate stale topic instead
    of being processed by the rest of the pipeline.
    """

    def __init__(
        self,
        stale_threshold_sec: int,
        inner: ProcessingStrategyFactory[KafkaPayload | FilteredPayload],
    ) -> None:
        self.stale_threshold_sec = stale_threshold_sec
        self.inner = inner

    def create_with_partitions(
        self, commit: Commit, partitions: Mapping[Partition, int]
    ) -> ProcessingStrategy[KafkaPayload]:
        # Build the wrapped factory's strategy first, then front it with the
        # stale-message guard so staleness is checked before any other step.
        inner_strategy = self.inner.create_with_partitions(commit, partitions)
        return DlqStaleMessages(self.stale_threshold_sec, inner_strategy)
0 commit comments