Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 4 additions & 0 deletions ydb/core/grpc_services/rpc_list_operations.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,8 @@ class TListOperationsRPC: public TRpcOperationRequestActor<TListOperationsRPC, T
return "[ListIndexBuilds]";
case TOperationId::SCRIPT_EXECUTION:
return "[ListScriptExecutions]";
case TOperationId::SS_BG_TASKS:
return "[SchemeShardTasks]";
default:
return "[Untagged]";
}
Expand Down Expand Up @@ -165,6 +167,7 @@ class TListOperationsRPC: public TRpcOperationRequestActor<TListOperationsRPC, T
case TOperationId::EXPORT:
case TOperationId::IMPORT:
case TOperationId::BUILD_INDEX:
case TOperationId::SS_BG_TASKS:
break;
case TOperationId::SCRIPT_EXECUTION:
SendListScriptExecutions();
Expand All @@ -181,6 +184,7 @@ class TListOperationsRPC: public TRpcOperationRequestActor<TListOperationsRPC, T
switch (ev->GetTypeRewrite()) {
hFunc(TEvExport::TEvListExportsResponse, Handle);
hFunc(TEvImport::TEvListImportsResponse, Handle);
hFunc(NSchemeShard::NBackground::TEvListResponse, Handle);
hFunc(TEvIndexBuilder::TEvListResponse, Handle);
hFunc(NKqp::TEvListScriptExecutionOperationsResponse, Handle);
default:
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@ TConclusionStatus TAlterShardingOperation::DoDeserialize(NYql::TObjectSettingsIm
if (*modification == "SPLIT") {
Increase = true;
} else if (*modification == "MERGE") {
return TConclusionStatus::Fail("modification is impossible yet");
Increase = false;
} else {
return TConclusionStatus::Fail("undefined modification: \"" + *modification + "\"");
}
Expand All @@ -20,9 +20,10 @@ TConclusionStatus TAlterShardingOperation::DoDeserialize(NYql::TObjectSettingsIm

void TAlterShardingOperation::DoSerializeScheme(NKikimrSchemeOp::TModifyScheme& scheme, const bool isStandalone) const {
AFL_VERIFY(!isStandalone);
AFL_VERIFY(!!Increase);
scheme.SetOperationType(NKikimrSchemeOp::ESchemeOpAlterColumnTable);
scheme.MutableAlterColumnTable()->SetName(GetStoreName());
*scheme.MutableAlterColumnTable()->MutableReshardColumnTable() = NKikimrSchemeOp::TReshardColumnTable();
scheme.MutableAlterColumnTable()->MutableReshardColumnTable()->SetIncrease(*Increase);
}

}
149 changes: 149 additions & 0 deletions ydb/core/kqp/ut/olap/blobs_sharing_ut.cpp
Original file line number Diff line number Diff line change
@@ -1,4 +1,6 @@
#include "helpers/typed_local.h"
#include "helpers/local.h"
#include "helpers/writer.h"
#include <ydb/core/tx/columnshard/data_sharing/initiator/controller/abstract.h>
#include <ydb/core/tx/columnshard/hooks/testing/controller.h>
#include <ydb/core/tx/columnshard/common/snapshot.h>
Expand All @@ -7,6 +9,8 @@
#include <ydb/core/tx/columnshard/data_sharing/destination/session/destination.h>
#include <ydb/core/tx/columnshard/data_sharing/destination/events/control.h>
#include <ydb/core/base/tablet_pipecache.h>
#include <ydb/public/sdk/cpp/client/ydb_operation/operation.h>
#include <ydb/public/sdk/cpp/client/ydb_ss_tasks/task.h>

namespace NKikimr::NKqp {

Expand Down Expand Up @@ -261,6 +265,151 @@ Y_UNIT_TEST_SUITE(KqpOlapBlobsSharing) {
tester.Execute(0, {1, 2}, false, NOlap::TSnapshot(TInstant::Now().MilliSeconds(), 1232123), {0});
tester.WaitNormalization();
}

class TReshardingTest {
private:
YDB_ACCESSOR(TString, ShardingType, "HASH_FUNCTION_CONSISTENCY_64");

void WaitResharding() {
const TInstant start = TInstant::Now();
bool clean = false;
while (TInstant::Now() - start < TDuration::Seconds(200)) {
NYdb::NOperation::TOperationClient operationClient(Kikimr.GetDriver());
auto result = operationClient.List<NYdb::NSchemeShard::TBackgroundProcessesResponse>().GetValueSync();
UNIT_ASSERT_VALUES_EQUAL_C(result.GetStatus(), NYdb::EStatus::SUCCESS, result.GetIssues().ToString());
if (result.GetList().size() == 0) {
Cerr << "RESHARDING_FINISHED" << Endl;
clean = true;
break;
}
UNIT_ASSERT_VALUES_EQUAL(result.GetList().size(), 1);
Sleep(TDuration::Seconds(1));
Cerr << "WAIT_FINISHED..." << Endl;
}
AFL_VERIFY(clean);
}

void CheckCount(const ui32 expectation) {
auto it = Kikimr.GetTableClient().StreamExecuteScanQuery(R"(
--!syntax_v1

SELECT
COUNT(*)
FROM `/Root/olapStore/olapTable`
)").GetValueSync();

UNIT_ASSERT_C(it.IsSuccess(), it.GetIssues().ToString());
TString result = StreamResultToYson(it);
Cerr << result << Endl;
CompareYson(result, "[[" + ::ToString(expectation) + "u;]]");
}

TKikimrRunner Kikimr;
public:

TReshardingTest()
: Kikimr(TKikimrSettings().SetWithSampleTables(false))
{

}

void Execute() {
auto csController = NYDBTest::TControllers::RegisterCSControllerGuard<NYDBTest::NColumnShard::TController>();
csController->SetPeriodicWakeupActivationPeriod(TDuration::Seconds(1));
csController->SetLagForCompactionBeforeTierings(TDuration::Seconds(1));
csController->SetOverrideReduceMemoryIntervalLimit(1LLU << 30);

TLocalHelper(Kikimr).SetShardingMethod(ShardingType).CreateTestOlapTable("olapTable", "olapStore", 16, 4);
auto tableClient = Kikimr.GetTableClient();

Tests::NCommon::TLoggerInit(Kikimr).SetComponents({ NKikimrServices::TX_COLUMNSHARD }, "CS").SetPriority(NActors::NLog::PRI_DEBUG).Initialize();

std::vector<TString> uids;
std::vector<TString> resourceIds;
std::vector<ui32> levels;

{
WriteTestData(Kikimr, "/Root/olapStore/olapTable", 1000000, 300000000, 10000);
WriteTestData(Kikimr, "/Root/olapStore/olapTable", 1100000, 300100000, 10000);
WriteTestData(Kikimr, "/Root/olapStore/olapTable", 1200000, 300200000, 10000);
WriteTestData(Kikimr, "/Root/olapStore/olapTable", 1300000, 300300000, 10000);
WriteTestData(Kikimr, "/Root/olapStore/olapTable", 1400000, 300400000, 10000);
WriteTestData(Kikimr, "/Root/olapStore/olapTable", 2000000, 200000000, 70000);
WriteTestData(Kikimr, "/Root/olapStore/olapTable", 3000000, 100000000, 110000);

const auto filler = [&](const ui32 startRes, const ui32 startUid, const ui32 count) {
for (ui32 i = 0; i < count; ++i) {
uids.emplace_back("uid_" + ::ToString(startUid + i));
resourceIds.emplace_back(::ToString(startRes + i));
levels.emplace_back(i % 5);
}
};

filler(1000000, 300000000, 10000);
filler(1100000, 300100000, 10000);
filler(1200000, 300200000, 10000);
filler(1300000, 300300000, 10000);
filler(1400000, 300400000, 10000);
filler(2000000, 200000000, 70000);
filler(3000000, 100000000, 110000);

}

CheckCount(230000);
{
auto alterQuery = TStringBuilder() << R"(ALTER OBJECT `/Root/olapStore/olapTable` (TYPE TABLESTORE) SET (ACTION=ALTER_SHARDING, MODIFICATION=SPLIT);)";
auto session = tableClient.CreateSession().GetValueSync().GetSession();
auto alterResult = session.ExecuteSchemeQuery(alterQuery).GetValueSync();
UNIT_ASSERT_VALUES_EQUAL_C(alterResult.GetStatus(), NYdb::EStatus::SUCCESS, alterResult.GetIssues().ToString());
}

WaitResharding();
AFL_VERIFY(csController->GetShardingFiltersCount().Val() == 0);
CheckCount(230000);

i64 count = csController->GetShardingFiltersCount().Val();
AFL_VERIFY(count == 16)("count", count);
WriteTestData(Kikimr, "/Root/olapStore/olapTable", 1000000, 300000000, 10000);
csController->WaitIndexation(TDuration::Seconds(5));
csController->WaitCompactions(TDuration::Seconds(5));

csController->SetCompactionControl(NYDBTest::EOptimizerCompactionWeightControl::Disable);

CheckCount(230000);

AFL_VERIFY(count == csController->GetShardingFiltersCount().Val())("count", count)("val", csController->GetShardingFiltersCount().Val());
const ui32 portionsCount = 8;
for (ui32 i = 0; i < 3; ++i) {
{
auto alterQuery = TStringBuilder() << R"(ALTER OBJECT `/Root/olapStore/olapTable` (TYPE TABLESTORE) SET (ACTION=ALTER_SHARDING, MODIFICATION=MERGE);)";
auto session = tableClient.CreateSession().GetValueSync().GetSession();
auto alterResult = session.ExecuteSchemeQuery(alterQuery).GetValueSync();
UNIT_ASSERT_VALUES_EQUAL_C(alterResult.GetStatus(), NYdb::EStatus::SUCCESS, alterResult.GetIssues().ToString());
}
WaitResharding();
csController->WaitCleaning(TDuration::Seconds(5));

CheckCount(230000);
AFL_VERIFY(count + portionsCount == csController->GetShardingFiltersCount().Val())("count", count)("val", csController->GetShardingFiltersCount().Val());
count += portionsCount;
}
{
auto alterQuery = TStringBuilder() << R"(ALTER OBJECT `/Root/olapStore/olapTable` (TYPE TABLESTORE) SET (ACTION=ALTER_SHARDING, MODIFICATION=MERGE);)";
auto session = tableClient.CreateSession().GetValueSync().GetSession();
auto alterResult = session.ExecuteSchemeQuery(alterQuery).GetValueSync();
UNIT_ASSERT_VALUES_UNEQUAL_C(alterResult.GetStatus(), NYdb::EStatus::SUCCESS, alterResult.GetIssues().ToString());
}
}
};

Y_UNIT_TEST(TableReshardingConsistency64) {
TReshardingTest().SetShardingType("HASH_FUNCTION_CONSISTENCY_64").Execute();
}

Y_UNIT_TEST(TableReshardingModuloN) {
TReshardingTest().SetShardingType("HASH_FUNCTION_MODULO_N").Execute();
}

}

}
1 change: 1 addition & 0 deletions ydb/core/kqp/ut/olap/ya.make
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@ PEERDIR(
ydb/core/tx/columnshard
ydb/core/kqp/ut/olap/helpers
ydb/core/tx/datashard/ut_common
ydb/public/sdk/cpp/client/ydb_operation
)

YQL_LAST_ABI_VERSION()
Expand Down
1 change: 1 addition & 0 deletions ydb/core/protos/flat_scheme_op.proto
Original file line number Diff line number Diff line change
Expand Up @@ -1626,6 +1626,7 @@ message TShardingModification {
message TShardingTransfer {
optional uint64 DestinationTabletId = 1;
repeated uint64 SourceTabletIds = 2;
optional bool Moving = 3 [default = false];
}

message TShardingTransfers {
Expand Down
10 changes: 6 additions & 4 deletions ydb/core/tx/columnshard/blobs_action/bs/blob_manager.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -268,8 +268,10 @@ std::shared_ptr<NBlobOperations::NBlobStorage::TGCTask> TBlobManager::BuildGCTas
auto logoBlobId = unifiedBlobId.GetLogoBlobId();
AFL_DEBUG(NKikimrServices::TX_COLUMNSHARD)("to_delete_gc", logoBlobId);
NBlobOperations::NBlobStorage::TGCTask::TGCLists& gl = perGroupGCListsInFlight[unifiedBlobId.GetDsGroup()];
BlobsManagerCounters.OnCollectDropExplicit(logoBlobId.BlobSize());
gl.DontKeepList.insert(logoBlobId);
if (!sharedBlobsInfo->BuildStoreCategories({ unifiedBlobId }).GetDirect().IsEmpty()) {
BlobsManagerCounters.OnCollectDropExplicit(logoBlobId.BlobSize());
gl.DontKeepList.insert(logoBlobId);
}
}
if (extractedToRemoveFromDB.GetSize() >= blobsGCCountLimit) {
newCollectGenSteps.clear();
Expand Down Expand Up @@ -314,8 +316,8 @@ std::shared_ptr<NBlobOperations::NBlobStorage::TGCTask> TBlobManager::BuildGCTas
auto logoBlobId = unifiedBlobId.GetLogoBlobId();
AFL_DEBUG(NKikimrServices::TX_COLUMNSHARD)("to_delete_gc", logoBlobId);
NBlobOperations::NBlobStorage::TGCTask::TGCLists& gl = perGroupGCListsInFlight[unifiedBlobId.GetDsGroup()];
bool skipDontKeep = false;
if (gl.KeepList.erase(logoBlobId)) {
bool skipDontKeep = sharedBlobsInfo->BuildStoreCategories({ unifiedBlobId }).GetDirect().IsEmpty();
if (!skipDontKeep && gl.KeepList.erase(logoBlobId)) {
AFL_DEBUG(NKikimrServices::TX_COLUMNSHARD)("to_keep_gc_remove", logoBlobId);
// Skipped blobs still need to be deleted from BlobsToKeep table
if (CurrentGen == logoBlobId.Generation()) {
Expand Down
2 changes: 1 addition & 1 deletion ydb/core/tx/columnshard/columnshard.h
Original file line number Diff line number Diff line change
Expand Up @@ -110,7 +110,7 @@ struct TEvColumnShard {
ui64 txId, TString txBody, const ui32 flags = 0)
: TEvProposeTransaction(txKind, source, txId, std::move(txBody), flags)
{
Y_ABORT_UNLESS(txKind == NKikimrTxColumnShard::TX_KIND_SCHEMA);
// Y_ABORT_UNLESS(txKind == NKikimrTxColumnShard::TX_KIND_SCHEMA);
Record.SetSchemeShardId(ssId);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,6 @@ NKikimr::TConclusionStatus TDestinationSession::DataReceived(THashMap<ui64, NEve
auto it = PathIds.find(i.first);
AFL_VERIFY(it != PathIds.end())("path_id_undefined", i.first);
for (auto&& portion : i.second.DetachPortions()) {
portion.ResetShardingVersion();
portion.SetPathId(it->second);
index.UpsertPortion(std::move(portion));
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,7 @@ TTxDataFromSource::TTxDataFromSource(NColumnShard::TColumnShard* self, const std
++p;
} else {
i.second.MutablePortions()[p] = std::move(i.second.MutablePortions().back());
i.second.MutablePortions()[p].ResetShardingVersion();
i.second.MutablePortions().pop_back();
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -318,7 +318,10 @@ class TBlobSharing {
if (i != selfTabletId) {
TStorageTabletTask task(StorageId, i);
task.AddRemapOwner(BlobId, selfTabletId, toTabletId);
AFL_VERIFY(result.emplace(i, std::move(task)).second);
auto info = result.emplace(i, task);
if (!info.second) {
info.first->second.Merge(task);
}
}

{
Expand Down
1 change: 1 addition & 0 deletions ydb/core/tx/columnshard/engines/column_engine.h
Original file line number Diff line number Diff line change
Expand Up @@ -276,6 +276,7 @@ class IColumnEngine {
virtual const std::shared_ptr<arrow::Schema>& GetReplaceKey() const;

virtual bool HasDataInPathId(const ui64 pathId) const = 0;
virtual bool ErasePathId(const ui64 pathId) = 0;
virtual bool Load(IDbWrapper& db) = 0;
void RegisterTable(const ui64 pathId) {
AFL_DEBUG(NKikimrServices::TX_COLUMNSHARD)("event", "RegisterTable")("path_id", pathId);
Expand Down
8 changes: 0 additions & 8 deletions ydb/core/tx/columnshard/engines/column_engine_logs.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -298,7 +298,6 @@ std::shared_ptr<TCleanupTablesColumnEngineChanges> TColumnEngineForLogs::StartCl

ui64 txSize = 0;
const ui64 txSizeLimit = TGlobalLimits::TxWriteLimitBytes / 4;
THashSet<ui64> pathsToRemove;
for (ui64 pathId : pathsToDrop) {
if (!HasDataInPathId(pathId)) {
changes->TablesToDrop.emplace(pathId);
Expand All @@ -308,9 +307,6 @@ std::shared_ptr<TCleanupTablesColumnEngineChanges> TColumnEngineForLogs::StartCl
break;
}
}
for (auto&& i : pathsToRemove) {
pathsToDrop.erase(i);
}
if (changes->TablesToDrop.empty()) {
return nullptr;
}
Expand Down Expand Up @@ -452,10 +448,6 @@ bool TColumnEngineForLogs::ApplyChangesOnExecute(IDbWrapper& db, std::shared_ptr
return true;
}

void TColumnEngineForLogs::EraseTable(const ui64 pathId) {
GranulesStorage->EraseTable(pathId);
}

void TColumnEngineForLogs::UpsertPortion(const TPortionInfo& portionInfo, const TPortionInfo* exInfo) {
if (exInfo) {
UpdatePortionStats(portionInfo, EStatsUpdateType::DEFAULT, exInfo);
Expand Down
10 changes: 8 additions & 2 deletions ydb/core/tx/columnshard/engines/column_engine_logs.h
Original file line number Diff line number Diff line change
Expand Up @@ -133,6 +133,14 @@ class TColumnEngineForLogs : public IColumnEngine {
return !!GranulesStorage->GetPortionOptional(pathId, portionId);
}

virtual bool ErasePathId(const ui64 pathId) override {
if (HasDataInPathId(pathId)) {
return false;
}
return GranulesStorage->EraseTable(pathId);
}


virtual bool HasDataInPathId(const ui64 pathId) const override {
auto g = GetGranuleOptional(pathId);
return g && g->GetPortions().size();
Expand Down Expand Up @@ -187,8 +195,6 @@ class TColumnEngineForLogs : public IColumnEngine {
bool LoadShardingInfo(IDbWrapper& db);
bool LoadCounters(IDbWrapper& db);

void EraseTable(const ui64 pathId);

void UpsertPortion(const TPortionInfo& portionInfo, const TPortionInfo* exInfo = nullptr);
bool ErasePortion(const TPortionInfo& portionInfo, bool updateStats = true);
void UpdatePortionStats(const TPortionInfo& portionInfo, EStatsUpdateType updateType = EStatsUpdateType::DEFAULT, const TPortionInfo* exPortionInfo = nullptr);
Expand Down
Loading