Skip to content

Fix enable checksums persistance #14757

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 1 commit into from
Feb 18, 2025
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
7 changes: 5 additions & 2 deletions ydb/core/tx/schemeshard/schemeshard__init.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -803,7 +803,7 @@ struct TSchemeShard::TTxInit : public TTransactionBase<TSchemeShard> {
return true;
}

typedef std::tuple<TPathId, TString, TString, TString, TString, bool, TString, ui32> TBackupSettingsRec;
typedef std::tuple<TPathId, TString, TString, TString, TString, bool, TString, ui32, bool> TBackupSettingsRec;
typedef TDeque<TBackupSettingsRec> TBackupSettingsRows;

template <typename SchemaTable, typename TRowSet>
Expand All @@ -815,7 +815,8 @@ struct TSchemeShard::TTxInit : public TTransactionBase<TSchemeShard> {
rowSet.template GetValueOrDefault<typename SchemaTable::ScanSettings>(""),
rowSet.template GetValueOrDefault<typename SchemaTable::NeedToBill>(true),
rowSet.template GetValueOrDefault<typename SchemaTable::TableDescription>(""),
rowSet.template GetValueOrDefault<typename SchemaTable::NumberOfRetries>(0)
rowSet.template GetValueOrDefault<typename SchemaTable::NumberOfRetries>(0),
rowSet.template GetValueOrDefault<typename SchemaTable::EnableChecksums>(false)
);
}

Expand Down Expand Up @@ -3791,6 +3792,7 @@ struct TSchemeShard::TTxInit : public TTransactionBase<TSchemeShard> {
bool needToBill = std::get<5>(rec);
TString tableDesc = std::get<6>(rec);
ui32 nRetries = std::get<7>(rec);
bool enableChecksums = std::get<8>(rec);

Y_ABORT_UNLESS(tableName.size() > 0);

Expand All @@ -3800,6 +3802,7 @@ struct TSchemeShard::TTxInit : public TTransactionBase<TSchemeShard> {
tableInfo->BackupSettings.SetTableName(tableName);
tableInfo->BackupSettings.SetNeedToBill(needToBill);
tableInfo->BackupSettings.SetNumberOfRetries(nRetries);
tableInfo->BackupSettings.SetEnableChecksums(enableChecksums);

if (ytSerializedSettings) {
auto settings = tableInfo->BackupSettings.MutableYTSettings();
Expand Down
6 changes: 4 additions & 2 deletions ydb/core/tx/schemeshard/schemeshard_impl.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -3360,15 +3360,17 @@ void TSchemeShard::PersistBackupSettings(
NIceDb::TUpdate<Schema::BackupSettings::ScanSettings>(settings.GetScanSettings().SerializeAsString()), \
NIceDb::TUpdate<Schema::BackupSettings::NeedToBill>(settings.GetNeedToBill()), \
NIceDb::TUpdate<Schema::BackupSettings::TableDescription>(settings.GetTable().SerializeAsString()), \
NIceDb::TUpdate<Schema::BackupSettings::NumberOfRetries>(settings.GetNumberOfRetries())); \
NIceDb::TUpdate<Schema::BackupSettings::NumberOfRetries>(settings.GetNumberOfRetries()), \
NIceDb::TUpdate<Schema::BackupSettings::EnableChecksums>(settings.GetEnableChecksums())); \
} else { \
db.Table<Schema::MigratedBackupSettings>().Key(pathId.OwnerId, pathId.LocalPathId).Update( \
NIceDb::TUpdate<Schema::MigratedBackupSettings::TableName>(settings.GetTableName()), \
NIceDb::TUpdate<Schema::MigratedBackupSettings::Kind>(settings.Get##Kind().SerializeAsString()), \
NIceDb::TUpdate<Schema::MigratedBackupSettings::ScanSettings>(settings.GetScanSettings().SerializeAsString()), \
NIceDb::TUpdate<Schema::MigratedBackupSettings::NeedToBill>(settings.GetNeedToBill()), \
NIceDb::TUpdate<Schema::MigratedBackupSettings::TableDescription>(settings.GetTable().SerializeAsString()), \
NIceDb::TUpdate<Schema::MigratedBackupSettings::NumberOfRetries>(settings.GetNumberOfRetries())); \
NIceDb::TUpdate<Schema::MigratedBackupSettings::NumberOfRetries>(settings.GetNumberOfRetries()), \
NIceDb::TUpdate<Schema::MigratedBackupSettings::EnableChecksums>(settings.GetEnableChecksums())); \
} \
}

Expand Down
8 changes: 6 additions & 2 deletions ydb/core/tx/schemeshard/schemeshard_schema.h
Original file line number Diff line number Diff line change
Expand Up @@ -583,6 +583,7 @@ struct Schema : NIceDb::Schema {
struct NumberOfRetries : Column<8, NScheme::NTypeIds::Uint32> {};
struct ScanSettings : Column<9, NScheme::NTypeIds::String> {};
struct NeedToBill : Column<10, NScheme::NTypeIds::Bool> {};
struct EnableChecksums : Column<11, NScheme::NTypeIds::Bool> {};
// deprecated
struct CreateDestinationFlag : Column<4, NScheme::NTypeIds::Bool> {};
struct EraseOldDataFlag : Column<5, NScheme::NTypeIds::Bool> {};
Expand All @@ -598,7 +599,8 @@ struct Schema : NIceDb::Schema {
TableDescription,
NumberOfRetries,
ScanSettings,
NeedToBill
NeedToBill,
EnableChecksums
>;
};

Expand All @@ -613,6 +615,7 @@ struct Schema : NIceDb::Schema {
struct NumberOfRetries : Column<9, NScheme::NTypeIds::Uint32> {};
struct ScanSettings : Column<10, NScheme::NTypeIds::String> {};
struct NeedToBill : Column<11, NScheme::NTypeIds::Bool> {};
struct EnableChecksums : Column<12, NScheme::NTypeIds::Bool> {};
// deprecated
struct CreateDestinationFlag : Column<5, NScheme::NTypeIds::Bool> {};
struct EraseOldDataFlag : Column<6, NScheme::NTypeIds::Bool> {};
Expand All @@ -629,7 +632,8 @@ struct Schema : NIceDb::Schema {
TableDescription,
NumberOfRetries,
ScanSettings,
NeedToBill
NeedToBill,
EnableChecksums
>;
};

Expand Down
83 changes: 78 additions & 5 deletions ydb/core/tx/schemeshard/ut_export/ut_export.cpp
Original file line number Diff line number Diff line change
@@ -1,12 +1,13 @@
#include <ydb/core/metering/metering.h>
#include <ydb/core/protos/schemeshard/operations.pb.h>
#include <ydb/core/tablet_flat/shared_cache_events.h>
#include <ydb/core/tx/schemeshard/ut_helpers/helpers.h>
#include <ydb/core/tx/schemeshard/ut_helpers/auditlog_helpers.h>
#include <ydb/core/tx/schemeshard/schemeshard_billing_helpers.h>
#include <ydb/core/testlib/actors/block_events.h>
#include <ydb/core/tx/datashard/datashard.h>
#include <ydb/core/wrappers/ut_helpers/s3_mock.h>
#include <ydb/core/tx/schemeshard/schemeshard_billing_helpers.h>
#include <ydb/core/tx/schemeshard/ut_helpers/auditlog_helpers.h>
#include <ydb/core/tx/schemeshard/ut_helpers/helpers.h>
#include <ydb/core/wrappers/s3_wrapper.h>
#include <ydb/core/metering/metering.h>
#include <ydb/core/wrappers/ut_helpers/s3_mock.h>
#include <ydb/public/api/protos/ydb_export.pb.h>

#include <util/string/builder.h>
Expand Down Expand Up @@ -2386,6 +2387,78 @@ partitioning_settings {
)", port));
env.TestWaitNotification(runtime, txId);

UNIT_ASSERT_VALUES_EQUAL(s3Mock.GetData().size(), 8);
const auto* dataChecksum = s3Mock.GetData().FindPtr("/data_00.csv.sha256");
UNIT_ASSERT(dataChecksum);
UNIT_ASSERT_VALUES_EQUAL(*dataChecksum, "19dcd641390a61063ee45f3e6e06b8f0d3acfc33f934b9bf1ba204668a98f21d data_00.csv");

const auto* metadataChecksum = s3Mock.GetData().FindPtr("/metadata.json.sha256");
UNIT_ASSERT(metadataChecksum);
UNIT_ASSERT_VALUES_EQUAL(*metadataChecksum, "b72575244ae0cce8dffd45f3537d1e412bfe39de4268f4f85f529cb529870903 metadata.json");

const auto* schemeChecksum = s3Mock.GetData().FindPtr("/scheme.pb.sha256");
UNIT_ASSERT(schemeChecksum);
UNIT_ASSERT_VALUES_EQUAL(*schemeChecksum, "cb1fb80965ae92e6369acda2b3b5921fd5518c97d6437f467ce00492907f9eb6 scheme.pb");

const auto* permissionsChecksum = s3Mock.GetData().FindPtr("/permissions.pb.sha256");
UNIT_ASSERT(permissionsChecksum);
UNIT_ASSERT_VALUES_EQUAL(*permissionsChecksum, "b41fd8921ff3a7314d9c702dc0e71aace6af8443e0102add0432895c5e50a326 permissions.pb");
}

Y_UNIT_TEST(EnableChecksumsPersistance) {
TTestBasicRuntime runtime;
TTestEnv env(runtime);
ui64 txId = 100;

// Create test table
TestCreateTable(runtime, ++txId, "/MyRoot", R"(
Name: "Table"
Columns { Name: "key" Type: "Utf8" }
Columns { Name: "value" Type: "Utf8" }
KeyColumnNames: ["key"]
)");
env.TestWaitNotification(runtime, txId);

// Add some test data
UploadRow(runtime, "/MyRoot/Table", 0, {1}, {2}, {TCell::Make(1u)}, {TCell::Make(1u)});

TPortManager portManager;
const ui16 port = portManager.GetPort();

TS3Mock s3Mock({}, TS3Mock::TSettings(port));
UNIT_ASSERT(s3Mock.Start());

// Block sending backup task to datashards
TBlockEvents<TEvDataShard::TEvProposeTransaction> block(runtime, [](auto& ev) {
NKikimrTxDataShard::TFlatSchemeTransaction schemeTx;
UNIT_ASSERT(schemeTx.ParseFromString(ev.Get()->Get()->GetTxBody()));
return schemeTx.HasBackup();
});

// Start export and expect it to be blocked
TestExport(runtime, ++txId, "/MyRoot", Sprintf(R"(
ExportToS3Settings {
endpoint: "localhost:%d"
scheme: HTTP
items {
source_path: "/MyRoot/Table"
destination_prefix: ""
}
}
)", port));

runtime.WaitFor("backup task is sent to datashards", [&]{ return block.size() >= 1; });

// Stop blocking new events
block.Stop();

// Reboot SchemeShard to resend backup task
RebootTablet(runtime, TTestTxConfig::SchemeShard, runtime.AllocateEdgeActor());

// Wait for export to complete
env.TestWaitNotification(runtime, txId);

// Verify checksums are created
UNIT_ASSERT_VALUES_EQUAL(s3Mock.GetData().size(), 8);

const auto* dataChecksum = s3Mock.GetData().FindPtr("/data_00.csv.sha256");
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1169,6 +1169,11 @@
"ColumnId": 10,
"ColumnName": "NeedToBill",
"ColumnType": "Bool"
},
{
"ColumnId": 11,
"ColumnName": "EnableChecksums",
"ColumnType": "Bool"
}
],
"ColumnsDropped": [],
Expand All @@ -1184,7 +1189,8 @@
7,
8,
9,
10
10,
11
],
"RoomID": 0,
"Codec": 0,
Expand Down Expand Up @@ -4549,6 +4555,11 @@
"ColumnId": 11,
"ColumnName": "NeedToBill",
"ColumnType": "Bool"
},
{
"ColumnId": 12,
"ColumnName": "EnableChecksums",
"ColumnType": "Bool"
}
],
"ColumnsDropped": [],
Expand All @@ -4565,7 +4576,8 @@
8,
9,
10,
11
11,
12
],
"RoomID": 0,
"Codec": 0,
Expand Down
Loading