Skip to content

Commit 5163b36

Browse files
fix htap test for deletion (#9313)
1 parent a37a4ae commit 5163b36

File tree

6 files changed

+32
-17
lines changed

6 files changed

+32
-17
lines changed

ydb/core/kqp/executer_actor/kqp_data_executer.cpp

Lines changed: 13 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -133,7 +133,6 @@ class TKqpDataExecuter : public TKqpExecuterBase<TKqpDataExecuter, EExecType::Da
133133
userRequestContext, statementResultIndex, TWilsonKqp::DataExecuter, "DataExecuter", streamResult)
134134
, AsyncIoFactory(std::move(asyncIoFactory))
135135
, UseEvWriteForOltp(tableServiceConfig.GetEnableOltpSink())
136-
, HtapTx(tableServiceConfig.GetEnableHtapTx())
137136
, FederatedQuerySetup(federatedQuerySetup)
138137
, GUCSettings(GUCSettings)
139138
, ShardIdToTableInfo(shardIdToTableInfo)
@@ -2353,6 +2352,7 @@ class TKqpDataExecuter : public TKqpExecuterBase<TKqpDataExecuter, EExecType::Da
23532352

23542353
absl::flat_hash_set<ui64> sendingShardsSet;
23552354
absl::flat_hash_set<ui64> receivingShardsSet;
2355+
absl::flat_hash_set<ui64> sendingColumnShardsSet;
23562356
absl::flat_hash_set<ui64> receivingColumnShardsSet;
23572357
ui64 arbiter = 0;
23582358
std::optional<ui64> columnShardArbiter;
@@ -2378,6 +2378,10 @@ class TKqpDataExecuter : public TKqpExecuterBase<TKqpDataExecuter, EExecType::Da
23782378
if (tx->HasLocks()) {
23792379
// Locks may be broken so shards with locks need to send readsets
23802380
sendingShardsSet.insert(shardId);
2381+
2382+
if (ShardIdToTableInfo->Get(shardId).IsOlap) {
2383+
sendingColumnShardsSet.insert(shardId);
2384+
}
23812385
}
23822386
if (ShardsWithEffects.contains(shardId)) {
23832387
// Volatile transactions may abort effects, so they send readsets
@@ -2387,7 +2391,7 @@ class TKqpDataExecuter : public TKqpExecuterBase<TKqpDataExecuter, EExecType::Da
23872391
// Effects are only applied when all locks are valid
23882392
receivingShardsSet.insert(shardId);
23892393

2390-
if (HtapTx && ShardIdToTableInfo->Get(shardId).IsOlap) {
2394+
if (ShardIdToTableInfo->Get(shardId).IsOlap) {
23912395
receivingColumnShardsSet.insert(shardId);
23922396
}
23932397
}
@@ -2445,10 +2449,13 @@ class TKqpDataExecuter : public TKqpExecuterBase<TKqpDataExecuter, EExecType::Da
24452449
}
24462450
}
24472451

2448-
if (!receivingColumnShardsSet.empty()) {
2449-
AFL_ENSURE(HtapTx);
2450-
const ui32 index = RandomNumber<ui32>(receivingColumnShardsSet.size());
2451-
auto arbiterIterator = std::begin(receivingColumnShardsSet);
2452+
if (!receivingColumnShardsSet.empty() || !sendingColumnShardsSet.empty()) {
2453+
const auto& shards = receivingColumnShardsSet.empty()
2454+
? sendingColumnShardsSet
2455+
: receivingColumnShardsSet;
2456+
2457+
const ui32 index = RandomNumber<ui32>(shards.size());
2458+
auto arbiterIterator = std::begin(shards);
24522459
std::advance(arbiterIterator, index);
24532460
columnShardArbiter = *arbiterIterator;
24542461
}
@@ -2857,7 +2864,6 @@ class TKqpDataExecuter : public TKqpExecuterBase<TKqpDataExecuter, EExecType::Da
28572864
private:
28582865
NYql::NDq::IDqAsyncIoFactory::TPtr AsyncIoFactory;
28592866
const bool UseEvWriteForOltp = false;
2860-
const bool HtapTx = false;
28612867
const std::optional<TKqpFederatedQuerySetup> FederatedQuerySetup;
28622868
const TGUCSettings::TPtr GUCSettings;
28632869
TShardIdToTableInfoPtr ShardIdToTableInfo;

ydb/core/kqp/ut/service/kqp_qs_queries_ut.cpp

Lines changed: 5 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -3190,12 +3190,13 @@ Y_UNIT_TEST_SUITE(KqpQueryService) {
31903190
31913191
{
31923192
auto result = client.ExecuteQuery(R"(
3193-
SELECT * FROM `/Root/DataShard`;
3194-
SELECT * FROM `/Root/ColumnShard`;
3193+
SELECT * FROM `/Root/DataShard` ORDER BY Col1;
3194+
SELECT * FROM `/Root/ColumnShard` ORDER BY Col1;
31953195
)", NYdb::NQuery::TTxControl::BeginTx().CommitTx()).ExtractValueSync();
31963196
UNIT_ASSERT_C(result.IsSuccess(), result.GetIssues().ToString());
3197-
CompareYson(R"([])", FormatResultSetYson(result.GetResultSet(0)));
3198-
CompareYson(R"([])", FormatResultSetYson(result.GetResultSet(1)));
3197+
CompareYson(R"([[10u;"test1";10];[20u;"test2";11];[30u;"test3";12];[40u;"test";13];[101u;"test";101];[102u;"test";101];[103u;"test";101];[104u;"test";101];[1001u;"test";1001];[1002u;"test";1001];[1003u;"test";1001];[1004u;"test";1001]])",
3198+
FormatResultSetYson(result.GetResultSet(0)));
3199+
CompareYson(R"([[1u;"test";1];[2u;"test";1];[3u;"test";1];[4u;"test";1]])", FormatResultSetYson(result.GetResultSet(1)));
31993200
}
32003201
32013202
{

ydb/core/tx/columnshard/columnshard_impl.h

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -89,6 +89,8 @@ class TGeneralCompactColumnEngineChanges;
8989

9090
namespace NKikimr::NColumnShard {
9191

92+
class TEvWriteCommitPrimaryTransactionOperator;
93+
class TEvWriteCommitSecondaryTransactionOperator;
9294
class TTxFinishAsyncTransaction;
9395
class TTxInsertTableCleanup;
9496
class TTxRemoveSharedBlobs;
@@ -138,6 +140,8 @@ class TColumnShard
138140
: public TActor<TColumnShard>
139141
, public NTabletFlatExecutor::TTabletExecutedFlat
140142
{
143+
friend class TEvWriteCommitSecondaryTransactionOperator;
144+
friend class TEvWriteCommitPrimaryTransactionOperator;
141145
friend class TTxInsertTableCleanup;
142146
friend class TTxInit;
143147
friend class TTxInitSchema;

ydb/core/tx/columnshard/operations/slice_builder/builder.cpp

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -57,7 +57,8 @@ TConclusionStatus TBuildSlicesTask::DoExecute(const std::shared_ptr<ITask>& /*ta
5757
if (OriginalBatch->num_columns() != indexSchema->num_fields()) {
5858
AFL_VERIFY(OriginalBatch->num_columns() < indexSchema->num_fields())("original", OriginalBatch->num_columns())(
5959
"index", indexSchema->num_fields());
60-
if (HasAppData() && !AppDataVerified().FeatureFlags.GetEnableOptionalColumnsInColumnShard()) {
60+
if (HasAppData() && !AppDataVerified().FeatureFlags.GetEnableOptionalColumnsInColumnShard() &&
61+
WriteData.GetWriteMeta().GetModificationType() != NEvWrite::EModificationType::Delete) {
6162
subset = NArrow::TSchemaSubset::AllFieldsAccepted();
6263
const std::vector<ui32>& columnIdsVector = ActualSchema->GetIndexInfo().GetColumnIds(false);
6364
const std::set<ui32> columnIdsSet(columnIdsVector.begin(), columnIdsVector.end());

ydb/core/tx/columnshard/transactions/operators/ev_write/primary.h

Lines changed: 6 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -141,7 +141,10 @@ class TEvWriteCommitPrimaryTransactionOperator: public TEvWriteCommitSyncTransac
141141
auto op = Self->GetProgressTxController().GetTxOperatorVerifiedAs<TEvWriteCommitPrimaryTransactionOperator>(TxId);
142142
AFL_DEBUG(NKikimrServices::TX_COLUMNSHARD)("event", "ack_tablet")("wait", JoinSeq(",", op->WaitShardsResultAck))(
143143
"receive", TabletId);
144-
AFL_VERIFY(op->WaitShardsResultAck.erase(TabletId));
144+
if (!op->WaitShardsResultAck.erase(TabletId)) {
145+
AFL_WARN(NKikimrServices::TX_COLUMNSHARD)("event", "ack_tablet_duplication")("wait", JoinSeq(",", op->WaitShardsResultAck))(
146+
"receive", TabletId);
147+
}
145148
op->CheckFinished(*Self);
146149
}
147150

@@ -188,7 +191,7 @@ class TEvWriteCommitPrimaryTransactionOperator: public TEvWriteCommitSyncTransac
188191
if (tabletId && *tabletId != i) {
189192
continue;
190193
}
191-
NActors::TActivationContext::AsActorContext().Send(MakePipePerNodeCacheID(EPipePerNodeCache::Persistent),
194+
owner.Send(MakePipePerNodeCacheID(EPipePerNodeCache::Persistent),
192195
new TEvPipeCache::TEvForward(
193196
new TEvTxProcessing::TEvReadSetAck(0, GetTxId(), owner.TabletID(), i, owner.TabletID(), 0), i, true),
194197
IEventHandle::FlagTrackDelivery, GetTxId());
@@ -202,7 +205,7 @@ class TEvWriteCommitPrimaryTransactionOperator: public TEvWriteCommitSyncTransac
202205
readSetData.SetDecision(*TxBroken ? NKikimrTx::TReadSetData::DECISION_ABORT : NKikimrTx::TReadSetData::DECISION_COMMIT);
203206
for (auto&& i : ReceivingShards) {
204207
if (WaitShardsResultAck.contains(i)) {
205-
NActors::TActivationContext::AsActorContext().Send(MakePipePerNodeCacheID(EPipePerNodeCache::Persistent),
208+
owner.Send(MakePipePerNodeCacheID(EPipePerNodeCache::Persistent),
206209
new TEvPipeCache::TEvForward(new TEvTxProcessing::TEvReadSet(TxInfo.PlanStep, GetTxId(), owner.TabletID(), i,
207210
owner.TabletID(), readSetData.SerializeAsString()),
208211
i, true),

ydb/core/tx/columnshard/transactions/operators/ev_write/secondary.h

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -136,7 +136,7 @@ class TEvWriteCommitSecondaryTransactionOperator: public TEvWriteCommitSyncTrans
136136
}
137137

138138
void SendBrokenFlagAck(TColumnShard& owner) {
139-
NActors::TActivationContext::AsActorContext().Send(MakePipePerNodeCacheID(EPipePerNodeCache::Persistent),
139+
owner.Send(MakePipePerNodeCacheID(EPipePerNodeCache::Persistent),
140140
new TEvPipeCache::TEvForward(
141141
new TEvTxProcessing::TEvReadSetAck(0, GetTxId(), owner.TabletID(), ArbiterTabletId, owner.TabletID(), 0), ArbiterTabletId, true),
142142
IEventHandle::FlagTrackDelivery, GetTxId());
@@ -145,7 +145,7 @@ class TEvWriteCommitSecondaryTransactionOperator: public TEvWriteCommitSyncTrans
145145
void SendResult(TColumnShard& owner) {
146146
NKikimrTx::TReadSetData readSetData;
147147
readSetData.SetDecision(SelfBroken ? NKikimrTx::TReadSetData::DECISION_ABORT : NKikimrTx::TReadSetData::DECISION_COMMIT);
148-
NActors::TActivationContext::AsActorContext().Send(MakePipePerNodeCacheID(EPipePerNodeCache::Persistent),
148+
owner.Send(MakePipePerNodeCacheID(EPipePerNodeCache::Persistent),
149149
new TEvPipeCache::TEvForward(new TEvTxProcessing::TEvReadSet(
150150
0, GetTxId(), owner.TabletID(), ArbiterTabletId, owner.TabletID(), readSetData.SerializeAsString()),
151151
ArbiterTabletId, true),

0 commit comments

Comments
 (0)