Skip to content

Commit bd566ed

Browse files
authored
Merge 23d4559 into d6fd00d
2 parents d6fd00d + 23d4559 commit bd566ed

File tree

4 files changed

+53
-12
lines changed

4 files changed

+53
-12
lines changed

ydb/core/kqp/executer_actor/kqp_data_executer.cpp

Lines changed: 5 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -1847,7 +1847,6 @@ class TKqpDataExecuter: public TKqpExecuterBase<TKqpDataExecuter, EExecType::Dat
18471847
switch (stage.GetSources(0).GetTypeCase()) {
18481848
case NKqpProto::TKqpSource::kReadRangesSource:
18491849
if (auto partitionsCount = BuildScanTasksFromSource(stageInfo,
1850-
/* shardsResolved */ StreamResult,
18511850
/* limitTasksPerNode */ StreamResult)) {
18521851
sourceScanPartitionsCount += *partitionsCount;
18531852
} else {
@@ -1914,9 +1913,6 @@ class TKqpDataExecuter: public TKqpExecuterBase<TKqpDataExecuter, EExecType::Dat
19141913
}
19151914
}
19161915

1917-
// For generic query all shards are already resolved
1918-
YQL_ENSURE(!StreamResult || remoteComputeTasks.empty());
1919-
19201916
for (const auto& channel : TasksGraph.GetChannels()) {
19211917
if (IsCrossShardChannel(TasksGraph, channel)) {
19221918
HasPersistentChannels = true;
@@ -1996,7 +1992,6 @@ class TKqpDataExecuter: public TKqpExecuterBase<TKqpDataExecuter, EExecType::Dat
19961992
TasksGraph.GetMeta().UseFollowers = GetUseFollowers();
19971993

19981994
if (RemoteComputeTasks) {
1999-
YQL_ENSURE(!StreamResult);
20001995
TSet<ui64> shardIds;
20011996
for (const auto& [shardId, _] : RemoteComputeTasks) {
20021997
shardIds.insert(shardId);
@@ -2057,6 +2052,11 @@ class TKqpDataExecuter: public TKqpExecuterBase<TKqpDataExecuter, EExecType::Dat
20572052
}
20582053
}
20592054

2055+
if (shardIds.size() <= 1 && HasDatashardSourceScan) {
2056+
// nothing to merge
2057+
HasDatashardSourceScan = false;
2058+
}
2059+
20602060
if ((HasOlapTable || HasDatashardSourceScan) && shardIds) {
20612061
LOG_D("Start resolving tablets nodes... (" << shardIds.size() << ")");
20622062
ExecuterStateSpan = NWilson::TSpan(

ydb/core/kqp/executer_actor/kqp_executer_impl.h

Lines changed: 7 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -213,6 +213,7 @@ class TKqpExecuterBase : public TActorBootstrapped<TDerived> {
213213
auto& reply = *ev->Get();
214214

215215
KqpShardsResolverId = {};
216+
ShardsResolved = true;
216217

217218
// TODO: count resolve time in CpuTime
218219

@@ -419,7 +420,7 @@ class TKqpExecuterBase : public TActorBootstrapped<TDerived> {
419420
auto& extraData = ExtraData[computeActor];
420421
extraData.TaskId = taskId;
421422
extraData.Data.Swap(state.MutableExtraData());
422-
423+
423424

424425
Stats->AddComputeActorStats(
425426
computeActor.NodeId(),
@@ -1015,7 +1016,7 @@ class TKqpExecuterBase : public TActorBootstrapped<TDerived> {
10151016
return result;
10161017
}
10171018

1018-
TMaybe<size_t> BuildScanTasksFromSource(TStageInfo& stageInfo, const bool shardsResolved, const bool limitTasksPerNode) {
1019+
TMaybe<size_t> BuildScanTasksFromSource(TStageInfo& stageInfo, const bool limitTasksPerNode) {
10191020
THashMap<ui64, std::vector<ui64>> nodeTasks;
10201021
THashMap<ui64, ui64> assignedShardsCount;
10211022

@@ -1050,7 +1051,7 @@ class TKqpExecuterBase : public TActorBootstrapped<TDerived> {
10501051
if (nodeId) {
10511052
task.Meta.NodeId = *nodeId;
10521053
} else {
1053-
YQL_ENSURE(!shardsResolved);
1054+
YQL_ENSURE(!ShardsResolved);
10541055
task.Meta.ShardId = taskLocation;
10551056
}
10561057
FillSecureParamsFromStage(task.Meta.SecureParams, stage);
@@ -1144,15 +1145,14 @@ class TKqpExecuterBase : public TActorBootstrapped<TDerived> {
11441145
? TMaybe<ui64>{*nodeIdPtr}
11451146
: Nothing();
11461147

1147-
YQL_ENSURE(!shardsResolved || nodeId);
1148+
YQL_ENSURE(!ShardsResolved || nodeId);
11481149
YQL_ENSURE(Stats);
11491150

11501151
if (shardId) {
11511152
Stats->AffectedShards.insert(*shardId);
11521153
}
11531154

1154-
if (limitTasksPerNode) {
1155-
YQL_ENSURE(shardsResolved);
1155+
if (limitTasksPerNode && ShardsResolved) {
11561156
const auto maxScanTasksPerNode = GetScanTasksPerNode(stageInfo, /* isOlapScan */ false, *nodeId);
11571157
auto& nodeTasks = nodeIdToTasks[*nodeId];
11581158
if (nodeTasks.size() < maxScanTasksPerNode) {
@@ -1925,6 +1925,7 @@ class TKqpExecuterBase : public TActorBootstrapped<TDerived> {
19251925
TActorId Target;
19261926
ui64 TxId = 0;
19271927

1928+
bool ShardsResolved = false;
19281929
TKqpTasksGraph TasksGraph;
19291930

19301931
TActorId KqpTableResolverId;

ydb/core/kqp/executer_actor/kqp_scan_executer.cpp

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -184,7 +184,6 @@ class TKqpScanExecuter : public TKqpExecuterBase<TKqpScanExecuter, EExecType::Sc
184184
case NKqpProto::TKqpSource::kReadRangesSource:
185185
BuildScanTasksFromSource(
186186
stageInfo,
187-
/* shardsResolved */ true,
188187
/* limitTasksPerNode */ false);
189188
break;
190189
default:

ydb/core/kqp/ut/service/kqp_qs_queries_ut.cpp

Lines changed: 41 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -2,6 +2,7 @@
22
#include <ydb/core/kqp/ut/common/kqp_ut_common.h>
33
#include <ydb/core/kqp/ut/common/columnshard.h>
44
#include <ydb/core/testlib/common_helper.h>
5+
#include <ydb/core/base/tablet_pipecache.h>
56
#include <ydb/core/tx/columnshard/hooks/abstract/abstract.h>
67
#include <ydb/core/tx/columnshard/hooks/testing/controller.h>
78
#include <ydb/public/lib/ut_helpers/ut_helpers_query.h>
@@ -237,6 +238,46 @@ Y_UNIT_TEST_SUITE(KqpQueryService) {
237238
}
238239
}
239240

241+
Y_UNIT_TEST(Followers) {
242+
NKikimrConfig::TAppConfig config;
243+
244+
auto kikimr = TKikimrRunner(TKikimrSettings().SetAppConfig(config).SetUseRealThreads(false));
245+
auto db = kikimr.GetQueryClient();
246+
247+
TExecuteQuerySettings settings;
248+
249+
THashMap<TActorId, ui64> countResolveTablet;
250+
countResolveTablet[NKikimr::MakePipePerNodeCacheID(false)] = 0;
251+
countResolveTablet[NKikimr::MakePipePerNodeCacheID(true)] = 0;
252+
auto counterObserver = [&countResolveTablet](TAutoPtr<IEventHandle>& ev) -> auto {
253+
if (ev->GetTypeRewrite() == NKikimr::TEvPipeCache::TEvGetTabletNode::EventType) {
254+
countResolveTablet[ev->Recipient]++;
255+
return TTestActorRuntime::EEventAction::PROCESS;
256+
}
257+
258+
return TTestActorRuntime::EEventAction::PROCESS;
259+
};
260+
261+
kikimr.GetTestServer().GetRuntime()->SetObserverFunc(counterObserver);
262+
263+
Y_DEFER {
264+
kikimr.GetTestServer().GetRuntime()->SetObserverFunc(TTestActorRuntime::DefaultObserverFunc);
265+
};
266+
267+
{
268+
const TString query = "SELECT Key, Value2 FROM TwoShard WHERE Key = 1u";
269+
auto result = kikimr.RunCall([&] { return db.ExecuteQuery(query, NQuery::TTxControl::BeginTx(TTxSettings::StaleRO()).CommitTx()).ExtractValueSync(); });
270+
UNIT_ASSERT_VALUES_EQUAL_C(result.GetStatus(), EStatus::SUCCESS, result.GetIssues().ToString());
271+
UNIT_ASSERT_VALUES_EQUAL(result.GetResultSets().size(), 1);
272+
CompareYson(R"([
273+
[[1u];[-1]]
274+
])", FormatResultSetYson(result.GetResultSet(0)));
275+
UNIT_ASSERT(countResolveTablet[NKikimr::MakePipePerNodeCacheID(false)] == 0);
276+
// using followers resolver.
277+
UNIT_ASSERT(countResolveTablet[NKikimr::MakePipePerNodeCacheID(true)] > 0);
278+
}
279+
}
280+
240281
Y_UNIT_TEST(ExecuteQueryWithWorkloadManager) {
241282
NKikimrConfig::TAppConfig config;
242283
config.MutableFeatureFlags()->SetEnableResourcePools(true);

0 commit comments

Comments
 (0)