Skip to content

Try avoid queuing#4

Merged
basiliscos merged 174 commits into
masterfrom
try-avoid-queuing
Jun 24, 2017
Merged

Try avoid queuing#4
basiliscos merged 174 commits into
masterfrom
try-avoid-queuing

Conversation

@basiliscos

Copy link
Copy Markdown
Owner

No description provided.

@basiliscos

Copy link
Copy Markdown
Owner Author

@vinniefalco Could you, please, briefly review interfaces on that branch, namely

https://github.com/basiliscos/cpp-bredis/blob/try-avoid-queuing/include/bredis/AsyncConnection.hpp

https://github.com/basiliscos/cpp-bredis/blob/try-avoid-queuing/include/bredis/Command.hpp

I'm going to write documentation (Readme.md) + release v0.02.

I deciced that you are right, and Subscription as well as queueing should not be presented in the low-level module, as it's requires special hadling, i.e. close and move out socket before destroing subscription object. Othewise we'll end up with UB.

Comment thread include/bredis/AsyncConnection.hpp Outdated

namespace bredis {

template <typename AsyncStream> class AsyncConnection {

Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Consider renaming AsyncConnection to just Connection
Consider renaming AsyncStream to NextLayer

Comment thread include/bredis/AsyncConnection.hpp Outdated
template <typename AsyncStream> class AsyncConnection {
using protocol_type_t = typename AsyncStream::protocol_type;

static_assert(std::is_same<protocol_type_t, boost::asio::ip::tcp>::value ||

Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The same comment from before applies, this assert needlessly constrains the template type. What if I want AsyncStream to be boost::asio::ip::tcp::socket&? Or beast::test::fail_stream<boost::asio::ip::tcp::socket>?

Comment thread include/bredis/AsyncConnection.hpp Outdated
using args_container_t = std::vector<string_t>;

private:
AsyncStream socket_;

Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Consider renaming socket_ to stream_. Rationale: AsyncStream (or NextLayer) might not be a socket. Users could be misled into thinking this only works with sockets.

Comment thread include/bredis/AsyncConnection.hpp Outdated

public:
template <typename... Args>
AsyncConnection(Args &&... args) : socket_(std::forward<Args>(args)...) {}

Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Good!

Comment thread include/bredis/AsyncConnection.hpp Outdated
template <typename... Args>
AsyncConnection(Args &&... args) : socket_(std::forward<Args>(args)...) {}

inline AsyncStream &next_layer() { return socket_; }

@vinniefalco vinniefalco Apr 27, 2017

Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Consider adding a const version of this function.

The inline keyword is unnecessary, function definitions placed in class declarations behave as if the inline keyword is already present.

Comment thread include/bredis/AsyncConnection.hpp Outdated
AsyncConnection(Args &&... args) : socket_(std::forward<Args>(args)...) {}

inline AsyncStream &next_layer() { return socket_; }
inline AsyncStream &&move_layer() { return std::move(socket_); }

@vinniefalco vinniefalco Apr 27, 2017

Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This function is going to cause problems. First of all, there's no way to guarantee that AsyncStream is movable. You will have a function called move_layer which in some cases will perform a copy. Or worse, do nothing. For example what if AsyncStream is a reference type? The function would be meaningless in that case. I would just leave out this function. If callers don't want the connection object to have ownership of the socket they can just declare it with a reference type instead.

If you really want this behavior, this would be a better interface:

AsyncStream
release_layer()
{
    return std::move(socket_);
}

The name less misleading and it returns an object instead of the rvalue reference.

Comment thread include/bredis/AsyncConnection.hpp Outdated

template <typename ReadCallback, typename Buffer>
void async_read(Buffer &rx_buff, ReadCallback read_callback,
std::size_t replies_count = 1);

Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

These asynchronous functions look like they work similarly to Asio in the sense that the callbacks are completion handlers. Consider making the return value use the boost::asio::async_result mechanism so that those callback parameters are true completion handlers which can use yield, coroutines, or user defined types. Check out this paper:
http://www.open-std.org/jtc1/sc22/wg21/docs/papers/2014/n3896.pdf

In particular section 8. I can help you add the necessary code if you wish to go this route (and I highly recommend it).

Comment thread include/bredis/AsyncConnection.hpp Outdated
std::size_t replies_count = 1);

/* synchronous interface */
void write(const command_wrapper_t &command);

Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Is there a version which supports error_code&?

Comment thread include/bredis/AsyncConnection.hpp Outdated
void async_write(const command_wrapper_t &command,
WriteCallback write_callback);

template <typename ReadCallback, typename Buffer>

Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Consider renaming Buffer to DynamicBuffer to make it clear what the type requirements are.

Comment thread include/bredis/AsyncConnection.hpp Outdated
/* synchronous interface */
void write(const command_wrapper_t &command);

positive_parse_result_t read(boost::asio::streambuf &rx_buff);

Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Consider this signature instead:

template<class DynamicBuffer>
positive_parse_result_t
read(DynamicBuffer &rx_buff);

auto str = std::make_shared<std::string>(
boost::apply_visitor(command_serializer_visitor(), command));
auto str_ptr = str.get();
BREDIS_LOG_DEBUG("async_write >> " << str_ptr->c_str());

Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

There's a different way to log which doesn't require the macro or explicit statements. You could simply provide a logging wrapper which wraps the socket, and instantiate the class with that type as the next layer type.

Comment thread include/bredis/impl/connection.ipp Outdated

real_handler_t real_handler(std::move(handler));
real_handler = std::move(real_handler), &rx_buff, replies_count
](const sys::error_code &error_code, std::size_t bytes_transferred) mutable {

@vinniefalco vinniefalco Jun 12, 2017

Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

If real_handler has overloaded asio_handler_invoke to dispatch using an io_service::strand, this code will cause the stream_ to be accessed in an unsafe way if there is more than one thread calling io_service::run. That is because there is no overload of asio_handler_invoke which takes a pointer to your lambda in its last argument.

From
http://www.boost.org/doc/libs/1_59_0/doc/html/boost_asio/reference/asio_handler_invoke.html

"When asynchronous operations are composed from other asynchronous operations, all intermediate handlers should be invoked using the same method as the final handler. This is required to ensure that user-defined objects are not accessed in a way that may violate the guarantees."

Your lambda is an intermediate handler, and your asynchronous operation (Connection<NextLayer>::async_read) is composed from other asynchronous operations (async_read_until). You cannot use a lambda as the completion handler in the call to async_read_until and expect it to work correctly in all configurations. You must provide a user defined type which wraps the real_handler and forwards the hooking functions such as asio_handler_invoke.

That's what these friends are for:
https://github.com/vinniefalco/Beast/blob/master/examples/echo_op.cpp#L108

If you want this to work, and it seems like you do because you have put a lot of effort into it, you need to rewrite Connection<NextLayer>::async_read to work the way it does in this echo_op example. That means making a new class which holds the final handler and your intermediate state variables such as a reference to the stream and a reference to the dynamic buffer. Such a class might look like this:

template<class Stream, class DynamicBuffer, class Handler>
class async_read_op
{
  int step_ = 0;
  Stream& stream_;
  DynamicBuffer& buffer_;
  Handler handler_;

public:
  async_read_op(async_read_op&&) = default;

  template<class DeducedHandler>
  async_read_op(DeducedHandler&& handler, Stream& stream, DynamicBuffer& buffer)
    : stream_(stream)
    , buffer_(buffer)
    , handler_(std::forward<DeducedHandler>(handler_)
  {
  }

  void operator()(error_code ec, std::size_t bytes_transferred);

    friend bool asio_handler_is_continuation(echo_op* op)
    {
        // This next call is structured to permit argument
        // dependent lookup to take effect.
        using boost::asio::asio_handler_is_continuation;

        // Always use std::addressof to pass the pointer to the handler,
        // otherwise an unwanted overload of operator& may be called instead.
        return op->p_->step > 2 || // <--- change this number based on your switch
            asio_handler_is_continuation(std::addressof(op->handler_));
    }

    friend void* asio_handler_allocate(std::size_t size, echo_op* op)
    {
        using boost::asio::asio_handler_allocate;
        return asio_handler_allocate(size, std::addressof(op->handler_));
    }

    friend void asio_handler_deallocate(void* p, std::size_t size, echo_op* op)
    {
        using boost::asio::asio_handler_deallocate;
        return asio_handler_deallocate(p, size, std::addressof(op->handler_));
    }

    template<class Function>
    friend void asio_handler_invoke(Function&& f, echo_op* op)
    {
        using boost::asio::asio_handler_invoke;
        return asio_handler_invoke(f, std::addressof(op->handler_));
    }
};

Then move the code from your Connection<NextLayer>::async_read into async_read_op::operator() and convert it to a state machine like what you see here with the switch statement
https://github.com/vinniefalco/Beast/blob/master/examples/doc_core_samples.hpp#L388

Copy link
Copy Markdown
Owner Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thanks a lot for explanation, will explore and do it safe way :)

let final handler be invoked the same way as intermediate
handlers (e.g. for using stands + multiple threads)
}
});
async_read_op<NextLayer, DynamicBuffer, real_handler_t> async_op(
std::move(real_handler), stream_, rx_buff, replies_count);

Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is an interesting way to do it

Copy link
Copy Markdown
Owner Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

:) I don't have state machine here, as logically it is just single "read-or-die" operation, i.e. there are no writes or any other async operations... because operations pipelining is pushed to upper lever.

Comment thread include/bredis/impl/async_op.ipp Outdated
}

callback_(error_code, std::move(result));
done_ = true;

Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I don't think you need done_ at all, Asio will call your asio_handler_is_continuation exactly once, when you call async_read_until.

Copy link
Copy Markdown
Owner Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Seems so.

That also means, that friend will be simple as :

    friend bool asio_handler_is_continuation(async_read_op *op) {
        using boost::asio::asio_handler_is_continuation;
        return asio_handler_is_continuation(std::addressof(op->callback_));
    }

@basiliscos

Copy link
Copy Markdown
Owner Author

@vinniefalco Please, take a look the example

https://github.com/basiliscos/cpp-bredis/blob/try-avoid-queuing/examples/stream-parse.cpp

I hope, it will provide explanation why the result is templated by Iterator (deduced from Buffer template).

// and finally print it
std::cout << "on channel '" << payload->first
<< "' received payload :: '" << payload->second
<< "'\n";

Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Really hard to understand this code! This is an example?

Also, I suggest boost::string_ref or std::string_view instead of std::string const&

Copy link
Copy Markdown
Owner Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Urghhhh. The example outputs something like

subscribed
subscription confirmed
on channel 'channel-one' received payload :: '{"a": 5555}'

Also, I suggest boost::string_ref or std::string_view instead of std::string const&

In this example (synch-subscription.cpp) it operates on extracted results, i.e. results already unbounded from buffer, so, they are already std::string, so, return const std::string& seems natural here.

In the example near by (stream-parse.cpp) int operates on yet bounded to buffer results So, there you'll find something like boost::string_ref. But, as the rx_buff might be scattered / fragmented, it cannot return just boost::string_ref as the last requires continuous/flattened memory region.

It is possible, however, apply some visitor, to let it flattens parsed_results, and then returns boost::string_refs, if needed; although I don't see rationale in that as r::extractor<Iterator>() does mostly the same, but with ownership semantics (i.e. the result of extraction will be std::string).

Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Fair enough but this is really confusing. I suggest getting feedback from other users and see what they think

Copy link
Copy Markdown
Owner Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Okay, going to outline that moment in readme and release this week :)

@basiliscos basiliscos merged commit 7815aee into master Jun 24, 2017
@vinniefalco

Copy link
Copy Markdown

FINALLY LOL!! I suggest smaller, more frequent merges from now on :)

basiliscos added a commit that referenced this pull request Jan 25, 2018
basiliscos added a commit that referenced this pull request Jan 25, 2018
Issue #4 : remove unneeded commit on tx_buff
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

None yet

Projects

None yet

Development

Successfully merging this pull request may close these issues.

2 participants