Skip to content

[feat][pip] PIP-419: Support consumer message filter on broker-side for specific subscription #24305

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Open
wants to merge 5 commits into
base: master
Choose a base branch
from

Conversation

AuroraTwinkle
Copy link
Contributor

@AuroraTwinkle AuroraTwinkle commented May 15, 2025

Fixes #xyz

Main Issue: #8544
#7629

PIP: #xyz

Motivation

Currently, Pulsar does not have the ability to filter messages by subscription, this pip try to implement message filtering on the pulsar broker side, enrich the pulsar ecosystem, save bandwidth, and help RocketMQ users switch to Pulsar more smoothly. And, I've noticed two similar PRs #8544 and #7629, but neither was approved, so I want to try to get this work implemented.

Modifications

Verifying this change

  • Make sure that the change passes the CI checks.

(Please pick either of the following options)

This change is a trivial rework / code cleanup without any test coverage.

(or)

This change is already covered by existing tests, such as (please describe tests).

(or)

This change added tests and can be verified as follows:

(example:)

  • Added integration tests for end-to-end deployment with large payloads (10MB)
  • Extended integration test for recovery after broker failure

Does this pull request potentially affect one of the following parts:

If the box was checked, please highlight the changes

  • Dependencies (add or upgrade a dependency)
  • The public API
  • The schema
  • The default values of configurations
  • The threading model
  • The binary protocol
  • The REST endpoints
  • The admin CLI options
  • The metrics
  • Anything that affects deployment

Documentation

  • doc
  • doc-required
  • doc-not-needed
  • doc-complete

Matching PR in forked repository

PR in forked repository:
AuroraTwinkle#6

Copy link

@AuroraTwinkle Please add the following content to your PR description and select a checkbox:

- [ ] `doc` <!-- Your PR contains doc changes -->
- [ ] `doc-required` <!-- Your PR changes impact docs and you will update later -->
- [ ] `doc-not-needed` <!-- Your PR changes do not impact docs -->
- [ ] `doc-complete` <!-- Docs have been already added -->

@AuroraTwinkle AuroraTwinkle force-pushed the feat/consumerMessageFilt branch from 62a1581 to 8810648 Compare May 15, 2025 03:32
@AuroraTwinkle AuroraTwinkle changed the title [feature][pip]:PIP-413: Support consumer message filter on broker-sid… [feature][pip]:PIP-419: Support consumer message filter on broker-sid… May 15, 2025
@github-actions github-actions bot added doc-not-needed Your PR changes do not impact docs and removed doc-label-missing labels May 15, 2025
@AuroraTwinkle AuroraTwinkle marked this pull request as ready for review May 15, 2025 03:36
@AuroraTwinkle AuroraTwinkle changed the title [feature][pip]:PIP-419: Support consumer message filter on broker-sid… [feature][pip]:PIP-413: Support consumer message filter on broker-side for specific subscription May 15, 2025
@AuroraTwinkle AuroraTwinkle force-pushed the feat/consumerMessageFilt branch from 9b66f68 to 8810648 Compare May 15, 2025 04:06
@AuroraTwinkle AuroraTwinkle force-pushed the feat/consumerMessageFilt branch from 4dc343f to fcccd50 Compare May 15, 2025 04:12
@AuroraTwinkle
Copy link
Contributor Author

@lhotari @StevenLuMT @liangyepianzhou I have submitted a pip about message filtering. Would you please review it when you have time? Looking forward to your suggestions. Thanks!

@liangyepianzhou liangyepianzhou changed the title [feature][pip]:PIP-413: Support consumer message filter on broker-side for specific subscription [feat][pip] PIP-413: Support consumer message filter on broker-side for specific subscription May 15, 2025
@liangyepianzhou liangyepianzhou added this to the 4.1.0 milestone May 15, 2025
@AuroraTwinkle AuroraTwinkle changed the title [feat][pip] PIP-413: Support consumer message filter on broker-side for specific subscription [feat][pip] PIP-419: Support consumer message filter on broker-side for specific subscription May 15, 2025
Consumer<Long> consumer = pulsarClient.newConsumer(Schema.INT64)
.topic("persistent://public/default/my-topic")
.subscriptionName("my-subscription")
.filterExpression(new FilterExpression("messageSource='wechat' AND messageType='audio'", FilterExpressionType.SQL92))
Copy link
Contributor

@liangyepianzhou liangyepianzhou May 15, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Excellent work! I think this feature is indeed useful. However, I suggest that the filter expressions should be configured, modified, and deleted through the management/control plane (Pulsar Admin/CLI), rather than being set during consumer initialization. My reasoning is as follows:

  1. Practical Operational Considerations: When maintaining a messaging service, we typically provide users with a management platform to configure subscription settings. If filter conditions are set during consumer initialization, any configuration changes would require propagating updates to user clients and forcing client reinitialization to take effect. This prevents direct updates via admin operations.

  2. Service Stability: Broker-side configuration changes requiring client reinitialization could introduce consumption service instability.

  3. Subscription Consistency: A single subscription may have multiple consumer instances. Setting filter conditions at consumer initialization risks inconsistent filtering rules across consumers under the same subscription, potentially causing rule overwrites (e.g., Consumer A sets Filter X while Consumer B sets Filter Y, leading to conflicts).

Regarding point 3: This mirrors real-world issues observed in RocketMQ's message filtering. I've encountered multiple cases where tag-based filtering failed because different consumers in the same consumer group subscribed to conflicting tags, resulting in rule overrides.

Recommendation: Store filter rules as subscription metadata (managed via Admin API/CLI) rather than per-consumer settings. This would:

  • Ensure consistent filtering across all consumers in a subscription
  • Allow dynamic rule updates without client restarts
  • Prevent rule conflicts between consumers

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It is a good idea to configure the filtering rules through admin API. This can avoid the problems you mentioned, which is also the current problem of RocketMQ. I will refactor it together after receiving more comments.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It's already possible to handle filtering rules through the Admin API. The pulsar-admin CLI tool has a plugin model that allows adding also new commands. This approach is used in Pulsar JMS: https://github.com/datastax/pulsar-jms/blob/master/pulsar-jms-admin-ext/src/main/java/com/datastax/oss/pulsar/jms/cli/SubscriptionBaseCommand.java (more context in my other comment, #24305 (comment))

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

In Pulsar JMS, you could also provide the filter at a consumer level with consumer properties. If we were to add message filtering as a top level feature, it would be great to have full support in the clients too. In my other comment, I commented about this aspect

One of the possible compromises could be to make it an optional feature. In the Pulsar Java client API, we'd need to have a way to handle optional features in the builder interface itself so that the main builder interface for consumers doesn't get distracted by options that are only supported in certain configurations. One way to do this is with an "extensions" pattern. Where the builder would have a method extension(Class consumerExtensionBuilderClass) which continues configuring the extension in a "nested" builder.

@liangyepianzhou liangyepianzhou added doc-required Your PR changes impact docs and you will update later. and removed doc-not-needed Your PR changes do not impact docs labels May 15, 2025
@lhotari
Copy link
Member

lhotari commented May 15, 2025

And, I've noticed two similar PRs #8544 and #7629, but neither was approved, so I want to try to get this work implemented.

@AuroraTwinkle For former context why such feature has been rejected in the past, this email thread is a good read: https://lists.apache.org/thread/k0hb33nz42f3603hvs03p7lmltjgj40z .

There simply hasn't been consensus that we'd like to move Pulsar together as a community to this direction.
As a compromise, it was made possible to do broker side filtering using custom plugins with "PIP 105: Support pluggable entry filter in Dispatcher". and related subscription properties #15503. There's an implementation in Pulsar JMS using this approach: https://github.com/datastax/pulsar-jms/blob/master/pulsar-jms-filters/src/main/java/com/datastax/oss/pulsar/jms/selectors/JMSFilter.java

There are some performance challenges currently when using broker side filtering. However, many of the problems have been addressed over time and if we'd introduce the feature as a top level feature, there would be more motivation to address more of the challenges such as ones of the gaps in broker cache and handling of "ack holes" in Pulsar efficiently.

The "ack hole" challenge is similar for delayed message delivery at large scale. Ack hole handling on broker side has been optimized with changes such as #9292, (must be enabled with managedLedgerPersistIndividualAckAsLongArray=true since default is false after #23759), increasing managedLedgerMaxUnackedRangesToPersist, setting managedCursorInfoCompressionType=LZ4, etc.

For broker cache, there's lot more to do. #23466 .
One of the workarounds is cacheEvictionByMarkDeletedPosition=true for message filtering. I've have a WIP branch to address many of the problems, but haven't had a chance to push that forward.
It's just an example that once there's a requirement to address performance issues, those would get addressed over time. That's why I'd like to see message filtering being added as a top level feature in Pulsar in the long term.

One of the possible compromises could be to make it an optional feature. In the Pulsar Java client API, we'd need to have a way to handle optional features in the builder interface itself so that the main builder interface for consumers doesn't get distracted by options that are only supported in certain configurations. One way to do this is with an "extensions" pattern. Where the builder would have a method <T extends ConsumerExtensionBuilder> extension(Class<T> consumerExtensionBuilderClass) which continues configuring the extension in a "nested" builder.

Comment on lines +96 to +100
### Solution 1 SQL-92

Message filtering using [SQL-92](https://en.wikipedia.org/wiki/SQL-92) syntax mentioned in https://github.com/apache/pulsar/pull/8544#issuecomment-1064619792

We can write the following SQL expression to filter messages, the pseudo code is as follow:
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The project has forked the filtering logic from ActiveMQ in this module: https://github.com/datastax/pulsar-jms/tree/master/activemq-filters

Comment on lines +114 to +134
### Solution 2 Groovy

> [Groovy](https://groovy-lang.org/) is a JVM-based object-oriented programming language under Apache. It can be used for object-oriented programming and can also be used as a pure scripting language to call Groovy scripts in Java.
>

We can write the following Groovy expression to filter messages, the pseudo code is as follows:

```java
// The filter expression passed by the pulsar consumer client to the pulsar broker
String groovyExpression = "it.properties.messageType.equals('audio') && it.properties.messageSource.equals('wechat')"

// The pulsar broker uses the expression passed by the client to filter the messages and returns the messages that meet the conditions to the client.
GroovyShell shell = new GroovyShell();
Script script = shell.parse("return " + groovyExpression);
List filtedMessages = messages.stream()
.filter(message -> {
binding.setVariable("it", message);
script.setBinding(binding);
return (Boolean) script.run();
})
.collect(Collectors.toList());
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This would be terrible from security perspective. Something like Google CEL or Spring Expression Language (SpEL) would be a better choice.

@dao-jun
Copy link
Member

dao-jun commented May 15, 2025

I’m +1 with the motivation.

even though we have EntryFilter already, but it's not ready to use right out of the box.

In the details, maybe we can impl EntryFilter, and manage the rules via pulsar admin?

SpEL is a better choice than groovy scripts, it can easily lead to metadata area OOM because of caching classloader, I had encountered this problem in our business systems.

Besides, I hope you can consider that the ack hole problems, and it could comes incorrect stats/metrics(such as backlog).

@AuroraTwinkle
Copy link
Contributor Author

And, I've noticed two similar PRs #8544 and #7629, but neither was approved, so I want to try to get this work implemented.

@AuroraTwinkle For former context why such feature has been rejected in the past, this email thread is a good read: https://lists.apache.org/thread/k0hb33nz42f3603hvs03p7lmltjgj40z .

There simply hasn't been consensus that we'd like to move Pulsar together as a community to this direction. As a compromise, it was made possible to do broker side filtering using custom plugins with "PIP 105: Support pluggable entry filter in Dispatcher". and related subscription properties #15503. There's an implementation in Pulsar JMS using this approach: https://github.com/datastax/pulsar-jms/blob/master/pulsar-jms-filters/src/main/java/com/datastax/oss/pulsar/jms/selectors/JMSFilter.java

There are some performance challenges currently when using broker side filtering. However, many of the problems have been addressed over time and if we'd introduce the feature as a top level feature, there would be more motivation to address more of the challenges such as ones of the gaps in broker cache and handling of "ack holes" in Pulsar efficiently.

The "ack hole" challenge is similar for delayed message delivery at large scale. Ack hole handling on broker side has been optimized with changes such as #9292, (must be enabled with managedLedgerPersistIndividualAckAsLongArray=true since default is false after #23759), increasing managedLedgerMaxUnackedRangesToPersist, setting managedCursorInfoCompressionType=LZ4, etc.

For broker cache, there's lot more to do. #23466 . One of the workarounds is cacheEvictionByMarkDeletedPosition=true for message filtering. I've have a WIP branch to address many of the problems, but haven't had a chance to push that forward. It's just an example that once there's a requirement to address performance issues, those would get addressed over time. That's why I'd like to see message filtering being added as a top level feature in Pulsar in the long term.

One of the possible compromises could be to make it an optional feature. In the Pulsar Java client API, we'd need to have a way to handle optional features in the builder interface itself so that the main builder interface for consumers doesn't get distracted by options that are only supported in certain configurations. One way to do this is with an "extensions" pattern. Where the builder would have a method <T extends ConsumerExtensionBuilder> extension(Class<T> consumerExtensionBuilderClass) which continues configuring the extension in a "nested" builder.

Thank you Lari for your rigorous and detailed suggestions. Please give me some time to understand them carefully.

@AuroraTwinkle
Copy link
Contributor Author

AuroraTwinkle commented May 16, 2025

I’m +1 with the motivation.

even though we have EntryFilter already, but it's not ready to use right out of the box.

In the details, maybe we can impl EntryFilter, and manage the rules via pulsar admin?

SpEL is a better choice than groovy scripts, it can easily lead to metadata area OOM because of caching classloader, I had encountered this problem in our business systems.

Besides, I hope you can consider that the ack hole problems, and it could comes incorrect stats/metrics(such as backlog).

Thanks.I will consider SpEL and ack hole problems.

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
doc-required Your PR changes impact docs and you will update later. PIP
Projects
None yet
Development

Successfully merging this pull request may close these issues.

4 participants