Skip to content

Commit 633aa59

Browse files
authored
Merge 7644cea into b2d56de
2 parents b2d56de + 7644cea commit 633aa59

File tree

19 files changed

+117
-62
lines changed

19 files changed

+117
-62
lines changed

ydb/core/tx/columnshard/columnshard_impl.cpp

Lines changed: 8 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -84,7 +84,8 @@ TColumnShard::TColumnShard(TTabletStorageInfo* info, const TActorId& tablet)
8484
, PeriodicWakeupActivationPeriod(NYDBTest::TControllers::GetColumnShardController()->GetPeriodicWakeupActivationPeriod())
8585
, StatsReportInterval(NYDBTest::TControllers::GetColumnShardController()->GetStatsReportInterval())
8686
, InFlightReadsTracker(StoragesManager, Counters.GetRequestsTracingCounters())
87-
, TablesManager(StoragesManager, std::make_shared<NOlap::NDataAccessorControl::TLocalManager>(nullptr), info->TabletID)
87+
, TablesManager(StoragesManager, std::make_shared<NOlap::NDataAccessorControl::TLocalManager>(nullptr),
88+
std::make_shared<NOlap::TSchemaObjectsCache>(), info->TabletID)
8889
, Subscribers(std::make_shared<NSubscriber::TManager>(*this))
8990
, PipeClientCache(NTabletPipe::CreateBoundedClientCache(new NTabletPipe::TBoundedClientCacheConfig(), GetPipeClientConfig()))
9091
, InsertTable(std::make_unique<NOlap::TInsertTable>())
@@ -354,15 +355,13 @@ void TColumnShard::RunInit(const NKikimrTxColumnShard::TInitShard& proto, const
354355

355356
NIceDb::TNiceDb db(txc.DB);
356357

357-
if (proto.HasOwnerPathId()) {
358-
OwnerPathId = proto.GetOwnerPathId();
359-
Schema::SaveSpecialValue(db, Schema::EValueIds::OwnerPathId, OwnerPathId);
360-
}
358+
AFL_VERIFY(proto.HasOwnerPathId());
359+
OwnerPathId = proto.GetOwnerPathId();
360+
Schema::SaveSpecialValue(db, Schema::EValueIds::OwnerPathId, OwnerPathId);
361361

362-
if (proto.HasOwnerPath()) {
363-
OwnerPath = proto.GetOwnerPath();
364-
Schema::SaveSpecialValue(db, Schema::EValueIds::OwnerPath, OwnerPath);
365-
}
362+
// Check the field that is actually read next (OwnerPath); OwnerPathId was already verified above.
AFL_VERIFY(proto.HasOwnerPath());
363+
OwnerPath = proto.GetOwnerPath();
364+
Schema::SaveSpecialValue(db, Schema::EValueIds::OwnerPath, OwnerPath);
366365

367366
for (auto& createTable : proto.GetTables()) {
368367
RunEnsureTable(createTable, version, txc);

ydb/core/tx/columnshard/engines/changes/indexation.cpp

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -108,7 +108,7 @@ class TPathFieldsInfo {
108108
if (!Schemas.contains(data.GetSchemaVersion())) {
109109
Schemas.emplace(data.GetSchemaVersion(), blobSchema);
110110
}
111-
auto columnIds = blobSchema->GetIndexInfo().GetColumnIds(false);
111+
TColumnIdsView columnIds = blobSchema->GetIndexInfo().GetColumnIds(false);
112112
std::vector<ui32> filteredIds = data.GetMeta().GetSchemaSubset().Apply(columnIds.begin(), columnIds.end());
113113
if (data.GetMeta().GetModificationType() == NEvWrite::EModificationType::Delete) {
114114
filteredIds.emplace_back((ui32)IIndexInfo::ESpecialColumn::DELETE_FLAG);
@@ -247,7 +247,7 @@ TConclusionStatus TInsertColumnEngineChanges::DoConstructBlobs(TConstructionCont
247247
{
248248
const auto blobData = Blobs.Extract(IStoragesManager::DefaultStorageId, blobRange);
249249

250-
auto blobSchemaView = blobSchema->GetIndexInfo().ArrowSchema();
250+
NArrow::TSchemaLiteView blobSchemaView = blobSchema->GetIndexInfo().ArrowSchema();
251251
auto batchSchema =
252252
std::make_shared<arrow::Schema>(inserted.GetMeta().GetSchemaSubset().Apply(blobSchemaView.begin(), blobSchemaView.end()));
253253
batch = std::make_shared<NArrow::TGeneralContainer>(NArrow::DeserializeBatch(blobData, batchSchema));

ydb/core/tx/columnshard/engines/column_engine_logs.cpp

Lines changed: 6 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -26,25 +26,27 @@
2626

2727
namespace NKikimr::NOlap {
2828

29-
TColumnEngineForLogs::TColumnEngineForLogs(const ui64 tabletId,
29+
TColumnEngineForLogs::TColumnEngineForLogs(const ui64 tabletId, const std::shared_ptr<TSchemaObjectsCache>& schemaCache,
3030
const std::shared_ptr<NDataAccessorControl::IDataAccessorsManager>& dataAccessorsManager,
3131
const std::shared_ptr<IStoragesManager>& storagesManager, const TSnapshot& snapshot, const TSchemaInitializationData& schema)
3232
: GranulesStorage(std::make_shared<TGranulesStorage>(SignalCounters, dataAccessorsManager, storagesManager))
3333
, DataAccessorsManager(dataAccessorsManager)
3434
, StoragesManager(storagesManager)
35+
, SchemaObjectsCache(schemaCache)
3536
, TabletId(tabletId)
3637
, LastPortion(0)
3738
, LastGranule(0) {
3839
ActualizationController = std::make_shared<NActualizer::TController>();
3940
RegisterSchemaVersion(snapshot, schema);
4041
}
4142

42-
TColumnEngineForLogs::TColumnEngineForLogs(const ui64 tabletId,
43+
TColumnEngineForLogs::TColumnEngineForLogs(const ui64 tabletId, const std::shared_ptr<TSchemaObjectsCache>& schemaCache,
4344
const std::shared_ptr<NDataAccessorControl::IDataAccessorsManager>& dataAccessorsManager,
4445
const std::shared_ptr<IStoragesManager>& storagesManager, const TSnapshot& snapshot, TIndexInfo&& schema)
4546
: GranulesStorage(std::make_shared<TGranulesStorage>(SignalCounters, dataAccessorsManager, storagesManager))
4647
, DataAccessorsManager(dataAccessorsManager)
4748
, StoragesManager(storagesManager)
49+
, SchemaObjectsCache(schemaCache)
4850
, TabletId(tabletId)
4951
, LastPortion(0)
5052
, LastGranule(0) {
@@ -150,7 +152,7 @@ void TColumnEngineForLogs::RegisterSchemaVersion(const TSnapshot& snapshot, TInd
150152
}
151153

152154
const bool isCriticalScheme = indexInfo.GetSchemeNeedActualization();
153-
auto* indexInfoActual = VersionedIndex.AddIndex(snapshot, std::move(indexInfo));
155+
auto* indexInfoActual = VersionedIndex.AddIndex(snapshot, SchemaObjectsCache->GetIndexInfoCache(std::move(indexInfo)));
154156
if (isCriticalScheme) {
155157
StartActualization({});
156158
for (auto&& i : GranulesStorage->GetTables()) {
@@ -215,7 +217,7 @@ void TColumnEngineForLogs::RegisterOldSchemaVersion(const TSnapshot& snapshot, c
215217
}
216218

217219
AFL_VERIFY(indexInfoOptional);
218-
VersionedIndex.AddIndex(snapshot, std::move(*indexInfoOptional));
220+
VersionedIndex.AddIndex(snapshot, SchemaObjectsCache->GetIndexInfoCache(std::move(*indexInfoOptional)));
219221
}
220222

221223
std::shared_ptr<ITxReader> TColumnEngineForLogs::BuildLoader(const std::shared_ptr<IBlobGroupSelector>& dsGroupSelector) {

ydb/core/tx/columnshard/engines/column_engine_logs.h

Lines changed: 5 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -61,7 +61,7 @@ class TColumnEngineForLogs: public IColumnEngine {
6161
std::shared_ptr<IStoragesManager> StoragesManager;
6262

6363
std::shared_ptr<NActualizer::TController> ActualizationController;
64-
std::shared_ptr<TSchemaObjectsCache> SchemaObjectsCache = std::make_shared<TSchemaObjectsCache>();
64+
std::shared_ptr<TSchemaObjectsCache> SchemaObjectsCache;
6565
TVersionedIndex VersionedIndex;
6666
std::shared_ptr<TVersionedIndex> VersionedIndexCopy;
6767

@@ -98,9 +98,11 @@ class TColumnEngineForLogs: public IColumnEngine {
9898
ADD,
9999
};
100100

101-
TColumnEngineForLogs(const ui64 tabletId, const std::shared_ptr<NDataAccessorControl::IDataAccessorsManager>& dataAccessorsManager,
101+
TColumnEngineForLogs(const ui64 tabletId, const std::shared_ptr<TSchemaObjectsCache>& schemaCache,
102+
const std::shared_ptr<NDataAccessorControl::IDataAccessorsManager>& dataAccessorsManager,
102103
const std::shared_ptr<IStoragesManager>& storagesManager, const TSnapshot& snapshot, const TSchemaInitializationData& schema);
103-
TColumnEngineForLogs(const ui64 tabletId, const std::shared_ptr<NDataAccessorControl::IDataAccessorsManager>& dataAccessorsManager,
104+
TColumnEngineForLogs(const ui64 tabletId, const std::shared_ptr<TSchemaObjectsCache>& schemaCache,
105+
const std::shared_ptr<NDataAccessorControl::IDataAccessorsManager>& dataAccessorsManager,
104106
const std::shared_ptr<IStoragesManager>& storagesManager, const TSnapshot& snapshot, TIndexInfo&& schema);
105107

106108
void OnTieringModified(const std::optional<NOlap::TTiering>& ttl, const ui64 pathId) override;

ydb/core/tx/columnshard/engines/scheme/index_info.cpp

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -76,7 +76,7 @@ std::vector<TString> TIndexInfo::GetColumnNames(const std::vector<ui32>& ids) co
7676
}
7777

7878
std::vector<std::string> TIndexInfo::GetColumnSTLNames(const bool withSpecial) const {
79-
const auto ids = GetColumnIds(withSpecial);
79+
const TColumnIdsView ids = GetColumnIds(withSpecial);
8080
std::vector<std::string> out;
8181
out.reserve(ids.size());
8282
for (ui32 id : ids) {
@@ -457,7 +457,7 @@ std::shared_ptr<NIndexes::NCountMinSketch::TIndexMeta> TIndexInfo::GetIndexMetaC
457457
}
458458

459459
std::vector<ui32> TIndexInfo::GetEntityIds() const {
460-
const auto columnIds = GetColumnIds(true);
460+
const TColumnIdsView columnIds = GetColumnIds(true);
461461
std::vector<ui32> result(columnIds.begin(), columnIds.end());
462462
for (auto&& i : Indexes) {
463463
result.emplace_back(i.first);
ydb/core/tx/columnshard/engines/scheme/objects_cache.cpp

Lines changed: 12 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,17 @@
11
#include "objects_cache.h"
22

3+
#include <ydb/core/tx/columnshard/engines/scheme/index_info.h>
4+
35
namespace NKikimr::NOlap {
46

7+
// Returns the shared TIndexInfo instance registered for indexInfo's schema version.
//
// If a live instance for that version is already cached, it is returned and the
// passed-in indexInfo is discarded. Otherwise indexInfo is moved into a new shared
// instance, which is remembered (weakly) in SchemasByVersion and returned.
// The whole lookup/insert runs under SchemasMutex.
std::shared_ptr<const TIndexInfo> TSchemaObjectsCache::GetIndexInfoCache(TIndexInfo&& indexInfo) {
    const ui64 schemaVersion = indexInfo.GetVersion();
    std::unique_lock lock(SchemasMutex);

    std::shared_ptr<const TIndexInfo> cachedSchema;
    if (auto* findSchema = SchemasByVersion.FindPtr(schemaVersion)) {
        // lock() yields nullptr when the previously cached instance has already been released.
        cachedSchema = findSchema->lock();
    }
    if (!cachedSchema) {
        // Keep a strong reference alive until it is handed to the caller: the map stores
        // only a weak_ptr, so assigning a temporary shared_ptr would expire immediately.
        cachedSchema = std::make_shared<TIndexInfo>(std::move(indexInfo));
        SchemasByVersion[schemaVersion] = cachedSchema;
    }
    return cachedSchema;
}
16+
517
} // namespace NKikimr::NOlap

ydb/core/tx/columnshard/engines/scheme/objects_cache.h

Lines changed: 33 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -10,10 +10,18 @@ namespace NKikimr::NOlap {
1010
class TSchemaObjectsCache {
1111
private:
1212
THashMap<TString, std::shared_ptr<arrow::Field>> Fields;
13-
THashMap<TString, std::shared_ptr<TColumnFeatures>> ColumnFeatures;
14-
THashSet<TString> StringsCache;
1513
mutable ui64 AcceptionFieldsCount = 0;
14+
mutable TMutex FieldsMutex;
15+
16+
THashMap<TString, std::shared_ptr<TColumnFeatures>> ColumnFeatures;
1617
mutable ui64 AcceptionFeaturesCount = 0;
18+
mutable TMutex FeaturesMutex;
19+
20+
THashMap<ui64, std::weak_ptr<const TIndexInfo>> SchemasByVersion;
21+
mutable TMutex SchemasMutex;
22+
23+
THashSet<TString> StringsCache;
24+
mutable TMutex StringsMutex;
1725

1826
public:
1927
const TString& GetStringCache(const TString& original) {
@@ -26,13 +34,16 @@ class TSchemaObjectsCache {
2634

2735
void RegisterField(const TString& fingerprint, const std::shared_ptr<arrow::Field>& f) {
2836
AFL_TRACE(NKikimrServices::TX_COLUMNSHARD)("event", "register_field")("fp", fingerprint)("f", f->ToString());
37+
std::unique_lock lock(FieldsMutex);
2938
AFL_VERIFY(Fields.emplace(fingerprint, f).second);
3039
}
3140
void RegisterColumnFeatures(const TString& fingerprint, const std::shared_ptr<TColumnFeatures>& f) {
3241
AFL_TRACE(NKikimrServices::TX_COLUMNSHARD)("event", "register_column_features")("fp", fingerprint)("info", f->DebugString());
42+
std::unique_lock lock(FeaturesMutex);
3343
AFL_VERIFY(ColumnFeatures.emplace(fingerprint, f).second);
3444
}
3545
std::shared_ptr<arrow::Field> GetField(const TString& fingerprint) const {
46+
std::unique_lock lock(FieldsMutex);
3647
auto it = Fields.find(fingerprint);
3748
if (it == Fields.end()) {
3849
AFL_TRACE(NKikimrServices::TX_COLUMNSHARD)("event", "get_field_miss")("fp", fingerprint)("count", Fields.size())(
@@ -47,6 +58,7 @@ class TSchemaObjectsCache {
4758
}
4859
template <class TConstructor>
4960
TConclusion<std::shared_ptr<TColumnFeatures>> GetOrCreateColumnFeatures(const TString& fingerprint, const TConstructor& constructor) {
61+
std::unique_lock lock(FeaturesMutex);
5062
auto it = ColumnFeatures.find(fingerprint);
5163
if (it == ColumnFeatures.end()) {
5264
AFL_TRACE(NKikimrServices::TX_COLUMNSHARD)("event", "get_column_features_miss")("fp", UrlEscapeRet(fingerprint))(
@@ -65,6 +77,25 @@ class TSchemaObjectsCache {
6577
}
6678
return it->second;
6779
}
80+
81+
std::shared_ptr<const TIndexInfo> GetIndexInfoCache(TIndexInfo&& indexInfo);
82+
};
83+
84+
class TSchemaCachesManager {
85+
private:
86+
THashMap<ui64, std::shared_ptr<TSchemaObjectsCache>> CacheByTableOwner;
87+
TMutex Mutex;
88+
89+
public:
90+
std::shared_ptr<TSchemaObjectsCache> GetCache(const ui64 ownerPathId) {
91+
AFL_VERIFY(ownerPathId);
92+
std::unique_lock lock(Mutex);
93+
auto findCache = CacheByTableOwner.FindPtr(ownerPathId);
94+
if (findCache) {
95+
return *findCache;
96+
}
97+
return CacheByTableOwner.emplace(ownerPathId, std::make_shared<TSchemaObjectsCache>()).first->second;
98+
}
6899
};
69100

70101
} // namespace NKikimr::NOlap

ydb/core/tx/columnshard/engines/scheme/versions/snapshot_scheme.cpp

Lines changed: 9 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -2,39 +2,39 @@
22

33
namespace NKikimr::NOlap {
44

5-
TSnapshotSchema::TSnapshotSchema(TIndexInfo&& indexInfo, const TSnapshot& snapshot)
5+
TSnapshotSchema::TSnapshotSchema(const std::shared_ptr<const TIndexInfo>& indexInfo, const TSnapshot& snapshot)
66
: IndexInfo(indexInfo) // parameter is a const reference: std::move could only ever copy, so copy the shared handle explicitly
7-
, Schema(IndexInfo.ArrowSchemaWithSpecials())
7+
, Schema(IndexInfo->ArrowSchemaWithSpecials())
88
, Snapshot(snapshot)
99
{
1010
}
1111

1212
TColumnSaver TSnapshotSchema::GetColumnSaver(const ui32 columnId) const {
13-
return IndexInfo.GetColumnSaver(columnId);
13+
return IndexInfo->GetColumnSaver(columnId);
1414
}
1515

1616
std::shared_ptr<TColumnLoader> TSnapshotSchema::GetColumnLoaderOptional(const ui32 columnId) const {
17-
return IndexInfo.GetColumnLoaderOptional(columnId);
17+
return IndexInfo->GetColumnLoaderOptional(columnId);
1818
}
1919

2020
std::optional<ui32> TSnapshotSchema::GetColumnIdOptional(const std::string& columnName) const {
21-
return IndexInfo.GetColumnIdOptional(columnName);
21+
return IndexInfo->GetColumnIdOptional(columnName);
2222
}
2323

2424
ui32 TSnapshotSchema::GetColumnIdVerified(const std::string& columnName) const {
25-
return IndexInfo.GetColumnIdVerified(columnName);
25+
return IndexInfo->GetColumnIdVerified(columnName);
2626
}
2727

2828
int TSnapshotSchema::GetFieldIndex(const ui32 columnId) const {
29-
return IndexInfo.GetColumnIndexOptional(columnId).value_or(-1);
29+
return IndexInfo->GetColumnIndexOptional(columnId).value_or(-1);
3030
}
3131

3232
const std::shared_ptr<NArrow::TSchemaLite>& TSnapshotSchema::GetSchema() const {
3333
return Schema;
3434
}
3535

3636
const TIndexInfo& TSnapshotSchema::GetIndexInfo() const {
37-
return IndexInfo;
37+
return *IndexInfo;
3838
}
3939

4040
const TSnapshot& TSnapshotSchema::GetSnapshot() const {
@@ -46,7 +46,7 @@ ui32 TSnapshotSchema::GetColumnsCount() const {
4646
}
4747

4848
ui64 TSnapshotSchema::GetVersion() const {
49-
return IndexInfo.GetVersion();
49+
return IndexInfo->GetVersion();
5050
}
5151

5252
}

ydb/core/tx/columnshard/engines/scheme/versions/snapshot_scheme.h

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -8,23 +8,23 @@ namespace NKikimr::NOlap {
88

99
class TSnapshotSchema: public ISnapshotSchema {
1010
private:
11-
TIndexInfo IndexInfo;
11+
std::shared_ptr<const TIndexInfo> IndexInfo;
1212
std::shared_ptr<NArrow::TSchemaLite> Schema;
1313
TSnapshot Snapshot;
1414
protected:
1515
virtual TString DoDebugString() const override {
1616
return TStringBuilder() << "("
1717
"schema=" << Schema->ToString() << ";" <<
1818
"snapshot=" << Snapshot.DebugString() << ";" <<
19-
"index_info=" << IndexInfo.DebugString() << ";" <<
19+
"index_info=" << IndexInfo->DebugString() << ";" <<
2020
")"
2121
;
2222
}
2323
public:
24-
TSnapshotSchema(TIndexInfo&& indexInfo, const TSnapshot& snapshot);
24+
TSnapshotSchema(const std::shared_ptr<const TIndexInfo>& indexInfo, const TSnapshot& snapshot);
2525

2626
virtual TColumnIdsView GetColumnIds() const override {
27-
return IndexInfo.GetColumnIds();
27+
return IndexInfo->GetColumnIds();
2828
}
2929

3030
TColumnSaver GetColumnSaver(const ui32 columnId) const override;

ydb/core/tx/columnshard/engines/scheme/versions/versioned_index.cpp

Lines changed: 6 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -6,16 +6,16 @@
66

77
namespace NKikimr::NOlap {
88

9-
const TIndexInfo* TVersionedIndex::AddIndex(const TSnapshot& snapshot, TIndexInfo&& indexInfo) {
9+
const TIndexInfo* TVersionedIndex::AddIndex(const TSnapshot& snapshot, const std::shared_ptr<const TIndexInfo>& indexInfo) {
1010
if (Snapshots.empty()) {
11-
PrimaryKey = indexInfo.GetPrimaryKey();
11+
PrimaryKey = indexInfo->GetPrimaryKey();
1212
} else {
13-
Y_ABORT_UNLESS(PrimaryKey->Equals(indexInfo.GetPrimaryKey()));
13+
Y_ABORT_UNLESS(PrimaryKey->Equals(indexInfo->GetPrimaryKey()));
1414
}
1515

16-
const bool needActualization = indexInfo.GetSchemeNeedActualization();
17-
auto newVersion = indexInfo.GetVersion();
18-
auto itVersion = SnapshotByVersion.emplace(newVersion, std::make_shared<TSnapshotSchema>(std::move(indexInfo), snapshot));
16+
const bool needActualization = indexInfo->GetSchemeNeedActualization();
17+
auto newVersion = indexInfo->GetVersion();
18+
auto itVersion = SnapshotByVersion.emplace(newVersion, std::make_shared<TSnapshotSchema>(indexInfo, snapshot));
1919
if (!itVersion.second) {
2020
AFL_INFO(NKikimrServices::TX_COLUMNSHARD)("message", "Skip registered version")("version", LastSchemaVersion);
2121
} else if (needActualization) {

0 commit comments

Comments
 (0)