Skip to content

Commit

Permalink
Merge pull request redpanda-data#23877 from mmaslankaprv/rest-client-…
Browse files Browse the repository at this point in the history
…reorg

Iceberg REST client reorganization
  • Loading branch information
mmaslankaprv authored Oct 23, 2024
2 parents a66c47c + 61ea3e9 commit 0c7a11f
Show file tree
Hide file tree
Showing 24 changed files with 483 additions and 282 deletions.
1 change: 1 addition & 0 deletions src/v/http/BUILD
Original file line number Diff line number Diff line change
Expand Up @@ -67,6 +67,7 @@ redpanda_cc_library(
"//src/v/base",
"//src/v/http:utils",
"//src/v/thirdparty/ada",
"//src/v/utils:named_type",
"@abseil-cpp//absl/container:flat_hash_map",
"@abseil-cpp//absl/container:flat_hash_set",
"@boost//:algorithm",
Expand Down
2 changes: 1 addition & 1 deletion src/v/http/request_builder.cc
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,7 @@ namespace http {
request_builder& request_builder::host(std::string_view host) {
_url = ada::parse<ada::url_aggregator>(host);
if (!_url) {
_error = fmt::format("failed to parse host: {}", host);
_error = url_build_error{fmt::format("failed to parse host: {}", host)};
} else {
// Mark url state as good, the default state is that the host is not set
_error = std::nullopt;
Expand Down
8 changes: 5 additions & 3 deletions src/v/http/request_builder.h
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@

#include "base/seastarx.h"
#include "thirdparty/ada/ada.h"
#include "utils/named_type.h"

#include <seastar/core/sstring.hh>

Expand All @@ -22,15 +23,16 @@

namespace http {

using url_build_error = named_type<ss::sstring, struct url_build_error_t>;

// Builds a request using the builder pattern. Allows setting the host, target
// (path), method, headers and query params.
class request_builder {
public:
using error_type = ss::sstring;
static constexpr auto default_state{"host not set"};

using expected
= tl::expected<boost::beast::http::request_header<>, error_type>;
= tl::expected<boost::beast::http::request_header<>, url_build_error>;

// The host supplied here is parsed and stored as a result. When the request
// is finally built this is added as a host header. If the parse failed then
Expand Down Expand Up @@ -81,7 +83,7 @@ class request_builder {
boost::beast::http::request_header<> _request;
absl::flat_hash_map<ss::sstring, ss::sstring> _query_params_kv;
absl::flat_hash_set<ss::sstring> _query_params;
std::optional<error_type> _error{default_state};
std::optional<url_build_error> _error{default_state};
};

} // namespace http
4 changes: 3 additions & 1 deletion src/v/iceberg/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -65,9 +65,11 @@ v_cc_library(
values.cc
values_avro.cc
values_bytes.cc
rest_client/parsers.cc
rest_client/json.cc
rest_client/retry_policy.cc
rest_client/catalog_client.cc
rest_client/entities.cc
rest_client/error.cc
DEPS
Avro::avro
v::bytes
Expand Down
13 changes: 13 additions & 0 deletions src/v/iceberg/json_utils.cc
Original file line number Diff line number Diff line change
Expand Up @@ -148,6 +148,19 @@ parse_optional_i64(const json::Value& v, std::string_view member_name) {
return json->get().GetInt64();
}

std::optional<ss::sstring>
parse_optional_str(const json::Value& v, std::string_view member_name) {
const auto json = parse_optional(v, member_name);
if (!json.has_value()) {
return std::nullopt;
}
if (!json->get().IsString()) {
throw std::invalid_argument(
fmt::format("Expected string for field '{}'", member_name));
}
return json->get().GetString();
}

bool parse_required_bool(const json::Value& v, std::string_view member_name) {
const auto& bool_json = parse_required(v, member_name);
if (!bool_json.IsBool()) {
Expand Down
3 changes: 3 additions & 0 deletions src/v/iceberg/json_utils.h
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,9 @@ parse_optional_i32(const json::Value& v, std::string_view member_name);
std::optional<int64_t>
parse_optional_i64(const json::Value& v, std::string_view member_name);

std::optional<ss::sstring>
parse_optional_str(const json::Value& v, std::string_view member_name);

bool parse_required_bool(const json::Value& v, std::string_view member_name);

std::string_view
Expand Down
68 changes: 58 additions & 10 deletions src/v/iceberg/rest_client/BUILD
Original file line number Diff line number Diff line change
Expand Up @@ -3,13 +3,17 @@ load("//bazel:build.bzl", "redpanda_cc_library")
package(default_visibility = ["//src/v/iceberg:__subpackages__"])

redpanda_cc_library(
name = "rest_client_types",
name = "error",
srcs = [
"error.cc",
],
hdrs = [
"types.h",
"error.h",
],
include_prefix = "iceberg/rest_client",
deps = [
"//src/v/base",
"//src/v/http:request_builder",
"//src/v/thirdparty/ada",
"//src/v/utils:named_type",
"@boost//:beast",
Expand All @@ -18,16 +22,17 @@ redpanda_cc_library(
)

redpanda_cc_library(
name = "rest_client_parsers",
name = "json",
srcs = [
"parsers.cc",
"json.cc",
],
hdrs = [
"parsers.h",
"json.h",
],
include_prefix = "iceberg/rest_client",
deps = [
":rest_client_types",
":oauth_token",
"//src/v/iceberg:json_utils",
"//src/v/json",
"//src/v/utils:named_type",
],
Expand All @@ -43,14 +48,54 @@ redpanda_cc_library(
],
include_prefix = "iceberg/rest_client",
deps = [
":rest_client_types",
":error",
"//src/v/http",
"//src/v/net",
],
)

redpanda_cc_library(
name = "rest_catalog_client",
name = "credentials",
hdrs = [
"credentials.h",
],
include_prefix = "iceberg/rest_client",
deps = [
"//src/v/base",
"@seastar",
],
)

redpanda_cc_library(
name = "oauth_token",
hdrs = [
"oauth_token.h",
],
include_prefix = "iceberg/rest_client",
deps = [
"//src/v/base",
"@seastar",
],
)

redpanda_cc_library(
name = "entities",
srcs = [
"entities.cc",
],
hdrs = [
"entities.h",
],
include_prefix = "iceberg/rest_client",
deps = [
"//src/v/container:fragmented_vector",
"//src/v/http:rest_entity",
"//src/v/ssx:sformat",
],
)

redpanda_cc_library(
name = "client",
srcs = [
"catalog_client.cc",
],
Expand All @@ -59,8 +104,10 @@ redpanda_cc_library(
],
include_prefix = "iceberg/rest_client",
deps = [
":rest_client_parsers",
":rest_client_types",
":credentials",
":error",
":json",
":oauth_token",
":retry_policy",
"//src/v/bytes:iobuf",
"//src/v/bytes:iobuf_parser",
Expand All @@ -71,6 +118,7 @@ redpanda_cc_library(
"//src/v/utils:named_type",
"//src/v/utils:retry_chain_node",
"@abseil-cpp//absl/strings",
"@rapidjson",
"@seastar",
],
)
43 changes: 26 additions & 17 deletions src/v/iceberg/rest_client/catalog_client.cc
Original file line number Diff line number Diff line change
Expand Up @@ -13,13 +13,14 @@
#include "bytes/iobuf_parser.h"
#include "http/request_builder.h"
#include "http/utils.h"
#include "iceberg/rest_client/parsers.h"
#include "iceberg/rest_client/json.h"

#include <seastar/core/sleep.hh>
#include <seastar/coroutine/as_future.hh>

#include <absl/strings/str_join.h>
#include <absl/strings/strip.h>
#include <rapidjson/error/en.h>

namespace {

Expand All @@ -34,6 +35,24 @@ T trim_slashes(std::optional<T> input, typename T::type default_value) {
} // namespace

namespace iceberg::rest_client {
namespace {
template<typename Func>
auto parse_as_expected(std::string_view ctx, Func&& parse_func) {
using ret_t = std::invoke_result_t<Func, const json::Document&>;
return [f = std::forward<Func>(parse_func),
ctx](const json::Document& document) -> expected<ret_t> {
try {
return f(document);
} catch (...) {
return tl::unexpected<domain_error>(json_parse_error{
.context = ss::sstring(ctx),
.error = parse_error_msg{fmt::format(
"error parsing JSON - {}", std::current_exception())},
});
}
};
}
} // namespace

expected<json::Document> parse_json(iobuf&& raw_response) {
iobuf_parser p{std::move(raw_response)};
Expand All @@ -52,15 +71,15 @@ expected<json::Document> parse_json(iobuf&& raw_response) {
}

catalog_client::catalog_client(
client_source& client_source,
std::unique_ptr<http::abstract_client> http_client,
ss::sstring endpoint,
credentials credentials,
std::optional<base_path> base_path,
std::optional<prefix_path> prefix,
std::optional<api_version> api_version,
std::optional<oauth_token> token,
std::unique_ptr<retry_policy> retry_policy)
: _client_source{client_source}
: _http_client(std::move(http_client))
, _endpoint{std::move(endpoint)}
, _credentials{std::move(credentials)}
, _path_components{std::move(base_path), std::move(prefix), std::move(api_version)}
Expand All @@ -86,7 +105,7 @@ catalog_client::acquire_token(retry_chain_node& rtc) {
});
co_return (co_await perform_request(rtc, token_request, std::move(payload)))
.and_then(parse_json)
.and_then(parse_oauth_token);
.and_then(parse_as_expected("oauth_token", parse_oauth_token));
}

ss::sstring catalog_client::root_path() const {
Expand All @@ -102,10 +121,10 @@ catalog_client::ensure_token(retry_chain_node& rtc) {
co_return (co_await acquire_token(rtc))
.and_then([this](auto t) -> expected<ss::sstring> {
_oauth_token.emplace(t);
return t.token;
return t.access_token;
});
}
co_return _oauth_token->token;
co_return _oauth_token->access_token;
}

ss::future<expected<iobuf>> catalog_client::perform_request(
Expand All @@ -123,11 +142,9 @@ ss::future<expected<iobuf>> catalog_client::perform_request(

std::vector<http_call_error> retriable_errors{};

auto client_ptr = _client_source.get().acquire();
while (true) {
const auto permit = rtc.retry();
if (!permit.is_allowed) {
co_await client_ptr->shutdown_and_stop();
co_return tl::unexpected(
retries_exhausted{.errors = std::move(retriable_errors)});
}
Expand All @@ -137,30 +154,22 @@ ss::future<expected<iobuf>> catalog_client::perform_request(
request_payload.emplace(payload->copy());
}
auto response_f = co_await ss::coroutine::as_future(
client_ptr->request_and_collect_response(
_http_client->request_and_collect_response(
std::move(request.value()), std::move(request_payload)));
auto call_res = _retry_policy->should_retry(std::move(response_f));

if (call_res.has_value()) {
co_await client_ptr->shutdown_and_stop();
co_return std::move(call_res->body);
}

auto& error = call_res.error();
if (!error.can_be_retried) {
co_await client_ptr->shutdown_and_stop();
co_return tl::unexpected(std::move(error.err));
}

if (error.is_transport_error()) {
co_await client_ptr->shutdown_and_stop();
client_ptr = _client_source.get().acquire();
}

retriable_errors.emplace_back(std::move(error.err));
co_await ss::sleep_abortable(permit.delay, rtc.root_abort_source());
}
co_await client_ptr->shutdown_and_stop();
}

path_components::path_components(
Expand Down
21 changes: 8 additions & 13 deletions src/v/iceberg/rest_client/catalog_client.h
Original file line number Diff line number Diff line change
Expand Up @@ -13,8 +13,10 @@
#include "bytes/iobuf.h"
#include "http/client.h"
#include "http/request_builder.h"
#include "iceberg/rest_client/credentials.h"
#include "iceberg/rest_client/error.h"
#include "iceberg/rest_client/oauth_token.h"
#include "iceberg/rest_client/retry_policy.h"
#include "iceberg/rest_client/types.h"
#include "json/document.h"
#include "utils/named_type.h"
#include "utils/retry_chain_node.h"
Expand All @@ -28,16 +30,6 @@ using base_path = named_type<ss::sstring, struct base_path_t>;
using prefix_path = named_type<ss::sstring, struct prefix_t>;
using api_version = named_type<ss::sstring, struct api_version_t>;

// A client source generates low level http clients for the catalog client to
// make API calls with. Once a generic http client pool is implemented, it can
// use the same interface and hand out client leases instead of unique ptrs.
struct client_source {
// A client returned by this method call is owned by the caller. It should
// be shut down after use by the caller.
virtual std::unique_ptr<http::abstract_client> acquire() = 0;
virtual ~client_source() = default;
};

// Holds parts of a root path used by catalog client
struct path_components {
path_components(
Expand Down Expand Up @@ -87,7 +79,7 @@ class catalog_client {
/// if valid. If expired, a new one will be acquired \param retry_policy a
/// retry policy used to determine how failing calls will be retried
catalog_client(
client_source& client_source,
std::unique_ptr<http::abstract_client> client,
ss::sstring endpoint,
credentials credentials,
std::optional<base_path> base_path = std::nullopt,
Expand All @@ -96,6 +88,9 @@ class catalog_client {
std::optional<oauth_token> token = std::nullopt,
std::unique_ptr<retry_policy> retry_policy = nullptr);

// Must be called before destroying the client to prevent resource leak
ss::future<> shutdown() { return _http_client->shutdown_and_stop(); }

private:
// The root url calculated from base url, prefix and api version. Given a
// base url of "/b", an api version "v2" and a prefix of "x/y", the root url
Expand All @@ -118,7 +113,7 @@ class catalog_client {
http::request_builder request_builder,
std::optional<iobuf> payload = std::nullopt);

std::reference_wrapper<client_source> _client_source;
std::unique_ptr<http::abstract_client> _http_client;
ss::sstring _endpoint;
credentials _credentials;
path_components _path_components;
Expand Down
Loading

0 comments on commit 0c7a11f

Please sign in to comment.