Skip to content

Commit d2f5e8e

Browse files
committed
Fixed token passing for SLJ
1 parent 5dbd26c commit d2f5e8e

File tree

8 files changed

+180
-90
lines changed

8 files changed

+180
-90
lines changed

ydb/core/kqp/executer_actor/kqp_data_executer.cpp

Lines changed: 6 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1967,7 +1967,12 @@ class TKqpDataExecuter : public TKqpExecuterBase<TKqpDataExecuter, EExecType::Da
19671967
}
19681968
}
19691969

1970-
size_t sourceScanPartitionsCount = TasksGraph.BuildAllTasks({}, ResourcesSnapshot, Stats.get(), &ShardsWithEffects);
1970+
size_t sourceScanPartitionsCount = 0;
1971+
1972+
if (!graphRestored) {
1973+
sourceScanPartitionsCount = TasksGraph.BuildAllTasks({}, ResourcesSnapshot, Stats.get(), &ShardsWithEffects);
1974+
}
1975+
19711976
OnEmptyResult();
19721977

19731978
TIssue validateIssue;

ydb/core/kqp/executer_actor/kqp_executer_impl.h

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1269,7 +1269,7 @@ class TKqpExecuterBase : public TActor<TDerived> {
12691269

12701270
bool RestoreTasksGraph() {
12711271
if (Request.QueryPhysicalGraph) {
1272-
TasksGraph.RestoreTasksGraphInfo(*Request.QueryPhysicalGraph);
1272+
TasksGraph.RestoreTasksGraphInfo(ResourcesSnapshot, *Request.QueryPhysicalGraph);
12731273
}
12741274

12751275
return TasksGraph.GetMeta().IsRestored;

ydb/core/kqp/executer_actor/kqp_tasks_graph.cpp

Lines changed: 94 additions & 57 deletions
Original file line numberDiff line numberDiff line change
@@ -25,6 +25,49 @@ using namespace NYql;
2525
using namespace NYql::NDq;
2626
using namespace NYql::NNodes;
2727

28+
namespace {
29+
30+
struct TStageScheduleInfo {
31+
double StageCost = 0.0;
32+
ui32 TaskCount = 0;
33+
};
34+
35+
std::map<ui32, TStageScheduleInfo> ScheduleByCost(const IKqpGateway::TPhysicalTxData& tx, const TVector<NKikimrKqp::TKqpNodeResources>& resourceSnapshot) {
36+
std::map<ui32, TStageScheduleInfo> result;
37+
if (!resourceSnapshot.empty()) // can't schedule w/o node count
38+
{
39+
// collect costs and schedule stages with external sources only
40+
double totalCost = 0.0;
41+
for (ui32 stageIdx = 0; stageIdx < tx.Body->StagesSize(); ++stageIdx) {
42+
auto& stage = tx.Body->GetStages(stageIdx);
43+
if (stage.SourcesSize() > 0 && stage.GetSources(0).GetTypeCase() == NKqpProto::TKqpSource::kExternalSource) {
44+
if (stage.GetStageCost() > 0.0 && stage.GetTaskCount() == 0) {
45+
totalCost += stage.GetStageCost();
46+
result.emplace(stageIdx, TStageScheduleInfo{.StageCost = stage.GetStageCost()});
47+
}
48+
}
49+
}
50+
// assign task counts
51+
if (!result.empty()) {
52+
// allow use 2/3 of threads in single stage
53+
ui32 maxStageTaskCount = (TStagePredictor::GetUsableThreads() * 2 + 2) / 3;
54+
// total limit per mode is x2
55+
ui32 maxTotalTaskCount = maxStageTaskCount * 2;
56+
for (auto& [_, stageInfo] : result) {
57+
// schedule tasks evenly between nodes
58+
stageInfo.TaskCount =
59+
std::max<ui32>(
60+
std::min(static_cast<ui32>(maxTotalTaskCount * stageInfo.StageCost / totalCost), maxStageTaskCount)
61+
, 1
62+
) * resourceSnapshot.size();
63+
}
64+
}
65+
}
66+
return result;
67+
}
68+
69+
} // anonymous namespace
70+
2871
struct TShardRangesWithShardId {
2972
TMaybe<ui64> ShardId;
3073
const TShardKeyRanges* Ranges;
@@ -546,6 +589,12 @@ void TKqpTasksGraph::BuildDqSourceStreamLookupChannels(const TStageInfo& stageIn
546589
streamLookupSource.SetProviderName(compiledSource.GetType());
547590
*streamLookupSource.MutableLookupSource() = compiledSource.GetSettings();
548591

592+
TString structuredToken;
593+
const auto& sourceName = compiledSource.GetSourceName();
594+
if (sourceName) {
595+
structuredToken = NYql::CreateStructuredTokenParser(compiledSource.GetAuthInfo()).ToBuilder().ReplaceReferences(GetMeta().SecureParams).ToJson();
596+
}
597+
549598
TTransform dqSourceStreamLookupTransform = {
550599
.Type = "StreamLookupInputTransform",
551600
.InputType = dqSourceStreamLookup.GetInputStageRowType(),
@@ -554,7 +603,12 @@ void TKqpTasksGraph::BuildDqSourceStreamLookupChannels(const TStageInfo& stageIn
554603
YQL_ENSURE(dqSourceStreamLookupTransform.Settings.PackFrom(*settings));
555604

556605
for (const auto taskId : stageInfo.Tasks) {
557-
GetTask(taskId).Inputs[inputIndex].Transform = dqSourceStreamLookupTransform;
606+
auto& task = GetTask(taskId);
607+
task.Inputs[inputIndex].Transform = dqSourceStreamLookupTransform;
608+
609+
if (structuredToken) {
610+
task.Meta.SecureParams.emplace(sourceName, structuredToken);
611+
}
558612
}
559613

560614
BuildUnionAllChannels(*this, stageInfo, inputIndex, inputStageInfo, outputIndex, /* enableSpilling */ false, logFunc);
@@ -1505,9 +1559,10 @@ void TKqpTasksGraph::PersistTasksGraphInfo(NKikimrKqp::TQueryPhysicalGraph& resu
15051559
}
15061560
}
15071561

1508-
// Restored graph only requires to update authentication secrets
1509-
// and to reassign existing tasks between actual nodes.
1510-
void TKqpTasksGraph::RestoreTasksGraphInfo(const NKikimrKqp::TQueryPhysicalGraph& graphInfo) {
1562+
void TKqpTasksGraph::RestoreTasksGraphInfo(const TVector<NKikimrKqp::TKqpNodeResources>& resourcesSnapshot, const NKikimrKqp::TQueryPhysicalGraph& graphInfo) {
1563+
GetMeta().IsRestored = true;
1564+
GetMeta().AllowWithSpilling = false;
1565+
15111566
const auto restoreDqTransform = [](const auto& protoInfo) -> TMaybe<TTransform> {
15121567
if (!protoInfo.HasTransform()) {
15131568
return Nothing();
@@ -1543,12 +1598,14 @@ void TKqpTasksGraph::RestoreTasksGraphInfo(const NKikimrKqp::TQueryPhysicalGraph
15431598
const auto& task = graphInfo.GetTasks(taskIdx);
15441599
const auto txId = task.GetTxId();
15451600
const auto& taskInfo = task.GetDqTask();
1601+
const NYql::NDq::TStageId stageId(txId, taskInfo.GetStageId());
15461602

1547-
auto& stageInfo = GetStageInfo({txId, taskInfo.GetStageId()});
1603+
auto& stageInfo = GetStageInfo(stageId);
15481604
auto& newTask = AddTask(stageInfo, TTaskType::RESTORED);
15491605
YQL_ENSURE(taskInfo.GetId() == newTask.Id);
15501606
newTask.Meta.TaskParams.insert(taskInfo.GetTaskParams().begin(), taskInfo.GetTaskParams().end());
15511607
newTask.Meta.ReadRanges.assign(taskInfo.GetReadRanges().begin(), taskInfo.GetReadRanges().end());
1608+
newTask.Meta.Type = TTaskMeta::TTaskType::Compute;
15521609

15531610
for (size_t inputIdx = 0; inputIdx < taskInfo.InputsSize(); ++inputIdx) {
15541611
const auto& inputInfo = taskInfo.GetInputs(inputIdx);
@@ -1663,6 +1720,23 @@ void TKqpTasksGraph::RestoreTasksGraphInfo(const NKikimrKqp::TQueryPhysicalGraph
16631720
restoreDqChannel(txId, channelInfo).SrcOutputIndex = outputIdx;
16641721
}
16651722
}
1723+
1724+
const auto& stage = stageInfo.Meta.GetStage(stageId);
1725+
FillSecureParamsFromStage(newTask.Meta.SecureParams, stage);
1726+
BuildSinks(stage, stageInfo, newTask);
1727+
1728+
for (const auto& input : stage.GetInputs()) {
1729+
if (input.GetTypeCase() != NKqpProto::TKqpPhyConnection::kDqSourceStreamLookup) {
1730+
continue;
1731+
}
1732+
1733+
if (const auto& compiledSource = input.GetDqSourceStreamLookup().GetLookupSource(); const auto& sourceName = compiledSource.GetSourceName()) {
1734+
newTask.Meta.SecureParams.emplace(
1735+
sourceName,
1736+
NYql::CreateStructuredTokenParser(compiledSource.GetAuthInfo()).ToBuilder().ReplaceReferences(GetMeta().SecureParams).ToJson()
1737+
);
1738+
}
1739+
}
16661740
}
16671741

16681742
for (const auto& [id, channel] : channels) {
@@ -1671,7 +1745,20 @@ void TKqpTasksGraph::RestoreTasksGraphInfo(const NKikimrKqp::TQueryPhysicalGraph
16711745
YQL_ENSURE(id == newChannel.Id);
16721746
}
16731747

1674-
GetMeta().IsRestored = true;
1748+
for (ui64 txIdx = 0; txIdx < Transactions.size(); ++txIdx) {
1749+
const auto& tx = Transactions.at(txIdx);
1750+
const auto scheduledTaskCount = ScheduleByCost(tx, resourcesSnapshot);
1751+
1752+
for (ui64 stageIdx = 0; stageIdx < tx.Body->StagesSize(); ++stageIdx) {
1753+
const auto& stage = tx.Body->GetStages(stageIdx);
1754+
auto& stageInfo = GetStageInfo({txIdx, stageIdx});
1755+
1756+
if (const auto& sources = stage.GetSources(); !sources.empty() && sources[0].GetTypeCase() == NKqpProto::TKqpSource::kExternalSource) {
1757+
const auto it = scheduledTaskCount.find(stageIdx);
1758+
BuildReadTasksFromSource(stageInfo, resourcesSnapshot, it != scheduledTaskCount.end() ? it->second.TaskCount : 0);
1759+
}
1760+
}
1761+
}
16751762
}
16761763

16771764
void TKqpTasksGraph::BuildSysViewScanTasks(TStageInfo& stageInfo) {
@@ -1745,15 +1832,6 @@ std::pair<ui32, TKqpTasksGraph::TTaskType::ECreateReason> TKqpTasksGraph::GetMax
17451832
bool TKqpTasksGraph::BuildComputeTasks(TStageInfo& stageInfo, const ui32 nodesCount) {
17461833
auto& stage = stageInfo.Meta.GetStage(stageInfo.Id);
17471834

1748-
// TODO: move outside
1749-
if (GetMeta().IsRestored) {
1750-
for (const auto taskId : stageInfo.Tasks) {
1751-
auto& task = GetTask(taskId);
1752-
task.Meta.Type = TTaskMeta::TTaskType::Compute;
1753-
}
1754-
return false;
1755-
}
1756-
17571835
TTaskType::ECreateReason tasksReason = TTaskType::MINIMUM_COMPUTE;
17581836
bool unknownAffectedShardCount = false;
17591837
ui32 partitionsCount = 1;
@@ -2715,45 +2793,6 @@ TMaybe<size_t> TKqpTasksGraph::BuildScanTasksFromSource(TStageInfo& stageInfo, b
27152793
}
27162794
}
27172795

2718-
struct TStageScheduleInfo {
2719-
double StageCost = 0.0;
2720-
ui32 TaskCount = 0;
2721-
};
2722-
2723-
static std::map<ui32, TStageScheduleInfo> ScheduleByCost(const IKqpGateway::TPhysicalTxData& tx, const TVector<NKikimrKqp::TKqpNodeResources>& resourceSnapshot) {
2724-
std::map<ui32, TStageScheduleInfo> result;
2725-
if (!resourceSnapshot.empty()) // can't schedule w/o node count
2726-
{
2727-
// collect costs and schedule stages with external sources only
2728-
double totalCost = 0.0;
2729-
for (ui32 stageIdx = 0; stageIdx < tx.Body->StagesSize(); ++stageIdx) {
2730-
auto& stage = tx.Body->GetStages(stageIdx);
2731-
if (stage.SourcesSize() > 0 && stage.GetSources(0).GetTypeCase() == NKqpProto::TKqpSource::kExternalSource) {
2732-
if (stage.GetStageCost() > 0.0 && stage.GetTaskCount() == 0) {
2733-
totalCost += stage.GetStageCost();
2734-
result.emplace(stageIdx, TStageScheduleInfo{.StageCost = stage.GetStageCost()});
2735-
}
2736-
}
2737-
}
2738-
// assign task counts
2739-
if (!result.empty()) {
2740-
// allow use 2/3 of threads in single stage
2741-
ui32 maxStageTaskCount = (TStagePredictor::GetUsableThreads() * 2 + 2) / 3;
2742-
// total limit per mode is x2
2743-
ui32 maxTotalTaskCount = maxStageTaskCount * 2;
2744-
for (auto& [_, stageInfo] : result) {
2745-
// schedule tasks evenly between nodes
2746-
stageInfo.TaskCount =
2747-
std::max<ui32>(
2748-
std::min(static_cast<ui32>(maxTotalTaskCount * stageInfo.StageCost / totalCost), maxStageTaskCount)
2749-
, 1
2750-
) * resourceSnapshot.size();
2751-
}
2752-
}
2753-
}
2754-
return result;
2755-
}
2756-
27572796
void TKqpTasksGraph::FillSecureParamsFromStage(THashMap<TString, TString>& secureParams, const NKqpProto::TKqpPhyStage& stage) const {
27582797
for (const auto& [secretName, authInfo] : stage.GetSecureParams()) {
27592798
const auto& structuredToken = NYql::CreateStructuredTokenParser(authInfo).ToBuilder().ReplaceReferences(GetMeta().SecureParams).ToJson();
@@ -2943,9 +2982,7 @@ size_t TKqpTasksGraph::BuildAllTasks(std::optional<TLlvmSettings> llvmSettings,
29432982

29442983
// Not task-related
29452984
GetMeta().AllowWithSpilling |= stage.GetAllowWithSpilling();
2946-
if (!GetMeta().IsRestored) {
2947-
BuildKqpStageChannels(stageInfo, GetMeta().TxId, GetMeta().AllowWithSpilling, tx.Body->EnableShuffleElimination());
2948-
}
2985+
BuildKqpStageChannels(stageInfo, GetMeta().TxId, GetMeta().AllowWithSpilling, tx.Body->EnableShuffleElimination());
29492986
}
29502987

29512988
// Not task-related

ydb/core/kqp/executer_actor/kqp_tasks_graph.h

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -389,7 +389,7 @@ class TKqpTasksGraph : public NYql::NDq::TDqTasksGraph<TGraphMeta, TStageInfoMet
389389

390390
NYql::NDqProto::TDqTask* ArenaSerializeTaskToProto(const TTask& task, bool serializeAsyncIoSettings);
391391
void PersistTasksGraphInfo(NKikimrKqp::TQueryPhysicalGraph& result) const;
392-
void RestoreTasksGraphInfo(const NKikimrKqp::TQueryPhysicalGraph& graphInfo);
392+
void RestoreTasksGraphInfo(const TVector<NKikimrKqp::TKqpNodeResources>& resourcesSnapshot, const NKikimrKqp::TQueryPhysicalGraph& graphInfo);
393393

394394
// TODO: public used by TKqpPlanner
395395
void FillChannelDesc(NYql::NDqProto::TChannel& channelDesc, const NYql::NDq::TChannel& channel,

ydb/core/kqp/query_compiler/kqp_query_compiler.cpp

Lines changed: 9 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -471,21 +471,21 @@ void FillOlapProgram(const T& node, const NKikimr::NMiniKQL::TType* miniKqlResul
471471
CompileOlapProgram(node.Process(), tableMeta, readProto, resultColNames, ctx);
472472
}
473473

474-
THashMap<TString, TString> FindSecureParams(const TExprNode::TPtr& node, const TTypeAnnotationContext& typesCtx, TSet<TString>& SecretNames) {
474+
THashMap<TString, TString> FindSecureParams(const TExprNode::TPtr& node, const TTypeAnnotationContext& typesCtx, TSet<TString>& secretNames) {
475475
THashMap<TString, TString> secureParams;
476476
NYql::NCommon::FillSecureParams(node, typesCtx, secureParams);
477477

478478
for (auto& [secretName, structuredToken] : secureParams) {
479479
const auto& tokenParser = CreateStructuredTokenParser(structuredToken);
480-
tokenParser.ListReferences(SecretNames);
480+
tokenParser.ListReferences(secretNames);
481481
structuredToken = tokenParser.ToBuilder().RemoveSecrets().ToJson();
482482
}
483483

484484
return secureParams;
485485
}
486486

487-
std::optional<std::pair<TString, TString>> FindOneSecureParam(const TExprNode::TPtr& node, const TTypeAnnotationContext& typesCtx, const TString& nodeName, TSet<TString>& SecretNames) {
488-
const auto& secureParams = FindSecureParams(node, typesCtx, SecretNames);
487+
std::optional<std::pair<TString, TString>> FindOneSecureParam(const TExprNode::TPtr& node, const TTypeAnnotationContext& typesCtx, const TString& nodeName, TSet<TString>& secretNames) {
488+
const auto& secureParams = FindSecureParams(node, typesCtx, secretNames);
489489
if (secureParams.empty()) {
490490
return std::nullopt;
491491
}
@@ -1849,6 +1849,11 @@ class TKqpQueryCompiler : public IKqpQueryCompiler {
18491849
YQL_ENSURE(!lookupSourceSettings.type_url().empty(), "Data source provider \"" << dataSourceCategory << "\" did't fill dq source settings for its dq source node");
18501850
YQL_ENSURE(lookupSourceType, "Data source provider \"" << dataSourceCategory << "\" did't fill dq source settings type for its dq source node");
18511851

1852+
if (const auto& secureParams = FindOneSecureParam(lookupSourceWrap.Ptr(), TypesCtx, "streamLookupSource", SecretNames)) {
1853+
lookupSource.SetSourceName(secureParams->first);
1854+
lookupSource.SetAuthInfo(secureParams->second);
1855+
}
1856+
18521857
const auto& streamLookupOutput = streamLookup.Output();
18531858
const auto connectionInputRowType = GetSeqItemType(streamLookupOutput.Ref().GetTypeAnn());
18541859
YQL_ENSURE(connectionInputRowType->GetKind() == ETypeAnnotationKind::Struct);

ydb/core/kqp/ut/federated_query/datastreams/datastreams_ut.cpp

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -658,7 +658,7 @@ class TStreamingTestFixture : public NUnitTest::TBaseFixture {
658658

659659
auto listSplitsBuilder = mockClient->ExpectListSplits();
660660
auto fillListSplitExpectation = listSplitsBuilder
661-
.ValidateArgs(settings.ValidateListSplitsArgs)
661+
.ValidateArgs(settings.ValidateListSplitsArgs ? TConnectorClientMock::EArgsValidation::Strict : TConnectorClientMock::EArgsValidation::DataSourceInstance)
662662
.Select()
663663
.DataSourceInstance(GetMockConnectorSourceInstance())
664664
.Table(settings.TableName)
@@ -689,7 +689,8 @@ class TStreamingTestFixture : public NUnitTest::TBaseFixture {
689689
{
690690
auto columnsBuilder = readSplitsBuilder
691691
.Filtering(TReadSplitsRequest::FILTERING_OPTIONAL)
692-
.ValidateArgs(settings.ValidateReadSplitsArgs)
692+
.DataSourceInstance(GetMockConnectorSourceInstance())
693+
.ValidateArgs(settings.ValidateReadSplitsArgs ? TConnectorClientMock::EArgsValidation::Strict : TConnectorClientMock::EArgsValidation::DataSourceInstance)
693694
.Split()
694695
.Description("some binary description")
695696
.Select()

ydb/library/yql/providers/generic/actors/yql_generic_read_actor.cpp

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -98,6 +98,7 @@ namespace NYql::NDq {
9898
// all the splits from all partitions will be packed into a single ReadSplits call.
9999
// There's a lot of space for the optimizations here.
100100
NConnector::NApi::TReadSplitsRequest request;
101+
*request.mutable_data_source_instance() = Source_.select().data_source_instance();
101102
request.set_format(NConnector::NApi::TReadSplitsRequest::ARROW_IPC_STREAMING);
102103
request.set_filtering(NConnector::NApi::TReadSplitsRequest::FILTERING_OPTIONAL);
103104

0 commit comments

Comments
 (0)