Skip to content

Commit ff7beb7

Browse files
author
Jamie Chapman-Brown
committed
Add support for fetching from Rabbit MQ Super Streams
1 parent c225c19 commit ff7beb7

28 files changed

+4537
-0
lines changed

distribution/pom.xml

+2
Original file line numberDiff line numberDiff line change
@@ -456,6 +456,8 @@
456456
<argument>org.apache.druid.extensions.contrib:druid-deltalake-extensions</argument>
457457
<argument>-c</argument>
458458
<argument>org.apache.druid.extensions.contrib:druid-spectator-histogram</argument>
459+
<argument>-c</argument>
460+
<argument>org.apache.druid.extensions.contrib:druid-rabbit-indexing-service</argument>
459461
</arguments>
460462
</configuration>
461463
</execution>

docs/ingestion/rabbit-stream-ingestion.md

+239
Large diffs are not rendered by default.
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,178 @@
1+
<?xml version="1.0" encoding="UTF-8"?>
2+
<!--
3+
~ Licensed to the Apache Software Foundation (ASF) under one
4+
~ or more contributor license agreements. See the NOTICE file
5+
~ distributed with this work for additional information
6+
~ regarding copyright ownership. The ASF licenses this file
7+
~ to you under the Apache License, Version 2.0 (the
8+
~ "License"); you may not use this file except in compliance
9+
~ with the License. You may obtain a copy of the License at
10+
~
11+
~ http://www.apache.org/licenses/LICENSE-2.0
12+
~
13+
~ Unless required by applicable law or agreed to in writing,
14+
~ software distributed under the License is distributed on an
15+
~ "AS IS" BASIS, WITHOUT WARRA NTIES OR CONDITIONS OF ANY
16+
~ KIND, either express or implied. See the License for the
17+
~ specific language governing permissions and limitations
18+
~ under the License.
19+
-->
20+
21+
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/maven-v4_0_0.xsd">
22+
<modelVersion>4.0.0</modelVersion>
23+
24+
<groupId>org.apache.druid.extensions.contrib</groupId>
25+
<artifactId>druid-rabbit-indexing-service</artifactId>
26+
<name>druid-rabbit-indexing-service</name>
27+
<description>druid-rabbit-indexing-service</description>
28+
29+
<parent>
30+
<groupId>org.apache.druid</groupId>
31+
<artifactId>druid</artifactId>
32+
<version>30.0.0-SNAPSHOT</version>
33+
<relativePath>../../pom.xml</relativePath>
34+
</parent>
35+
36+
<dependencies>
37+
<dependency>
38+
<groupId>org.apache.druid</groupId>
39+
<artifactId>druid-indexing-service</artifactId>
40+
<version>${project.parent.version}</version>
41+
<scope>provided</scope>
42+
</dependency>
43+
<dependency>
44+
<groupId>org.apache.druid</groupId>
45+
<artifactId>druid-processing</artifactId>
46+
<version>${project.parent.version}</version>
47+
<scope>provided</scope>
48+
</dependency>
49+
<dependency>
50+
<groupId>org.apache.druid</groupId>
51+
<artifactId>druid-server</artifactId>
52+
<version>${project.parent.version}</version>
53+
<scope>provided</scope>
54+
</dependency>
55+
<dependency>
56+
<groupId>io.netty</groupId>
57+
<artifactId>netty</artifactId>
58+
<version>3.10.6.Final</version>
59+
<scope>provided</scope>
60+
</dependency>
61+
<dependency>
62+
<groupId>com.google.code.findbugs</groupId>
63+
<artifactId>jsr305</artifactId>
64+
<scope>provided</scope>
65+
</dependency>
66+
<dependency>
67+
<groupId>commons-io</groupId>
68+
<artifactId>commons-io</artifactId>
69+
<scope>provided</scope>
70+
</dependency>
71+
<dependency>
72+
<groupId>com.fasterxml.jackson.core</groupId>
73+
<artifactId>jackson-annotations</artifactId>
74+
<scope>provided</scope>
75+
</dependency>
76+
<dependency>
77+
<groupId>joda-time</groupId>
78+
<artifactId>joda-time</artifactId>
79+
<scope>provided</scope>
80+
</dependency>
81+
<dependency>
82+
<groupId>com.google.inject</groupId>
83+
<artifactId>guice</artifactId>
84+
<scope>provided</scope>
85+
</dependency>
86+
<dependency>
87+
<groupId>com.fasterxml.jackson.core</groupId>
88+
<artifactId>jackson-databind</artifactId>
89+
<scope>provided</scope>
90+
</dependency>
91+
<dependency>
92+
<groupId>javax.ws.rs</groupId>
93+
<artifactId>jsr311-api</artifactId>
94+
<scope>provided</scope>
95+
</dependency>
96+
<dependency>
97+
<groupId>org.hamcrest</groupId>
98+
<artifactId>hamcrest-core</artifactId>
99+
<scope>test</scope>
100+
</dependency>
101+
<dependency>
102+
<groupId>com.fasterxml.jackson.core</groupId>
103+
<artifactId>jackson-core</artifactId>
104+
<scope>provided</scope>
105+
</dependency>
106+
<dependency>
107+
<groupId>com.google.guava</groupId>
108+
<artifactId>guava</artifactId>
109+
<scope>provided</scope>
110+
</dependency>
111+
<dependency>
112+
<groupId>javax.validation</groupId>
113+
<artifactId>validation-api</artifactId>
114+
<scope>provided</scope>
115+
</dependency>
116+
<dependency>
117+
<groupId>com.rabbitmq</groupId>
118+
<artifactId>stream-client</artifactId>
119+
<version>0.15.0</version>
120+
</dependency>
121+
<dependency>
122+
<groupId>jakarta.validation</groupId>
123+
<artifactId>jakarta.validation-api</artifactId>
124+
<version>2.0.2</version>
125+
<scope>provided</scope>
126+
</dependency>
127+
128+
<!-- Tests -->
129+
<dependency>
130+
<groupId>junit</groupId>
131+
<artifactId>junit</artifactId>
132+
<scope>test</scope>
133+
</dependency>
134+
<dependency>
135+
<groupId>org.apache.druid</groupId>
136+
<artifactId>druid-processing</artifactId>
137+
<version>${project.parent.version}</version>
138+
<type>test-jar</type>
139+
<scope>test</scope>
140+
</dependency>
141+
<dependency>
142+
<groupId>org.apache.druid</groupId>
143+
<artifactId>druid-server</artifactId>
144+
<version>${project.parent.version}</version>
145+
<type>test-jar</type>
146+
<scope>test</scope>
147+
</dependency>
148+
<dependency>
149+
<groupId>org.apache.druid</groupId>
150+
<artifactId>druid-indexing-service</artifactId>
151+
<version>${project.parent.version}</version>
152+
<type>test-jar</type>
153+
<scope>test</scope>
154+
</dependency>
155+
<dependency>
156+
<groupId>org.apache.curator</groupId>
157+
<artifactId>curator-test</artifactId>
158+
<scope>test</scope>
159+
</dependency>
160+
<dependency>
161+
<groupId>org.easymock</groupId>
162+
<artifactId>easymock</artifactId>
163+
<scope>test</scope>
164+
</dependency>
165+
<dependency>
166+
<groupId>org.assertj</groupId>
167+
<artifactId>assertj-core</artifactId>
168+
<scope>test</scope>
169+
</dependency>
170+
<dependency>
171+
<groupId>nl.jqno.equalsverifier</groupId>
172+
<artifactId>equalsverifier</artifactId>
173+
<scope>test</scope>
174+
</dependency>
175+
176+
</dependencies>
177+
178+
</project>
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,158 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one
3+
* or more contributor license agreements. See the NOTICE file
4+
* distributed with this work for additional information
5+
* regarding copyright ownership. The ASF licenses this file
6+
* to you under the Apache License, Version 2.0 (the
7+
* "License"); you may not use this file except in compliance
8+
* with the License. You may obtain a copy of the License at
9+
*
10+
* http://www.apache.org/licenses/LICENSE-2.0
11+
*
12+
* Unless required by applicable law or agreed to in writing,
13+
* software distributed under the License is distributed on an
14+
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
15+
* KIND, either express or implied. See the License for the
16+
* specific language governing permissions and limitations
17+
* under the License.
18+
*/
19+
20+
package org.apache.druid.indexing.rabbitstream;
21+
22+
import com.fasterxml.jackson.core.type.TypeReference;
23+
import com.fasterxml.jackson.databind.ObjectMapper;
24+
import org.apache.druid.data.input.impl.ByteEntity;
25+
import org.apache.druid.data.input.impl.InputRowParser;
26+
import org.apache.druid.indexing.common.LockGranularity;
27+
import org.apache.druid.indexing.common.TaskToolbox;
28+
import org.apache.druid.indexing.seekablestream.SeekableStreamDataSourceMetadata;
29+
import org.apache.druid.indexing.seekablestream.SeekableStreamEndSequenceNumbers;
30+
import org.apache.druid.indexing.seekablestream.SeekableStreamIndexTaskRunner;
31+
import org.apache.druid.indexing.seekablestream.SeekableStreamSequenceNumbers;
32+
import org.apache.druid.indexing.seekablestream.SequenceMetadata;
33+
import org.apache.druid.indexing.seekablestream.common.OrderedPartitionableRecord;
34+
import org.apache.druid.indexing.seekablestream.common.OrderedSequenceNumber;
35+
import org.apache.druid.indexing.seekablestream.common.RecordSupplier;
36+
import org.apache.druid.indexing.seekablestream.common.StreamPartition;
37+
import org.apache.druid.java.util.emitter.EmittingLogger;
38+
import org.apache.druid.server.security.AuthorizerMapper;
39+
40+
import javax.annotation.Nonnull;
41+
import javax.annotation.Nullable;
42+
import javax.validation.constraints.NotNull;
43+
44+
import java.io.IOException;
45+
import java.nio.ByteBuffer;
46+
import java.util.List;
47+
import java.util.Map;
48+
import java.util.Set;
49+
import java.util.TreeMap;
50+
51+
/**
52+
* RabbitStream indexing task runner supporting incremental segments publishing
53+
*/
54+
public class IncrementalPublishingRabbitStreamIndexTaskRunner
55+
extends SeekableStreamIndexTaskRunner<String, Long, ByteEntity>
56+
{
57+
private static final EmittingLogger log = new EmittingLogger(IncrementalPublishingRabbitStreamIndexTaskRunner.class);
58+
private final RabbitStreamIndexTask task;
59+
60+
IncrementalPublishingRabbitStreamIndexTaskRunner(
61+
RabbitStreamIndexTask task,
62+
@Nullable InputRowParser<ByteBuffer> parser,
63+
AuthorizerMapper authorizerMapper,
64+
LockGranularity lockGranularityToUse)
65+
{
66+
super(
67+
task,
68+
parser,
69+
authorizerMapper,
70+
lockGranularityToUse);
71+
this.task = task;
72+
}
73+
74+
@Override
75+
protected Long getNextStartOffset(@NotNull Long sequenceNumber)
76+
{
77+
return sequenceNumber + 1;
78+
}
79+
80+
@Nonnull
81+
@Override
82+
protected List<OrderedPartitionableRecord<String, Long, ByteEntity>> getRecords(
83+
RecordSupplier<String, Long, ByteEntity> recordSupplier,
84+
TaskToolbox toolbox)
85+
{
86+
return recordSupplier.poll(task.getIOConfig().getPollTimeout());
87+
}
88+
89+
@Override
90+
protected SeekableStreamEndSequenceNumbers<String, Long> deserializePartitionsFromMetadata(
91+
ObjectMapper mapper,
92+
Object object)
93+
{
94+
return mapper.convertValue(object, mapper.getTypeFactory().constructParametrizedType(
95+
SeekableStreamEndSequenceNumbers.class,
96+
SeekableStreamEndSequenceNumbers.class,
97+
String.class,
98+
Long.class));
99+
}
100+
101+
@Override
102+
protected SeekableStreamDataSourceMetadata<String, Long> createDataSourceMetadata(
103+
SeekableStreamSequenceNumbers<String, Long> partitions)
104+
{
105+
return new RabbitStreamDataSourceMetadata(partitions);
106+
}
107+
108+
@Override
109+
protected OrderedSequenceNumber<Long> createSequenceNumber(Long sequenceNumber)
110+
{
111+
return RabbitSequenceNumber.of(sequenceNumber);
112+
}
113+
114+
@Override
115+
protected void possiblyResetDataSourceMetadata(
116+
TaskToolbox toolbox,
117+
RecordSupplier<String, Long, ByteEntity> recordSupplier,
118+
Set<StreamPartition<String>> assignment)
119+
{
120+
// do nothing
121+
}
122+
123+
@Override
124+
protected boolean isEndOffsetExclusive()
125+
{
126+
return true;
127+
}
128+
129+
@Override
130+
protected boolean isEndOfShard(Long seqNum)
131+
{
132+
return false;
133+
}
134+
135+
@Override
136+
public TypeReference<List<SequenceMetadata<String, Long>>> getSequenceMetadataTypeReference()
137+
{
138+
return new TypeReference<List<SequenceMetadata<String, Long>>>() {
139+
};
140+
}
141+
142+
@Nullable
143+
@Override
144+
protected TreeMap<Integer, Map<String, Long>> getCheckPointsFromContext(
145+
TaskToolbox toolbox,
146+
String checkpointsString) throws IOException
147+
{
148+
if (checkpointsString != null) {
149+
log.debug("Got checkpoints from task context[%s].", checkpointsString);
150+
return toolbox.getJsonMapper().readValue(
151+
checkpointsString,
152+
new TypeReference<TreeMap<Integer, Map<String, Long>>>() {
153+
});
154+
} else {
155+
return null;
156+
}
157+
}
158+
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,47 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one
3+
* or more contributor license agreements. See the NOTICE file
4+
* distributed with this work for additional information
5+
* regarding copyright ownership. The ASF licenses this file
6+
* to you under the Apache License, Version 2.0 (the
7+
* "License"); you may not use this file except in compliance
8+
* with the License. You may obtain a copy of the License at
9+
*
10+
* http://www.apache.org/licenses/LICENSE-2.0
11+
*
12+
* Unless required by applicable law or agreed to in writing,
13+
* software distributed under the License is distributed on an
14+
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
15+
* KIND, either express or implied. See the License for the
16+
* specific language governing permissions and limitations
17+
* under the License.
18+
*/
19+
20+
package org.apache.druid.indexing.rabbitstream;
21+
22+
import org.apache.druid.indexing.seekablestream.common.OrderedSequenceNumber;
23+
24+
import javax.validation.constraints.NotNull;
25+
26+
// OrderedSequenceNumber.equals() should be used instead.
27+
@SuppressWarnings("ComparableImplementedButEqualsNotOverridden")
28+
public class RabbitSequenceNumber extends OrderedSequenceNumber<Long>
29+
{
30+
private RabbitSequenceNumber(Long sequenceNumber)
31+
{
32+
super(sequenceNumber, false);
33+
}
34+
35+
public static RabbitSequenceNumber of(Long sequenceNumber)
36+
{
37+
return new RabbitSequenceNumber(sequenceNumber);
38+
}
39+
40+
@Override
41+
public int compareTo(
42+
@NotNull OrderedSequenceNumber<Long> o)
43+
{
44+
return this.get().compareTo(o.get());
45+
}
46+
47+
}

0 commit comments

Comments
 (0)