Skip to content

Fix memory leak when fetching metadata for a single topic #1130

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Open
wants to merge 1 commit into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
21 changes: 11 additions & 10 deletions src/connection.cc
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@
#include <vector>

#include "src/connection.h"
#include "kafka-operation-result.h"
#include "src/workers.h"

using RdKafka::Conf;
Expand Down Expand Up @@ -100,11 +101,11 @@ RdKafka::Handle* Connection::GetClient() {
return m_client;
}

Baton Connection::CreateTopic(std::string topic_name) {
KafkaOperationResult<RdKafka::Topic> Connection::CreateTopic(std::string topic_name) {
return CreateTopic(topic_name, NULL);
}

Baton Connection::CreateTopic(std::string topic_name, RdKafka::Conf* conf) {
KafkaOperationResult<RdKafka::Topic> Connection::CreateTopic(std::string topic_name, RdKafka::Conf* conf) {
std::string errstr;

RdKafka::Topic* topic = NULL;
Expand All @@ -114,19 +115,19 @@ Baton Connection::CreateTopic(std::string topic_name, RdKafka::Conf* conf) {
if (IsConnected()) {
topic = RdKafka::Topic::create(m_client, topic_name, conf, errstr);
} else {
return Baton(RdKafka::ErrorCode::ERR__STATE);
return KafkaOperationResult<RdKafka::Topic>(RdKafka::ErrorCode::ERR__STATE);
}
} else {
return Baton(RdKafka::ErrorCode::ERR__STATE);
return KafkaOperationResult<RdKafka::Topic>(RdKafka::ErrorCode::ERR__STATE);
}

if (!errstr.empty()) {
return Baton(RdKafka::ErrorCode::ERR_TOPIC_EXCEPTION, errstr);
return KafkaOperationResult<RdKafka::Topic>(RdKafka::ErrorCode::ERR_TOPIC_EXCEPTION, errstr);
}

// Maybe do it this way later? Then we don't need to do static_cast
// <RdKafka::Topic*>
return Baton(topic);
return KafkaOperationResult<RdKafka::Topic>(topic);
}

Baton Connection::QueryWatermarkOffsets(
Expand Down Expand Up @@ -189,15 +190,15 @@ Baton Connection::OffsetsForTimes(

Baton Connection::GetMetadata(
bool all_topics, std::string topic_name, int timeout_ms) {
RdKafka::Topic* topic = NULL;
std::unique_ptr<RdKafka::Topic> topic{};
RdKafka::ErrorCode err;

std::string errstr;

if (!topic_name.empty()) {
Baton b = CreateTopic(topic_name);
KafkaOperationResult<RdKafka::Topic> b = CreateTopic(topic_name);
if (b.err() == RdKafka::ErrorCode::ERR_NO_ERROR) {
topic = b.data<RdKafka::Topic*>();
topic = b.take_ownership();
}
}

Expand All @@ -211,7 +212,7 @@ Baton Connection::GetMetadata(
scoped_shared_read_lock lock(m_connection_lock);
if (IsConnected()) {
// Always send true - we
err = m_client->metadata(all_topics, topic, &metadata, timeout_ms);
err = m_client->metadata(all_topics, topic.get(), &metadata, timeout_ms);
} else {
err = RdKafka::ERR__STATE;
}
Expand Down
5 changes: 3 additions & 2 deletions src/connection.h
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@
#include "src/errors.h"
#include "src/config.h"
#include "src/callbacks.h"
#include "src/kafka-operation-result.h"

namespace NodeKafka {

Expand Down Expand Up @@ -56,8 +57,8 @@ class Connection : public Nan::ObjectWrap {
bool IsClosing();

// Baton<RdKafka::Topic*>
Baton CreateTopic(std::string);
Baton CreateTopic(std::string, RdKafka::Conf*);
KafkaOperationResult<RdKafka::Topic> CreateTopic(std::string);
KafkaOperationResult<RdKafka::Topic> CreateTopic(std::string, RdKafka::Conf*);
Baton GetMetadata(bool, std::string, int);
Baton QueryWatermarkOffsets(std::string, int32_t, int64_t*, int64_t*, int);
Baton OffsetsForTimes(std::vector<RdKafka::TopicPartition*> &, int);
Expand Down
63 changes: 63 additions & 0 deletions src/kafka-operation-result.h
Original file line number Diff line number Diff line change
@@ -0,0 +1,63 @@
#ifndef SRC_KAFKA_OPERATION_RESULT_H_
#define SRC_KAFKA_OPERATION_RESULT_H_

#include <cassert>
#include <memory>
#include <string>

#include "rdkafkacpp.h"

namespace NodeKafka {
/**
* Type-safe wrapper for the result of an RDKafka library operation.
*/
template<typename T>
class KafkaOperationResult {
public:
/**
* Constructor for a successful operation result.
* Takes ownership of the data pointer.
*/
explicit KafkaOperationResult(T* data)
: m_data(data), m_err(RdKafka::ErrorCode::ERR_NO_ERROR) {}
explicit KafkaOperationResult(RdKafka::ErrorCode err)
: m_data(nullptr), m_err(err) {}
explicit KafkaOperationResult(RdKafka::ErrorCode err, std::string errstr)
: m_data(nullptr), m_err(err), m_errstr(errstr) {}

/**
* Get a non-owning pointer to the result data.
* Only should be called for non-error results.
*/
T* data() const {
assert(m_data != nullptr);
return m_data.get();
}

/**
* Transfer ownership of the result data to the caller.
* Only should be called for non-error results.
*/
std::unique_ptr<T> take_ownership() {
assert(m_data != nullptr);
std::unique_ptr<T> data = std::move(m_data);
m_data.reset();
return data;
}

RdKafka::ErrorCode err() const {
return m_err;
}

std::string errstr() const {
return m_errstr.empty() ? RdKafka::err2str(m_err) : m_errstr;
}

private:
std::unique_ptr<T> m_data;
RdKafka::ErrorCode m_err;
std::string m_errstr;
};
} // namespace NodeKafka

#endif // SRC_KAFKA_OPERATION_RESULT_H_
14 changes: 5 additions & 9 deletions src/producer.cc
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@

#include "src/producer.h"
#include "src/kafka-consumer.h"
#include "src/kafka-operation-result.h"
#include "src/workers.h"

namespace NodeKafka {
Expand Down Expand Up @@ -617,22 +618,17 @@ NAN_METHOD(Producer::NodeProduce) {
Topic* topic = ObjectWrap::Unwrap<Topic>(info[0].As<v8::Object>());

// Unwrap it and turn it into an RdKafka::Topic*
Baton topic_baton = topic->toRDKafkaTopic(producer);
KafkaOperationResult<RdKafka::Topic> topic_result = topic->toRDKafkaTopic(producer);

if (topic_baton.err() != RdKafka::ERR_NO_ERROR) {
if (topic_result.err() != RdKafka::ERR_NO_ERROR) {
// Let the JS library throw if we need to so the error can be more rich
error_code = static_cast<int>(topic_baton.err());
error_code = static_cast<int>(topic_result.err());

return info.GetReturnValue().Set(Nan::New<v8::Number>(error_code));
}

RdKafka::Topic* rd_topic = topic_baton.data<RdKafka::Topic*>();

Baton b = producer->Produce(message_buffer_data, message_buffer_length,
rd_topic, partition, key_buffer_data, key_buffer_length, opaque);

// Delete the topic when we are done.
delete rd_topic;
topic_result.data(), partition, key_buffer_data, key_buffer_length, opaque);

error_code = static_cast<int>(b.err());
}
Expand Down
2 changes: 1 addition & 1 deletion src/topic.cc
Original file line number Diff line number Diff line change
Expand Up @@ -45,7 +45,7 @@ std::string Topic::name() {
return m_topic_name;
}

Baton Topic::toRDKafkaTopic(Connection* handle) {
KafkaOperationResult<RdKafka::Topic> Topic::toRDKafkaTopic(Connection* handle) {
if (m_config) {
return handle->CreateTopic(m_topic_name, m_config);
} else {
Expand Down
3 changes: 2 additions & 1 deletion src/topic.h
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@
#include "rdkafkacpp.h"

#include "src/config.h"
#include "src/kafka-operation-result.h"

namespace NodeKafka {

Expand All @@ -24,7 +25,7 @@ class Topic : public Nan::ObjectWrap {
static void Init(v8::Local<v8::Object>);
static v8::Local<v8::Object> NewInstance(v8::Local<v8::Value> arg);

Baton toRDKafkaTopic(Connection *handle);
KafkaOperationResult<RdKafka::Topic> toRDKafkaTopic(Connection *handle);

protected:
static Nan::Persistent<v8::Function> constructor;
Expand Down