Skip to content

separate schema caches by tenant (#17729) #18113

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 3 additions & 2 deletions ydb/core/tx/columnshard/columnshard_impl.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -85,8 +85,8 @@ TColumnShard::TColumnShard(TTabletStorageInfo* info, const TActorId& tablet)
, PeriodicWakeupActivationPeriod(NYDBTest::TControllers::GetColumnShardController()->GetPeriodicWakeupActivationPeriod())
, StatsReportInterval(NYDBTest::TControllers::GetColumnShardController()->GetStatsReportInterval())
, InFlightReadsTracker(StoragesManager, Counters.GetRequestsTracingCounters())
, TablesManager(StoragesManager, std::make_shared<NOlap::NDataAccessorControl::TLocalManager>(nullptr),
std::make_shared<NOlap::TSchemaObjectsCache>(), Counters.GetPortionIndexCounters(), info->TabletID)
, TablesManager(StoragesManager, std::make_shared<NOlap::NDataAccessorControl::TLocalManager>(nullptr), nullptr,
Counters.GetPortionIndexCounters(), info->TabletID)
, Subscribers(std::make_shared<NSubscriber::TManager>(*this))
, PipeClientCache(NTabletPipe::CreateBoundedClientCache(new NTabletPipe::TBoundedClientCacheConfig(), GetPipeClientConfig()))
, InsertTable(std::make_unique<NOlap::TInsertTable>())
Expand Down Expand Up @@ -362,6 +362,7 @@ void TColumnShard::RunInit(const NKikimrTxColumnShard::TInitShard& proto, const
if (proto.HasOwnerPathId()) {
OwnerPathId = proto.GetOwnerPathId();
Schema::SaveSpecialValue(db, Schema::EValueIds::OwnerPathId, OwnerPathId);
TablesManager.SetSchemaObjectsCache(NOlap::TSchemaCachesManager::GetCache(OwnerPathId, Info()->TenantPathId));
}

if (proto.HasOwnerPath()) {
Expand Down
4 changes: 3 additions & 1 deletion ydb/core/tx/columnshard/engines/column_engine_logs.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,7 @@ TColumnEngineForLogs::TColumnEngineForLogs(const ui64 tabletId, const std::share
, Counters(counters)
, LastPortion(0)
, LastGranule(0) {
AFL_VERIFY(SchemaObjectsCache);
ActualizationController = std::make_shared<NActualizer::TController>();
RegisterSchemaVersion(snapshot, presetId, schema);
}
Expand All @@ -53,6 +54,7 @@ TColumnEngineForLogs::TColumnEngineForLogs(const ui64 tabletId, const std::share
, Counters(counters)
, LastPortion(0)
, LastGranule(0) {
AFL_VERIFY(SchemaObjectsCache);
ActualizationController = std::make_shared<NActualizer::TController>();
RegisterSchemaVersion(snapshot, presetId, std::move(schema));
}
Expand All @@ -63,7 +65,7 @@ void TColumnEngineForLogs::RegisterSchemaVersion(const TSnapshot& snapshot, cons
bool switchAccessorsManager = false;
if (!VersionedIndex.IsEmpty()) {
const NOlap::TIndexInfo& lastIndexInfo = VersionedIndex.GetLastSchema()->GetIndexInfo();
Y_ABORT_UNLESS(lastIndexInfo.CheckCompatible(indexInfo));
lastIndexInfo.CheckCompatible(indexInfo).Validate();
switchOptimizer = !indexInfo.GetCompactionPlannerConstructor()->IsEqualTo(lastIndexInfo.GetCompactionPlannerConstructor());
switchAccessorsManager = !indexInfo.GetMetadataManagerConstructor()->IsEqualTo(*lastIndexInfo.GetMetadataManagerConstructor());
}
Expand Down
7 changes: 4 additions & 3 deletions ydb/core/tx/columnshard/engines/scheme/index_info.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -18,11 +18,12 @@

namespace NKikimr::NOlap {

bool TIndexInfo::CheckCompatible(const TIndexInfo& other) const {
TConclusionStatus TIndexInfo::CheckCompatible(const TIndexInfo& other) const {
if (!other.GetPrimaryKey()->Equals(PrimaryKey)) {
return false;
return TConclusionStatus::Fail(
TStringBuilder() << "PK mismatch: this=" << PrimaryKey->ToString() << " other=" << other.GetPrimaryKey()->ToString());
}
return true;
return TConclusionStatus::Success();
}

ui32 TIndexInfo::GetColumnIdVerified(const std::string& name) const {
Expand Down
2 changes: 1 addition & 1 deletion ydb/core/tx/columnshard/engines/scheme/index_info.h
Original file line number Diff line number Diff line change
Expand Up @@ -418,7 +418,7 @@ struct TIndexInfo: public IIndexInfo {
return Version;
}

bool CheckCompatible(const TIndexInfo& other) const;
TConclusionStatus CheckCompatible(const TIndexInfo& other) const;
NArrow::NSerialization::TSerializerContainer GetDefaultSerializer() const {
return DefaultSerializer;
}
Expand Down
35 changes: 26 additions & 9 deletions ydb/core/tx/columnshard/engines/scheme/objects_cache.h
Original file line number Diff line number Diff line change
Expand Up @@ -79,19 +79,36 @@ class TSchemaObjectsCache {

class TSchemaCachesManager {
private:
THashMap<ui64, std::shared_ptr<TSchemaObjectsCache>> CacheByTableOwner;
TMutex Mutex;
class TColumnOwnerId {
private:
TPathId Tenant;
TLocalPathId Owner;

public:
TColumnOwnerId(const TPathId& tenant, const TLocalPathId owner)
: Tenant(tenant)
, Owner(owner) {
AFL_VERIFY(!!Owner);
}

std::shared_ptr<TSchemaObjectsCache> GetCacheImpl(const ui64 ownerPathId) {
if (!ownerPathId) {
return std::make_shared<TSchemaObjectsCache>();
operator size_t() const {
return CombineHashes(Owner, Tenant.Hash());
}
bool operator==(const TColumnOwnerId& other) const {
return Tenant == other.Tenant && Owner == other.Owner;
}
};

THashMap<TColumnOwnerId, std::shared_ptr<TSchemaObjectsCache>> CacheByTableOwner;
TMutex Mutex;

std::shared_ptr<TSchemaObjectsCache> GetCacheImpl(const TColumnOwnerId& owner) {
TGuard lock(Mutex);
auto findCache = CacheByTableOwner.FindPtr(ownerPathId);
auto findCache = CacheByTableOwner.FindPtr(owner);
if (findCache) {
return *findCache;
}
return CacheByTableOwner.emplace(ownerPathId, std::make_shared<TSchemaObjectsCache>()).first->second;
return CacheByTableOwner.emplace(owner, std::make_shared<TSchemaObjectsCache>()).first->second;
}

void DropCachesImpl() {
Expand All @@ -100,8 +117,8 @@ class TSchemaCachesManager {
}

public:
static std::shared_ptr<TSchemaObjectsCache> GetCache(const ui64 ownerPathId) {
return Singleton<TSchemaCachesManager>()->GetCacheImpl(ownerPathId);
static std::shared_ptr<TSchemaObjectsCache> GetCache(const ui64 ownerPathId, const TPathId& tenantPathId) {
return Singleton<TSchemaCachesManager>()->GetCacheImpl(TColumnOwnerId(tenantPathId, ownerPathId));
}

static void DropCaches() {
Expand Down
4 changes: 3 additions & 1 deletion ydb/core/tx/columnshard/loading/stages.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -206,7 +206,9 @@ bool TSpecialValuesInitializer::DoPrecharge(NTabletFlatExecutor::TTransactionCon
bool TTablesManagerInitializer::DoExecute(NTabletFlatExecutor::TTransactionContext& txc, const TActorContext& /*ctx*/) {
NIceDb::TNiceDb db(txc.DB);
TTablesManager tablesManagerLocal(Self->StoragesManager, Self->DataAccessorsManager.GetObjectPtrVerified(),
NOlap::TSchemaCachesManager::GetCache(Self->OwnerPathId), Self->Counters.GetPortionIndexCounters(), Self->TabletID());
Self->OwnerPathId ? NOlap::TSchemaCachesManager::GetCache(Self->OwnerPathId, Self->Info()->TenantPathId)
: std::shared_ptr<NOlap::TSchemaObjectsCache>(),
Self->Counters.GetPortionIndexCounters(), Self->TabletID());
{
TMemoryProfileGuard g("TTxInit/TTablesManager");
if (!tablesManagerLocal.InitFromDB(db)) {
Expand Down
9 changes: 4 additions & 5 deletions ydb/core/tx/columnshard/tables_manager.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -163,8 +163,8 @@ bool TTablesManager::InitFromDB(NIceDb::TNiceDb& db) {
"version", info.GetSchema().GetVersion());
NOlap::IColumnEngine::TSchemaInitializationData schemaInitializationData(info);
if (!PrimaryIndex) {
PrimaryIndex = std::make_unique<NOlap::TColumnEngineForLogs>(TabletId, SchemaObjectsCache, DataAccessorsManager, StoragesManager,
version, preset->Id, schemaInitializationData, PortionsStats);
PrimaryIndex = std::make_unique<NOlap::TColumnEngineForLogs>(TabletId, SchemaObjectsCache.GetObjectPtrVerified(),
DataAccessorsManager, StoragesManager, version, preset->Id, schemaInitializationData, PortionsStats);
} else if (PrimaryIndex->GetVersionedIndex().IsEmpty() ||
info.GetSchema().GetVersion() > PrimaryIndex->GetVersionedIndex().GetLastSchema()->GetVersion()) {
PrimaryIndex->RegisterSchemaVersion(version, preset->Id, schemaInitializationData);
Expand Down Expand Up @@ -279,8 +279,8 @@ void TTablesManager::AddSchemaVersion(
versionInfo.MutableSchema()->SetEngine(NKikimrSchemeOp::COLUMN_ENGINE_REPLACING_TIMESERIES);
Schema::SaveSchemaPresetVersionInfo(db, presetId, version, versionInfo);
if (!PrimaryIndex) {
PrimaryIndex = std::make_unique<NOlap::TColumnEngineForLogs>(TabletId, SchemaObjectsCache, DataAccessorsManager, StoragesManager,
version, presetId, NOlap::IColumnEngine::TSchemaInitializationData(versionInfo), PortionsStats);
PrimaryIndex = std::make_unique<NOlap::TColumnEngineForLogs>(TabletId, SchemaObjectsCache.GetObjectPtrVerified(), DataAccessorsManager,
StoragesManager, version, presetId, NOlap::IColumnEngine::TSchemaInitializationData(versionInfo), PortionsStats);
for (auto&& i : Tables) {
PrimaryIndex->RegisterTable(i.first);
}
Expand Down Expand Up @@ -339,7 +339,6 @@ TTablesManager::TTablesManager(const std::shared_ptr<NOlap::IStoragesManager>& s
, SchemaObjectsCache(schemaCache)
, PortionsStats(portionsStats)
, TabletId(tabletId) {
AFL_VERIFY(SchemaObjectsCache);
}

bool TTablesManager::TryFinalizeDropPathOnExecute(NTable::TDatabase& dbTable, const TInternalPathId pathId) const {
Expand Down
8 changes: 7 additions & 1 deletion ydb/core/tx/columnshard/tables_manager.h
Original file line number Diff line number Diff line change
Expand Up @@ -199,7 +199,7 @@ class TTablesManager {
std::shared_ptr<NOlap::IStoragesManager> StoragesManager;
std::shared_ptr<NOlap::NDataAccessorControl::IDataAccessorsManager> DataAccessorsManager;
std::unique_ptr<TTableLoadTimeCounters> LoadTimeCounters;
std::shared_ptr<NOlap::TSchemaObjectsCache> SchemaObjectsCache;
NBackgroundTasks::TControlInterfaceContainer<NOlap::TSchemaObjectsCache> SchemaObjectsCache;
std::shared_ptr<TPortionIndexStats> PortionsStats;
ui64 TabletId = 0;

Expand Down Expand Up @@ -331,6 +331,12 @@ class TTablesManager {

[[nodiscard]] std::unique_ptr<NTabletFlatExecutor::ITransaction> CreateAddShardingInfoTx(TColumnShard& owner, const TInternalPathId pathId,
const ui64 versionId, const NSharding::TGranuleShardingLogicContainer& tabletShardingLogic) const;

void SetSchemaObjectsCache(const std::shared_ptr<NOlap::TSchemaObjectsCache>& cache) {
AFL_VERIFY(cache);
AFL_VERIFY(!SchemaObjectsCache);
SchemaObjectsCache = cache;
}
};

} // namespace NKikimr::NColumnShard
10 changes: 5 additions & 5 deletions ydb/core/tx/columnshard/test_helper/columnshard_ut_common.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,8 @@ void TTester::Setup(TTestActorRuntime& runtime) {
runtime.SetLogPriority(NKikimrServices::TX_COLUMNSHARD_SCAN, NActors::NLog::PRI_DEBUG);
runtime.SetLogPriority(NKikimrServices::S3_WRAPPER, NLog::PRI_DEBUG);

NOlap::TSchemaCachesManager::DropCaches();

ui32 domainId = 0;
ui32 planResolution = 500;

Expand Down Expand Up @@ -503,21 +505,19 @@ namespace NKikimr::NColumnShard {
return planStep;
}

NTxUT::TPlanStep SetupSchema(TTestBasicRuntime& runtime, TActorId& sender, ui64 pathId,
const TestTableDescription& table, TString codec) {
NTxUT::TPlanStep SetupSchema(
TTestBasicRuntime& runtime, TActorId& sender, ui64 pathId, const TestTableDescription& table, TString codec, const ui64 txId) {
using namespace NTxUT;
const ui64 txId = 10;
TString txBody;
auto specials = TTestSchema::TTableSpecials().WithCodec(codec);
if (table.InStore) {
txBody = TTestSchema::CreateTableTxBody(pathId, table.Schema, table.Pk, specials);
txBody = TTestSchema::CreateInitShardTxBody(pathId, table.Schema, table.Pk, specials);
} else {
txBody = TTestSchema::CreateStandaloneTableTxBody(pathId, table.Schema, table.Pk, specials);
}
return SetupSchema(runtime, sender, txBody, txId);
}


NTxUT::TPlanStep PrepareTablet(TTestBasicRuntime& runtime, const ui64 tableId, const std::vector<NArrow::NTest::TTestColumn>& schema, const ui32 keySize) {
using namespace NTxUT;
CreateTestBootstrapper(runtime, CreateTestTabletInfo(TTestTxConfig::TxTablet0, TTabletTypes::ColumnShard), &CreateColumnShard);
Expand Down
28 changes: 23 additions & 5 deletions ydb/core/tx/columnshard/test_helper/columnshard_ut_common.h
Original file line number Diff line number Diff line change
Expand Up @@ -297,9 +297,18 @@ struct TTestSchema {
NKikimrTxColumnShard::TSchemaTxBody tx;
auto* table = tx.MutableInitShard()->AddTables();
tx.MutableInitShard()->SetOwnerPath(ownerPath);
tx.MutableInitShard()->SetOwnerPathId(pathId);
table->SetPathId(pathId);

InitSchema(columns, pk, specials, table->MutableSchema());
{ // preset
auto* preset = table->MutableSchemaPreset();
preset->SetId(1);
preset->SetName("default");

// schema
InitSchema(columns, pk, specials, preset->MutableSchema());
}

InitTiersAndTtl(specials, table->MutableTtlSettings());

Cerr << "CreateInitShard: " << tx << "\n";
Expand All @@ -310,9 +319,11 @@ struct TTestSchema {
}

static TString CreateStandaloneTableTxBody(ui64 pathId, const std::vector<NArrow::NTest::TTestColumn>& columns,
const std::vector<NArrow::NTest::TTestColumn>& pk, const TTableSpecials& specials = {}) {
const std::vector<NArrow::NTest::TTestColumn>& pk, const TTableSpecials& specials = {}, const TString& path = "/Root/olap") {
NKikimrTxColumnShard::TSchemaTxBody tx;
auto* table = tx.MutableEnsureTables()->AddTables();
auto* table = tx.MutableInitShard()->AddTables();
tx.MutableInitShard()->SetOwnerPath(path);
tx.MutableInitShard()->SetOwnerPathId(pathId);
table->SetPathId(pathId);

InitSchema(columns, pk, specials, table->MutableSchema());
Expand All @@ -325,12 +336,18 @@ struct TTestSchema {
return out;
}

static TString AlterTableTxBody(ui64 pathId, ui32 version, const TTableSpecials& specials) {
static TString AlterTableTxBody(ui64 pathId, ui32 version, const std::vector<NArrow::NTest::TTestColumn>& columns,
const std::vector<NArrow::NTest::TTestColumn>& pk, const TTableSpecials& specials) {
NKikimrTxColumnShard::TSchemaTxBody tx;
auto* table = tx.MutableAlterTable();
table->SetPathId(pathId);
tx.MutableSeqNo()->SetRound(version);

auto* preset = table->MutableSchemaPreset();
preset->SetId(1);
preset->SetName("default");
InitSchema(columns, pk, specials, preset->MutableSchema());

auto* ttlSettings = table->MutableTtlSettings();
if (!InitTiersAndTtl(specials, ttlSettings)) {
ttlSettings->MutableDisabled();
Expand Down Expand Up @@ -560,7 +577,8 @@ struct TestTableDescription {
}
};

[[nodiscard]] NTxUT::TPlanStep SetupSchema(TTestBasicRuntime& runtime, TActorId& sender, ui64 pathId, const TestTableDescription& table = {}, TString codec = "none");
[[nodiscard]] NTxUT::TPlanStep SetupSchema(TTestBasicRuntime& runtime, TActorId& sender, ui64 pathId, const TestTableDescription& table = {},
TString codec = "none", const ui64 txId = 10);
[[nodiscard]] NTxUT::TPlanStep SetupSchema(TTestBasicRuntime& runtime, TActorId& sender, const TString& txBody, const ui64 txId);

[[nodiscard]] NTxUT::TPlanStep PrepareTablet(
Expand Down
Loading
Loading