[feat][pip] PIP-419: Support consumer message filter on broker-side for specific subscription #24305
@AuroraTwinkle Please add the following content to your PR description and select a checkbox:
@lhotari @StevenLuMT @liangyepianzhou I have submitted a PIP about message filtering. Would you please review it when you have time? Looking forward to your suggestions. Thanks!
Consumer<Long> consumer = pulsarClient.newConsumer(Schema.INT64)
        .topic("persistent://public/default/my-topic")
        .subscriptionName("my-subscription")
        .filterExpression(new FilterExpression("messageSource='wechat' AND messageType='audio'", FilterExpressionType.SQL92))
Excellent work! I think this feature is indeed useful. However, I suggest that the filter expressions should be configured, modified, and deleted through the management/control plane (Pulsar Admin/CLI), rather than being set during consumer initialization. My reasoning is as follows:
1. Practical Operational Considerations: When maintaining a messaging service, we typically provide users with a management platform to configure subscription settings. If filter conditions are set during consumer initialization, any configuration changes would require propagating updates to user clients and forcing client reinitialization to take effect. This prevents direct updates via admin operations.
2. Service Stability: Broker-side configuration changes requiring client reinitialization could introduce consumption service instability.
3. Subscription Consistency: A single subscription may have multiple consumer instances. Setting filter conditions at consumer initialization risks inconsistent filtering rules across consumers under the same subscription, potentially causing rule overwrites (e.g., Consumer A sets Filter X while Consumer B sets Filter Y, leading to conflicts).
Regarding point 3: This mirrors real-world issues observed in RocketMQ's message filtering. I've encountered multiple cases where tag-based filtering failed because different consumers in the same consumer group subscribed to conflicting tags, resulting in rule overrides.
Recommendation: Store filter rules as subscription metadata (managed via Admin API/CLI) rather than per-consumer settings. This would:
- Ensure consistent filtering across all consumers in a subscription
- Allow dynamic rule updates without client restarts
- Prevent rule conflicts between consumers
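To make the recommendation concrete, here is a minimal sketch of filter rules kept as per-subscription state rather than per-consumer settings. Everything in it is hypothetical: `SubscriptionFilterRegistry`, its method names, and the `topic|subscription` key format are illustrative stand-ins, not part of Pulsar or of this PIP, and how it would map onto the Admin API is left open.

```java
import java.util.Map;
import java.util.Optional;
import java.util.concurrent.ConcurrentHashMap;

/** Hypothetical in-memory stand-in for subscription metadata kept by the broker. */
final class SubscriptionFilterRegistry {
    // Key: "topic|subscription" (illustrative format); value: filter expression text.
    private final Map<String, String> filters = new ConcurrentHashMap<>();

    private static String key(String topic, String subscription) {
        return topic + "|" + subscription;
    }

    /** Admin-side operation: set or replace the rule for the whole subscription. */
    void setFilter(String topic, String subscription, String expression) {
        filters.put(key(topic, subscription), expression);
    }

    /** Admin-side operation: remove the rule so that nothing is filtered. */
    void removeFilter(String topic, String subscription) {
        filters.remove(key(topic, subscription));
    }

    /** Dispatch-side lookup: every consumer of the subscription sees the same rule. */
    Optional<String> filterFor(String topic, String subscription) {
        return Optional.ofNullable(filters.get(key(topic, subscription)));
    }
}
```

Because the rule is looked up by subscription at dispatch time, Consumer A and Consumer B cannot register conflicting filters, and an update takes effect without any client restart.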
It is a good idea to configure the filtering rules through admin API. This can avoid the problems you mentioned, which is also the current problem of RocketMQ. I will refactor it together after receiving more comments.
It's already possible to handle filtering rules through the Admin API. The pulsar-admin CLI tool has a plugin model that also allows adding new commands. This approach is used in Pulsar JMS: https://github.com/datastax/pulsar-jms/blob/master/pulsar-jms-admin-ext/src/main/java/com/datastax/oss/pulsar/jms/cli/SubscriptionBaseCommand.java (more context in my other comment, #24305 (comment))
In Pulsar JMS, you can also provide the filter at the consumer level with consumer properties. If we were to add message filtering as a top-level feature, it would be great to have full support in the clients too. I covered this aspect in my other comment:
One possible compromise could be to make it an optional feature. In the Pulsar Java client API, we'd need a way to handle optional features in the builder interface itself, so that the main builder interface for consumers doesn't get cluttered with options that are only supported in certain configurations. One way to do this is with an "extensions" pattern, where the builder would have a method extension(Class consumerExtensionBuilderClass) that continues configuring the extension in a "nested" builder.
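A rough sketch of how such an "extensions" pattern might look is below. All names are hypothetical (`SketchConsumerBuilder`, `FilterExtensionBuilder`, `done()`); none of this is the real Pulsar client API. As a simplification, the sketch passes a factory function instead of the `Class` argument mentioned above, which avoids reflection in the example.

```java
import java.util.function.Function;

/** Hypothetical, simplified stand-in for a consumer builder; not the real Pulsar API. */
final class SketchConsumerBuilder {
    String topic;
    String subscriptionName;
    String filterExpression; // only ever set through the optional extension

    SketchConsumerBuilder topic(String topic) { this.topic = topic; return this; }

    SketchConsumerBuilder subscriptionName(String name) { this.subscriptionName = name; return this; }

    /** Entry point for optional features: hands over to a nested extension builder. */
    <E> E extension(Function<SketchConsumerBuilder, E> extensionFactory) {
        return extensionFactory.apply(this);
    }
}

/** Hypothetical nested builder for the optional broker-side filtering feature. */
final class FilterExtensionBuilder {
    private final SketchConsumerBuilder parent;

    FilterExtensionBuilder(SketchConsumerBuilder parent) { this.parent = parent; }

    FilterExtensionBuilder expression(String expression) {
        parent.filterExpression = expression;
        return this;
    }

    /** Returns to the main builder so the fluent chain can continue. */
    SketchConsumerBuilder done() { return parent; }
}

/** Usage: filter options stay out of the main builder interface. */
final class ExtensionUsage {
    static SketchConsumerBuilder example() {
        return new SketchConsumerBuilder()
                .topic("persistent://public/default/my-topic")
                .subscriptionName("my-subscription")
                .extension(FilterExtensionBuilder::new)
                .expression("messageSource='wechat' AND messageType='audio'")
                .done();
    }
}
```

The main builder only gains one generic `extension(...)` method, so options that exist only in certain configurations live entirely in the nested builder.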
@AuroraTwinkle For context on why such a feature has been rejected in the past, this email thread is a good read: https://lists.apache.org/thread/k0hb33nz42f3603hvs03p7lmltjgj40z . There simply hasn't been consensus that we'd like to move Pulsar in this direction together as a community. There are currently some performance challenges when using broker-side filtering. However, many of the problems have been addressed over time, and if we introduced the feature as a top-level feature, there would be more motivation to address more of the challenges, such as the gaps in the broker cache and the efficient handling of "ack holes" in Pulsar. The "ack hole" challenge is similar for delayed message delivery at large scale. Ack hole handling on the broker side has been optimized with changes such as #9292 (must be enabled with …). For the broker cache, there's a lot more to do: #23466 .
### Solution 1 SQL-92

Message filtering using [SQL-92](https://en.wikipedia.org/wiki/SQL-92) syntax, as mentioned in https://github.com/apache/pulsar/pull/8544#issuecomment-1064619792

We can write the following SQL expression to filter messages. The pseudocode is as follows:
Pulsar JMS reuses ActiveMQ's SQL-92 parser for filtering messages
https://github.com/datastax/pulsar-jms/blob/master/pulsar-jms-filters-common/src/main/java/com/datastax/oss/pulsar/jms/selectors/SelectorSupport.java
The project has forked the filtering logic from ActiveMQ in this module: https://github.com/datastax/pulsar-jms/tree/master/activemq-filters
### Solution 2 Groovy

> [Groovy](https://groovy-lang.org/) is a JVM-based object-oriented programming language under Apache. It can be used for object-oriented programming and can also be used as a pure scripting language to call Groovy scripts in Java.

We can write the following Groovy expression to filter messages. The pseudocode is as follows:

```java
import groovy.lang.Binding;
import groovy.lang.GroovyShell;
import groovy.lang.Script;
import java.util.List;
import java.util.stream.Collectors;

// The filter expression passed by the Pulsar consumer client to the Pulsar broker
String groovyExpression = "it.properties.messageType.equals('audio') && it.properties.messageSource.equals('wechat')";

// The Pulsar broker evaluates the client-supplied expression against each message
// and returns only the messages that meet the conditions to the client.
// `messages` is assumed to be the List<Message<?>> that is about to be dispatched.
GroovyShell shell = new GroovyShell();
Script script = shell.parse("return " + groovyExpression);
Binding binding = new Binding();
script.setBinding(binding);
List<Message<?>> filteredMessages = messages.stream()
        .filter(message -> {
            // Expose the current message to the script as `it`
            binding.setVariable("it", message);
            return (Boolean) script.run();
        })
        .collect(Collectors.toList());
```
This would be terrible from a security perspective. Something like Google CEL or Spring Expression Language (SpEL) would be a better choice.
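To illustrate the security point, here is a minimal sketch of a restricted filter that can only compare message properties and cannot execute arbitrary code. It is neither CEL nor SpEL: `PropertyEqualsFilter` is a hypothetical, hand-rolled class that accepts only `key='value'` terms joined by `AND`, and it assumes values never contain the word `AND` surrounded by spaces.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.regex.Matcher;
import java.util.regex.Pattern;

/** Hypothetical restricted filter: only key='value' terms joined by AND. */
final class PropertyEqualsFilter {
    private static final Pattern TERM =
            Pattern.compile("\\s*(\\w+)\\s*=\\s*'([^']*)'\\s*");

    // Each entry is a {propertyName, expectedValue} pair.
    private final List<String[]> terms = new ArrayList<>();

    PropertyEqualsFilter(String expression) {
        // Split on a case-insensitive AND; anything that is not a plain
        // equality term is rejected instead of being evaluated.
        for (String part : expression.split("(?i)\\s+AND\\s+")) {
            Matcher m = TERM.matcher(part);
            if (!m.matches()) {
                throw new IllegalArgumentException("Unsupported filter term: " + part);
            }
            terms.add(new String[] {m.group(1), m.group(2)});
        }
    }

    /** Returns true only if every term equals the corresponding message property. */
    boolean matches(Map<String, String> properties) {
        for (String[] term : terms) {
            if (!term[1].equals(properties.get(term[0]))) {
                return false;
            }
        }
        return true;
    }
}
```

An input such as `System.exit(0)` is rejected at parse time with an `IllegalArgumentException`, whereas a general-purpose script engine would attempt to run it inside the broker.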
I'm +1 on the motivation, even though we have … On the details, SpEL may be a better choice than Groovy scripts: Groovy can easily lead to a metadata area (Metaspace) OOM because of class loader caching, a problem I have encountered in our business systems. Besides, I hope you will consider the ack hole problem, which could also lead to incorrect stats/metrics (such as backlog).
Thank you Lari for your rigorous and detailed suggestions. Please give me some time to understand them carefully.
Thanks. I will consider SpEL and the ack hole problems.
Fixes #xyz
Main Issue: #8544
#7629
PIP: #xyz
Motivation
Currently, Pulsar does not have the ability to filter messages per subscription. This PIP tries to implement message filtering on the Pulsar broker side in order to enrich the Pulsar ecosystem, save bandwidth, and help RocketMQ users switch to Pulsar more smoothly. I've also noticed two similar PRs, #8544 and #7629, but neither was approved, so I want to try to get this work implemented.
Matching PR in forked repository
PR in forked repository:
AuroraTwinkle#6