Skip to content

auditlog: add exports/imports #8550

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 3 commits into from
Sep 3, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions ydb/core/grpc_services/rpc_export.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,7 @@ class TExportRPC: public TRpcOperationRequestActor<TDerived, TEvRequest, true>,
if (this->UserToken) {
ev->Record.SetUserSID(this->UserToken->GetUserSID());
}
ev->Record.SetPeerName(this->Request->GetPeerName());

auto& createExport = *ev->Record.MutableRequest();
*createExport.MutableOperationParams() = request.operation_params();
Expand Down
1 change: 1 addition & 0 deletions ydb/core/grpc_services/rpc_import.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,7 @@ class TImportRPC: public TRpcOperationRequestActor<TDerived, TEvRequest, true>,
if (this->UserToken) {
ev->Record.SetUserSID(this->UserToken->GetUserSID());
}
ev->Record.SetPeerName(this->Request->GetPeerName());

auto& createImport = *ev->Record.MutableRequest();
createImport.MutableOperationParams()->CopyFrom(request.operation_params());
Expand Down
11 changes: 8 additions & 3 deletions ydb/core/protos/export.proto
Original file line number Diff line number Diff line change
Expand Up @@ -34,8 +34,9 @@ message TCreateExportRequest {
message TEvCreateExportRequest {
optional uint64 TxId = 1;
optional string DatabaseName = 2;
optional string UserSID = 4;
optional TCreateExportRequest Request = 3;
optional string UserSID = 4;
optional string PeerName = 5;
}

message TCreateExportResponse {
Expand Down Expand Up @@ -70,8 +71,10 @@ message TCancelExportRequest {

message TEvCancelExportRequest {
optional uint64 TxId = 1;
optional string DatabaseName = 3;
optional TCancelExportRequest Request = 2;
optional string DatabaseName = 3;
optional string UserSID = 4;
optional string PeerName = 5;
}

message TCancelExportResponse {
Expand All @@ -90,8 +93,10 @@ message TForgetExportRequest {

message TEvForgetExportRequest {
optional uint64 TxId = 1;
optional string DatabaseName = 3;
optional TForgetExportRequest Request = 2;
optional string DatabaseName = 3;
optional string UserSID = 4;
optional string PeerName = 5;
}

message TForgetExportResponse {
Expand Down
9 changes: 7 additions & 2 deletions ydb/core/protos/import.proto
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@ message TEvCreateImportRequest {
optional string DatabaseName = 2;
optional string UserSID = 3;
optional TCreateImportRequest Request = 4;
optional string PeerName = 5;
}

message TCreateImportResponse {
Expand Down Expand Up @@ -68,8 +69,10 @@ message TCancelImportRequest {

message TEvCancelImportRequest {
optional uint64 TxId = 1;
optional string DatabaseName = 3;
optional TCancelImportRequest Request = 2;
optional string DatabaseName = 3;
optional string UserSID = 4;
optional string PeerName = 5;
}

message TCancelImportResponse {
Expand All @@ -88,8 +91,10 @@ message TForgetImportRequest {

message TEvForgetImportRequest {
optional uint64 TxId = 1;
optional string DatabaseName = 3;
optional TForgetImportRequest Request = 2;
optional string DatabaseName = 3;
optional string UserSID = 4;
optional string PeerName = 5;
}

message TForgetImportResponse {
Expand Down
6 changes: 4 additions & 2 deletions ydb/core/tx/schemeshard/schemeshard__init.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -4210,8 +4210,9 @@ struct TSchemeShard::TTxInit : public TTransactionBase<TSchemeShard> {
TString settings = rowset.GetValue<Schema::Exports::Settings>();
auto domainPathId = TPathId(rowset.GetValueOrDefault<Schema::Exports::DomainPathOwnerId>(selfId),
rowset.GetValue<Schema::Exports::DomainPathId>());
TString peerName = rowset.GetValueOrDefault<Schema::Exports::PeerName>();

TExportInfo::TPtr exportInfo = new TExportInfo(id, uid, kind, settings, domainPathId);
TExportInfo::TPtr exportInfo = new TExportInfo(id, uid, kind, settings, domainPathId, peerName);

if (rowset.HaveValue<Schema::Exports::UserSID>()) {
exportInfo->UserSID = rowset.GetValue<Schema::Exports::UserSID>();
Expand Down Expand Up @@ -4308,11 +4309,12 @@ struct TSchemeShard::TTxInit : public TTransactionBase<TSchemeShard> {
TImportInfo::EKind kind = static_cast<TImportInfo::EKind>(rowset.GetValue<Schema::Imports::Kind>());
auto domainPathId = TPathId(rowset.GetValue<Schema::Imports::DomainPathOwnerId>(),
rowset.GetValue<Schema::Imports::DomainPathLocalId>());
TString peerName = rowset.GetValueOrDefault<Schema::Imports::PeerName>();

Ydb::Import::ImportFromS3Settings settings;
Y_ABORT_UNLESS(ParseFromStringNoSizeLimit(settings, rowset.GetValue<Schema::Imports::Settings>()));

TImportInfo::TPtr importInfo = new TImportInfo(id, uid, kind, settings, domainPathId);
TImportInfo::TPtr importInfo = new TImportInfo(id, uid, kind, settings, domainPathId, peerName);

if (rowset.HaveValue<Schema::Imports::UserSID>()) {
importInfo->UserSID = rowset.GetValue<Schema::Imports::UserSID>();
Expand Down
228 changes: 219 additions & 9 deletions ydb/core/tx/schemeshard/schemeshard_audit_log.cpp
Original file line number Diff line number Diff line change
@@ -1,20 +1,29 @@
#include "schemeshard_audit_log.h"
#include "schemeshard_path.h"
#include "schemeshard_audit_log_fragment.h"
#include <util/string/vector.h>

#include <ydb/public/api/protos/ydb_export.pb.h>
#include <ydb/public/api/protos/ydb_import.pb.h>

#include <ydb/core/audit/audit_log.h>
#include <ydb/core/protos/flat_tx_scheme.pb.h>
#include <ydb/core/protos/export.pb.h>
#include <ydb/core/protos/import.pb.h>

#include <ydb/core/util/address_classifier.h>
#include <util/string/vector.h>
#include <ydb/core/audit/audit_log.h>

#include "schemeshard_path.h"
#include "schemeshard_impl.h"
#include "schemeshard_xxport__helpers.h"
#include "schemeshard_audit_log_fragment.h"
#include "schemeshard_audit_log.h"

namespace NKikimr::NSchemeShard {

namespace {
const TString SchemeshardComponentName = "schemeshard";

//NOTE: EmptyValue couldn't be an empty string as AUDIT_PART() skips parts with an empty values
const TString EmptyValue = "{none}";
}
const TString SchemeshardComponentName = "schemeshard";

//NOTE: EmptyValue couldn't be an empty string as AUDIT_PART() skips parts with an empty values
const TString EmptyValue = "{none}";

TString GeneralStatus(NKikimrScheme::EStatus actualStatus) {
switch(actualStatus) {
Expand Down Expand Up @@ -68,6 +77,8 @@ TPath DatabasePathFromWorkingDir(TSchemeShard* SS, const TString &opWorkingDir)
return databasePath;
}

} // anonymous namespace

void AuditLogModifySchemeTransaction(const NKikimrScheme::TEvModifySchemeTransaction& request, const NKikimrScheme::TEvModifySchemeTransactionResult& response, TSchemeShard* SS, const TString& userSID) {
// Each TEvModifySchemeTransaction.Transaction is a self sufficient operation and should be logged independently
// (even if it was packed into a single TxProxy transaction with some other operations).
Expand Down Expand Up @@ -167,6 +178,205 @@ void AuditLogModifySchemeTransactionDeprecated(const NKikimrScheme::TEvModifySch
}
}

namespace {

struct TXxportRecord {
TString OperationName;
ui64 Id;
TString Uid;
TString RemoteAddress;
TString UserSID;
TString DatabasePath;
TString Status;
Ydb::StatusIds::StatusCode DetailedStatus;
TString Reason;
TVector<std::pair<TString, TString>> AdditionalParts;
TString StartTime;
TString EndTime;
TString CloudId;
TString FolderId;
TString ResourceId;
};

void AuditLogXxport(TXxportRecord&& record) {
AUDIT_LOG(
AUDIT_PART("component", SchemeshardComponentName)

AUDIT_PART("id", std::to_string(record.Id))
AUDIT_PART("uid", record.Uid);
AUDIT_PART("remote_address", (!record.RemoteAddress.empty() ? record.RemoteAddress : EmptyValue))
AUDIT_PART("subject", (!record.UserSID.empty() ? record.UserSID : EmptyValue))
AUDIT_PART("database", (!record.DatabasePath.empty() ? record.DatabasePath : EmptyValue))
AUDIT_PART("operation", record.OperationName)
AUDIT_PART("status", record.Status)
AUDIT_PART("detailed_status", Ydb::StatusIds::StatusCode_Name(record.DetailedStatus))
AUDIT_PART("reason", record.Reason)

// all parts are considered required, so all empty values are replaced with a special stub
for (const auto& [name, value] : record.AdditionalParts) {
AUDIT_PART(name, (!value.empty() ? value : EmptyValue))
}

AUDIT_PART("start_time", record.StartTime)
AUDIT_PART("end_time", record.EndTime)

AUDIT_PART("cloud_id", record.CloudId);
AUDIT_PART("folder_id", record.FolderId);
AUDIT_PART("resource_id", record.ResourceId);
);
}

using TParts = decltype(TXxportRecord::AdditionalParts);

template <class Proto>
TParts ExportKindSpecificParts(const Proto& proto) {
//NOTE: intentional switch -- that will help to detect (by breaking the compilation)
// the moment when and if oneof Settings will be extended
switch (proto.GetSettingsCase()) {
case Proto::kExportToYtSettings:
return ExportKindSpecificParts(proto.GetExportToYtSettings());
case Proto::kExportToS3Settings:
return ExportKindSpecificParts(proto.GetExportToS3Settings());
case Proto::SETTINGS_NOT_SET:
return {};
}
}
template <> TParts ExportKindSpecificParts(const Ydb::Export::ExportToYtSettings& proto) {
return {
{"export_type", "yt"},
{"export_item_count", ToString(proto.items().size())},
{"export_yt_prefix", ((proto.items().size() > 0) ? proto.items(0).destination_path() : "")},
};
}
template <> TParts ExportKindSpecificParts(const Ydb::Export::ExportToS3Settings& proto) {
return {
{"export_type", "s3"},
{"export_item_count", ToString(proto.items().size())},
{"export_s3_bucket", proto.bucket()},
//NOTE: take first item's destination_prefix as a "good enough approximation"
// (each item has its own destination_prefix, but in practice they are all the same)
{"export_s3_prefix", ((proto.items().size() > 0) ? proto.items(0).destination_prefix() : "")},
};
}

template <class Proto>
TParts ImportKindSpecificParts(const Proto& proto) {
//NOTE: intentional switch -- that will help to detect (by breaking the compilation)
// the moment when and if oneof Settings will be extended
switch (proto.GetSettingsCase()) {
case Proto::kImportFromS3Settings:
return ImportKindSpecificParts(proto.GetImportFromS3Settings());
case Proto::SETTINGS_NOT_SET:
return {};
}
}
template <> TParts ImportKindSpecificParts(const Ydb::Import::ImportFromS3Settings& proto) {
return {
{"import_type", "s3"},
{"export_item_count", ToString(proto.items().size())},
{"import_s3_bucket", proto.bucket()},
//NOTE: take first item's source_prefix as a "good enough approximation"
// (each item has its own source_prefix, but in practice they are all the same)
{"import_s3_prefix", ((proto.items().size() > 0) ? proto.items(0).source_prefix() : "")},
};
}

} // anonymous namespace

template <class Request, class Response>
void _AuditLogXxportStart(const Request& request, const Response& response, const TString& operationName, TParts&& additionalParts, TSchemeShard* SS) {
TPath databasePath = DatabasePathFromWorkingDir(SS, request.GetDatabaseName());
auto [cloud_id, folder_id, database_id] = GetDatabaseCloudIds(databasePath);
auto peerName = NKikimr::NAddressClassifier::ExtractAddress(request.GetPeerName());
const auto& entry = response.GetResponse().GetEntry();

AuditLogXxport({
.OperationName = operationName,
//NOTE: original request's tx-id is used as an operation id
.Id = request.GetTxId(),
.Uid = GetUid(request.GetRequest().GetOperationParams()),
.RemoteAddress = peerName,
.UserSID = request.GetUserSID(),
.DatabasePath = databasePath.PathString(),
.Status = (entry.GetStatus() == Ydb::StatusIds::SUCCESS ? "SUCCESS" : "ERROR"),
.DetailedStatus = entry.GetStatus(),
//NOTE: use main issue (on {ex,im}port itself), ignore issues on individual items
.Reason = ((entry.IssuesSize() > 0) ? entry.GetIssues(0).message() : ""),

.AdditionalParts = std::move(additionalParts),

// no start or end times

.CloudId = cloud_id,
.FolderId = folder_id,
.ResourceId = database_id,
});
}

void AuditLogExportStart(const NKikimrExport::TEvCreateExportRequest& request, const NKikimrExport::TEvCreateExportResponse& response, TSchemeShard* SS) {
_AuditLogXxportStart(request, response, "EXPORT START", ExportKindSpecificParts(request.GetRequest()), SS);
}

void AuditLogImportStart(const NKikimrImport::TEvCreateImportRequest& request, const NKikimrImport::TEvCreateImportResponse& response, TSchemeShard* SS) {
_AuditLogXxportStart(request, response, "IMPORT START", ImportKindSpecificParts(request.GetRequest()), SS);
}

template <class Info>
void _AuditLogXxportEnd(const Info& info, const TString& operationName, TParts&& additionalParts, TSchemeShard* SS) {
const TPath databasePath = TPath::Init(info.DomainPathId, SS);
auto [cloud_id, folder_id, database_id] = GetDatabaseCloudIds(databasePath);
auto peerName = NKikimr::NAddressClassifier::ExtractAddress(info.PeerName);
TString userSID = *info.UserSID.OrElse(EmptyValue);
TString startTime = (info.StartTime != TInstant::Zero() ? info.StartTime.ToString() : TString());
TString endTime = (info.EndTime != TInstant::Zero() ? info.EndTime.ToString() : TString());

// Info.State can't be anything but Done or Cancelled here
Y_ABORT_UNLESS(info.State == Info::EState::Done || info.State == Info::EState::Cancelled);
TString status = TString(info.State == Info::EState::Done ? "SUCCESS" : "ERROR");
Ydb::StatusIds::StatusCode detailedStatus = (info.State == Info::EState::Done ? Ydb::StatusIds::SUCCESS : Ydb::StatusIds::CANCELLED);

AuditLogXxport({
.OperationName = operationName,
.Id = info.Id,
.Uid = info.Uid,
.RemoteAddress = peerName,
.UserSID = userSID,
.DatabasePath = databasePath.PathString(),
.Status = status,
.DetailedStatus = detailedStatus,
.Reason = info.Issue,

.AdditionalParts = std::move(additionalParts),

.StartTime = startTime,
.EndTime = endTime,

.CloudId = cloud_id,
.FolderId = folder_id,
.ResourceId = database_id,
});
}

void AuditLogExportEnd(const TExportInfo& info, TSchemeShard* SS) {
NKikimrExport::TCreateExportRequest proto;
// TSchemeShard::FromXxportInfo() can not be used here
switch (info.Kind) {
case TExportInfo::EKind::YT:
Y_ABORT_UNLESS(proto.MutableExportToYtSettings()->ParseFromString(info.Settings));
proto.MutableExportToYtSettings()->clear_token();
break;
case TExportInfo::EKind::S3:
Y_ABORT_UNLESS(proto.MutableExportToS3Settings()->ParseFromString(info.Settings));
proto.MutableExportToS3Settings()->clear_access_key();
proto.MutableExportToS3Settings()->clear_secret_key();
break;
}
_AuditLogXxportEnd(info, "EXPORT END", ExportKindSpecificParts(proto), SS);
}
void AuditLogImportEnd(const TImportInfo& info, TSchemeShard* SS) {
_AuditLogXxportEnd(info, "IMPORT END", ImportKindSpecificParts(info.Settings), SS);
}

void AuditLogLogin(const NKikimrScheme::TEvLogin& request, const NKikimrScheme::TEvLoginResult& response, TSchemeShard* SS) {
static const TString LoginOperationName = "LOGIN";

Expand Down
18 changes: 18 additions & 0 deletions ydb/core/tx/schemeshard/schemeshard_audit_log.h
Original file line number Diff line number Diff line change
Expand Up @@ -10,12 +10,30 @@ class TEvLogin;
class TEvLoginResult;
}

namespace NKikimrExport {
class TEvCreateExportRequest;
class TEvCreateExportResponse;
}

namespace NKikimrImport {
class TEvCreateImportRequest;
class TEvCreateImportResponse;
}

namespace NKikimr::NSchemeShard {

class TSchemeShard;
struct TExportInfo;
struct TImportInfo;

void AuditLogModifySchemeTransaction(const NKikimrScheme::TEvModifySchemeTransaction& request, const NKikimrScheme::TEvModifySchemeTransactionResult& response, TSchemeShard* SS, const TString& userSID);
void AuditLogModifySchemeTransactionDeprecated(const NKikimrScheme::TEvModifySchemeTransaction& request, const NKikimrScheme::TEvModifySchemeTransactionResult& response, TSchemeShard* SS, const TString& userSID);

void AuditLogExportStart(const NKikimrExport::TEvCreateExportRequest& request, const NKikimrExport::TEvCreateExportResponse& response, TSchemeShard* SS);
void AuditLogExportEnd(const TExportInfo& exportInfo, TSchemeShard* SS);

void AuditLogImportStart(const NKikimrImport::TEvCreateImportRequest& request, const NKikimrImport::TEvCreateImportResponse& response, TSchemeShard* SS);
void AuditLogImportEnd(const TImportInfo& importInfo, TSchemeShard* SS);

void AuditLogLogin(const NKikimrScheme::TEvLogin& request, const NKikimrScheme::TEvLoginResult& response, TSchemeShard* SS);
}
Loading
Loading