skip long tx writing to in control locks #8924

Merged
91 changes: 85 additions & 6 deletions ydb/core/formats/arrow/process_columns.cpp
@@ -9,17 +9,44 @@
namespace NKikimr::NArrow {

namespace {
template <class TDataContainer, class TStringImpl>

template <class T>
class TColumnNameAccessor {
public:
static const std::string& GetFieldName(const T& val) {
return val;
}
static TString DebugString(const std::vector<T>& items) {
return JoinSeq(",", items);
}
};

template <>
class TColumnNameAccessor<std::shared_ptr<arrow::Field>> {
public:
static const std::string& GetFieldName(const std::shared_ptr<arrow::Field>& val) {
return val->name();
}
static TString DebugString(const std::vector<std::shared_ptr<arrow::Field>>& items) {
TStringBuilder sb;
for (auto&& i : items) {
sb << i->name() << ",";
}
return sb;
}
};

template <class TDataContainer, class TStringContainer>
std::shared_ptr<TDataContainer> ExtractColumnsValidateImpl(
const std::shared_ptr<TDataContainer>& srcBatch, const std::vector<TStringImpl>& columnNames) {
const std::shared_ptr<TDataContainer>& srcBatch, const std::vector<TStringContainer>& columnNames) {
std::vector<std::shared_ptr<arrow::Field>> fields;
fields.reserve(columnNames.size());
std::vector<std::shared_ptr<typename NAdapter::TDataBuilderPolicy<TDataContainer>::TColumn>> columns;
columns.reserve(columnNames.size());

auto srcSchema = srcBatch->schema();
for (auto& name : columnNames) {
const int pos = srcSchema->GetFieldIndex(name);
const int pos = srcSchema->GetFieldIndex(TColumnNameAccessor<TStringContainer>::GetFieldName(name));
if (Y_LIKELY(pos > -1)) {
fields.push_back(srcSchema->field(pos));
columns.push_back(srcBatch->column(pos));
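
Note: the TColumnNameAccessor trait introduced above lets ExtractColumnsValidateImpl and ExtractImpl resolve a column name from either a plain string-like key or a std::shared_ptr<arrow::Field>, which is what allows the new Field-based Extract overloads further down to reuse the existing code path. A minimal standalone sketch of the same trait idea, using illustrative names (Field, NameAccessor, PrintNames) rather than the YDB types:

#include <iostream>
#include <memory>
#include <string>
#include <vector>

struct Field { std::string name; };   // stand-in for arrow::Field

template <class T>
struct NameAccessor {
    // Generic case: the key is already a string-like column name.
    static const std::string& Get(const T& v) { return v; }
};

template <>
struct NameAccessor<std::shared_ptr<Field>> {
    // Specialization: the key is a field pointer, so the name is read off the field.
    static const std::string& Get(const std::shared_ptr<Field>& v) { return v->name; }
};

template <class TKey>
void PrintNames(const std::vector<TKey>& keys) {
    for (const auto& k : keys) {
        std::cout << NameAccessor<TKey>::Get(k) << "\n";   // one call site for both key kinds
    }
}

int main() {
    PrintNames(std::vector<std::string>{"a", "b"});
    PrintNames(std::vector<std::shared_ptr<Field>>{std::make_shared<Field>(Field{"c"})});
}
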
@@ -70,16 +97,16 @@ TConclusion<std::shared_ptr<TDataContainer>> AdaptColumnsImpl(
return NAdapter::TDataBuilderPolicy<TDataContainer>::Build(std::make_shared<arrow::Schema>(fields), std::move(columns), srcBatch->num_rows());
}

template <class TDataContainer, class TStringType>
template <class TDataContainer, class TStringContainer>
std::shared_ptr<TDataContainer> ExtractImpl(const TColumnOperator::EExtractProblemsPolicy& policy,
const std::shared_ptr<TDataContainer>& incoming, const std::vector<TStringType>& columnNames) {
const std::shared_ptr<TDataContainer>& incoming, const std::vector<TStringContainer>& columnNames) {
AFL_VERIFY(incoming);
AFL_VERIFY(columnNames.size());
auto result = ExtractColumnsValidateImpl(incoming, columnNames);
switch (policy) {
case TColumnOperator::EExtractProblemsPolicy::Verify:
AFL_VERIFY((ui32)result->num_columns() == columnNames.size())("schema", incoming->schema()->ToString())(
"required", JoinSeq(",", columnNames));
"required", TColumnNameAccessor<TStringContainer>::DebugString(columnNames));
break;
case TColumnOperator::EExtractProblemsPolicy::Null:
if ((ui32)result->num_columns() != columnNames.size()) {
@@ -123,6 +150,16 @@ std::shared_ptr<arrow::Table> TColumnOperator::Extract(
return ExtractImpl(AbsentColumnPolicy, incoming, columnNames);
}

std::shared_ptr<arrow::Table> TColumnOperator::Extract(
const std::shared_ptr<arrow::Table>& incoming, const std::vector<std::shared_ptr<arrow::Field>>& columns) {
return ExtractImpl(AbsentColumnPolicy, incoming, columns);
}

std::shared_ptr<arrow::RecordBatch> TColumnOperator::Extract(
const std::shared_ptr<arrow::RecordBatch>& incoming, const std::vector<std::shared_ptr<arrow::Field>>& columns) {
return ExtractImpl(AbsentColumnPolicy, incoming, columns);
}

std::shared_ptr<arrow::RecordBatch> TColumnOperator::Extract(
const std::shared_ptr<arrow::RecordBatch>& incoming, const std::vector<TString>& columnNames) {
return ExtractImpl(AbsentColumnPolicy, incoming, columnNames);
@@ -171,5 +208,47 @@ NKikimr::TConclusion<std::shared_ptr<arrow::Table>> TColumnOperator::Reorder(
const std::shared_ptr<arrow::Table>& incoming, const std::vector<TString>& columnNames) {
return ReorderImpl(incoming, columnNames);
}
namespace {
template <class TDataContainer, class TSchemaImpl>
TConclusion<TSchemaSubset> BuildSequentialSubsetImpl(
const std::shared_ptr<TDataContainer>& srcBatch, const std::shared_ptr<TSchemaImpl>& dstSchema) {
AFL_VERIFY(srcBatch);
AFL_VERIFY(dstSchema);
if (dstSchema->num_fields() < srcBatch->schema()->num_fields()) {
AFL_ERROR(NKikimrServices::ARROW_HELPER)("event", "incorrect columns set: destination must been wider than source")(
"source", srcBatch->schema()->ToString())("destination", dstSchema->ToString());
return TConclusionStatus::Fail("incorrect columns set: destination must been wider than source");
}
std::set<ui32> fieldIdx;
auto itSrc = srcBatch->schema()->fields().begin();
auto itDst = dstSchema->fields().begin();
while (itSrc != srcBatch->schema()->fields().end() && itDst != dstSchema->fields().end()) {
if ((*itSrc)->name() != (*itDst)->name()) {
++itDst;
} else {
fieldIdx.emplace(itDst - dstSchema->fields().begin());
if (!(*itDst)->Equals(*itSrc)) {
AFL_ERROR(NKikimrServices::ARROW_HELPER)("event", "cannot_use_incoming_batch")("reason", "invalid_column_type")(
"column_type", (*itDst)->ToString(true))("incoming_type", (*itSrc)->ToString(true));
return TConclusionStatus::Fail("incompatible column types");
}

++itDst;
++itSrc;
}
}
if (itDst == dstSchema->fields().end() && itSrc != srcBatch->schema()->fields().end()) {
AFL_ERROR(NKikimrServices::ARROW_HELPER)("event", "incorrect columns order in source set")("source", srcBatch->schema()->ToString())(
"destination", dstSchema->ToString());
return TConclusionStatus::Fail("incorrect columns order in source set");
}
return TSchemaSubset(fieldIdx, dstSchema->num_fields());
}
} // namespace

TConclusion<TSchemaSubset> TColumnOperator::BuildSequentialSubset(
const std::shared_ptr<arrow::RecordBatch>& incoming, const std::shared_ptr<NArrow::TSchemaLite>& dstSchema) {
return BuildSequentialSubsetImpl(incoming, dstSchema);
}

} // namespace NKikimr::NArrow
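
Note: the new BuildSequentialSubsetImpl walks the source and destination schemas with two cursors: destination columns absent from the source are skipped, a matched column is recorded by its destination index and must have an equal type, and the whole source must be consumed in order, otherwise the call fails. A minimal sketch of the same two-cursor matching over plain name vectors, using an illustrative ComputeSubset helper and omitting the type check:

#include <iostream>
#include <optional>
#include <set>
#include <string>
#include <vector>

// Returns the destination indexes matched by the source, or std::nullopt if the source
// names are not an ordered subsequence of the destination names.
std::optional<std::set<size_t>> ComputeSubset(const std::vector<std::string>& src,
                                              const std::vector<std::string>& dst) {
    if (dst.size() < src.size()) {
        return std::nullopt;                       // destination must be at least as wide as the source
    }
    std::set<size_t> idx;
    size_t s = 0, d = 0;
    while (s < src.size() && d < dst.size()) {
        if (src[s] != dst[d]) {
            ++d;                                   // destination column missing from the source: skip it
        } else {
            idx.insert(d);                         // record the matched destination position
            ++s;
            ++d;
        }
    }
    if (s != src.size()) {
        return std::nullopt;                       // source columns out of order or absent in destination
    }
    return idx;
}

int main() {
    auto ok  = ComputeSubset({"a", "c"}, {"a", "b", "c"});   // -> {0, 2}
    auto bad = ComputeSubset({"c", "a"}, {"a", "b", "c"});   // -> nullopt (wrong order)
    std::cout << (ok ? "matched" : "no match") << " / " << (bad ? "matched" : "no match") << "\n";
}
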
7 changes: 7 additions & 0 deletions ydb/core/formats/arrow/process_columns.h
@@ -38,9 +38,16 @@ class TColumnOperator {
std::shared_ptr<arrow::RecordBatch> Extract(
const std::shared_ptr<arrow::RecordBatch>& incoming, const std::vector<std::string>& columnNames);
std::shared_ptr<arrow::Table> Extract(const std::shared_ptr<arrow::Table>& incoming, const std::vector<std::string>& columnNames);
std::shared_ptr<arrow::Table> Extract(
const std::shared_ptr<arrow::Table>& incoming, const std::vector<std::shared_ptr<arrow::Field>>& columns);
std::shared_ptr<arrow::RecordBatch> Extract(
const std::shared_ptr<arrow::RecordBatch>& incoming, const std::vector<std::shared_ptr<arrow::Field>>& columns);
std::shared_ptr<arrow::RecordBatch> Extract(const std::shared_ptr<arrow::RecordBatch>& incoming, const std::vector<TString>& columnNames);
std::shared_ptr<arrow::Table> Extract(const std::shared_ptr<arrow::Table>& incoming, const std::vector<TString>& columnNames);

TConclusion<TSchemaSubset> BuildSequentialSubset(
const std::shared_ptr<arrow::RecordBatch>& incoming, const std::shared_ptr<NArrow::TSchemaLite>& dstSchema);

TConclusion<std::shared_ptr<arrow::RecordBatch>> Adapt(
const std::shared_ptr<arrow::RecordBatch>& incoming, const std::shared_ptr<arrow::Schema>& dstSchema, TSchemaSubset* subset = nullptr);
TConclusion<std::shared_ptr<arrow::Table>> Adapt(
2 changes: 1 addition & 1 deletion ydb/core/formats/arrow/size_calcer.cpp
@@ -246,7 +246,7 @@ NKikimr::NArrow::TSerializedBatch TSerializedBatch::Build(std::shared_ptr<arrow:
if (context.GetFieldsForSpecialKeys().size()) {
specialKeys = TFirstLastSpecialKeys(batch, context.GetFieldsForSpecialKeys()).SerializeToString();
}
return TSerializedBatch(NArrow::SerializeSchema(*batch->schema()), NArrow::SerializeBatchNoCompression(batch), batch->num_rows(),
return TSerializedBatch(NArrow::SerializeSchema(*batch->schema()), NArrow::SerializeBatchNoCompression(batch), batch->num_rows(),
NArrow::GetBatchDataSize(batch), specialKeys);
}

31 changes: 15 additions & 16 deletions ydb/core/tx/columnshard/columnshard__write.cpp
@@ -19,36 +19,36 @@ namespace NKikimr::NColumnShard {

using namespace NTabletFlatExecutor;

void TColumnShard::OverloadWriteFail(const EOverloadStatus overloadReason, const NEvWrite::TWriteData& writeData, const ui64 cookie,
void TColumnShard::OverloadWriteFail(const EOverloadStatus overloadReason, const NEvWrite::TWriteMeta& writeMeta, const ui64 writeSize, const ui64 cookie,
std::unique_ptr<NActors::IEventBase>&& event, const TActorContext& ctx) {
Counters.GetTabletCounters()->IncCounter(COUNTER_WRITE_FAIL);
switch (overloadReason) {
case EOverloadStatus::Disk:
Counters.OnWriteOverloadDisk();
break;
case EOverloadStatus::InsertTable:
Counters.OnWriteOverloadInsertTable(writeData.GetSize());
Counters.OnWriteOverloadInsertTable(writeSize);
break;
case EOverloadStatus::OverloadMetadata:
Counters.OnWriteOverloadMetadata(writeData.GetSize());
Counters.OnWriteOverloadMetadata(writeSize);
break;
case EOverloadStatus::ShardTxInFly:
Counters.OnWriteOverloadShardTx(writeData.GetSize());
Counters.OnWriteOverloadShardTx(writeSize);
break;
case EOverloadStatus::ShardWritesInFly:
Counters.OnWriteOverloadShardWrites(writeData.GetSize());
Counters.OnWriteOverloadShardWrites(writeSize);
break;
case EOverloadStatus::ShardWritesSizeInFly:
Counters.OnWriteOverloadShardWritesSize(writeData.GetSize());
Counters.OnWriteOverloadShardWritesSize(writeSize);
break;
case EOverloadStatus::None:
Y_ABORT("invalid function usage");
}

AFL_TRACE(NKikimrServices::TX_COLUMNSHARD)("event", "write_overload")("size", writeData.GetSize())(
"path_id", writeData.GetWriteMeta().GetTableId())("reason", overloadReason);
AFL_TRACE(NKikimrServices::TX_COLUMNSHARD)("event", "write_overload")("size", writeSize)("path_id", writeMeta.GetTableId())(
"reason", overloadReason);

ctx.Send(writeData.GetWriteMeta().GetSource(), event.release(), 0, cookie);
ctx.Send(writeMeta.GetSource(), event.release(), 0, cookie);
}

TColumnShard::EOverloadStatus TColumnShard::CheckOverloaded(const ui64 tableId) const {
@@ -240,7 +240,7 @@ void TColumnShard::Handle(TEvColumnShard::TEvWrite::TPtr& ev, const TActorContex
if (overloadStatus != EOverloadStatus::None) {
std::unique_ptr<NActors::IEventBase> result = std::make_unique<TEvColumnShard::TEvWriteResult>(
TabletID(), writeData.GetWriteMeta(), NKikimrTxColumnShard::EResultStatus::OVERLOADED);
OverloadWriteFail(overloadStatus, writeData, cookie, std::move(result), ctx);
OverloadWriteFail(overloadStatus, writeData.GetWriteMeta(), writeData.GetSize(), cookie, std::move(result), ctx);
Counters.GetCSCounters().OnFailedWriteResponse(EWriteFailReason::Overload);
} else {
if (ui64 writeId = (ui64)HasLongTxWrite(writeMeta.GetLongTxIdUnsafe(), writeMeta.GetWritePartId())) {
@@ -538,10 +538,9 @@ void TColumnShard::Handle(NEvents::TDataEvents::TEvWrite::TPtr& ev, const TActor

auto overloadStatus = CheckOverloaded(tableId);
if (overloadStatus != EOverloadStatus::None) {
NEvWrite::TWriteData writeData(NEvWrite::TWriteMeta(0, tableId, source, {}), arrowData, nullptr, nullptr);
std::unique_ptr<NActors::IEventBase> result = NEvents::TDataEvents::TEvWriteResult::BuildError(
TabletID(), 0, NKikimrDataEvents::TEvWriteResult::STATUS_OVERLOADED, "overload data error");
OverloadWriteFail(overloadStatus, writeData, cookie, std::move(result), ctx);
OverloadWriteFail(overloadStatus, NEvWrite::TWriteMeta(0, tableId, source, {}), arrowData->GetSize(), cookie, std::move(result), ctx);
return;
}

@@ -554,11 +553,11 @@ void TColumnShard::Handle(NEvents::TDataEvents::TEvWrite::TPtr& ev, const TActor

ui64 lockId = 0;
if (behaviour == EOperationBehaviour::NoTxWrite) {
static TAtomicCounter Counter = 0;
const ui64 shift = (ui64)1 << 47;
lockId = shift + Counter.Inc();
lockId = BuildEphemeralTxId();
} else if (behaviour == EOperationBehaviour::InTxWrite) {
lockId = record.GetTxId();
} else {
lockId = (behaviour == EOperationBehaviour::InTxWrite) ? record.GetTxId() : record.GetLockTxId();
lockId = record.GetLockTxId();
}

OperationsManager->RegisterLock(lockId, Generation());
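
Note: in the NoTxWrite branch above, the lock id now comes from the new BuildEphemeralTxId() helper (added to columnshard_impl.h below) instead of a function-local static counter; the helper ORs a 1-based counter onto a 1 << 47 offset, presumably to keep such generated ids apart from ordinary transaction ids. A rough standalone equivalent, with std::atomic standing in for TAtomicCounter and an illustrative name:

#include <atomic>
#include <cstdint>

// Sketch only: TAtomicCounter replaced by std::atomic, BuildEphemeralId is an illustrative name.
uint64_t BuildEphemeralId() {
    static std::atomic<uint64_t> counter{0};
    static constexpr uint64_t kShift = uint64_t(1) << 47;   // high offset, above ordinary tx ids
    return kShift | (counter.fetch_add(1) + 1);             // 1-based counter OR-ed onto the offset
}
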
11 changes: 10 additions & 1 deletion ydb/core/tx/columnshard/columnshard_impl.h
@@ -286,6 +286,12 @@ class TColumnShard
void OnTieringModified(const std::optional<ui64> pathId = {});

public:
ui64 BuildEphemeralTxId() {
static TAtomicCounter Counter = 0;
static constexpr ui64 shift = (ui64)1 << 47;
return shift | Counter.Inc();
}

enum class EOverloadStatus {
ShardTxInFly /* "shard_tx" */,
ShardWritesInFly /* "shard_writes" */,
@@ -320,7 +326,7 @@ class TColumnShard
}

private:
void OverloadWriteFail(const EOverloadStatus overloadReason, const NEvWrite::TWriteData& writeData, const ui64 cookie, std::unique_ptr<NActors::IEventBase>&& event, const TActorContext& ctx);
void OverloadWriteFail(const EOverloadStatus overloadReason, const NEvWrite::TWriteMeta& writeMeta, const ui64 writeSize, const ui64 cookie, std::unique_ptr<NActors::IEventBase>&& event, const TActorContext& ctx);
EOverloadStatus CheckOverloaded(const ui64 tableId) const;

protected:
@@ -534,6 +540,9 @@ class TColumnShard
public:
ui64 TabletTxCounter = 0;

bool HasLongTxWrites(const TInsertWriteId insertWriteId) const {
return LongTxWrites.contains(insertWriteId);
}
void EnqueueProgressTx(const TActorContext& ctx, const std::optional<ui64> continueTxId);
NOlap::TSnapshot GetLastTxSnapshot() const {
return NOlap::TSnapshot(LastPlannedStep, LastPlannedTxId);
16 changes: 0 additions & 16 deletions ydb/core/tx/columnshard/engines/portions/constructor.cpp
@@ -67,22 +67,6 @@ void TPortionInfoConstructor::LoadIndex(const TIndexChunkLoadContext& loadContex

const NKikimr::NOlap::TColumnRecord& TPortionInfoConstructor::AppendOneChunkColumn(TColumnRecord&& record) {
Y_ABORT_UNLESS(record.ColumnId);
std::optional<ui32> maxChunk;
for (auto&& i : Records) {
if (i.ColumnId == record.ColumnId) {
if (!maxChunk) {
maxChunk = i.Chunk;
} else {
Y_ABORT_UNLESS(*maxChunk + 1 == i.Chunk);
maxChunk = i.Chunk;
}
}
}
if (maxChunk) {
AFL_VERIFY(*maxChunk + 1 == record.Chunk)("max", *maxChunk)("record", record.Chunk);
} else {
AFL_VERIFY(0 == record.Chunk)("record", record.Chunk);
}
Records.emplace_back(std::move(record));
return Records.back();
}
@@ -35,8 +35,11 @@ TConclusionStatus TReadMetadata::Init(
if (LockId) {
for (auto&& i : CommittedBlobs) {
if (auto writeId = i.GetWriteIdOptional()) {
auto op = owner->GetOperationsManager().GetOperationByInsertWriteIdVerified(*writeId);
AddWriteIdToCheck(*writeId, op->GetLockId());
if (owner->HasLongTxWrites(*writeId)) {
} else {
auto op = owner->GetOperationsManager().GetOperationByInsertWriteIdVerified(*writeId);
AddWriteIdToCheck(*writeId, op->GetLockId());
}
}
}
}
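
Note: this hunk carries the behaviour named in the PR title: committed blobs whose insert write id belongs to a long-tx write (detected via the new HasLongTxWrites helper) are now skipped when collecting write ids for lock-conflict checking, since such writes presumably have no operation registered in OperationsManager and the verified lookup GetOperationByInsertWriteIdVerified would otherwise trip on them.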