Try avoid queuing#4
Conversation
|
@vinniefalco Could you, please, briefly review interfaces on that branch, namely https://github.com/basiliscos/cpp-bredis/blob/try-avoid-queuing/include/bredis/AsyncConnection.hpp https://github.com/basiliscos/cpp-bredis/blob/try-avoid-queuing/include/bredis/Command.hpp I'm going to write documentation ( I deciced that you are right, and |
|
|
||
| namespace bredis { | ||
|
|
||
| template <typename AsyncStream> class AsyncConnection { |
There was a problem hiding this comment.
Consider renaming AsyncConnection to just Connection
Consider renaming AsyncStream to NextLayer
| template <typename AsyncStream> class AsyncConnection { | ||
| using protocol_type_t = typename AsyncStream::protocol_type; | ||
|
|
||
| static_assert(std::is_same<protocol_type_t, boost::asio::ip::tcp>::value || |
There was a problem hiding this comment.
The same comment from before applies, this assert needlessly constrains the template type. What if I want AsyncStream to be boost::asio::ip::tcp::socket&? Or beast::test::fail_stream<boost::asio::ip::tcp::socket>?
| using args_container_t = std::vector<string_t>; | ||
|
|
||
| private: | ||
| AsyncStream socket_; |
There was a problem hiding this comment.
Consider renaming socket_ to stream_. Rationale: AsyncStream (or NextLayer) might not be a socket. Users could be misled into thinking this only works with sockets.
|
|
||
| public: | ||
| template <typename... Args> | ||
| AsyncConnection(Args &&... args) : socket_(std::forward<Args>(args)...) {} |
| template <typename... Args> | ||
| AsyncConnection(Args &&... args) : socket_(std::forward<Args>(args)...) {} | ||
|
|
||
| inline AsyncStream &next_layer() { return socket_; } |
There was a problem hiding this comment.
Consider adding a const version of this function.
The inline keyword is unnecessary, function definitions placed in class declarations behave as if the inline keyword is already present.
| AsyncConnection(Args &&... args) : socket_(std::forward<Args>(args)...) {} | ||
|
|
||
| inline AsyncStream &next_layer() { return socket_; } | ||
| inline AsyncStream &&move_layer() { return std::move(socket_); } |
There was a problem hiding this comment.
This function is going to cause problems. First of all, there's no way to guarantee that AsyncStream is movable. You will have a function called move_layer which in some cases will perform a copy. Or worse, do nothing. For example what if AsyncStream is a reference type? The function would be meaningless in that case. I would just leave out this function. If callers don't want the connection object to have ownership of the socket they can just declare it with a reference type instead.
If you really want this behavior, this would be a better interface:
AsyncStream
release_layer()
{
return std::move(socket_);
}
The name less misleading and it returns an object instead of the rvalue reference.
|
|
||
| template <typename ReadCallback, typename Buffer> | ||
| void async_read(Buffer &rx_buff, ReadCallback read_callback, | ||
| std::size_t replies_count = 1); |
There was a problem hiding this comment.
These asynchronous functions look like they work similarly to Asio in the sense that the callbacks are completion handlers. Consider making the return value use the boost::asio::async_result mechanism so that those callback parameters are true completion handlers which can use yield, coroutines, or user defined types. Check out this paper:
http://www.open-std.org/jtc1/sc22/wg21/docs/papers/2014/n3896.pdf
In particular section 8. I can help you add the necessary code if you wish to go this route (and I highly recommend it).
| std::size_t replies_count = 1); | ||
|
|
||
| /* synchronous interface */ | ||
| void write(const command_wrapper_t &command); |
There was a problem hiding this comment.
Is there a version which supports error_code&?
| void async_write(const command_wrapper_t &command, | ||
| WriteCallback write_callback); | ||
|
|
||
| template <typename ReadCallback, typename Buffer> |
There was a problem hiding this comment.
Consider renaming Buffer to DynamicBuffer to make it clear what the type requirements are.
| /* synchronous interface */ | ||
| void write(const command_wrapper_t &command); | ||
|
|
||
| positive_parse_result_t read(boost::asio::streambuf &rx_buff); |
There was a problem hiding this comment.
Consider this signature instead:
template<class DynamicBuffer>
positive_parse_result_t
read(DynamicBuffer &rx_buff);
| auto str = std::make_shared<std::string>( | ||
| boost::apply_visitor(command_serializer_visitor(), command)); | ||
| auto str_ptr = str.get(); | ||
| BREDIS_LOG_DEBUG("async_write >> " << str_ptr->c_str()); |
There was a problem hiding this comment.
There's a different way to log which doesn't require the macro or explicit statements. You could simply provide a logging wrapper which wraps the socket, and instantiate the class with that type as the next layer type.
|
|
||
| real_handler_t real_handler(std::move(handler)); | ||
| real_handler = std::move(real_handler), &rx_buff, replies_count | ||
| ](const sys::error_code &error_code, std::size_t bytes_transferred) mutable { |
There was a problem hiding this comment.
If real_handler has overloaded asio_handler_invoke to dispatch using an io_service::strand, this code will cause the stream_ to be accessed in an unsafe way if there is more than one thread calling io_service::run. That is because there is no overload of asio_handler_invoke which takes a pointer to your lambda in its last argument.
From
http://www.boost.org/doc/libs/1_59_0/doc/html/boost_asio/reference/asio_handler_invoke.html
"When asynchronous operations are composed from other asynchronous operations, all intermediate handlers should be invoked using the same method as the final handler. This is required to ensure that user-defined objects are not accessed in a way that may violate the guarantees."
Your lambda is an intermediate handler, and your asynchronous operation (Connection<NextLayer>::async_read) is composed from other asynchronous operations (async_read_until). You cannot use a lambda as the completion handler in the call to async_read_until and expect it to work correctly in all configurations. You must provide a user defined type which wraps the real_handler and forwards the hooking functions such as asio_handler_invoke.
That's what these friends are for:
https://github.com/vinniefalco/Beast/blob/master/examples/echo_op.cpp#L108
If you want this to work, and it seems like you do because you have put a lot of effort into it, you need to rewrite Connection<NextLayer>::async_read to work the way it does in this echo_op example. That means making a new class which holds the final handler and your intermediate state variables such as a reference to the stream and a reference to the dynamic buffer. Such a class might look like this:
template<class Stream, class DynamicBuffer, class Handler>
class async_read_op
{
int step_ = 0;
Stream& stream_;
DynamicBuffer& buffer_;
Handler handler_;
public:
async_read_op(async_read_op&&) = default;
template<class DeducedHandler>
async_read_op(DeducedHandler&& handler, Stream& stream, DynamicBuffer& buffer)
: stream_(stream)
, buffer_(buffer)
, handler_(std::forward<DeducedHandler>(handler_)
{
}
void operator()(error_code ec, std::size_t bytes_transferred);
friend bool asio_handler_is_continuation(echo_op* op)
{
// This next call is structured to permit argument
// dependent lookup to take effect.
using boost::asio::asio_handler_is_continuation;
// Always use std::addressof to pass the pointer to the handler,
// otherwise an unwanted overload of operator& may be called instead.
return op->p_->step > 2 || // <--- change this number based on your switch
asio_handler_is_continuation(std::addressof(op->handler_));
}
friend void* asio_handler_allocate(std::size_t size, echo_op* op)
{
using boost::asio::asio_handler_allocate;
return asio_handler_allocate(size, std::addressof(op->handler_));
}
friend void asio_handler_deallocate(void* p, std::size_t size, echo_op* op)
{
using boost::asio::asio_handler_deallocate;
return asio_handler_deallocate(p, size, std::addressof(op->handler_));
}
template<class Function>
friend void asio_handler_invoke(Function&& f, echo_op* op)
{
using boost::asio::asio_handler_invoke;
return asio_handler_invoke(f, std::addressof(op->handler_));
}
};
Then move the code from your Connection<NextLayer>::async_read into async_read_op::operator() and convert it to a state machine like what you see here with the switch statement
https://github.com/vinniefalco/Beast/blob/master/examples/doc_core_samples.hpp#L388
There was a problem hiding this comment.
Thanks a lot for explanation, will explore and do it safe way :)
let final handler be invoked the same way as intermediate handlers (e.g. for using stands + multiple threads)
| } | ||
| }); | ||
| async_read_op<NextLayer, DynamicBuffer, real_handler_t> async_op( | ||
| std::move(real_handler), stream_, rx_buff, replies_count); |
There was a problem hiding this comment.
This is an interesting way to do it
There was a problem hiding this comment.
:) I don't have state machine here, as logically it is just single "read-or-die" operation, i.e. there are no writes or any other async operations... because operations pipelining is pushed to upper lever.
| } | ||
|
|
||
| callback_(error_code, std::move(result)); | ||
| done_ = true; |
There was a problem hiding this comment.
I don't think you need done_ at all, Asio will call your asio_handler_is_continuation exactly once, when you call async_read_until.
There was a problem hiding this comment.
Seems so.
That also means, that friend will be simple as :
friend bool asio_handler_is_continuation(async_read_op *op) {
using boost::asio::asio_handler_is_continuation;
return asio_handler_is_continuation(std::addressof(op->callback_));
}
|
@vinniefalco Please, take a look the example https://github.com/basiliscos/cpp-bredis/blob/try-avoid-queuing/examples/stream-parse.cpp I hope, it will provide explanation why the result is templated by |
| // and finally print it | ||
| std::cout << "on channel '" << payload->first | ||
| << "' received payload :: '" << payload->second | ||
| << "'\n"; |
There was a problem hiding this comment.
Really hard to understand this code! This is an example?
Also, I suggest boost::string_ref or std::string_view instead of std::string const&
There was a problem hiding this comment.
Urghhhh. The example outputs something like
subscribed
subscription confirmed
on channel 'channel-one' received payload :: '{"a": 5555}'
Also, I suggest boost::string_ref or std::string_view instead of std::string const&
In this example (synch-subscription.cpp) it operates on extracted results, i.e. results already unbounded from buffer, so, they are already std::string, so, return const std::string& seems natural here.
In the example near by (stream-parse.cpp) int operates on yet bounded to buffer results So, there you'll find something like boost::string_ref. But, as the rx_buff might be scattered / fragmented, it cannot return just boost::string_ref as the last requires continuous/flattened memory region.
It is possible, however, apply some visitor, to let it flattens parsed_results, and then returns boost::string_refs, if needed; although I don't see rationale in that as r::extractor<Iterator>() does mostly the same, but with ownership semantics (i.e. the result of extraction will be std::string).
There was a problem hiding this comment.
Fair enough but this is really confusing. I suggest getting feedback from other users and see what they think
There was a problem hiding this comment.
Okay, going to outline that moment in readme and release this week :)
Switch to boost::optional<reference_wrapper<std::string>> instead of boost::optional<const std::string&> as the last have some issues
…into try-avoid-queuing
|
FINALLY LOL!! I suggest smaller, more frequent merges from now on :) |
Issue #4 : remove unneeded commit on tx_buff
No description provided.