Skip to content

Commit b264a92

Browse files
committed
extends broker interceptor
1 parent 200f433 commit b264a92

File tree

10 files changed

+263
-100
lines changed

10 files changed

+263
-100
lines changed

pulsar-broker/src/main/java/org/apache/pulsar/broker/intercept/BrokerInterceptor.java

+54
Original file line numberDiff line numberDiff line change
@@ -52,18 +52,36 @@ public interface BrokerInterceptor extends AutoCloseable {
5252

5353
/**
5454
* Intercept messages before sending them to the consumers.
55+
* Deprecated, use {@link #beforeSendMessage(Subscription, Entry, long[], MessageMetadata, Consumer)} instead.
5556
*
5657
* @param subscription pulsar subscription
5758
* @param entry entry
5859
* @param ackSet entry ack bitset. it is either <tt>null</tt> or an array of long-based bitsets.
5960
* @param msgMetadata message metadata. The message metadata will be recycled after this call.
6061
*/
62+
@Deprecated
6163
default void beforeSendMessage(Subscription subscription,
6264
Entry entry,
6365
long[] ackSet,
6466
MessageMetadata msgMetadata) {
6567
}
6668

69+
/**
70+
* Intercept messages before sending them to the consumers.
71+
*
72+
* @param subscription pulsar subscription
73+
* @param entry entry
74+
* @param ackSet entry ack bitset. it is either <tt>null</tt> or an array of long-based bitsets.
75+
* @param msgMetadata message metadata. The message metadata will be recycled after this call.
76+
* @param consumer consumer. Consumer which entry are sent to.
77+
*/
78+
default void beforeSendMessage(Subscription subscription,
79+
Entry entry,
80+
long[] ackSet,
81+
MessageMetadata msgMetadata,
82+
Consumer consumer) {
83+
}
84+
6785
/**
6886
* Called by the broker when a new connection is created.
6987
*/
@@ -77,6 +95,18 @@ default void producerCreated(ServerCnx cnx, Producer producer,
7795
Map<String, String> metadata){
7896
}
7997

98+
/**
99+
* Called by the broker when a producer is closed.
100+
*
101+
* @param cnx client Connection
102+
* @param producer Producer object
103+
* @param metadata A map of metadata
104+
*/
105+
default void producerClosed(ServerCnx cnx,
106+
Producer producer,
107+
Map<String, String> metadata) {
108+
}
109+
80110
/**
81111
* Intercept after a consumer is created.
82112
*
@@ -89,6 +119,30 @@ default void consumerCreated(ServerCnx cnx,
89119
Map<String, String> metadata) {
90120
}
91121

122+
/**
123+
* Called by the broker when a consumer is closed.
124+
*
125+
* @param cnx client Connection
126+
* @param consumer Consumer object
127+
* @param metadata A map of metadata
128+
*/
129+
default void consumerClosed(ServerCnx cnx,
130+
Consumer consumer,
131+
Map<String, String> metadata) {
132+
}
133+
134+
/**
135+
* Intercept message when broker receive a send request.
136+
*
137+
* @param headersAndPayload entry's header and payload
138+
* @param publishContext Publish Context
139+
*/
140+
default void onMessagePublish(Producer producer,
141+
ByteBuf headersAndPayload,
142+
Topic.PublishContext publishContext) {
143+
144+
}
145+
92146
/**
93147
* Intercept after a message is produced.
94148
*

pulsar-broker/src/main/java/org/apache/pulsar/broker/intercept/BrokerInterceptorWithClassLoader.java

+39
Original file line numberDiff line numberDiff line change
@@ -63,6 +63,26 @@ public void beforeSendMessage(Subscription subscription,
6363
}
6464
}
6565

66+
@Override
67+
public void beforeSendMessage(Subscription subscription,
68+
Entry entry,
69+
long[] ackSet,
70+
MessageMetadata msgMetadata,
71+
Consumer consumer) {
72+
try (ClassLoaderSwitcher ignored = new ClassLoaderSwitcher(classLoader)) {
73+
this.interceptor.beforeSendMessage(
74+
subscription, entry, ackSet, msgMetadata, consumer);
75+
}
76+
}
77+
78+
@Override
79+
public void onMessagePublish(Producer producer, ByteBuf headersAndPayload,
80+
Topic.PublishContext publishContext) {
81+
try (ClassLoaderSwitcher ignored = new ClassLoaderSwitcher(classLoader)) {
82+
this.interceptor.onMessagePublish(producer, headersAndPayload, publishContext);
83+
}
84+
}
85+
6686
@Override
6787
public void producerCreated(ServerCnx cnx, Producer producer,
6888
Map<String, String> metadata){
@@ -71,6 +91,15 @@ public void producerCreated(ServerCnx cnx, Producer producer,
7191
}
7292
}
7393

94+
@Override
95+
public void producerClosed(ServerCnx cnx,
96+
Producer producer,
97+
Map<String, String> metadata) {
98+
try (ClassLoaderSwitcher ignored = new ClassLoaderSwitcher(classLoader)) {
99+
this.interceptor.producerClosed(cnx, producer, metadata);
100+
}
101+
}
102+
74103
@Override
75104
public void consumerCreated(ServerCnx cnx,
76105
Consumer consumer,
@@ -81,6 +110,16 @@ public void consumerCreated(ServerCnx cnx,
81110
}
82111
}
83112

113+
@Override
114+
public void consumerClosed(ServerCnx cnx,
115+
Consumer consumer,
116+
Map<String, String> metadata) {
117+
try (ClassLoaderSwitcher ignored = new ClassLoaderSwitcher(classLoader)) {
118+
this.interceptor.consumerClosed(cnx, consumer, metadata);
119+
}
120+
}
121+
122+
84123
@Override
85124
public void messageProduced(ServerCnx cnx, Producer producer, long startTimeNs, long ledgerId,
86125
long entryId, Topic.PublishContext publishContext) {

pulsar-broker/src/main/java/org/apache/pulsar/broker/intercept/BrokerInterceptors.java

+87-47
Original file line numberDiff line numberDiff line change
@@ -93,107 +93,143 @@ public static BrokerInterceptor load(ServiceConfiguration conf) throws IOExcepti
9393
}
9494
}
9595

96+
@Override
97+
public void onMessagePublish(Producer producer,
98+
ByteBuf headersAndPayload,
99+
Topic.PublishContext publishContext) {
100+
if (interceptorsEnabled()) {
101+
for (BrokerInterceptorWithClassLoader value : interceptors.values()) {
102+
value.onMessagePublish(producer, headersAndPayload, publishContext);
103+
}
104+
}
105+
}
106+
96107
@Override
97108
public void beforeSendMessage(Subscription subscription,
98109
Entry entry,
99110
long[] ackSet,
100111
MessageMetadata msgMetadata) {
101-
for (BrokerInterceptorWithClassLoader value : interceptors.values()) {
102-
value.beforeSendMessage(
103-
subscription,
104-
entry,
105-
ackSet,
106-
msgMetadata);
112+
if (interceptorsEnabled()) {
113+
for (BrokerInterceptorWithClassLoader value : interceptors.values()) {
114+
value.beforeSendMessage(subscription, entry, ackSet, msgMetadata);
115+
}
116+
}
117+
}
118+
119+
@Override
120+
public void beforeSendMessage(Subscription subscription,
121+
Entry entry,
122+
long[] ackSet,
123+
MessageMetadata msgMetadata,
124+
Consumer consumer) {
125+
if (interceptorsEnabled()) {
126+
for (BrokerInterceptorWithClassLoader value : interceptors.values()) {
127+
value.beforeSendMessage(subscription, entry, ackSet, msgMetadata, consumer);
128+
}
107129
}
108130
}
109131

110132
@Override
111133
public void consumerCreated(ServerCnx cnx,
112134
Consumer consumer,
113135
Map<String, String> metadata) {
114-
if (interceptors == null || interceptors.isEmpty()) {
115-
return;
136+
if (interceptorsEnabled()) {
137+
for (BrokerInterceptorWithClassLoader value : interceptors.values()) {
138+
value.consumerCreated(
139+
cnx,
140+
consumer,
141+
metadata);
142+
}
116143
}
117-
for (BrokerInterceptorWithClassLoader value : interceptors.values()) {
118-
value.consumerCreated(
119-
cnx,
120-
consumer,
121-
metadata);
144+
}
145+
146+
@Override
147+
public void consumerClosed(ServerCnx cnx,
148+
Consumer consumer,
149+
Map<String, String> metadata) {
150+
if (interceptorsEnabled()) {
151+
for (BrokerInterceptorWithClassLoader value : interceptors.values()) {
152+
value.consumerClosed(cnx, consumer, metadata);
153+
}
122154
}
123155
}
124156

125157
@Override
126158
public void producerCreated(ServerCnx cnx, Producer producer,
127159
Map<String, String> metadata){
128-
if (interceptors == null || interceptors.isEmpty()) {
129-
return;
160+
if (interceptorsEnabled()) {
161+
for (BrokerInterceptorWithClassLoader value : interceptors.values()) {
162+
value.producerCreated(cnx, producer, metadata);
163+
}
130164
}
131-
for (BrokerInterceptorWithClassLoader value : interceptors.values()) {
132-
value.producerCreated(cnx, producer, metadata);
165+
}
166+
167+
@Override
168+
public void producerClosed(ServerCnx cnx,
169+
Producer producer,
170+
Map<String, String> metadata) {
171+
if (interceptorsEnabled()) {
172+
for (BrokerInterceptorWithClassLoader value : interceptors.values()) {
173+
value.producerClosed(cnx, producer, metadata);
174+
}
133175
}
134176
}
135177

136178
@Override
137179
public void messageProduced(ServerCnx cnx, Producer producer, long startTimeNs, long ledgerId,
138180
long entryId, Topic.PublishContext publishContext) {
139-
if (interceptors == null || interceptors.isEmpty()) {
140-
return;
141-
}
142-
for (BrokerInterceptorWithClassLoader value : interceptors.values()) {
143-
value.messageProduced(cnx, producer, startTimeNs, ledgerId, entryId, publishContext);
181+
if (interceptorsEnabled()) {
182+
for (BrokerInterceptorWithClassLoader value : interceptors.values()) {
183+
value.messageProduced(cnx, producer, startTimeNs, ledgerId, entryId, publishContext);
184+
}
144185
}
145186
}
146187

147188
@Override
148189
public void messageDispatched(ServerCnx cnx, Consumer consumer, long ledgerId,
149190
long entryId, ByteBuf headersAndPayload) {
150-
if (interceptors == null || interceptors.isEmpty()) {
151-
return;
152-
}
153-
for (BrokerInterceptorWithClassLoader value : interceptors.values()) {
154-
value.messageDispatched(cnx, consumer, ledgerId, entryId, headersAndPayload);
191+
if (interceptorsEnabled()) {
192+
for (BrokerInterceptorWithClassLoader value : interceptors.values()) {
193+
value.messageDispatched(cnx, consumer, ledgerId, entryId, headersAndPayload);
194+
}
155195
}
156196
}
157197

158198
@Override
159199
public void messageAcked(ServerCnx cnx, Consumer consumer,
160200
CommandAck ackCmd) {
161-
if (interceptors == null || interceptors.isEmpty()) {
162-
return;
163-
}
164-
for (BrokerInterceptorWithClassLoader value : interceptors.values()) {
165-
value.messageAcked(cnx, consumer, ackCmd);
201+
if (interceptorsEnabled()) {
202+
for (BrokerInterceptorWithClassLoader value : interceptors.values()) {
203+
value.messageAcked(cnx, consumer, ackCmd);
204+
}
166205
}
167206
}
168207

169208
@Override
170209
public void txnOpened(long tcId, String txnID) {
171-
if (interceptors == null || interceptors.isEmpty()) {
172-
return;
173-
}
174-
for (BrokerInterceptorWithClassLoader value : interceptors.values()) {
175-
value.txnOpened(tcId, txnID);
210+
if (interceptorsEnabled()) {
211+
for (BrokerInterceptorWithClassLoader value : interceptors.values()) {
212+
value.txnOpened(tcId, txnID);
213+
}
176214
}
177215
}
178216

179217
@Override
180218
public void txnEnded(String txnID, long txnAction) {
181-
if (interceptors == null || interceptors.isEmpty()) {
182-
return;
183-
}
184-
for (BrokerInterceptorWithClassLoader value : interceptors.values()) {
185-
value.txnEnded(txnID, txnAction);
219+
if (interceptorsEnabled()) {
220+
for (BrokerInterceptorWithClassLoader value : interceptors.values()) {
221+
value.txnEnded(txnID, txnAction);
222+
}
186223
}
187224
}
188225

189226

190227
@Override
191228
public void onConnectionCreated(ServerCnx cnx) {
192-
if (interceptors == null || interceptors.isEmpty()) {
193-
return;
194-
}
195-
for (BrokerInterceptorWithClassLoader value : interceptors.values()) {
196-
value.onConnectionCreated(cnx);
229+
if (interceptorsEnabled()) {
230+
for (BrokerInterceptorWithClassLoader value : interceptors.values()) {
231+
value.onConnectionCreated(cnx);
232+
}
197233
}
198234
}
199235

@@ -237,4 +273,8 @@ public void initialize(PulsarService pulsarService) throws Exception {
237273
public void close() {
238274
interceptors.values().forEach(BrokerInterceptorWithClassLoader::close);
239275
}
276+
277+
private boolean interceptorsEnabled() {
278+
return interceptors != null && !interceptors.isEmpty();
279+
}
240280
}

pulsar-broker/src/main/java/org/apache/pulsar/broker/service/AbstractBaseDispatcher.java

+2
Original file line numberDiff line numberDiff line change
@@ -195,7 +195,9 @@ public int filterEntriesForConsumer(Optional<MessageMetadata[]> optMetadataArray
195195

196196
BrokerInterceptor interceptor = subscription.interceptor();
197197
if (null != interceptor) {
198+
// keep for compatibility if users has implemented the old interface
198199
interceptor.beforeSendMessage(subscription, entry, ackSet, msgMetadata);
200+
interceptor.beforeSendMessage(subscription, entry, ackSet, msgMetadata, consumer);
199201
}
200202
}
201203
if (CollectionUtils.isNotEmpty(entriesToFiltered)) {

pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java

+2-4
Original file line numberDiff line numberDiff line change
@@ -811,10 +811,8 @@ public CompletableFuture<Void> closeAsync() {
811811
}
812812
});
813813

814-
if (interceptor != null) {
815-
interceptor.close();
816-
interceptor = null;
817-
}
814+
interceptor.close();
815+
interceptor = null;
818816

819817
try {
820818
authenticationService.close();

0 commit comments

Comments
 (0)