Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
41 changes: 37 additions & 4 deletions ydb/public/lib/ydb_cli/commands/ydb_service_import.cpp
Original file line number Diff line number Diff line change
@@ -1,5 +1,7 @@
#include "ydb_service_import.h"

#include "ydb_common.h"

#include <ydb/public/lib/ydb_cli/common/normalize_path.h>
#include <ydb/public/lib/ydb_cli/common/print_operation.h>
#include <ydb/public/lib/ydb_cli/common/interactive.h>
Expand All @@ -16,6 +18,10 @@
#include <unistd.h>
#endif

namespace NYdb::NDump {
extern const char SCHEME_FILE_NAME[];
}

namespace NYdb::NConsoleClient {

TCommandImport::TCommandImport()
Expand Down Expand Up @@ -120,16 +126,43 @@ int TCommandImportFromS3::Run(TConfig& config) {
settings.AccessKey(AwsAccessKey);
settings.SecretKey(AwsSecretKey);

for (const auto& item : Items) {
settings.AppendItem({item.Source, item.Destination});
}

if (Description) {
settings.Description(Description);
}

settings.NumberOfRetries(NumberOfRetries);

InitAwsAPI();
try {
auto s3Client = CreateS3ClientWrapper(settings);
for (auto item : Items) {
std::optional<TString> token;
if (!item.Source.empty() && item.Source.back() != '/') {
item.Source += "/";
}
if (!item.Destination.empty() && item.Destination.back() == '.') {
item.Destination.pop_back();
}
if (item.Destination.empty() || item.Destination.back() != '/') {
item.Destination += "/";
}
do {
auto listResult = s3Client->ListObjectKeys(item.Source, token);
token = listResult.NextToken;
for (TStringBuf key : listResult.Keys) {
if (key.ChopSuffix(NDump::SCHEME_FILE_NAME)) {
TString destination = item.Destination + key.substr(item.Source.Size());
settings.AppendItem({TString(key), std::move(destination)});
}
}
} while (token);
}
} catch (...) {
ShutdownAwsAPI();
throw;
}
ShutdownAwsAPI();

TImportClient client(CreateDriver(config));
TImportFromS3Response response = client.ImportFromS3(std::move(settings)).GetValueSync();
ThrowOnError(response);
Expand Down
1 change: 0 additions & 1 deletion ydb/public/lib/ydb_cli/commands/ydb_service_import.h
Original file line number Diff line number Diff line change
@@ -1,7 +1,6 @@
#pragma once

#include "ydb_command.h"
#include "ydb_common.h"

#include <ydb/public/sdk/cpp/client/ydb_import/import.h>
#include <ydb/public/sdk/cpp/client/ydb_table/table.h>
Expand Down
67 changes: 67 additions & 0 deletions ydb/public/lib/ydb_cli/common/aws.cpp
Original file line number Diff line number Diff line change
@@ -1,5 +1,12 @@
#include "aws.h"

#include <ydb/public/sdk/cpp/client/ydb_import/import.h>

#include <aws/core/Aws.h>
#include <aws/core/auth/AWSCredentialsProvider.h>
#include <aws/s3/S3Client.h>
#include <aws/s3/model/ListObjectsV2Request.h>

namespace NYdb::NConsoleClient {

const TString TCommandWithAwsCredentials::AwsCredentialsFile = "~/.aws/credentials";
Expand Down Expand Up @@ -34,4 +41,64 @@ TString TCommandWithAwsCredentials::ReadIniKey(const TString& iniKey) {
}
}

class TS3ClientWrapper : public IS3ClientWrapper {
public:
TS3ClientWrapper(const NImport::TImportFromS3Settings& settings)
: Bucket(settings.Bucket_)
{
Aws::S3::S3ClientConfiguration config;
config.endpointOverride = settings.Endpoint_;
if (settings.Scheme_ == ES3Scheme::HTTP) {
config.scheme = Aws::Http::Scheme::HTTP;
} else if (settings.Scheme_ == ES3Scheme::HTTPS) {
config.scheme = Aws::Http::Scheme::HTTPS;
} else {
throw TMisuseException() << "\"" << settings.Scheme_ << "\" scheme type is not supported";
}

Client = std::make_unique<Aws::S3::S3Client>(
Aws::Auth::AWSCredentials(settings.AccessKey_, settings.SecretKey_),
config,
Aws::Client::AWSAuthV4Signer::PayloadSigningPolicy::Never,
true);
}

TListS3Result ListObjectKeys(const TString& prefix, const std::optional<TString>& token) override {
auto request = Aws::S3::Model::ListObjectsV2Request()
.WithBucket(Bucket)
.WithPrefix(prefix);
if (token) {
request.WithContinuationToken(*token);
}
auto response = Client->ListObjectsV2(request);
if (!response.IsSuccess()) {
throw TMisuseException() << "ListObjectKeys error: " << response.GetError().GetMessage();
}
TListS3Result result;
for (const auto& object : response.GetResult().GetContents()) {
result.Keys.push_back(TString(object.GetKey()));
}
if (response.GetResult().GetIsTruncated()) {
result.NextToken = TString(response.GetResult().GetNextContinuationToken());
}
return result;
}

private:
std::unique_ptr<Aws::S3::S3Client> Client;
const TString Bucket;
};

std::unique_ptr<IS3ClientWrapper> CreateS3ClientWrapper(const NImport::TImportFromS3Settings& settings) {
return std::make_unique<TS3ClientWrapper>(settings);
}

void InitAwsAPI() {
Aws::InitAPI(Aws::SDKOptions());
}

void ShutdownAwsAPI() {
Aws::ShutdownAPI(Aws::SDKOptions());
}

}
20 changes: 20 additions & 0 deletions ydb/public/lib/ydb_cli/common/aws.h
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,10 @@
#include <util/generic/maybe.h>
#include <util/system/env.h>

namespace NYdb::NImport {
struct TImportFromS3Settings;
}

namespace NYdb::NConsoleClient {

class TCommandWithAwsCredentials {
Expand Down Expand Up @@ -60,4 +64,20 @@ class TCommandWithAwsCredentials {
TMaybe<TString> AwsProfile;
};

struct TListS3Result {
std::vector<TString> Keys;
std::optional<TString> NextToken;
};

class IS3ClientWrapper {
public:
virtual TListS3Result ListObjectKeys(const TString& prefix, const std::optional<TString>& token) = 0;
virtual ~IS3ClientWrapper() = default;
};

std::unique_ptr<IS3ClientWrapper> CreateS3ClientWrapper(const NImport::TImportFromS3Settings& settings);

void InitAwsAPI();
void ShutdownAwsAPI();

}
1 change: 1 addition & 0 deletions ydb/public/lib/ydb_cli/common/ya.make
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@ SRCS(
)

PEERDIR(
contrib/libs/aws-sdk-cpp/aws-cpp-sdk-s3
library/cpp/config
library/cpp/getopt
library/cpp/json/writer
Expand Down