Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
20 changes: 13 additions & 7 deletions ydb/core/kqp/executer_actor/kqp_data_executer.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -133,7 +133,6 @@ class TKqpDataExecuter : public TKqpExecuterBase<TKqpDataExecuter, EExecType::Da
userRequestContext, statementResultIndex, TWilsonKqp::DataExecuter, "DataExecuter", streamResult)
, AsyncIoFactory(std::move(asyncIoFactory))
, UseEvWriteForOltp(tableServiceConfig.GetEnableOltpSink())
, HtapTx(tableServiceConfig.GetEnableHtapTx())
, FederatedQuerySetup(federatedQuerySetup)
, GUCSettings(GUCSettings)
, ShardIdToTableInfo(shardIdToTableInfo)
Expand Down Expand Up @@ -2353,6 +2352,7 @@ class TKqpDataExecuter : public TKqpExecuterBase<TKqpDataExecuter, EExecType::Da

absl::flat_hash_set<ui64> sendingShardsSet;
absl::flat_hash_set<ui64> receivingShardsSet;
absl::flat_hash_set<ui64> sendingColumnShardsSet;
absl::flat_hash_set<ui64> receivingColumnShardsSet;
ui64 arbiter = 0;
std::optional<ui64> columnShardArbiter;
Expand All @@ -2378,6 +2378,10 @@ class TKqpDataExecuter : public TKqpExecuterBase<TKqpDataExecuter, EExecType::Da
if (tx->HasLocks()) {
// Locks may be broken so shards with locks need to send readsets
sendingShardsSet.insert(shardId);

if (ShardIdToTableInfo->Get(shardId).IsOlap) {
sendingColumnShardsSet.insert(shardId);
}
}
if (ShardsWithEffects.contains(shardId)) {
// Volatile transactions may abort effects, so they send readsets
Expand All @@ -2387,7 +2391,7 @@ class TKqpDataExecuter : public TKqpExecuterBase<TKqpDataExecuter, EExecType::Da
// Effects are only applied when all locks are valid
receivingShardsSet.insert(shardId);

if (HtapTx && ShardIdToTableInfo->Get(shardId).IsOlap) {
if (ShardIdToTableInfo->Get(shardId).IsOlap) {
receivingColumnShardsSet.insert(shardId);
}
}
Expand Down Expand Up @@ -2445,10 +2449,13 @@ class TKqpDataExecuter : public TKqpExecuterBase<TKqpDataExecuter, EExecType::Da
}
}

if (!receivingColumnShardsSet.empty()) {
AFL_ENSURE(HtapTx);
const ui32 index = RandomNumber<ui32>(receivingColumnShardsSet.size());
auto arbiterIterator = std::begin(receivingColumnShardsSet);
if (!receivingColumnShardsSet.empty() || !sendingColumnShardsSet.empty()) {
const auto& shards = receivingColumnShardsSet.empty()
? sendingColumnShardsSet
: receivingColumnShardsSet;

const ui32 index = RandomNumber<ui32>(shards.size());
auto arbiterIterator = std::begin(shards);
std::advance(arbiterIterator, index);
columnShardArbiter = *arbiterIterator;
}
Expand Down Expand Up @@ -2857,7 +2864,6 @@ class TKqpDataExecuter : public TKqpExecuterBase<TKqpDataExecuter, EExecType::Da
private:
NYql::NDq::IDqAsyncIoFactory::TPtr AsyncIoFactory;
const bool UseEvWriteForOltp = false;
const bool HtapTx = false;
const std::optional<TKqpFederatedQuerySetup> FederatedQuerySetup;
const TGUCSettings::TPtr GUCSettings;
TShardIdToTableInfoPtr ShardIdToTableInfo;
Expand Down
9 changes: 5 additions & 4 deletions ydb/core/kqp/ut/service/kqp_qs_queries_ut.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -3190,12 +3190,13 @@ Y_UNIT_TEST_SUITE(KqpQueryService) {

{
auto result = client.ExecuteQuery(R"(
SELECT * FROM `/Root/DataShard`;
SELECT * FROM `/Root/ColumnShard`;
SELECT * FROM `/Root/DataShard` ORDER BY Col1;
SELECT * FROM `/Root/ColumnShard` ORDER BY Col1;
)", NYdb::NQuery::TTxControl::BeginTx().CommitTx()).ExtractValueSync();
UNIT_ASSERT_C(result.IsSuccess(), result.GetIssues().ToString());
CompareYson(R"([])", FormatResultSetYson(result.GetResultSet(0)));
CompareYson(R"([])", FormatResultSetYson(result.GetResultSet(1)));
CompareYson(R"([[10u;"test1";10];[20u;"test2";11];[30u;"test3";12];[40u;"test";13];[101u;"test";101];[102u;"test";101];[103u;"test";101];[104u;"test";101];[1001u;"test";1001];[1002u;"test";1001];[1003u;"test";1001];[1004u;"test";1001]])",
FormatResultSetYson(result.GetResultSet(0)));
CompareYson(R"([[1u;"test";1];[2u;"test";1];[3u;"test";1];[4u;"test";1]])", FormatResultSetYson(result.GetResultSet(1)));
}

{
Expand Down
4 changes: 4 additions & 0 deletions ydb/core/tx/columnshard/columnshard_impl.h
Original file line number Diff line number Diff line change
Expand Up @@ -89,6 +89,8 @@ class TGeneralCompactColumnEngineChanges;

namespace NKikimr::NColumnShard {

class TEvWriteCommitPrimaryTransactionOperator;
class TEvWriteCommitSecondaryTransactionOperator;
class TTxFinishAsyncTransaction;
class TTxInsertTableCleanup;
class TTxRemoveSharedBlobs;
Expand Down Expand Up @@ -138,6 +140,8 @@ class TColumnShard
: public TActor<TColumnShard>
, public NTabletFlatExecutor::TTabletExecutedFlat
{
friend class TEvWriteCommitSecondaryTransactionOperator;
friend class TEvWriteCommitPrimaryTransactionOperator;
friend class TTxInsertTableCleanup;
friend class TTxInit;
friend class TTxInitSchema;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -57,7 +57,8 @@ TConclusionStatus TBuildSlicesTask::DoExecute(const std::shared_ptr<ITask>& /*ta
if (OriginalBatch->num_columns() != indexSchema->num_fields()) {
AFL_VERIFY(OriginalBatch->num_columns() < indexSchema->num_fields())("original", OriginalBatch->num_columns())(
"index", indexSchema->num_fields());
if (HasAppData() && !AppDataVerified().FeatureFlags.GetEnableOptionalColumnsInColumnShard()) {
if (HasAppData() && !AppDataVerified().FeatureFlags.GetEnableOptionalColumnsInColumnShard() &&
WriteData.GetWriteMeta().GetModificationType() != NEvWrite::EModificationType::Delete) {
subset = NArrow::TSchemaSubset::AllFieldsAccepted();
const std::vector<ui32>& columnIdsVector = ActualSchema->GetIndexInfo().GetColumnIds(false);
const std::set<ui32> columnIdsSet(columnIdsVector.begin(), columnIdsVector.end());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -141,7 +141,10 @@ class TEvWriteCommitPrimaryTransactionOperator: public TEvWriteCommitSyncTransac
auto op = Self->GetProgressTxController().GetTxOperatorVerifiedAs<TEvWriteCommitPrimaryTransactionOperator>(TxId);
AFL_DEBUG(NKikimrServices::TX_COLUMNSHARD)("event", "ack_tablet")("wait", JoinSeq(",", op->WaitShardsResultAck))(
"receive", TabletId);
AFL_VERIFY(op->WaitShardsResultAck.erase(TabletId));
if (!op->WaitShardsResultAck.erase(TabletId)) {
AFL_WARN(NKikimrServices::TX_COLUMNSHARD)("event", "ack_tablet_duplication")("wait", JoinSeq(",", op->WaitShardsResultAck))(
"receive", TabletId);
}
op->CheckFinished(*Self);
}

Expand Down Expand Up @@ -188,7 +191,7 @@ class TEvWriteCommitPrimaryTransactionOperator: public TEvWriteCommitSyncTransac
if (tabletId && *tabletId != i) {
continue;
}
NActors::TActivationContext::AsActorContext().Send(MakePipePerNodeCacheID(EPipePerNodeCache::Persistent),
owner.Send(MakePipePerNodeCacheID(EPipePerNodeCache::Persistent),
new TEvPipeCache::TEvForward(
new TEvTxProcessing::TEvReadSetAck(0, GetTxId(), owner.TabletID(), i, owner.TabletID(), 0), i, true),
IEventHandle::FlagTrackDelivery, GetTxId());
Expand All @@ -202,7 +205,7 @@ class TEvWriteCommitPrimaryTransactionOperator: public TEvWriteCommitSyncTransac
readSetData.SetDecision(*TxBroken ? NKikimrTx::TReadSetData::DECISION_ABORT : NKikimrTx::TReadSetData::DECISION_COMMIT);
for (auto&& i : ReceivingShards) {
if (WaitShardsResultAck.contains(i)) {
NActors::TActivationContext::AsActorContext().Send(MakePipePerNodeCacheID(EPipePerNodeCache::Persistent),
owner.Send(MakePipePerNodeCacheID(EPipePerNodeCache::Persistent),
new TEvPipeCache::TEvForward(new TEvTxProcessing::TEvReadSet(TxInfo.PlanStep, GetTxId(), owner.TabletID(), i,
owner.TabletID(), readSetData.SerializeAsString()),
i, true),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -136,7 +136,7 @@ class TEvWriteCommitSecondaryTransactionOperator: public TEvWriteCommitSyncTrans
}

void SendBrokenFlagAck(TColumnShard& owner) {
NActors::TActivationContext::AsActorContext().Send(MakePipePerNodeCacheID(EPipePerNodeCache::Persistent),
owner.Send(MakePipePerNodeCacheID(EPipePerNodeCache::Persistent),
new TEvPipeCache::TEvForward(
new TEvTxProcessing::TEvReadSetAck(0, GetTxId(), owner.TabletID(), ArbiterTabletId, owner.TabletID(), 0), ArbiterTabletId, true),
IEventHandle::FlagTrackDelivery, GetTxId());
Expand All @@ -145,7 +145,7 @@ class TEvWriteCommitSecondaryTransactionOperator: public TEvWriteCommitSyncTrans
void SendResult(TColumnShard& owner) {
NKikimrTx::TReadSetData readSetData;
readSetData.SetDecision(SelfBroken ? NKikimrTx::TReadSetData::DECISION_ABORT : NKikimrTx::TReadSetData::DECISION_COMMIT);
NActors::TActivationContext::AsActorContext().Send(MakePipePerNodeCacheID(EPipePerNodeCache::Persistent),
owner.Send(MakePipePerNodeCacheID(EPipePerNodeCache::Persistent),
new TEvPipeCache::TEvForward(new TEvTxProcessing::TEvReadSet(
0, GetTxId(), owner.TabletID(), ArbiterTabletId, owner.TabletID(), readSetData.SerializeAsString()),
ArbiterTabletId, true),
Expand Down