Squall query plans to Storm topologies
### Assigning Storm components to Nodes
In Storm terminology, a logical component that generates tuples (or reads
them from a file) and propagates them further down the system is called a
Spout, and all other components are called Bolts. A Spout has a `nextTuple`
method, which sends tuples down the topology, and a Bolt has an
`execute(Tuple)` method, which processes a tuple and/or sends it further down.
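The division of labor between the two roles can be sketched in plain Java. This is an illustration only: `SketchSpout`, `SketchBolt`, `LineSpout` and `ProjectFirstBolt` are hypothetical names, not Storm or Squall classes, and the real Storm interfaces emit through collector objects that are omitted here.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Iterator;
import java.util.List;

// Hypothetical stand-ins for the Spout and Bolt roles; NOT the Storm API.
class SpoutBoltSketch {
    /** Source component: each call pushes at most one tuple downstream. */
    interface SketchSpout {
        boolean nextTuple(SketchBolt downstream); // false once the input is exhausted
    }

    /** Processing component: handles exactly one tuple per call. */
    interface SketchBolt {
        void execute(List<String> tuple);
    }

    /** Spout that turns '|'-separated lines (standing in for a file) into tuples. */
    static class LineSpout implements SketchSpout {
        private final Iterator<String> lines;

        LineSpout(List<String> lines) {
            this.lines = lines.iterator();
        }

        @Override
        public boolean nextTuple(SketchBolt downstream) {
            if (!lines.hasNext()) {
                return false;
            }
            downstream.execute(Arrays.asList(lines.next().split("\\|")));
            return true;
        }
    }

    /** Bolt that keeps only the first field of every tuple it receives. */
    static class ProjectFirstBolt implements SketchBolt {
        final List<String> out = new ArrayList<>();

        @Override
        public void execute(List<String> tuple) {
            out.add(tuple.get(0));
        }
    }

    /** Drives the spout until it has no more input and returns the bolt's output. */
    static List<String> run(List<String> lines) {
        LineSpout spout = new LineSpout(lines);
        ProjectFirstBolt bolt = new ProjectFirstBolt();
        while (spout.nextTuple(bolt)) {
            // one tuple is fully processed before the next one is requested
        }
        return bolt.out;
    }

    public static void main(String[] args) {
        System.out.println(run(Arrays.asList("1|alice", "2|bob"))); // prints [1, 2]
    }
}
```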
From the Storm documentation:
Each spout or bolt executes as many tasks across the cluster. Each task corresponds to one thread of execution, and stream groupings define how to send tuples from one set of tasks to another set of tasks.
However, each task executes sequentially: `nextTuple` or `execute(Tuple)` is
not called again until the processing of the previous tuple is done. You can
set the parallelism of each component, as well as the total number of workers
allocated, using the config files that we will explain later.
From the Storm documentation:
Topologies execute across one or more worker processes. Each worker process is a physical JVM and executes a subset of all the tasks for the topology. For example, if the combined parallelism of the topology is 300 and 50 workers are allocated, then each worker will execute 6 tasks (as threads within the worker). Storm tries to spread the tasks evenly across all the workers.
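The arithmetic in the quoted example can be checked with a few lines of Java. This is only a sketch of an even spread; the class and method names are hypothetical, and it does not reproduce Storm's actual scheduler.

```java
// Hypothetical helper illustrating an even spread of tasks over workers.
class WorkerSpreadSketch {
    /** Returns the number of tasks per worker when tasks are spread as evenly as possible. */
    static int[] spread(int totalTasks, int workers) {
        if (workers <= 0 || totalTasks < 0) {
            throw new IllegalArgumentException("need workers > 0 and totalTasks >= 0");
        }
        int[] perWorker = new int[workers];
        int base = totalTasks / workers;  // every worker gets at least this many
        int extra = totalTasks % workers; // the first 'extra' workers get one more
        for (int w = 0; w < workers; w++) {
            perWorker[w] = base + (w < extra ? 1 : 0);
        }
        return perWorker;
    }

    public static void main(String[] args) {
        // Combined parallelism 300 over 50 workers, as in the quoted example.
        System.out.println(spread(300, 50)[0]); // prints 6
    }
}
```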
Two logical components can be interconnected by an arbitrary function (e.g. a hash function of some field in a tuple). In Storm terminology, these interconnections are called stream groupings. Stream groupings offer more flexibility than the corresponding mechanisms in Online MapReduce [1]. Specifically, Storm lets users reason about their system in terms of a DAG of components that contain arbitrary code and can be arbitrarily interconnected. A DAG of these logical components submitted to a parallel execution environment is called a topology. A topology executes indefinitely, until a user explicitly kills it.
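As an illustration of a hash-based interconnection, the sketch below routes a tuple to a downstream task by hashing one of its fields, so tuples with equal values in that field always reach the same task. The class and method names are hypothetical, and the hash function is an assumption; Storm's own grouping implementation may differ.

```java
import java.util.Arrays;
import java.util.List;

// Hypothetical sketch of routing by a hash of one tuple field; not Storm code.
class FieldsGroupingSketch {
    /** Picks the downstream task for a tuple from the hash of the chosen field. */
    static int targetTask(List<String> tuple, int fieldIndex, int numTasks) {
        // floorMod keeps the result in [0, numTasks) even for negative hash codes
        return Math.floorMod(tuple.get(fieldIndex).hashCode(), numTasks);
    }

    public static void main(String[] args) {
        int first = targetTask(Arrays.asList("42", "alice"), 0, 4);
        int second = targetTask(Arrays.asList("42", "bob"), 0, 4);
        System.out.println(first == second); // prints true: same key, same task
    }
}
```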
The useful parallelism of an AggregationOperator component cannot exceed the number of distinct GroupBy values. In a query containing an aggregation with a small number of distinct GroupBy values, the level with the aggregation is very sensitive to skew. For example, if we have 5 distinct GroupBy values and 5 nodes on that level, hashing can assign 2 of the values to a single node, so that the execution time for that level is twice as long as necessary and one node is completely idle. Storm cannot do better, since it hashes values to nodes without knowing the full list of distinct GroupBy values. However, if we know the GroupBy values ahead of time, we can statically hash them uniformly over the nodes.
This can be specified by invoking `components.AbstractComponent.setFullHashList`. In the Hyracks query on a 5G TPC-H database, a 23% speedup was obtained.
Currently, this only works for the Imperative interface.
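The difference between the two assignment strategies can be sketched as follows. This is not Squall's implementation and it does not call `setFullHashList`, whose signature is not shown here; the class, the method names, and the round-robin rule over the known value list are all assumptions made for illustration.

```java
import java.util.Arrays;
import java.util.List;

// Hypothetical comparison of plain hashing with a statically known value list.
class StaticHashSketch {
    /** Plain hashing: the node depends only on the key's hash code. */
    static int hashNode(Object key, int nodes) {
        return Math.floorMod(key.hashCode(), nodes);
    }

    /** Static hashing (assumed rule): the node is the key's position in the full list, modulo nodes. */
    static int staticNode(Object key, List<?> fullHashList, int nodes) {
        int idx = fullHashList.indexOf(key);
        if (idx < 0) {
            throw new IllegalArgumentException("unknown GroupBy value: " + key);
        }
        return idx % nodes;
    }

    /** Counts how many GroupBy values land on each node under the chosen strategy. */
    static int[] load(List<?> keys, int nodes, boolean useStatic) {
        int[] counts = new int[nodes];
        for (Object k : keys) {
            int node = useStatic ? staticNode(k, keys, nodes) : hashNode(k, nodes);
            counts[node]++;
        }
        return counts;
    }

    public static void main(String[] args) {
        // Five distinct GroupBy values; an Integer's hash code is its own value.
        List<Integer> groupByValues = Arrays.asList(1, 2, 3, 6, 9);
        System.out.println(Arrays.toString(load(groupByValues, 5, false))); // prints [0, 2, 1, 1, 1]
        System.out.println(Arrays.toString(load(groupByValues, 5, true)));  // prints [1, 1, 1, 1, 1]
    }
}
```

With plain hashing, the values 1 and 6 collide on one node while another node stays idle; with the full list known up front, each node receives exactly one value.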
Squall can work in two modes: `AckEveryTuple` and `AckAtTheEnd`. In the former, some nodes are dedicated to tracking the propagation of each tuple through the topology. If a tuple is fully processed, the Spout `ack()` method is invoked; otherwise, the Spout `fail()` method is invoked. In the latter, Squall makes sure that all tuples are fully processed only at the end, by flushing the communication channels. We need to know when the topology processing is done, so that the final result and the full execution time can be acquired.
The latter approach yields a 2x speedup on average, because of the significantly reduced network traffic. On the other hand, we lose the information about Storm component latencies and, more importantly, we risk buffer overflow in the underlying communication libraries. However, if the query plan (including the parallelism of each component) is properly designed, it will likely execute without running out of memory.
If we simply set the parallelism of the last component to one, we get the desired behavior, but that component becomes a bottleneck and limits the scalability of the system.
Alternatively, we can sacrifice a little latency to achieve this. We can split the final aggregation across the last two components. The components on the next-to-last level periodically send aggregated results to the last-level component, which has its parallelism set to one.
This can be specified by invoking `components.Component.setBatchOutputMode(long millis)` on the next-to-last level component. It exhibits negligible overhead in terms of full execution time for the Hyracks query on a 5G database with parallelism 16 for the component on the next-to-last level.
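The two-level aggregation can be sketched in plain Java as below. This is a minimal illustration under stated assumptions, not Squall's code: `PartialAggregator` and `FinalAggregator` are hypothetical names, the aggregate is assumed to be a sum, each flush is assumed to send the partial sums accumulated since the previous flush and then reset them, and the periodic timer is replaced by explicit `flush()` calls.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical two-level sum aggregation; flush() stands in for the periodic timer.
class BatchOutputSketch {
    /** Next-to-last level: accumulates partial sums per group between flushes. */
    static class PartialAggregator {
        private Map<String, Long> partial = new HashMap<>();

        void execute(String group, long value) {
            partial.merge(group, value, Long::sum);
        }

        /** Returns the sums gathered since the last flush and starts a fresh batch. */
        Map<String, Long> flush() {
            Map<String, Long> batch = partial;
            partial = new HashMap<>();
            return batch;
        }
    }

    /** Last level (parallelism one): merges the batches sent by all partial aggregators. */
    static class FinalAggregator {
        final Map<String, Long> total = new HashMap<>();

        void execute(Map<String, Long> batch) {
            batch.forEach((group, sum) -> total.merge(group, sum, Long::sum));
        }
    }

    /** Runs two partial aggregators feeding one final aggregator. */
    static Map<String, Long> demo() {
        PartialAggregator a = new PartialAggregator();
        PartialAggregator b = new PartialAggregator();
        FinalAggregator last = new FinalAggregator();

        a.execute("x", 1);
        a.execute("y", 2);
        last.execute(a.flush()); // first periodic batch from a
        a.execute("x", 3);
        b.execute("x", 10);
        last.execute(a.flush()); // second batch from a contains only the new tuple
        last.execute(b.flush());
        return last.total;
    }

    public static void main(String[] args) {
        Map<String, Long> result = demo();
        System.out.println(result.get("x") + " " + result.get("y")); // prints 14 2
    }
}
```

The last component receives one small batch per flush instead of one message per input tuple, which is why it can run with parallelism one without handling the full tuple stream.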
However, this only works if Squall is run with manually specified query plans and in `AckAtTheEnd` mode. Currently, we are working on a brand new user interface.
[1] T. Condie, N. Conway, P. Alvaro, J. M. Hellerstein, J. Gerth, J. Talbot, K. Elmeleegy, and R. Sears, “Online aggregation and continuous query support in MapReduce,” in Proceedings of the 2010 International Conference on Management of Data, SIGMOD ’10, (New York, NY, USA), pp. 1115–1118, ACM, 2010.