Skip to content
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
55 changes: 44 additions & 11 deletions include/mp/proxy-io.h
Original file line number Diff line number Diff line change
Expand Up @@ -499,27 +499,60 @@ std::unique_ptr<ProxyClient<InitInterface>> ConnectStream(EventLoop& loop, int f
kj::mv(init_client), connection.release(), /* destroy_connection= */ true);
}

//! Given stream file descriptor and an init object, construct a new ProxyServer
//! object that handles requests from the stream calling the init object. Embed
//! the ProxyServer in a Connection object that is stored and erased if
//! Given stream and init objects, construct a new ProxyServer object that
//! handles requests from the stream by calling the init object. Embed the
//! ProxyServer in a Connection object that is stored and erased if
//! disconnected. This should be called from the event loop thread.
template <typename InitInterface, typename InitImpl>
void ServeStream(EventLoop& loop, int fd, InitImpl& init)
void _Serve(EventLoop& loop, kj::Own<kj::AsyncIoStream>&& stream, InitImpl& init)
{
loop.m_incoming_connections.emplace_front(loop,
loop.m_io_context.lowLevelProvider->wrapSocketFd(fd, kj::LowLevelAsyncIoProvider::TAKE_OWNERSHIP),
[&](Connection& connection) {
// Set owned to false so proxy object doesn't attempt to delete init
// object on disconnect/close.
return kj::heap<mp::ProxyServer<InitInterface>>(&init, false, connection);
});
loop.m_incoming_connections.emplace_front(loop, kj::mv(stream), [&](Connection& connection) {
// Set owned to false so proxy object doesn't attempt to delete init
// object on disconnect/close.
return kj::heap<mp::ProxyServer<InitInterface>>(&init, false, connection);
});
auto it = loop.m_incoming_connections.begin();
it->onDisconnect([&loop, it] {
loop.log() << "IPC server: socket disconnected.";
loop.m_incoming_connections.erase(it);
});
}

//! Given connection receiver and an init object, handle incoming connections by
//! calling _Serve, to create ProxyServer objects and forward requests to the
//! init object.
template <typename InitInterface, typename InitImpl>
void _Listen(EventLoop& loop, kj::Own<kj::ConnectionReceiver>&& listener, InitImpl& init)
{
auto* ptr = listener.get();
loop.m_task_set->add(ptr->accept().then(kj::mvCapture(kj::mv(listener),
[&loop, &init](kj::Own<kj::ConnectionReceiver>&& listener, kj::Own<kj::AsyncIoStream>&& stream) {
_Serve<InitInterface>(loop, kj::mv(stream), init);
_Listen<InitInterface>(loop, kj::mv(listener), init);
})));
}

//! Given stream file descriptor and an init object, handle requests on the
//! stream by calling methods on the Init object.
template <typename InitInterface, typename InitImpl>
void ServeStream(EventLoop& loop, int fd, InitImpl& init)
{
_Serve<InitInterface>(
loop, loop.m_io_context.lowLevelProvider->wrapSocketFd(fd, kj::LowLevelAsyncIoProvider::TAKE_OWNERSHIP), init);
}

//! Given listening socket file descriptor and an init object, handle incoming
//! connections and requests by calling methods on the Init object.
template <typename InitInterface, typename InitImpl>
void ListenConnections(EventLoop& loop, int fd, InitImpl& init)
{
loop.sync([&]() {
_Listen<InitInterface>(loop,
loop.m_io_context.lowLevelProvider->wrapListenSocketFd(fd, kj::LowLevelAsyncIoProvider::TAKE_OWNERSHIP),
init);
});
}

extern thread_local ThreadContext g_thread_context;

} // namespace mp
Expand Down