Skip to content

Commit 8e000f3

Browse files
srinipunuru authored and xinyuiscool committed
Refactoring the EventHub System Producer into reusable AsyncSystemProducer
Refactoring the EventHub system producer into common reusable components: 1. AsyncSystemProducer: all system producers that have a real async send API with callbacks can use this. 2. NoFlushAsyncSystemProducer: all system producers that have a real async, callback-based send API but do not provide flush semantics can use this. System producers that implement AsyncSystemProducer can be used across Samza and Brooklin. TODO: AsyncSystemProducer needs to be moved to the API layer so that it can be used across different system producers (i.e. eventhub and kinesis). Author: Srinivasulu Punuru <spunuru@linkedin.com> Reviewers: Boris S <boryas@apache.org> Closes apache#458 from srinipunuru/async-prod.3
1 parent aba7393 commit 8e000f3

File tree

3 files changed

+291
-123
lines changed

3 files changed

+291
-123
lines changed
Lines changed: 219 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,219 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one
3+
* or more contributor license agreements. See the NOTICE file
4+
* distributed with this work for additional information
5+
* regarding copyright ownership. The ASF licenses this file
6+
* to you under the Apache License, Version 2.0 (the
7+
* "License"); you may not use this file except in compliance
8+
* with the License. You may obtain a copy of the License at
9+
*
10+
* http://www.apache.org/licenses/LICENSE-2.0
11+
*
12+
* Unless required by applicable law or agreed to in writing,
13+
* software distributed under the License is distributed on an
14+
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
15+
* KIND, either express or implied. See the License for the
16+
* specific language governing permissions and limitations
17+
* under the License.
18+
*/
19+
20+
package org.apache.samza.system.eventhub.producer;
21+
22+
import java.time.Duration;
23+
import java.util.HashMap;
24+
import java.util.List;
25+
import java.util.Map;
26+
import java.util.Set;
27+
import java.util.concurrent.CompletableFuture;
28+
import java.util.concurrent.ConcurrentHashMap;
29+
import java.util.concurrent.ExecutionException;
30+
import java.util.concurrent.TimeUnit;
31+
import java.util.concurrent.TimeoutException;
32+
import java.util.concurrent.atomic.AtomicReference;
33+
import java.util.function.Function;
34+
import java.util.stream.Collectors;
35+
import org.apache.samza.SamzaException;
36+
import org.apache.samza.config.Config;
37+
import org.apache.samza.config.StreamConfig;
38+
import org.apache.samza.metrics.Counter;
39+
import org.apache.samza.metrics.MetricsRegistry;
40+
import org.apache.samza.system.OutgoingMessageEnvelope;
41+
import org.apache.samza.system.SystemProducer;
42+
import org.apache.samza.system.eventhub.metrics.SamzaHistogram;
43+
import org.slf4j.Logger;
44+
import org.slf4j.LoggerFactory;
45+
46+
47+
/**
48+
* SystemProducer Helper that can be used to implement SystemProducer that provide async send APIs
49+
* e.g. Kinesis, Event Hubs, etc..
50+
* To implement an AsyncSystemProducer, you need to implement {@link AsyncSystemProducer#sendAsync}
51+
*/
52+
public abstract class AsyncSystemProducer implements SystemProducer {
53+
54+
private static final Logger LOG = LoggerFactory.getLogger(AsyncSystemProducer.class.getName());
55+
56+
private static final long DEFAULT_FLUSH_TIMEOUT_MILLIS = Duration.ofMinutes(1L).toMillis();
57+
58+
/**
59+
* The constant CONFIG_STREAM_LIST. This config is used to get the list of streams produced by this EventHub system.
60+
*/
61+
public static final String CONFIG_STREAM_LIST = "systems.%s.stream.list";
62+
63+
private static final String SEND_ERRORS = "sendErrors";
64+
private static final String SEND_CALLBACK_LATENCY = "sendCallbackLatency";
65+
private static final String SEND_LATENCY = "sendLatency";
66+
67+
/**
68+
* The constant AGGREGATE.
69+
*/
70+
protected static final String AGGREGATE = "aggregate";
71+
72+
/**
73+
* The Stream ids.
74+
*/
75+
protected final List<String> streamIds;
76+
/**
77+
* The Map between Physical streamName to virtual Samza streamids.
78+
*/
79+
protected final Map<String, String> physicalToStreamIds;
80+
81+
/**
82+
* The Samza metrics registry used to store all the metrics emitted.
83+
*/
84+
protected final MetricsRegistry metricsRegistry;
85+
86+
/**
87+
* The Pending futures corresponding to the send calls that are still in flight and for which we haven't
88+
* received the call backs yet.
89+
* This needs to be concurrent because it is accessed across the send/flush thread and callback thread.
90+
*/
91+
protected final Set<CompletableFuture<Void>> pendingFutures = ConcurrentHashMap.newKeySet();
92+
93+
private final AtomicReference<Throwable> sendExceptionOnCallback = new AtomicReference<>(null);
94+
95+
private static Counter aggSendErrors = null;
96+
private static SamzaHistogram aggSendLatency = null;
97+
private static SamzaHistogram aggSendCallbackLatency = null;
98+
99+
private final HashMap<String, SamzaHistogram> sendLatency = new HashMap<>();
100+
private final HashMap<String, SamzaHistogram> sendCallbackLatency = new HashMap<>();
101+
private final HashMap<String, Counter> sendErrors = new HashMap<>();
102+
103+
/**
104+
* Instantiates a new Async system producer.
105+
*
106+
* @param systemName the system name
107+
* @param config the config
108+
* @param metricsRegistry the registry
109+
*/
110+
public AsyncSystemProducer(String systemName, Config config, MetricsRegistry metricsRegistry) {
111+
StreamConfig sconfig = new StreamConfig(config);
112+
streamIds = config.getList(String.format(CONFIG_STREAM_LIST, systemName));
113+
physicalToStreamIds =
114+
streamIds.stream().collect(Collectors.toMap(sconfig::getPhysicalName, Function.identity()));
115+
this.metricsRegistry = metricsRegistry;
116+
}
117+
118+
/**
119+
* {@inheritDoc}
120+
*/
121+
@Override
122+
public synchronized void send(String source, OutgoingMessageEnvelope envelope) {
123+
checkForSendCallbackErrors("Received exception on message send");
124+
125+
String streamName = envelope.getSystemStream().getStream();
126+
String streamId = physicalToStreamIds.getOrDefault(streamName, streamName);
127+
128+
long beforeSendTimeMs = System.currentTimeMillis();
129+
130+
CompletableFuture<Void> sendResult = sendAsync(source, envelope);
131+
132+
long afterSendTimeMs = System.currentTimeMillis();
133+
134+
long latencyMs = afterSendTimeMs - beforeSendTimeMs;
135+
sendLatency.get(streamId).update(latencyMs);
136+
aggSendLatency.update(latencyMs);
137+
138+
pendingFutures.add(sendResult);
139+
140+
// Auto update the metrics and possible throwable when futures are complete.
141+
sendResult.handle((aVoid, throwable) -> {
142+
pendingFutures.remove(sendResult);
143+
long callbackLatencyMs = System.currentTimeMillis() - afterSendTimeMs;
144+
sendCallbackLatency.get(streamId).update(callbackLatencyMs);
145+
aggSendCallbackLatency.update(callbackLatencyMs);
146+
if (throwable != null) {
147+
sendErrors.get(streamId).inc();
148+
aggSendErrors.inc();
149+
LOG.error("Send message to event hub: {} failed with exception: ", streamId, throwable);
150+
sendExceptionOnCallback.compareAndSet(null, throwable);
151+
}
152+
return aVoid;
153+
});
154+
}
155+
156+
public void start() {
157+
streamIds.forEach(streamId -> {
158+
sendCallbackLatency.put(streamId, new SamzaHistogram(metricsRegistry, streamId, SEND_CALLBACK_LATENCY));
159+
sendLatency.put(streamId, new SamzaHistogram(metricsRegistry, streamId, SEND_LATENCY));
160+
sendErrors.put(streamId, metricsRegistry.newCounter(streamId, SEND_ERRORS));
161+
});
162+
163+
if (aggSendLatency == null) {
164+
aggSendLatency = new SamzaHistogram(metricsRegistry, AGGREGATE, SEND_LATENCY);
165+
aggSendCallbackLatency = new SamzaHistogram(metricsRegistry, AGGREGATE, SEND_CALLBACK_LATENCY);
166+
aggSendErrors = metricsRegistry.newCounter(AGGREGATE, SEND_ERRORS);
167+
}
168+
}
169+
170+
/**
171+
* Default implementation of the flush that just waits for all the pendingFutures to be complete.
172+
* SystemProducer should override this, if the underlying system provides flush semantics.
173+
* @param source String representing the source of the message.
174+
*/
175+
@Override
176+
public synchronized void flush(String source) {
177+
long incompleteSends = pendingFutures.stream().filter(x -> !x.isDone()).count();
178+
LOG.info("Trying to flush pending {} sends.", incompleteSends);
179+
checkForSendCallbackErrors("Received exception on message send.");
180+
CompletableFuture<Void> future =
181+
CompletableFuture.allOf(pendingFutures.toArray(new CompletableFuture[pendingFutures.size()]));
182+
183+
try {
184+
// Block until all the pending sends are complete or timeout.
185+
future.get(DEFAULT_FLUSH_TIMEOUT_MILLIS, TimeUnit.MILLISECONDS);
186+
} catch (InterruptedException | ExecutionException | TimeoutException e) {
187+
incompleteSends = pendingFutures.stream().filter(x -> !x.isDone()).count();
188+
String msg = String.format("Flush failed with error. Total pending sends %d", incompleteSends);
189+
LOG.error(msg, e);
190+
throw new SamzaException(msg, e);
191+
}
192+
193+
pendingFutures.clear();
194+
195+
checkForSendCallbackErrors("Sending one or more of the messages failed during flush.");
196+
}
197+
198+
/**
199+
* Sends a specified message envelope from a specified Samza source.
200+
* @param source String representing the source of the message.
201+
* @param envelope Aggregate object representing the serialized message to send from the source.
202+
* @return the completable future for the async send call
203+
*/
204+
public abstract CompletableFuture<Void> sendAsync(String source, OutgoingMessageEnvelope envelope);
205+
206+
/**
207+
* This method is used to check whether there were any previous exceptions in the send call backs.
208+
* And if any exception occurs the producer is considered to be stuck/broken
209+
* @param msg the msg that is passed to the exception.
210+
*/
211+
protected void checkForSendCallbackErrors(String msg) {
212+
// Check for send errors
213+
Throwable sendThrowable = sendExceptionOnCallback.get();
214+
if (sendThrowable != null) {
215+
LOG.error(msg, sendThrowable);
216+
throw new SamzaException(msg, sendThrowable);
217+
}
218+
}
219+
}

0 commit comments

Comments
 (0)