Skip to content
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Next Next commit
consumer class optimization
  • Loading branch information
appkins committed Nov 28, 2018
commit 2c9c48360be1eca313fd08a5cd76f00650d8e4bb
2 changes: 1 addition & 1 deletion examples/cpp_redis_consumer.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,7 @@ sigint_handler(int) {
}

int
main(void) {
main() {
#ifdef _WIN32
//! Windows netword DLL init
WORD version = MAKEWORD(2, 2);
Expand Down
57 changes: 46 additions & 11 deletions includes/cpp_redis/core/consumer.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -40,18 +40,35 @@ namespace cpp_redis {
acknowledgement_callback_t acknowledgement_callback;
} consumer_callback_container_t;

typedef std::map<std::string, consumer_callback_container_t> consumer_queue_t;
class consumer_client_container {
public:
consumer_client_container();

client ack_client;
client poll_client;
};

typedef consumer_client_container consumer_client_container_t;

typedef std::multimap<std::string, consumer_callback_container_t> consumer_callbacks_t;

//typedef std::map<std::string, consumer_callback_container_t> consumer_callbacks_t;

class consumer {
public:
explicit consumer(std::string stream, std::string consumer, size_t max_concurrency = std::thread::hardware_concurrency());
explicit consumer(std::string stream, std::string consumer,
size_t max_concurrency = std::thread::hardware_concurrency());

consumer &subscribe(const std::string &group,
const consumer_callback_t &consumer_callback,
const acknowledgement_callback_t &acknowledgement_callback = nullptr);

void process();

bool queue_is_full();

void poll();

//! \brief Connect to redis server
//! \param host host to be connected to
//! \param port port to be connected to
Expand All @@ -67,6 +84,8 @@ namespace cpp_redis {
std::int32_t max_reconnects = 0,
std::uint32_t reconnect_interval_ms = 0);

void read_group_handler(const xreadgroup_options_t &a);

//!
//! commit pipelined transaction
//! that is, send to the network all commands pipelined by calling send() / subscribe() / ...
Expand All @@ -75,26 +94,42 @@ namespace cpp_redis {
//!
consumer &commit();

void dispatch_changed_handler(size_t size);

private:
std::string m_stream;
std::string m_name;
size_t m_max_concurrency;
std::shared_ptr<client> m_client;
std::shared_ptr<client> m_sub_client;
consumer_queue_t m_task_queue;
std::mutex m_task_queue_mutex;
std::shared_ptr<dispatch_queue_t> m_proc_queue;
std::unique_ptr<consumer_client_container_t> m_client;
/*std::unique_ptr<client> m_client;
std::unique_ptr<client> m_sub_client;*/
consumer_callbacks_t m_callbacks;
std::mutex m_callbacks_mutex;

std::mutex m_dispatch_queue_mutex;
std::unique_ptr<dispatch_queue_t> m_dispatch_queue;

std::mutex m_reply_queue_mutex;
std::queue<reply_t> m_reply_queue;

std::mutex m_q_status_mutex;
std::condition_variable m_q_status;
//dispatch_queue_t m_proc_queue;
std::atomic_bool dispatcher_full{false};
std::condition_variable dispatch_changed;
std::mutex dispatch_changed_mutex;

std::atomic_bool replies_empty{false};
std::condition_variable replies_changed;
std::mutex replies_changed_mutex;


bool is_ready = false;
std::condition_variable m_cv;
std::mutex m_cv_mutex;

std::mutex m_dispatch_status_mutex;
std::condition_variable m_dispatch_status;
//dispatch_queue_t m_dispatch_queue;

bool is_ready = false;
bool is_new = true;
};

} // namespace cpp_redis
Expand Down
6 changes: 5 additions & 1 deletion includes/cpp_redis/misc/dispatch_queue.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,8 @@
namespace cpp_redis {
typedef std::function<cpp_redis::message_type(const cpp_redis::message_type&)> dispatch_callback_t;

typedef std::function<void(size_t size)> notify_callback_t;

typedef struct dispatch_callback_collection {
dispatch_callback_t callback;
message_type message;
Expand All @@ -50,7 +52,7 @@ namespace cpp_redis {
class dispatch_queue {

public:
explicit dispatch_queue(std::string name, size_t thread_cnt = 1);
explicit dispatch_queue(std::string name, const notify_callback_t &notify_callback, size_t thread_cnt = 1);
~dispatch_queue();

// dispatch and copy
Expand All @@ -75,6 +77,8 @@ namespace cpp_redis {
std::condition_variable m_cv;
bool m_quit = false;

notify_callback_t notify_handler;

void dispatch_thread_handler();
};

Expand Down
125 changes: 75 additions & 50 deletions sources/core/consumer.cpp
Original file line number Diff line number Diff line change
@@ -1,7 +1,3 @@
#include <utility>

#include <utility>

// The MIT License (MIT)
//
// Copyright (c) 11/27/18 nick. <nbatkins@gmail.com>
Expand All @@ -24,78 +20,104 @@
// OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
// SOFTWARE.#include "consumer.hpp"

#include <cpp_redis/core/consumer.hpp>

#include <functional>

#include <cpp_redis/core/consumer.hpp>
using std::bind;
using namespace std::placeholders;

namespace cpp_redis {

consumer::consumer(std::string stream, std::string consumer, size_t max_concurrency)
: m_stream(std::move(stream)),
m_name(std::move(consumer)),
m_max_concurrency(max_concurrency),
m_task_queue(),
m_client(new client()),
m_sub_client(new client()),
m_proc_queue(new dispatch_queue(stream, max_concurrency)) {
m_callbacks(),
is_new(true) {
//auto fn = bind(&consumer::dispatch_changed_handler, this, 1, std::placeholders::_1);
m_dispatch_queue = std::unique_ptr<dispatch_queue_t>(new dispatch_queue(stream, [&](size_t size){
dispatch_changed_handler(size);
}, max_concurrency));
m_client = std::unique_ptr<consumer_client_container_t>(new consumer_client_container());
}

consumer &cpp_redis::consumer::subscribe(const std::string &group,
const consumer_callback_t &consumer_callback,
const acknowledgement_callback_t &acknowledgement_callback) {
std::unique_lock<std::mutex> task_queue_lock(m_task_queue_mutex);
m_task_queue[group] = {consumer_callback, acknowledgement_callback};
task_queue_lock.unlock();
//std::lock_guard<std::mutex> task_queue_lock(m_callbacks_mutex);
m_callbacks.insert({group, {consumer_callback, acknowledgement_callback}});
return *this;
}

void consumer::dispatch_changed_handler(size_t size) {
if (size >= m_max_concurrency) {
dispatcher_full.store(true);
dispatch_changed.notify_all();

std::cout << "Notified" <<
std::endl;
}
}

void consumer::connect(const std::string &host, size_t port, const connect_callback_t &connect_callback,
uint32_t timeout_ms, int32_t max_reconnects, uint32_t reconnect_interval_ms) {
m_client->connect(host, port, connect_callback, timeout_ms, max_reconnects, reconnect_interval_ms);
m_sub_client->connect(host, port, connect_callback, timeout_ms, max_reconnects, reconnect_interval_ms);
m_client->ack_client.connect(host, port, connect_callback, timeout_ms, max_reconnects, reconnect_interval_ms);
m_client->poll_client.connect(host, port, connect_callback, timeout_ms, max_reconnects, reconnect_interval_ms);
}

consumer &consumer::commit() {
//std::thread p([&]() {
// Set the consumer id to 0 so that we start with failed messages
std::string consumer_name = "0";

std::unique_lock<std::mutex> cv_mutex_lock(m_cv_mutex);
while (!is_ready) {
if (!is_ready)
if (m_max_concurrency <= m_proc_queue->size())
m_cv.wait(cv_mutex_lock);

std::lock_guard<std::mutex> task_queue_lock(m_task_queue_mutex);
for (auto &q : m_task_queue) {
//task_queue_lock.lock();
auto group = q.first;
auto cb_container = q.second;
//task_queue_lock.unlock();
m_sub_client->xreadgroup({group, consumer_name, {{m_stream}, {">"}}, 1, -1, false} // count, block, no_ack
, [&](cpp_redis::reply &reply) {
cpp_redis::xstream_reply xs(reply);
if (xs.empty()) {
if (consumer_name == "0") {
consumer_name = m_name;
}
} else {
m_reply_queue.push(reply);
m_q_status.notify_one();
//m_proc_queue->dispatch(fp_)
//process();
}
});
m_sub_client->sync_commit();
if (!is_ready) {
std::unique_lock<std::mutex> dispatch_lock(dispatch_changed_mutex);
dispatch_changed.wait(dispatch_lock, [&]() { return !dispatcher_full.load(); });
poll();
}
}
//});
return *this;
}

void consumer::poll() {
message_type m;
m_dispatch_queue->dispatch(m, [](const message_type &message) {
std::cout << "Something" << std::endl;
return message;
});
//std::lock_guard<std::mutex> task_queue_lock(m_callbacks_mutex);
//std::string consumer_name = m_name;
//std::string consumer_name = (is_new ? "0" : m_name);
std::string group = "groupone";
//auto q = m_.find("groupone");
//task_queue_lock.lock();
//auto group = q->first;
//auto cb_container = q->second;
//task_queue_lock.unlock();
read_group_handler({group, m_name, {{m_stream}, {">"}}, 1, -1, false}); // count, block, no_ack
}

void consumer::read_group_handler(const xreadgroup_options_t &a) {
m_client->poll_client.xreadgroup(a, [&](cpp_redis::reply &reply) {
cpp_redis::xstream_reply xs(reply);
if (xs.empty()) {
if (is_new)
is_new = false;
} else {
m_reply_queue.push(reply);
m_dispatch_status.notify_one();
}
}).sync_commit();
}

bool consumer::queue_is_full() {
std::lock_guard<std::mutex> cv_mutex_lock(m_cv_mutex);
size_t m_proc_size = m_dispatch_queue->size();
return m_max_concurrency <= m_proc_size;
}

void consumer::process() {
std::unique_lock<std::mutex> m_q_status_lock(m_q_status_mutex);
m_q_status.wait(m_q_status_lock, [this]() { return !m_reply_queue.empty(); });
std::unique_lock<std::mutex> replies_lock(replies_changed_mutex);
replies_changed.wait(replies_lock, [&]() { return !replies_empty.load(); });

auto r = m_reply_queue.back();
m_reply_queue.pop();
Expand All @@ -105,19 +127,19 @@ namespace cpp_redis {
for (auto &m : r.Messages) {
try {
std::string group_id = m.get_id();
auto task = m_task_queue.find(group_id);
auto task = m_callbacks.find(group_id);
auto callback_container = task->second;

auto callback = [&](const message_type &message) {
auto response = callback_container.consumer_callback(message);
m_client->xack(m_stream, group_id, {m.get_id()}, [&](const reply &r) {
m_client->ack_client.xack(m_stream, group_id, {m.get_id()}, [&](const reply &r) {
if (r.is_integer())
callback_container.acknowledgement_callback(r.as_integer());
});
m_client->sync_commit();
m_client->ack_client.sync_commit();
return response;
};
m_proc_queue->dispatch(m, callback);
m_dispatch_queue->dispatch(m, callback);
} catch (std::exception &exc) {
__CPP_REDIS_LOG(1, "Processing failed for message id: " + m.get_id() + "\nDetails: " + exc.what());
throw exc;
Expand All @@ -126,4 +148,7 @@ namespace cpp_redis {
}
}

consumer_client_container::consumer_client_container() : ack_client(), poll_client() {
}

} // namespace cpp_redis
8 changes: 6 additions & 2 deletions sources/misc/dispatch_queue.cpp
Original file line number Diff line number Diff line change
@@ -1,3 +1,5 @@
#include <utility>

/*
*
* Created by nick on 11/22/18.
Expand Down Expand Up @@ -28,8 +30,8 @@

namespace cpp_redis {

dispatch_queue::dispatch_queue(std::string name, size_t thread_cnt) :
m_name(name), m_threads(thread_cnt) {
dispatch_queue::dispatch_queue(std::string name, const notify_callback_t &notify_callback, size_t thread_cnt) :
m_name(name), m_threads(thread_cnt), m_mq(), notify_handler(std::move(notify_callback)) {
printf("Creating dispatch queue: %s\n", name.c_str());
printf("Dispatch threads: %zu\n", thread_cnt);

Expand Down Expand Up @@ -85,6 +87,8 @@ namespace cpp_redis {
return (!m_mq.empty() || m_quit);
});

notify_handler(m_mq.size());

//after wait, we own the lock
if (!m_quit && !m_mq.empty()) {
auto op = std::move(m_mq.front());
Expand Down