Skip to content

Commit

Permalink
Merge pull request qicosmos#26 from jovany-wang/java
Browse files Browse the repository at this point in the history
Support Java client for rest_rpc
  • Loading branch information
qicosmos authored Oct 22, 2020
2 parents 0c04537 + 611e875 commit 4530dc7
Show file tree
Hide file tree
Showing 18 changed files with 837 additions and 49 deletions.
6 changes: 6 additions & 0 deletions .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -38,3 +38,9 @@ cmake_install.cmake

# idea
.idea/

.DS_Store
*iml
target/

build/
2 changes: 2 additions & 0 deletions examples/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,8 @@ add_executable(basic_client client/main.cpp)

if (ENABLE_SSL)
target_link_libraries(basic_server ${Boost_LIBRARIES} -lssl -lcrypto -lpthread)
target_link_libraries(basic_client ${Boost_LIBRARIES} -lssl -lcrypto -lpthread)
else()
target_link_libraries(basic_server ${Boost_LIBRARIES})
target_link_libraries(basic_client ${Boost_LIBRARIES})
endif()
21 changes: 10 additions & 11 deletions examples/client/main.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -621,21 +621,20 @@ void benchmark_test(){
}

int main() {
// benchmark_test();
test_sub1();
benchmark_test();
test_connect();
test_callback();
test_echo();
test_sync_client();
test_async_client();
//test_threads();
//test_sub();
//test_call_with_timeout();
//test_connect();
//test_upload();
//test_download();
//multi_client_performance(20);
//test_performance1();
//test_multiple_thread();
test_threads();
test_sub1();
test_call_with_timeout();
test_connect();
test_upload();
test_download();
multi_client_performance(20);
test_performance1();
test_multiple_thread();
return 0;
}
4 changes: 2 additions & 2 deletions examples/server/main.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -6,8 +6,8 @@ using namespace rpc_service;
#include "qps.h"

struct dummy{
int add(rpc_conn conn, int a, int b) {
return a + b;
int add(rpc_conn conn, int a, int b) {
return a + b;
}
};

Expand Down
135 changes: 99 additions & 36 deletions include/rest_rpc/rpc_client.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -3,14 +3,25 @@
#include <string>
#include <deque>
#include <future>
#include <utility>
#include "use_asio.hpp"
#include "client_util.hpp"
#include "const_vars.h"
#include "meta_util.hpp"
#include <functional>

using namespace rest_rpc::rpc_service;

namespace rest_rpc {

/**
* The type to indicate the language of the client.
*/
enum class client_language_t {
CPP = 0,
JAVA = 1,
};

class req_result {
public:
req_result() = default;
Expand Down Expand Up @@ -48,15 +59,39 @@ namespace rest_rpc {

class rpc_client : private asio::noncopyable {
public:
rpc_client() : socket_(ios_), work_(ios_),
rpc_client() : socket_(ios_), work_(ios_),
deadline_(ios_), body_(INIT_BUF_SIZE) {
thd_ = std::make_shared<std::thread>([this] {
ios_.run();
});
}

rpc_client(const std::string& host, unsigned short port) : socket_(ios_), work_(ios_),
deadline_(ios_), host_(host), port_(port), body_(INIT_BUF_SIZE) {
rpc_client(client_language_t client_language,
std::function<void(long, const std::string &)> on_result_received_callback)
: socket_(ios_), work_(ios_),
deadline_(ios_), body_(INIT_BUF_SIZE),
client_language_(client_language),
on_result_received_callback_(std::move(on_result_received_callback)) {
thd_ = std::make_shared<std::thread>([this] {
ios_.run();
});
}

rpc_client(const std::string& host, unsigned short port)
: rpc_client(client_language_t::CPP, nullptr, host, port) {}

rpc_client(client_language_t client_language,
std::function<void(long, const std::string&)> on_result_received_callback,
std::string host,
unsigned short port)
: socket_(ios_),
work_(ios_),
deadline_(ios_),
host_(std::move(host)),
port_(port),
body_(INIT_BUF_SIZE),
client_language_(client_language),
on_result_received_callback_(std::move(on_result_received_callback)) {
thd_ = std::make_shared<std::thread>([this] {
ios_.run();
});
Expand Down Expand Up @@ -248,6 +283,25 @@ namespace rest_rpc {
return future;
}

/**
* This internal_async_call is used for other language client.
* We use callback to handle the result is received, so we should not
* add the future to the future map.
*/
long internal_async_call(const std::string& encoded_func_name_and_args) {
auto p = std::make_shared<std::promise<req_result>>();
uint64_t fu_id = 0;
{
std::unique_lock<std::mutex> lock(cb_mtx_);
fu_id_++;
fu_id = fu_id_;
}
msgpack::sbuffer sbuffer;
sbuffer.write(encoded_func_name_and_args.data(), encoded_func_name_and_args.size());
write(fu_id, request_type::req_res, std::move(sbuffer));
return fu_id;
}

template<size_t TIMEOUT = DEFAULT_TIMEOUT, typename... Args>
void async_call(const std::string& rpc_name, std::function<void(boost::system::error_code, string_view)> cb, Args&& ... args) {
if (!has_connected_) {
Expand Down Expand Up @@ -521,42 +575,48 @@ namespace rest_rpc {
}

void call_back(uint64_t req_id, const boost::system::error_code& ec, string_view data) {
temp_req_id_ = req_id;
auto cb_flag = req_id >> 63;
if (cb_flag) {
std::shared_ptr<call_t> cl = nullptr;
{
std::unique_lock<std::mutex> lock(cb_mtx_);
cl = std::move(callback_map_[req_id]);
}
if (client_language_ == client_language_t::JAVA) {
// For Java client.
// TODO(qwang): Call java callback.
// handle error.
on_result_received_callback_(req_id, data.to_string());
} else {
// For CPP client.
temp_req_id_ = req_id;
auto cb_flag = req_id >> 63;
if (cb_flag) {
std::shared_ptr<call_t> cl = nullptr;
{
std::unique_lock<std::mutex> lock(cb_mtx_);
cl = std::move(callback_map_[req_id]);
}

assert(cl);
if (!cl->has_timeout()) {
cl->cancel();
cl->callback(ec, data);
}
else {
cl->callback(asio::error::make_error_code(asio::error::timed_out), {});
}
assert(cl);
if (!cl->has_timeout()) {
cl->cancel();
cl->callback(ec, data);
} else {
cl->callback(asio::error::make_error_code(asio::error::timed_out), {});
}

std::unique_lock<std::mutex> lock(cb_mtx_);
callback_map_.erase(req_id);
}
else {
std::unique_lock<std::mutex> lock(cb_mtx_);
auto& f = future_map_[req_id];
if (ec) {
//LOG<<ec.message();
if (!f) {
//std::cout << "invalid req_id" << std::endl;
return;
}
}
std::unique_lock<std::mutex> lock(cb_mtx_);
callback_map_.erase(req_id);
} else {
std::unique_lock<std::mutex> lock(cb_mtx_);
auto &f = future_map_[req_id];
if (ec) {
//LOG<<ec.message();
if (!f) {
//std::cout << "invalid req_id" << std::endl;
return;
}
}

assert(f);
f->set_value(req_result{ data });
future_map_.erase(req_id);
}
assert(f);
f->set_value(req_result{data});
future_map_.erase(req_id);
}
}
}

void callback_sub(const boost::system::error_code& ec, string_view result) {
Expand Down Expand Up @@ -793,5 +853,8 @@ namespace rest_rpc {

std::unordered_map<std::string, std::function<void(string_view)>> sub_map_;
std::set<std::pair<std::string, std::string>> key_token_set_;

client_language_t client_language_ = client_language_t::CPP;
std::function<void(long, const std::string&)> on_result_received_callback_;
};
}
45 changes: 45 additions & 0 deletions java/pom.xml
Original file line number Diff line number Diff line change
@@ -0,0 +1,45 @@
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>

<groupId>org.restrpc</groupId>
<artifactId>restrpc</artifactId>
<version>1.0-SNAPSHOT</version>

<dependencies>
<dependency>
<groupId>commons-io</groupId>
<artifactId>commons-io</artifactId>
<version>2.5</version>
</dependency>
<dependency>
<groupId>com.google.guava</groupId>
<artifactId>guava</artifactId>
<version>29.0-jre</version>
</dependency>
<dependency>
<groupId>org.slf4j</groupId>
<artifactId>slf4j-api</artifactId>
<version>1.7.21</version>
</dependency>
<dependency>
<groupId>org.testng</groupId>
<artifactId>testng</artifactId>
<version>7.0.0</version>
</dependency>
<dependency>
<groupId>org.msgpack</groupId>
<artifactId>msgpack-core</artifactId>
<version>0.8.21</version>
</dependency>
</dependencies>
<build>
<resources>
<resource>
<directory>native_dependencies</directory>
</resource>
</resources>
</build>
</project>
28 changes: 28 additions & 0 deletions java/src/main/java/org/restrpc/client/AsyncRpcFunction.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,28 @@
package org.restrpc.client;

import java.util.concurrent.CompletableFuture;

public interface AsyncRpcFunction {

CompletableFuture<Object> invoke(Class returnClz);

<Arg1Type>
CompletableFuture<Object> invoke(Class returnClz, Arg1Type arg1);

<Arg1Type, Arg2Type>
CompletableFuture<Object> invoke(Class returnClz, Arg1Type arg1, Arg2Type arg2);

<Arg1Type, Arg2Type, Arg3Type>
CompletableFuture<Object> invoke(Class returnClz, Arg1Type arg1, Arg2Type arg2, Arg3Type arg3);


<Arg1Type, Arg2Type, Arg3Type, Arg4Type>
CompletableFuture<Object> invoke(Class returnClz, Arg1Type arg1, Arg2Type arg2, Arg3Type arg3, Arg4Type arg4);

<Arg1Type, Arg2Type, Arg3Type, Arg4Type, Arg5Type>
CompletableFuture<Object> invoke(Class returnClz, Arg1Type arg1, Arg2Type arg2, Arg3Type arg3, Arg4Type arg4, Arg5Type arg5);

<Arg1Type, Arg2Type, Arg3Type, Arg4Type, Arg5Type, Arg6Type>
CompletableFuture<Object> invoke(Class returnClz, Arg1Type arg1, Arg2Type arg2, Arg3Type arg3, Arg4Type arg4, Arg5Type arg5, Arg6Type arg6);

}
59 changes: 59 additions & 0 deletions java/src/main/java/org/restrpc/client/AsyncRpcFunctionImpl.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,59 @@
package org.restrpc.client;

import java.util.concurrent.CompletableFuture;

public class AsyncRpcFunctionImpl implements AsyncRpcFunction {

private RpcClient rpcClient;

private String funcName;

public AsyncRpcFunctionImpl(RpcClient rpcClient, String funcName) {
this.rpcClient = rpcClient;
this.funcName = funcName;
}

public CompletableFuture<Object> invoke(Class returnClz) {
return internalInvoke(returnClz, new Object[0]);
}

public <Arg1Type> CompletableFuture<Object> invoke(Class returnClz, Arg1Type arg1) {
Object[] args = new Object[] {arg1};
return internalInvoke(returnClz, args);
}

public <Arg1Type, Arg2Type>
CompletableFuture<Object> invoke(Class returnClz, Arg1Type arg1, Arg2Type arg2) {
Object[] args = new Object[] {arg1, arg2};
return internalInvoke(returnClz, args);
}

public <Arg1Type, Arg2Type, Arg3Type>
CompletableFuture<Object> invoke(Class returnClz, Arg1Type arg1, Arg2Type arg2, Arg3Type arg3) {
Object[] args = new Object[] {arg1, arg2, arg3};
return internalInvoke(returnClz, args);
}


public <Arg1Type, Arg2Type, Arg3Type, Arg4Type>
CompletableFuture<Object> invoke(Class returnClz, Arg1Type arg1, Arg2Type arg2, Arg3Type arg3, Arg4Type arg4) {
Object[] args = new Object[] {arg1, arg2, arg3, arg4};
return internalInvoke(returnClz, args);
}

public <Arg1Type, Arg2Type, Arg3Type, Arg4Type, Arg5Type>
CompletableFuture<Object> invoke(Class returnClz, Arg1Type arg1, Arg2Type arg2, Arg3Type arg3, Arg4Type arg4, Arg5Type arg5) {
Object[] args = new Object[] {arg1, arg2, arg3, arg4, arg5};
return internalInvoke(returnClz, args);
}

public <Arg1Type, Arg2Type, Arg3Type, Arg4Type, Arg5Type, Arg6Type>
CompletableFuture<Object> invoke(Class returnClz, Arg1Type arg1, Arg2Type arg2, Arg3Type arg3, Arg4Type arg4, Arg5Type arg5, Arg6Type arg6) {
Object[] args = new Object[] {arg1, arg2, arg3, arg4, arg5, arg6};
return internalInvoke(returnClz, args);
}

private CompletableFuture<Object> internalInvoke(Class returnClz, Object[] args) {
return rpcClient.invoke(returnClz, funcName, args);
}
}
Loading

0 comments on commit 4530dc7

Please sign in to comment.