Skip to content

Commit f98cfd5

Browse files
committed
Merge main
2 parents 0efe86b + 63c4245 commit f98cfd5

File tree

80 files changed

+2966
-222
lines changed

Some content is hidden

Large Commits have some content hidden by default. Use the searchbox below for content that may be hidden.

80 files changed

+2966
-222
lines changed

.github/workflows/ci-pr-validation.yaml

Lines changed: 29 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -29,6 +29,34 @@ concurrency:
2929

3030
jobs:
3131

32+
wireshark-dissector-build:
33+
name: Build the Wireshark dissector
34+
runs-on: ${{ matrix.os }}
35+
timeout-minutes: 60
36+
strategy:
37+
matrix:
38+
os: [ubuntu-20.04, macos-12]
39+
40+
steps:
41+
- name: checkout
42+
uses: actions/checkout@v3
43+
44+
- name: Install deps (Ubuntu)
45+
if: ${{ startsWith(matrix.os, 'ubuntu') }}
46+
run: |
47+
sudo apt-get update -y
48+
sudo apt-get install -y protobuf-compiler libprotobuf-dev wireshark-dev
49+
50+
- name: Install deps (macOS)
51+
if: ${{ startsWith(matrix.os, 'macos') }}
52+
run:
53+
brew install wireshark protobuf
54+
55+
- name: Build wireshark plugin
56+
run: |
57+
cmake -S wireshark -B build-wireshark
58+
cmake --build build-wireshark
59+
3260
unit-tests:
3361
name: Run unit tests
3462
runs-on: ubuntu-22.04
@@ -264,7 +292,7 @@ jobs:
264292
check-completion:
265293
name: Check Completion
266294
runs-on: ubuntu-latest
267-
needs: [unit-tests, cpp-build-windows, package, cpp-build-macos]
295+
needs: [wireshark-dissector-build, unit-tests, cpp-build-windows, package, cpp-build-macos]
268296

269297
steps:
270298
- run: true

CMakeLists.txt

Lines changed: 0 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -71,9 +71,6 @@ MESSAGE(STATUS "BUILD_STATIC_LIB: " ${BUILD_STATIC_LIB})
7171
option(BUILD_TESTS "Build tests" ON)
7272
MESSAGE(STATUS "BUILD_TESTS: " ${BUILD_TESTS})
7373

74-
option(BUILD_WIRESHARK "Build Pulsar Wireshark dissector" OFF)
75-
MESSAGE(STATUS "BUILD_WIRESHARK: " ${BUILD_WIRESHARK})
76-
7774
option(BUILD_PERF_TOOLS "Build Pulsar CLI perf producer/consumer" OFF)
7875
MESSAGE(STATUS "BUILD_PERF_TOOLS: " ${BUILD_PERF_TOOLS})
7976

@@ -416,10 +413,6 @@ if (BUILD_TESTS)
416413
add_subdirectory(tests)
417414
endif()
418415

419-
if (BUILD_WIRESHARK)
420-
add_subdirectory(wireshark)
421-
endif()
422-
423416
find_package(ClangTools)
424417
set(BUILD_SUPPORT_DIR "${PROJECT_SOURCE_DIR}/build-support")
425418
add_custom_target(format ${BUILD_SUPPORT_DIR}/run_clang_format.py

README.md

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -225,6 +225,10 @@ cd tests
225225
./pulsar-test-service-stop.sh
226226
```
227227

228+
## Wireshark Dissector
229+
230+
See the [wireshark](wireshark/) directory for details.
231+
228232
## Requirements for Contributors
229233

230234
It's required to install [LLVM](https://llvm.org/builds/) for `clang-tidy` and `clang-format`. Pulsar C++ client use `clang-format` **11** to format files. `make format` automatically formats the files.
Lines changed: 36 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,36 @@
1+
#!/usr/bin/env bash
2+
#
3+
# Licensed to the Apache Software Foundation (ASF) under one
4+
# or more contributor license agreements. See the NOTICE file
5+
# distributed with this work for additional information
6+
# regarding copyright ownership. The ASF licenses this file
7+
# to you under the Apache License, Version 2.0 (the
8+
# "License"); you may not use this file except in compliance
9+
# with the License. You may obtain a copy of the License at
10+
#
11+
# http://www.apache.org/licenses/LICENSE-2.0
12+
#
13+
# Unless required by applicable law or agreed to in writing,
14+
# software distributed under the License is distributed on an
15+
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
16+
# KIND, either express or implied. See the License for the
17+
# specific language governing permissions and limitations
18+
# under the License.
19+
#
20+
21+
set -e -x
22+
23+
if [ $# -ne 2 ]; then
24+
echo "Usage: $0 \$CONTAINER_ID \$START_TEST_SERVICE_INSIDE_CONTAINER"
25+
exit 1
26+
fi
27+
28+
CONTAINER_ID=$1
29+
START_TEST_SERVICE_INSIDE_CONTAINER=$2
30+
31+
echo $CONTAINER_ID >> .tests-container-id.txt
32+
33+
docker cp test-conf $CONTAINER_ID:/pulsar/test-conf
34+
docker cp build-support/$START_TEST_SERVICE_INSIDE_CONTAINER $CONTAINER_ID:$START_TEST_SERVICE_INSIDE_CONTAINER
35+
36+
docker exec -i $CONTAINER_ID /$START_TEST_SERVICE_INSIDE_CONTAINER
Lines changed: 66 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,66 @@
1+
#!/usr/bin/env bash
2+
#
3+
# Licensed to the Apache Software Foundation (ASF) under one
4+
# or more contributor license agreements. See the NOTICE file
5+
# distributed with this work for additional information
6+
# regarding copyright ownership. The ASF licenses this file
7+
# to you under the Apache License, Version 2.0 (the
8+
# "License"); you may not use this file except in compliance
9+
# with the License. You may obtain a copy of the License at
10+
#
11+
# http://www.apache.org/licenses/LICENSE-2.0
12+
#
13+
# Unless required by applicable law or agreed to in writing,
14+
# software distributed under the License is distributed on an
15+
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
16+
# KIND, either express or implied. See the License for the
17+
# specific language governing permissions and limitations
18+
# under the License.
19+
#
20+
21+
set -e -x
22+
23+
export PULSAR_EXTRA_OPTS=-Dpulsar.auth.basic.conf=test-conf/.htpasswd
24+
25+
# Generate secret key and token
26+
mkdir -p data/tokens
27+
bin/pulsar tokens create-secret-key --output data/tokens/secret.key
28+
29+
bin/pulsar tokens create \
30+
--subject token-principal \
31+
--secret-key file:///pulsar/data/tokens/secret.key \
32+
> /pulsar/data/tokens/token.txt
33+
34+
export PULSAR_STANDALONE_CONF=test-conf/standalone-ssl-mim.conf
35+
export PULSAR_PID_DIR=/tmp
36+
bin/pulsar-daemon start standalone \
37+
--no-functions-worker --no-stream-storage \
38+
--bookkeeper-dir data/bookkeeper
39+
40+
echo "-- Wait for Pulsar service to be ready"
41+
until curl http://localhost:8081/metrics > /dev/null 2>&1 ; do sleep 1; done
42+
43+
echo "-- Pulsar service is ready -- Configure permissions"
44+
45+
export PULSAR_CLIENT_CONF=test-conf/client-ssl-mim.conf
46+
47+
# Create "standalone" cluster if it does not exist
48+
bin/pulsar-admin clusters list | grep -q '^standalone$' ||
49+
bin/pulsar-admin clusters create \
50+
standalone \
51+
--url http://localhost:8081/ \
52+
--url-secure https://localhost:8444/ \
53+
--broker-url pulsar://localhost:6652/ \
54+
--broker-url-secure pulsar+ssl://localhost:6653/
55+
56+
# Create "private" tenant
57+
bin/pulsar-admin tenants create private -r "" -c "standalone"
58+
59+
# Create "private/auth" with required authentication
60+
bin/pulsar-admin namespaces create private/auth --clusters standalone
61+
62+
bin/pulsar-admin namespaces grant-permission private/auth \
63+
--actions produce,consume \
64+
--role "token-principal"
65+
66+
echo "-- Ready to start tests"

include/pulsar/Consumer.h

Lines changed: 25 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -21,6 +21,7 @@
2121

2222
#include <pulsar/BrokerConsumerStats.h>
2323
#include <pulsar/ConsumerConfiguration.h>
24+
#include <pulsar/TypedMessage.h>
2425
#include <pulsar/defines.h>
2526

2627
#include <iostream>
@@ -91,6 +92,14 @@ class PULSAR_PUBLIC Consumer {
9192
*/
9293
Result receive(Message& msg);
9394

95+
template <typename T>
96+
Result receive(TypedMessage<T>& msg, typename TypedMessage<T>::Decoder decoder) {
97+
Message rawMsg;
98+
auto result = receive(rawMsg);
99+
msg = TypedMessage<T>{rawMsg, decoder};
100+
return result;
101+
}
102+
94103
/**
95104
*
96105
* @param msg a non-const reference where the received message will be copied
@@ -101,6 +110,14 @@ class PULSAR_PUBLIC Consumer {
101110
*/
102111
Result receive(Message& msg, int timeoutMs);
103112

113+
template <typename T>
114+
Result receive(TypedMessage<T>& msg, int timeoutMs, typename TypedMessage<T>::Decoder decoder) {
115+
Message rawMsg;
116+
auto result = receive(rawMsg, timeoutMs);
117+
msg = TypedMessage<T>{rawMsg, decoder};
118+
return result;
119+
}
120+
104121
/**
105122
* Receive a single message
106123
* <p>
@@ -114,6 +131,14 @@ class PULSAR_PUBLIC Consumer {
114131
*/
115132
void receiveAsync(ReceiveCallback callback);
116133

134+
template <typename T>
135+
void receiveAsync(std::function<void(Result result, const TypedMessage<T>&)> callback,
136+
typename TypedMessage<T>::Decoder decoder) {
137+
receiveAsync([callback, decoder](Result result, const Message& msg) {
138+
callback(result, TypedMessage<T>{msg, decoder});
139+
});
140+
}
141+
117142
/**
118143
* Batch receiving messages.
119144
*

include/pulsar/ConsumerConfiguration.h

Lines changed: 67 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -28,17 +28,20 @@
2828
#include <pulsar/Message.h>
2929
#include <pulsar/Result.h>
3030
#include <pulsar/Schema.h>
31+
#include <pulsar/TypedMessage.h>
3132
#include <pulsar/defines.h>
3233

3334
#include <functional>
3435
#include <memory>
3536

3637
#include "BatchReceivePolicy.h"
38+
#include "DeadLetterPolicy.h"
3739

3840
namespace pulsar {
3941

4042
class Consumer;
4143
class PulsarWrapper;
44+
class PulsarFriend;
4245

4346
/// Callback definition for non-data operation
4447
typedef std::vector<Message> Messages;
@@ -48,7 +51,7 @@ typedef std::function<void(Result, const Messages& msgs)> BatchReceiveCallback;
4851
typedef std::function<void(Result result, MessageId messageId)> GetLastMessageIdCallback;
4952

5053
/// Callback definition for MessageListener
51-
typedef std::function<void(Consumer consumer, const Message& msg)> MessageListener;
54+
typedef std::function<void(Consumer& consumer, const Message& msg)> MessageListener;
5255

5356
typedef std::shared_ptr<ConsumerEventListener> ConsumerEventListenerPtr;
5457

@@ -126,6 +129,15 @@ class PULSAR_PUBLIC ConsumerConfiguration {
126129
*/
127130
ConsumerConfiguration& setMessageListener(MessageListener messageListener);
128131

132+
template <typename T>
133+
ConsumerConfiguration& setTypedMessageListener(
134+
std::function<void(Consumer&, const TypedMessage<T>&)> listener,
135+
typename TypedMessage<T>::Decoder decoder) {
136+
return setMessageListener([listener, decoder](Consumer& consumer, const Message& msg) {
137+
listener(consumer, TypedMessage<T>{msg, decoder});
138+
});
139+
}
140+
129141
/**
130142
* @return the message listener
131143
*/
@@ -398,6 +410,42 @@ class PULSAR_PUBLIC ConsumerConfiguration {
398410
*/
399411
const BatchReceivePolicy& getBatchReceivePolicy() const;
400412

413+
/**
414+
* Set dead letter policy for consumer
415+
*
416+
* By default, some messages are redelivered many times, even to the extent that they can never be
417+
* stopped. By using the dead letter mechanism, messages have the max redelivery count, when they
418+
* exceeding the maximum number of redeliveries. Messages are sent to dead letter topics and acknowledged
419+
* automatically.
420+
*
421+
* You can enable the dead letter mechanism by setting the dead letter policy.
422+
* Example:
423+
*
424+
* <pre>
425+
* * DeadLetterPolicy dlqPolicy = DeadLetterPolicyBuilder()
426+
* .maxRedeliverCount(10)
427+
* .build();
428+
* </pre>
429+
* Default dead letter topic name is {TopicName}-{Subscription}-DLQ.
430+
* To set a custom dead letter topic name
431+
* <pre>
432+
* DeadLetterPolicy dlqPolicy = DeadLetterPolicyBuilder()
433+
* .deadLetterTopic("dlq-topic")
434+
* .maxRedeliverCount(10)
435+
* .initialSubscriptionName("init-sub-name")
436+
* .build();
437+
* </pre>
438+
* @param deadLetterPolicy Default value is empty
439+
*/
440+
void setDeadLetterPolicy(const DeadLetterPolicy& deadLetterPolicy);
441+
442+
/**
443+
* Get dead letter policy.
444+
*
445+
* @return dead letter policy
446+
*/
447+
const DeadLetterPolicy& getDeadLetterPolicy() const;
448+
401449
/**
402450
* Set whether the subscription status should be replicated.
403451
* The default value is `false`.
@@ -553,7 +601,25 @@ class PULSAR_PUBLIC ConsumerConfiguration {
553601
*/
554602
bool isStartMessageIdInclusive() const;
555603

604+
/**
605+
* Enable the batch index acknowledgment.
606+
*
607+
* It should be noted that this option can only work when the broker side also enables the batch index
608+
* acknowledgment. See the `acknowledgmentAtBatchIndexLevelEnabled` config in `broker.conf`.
609+
*
610+
* Default: false
611+
*
612+
* @param enabled whether to enable the batch index acknowledgment
613+
*/
614+
ConsumerConfiguration& setBatchIndexAckEnabled(bool enabled);
615+
616+
/**
617+
* The associated getter of setBatchingEnabled
618+
*/
619+
bool isBatchIndexAckEnabled() const;
620+
556621
friend class PulsarWrapper;
622+
friend class PulsarFriend;
557623

558624
private:
559625
std::shared_ptr<ConsumerConfigurationImpl> impl_;

0 commit comments

Comments
 (0)