Skip to content

Commit

Permalink
chore: use provided buffers with echo server (#321)
Browse files Browse the repository at this point in the history
Signed-off-by: Roman Gershman <roman@dragonflydb.io>
  • Loading branch information
romange authored Oct 18, 2024
1 parent fae0c40 commit 5719e24
Show file tree
Hide file tree
Showing 7 changed files with 105 additions and 32 deletions.
81 changes: 66 additions & 15 deletions examples/echo_server.cc
Original file line number Diff line number Diff line change
Expand Up @@ -6,9 +6,12 @@
#include <sys/time.h>
#include <sys/mman.h>

#include <queue>

// clang-format on

#include <absl/strings/str_cat.h>

#include <boost/asio/read.hpp>

#include "base/histogram.h"
Expand All @@ -21,6 +24,10 @@
#include "util/http/http_handler.h"
#include "util/varz.h"

#ifdef __linux__
#include "util/fibers/uring_socket.h"
#endif

using namespace util;

using fb2::DnsResolve;
Expand Down Expand Up @@ -65,21 +72,30 @@ class EchoConnection : public Connection {

std::error_code ReadMsg(size_t* sz);

std::unique_ptr<uint8_t[]> work_buf_;
std::queue<FiberSocketBase::ProvidedBuffer> prov_buffers_;
size_t pending_read_bytes_ = 0, first_buf_offset_ = 0;
size_t req_len_ = 0;
};

std::error_code EchoConnection::ReadMsg(size_t* sz) {
io::MutableBytes mb(work_buf_.get(), req_len_);
FiberSocketBase::ProvidedBuffer pb[8];

while (pending_read_bytes_ < req_len_) {
auto res = socket_->RecvProvided(8, pb);
if (!res)
return res.error();
unsigned num_buf = *res;

auto res = socket_->Recv(mb, 0);
if (res) {
*sz = *res;
CHECK_EQ(*sz, req_len_);
return {};
for (unsigned i = 0; i < num_buf; ++i) {
prov_buffers_.push(pb[i]);
pending_read_bytes_ += pb[i].buffer.size();
}
if (pending_read_bytes_ > req_len_) {
DVLOG(1) << "Waited for " << req_len_ << " but got " << pending_read_bytes_;
}
}

return res.error();
return {};
}

static thread_local base::Histogram send_hist;
Expand All @@ -89,8 +105,9 @@ void EchoConnection::HandleRequests() {

std::error_code ec;
size_t sz;
iovec vec[2];
vector<iovec> vec;
uint8_t buf[8];
vec.resize(2);

int yes = 1;
if (GetFlag(FLAGS_tcp_nodelay)) {
Expand Down Expand Up @@ -129,7 +146,7 @@ void EchoConnection::HandleRequests() {
}

CHECK_LE(req_len_, 1UL << 26);
work_buf_.reset(new uint8_t[req_len_]);
vector<FiberSocketBase::ProvidedBuffer> returned_buffers;

// after the handshake.
while (true) {
Expand All @@ -144,19 +161,46 @@ void EchoConnection::HandleRequests() {
vec[0].iov_base = buf;
vec[0].iov_len = 4;
absl::little_endian::Store32(buf, sz);
vec[1].iov_base = work_buf_.get();
vec[1].iov_len = sz;
vec.resize(1);

size_t prepare_len = 0;
DCHECK(returned_buffers.empty());

while (prepare_len < req_len_) {
DCHECK(!prov_buffers_.empty());
size_t needed = req_len_ - prepare_len;
const auto& pbuf = prov_buffers_.front();
size_t has_bytes = pbuf.buffer.size() - first_buf_offset_;
if (has_bytes <= needed) {
vec.push_back({const_cast<uint8_t*>(pbuf.buffer.data()) + first_buf_offset_, has_bytes});
prepare_len += has_bytes;
DCHECK_GE(pending_read_bytes_, has_bytes);
pending_read_bytes_ -= has_bytes;
returned_buffers.push_back(pbuf);
prov_buffers_.pop();
first_buf_offset_ = 0;
} else {
vec.push_back({const_cast<uint8_t*>(pbuf.buffer.data()) + first_buf_offset_, needed});
first_buf_offset_ += needed;
prepare_len += needed;
DCHECK_GE(pending_read_bytes_, needed);
pending_read_bytes_ -= needed;
}
}

if (is_raw) {
auto prev = absl::GetCurrentTimeNanos();
// send(sock->native_handle(), work_buf_.get(), sz, 0);
ec = socket_->Write(vec + 1, 1);
// socket_->Send(io::Bytes{work_buf_.get(), sz}, 0);
ec = socket_->Write(vec.data() + 1, vec.size() - 1);
auto now = absl::GetCurrentTimeNanos();
send_hist.Add((now - prev) / 1000);
} else {
ec = socket_->Write(vec, 2);
ec = socket_->Write(vec.data(), vec.size());
}
for (const auto& pb : returned_buffers) {
socket_->ReturnProvided(pb);
}
returned_buffers.clear();
if (ec)
break;
}
Expand Down Expand Up @@ -451,6 +495,13 @@ int main(int argc, char* argv[]) {
pp->Run();

if (absl::GetFlag(FLAGS_connect).empty()) {
#ifdef __linux__
if (!absl::GetFlag(FLAGS_epoll)) {
pp->AwaitBrief([](unsigned, auto* pb) {
fb2::UringSocket::InitProvidedBuffers(512, 64, static_cast<fb2::UringProactor*>(pb));
});
}
#endif
RunServer(pp.get());
} else {
CHECK_GT(absl::GetFlag(FLAGS_size), 0U);
Expand Down
8 changes: 4 additions & 4 deletions util/fibers/epoll_socket.cc
Original file line number Diff line number Diff line change
Expand Up @@ -393,7 +393,7 @@ io::Result<unsigned> EpollSocket::RecvProvided(unsigned buf_len, ProvidedBuffer*
DCHECK_GE(buf2.size(), ures);

memcpy(buf2.data(), buf.data(), ures);
proactor()->ReturnBuffer(buf);
proactor()->DeallocateBuffer(buf);
dest[0].buffer = {buf2.data(), ures};
dest[0].allocated = buf2.size();
return 1;
Expand All @@ -415,7 +415,7 @@ io::Result<unsigned> EpollSocket::RecvProvided(unsigned buf_len, ProvidedBuffer*
buf = proactor()->AllocateBuffer(bufreq_sz_);
res = recv(fd, buf.data(), buf.size(), 0);
if (res <= 0) {
proactor()->ReturnBuffer(buf);
proactor()->DeallocateBuffer(buf);
break;
}
ures = res;
Expand All @@ -428,7 +428,7 @@ io::Result<unsigned> EpollSocket::RecvProvided(unsigned buf_len, ProvidedBuffer*
return num_bufs;
} // res > 0

proactor()->ReturnBuffer(buf);
proactor()->DeallocateBuffer(buf);

if (res == 0 || errno != EAGAIN) {
break;
Expand Down Expand Up @@ -465,7 +465,7 @@ void EpollSocket::ReturnProvided(const ProvidedBuffer& pbuf) {
DCHECK_EQ(pbuf.cookie, 1);
DCHECK(!pbuf.buffer.empty());

proactor()->ReturnBuffer(
proactor()->DeallocateBuffer(
io::MutableBytes{const_cast<uint8_t*>(pbuf.buffer.data()), pbuf.allocated});
}

Expand Down
2 changes: 1 addition & 1 deletion util/fibers/proactor_base.cc
Original file line number Diff line number Diff line change
Expand Up @@ -118,7 +118,7 @@ io::MutableBytes ProactorBase::AllocateBuffer(size_t hint_sz) {
return io::MutableBytes{res, hint_sz};
}

void ProactorBase::ReturnBuffer(io::MutableBytes buf) {
void ProactorBase::DeallocateBuffer(io::MutableBytes buf) {
operator delete[](buf.data());
}

Expand Down
2 changes: 1 addition & 1 deletion util/fibers/proactor_base.h
Original file line number Diff line number Diff line change
Expand Up @@ -152,7 +152,7 @@ class ProactorBase {

// Returns a buffer of size at least min_size.
io::MutableBytes AllocateBuffer(size_t min_size);
void ReturnBuffer(io::MutableBytes buf);
void DeallocateBuffer(io::MutableBytes buf);

using OnIdleTask = std::function<uint32_t()>;
using PeriodicTask = std::function<void()>;
Expand Down
11 changes: 9 additions & 2 deletions util/fibers/uring_proactor.h
Original file line number Diff line number Diff line change
Expand Up @@ -127,10 +127,17 @@ class UringProactor : public ProactorBase {
// Return 1 or more buffers to the bufring. slice.data() should point to a buffer returned by
// GetBufRingPtr and its length should be within the range of the buffers handled by group_id.
void ReplenishBuffers(unsigned group_id, io::Bytes slice);
bool BufRingExists(unsigned group_id) const {
return group_id < bufring_groups_.size();

// Returns bufring entry size for the given group_id.
// -1 if group_id is invalid.
int BufRingEntrySize(unsigned group_id) const {
return group_id < bufring_groups_.size() ? bufring_groups_[group_id].entry_size : -1;
}

// Returns number of available entries at the time of the call.
// Every time a kernel event with IORING_CQE_F_BUFFER is processed,
// it consumes one or more entries from the buffer ring and available decreases.
// ReplenishBuffers returns the entries back to the ring.
unsigned BufRingAvailable(unsigned group_id) const;

// Returns 0 on success, errno on failure.
Expand Down
31 changes: 23 additions & 8 deletions util/fibers/uring_socket.cc
Original file line number Diff line number Diff line change
Expand Up @@ -431,7 +431,7 @@ io::Result<unsigned> UringSocket::RecvProvided(unsigned buf_len, ProvidedBuffer*
int fd = ShiftedFd();
Proactor* p = GetProactor();
DCHECK(ProactorBase::me() == p);
DCHECK(p->BufRingExists(kUringSockBufGroup));
DCHECK_GT(p->BufRingEntrySize(kUringSockBufGroup), 0);

ssize_t res;
while (true) {
Expand Down Expand Up @@ -462,9 +462,21 @@ io::Result<unsigned> UringSocket::RecvProvided(unsigned buf_len, ProvidedBuffer*

res = -res;

// EAGAIN can happen in case of CQ overflow.
if (res == EAGAIN) {
continue;
if (res == ENOBUFS) {
int entry_size = p->BufRingEntrySize(kUringSockBufGroup);
DCHECK_GT(entry_size, 0);
io::MutableBytes buf = p->AllocateBuffer(entry_size);
int real_handle = native_handle();
int recv_res = recv(real_handle, buf.data(), buf.size(), 0);
if (recv_res > 0) {
dest[0].buffer = io::MutableBytes{buf.data(), static_cast<size_t>(res)};
dest[0].allocated = buf.size();
dest[0].cookie = 1;
return 1;
}

p->DeallocateBuffer(buf);
res = recv_res < 0 ? errno : ECONNABORTED;
}

if (res == 0)
Expand All @@ -479,11 +491,14 @@ io::Result<unsigned> UringSocket::RecvProvided(unsigned buf_len, ProvidedBuffer*
}

void UringSocket::ReturnProvided(const ProvidedBuffer& pbuf) {
CHECK_EQ(pbuf.cookie, 2);
CHECK(!pbuf.buffer.empty());

DCHECK(!pbuf.buffer.empty());
Proactor* p = GetProactor();
p->ReplenishBuffers(kUringSockBufGroup, pbuf.buffer);
if (pbuf.cookie == 2) {
p->ReplenishBuffers(kUringSockBufGroup, pbuf.buffer);
} else {
DCHECK_EQ(pbuf.cookie, 1);
p->DeallocateBuffer({const_cast<uint8_t*>(pbuf.buffer.data()), pbuf.allocated});
}
}

void UringSocket::OnSetProactor() {
Expand Down
2 changes: 1 addition & 1 deletion util/tls/tls_socket.cc
Original file line number Diff line number Diff line change
Expand Up @@ -506,7 +506,7 @@ io::Result<unsigned> TlsSocket::RecvProvided(unsigned buf_len, ProvidedBuffer* d
}

void TlsSocket::ReturnProvided(const ProvidedBuffer& pbuf) {
proactor()->ReturnBuffer(
proactor()->DeallocateBuffer(
io::MutableBytes{const_cast<uint8_t*>(pbuf.buffer.data()), pbuf.allocated});
}

Expand Down

0 comments on commit 5719e24

Please sign in to comment.