Skip to content

Commit bf1b6ff

Browse files
Merge 6e3432b into 4ab249f
2 parents 4ab249f + 6e3432b commit bf1b6ff

File tree

14 files changed

+231
-130
lines changed

14 files changed

+231
-130
lines changed

ydb/core/formats/arrow/process_columns.cpp

Lines changed: 85 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -9,17 +9,44 @@
99
namespace NKikimr::NArrow {
1010

1111
namespace {
12-
template <class TDataContainer, class TStringImpl>
12+
13+
template <class T>
14+
class TColumnNameAccessor {
15+
public:
16+
static const std::string& GetFieldName(const T& val) {
17+
return val;
18+
}
19+
static TString DebugString(const std::vector<T>& items) {
20+
return JoinSeq(",", items);
21+
}
22+
};
23+
24+
template <>
25+
class TColumnNameAccessor<std::shared_ptr<arrow::Field>> {
26+
public:
27+
static const std::string& GetFieldName(const std::shared_ptr<arrow::Field>& val) {
28+
return val->name();
29+
}
30+
static TString DebugString(const std::vector<std::shared_ptr<arrow::Field>>& items) {
31+
TStringBuilder sb;
32+
for (auto&& i : items) {
33+
sb << i->name() << ",";
34+
}
35+
return sb;
36+
}
37+
};
38+
39+
template <class TDataContainer, class TStringContainer>
1340
std::shared_ptr<TDataContainer> ExtractColumnsValidateImpl(
14-
const std::shared_ptr<TDataContainer>& srcBatch, const std::vector<TStringImpl>& columnNames) {
41+
const std::shared_ptr<TDataContainer>& srcBatch, const std::vector<TStringContainer>& columnNames) {
1542
std::vector<std::shared_ptr<arrow::Field>> fields;
1643
fields.reserve(columnNames.size());
1744
std::vector<std::shared_ptr<typename NAdapter::TDataBuilderPolicy<TDataContainer>::TColumn>> columns;
1845
columns.reserve(columnNames.size());
1946

2047
auto srcSchema = srcBatch->schema();
2148
for (auto& name : columnNames) {
22-
const int pos = srcSchema->GetFieldIndex(name);
49+
const int pos = srcSchema->GetFieldIndex(TColumnNameAccessor<TStringContainer>::GetFieldName(name));
2350
if (Y_LIKELY(pos > -1)) {
2451
fields.push_back(srcSchema->field(pos));
2552
columns.push_back(srcBatch->column(pos));
@@ -70,16 +97,16 @@ TConclusion<std::shared_ptr<TDataContainer>> AdaptColumnsImpl(
7097
return NAdapter::TDataBuilderPolicy<TDataContainer>::Build(std::make_shared<arrow::Schema>(fields), std::move(columns), srcBatch->num_rows());
7198
}
7299

73-
template <class TDataContainer, class TStringType>
100+
template <class TDataContainer, class TStringContainer>
74101
std::shared_ptr<TDataContainer> ExtractImpl(const TColumnOperator::EExtractProblemsPolicy& policy,
75-
const std::shared_ptr<TDataContainer>& incoming, const std::vector<TStringType>& columnNames) {
102+
const std::shared_ptr<TDataContainer>& incoming, const std::vector<TStringContainer>& columnNames) {
76103
AFL_VERIFY(incoming);
77104
AFL_VERIFY(columnNames.size());
78105
auto result = ExtractColumnsValidateImpl(incoming, columnNames);
79106
switch (policy) {
80107
case TColumnOperator::EExtractProblemsPolicy::Verify:
81108
AFL_VERIFY((ui32)result->num_columns() == columnNames.size())("schema", incoming->schema()->ToString())(
82-
"required", JoinSeq(",", columnNames));
109+
"required", TColumnNameAccessor<TStringContainer>::DebugString(columnNames));
83110
break;
84111
case TColumnOperator::EExtractProblemsPolicy::Null:
85112
if ((ui32)result->num_columns() != columnNames.size()) {
@@ -123,6 +150,16 @@ std::shared_ptr<arrow::Table> TColumnOperator::Extract(
123150
return ExtractImpl(AbsentColumnPolicy, incoming, columnNames);
124151
}
125152

153+
std::shared_ptr<arrow::Table> TColumnOperator::Extract(
154+
const std::shared_ptr<arrow::Table>& incoming, const std::vector<std::shared_ptr<arrow::Field>>& columns) {
155+
return ExtractImpl(AbsentColumnPolicy, incoming, columns);
156+
}
157+
158+
std::shared_ptr<arrow::RecordBatch> TColumnOperator::Extract(
159+
const std::shared_ptr<arrow::RecordBatch>& incoming, const std::vector<std::shared_ptr<arrow::Field>>& columns) {
160+
return ExtractImpl(AbsentColumnPolicy, incoming, columns);
161+
}
162+
126163
std::shared_ptr<arrow::RecordBatch> TColumnOperator::Extract(
127164
const std::shared_ptr<arrow::RecordBatch>& incoming, const std::vector<TString>& columnNames) {
128165
return ExtractImpl(AbsentColumnPolicy, incoming, columnNames);
@@ -171,5 +208,47 @@ NKikimr::TConclusion<std::shared_ptr<arrow::Table>> TColumnOperator::Reorder(
171208
const std::shared_ptr<arrow::Table>& incoming, const std::vector<TString>& columnNames) {
172209
return ReorderImpl(incoming, columnNames);
173210
}
211+
namespace {
212+
template <class TDataContainer, class TSchemaImpl>
213+
TConclusion<TSchemaSubset> BuildSequentialSubsetImpl(
214+
const std::shared_ptr<TDataContainer>& srcBatch, const std::shared_ptr<TSchemaImpl>& dstSchema) {
215+
AFL_VERIFY(srcBatch);
216+
AFL_VERIFY(dstSchema);
217+
if (dstSchema->num_fields() < srcBatch->schema()->num_fields()) {
218+
AFL_ERROR(NKikimrServices::ARROW_HELPER)("event", "incorrect columns set: destination must been wider than source")(
219+
"source", srcBatch->schema()->ToString())("destination", dstSchema->ToString());
220+
return TConclusionStatus::Fail("incorrect columns set: destination must been wider than source");
221+
}
222+
std::set<ui32> fieldIdx;
223+
auto itSrc = srcBatch->schema()->fields().begin();
224+
auto itDst = dstSchema->fields().begin();
225+
while (itSrc != srcBatch->schema()->fields().end() && itDst != dstSchema->fields().end()) {
226+
if ((*itSrc)->name() != (*itDst)->name()) {
227+
++itDst;
228+
} else {
229+
fieldIdx.emplace(itDst - dstSchema->fields().begin());
230+
if (!(*itDst)->Equals(*itSrc)) {
231+
AFL_ERROR(NKikimrServices::ARROW_HELPER)("event", "cannot_use_incoming_batch")("reason", "invalid_column_type")(
232+
"column_type", (*itDst)->ToString(true))("incoming_type", (*itSrc)->ToString(true));
233+
return TConclusionStatus::Fail("incompatible column types");
234+
}
235+
236+
++itDst;
237+
++itSrc;
238+
}
239+
}
240+
if (itDst == dstSchema->fields().end() && itSrc != srcBatch->schema()->fields().end()) {
241+
AFL_ERROR(NKikimrServices::ARROW_HELPER)("event", "incorrect columns order in source set")("source", srcBatch->schema()->ToString())(
242+
"destination", dstSchema->ToString());
243+
return TConclusionStatus::Fail("incorrect columns order in source set");
244+
}
245+
return TSchemaSubset(fieldIdx, dstSchema->num_fields());
246+
}
247+
} // namespace
248+
249+
/// Builds the subset of `dstSchema` fields that are present in `incoming`.
/// Delegates to BuildSequentialSubsetImpl; see that function for the failure conditions.
TConclusion<TSchemaSubset> TColumnOperator::BuildSequentialSubset(
    const std::shared_ptr<arrow::RecordBatch>& incoming, const std::shared_ptr<NArrow::TSchemaLite>& dstSchema) {
    return BuildSequentialSubsetImpl(incoming, dstSchema);
}
174253

175254
} // namespace NKikimr::NArrow

ydb/core/formats/arrow/process_columns.h

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -38,9 +38,16 @@ class TColumnOperator {
3838
std::shared_ptr<arrow::RecordBatch> Extract(
3939
const std::shared_ptr<arrow::RecordBatch>& incoming, const std::vector<std::string>& columnNames);
4040
std::shared_ptr<arrow::Table> Extract(const std::shared_ptr<arrow::Table>& incoming, const std::vector<std::string>& columnNames);
41+
std::shared_ptr<arrow::Table> Extract(
42+
const std::shared_ptr<arrow::Table>& incoming, const std::vector<std::shared_ptr<arrow::Field>>& columns);
43+
std::shared_ptr<arrow::RecordBatch> Extract(
44+
const std::shared_ptr<arrow::RecordBatch>& incoming, const std::vector<std::shared_ptr<arrow::Field>>& columns);
4145
std::shared_ptr<arrow::RecordBatch> Extract(const std::shared_ptr<arrow::RecordBatch>& incoming, const std::vector<TString>& columnNames);
4246
std::shared_ptr<arrow::Table> Extract(const std::shared_ptr<arrow::Table>& incoming, const std::vector<TString>& columnNames);
4347

48+
TConclusion<TSchemaSubset> BuildSequentialSubset(
49+
const std::shared_ptr<arrow::RecordBatch>& incoming, const std::shared_ptr<NArrow::TSchemaLite>& dstSchema);
50+
4451
TConclusion<std::shared_ptr<arrow::RecordBatch>> Adapt(
4552
const std::shared_ptr<arrow::RecordBatch>& incoming, const std::shared_ptr<arrow::Schema>& dstSchema, TSchemaSubset* subset = nullptr);
4653
TConclusion<std::shared_ptr<arrow::Table>> Adapt(

ydb/core/formats/arrow/size_calcer.cpp

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -246,7 +246,7 @@ NKikimr::NArrow::TSerializedBatch TSerializedBatch::Build(std::shared_ptr<arrow:
246246
if (context.GetFieldsForSpecialKeys().size()) {
247247
specialKeys = TFirstLastSpecialKeys(batch, context.GetFieldsForSpecialKeys()).SerializeToString();
248248
}
249-
return TSerializedBatch(NArrow::SerializeSchema(*batch->schema()), NArrow::SerializeBatchNoCompression(batch), batch->num_rows(),
249+
return TSerializedBatch(NArrow::SerializeSchema(*batch->schema()), NArrow::SerializeBatchNoCompression(batch), batch->num_rows(),
250250
NArrow::GetBatchDataSize(batch), specialKeys);
251251
}
252252

ydb/core/tx/columnshard/columnshard__write.cpp

Lines changed: 15 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -19,36 +19,36 @@ namespace NKikimr::NColumnShard {
1919

2020
using namespace NTabletFlatExecutor;
2121

22-
void TColumnShard::OverloadWriteFail(const EOverloadStatus overloadReason, const NEvWrite::TWriteData& writeData, const ui64 cookie,
22+
void TColumnShard::OverloadWriteFail(const EOverloadStatus overloadReason, const NEvWrite::TWriteMeta& writeMeta, const ui64 writeSize, const ui64 cookie,
2323
std::unique_ptr<NActors::IEventBase>&& event, const TActorContext& ctx) {
2424
Counters.GetTabletCounters()->IncCounter(COUNTER_WRITE_FAIL);
2525
switch (overloadReason) {
2626
case EOverloadStatus::Disk:
2727
Counters.OnWriteOverloadDisk();
2828
break;
2929
case EOverloadStatus::InsertTable:
30-
Counters.OnWriteOverloadInsertTable(writeData.GetSize());
30+
Counters.OnWriteOverloadInsertTable(writeSize);
3131
break;
3232
case EOverloadStatus::OverloadMetadata:
33-
Counters.OnWriteOverloadMetadata(writeData.GetSize());
33+
Counters.OnWriteOverloadMetadata(writeSize);
3434
break;
3535
case EOverloadStatus::ShardTxInFly:
36-
Counters.OnWriteOverloadShardTx(writeData.GetSize());
36+
Counters.OnWriteOverloadShardTx(writeSize);
3737
break;
3838
case EOverloadStatus::ShardWritesInFly:
39-
Counters.OnWriteOverloadShardWrites(writeData.GetSize());
39+
Counters.OnWriteOverloadShardWrites(writeSize);
4040
break;
4141
case EOverloadStatus::ShardWritesSizeInFly:
42-
Counters.OnWriteOverloadShardWritesSize(writeData.GetSize());
42+
Counters.OnWriteOverloadShardWritesSize(writeSize);
4343
break;
4444
case EOverloadStatus::None:
4545
Y_ABORT("invalid function usage");
4646
}
4747

48-
AFL_TRACE(NKikimrServices::TX_COLUMNSHARD)("event", "write_overload")("size", writeData.GetSize())(
49-
"path_id", writeData.GetWriteMeta().GetTableId())("reason", overloadReason);
48+
AFL_TRACE(NKikimrServices::TX_COLUMNSHARD)("event", "write_overload")("size", writeSize)("path_id", writeMeta.GetTableId())(
49+
"reason", overloadReason);
5050

51-
ctx.Send(writeData.GetWriteMeta().GetSource(), event.release(), 0, cookie);
51+
ctx.Send(writeMeta.GetSource(), event.release(), 0, cookie);
5252
}
5353

5454
TColumnShard::EOverloadStatus TColumnShard::CheckOverloaded(const ui64 tableId) const {
@@ -240,7 +240,7 @@ void TColumnShard::Handle(TEvColumnShard::TEvWrite::TPtr& ev, const TActorContex
240240
if (overloadStatus != EOverloadStatus::None) {
241241
std::unique_ptr<NActors::IEventBase> result = std::make_unique<TEvColumnShard::TEvWriteResult>(
242242
TabletID(), writeData.GetWriteMeta(), NKikimrTxColumnShard::EResultStatus::OVERLOADED);
243-
OverloadWriteFail(overloadStatus, writeData, cookie, std::move(result), ctx);
243+
OverloadWriteFail(overloadStatus, writeData.GetWriteMeta(), writeData.GetSize(), cookie, std::move(result), ctx);
244244
Counters.GetCSCounters().OnFailedWriteResponse(EWriteFailReason::Overload);
245245
} else {
246246
if (ui64 writeId = (ui64)HasLongTxWrite(writeMeta.GetLongTxIdUnsafe(), writeMeta.GetWritePartId())) {
@@ -538,10 +538,9 @@ void TColumnShard::Handle(NEvents::TDataEvents::TEvWrite::TPtr& ev, const TActor
538538

539539
auto overloadStatus = CheckOverloaded(tableId);
540540
if (overloadStatus != EOverloadStatus::None) {
541-
NEvWrite::TWriteData writeData(NEvWrite::TWriteMeta(0, tableId, source, {}), arrowData, nullptr, nullptr);
542541
std::unique_ptr<NActors::IEventBase> result = NEvents::TDataEvents::TEvWriteResult::BuildError(
543542
TabletID(), 0, NKikimrDataEvents::TEvWriteResult::STATUS_OVERLOADED, "overload data error");
544-
OverloadWriteFail(overloadStatus, writeData, cookie, std::move(result), ctx);
543+
OverloadWriteFail(overloadStatus, NEvWrite::TWriteMeta(0, tableId, source, {}), arrowData->GetSize(), cookie, std::move(result), ctx);
545544
return;
546545
}
547546

@@ -554,11 +553,11 @@ void TColumnShard::Handle(NEvents::TDataEvents::TEvWrite::TPtr& ev, const TActor
554553

555554
ui64 lockId = 0;
556555
if (behaviour == EOperationBehaviour::NoTxWrite) {
557-
static TAtomicCounter Counter = 0;
558-
const ui64 shift = (ui64)1 << 47;
559-
lockId = shift + Counter.Inc();
556+
lockId = BuildEphemeralTxId();
557+
} else if (behaviour == EOperationBehaviour::InTxWrite) {
558+
lockId = record.GetTxId();
560559
} else {
561-
lockId = (behaviour == EOperationBehaviour::InTxWrite) ? record.GetTxId() : record.GetLockTxId();
560+
lockId = record.GetLockTxId();
562561
}
563562

564563
OperationsManager->RegisterLock(lockId, Generation());

ydb/core/tx/columnshard/columnshard_impl.h

Lines changed: 10 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -286,6 +286,12 @@ class TColumnShard
286286
void OnTieringModified(const std::optional<ui64> pathId = {});
287287

288288
public:
289+
/// Returns a fresh id built from a function-local static counter with bit 47 set.
/// The counter is static, so successive values are shared by every caller of this method.
ui64 BuildEphemeralTxId() {
    static TAtomicCounter Counter = 0;
    static constexpr ui64 shift = (ui64)1 << 47;
    const ui64 sequence = Counter.Inc();
    return shift | sequence;
}
294+
289295
enum class EOverloadStatus {
290296
ShardTxInFly /* "shard_tx" */,
291297
ShardWritesInFly /* "shard_writes" */,
@@ -320,7 +326,7 @@ class TColumnShard
320326
}
321327

322328
private:
323-
void OverloadWriteFail(const EOverloadStatus overloadReason, const NEvWrite::TWriteData& writeData, const ui64 cookie, std::unique_ptr<NActors::IEventBase>&& event, const TActorContext& ctx);
329+
void OverloadWriteFail(const EOverloadStatus overloadReason, const NEvWrite::TWriteMeta& writeMeta, const ui64 writeSize, const ui64 cookie, std::unique_ptr<NActors::IEventBase>&& event, const TActorContext& ctx);
324330
EOverloadStatus CheckOverloaded(const ui64 tableId) const;
325331

326332
protected:
@@ -534,6 +540,9 @@ class TColumnShard
534540
public:
535541
ui64 TabletTxCounter = 0;
536542

543+
bool HasLongTxWrites(const TInsertWriteId insertWriteId) const {
544+
return LongTxWrites.contains(insertWriteId);
545+
}
537546
void EnqueueProgressTx(const TActorContext& ctx, const std::optional<ui64> continueTxId);
538547
NOlap::TSnapshot GetLastTxSnapshot() const {
539548
return NOlap::TSnapshot(LastPlannedStep, LastPlannedTxId);

ydb/core/tx/columnshard/engines/portions/constructor.cpp

Lines changed: 0 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -67,22 +67,6 @@ void TPortionInfoConstructor::LoadIndex(const TIndexChunkLoadContext& loadContex
6767

6868
const NKikimr::NOlap::TColumnRecord& TPortionInfoConstructor::AppendOneChunkColumn(TColumnRecord&& record) {
6969
Y_ABORT_UNLESS(record.ColumnId);
70-
std::optional<ui32> maxChunk;
71-
for (auto&& i : Records) {
72-
if (i.ColumnId == record.ColumnId) {
73-
if (!maxChunk) {
74-
maxChunk = i.Chunk;
75-
} else {
76-
Y_ABORT_UNLESS(*maxChunk + 1 == i.Chunk);
77-
maxChunk = i.Chunk;
78-
}
79-
}
80-
}
81-
if (maxChunk) {
82-
AFL_VERIFY(*maxChunk + 1 == record.Chunk)("max", *maxChunk)("record", record.Chunk);
83-
} else {
84-
AFL_VERIFY(0 == record.Chunk)("record", record.Chunk);
85-
}
8670
Records.emplace_back(std::move(record));
8771
return Records.back();
8872
}

ydb/core/tx/columnshard/engines/portions/constructor.h

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -26,7 +26,7 @@ class TPortionInfoConstructor {
2626
std::optional<ui64> ShardingVersion;
2727

2828
std::vector<TIndexChunk> Indexes;
29-
YDB_ACCESSOR_DEF(std::vector<TColumnRecord>, Records);
29+
YDB_READONLY_DEF(std::vector<TColumnRecord>, Records);
3030
std::vector<TUnifiedBlobId> BlobIds;
3131

3232
public:

ydb/core/tx/columnshard/engines/reader/plain_reader/constructor/read_metadata.cpp

Lines changed: 5 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -35,8 +35,11 @@ TConclusionStatus TReadMetadata::Init(
3535
if (LockId) {
3636
for (auto&& i : CommittedBlobs) {
3737
if (auto writeId = i.GetWriteIdOptional()) {
38-
auto op = owner->GetOperationsManager().GetOperationByInsertWriteIdVerified(*writeId);
39-
AddWriteIdToCheck(*writeId, op->GetLockId());
38+
if (owner->HasLongTxWrites(*writeId)) {
39+
} else {
40+
auto op = owner->GetOperationsManager().GetOperationByInsertWriteIdVerified(*writeId);
41+
AddWriteIdToCheck(*writeId, op->GetLockId());
42+
}
4043
}
4144
}
4245
}

0 commit comments

Comments
 (0)