Skip to content

separate schema caches by tenant (#17729) #18028

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 5 commits into from
May 12, 2025
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 3 additions & 2 deletions ydb/core/tx/columnshard/columnshard_impl.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -85,8 +85,8 @@ TColumnShard::TColumnShard(TTabletStorageInfo* info, const TActorId& tablet)
, PeriodicWakeupActivationPeriod(NYDBTest::TControllers::GetColumnShardController()->GetPeriodicWakeupActivationPeriod())
, StatsReportInterval(NYDBTest::TControllers::GetColumnShardController()->GetStatsReportInterval())
, InFlightReadsTracker(StoragesManager, Counters.GetRequestsTracingCounters())
, TablesManager(StoragesManager, std::make_shared<NOlap::NDataAccessorControl::TLocalManager>(nullptr),
std::make_shared<NOlap::TSchemaObjectsCache>(), Counters.GetPortionIndexCounters(), info->TabletID)
, TablesManager(StoragesManager, std::make_shared<NOlap::NDataAccessorControl::TLocalManager>(nullptr), nullptr,
Counters.GetPortionIndexCounters(), info->TabletID)
, Subscribers(std::make_shared<NSubscriber::TManager>(*this))
, PipeClientCache(NTabletPipe::CreateBoundedClientCache(new NTabletPipe::TBoundedClientCacheConfig(), GetPipeClientConfig()))
, InsertTable(std::make_unique<NOlap::TInsertTable>())
Expand Down Expand Up @@ -360,6 +360,7 @@ void TColumnShard::RunInit(const NKikimrTxColumnShard::TInitShard& proto, const
if (proto.HasOwnerPathId()) {
OwnerPathId = proto.GetOwnerPathId();
Schema::SaveSpecialValue(db, Schema::EValueIds::OwnerPathId, OwnerPathId);
TablesManager.SetSchemaObjectsCache(NOlap::TSchemaCachesManager::GetCache(OwnerPathId, Info()->TenantPathId));
}

if (proto.HasOwnerPath()) {
Expand Down
4 changes: 3 additions & 1 deletion ydb/core/tx/columnshard/engines/column_engine_logs.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,7 @@ TColumnEngineForLogs::TColumnEngineForLogs(const ui64 tabletId, const std::share
, Counters(counters)
, LastPortion(0)
, LastGranule(0) {
AFL_VERIFY(SchemaObjectsCache);
ActualizationController = std::make_shared<NActualizer::TController>();
RegisterSchemaVersion(snapshot, presetId, schema);
}
Expand All @@ -52,6 +53,7 @@ TColumnEngineForLogs::TColumnEngineForLogs(const ui64 tabletId, const std::share
, Counters(counters)
, LastPortion(0)
, LastGranule(0) {
AFL_VERIFY(SchemaObjectsCache);
ActualizationController = std::make_shared<NActualizer::TController>();
RegisterSchemaVersion(snapshot, presetId, std::move(schema));
}
Expand All @@ -62,7 +64,7 @@ void TColumnEngineForLogs::RegisterSchemaVersion(const TSnapshot& snapshot, cons
bool switchAccessorsManager = false;
if (!VersionedIndex.IsEmpty()) {
const NOlap::TIndexInfo& lastIndexInfo = VersionedIndex.GetLastSchema()->GetIndexInfo();
Y_ABORT_UNLESS(lastIndexInfo.CheckCompatible(indexInfo));
lastIndexInfo.CheckCompatible(indexInfo).Validate();
switchOptimizer = !indexInfo.GetCompactionPlannerConstructor()->IsEqualTo(lastIndexInfo.GetCompactionPlannerConstructor());
switchAccessorsManager = !indexInfo.GetMetadataManagerConstructor()->IsEqualTo(*lastIndexInfo.GetMetadataManagerConstructor());
}
Expand Down
7 changes: 4 additions & 3 deletions ydb/core/tx/columnshard/engines/scheme/index_info.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -16,11 +16,12 @@

namespace NKikimr::NOlap {

bool TIndexInfo::CheckCompatible(const TIndexInfo& other) const {
TConclusionStatus TIndexInfo::CheckCompatible(const TIndexInfo& other) const {
if (!other.GetPrimaryKey()->Equals(PrimaryKey)) {
return false;
return TConclusionStatus::Fail(
TStringBuilder() << "PK mismatch: this=" << PrimaryKey->ToString() << " other=" << other.GetPrimaryKey()->ToString());
}
return true;
return TConclusionStatus::Success();
}

ui32 TIndexInfo::GetColumnIdVerified(const std::string& name) const {
Expand Down
2 changes: 1 addition & 1 deletion ydb/core/tx/columnshard/engines/scheme/index_info.h
Original file line number Diff line number Diff line change
Expand Up @@ -398,7 +398,7 @@ struct TIndexInfo: public IIndexInfo {
return Version;
}

bool CheckCompatible(const TIndexInfo& other) const;
TConclusionStatus CheckCompatible(const TIndexInfo& other) const;
NArrow::NSerialization::TSerializerContainer GetDefaultSerializer() const {
return DefaultSerializer;
}
Expand Down
35 changes: 26 additions & 9 deletions ydb/core/tx/columnshard/engines/scheme/objects_cache.h
Original file line number Diff line number Diff line change
Expand Up @@ -79,19 +79,36 @@ class TSchemaObjectsCache {

class TSchemaCachesManager {
private:
THashMap<ui64, std::shared_ptr<TSchemaObjectsCache>> CacheByTableOwner;
TMutex Mutex;
class TColumnOwnerId {
private:
TPathId Tenant;
TLocalPathId Owner;

public:
TColumnOwnerId(const TPathId& tenant, const TLocalPathId owner)
: Tenant(tenant)
, Owner(owner) {
AFL_VERIFY(!!Owner);
}

std::shared_ptr<TSchemaObjectsCache> GetCacheImpl(const ui64 ownerPathId) {
if (!ownerPathId) {
return std::make_shared<TSchemaObjectsCache>();
operator size_t() const {
return CombineHashes(Owner, Tenant.Hash());
}
bool operator==(const TColumnOwnerId& other) const {
return Tenant == other.Tenant && Owner == other.Owner;
}
};

THashMap<TColumnOwnerId, std::shared_ptr<TSchemaObjectsCache>> CacheByTableOwner;
TMutex Mutex;

std::shared_ptr<TSchemaObjectsCache> GetCacheImpl(const TColumnOwnerId& owner) {
TGuard lock(Mutex);
auto findCache = CacheByTableOwner.FindPtr(ownerPathId);
auto findCache = CacheByTableOwner.FindPtr(owner);
if (findCache) {
return *findCache;
}
return CacheByTableOwner.emplace(ownerPathId, std::make_shared<TSchemaObjectsCache>()).first->second;
return CacheByTableOwner.emplace(owner, std::make_shared<TSchemaObjectsCache>()).first->second;
}

void DropCachesImpl() {
Expand All @@ -100,8 +117,8 @@ class TSchemaCachesManager {
}

public:
static std::shared_ptr<TSchemaObjectsCache> GetCache(const ui64 ownerPathId) {
return Singleton<TSchemaCachesManager>()->GetCacheImpl(ownerPathId);
static std::shared_ptr<TSchemaObjectsCache> GetCache(const ui64 ownerPathId, const TPathId& tenantPathId) {
return Singleton<TSchemaCachesManager>()->GetCacheImpl(TColumnOwnerId(tenantPathId, ownerPathId));
}

static void DropCaches() {
Expand Down
4 changes: 3 additions & 1 deletion ydb/core/tx/columnshard/loading/stages.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -196,7 +196,9 @@ bool TSpecialValuesInitializer::DoPrecharge(NTabletFlatExecutor::TTransactionCon
bool TTablesManagerInitializer::DoExecute(NTabletFlatExecutor::TTransactionContext& txc, const TActorContext& /*ctx*/) {
NIceDb::TNiceDb db(txc.DB);
TTablesManager tablesManagerLocal(Self->StoragesManager, Self->DataAccessorsManager.GetObjectPtrVerified(),
NOlap::TSchemaCachesManager::GetCache(Self->OwnerPathId), Self->Counters.GetPortionIndexCounters(), Self->TabletID());
Self->OwnerPathId ? NOlap::TSchemaCachesManager::GetCache(Self->OwnerPathId, Self->Info()->TenantPathId)
: std::shared_ptr<NOlap::TSchemaObjectsCache>(),
Self->Counters.GetPortionIndexCounters(), Self->TabletID());
{
TMemoryProfileGuard g("TTxInit/TTablesManager");
if (!tablesManagerLocal.InitFromDB(db)) {
Expand Down
9 changes: 4 additions & 5 deletions ydb/core/tx/columnshard/tables_manager.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -163,8 +163,8 @@ bool TTablesManager::InitFromDB(NIceDb::TNiceDb& db) {
"version", info.GetSchema().GetVersion());
NOlap::IColumnEngine::TSchemaInitializationData schemaInitializationData(info);
if (!PrimaryIndex) {
PrimaryIndex = std::make_unique<NOlap::TColumnEngineForLogs>(TabletId, SchemaObjectsCache, DataAccessorsManager, StoragesManager,
version, preset->Id, schemaInitializationData, PortionsStats);
PrimaryIndex = std::make_unique<NOlap::TColumnEngineForLogs>(TabletId, SchemaObjectsCache.GetObjectPtrVerified(),
DataAccessorsManager, StoragesManager, version, preset->Id, schemaInitializationData, PortionsStats);
} else if (PrimaryIndex->GetVersionedIndex().IsEmpty() ||
info.GetSchema().GetVersion() > PrimaryIndex->GetVersionedIndex().GetLastSchema()->GetVersion()) {
PrimaryIndex->RegisterSchemaVersion(version, preset->Id, schemaInitializationData);
Expand Down Expand Up @@ -279,8 +279,8 @@ void TTablesManager::AddSchemaVersion(
versionInfo.MutableSchema()->SetEngine(NKikimrSchemeOp::COLUMN_ENGINE_REPLACING_TIMESERIES);
Schema::SaveSchemaPresetVersionInfo(db, presetId, version, versionInfo);
if (!PrimaryIndex) {
PrimaryIndex = std::make_unique<NOlap::TColumnEngineForLogs>(TabletId, SchemaObjectsCache, DataAccessorsManager, StoragesManager,
version, presetId, NOlap::IColumnEngine::TSchemaInitializationData(versionInfo), PortionsStats);
PrimaryIndex = std::make_unique<NOlap::TColumnEngineForLogs>(TabletId, SchemaObjectsCache.GetObjectPtrVerified(), DataAccessorsManager,
StoragesManager, version, presetId, NOlap::IColumnEngine::TSchemaInitializationData(versionInfo), PortionsStats);
for (auto&& i : Tables) {
PrimaryIndex->RegisterTable(i.first);
}
Expand Down Expand Up @@ -339,7 +339,6 @@ TTablesManager::TTablesManager(const std::shared_ptr<NOlap::IStoragesManager>& s
, SchemaObjectsCache(schemaCache)
, PortionsStats(portionsStats)
, TabletId(tabletId) {
AFL_VERIFY(SchemaObjectsCache);
}

bool TTablesManager::TryFinalizeDropPathOnExecute(NTable::TDatabase& dbTable, const ui64 pathId) const {
Expand Down
8 changes: 7 additions & 1 deletion ydb/core/tx/columnshard/tables_manager.h
Original file line number Diff line number Diff line change
Expand Up @@ -198,7 +198,7 @@ class TTablesManager {
std::shared_ptr<NOlap::IStoragesManager> StoragesManager;
std::shared_ptr<NOlap::NDataAccessorControl::IDataAccessorsManager> DataAccessorsManager;
std::unique_ptr<TTableLoadTimeCounters> LoadTimeCounters;
std::shared_ptr<NOlap::TSchemaObjectsCache> SchemaObjectsCache;
NBackgroundTasks::TControlInterfaceContainer<NOlap::TSchemaObjectsCache> SchemaObjectsCache;
std::shared_ptr<TPortionIndexStats> PortionsStats;
ui64 TabletId = 0;

Expand Down Expand Up @@ -330,6 +330,12 @@ class TTablesManager {

[[nodiscard]] std::unique_ptr<NTabletFlatExecutor::ITransaction> CreateAddShardingInfoTx(TColumnShard& owner, const ui64 pathId,
const ui64 versionId, const NSharding::TGranuleShardingLogicContainer& tabletShardingLogic) const;

void SetSchemaObjectsCache(const std::shared_ptr<NOlap::TSchemaObjectsCache>& cache) {
AFL_VERIFY(cache);
AFL_VERIFY(!SchemaObjectsCache);
SchemaObjectsCache = cache;
}
};

} // namespace NKikimr::NColumnShard
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,8 @@ void TTester::Setup(TTestActorRuntime& runtime) {
runtime.SetLogPriority(NKikimrServices::TX_COLUMNSHARD_SCAN, NActors::NLog::PRI_DEBUG);
runtime.SetLogPriority(NKikimrServices::S3_WRAPPER, NLog::PRI_DEBUG);

NOlap::TSchemaCachesManager::DropCaches();

ui32 domainId = 0;
ui32 planResolution = 500;

Expand Down Expand Up @@ -449,7 +451,6 @@ namespace NKikimr::NColumnShard {
}

void SetupSchema(TTestBasicRuntime& runtime, TActorId& sender, const TString& txBody, const NOlap::TSnapshot& snapshot, bool succeed) {

auto controller = NYDBTest::TControllers::GetControllerAs<NYDBTest::NColumnShard::TController>();
while (controller && !controller->IsActiveTablet(TTestTxConfig::TxTablet0)) {
runtime.SimulateSleep(TDuration::Seconds(1));
Expand All @@ -463,21 +464,19 @@ namespace NKikimr::NColumnShard {
}
}

void SetupSchema(TTestBasicRuntime& runtime, TActorId& sender, ui64 pathId,
const TestTableDescription& table, TString codec) {
void SetupSchema(TTestBasicRuntime& runtime, TActorId& sender, ui64 pathId, const TestTableDescription& table, TString codec) {
using namespace NTxUT;
NOlap::TSnapshot snapshot(10, 10);
TString txBody;
auto specials = TTestSchema::TTableSpecials().WithCodec(codec);
if (table.InStore) {
txBody = TTestSchema::CreateTableTxBody(pathId, table.Schema, table.Pk, specials);
txBody = TTestSchema::CreateInitShardTxBody(pathId, table.Schema, table.Pk, specials);
} else {
txBody = TTestSchema::CreateStandaloneTableTxBody(pathId, table.Schema, table.Pk, specials);
}
SetupSchema(runtime, sender, txBody, snapshot, true);
}


void PrepareTablet(TTestBasicRuntime& runtime, const ui64 tableId, const std::vector<NArrow::NTest::TTestColumn>& schema, const ui32 keySize) {
using namespace NTxUT;
CreateTestBootstrapper(runtime, CreateTestTabletInfo(TTestTxConfig::TxTablet0, TTabletTypes::ColumnShard), &CreateColumnShard);
Expand Down
25 changes: 21 additions & 4 deletions ydb/core/tx/columnshard/test_helper/columnshard_ut_common.h
Original file line number Diff line number Diff line change
Expand Up @@ -295,9 +295,18 @@ struct TTestSchema {
NKikimrTxColumnShard::TSchemaTxBody tx;
auto* table = tx.MutableInitShard()->AddTables();
tx.MutableInitShard()->SetOwnerPath(ownerPath);
tx.MutableInitShard()->SetOwnerPathId(pathId);
table->SetPathId(pathId);

InitSchema(columns, pk, specials, table->MutableSchema());
{ // preset
auto* preset = table->MutableSchemaPreset();
preset->SetId(1);
preset->SetName("default");

// schema
InitSchema(columns, pk, specials, preset->MutableSchema());
}

InitTiersAndTtl(specials, table->MutableTtlSettings());

Cerr << "CreateInitShard: " << tx << "\n";
Expand All @@ -308,9 +317,11 @@ struct TTestSchema {
}

static TString CreateStandaloneTableTxBody(ui64 pathId, const std::vector<NArrow::NTest::TTestColumn>& columns,
const std::vector<NArrow::NTest::TTestColumn>& pk, const TTableSpecials& specials = {}) {
const std::vector<NArrow::NTest::TTestColumn>& pk, const TTableSpecials& specials = {}, const TString& path = "/Root/olap") {
NKikimrTxColumnShard::TSchemaTxBody tx;
auto* table = tx.MutableEnsureTables()->AddTables();
auto* table = tx.MutableInitShard()->AddTables();
tx.MutableInitShard()->SetOwnerPath(path);
tx.MutableInitShard()->SetOwnerPathId(pathId);
table->SetPathId(pathId);

InitSchema(columns, pk, specials, table->MutableSchema());
Expand All @@ -323,12 +334,18 @@ struct TTestSchema {
return out;
}

static TString AlterTableTxBody(ui64 pathId, ui32 version, const TTableSpecials& specials) {
static TString AlterTableTxBody(ui64 pathId, ui32 version, const std::vector<NArrow::NTest::TTestColumn>& columns,
const std::vector<NArrow::NTest::TTestColumn>& pk, const TTableSpecials& specials) {
NKikimrTxColumnShard::TSchemaTxBody tx;
auto* table = tx.MutableAlterTable();
table->SetPathId(pathId);
tx.MutableSeqNo()->SetRound(version);

auto* preset = table->MutableSchemaPreset();
preset->SetId(1);
preset->SetName("default");
InitSchema(columns, pk, specials, preset->MutableSchema());

auto* ttlSettings = table->MutableTtlSettings();
if (!InitTiersAndTtl(specials, ttlSettings)) {
ttlSettings->MutableDisabled();
Expand Down
21 changes: 10 additions & 11 deletions ydb/core/tx/columnshard/ut_schema/ut_columnshard_schema.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -287,7 +287,7 @@ void TestTtl(bool reboots, bool internal, TTestSchema::TTableSpecials spec = {},
spec.EvictAfter = TDuration::Seconds(ttlSec);
}
SetupSchema(runtime, sender,
TTestSchema::AlterTableTxBody(tableId, 2, spec),
TTestSchema::AlterTableTxBody(tableId, 2, ydbSchema, testYdbPk, spec),
NOlap::TSnapshot(++planStep, ++txId));
if (spec.HasTiers()) {
csControllerGuard->OverrideTierConfigs(runtime, sender, TTestSchema::BuildSnapshot(spec));
Expand All @@ -314,9 +314,8 @@ void TestTtl(bool reboots, bool internal, TTestSchema::TTableSpecials spec = {},
}

// Disable TTL
auto ok = ProposeSchemaTx(runtime, sender,
TTestSchema::AlterTableTxBody(tableId, 3, TTestSchema::TTableSpecials()),
NOlap::TSnapshot(++planStep, ++txId));
auto ok = ProposeSchemaTx(runtime, sender, TTestSchema::AlterTableTxBody(tableId, 3, ydbSchema, testYdbPk, TTestSchema::TTableSpecials()),
NOlap::TSnapshot(++planStep, ++txId));
UNIT_ASSERT(ok);
if (spec.HasTiers()) {
csControllerGuard->OverrideTierConfigs(runtime, sender, TTestSchema::BuildSnapshot(TTestSchema::TTableSpecials()));
Expand Down Expand Up @@ -630,8 +629,7 @@ std::vector<std::pair<ui32, ui64>> TestTiers(bool reboots, const std::vector<TSt
}
if (i) {
const ui32 version = 2 * i + 1;
SetupSchema(runtime, sender,
TTestSchema::AlterTableTxBody(tableId, version, specs[i]),
SetupSchema(runtime, sender, TTestSchema::AlterTableTxBody(tableId, version, testYdbSchema, testYdbPk, specs[i]),
NOlap::TSnapshot(++planStep, ++txId));
}
if (specs[i].HasTiers() || reboots) {
Expand Down Expand Up @@ -970,7 +968,7 @@ void TestDrop(bool reboots) {
ui64 planStep = 1000000000; // greater then delays
ui64 txId = 100;

SetupSchema(runtime, sender, TTestSchema::CreateTableTxBody(tableId, testYdbSchema, testYdbPk),
SetupSchema(runtime, sender, TTestSchema::CreateInitShardTxBody(tableId, testYdbSchema, testYdbPk),
NOlap::TSnapshot(++planStep, ++txId));
//

Expand Down Expand Up @@ -1039,7 +1037,7 @@ void TestDropWriteRace() {
NLongTxService::TLongTxId longTxId;
UNIT_ASSERT(longTxId.ParseString("ydb://long-tx/01ezvvxjdk2hd4vdgjs68knvp8?node_id=1"));

SetupSchema(runtime, sender, TTestSchema::CreateTableTxBody(tableId, testYdbSchema, testYdbPk),
SetupSchema(runtime, sender, TTestSchema::CreateInitShardTxBody(tableId, testYdbSchema, testYdbPk),
NOlap::TSnapshot(++planStep, ++txId));
TString data = MakeTestBlob({0, 100}, testYdbSchema);
UNIT_ASSERT(data.size() < NColumnShard::TLimits::MIN_BYTES_TO_INSERT);
Expand Down Expand Up @@ -1080,7 +1078,7 @@ void TestCompaction(std::optional<ui32> numWrites = {}) {
ui64 planStep = 100;
ui64 txId = 100;

SetupSchema(runtime, sender, TTestSchema::CreateTableTxBody(tableId, testYdbSchema, testYdbPk),
SetupSchema(runtime, sender, TTestSchema::CreateInitShardTxBody(tableId, testYdbSchema, testYdbPk),
NOlap::TSnapshot(++planStep, ++txId));
// Set tiering

Expand All @@ -1097,8 +1095,8 @@ void TestCompaction(std::optional<ui32> numWrites = {}) {
spec.Tiers.back().EvictAfter = allow;
spec.Tiers.back().S3 = TTestSchema::TStorageTier::FakeS3();

SetupSchema(runtime, sender, TTestSchema::AlterTableTxBody(tableId, 1, spec),
NOlap::TSnapshot(++planStep, ++txId));
SetupSchema(
runtime, sender, TTestSchema::AlterTableTxBody(tableId, 1, testYdbSchema, testYdbPk, spec), NOlap::TSnapshot(++planStep, ++txId));
csControllerGuard->OverrideTierConfigs(runtime, sender, TTestSchema::BuildSnapshot(spec));

// Writes
Expand Down Expand Up @@ -1176,6 +1174,7 @@ Y_UNIT_TEST_SUITE(TColumnShardTestSchema) {
ui64 txId = 100;
ui64 generation = 0;

SetupSchema(runtime, sender, TTestSchema::CreateInitShardTxBody(tableId++, schema, pk), NOlap::TSnapshot(planStep++, txId++));
for (auto& ydbType : intTypes) {
schema[0].SetType(TTypeInfo(ydbType));
pk[0].SetType(TTypeInfo(ydbType));
Expand Down
Loading