Skip to content

Commit

Permalink
python: normalize callback structs (#1240)
Browse files Browse the repository at this point in the history
Description: Because there was no separate callbacks struct, `c_on_engine_running` and `c_on_exit` were namespaced under `Engine`. This PR moves the `std::function` closures to a dedicated `EngineCallbacks` and moves `c_on_engine_running` and `c_on_exit` into an anonymous namespace.

This PR also normalizes the callback structure for both `EngineCallbacks` and `StreamCallbacks`, so they're constructed & passed around the same.

Risk Level: Low, none of this code is built into a production target.
Testing: Pending

Signed-off-by: Cerek Hillen <chillen@lyft.com>
Signed-off-by: JP Simard <jp@jpsim.com>
  • Loading branch information
crockeo authored and jpsim committed Nov 28, 2022
1 parent 2c539eb commit e7abe65
Show file tree
Hide file tree
Showing 9 changed files with 83 additions and 93 deletions.
40 changes: 22 additions & 18 deletions mobile/library/cc/engine.cc
Original file line number Diff line number Diff line change
Expand Up @@ -6,16 +6,31 @@
namespace Envoy {
namespace Platform {

namespace {

void c_on_engine_running(void* context) {
EngineCallbacks* engine_callbacks = static_cast<EngineCallbacks*>(context);
engine_callbacks->on_engine_running();
}

void c_on_exit(void* context) {
// NOTE: this function is intentionally empty
// as we don't actually do any post-processing on exit.
(void)context;
}

} // namespace

Engine::Engine(envoy_engine_t engine, const std::string& configuration, LogLevel log_level,
std::function<void()> on_engine_running)
: engine_(engine), on_engine_running_(on_engine_running) {
envoy_engine_callbacks callbacks{
.on_engine_running = &Engine::c_on_engine_running,
.on_exit = &Engine::c_on_exit,
.context = this,
EngineCallbacksSharedPtr callbacks)
: engine_(engine), callbacks_(callbacks) {
envoy_engine_callbacks envoy_callbacks{
.on_engine_running = &c_on_engine_running,
.on_exit = &c_on_exit,
.context = &this->callbacks_,
};

run_engine(this->engine_, callbacks, configuration.c_str(),
run_engine(this->engine_, envoy_callbacks, configuration.c_str(),
log_level_to_string(log_level).c_str());

this->stream_client_ = std::make_shared<StreamClient>(this->engine_);
Expand All @@ -27,16 +42,5 @@ Engine::~Engine() { terminate_engine(this->engine_); }
StreamClientSharedPtr Engine::stream_client() { return this->stream_client_; }
PulseClientSharedPtr Engine::pulse_client() { return this->pulse_client_; }

void Engine::c_on_engine_running(void* context) {
Engine* engine = static_cast<Engine*>(context);
engine->on_engine_running_();
}

void Engine::c_on_exit(void* context) {
// NOTE: this function is intentionally empty
// as we don't actually do any post-processing on exit.
(void)context;
}

} // namespace Platform
} // namespace Envoy
15 changes: 10 additions & 5 deletions mobile/library/cc/engine.h
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,14 @@ namespace Platform {
// - change context from Engine ptr to EngineCallbacks ptr
// - move c_on_(...) from private static fn to static fn in anonymous namespace

struct EngineCallbacks {
std::function<void()> on_engine_running;
// unused:
// std::function<void()> on_exit;
};

using EngineCallbacksSharedPtr = std::shared_ptr<EngineCallbacks>;

class Engine {
public:
~Engine();
Expand All @@ -24,15 +32,12 @@ class Engine {

private:
Engine(envoy_engine_t engine, const std::string& configuration, LogLevel log_level,
std::function<void()> on_engine_running);

static void c_on_engine_running(void* context);
static void c_on_exit(void* context);
EngineCallbacksSharedPtr callbacks);

friend class EngineBuilder;

envoy_engine_t engine_;
std::function<void()> on_engine_running_;
EngineCallbacksSharedPtr callbacks_;
StreamClientSharedPtr stream_client_;
PulseClientSharedPtr pulse_client_;
};
Expand Down
6 changes: 3 additions & 3 deletions mobile/library/cc/engine_builder.cc
Original file line number Diff line number Diff line change
Expand Up @@ -9,11 +9,12 @@ EngineBuilder::EngineBuilder() {}

EngineBuilder& EngineBuilder::add_log_level(LogLevel log_level) {
this->log_level_ = log_level;
this->callbacks_ = std::make_shared<EngineCallbacks>();
return *this;
}

EngineBuilder& EngineBuilder::set_on_engine_running(std::function<void()> closure) {
this->on_engine_running_ = closure;
this->callbacks_->on_engine_running = closure;
return *this;
}

Expand Down Expand Up @@ -87,8 +88,7 @@ EngineSharedPtr EngineBuilder::build() {
}
}

Engine* engine =
new Engine(init_engine(), config_str, this->log_level_, this->on_engine_running_);
Engine* engine = new Engine(init_engine(), config_str, this->log_level_, this->callbacks_);
return EngineSharedPtr(engine);
}

Expand Down
2 changes: 1 addition & 1 deletion mobile/library/cc/engine_builder.h
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,7 @@ class EngineBuilder {

private:
LogLevel log_level_ = LogLevel::info;
std::function<void()> on_engine_running_;
EngineCallbacksSharedPtr callbacks_;

std::string stats_domain_ = "0.0.0.0";
int connect_timeout_seconds_ = 30;
Expand Down
4 changes: 2 additions & 2 deletions mobile/library/cc/stream.cc
Original file line number Diff line number Diff line change
Expand Up @@ -9,8 +9,8 @@
namespace Envoy {
namespace Platform {

Stream::Stream(envoy_stream_t handle, EnvoyHttpCallbacksAdapterSharedPtr adapter)
: handle_(handle), adapter_(adapter) {}
Stream::Stream(envoy_stream_t handle, StreamCallbacksSharedPtr callbacks)
: handle_(handle), callbacks_(callbacks) {}

Stream& Stream::send_headers(RequestHeadersSharedPtr headers, bool end_stream) {
envoy_headers raw_headers = raw_header_map_as_envoy_headers(headers->all_headers());
Expand Down
4 changes: 2 additions & 2 deletions mobile/library/cc/stream.h
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@ namespace Platform {

class Stream {
public:
Stream(envoy_stream_t handle, EnvoyHttpCallbacksAdapterSharedPtr adapter);
Stream(envoy_stream_t handle, StreamCallbacksSharedPtr callbacks);

Stream& send_headers(RequestHeadersSharedPtr headers, bool end_stream);
Stream& send_data(envoy_data data);
Expand All @@ -22,7 +22,7 @@ class Stream {

private:
envoy_stream_t handle_;
EnvoyHttpCallbacksAdapterSharedPtr adapter_;
StreamCallbacksSharedPtr callbacks_;
};

using StreamSharedPtr = std::shared_ptr<Stream>;
Expand Down
81 changes: 40 additions & 41 deletions mobile/library/cc/stream_callbacks.cc
Original file line number Diff line number Diff line change
Expand Up @@ -7,27 +7,11 @@
namespace Envoy {
namespace Platform {

EnvoyHttpCallbacksAdapter::EnvoyHttpCallbacksAdapter(StreamCallbacksSharedPtr callbacks)
: stream_callbacks_(callbacks) {}
namespace {

envoy_http_callbacks EnvoyHttpCallbacksAdapter::as_envoy_http_callbacks() {
envoy_http_callbacks callbacks{
.on_headers = &EnvoyHttpCallbacksAdapter::c_on_headers,
.on_data = &EnvoyHttpCallbacksAdapter::c_on_data,
// on_metadata is not used
.on_trailers = &EnvoyHttpCallbacksAdapter::c_on_trailers,
.on_error = &EnvoyHttpCallbacksAdapter::c_on_error,
.on_complete = &EnvoyHttpCallbacksAdapter::c_on_complete,
.on_cancel = &EnvoyHttpCallbacksAdapter::c_on_cancel,
.context = this,
};
return callbacks;
}

void* EnvoyHttpCallbacksAdapter::c_on_headers(envoy_headers headers, bool end_stream,
void* context) {
auto self = static_cast<EnvoyHttpCallbacksAdapter*>(context);
if (self->stream_callbacks_->on_headers.has_value()) {
void* c_on_headers(envoy_headers headers, bool end_stream, void* context) {
auto stream_callbacks = static_cast<StreamCallbacks*>(context);
if (stream_callbacks->on_headers.has_value()) {
auto raw_headers = envoy_headers_as_raw_header_map(headers);
ResponseHeadersBuilder builder;
for (const auto& pair : raw_headers) {
Expand All @@ -36,61 +20,76 @@ void* EnvoyHttpCallbacksAdapter::c_on_headers(envoy_headers headers, bool end_st
}
builder.set(pair.first, pair.second);
}
self->stream_callbacks_->on_headers.value()(builder.build(), end_stream);
stream_callbacks->on_headers.value()(builder.build(), end_stream);
}
return context;
}

void* EnvoyHttpCallbacksAdapter::c_on_data(envoy_data data, bool end_stream, void* context) {
auto self = static_cast<EnvoyHttpCallbacksAdapter*>(context);
if (self->stream_callbacks_->on_error.has_value()) {
self->stream_callbacks_->on_data.value()(data, end_stream);
void* c_on_data(envoy_data data, bool end_stream, void* context) {
auto stream_callbacks = static_cast<StreamCallbacks*>(context);
if (stream_callbacks->on_error.has_value()) {
stream_callbacks->on_data.value()(data, end_stream);
}
return context;
}

void* EnvoyHttpCallbacksAdapter::c_on_trailers(envoy_headers metadata, void* context) {
auto self = static_cast<EnvoyHttpCallbacksAdapter*>(context);
if (self->stream_callbacks_->on_trailers.has_value()) {
void* c_on_trailers(envoy_headers metadata, void* context) {
auto stream_callbacks = static_cast<StreamCallbacks*>(context);
if (stream_callbacks->on_trailers.has_value()) {
auto raw_headers = envoy_headers_as_raw_header_map(metadata);
ResponseTrailersBuilder builder;
for (const auto& pair : raw_headers) {
builder.set(pair.first, pair.second);
}
self->stream_callbacks_->on_trailers.value()(builder.build());
stream_callbacks->on_trailers.value()(builder.build());
}
return context;
}

void* EnvoyHttpCallbacksAdapter::c_on_error(envoy_error raw_error, void* context) {
auto self = static_cast<EnvoyHttpCallbacksAdapter*>(context);
if (self->stream_callbacks_->on_error.has_value()) {
void* c_on_error(envoy_error raw_error, void* context) {
auto stream_callbacks = static_cast<StreamCallbacks*>(context);
if (stream_callbacks->on_error.has_value()) {
EnvoyErrorSharedPtr error = std::make_shared<EnvoyError>();
error->error_code = raw_error.error_code;
// TODO(crockeo): go back and convert from raw_error.message
// when doing so won't cause merge conflicts with other PRs.
error->message = "";
error->attempt_count = absl::optional<int>(raw_error.attempt_count);
self->stream_callbacks_->on_error.value()(error);
stream_callbacks->on_error.value()(error);
}
return context;
}

void* EnvoyHttpCallbacksAdapter::c_on_complete(void* context) {
auto self = static_cast<EnvoyHttpCallbacksAdapter*>(context);
if (self->stream_callbacks_->on_complete.has_value()) {
self->stream_callbacks_->on_complete.value();
void* c_on_complete(void* context) {
auto stream_callbacks = static_cast<StreamCallbacks*>(context);
if (stream_callbacks->on_complete.has_value()) {
stream_callbacks->on_complete.value();
}
return context;
}

void* EnvoyHttpCallbacksAdapter::c_on_cancel(void* context) {
auto self = static_cast<EnvoyHttpCallbacksAdapter*>(context);
if (self->stream_callbacks_->on_cancel.has_value()) {
self->stream_callbacks_->on_cancel.value();
void* c_on_cancel(void* context) {
auto stream_callbacks = static_cast<StreamCallbacks*>(context);
if (stream_callbacks->on_cancel.has_value()) {
stream_callbacks->on_cancel.value();
}
return context;
}

} // namespace

envoy_http_callbacks StreamCallbacks::as_envoy_http_callbacks() {
return envoy_http_callbacks{
.on_headers = &c_on_headers,
.on_data = &c_on_data,
// on_metadata is not used
.on_trailers = &c_on_trailers,
.on_error = &c_on_error,
.on_complete = &c_on_complete,
.on_cancel = &c_on_cancel,
.context = this,
};
}

} // namespace Platform
} // namespace Envoy
19 changes: 1 addition & 18 deletions mobile/library/cc/stream_callbacks.h
Original file line number Diff line number Diff line change
Expand Up @@ -27,28 +27,11 @@ struct StreamCallbacks {
absl::optional<OnErrorCallback> on_error;
absl::optional<OnCompleteCallback> on_complete;
absl::optional<OnCancelCallback> on_cancel;
};

using StreamCallbacksSharedPtr = std::shared_ptr<StreamCallbacks>;

class EnvoyHttpCallbacksAdapter {
public:
EnvoyHttpCallbacksAdapter(StreamCallbacksSharedPtr callbacks);

envoy_http_callbacks as_envoy_http_callbacks();

private:
static void* c_on_headers(envoy_headers headers, bool end_stream, void* context);
static void* c_on_data(envoy_data data, bool end_stream, void* context);
static void* c_on_trailers(envoy_headers metadata, void* context);
static void* c_on_error(envoy_error raw_error, void* context);
static void* c_on_complete(void* context);
static void* c_on_cancel(void* context);

StreamCallbacksSharedPtr stream_callbacks_;
};

using EnvoyHttpCallbacksAdapterSharedPtr = std::shared_ptr<EnvoyHttpCallbacksAdapter>;
using StreamCallbacksSharedPtr = std::shared_ptr<StreamCallbacks>;

} // namespace Platform
} // namespace Envoy
5 changes: 2 additions & 3 deletions mobile/library/cc/stream_prototype.cc
Original file line number Diff line number Diff line change
Expand Up @@ -11,10 +11,9 @@ StreamPrototype::StreamPrototype(envoy_engine_t engine) : engine_(engine) {

StreamSharedPtr StreamPrototype::start() {
auto stream = init_stream(this->engine_);
auto adapter = std::make_shared<EnvoyHttpCallbacksAdapter>(this->callbacks_);
start_stream(stream, adapter->as_envoy_http_callbacks());
start_stream(stream, this->callbacks_->as_envoy_http_callbacks());

return std::make_shared<Stream>(stream, adapter);
return std::make_shared<Stream>(stream, this->callbacks_);
}

StreamPrototype& StreamPrototype::set_on_headers(OnHeadersCallback closure) {
Expand Down

0 comments on commit e7abe65

Please sign in to comment.