Skip to content

Commit

Permalink
Add the ability to buffer metrics before sending them to unix domain …
Browse files Browse the repository at this point in the history
…socket (#98)

* Make publisher buffer metrics before sending them to spectatord with
each call.

* Make max_buffer_size configurable.

* Make sure max_buffer_size is passed to the publisher.

* Fix build.

* Only add newline if buffering.

* Clear the buffer after an exception too.

* Clear the buffer after all retries.

* Fix logging so that we log the actual buffer that was sent.

* Add logging around how many bytes sent for a given send(...) call

* Add more logging when sending zero byte buffer.

* Fix tests and add a new test for buffering publisher.

* Rename config variables, append a character instead.

* PR comments:
1. Add information about buffering functionality to the README.
2. Add .bazeliskrc file.
  • Loading branch information
cancecen authored Apr 25, 2023
1 parent 518797f commit 8d9ab60
Show file tree
Hide file tree
Showing 7 changed files with 61 additions and 17 deletions.
1 change: 1 addition & 0 deletions .bazeliskrc
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
USE_BAZEL_VERSION=5.4.0
4 changes: 4 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -80,3 +80,7 @@ int main() {
}
}
```
## High-Volume Publishing
By default, the library sends every meter change to the spectatord sidecar immediately. This involves a blocking `send` call and underlying system calls, and may not be the most efficient way to publish metrics in high-volume use cases.
For this purpose a simple buffering functionality in `Publisher` is implemented, and it can be turned on by passing a buffer size to the `spectator::Config` constructor. It is important to note that, until this buffer fills up, the `Publisher` will not send nay meters to the sidecar. Therefore, if your application doesn't emit meters at a high rate, you should either keep the buffer very small, or do not configure a buffer size at all, which will fall back to the "publish immediately" mode of operation.
1 change: 1 addition & 0 deletions spectator/config.h
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ namespace spectator {
struct Config {
std::string endpoint;
std::unordered_map<std::string, std::string> common_tags;
uint32_t bytes_to_buffer;
};

} // namespace spectator
32 changes: 21 additions & 11 deletions spectator/publisher.cc
Original file line number Diff line number Diff line change
Expand Up @@ -4,11 +4,15 @@

namespace spectator {

static const char NEW_LINE = '\n';

SpectatordPublisher::SpectatordPublisher(absl::string_view endpoint,
uint32_t bytes_to_buffer,
std::shared_ptr<spdlog::logger> logger)
: logger_(std::move(logger)),
udp_socket_(io_context_),
local_socket_(io_context_) {
local_socket_(io_context_), bytes_to_buffer_(bytes_to_buffer) {
buffer_.reserve(bytes_to_buffer_ + 1024);
if (absl::StartsWith(endpoint, "unix:")) {
setup_unix_domain(endpoint.substr(5));
} else if (absl::StartsWith(endpoint, "udp:")) {
Expand Down Expand Up @@ -50,17 +54,23 @@ void SpectatordPublisher::setup_unix_domain(absl::string_view path) {
// get a copy of the file path
std::string local_path{path};
sender_ = [local_path, this](std::string_view msg) {
for (auto i = 0; i < 3; ++i) {
try {
local_socket_.send(asio::buffer(msg));
logger_->trace("Sent (local): {}", msg);
break;
} catch (std::exception& e) {
local_reconnect(local_path);
logger_->warn("Unable to send {} - attempt {}/3 ({})", msg, i,
e.what());
buffer_.append(msg);
if (buffer_.length() >= bytes_to_buffer_) {
for (auto i = 0; i < 3; ++i) {
try {
auto sent_bytes = local_socket_.send(asio::buffer(buffer_));
logger_->trace("Sent (local): {} bytes, in total had {}", sent_bytes, buffer_.length());
break;
} catch (std::exception& e) {
local_reconnect(local_path);
logger_->warn("Unable to send {} - attempt {}/3 ({})", buffer_, i,
e.what());
}
}
}
buffer_.clear();
} else {
buffer_.push_back(NEW_LINE);
}
};
}

Expand Down
3 changes: 3 additions & 0 deletions spectator/publisher.h
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ class SpectatordPublisher {
public:
explicit SpectatordPublisher(
absl::string_view endpoint,
uint32_t bytes_to_buffer = 0,
std::shared_ptr<spdlog::logger> logger = DefaultLogger());
SpectatordPublisher(const SpectatordPublisher&) = delete;

Expand All @@ -31,6 +32,8 @@ class SpectatordPublisher {
asio::io_context io_context_;
asio::ip::udp::socket udp_socket_;
asio::local::datagram_protocol::socket local_socket_;
std::string buffer_;
uint32_t bytes_to_buffer_;
};

} // namespace spectator
35 changes: 30 additions & 5 deletions spectator/publisher_test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@ TEST(Publisher, Udp) {
logger->info("Udp Server started on port {}", server.GetPort());

SpectatordPublisher publisher{
fmt::format("udp:localhost:{}", server.GetPort())};
fmt::format("udp:localhost:{}", server.GetPort()), 0};
Counter c{std::make_shared<Id>("counter", Tags{}), &publisher};
c.Increment();
c.Add(2);
Expand All @@ -39,14 +39,14 @@ const char* first_not_null(char* a, const char* b) {
return b;
}

TEST(Publisher, Unix) {
TEST(Publisher, UnixNoBuffer) {
auto logger = spectator::DefaultLogger();
const auto* dir = first_not_null(std::getenv("TMPDIR"), "/tmp");
auto path = fmt::format("{}/testserver.{}", dir, getpid());
TestUnixServer server{path};
server.Start();
logger->info("Unix Server started on path {}", path);
SpectatordPublisher publisher{fmt::format("unix:{}", path)};
SpectatordPublisher publisher{fmt::format("unix:{}", path), 0};
Counter c{std::make_shared<Id>("counter", Tags{}), &publisher};
c.Increment();
c.Add(2);
Expand All @@ -55,11 +55,36 @@ TEST(Publisher, Unix) {
server.Stop();
unlink(path.c_str());
std::vector<std::string> expected{"c:counter:1", "c:counter:2"};
EXPECT_EQ(server.GetMessages(), expected);
EXPECT_EQ(msgs, expected);
}

TEST(Publisher, UnixBuffer) {
auto logger = spectator::DefaultLogger();
const auto* dir = first_not_null(std::getenv("TMPDIR"), "/tmp");
auto path = fmt::format("{}/testserver.{}", dir, getpid());
TestUnixServer server{path};
server.Start();
logger->info("Unix Server started on path {}", path);
// Do not send until we buffer 32 bytes of data.
SpectatordPublisher publisher{fmt::format("unix:{}", path), 32};
Counter c{std::make_shared<Id>("counter", Tags{}), &publisher};
c.Increment();
c.Increment();
std::this_thread::sleep_for(std::chrono::milliseconds(50));
auto msgs = server.GetMessages();
std::vector<std::string> emptyVector {};
EXPECT_EQ(msgs, emptyVector);
c.Increment();
std::this_thread::sleep_for(std::chrono::milliseconds(50));
msgs = server.GetMessages();
std::vector<std::string> expected{"c:counter:1\nc:counter:1\nc:counter:1"};
EXPECT_EQ(msgs, expected);
server.Stop();
unlink(path.c_str());
}

TEST(Publisher, Nop) {
SpectatordPublisher publisher{""};
SpectatordPublisher publisher{"", 0};
Counter c{std::make_shared<Id>("counter", Tags{}), &publisher};
c.Increment();
c.Add(2);
Expand Down
2 changes: 1 addition & 1 deletion spectator/registry.h
Original file line number Diff line number Diff line change
Expand Up @@ -322,7 +322,7 @@ class SpectatordRegistry
std::move(logger)) {
extra_tags_ = Tags::from(config.common_tags);
state_.publisher =
std::make_unique<SpectatordPublisher>(config.endpoint, logger_);
std::make_unique<SpectatordPublisher>(config.endpoint, config.bytes_to_buffer, logger_);
}
};

Expand Down

0 comments on commit 8d9ab60

Please sign in to comment.