Skip to content

Commit 0b573b9

Browse files
Use portion data accessor (#11074)
1 parent e3bfd65 commit 0b573b9

Some content is hidden

Large Commits have some content hidden by default. Use the searchbox below for content that may be hidden.

51 files changed

+324
-302
lines changed

ydb/core/tx/columnshard/blobs_action/transaction/tx_blobs_written.cpp

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -34,7 +34,7 @@ bool TTxBlobsWritingFinished::DoExecute(TTransactionContext& txc, const TActorCo
3434
if (operation->GetBehaviour() == EOperationBehaviour::NoTxWrite) {
3535
granule.CommitImmediateOnExecute(txc, *CommitSnapshot, portion.GetPortionInfo());
3636
} else {
37-
granule.InsertPortionOnExecute(txc, NOlap::TPortionDataAccessor(portion.GetPortionInfo()));
37+
granule.InsertPortionOnExecute(txc, portion.GetPortionInfo());
3838
}
3939
}
4040
}
@@ -99,7 +99,7 @@ void TTxBlobsWritingFinished::DoComplete(const TActorContext& ctx) {
9999
Self->GetOperationsManager().AddEventForLock(*Self, op->GetLockId(), evWrite);
100100
}
101101
}
102-
granule.InsertPortionOnComplete(portion.GetPortionInfo());
102+
granule.InsertPortionOnComplete(portion.GetPortionInfo().MutablePortionInfoPtr());
103103
}
104104
if (op->GetBehaviour() == EOperationBehaviour::NoTxWrite) {
105105
AFL_VERIFY(CommitSnapshot);

ydb/core/tx/columnshard/data_locks/locks/list.h

Lines changed: 9 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -27,9 +27,17 @@ class TListPortionsLock: public ILock {
2727
return Portions.empty();
2828
}
2929
public:
30-
TListPortionsLock(const TString& lockName, const std::vector<std::shared_ptr<TPortionInfo>>& portions, const bool readOnly = false)
30+
TListPortionsLock(const TString& lockName, const std::vector<TPortionDataAccessor>& portions, const bool readOnly = false)
3131
: TBase(lockName, readOnly)
3232
{
33+
for (auto&& p : portions) {
34+
Portions.emplace(p.GetPortionInfo().GetAddress());
35+
Granules.emplace(p.GetPortionInfo().GetPathId());
36+
}
37+
}
38+
39+
TListPortionsLock(const TString& lockName, const std::vector<std::shared_ptr<TPortionInfo>>& portions, const bool readOnly = false)
40+
: TBase(lockName, readOnly) {
3341
for (auto&& p : portions) {
3442
Portions.emplace(p->GetAddress());
3543
Granules.emplace(p->GetPathId());

ydb/core/tx/columnshard/data_sharing/common/session/common.cpp

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -19,7 +19,7 @@ bool TCommonSession::TryStart(const NColumnShard::TColumnShard& shard) {
1919

2020
AFL_VERIFY(!!LockGuard);
2121
const auto& index = shard.GetIndexAs<TColumnEngineForLogs>();
22-
THashMap<ui64, std::vector<std::shared_ptr<TPortionInfo>>> portionsByPath;
22+
THashMap<ui64, std::vector<TPortionDataAccessor>> portionsByPath;
2323
THashSet<TString> StoragesIds;
2424
for (auto&& i : GetPathIdsForStart()) {
2525
const auto& g = index.GetGranuleVerified(i);

ydb/core/tx/columnshard/data_sharing/common/session/common.h

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -13,6 +13,7 @@ class TColumnShard;
1313

1414
namespace NKikimr::NOlap {
1515
class TPortionInfo;
16+
class TPortionDataAccessor;
1617
namespace NDataLocks {
1718
class TManager;
1819
}
@@ -42,7 +43,7 @@ class TCommonSession {
4243
EState State = EState::Created;
4344
protected:
4445
TTransferContext TransferContext;
45-
virtual bool DoStart(const NColumnShard::TColumnShard& shard, const THashMap<ui64, std::vector<std::shared_ptr<TPortionInfo>>>& portions) = 0;
46+
virtual bool DoStart(const NColumnShard::TColumnShard& shard, const THashMap<ui64, std::vector<TPortionDataAccessor>>& portions) = 0;
4647
virtual THashSet<ui64> GetPathIdsForStart() const = 0;
4748
public:
4849
virtual ~TCommonSession() = default;

ydb/core/tx/columnshard/data_sharing/destination/events/transfer.cpp

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -7,13 +7,13 @@
77

88
namespace NKikimr::NOlap::NDataSharing::NEvents {
99

10-
THashMap<NKikimr::NOlap::TTabletId, NKikimr::NOlap::NDataSharing::TTaskForTablet> TPathIdData::BuildLinkTabletTasks(
10+
THashMap<TTabletId, TTaskForTablet> TPathIdData::BuildLinkTabletTasks(
1111
const std::shared_ptr<IStoragesManager>& storages, const TTabletId selfTabletId, const TTransferContext& context,
1212
const TVersionedIndex& index) {
1313
THashMap<TString, THashSet<TUnifiedBlobId>> blobIds;
1414
for (auto&& i : Portions) {
15-
auto schema = i->GetSchema(index);
16-
TPortionDataAccessor(i).FillBlobIdsByStorage(blobIds, schema->GetIndexInfo());
15+
auto schema = i.GetPortionInfo().GetSchema(index);
16+
i.FillBlobIdsByStorage(blobIds, schema->GetIndexInfo());
1717
}
1818

1919
const std::shared_ptr<TSharedBlobsManager> sharedBlobs = storages->GetSharedBlobsManager();

ydb/core/tx/columnshard/data_sharing/destination/events/transfer.h

Lines changed: 7 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -21,7 +21,7 @@ namespace NKikimr::NOlap::NDataSharing::NEvents {
2121
class TPathIdData {
2222
private:
2323
YDB_READONLY(ui64, PathId, 0);
24-
YDB_ACCESSOR_DEF(std::vector<TPortionInfo::TPtr>, Portions);
24+
YDB_ACCESSOR_DEF(std::vector<TPortionDataAccessor>, Portions);
2525

2626
TPathIdData() = default;
2727

@@ -31,7 +31,7 @@ class TPathIdData {
3131
}
3232
PathId = proto.GetPathId();
3333
for (auto&& portionProto : proto.GetPortions()) {
34-
TConclusion<TPortionInfo::TPtr> portion = TPortionInfo::BuildFromProto(portionProto, indexInfo);
34+
TConclusion<TPortionDataAccessor> portion = TPortionDataAccessor::BuildFromProto(portionProto, indexInfo);
3535
if (!portion) {
3636
return portion.GetError();
3737
}
@@ -41,12 +41,12 @@ class TPathIdData {
4141
}
4242

4343
public:
44-
TPathIdData(const ui64 pathId, const std::vector<TPortionInfo::TPtr>& portions)
44+
TPathIdData(const ui64 pathId, const std::vector<TPortionDataAccessor>& portions)
4545
: PathId(pathId)
4646
, Portions(portions) {
4747
}
4848

49-
std::vector<TPortionInfo::TPtr> DetachPortions() {
49+
std::vector<TPortionDataAccessor> DetachPortions() {
5050
return std::move(Portions);
5151
}
5252
THashMap<TTabletId, TTaskForTablet> BuildLinkTabletTasks(const std::shared_ptr<IStoragesManager>& storages, const TTabletId selfTabletId,
@@ -55,17 +55,17 @@ class TPathIdData {
5555
void InitPortionIds(ui64* lastPortionId, const std::optional<ui64> pathId = {}) {
5656
AFL_VERIFY(lastPortionId);
5757
for (auto&& i : Portions) {
58-
i->SetPortionId(++*lastPortionId);
58+
i.MutablePortionInfo().SetPortionId(++*lastPortionId);
5959
if (pathId) {
60-
i->SetPathId(*pathId);
60+
i.MutablePortionInfo().SetPathId(*pathId);
6161
}
6262
}
6363
}
6464

6565
void SerializeToProto(NKikimrColumnShardDataSharingProto::TPathIdData& proto) const {
6666
proto.SetPathId(PathId);
6767
for (auto&& i : Portions) {
68-
TPortionDataAccessor(i).SerializeToProto(*proto.AddPortions());
68+
i.SerializeToProto(*proto.AddPortions());
6969
}
7070
};
7171

ydb/core/tx/columnshard/data_sharing/destination/session/destination.cpp

Lines changed: 6 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -19,8 +19,8 @@ NKikimr::TConclusionStatus TDestinationSession::DataReceived(
1919
auto it = PathIds.find(i.first);
2020
AFL_VERIFY(it != PathIds.end())("path_id_undefined", i.first);
2121
for (auto&& portion : i.second.DetachPortions()) {
22-
portion->SetPathId(it->second);
23-
index.AppendPortion(*portion);
22+
portion.MutablePortionInfo().SetPathId(it->second);
23+
index.AppendPortion(portion.GetPortionInfo());
2424
}
2525
}
2626
return TConclusionStatus::Success();
@@ -161,23 +161,23 @@ NKikimr::TConclusionStatus TDestinationSession::DeserializeCursorFromProto(
161161
}
162162

163163
bool TDestinationSession::DoStart(
164-
const NColumnShard::TColumnShard& shard, const THashMap<ui64, std::vector<std::shared_ptr<TPortionInfo>>>& portions) {
164+
const NColumnShard::TColumnShard& shard, const THashMap<ui64, std::vector<TPortionDataAccessor>>& portions) {
165165
AFL_VERIFY(IsConfirmed());
166166
NYDBTest::TControllers::GetColumnShardController()->OnDataSharingStarted(shard.TabletID(), GetSessionId());
167167
THashMap<TString, THashSet<TUnifiedBlobId>> local;
168168
for (auto&& i : portions) {
169169
for (auto&& p : i.second) {
170-
TPortionDataAccessor(p).FillBlobIdsByStorage(local, shard.GetIndexAs<TColumnEngineForLogs>().GetVersionedIndex());
170+
p.FillBlobIdsByStorage(local, shard.GetIndexAs<TColumnEngineForLogs>().GetVersionedIndex());
171171
}
172172
}
173173
std::swap(CurrentBlobIds, local);
174174
SendCurrentCursorAck(shard, {});
175175
return true;
176176
}
177177

178-
bool TDestinationSession::TryTakePortionBlobs(const TVersionedIndex& vIndex, const TPortionInfo::TConstPtr& portion) {
178+
bool TDestinationSession::TryTakePortionBlobs(const TVersionedIndex& vIndex, const TPortionDataAccessor& portion) {
179179
THashMap<TString, THashSet<TUnifiedBlobId>> blobIds;
180-
TPortionDataAccessor(portion).FillBlobIdsByStorage(blobIds, vIndex);
180+
portion.FillBlobIdsByStorage(blobIds, vIndex);
181181
ui32 containsCounter = 0;
182182
ui32 newCounter = 0;
183183
for (auto&& i : blobIds) {

ydb/core/tx/columnshard/data_sharing/destination/session/destination.h

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -78,7 +78,7 @@ class TDestinationSession: public TCommonSession {
7878
THashMap<TString, THashSet<TUnifiedBlobId>> CurrentBlobIds;
7979

8080
protected:
81-
virtual bool DoStart(const NColumnShard::TColumnShard& shard, const THashMap<ui64, std::vector<std::shared_ptr<TPortionInfo>>>& portions) override;
81+
virtual bool DoStart(const NColumnShard::TColumnShard& shard, const THashMap<ui64, std::vector<TPortionDataAccessor>>& portions) override;
8282
virtual THashSet<ui64> GetPathIdsForStart() const override {
8383
THashSet<ui64> result;
8484
for (auto&& i : PathIds) {
@@ -88,7 +88,7 @@ class TDestinationSession: public TCommonSession {
8888
}
8989

9090
public:
91-
bool TryTakePortionBlobs(const TVersionedIndex& vIndex, const std::shared_ptr<const TPortionInfo>& portion);
91+
bool TryTakePortionBlobs(const TVersionedIndex& vIndex, const TPortionDataAccessor& portion);
9292

9393
TSourceCursorForDestination& GetCursorVerified(const TTabletId& tabletId) {
9494
auto it = Cursors.find(tabletId);

ydb/core/tx/columnshard/data_sharing/destination/transactions/tx_data_from_source.cpp

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -21,7 +21,7 @@ bool TTxDataFromSource::DoExecute(NTabletFlatExecutor::TTransactionContext& txc,
2121
THashMap<TString, THashSet<NBlobCache::TUnifiedBlobId>> sharedBlobIds;
2222
for (auto&& i : PortionsByPathId) {
2323
for (auto&& p : i.second.GetPortions()) {
24-
TPortionDataAccessor(p).SaveToDatabase(dbWrapper, schemaPtr->GetIndexInfo().GetPKFirstColumnId(), false);
24+
p.SaveToDatabase(dbWrapper, schemaPtr->GetIndexInfo().GetPKFirstColumnId(), false);
2525
}
2626
}
2727
NIceDb::TNiceDb db(txc.DB);
@@ -47,7 +47,7 @@ TTxDataFromSource::TTxDataFromSource(NColumnShard::TColumnShard* self, const std
4747
++p;
4848
} else {
4949
i.second.MutablePortions()[p] = std::move(i.second.MutablePortions().back());
50-
i.second.MutablePortions()[p]->ResetShardingVersion();
50+
i.second.MutablePortions()[p].MutablePortionInfo().ResetShardingVersion();
5151
i.second.MutablePortions().pop_back();
5252
}
5353
}

ydb/core/tx/columnshard/data_sharing/source/session/cursor.cpp

Lines changed: 7 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -18,16 +18,16 @@ void TSourceCursor::BuildSelection(const std::shared_ptr<IStoragesManager>& stor
1818
ui32 chunksCount = 0;
1919
bool selectMore = true;
2020
for (; itCurrentPath != PortionsForSend.end() && selectMore; ++itCurrentPath) {
21-
std::vector<TPortionInfo::TPtr> portions;
21+
std::vector<TPortionDataAccessor> portions;
2222
for (; itPortion != itCurrentPath->second.end(); ++itPortion) {
2323
selectMore = (count < 10000 && chunksCount < 1000000);
2424
if (!selectMore) {
2525
NextPathId = itCurrentPath->first;
2626
NextPortionId = itPortion->first;
2727
} else {
2828
portions.emplace_back(itPortion->second);
29-
chunksCount += TPortionDataAccessor(portions.back()).GetRecords().size();
30-
chunksCount += TPortionDataAccessor(portions.back()).GetIndexes().size();
29+
chunksCount += portions.back().GetRecords().size();
30+
chunksCount += portions.back().GetIndexes().size();
3131
++count;
3232
}
3333
}
@@ -158,16 +158,15 @@ void TSourceCursor::SaveToDatabase(NIceDb::TNiceDb& db, const TString& sessionId
158158
}
159159

160160
bool TSourceCursor::Start(const std::shared_ptr<IStoragesManager>& storagesManager,
161-
const THashMap<ui64, std::vector<std::shared_ptr<TPortionInfo>>>& portions, const TVersionedIndex& index) {
161+
const THashMap<ui64, std::vector<TPortionDataAccessor>>& portions, const TVersionedIndex& index) {
162162
AFL_VERIFY(!IsStartedFlag);
163-
std::map<ui64, std::map<ui32, std::shared_ptr<TPortionInfo>>> local;
164-
std::vector<std::shared_ptr<TPortionInfo>> portionsLock;
163+
std::map<ui64, std::map<ui32, TPortionDataAccessor>> local;
165164
NArrow::NHash::NXX64::TStreamStringHashCalcer hashCalcer(0);
166165
for (auto&& i : portions) {
167166
hashCalcer.Start();
168-
std::map<ui32, std::shared_ptr<TPortionInfo>> portionsMap;
167+
std::map<ui32, TPortionDataAccessor> portionsMap;
169168
for (auto&& p : i.second) {
170-
const ui64 portionId = p->GetPortionId();
169+
const ui64 portionId = p.GetPortionInfo().GetPortionId();
171170
hashCalcer.Update((ui8*)&portionId, sizeof(portionId));
172171
AFL_VERIFY(portionsMap.emplace(portionId, p).second);
173172
}

0 commit comments

Comments
 (0)