Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
48 changes: 5 additions & 43 deletions ydb/core/tx/tiering/tier/object.cpp
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
#include "object.h"
#include "s3_uri.h"

#include <library/cpp/json/writer/json_value.h>
#include <library/cpp/protobuf/json/proto2json.h>
Expand Down Expand Up @@ -46,50 +47,11 @@ TConclusionStatus TTierConfig::DeserializeFromProto(const NKikimrSchemeOp::TExte
}
}

NUri::TUri url;
if (url.Parse(proto.GetLocation(), NUri::TFeature::FeaturesAll) != NUri::TState::EParsed::ParsedOK) {
return TConclusionStatus::Fail("Cannot parse url: " + proto.GetLocation());
}

switch (url.GetScheme()) {
case NUri::TScheme::SchemeEmpty:
break;
case NUri::TScheme::SchemeHTTP:
ProtoConfig.SetScheme(::NKikimrSchemeOp::TS3Settings_EScheme_HTTP);
break;
case NUri::TScheme::SchemeHTTPS:
ProtoConfig.SetScheme(::NKikimrSchemeOp::TS3Settings_EScheme_HTTPS);
break;
default:
return TConclusionStatus::Fail("Unknown schema in url");
}

{
TStringBuf endpoint;
TStringBuf bucket;

TStringBuf host = url.GetHost();
TStringBuf path = url.GetField(NUri::TField::FieldPath);
if (!path.Empty()) {
endpoint = host;
bucket = path;
bucket.SkipPrefix("/");
if (bucket.Contains("/")) {
return TConclusionStatus::Fail(TStringBuilder() << "Not a bucket (contains directories): " << bucket);
}
} else {
if (!path.TrySplit('.', endpoint, bucket)) {
return TConclusionStatus::Fail(TStringBuilder() << "Bucket is not specified in URL: " << path);
}
}

if (url.GetField(NUri::TField::FieldPort)) {
ProtoConfig.SetEndpoint(TStringBuilder() << endpoint << ":" << url.GetPort());
} else {
ProtoConfig.SetEndpoint(TString(endpoint));
}
ProtoConfig.SetBucket(TString(bucket));
auto parsedUri = TS3Uri::ParseUri(proto.GetLocation());
if (parsedUri.IsFail()) {
return parsedUri;
}
parsedUri->FillSettings(ProtoConfig);

return TConclusionStatus::Success();
}
Expand Down
4 changes: 4 additions & 0 deletions ydb/core/tx/tiering/tier/s3_uri.cpp
Original file line number Diff line number Diff line change
@@ -0,0 +1,4 @@
#include "s3_uri.h"

namespace NKikimr::NColumnShard::NTiers {
}
179 changes: 179 additions & 0 deletions ydb/core/tx/tiering/tier/s3_uri.h
Original file line number Diff line number Diff line change
@@ -0,0 +1,179 @@
#include <ydb/core/protos/flat_scheme_op.pb.h>

#include <ydb/library/accessor/accessor.h>
#include <ydb/library/conclusion/result.h>

#include <library/cpp/uri/uri.h>
#include <util/string/builder.h>

namespace NKikimr::NColumnShard::NTiers {

class TS3Uri {
private:
YDB_READONLY_DEF(std::optional<NKikimrSchemeOp::TS3Settings_EScheme>, Scheme);
YDB_READONLY_DEF(TString, Bucket);
YDB_READONLY_DEF(TString, Host);
YDB_READONLY_DEF(std::optional<ui16>, Port);
YDB_READONLY_DEF(std::optional<TString>, Folder);

enum TUriStyle {
PATH_STYLE = 1,
VIRTUAL_HOSTED_STYLE = 2,
};

inline static const std::vector<TString> BucketHostSeparators = { ".s3.", ".s3-" };

private:
static TStringBuf StripPath(const TStringBuf& path) {
TStringBuf stripped = path;
while (stripped.SkipPrefix("/")) {
}
while (stripped.ChopSuffix("/")) {
}
return stripped;
}

static std::optional<TUriStyle> DeduceUriStyle(const NUri::TUri& uri) {
const bool hasSubdomain = std::count(uri.GetHost().begin(), uri.GetHost().end(), '.') >= 2;
const bool hasPath = !StripPath(uri.GetField(NUri::TField::FieldPath)).Empty();
if (hasSubdomain && !hasPath) {
return VIRTUAL_HOSTED_STYLE;
}
if (!hasSubdomain && hasPath) {
return PATH_STYLE;
}

// URI style deduction copied from AWS SDK for Java
for (const TString& sep : BucketHostSeparators) {
if (uri.GetHost().StartsWith(sep.substr(1))) {
return PATH_STYLE;
}
if (uri.GetHost().Contains(sep)) {
return VIRTUAL_HOSTED_STYLE;
}
}

return std::nullopt;
}

static TConclusion<TS3Uri> ParsePathStyleUri(const NUri::TUri& input) {
TS3Uri result;

TStringBuf path = StripPath(input.GetField(NUri::TField::FieldPath));

if (path.Empty()) {
return TConclusionStatus::Fail(TStringBuilder() << "Missing bucket in path-style S3 uri: " << input.Serialize());
}

TStringBuf folder;
TStringBuf bucket;
if (path.TryRSplit('/', folder, bucket)) {
result.Folder = folder;
result.Bucket = bucket;
} else {
result.Bucket = path;
}

result.Host = input.GetHost();

if (auto status = result.FillStyleAgnosticFields(input); status.IsFail()) {
return status;
}
return result;
}

static TConclusion<TS3Uri> ParseVirtualHostedStyleUri(const NUri::TUri& input) {
TS3Uri result;

for (const TString& sep : BucketHostSeparators) {
if (const ui64 findSep = input.GetHost().find(sep); findSep != TStringBuf::npos) {
result.Bucket = input.GetHost().SubStr(0, findSep);
result.Host = input.GetHost().SubStr(findSep + 1);
break;
}
}
if (result.Host.empty()) {
TStringBuf host;
TStringBuf bucket;
if (input.GetHost().TrySplit('.', bucket, host)) {
result.Host = host;
result.Bucket = bucket;
} else {
return TConclusionStatus::Fail(TStringBuilder() << "Missing bucket in virtual-hosted style S3 uri: " << input.Serialize());
}
}

if (TStringBuf path = StripPath(input.GetField(NUri::TField::FieldPath))) {
result.Folder = path;
}

if (auto status = result.FillStyleAgnosticFields(input); status.IsFail()) {
return status;
}
return result;
}

TConclusionStatus FillStyleAgnosticFields(const NUri::TUri& from) {
if (from.GetField(NUri::TField::FieldPort)) {
Port = from.GetPort();
}

switch (from.GetScheme()) {
case NUri::TScheme::SchemeEmpty:
break;
case NUri::TScheme::SchemeHTTP:
Scheme = NKikimrSchemeOp::TS3Settings_EScheme_HTTP;
break;
case NUri::TScheme::SchemeHTTPS:
Scheme = NKikimrSchemeOp::TS3Settings_EScheme_HTTPS;
break;
default:
return TConclusionStatus::Fail(TStringBuilder() << "Unexpected scheme in url: " << from.Serialize());
}

return TConclusionStatus::Success();
}

public:
static TConclusion<TS3Uri> ParseUri(const TString& input) {
NUri::TUri uri;
if (uri.Parse(input, NUri::TFeature::NewFeaturesRecommended) != NUri::TState::EParsed::ParsedOK) {
return TConclusionStatus::Fail("Cannot parse URI: " + input);
}

TUriStyle uriStyle;
if (const auto deducedStyle = DeduceUriStyle(uri)) {
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

DeduceUriStyle(uri).value_or(PATH_STYLE)

uriStyle = *deducedStyle;
} else {
uriStyle = PATH_STYLE;
}

switch (uriStyle) {
case PATH_STYLE:
return ParsePathStyleUri(uri);
case VIRTUAL_HOSTED_STYLE:
return ParseVirtualHostedStyleUri(uri);
}
}

TString GetEndpoint() const {
TString endpoint = Host;
if (Port) {
endpoint += TStringBuilder() << ':' << *Port;
}
if (Folder) {
endpoint += TStringBuilder() << '/' << *Folder;
}
return endpoint;
}

void FillSettings(NKikimrSchemeOp::TS3Settings& settings) const {
settings.SetEndpoint(GetEndpoint());
settings.SetBucket(Bucket);
if (Scheme) {
settings.SetScheme(*Scheme);
}
}
};

} // namespace NKikimr::NColumnShard::NTiers
2 changes: 2 additions & 0 deletions ydb/core/tx/tiering/tier/ya.make
Original file line number Diff line number Diff line change
Expand Up @@ -2,11 +2,13 @@ LIBRARY()

SRCS(
object.cpp
s3_uri.cpp
)

PEERDIR(
ydb/library/conclusion
ydb/services/metadata/secret/accessor
contrib/restricted/aws/aws-crt-cpp
)

YQL_LAST_ABI_VERSION()
Expand Down
40 changes: 39 additions & 1 deletion ydb/core/tx/tiering/ut/ut_object.cpp
Original file line number Diff line number Diff line change
@@ -1,12 +1,13 @@
#include <ydb/core/tx/tiering/tier/object.h>
#include <ydb/core/tx/tiering/tier/s3_uri.h>

#include <library/cpp/testing/unittest/registar.h>

namespace NKikimr {

using namespace NColumnShard;

Y_UNIT_TEST_SUITE(S3SettingsConvertion) {
Y_UNIT_TEST_SUITE(S3SettingsConversion) {
void ValidateConversion(
const NKikimrSchemeOp::TExternalDataSourceDescription& input, TConclusion<const NKikimrSchemeOp::TS3Settings> expectedResult) {
NTiers::TTierConfig config;
Expand Down Expand Up @@ -69,6 +70,43 @@ Y_UNIT_TEST_SUITE(S3SettingsConvertion) {
)", &output));
ValidateConversion(input, output);
}

Y_UNIT_TEST(FoldersStrictStyle) {
std::vector<TString> uris = {
"http://s3.yandexcloud.net:8080/my-folder/subfolder/bucket",
"http://bucket.s3.yandexcloud.net:8080/my-folder/subfolder",
};
for (const auto& input : uris) {
NTiers::TS3Uri uri = NTiers::TS3Uri::ParseUri(input).DetachResult();
UNIT_ASSERT_STRINGS_EQUAL_C(uri.GetEndpoint(), "s3.yandexcloud.net:8080/my-folder/subfolder", input);
UNIT_ASSERT_STRINGS_EQUAL_C(uri.GetBucket(), "bucket", input);
}
}

Y_UNIT_TEST(FoldersStyleDeduction) {
std::vector<TString> uris = {
"http://storage.yandexcloud.net:8080/my-folder/subfolder/bucket",
"http://storage.yandexcloud.net:8080///my-folder/subfolder/bucket//",
};
for (const auto& input : uris) {
NTiers::TS3Uri uri = NTiers::TS3Uri::ParseUri(input).DetachResult();
UNIT_ASSERT_STRINGS_EQUAL_C(uri.GetEndpoint(), "storage.yandexcloud.net:8080/my-folder/subfolder", input);
UNIT_ASSERT_STRINGS_EQUAL_C(uri.GetBucket(), "bucket", input);
}
}

Y_UNIT_TEST(StyleDeduction) {
std::vector<TString> uris = {
"http://storage.yandexcloud.net/bucket",
"http://my-s3.net/bucket",
"http://bucket.my-s3.net",
"http://bucket.my-s3.net/",
};
for (const auto& input : uris) {
NTiers::TS3Uri uri = NTiers::TS3Uri::ParseUri(input).DetachResult();
UNIT_ASSERT_STRINGS_EQUAL_C(uri.GetBucket(), "bucket", input);
}
}
}

} // namespace NKikimr
Loading