Skip to content

Commit f1d57e4

Browse files
Merge 4917725 into d73903d
2 parents d73903d + 4917725 commit f1d57e4

File tree

14 files changed

+166
-66
lines changed

14 files changed

+166
-66
lines changed

ydb/core/kqp/ut/common/kqp_ut_common.cpp

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -123,6 +123,7 @@ TKikimrRunner::TKikimrRunner(const TKikimrSettings& settings) {
123123
appConfig.MutableTableServiceConfig()->SetEnableRowsDuplicationCheck(true);
124124
ServerSettings->SetAppConfig(appConfig);
125125
ServerSettings->SetFeatureFlags(settings.FeatureFlags);
126+
ServerSettings->FeatureFlags.SetEnableImmediateWritingOnBulkUpsert(true);
126127
ServerSettings->SetNodeCount(settings.NodeCount);
127128
ServerSettings->SetEnableKqpSpilling(enableSpilling);
128129
ServerSettings->SetEnableDataColumnForIndexTable(true);

ydb/core/kqp/ut/olap/indexes_ut.cpp

Lines changed: 1 addition & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -80,7 +80,7 @@ Y_UNIT_TEST_SUITE(KqpOlapIndexes) {
8080

8181
{
8282
auto alterQuery = TStringBuilder() <<
83-
"ALTER OBJECT `/Root/olapStore` (TYPE TABLESTORE) SET (ACTION=UPSERT_OPTIONS, SCHEME_NEED_ACTUALIZATION=`true`, EXTERNAL_GUARANTEE_EXCLUSIVE_PK=`true`);";
83+
"ALTER OBJECT `/Root/olapStore` (TYPE TABLESTORE) SET (ACTION=UPSERT_OPTIONS, SCHEME_NEED_ACTUALIZATION=`true`);";
8484
auto session = tableClient.CreateSession().GetValueSync().GetSession();
8585
auto alterResult = session.ExecuteSchemeQuery(alterQuery).GetValueSync();
8686
UNIT_ASSERT_VALUES_EQUAL_C(alterResult.GetStatus(), NYdb::EStatus::SUCCESS, alterResult.GetIssues().ToString());
@@ -336,13 +336,6 @@ Y_UNIT_TEST_SUITE(KqpOlapIndexes) {
336336
auto alterResult = session.ExecuteSchemeQuery(alterQuery).GetValueSync();
337337
UNIT_ASSERT_VALUES_EQUAL_C(alterResult.GetStatus(), NYdb::EStatus::SUCCESS, alterResult.GetIssues().ToString());
338338
}
339-
{
340-
auto alterQuery = TStringBuilder() <<
341-
"ALTER OBJECT `/Root/olapStore` (TYPE TABLESTORE) SET (ACTION=UPSERT_OPTIONS, EXTERNAL_GUARANTEE_EXCLUSIVE_PK=`true`);";
342-
auto session = tableClient.CreateSession().GetValueSync().GetSession();
343-
auto alterResult = session.ExecuteSchemeQuery(alterQuery).GetValueSync();
344-
UNIT_ASSERT_VALUES_EQUAL_C(alterResult.GetStatus(), NYdb::EStatus::SUCCESS, alterResult.GetIssues().ToString());
345-
}
346339

347340
std::vector<TString> uids;
348341
std::vector<TString> resourceIds;

ydb/core/protos/data_events.proto

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -95,6 +95,7 @@ message TEvWrite {
9595
repeated uint32 ColumnIds = 3 [packed = true];
9696
optional uint64 PayloadIndex = 4;
9797
optional EDataFormat PayloadFormat = 5;
98+
optional string PayloadSchema = 6;
9899
}
99100

100101
// Transaction operations

ydb/core/protos/feature_flags.proto

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -162,4 +162,5 @@ message TFeatureFlags {
162162
optional bool EnableExternalDataSourcesOnServerless = 143 [default = true];
163163
optional bool EnableSparsedColumns = 144 [default = false];
164164
optional bool EnableParameterizedDecimal = 145 [default = false];
165+
optional bool EnableImmediateWritingOnBulkUpsert = 146 [default = false];
165166
}

ydb/core/tx/columnshard/blobs_action/transaction/tx_write.cpp

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -163,6 +163,7 @@ void TTxWrite::Complete(const TActorContext& ctx) {
163163
Self->Counters.GetCSCounters().OnSuccessWriteResponse();
164164
}
165165
Self->Counters.GetTabletCounters()->IncCounter(COUNTER_IMMEDIATE_TX_COMPLETED);
166+
Self->SetupIndexation();
166167
}
167168

168169
} // namespace NKikimr::NColumnShard

ydb/core/tx/columnshard/operations/write.cpp

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -35,7 +35,7 @@ void TWriteOperation::Start(TColumnShard& owner, const ui64 tableId, const NEvWr
3535
NEvWrite::TWriteData(writeMeta, data, owner.TablesManager.GetPrimaryIndex()->GetReplaceKey(),
3636
owner.StoragesManager->GetInsertOperator()->StartWritingAction(NOlap::NBlobOperations::EConsumer::WRITING_OPERATOR)),
3737
schema, owner.GetLastTxSnapshot(), owner.Counters.GetCSCounters().WritingCounters);
38-
NConveyor::TCompServiceOperator::SendTaskToExecute(task);
38+
NConveyor::TInsertServiceOperator::AsyncTaskToExecute(task);
3939

4040
Status = EOperationStatus::Started;
4141
}

ydb/core/tx/columnshard/operations/write_data.cpp

Lines changed: 24 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -12,26 +12,43 @@ bool TArrowData::Parse(const NKikimrDataEvents::TEvWrite_TOperation& proto, cons
1212
}
1313
IncomingData = payload.GetDataFromPayload(proto.GetPayloadIndex());
1414
if (proto.HasType()) {
15-
AFL_DEBUG(NKikimrServices::TX_COLUMNSHARD)("event", "invalid_modification_type")("proto", proto.DebugString());
1615
auto type = TEnumOperator<NEvWrite::EModificationType>::DeserializeFromProto(proto.GetType());
1716
if (!type) {
17+
AFL_DEBUG(NKikimrServices::TX_COLUMNSHARD)("event", "invalid_modification_type")("proto", proto.DebugString());
1818
return false;
1919
}
2020
ModificationType = *type;
2121
}
2222

23-
std::vector<ui32> columns;
24-
for (auto&& columnId : proto.GetColumnIds()) {
25-
columns.emplace_back(columnId);
23+
if (proto.HasPayloadSchema()) {
24+
PayloadSchema = NArrow::DeserializeSchema(proto.GetPayloadSchema());
25+
} else {
26+
std::vector<ui32> columns;
27+
for (auto&& columnId : proto.GetColumnIds()) {
28+
columns.emplace_back(columnId);
29+
}
30+
if (columns.empty()) {
31+
BatchSchema = IndexSchema;
32+
} else {
33+
BatchSchema = std::make_shared<NOlap::TFilteredSnapshotSchema>(IndexSchema, columns);
34+
}
35+
if (BatchSchema->GetColumnsCount() != columns.size()) {
36+
return false;
37+
}
2638
}
27-
BatchSchema = std::make_shared<NOlap::TFilteredSnapshotSchema>(IndexSchema, columns);
2839
OriginalDataSize = IncomingData.size();
29-
return BatchSchema->GetColumnsCount() == columns.size() && !IncomingData.empty();
40+
return !!IncomingData;
3041
}
3142

3243
TConclusion<std::shared_ptr<arrow::RecordBatch>> TArrowData::ExtractBatch() {
3344
Y_ABORT_UNLESS(!!IncomingData);
34-
auto result = NArrow::DeserializeBatch(IncomingData, std::make_shared<arrow::Schema>(BatchSchema->GetSchema()->fields()));
45+
std::shared_ptr<arrow::RecordBatch> result;
46+
if (PayloadSchema) {
47+
result = NArrow::DeserializeBatch(IncomingData, PayloadSchema);
48+
} else {
49+
result = NArrow::DeserializeBatch(IncomingData, std::make_shared<arrow::Schema>(BatchSchema->GetSchema()->fields()));
50+
}
51+
3552
IncomingData = "";
3653
return result;
3754
}

ydb/core/tx/columnshard/operations/write_data.h

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -30,6 +30,7 @@ class TArrowData : public NEvWrite::IDataContainer {
3030
private:
3131
NOlap::ISnapshotSchema::TPtr IndexSchema;
3232
NOlap::ISnapshotSchema::TPtr BatchSchema;
33+
std::shared_ptr<arrow::Schema> PayloadSchema;
3334
TString IncomingData;
3435
NEvWrite::EModificationType ModificationType = NEvWrite::EModificationType::Upsert;
3536
};

ydb/core/tx/data_events/columnshard_splitter.h

Lines changed: 23 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -1,18 +1,19 @@
11
#pragma once
22

3+
#include "events.h"
34
#include "shards_splitter.h"
5+
#include "payload_helper.h"
46

57
#include <ydb/core/tx/sharding/sharding.h>
68
#include <ydb/core/tx/columnshard/columnshard.h>
79
#include <ydb/core/formats/arrow/size_calcer.h>
810
#include <ydb/core/formats/arrow/arrow_helpers.h>
911
#include <ydb/core/scheme/scheme_types_proto.h>
1012

11-
1213
namespace NKikimr::NEvWrite {
1314

14-
class TColumnShardShardsSplitter : public IShardsSplitter {
15-
class TShardInfo : public IShardInfo {
15+
class TColumnShardShardsSplitter: public IShardsSplitter {
16+
class TShardInfo: public IShardInfo {
1617
private:
1718
const TString SchemaData;
1819
const TString Data;
@@ -23,25 +24,38 @@ class TColumnShardShardsSplitter : public IShardsSplitter {
2324
: SchemaData(schemaData)
2425
, Data(data)
2526
, RowsCount(rowsCount)
26-
, GranuleShardingVersion(granuleShardingVersion)
27-
{}
27+
, GranuleShardingVersion(granuleShardingVersion) {
28+
}
2829

29-
ui64 GetBytes() const override {
30+
virtual ui64 GetBytes() const override {
3031
return Data.size();
3132
}
3233

33-
ui32 GetRowsCount() const override {
34+
virtual ui32 GetRowsCount() const override {
3435
return RowsCount;
3536
}
3637

37-
const TString& GetData() const override {
38+
virtual const TString& GetData() const override {
3839
return Data;
3940
}
4041

41-
void Serialize(TEvWrite& evWrite) const override {
42+
virtual void Serialize(TEvColumnShard::TEvWrite& evWrite) const override {
4243
evWrite.SetArrowData(SchemaData, Data);
4344
evWrite.Record.SetGranuleShardingVersion(GranuleShardingVersion);
4445
}
46+
virtual void Serialize(NEvents::TDataEvents::TEvWrite& evWrite, const ui64 tableId, const ui64 schemaVersion) const override {
47+
TPayloadWriter<NEvents::TDataEvents::TEvWrite> writer(evWrite);
48+
TString data = Data;
49+
writer.AddDataToPayload(std::move(data));
50+
51+
auto* operation = evWrite.Record.AddOperations();
52+
operation->SetPayloadSchema(SchemaData);
53+
operation->SetType(NKikimrDataEvents::TEvWrite::TOperation::OPERATION_REPLACE);
54+
operation->SetPayloadFormat(NKikimrDataEvents::FORMAT_ARROW);
55+
operation->SetPayloadIndex(0);
56+
operation->MutableTableId()->SetTableId(tableId);
57+
operation->MutableTableId()->SetSchemaVersion(schemaVersion);
58+
}
4559
};
4660

4761
private:

ydb/core/tx/data_events/shard_writer.cpp

Lines changed: 43 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -7,9 +7,10 @@
77

88
namespace NKikimr::NEvWrite {
99

10-
TWritersController::TWritersController(const ui32 writesCount, const NActors::TActorIdentity& longTxActorId, const NLongTxService::TLongTxId& longTxId)
10+
TWritersController::TWritersController(const ui32 writesCount, const NActors::TActorIdentity& longTxActorId, const NLongTxService::TLongTxId& longTxId, const bool immediateWrite)
1111
: WritesCount(writesCount)
1212
, LongTxActorId(longTxActorId)
13+
, ImmediateWrite(immediateWrite)
1314
, LongTxId(longTxId)
1415
{
1516
Y_ABORT_UNLESS(writesCount);
@@ -39,28 +40,62 @@ namespace NKikimr::NEvWrite {
3940
}
4041
}
4142

42-
TShardWriter::TShardWriter(const ui64 shardId, const ui64 tableId, const TString& dedupId, const IShardInfo::TPtr& data,
43-
const NWilson::TProfileSpan& parentSpan, TWritersController::TPtr externalController, const ui32 writePartIdx, const EModificationType mType)
43+
TShardWriter::TShardWriter(const ui64 shardId, const ui64 tableId, const ui64 schemaVersion, const TString& dedupId, const IShardInfo::TPtr& data,
44+
const NWilson::TProfileSpan& parentSpan, TWritersController::TPtr externalController, const ui32 writePartIdx, const EModificationType mType, const bool immediateWrite)
4445
: ShardId(shardId)
4546
, WritePartIdx(writePartIdx)
4647
, TableId(tableId)
48+
, SchemaVersion(schemaVersion)
4749
, DedupId(dedupId)
4850
, DataForShard(data)
4951
, ExternalController(externalController)
5052
, LeaderPipeCache(MakePipePerNodeCacheID(false))
5153
, ActorSpan(parentSpan.BuildChildrenSpan("ShardWriter"))
5254
, ModificationType(mType)
55+
, ImmediateWrite(immediateWrite)
5356
{
5457
}
5558

59+
void TShardWriter::SendWriteRequest() {
60+
if (ImmediateWrite) {
61+
auto ev = MakeHolder<NEvents::TDataEvents::TEvWrite>(NKikimrDataEvents::TEvWrite::MODE_IMMEDIATE);
62+
DataForShard->Serialize(*ev, TableId, SchemaVersion);
63+
SendToTablet(std::move(ev));
64+
} else {
65+
auto ev = MakeHolder<TEvColumnShard::TEvWrite>(SelfId(), ExternalController->GetLongTxId(), TableId, DedupId, "", WritePartIdx, ModificationType);
66+
DataForShard->Serialize(*ev);
67+
SendToTablet(std::move(ev));
68+
}
69+
}
70+
5671
void TShardWriter::Bootstrap() {
57-
auto ev = MakeHolder<TEvWrite>(SelfId(), ExternalController->GetLongTxId(), TableId, DedupId, "", WritePartIdx, ModificationType);
58-
DataForShard->Serialize(*ev);
59-
SendToTablet(std::move(ev));
72+
SendWriteRequest();
6073
Become(&TShardWriter::StateMain);
6174
}
6275

63-
void TShardWriter::Handle(TEvWriteResult::TPtr& ev) {
76+
void TShardWriter::Handle(NEvents::TDataEvents::TEvWriteResult::TPtr& ev) {
77+
const auto* msg = ev->Get();
78+
Y_ABORT_UNLESS(msg->Record.GetOrigin() == ShardId);
79+
80+
const auto ydbStatus = msg->GetStatus();
81+
if (ydbStatus == NKikimrDataEvents::TEvWriteResult::STATUS_OVERLOADED) {
82+
if (RetryWriteRequest(true)) {
83+
return;
84+
}
85+
}
86+
87+
auto gPassAway = PassAwayGuard();
88+
if (ydbStatus != NKikimrDataEvents::TEvWriteResult::STATUS_COMPLETED) {
89+
ExternalController->OnFail(Ydb::StatusIds::INTERNAL_ERROR,
90+
TStringBuilder() << "Cannot write data into shard " << ShardId << " in longTx " <<
91+
ExternalController->GetLongTxId().ToString());
92+
return;
93+
}
94+
95+
ExternalController->OnSuccess(ShardId, 0, WritePartIdx);
96+
}
97+
98+
void TShardWriter::Handle(TEvColumnShard::TEvWriteResult::TPtr& ev) {
6499
const auto* msg = ev->Get();
65100
Y_ABORT_UNLESS(msg->Record.GetOrigin() == ShardId);
66101

@@ -113,9 +148,7 @@ namespace NKikimr::NEvWrite {
113148
Schedule(OverloadTimeout(), new TEvents::TEvWakeup());
114149
} else {
115150
++NumRetries;
116-
auto ev = MakeHolder<TEvWrite>(SelfId(), ExternalController->GetLongTxId(), TableId, DedupId, "", WritePartIdx, ModificationType);
117-
DataForShard->Serialize(*ev);
118-
SendToTablet(std::move(ev));
151+
SendWriteRequest();
119152
}
120153
return true;
121154
}

0 commit comments

Comments
 (0)