Skip to content

Commit 8743213

Browse files
author
Jamie Chapman-Brown
committed
extend tests
1 parent e45da1f commit 8743213

15 files changed

+1190
-99
lines changed

docs/development/extensions-contrib/rabbit-stream-ingestion.md

+81-17
Large diffs are not rendered by default.

extensions-contrib/rabbit-stream-indexing-service/src/main/java/org/apache/druid/indexing/rabbitstream/IncrementalPublishingRabbitStreamIndexTaskRunner.java

+2-2
Original file line numberDiff line numberDiff line change
@@ -49,7 +49,7 @@
4949
import java.util.TreeMap;
5050

5151
/**
52-
* Kafka indexing task runner supporting incremental segments publishing
52+
* RabbitStream indexing task runner supporting incremental segments publishing
5353
*/
5454
public class IncrementalPublishingRabbitStreamIndexTaskRunner
5555
extends SeekableStreamIndexTaskRunner<String, Long, ByteEntity>
@@ -95,7 +95,7 @@ protected SeekableStreamEndSequenceNumbers<String, Long> deserializePartitionsFr
9595
SeekableStreamEndSequenceNumbers.class,
9696
SeekableStreamEndSequenceNumbers.class,
9797
String.class,
98-
String.class));
98+
Long.class));
9999
}
100100

101101
@Override

extensions-contrib/rabbit-stream-indexing-service/src/main/java/org/apache/druid/indexing/rabbitstream/RabbitStreamIndexTaskModule.java

+3
Original file line numberDiff line numberDiff line change
@@ -32,6 +32,9 @@
3232

3333
public class RabbitStreamIndexTaskModule implements DruidModule
3434
{
35+
36+
static final String PROPERTY_BASE = "druid.rabbit";
37+
3538
@Override
3639
public List<? extends Module> getJacksonModules()
3740
{

extensions-contrib/rabbit-stream-indexing-service/src/main/java/org/apache/druid/indexing/rabbitstream/RabbitStreamIndexTaskTuningConfig.java

+2-2
Original file line numberDiff line numberDiff line change
@@ -42,13 +42,13 @@ public class RabbitStreamIndexTaskTuningConfig extends SeekableStreamIndexTaskTu
4242
static final int ASSUMED_RECORD_SIZE = 10_000;
4343

4444
/**
45-
* Together with {@link KinesisIndexTaskIOConfig#RECORD_FETCH_MEMORY_MAX_HEAP_FRACTION}, don't take up more
45+
* Together with {@link RabbitStreamIndexTaskIOConfig#RECORD_FETCH_MEMORY_MAX_HEAP_FRACTION}, don't take up more
4646
* than 15% of the heap per task.
4747
*/
4848
private static final double RECORD_BUFFER_MEMORY_MAX_HEAP_FRACTION = 0.1;
4949

5050
/**
51-
* Together with {@link KinesisIndexTaskIOConfig#MAX_RECORD_FETCH_MEMORY}, don't take up more than 200MB per task.
51+
* Together with {@link RabbitStreamIndexTaskIOConfig#MAX_RECORD_FETCH_MEMORY}, don't take up more than 200MB per task.
5252
*/
5353
private static final int MAX_RECORD_BUFFER_MEMORY = 100_000_000;
5454

extensions-contrib/rabbit-stream-indexing-service/src/main/java/org/apache/druid/indexing/rabbitstream/RabbitStreamRecordSupplier.java

+77-63
Original file line numberDiff line numberDiff line change
@@ -20,6 +20,7 @@
2020
package org.apache.druid.indexing.rabbitstream;
2121

2222
import com.fasterxml.jackson.databind.ObjectMapper;
23+
import com.google.common.annotations.VisibleForTesting;
2324
import com.google.common.collect.ImmutableList;
2425
import com.google.common.collect.ImmutableSet;
2526
import com.google.common.collect.Queues;
@@ -32,8 +33,6 @@
3233
import com.rabbitmq.stream.OffsetSpecification;
3334
import com.rabbitmq.stream.impl.Client;
3435
import com.rabbitmq.stream.impl.Client.ClientParameters;
35-
import io.netty.handler.ssl.SslContext;
36-
import io.netty.handler.ssl.SslContextBuilder;
3736
import org.apache.druid.data.input.impl.ByteEntity;
3837
import org.apache.druid.indexing.rabbitstream.supervisor.RabbitStreamSupervisorIOConfig;
3938
import org.apache.druid.indexing.seekablestream.common.OrderedPartitionableRecord;
@@ -45,7 +44,6 @@
4544

4645
import javax.annotation.Nonnull;
4746

48-
import java.io.FileInputStream;
4947
import java.net.URI;
5048
import java.net.URISyntaxException;
5149
import java.util.ArrayList;
@@ -86,18 +84,17 @@ public class RabbitStreamRecordSupplier implements RecordSupplier<String, Long,
8684

8785
private String password;
8886
private String username;
89-
private String certificateFilePath;
9087

9188
public RabbitStreamRecordSupplier(
9289
Map<String, Object> consumerProperties,
9390
ObjectMapper mapper,
9491
String uri,
9592
int recordBufferSize,
9693
int recordBufferOfferTimeout,
97-
int maxRecordsPerPoll)
94+
int maxRecordsPerPoll
95+
)
9896
{
9997
this.uri = uri;
100-
this.env = this.getRabbitEnvironment();
10198
this.mapper = mapper;
10299

103100
this.recordBufferSize = recordBufferSize;
@@ -118,7 +115,6 @@ public RabbitStreamRecordSupplier(
118115

119116
this.password = null;
120117
this.username = null;
121-
this.certificateFilePath = null;
122118

123119
if (consumerProperties != null) {
124120

@@ -137,38 +133,47 @@ public RabbitStreamRecordSupplier(
137133
if (e.getKey().equals("username")) {
138134
this.username = e.getValue();
139135
}
140-
if (e.getKey().equals("cerfiticateFilePath")) {
141-
this.certificateFilePath = e.getValue();
142-
}
143136
}
144137
}
145138
}
139+
140+
this.env = this.getRabbitEnvironment();
141+
146142
}
147143

148144
public void startBackgroundFetch()
149145
{
150-
151146
try {
152-
stateSemaphore.acquire();
153-
try {
154-
if (this.isRunning != false) {
155-
return;
156-
}
157-
for (Map.Entry<String, ConsumerBuilder> entry : streamBuilders.entrySet()) {
158-
consumers.add(
159-
entry.getValue().offset(
160-
offsetMap.get(entry.getKey())).build());
161-
}
162-
this.isRunning = true;
147+
// acquire uninterruptibly to prevent state corruption issues
148+
// on consumers and isRunning
149+
stateSemaphore.acquireUninterruptibly();
150+
if (this.isRunning != false) {
151+
return;
163152
}
164-
finally {
165-
stateSemaphore.release();
153+
for (Map.Entry<String, ConsumerBuilder> entry : streamBuilders.entrySet()) {
154+
consumers.add(
155+
entry.getValue().offset(
156+
offsetMap.get(entry.getKey())).build());
166157
}
158+
this.isRunning = true;
167159
}
168-
catch (InterruptedException exc) {
160+
finally {
161+
stateSemaphore.release();
169162
}
170163
}
171164

165+
@VisibleForTesting
166+
public int bufferSize()
167+
{
168+
return queue.size();
169+
}
170+
171+
@VisibleForTesting
172+
public boolean isRunning()
173+
{
174+
return this.isRunning;
175+
}
176+
172177
public void stopBackgroundFetch()
173178
{
174179
try {
@@ -192,9 +197,14 @@ public void stopBackgroundFetch()
192197

193198
}
194199

200+
public EnvironmentBuilder getEnvBuilder()
201+
{
202+
return Environment.builder();
203+
}
204+
195205
public Environment getRabbitEnvironment()
196206
{
197-
EnvironmentBuilder envBuilder = Environment.builder().uri(this.uri);
207+
EnvironmentBuilder envBuilder = this.getEnvBuilder().uri(this.uri);
198208

199209
if (this.password != null) {
200210
envBuilder = envBuilder.password(this.password);
@@ -203,22 +213,6 @@ public Environment getRabbitEnvironment()
203213
envBuilder = envBuilder.username(this.username);
204214
}
205215

206-
if (this.certificateFilePath != null) {
207-
try {
208-
FileInputStream inputStream =
209-
new FileInputStream(this.certificateFilePath);
210-
SslContext sslContext = SslContextBuilder
211-
.forClient()
212-
.trustManager(inputStream)
213-
.build();
214-
envBuilder = envBuilder.tls().sslContext(sslContext).environmentBuilder();
215-
}
216-
catch (Exception exc) {
217-
log.error("failed to configure ssl for cockroach");
218-
log.error(exc, exc.getMessage());
219-
}
220-
}
221-
222216
return envBuilder.build();
223217
}
224218

@@ -229,6 +223,27 @@ public static String getStreamFromSubstream(String partionID)
229223
return String.join("-", res);
230224
}
231225

226+
private void removeOldAssignments(Set<StreamPartition<String>> streamPartitionstoKeep)
227+
{
228+
Iterator<Map.Entry<String, ConsumerBuilder>> streamBuilderIterator = streamBuilders.entrySet().iterator();
229+
while (streamBuilderIterator.hasNext()) {
230+
Map.Entry<String, ConsumerBuilder> entry = streamBuilderIterator.next();
231+
StreamPartition<String> comparitor = new StreamPartition<String>(getStreamFromSubstream(entry.getKey()), entry.getKey());
232+
if (!streamPartitionstoKeep.contains(comparitor)) {
233+
streamBuilderIterator.remove();
234+
}
235+
}
236+
237+
Iterator<Map.Entry<String, OffsetSpecification>> offsetItterator = offsetMap.entrySet().iterator();
238+
while (offsetItterator.hasNext()) {
239+
Map.Entry<String, OffsetSpecification> entry = offsetItterator.next();
240+
StreamPartition<String> comparitor = new StreamPartition<String>(getStreamFromSubstream(entry.getKey()), entry.getKey());
241+
if (!streamPartitionstoKeep.contains(comparitor)) {
242+
offsetItterator.remove();
243+
}
244+
}
245+
}
246+
232247
@Override
233248
public void assign(Set<StreamPartition<String>> streamPartitions)
234249
{
@@ -242,24 +257,7 @@ public void assign(Set<StreamPartition<String>> streamPartitions)
242257
this.superStream = part.getStream();
243258
}
244259

245-
Iterator<Map.Entry<String, ConsumerBuilder>> streamBuilderItterator = streamBuilders.entrySet().iterator();
246-
while (streamBuilderItterator.hasNext()) {
247-
Map.Entry<String, ConsumerBuilder> entry = streamBuilderItterator.next();
248-
// I don't think this works....
249-
StreamPartition<String> comparitor = new StreamPartition<String>(getStreamFromSubstream(entry.getKey()), entry.getKey());
250-
if (!streamPartitions.contains(comparitor)) {
251-
streamBuilderItterator.remove();
252-
}
253-
}
254-
255-
Iterator<Map.Entry<String, OffsetSpecification>> offsetItterator = offsetMap.entrySet().iterator();
256-
while (offsetItterator.hasNext()) {
257-
Map.Entry<String, OffsetSpecification> entry = offsetItterator.next();
258-
StreamPartition<String> comparitor = new StreamPartition<String>(getStreamFromSubstream(entry.getKey()), entry.getKey());
259-
if (!streamPartitions.contains(comparitor)) {
260-
offsetItterator.remove();
261-
}
262-
}
260+
removeOldAssignments(streamPartitions);
263261

264262

265263
}
@@ -346,9 +344,14 @@ public void handle(MessageHandler.Context context, Message message)
346344

347345
}
348346

347+
/**
348+
* optionalStartBackgroundFetch ensures that a background fetch is running
349+
* if this.queue is running low on records. We want to minimize thrashing
350+
* around starting/stopping the consumers.
351+
*/
349352
public void optionalStartBackgroundFetch()
350353
{
351-
if (this.queue.size() < this.maxRecordsPerPoll) {
354+
if (this.queue.size() < Math.min(this.maxRecordsPerPoll * 2, recordBufferSize / 2)) {
352355
this.startBackgroundFetch();
353356
}
354357
}
@@ -410,10 +413,21 @@ public Long getPosition(StreamPartition<String> partition)
410413
throw new UnsupportedOperationException("getPosition() is not supported in RabbitMQ streams");
411414
}
412415

416+
417+
public ClientParameters getParameters()
418+
{
419+
return new ClientParameters();
420+
}
421+
422+
public Client getClient(ClientParameters parameters)
423+
{
424+
return new Client(parameters);
425+
}
426+
413427
@Override
414428
public Set<String> getPartitionIds(String stream)
415429
{
416-
ClientParameters parameters = new ClientParameters();
430+
ClientParameters parameters = getParameters();
417431

418432
try {
419433
URI parsedURI = new URI(uri);
@@ -431,7 +445,7 @@ public Set<String> getPartitionIds(String stream)
431445
parameters.username(username);
432446
}
433447

434-
Client client = new Client(parameters);
448+
Client client = getClient(parameters);
435449

436450
List<String> partitions = client.partitions(stream);
437451
client.close();

extensions-contrib/rabbit-stream-indexing-service/src/main/java/org/apache/druid/indexing/rabbitstream/supervisor/RabbitStreamSupervisor.java

+6-5
Original file line numberDiff line numberDiff line change
@@ -72,15 +72,15 @@
7272
import java.util.stream.Collectors;
7373

7474
/**
75-
* Supervisor responsible for managing the KafkaIndexTasks for a single
75+
* Supervisor responsible for managing the RabbitStreamIndexTasks for a single
7676
* dataSource. At a high level, the class accepts a
77-
* {@link RabbitStreamSupervisorSpec} which includes the Kafka topic and
77+
* {@link RabbitStreamSupervisorSpec} which includes the rabbit super stream and
7878
* configuration as well as an ingestion spec which will be used to generate the
79-
* indexing tasks. The run loop periodically refreshes its view of the Kafka
80-
* topic's partitions and the list of running indexing tasks and ensures that
79+
* indexing tasks. The run loop periodically refreshes its view of the super stream's
80+
* partitions and the list of running indexing tasks and ensures that
8181
* all partitions are being read from and that there are enough tasks to satisfy
8282
* the desired number of replicas. As tasks complete, new tasks are queued to
83-
* process the next range of Kafka offsets.
83+
* process the next range of rabbit stream offsets.
8484
*/
8585
public class RabbitStreamSupervisor extends SeekableStreamSupervisor<String, Long, ByteEntity>
8686
{
@@ -377,6 +377,7 @@ protected void updatePartitionLagFromStream()
377377
}
378378
catch (Exception e) {
379379
log.warn("Could not fetch partitions for topic/stream [%s]", getIoConfig().getStream());
380+
getRecordSupplierLock().unlock();
380381
throw new StreamException(e);
381382
}
382383

extensions-contrib/rabbit-stream-indexing-service/src/main/java/org/apache/druid/indexing/rabbitstream/supervisor/RabbitStreamSupervisorIOConfig.java

+1-1
Original file line numberDiff line numberDiff line change
@@ -83,8 +83,8 @@ public RabbitStreamSupervisorIOConfig(
8383
new IdleConfig(null, null));
8484

8585
this.consumerProperties = consumerProperties;
86+
Preconditions.checkNotNull(uri, "uri");
8687
this.uri = uri;
87-
Preconditions.checkNotNull(stream, "uri");
8888

8989
this.pollTimeout = pollTimeout != null ? pollTimeout : DEFAULT_POLL_TIMEOUT_MILLIS;
9090
}

0 commit comments

Comments
 (0)