Skip to content

Commit a831473

Browse files
buffered writing for columnshards
1 parent 0e45751 commit a831473

34 files changed

+638
-234
lines changed

ydb/core/protos/config.proto

Lines changed: 1 addition & 0 deletions
Original file line number | Diff line number | Diff line change
@@ -1422,6 +1422,7 @@ message TColumnShardConfig {
14221422
optional bool IndexationEnabled = 4 [default = true];
14231423
optional bool CompactionEnabled = 5 [default = true];
14241424
optional bool TTLEnabled = 6 [default = true];
1425+
optional uint32 WritingBufferDurationMs = 7 [default = 0];
14251426
}
14261427

14271428
message TSchemeShardConfig {

ydb/core/tx/columnshard/blobs_action/abstract/read.cpp

Lines changed: 2 additions & 2 deletions
Original file line number | Diff line number | Diff line change
@@ -73,9 +73,9 @@ void IBlobsReadingAction::OnReadError(const TBlobRange& range, const TErrorStatu
7373
void IBlobsReadingAction::AddRange(const TBlobRange& range, const TString& result /*= Default<TString>()*/) {
7474
Y_ABORT_UNLESS(!Started);
7575
if (!result) {
76-
Y_ABORT_UNLESS(RangesForRead[range.BlobId].emplace(range).second);
76+
AFL_VERIFY(RangesForRead[range.BlobId].emplace(range).second)("range", range.ToString());
7777
} else {
78-
Y_ABORT_UNLESS(RangesForResult.emplace(range, result).second);
78+
AFL_VERIFY(RangesForResult.emplace(range, result).second)("range", range.ToString());
7979
}
8080
}
8181

ydb/core/tx/columnshard/blobs_action/counters/write.cpp

Lines changed: 2 additions & 0 deletions
Original file line number | Diff line number | Diff line change
@@ -13,6 +13,8 @@ TWriteCounters::TWriteCounters(const TConsumerCounters& owner)
1313
ReplyBytes = TBase::GetDeriviative("Replies/Bytes");
1414
ReplyDurationBySize = TBase::GetHistogram("Replies/Duration/Bytes", NMonitoring::ExponentialHistogram(15, 2, 1));
1515
ReplyDurationByCount = TBase::GetHistogram("Replies/Duration/Count", NMonitoring::ExponentialHistogram(15, 2, 1));
16+
WritesBySize = TBase::GetHistogram("Writes/Bytes", NMonitoring::ExponentialHistogram(25, 2, 1));
17+
VolumeByChunkSize = TBase::GetHistogram("Volume/Bytes", NMonitoring::ExponentialHistogram(25, 2, 1));
1618

1719
FailsCount = TBase::GetDeriviative("Fails/Count");
1820
FailBytes = TBase::GetDeriviative("Fails/Bytes");

ydb/core/tx/columnshard/blobs_action/counters/write.h

Lines changed: 4 additions & 0 deletions
Original file line number | Diff line number | Diff line change
@@ -16,6 +16,8 @@ class TWriteCounters: public NColumnShard::TCommonCountersOwner {
1616
NMonitoring::TDynamicCounters::TCounterPtr ReplyBytes;
1717
NMonitoring::THistogramPtr ReplyDurationByCount;
1818
NMonitoring::THistogramPtr ReplyDurationBySize;
19+
NMonitoring::THistogramPtr WritesBySize;
20+
NMonitoring::THistogramPtr VolumeByChunkSize;
1921

2022
NMonitoring::TDynamicCounters::TCounterPtr FailsCount;
2123
NMonitoring::TDynamicCounters::TCounterPtr FailBytes;
@@ -34,6 +36,8 @@ class TWriteCounters: public NColumnShard::TCommonCountersOwner {
3436
ReplyBytes->Add(bytes);
3537
ReplyDurationByCount->Collect((i64)d.MilliSeconds());
3638
ReplyDurationBySize->Collect((i64)d.MilliSeconds(), (i64)bytes);
39+
WritesBySize->Collect((i64)bytes);
40+
VolumeByChunkSize->Collect((i64)bytes, (i64)bytes);
3741
}
3842

3943
void OnFail(const ui64 bytes, const TDuration d) const {

ydb/core/tx/columnshard/blobs_action/transaction/tx_gc_insert_table.cpp

Lines changed: 1 addition & 3 deletions
Original file line number | Diff line number | Diff line change
@@ -14,9 +14,7 @@ bool TTxInsertTableCleanup::Execute(TTransactionContext& txc, const TActorContex
1414
auto storage = Self->StoragesManager->GetInsertOperator();
1515
BlobsAction = storage->StartDeclareRemovingAction("TX_CLEANUP");
1616
for (auto& [abortedWriteId, abortedData] : allAborted) {
17-
Self->InsertTable->EraseAborted(dbTable, abortedData);
18-
Y_ABORT_UNLESS(abortedData.GetBlobRange().IsFullBlob());
19-
BlobsAction->DeclareRemove(abortedData.GetBlobRange().GetBlobId());
17+
Self->InsertTable->EraseAborted(dbTable, abortedData, BlobsAction);
2018
}
2119
BlobsAction->OnExecuteTxAfterRemoving(*Self, blobManagerDb, true);
2220
return true;
ydb/core/tx/columnshard/blobs_action/transaction/tx_write.cpp

Lines changed: 77 additions & 49 deletions
Original file line number | Diff line number | Diff line change
@@ -1,26 +1,27 @@
11
#include "tx_write.h"
22

33
namespace NKikimr::NColumnShard {
4-
bool TTxWrite::InsertOneBlob(TTransactionContext& txc, const TEvPrivate::TEvWriteBlobsResult::TPutBlobData& blobData, const TWriteId writeId, const TString& blob) {
5-
const NKikimrTxColumnShard::TLogicalMetadata& meta = blobData.GetLogicalMeta();
6-
7-
const auto& blobRange = blobData.GetBlobRange();
4+
bool TTxWrite::InsertOneBlob(TTransactionContext& txc, const NOlap::TWideSerializedBatch& batch, const TWriteId writeId) {
5+
NKikimrTxColumnShard::TLogicalMetadata meta;
6+
meta.SetNumRows(batch->GetRowsCount());
7+
meta.SetRawBytes(batch->GetRawBytes());
8+
meta.SetDirtyWriteTimeSeconds(batch.GetStartInstant().Seconds());
9+
meta.SetSpecialKeysRawData(batch->GetSpecialKeysSafe().SerializeToString());
10+
11+
const auto& blobRange = batch.GetRange();
812
Y_ABORT_UNLESS(blobRange.GetBlobId().IsValid());
913

1014
// First write wins
1115
TBlobGroupSelector dsGroupSelector(Self->Info());
1216
NOlap::TDbWrapper dbTable(txc.DB, &dsGroupSelector);
1317

14-
const auto& writeMeta(PutBlobResult->Get()->GetWriteMeta());
15-
16-
auto tableSchema = Self->TablesManager.GetPrimaryIndex()->GetVersionedIndex().GetSchemaVerified(PutBlobResult->Get()->GetSchemaVersion());
18+
const auto& writeMeta = batch.GetAggregation().GetWriteData()->GetWriteMeta();
19+
auto schemeVersion = batch.GetAggregation().GetWriteData()->GetData()->GetSchemaVersion();
20+
auto tableSchema = Self->TablesManager.GetPrimaryIndex()->GetVersionedIndex().GetSchemaVerified(schemeVersion);
1721

18-
NOlap::TInsertedData insertData((ui64)writeId, writeMeta.GetTableId(), writeMeta.GetDedupId(), blobRange, meta, tableSchema->GetVersion(), blob);
22+
NOlap::TInsertedData insertData((ui64)writeId, writeMeta.GetTableId(), writeMeta.GetDedupId(), blobRange, meta, tableSchema->GetVersion(), batch->GetData());
1923
bool ok = Self->InsertTable->Insert(dbTable, std::move(insertData));
2024
if (ok) {
21-
// Put new data into blob cache
22-
Y_ABORT_UNLESS(blobRange.IsFullBlob());
23-
2425
Self->UpdateInsertTableCounters();
2526
return true;
2627
}
@@ -31,62 +32,89 @@ bool TTxWrite::InsertOneBlob(TTransactionContext& txc, const TEvPrivate::TEvWrit
3132
bool TTxWrite::Execute(TTransactionContext& txc, const TActorContext&) {
3233
NActors::TLogContextGuard logGuard = NActors::TLogContextBuilder::Build(NKikimrServices::TX_COLUMNSHARD)("tablet_id", Self->TabletID())("tx_state", "execute");
3334
ACFL_DEBUG("event", "start_execute");
34-
const auto& writeMeta(PutBlobResult->Get()->GetWriteMeta());
35-
Y_ABORT_UNLESS(Self->TablesManager.IsReadyForWrite(writeMeta.GetTableId()));
36-
37-
txc.DB.NoMoreReadsForTx();
38-
TWriteOperation::TPtr operation;
39-
if (writeMeta.HasLongTxId()) {
40-
AFL_VERIFY(PutBlobResult->Get()->GetBlobData().size() == 1)("count", PutBlobResult->Get()->GetBlobData().size());
41-
} else {
42-
operation = Self->OperationsManager->GetOperation((TWriteId)writeMeta.GetWriteId());
43-
Y_ABORT_UNLESS(operation);
44-
Y_ABORT_UNLESS(operation->GetStatus() == EOperationStatus::Started);
45-
}
35+
const NOlap::TWritingBuffer& buffer = PutBlobResult->Get()->MutableWritesBuffer();
36+
for (auto&& aggr : buffer.GetAggregations()) {
37+
const auto& writeMeta = aggr->GetWriteData()->GetWriteMeta();
38+
Y_ABORT_UNLESS(Self->TablesManager.IsReadyForWrite(writeMeta.GetTableId()));
39+
txc.DB.NoMoreReadsForTx();
40+
TWriteOperation::TPtr operation;
41+
if (writeMeta.HasLongTxId()) {
42+
AFL_VERIFY(aggr->GetSplittedBlobs().size() == 1)("count", aggr->GetSplittedBlobs().size());
43+
} else {
44+
operation = Self->OperationsManager->GetOperation((TWriteId)writeMeta.GetWriteId());
45+
Y_ABORT_UNLESS(operation);
46+
Y_ABORT_UNLESS(operation->GetStatus() == EOperationStatus::Started);
47+
}
4648

47-
TVector<TWriteId> writeIds;
48-
for (auto blobData : PutBlobResult->Get()->GetBlobData()) {
4949
auto writeId = TWriteId(writeMeta.GetWriteId());
50-
if (operation) {
51-
writeId = Self->BuildNextWriteId(txc);
52-
} else {
50+
if (!operation) {
5351
NIceDb::TNiceDb db(txc.DB);
5452
writeId = Self->GetLongTxWrite(db, writeMeta.GetLongTxIdUnsafe(), writeMeta.GetWritePartId());
53+
aggr->AddWriteId(writeId);
5554
}
5655

57-
if (!InsertOneBlob(txc, blobData, writeId, PutBlobResult->Get()->GetBlobVerified(blobData.GetBlobRange()))) {
58-
LOG_S_DEBUG(TxPrefix() << "duplicate writeId " << (ui64)writeId << TxSuffix());
59-
Self->IncCounter(COUNTER_WRITE_DUPLICATE);
56+
for (auto&& i : aggr->GetSplittedBlobs()) {
57+
if (operation) {
58+
writeId = Self->BuildNextWriteId(txc);
59+
aggr->AddWriteId(writeId);
60+
}
61+
62+
if (!InsertOneBlob(txc, i, writeId)) {
63+
LOG_S_DEBUG(TxPrefix() << "duplicate writeId " << (ui64)writeId << TxSuffix());
64+
Self->IncCounter(COUNTER_WRITE_DUPLICATE);
65+
}
6066
}
61-
writeIds.push_back(writeId);
6267
}
6368

6469
TBlobManagerDb blobManagerDb(txc.DB);
65-
AFL_VERIFY(PutBlobResult->Get()->GetActions().size() == 1);
66-
AFL_VERIFY(PutBlobResult->Get()->GetActions().front()->GetBlobsCount() == PutBlobResult->Get()->GetBlobData().size());
67-
for (auto&& i : PutBlobResult->Get()->GetActions()) {
70+
AFL_VERIFY(buffer.GetAddActions().size() == 1);
71+
for (auto&& i : buffer.GetAddActions()) {
6872
i->OnExecuteTxAfterWrite(*Self, blobManagerDb, true);
6973
}
70-
71-
if (operation) {
72-
operation->OnWriteFinish(txc, writeIds);
73-
auto txInfo = Self->ProgressTxController->RegisterTxWithDeadline(operation->GetTxId(), NKikimrTxColumnShard::TX_KIND_COMMIT_WRITE, "", writeMeta.GetSource(), 0, txc);
74-
Y_UNUSED(txInfo);
75-
NEvents::TDataEvents::TCoordinatorInfo tInfo = Self->ProgressTxController->GetCoordinatorInfo(operation->GetTxId());
76-
Result = NEvents::TDataEvents::TEvWriteResult::BuildPrepared(Self->TabletID(), operation->GetTxId(), tInfo);
77-
} else {
78-
Y_ABORT_UNLESS(writeIds.size() == 1);
79-
Result = std::make_unique<TEvColumnShard::TEvWriteResult>(Self->TabletID(), writeMeta, (ui64)writeIds.front(), NKikimrTxColumnShard::EResultStatus::SUCCESS);
74+
for (auto&& i : buffer.GetRemoveActions()) {
75+
i->OnExecuteTxAfterRemoving(*Self, blobManagerDb, true);
76+
}
77+
for (auto&& aggr : buffer.GetAggregations()) {
78+
const auto& writeMeta = aggr->GetWriteData()->GetWriteMeta();
79+
std::unique_ptr<TEvColumnShard::TEvWriteResult> result;
80+
TWriteOperation::TPtr operation;
81+
if (!writeMeta.HasLongTxId()) {
82+
operation = Self->OperationsManager->GetOperation((TWriteId)writeMeta.GetWriteId());
83+
Y_ABORT_UNLESS(operation);
84+
Y_ABORT_UNLESS(operation->GetStatus() == EOperationStatus::Started);
85+
}
86+
if (operation) {
87+
operation->OnWriteFinish(txc, aggr->GetWriteIds());
88+
auto txInfo = Self->ProgressTxController->RegisterTxWithDeadline(operation->GetTxId(), NKikimrTxColumnShard::TX_KIND_COMMIT_WRITE, "", writeMeta.GetSource(), 0, txc);
89+
Y_UNUSED(txInfo);
90+
NEvents::TDataEvents::TCoordinatorInfo tInfo = Self->ProgressTxController->GetCoordinatorInfo(operation->GetTxId());
91+
Results.emplace_back(NEvents::TDataEvents::TEvWriteResult::BuildPrepared(Self->TabletID(), operation->GetTxId(), tInfo));
92+
} else {
93+
Y_ABORT_UNLESS(aggr->GetWriteIds().size() == 1);
94+
Results.emplace_back(std::make_unique<TEvColumnShard::TEvWriteResult>(Self->TabletID(), writeMeta, (ui64)aggr->GetWriteIds().front(), NKikimrTxColumnShard::EResultStatus::SUCCESS));
95+
}
8096
}
8197
return true;
8298
}
8399

84100
void TTxWrite::Complete(const TActorContext& ctx) {
85101
NActors::TLogContextGuard logGuard = NActors::TLogContextBuilder::Build(NKikimrServices::TX_COLUMNSHARD)("tablet_id", Self->TabletID())("tx_state", "complete");
86-
AFL_VERIFY(Result);
87-
Self->CSCounters.OnWriteTxComplete((TMonotonic::Now() - PutBlobResult->Get()->GetWriteMeta().GetWriteStartInstant()).MilliSeconds());
88-
Self->CSCounters.OnSuccessWriteResponse();
89-
ctx.Send(PutBlobResult->Get()->GetWriteMeta().GetSource(), Result.release());
102+
const auto now = TMonotonic::Now();
103+
const NOlap::TWritingBuffer& buffer = PutBlobResult->Get()->MutableWritesBuffer();
104+
for (auto&& i : buffer.GetAddActions()) {
105+
i->OnCompleteTxAfterWrite(*Self);
106+
}
107+
for (auto&& i : buffer.GetRemoveActions()) {
108+
i->OnCompleteTxAfterRemoving(*Self);
109+
}
110+
AFL_VERIFY(buffer.GetAggregations().size() == Results.size());
111+
for (ui32 i = 0; i < buffer.GetAggregations().size(); ++i) {
112+
const auto& writeMeta = buffer.GetAggregations()[i]->GetWriteData()->GetWriteMeta();
113+
ctx.Send(writeMeta.GetSource(), Results[i].release());
114+
Self->CSCounters.OnWriteTxComplete((now - writeMeta.GetWriteStartInstant()).MilliSeconds());
115+
Self->CSCounters.OnSuccessWriteResponse();
116+
}
117+
90118
}
91119

92120
}

ydb/core/tx/columnshard/blobs_action/transaction/tx_write.h

Lines changed: 3 additions & 2 deletions
Original file line number | Diff line number | Diff line change
@@ -1,5 +1,6 @@
11
#pragma once
22
#include <ydb/core/tx/columnshard/columnshard_impl.h>
3+
#include <ydb/core/tx/columnshard/engines/writer/indexed_blob_constructor.h>
34

45
namespace NKikimr::NColumnShard {
56

@@ -15,12 +16,12 @@ class TTxWrite : public TTransactionBase<TColumnShard> {
1516
void Complete(const TActorContext& ctx) override;
1617
TTxType GetTxType() const override { return TXTYPE_WRITE; }
1718

18-
bool InsertOneBlob(TTransactionContext& txc, const TEvPrivate::TEvWriteBlobsResult::TPutBlobData& blobData, const TWriteId writeId, const TString& blob);
19+
bool InsertOneBlob(TTransactionContext& txc, const NOlap::TWideSerializedBatch& batch, const TWriteId writeId);
1920

2021
private:
2122
TEvPrivate::TEvWriteBlobsResult::TPtr PutBlobResult;
2223
const ui32 TabletTxNo;
23-
std::unique_ptr<NActors::IEventBase> Result;
24+
std::vector<std::unique_ptr<NActors::IEventBase>> Results;
2425

2526
TStringBuilder TxPrefix() const {
2627
return TStringBuilder() << "TxWrite[" << ToString(TabletTxNo) << "] ";

ydb/core/tx/columnshard/blobs_action/transaction/tx_write_index.cpp

Lines changed: 0 additions & 1 deletion
Original file line number | Diff line number | Diff line change
@@ -64,7 +64,6 @@ void TTxWriteIndex::Complete(const TActorContext& ctx) {
6464
Self->EnqueueBackgroundActivities(false, TriggerActivity);
6565
}
6666

67-
Self->UpdateResourceMetrics(ctx, Ev->Get()->PutResult->GetResourceUsage());
6867
changes->MutableBlobsAction().OnCompleteTxAfterAction(*Self);
6968
NYDBTest::TControllers::GetColumnShardController()->OnWriteIndexComplete(Self->TabletID(), changes->TypeString());
7069
}

0 commit comments

Comments
 (0)