Split scheduling is broken for colocated join #11253

@haozhun

There are two bugs that affect scheduling for a fragment/stage with more than one scan node. Only colocated join can create such a fragment/stage at this time.

The combined outcome is that there is no back pressure for split scheduling for all but the first scan node (in source scheduling order) in a stage.

bug 1

When a fragment/stage contains multiple scan nodes, none of the scan nodes will receive a TaskSource where TaskSource.noMoreSplits = true until split scheduling for all the scan nodes in the stage finishes.

In SqlStageExecution, there are 2 places where completeSources variable is updated. One in schedulingComplete, which happens after the entire stage finishes scheduling. The other in addExchangeLocations, which isn't relevant for scan nodes.

In SqlStageExecution, there are 3 places where task.noMoreSplits is invoked. One is scheduleTask, where task.noMoreSplits is invoked for each element in completeSources. The other two are schedulingComplete and addExchangeLocations.
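To make the flow above concrete, here is a minimal toy model of it. This is a sketch under stated assumptions, not the real SqlStageExecution: the class `StageModel`, the hook `scanNodeFinished`, and the `noMoreSplitsCalls` log are hypothetical names introduced only for illustration, and exchange sources are left out entirely.

```java
import java.util.ArrayList;
import java.util.LinkedHashSet;
import java.util.List;
import java.util.Set;

// Toy model only. StageModel, scanNodeFinished and noMoreSplitsCalls are
// hypothetical; they do not exist in the real code base.
class StageModel {
    // Sources known to be complete (mirrors the role of completeSources).
    private final Set<String> completeSources = new LinkedHashSet<>();
    private final List<String> scanNodes;
    private final List<String> tasks = new ArrayList<>();
    // Log of "task:source" pairs for which noMoreSplits was signalled.
    final List<String> noMoreSplitsCalls = new ArrayList<>();

    StageModel(List<String> scanNodes) {
        this.scanNodes = new ArrayList<>(scanNodes);
    }

    // A new task only hears noMoreSplits for sources already marked complete.
    void scheduleTask(String taskId) {
        tasks.add(taskId);
        for (String source : completeSources) {
            noMoreSplitsCalls.add(taskId + ":" + source);
        }
    }

    // Hypothetical hook: one scan node has handed out its last split.
    // In the behavior described above nothing is recorded at this point.
    void scanNodeFinished(String scanNode) {
        // intentionally empty: completeSources is not updated here
    }

    // Only here, after the whole stage finishes scheduling, are the scan
    // sources marked complete and the tasks notified.
    void schedulingComplete() {
        for (String source : scanNodes) {
            if (completeSources.add(source)) {
                for (String taskId : tasks) {
                    noMoreSplitsCalls.add(taskId + ":" + source);
                }
            }
        }
    }

    public static void main(String[] args) {
        StageModel stage = new StageModel(List.of("scanA", "scanB"));
        stage.scheduleTask("task0");
        stage.scanNodeFinished("scanA");
        System.out.println("after scanA finished: " + stage.noMoreSplitsCalls);
        stage.schedulingComplete();
        System.out.println("after schedulingComplete: " + stage.noMoreSplitsCalls);
    }
}
```

In this model the first scan node finishing produces no noMoreSplits signal at all; every signal arrives in one batch when schedulingComplete runs.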

At first this bug seems implausible, because it should have caused frequent query deadlocks. That leads me to bug 2.

bug 2

In PipelineContext.getPipelineStatus, queuedDrivers is computed by looking at DriverContexts. However, note that in SqlTaskExecution.schedulePartitionedSource, there is this concept of pendingSplitsByPlanNode. pendingSplitsByPlanNode buffers splits for scan nodes that aren't yet eligible to schedule because another scan node that is ahead in terms of source scheduling order hasn't finished scheduling.

Specifically, that "another scan node" is the first scan node (in source scheduling order). Due to bug 1, it will not finish scheduling until all splits for the stage are delivered to workers.

queuedDrivers should include the splits in pendingSplitsByPlanNode, even though a DriverSplitRunner is yet to be created for those splits. The fact that the worker chose to defer the creation of those drivers is an implementation detail. Conceptually, those splits have been delivered to the worker, and the worker has created the drivers for them, although those drivers are "blocked" (not runnable).
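The difference between the two ways of counting can be sketched as follows. This is a toy model, not the real PipelineContext or SqlTaskExecution: the class `TaskQueueModel` and all of its methods are hypothetical, and only the name pendingSplitsByPlanNode is taken from the description above.

```java
import java.util.ArrayDeque;
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.Queue;

// Toy model only. TaskQueueModel and its methods are hypothetical.
class TaskQueueModel {
    // Drivers that exist but have not started running yet.
    private int queuedDriverCount;
    // Splits buffered for scan nodes that are not yet eligible to schedule.
    private final Map<String, Queue<String>> pendingSplitsByPlanNode = new LinkedHashMap<>();

    // A split for an eligible scan node gets a driver right away.
    void addDriverForSplit(String split) {
        queuedDriverCount++;
    }

    // A split for a not-yet-eligible scan node is only buffered.
    void bufferSplit(String planNode, String split) {
        pendingSplitsByPlanNode
                .computeIfAbsent(planNode, key -> new ArrayDeque<>())
                .add(split);
    }

    // Counting as described in the issue: buffered splits are invisible.
    int queuedDriversAsReported() {
        return queuedDriverCount;
    }

    // Counting as proposed: buffered splits count as queued drivers.
    int queuedDriversIncludingPending() {
        int pending = 0;
        for (Queue<String> splits : pendingSplitsByPlanNode.values()) {
            pending += splits.size();
        }
        return queuedDriverCount + pending;
    }

    public static void main(String[] args) {
        TaskQueueModel task = new TaskQueueModel();
        task.addDriverForSplit("a-0");
        task.addDriverForSplit("a-1");
        for (int i = 0; i < 5; i++) {
            task.bufferSplit("scanB", "b-" + i);
        }
        System.out.println("reported: " + task.queuedDriversAsReported());
        System.out.println("including pending: " + task.queuedDriversIncludingPending());
    }
}
```

However many splits pile up for the second scan node, the reported number in this model only reflects the first scan node's drivers.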

Now look at NodeScheduler.selectDistributionNodes. It depends on NodeAssignmentStats.getTotalSplitCount, which is effectively queuedDrivers + recent assignment. While recent assignment is incremented on each assignment, it is reset to zero whenever the splits are delivered to workers. queuedDrivers would be a small number (and eventually hit zero) due to this bug.
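A small sketch of why this removes back pressure, assuming the coordinator stops assigning to a node once its total split count reaches some limit. The class `NodeLoadModel`, its methods, and the `maxSplitsPerNode` parameter are hypothetical stand-ins; the real selection logic in NodeScheduler is more involved.

```java
// Toy model only. NodeLoadModel, its methods and maxSplitsPerNode are
// hypothetical names used for illustration.
class NodeLoadModel {
    // Last queued-driver count reported by the worker.
    private int queuedDrivers;
    // Splits assigned by the coordinator but not yet delivered.
    private int recentAssignments;

    void assignSplit() {
        recentAssignments++;
    }

    // Delivery resets the recent-assignment counter; from then on the load is
    // visible only through whatever the worker reports as queued.
    void deliver(int reportedQueuedDrivers) {
        recentAssignments = 0;
        queuedDrivers = reportedQueuedDrivers;
    }

    int totalSplitCount() {
        return queuedDrivers + recentAssignments;
    }

    // Assumed back-pressure rule: no new splits once the limit is reached.
    boolean acceptsMoreSplits(int maxSplitsPerNode) {
        return totalSplitCount() < maxSplitsPerNode;
    }

    public static void main(String[] args) {
        NodeLoadModel node = new NodeLoadModel();
        for (int i = 0; i < 10; i++) {
            node.assignSplit();
        }
        // The worker buffers all 10 splits but reports 0 queued drivers.
        node.deliver(0);
        System.out.println("total after delivery: " + node.totalSplitCount());
        System.out.println("accepts more (limit 10): " + node.acceptsMoreSplits(10));
    }
}
```

If the worker reported the 10 buffered splits as queued instead of 0, the same node would be at its limit and would stop receiving splits in this model.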

This bug also leads to misleading/unintuitive client stats.

mutual effect

If bug 2 gets fixed alone, it would lead to scheduling deadlocks.

If bug 1 gets fixed alone, it will restore back pressure and somewhat mitigate bug 2.
