Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 2 additions & 1 deletion CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,8 @@ set(ELLIPTICS_VERSION "${ELLIPTICS_VERSION_ABI}.${ELLIPTICS_VERSION_MINOR}")
set(Boost_NO_BOOST_CMAKE TRUE)

set(CMAKE_MODULE_PATH ${CMAKE_MODULE_PATH} "${CMAKE_SOURCE_DIR}/cmake/Modules/")
set(CMAKE_CXX_FLAGS "${CMAKE_CXX_FLAGS} -std=c++0x -std=gnu++0x")
set(CMAKE_CXX_FLAGS "${CMAKE_CXX_FLAGS} -std=c++0x -std=gnu++0x -Wfatal-errors")
set(CMAKE_C_FLAGS "${CMAKE_C_FLAGS} -Wfatal-errors")
include(CheckLargefile)
include(CheckAtomic)
include(CheckSendfile)
Expand Down
1 change: 1 addition & 0 deletions bindings/cpp/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,7 @@ set(ELLIPTICS_CLIENT_SRCS
../../library/request_queue.cpp
../../library/rbtree.c
../../library/trans.c
../../library/trans.cpp
../../library/tests.c
../../library/common.cpp
../../library/access_context.cpp
Expand Down
5 changes: 4 additions & 1 deletion bindings/cpp/async_result.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -199,7 +199,10 @@ bool async_result<T>::get(T &entry)
{
wait(session::throw_at_get);
for (auto it = m_data->results.begin(); it != m_data->results.end(); ++it) {
if (it->status() == 0 && !it->data().empty()) {
bool is_valuable_entry = it->tmp_is_n2_protocol()
? (it->status() == 0 && !it->data().empty())
: (it->is_valid() && it->status() == 0);
if (is_valuable_entry) {
entry = *it;
return true;
}
Expand Down
111 changes: 106 additions & 5 deletions bindings/cpp/callback.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -40,24 +40,41 @@ class basic_handler
return 0;
}

// Used by old (protocol-dependent) mechanic, must be removed after refactoring
basic_handler(std::unique_ptr<dnet_logger> logger, async_generic_result &result) :
m_logger(std::move(logger)),
m_handler(result), m_completed(0), m_total(0)
{
memset(&m_addr, 0, sizeof(dnet_addr));
memset(&m_cmd, 0, sizeof(dnet_cmd));
}

bool handle(dnet_addr *addr, dnet_cmd *cmd)
basic_handler(const dnet_cmd &cmd, std::unique_ptr<dnet_logger> logger, async_generic_result &result) :
m_cmd(cmd),
m_logger(std::move(logger)),
m_handler(result), m_completed(0), m_total(0)
{
if (is_trans_destroyed(cmd)) {
return increment_completed();
}
memset(&m_addr, 0, sizeof(dnet_addr));
}

void log_reply_info(dnet_addr *addr, dnet_cmd *cmd)
{
DNET_LOG(m_logger, cmd->status ? DNET_LOG_ERROR : DNET_LOG_NOTICE, "{}: {}: handled reply from: {}, "
"trans: {}, cflags: {}, status: {}, "
"size: {}, client: {}, last: {}",
dnet_dump_id(&cmd->id), dnet_cmd_string(cmd->cmd), addr ? dnet_addr_string(addr) : "<unknown>",
cmd->trans, dnet_flags_dump_cflags(cmd->flags), int(cmd->status), cmd->size,
!(cmd->flags & DNET_FLAGS_REPLY), !(cmd->flags & DNET_FLAGS_MORE));
}

// Used by old (protocol-dependent) mechanic, must be removed after refactoring
bool handle(dnet_addr *addr, dnet_cmd *cmd)
{
if (is_trans_destroyed(cmd)) {
return increment_completed();
}

log_reply_info(addr, cmd);

auto data = std::make_shared<callback_result_data>(addr, cmd);

Expand All @@ -71,6 +88,33 @@ class basic_handler
return false;
}

int on_reply(const std::shared_ptr<n2_body> &result, bool is_last)
{
// TODO(sabramkin): Output only protocol-independent known info (currently old-mechanic logging used)
log_reply_info(&m_addr, &m_cmd);

auto data = std::make_shared<n2_callback_result_data>(m_addr, m_cmd, result, 0, is_last);
callback_result_entry entry(data);
m_handler.process(entry);

increment_completed(); // TODO(sabramkin): correctly process trans destroying
return 0;
}

int on_reply_error(int err, bool is_last)
{
// TODO(sabramkin): Output only protocol-independent known info (currently old-mechanic logging used)
log_reply_info(&m_addr, &m_cmd);

auto data = std::make_shared<n2_callback_result_data>(m_addr, m_cmd, nullptr, err, is_last);
data->error = create_error(err, "n2 lookup_new error"); // TODO(sabramkin): rework error
callback_result_entry entry(data);
m_handler.process(entry);

increment_completed(); // TODO(sabramkin): correctly process trans destroying
return 0;
}

// how many independent transactions share this handler plus call below
// call below and corresponding +1 is needed, since transactions can be completed
// before send_impl() calls this method to setup this 'reference counter'
Expand All @@ -92,6 +136,11 @@ class basic_handler
return false;
}

public:
dnet_addr m_addr;

private:
dnet_cmd m_cmd;
std::unique_ptr<dnet_logger> m_logger;
async_result_handler<callback_result_entry> m_handler;
std::atomic_size_t m_completed;
Expand All @@ -106,7 +155,6 @@ async_generic_result send_impl(session &sess, T &control, Method method)
async_generic_result result(sess);

detail::basic_handler *handler = new detail::basic_handler(sess.get_logger(), result);

control.complete = detail::basic_handler::handler;
control.priv = handler;

Expand Down Expand Up @@ -142,6 +190,59 @@ async_generic_result send_to_single_state(session &sess, dnet_io_control &contro
return send_impl(sess, control, send_to_single_state_io_impl);
}

template <typename Method>
async_generic_result n2_send_impl(session &sess, const n2_request &request, Method method)
{
async_generic_result result(sess);

auto handler = std::make_shared<detail::basic_handler>(request.cmd, sess.get_logger(), result);

auto calls_counter = std::make_shared<std::atomic<bool>>(false);
auto test_and_set_reply_has_sent = [calls_counter](bool last) {
if (last) {
return calls_counter->exchange(true);
} else {
return bool(*calls_counter);
}
};

n2_request_info request_info{ request, n2_repliers() };

request_info.repliers.on_reply =
[handler, test_and_set_reply_has_sent](const std::shared_ptr<n2_body> &result, bool last){
if (test_and_set_reply_has_sent(last)) {
return -EALREADY;
}

return handler->on_reply(result, last);
};
request_info.repliers.on_reply_error =
[handler, test_and_set_reply_has_sent](int err, bool last){
if (test_and_set_reply_has_sent(last)) {
return -EALREADY;
}

return handler->on_reply_error(err, last);
};

const size_t count = method(sess, std::move(request_info), handler->m_addr);
handler->set_total(count);
return result;
}

int n2_trans_alloc_send(dnet_session *s, n2_request_info &&request_info, dnet_addr &addr_out); // implemented in trans.cpp

static size_t n2_send_to_single_state_impl(session &sess, n2_request_info &&request_info, dnet_addr &addr_out)
{
n2_trans_alloc_send(sess.get_native(), std::move(request_info), addr_out);
return 1;
}

async_generic_result n2_send_to_single_state(session &sess, const n2_request &request)
{
return n2_send_impl(sess, request, n2_send_to_single_state_impl);
}

static size_t send_to_each_backend_impl(session &sess, dnet_trans_control &ctl)
{
return dnet_request_cmd(sess.get_native(), &ctl);
Expand Down
Loading