Skip to content

Commit d2424cd

Browse files
authored
Merge pull request #216 from rabbitmq/support-sql-filter-expressions
Support SQL filter expressions
2 parents f01ad64 + 5d3ec25 commit d2424cd

File tree

6 files changed

+94
-3
lines changed

6 files changed

+94
-3
lines changed

src/main/java/com/rabbitmq/client/amqp/ConsumerBuilder.java

Lines changed: 21 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -565,6 +565,27 @@ interface FilterOptions<T> {
565565
* @return type-parameter object
566566
*/
567567
T propertySymbol(String key, String value);
568+
569+
/**
570+
* Set an SQL filter expression.
571+
*
572+
* <p>Section 6 of the AMQP Filter Expressions specification defines the semantics of the
573+
* feature, but the <b>SQL syntax follows the <a
574+
* href="https://jakarta.ee/specifications/messaging/3.1/jakarta-messaging-spec-3.1#message-selector-syntax">JMS
575+
* message selector syntax</a></b>.
576+
*
577+
* <p>Requires RabbitMQ 4.2 or more.
578+
*
579+
* @param sql SQL expression
580+
* @return type-parameter object
581+
* @see <a
582+
* href="https://jakarta.ee/specifications/messaging/3.1/jakarta-messaging-spec-3.1#message-selector-syntax">JMS
583+
* message selector syntax</a>
584+
* @see <a
585+
* href="https://groups.oasis-open.org/higherlogic/ws/public/document?document_id=66227">AMQP
586+
* Filter Expressions</a>
587+
*/
588+
T sql(String sql);
568589
}
569590

570591
/**

src/main/java/com/rabbitmq/client/amqp/impl/AmqpConnection.java

Lines changed: 9 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -26,6 +26,7 @@
2626
import static com.rabbitmq.client.amqp.impl.Tuples.pair;
2727
import static com.rabbitmq.client.amqp.impl.Utils.supportFilterExpressions;
2828
import static com.rabbitmq.client.amqp.impl.Utils.supportSetToken;
29+
import static com.rabbitmq.client.amqp.impl.Utils.supportSqlFilterExpressions;
2930
import static java.lang.System.nanoTime;
3031
import static java.time.Duration.ofNanos;
3132

@@ -115,7 +116,9 @@ final class AmqpConnection extends ResourceBase implements Connection {
115116
private final ConnectionSettings.AffinityStrategy affinityStrategy;
116117
private final String name;
117118
private final Lock instanceLock = new ReentrantLock();
118-
private final boolean filterExpressionsSupported, setTokenSupported;
119+
private final boolean filterExpressionsSupported,
120+
setTokenSupported,
121+
sqlFilterExpressionsSupported;
119122
private volatile ConsumerWorkService consumerWorkService;
120123
private volatile Executor dispatchingExecutor;
121124
private final boolean privateDispatchingExecutor;
@@ -212,6 +215,7 @@ final class AmqpConnection extends ResourceBase implements Connection {
212215
String brokerVersion = brokerVersion(this.nativeConnection);
213216
this.filterExpressionsSupported = supportFilterExpressions(brokerVersion);
214217
this.setTokenSupported = supportSetToken(brokerVersion);
218+
this.sqlFilterExpressionsSupported = supportSqlFilterExpressions(brokerVersion);
215219
LOGGER.debug("Opened connection '{}' on node '{}'.", this.name(), this.connectionNodename());
216220
this.state(OPEN);
217221
this.environment.metricsCollector().openConnection();
@@ -856,6 +860,10 @@ boolean filterExpressionsSupported() {
856860
return this.filterExpressionsSupported;
857861
}
858862

863+
boolean sqlFilterExpressionsSupported() {
864+
return this.sqlFilterExpressionsSupported;
865+
}
866+
859867
boolean setTokenSupported() {
860868
return this.setTokenSupported;
861869
}

src/main/java/com/rabbitmq/client/amqp/impl/AmqpConsumer.java

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -219,6 +219,7 @@ private ClientReceiver createNativeReceiver(
219219
(ClientReceiver)
220220
ExceptionUtils.wrapGet(
221221
nativeSession.openReceiver(address, receiverOptions).openFuture());
222+
boolean filterOk = true;
222223
if (!filters.isEmpty()) {
223224
Map<String, String> remoteSourceFilters = receiver.source().filters();
224225
for (Map.Entry<String, Object> localEntry : localSourceFilters.entrySet()) {
@@ -227,9 +228,15 @@ private ClientReceiver createNativeReceiver(
227228
"Missing filter value in attach response: {} => {}",
228229
localEntry.getKey(),
229230
localEntry.getValue());
231+
filterOk = false;
230232
}
231233
}
232234
}
235+
if (!filterOk) {
236+
receiver.close();
237+
throw new AmqpException(
238+
"The sending endpoint filters do not match the receiving endpoint filters");
239+
}
233240
return receiver;
234241
} catch (ClientException e) {
235242
throw ExceptionUtils.convert(e, "Error while creating receiver from '%s'", address);

src/main/java/com/rabbitmq/client/amqp/impl/AmqpConsumerBuilder.java

Lines changed: 12 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -226,11 +226,11 @@ static StreamOptions streamOptions(Map<String, DescribedType> filters) {
226226

227227
private static class DefaultStreamFilterOptions implements StreamFilterOptions {
228228

229-
private final StreamOptions streamOptions;
229+
private final DefaultStreamOptions streamOptions;
230230
private final Map<String, DescribedType> filters;
231231

232232
private DefaultStreamFilterOptions(
233-
StreamOptions streamOptions, Map<String, DescribedType> filters) {
233+
DefaultStreamOptions streamOptions, Map<String, DescribedType> filters) {
234234
this.streamOptions = streamOptions;
235235
this.filters = filters;
236236
}
@@ -440,6 +440,16 @@ public StreamFilterOptions propertySymbol(String key, String value) {
440440
return this.applicationPropertyFilter(key, Symbol.valueOf(value));
441441
}
442442

443+
@Override
444+
public StreamFilterOptions sql(String sql) {
445+
if (!this.streamOptions.builder.connection.filterExpressionsSupported()) {
446+
throw new IllegalArgumentException(
447+
"AMQP SQL filter expressions requires at least RabbitMQ 4.2.0");
448+
}
449+
this.filters.put("sql-filter", filterValue("apache.org:selector-filter:string", sql));
450+
return this;
451+
}
452+
443453
@Override
444454
public StreamOptions stream() {
445455
return this.streamOptions;

src/main/java/com/rabbitmq/client/amqp/impl/Utils.java

Lines changed: 13 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -236,6 +236,15 @@ static boolean is4_0_OrMore(String brokerVersion) {
236236
}
237237
}
238238

239+
static boolean is4_2_OrMore(String brokerVersion) {
240+
try {
241+
return versionCompare(currentVersion(brokerVersion), "4.2.0") >= 0;
242+
} catch (Exception e) {
243+
LOGGER.debug("Unable to parse broker version {}", brokerVersion, e);
244+
return true;
245+
}
246+
}
247+
239248
static boolean is4_1_OrMore(String brokerVersion) {
240249
try {
241250
return versionCompare(currentVersion(brokerVersion), "4.1.0") >= 0;
@@ -253,6 +262,10 @@ static boolean supportSetToken(String brokerVersion) {
253262
return is4_1_OrMore(brokerVersion);
254263
}
255264

265+
static boolean supportSqlFilterExpressions(String brokerVersion) {
266+
return is4_2_OrMore(brokerVersion);
267+
}
268+
256269
static final class ObservationConnectionInfo implements ObservationCollector.ConnectionInfo {
257270

258271
private final String address;

src/test/java/com/rabbitmq/client/amqp/impl/SourceFiltersTest.java

Lines changed: 32 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -21,13 +21,15 @@
2121
import static com.rabbitmq.client.amqp.Management.QueueType.STREAM;
2222
import static com.rabbitmq.client.amqp.impl.Assertions.assertThat;
2323
import static com.rabbitmq.client.amqp.impl.TestConditions.BrokerVersion.RABBITMQ_4_1_0;
24+
import static com.rabbitmq.client.amqp.impl.TestConditions.BrokerVersion.RABBITMQ_4_2_0;
2425
import static com.rabbitmq.client.amqp.impl.TestUtils.sync;
2526
import static com.rabbitmq.client.amqp.impl.TestUtils.waitUntilStable;
2627
import static java.nio.charset.StandardCharsets.*;
2728
import static java.util.stream.IntStream.range;
2829
import static org.assertj.core.api.Assertions.assertThat;
2930
import static org.assertj.core.api.Assertions.assertThatThrownBy;
3031

32+
import com.rabbitmq.client.amqp.AmqpException;
3133
import com.rabbitmq.client.amqp.Connection;
3234
import com.rabbitmq.client.amqp.Consumer;
3335
import com.rabbitmq.client.amqp.ConsumerBuilder;
@@ -531,6 +533,36 @@ void filterExpressionStringModifier() {
531533
msgs.forEach(m -> assertThat(m).hasSubject("foo bar"));
532534
}
533535

536+
@Test
537+
@BrokerVersionAtLeast(RABBITMQ_4_2_0)
538+
void sqlFilterExpressionsShouldFilterMessages() {
539+
publish(1, m -> m.subject("abc 123"));
540+
publish(1, m -> m.subject("foo bar"));
541+
publish(1, m -> m.subject("ab 12"));
542+
543+
List<Message> msgs = consume(2, m -> m.sql("properties.subject LIKE 'ab%'"));
544+
msgs.forEach(m -> assertThat(m.subject()).startsWith("ab"));
545+
546+
msgs = consume(1, m -> m.sql("properties.subject like 'foo%'"));
547+
msgs.forEach(m -> assertThat(m).hasSubject("foo bar"));
548+
}
549+
550+
@Test
551+
@BrokerVersionAtLeast(RABBITMQ_4_2_0)
552+
void incorrectFilterShouldThrowException() {
553+
assertThatThrownBy(
554+
() ->
555+
connection.consumerBuilder().queue(name).messageHandler((ctx, msg) -> {}).stream()
556+
.offset(FIRST)
557+
.filter()
558+
.sql("TRUE TRUE")
559+
.stream()
560+
.builder()
561+
.build())
562+
.isInstanceOf(AmqpException.class)
563+
.hasMessageContaining("filters do not match");
564+
}
565+
534566
void publish(int messageCount) {
535567
this.publish(messageCount, UnaryOperator.identity());
536568
}

0 commit comments

Comments
 (0)