Skip to content

Commit 96507ce

Browse files
shibdBaodi Shi
andauthored
[feat] Support auto download schema when create producer. (#157)
### Motivation If a topic already has a schema that the producer does not know about, the producer should still be able to send data to that topic. (One such scenario is the DLQ sending a message; please refer to #139 **[Verifying this change][5]**) ### Modifications - When creating a producer with the `autoDownloadSchema` param, try to get the schema of that topic and use it. - Support `getSchema` on `LookupService` (HTTP and Binary). ### Verifying this change - Add `LookupServiceTest` unit test to cover the `getSchema` logic. - Add `SchemaTest.testAutoDownloadSchema` unit test to verify that a producer is created successfully when the topic has a schema. Co-authored-by: Baodi Shi <baoddi@apache.org>
1 parent 9ed6a45 commit 96507ce

18 files changed

+501
-43
lines changed

include/pulsar/Schema.h

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -134,6 +134,8 @@ enum SchemaType
134134
// Return string representation of result code
135135
PULSAR_PUBLIC const char *strSchemaType(SchemaType schemaType);
136136

137+
PULSAR_PUBLIC SchemaType enumSchemaType(std::string schemaTypeStr);
138+
137139
class SchemaInfoImpl;
138140

139141
typedef std::map<std::string, std::string> StringMap;
@@ -195,7 +197,6 @@ class PULSAR_PUBLIC SchemaInfo {
195197
private:
196198
typedef std::shared_ptr<SchemaInfoImpl> SchemaInfoImplPtr;
197199
SchemaInfoImplPtr impl_;
198-
static constexpr uint32_t INVALID_SIZE = 0xFFFFFFFF;
199200
};
200201

201202
} // namespace pulsar

lib/BinaryProtoLookupService.cc

Lines changed: 37 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -155,6 +155,43 @@ Future<Result, NamespaceTopicsPtr> BinaryProtoLookupService::getTopicsOfNamespac
155155
return promise->getFuture();
156156
}
157157

158+
/**
 * Asynchronously fetch the schema of a topic over the binary protocol.
 *
 * @param topicName the topic to query; a null pointer fails the returned
 *        future with ResultInvalidTopicName
 * @return a future settled by sendGetSchemaRequest once a connection to the
 *         resolved service host is available
 */
Future<Result, boost::optional<SchemaInfo>> BinaryProtoLookupService::getSchema(
    const TopicNamePtr& topicName) {
    auto promise = std::make_shared<Promise<Result, boost::optional<SchemaInfo>>>();

    if (topicName) {
        // Obtain a connection first; the request itself is sent from the listener.
        auto connectionFuture = cnxPool_.getConnectionAsync(serviceNameResolver_.resolveHost());
        connectionFuture.addListener(std::bind(&BinaryProtoLookupService::sendGetSchemaRequest, this,
                                               topicName->toString(), std::placeholders::_1,
                                               std::placeholders::_2, promise));
    } else {
        promise->setFailed(ResultInvalidTopicName);
    }

    return promise->getFuture();
}
172+
173+
void BinaryProtoLookupService::sendGetSchemaRequest(const std::string& topicName, Result result,
174+
const ClientConnectionWeakPtr& clientCnx,
175+
GetSchemaPromisePtr promise) {
176+
if (result != ResultOk) {
177+
promise->setFailed(result);
178+
return;
179+
}
180+
181+
ClientConnectionPtr conn = clientCnx.lock();
182+
uint64_t requestId = newRequestId();
183+
LOG_DEBUG("sendGetSchemaRequest. requestId: " << requestId << " topicName: " << topicName);
184+
185+
conn->newGetSchema(topicName, requestId)
186+
.addListener([promise](Result result, boost::optional<SchemaInfo> schemaInfo) {
187+
if (result != ResultOk) {
188+
promise->setFailed(result);
189+
return;
190+
}
191+
promise->setValue(schemaInfo);
192+
});
193+
}
194+
158195
void BinaryProtoLookupService::sendGetTopicsOfNamespaceRequest(const std::string& nsName, Result result,
159196
const ClientConnectionWeakPtr& clientCnx,
160197
NamespaceTopicsPromisePtr promise) {

lib/BinaryProtoLookupService.h

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -20,6 +20,7 @@
2020
#define _PULSAR_BINARY_LOOKUP_SERVICE_HEADER_
2121

2222
#include <pulsar/Authentication.h>
23+
#include <pulsar/Schema.h>
2324

2425
#include <mutex>
2526

@@ -32,6 +33,7 @@ class ConnectionPool;
3233
class LookupDataResult;
3334
class ServiceNameResolver;
3435
using NamespaceTopicsPromisePtr = std::shared_ptr<Promise<Result, NamespaceTopicsPtr>>;
36+
using GetSchemaPromisePtr = std::shared_ptr<Promise<Result, boost::optional<SchemaInfo>>>;
3537

3638
class PULSAR_PUBLIC BinaryProtoLookupService : public LookupService {
3739
public:
@@ -45,6 +47,8 @@ class PULSAR_PUBLIC BinaryProtoLookupService : public LookupService {
4547

4648
Future<Result, NamespaceTopicsPtr> getTopicsOfNamespaceAsync(const NamespaceNamePtr& nsName) override;
4749

50+
Future<Result, boost::optional<SchemaInfo>> getSchema(const TopicNamePtr& topicName) override;
51+
4852
private:
4953
std::mutex mutex_;
5054
uint64_t requestIdGenerator_ = 0;
@@ -68,6 +72,9 @@ class PULSAR_PUBLIC BinaryProtoLookupService : public LookupService {
6872
const ClientConnectionWeakPtr& clientCnx,
6973
NamespaceTopicsPromisePtr promise);
7074

75+
void sendGetSchemaRequest(const std::string& topiName, Result result,
76+
const ClientConnectionWeakPtr& clientCnx, GetSchemaPromisePtr promise);
77+
7178
void getTopicsOfNamespaceListener(Result result, NamespaceTopicsPtr topicsPtr,
7279
NamespaceTopicsPromisePtr promise);
7380

lib/ClientConnection.cc

Lines changed: 63 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1308,6 +1308,52 @@ void ClientConnection::handleIncomingCommand(BaseCommand& incomingCmd) {
13081308
break;
13091309
}
13101310

1311+
case BaseCommand::GET_SCHEMA_RESPONSE: {
1312+
const auto& response = incomingCmd.getschemaresponse();
1313+
LOG_DEBUG(cnxString_ << "Received GetSchemaResponse from server. req_id: "
1314+
<< response.request_id());
1315+
Lock lock(mutex_);
1316+
auto it = pendingGetSchemaRequests_.find(response.request_id());
1317+
if (it != pendingGetSchemaRequests_.end()) {
1318+
Promise<Result, boost::optional<SchemaInfo>> getSchemaPromise = it->second;
1319+
pendingGetSchemaRequests_.erase(it);
1320+
lock.unlock();
1321+
1322+
if (response.has_error_code()) {
1323+
if (response.error_code() == proto::TopicNotFound) {
1324+
getSchemaPromise.setValue(boost::none);
1325+
} else {
1326+
Result result = getResult(response.error_code(), response.error_message());
1327+
LOG_WARN(cnxString_ << "Received error GetSchemaResponse from server "
1328+
<< result
1329+
<< (response.has_error_message()
1330+
? (" (" + response.error_message() + ")")
1331+
: "")
1332+
<< " -- req_id: " << response.request_id());
1333+
getSchemaPromise.setFailed(result);
1334+
}
1335+
return;
1336+
}
1337+
1338+
const auto& schema = response.schema();
1339+
const auto& properMap = schema.properties();
1340+
StringMap properties;
1341+
for (auto kv = properMap.begin(); kv != properMap.end(); ++kv) {
1342+
properties[kv->key()] = kv->value();
1343+
}
1344+
SchemaInfo schemaInfo(static_cast<SchemaType>(schema.type()), "",
1345+
schema.schema_data(), properties);
1346+
getSchemaPromise.setValue(schemaInfo);
1347+
} else {
1348+
lock.unlock();
1349+
LOG_WARN(
1350+
"GetSchemaResponse command - Received unknown request id from "
1351+
"server: "
1352+
<< response.request_id());
1353+
}
1354+
break;
1355+
}
1356+
13111357
default: {
13121358
LOG_WARN(cnxString_ << "Received invalid message from server");
13131359
close();
@@ -1708,6 +1754,23 @@ Future<Result, NamespaceTopicsPtr> ClientConnection::newGetTopicsOfNamespace(con
17081754
return promise.getFuture();
17091755
}
17101756

1757+
/**
 * Register a pending GetSchema request and send the command to the broker.
 *
 * @param topicName topic whose schema is requested
 * @param requestId id under which the pending promise is stored
 * @return a future for the request; it is failed immediately with
 *         ResultNotConnected when the connection is already closed
 */
Future<Result, boost::optional<SchemaInfo>> ClientConnection::newGetSchema(const std::string& topicName,
                                                                           uint64_t requestId) {
    Lock guard(mutex_);
    Promise<Result, boost::optional<SchemaInfo>> getSchemaPromise;

    if (!isClosed()) {
        pendingGetSchemaRequests_.insert(std::make_pair(requestId, getSchemaPromise));
        // Release the mutex before sending the command.
        guard.unlock();
        sendCommand(Commands::newGetSchema(topicName, requestId));
        return getSchemaPromise.getFuture();
    }

    guard.unlock();
    LOG_ERROR(cnxString_ << "Client is not connected to the broker");
    getSchemaPromise.setFailed(ResultNotConnected);
    return getSchemaPromise.getFuture();
}
1773+
17111774
void ClientConnection::closeSocket() {
17121775
boost::system::error_code err;
17131776
if (socket_) {

lib/ClientConnection.h

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -169,6 +169,9 @@ class PULSAR_PUBLIC ClientConnection : public std::enable_shared_from_this<Clien
169169

170170
Future<Result, NamespaceTopicsPtr> newGetTopicsOfNamespace(const std::string& nsName, uint64_t requestId);
171171

172+
Future<Result, boost::optional<SchemaInfo>> newGetSchema(const std::string& topicName,
173+
uint64_t requestId);
174+
172175
private:
173176
struct PendingRequestData {
174177
Promise<Result, ResponseData> promise;
@@ -327,6 +330,9 @@ class PULSAR_PUBLIC ClientConnection : public std::enable_shared_from_this<Clien
327330
typedef std::map<long, Promise<Result, NamespaceTopicsPtr>> PendingGetNamespaceTopicsMap;
328331
PendingGetNamespaceTopicsMap pendingGetNamespaceTopicsRequests_;
329332

333+
typedef std::map<long, Promise<Result, boost::optional<SchemaInfo>>> PendingGetSchemaMap;
334+
PendingGetSchemaMap pendingGetSchemaRequests_;
335+
330336
mutable std::mutex mutex_;
331337
typedef std::unique_lock<std::mutex> Lock;
332338

lib/ClientImpl.cc

Lines changed: 23 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -142,7 +142,7 @@ ExecutorServiceProviderPtr ClientImpl::getPartitionListenerExecutorProvider() {
142142
LookupServicePtr ClientImpl::getLookup() { return lookupServicePtr_; }
143143

144144
void ClientImpl::createProducerAsync(const std::string& topic, ProducerConfiguration conf,
145-
CreateProducerCallback callback) {
145+
CreateProducerCallback callback, bool autoDownloadSchema) {
146146
if (conf.isChunkingEnabled() && conf.getBatchingEnabled()) {
147147
throw std::invalid_argument("Batching and chunking of messages can't be enabled together");
148148
}
@@ -159,9 +159,28 @@ void ClientImpl::createProducerAsync(const std::string& topic, ProducerConfigura
159159
return;
160160
}
161161
}
162-
lookupServicePtr_->getPartitionMetadataAsync(topicName).addListener(
163-
std::bind(&ClientImpl::handleCreateProducer, shared_from_this(), std::placeholders::_1,
164-
std::placeholders::_2, topicName, conf, callback));
162+
163+
if (autoDownloadSchema) {
    // Fetch the topic's schema first and, if one exists, apply it to a private
    // copy of the configuration before continuing with producer creation.
    auto self = shared_from_this();
    auto confPtr = std::make_shared<ProducerConfiguration>(conf);
    lookupServicePtr_->getSchema(topicName).addListener(
        [self, topicName, confPtr, callback](Result res, boost::optional<SchemaInfo> topicSchema) {
            if (res != ResultOk) {
                callback(res, Producer());
                // Stop here: the failure has been reported, so we must not go
                // on to create the producer and invoke the callback again.
                return;
            }
            if (topicSchema) {
                confPtr->setSchema(topicSchema.get());
            }

            self->lookupServicePtr_->getPartitionMetadataAsync(topicName).addListener(
                std::bind(&ClientImpl::handleCreateProducer, self, std::placeholders::_1,
                          std::placeholders::_2, topicName, *confPtr, callback));
        });
} else {
    lookupServicePtr_->getPartitionMetadataAsync(topicName).addListener(
        std::bind(&ClientImpl::handleCreateProducer, shared_from_this(), std::placeholders::_1,
                  std::placeholders::_2, topicName, conf, callback));
}
165184
}
166185

167186
void ClientImpl::handleCreateProducer(const Result result, const LookupDataResultPtr partitionMetadata,

lib/ClientImpl.h

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -68,8 +68,12 @@ class ClientImpl : public std::enable_shared_from_this<ClientImpl> {
6868
bool poolConnections);
6969
~ClientImpl();
7070

71+
/**
72+
* @param autoDownloadSchema When true, the client first tries to fetch the topic's existing schema
73+
* and, if one is found, uses it for the new producer.
74+
*/
7175
void createProducerAsync(const std::string& topic, ProducerConfiguration conf,
72-
CreateProducerCallback callback);
76+
CreateProducerCallback callback, bool autoDownloadSchema = false);
7377

7478
void subscribeAsync(const std::string& topic, const std::string& subscriptionName,
7579
const ConsumerConfiguration& conf, SubscribeCallback callback);

lib/Commands.cc

Lines changed: 16 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -161,6 +161,21 @@ SharedBuffer Commands::newLookup(const std::string& topic, const bool authoritat
161161
return buffer;
162162
}
163163

164+
/**
 * Serialize a GET_SCHEMA command for the given topic.
 *
 * A function-local static command object is reused across calls and is
 * protected by a function-local mutex.
 *
 * @param topic     topic whose schema is requested
 * @param requestId request id to embed in the command
 * @return the size-prefixed serialized command
 */
SharedBuffer Commands::newGetSchema(const std::string& topic, uint64_t requestId) {
    static BaseCommand cmd;
    static std::mutex mutex;
    std::lock_guard<std::mutex> guard(mutex);

    cmd.set_type(BaseCommand::GET_SCHEMA);
    auto request = cmd.mutable_getschema();
    request->set_topic(topic);
    request->set_request_id(requestId);

    const SharedBuffer serialized = writeMessageWithSize(cmd);
    // Reset the sub-message so the shared command carries no stale request.
    cmd.clear_getschema();
    return serialized;
}
178+
164179
SharedBuffer Commands::newConsumerStats(uint64_t consumerId, uint64_t requestId) {
165180
static BaseCommand cmd;
166181
static std::mutex mutex;
@@ -872,5 +887,6 @@ bool Commands::peerSupportsMultiMessageAcknowledgement(int32_t peerVersion) {
872887
bool Commands::peerSupportsJsonSchemaAvroFormat(int32_t peerVersion) { return peerVersion >= proto::v13; }
873888

874889
bool Commands::peerSupportsGetOrCreateSchema(int32_t peerVersion) { return peerVersion >= proto::v15; }
890+
875891
} // namespace pulsar
876892
/* namespace pulsar */

lib/Commands.h

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -91,6 +91,8 @@ class Commands {
9191
static SharedBuffer newLookup(const std::string& topic, const bool authoritative, uint64_t requestId,
9292
const std::string& listenerName);
9393

94+
static SharedBuffer newGetSchema(const std::string& topic, uint64_t requestId);
95+
9496
static PairSharedBuffer newSend(SharedBuffer& headers, proto::BaseCommand& cmd, uint64_t producerId,
9597
uint64_t sequenceId, ChecksumType checksumType,
9698
const proto::MessageMetadata& metadata, const SharedBuffer& payload);

0 commit comments

Comments
 (0)