Skip to content

Commit

Permalink
update, add upload/download example
Browse files Browse the repository at this point in the history
  • Loading branch information
qicosmos committed Mar 28, 2019
1 parent 2eb0d4f commit 029d08b
Show file tree
Hide file tree
Showing 8 changed files with 153 additions and 59 deletions.
65 changes: 65 additions & 0 deletions examples/client/main.cpp
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
#include <iostream>
#include <sync_client.hpp>
#include <async_client.hpp>
#include <fstream>
#include "codec.h"
using namespace std::chrono_literals;

Expand Down Expand Up @@ -170,7 +171,71 @@ void test_async_client() {
std::cin >> str;
}

void test_upload() {
async_client client("127.0.0.1", 9000);
client.connect();
bool r = client.wait_conn(1);
if (!r) {
std::cout << "connect timeout" << std::endl;
return;
}

std::ifstream file("E:/acl.7z", std::ios::binary);
file.seekg(0, std::ios::end);
size_t file_len = file.tellg();
file.seekg(0, std::ios::beg);
std::string conent;
conent.resize(file_len);
file.read(&conent[0], file_len);
std::cout << file_len << std::endl;

{
auto f = client.async_call("upload", "test", conent);
if (f.wait_for(500ms) == std::future_status::timeout) {
std::cout << "timeout" << std::endl;
}
else {
f.get().as();
std::cout << "ok" << std::endl;
}
}
{
auto f = client.async_call("upload", "test1", conent);
if (f.wait_for(500ms) == std::future_status::timeout) {
std::cout << "timeout" << std::endl;
}
else {
f.get().as();
std::cout << "ok" << std::endl;
}
}
}

void test_download() {
async_client client("127.0.0.1", 9000);
client.connect();
bool r = client.wait_conn(1);
if (!r) {
std::cout << "connect timeout" << std::endl;
return;
}

auto f = client.async_call("download", "test");
if (f.wait_for(500ms) == std::future_status::timeout) {
std::cout << "timeout" << std::endl;
}
else {
auto content = f.get().as<std::string>();
std::cout << content.size() << std::endl;
std::ofstream file("test", std::ios::binary);
file.write(content.data(), content.size());
}
}

int main() {
//test_upload();
//test_download();

test_async_client();
test_get_person();
test_get_person_name();
Expand Down
27 changes: 26 additions & 1 deletion examples/server/main.cpp
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
#include <rpc_server.h>
using namespace rest_rpc;
using namespace rpc_service;

#include <fstream>
struct dummy{
int add(connection* conn, int a, int b) { return a + b; }
};
Expand Down Expand Up @@ -34,6 +34,29 @@ person get_person(connection* conn) {
return { 1, "tom", 20 };
}

void upload(connection* conn, const std::string& filename, const std::string& content) {
std::cout << content.size() << std::endl;
std::ofstream file(filename, std::ios::binary);
file.write(content.data(), content.size());
}

std::string download(connection* conn, const std::string& filename) {
std::ifstream file(filename, std::ios::binary);
if (!file) {
return "";
}

file.seekg(0, std::ios::end);
size_t file_len = file.tellg();
file.seekg(0, std::ios::beg);
std::string content;
content.resize(file_len);
file.read(&content[0], file_len);
std::cout << file_len << std::endl;

return content;
}

int main() {
rpc_server server(9000, 4);

Expand All @@ -43,6 +66,8 @@ int main() {
server.register_handler("hello", hello);
server.register_handler("get_person_name", get_person_name);
server.register_handler("get_person", get_person);
server.register_handler("upload", upload);
server.register_handler("download", download);

server.run();

Expand Down
71 changes: 29 additions & 42 deletions include/async_client.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -41,8 +41,6 @@ namespace rest_rpc {
public:
async_client(const std::string& host, unsigned short port) : socket_(ios_), work_(ios_), strand_(ios_),
deadline_(ios_), host_(host), port_(port), body_(INIT_BUF_SIZE) {
read_buffers_[0] = boost::asio::buffer(head_);
read_buffers_[1] = boost::asio::buffer(body_);
thd_ = std::make_shared<std::thread>([this] {
ios_.run();
});
Expand Down Expand Up @@ -226,6 +224,7 @@ namespace rest_rpc {

void write(std::uint64_t req_id, buffer_type&& message) {
size_t size = message.size();
assert(size < MAX_BUF_LEN);
message_type msg{ {message.release(), size}, req_id };
strand_.post([this, msg] {
outbox_.emplace_back(std::move(msg));
Expand Down Expand Up @@ -264,11 +263,7 @@ namespace rest_rpc {
}

void do_read() {
std::array<boost::asio::mutable_buffer, 2> read_buffers;
read_buffers[0] = boost::asio::buffer(head_);
read_buffers[1] = boost::asio::buffer(body_);

boost::asio::async_read(socket_, read_buffers, boost::asio::transfer_at_least(HEAD_LEN),
boost::asio::async_read(socket_, boost::asio::buffer(head_),
[this](const boost::system::error_code& ec, const size_t length) {
if (!socket_.is_open()) {
//LOG(INFO) << "socket already closed";
Expand All @@ -279,29 +274,48 @@ namespace rest_rpc {
if (!ec) {
const uint32_t body_len = *((uint32_t*)(head_));
auto req_id = *((std::uint64_t*)(head_ + sizeof(int32_t)));
if (body_len > 0 && body_len < MAX_BUF_LEN) {
if (body_.size() < body_len) { body_.resize(body_len); }
read_body(req_id, body_len);
return;
}

if (body_len == 0 || body_len > MAX_BUF_LEN) {
//LOG(INFO) << "invalid body len";
call_back(req_id, errc::make_error_code(errc::invalid_argument), {});
close();
return;
}
}
else {
//LOG(INFO) << ec.message();
if (err_cb_) err_cb_(ec);
close();
}
});
}

if (body_len > body_.size()) {
body_.resize(body_len);
read_buffers_[1] = boost::asio::buffer(body_);
read_body(req_id, length - HEAD_LEN, body_len);
return;
}
void read_body(std::uint64_t req_id, size_t body_len) {
boost::asio::async_read(
socket_, boost::asio::buffer(body_.data(), body_len),
[this, req_id, body_len](boost::system::error_code ec, std::size_t length) {
//cancel_timer();

if (!socket_.is_open()) {
//LOG(INFO) << "socket already closed";
call_back(req_id, errc::make_error_code(errc::connection_aborted), {});
return;
}

if (!ec) {
//entier body
call_back(req_id, ec, { body_.data(), body_len });

do_read();
}
else {
//LOG(INFO) << ec.message();
if (err_cb_) err_cb_(ec);
close();
call_back(req_id, ec, {});
}
});
}
Expand Down Expand Up @@ -356,32 +370,6 @@ namespace rest_rpc {
strand_.post([this, req_id]() { cb_map_.erase(req_id); });
}

void read_body(std::uint64_t req_id, size_t start, size_t body_len) {
boost::asio::async_read(
socket_, boost::asio::buffer(body_.data() + start, body_len - start),
[this, req_id, body_len](boost::system::error_code ec, std::size_t length) {
//cancel_timer();

if (!socket_.is_open()) {
//LOG(INFO) << "socket already closed";
call_back(req_id, errc::make_error_code(errc::connection_aborted), {});
return;
}

if (!ec) {
//entier body
std::cout << length << std::endl;
call_back(req_id, ec, {body_.data(), body_len });

do_read();
}
else {
//LOG(INFO) << ec.message();
call_back(req_id, ec, {});
}
});
}

void close() {
has_connected_ = true;
if (socket_.is_open()) {
Expand Down Expand Up @@ -417,6 +405,5 @@ namespace rest_rpc {

char head_[HEAD_LEN] = {};
std::vector<char> body_;
std::array<boost::asio::mutable_buffer, 2> read_buffers_;
};
}
6 changes: 4 additions & 2 deletions include/client_util.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -14,8 +14,10 @@ namespace rest_rpc {
inline T get_result(std::string_view result) {
rpc_service::msgpack_codec codec;
auto tp = codec.unpack<std::tuple<int>>(result.data(), result.size());
if (std::get<0>(tp) != 0)
throw std::logic_error("rpc error");
if (std::get<0>(tp) != 0) {
auto ret = codec.unpack<std::tuple<int, std::string>>(result.data(), result.size());
throw std::logic_error(std::get<1>(ret));
}

auto err_tp = codec.unpack<std::tuple<int, T>>(result.data(), result.size());
return std::get<1>(err_tp);
Expand Down
2 changes: 1 addition & 1 deletion include/codec.h
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@ namespace rest_rpc {
namespace rpc_service {

using buffer_type = msgpack::sbuffer;
struct msgpack_codec {
struct msgpack_codec {
const static size_t init_size = 2 * 1024;

template<typename... Args>
Expand Down
15 changes: 13 additions & 2 deletions include/connection.h
Original file line number Diff line number Diff line change
Expand Up @@ -29,21 +29,30 @@ class connection : public std::enable_shared_from_this<connection>, private boos

bool has_closed() const { return has_closed_; }

void response(const char* data, size_t len) {
void response(std::string data) {
auto len = data.size();
assert(len < MAX_BUF_LEN);
std::array<boost::asio::const_buffer, 3> write_buffers;
write_msg_ = std::move(data);
write_buffers[0] = boost::asio::buffer(&len, sizeof(uint32_t));
write_buffers[1] = boost::asio::buffer(&req_id_, sizeof(uint64_t));
write_buffers[2] = boost::asio::buffer((char*)data, len);
write_buffers[2] = boost::asio::buffer(write_msg_.data(), len);
reset_timer();
auto self = this->shared_from_this();
boost::asio::async_write(
socket_, write_buffers,
[this, self](boost::system::error_code ec, std::size_t length) {
if (ec) {
std::cout << ec.value() << " " << ec.message() << std::endl;
close();
return;
}
cancel_timer();
if (has_closed()) { return; }
if (!ec) {
read_head();
} else {
close();
//LOG(INFO) << ec.message();
}
});
Expand Down Expand Up @@ -144,6 +153,8 @@ class connection : public std::enable_shared_from_this<connection>, private boos
std::vector<char> body_;
std::uint64_t req_id_;

std::string write_msg_;

boost::asio::deadline_timer timer_;
std::size_t timeout_seconds_;
int64_t conn_id_ = 0;
Expand Down
15 changes: 10 additions & 5 deletions include/router.h
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,7 @@ class router : boost::noncopyable {

void remove_handler(std::string const& name) { this->map_invokers_.erase(name); }

void set_callback(const std::function<void(const std::string&, const std::string&, connection*,
void set_callback(const std::function<void(const std::string&, std::string&&, connection*,
bool)>& callback) {
callback_to_server_ = callback;
}
Expand All @@ -46,19 +46,24 @@ class router : boost::noncopyable {
auto it = map_invokers_.find(func_name);
if (it == map_invokers_.end()) {
result = codec.pack_args_str(result_code::FAIL, "unknown function: " + func_name);
callback_to_server_(func_name, result, conn, true);
callback_to_server_(func_name, std::move(result), conn, true);
return;
}

ExecMode model;
it->second(conn, data, size, result, model);
if (model == ExecMode::sync && callback_to_server_) {
callback_to_server_(func_name, result, conn, false);
if (result.size() >= MAX_BUF_LEN) {
result = codec.pack_args_str(result_code::FAIL, "the response result is out of range: more than 10M " + func_name);
callback_to_server_(func_name, std::move(result), conn, true);
return;
}
callback_to_server_(func_name, std::move(result), conn, false);
}
} catch (const std::exception& ex) {
msgpack_codec codec;
result = codec.pack_args_str(result_code::FAIL, ex.what());
callback_to_server_("", result, conn, true);
callback_to_server_("", std::move(result), conn, true);
}
}

Expand Down Expand Up @@ -173,7 +178,7 @@ class router : boost::noncopyable {
std::map<std::string,
std::function<void(connection*, const char*, size_t, std::string&, ExecMode& model)>>
map_invokers_;
std::function<void(const std::string&, const std::string&, connection*, bool)>
std::function<void(const std::string&, std::string&&, connection*, bool)>
callback_to_server_;
};
} // namespace rpc_service
Expand Down
Loading

0 comments on commit 029d08b

Please sign in to comment.