Skip to content

Commit

Permalink
Merge pull request qicosmos#109 from muxiaozi/master
Browse files Browse the repository at this point in the history
  • Loading branch information
qicosmos authored Jul 4, 2023
2 parents 8782f1d + 1818c73 commit 93088a7
Showing 1 changed file with 8 additions and 10 deletions.
18 changes: 8 additions & 10 deletions include/rest_rpc/rpc_client.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -12,8 +12,6 @@
#include <thread>
#include <utility>

using namespace rest_rpc::rpc_service;

namespace rest_rpc {

/**
Expand Down Expand Up @@ -259,7 +257,7 @@ class rpc_client : private asio::noncopyable {
future_map_.emplace(fu_id, std::move(p));
}

msgpack_codec codec;
rpc_service::msgpack_codec codec;
auto ret = codec.pack_args(std::forward<Args>(args)...);
write(fu_id, request_type::req_res, std::move(ret),
MD5::MD5Hash32(rpc_name.data()));
Expand Down Expand Up @@ -309,7 +307,7 @@ class rpc_client : private asio::noncopyable {
callback_map_.emplace(cb_id, call);
}

msgpack_codec codec;
rpc_service::msgpack_codec codec;
auto ret = codec.pack_args(std::forward<Args>(args)...);
write(cb_id, request_type::req_res, std::move(ret),
MD5::MD5Hash32(rpc_name.data()));
Expand Down Expand Up @@ -353,15 +351,15 @@ class rpc_client : private asio::noncopyable {

template <typename T, size_t TIMEOUT = 3>
void publish(std::string key, T &&t) {
msgpack_codec codec;
rpc_service::msgpack_codec codec;
auto buf = codec.pack(std::move(t));
call<TIMEOUT>("publish", std::move(key), "",
std::string(buf.data(), buf.size()));
}

template <typename T, size_t TIMEOUT = 3>
void publish_by_token(std::string key, std::string token, T &&t) {
msgpack_codec codec;
rpc_service::msgpack_codec codec;
auto buf = codec.pack(std::move(t));
call<TIMEOUT>("publish_by_token", std::move(key), std::move(token),
std::string(buf.data(), buf.size()));
Expand Down Expand Up @@ -424,16 +422,16 @@ class rpc_client : private asio::noncopyable {
deadline_.async_wait([this, timeout](const asio::error_code &ec) {
if (!ec) {
if (has_connected_) {
write(0, request_type::req_res, buffer_type(0), 0);
write(0, request_type::req_res, rpc_service::buffer_type(0), 0);
}
}

reset_deadline_timer(timeout);
});
}

void write(std::uint64_t req_id, request_type type, buffer_type &&message,
uint32_t func_id) {
void write(std::uint64_t req_id, request_type type,
rpc_service::buffer_type &&message, uint32_t func_id) {
size_t size = message.size();
assert(size < MAX_BUF_LEN);
client_message_type msg{req_id, type, {message.release(), size}, func_id};
Expand Down Expand Up @@ -565,7 +563,7 @@ class rpc_client : private asio::noncopyable {
}

void send_subscribe(const std::string &key, const std::string &token) {
msgpack_codec codec;
rpc_service::msgpack_codec codec;
auto ret = codec.pack_args(key, token);
write(0, request_type::sub_pub, std::move(ret), MD5::MD5Hash32(key.data()));
}
Expand Down

0 comments on commit 93088a7

Please sign in to comment.