Skip to content

Commit 712b484

Browse files
committed
[improve] Support KeyValue Schema.
1 parent ecc1995 commit 712b484

23 files changed

+882
-27
lines changed

.gitignore

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -45,6 +45,8 @@ apache-pulsar-client-cpp-*.tar.gz
4545
/examples/SampleConsumerListener
4646
/examples/SampleConsumerListenerCApi
4747
/examples/SampleReaderCApi
48+
/examples/SampleKeyValueSchemaConsumer
49+
/examples/SampleKeyValueSchemaProducer
4850
/examples/SampleFileLogger
4951
/tests/main
5052
/tests/pulsar-tests
@@ -98,4 +100,4 @@ vcpkg_installed/
98100
*.rej
99101
.tests-container-id.txt
100102
Testing
101-
.test-token.txt
103+
.test-token.txt

examples/CMakeLists.txt

Lines changed: 31 additions & 20 deletions
Original file line numberDiff line numberDiff line change
@@ -60,25 +60,36 @@ set(SAMPLE_CONSUMER_LISTENER_C_SOURCES
6060
set(SAMPLE_READER_C_SOURCES
6161
SampleReaderCApi.c
6262
)
63+
set(SAMPLE_KEY_VALUE_SCHEMA_CONSUMER
64+
SampleKeyValueSchemaConsumer.cc
65+
)
66+
67+
set(SAMPLE_KEY_VALUE_SCHEMA_PRODUCER
68+
SampleKeyValueSchemaProducer.cc
69+
)
6370

64-
add_executable(SampleAsyncProducer ${SAMPLE_ASYNC_PRODUCER_SOURCES})
65-
add_executable(SampleConsumer ${SAMPLE_CONSUMER_SOURCES})
66-
add_executable(SampleConsumerListener ${SAMPLE_CONSUMER_LISTENER_SOURCES})
67-
add_executable(SampleProducer ${SAMPLE_PRODUCER_SOURCES})
68-
add_executable(SampleFileLogger ${SAMPLE_FILE_LOGGER_SOURCES})
69-
add_executable(SampleProducerCApi ${SAMPLE_PRODUCER_C_SOURCES})
70-
add_executable(SampleConsumerCApi ${SAMPLE_CONSUMER_C_SOURCES})
71-
add_executable(SampleAsyncConsumerCApi ${SAMPLE_CONSUMER_LISTENER_C_SOURCES})
72-
add_executable(SampleConsumerListenerCApi ${SAMPLE_CONSUMER_LISTENER_C_SOURCES})
73-
add_executable(SampleReaderCApi ${SAMPLE_READER_C_SOURCES})
71+
add_executable(SampleAsyncProducer ${SAMPLE_ASYNC_PRODUCER_SOURCES})
72+
add_executable(SampleConsumer ${SAMPLE_CONSUMER_SOURCES})
73+
add_executable(SampleConsumerListener ${SAMPLE_CONSUMER_LISTENER_SOURCES})
74+
add_executable(SampleProducer ${SAMPLE_PRODUCER_SOURCES})
75+
add_executable(SampleFileLogger ${SAMPLE_FILE_LOGGER_SOURCES})
76+
add_executable(SampleProducerCApi ${SAMPLE_PRODUCER_C_SOURCES})
77+
add_executable(SampleConsumerCApi ${SAMPLE_CONSUMER_C_SOURCES})
78+
add_executable(SampleAsyncConsumerCApi ${SAMPLE_CONSUMER_LISTENER_C_SOURCES})
79+
add_executable(SampleConsumerListenerCApi ${SAMPLE_CONSUMER_LISTENER_C_SOURCES})
80+
add_executable(SampleReaderCApi ${SAMPLE_READER_C_SOURCES})
81+
add_executable(SampleKeyValueSchemaConsumer ${SAMPLE_KEY_VALUE_SCHEMA_CONSUMER})
82+
add_executable(SampleKeyValueSchemaProducer ${SAMPLE_KEY_VALUE_SCHEMA_PRODUCER})
7483

75-
target_link_libraries(SampleAsyncProducer ${CLIENT_LIBS} pulsarShared)
76-
target_link_libraries(SampleConsumer ${CLIENT_LIBS} pulsarShared)
77-
target_link_libraries(SampleConsumerListener ${CLIENT_LIBS} pulsarShared)
78-
target_link_libraries(SampleProducer ${CLIENT_LIBS} pulsarShared)
79-
target_link_libraries(SampleFileLogger ${CLIENT_LIBS} pulsarShared)
80-
target_link_libraries(SampleProducerCApi ${CLIENT_LIBS} pulsarShared)
81-
target_link_libraries(SampleConsumerCApi ${CLIENT_LIBS} pulsarShared)
82-
target_link_libraries(SampleAsyncConsumerCApi ${CLIENT_LIBS} pulsarShared)
83-
target_link_libraries(SampleConsumerListenerCApi ${CLIENT_LIBS} pulsarShared)
84-
target_link_libraries(SampleReaderCApi ${CLIENT_LIBS} pulsarShared)
84+
target_link_libraries(SampleAsyncProducer ${CLIENT_LIBS} pulsarShared)
85+
target_link_libraries(SampleConsumer ${CLIENT_LIBS} pulsarShared)
86+
target_link_libraries(SampleConsumerListener ${CLIENT_LIBS} pulsarShared)
87+
target_link_libraries(SampleProducer ${CLIENT_LIBS} pulsarShared)
88+
target_link_libraries(SampleFileLogger ${CLIENT_LIBS} pulsarShared)
89+
target_link_libraries(SampleProducerCApi ${CLIENT_LIBS} pulsarShared)
90+
target_link_libraries(SampleConsumerCApi ${CLIENT_LIBS} pulsarShared)
91+
target_link_libraries(SampleAsyncConsumerCApi ${CLIENT_LIBS} pulsarShared)
92+
target_link_libraries(SampleConsumerListenerCApi ${CLIENT_LIBS} pulsarShared)
93+
target_link_libraries(SampleReaderCApi ${CLIENT_LIBS} pulsarShared)
94+
target_link_libraries(SampleKeyValueSchemaConsumer ${CLIENT_LIBS} pulsarShared)
95+
target_link_libraries(SampleKeyValueSchemaProducer ${CLIENT_LIBS} pulsarShared)
Lines changed: 61 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,61 @@
1+
/**
2+
* Licensed to the Apache Software Foundation (ASF) under one
3+
* or more contributor license agreements. See the NOTICE file
4+
* distributed with this work for additional information
5+
* regarding copyright ownership. The ASF licenses this file
6+
* to you under the Apache License, Version 2.0 (the
7+
* "License"); you may not use this file except in compliance
8+
* with the License. You may obtain a copy of the License at
9+
*
10+
* http://www.apache.org/licenses/LICENSE-2.0
11+
*
12+
* Unless required by applicable law or agreed to in writing,
13+
* software distributed under the License is distributed on an
14+
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
15+
* KIND, either express or implied. See the License for the
16+
* specific language governing permissions and limitations
17+
* under the License.
18+
*/
19+
#include <iostream>
20+
#include <pulsar/Client.h>
21+
#include <lib/LogUtils.h>
22+
23+
DECLARE_LOG_OBJECT()
24+
25+
using namespace pulsar;
26+
27+
int main() {
28+
Client client("pulsar://localhost:6650");
29+
30+
std::string jsonSchema =
31+
R"({"type":"record","name":"cpx","fields":[{"name":"re","type":"double"},{"name":"im","type":"double"}]})";
32+
33+
SchemaInfo keySchema(JSON, "key-json", jsonSchema);
34+
SchemaInfo valueSchema(JSON, "value-json", jsonSchema);
35+
SchemaInfo keyValueSchema(keySchema, valueSchema, KeyValueEncodingType::INLINE);
36+
ConsumerConfiguration consumerConfiguration;
37+
consumerConfiguration.setSchema(keyValueSchema);
38+
39+
Consumer consumer;
40+
Result result = client.subscribe("persistent://public/default/kv-schema", "consumer-1",
41+
consumerConfiguration, consumer);
42+
if (result != ResultOk) {
43+
LOG_ERROR("Failed to subscribe: " << result);
44+
return -1;
45+
}
46+
47+
LOG_INFO("Start receive message.")
48+
49+
Message msg;
50+
while (true) {
51+
consumer.receive(msg);
52+
LOG_INFO("Received: " << msg << " with payload '" << msg.getDataAsString() << "'");
53+
LOG_INFO("Received: " << msg << " with partitionKey '" << msg.getPartitionKey() << "'");
54+
KeyValue keyValue = msg.getKeyValueData();
55+
LOG_INFO("Received: " << msg << " with key '" << keyValue.getKey() << "'");
56+
LOG_INFO("Received: " << msg << " with value '" << keyValue.getValueAsString() << "'");
57+
consumer.acknowledge(msg);
58+
}
59+
60+
client.close();
61+
}
Lines changed: 60 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,60 @@
1+
/**
2+
* Licensed to the Apache Software Foundation (ASF) under one
3+
* or more contributor license agreements. See the NOTICE file
4+
* distributed with this work for additional information
5+
* regarding copyright ownership. The ASF licenses this file
6+
* to you under the Apache License, Version 2.0 (the
7+
* "License"); you may not use this file except in compliance
8+
* with the License. You may obtain a copy of the License at
9+
*
10+
* http://www.apache.org/licenses/LICENSE-2.0
11+
*
12+
* Unless required by applicable law or agreed to in writing,
13+
* software distributed under the License is distributed on an
14+
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
15+
* KIND, either express or implied. See the License for the
16+
* specific language governing permissions and limitations
17+
* under the License.
18+
*/
19+
#include <iostream>
20+
#include <thread>
21+
#include <pulsar/Client.h>
22+
#include <lib/LogUtils.h>
23+
24+
DECLARE_LOG_OBJECT()
25+
26+
using namespace pulsar;
27+
28+
int main() {
29+
Client client("pulsar://localhost:6650");
30+
31+
std::string jsonSchema =
32+
R"({"type":"record","name":"cpx","fields":[{"name":"re","type":"double"},{"name":"im","type":"double"}]})";
33+
34+
SchemaInfo keySchema(JSON, "key-json", jsonSchema);
35+
SchemaInfo valueSchema(JSON, "value-json", jsonSchema);
36+
SchemaInfo keyValueSchema(keySchema, valueSchema, KeyValueEncodingType::INLINE);
37+
LOG_INFO("KeyValue schema content: " << keyValueSchema.getSchema());
38+
39+
ProducerConfiguration producerConfiguration;
40+
producerConfiguration.setSchema(keyValueSchema);
41+
42+
Producer producer;
43+
Result result =
44+
client.createProducer("persistent://public/default/kv-schema", producerConfiguration, producer);
45+
if (result != ResultOk) {
46+
LOG_ERROR("Error creating producer: " << result);
47+
return -1;
48+
}
49+
50+
std::string jsonData = "{\"re\":2.1,\"im\":1.23}";
51+
52+
KeyValue keyValue(std::move(jsonData), std::move(jsonData));
53+
54+
Message msg = MessageBuilder().setContent(keyValue).setProperty("x", "1").build();
55+
result = producer.send(msg);
56+
if (result == ResultOk) {
57+
LOG_INFO("send message ok");
58+
}
59+
client.close();
60+
}

include/pulsar/KeyValue.h

Lines changed: 83 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,83 @@
1+
/**
2+
* Licensed to the Apache Software Foundation (ASF) under one
3+
* or more contributor license agreements. See the NOTICE file
4+
* distributed with this work for additional information
5+
* regarding copyright ownership. The ASF licenses this file
6+
* to you under the Apache License, Version 2.0 (the
7+
* "License"); you may not use this file except in compliance
8+
* with the License. You may obtain a copy of the License at
9+
*
10+
* http://www.apache.org/licenses/LICENSE-2.0
11+
*
12+
* Unless required by applicable law or agreed to in writing,
13+
* software distributed under the License is distributed on an
14+
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
15+
* KIND, either express or implied. See the License for the
16+
* specific language governing permissions and limitations
17+
* under the License.
18+
*/
19+
#ifndef KEY_VALUE_HPP_
20+
#define KEY_VALUE_HPP_
21+
22+
#include <string>
23+
#include <memory>
24+
#include "defines.h"
25+
#include "Schema.h"
26+
27+
namespace pulsar {
28+
29+
class KeyValueImpl;
30+
31+
/**
32+
* Use to when the user uses key value schema.
33+
*/
34+
class PULSAR_PUBLIC KeyValue {
35+
public:
36+
/**
37+
* Constructor key value, according to keyValueEncodingType, whether key and value be encoded together.
38+
*
39+
* @param key key data.
40+
* @param value value data.
41+
* @param keyValueEncodingType key value encoding type.
42+
*/
43+
KeyValue(std::string &&key, std::string &&value);
44+
45+
/**
46+
* Get the key of KeyValue.
47+
*
48+
* @return character stream for key
49+
*/
50+
std::string getKey() const;
51+
52+
/**
53+
* Get the value of the KeyValue.
54+
*
55+
*
56+
* @return the pointer to the KeyValue value
57+
*/
58+
const void *getValue() const;
59+
60+
/**
61+
* Get the value length of the keyValue.
62+
*
63+
* @return the length of the KeyValue value
64+
*/
65+
size_t getValueLength() const;
66+
67+
/**
68+
* Get string representation of the KeyValue value.
69+
*
70+
* @return the string representation of the KeyValue value
71+
*/
72+
std::string getValueAsString() const;
73+
74+
private:
75+
typedef std::shared_ptr<KeyValueImpl> KeyValueImplPtr;
76+
KeyValue(KeyValueImplPtr keyValueImplPtr);
77+
KeyValueImplPtr impl_;
78+
friend class Message;
79+
friend class MessageBuilder;
80+
};
81+
} // namespace pulsar
82+
83+
#endif /* KEY_VALUE_HPP_ */

include/pulsar/Message.h

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -26,6 +26,7 @@
2626
#include <string>
2727

2828
#include "MessageId.h"
29+
#include "KeyValue.h"
2930

3031
namespace pulsar {
3132
namespace proto {
@@ -92,6 +93,13 @@ class PULSAR_PUBLIC Message {
9293
*/
9394
std::string getDataAsString() const;
9495

96+
/**
97+
* Get key value message.
98+
*
99+
* @return key value message.
100+
*/
101+
KeyValue getKeyValueData() const;
102+
95103
/**
96104
* Get the unique message ID associated with this message.
97105
*

include/pulsar/MessageBuilder.h

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,7 @@
1919
#ifndef MESSAGE_BUILDER_H
2020
#define MESSAGE_BUILDER_H
2121

22+
#include <pulsar/KeyValue.h>
2223
#include <pulsar/Message.h>
2324
#include <pulsar/defines.h>
2425

@@ -60,6 +61,13 @@ class PULSAR_PUBLIC MessageBuilder {
6061
*/
6162
MessageBuilder& setContent(std::string&& data);
6263

64+
/**
65+
* Set the key value content of the message
66+
*
67+
* @param data the content of the key value.
68+
*/
69+
MessageBuilder& setContent(const KeyValue& data);
70+
6371
/**
6472
* Set content of the message to a buffer already allocated by the caller. No copies of
6573
* this buffer will be made. The caller is responsible to ensure the memory buffer is

include/pulsar/Schema.h

Lines changed: 31 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -27,6 +27,27 @@
2727

2828
namespace pulsar {
2929

30+
/**
31+
* Encoding types of supported KeyValueSchema for Pulsar messages.
32+
*/
33+
enum class KeyValueEncodingType
34+
{
35+
/**
36+
* Key is stored as message key, while value is stored as message payload.
37+
*/
38+
SEPARATED,
39+
40+
/**
41+
* Key and value are stored as message payload.
42+
*/
43+
INLINE
44+
};
45+
46+
// Return string representation of result code
47+
PULSAR_PUBLIC const char *strEncodingType(pulsar::KeyValueEncodingType encodingType);
48+
49+
PULSAR_PUBLIC const KeyValueEncodingType enumEncodingType(std::string encodingTypeStr);
50+
3051
enum SchemaType
3152
{
3253
/**
@@ -143,6 +164,14 @@ class PULSAR_PUBLIC SchemaInfo {
143164
SchemaInfo(SchemaType schemaType, const std::string &name, const std::string &schema,
144165
const StringMap &properties = StringMap());
145166

167+
/**
168+
* @param keySchema the key schema.
169+
* @param valueSchema the value schema.
170+
* @param keyValueEncodingType Encoding types of supported KeyValueSchema for Pulsar messages.
171+
*/
172+
SchemaInfo(const SchemaInfo &keySchema, const SchemaInfo &valueSchema,
173+
const KeyValueEncodingType &keyValueEncodingType = KeyValueEncodingType::INLINE);
174+
146175
/**
147176
* @return the schema type
148177
*/
@@ -171,3 +200,5 @@ class PULSAR_PUBLIC SchemaInfo {
171200
} // namespace pulsar
172201

173202
PULSAR_PUBLIC std::ostream &operator<<(std::ostream &s, pulsar::SchemaType schemaType);
203+
204+
PULSAR_PUBLIC std::ostream &operator<<(std::ostream &s, pulsar::KeyValueEncodingType encodingType);

lib/Commands.cc

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -69,6 +69,7 @@ static inline bool isBuiltInSchema(SchemaType schemaType) {
6969
case AVRO:
7070
case PROTOBUF:
7171
case PROTOBUF_NATIVE:
72+
case KEY_VALUE:
7273
return true;
7374

7475
default:
@@ -90,6 +91,8 @@ static inline proto::Schema_Type getSchemaType(SchemaType type) {
9091
return proto::Schema_Type_Avro;
9192
case PROTOBUF_NATIVE:
9293
return proto::Schema_Type_ProtobufNative;
94+
case KEY_VALUE:
95+
return proto::Schema_Type_KeyValue;
9396
default:
9497
return proto::Schema_Type_None;
9598
}

0 commit comments

Comments
 (0)