Skip to content

Commit 2e0cd14

Browse files
Merge c308968 into 8561c6b
2 parents 8561c6b + c308968 commit 2e0cd14

23 files changed

+195
-149
lines changed

ydb/core/tx/columnshard/columnshard__write.cpp

+1-1
Original file line numberDiff line numberDiff line change
@@ -263,7 +263,7 @@ void TColumnShard::Handle(TEvColumnShard::TEvWrite::TPtr& ev, const TActorContex
263263
<< Counters.GetWritesMonitor()->DebugString() << " at tablet " << TabletID());
264264
writeData.MutableWriteMeta().SetWriteMiddle1StartInstant(TMonotonic::Now());
265265
std::shared_ptr<NConveyor::ITask> task = std::make_shared<NOlap::TBuildBatchesTask>(
266-
TabletID(), SelfId(), BufferizationWriteActorId, std::move(writeData), snapshotSchema, GetLastTxSnapshot());
266+
TabletID(), SelfId(), BufferizationWriteActorId, std::move(writeData), snapshotSchema, GetLastTxSnapshot(), Counters.GetCSCounters().WritingCounters);
267267
NConveyor::TInsertServiceOperator::AsyncTaskToExecute(task);
268268
}
269269
}

ydb/core/tx/columnshard/counters/columnshard.cpp

+1
Original file line numberDiff line numberDiff line change
@@ -8,6 +8,7 @@ namespace NKikimr::NColumnShard {
88

99
TCSCounters::TCSCounters()
1010
: TBase("CS")
11+
, WritingCounters(std::make_shared<TWriteCounters>(*this))
1112
, Initialization(*this)
1213
, TxProgress(*this) {
1314
StartBackgroundCount = TBase::GetDeriviative("StartBackground/Count");

ydb/core/tx/columnshard/counters/columnshard.h

+25-6
Original file line numberDiff line numberDiff line change
@@ -21,6 +21,29 @@ enum class EWriteFailReason {
2121
OverlimitReadBlobMemory /* "overlimit_read_blob_memory" */
2222
};
2323

24+
class TWriteCounters: public TCommonCountersOwner {
25+
private:
26+
using TBase = TCommonCountersOwner;
27+
NMonitoring::TDynamicCounters::TCounterPtr VolumeWriteData;
28+
NMonitoring::THistogramPtr HistogramBytesWriteDataCount;
29+
NMonitoring::THistogramPtr HistogramBytesWriteDataBytes;
30+
31+
public:
32+
TWriteCounters(TCommonCountersOwner& owner)
33+
: TBase(owner, "activity", "writing")
34+
{
35+
VolumeWriteData = TBase::GetDeriviative("Write/Incoming/Bytes");
36+
HistogramBytesWriteDataCount = TBase::GetHistogram("Write/Incoming/ByBytes/Count", NMonitoring::ExponentialHistogram(18, 2, 100));
37+
HistogramBytesWriteDataBytes = TBase::GetHistogram("Write/Incoming/ByBytes/Bytes", NMonitoring::ExponentialHistogram(18, 2, 100));
38+
}
39+
40+
void OnIncomingData(const ui64 dataSize) const {
41+
VolumeWriteData->Add(dataSize);
42+
HistogramBytesWriteDataCount->Collect((i64)dataSize, 1);
43+
HistogramBytesWriteDataBytes->Collect((i64)dataSize, dataSize);
44+
}
45+
};
46+
2447
class TCSCounters: public TCommonCountersOwner {
2548
private:
2649
using TBase = TCommonCountersOwner;
@@ -72,7 +95,9 @@ class TCSCounters: public TCommonCountersOwner {
7295
NMonitoring::TDynamicCounters::TCounterPtr WriteRequests;
7396
THashMap<EWriteFailReason, NMonitoring::TDynamicCounters::TCounterPtr> FailedWriteRequests;
7497
NMonitoring::TDynamicCounters::TCounterPtr SuccessWriteRequests;
98+
7599
public:
100+
const std::shared_ptr<TWriteCounters> WritingCounters;
76101
const TCSInitialization Initialization;
77102
TTxProgressCounters TxProgress;
78103

@@ -89,7 +114,6 @@ class TCSCounters: public TCommonCountersOwner {
89114

90115
void OnWritePutBlobsSuccess(const TDuration d) const {
91116
HistogramSuccessWritePutBlobsDurationMs->Collect(d.MilliSeconds());
92-
WritePutBlobsCount->Sub(1);
93117
}
94118

95119
void OnWriteMiddle1PutBlobsSuccess(const TDuration d) const {
@@ -118,11 +142,6 @@ class TCSCounters: public TCommonCountersOwner {
118142

119143
void OnWritePutBlobsFail(const TDuration d) const {
120144
HistogramFailedWritePutBlobsDurationMs->Collect(d.MilliSeconds());
121-
WritePutBlobsCount->Sub(1);
122-
}
123-
124-
void OnWritePutBlobsStart() const {
125-
WritePutBlobsCount->Add(1);
126145
}
127146

128147
void OnWriteTxComplete(const TDuration d) const {

ydb/core/tx/columnshard/engines/column_engine_logs.cpp

+2-4
Original file line numberDiff line numberDiff line change
@@ -361,8 +361,7 @@ std::shared_ptr<TCleanupPortionsColumnEngineChanges> TColumnEngineForLogs::Start
361361
limitExceeded = true;
362362
break;
363363
}
364-
const auto inserted = uniquePortions.emplace(info->GetAddress()).second;
365-
Y_ABORT_UNLESS(inserted);
364+
AFL_VERIFY(uniquePortions.emplace(info->GetAddress()).second);
366365
changes->PortionsToDrop.push_back(*info);
367366
++portionsFromDrop;
368367
}
@@ -381,8 +380,7 @@ std::shared_ptr<TCleanupPortionsColumnEngineChanges> TColumnEngineForLogs::Start
381380
++i;
382381
continue;
383382
}
384-
const auto inserted = uniquePortions.emplace(it->second[i].GetAddress()).second;
385-
if (inserted) {
383+
if (uniquePortions.emplace(it->second[i].GetAddress()).second) {
386384
AFL_VERIFY(it->second[i].CheckForCleanup(snapshot))("p_snapshot", it->second[i].GetRemoveSnapshotOptional())("snapshot", snapshot);
387385
if (txSize + it->second[i].GetTxVolume() < txSizeLimit || changes->PortionsToDrop.empty()) {
388386
txSize += it->second[i].GetTxVolume();

ydb/core/tx/columnshard/engines/insert_table/insert_table.cpp

+5-2
Original file line numberDiff line numberDiff line change
@@ -123,7 +123,7 @@ std::vector<TCommittedBlob> TInsertTable::Read(ui64 pathId, const std::optional<
123123
}
124124

125125
std::vector<TCommittedBlob> result;
126-
result.reserve(pInfo->GetCommitted().size() + pInfo->GetInserted().size());
126+
result.reserve(pInfo->GetCommitted().size() + Summary.GetInserted().size());
127127

128128
for (const auto& data : pInfo->GetCommitted()) {
129129
if (lockId || data.GetSnapshot() <= reqSnapshot) {
@@ -137,7 +137,10 @@ std::vector<TCommittedBlob> TInsertTable::Read(ui64 pathId, const std::optional<
137137
}
138138
}
139139
if (lockId) {
140-
for (const auto& [writeId, data] : pInfo->GetInserted()) {
140+
for (const auto& [writeId, data] : Summary.GetInserted()) {
141+
if (data.GetPathId() != pathId) {
142+
continue;
143+
}
141144
auto start = data.GetMeta().GetFirstPK(pkSchema);
142145
auto finish = data.GetMeta().GetLastPK(pkSchema);
143146
if (pkRangesFilter && pkRangesFilter->IsPortionInPartialUsage(start, finish) == TPKRangeFilter::EUsageClass::DontUsage) {

ydb/core/tx/columnshard/engines/insert_table/insert_table.h

+1-5
Original file line numberDiff line numberDiff line change
@@ -12,10 +12,6 @@ namespace NKikimr::NOlap {
1212
class TPKRangesFilter;
1313
class IDbWrapper;
1414

15-
/// Use one table for inserted and committed blobs:
16-
/// !Commited => {PlanStep, WriteTxId} are {0, WriteId}
17-
/// Commited => {PlanStep, WriteTxId} are {PlanStep, TxId}
18-
1915
class TInsertTableAccessor {
2016
protected:
2117
TInsertionSummary Summary;
@@ -76,7 +72,7 @@ class TInsertTableAccessor {
7672
const THashMap<TInsertWriteId, TInsertedData>& GetAborted() const {
7773
return Summary.GetAborted();
7874
}
79-
const THashMap<TInsertWriteId, TInsertedData>& GetInserted() const {
75+
const TInsertedContainer& GetInserted() const {
8076
return Summary.GetInserted();
8177
}
8278
const TInsertionSummary::TCounters& GetCountersPrepared() const {

ydb/core/tx/columnshard/engines/insert_table/path_info.cpp

-4
Original file line numberDiff line numberDiff line change
@@ -72,8 +72,4 @@ NKikimr::NOlap::TPathInfoIndexPriority TPathInfo::GetIndexationPriority() const
7272
}
7373
}
7474

75-
const THashMap<TInsertWriteId, TInsertedData>& TPathInfo::GetInserted() const {
76-
return Summary->GetInserted();
77-
}
78-
7975
}

ydb/core/tx/columnshard/engines/insert_table/path_info.h

-2
Original file line numberDiff line numberDiff line change
@@ -57,8 +57,6 @@ class TPathInfo: public TMoveOnly {
5757
return Committed.empty() && !InsertedSize;
5858
}
5959

60-
const THashMap<TInsertWriteId, TInsertedData>& GetInserted() const;
61-
6260
void AddInsertedSize(const i64 size, const ui64 overloadLimit);
6361

6462
explicit TPathInfo(TInsertionSummary& summary, const ui64 pathId);

ydb/core/tx/columnshard/engines/insert_table/rt_insertion.cpp

+10-51
Original file line numberDiff line numberDiff line change
@@ -89,37 +89,8 @@ void TInsertionSummary::OnEraseInserted(TPathInfo& pathInfo, const ui64 dataSize
8989
AFL_VERIFY(Counters.Inserted.GetDataSize() == (i64)StatsPrepared.Bytes);
9090
}
9191

92-
THashSet<TInsertWriteId> TInsertionSummary::GetInsertedByPathId(const ui64 pathId) const {
93-
THashSet<TInsertWriteId> result;
94-
for (auto& [writeId, data] : Inserted) {
95-
if (data.GetPathId() == pathId) {
96-
result.insert(writeId);
97-
}
98-
}
99-
100-
return result;
101-
}
102-
10392
THashSet<TInsertWriteId> TInsertionSummary::GetExpiredInsertions(const TInstant timeBorder, const ui64 limit) const {
104-
if (timeBorder < MinInsertedTs) {
105-
return {};
106-
}
107-
108-
THashSet<TInsertWriteId> toAbort;
109-
TInstant newMin = TInstant::Max();
110-
for (auto& [writeId, data] : Inserted) {
111-
const TInstant dataInsertTs = data.GetMeta().GetDirtyWriteTime();
112-
if (data.IsNotAbortable()) {
113-
continue;
114-
}
115-
if (dataInsertTs < timeBorder && toAbort.size() < limit) {
116-
toAbort.insert(writeId);
117-
} else {
118-
newMin = Min(newMin, dataInsertTs);
119-
}
120-
}
121-
MinInsertedTs = (toAbort.size() == Inserted.size()) ? TInstant::Zero() : newMin;
122-
return toAbort;
93+
return Inserted.GetExpired(timeBorder, limit);
12394
}
12495

12596
bool TInsertionSummary::EraseAborted(const TInsertWriteId writeId) {
@@ -173,33 +144,21 @@ const NKikimr::NOlap::TInsertedData* TInsertionSummary::AddAborted(TInsertedData
173144
}
174145

175146
std::optional<NKikimr::NOlap::TInsertedData> TInsertionSummary::ExtractInserted(const TInsertWriteId id) {
176-
auto it = Inserted.find(id);
177-
if (it == Inserted.end()) {
178-
return {};
179-
} else {
180-
auto pathInfo = GetPathInfoOptional(it->second.GetPathId());
147+
auto result = Inserted.ExtractOptional(id);
148+
if (result) {
149+
auto pathInfo = GetPathInfoOptional(result->GetPathId());
181150
if (pathInfo) {
182-
OnEraseInserted(*pathInfo, it->second.BlobSize());
151+
OnEraseInserted(*pathInfo, result->BlobSize());
183152
}
184-
std::optional<TInsertedData> result = std::move(it->second);
185-
Inserted.erase(it);
186-
return result;
187153
}
154+
return result;
188155
}
189156

190157
const NKikimr::NOlap::TInsertedData* TInsertionSummary::AddInserted(TInsertedData&& data, const bool load /*= false*/) {
191-
const TInsertWriteId writeId = data.GetInsertWriteId();
192-
const ui32 dataSize = data.BlobSize();
193-
const ui64 pathId = data.GetPathId();
194-
auto insertInfo = Inserted.emplace(writeId, std::move(data));
195-
AFL_VERIFY_DEBUG(!Aborted.contains(writeId));
196-
if (insertInfo.second) {
197-
OnNewInserted(GetPathInfo(pathId), dataSize, load);
198-
return &insertInfo.first->second;
199-
} else {
200-
Counters.Inserted.SkipAdd(dataSize);
201-
return nullptr;
202-
}
158+
auto* insertInfo = Inserted.AddVerified(std::move(data));
159+
AFL_VERIFY_DEBUG(!Aborted.contains(insertInfo->GetInsertWriteId()));
160+
OnNewInserted(GetPathInfo(insertInfo->GetPathId()), insertInfo->BlobSize(), load);
161+
return insertInfo;
203162
}
204163

205164
}

ydb/core/tx/columnshard/engines/insert_table/rt_insertion.h

+113-8
Original file line numberDiff line numberDiff line change
@@ -8,6 +8,114 @@
88

99
namespace NKikimr::NOlap {
1010
class IBlobsDeclareRemovingAction;
11+
12+
class TInsertedDataInstant {
13+
private:
14+
const TInsertedData* Data;
15+
const TInstant WriteTime;
16+
17+
public:
18+
TInsertedDataInstant(const TInsertedData& data)
19+
: Data(&data)
20+
, WriteTime(Data->GetMeta().GetDirtyWriteTime())
21+
{
22+
23+
}
24+
25+
const TInsertedData& GetData() const {
26+
return *Data;
27+
}
28+
TInstant GetWriteTime() const {
29+
return WriteTime;
30+
}
31+
32+
bool operator<(const TInsertedDataInstant& item) const {
33+
if (WriteTime == item.WriteTime) {
34+
return Data->GetInsertWriteId() < item.Data->GetInsertWriteId();
35+
} else {
36+
return WriteTime < item.WriteTime;
37+
}
38+
}
39+
};
40+
41+
class TInsertedContainer {
42+
private:
43+
THashMap<TInsertWriteId, TInsertedData> Inserted;
44+
std::set<TInsertedDataInstant> InsertedByWriteTime;
45+
46+
public:
47+
size_t size() const {
48+
return Inserted.size();
49+
}
50+
51+
bool contains(const TInsertWriteId id) const {
52+
return Inserted.contains(id);
53+
}
54+
55+
THashMap<TInsertWriteId, TInsertedData>::const_iterator begin() const {
56+
return Inserted.begin();
57+
}
58+
59+
THashMap<TInsertWriteId, TInsertedData>::const_iterator end() const {
60+
return Inserted.end();
61+
}
62+
63+
THashSet<TInsertWriteId> GetExpired(const TInstant timeBorder, const ui64 limit) const {
64+
THashSet<TInsertWriteId> result;
65+
for (auto& data : InsertedByWriteTime) {
66+
if (timeBorder < data.GetWriteTime()) {
67+
break;
68+
}
69+
if (data.GetData().IsNotAbortable()) {
70+
continue;
71+
}
72+
result.emplace(data.GetData().GetInsertWriteId());
73+
if (limit <= result.size()) {
74+
break;
75+
}
76+
}
77+
return result;
78+
}
79+
80+
TInsertedData* AddVerified(TInsertedData&& data) {
81+
const TInsertWriteId writeId = data.GetInsertWriteId();
82+
auto itInsertion = Inserted.emplace(writeId, std::move(data));
83+
AFL_VERIFY(itInsertion.second);
84+
auto* dataPtr = &itInsertion.first->second;
85+
InsertedByWriteTime.emplace(TInsertedDataInstant(*dataPtr));
86+
return dataPtr;
87+
}
88+
89+
const TInsertedData* GetOptional(const TInsertWriteId id) const {
90+
auto it = Inserted.find(id);
91+
if (it == Inserted.end()) {
92+
return nullptr;
93+
} else {
94+
return &it->second;
95+
}
96+
}
97+
98+
TInsertedData* MutableOptional(const TInsertWriteId id) {
99+
auto it = Inserted.find(id);
100+
if (it == Inserted.end()) {
101+
return nullptr;
102+
} else {
103+
return &it->second;
104+
}
105+
}
106+
107+
std::optional<TInsertedData> ExtractOptional(const TInsertWriteId id) {
108+
auto it = Inserted.find(id);
109+
if (it == Inserted.end()) {
110+
return std::nullopt;
111+
}
112+
AFL_VERIFY(InsertedByWriteTime.erase(TInsertedDataInstant(it->second)));
113+
TInsertedData result = std::move(it->second);
114+
Inserted.erase(it);
115+
return result;
116+
}
117+
};
118+
11119
class TInsertionSummary {
12120
public:
13121
struct TCounters {
@@ -22,9 +130,8 @@ class TInsertionSummary {
22130
TCounters StatsCommitted;
23131
const NColumnShard::TInsertTableCounters Counters;
24132

25-
THashMap<TInsertWriteId, TInsertedData> Inserted;
133+
TInsertedContainer Inserted;
26134
THashMap<TInsertWriteId, TInsertedData> Aborted;
27-
mutable TInstant MinInsertedTs = TInstant::Zero();
28135

29136
std::map<TPathInfoIndexPriority, std::set<const TPathInfo*>> Priorities;
30137
THashMap<ui64, TPathInfo> PathInfo;
@@ -57,18 +164,16 @@ class TInsertionSummary {
57164
}
58165

59166
void MarkAsNotAbortable(const TInsertWriteId writeId) {
60-
auto it = Inserted.find(writeId);
61-
if (it == Inserted.end()) {
167+
auto* data = Inserted.MutableOptional(writeId);
168+
if (!data) {
62169
return;
63170
}
64-
it->second.MarkAsNotAbortable();
171+
data->MarkAsNotAbortable();
65172
}
66173

67-
THashSet<TInsertWriteId> GetInsertedByPathId(const ui64 pathId) const;
68-
69174
THashSet<TInsertWriteId> GetExpiredInsertions(const TInstant timeBorder, const ui64 limit) const;
70175

71-
const THashMap<TInsertWriteId, TInsertedData>& GetInserted() const {
176+
const TInsertedContainer& GetInserted() const {
72177
return Inserted;
73178
}
74179
const THashMap<TInsertWriteId, TInsertedData>& GetAborted() const {

ydb/core/tx/columnshard/engines/portions/constructor_meta.cpp

+1-1
Original file line numberDiff line numberDiff line change
@@ -51,7 +51,7 @@ TPortionMeta TPortionMetaConstructor::Build() {
5151

5252
bool TPortionMetaConstructor::LoadMetadata(const NKikimrTxColumnShard::TIndexPortionMeta& portionMeta, const TIndexInfo& indexInfo) {
5353
if (!!Produced) {
54-
AFL_ERROR(NKikimrServices::TX_COLUMNSHARD)("event", "DeserializeFromProto")("error", "parsing duplication");
54+
AFL_DEBUG(NKikimrServices::TX_COLUMNSHARD)("event", "DeserializeFromProto")("error", "parsing duplication");
5555
return true;
5656
}
5757
if (portionMeta.GetTierName()) {

0 commit comments

Comments
 (0)