Skip to content

Commit afdada9

Browse files
authored
Merge a0c1c59 into b2d56de
2 parents b2d56de + a0c1c59 commit afdada9

File tree

13 files changed

+102
-49
lines changed

13 files changed

+102
-49
lines changed

ydb/core/tx/columnshard/columnshard_impl.cpp

Lines changed: 8 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -84,7 +84,8 @@ TColumnShard::TColumnShard(TTabletStorageInfo* info, const TActorId& tablet)
8484
, PeriodicWakeupActivationPeriod(NYDBTest::TControllers::GetColumnShardController()->GetPeriodicWakeupActivationPeriod())
8585
, StatsReportInterval(NYDBTest::TControllers::GetColumnShardController()->GetStatsReportInterval())
8686
, InFlightReadsTracker(StoragesManager, Counters.GetRequestsTracingCounters())
87-
, TablesManager(StoragesManager, std::make_shared<NOlap::NDataAccessorControl::TLocalManager>(nullptr), info->TabletID)
87+
, TablesManager(StoragesManager, std::make_shared<NOlap::NDataAccessorControl::TLocalManager>(nullptr),
88+
std::make_shared<NOlap::TSchemaObjectsCache>(), info->TabletID)
8889
, Subscribers(std::make_shared<NSubscriber::TManager>(*this))
8990
, PipeClientCache(NTabletPipe::CreateBoundedClientCache(new NTabletPipe::TBoundedClientCacheConfig(), GetPipeClientConfig()))
9091
, InsertTable(std::make_unique<NOlap::TInsertTable>())
@@ -354,15 +355,13 @@ void TColumnShard::RunInit(const NKikimrTxColumnShard::TInitShard& proto, const
354355

355356
NIceDb::TNiceDb db(txc.DB);
356357

357-
if (proto.HasOwnerPathId()) {
358-
OwnerPathId = proto.GetOwnerPathId();
359-
Schema::SaveSpecialValue(db, Schema::EValueIds::OwnerPathId, OwnerPathId);
360-
}
358+
AFL_VERIFY(proto.HasOwnerPathId());
359+
OwnerPathId = proto.GetOwnerPathId();
360+
Schema::SaveSpecialValue(db, Schema::EValueIds::OwnerPathId, OwnerPathId);
361361

362-
if (proto.HasOwnerPath()) {
363-
OwnerPath = proto.GetOwnerPath();
364-
Schema::SaveSpecialValue(db, Schema::EValueIds::OwnerPath, OwnerPath);
365-
}
362+
AFL_VERIFY(proto.HasOwnerPathId());
363+
OwnerPath = proto.GetOwnerPath();
364+
Schema::SaveSpecialValue(db, Schema::EValueIds::OwnerPath, OwnerPath);
366365

367366
for (auto& createTable : proto.GetTables()) {
368367
RunEnsureTable(createTable, version, txc);

ydb/core/tx/columnshard/engines/column_engine_logs.cpp

Lines changed: 10 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -26,25 +26,27 @@
2626

2727
namespace NKikimr::NOlap {
2828

29-
TColumnEngineForLogs::TColumnEngineForLogs(const ui64 tabletId,
29+
TColumnEngineForLogs::TColumnEngineForLogs(const ui64 tabletId, const std::shared_ptr<TSchemaObjectsCache>& schemaCache,
3030
const std::shared_ptr<NDataAccessorControl::IDataAccessorsManager>& dataAccessorsManager,
3131
const std::shared_ptr<IStoragesManager>& storagesManager, const TSnapshot& snapshot, const TSchemaInitializationData& schema)
3232
: GranulesStorage(std::make_shared<TGranulesStorage>(SignalCounters, dataAccessorsManager, storagesManager))
3333
, DataAccessorsManager(dataAccessorsManager)
3434
, StoragesManager(storagesManager)
35+
, SchemaObjectsCache(schemaCache)
3536
, TabletId(tabletId)
3637
, LastPortion(0)
3738
, LastGranule(0) {
3839
ActualizationController = std::make_shared<NActualizer::TController>();
3940
RegisterSchemaVersion(snapshot, schema);
4041
}
4142

42-
TColumnEngineForLogs::TColumnEngineForLogs(const ui64 tabletId,
43+
TColumnEngineForLogs::TColumnEngineForLogs(const ui64 tabletId, const std::shared_ptr<TSchemaObjectsCache>& schemaCache,
4344
const std::shared_ptr<NDataAccessorControl::IDataAccessorsManager>& dataAccessorsManager,
4445
const std::shared_ptr<IStoragesManager>& storagesManager, const TSnapshot& snapshot, TIndexInfo&& schema)
4546
: GranulesStorage(std::make_shared<TGranulesStorage>(SignalCounters, dataAccessorsManager, storagesManager))
4647
, DataAccessorsManager(dataAccessorsManager)
4748
, StoragesManager(storagesManager)
49+
, SchemaObjectsCache(schemaCache)
4850
, TabletId(tabletId)
4951
, LastPortion(0)
5052
, LastGranule(0) {
@@ -139,18 +141,19 @@ void TColumnEngineForLogs::UpdatePortionStats(
139141
}
140142

141143
void TColumnEngineForLogs::RegisterSchemaVersion(const TSnapshot& snapshot, TIndexInfo&& indexInfo) {
144+
std::shared_ptr<const TIndexInfo> cachedIndexInfo = SchemaObjectsCache->GetIndexInfoCache(std::move(indexInfo));
142145
AFL_VERIFY(DataAccessorsManager);
143146
bool switchOptimizer = false;
144147
bool switchAccessorsManager = false;
145148
if (!VersionedIndex.IsEmpty()) {
146149
const NOlap::TIndexInfo& lastIndexInfo = VersionedIndex.GetLastSchema()->GetIndexInfo();
147-
Y_ABORT_UNLESS(lastIndexInfo.CheckCompatible(indexInfo));
148-
switchOptimizer = !indexInfo.GetCompactionPlannerConstructor()->IsEqualTo(lastIndexInfo.GetCompactionPlannerConstructor());
149-
switchAccessorsManager = !indexInfo.GetMetadataManagerConstructor()->IsEqualTo(*lastIndexInfo.GetMetadataManagerConstructor());
150+
Y_ABORT_UNLESS(lastIndexInfo.CheckCompatible(*cachedIndexInfo));
151+
switchOptimizer = !cachedIndexInfo->GetCompactionPlannerConstructor()->IsEqualTo(lastIndexInfo.GetCompactionPlannerConstructor());
152+
switchAccessorsManager = !cachedIndexInfo->GetMetadataManagerConstructor()->IsEqualTo(*lastIndexInfo.GetMetadataManagerConstructor());
150153
}
151154

152-
const bool isCriticalScheme = indexInfo.GetSchemeNeedActualization();
153-
auto* indexInfoActual = VersionedIndex.AddIndex(snapshot, std::move(indexInfo));
155+
const bool isCriticalScheme = cachedIndexInfo->GetSchemeNeedActualization();
156+
auto* indexInfoActual = VersionedIndex.AddIndex(snapshot, std::move(cachedIndexInfo));
154157
if (isCriticalScheme) {
155158
StartActualization({});
156159
for (auto&& i : GranulesStorage->GetTables()) {

ydb/core/tx/columnshard/engines/column_engine_logs.h

Lines changed: 5 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -61,7 +61,7 @@ class TColumnEngineForLogs: public IColumnEngine {
6161
std::shared_ptr<IStoragesManager> StoragesManager;
6262

6363
std::shared_ptr<NActualizer::TController> ActualizationController;
64-
std::shared_ptr<TSchemaObjectsCache> SchemaObjectsCache = std::make_shared<TSchemaObjectsCache>();
64+
std::shared_ptr<TSchemaObjectsCache> SchemaObjectsCache;
6565
TVersionedIndex VersionedIndex;
6666
std::shared_ptr<TVersionedIndex> VersionedIndexCopy;
6767

@@ -98,9 +98,11 @@ class TColumnEngineForLogs: public IColumnEngine {
9898
ADD,
9999
};
100100

101-
TColumnEngineForLogs(const ui64 tabletId, const std::shared_ptr<NDataAccessorControl::IDataAccessorsManager>& dataAccessorsManager,
101+
TColumnEngineForLogs(const ui64 tabletId, const std::shared_ptr<TSchemaObjectsCache>& schemaCache,
102+
const std::shared_ptr<NDataAccessorControl::IDataAccessorsManager>& dataAccessorsManager,
102103
const std::shared_ptr<IStoragesManager>& storagesManager, const TSnapshot& snapshot, const TSchemaInitializationData& schema);
103-
TColumnEngineForLogs(const ui64 tabletId, const std::shared_ptr<NDataAccessorControl::IDataAccessorsManager>& dataAccessorsManager,
104+
TColumnEngineForLogs(const ui64 tabletId, const std::shared_ptr<TSchemaObjectsCache>& schemaCache,
105+
const std::shared_ptr<NDataAccessorControl::IDataAccessorsManager>& dataAccessorsManager,
104106
const std::shared_ptr<IStoragesManager>& storagesManager, const TSnapshot& snapshot, TIndexInfo&& schema);
105107

106108
void OnTieringModified(const std::optional<NOlap::TTiering>& ttl, const ui64 pathId) override;
Lines changed: 12 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,17 @@
11
#include "objects_cache.h"
22

3+
#include <ydb/core/tx/columnshard/engines/scheme/index_info.h>
4+
35
namespace NKikimr::NOlap {
46

7+
std::shared_ptr<const TIndexInfo> TSchemaObjectsCache::GetIndexInfoCache(TIndexInfo&& indexInfo) {
8+
const ui64 schemaVersion = indexInfo.GetVersion();
9+
std::unique_lock lock(SchemasMutex);
10+
auto* findSchema = SchemasByVersion.FindPtr(schemaVersion);
11+
if (!findSchema || findSchema->expired()) {
12+
SchemasByVersion[schemaVersion] = std::make_shared<TIndexInfo>(std::move(indexInfo));
13+
}
14+
return findSchema->lock();
15+
}
16+
517
} // namespace NKikimr::NOlap

ydb/core/tx/columnshard/engines/scheme/objects_cache.h

Lines changed: 33 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -10,10 +10,18 @@ namespace NKikimr::NOlap {
1010
class TSchemaObjectsCache {
1111
private:
1212
THashMap<TString, std::shared_ptr<arrow::Field>> Fields;
13-
THashMap<TString, std::shared_ptr<TColumnFeatures>> ColumnFeatures;
14-
THashSet<TString> StringsCache;
1513
mutable ui64 AcceptionFieldsCount = 0;
14+
mutable TMutex FieldsMutex;
15+
16+
THashMap<TString, std::shared_ptr<TColumnFeatures>> ColumnFeatures;
1617
mutable ui64 AcceptionFeaturesCount = 0;
18+
mutable TMutex FeaturesMutex;
19+
20+
THashMap<ui64, std::weak_ptr<const TIndexInfo>> SchemasByVersion;
21+
mutable TMutex SchemasMutex;
22+
23+
THashSet<TString> StringsCache;
24+
mutable TMutex StringsMutex;
1725

1826
public:
1927
const TString& GetStringCache(const TString& original) {
@@ -26,13 +34,16 @@ class TSchemaObjectsCache {
2634

2735
void RegisterField(const TString& fingerprint, const std::shared_ptr<arrow::Field>& f) {
2836
AFL_TRACE(NKikimrServices::TX_COLUMNSHARD)("event", "register_field")("fp", fingerprint)("f", f->ToString());
37+
std::unique_lock lock(FieldsMutex);
2938
AFL_VERIFY(Fields.emplace(fingerprint, f).second);
3039
}
3140
void RegisterColumnFeatures(const TString& fingerprint, const std::shared_ptr<TColumnFeatures>& f) {
3241
AFL_TRACE(NKikimrServices::TX_COLUMNSHARD)("event", "register_column_features")("fp", fingerprint)("info", f->DebugString());
42+
std::unique_lock lock(FeaturesMutex);
3343
AFL_VERIFY(ColumnFeatures.emplace(fingerprint, f).second);
3444
}
3545
std::shared_ptr<arrow::Field> GetField(const TString& fingerprint) const {
46+
std::unique_lock lock(FieldsMutex);
3647
auto it = Fields.find(fingerprint);
3748
if (it == Fields.end()) {
3849
AFL_TRACE(NKikimrServices::TX_COLUMNSHARD)("event", "get_field_miss")("fp", fingerprint)("count", Fields.size())(
@@ -47,6 +58,7 @@ class TSchemaObjectsCache {
4758
}
4859
template <class TConstructor>
4960
TConclusion<std::shared_ptr<TColumnFeatures>> GetOrCreateColumnFeatures(const TString& fingerprint, const TConstructor& constructor) {
61+
std::unique_lock lock(FeaturesMutex);
5062
auto it = ColumnFeatures.find(fingerprint);
5163
if (it == ColumnFeatures.end()) {
5264
AFL_TRACE(NKikimrServices::TX_COLUMNSHARD)("event", "get_column_features_miss")("fp", UrlEscapeRet(fingerprint))(
@@ -65,6 +77,25 @@ class TSchemaObjectsCache {
6577
}
6678
return it->second;
6779
}
80+
81+
std::shared_ptr<const TIndexInfo> GetIndexInfoCache(TIndexInfo&& indexInfo);
82+
};
83+
84+
class TSchemaCachesManager {
85+
private:
86+
THashMap<ui64, std::shared_ptr<TSchemaObjectsCache>> CacheByTableOwner;
87+
TMutex Mutex;
88+
89+
public:
90+
std::shared_ptr<TSchemaObjectsCache> GetCache(const ui64 ownerPathId) {
91+
AFL_VERIFY(ownerPathId);
92+
std::unique_lock lock(Mutex);
93+
auto findCache = CacheByTableOwner.FindPtr(ownerPathId);
94+
if (findCache) {
95+
return *findCache;
96+
}
97+
return CacheByTableOwner.emplace(ownerPathId, std::make_shared<TSchemaObjectsCache>()).first->second;
98+
}
6899
};
69100

70101
} // namespace NKikimr::NOlap

ydb/core/tx/columnshard/engines/scheme/versions/snapshot_scheme.cpp

Lines changed: 9 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -2,39 +2,39 @@
22

33
namespace NKikimr::NOlap {
44

5-
TSnapshotSchema::TSnapshotSchema(TIndexInfo&& indexInfo, const TSnapshot& snapshot)
5+
TSnapshotSchema::TSnapshotSchema(const std::shared_ptr<const TIndexInfo>& indexInfo, const TSnapshot& snapshot)
66
: IndexInfo(std::move(indexInfo))
7-
, Schema(IndexInfo.ArrowSchemaWithSpecials())
7+
, Schema(IndexInfo->ArrowSchemaWithSpecials())
88
, Snapshot(snapshot)
99
{
1010
}
1111

1212
TColumnSaver TSnapshotSchema::GetColumnSaver(const ui32 columnId) const {
13-
return IndexInfo.GetColumnSaver(columnId);
13+
return IndexInfo->GetColumnSaver(columnId);
1414
}
1515

1616
std::shared_ptr<TColumnLoader> TSnapshotSchema::GetColumnLoaderOptional(const ui32 columnId) const {
17-
return IndexInfo.GetColumnLoaderOptional(columnId);
17+
return IndexInfo->GetColumnLoaderOptional(columnId);
1818
}
1919

2020
std::optional<ui32> TSnapshotSchema::GetColumnIdOptional(const std::string& columnName) const {
21-
return IndexInfo.GetColumnIdOptional(columnName);
21+
return IndexInfo->GetColumnIdOptional(columnName);
2222
}
2323

2424
ui32 TSnapshotSchema::GetColumnIdVerified(const std::string& columnName) const {
25-
return IndexInfo.GetColumnIdVerified(columnName);
25+
return IndexInfo->GetColumnIdVerified(columnName);
2626
}
2727

2828
int TSnapshotSchema::GetFieldIndex(const ui32 columnId) const {
29-
return IndexInfo.GetColumnIndexOptional(columnId).value_or(-1);
29+
return IndexInfo->GetColumnIndexOptional(columnId).value_or(-1);
3030
}
3131

3232
const std::shared_ptr<NArrow::TSchemaLite>& TSnapshotSchema::GetSchema() const {
3333
return Schema;
3434
}
3535

3636
const TIndexInfo& TSnapshotSchema::GetIndexInfo() const {
37-
return IndexInfo;
37+
return *IndexInfo;
3838
}
3939

4040
const TSnapshot& TSnapshotSchema::GetSnapshot() const {
@@ -46,7 +46,7 @@ ui32 TSnapshotSchema::GetColumnsCount() const {
4646
}
4747

4848
ui64 TSnapshotSchema::GetVersion() const {
49-
return IndexInfo.GetVersion();
49+
return IndexInfo->GetVersion();
5050
}
5151

5252
}

ydb/core/tx/columnshard/engines/scheme/versions/snapshot_scheme.h

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -8,23 +8,23 @@ namespace NKikimr::NOlap {
88

99
class TSnapshotSchema: public ISnapshotSchema {
1010
private:
11-
TIndexInfo IndexInfo;
11+
std::shared_ptr<const TIndexInfo> IndexInfo;
1212
std::shared_ptr<NArrow::TSchemaLite> Schema;
1313
TSnapshot Snapshot;
1414
protected:
1515
virtual TString DoDebugString() const override {
1616
return TStringBuilder() << "("
1717
"schema=" << Schema->ToString() << ";" <<
1818
"snapshot=" << Snapshot.DebugString() << ";" <<
19-
"index_info=" << IndexInfo.DebugString() << ";" <<
19+
"index_info=" << IndexInfo->DebugString() << ";" <<
2020
")"
2121
;
2222
}
2323
public:
24-
TSnapshotSchema(TIndexInfo&& indexInfo, const TSnapshot& snapshot);
24+
TSnapshotSchema(const std::shared_ptr<const TIndexInfo>& indexInfo, const TSnapshot& snapshot);
2525

2626
virtual TColumnIdsView GetColumnIds() const override {
27-
return IndexInfo.GetColumnIds();
27+
return IndexInfo->GetColumnIds();
2828
}
2929

3030
TColumnSaver GetColumnSaver(const ui32 columnId) const override;

ydb/core/tx/columnshard/engines/scheme/versions/versioned_index.cpp

Lines changed: 6 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -6,16 +6,16 @@
66

77
namespace NKikimr::NOlap {
88

9-
const TIndexInfo* TVersionedIndex::AddIndex(const TSnapshot& snapshot, TIndexInfo&& indexInfo) {
9+
const TIndexInfo* TVersionedIndex::AddIndex(const TSnapshot& snapshot, const std::shared_ptr<const TIndexInfo>& indexInfo) {
1010
if (Snapshots.empty()) {
11-
PrimaryKey = indexInfo.GetPrimaryKey();
11+
PrimaryKey = indexInfo->GetPrimaryKey();
1212
} else {
13-
Y_ABORT_UNLESS(PrimaryKey->Equals(indexInfo.GetPrimaryKey()));
13+
Y_ABORT_UNLESS(PrimaryKey->Equals(indexInfo->GetPrimaryKey()));
1414
}
1515

16-
const bool needActualization = indexInfo.GetSchemeNeedActualization();
17-
auto newVersion = indexInfo.GetVersion();
18-
auto itVersion = SnapshotByVersion.emplace(newVersion, std::make_shared<TSnapshotSchema>(std::move(indexInfo), snapshot));
16+
const bool needActualization = indexInfo->GetSchemeNeedActualization();
17+
auto newVersion = indexInfo->GetVersion();
18+
auto itVersion = SnapshotByVersion.emplace(newVersion, std::make_shared<TSnapshotSchema>(indexInfo, snapshot));
1919
if (!itVersion.second) {
2020
AFL_INFO(NKikimrServices::TX_COLUMNSHARD)("message", "Skip registered version")("version", LastSchemaVersion);
2121
} else if (needActualization) {

ydb/core/tx/columnshard/engines/scheme/versions/versioned_index.h

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -123,7 +123,7 @@ class TVersionedIndex {
123123
return PrimaryKey;
124124
}
125125

126-
const TIndexInfo* AddIndex(const TSnapshot& snapshot, TIndexInfo&& indexInfo);
126+
const TIndexInfo* AddIndex(const TSnapshot& snapshot, const std::shared_ptr<const TIndexInfo>& indexInfo);
127127

128128
bool LoadShardingInfo(IDbWrapper& db);
129129
};

ydb/core/tx/columnshard/loading/stages.cpp

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -194,7 +194,8 @@ bool TSpecialValuesInitializer::DoPrecharge(NTabletFlatExecutor::TTransactionCon
194194

195195
bool TTablesManagerInitializer::DoExecute(NTabletFlatExecutor::TTransactionContext& txc, const TActorContext& /*ctx*/) {
196196
NIceDb::TNiceDb db(txc.DB);
197-
TTablesManager tablesManagerLocal(Self->StoragesManager, Self->DataAccessorsManager.GetObjectPtrVerified(), Self->TabletID());
197+
TTablesManager tablesManagerLocal(Self->StoragesManager, Self->DataAccessorsManager.GetObjectPtrVerified(),
198+
Singleton<NOlap::TSchemaCachesManager>()->GetCache(Self->OwnerPathId), Self->TabletID());
198199
{
199200
TMemoryProfileGuard g("TTxInit/TTablesManager");
200201
if (!tablesManagerLocal.InitFromDB(db)) {

0 commit comments

Comments
 (0)