Skip to content

Commit 5cda1aa

Browse files
Merge 62f23b4 into c92ef24
2 parents c92ef24 + 62f23b4 commit 5cda1aa

File tree

8 files changed

+65
-19
lines changed

8 files changed

+65
-19
lines changed

ydb/core/tx/columnshard/columnshard__write.cpp

Lines changed: 22 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -288,7 +288,7 @@ void TColumnShard::Handle(TEvColumnShard::TEvWrite::TPtr& ev, const TActorContex
288288
writeData.MutableWriteMeta().SetWriteMiddle1StartInstant(TMonotonic::Now());
289289

290290
NOlap::TWritingContext context(TabletID(), SelfId(), snapshotSchema, StoragesManager, Counters.GetIndexationCounters().SplitterCounters,
291-
Counters.GetCSCounters().WritingCounters, GetLastTxSnapshot());
291+
Counters.GetCSCounters().WritingCounters, GetLastTxSnapshot(), std::make_shared<TAtomicCounter>(1));
292292
std::shared_ptr<NConveyor::ITask> task =
293293
std::make_shared<NOlap::TBuildBatchesTask>(BufferizationWriteActorId, std::move(writeData), context);
294294
NConveyor::TInsertServiceOperator::AsyncTaskToExecute(task);
@@ -460,6 +460,15 @@ void TColumnShard::Handle(NEvents::TDataEvents::TEvWrite::TPtr& ev, const TActor
460460
const auto& record = ev->Get()->Record;
461461
const auto source = ev->Sender;
462462
const auto cookie = ev->Cookie;
463+
464+
if (!TablesManager.GetPrimaryIndex()) {
465+
Counters.GetTabletCounters()->IncCounter(COUNTER_WRITE_FAIL);
466+
auto result = NEvents::TDataEvents::TEvWriteResult::BuildError(
467+
TabletID(), 0, NKikimrDataEvents::TEvWriteResult::STATUS_BAD_REQUEST, "schema not ready for writing");
468+
ctx.Send(source, result.release(), 0, cookie);
469+
return;
470+
}
471+
463472
const auto behaviourConclusion = TOperationsManager::GetBehaviour(*ev->Get());
464473
AFL_TRACE(NKikimrServices::TX_COLUMNSHARD_WRITE)("ev_write", record.DebugString());
465474
if (behaviourConclusion.IsFail()) {
@@ -482,6 +491,7 @@ void TColumnShard::Handle(NEvents::TDataEvents::TEvWrite::TPtr& ev, const TActor
482491
ctx.Send(source, result.release(), 0, cookie);
483492
};
484493
if (behaviour == EOperationBehaviour::CommitWriteLock) {
494+
TMemoryProfileGuard mpg1("NEvents::TDataEvents::TEvWrite::1");
485495
auto commitOperation = std::make_shared<TCommitOperation>(TabletID());
486496
auto conclusionParse = commitOperation->Parse(*ev->Get());
487497
if (conclusionParse.IsFail()) {
@@ -513,6 +523,7 @@ void TColumnShard::Handle(NEvents::TDataEvents::TEvWrite::TPtr& ev, const TActor
513523
return;
514524
}
515525

526+
TMemoryProfileGuard mpg2("NEvents::TDataEvents::TEvWrite::2");
516527
if (record.GetOperations().size() != 1) {
517528
Counters.GetTabletCounters()->IncCounter(COUNTER_WRITE_FAIL);
518529
auto result = NEvents::TDataEvents::TEvWriteResult::BuildError(
@@ -521,6 +532,7 @@ void TColumnShard::Handle(NEvents::TDataEvents::TEvWrite::TPtr& ev, const TActor
521532
return;
522533
}
523534

535+
TMemoryProfileGuard mpg3("NEvents::TDataEvents::TEvWrite::3");
524536
const auto& operation = record.GetOperations()[0];
525537
const std::optional<NEvWrite::EModificationType> mType =
526538
TEnumOperator<NEvWrite::EModificationType>::DeserializeFromProto(operation.GetType());
@@ -535,6 +547,7 @@ void TColumnShard::Handle(NEvents::TDataEvents::TEvWrite::TPtr& ev, const TActor
535547
return;
536548
}
537549

550+
TMemoryProfileGuard mpg4("NEvents::TDataEvents::TEvWrite::4");
538551
auto schema = TablesManager.GetPrimaryIndex()->GetVersionedIndex().GetSchemaOptional(operation.GetTableId().GetSchemaVersion());
539552
if (!schema) {
540553
sendError("unknown schema version", NKikimrDataEvents::TEvWriteResult::STATUS_BAD_REQUEST);
@@ -550,12 +563,14 @@ void TColumnShard::Handle(NEvents::TDataEvents::TEvWrite::TPtr& ev, const TActor
550563

551564
Counters.GetColumnTablesCounters()->GetPathIdCounter(pathId)->OnWriteEvent();
552565

566+
TMemoryProfileGuard mpg5("NEvents::TDataEvents::TEvWrite::5");
553567
auto arrowData = std::make_shared<TArrowData>(schema);
554568
if (!arrowData->Parse(operation, NEvWrite::TPayloadReader<NEvents::TDataEvents::TEvWrite>(*ev->Get()))) {
555569
sendError("parsing data error", NKikimrDataEvents::TEvWriteResult::STATUS_BAD_REQUEST);
556570
return;
557571
}
558572

573+
TMemoryProfileGuard mpg5_1("NEvents::TDataEvents::TEvWrite::5_1");
559574
auto overloadStatus = CheckOverloaded(pathId);
560575
if (overloadStatus != EOverloadStatus::None) {
561576
std::unique_ptr<NActors::IEventBase> result = NEvents::TDataEvents::TEvWriteResult::BuildError(
@@ -564,6 +579,9 @@ void TColumnShard::Handle(NEvents::TDataEvents::TEvWrite::TPtr& ev, const TActor
564579
return;
565580
}
566581

582+
TMemoryProfileGuard mpg6("NEvents::TDataEvents::TEvWrite::6");
583+
AFL_WARN(NKikimrServices::TX_COLUMNSHARD_WRITE)("writing_size", arrowData->GetSize())(
584+
"in_flight", Counters.GetWritesMonitor()->GetWritesInFlight())("sizE_in_flight", Counters.GetWritesMonitor()->GetWritesSizeInFlight());
567585
Counters.GetWritesMonitor()->OnStartWrite(arrowData->GetSize());
568586

569587
std::optional<ui32> granuleShardingVersionId;
@@ -578,18 +596,20 @@ void TColumnShard::Handle(NEvents::TDataEvents::TEvWrite::TPtr& ev, const TActor
578596
lockId = record.GetLockTxId();
579597
}
580598

599+
TMemoryProfileGuard mpg7("NEvents::TDataEvents::TEvWrite::7");
581600
if (!AppDataVerified().ColumnShardConfig.GetWritingEnabled()) {
582601
sendError("writing disabled", NKikimrDataEvents::TEvWriteResult::STATUS_OVERLOADED);
583602
return;
584603
}
585604

605+
TMemoryProfileGuard mpg8("NEvents::TDataEvents::TEvWrite::8");
586606
OperationsManager->RegisterLock(lockId, Generation());
587607
auto writeOperation = OperationsManager->RegisterOperation(
588608
pathId, lockId, cookie, granuleShardingVersionId, *mType, AppDataVerified().FeatureFlags.GetEnableWritePortionsOnInsert());
589609
Y_ABORT_UNLESS(writeOperation);
590610
writeOperation->SetBehaviour(behaviour);
591611
NOlap::TWritingContext wContext(pathId, SelfId(), schema, StoragesManager, Counters.GetIndexationCounters().SplitterCounters,
592-
Counters.GetCSCounters().WritingCounters, NOlap::TSnapshot::Max());
612+
Counters.GetCSCounters().WritingCounters, NOlap::TSnapshot::Max(), writeOperation->GetActivityChecker());
593613
arrowData->SetSeparationPoints(GetIndexAs<NOlap::TColumnEngineForLogs>().GetGranulePtrVerified(pathId)->GetBucketPositions());
594614
writeOperation->Start(*this, arrowData, source, wContext);
595615
}

ydb/core/tx/columnshard/engines/changes/cleanup_portions.h

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -68,6 +68,7 @@ class TCleanupPortionsColumnEngineChanges: public TColumnEngineChanges,
6868

6969
// Appends `portion` to PortionsToRemove and also adds it to PortionsToAccess.
void AddPortionToRemove(const TPortionInfo::TConstPtr& portion) {
7070
PortionsToRemove.emplace_back(portion);
71+
PortionsToAccess->AddPortion(portion);
7172
}
7273

7374
virtual ui32 GetWritePortionsCount() const override {

ydb/core/tx/columnshard/operations/batch_builder/builder.cpp

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -20,8 +20,15 @@ void TBuildBatchesTask::ReplyError(const TString& message, const NColumnShard::T
2020
}
2121

2222
TConclusionStatus TBuildBatchesTask::DoExecute(const std::shared_ptr<ITask>& /*taskPtr*/) {
23+
const NActors::TLogContextGuard lGuard = NActors::TLogContextBuilder::Build()("scope", "TBuildBatchesTask::DoExecute");
24+
if (!Context.IsActive()) {
25+
AFL_WARN(NKikimrServices::TX_COLUMNSHARD_WRITE)("event", "abort_external");
26+
ReplyError("writing aborted", NColumnShard::TEvPrivate::TEvWriteBlobsResult::EErrorClass::Internal);
27+
return TConclusionStatus::Fail("writing aborted");
28+
}
2329
TConclusion<std::shared_ptr<arrow::RecordBatch>> batchConclusion = WriteData.GetData()->ExtractBatch();
2430
if (batchConclusion.IsFail()) {
31+
AFL_WARN(NKikimrServices::TX_COLUMNSHARD_WRITE)("event", "abort_on_extract")("reason", batchConclusion.GetErrorMessage());
2532
ReplyError(
2633
"cannot extract incoming batch: " + batchConclusion.GetErrorMessage(), NColumnShard::TEvPrivate::TEvWriteBlobsResult::EErrorClass::Internal);
2734
return TConclusionStatus::Fail("cannot extract incoming batch: " + batchConclusion.GetErrorMessage());
@@ -31,6 +38,7 @@ TConclusionStatus TBuildBatchesTask::DoExecute(const std::shared_ptr<ITask>& /*t
3138
auto preparedConclusion =
3239
Context.GetActualSchema()->PrepareForModification(batchConclusion.DetachResult(), WriteData.GetWriteMeta().GetModificationType());
3340
if (preparedConclusion.IsFail()) {
41+
AFL_WARN(NKikimrServices::TX_COLUMNSHARD_WRITE)("event", "abort_on_prepare")("reason", preparedConclusion.GetErrorMessage());
3442
ReplyError("cannot prepare incoming batch: " + preparedConclusion.GetErrorMessage(),
3543
NColumnShard::TEvPrivate::TEvWriteBlobsResult::EErrorClass::Request);
3644
return TConclusionStatus::Fail("cannot prepare incoming batch: " + preparedConclusion.GetErrorMessage());

ydb/core/tx/columnshard/operations/common/context.h

Lines changed: 9 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -15,19 +15,26 @@ class TWritingContext {
1515
YDB_READONLY_DEF(std::shared_ptr<NColumnShard::TSplitterCounters>, SplitterCounters);
1616
YDB_READONLY_DEF(std::shared_ptr<NColumnShard::TWriteCounters>, WritingCounters);
1717
YDB_READONLY(TSnapshot, ApplyToSnapshot, TSnapshot::Zero());
18+
const std::shared_ptr<const TAtomicCounter> ActivityChecker;
1819

1920
public:
21+
// Returns true while the shared ActivityChecker counter holds a non-zero value.
// ActivityChecker is dereferenced without a null check here.
bool IsActive() const {
22+
return ActivityChecker->Val();
23+
}
24+
2025
TWritingContext(const ui64 tabletId, const NActors::TActorId& tabletActorId, const std::shared_ptr<ISnapshotSchema>& actualSchema,
2126
const std::shared_ptr<IStoragesManager>& operators, const std::shared_ptr<NColumnShard::TSplitterCounters>& splitterCounters,
22-
const std::shared_ptr<NColumnShard::TWriteCounters>& writingCounters, const TSnapshot& applyToSnapshot)
27+
const std::shared_ptr<NColumnShard::TWriteCounters>& writingCounters, const TSnapshot& applyToSnapshot,
28+
const std::shared_ptr<const TAtomicCounter>& activityChecker)
2329
: TabletId(tabletId)
2430
, TabletActorId(tabletActorId)
2531
, ActualSchema(actualSchema)
2632
, StoragesManager(operators)
2733
, SplitterCounters(splitterCounters)
2834
, WritingCounters(writingCounters)
2935
, ApplyToSnapshot(applyToSnapshot)
30-
{
36+
, ActivityChecker(activityChecker) {
37+
AFL_VERIFY(ActivityChecker);
3138
}
3239
};
3340
} // namespace NKikimr::NOlap

ydb/core/tx/columnshard/operations/slice_builder/builder.cpp

Lines changed: 11 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -18,14 +18,14 @@ std::optional<std::vector<NKikimr::NArrow::TSerializedBatch>> TBuildSlicesTask::
1818
context.SetFieldsForSpecialKeys(WriteData.GetPrimaryKeySchema());
1919
auto splitResult = NArrow::SplitByBlobSize(OriginalBatch, context);
2020
if (splitResult.IsFail()) {
21-
AFL_INFO(NKikimrServices::TX_COLUMNSHARD)(
21+
AFL_INFO(NKikimrServices::TX_COLUMNSHARD_WRITE)(
2222
"event", TStringBuilder() << "cannot split batch in according to limits: " + splitResult.GetErrorMessage());
2323
return {};
2424
}
2525
auto result = splitResult.DetachResult();
2626
if (result.size() > 1) {
2727
for (auto&& i : result) {
28-
AFL_DEBUG(NKikimrServices::TX_COLUMNSHARD)("event", "strange_blobs_splitting")("blob", i.DebugString())(
28+
AFL_DEBUG(NKikimrServices::TX_COLUMNSHARD_WRITE)("event", "strange_blobs_splitting")("blob", i.DebugString())(
2929
"original_size", WriteData.GetSize());
3030
}
3131
}
@@ -108,11 +108,15 @@ class TPortionWriteController: public NColumnShard::IWriteController,
108108
};
109109

110110
TConclusionStatus TBuildSlicesTask::DoExecute(const std::shared_ptr<ITask>& /*taskPtr*/) {
111-
NActors::TLogContextGuard g(
112-
NActors::TLogContextBuilder::Build(NKikimrServices::TX_COLUMNSHARD)("tablet_id", TabletId)("parent_id", Context.GetTabletActorId()));
111+
const NActors::TLogContextGuard g = NActors::TLogContextBuilder::Build(NKikimrServices::TX_COLUMNSHARD_WRITE)("tablet_id", TabletId)("parent_id",
112+
Context.GetTabletActorId())("write_id", WriteData.GetWriteMeta().GetWriteId())("table_id", WriteData.GetWriteMeta().GetTableId());
113+
if (!Context.IsActive()) {
114+
AFL_WARN(NKikimrServices::TX_COLUMNSHARD_WRITE)("event", "abort_execution");
115+
ReplyError("execution aborted", NColumnShard::TEvPrivate::TEvWriteBlobsResult::EErrorClass::Internal);
116+
return TConclusionStatus::Fail("execution aborted");
117+
}
113118
if (!OriginalBatch) {
114-
AFL_INFO(NKikimrServices::TX_COLUMNSHARD)("event", "ev_write_bad_data")("write_id", WriteData.GetWriteMeta().GetWriteId())(
115-
"table_id", WriteData.GetWriteMeta().GetTableId());
119+
AFL_WARN(NKikimrServices::TX_COLUMNSHARD_WRITE)("event", "ev_write_bad_data");
116120
ReplyError("no data in batch", NColumnShard::TEvPrivate::TEvWriteBlobsResult::EErrorClass::Internal);
117121
return TConclusionStatus::Fail("no data in batch");
118122
}
@@ -155,7 +159,7 @@ TConclusionStatus TBuildSlicesTask::DoExecute(const std::shared_ptr<ITask>& /*ta
155159
const auto& indexSchema = Context.GetActualSchema()->GetIndexInfo().ArrowSchema();
156160
auto subsetConclusion = NArrow::TColumnOperator().IgnoreOnDifferentFieldTypes().BuildSequentialSubset(OriginalBatch, indexSchema);
157161
if (subsetConclusion.IsFail()) {
158-
AFL_ERROR(NKikimrServices::TX_COLUMNSHARD)("event", "unadaptable schemas")("index", indexSchema.ToString())(
162+
AFL_ERROR(NKikimrServices::TX_COLUMNSHARD_WRITE)("event", "unadaptable schemas")("index", indexSchema.ToString())(
159163
"problem", subsetConclusion.GetErrorMessage());
160164
ReplyError("unadaptable schema: " + subsetConclusion.GetErrorMessage(),
161165
NColumnShard::TEvPrivate::TEvWriteBlobsResult::EErrorClass::Internal);

ydb/core/tx/columnshard/operations/write.cpp

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -128,7 +128,7 @@ void TWriteOperation::FromProto(const NKikimrTxColumnShard::TInternalOperationDa
128128

129129
void TWriteOperation::AbortOnExecute(TColumnShard& owner, NTabletFlatExecutor::TTransactionContext& txc) const {
130130
Y_ABORT_UNLESS(Status != EOperationStatus::Draft);
131-
131+
*Activity = 0;
132132
TBlobGroupSelector dsGroupSelector(owner.Info());
133133
NOlap::TDbWrapper dbTable(txc.DB, &dsGroupSelector);
134134

ydb/core/tx/columnshard/operations/write.h

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -59,6 +59,7 @@ class TWriteOperation: public TMonitoringObjectsCounter<TWriteOperation> {
5959
YDB_READONLY_DEF(std::optional<ui32>, GranuleShardingVersionId);
6060
YDB_READONLY(NEvWrite::EModificationType, ModificationType, NEvWrite::EModificationType::Upsert);
6161
bool WritePortions = false;
62+
const std::shared_ptr<TAtomicCounter> Activity = std::make_shared<TAtomicCounter>();
6263

6364
public:
6465
using TPtr = std::shared_ptr<TWriteOperation>;
@@ -76,6 +77,10 @@ class TWriteOperation: public TMonitoringObjectsCounter<TWriteOperation> {
7677
void AbortOnExecute(TColumnShard& owner, NTabletFlatExecutor::TTransactionContext& txc) const;
7778
void AbortOnComplete(TColumnShard& owner) const;
7879

80+
// Returns the Activity counter as a shared pointer to const, so holders can
// read the counter through it but not modify it.
std::shared_ptr<const TAtomicCounter> GetActivityChecker() const {
81+
return Activity;
82+
}
83+
7984
// Writes "write_id=<WriteId as ui64>;lock_id=<LockId>" to `out`.
void Out(IOutputStream& out) const {
8085
out << "write_id=" << (ui64)WriteId << ";lock_id=" << LockId;
8186
}

ydb/core/tx/columnshard/operations/write_data.cpp

Lines changed: 8 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -7,14 +7,14 @@ namespace NKikimr::NColumnShard {
77

88
bool TArrowData::Parse(const NKikimrDataEvents::TEvWrite_TOperation& proto, const NEvWrite::IPayloadReader& payload) {
99
if (proto.GetPayloadFormat() != NKikimrDataEvents::FORMAT_ARROW) {
10-
AFL_DEBUG(NKikimrServices::TX_COLUMNSHARD)("event", "invalid_payload_format")("payload_format", (ui64)proto.GetPayloadFormat());
10+
AFL_DEBUG(NKikimrServices::TX_COLUMNSHARD_WRITE)("event", "invalid_payload_format")("payload_format", (ui64)proto.GetPayloadFormat());
1111
return false;
1212
}
1313
IncomingData = payload.GetDataFromPayload(proto.GetPayloadIndex());
1414
if (proto.HasType()) {
1515
auto type = TEnumOperator<NEvWrite::EModificationType>::DeserializeFromProto(proto.GetType());
1616
if (!type) {
17-
AFL_DEBUG(NKikimrServices::TX_COLUMNSHARD)("event", "invalid_modification_type")("proto", proto.DebugString());
17+
AFL_DEBUG(NKikimrServices::TX_COLUMNSHARD_WRITE)("event", "invalid_modification_type")("proto", proto.DebugString());
1818
return false;
1919
}
2020
ModificationType = *type;
@@ -49,7 +49,8 @@ TConclusion<std::shared_ptr<arrow::RecordBatch>> TArrowData::ExtractBatch() {
4949
result = NArrow::DeserializeBatch(IncomingData, std::make_shared<arrow::Schema>(BatchSchema->GetSchema()->fields()));
5050
}
5151

52-
IncomingData = "";
52+
TString emptyString;
53+
std::swap(IncomingData, emptyString);
5354
return result;
5455
}
5556

@@ -65,22 +66,22 @@ bool TProtoArrowData::ParseFromProto(const NKikimrTxColumnShard::TEvWrite& proto
6566
if (proto.HasMeta()) {
6667
const auto& incomingDataScheme = proto.GetMeta().GetSchema();
6768
if (incomingDataScheme.empty() || proto.GetMeta().GetFormat() != NKikimrTxColumnShard::FORMAT_ARROW) {
68-
AFL_DEBUG(NKikimrServices::TX_COLUMNSHARD)("event", "invalid_data_format");
69+
AFL_DEBUG(NKikimrServices::TX_COLUMNSHARD_WRITE)("event", "invalid_data_format");
6970
return false;
7071
}
7172
ArrowSchema = NArrow::DeserializeSchema(incomingDataScheme);
7273
if (!ArrowSchema) {
73-
AFL_DEBUG(NKikimrServices::TX_COLUMNSHARD)("event", "cannot_deserialize_data");
74+
AFL_DEBUG(NKikimrServices::TX_COLUMNSHARD_WRITE)("event", "cannot_deserialize_data");
7475
return false;
7576
}
7677
}
7778
OriginalDataSize = IncomingData.size();
7879
if (IncomingData.empty()) {
79-
AFL_DEBUG(NKikimrServices::TX_COLUMNSHARD)("event", "empty_data");
80+
AFL_DEBUG(NKikimrServices::TX_COLUMNSHARD_WRITE)("event", "empty_data");
8081
return false;
8182
}
8283
if (NColumnShard::TLimits::GetMaxBlobSize() < IncomingData.size() && !AppDataVerified().FeatureFlags.GetEnableWritePortionsOnInsert()) {
83-
AFL_DEBUG(NKikimrServices::TX_COLUMNSHARD)("event", "too_big_blob");
84+
AFL_DEBUG(NKikimrServices::TX_COLUMNSHARD_WRITE)("event", "too_big_blob");
8485
return false;
8586
}
8687
return true;

0 commit comments

Comments
 (0)