@@ -35,39 +35,42 @@ final class KafkaRecordTracker {
     private AtomicLong total;
     private ConcurrentLinkedQueue<EventBatch> failed;
     private volatile Map<TopicPartition, OffsetAndMetadata> offsets;
+    private Collection<TopicPartition> partitions;
 
     public KafkaRecordTracker() {
         all = new ConcurrentHashMap<>();
         failed = new ConcurrentLinkedQueue<>();
         total = new AtomicLong();
         offsets = new HashMap<>();
+        partitions = new ArrayList<TopicPartition>();
     }
 
+    /**
+     * Remove acked events and update the corresponding offsets, finding the
+     * lowest consecutive HEC-committed offsets.
+     *
+     * @param batches the acked event batches
+     */
     public void removeAckedEventBatches(final List<EventBatch> batches) {
-        for (final EventBatch batch : batches) {
-            //log.debug("Processing batch {}", batch.getUUID());
-            removeAckedEventBatch(batch);
-        }
-    }
-
-    public void removeAckedEventBatch(final EventBatch batch) {
-        final List<Event> events = batch.getEvents();
-        final Event event = events.get(0);
-        if (event.getTied() instanceof SinkRecord) {
-            final SinkRecord record = (SinkRecord) event.getTied();
-            TopicPartition tp = new TopicPartition(record.topic(), record.kafkaPartition());
-            //log.debug("Processing topic {} partition {}", record.topic(), record.kafkaPartition());
+        log.debug("received acked event batches={}", batches);
+        /* Loop over all *assigned* partitions to find the lowest consecutive
+         * HEC-committed offsets. A batch could contain events coming from a
+         * variety of topic/partitions, and scanning those events could be
+         * expensive.
+         * Note that if some events are tied to an unassigned partition, those
+         * offsets cannot be committed.
+         */
+        for (TopicPartition tp : partitions) {
             TreeMap<Long, EventBatch> tpRecords = all.get(tp);
             if (tpRecords == null) {
-                log.error("KafkaRecordTracker removing a batch in an unknown partition {} {} {}", record.topic(), record.kafkaPartition(), record.kafkaOffset());
-                return;
+                continue; // nothing to remove in this case
             }
             long offset = -1;
             Iterator<Map.Entry<Long, EventBatch>> iter = tpRecords.entrySet().iterator();
             for (; iter.hasNext();) {
                 Map.Entry<Long, EventBatch> e = iter.next();
                 if (e.getValue().isCommitted()) {
-                    // log.debug("processing offset {}", e.getKey());
+                    log.debug("processing offset {}", e.getKey());
                     offset = e.getKey();
                     iter.remove();
                     total.decrementAndGet();
@@ -76,11 +79,7 @@ public void removeAckedEventBatch(final EventBatch batch) {
                 }
             }
             if (offset >= 0) {
-                if (offsets.containsKey(tp)) {
-                    offsets.replace(tp, new OffsetAndMetadata(offset + 1));
-                } else {
-                    offsets.put(tp, new OffsetAndMetadata(offset + 1));
-                }
+                offsets.put(tp, new OffsetAndMetadata(offset + 1));
             }
         }
     }
@@ -116,21 +115,77 @@ public Collection<EventBatch> getAndRemoveFailedRecords() {
         Collection<EventBatch> records = new ArrayList<>();
         while (!failed.isEmpty()) {
             final EventBatch batch = failed.poll();
+            /* Don't return null batches. */
             if (batch != null) {
+                /* Purge events from closed partitions because it won't be
+                 * possible to commit their offsets. */
+                batch.getEvents().removeIf(e -> !partitions.contains(getPartitionFromEvent(e)));
                 records.add(batch);
             }
         }
         return records;
     }
 
-    // Loop through all SinkRecords for all topic partitions to
-    // find all lowest consecutive committed offsets, calculate
-    // the topic/partition offsets and then remove them
+    /**
+     * Return offsets computed when event batches are acked.
+     *
+     * @return map of topic/partition to offset/metadata
+     */
     public Map<TopicPartition, OffsetAndMetadata> computeOffsets() {
         return offsets;
     }
 
     public long totalEvents() {
         return total.get();
     }
+
+    public void open(Collection<TopicPartition> partitions) {
+        this.partitions.addAll(partitions);
+        log.debug("open partitions={} so currently assigned partitions={}",
+            partitions, this.partitions);
+    }
+
+    public void close(Collection<TopicPartition> partitions) {
+        this.partitions.removeAll(partitions);
+        log.debug("close partitions={} so currently assigned partitions={}",
+            partitions, this.partitions);
+        cleanupAfterClosedPartitions(partitions);
+    }
+
+    private TopicPartition getPartitionFromEvent(Event event) {
+        if (event.getTied() instanceof SinkRecord) {
+            final SinkRecord r = (SinkRecord) event.getTied();
+            return new TopicPartition(r.topic(), r.kafkaPartition());
+        } else return null;
+    }
+
+    /**
+     * Clean up and purge everything related to a partition that has been
+     * closed (i.e. became unassigned) for this task, as reported via
+     * SinkTask.close(). This avoids race conditions caused by acks that
+     * arrive late, after a partition rebalance.
+     *
+     * @param partitions partitions closed and now unassigned for this task
+     */
+    public void cleanupAfterClosedPartitions(Collection<TopicPartition> partitions)
+    {
+        /* Purge offsets. */
+        offsets.keySet().removeAll(partitions);
+        log.warn("purge offsets for closed partitions={} leaving offsets={}",
+            partitions, offsets);
+
+        /* Count and purge outstanding event topic/partition records. */
+        long countOfEventsToRemove = partitions.stream()
+            .map(tp -> all.get(tp))        // get unassigned topic/partition records
+            .filter(Objects::nonNull)      // filter out null values
+            .map(tpr -> tpr.size())        // get number of tp records
+            .mapToInt(Integer::intValue)   // map to int
+            .sum();
+        if (countOfEventsToRemove > 0) {
+            log.warn("purge events={} from closed partitions={}",
+                countOfEventsToRemove, partitions);
+            all.keySet().removeAll(partitions);
+            total.addAndGet(-1L * countOfEventsToRemove);
+        }
+    }
 }
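
As a reading aid, here is a minimal sketch (not part of this change) of how a Kafka Connect SinkTask could delegate to the tracker. The open/close/preCommit overrides come from the standard Connect SinkTask API; the surrounding class name and wiring are assumptions for illustration only.

import java.util.Collection;
import java.util.Map;

import org.apache.kafka.clients.consumer.OffsetAndMetadata;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.connect.sink.SinkTask;

/* Hypothetical skeleton for illustration; only the tracker calls mirror this change. */
public abstract class TrackerAwareSinkTask extends SinkTask {
    private final KafkaRecordTracker tracker = new KafkaRecordTracker();

    @Override
    public void open(Collection<TopicPartition> partitions) {
        // Start tracking the partitions newly assigned to this task.
        tracker.open(partitions);
    }

    @Override
    public void close(Collection<TopicPartition> partitions) {
        // Drop tracked offsets/events for revoked partitions so the task
        // never tries to commit offsets for partitions it no longer owns.
        tracker.close(partitions);
    }

    @Override
    public Map<TopicPartition, OffsetAndMetadata> preCommit(
            Map<TopicPartition, OffsetAndMetadata> currentOffsets) {
        // Only offsets the tracker derived from acked batches are committed.
        return tracker.computeOffsets();
    }
}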