Skip to content

Commit

Permalink
multiplexing healthserver thread and async server worker threads. (#188)
Browse files Browse the repository at this point in the history
* fix

Signed-off-by: Shikugawa <Shikugawa@gmail.com>

* fix

Signed-off-by: Shikugawa <Shikugawa@gmail.com>

* fix

Signed-off-by: Shikugawa <Shikugawa@gmail.com>
  • Loading branch information
Shikugawa authored Oct 29, 2021
1 parent d969e87 commit ec5d627
Show file tree
Hide file tree
Showing 4 changed files with 105 additions and 107 deletions.
3 changes: 2 additions & 1 deletion src/service/async_service_impl.cc
Original file line number Diff line number Diff line change
Expand Up @@ -178,7 +178,8 @@ void AsyncAuthServiceImpl::Run() {
spdlog::info("{}: Healthcheck Server listening on {}:{}", __func__,
config_.listen_address(), kHealthCheckServerPort);
health_server_ = std::make_unique<HealthcheckAsyncServer>(
chains_, config_.listen_address(), kHealthCheckServerPort);
*io_context_, chains_, config_.listen_address(), kHealthCheckServerPort);
health_server_->startAccept();

spdlog::info("{}: Server listening on {}", __func__, address_and_port_);

Expand Down
11 changes: 3 additions & 8 deletions src/service/healthcheck_http_server.cc
Original file line number Diff line number Diff line change
Expand Up @@ -59,24 +59,19 @@ void HealthcheckHttpConnection::onWriteDone() {
}

HealthcheckAsyncServer::HealthcheckAsyncServer(
boost::asio::io_context& ioc,
const std::vector<std::unique_ptr<filters::FilterChain>>& chains,
std::string address, uint16_t port)
: chains_(chains),
acceptor_(ioc_, {beast::net::ip::make_address(address), port}),
sock_(ioc_),
th_([this] {
startAccept();
ioc_.run();
}) {}
acceptor_(ioc, {beast::net::ip::make_address(address), port}),
sock_(ioc) {}

HealthcheckAsyncServer::~HealthcheckAsyncServer() {
for (auto&& conn : active_connections_) {
delete conn;
}
active_connections_.clear();
acceptor_.close();
ioc_.stop();
th_.join();
}

void HealthcheckAsyncServer::removeConnection(HealthcheckHttpConnection* conn) {
Expand Down
6 changes: 2 additions & 4 deletions src/service/healthcheck_http_server.h
Original file line number Diff line number Diff line change
Expand Up @@ -53,23 +53,21 @@ class HealthcheckHttpConnection {
class HealthcheckAsyncServer {
public:
HealthcheckAsyncServer(
boost::asio::io_context& ioc,
const std::vector<std::unique_ptr<filters::FilterChain>>& chains,
std::string address, uint16_t port);

~HealthcheckAsyncServer();

void startAccept();
int getPort() const { return acceptor_.local_endpoint().port(); }
void removeConnection(HealthcheckHttpConnection* conn);

private:
void startAccept();

std::list<HealthcheckHttpConnection*> active_connections_;
const std::vector<std::unique_ptr<filters::FilterChain>>& chains_;
boost::asio::io_context ioc_;
tcp::acceptor acceptor_;
tcp::socket sock_;
boost::thread th_;
};

} // namespace service
Expand Down
192 changes: 98 additions & 94 deletions test/service/healthcheck_http_server_test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@
#include "boost/asio/io_context.hpp"
#include "boost/asio/ip/tcp.hpp"
#include "boost/asio/spawn.hpp"
#include "config/config.pb.h"
#include "gmock/gmock.h"
#include "gtest/gtest.h"
#include "src/common/http/http.h"
Expand All @@ -19,107 +20,110 @@ using testing::_;
using testing::Return;
using testing::ReturnRef;

TEST(TestHealthCheckHttpServer, BasicFlowWithInactiveJwks) {
boost::asio::io_context io_context;

auto configuration = std::make_unique<config::FilterChain>();
auto chain =
std::make_unique<filters::FilterChainImpl>(io_context, *configuration, 1);

auto mock_resolver = std::make_shared<filters::oidc::MockJwksResolver>();
google::jwt_verify::JwksPtr dangling_jwks;
EXPECT_CALL(*mock_resolver, jwks())
.Times(2)
.WillRepeatedly(ReturnRef(dangling_jwks));

auto resolver_cache =
std::make_unique<filters::oidc::MockJwksResolverCache>();
EXPECT_CALL(*resolver_cache, getResolver())
.Times(2)
.WillRepeatedly(Return(mock_resolver));

chain->setJwksResolverCacheForTest(std::move(resolver_cache));
EXPECT_FALSE(chain->jwksActive());

std::vector<std::unique_ptr<filters::FilterChain>> chains;
chains.push_back(std::move(chain));

HealthcheckAsyncServer server(chains, "0.0.0.0", 0);
class Runner {
public:
void addChain(std::unique_ptr<config::FilterChain> config,
google::jwt_verify::JwksPtr jwks) {
auto chain = std::make_unique<filters::FilterChainImpl>(ioc_, *config, 1);
jwks_ = std::move(jwks);
auto mock_resolver = std::make_shared<filters::oidc::MockJwksResolver>();
EXPECT_CALL(*mock_resolver, jwks()).WillOnce(ReturnRef(jwks_));

auto resolver_cache =
std::make_unique<filters::oidc::MockJwksResolverCache>();
EXPECT_CALL(*resolver_cache, getResolver()).WillOnce(Return(mock_resolver));

chain->setJwksResolverCacheForTest(std::move(resolver_cache));
chains_.push_back(std::move(chain));
}

void clearChain() {
jwks_.reset();
chains_.clear();
}

int start() {
server_ =
std::make_unique<HealthcheckAsyncServer>(ioc_, chains_, "0.0.0.0", 0);
server_->startAccept();
work_ = std::make_unique<boost::asio::io_context::work>(ioc_);
th_ = std::thread([&] { ioc_.run(); });
return server_->getPort();
}

void stop() {
work_.reset();
ioc_.stop();
server_.reset();
th_.join();
}

private:
std::thread th_;
boost::asio::io_context ioc_;
std::unique_ptr<HealthcheckAsyncServer> server_;
std::unique_ptr<boost::asio::io_context::work> work_;
std::vector<std::unique_ptr<filters::FilterChain>> chains_;
google::jwt_verify::JwksPtr jwks_;
};

TEST(TestHealthCheckHttpServer, BasicFlowWithInactiveJwks) {
Runner runner;
runner.addChain(std::make_unique<config::FilterChain>(),
google::jwt_verify::JwksPtr());
const auto port = runner.start();
auto http_ptr = common::http::ptr_t(new common::http::HttpImpl);

boost::asio::spawn(io_context, [&](boost::asio::yield_context yield) {
boost::asio::io_context ioc;
boost::asio::spawn(ioc, [&](boost::asio::yield_context yield) {
auto res = http_ptr->SimpleGet(
fmt::format("http://0.0.0.0:{}/healthz", server.getPort()), {}, "",
io_context, yield);
fmt::format("http://0.0.0.0:{}/healthz", port), {}, "", ioc, yield);
EXPECT_EQ(res->result(), boost::beast::http::status::not_found);
});

io_context.run();
}

TEST(TestHealthCheckHttpServer, BasicFlowWithActiveJwks) {
boost::asio::io_context io_context;

auto configuration = std::make_unique<config::FilterChain>();
auto chain =
std::make_unique<filters::FilterChainImpl>(io_context, *configuration, 1);

auto mock_resolver = std::make_shared<filters::oidc::MockJwksResolver>();

std::string valid_jwks = R"(
{
"keys": [
{
"kty": "RSA",
"alg": "RS256",
"use": "sig",
"kid": "62a93512c9ee4c7f8067b5a216dade2763d32a47",
"n":
"up97uqrF9MWOPaPkwSaBeuAPLOr9FKcaWGdVEGzQ4f3Zq5WKVZowx9TCBxmImNJ1qmUi13pB8otwM_l5lfY1AFBMxVbQCUXntLovhDaiSvYp4wGDjFzQiYA-pUq8h6MUZBnhleYrkU7XlCBwNVyN8qNMkpLA7KFZYz-486GnV2NIJJx_4BGa3HdKwQGxi2tjuQsQvao5W4xmSVaaEWopBwMy2QmlhSFQuPUpTaywTqUcUq_6SfAHhZ4IDa_FxEd2c2z8gFGtfst9cY3lRYf-c_ZdboY3mqN9Su3-j3z5r2SHWlhB_LNAjyWlBGsvbGPlTqDziYQwZN4aGsqVKQb9Vw",
"e": "AQAB"
},
{
"kty": "RSA",
"alg": "RS256",
"use": "sig",
"kid": "b3319a147514df7ee5e4bcdee51350cc890cc89e",
"n":
"up97uqrF9MWOPaPkwSaBeuAPLOr9FKcaWGdVEGzQ4f3Zq5WKVZowx9TCBxmImNJ1qmUi13pB8otwM_l5lfY1AFBMxVbQCUXntLovhDaiSvYp4wGDjFzQiYA-pUq8h6MUZBnhleYrkU7XlCBwNVyN8qNMkpLA7KFZYz-486GnV2NIJJx_4BGa3HdKwQGxi2tjuQsQvao5W4xmSVaaEWopBwMy2QmlhSFQuPUpTaywTqUcUq_6SfAHhZ4IDa_FxEd2c2z8gFGtfst9cY3lRYf-c_ZdboY3mqN9Su3-j3z5r2SHWlhB_LNAjyWlBGsvbGPlTqDziYQwZN4aGsqVKQb9Vw",
"e": "AQAB"
}
]
}
)";
auto jwks = google::jwt_verify::Jwks::createFrom(
valid_jwks, google::jwt_verify::Jwks::JWKS);

EXPECT_CALL(*mock_resolver, jwks()).Times(2).WillRepeatedly(ReturnRef(jwks));

auto resolver_cache =
std::make_unique<filters::oidc::MockJwksResolverCache>();
EXPECT_CALL(*resolver_cache, getResolver())
.Times(2)
.WillRepeatedly(Return(mock_resolver));

chain->setJwksResolverCacheForTest(std::move(resolver_cache));
EXPECT_TRUE(chain->jwksActive());

std::vector<std::unique_ptr<filters::FilterChain>> chains;
chains.push_back(std::move(chain));

HealthcheckAsyncServer server(chains, "0.0.0.0", 0);

auto http_ptr = common::http::ptr_t(new common::http::HttpImpl);

boost::asio::spawn(io_context, [&](boost::asio::yield_context yield) {
auto res = http_ptr->SimpleGet(
fmt::format("http://0.0.0.0:{}/healthz", server.getPort()), {}, "",
io_context, yield);
EXPECT_EQ(res->result(), boost::beast::http::status::ok);
});

io_context.run();
ioc.run();
runner.clearChain();

{
std::string valid_jwks = R"(
{
"keys": [
{
"kty": "RSA",
"alg": "RS256",
"use": "sig",
"kid": "62a93512c9ee4c7f8067b5a216dade2763d32a47",
"n":
"up97uqrF9MWOPaPkwSaBeuAPLOr9FKcaWGdVEGzQ4f3Zq5WKVZowx9TCBxmImNJ1qmUi13pB8otwM_l5lfY1AFBMxVbQCUXntLovhDaiSvYp4wGDjFzQiYA-pUq8h6MUZBnhleYrkU7XlCBwNVyN8qNMkpLA7KFZYz-486GnV2NIJJx_4BGa3HdKwQGxi2tjuQsQvao5W4xmSVaaEWopBwMy2QmlhSFQuPUpTaywTqUcUq_6SfAHhZ4IDa_FxEd2c2z8gFGtfst9cY3lRYf-c_ZdboY3mqN9Su3-j3z5r2SHWlhB_LNAjyWlBGsvbGPlTqDziYQwZN4aGsqVKQb9Vw",
"e": "AQAB"
},
{
"kty": "RSA",
"alg": "RS256",
"use": "sig",
"kid": "b3319a147514df7ee5e4bcdee51350cc890cc89e",
"n":
"up97uqrF9MWOPaPkwSaBeuAPLOr9FKcaWGdVEGzQ4f3Zq5WKVZowx9TCBxmImNJ1qmUi13pB8otwM_l5lfY1AFBMxVbQCUXntLovhDaiSvYp4wGDjFzQiYA-pUq8h6MUZBnhleYrkU7XlCBwNVyN8qNMkpLA7KFZYz-486GnV2NIJJx_4BGa3HdKwQGxi2tjuQsQvao5W4xmSVaaEWopBwMy2QmlhSFQuPUpTaywTqUcUq_6SfAHhZ4IDa_FxEd2c2z8gFGtfst9cY3lRYf-c_ZdboY3mqN9Su3-j3z5r2SHWlhB_LNAjyWlBGsvbGPlTqDziYQwZN4aGsqVKQb9Vw",
"e": "AQAB"
}
]
}
)";
auto jwks = google::jwt_verify::Jwks::createFrom(
valid_jwks, google::jwt_verify::Jwks::JWKS);
runner.addChain(std::make_unique<config::FilterChain>(), std::move(jwks));

boost::asio::io_context ioc2;
boost::asio::spawn(ioc2, [&](boost::asio::yield_context yield) {
auto res = http_ptr->SimpleGet(
fmt::format("http://0.0.0.0:{}/healthz", port), {}, "", ioc2, yield);
EXPECT_EQ(res->result(), boost::beast::http::status::ok);
});

ioc2.run();
}

runner.stop();
}

} // namespace service
Expand Down

0 comments on commit ec5d627

Please sign in to comment.