Skip to content

Commit 6c2742c

Browse files
authored
Merge d4adc79 into 1b9be9f
2 parents 1b9be9f + d4adc79 commit 6c2742c

Some content is hidden

Large Commits have some content hidden by default. Use the searchbox below for content that may be hidden.

45 files changed

+1684
-151
lines changed

.github/config/muted_ya.txt

+7-3
Original file line numberDiff line numberDiff line change
@@ -15,6 +15,10 @@ ydb/core/kqp/ut/olap KqpOlapWrite.WriteDeleteCleanGC
1515
ydb/core/kqp/ut/pg KqpPg.CreateIndex
1616
ydb/core/kqp/ut/query KqpLimits.QueryReplySize
1717
ydb/core/kqp/ut/query KqpQuery.QueryTimeout
18+
ydb/core/kqp/ut/query KqpQuery.OlapCreateAsSelect_Complex
19+
ydb/core/kqp/ut/query KqpQuery.OlapCreateAsSelect_Simple
20+
ydb/core/kqp/ut/federated_query/s3 KqpFederatedQuery.CreateTableAsSelectFromExternalDataSource
21+
ydb/core/kqp/ut/federated_query/s3 KqpFederatedQuery.CreateTableAsSelectFromExternalTable
1822
ydb/core/kqp/ut/scan KqpRequestContext.TraceIdInErrorMessage
1923
ydb/core/kqp/ut/scheme KqpOlapScheme.TenThousandColumns
2024
ydb/core/kqp/ut/scheme KqpOlap.OlapRead_GenericQuerys
@@ -29,9 +33,9 @@ ydb/core/kqp/ut/scheme [44/50]*
2933
ydb/core/kqp/ut/service KqpQueryService.ExecuteQueryPgTableSelect
3034
ydb/core/kqp/ut/service KqpQueryService.QueryOnClosedSession
3135
ydb/core/kqp/ut/service KqpService.CloseSessionsWithLoad
32-
ydb/core/kqp/ut/service [38/50]*
33-
ydb/core/persqueue/ut [37/40] chunk chunk
34-
ydb/core/persqueue/ut [38/40] chunk chunk
36+
ydb/core/kqp/ut/service KqpQueryService.TableSink_OlapRWQueries
37+
ydb/core/kqp/ut/tx KqpSnapshotRead.ReadOnlyTxWithIndexCommitsOnConcurrentWrite+withSink
38+
ydb/core/persqueue/ut [*/*]*
3539
ydb/core/persqueue/ut TPQTest.*DirectRead*
3640
ydb/core/persqueue/ut/ut_with_sdk TopicAutoscaling.PartitionSplit_ManySession_AutoscaleAwareSDK
3741
ydb/core/persqueue/ut/ut_with_sdk TopicAutoscaling.PartitionSplit_ManySession_existed_AutoscaleAwareSDK

ydb/core/kqp/common/kqp_tx.cpp

+21-12
Original file line numberDiff line numberDiff line change
@@ -7,24 +7,33 @@ namespace NKqp {
77

88
using namespace NYql;
99

10-
TIssue GetLocksInvalidatedIssue(const TKqpTransactionContext& txCtx, const TMaybe<TKqpTxLock>& invalidatedLock) {
10+
NYql::TIssue GetLocksInvalidatedIssue(const TKqpTransactionContext& txCtx, const TKikimrPathId& pathId) {
1111
TStringBuilder message;
1212
message << "Transaction locks invalidated.";
1313

14-
TMaybe<TString> tableName;
15-
if (invalidatedLock) {
16-
TKikimrPathId id(invalidatedLock->GetSchemeShard(), invalidatedLock->GetPathId());
17-
auto table = txCtx.TableByIdMap.FindPtr(id);
18-
if (table) {
19-
tableName = *table;
14+
if (pathId.OwnerId() != 0) {
15+
auto table = txCtx.TableByIdMap.FindPtr(pathId);
16+
if (!table) {
17+
return YqlIssue(TPosition(), TIssuesIds::KIKIMR_LOCKS_INVALIDATED, message << " Unknown table.");
2018
}
19+
return YqlIssue(TPosition(), TIssuesIds::KIKIMR_LOCKS_INVALIDATED, message << " Table: " << *table);
20+
} else {
21+
// Olap tables don't return SchemeShard in locks, thus we use tableId here.
22+
for (const auto& [pathId, table] : txCtx.TableByIdMap) {
23+
if (pathId.TableId() == pathId.TableId()) {
24+
return YqlIssue(TPosition(), TIssuesIds::KIKIMR_LOCKS_INVALIDATED, message << " Table: " << table);
25+
}
26+
}
27+
return YqlIssue(TPosition(), TIssuesIds::KIKIMR_LOCKS_INVALIDATED, message << " Unknown table.");
2128
}
29+
}
2230

23-
if (tableName) {
24-
message << " Table: " << *tableName;
25-
}
26-
27-
return YqlIssue(TPosition(), TIssuesIds::KIKIMR_LOCKS_INVALIDATED, message);
31+
TIssue GetLocksInvalidatedIssue(const TKqpTransactionContext& txCtx, const TKqpTxLock& invalidatedLock) {
32+
return GetLocksInvalidatedIssue(
33+
txCtx,
34+
TKikimrPathId(
35+
invalidatedLock.GetSchemeShard(),
36+
invalidatedLock.GetPathId()));
2837
}
2938

3039
std::pair<bool, std::vector<TIssue>> MergeLocks(const NKikimrMiniKQL::TType& type, const NKikimrMiniKQL::TValue& value,

ydb/core/kqp/common/kqp_tx.h

+1
Original file line numberDiff line numberDiff line change
@@ -434,6 +434,7 @@ class TTransactionsCache {
434434
}
435435
};
436436

437+
NYql::TIssue GetLocksInvalidatedIssue(const TKqpTransactionContext& txCtx, const NYql::TKikimrPathId& pathId);
437438
std::pair<bool, std::vector<NYql::TIssue>> MergeLocks(const NKikimrMiniKQL::TType& type,
438439
const NKikimrMiniKQL::TValue& value, TKqpTransactionContext& txCtx);
439440

ydb/core/kqp/compile_service/kqp_compile_actor.cpp

-3
Original file line numberDiff line numberDiff line change
@@ -172,8 +172,6 @@ class TKqpCompileActor : public TActorBootstrapped<TKqpCompileActor> {
172172
}
173173

174174
void StartSplitting(const TActorContext &ctx) {
175-
YQL_ENSURE(PerStatementResult);
176-
177175
const auto prepareSettings = PrepareCompilationSettings(ctx);
178176
auto result = KqpHost->SplitQuery(QueryRef, prepareSettings);
179177

@@ -280,7 +278,6 @@ class TKqpCompileActor : public TActorBootstrapped<TKqpCompileActor> {
280278
IKqpHost::TPrepareSettings prepareSettings;
281279
prepareSettings.DocumentApiRestricted = QueryId.Settings.DocumentApiRestricted;
282280
prepareSettings.IsInternalCall = QueryId.Settings.IsInternalCall;
283-
prepareSettings.PerStatementResult = PerStatementResult;
284281

285282
switch (QueryId.Settings.Syntax) {
286283
case Ydb::Query::Syntax::SYNTAX_YQL_V1:

ydb/core/kqp/compute_actor/kqp_compute_actor.cpp

+5-5
Original file line numberDiff line numberDiff line change
@@ -131,18 +131,18 @@ namespace NKikimr::NKqp {
131131
using namespace NYql::NDq;
132132
using namespace NYql::NDqProto;
133133

134-
IActor* CreateKqpScanComputeActor(const TActorId& executerId, ui64 txId,
134+
IActor* CreateKqpScanComputeActor(const TActorId& executerId, ui64 txId, ui64 lockTxId, ui32 lockNodeId,
135135
TDqTask* task, IDqAsyncIoFactory::TPtr asyncIoFactory,
136136
const NYql::NDq::TComputeRuntimeSettings& settings, const TComputeMemoryLimits& memoryLimits, NWilson::TTraceId traceId,
137-
TIntrusivePtr<NActors::TProtoArenaHolder> arena) {
138-
return new NScanPrivate::TKqpScanComputeActor(executerId, txId, task, std::move(asyncIoFactory),
137+
TIntrusivePtr<NActors::TProtoArenaHolder> arena, TComputeActorSchedulingOptions schedulingOptions) {
138+
return new NScanPrivate::TKqpScanComputeActor(std::move(schedulingOptions), executerId, txId, lockTxId, lockNodeId, task, std::move(asyncIoFactory),
139139
settings, memoryLimits, std::move(traceId), std::move(arena));
140140
}
141141

142142
IActor* CreateKqpScanFetcher(const NKikimrKqp::TKqpSnapshot& snapshot, std::vector<NActors::TActorId>&& computeActors,
143143
const NKikimrTxDataShard::TKqpTransaction::TScanTaskMeta& meta, const NYql::NDq::TComputeRuntimeSettings& settings,
144-
const ui64 txId, const TShardsScanningPolicy& shardsScanningPolicy, TIntrusivePtr<TKqpCounters> counters, NWilson::TTraceId traceId) {
145-
return new NScanPrivate::TKqpScanFetcherActor(snapshot, settings, std::move(computeActors), txId, meta, shardsScanningPolicy, counters, std::move(traceId));
144+
const ui64 txId, ui64 lockTxId, ui32 lockNodeId, const TShardsScanningPolicy& shardsScanningPolicy, TIntrusivePtr<TKqpCounters> counters, NWilson::TTraceId traceId) {
145+
return new NScanPrivate::TKqpScanFetcherActor(snapshot, settings, std::move(computeActors), txId, lockTxId, lockNodeId, meta, shardsScanningPolicy, counters, std::move(traceId));
146146
}
147147

148148
}

ydb/core/kqp/compute_actor/kqp_compute_actor.h

+2-2
Original file line numberDiff line numberDiff line change
@@ -50,14 +50,14 @@ IActor* CreateKqpComputeActor(const TActorId& executerId, ui64 txId, NYql::NDqPr
5050
TIntrusivePtr<NActors::TProtoArenaHolder> arena,
5151
const std::optional<TKqpFederatedQuerySetup>& federatedQuerySetup, const TGUCSettings::TPtr& GUCSettings);
5252

53-
IActor* CreateKqpScanComputeActor(const TActorId& executerId, ui64 txId,
53+
IActor* CreateKqpScanComputeActor(const TActorId& executerId, ui64 txId, ui64 lockTxId, ui32 lockNodeId,
5454
NYql::NDqProto::TDqTask* task, NYql::NDq::IDqAsyncIoFactory::TPtr asyncIoFactory,
5555
const NYql::NDq::TComputeRuntimeSettings& settings, const NYql::NDq::TComputeMemoryLimits& memoryLimits, NWilson::TTraceId traceId,
5656
TIntrusivePtr<NActors::TProtoArenaHolder> arena);
5757

5858
IActor* CreateKqpScanFetcher(const NKikimrKqp::TKqpSnapshot& snapshot, std::vector<NActors::TActorId>&& computeActors,
5959
const NKikimrTxDataShard::TKqpTransaction::TScanTaskMeta& meta, const NYql::NDq::TComputeRuntimeSettings& settings,
60-
const ui64 txId, const TShardsScanningPolicy& shardsScanningPolicy, TIntrusivePtr<TKqpCounters> counters, NWilson::TTraceId traceId);
60+
const ui64 txId, ui64 lockTxId, ui32 lockNodeId, const TShardsScanningPolicy& shardsScanningPolicy, TIntrusivePtr<TKqpCounters> counters, NWilson::TTraceId traceId);
6161

6262
NYql::NDq::IDqAsyncIoFactory::TPtr CreateKqpAsyncIoFactory(
6363
TIntrusivePtr<TKqpCounters> counters,

ydb/core/kqp/compute_actor/kqp_compute_actor_factory.cpp

+1-1
Original file line numberDiff line numberDiff line change
@@ -211,7 +211,7 @@ class TKqpCaFactory : public IKqpNodeComputeActorFactory {
211211
if (tableKind == ETableKind::Datashard || tableKind == ETableKind::Olap) {
212212
YQL_ENSURE(args.ComputesByStages);
213213
auto& info = args.ComputesByStages->UpsertTaskWithScan(*args.Task, meta, !AppData()->FeatureFlags.GetEnableSeparationComputeActorsFromRead());
214-
IActor* computeActor = CreateKqpScanComputeActor(args.ExecuterId, args.TxId, args.Task,
214+
IActor* computeActor = CreateKqpScanComputeActor(args.ExecuterId, args.TxId, args.LockTxId, args.LockNodeId, args.Task,
215215
AsyncIoFactory, runtimeSettings, memoryLimits,
216216
std::move(args.TraceId), std::move(args.Arena));
217217
TActorId result = TlsActivationContext->Register(computeActor);

ydb/core/kqp/compute_actor/kqp_compute_actor_factory.h

+2
Original file line numberDiff line numberDiff line change
@@ -106,6 +106,8 @@ struct IKqpNodeComputeActorFactory {
106106
struct TCreateArgs {
107107
const NActors::TActorId& ExecuterId;
108108
const ui64 TxId;
109+
const ui64 LockTxId;
110+
const ui32 LockNodeId;
109111
NYql::NDqProto::TDqTask* Task;
110112
TIntrusivePtr<NRm::TTxState> TxInfo;
111113
const NYql::NDq::TComputeRuntimeSettings& RuntimeSettings;

ydb/core/kqp/compute_actor/kqp_compute_events.h

+6
Original file line numberDiff line numberDiff line change
@@ -11,6 +11,11 @@
1111

1212
namespace NKikimr::NKqp {
1313

14+
struct TLocksInfo {
15+
TVector<NKikimrDataEvents::TLock> Locks;
16+
TVector<NKikimrDataEvents::TLock> BrokenLocks;
17+
};
18+
1419
struct TEvKqpCompute {
1520
struct TEvRemoteScanData : public TEventPB<TEvRemoteScanData, NKikimrKqp::TEvRemoteScanData,
1621
TKqpComputeEvents::EvRemoteScanData> {};
@@ -54,6 +59,7 @@ struct TEvKqpCompute {
5459
bool PageFault = false; // page fault was the reason for sending this message
5560
mutable THolder<TEvRemoteScanData> Remote;
5661
std::shared_ptr<IShardScanStats> StatsOnFinished;
62+
TLocksInfo LocksInfo;
5763

5864
template <class T>
5965
const T& GetStatsAs() const {

ydb/core/kqp/compute_actor/kqp_scan_compute_actor.cpp

+25-2
Original file line numberDiff line numberDiff line change
@@ -23,13 +23,15 @@ static constexpr TDuration RL_MAX_BATCH_DELAY = TDuration::Seconds(50);
2323

2424
} // anonymous namespace
2525

26-
TKqpScanComputeActor::TKqpScanComputeActor(const TActorId& executerId, ui64 txId, NDqProto::TDqTask* task,
27-
IDqAsyncIoFactory::TPtr asyncIoFactory,
26+
TKqpScanComputeActor::TKqpScanComputeActor(TComputeActorSchedulingOptions cpuOptions, const TActorId& executerId, ui64 txId, ui64 lockTxId, ui32 lockNodeId,
27+
NDqProto::TDqTask* task, IDqAsyncIoFactory::TPtr asyncIoFactory,
2828
const TComputeRuntimeSettings& settings, const TComputeMemoryLimits& memoryLimits, NWilson::TTraceId traceId,
2929
TIntrusivePtr<NActors::TProtoArenaHolder> arena)
3030
: TBase(executerId, txId, task, std::move(asyncIoFactory), AppData()->FunctionRegistry, settings,
3131
memoryLimits, /* ownMemoryQuota = */ true, /* passExceptions = */ true, /*taskCounters = */ nullptr, std::move(traceId), std::move(arena))
3232
, ComputeCtx(settings.StatsMode)
33+
, LockTxId(lockTxId)
34+
, LockNodeId(lockNodeId)
3335
{
3436
InitializeTask();
3537
YQL_ENSURE(GetTask().GetMeta().UnpackTo(&Meta), "Invalid task meta: " << GetTask().GetMeta().DebugString());
@@ -103,6 +105,19 @@ void TKqpScanComputeActor::FillExtraStats(NDqProto::TDqComputeActorStats* dst, b
103105
}
104106
}
105107

108+
TMaybe<google::protobuf::Any> TKqpScanComputeActor::ExtraData() {
109+
NKikimrTxDataShard::TEvKqpInputActorResultInfo resultInfo;
110+
for (const auto& lock : Locks) {
111+
resultInfo.AddLocks()->CopyFrom(lock);
112+
}
113+
for (const auto& lock : BrokenLocks) {
114+
resultInfo.AddLocks()->CopyFrom(lock);
115+
}
116+
google::protobuf::Any result;
117+
result.PackFrom(resultInfo);
118+
return result;
119+
}
120+
106121
void TKqpScanComputeActor::HandleEvWakeup(EEvWakeupTag tag) {
107122
AFL_DEBUG(NKikimrServices::KQP_COMPUTE)("event", "HandleEvWakeup")("self_id", SelfId());
108123
switch (tag) {
@@ -130,6 +145,14 @@ void TKqpScanComputeActor::Handle(TEvScanExchange::TEvTerminateFromFetcher::TPtr
130145
void TKqpScanComputeActor::Handle(TEvScanExchange::TEvSendData::TPtr& ev) {
131146
ALS_DEBUG(NKikimrServices::KQP_COMPUTE) << "TEvSendData: " << ev->Sender << "/" << SelfId();
132147
auto& msg = *ev->Get();
148+
149+
for (const auto& lock : msg.GetLocksInfo().Locks) {
150+
Locks.insert(lock);
151+
}
152+
for (const auto& lock : msg.GetLocksInfo().Locks) {
153+
BrokenLocks.insert(lock);
154+
}
155+
133156
auto guard = TaskRunner->BindAllocator();
134157
if (!!msg.GetArrowBatch()) {
135158
ScanData->AddData(NMiniKQL::TBatchDataAccessor(msg.GetArrowBatch(), std::move(msg.MutableDataIndexes())), msg.GetTabletId(), TaskRunner->GetHolderFactory());

ydb/core/kqp/compute_actor/kqp_scan_compute_actor.h

+41-2
Original file line numberDiff line numberDiff line change
@@ -10,27 +10,64 @@ namespace NKikimr::NKqp::NScanPrivate {
1010

1111
class TKqpScanComputeActor: public NYql::NDq::TDqSyncComputeActorBase<TKqpScanComputeActor> {
1212
private:
13-
using TBase = NYql::NDq::TDqSyncComputeActorBase<TKqpScanComputeActor>;
13+
using TBase = TSchedulableComputeActorBase<TKqpScanComputeActor>;
14+
1415
NMiniKQL::TKqpScanComputeContext ComputeCtx;
1516
NKikimrTxDataShard::TKqpTransaction::TScanTaskMeta Meta;
17+
1618
using TBase::TaskRunner;
1719
using TBase::MemoryLimits;
1820
using TBase::GetStatsMode;
1921
using TBase::TxId;
2022
using TBase::GetTask;
2123
using TBase::RuntimeSettings;
2224
using TBase::ContinueExecute;
25+
2326
std::set<NActors::TActorId> Fetchers;
2427
NMiniKQL::TKqpScanComputeContext::TScanData* ScanData = nullptr;
28+
const ui64 LockTxId;
29+
const ui32 LockNodeId;
30+
31+
struct TLockHash {
32+
bool operator()(const NKikimrDataEvents::TLock& lock) {
33+
return MultiHash(
34+
lock.GetLockId(),
35+
lock.GetDataShard(),
36+
lock.GetSchemeShard(),
37+
lock.GetPathId(),
38+
lock.GetGeneration(),
39+
lock.GetCounter(),
40+
lock.GetHasWrites());
41+
}
42+
};
43+
44+
struct TLockEqual {
45+
bool operator()(const NKikimrDataEvents::TLock& lhs, const NKikimrDataEvents::TLock& rhs) {
46+
return lhs.GetLockId() == rhs.GetLockId()
47+
&& lhs.GetDataShard() == rhs.GetDataShard()
48+
&& lhs.GetSchemeShard() == rhs.GetSchemeShard()
49+
&& lhs.GetPathId() == rhs.GetPathId()
50+
&& lhs.GetGeneration() == rhs.GetGeneration()
51+
&& lhs.GetCounter() == rhs.GetCounter()
52+
&& lhs.GetHasWrites() == rhs.GetHasWrites();
53+
}
54+
};
55+
56+
using TLocksHashSet = THashSet<NKikimrDataEvents::TLock, TLockHash, TLockEqual>;
57+
58+
TLocksHashSet Locks;
59+
TLocksHashSet BrokenLocks;
60+
2561
ui64 CalcMkqlMemoryLimit() override {
2662
return TBase::CalcMkqlMemoryLimit() + ComputeCtx.GetTableScans().size() * MemoryLimits.ChannelBufferSize;
2763
}
64+
2865
public:
2966
static constexpr NKikimrServices::TActivity::EType ActorActivityType() {
3067
return NKikimrServices::TActivity::KQP_SCAN_COMPUTE_ACTOR;
3168
}
3269

33-
TKqpScanComputeActor(const TActorId& executerId, ui64 txId,
70+
TKqpScanComputeActor(TComputeActorSchedulingOptions, const TActorId& executerId, ui64 txId, ui64 lockTxId, ui32 lockNodeId,
3471
NYql::NDqProto::TDqTask* task, NYql::NDq::IDqAsyncIoFactory::TPtr asyncIoFactory,
3572
const NYql::NDq::TComputeRuntimeSettings& settings, const NYql::NDq::TComputeMemoryLimits& memoryLimits, NWilson::TTraceId traceId,
3673
TIntrusivePtr<NActors::TProtoArenaHolder> arena);
@@ -62,6 +99,8 @@ class TKqpScanComputeActor: public NYql::NDq::TDqSyncComputeActorBase<TKqpScanCo
6299

63100
void FillExtraStats(NYql::NDqProto::TDqComputeActorStats* dst, bool last);
64101

102+
TMaybe<google::protobuf::Any> ExtraData() override;
103+
65104
void HandleEvWakeup(EEvWakeupTag tag);
66105

67106
void Handle(TEvScanExchange::TEvTerminateFromFetcher::TPtr& ev);

ydb/core/kqp/compute_actor/kqp_scan_compute_manager.cpp

+3-3
Original file line numberDiff line numberDiff line change
@@ -31,12 +31,12 @@ std::vector<std::unique_ptr<TComputeTaskData>> TShardScannerInfo::OnReceiveData(
3131
ui32 idx = 0;
3232
AFL_ENSURE(data.ArrowBatch);
3333
for (auto&& i : data.SplittedBatches) {
34-
result.emplace_back(std::make_unique<TComputeTaskData>(selfPtr, std::make_unique<TEvScanExchange::TEvSendData>(data.ArrowBatch, TabletId, std::move(i)), idx++));
34+
result.emplace_back(std::make_unique<TComputeTaskData>(selfPtr, std::make_unique<TEvScanExchange::TEvSendData>(data.ArrowBatch, TabletId, std::move(i), data.LocksInfo), idx++));
3535
}
3636
} else if (data.ArrowBatch) {
37-
result.emplace_back(std::make_unique<TComputeTaskData>(selfPtr, std::make_unique<TEvScanExchange::TEvSendData>(data.ArrowBatch, TabletId)));
37+
result.emplace_back(std::make_unique<TComputeTaskData>(selfPtr, std::make_unique<TEvScanExchange::TEvSendData>(data.ArrowBatch, TabletId, data.LocksInfo)));
3838
} else {
39-
result.emplace_back(std::make_unique<TComputeTaskData>(selfPtr, std::make_unique<TEvScanExchange::TEvSendData>(std::move(data.Rows), TabletId)));
39+
result.emplace_back(std::make_unique<TComputeTaskData>(selfPtr, std::make_unique<TEvScanExchange::TEvSendData>(std::move(data.Rows), TabletId, data.LocksInfo)));
4040
}
4141
AFL_DEBUG(NKikimrServices::KQP_COMPUTE)("event", "receive_data")("actor_id", ActorId)("count_chunks", result.size());
4242
DataChunksInFlightCount = result.size();

ydb/core/kqp/compute_actor/kqp_scan_events.h

+8-4
Original file line numberDiff line numberDiff line change
@@ -43,31 +43,35 @@ struct TEvScanExchange {
4343
YDB_ACCESSOR_DEF(TVector<TOwnedCellVec>, Rows);
4444
YDB_READONLY(ui64, TabletId, 0);
4545
YDB_ACCESSOR_DEF(std::vector<ui32>, DataIndexes);
46+
YDB_READONLY_DEF(TLocksInfo, LocksInfo);
4647
public:
4748
ui32 GetRowsCount() const {
4849
return ArrowBatch ? ArrowBatch->num_rows() : Rows.size();
4950
}
5051

51-
TEvSendData(const std::shared_ptr<arrow::Table>& arrowBatch, const ui64 tabletId)
52+
TEvSendData(const std::shared_ptr<arrow::Table>& arrowBatch, const ui64 tabletId, const TLocksInfo& locksInfo)
5253
: ArrowBatch(arrowBatch)
5354
, TabletId(tabletId)
55+
, LocksInfo(locksInfo)
5456
{
5557
Y_ABORT_UNLESS(ArrowBatch);
5658
Y_ABORT_UNLESS(ArrowBatch->num_rows());
5759
}
5860

59-
TEvSendData(const std::shared_ptr<arrow::Table>& arrowBatch, const ui64 tabletId, std::vector<ui32>&& dataIndexes)
61+
TEvSendData(const std::shared_ptr<arrow::Table>& arrowBatch, const ui64 tabletId, std::vector<ui32>&& dataIndexes, const TLocksInfo& locksInfo)
6062
: ArrowBatch(arrowBatch)
6163
, TabletId(tabletId)
6264
, DataIndexes(std::move(dataIndexes))
65+
, LocksInfo(locksInfo)
6366
{
6467
Y_ABORT_UNLESS(ArrowBatch);
6568
Y_ABORT_UNLESS(ArrowBatch->num_rows());
6669
}
6770

68-
TEvSendData(TVector<TOwnedCellVec>&& rows, const ui64 tabletId)
71+
TEvSendData(TVector<TOwnedCellVec>&& rows, const ui64 tabletId, const TLocksInfo& locksInfo)
6972
: Rows(std::move(rows))
70-
, TabletId(tabletId) {
73+
, TabletId(tabletId)
74+
, LocksInfo(locksInfo) {
7175
Y_ABORT_UNLESS(Rows.size());
7276
}
7377
};

ydb/core/kqp/compute_actor/kqp_scan_fetcher_actor.cpp

+5-1
Original file line numberDiff line numberDiff line change
@@ -23,13 +23,15 @@ static constexpr ui64 MAX_SHARD_RESOLVES = 3;
2323

2424

2525
TKqpScanFetcherActor::TKqpScanFetcherActor(const NKikimrKqp::TKqpSnapshot& snapshot,
26-
const TComputeRuntimeSettings& settings, std::vector<NActors::TActorId>&& computeActors, const ui64 txId,
26+
const TComputeRuntimeSettings& settings, std::vector<NActors::TActorId>&& computeActors, const ui64 txId, const ui64 lockTxId, const ui32 lockNodeId,
2727
const NKikimrTxDataShard::TKqpTransaction_TScanTaskMeta& meta, const TShardsScanningPolicy& shardsScanningPolicy,
2828
TIntrusivePtr<TKqpCounters> counters, NWilson::TTraceId traceId)
2929
: Meta(meta)
3030
, ScanDataMeta(Meta)
3131
, RuntimeSettings(settings)
3232
, TxId(txId)
33+
, LockTxId(lockTxId)
34+
, LockNodeId(lockNodeId)
3335
, ComputeActorIds(std::move(computeActors))
3436
, Snapshot(snapshot)
3537
, ShardsScanningPolicy(shardsScanningPolicy)
@@ -411,6 +413,8 @@ std::unique_ptr<NKikimr::TEvDataShard::TEvKqpScan> TKqpScanFetcherActor::BuildEv
411413
ev->Record.SetStatsMode(RuntimeSettings.StatsMode);
412414
ev->Record.SetScanId(scanId);
413415
ev->Record.SetTxId(std::get<ui64>(TxId));
416+
ev->Record.SetLockTxId(LockTxId);
417+
ev->Record.SetLockNodeId(LockNodeId);
414418
ev->Record.SetTablePath(ScanDataMeta.TablePath);
415419
ev->Record.SetSchemaVersion(ScanDataMeta.TableId.SchemaVersion);
416420

0 commit comments

Comments
 (0)