Skip to content

Commit bac0683

Browse files
sabramkinSergey Abramkin
authored andcommitted
start work: callback_result_data, basic handler
1 parent c793c96 commit bac0683

File tree

12 files changed

+403
-65
lines changed

12 files changed

+403
-65
lines changed

bindings/cpp/CMakeLists.txt

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -39,6 +39,7 @@ set(ELLIPTICS_CLIENT_SRCS
3939
../../library/request_queue.cpp
4040
../../library/rbtree.c
4141
../../library/trans.c
42+
../../library/trans.cpp
4243
../../library/tests.c
4344
../../library/common.cpp
4445
../../library/access_context.cpp

bindings/cpp/async_result.cpp

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -199,7 +199,7 @@ bool async_result<T>::get(T &entry)
199199
{
200200
wait(session::throw_at_get);
201201
for (auto it = m_data->results.begin(); it != m_data->results.end(); ++it) {
202-
if (it->status() == 0 && !it->data().empty()) {
202+
if (it->is_valid() && it->status() == 0) {
203203
entry = *it;
204204
return true;
205205
}

bindings/cpp/callback.cpp

Lines changed: 91 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -40,24 +40,42 @@ class basic_handler
4040
return 0;
4141
}
4242

43+
// Used by old (protocol-dependent) mechanic, must be removed after refactoring
4344
basic_handler(std::unique_ptr<dnet_logger> logger, async_generic_result &result) :
4445
m_logger(std::move(logger)),
4546
m_handler(result), m_completed(0), m_total(0)
4647
{
48+
memset(&m_addr, 0, sizeof(dnet_addr));
49+
memset(&m_cmd, 0, sizeof(dnet_cmd));
4750
}
4851

49-
bool handle(dnet_addr *addr, dnet_cmd *cmd)
52+
basic_handler(const dnet_addr &addr, const dnet_cmd &cmd,
53+
std::unique_ptr<dnet_logger> logger, async_generic_result &result) :
54+
m_addr(addr),
55+
m_cmd(cmd),
56+
m_logger(std::move(logger)),
57+
m_handler(result), m_completed(0), m_total(0)
5058
{
51-
if (is_trans_destroyed(cmd)) {
52-
return increment_completed();
53-
}
59+
}
5460

61+
void log_reply_info(dnet_addr *addr, dnet_cmd *cmd)
62+
{
5563
DNET_LOG(m_logger, cmd->status ? DNET_LOG_ERROR : DNET_LOG_NOTICE, "{}: {}: handled reply from: {}, "
5664
"trans: {}, cflags: {}, status: {}, "
5765
"size: {}, client: {}, last: {}",
5866
dnet_dump_id(&cmd->id), dnet_cmd_string(cmd->cmd), addr ? dnet_addr_string(addr) : "<unknown>",
5967
cmd->trans, dnet_flags_dump_cflags(cmd->flags), int(cmd->status), cmd->size,
6068
!(cmd->flags & DNET_FLAGS_REPLY), !(cmd->flags & DNET_FLAGS_MORE));
69+
}
70+
71+
// Used by old (protocol-dependent) mechanic, must be removed after refactoring
72+
bool handle(dnet_addr *addr, dnet_cmd *cmd)
73+
{
74+
if (is_trans_destroyed(cmd)) {
75+
return increment_completed();
76+
}
77+
78+
log_reply_info(addr, cmd);
6179

6280
auto data = std::make_shared<callback_result_data>(addr, cmd);
6381

@@ -71,6 +89,29 @@ class basic_handler
7189
return false;
7290
}
7391

92+
int on_reply(const std::shared_ptr<void> &result)
93+
{
94+
// TODO(sabramkin): Output only protocol-independent known info (currently old-mechanic logging used)
95+
log_reply_info(&m_addr, &m_cmd);
96+
97+
auto data = std::make_shared<n2_callback_result_data>(m_addr, m_cmd, result, 0);
98+
callback_result_entry entry(data);
99+
m_handler.process(entry);
100+
return 0;
101+
}
102+
103+
int on_reply_error(int err)
104+
{
105+
// TODO(sabramkin): Output only protocol-independent known info (currently old-mechanic logging used)
106+
log_reply_info(&m_addr, &m_cmd);
107+
108+
auto data = std::make_shared<n2_callback_result_data>(m_addr, m_cmd, nullptr, err);
109+
data->error = create_error(err, "n2 lookup_new error"); // TODO(sabramkin): rework error
110+
callback_result_entry entry(data);
111+
m_handler.process(entry);
112+
return 0;
113+
}
114+
74115
// how many independent transactions share this handler plus call below
75116
// call below and corresponding +1 is needed, since transactions can be completed
76117
// before send_impl() calls this method to setup this 'reference counter'
@@ -92,6 +133,8 @@ class basic_handler
92133
return false;
93134
}
94135

136+
dnet_addr m_addr;
137+
dnet_cmd m_cmd;
95138
std::unique_ptr<dnet_logger> m_logger;
96139
async_result_handler<callback_result_entry> m_handler;
97140
std::atomic_size_t m_completed;
@@ -100,13 +143,20 @@ class basic_handler
100143

101144
} // namespace detail
102145

146+
const dnet_addr &get_destination_addr(session &sess)
147+
{
148+
const auto fwd = sess.get_forward();
149+
if (fwd.is_valid())
150+
return fwd.to_raw();
151+
return sess.get_direct_address().to_raw();
152+
}
153+
103154
template <typename Method, typename T>
104155
async_generic_result send_impl(session &sess, T &control, Method method)
105156
{
106157
async_generic_result result(sess);
107158

108159
detail::basic_handler *handler = new detail::basic_handler(sess.get_logger(), result);
109-
110160
control.complete = detail::basic_handler::handler;
111161
control.priv = handler;
112162

@@ -142,6 +192,42 @@ async_generic_result send_to_single_state(session &sess, dnet_io_control &contro
142192
return send_impl(sess, control, send_to_single_state_io_impl);
143193
}
144194

195+
template <typename Method>
196+
async_generic_result n2_send_impl(session &sess, const n2_request &request, Method method)
197+
{
198+
async_generic_result result(sess);
199+
200+
auto handler = std::make_shared<detail::basic_handler>(get_destination_addr(sess), request.cmd,
201+
sess.get_logger(), result);
202+
n2_request_info request_info{ request, n2_repliers() };
203+
204+
request_info.repliers.on_reply =
205+
[handler](const std::shared_ptr<void> &result){
206+
return handler->on_reply(result);
207+
};
208+
request_info.repliers.on_reply_error =
209+
[handler](int err){
210+
return handler->on_reply_error(err);
211+
};
212+
213+
const size_t count = method(sess, std::move(request_info));
214+
handler->set_total(count);
215+
return result;
216+
}
217+
218+
int n2_trans_alloc_send(dnet_session *s, n2_request_info &&request_info); // implemented in trans.cpp
219+
220+
static size_t n2_send_to_single_state_impl(session &sess, n2_request_info &&request_info)
221+
{
222+
n2_trans_alloc_send(sess.get_native(), std::move(request_info));
223+
return 1;
224+
}
225+
226+
async_generic_result n2_send_to_single_state(session &sess, const n2_request &request)
227+
{
228+
return n2_send_impl(sess, request, n2_send_to_single_state_impl);
229+
}
230+
145231
static size_t send_to_each_backend_impl(session &sess, dnet_trans_control &ctl)
146232
{
147233
return dnet_request_cmd(sess.get_native(), &ctl);

bindings/cpp/callback_p.h

Lines changed: 143 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -30,6 +30,27 @@
3030
#include <thread>
3131

3232
#include "elliptics/async_result_cast.hpp"
33+
#include "library/n2_protocol.hpp"
34+
35+
/*
36+
* This macroses should be used surrounding all entry::methods which work directly
37+
* with m_data or data() to ensure that meaningful exceptions are thrown
38+
*/
39+
#define DNET_DATA_BEGIN_2() try { \
40+
do {} while (false)
41+
42+
#define DNET_DATA_END_2(SIZE) \
43+
} catch (not_found_error &) { \
44+
if (!is_valid()) { \
45+
throw_error(-ENOENT, "entry::%s(): entry is null", __FUNCTION__); \
46+
} else {\
47+
dnet_cmd *cmd = command(); \
48+
throw_error(-ENOENT, cmd->id, "entry::%s(): data.size is too small, expected: %zu, actual: %zu, status: %d", \
49+
__FUNCTION__, size_t(SIZE), data.size(), cmd->status); \
50+
} \
51+
throw; \
52+
} \
53+
do {} while (false)
3354

3455
namespace ioremap { namespace elliptics {
3556

@@ -63,7 +84,25 @@ class session_scope
6384
uint32_t m_policy;
6485
};
6586

66-
class callback_result_data
87+
// TODO(sabramkin): This abstraction is temporary and used while refactoring in progress.
88+
// TODO(sabramkin): After refactoring only n2_callback_result_data should stay, no base is needed.
89+
class callback_result_data_base
90+
{
91+
public:
92+
virtual dnet_addr *address() const = 0;
93+
virtual dnet_cmd *command() const = 0;
94+
virtual int status() const = 0;
95+
virtual bool is_valid() const = 0;
96+
virtual bool is_ack() const = 0;
97+
virtual bool is_final() const = 0;
98+
virtual bool is_client() const = 0;
99+
100+
virtual ~callback_result_data_base() = default;
101+
102+
error_info error;
103+
};
104+
105+
class callback_result_data : public callback_result_data_base
67106
{
68107
public:
69108
callback_result_data()
@@ -81,12 +120,112 @@ class callback_result_data
81120
memcpy(data.data<char>() + sizeof(dnet_addr), cmd, sizeof(dnet_cmd) + cmd->size);
82121
}
83122

84-
virtual ~callback_result_data()
123+
dnet_addr *address() const override
85124
{
125+
DNET_DATA_BEGIN_2();
126+
return data
127+
.data<dnet_addr>();
128+
DNET_DATA_END_2(0);
129+
}
130+
131+
dnet_cmd *command() const override
132+
{
133+
DNET_DATA_BEGIN_2();
134+
return data
135+
.skip<dnet_addr>()
136+
.data<dnet_cmd>();
137+
DNET_DATA_END_2(0);
138+
}
139+
140+
int status() const override
141+
{
142+
return command()->status;
143+
}
144+
145+
bool is_valid() const override
146+
{
147+
return !data.empty();
148+
}
149+
150+
bool is_ack() const override
151+
{
152+
return status() == 0 && data.empty();
153+
}
154+
155+
bool is_final() const override
156+
{
157+
return !(command()->flags & DNET_FLAGS_MORE);
158+
}
159+
160+
bool is_client() const override
161+
{
162+
return !(command()->flags & DNET_FLAGS_REPLY);
86163
}
87164

88165
data_pointer data;
89-
error_info error;
166+
};
167+
168+
class n2_callback_result_data : public callback_result_data_base
169+
{
170+
public:
171+
n2_callback_result_data() = default;
172+
173+
n2_callback_result_data(const dnet_addr &addr_in, const dnet_cmd &cmd_in,
174+
const std::shared_ptr<void> &result_in, int result_status_in)
175+
: addr(addr_in)
176+
, cmd(cmd_in)
177+
, result(result_in)
178+
, result_status(result_status_in)
179+
{
180+
// TODO(sabramkin):
181+
// Here is emulated protocol logic for single-response commands. It is hardcode that we must
182+
// resolve when we introduce bulk commands. See also is_final() method. Note that protocol
183+
// mustn't provide its inner structures (such as dnet_cmd), so we must remove command() method
184+
// in the future, and must remove cmd member.
185+
cmd.flags = (cmd.flags & ~(DNET_FLAGS_NEED_ACK)) | DNET_FLAGS_REPLY;
186+
}
187+
188+
dnet_addr *address() const override
189+
{
190+
return const_cast<dnet_addr *>(&addr);
191+
}
192+
193+
dnet_cmd *command() const override
194+
{
195+
return const_cast<dnet_cmd *>(&cmd);
196+
}
197+
198+
int status() const override
199+
{
200+
return result_status;
201+
}
202+
203+
bool is_valid() const override
204+
{
205+
return true;
206+
}
207+
208+
bool is_ack() const override
209+
{
210+
return result_status == 0 && !result;
211+
}
212+
213+
bool is_final() const override
214+
{
215+
return true; // TODO(sabramkin): rework when any bulk command is supported
216+
}
217+
218+
bool is_client() const override
219+
{
220+
return false;
221+
}
222+
223+
dnet_addr addr;
224+
dnet_cmd cmd;
225+
226+
// Either result or nonzero result_status must be set
227+
std::shared_ptr<void> result;
228+
int result_status;
90229
};
91230

92231
struct dnet_net_state_deleter
@@ -103,6 +242,7 @@ typedef std::unique_ptr<dnet_net_state, dnet_net_state_deleter> net_state_ptr;
103242
// Send request to specific state
104243
async_generic_result send_to_single_state(session &sess, const transport_control &control);
105244
async_generic_result send_to_single_state(session &sess, dnet_io_control &control);
245+
async_generic_result n2_send_to_single_state(session &sess, const n2_request &request);
106246

107247
// Send request to each backend
108248
async_generic_result send_to_each_backend(session &sess, const transport_control &control);

bindings/cpp/newapi/result_entry.cpp

Lines changed: 11 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,6 @@
11
#include "elliptics/newapi/result_entry.hpp"
2+
#include "bindings/cpp/callback_p.h"
3+
#include "library/n2_protocol.hpp"
24
#include "library/protocol.hpp"
35
#include "library/elliptics.h"
46

@@ -20,20 +22,21 @@ bool callback_result_entry::empty() const {
2022
return raw_data().empty();
2123
}
2224

23-
std::string lookup_result_entry::path() const {
24-
dnet_lookup_response response;
25+
lookup_result_entry::lookup_result_entry()
26+
: ioremap::elliptics::callback_result_entry(std::make_shared<ioremap::elliptics::n2_callback_result_data>()) {
27+
}
2528

26-
deserialize(raw_data(), response);
29+
std::string lookup_result_entry::path() const {
30+
auto &response = *static_cast<ioremap::elliptics::n2::lookup_response *>(message());
2731
return response.path;
2832
}
2933

3034
dnet_record_info lookup_result_entry::record_info() const {
35+
auto &response = *static_cast<ioremap::elliptics::n2::lookup_response *>(message());
36+
3137
dnet_record_info info;
3238
memset(&info, 0, sizeof(info));
3339

34-
dnet_lookup_response response;
35-
deserialize(raw_data(), response);
36-
3740
info.record_flags = response.record_flags;
3841
info.user_flags = response.user_flags;
3942

@@ -61,16 +64,12 @@ lookup_result_entry::checksum_t convert_checksum(const std::vector<unsigned char
6164
}
6265

6366
lookup_result_entry::checksum_t lookup_result_entry::json_checksum() const {
64-
dnet_lookup_response response;
65-
deserialize(raw_data(), response);
66-
67+
auto &response = *static_cast<ioremap::elliptics::n2::lookup_response *>(message());
6768
return convert_checksum(response.json_checksum);
6869
}
6970

7071
lookup_result_entry::checksum_t lookup_result_entry::data_checksum() const {
71-
dnet_lookup_response response;
72-
deserialize(raw_data(), response);
73-
72+
auto &response = *static_cast<ioremap::elliptics::n2::lookup_response *>(message());
7473
return convert_checksum(response.data_checksum);
7574
}
7675

0 commit comments

Comments
 (0)