Skip to content

Refactor rebalance callback to more pure implementation #31

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 5 commits into the base branch from the pull-request branch (branch names lost in extraction)
Sep 27, 2016
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
21 changes: 9 additions & 12 deletions e2e/both.spec.js
Original file line number Diff line number Diff line change
Expand Up @@ -75,18 +75,14 @@ var testCase = new TestCase('Interoperability tests', function() {

crypto.randomBytes(4096, function(ex, buffer) {

// When a partition-assignment rebalance fires (code 500), give the
// assignment a second to settle, then produce a test message.
consumer.on('rebalance', function(e) {
  if (e.code !== 500) {
    return;
  }
  setTimeout(function() {
    var payload = {
      message: buffer,
      topic: topic
    };
    producer.produce(payload, function(err) {
      t.ifError(err);
    });
  }, 1000);
});
// Produce a test message every two seconds until the interval handle
// (pT) is cleared later in the test teardown.
var produceTestMessage = function() {
  producer.produce({
    message: buffer,
    topic: topic
  }, function(err) {
    t.ifError(err);
  });
};
var pT = setInterval(produceTestMessage, 2000);

var tt = setInterval(function() {
if (!producer.isConnected()) {
Expand Down Expand Up @@ -116,6 +112,7 @@ var testCase = new TestCase('Interoperability tests', function() {
}

clearInterval(tt);
clearInterval(pT);

if (err) {
return cb(err);
Expand Down
42 changes: 28 additions & 14 deletions lib/kafka-consumer.js
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,34 @@ function KafkaConsumer(conf, topicConf) {
return new KafkaConsumer(conf, topicConf);
}

var self = this;
var rebalanceSetting = conf.rebalance_cb;

// If rebalance_cb is undefined (or falsy) we leave librdkafka's default
// behavior untouched and wire up nothing.
if (typeof rebalanceSetting === 'boolean' && rebalanceSetting) {
  // rebalance_cb: true opts in to the built-in handler, which just
  // assigns or unassigns partitions based on the event code.
  conf.rebalance_cb = function(e) {
    if (e.code === 500 /*CODES.REBALANCE.PARTITION_ASSIGNMENT*/) {
      self.assign(e.assignment);
    } else if (e.code === 501 /*CODES.REBALANCE.PARTITION_UNASSIGNMENT*/) {
      self.unassign(e.assignment);
    }
  };
} else if (typeof rebalanceSetting === 'function') {
  /*
   * Once this is opted in to, that's it. It's going to manually rebalance
   * forever. There is no way to unset config values in librdkafka, just
   * a way to override them.
   */
  conf.rebalance_cb = function(e) {
    // Surface the event to JS listeners, then hand control to the
    // user-supplied callback with the consumer as `this`.
    self.emit('rebalance', e);
    rebalanceSetting.call(self, e);
  };
}


/**
* KafkaConsumer message.
*
Expand All @@ -57,20 +85,6 @@ function KafkaConsumer(conf, topicConf) {

Client.call(this, conf, Kafka.KafkaConsumer, topicConf);

var self = this;

/**
* Rebalance event. Called when the KafkaConsumer is rebalancing.
*
* @event KafkaConsumer#rebalance
* @type {object}
* @property {number} code - whether the rebalance was an assignment or
* an unassignment
*/
this._client.onRebalance(function(e) {
self.emit('rebalance', e);
});

this.globalConfig = conf;
this.topicConfig = topicConf;
}
Expand Down
41 changes: 27 additions & 14 deletions src/callbacks.cc
Original file line number Diff line number Diff line change
Expand Up @@ -425,31 +425,44 @@ void RebalanceDispatcher::Flush() {
break;
}

std::vector<rebalance_topic_partition_t> parts = _events[i].partitions;

v8::Local<v8::Array> tp_array = Nan::New<v8::Array>();

for (size_t i = 0; i < parts.size(); i++) {
v8::Local<v8::Object> tp_obj = Nan::New<v8::Object>();
rebalance_topic_partition_t tp = parts[i];

Nan::Set(tp_obj, Nan::New("topic").ToLocalChecked(),
Nan::New<v8::String>(tp.topic.c_str()).ToLocalChecked());
Nan::Set(tp_obj, Nan::New("partition").ToLocalChecked(),
Nan::New<v8::Number>(tp.partition));

if (tp.offset >= 0) {
Nan::Set(tp_obj, Nan::New("offset").ToLocalChecked(),
Nan::New<v8::Number>(tp.offset));
}

tp_array->Set(i, tp_obj);
}
// Now convert the TopicPartition list to a JS array
Nan::Set(jsobj, Nan::New("assignment").ToLocalChecked(), tp_array);

argv[0] = jsobj;

Dispatch(argc, argv);
}
}

Rebalance::~Rebalance() {}
Rebalance::Rebalance(NodeKafka::Consumer* that) :
that_(that) {
eof_cnt = 0;
Rebalance::Rebalance(v8::Local<v8::Function> &cb) {
dispatcher.AddCallback(cb);
}
Rebalance::~Rebalance() {}

void Rebalance::rebalance_cb(RdKafka::KafkaConsumer *consumer,
RdKafka::ErrorCode err,
std::vector<RdKafka::TopicPartition*> &partitions) {
if (err == RdKafka::ERR__ASSIGN_PARTITIONS) {
that_->Assign(partitions);
} else {
that_->Unassign();
}

RdKafka::ErrorCode err, std::vector<RdKafka::TopicPartition*> &partitions) {
dispatcher.Add(rebalance_event_t(err, partitions));
dispatcher.Execute();

eof_cnt = 0;
}

// Partitioner callback
Expand Down
43 changes: 33 additions & 10 deletions src/callbacks.h
Original file line number Diff line number Diff line change
Expand Up @@ -140,14 +140,38 @@ class Delivery : public RdKafka::DeliveryReportCb {

// Rebalance dispatcher

struct rebalance_topic_partition_t {
std::string topic;
int partition;
int64_t offset;

rebalance_topic_partition_t(std::string p_topic, int p_partition, int64_t p_offset): // NOLINT
topic(p_topic),
partition(p_partition),
offset(p_offset) {}
};

struct rebalance_event_t {
RdKafka::ErrorCode err;
std::vector<RdKafka::TopicPartition*> partitions;

rebalance_event_t(RdKafka::ErrorCode _err,
std::vector<RdKafka::TopicPartition*> _partitions):
err(_err),
partitions(_partitions) {}
std::vector<rebalance_topic_partition_t> partitions;

rebalance_event_t(RdKafka::ErrorCode p_err,
std::vector<RdKafka::TopicPartition*> p_partitions):
err(p_err) {
// Iterate over the topic partitions because we won't have them later
for (size_t topic_partition_i = 0;
topic_partition_i < p_partitions.size(); topic_partition_i++) {
RdKafka::TopicPartition* topic_partition =
p_partitions[topic_partition_i];

rebalance_topic_partition_t tp(
topic_partition->topic(),
topic_partition->partition(),
topic_partition->offset());

partitions.push_back(tp);
}
}
};

class RebalanceDispatcher : public Dispatcher {
Expand All @@ -162,16 +186,15 @@ class RebalanceDispatcher : public Dispatcher {

class Rebalance : public RdKafka::RebalanceCb {
public:
explicit Rebalance(NodeKafka::Consumer* that);
explicit Rebalance(v8::Local<v8::Function>&);
~Rebalance();
// NAN_DISALLOW_ASSIGN_COPY_MOVE?
NodeKafka::Consumer* const that_;

void rebalance_cb(RdKafka::KafkaConsumer *, RdKafka::ErrorCode,
std::vector<RdKafka::TopicPartition*> &);

RebalanceDispatcher dispatcher;
private:
int eof_cnt;
v8::Persistent<v8::Function> m_cb;
};

class Partitioner : public RdKafka::PartitionerCb {
Expand Down
150 changes: 150 additions & 0 deletions src/common.cc
Original file line number Diff line number Diff line change
Expand Up @@ -122,4 +122,154 @@ std::vector<std::string> v8ArrayToStringVector(v8::Local<v8::Array> parameter) {
return newItem;
}

namespace TopicPartition {

/**
 * @brief Convert a vector of RdKafka::TopicPartition pointers into a v8
 *        Array of plain objects carrying offset, partition and topic.
 *
 * @see v8ArrayToTopicPartitionVector
 */
v8::Local<v8::Array> ToV8Array(
  std::vector<RdKafka::TopicPartition*> topic_partition_list) {
  v8::Local<v8::Array> array = Nan::New<v8::Array>();

  for (size_t i = 0; i < topic_partition_list.size(); i++) {
    RdKafka::TopicPartition* tp = topic_partition_list[i];

    // Build one JS object per topic partition
    v8::Local<v8::Object> obj = Nan::New<v8::Object>();

    Nan::Set(obj, Nan::New("offset").ToLocalChecked(),
      Nan::New<v8::Number>(tp->offset()));
    Nan::Set(obj, Nan::New("partition").ToLocalChecked(),
      Nan::New<v8::Number>(tp->partition()));
    Nan::Set(obj, Nan::New("topic").ToLocalChecked(),
      Nan::New<v8::String>(tp->topic().c_str()).ToLocalChecked());

    array->Set(i, obj);
  }

  return array;
}

}  // namespace TopicPartition

namespace Metadata {

/**
 * @brief Convert RdKafka::Metadata to a v8::Object describing the cluster.
 *
 * Shape of the returned object:
 *   { orig_broker_id, orig_broker_name,
 *     brokers: [{ id, host, port }],
 *     topics:  [{ name, partitions: [{ id, leader, replicas, isrs }] }] }
 *
 * @param metadata borrowed pointer; the caller retains ownership.
 */
v8::Local<v8::Object> ToV8Object(RdKafka::Metadata* metadata) {
  v8::Local<v8::Object> obj = Nan::New<v8::Object>();

  v8::Local<v8::Array> broker_data = Nan::New<v8::Array>();
  v8::Local<v8::Array> topic_data = Nan::New<v8::Array>();

  const BrokerMetadataList* brokers = metadata->brokers();  // NOLINT

  unsigned int broker_i = 0;

  for (BrokerMetadataList::const_iterator it = brokers->begin();
    it != brokers->end(); ++it, broker_i++) {
    // Each broker becomes a { id, host, port } object

    const RdKafka::BrokerMetadata* x = *it;

    v8::Local<v8::Object> current_broker = Nan::New<v8::Object>();

    Nan::Set(current_broker, Nan::New("id").ToLocalChecked(),
      Nan::New<v8::Number>(x->id()));
    Nan::Set(current_broker, Nan::New("host").ToLocalChecked(),
      Nan::New<v8::String>(x->host().c_str()).ToLocalChecked());
    Nan::Set(current_broker, Nan::New("port").ToLocalChecked(),
      Nan::New<v8::Number>(x->port()));

    broker_data->Set(broker_i, current_broker);
  }

  unsigned int topic_i = 0;

  const TopicMetadataList* topics = metadata->topics();

  for (TopicMetadataList::const_iterator it = topics->begin();
    it != topics->end(); ++it, topic_i++) {
    // Each topic becomes a { name, partitions } object

    const RdKafka::TopicMetadata* x = *it;

    v8::Local<v8::Object> current_topic = Nan::New<v8::Object>();

    Nan::Set(current_topic, Nan::New("name").ToLocalChecked(),
      Nan::New<v8::String>(x->topic().c_str()).ToLocalChecked());

    v8::Local<v8::Array> current_topic_partitions = Nan::New<v8::Array>();

    const PartitionMetadataList* current_partition_data = x->partitions();

    unsigned int partition_i = 0;
    PartitionMetadataList::const_iterator itt;

    for (itt = current_partition_data->begin();
      itt != current_partition_data->end(); ++itt, partition_i++) {
      // Each partition becomes a { id, leader, replicas, isrs } object
      const RdKafka::PartitionMetadata* xx = *itt;

      v8::Local<v8::Object> current_partition = Nan::New<v8::Object>();

      Nan::Set(current_partition, Nan::New("id").ToLocalChecked(),
        Nan::New<v8::Number>(xx->id()));
      Nan::Set(current_partition, Nan::New("leader").ToLocalChecked(),
        Nan::New<v8::Number>(xx->leader()));

      const std::vector<int32_t> * replicas = xx->replicas();
      const std::vector<int32_t> * isrs = xx->isrs();

      std::vector<int32_t>::const_iterator r_it;
      std::vector<int32_t>::const_iterator i_it;

      unsigned int r_i = 0;
      unsigned int i_i = 0;

      v8::Local<v8::Array> current_replicas = Nan::New<v8::Array>();

      for (r_it = replicas->begin(); r_it != replicas->end(); ++r_it, r_i++) {
        current_replicas->Set(r_i, Nan::New<v8::Int32>(*r_it));
      }

      v8::Local<v8::Array> current_isrs = Nan::New<v8::Array>();

      // BUG FIX: index the isr array with i_i. The previous code used r_i,
      // which is frozen at replicas->size() after the loop above, so every
      // isr overwrote the same slot and indices 0..n-1 were never set.
      for (i_it = isrs->begin(); i_it != isrs->end(); ++i_it, i_i++) {
        current_isrs->Set(i_i, Nan::New<v8::Int32>(*i_it));
      }

      Nan::Set(current_partition, Nan::New("replicas").ToLocalChecked(),
        current_replicas);
      Nan::Set(current_partition, Nan::New("isrs").ToLocalChecked(),
        current_isrs);

      current_topic_partitions->Set(partition_i, current_partition);
    }  // iterate over partitions

    Nan::Set(current_topic, Nan::New("partitions").ToLocalChecked(),
      current_topic_partitions);

    topic_data->Set(topic_i, current_topic);
  }  // End iterating over topics

  Nan::Set(obj, Nan::New("orig_broker_id").ToLocalChecked(),
    Nan::New<v8::Number>(metadata->orig_broker_id()));

  Nan::Set(obj, Nan::New("orig_broker_name").ToLocalChecked(),
    Nan::New<v8::String>(metadata->orig_broker_name()).ToLocalChecked());

  Nan::Set(obj, Nan::New("topics").ToLocalChecked(), topic_data);
  Nan::Set(obj, Nan::New("brokers").ToLocalChecked(), broker_data);

  return obj;
}

}  // namespace Metadata

} // namespace NodeKafka
16 changes: 16 additions & 0 deletions src/common.h
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,10 @@

#include "deps/librdkafka/src-cpp/rdkafkacpp.h"

// Convenience aliases for the metadata list types exposed by librdkafka's
// C++ API (used by Metadata::ToV8Object in common.cc).
typedef std::vector<const RdKafka::BrokerMetadata*> BrokerMetadataList;
typedef std::vector<const RdKafka::PartitionMetadata*> PartitionMetadataList;
typedef std::vector<const RdKafka::TopicMetadata *> TopicMetadataList;

namespace NodeKafka {

void Log(std::string);
Expand Down Expand Up @@ -45,6 +49,18 @@ class scoped_mutex_lock {
uv_mutex_t &async_lock;
};

namespace TopicPartition {

// Convert a list of librdkafka topic partitions to a v8 array of
// { offset, partition, topic } objects. Implemented in common.cc.
v8::Local<v8::Array> ToV8Array(std::vector<RdKafka::TopicPartition*>);

}  // namespace TopicPartition

namespace Metadata {

// Convert cluster metadata to a v8 object with broker and topic
// information. Implemented in common.cc.
v8::Local<v8::Object> ToV8Object(RdKafka::Metadata*);

}  // namespace Metadata

} // namespace NodeKafka

#endif // SRC_COMMON_H_
Loading