There are several approaches on the table for choosing an appropriate number of tasks for the different stages of a query:
- Cardinality effect: this is the current approach. If a stage contains a node that reduces cardinality, a scale factor is applied to the task count of the stage above.
- Cost-based estimation: the approach attempted in Cost based planning #311. It attributes a compute cost to the different nodes and, based on statistics propagation, knows how much compute power will be needed for the different stages.
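The cardinality-effect heuristic can be sketched roughly like this; the function name and the 0.5 scale factor are assumptions for illustration, not the project's actual values:

```rust
// A minimal sketch of the "cardinality effect" heuristic described above. The
// function name and the 0.5 scale factor are illustrative assumptions, not
// the project's actual values.
fn scaled_task_count(parent_tasks: usize, stage_reduces_cardinality: bool) -> usize {
    // If the stage contains a node that reduces cardinality (e.g. a Filter or
    // a partial Aggregate), fewer tasks are needed in the stage above it.
    const SCALE_FACTOR: f64 = 0.5; // illustrative value
    if stage_reduces_cardinality {
        ((parent_tasks as f64 * SCALE_FACTOR).ceil() as usize).max(1)
    } else {
        parent_tasks
    }
}
```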
The big limitation with those approaches is that they make two big assumptions:
1. The size of the data pulled by leaf nodes can be inferred at planning time.
2. The size of the data flowing through stages can be calculated based on stats propagation.
Unfortunately, 1) is true only in specific cases, like reading from Parquet files, and 2) is mostly not implemented upstream at the moment, and even if it were, it would only produce estimates.
Some engines overcome this by implementing adaptive query execution: based on the results of intermediate stages, they are capable of sizing the next stage appropriately, or even adding/removing workers involved in the query on the fly. Some systems, like Spark, can even change the shape of the plan on the fly.
This is difficult to do in this project, as no materialization occurs between stages, and therefore there's really nothing that can be looked at for sizing them. Even if there were, once a number of tasks has been established for a stage, it's set in stone: it cannot be resized. However, there is something we can do: choose the task count as late as possible, lazily as data comes in.
The current state of the project assigns an input task count to the different network boundaries at planning time, but let's imagine we don't, and the task count is left as an unknown to be decided later.
The first step would be to choose an appropriate number of tasks for the leaf node, which needs to be done before any data flows in. #374 explains how this can be done at runtime. Let's imagine that 3 tasks are chosen:
We have not drawn the `RepartitionExec` because we still don't know its output partitions, as those will need to be set based on the number of tasks of the stage above, which we don't know yet.
Now, we need a way to inform the `DistributedExec` node about how many tasks are appropriate to spawn for the stage above. For that, we can look at a sample of the data that flowed in during the first 100 ms or so (an arbitrary figure, just for explanation purposes):
The sampler communicates to the `DistributedExec` an estimate of the amount of data that will flow through it, so that the `DistributedExec` can lazily size the next stage. This will have a small runtime cost.
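As a rough sketch of what that handshake could compute, assuming the sampler tracks the bytes it has seen and knows roughly what fraction of the leaf input has been consumed (all names and figures here are hypothetical, not the project's API):

```rust
/// Hypothetical sketch of the sampling-based estimate described above. The
/// struct, method names, and extrapolation strategy are assumptions for
/// illustration only.
struct Sampler {
    bytes_seen: u64,
}

impl Sampler {
    fn new() -> Self {
        Sampler { bytes_seen: 0 }
    }

    /// Called for every RecordBatch that flows through during the sampling window.
    fn observe(&mut self, batch_bytes: u64) {
        self.bytes_seen += batch_bytes;
    }

    /// Extrapolate the total data volume and suggest a task count for the
    /// stage above. `progress` is the fraction of the leaf input already
    /// consumed (0.0..=1.0); `bytes_per_task` is the volume a single task
    /// can comfortably handle.
    fn suggest_task_count(&self, progress: f64, bytes_per_task: u64) -> usize {
        // Linear extrapolation: assume the observed ratio holds for the rest
        // of the input.
        let estimated_total = self.bytes_seen as f64 / progress.max(f64::EPSILON);
        ((estimated_total / bytes_per_task as f64).ceil() as usize).max(1)
    }
}
```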
Let's imagine this situation:
- the `Filter` node was very selective
- the `Aggregate(partial)` was aggregating over a column with very low NDV
This translates into a very small amount of data reaching the `Sampler`, which might even receive only a single `RecordBatch` after batching up to `datafusion.execution.batch_size`. At this point, the `Sampler` can communicate this information to the `DistributedExec`, and the `DistributedExec` might decide to just not distribute further and run the whole rest of the plan on a single node, avoiding unnecessary network hops:
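That decision could look something like the following; the enum, function, and the one-task threshold are assumptions for illustration, not the project's API:

```rust
/// Hypothetical sketch of the choice `DistributedExec` could make once it
/// receives the Sampler's estimate. All names and thresholds are illustrative.
#[derive(Debug, PartialEq)]
enum NextStage {
    /// Run the rest of the plan on a single node, avoiding extra network hops.
    SingleNode,
    /// Distribute the next stage across `tasks` tasks.
    Distributed { tasks: usize },
}

fn plan_next_stage(estimated_bytes: u64, bytes_per_task: u64) -> NextStage {
    if estimated_bytes <= bytes_per_task {
        // The estimate fits in one task's budget: distributing further would
        // only add network hops without any parallelism benefit.
        NextStage::SingleNode
    } else {
        let tasks = (estimated_bytes as f64 / bytes_per_task as f64).ceil() as usize;
        NextStage::Distributed { tasks }
    }
}
```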
Of course, the logic in the `Sampler` can get arbitrarily complex, for example:

- look at the incoming `RecordBatch`es, and see how much time passed between their arrival