Add a data consumer pool to reuse the data consumer (apache#691)
morningman authored and imay committed Apr 28, 2019
1 parent 2314a3e commit 567d5de
Showing 10 changed files with 216 additions and 131 deletions.
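The point of this change: instead of creating and tearing down a Kafka consumer for every routine load task, consumers are kept in a pool and handed back out when a new task targets the same brokers and topic. The visible diff adds the match() and reset() hooks on DataConsumer that make this possible; the pool itself (the new data_consumer_pool.cpp) is not shown below, so what follows is only a minimal sketch of how such a pool might use those hooks. The class name DataConsumerPool and the methods get_consumer()/return_consumer() are assumptions for illustration, not the commit's actual API.

#include <list>
#include <memory>
#include <mutex>

// Hypothetical sketch only -- the real pool lives in data_consumer_pool.cpp,
// which is not part of the visible diff.
class DataConsumerPool {
public:
    Status get_consumer(StreamLoadContext* ctx, std::shared_ptr<DataConsumer>* ret) {
        std::unique_lock<std::mutex> l(_lock);
        // reuse an idle consumer whose brokers/topic match the new task
        for (auto it = _pool.begin(); it != _pool.end(); ++it) {
            if ((*it)->match(ctx)) {
                *ret = *it;
                _pool.erase(it);
                // clear the finished/cancelled flags before reuse
                return (*ret)->reset();
            }
        }
        // otherwise create and initialize a brand new consumer
        std::shared_ptr<DataConsumer> consumer(new KafkaDataConsumer(ctx));
        RETURN_IF_ERROR(consumer->init(ctx));
        *ret = consumer;
        return Status::OK;
    }

    void return_consumer(std::shared_ptr<DataConsumer> consumer) {
        std::unique_lock<std::mutex> l(_lock);
        // keep the consumer around for the next task with matching brokers/topic
        _pool.push_back(consumer);
    }

private:
    std::mutex _lock;
    std::list<std::shared_ptr<DataConsumer>> _pool;
};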
1 change: 1 addition & 0 deletions be/src/runtime/CMakeLists.txt
@@ -98,6 +98,7 @@ add_library(Runtime STATIC
stream_load/stream_load_context.cpp
stream_load/stream_load_executor.cpp
routine_load/data_consumer.cpp
routine_load/data_consumer_pool.cpp
routine_load/routine_load_task_executor.cpp
)

120 changes: 84 additions & 36 deletions be/src/runtime/routine_load/data_consumer.cpp
@@ -23,13 +23,18 @@
#include <vector>

#include "common/status.h"
#include "runtime/stream_load/stream_load_pipe.h"
#include "runtime/routine_load/kafka_consumer_pipe.h"
#include "service/backend_options.h"
#include "util/defer_op.h"
#include "util/stopwatch.hpp"
#include "util/uid_util.h"

namespace doris {

Status KafkaDataConsumer::init() {
// initializing the kafka consumer only sets common configs such as
// brokers and group id
Status KafkaDataConsumer::init(StreamLoadContext* ctx) {
std::unique_lock<std::mutex> l(_lock);
if (_init) {
// this consumer has already been initialized.
@@ -42,29 +47,32 @@ Status KafkaDataConsumer::init() {
auto conf_deleter = [conf] () { delete conf; };
DeferOp delete_conf(std::bind<void>(conf_deleter));

std::stringstream ss;
ss << BackendOptions::get_localhost() << "_";
std::string group_id = ss.str() + UniqueId().to_string();
LOG(INFO) << "init kafka consumer with group id: " << group_id;

std::string errstr;
auto set_conf = [conf, &errstr](const std::string& conf_key, const std::string& conf_val) {
auto set_conf = [&conf, &errstr](const std::string& conf_key, const std::string& conf_val) {
if (conf->set(conf_key, conf_val, errstr) != RdKafka::Conf::CONF_OK) {
std::stringstream ss;
ss << "failed to set '" << conf_key << "'";
LOG(WARNING) << ss.str();
return Status(ss.str());
}
VLOG(3) << "set " << conf_key << ": " << conf_val;
return Status::OK;
};

RETURN_IF_ERROR(set_conf("metadata.broker.list", _ctx->kafka_info->brokers));

RETURN_IF_ERROR(set_conf("metadata.broker.list", _ctx->kafka_info->brokers));
RETURN_IF_ERROR(set_conf("group.id", _ctx->kafka_info->group_id));
RETURN_IF_ERROR(set_conf("client.id", _ctx->kafka_info->client_id));
RETURN_IF_ERROR(set_conf("metadata.broker.list", ctx->kafka_info->brokers));
RETURN_IF_ERROR(set_conf("group.id", group_id));
RETURN_IF_ERROR(set_conf("enable.partition.eof", "false"));
RETURN_IF_ERROR(set_conf("enable.auto.offset.store", "false"));
// TODO: set it larger than 0 after we set rd_kafka_conf_set_stats_cb()
RETURN_IF_ERROR(set_conf("statistics.interval.ms", "0"));
RETURN_IF_ERROR(set_conf("auto.offset.reset", "error"));

KafkaEventCb event_cb;
if (conf->set("event_cb", &event_cb, errstr) != RdKafka::Conf::CONF_OK) {
if (conf->set("event_cb", &_k_event_cb, errstr) != RdKafka::Conf::CONF_OK) {
std::stringstream ss;
ss << "failed to set 'event_cb'";
LOG(WARNING) << ss.str();
Expand All @@ -78,14 +86,27 @@ Status KafkaDataConsumer::init() {
return Status("failed to create kafka consumer");
}

VLOG(3) << "finished to init kafka consumer. " << ctx->brief();

_init = true;
return Status::OK;
}

Status KafkaDataConsumer::assign_topic_partitions(StreamLoadContext* ctx) {
DCHECK(_k_consumer);
// create TopicPartitions
std::stringstream ss;
std::vector<RdKafka::TopicPartition*> topic_partitions;
for (auto& entry : _ctx->kafka_info->begin_offset) {
for (auto& entry : ctx->kafka_info->begin_offset) {
RdKafka::TopicPartition* tp1 = RdKafka::TopicPartition::create(
_ctx->kafka_info->topic, entry.first, entry.second);
ctx->kafka_info->topic, entry.first, entry.second);
topic_partitions.push_back(tp1);
ss << "partition[" << entry.first << "-" << entry.second << "] ";
}

VLOG(1) << "assign topic partitions: " << ctx->kafka_info->topic
<< ", " << ss.str();

// finally, delete the TopicPartitions
auto tp_deleter = [&topic_partitions] () {
std::for_each(topic_partitions.begin(), topic_partitions.end(),
@@ -96,59 +117,67 @@ Status KafkaDataConsumer::init() {
// assign partition
RdKafka::ErrorCode err = _k_consumer->assign(topic_partitions);
if (err) {
LOG(WARNING) << "failed to assign topic partitions: " << _ctx->brief(true)
LOG(WARNING) << "failed to assign topic partitions: " << ctx->brief(true)
<< ", err: " << RdKafka::err2str(err);
return Status("failed to assgin topic partitions");
return Status("failed to assign topic partitions");
}

VLOG(3) << "finished to init kafka consumer. "
<< _ctx->brief(true);

_init = true;
return Status::OK;
}

Status KafkaDataConsumer::start() {
Status KafkaDataConsumer::start(StreamLoadContext* ctx) {
{
std::unique_lock<std::mutex> l(_lock);
if (!_init) {
return Status("consumer is not initialized");
}
}

int64_t left_time = _ctx->kafka_info->max_interval_s;
int64_t left_rows = _ctx->kafka_info->max_batch_rows;
int64_t left_bytes = _ctx->kafka_info->max_batch_size;
int64_t left_time = ctx->kafka_info->max_interval_s;
int64_t left_rows = ctx->kafka_info->max_batch_rows;
int64_t left_bytes = ctx->kafka_info->max_batch_size;

std::shared_ptr<KafkaConsumerPipe> kakfa_pipe = std::static_pointer_cast<KafkaConsumerPipe>(ctx->body_sink);

LOG(INFO) << "start consumer"
<< ". interval(s): " << left_time
<< ". max time(s): " << left_time
<< ", bath rows: " << left_rows
<< ", batch size: " << left_bytes
<< ". " << _ctx->brief();
<< ". " << ctx->brief();

MonotonicStopWatch watch;
watch.start();
Status st;
while (true) {
std::unique_lock<std::mutex> l(_lock);
if (_cancelled) {
_kafka_consumer_pipe->cancel();
kakfa_pipe->cancel();
return Status::CANCELLED;
}

if (_finished) {
_kafka_consumer_pipe->finish();
kakfa_pipe->finish();
return Status::OK;
}

if (left_time <= 0 || left_rows <= 0 || left_bytes <=0) {
VLOG(3) << "kafka consume batch finished"
VLOG(3) << "kafka consume batch done"
<< ". left time=" << left_time
<< ", left rows=" << left_rows
<< ", left bytes=" << left_bytes;
_kafka_consumer_pipe->finish();
_finished = true;
return Status::OK;

if (left_bytes == ctx->kafka_info->max_batch_size) {
// nothing to be consumed, cancel it
kakfa_pipe->cancel();
_cancelled = true;
return Status::CANCELLED;
} else {
DCHECK(left_bytes < ctx->kafka_info->max_batch_size);
DCHECK(left_rows < ctx->kafka_info->max_batch_rows);
kakfa_pipe->finish();
_finished = true;
return Status::OK;
}
}

// consume 1 message at a time
Expand All @@ -160,15 +189,15 @@ Status KafkaDataConsumer::start() {
<< ", offset: " << msg->offset()
<< ", len: " << msg->len();

st = _kafka_consumer_pipe->append_with_line_delimiter(
st = kakfa_pipe->append_with_line_delimiter(
static_cast<const char *>(msg->payload()),
static_cast<size_t>(msg->len()));
if (st.ok()) {
left_rows--;
left_bytes -= msg->len();
_ctx->kafka_info->cmt_offset[msg->partition()] = msg->offset();
VLOG(3) << "consume partition[ " << msg->partition()
<< " - " << msg->offset();
ctx->kafka_info->cmt_offset[msg->partition()] = msg->offset();
VLOG(3) << "consume partition[" << msg->partition()
<< " - " << msg->offset() << "]";
}

break;
Expand All @@ -185,17 +214,17 @@ Status KafkaDataConsumer::start() {
delete msg;

if (!st.ok()) {
_kafka_consumer_pipe->cancel();
kakfa_pipe->cancel();
return st;
}

left_time = _ctx->kafka_info->max_interval_s - watch.elapsed_time() / 1000 / 1000 / 1000;
left_time = ctx->kafka_info->max_interval_s - watch.elapsed_time() / 1000 / 1000 / 1000;
}

return Status::OK;
}

Status KafkaDataConsumer::cancel() {
Status KafkaDataConsumer::cancel(StreamLoadContext* ctx) {
std::unique_lock<std::mutex> l(_lock);
if (!_init) {
return Status("consumer is not initialized");
@@ -209,4 +238,23 @@ Status KafkaDataConsumer::cancel() {
return Status::OK;
}

Status KafkaDataConsumer::reset() {
std::unique_lock<std::mutex> l(_lock);
_finished = false;
_cancelled = false;
return Status::OK;
}

// if the kafka brokers and topic are the same,
// this consumer is considered a match and can be reused.
bool KafkaDataConsumer::match(StreamLoadContext* ctx) {
if (ctx->load_src_type != TLoadSourceType::KAFKA) {
return false;
}
if (_brokers != ctx->kafka_info->brokers || _topic != ctx->kafka_info->topic) {
return false;
}
return true;
}

} // end namespace doris
87 changes: 47 additions & 40 deletions be/src/runtime/routine_load/data_consumer.h
@@ -22,43 +22,43 @@
#include "librdkafka/rdkafkacpp.h"

#include "runtime/stream_load/stream_load_context.h"
#include "util/uid_util.h"

namespace doris {

class KafkaConsumerPipe;
class Status;
class StreamLoadPipe;

class DataConsumer {
public:
DataConsumer(StreamLoadContext* ctx):
_ctx(ctx),
_init(false),
_finished(false),
_cancelled(false) {

_ctx->ref();
}

virtual ~DataConsumer() {
if (_ctx->unref()) {
delete _ctx;
}
}

// init the consumer with the given parameters
virtual Status init() = 0;

virtual Status init(StreamLoadContext* ctx) = 0;
// start consuming
virtual Status start() = 0;

virtual Status start(StreamLoadContext* ctx) = 0;
// cancel the consuming process.
// if the consumer is not initialized, or the consuming
// process is already finished, calling cancel() will
// return an error
virtual Status cancel() = 0;
virtual Status cancel(StreamLoadContext* ctx) = 0;
// reset the data consumer before being reused
virtual Status reset() = 0;
// return true if the consumer matches the need
virtual bool match(StreamLoadContext* ctx) = 0;

const UniqueId& id() { return _id; }

protected:
StreamLoadContext* _ctx;
UniqueId _id;

// lock to protect the following bools
std::mutex _lock;
@@ -67,34 +67,6 @@ class DataConsumer {
bool _cancelled;
};

class KafkaDataConsumer : public DataConsumer {
public:
KafkaDataConsumer(
StreamLoadContext* ctx,
std::shared_ptr<KafkaConsumerPipe> kafka_consumer_pipe
):
DataConsumer(ctx),
_kafka_consumer_pipe(kafka_consumer_pipe) {
}

virtual Status init() override;

virtual Status start() override;

virtual Status cancel() override;

virtual ~KafkaDataConsumer() {
if (_k_consumer) {
_k_consumer->close();
delete _k_consumer;
}
}

private:
std::shared_ptr<KafkaConsumerPipe> _kafka_consumer_pipe;
RdKafka::KafkaConsumer* _k_consumer = nullptr;
};

class KafkaEventCb : public RdKafka::EventCb {
public:
void event_cb(RdKafka::Event &event) {
@@ -126,4 +98,39 @@ class KafkaEventCb : public RdKafka::EventCb {
}
};

class KafkaDataConsumer : public DataConsumer {
public:
KafkaDataConsumer(StreamLoadContext* ctx):
DataConsumer(ctx),
_brokers(ctx->kafka_info->brokers),
_topic(ctx->kafka_info->topic) {
}

virtual ~KafkaDataConsumer() {
VLOG(3) << "deconstruct consumer";
if (_k_consumer) {
_k_consumer->close();
delete _k_consumer;
_k_consumer = nullptr;
}
}

virtual Status init(StreamLoadContext* ctx) override;
virtual Status start(StreamLoadContext* ctx) override;
virtual Status cancel(StreamLoadContext* ctx) override;
// reassign topic partitions
virtual Status reset() override;
virtual bool match(StreamLoadContext* ctx) override;

Status assign_topic_partitions(StreamLoadContext* ctx);

private:
std::string _brokers;
std::string _topic;

KafkaEventCb _k_event_cb;
RdKafka::KafkaConsumer* _k_consumer = nullptr;
std::shared_ptr<KafkaConsumerPipe> _k_consumer_pipe;
};

} // end namespace doris
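For context, a consumer obtained from such a pool would walk through the lifecycle the reworked interface implies: init() sets the common librdkafka configs once, assign_topic_partitions() binds the task's partitions and begin offsets, start() pumps messages into the task's KafkaConsumerPipe (ctx->body_sink), and the consumer then goes back to the pool for the next matching task. Below is a hedged task-side sketch reusing the hypothetical DataConsumerPool from above; run_routine_load_task() and the error handling are illustrative only, and the real wiring presumably lives in routine_load_task_executor.cpp, which is not shown here.

// Illustrative flow only; names other than the DataConsumer methods are assumptions.
Status run_routine_load_task(StreamLoadContext* ctx, DataConsumerPool* pool) {
    std::shared_ptr<DataConsumer> consumer;
    // reuse a matching consumer or create a new one
    RETURN_IF_ERROR(pool->get_consumer(ctx, &consumer));

    auto kafka_consumer = std::static_pointer_cast<KafkaDataConsumer>(consumer);
    // per-task topic partitions and begin offsets
    RETURN_IF_ERROR(kafka_consumer->assign_topic_partitions(ctx));

    // consume into ctx->body_sink until the time/rows/bytes limits are hit
    Status st = kafka_consumer->start(ctx);

    // hand the consumer back so a later task on the same brokers/topic can reuse it
    pool->return_consumer(consumer);
    return st;
}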