[Doc] Tpch Benchmark Instructions #3

Closed · wants to merge 3 commits
55 changes: 55 additions & 0 deletions benchmark/README.md
Benchmarking
===

# Generate Data
> *TODO* instructions to be provided
# Instructions
**Review comment (Contributor):**
Could you add a description of what each field in the result line means, along with a description of the warm runs? Also, do we plan to get rid of the cold-run mechanism altogether? (It looks like that is happening in the code.)

**Reply from @suhsteve (Member Author), Apr 24, 2019:**
I think we can get cold-run behavior by surrounding the spark-submit with another for loop and passing 1 as the number of iterations to the application.
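A minimal sketch of that idea: an outer shell loop launches the application once per run, passing 1 as the in-application iteration count so no run reuses a warm SparkSession. The `echo` stands in for the actual spark-submit command, and all bracketed values are placeholders:

```shell
#!/bin/bash

# Sketch of the cold-run approach: each outer iteration submits the
# application once, with an in-app iteration count of 1, so every run
# starts from a fresh session. "echo" stands in for the real spark-submit.
NUM_COLD_RUNS=3
for run in $(seq 1 "$NUM_COLD_RUNS")
do
  echo "cold run $run: spark-submit ... <app> <dataset> <query> 1 <is_sql>"
done
```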

## CSharp
1. Ensure that the Microsoft.Spark.Worker is properly [installed](https://github.com/dotnet/spark/tree/master/deployment#cloud-deployment) in your cluster.
2. Build `microsoft-spark-<spark_majorversion.spark_minorversion.x>-<spark_dotnet_version>.jar` and the [C# TPC-H benchmark](csharp/Tpch) application by following the [build instructions](../docs/building).
3. Upload [run_csharp_benchmark.sh](run_csharp_benchmark.sh), the Tpch benchmark application, and `microsoft-spark-<spark_majorversion.spark_minorversion.x>-<spark_dotnet_version>.jar` to the cluster.
4. Run the benchmark by invoking `run_csharp_benchmark.sh`:
```shell
run_csharp_benchmark.sh \
<num_executors> \
<driver_memory> \
<executor_memory> \
<executor_cores> \
</path/to/Tpch.dll> \
</path/to/microsoft-spark-<spark_majorversion.spark_minorversion.x>-<spark_dotnet_version>.jar> \
</path/to/Tpch executable> \
</path/to/dataset> \
<number of iterations> \
<true for sql tests, false for functional tests>
```
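As a concrete illustration, an invocation might look like the following. Every value and path below is a hypothetical placeholder for your cluster's configuration, and the leading `echo` makes this a dry run — drop it to actually submit:

```shell
# Dry-run sketch of a run_csharp_benchmark.sh invocation; all ten argument
# values and paths are placeholders, not real files or tuned settings.
echo ./run_csharp_benchmark.sh \
  4 \
  4g \
  8g \
  4 \
  /path/to/Tpch.dll \
  /path/to/microsoft-spark.jar \
  /path/to/Tpch \
  /path/to/dataset \
  5 \
  true
```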

## Python
1. Upload [run_python_benchmark.sh](run_python_benchmark.sh) and all [Python TPC-H benchmark](python/) files to the cluster.
2. Run the benchmark by invoking `run_python_benchmark.sh`:
```shell
run_python_benchmark.sh \
<num_executors> \
<driver_memory> \
<executor_memory> \
<executor_cores> \
</path/to/tpch.py> \
</path/to/dataset> \
<number of iterations> \
<true for sql tests, false for functional tests>
```

## Scala
1. Run `mvn package` to build the [Scala TPC-H benchmark](scala/) application.
2. Upload [run_scala_benchmark.sh](run_scala_benchmark.sh) and the `microsoft-spark-benchmark-<version>.jar` to the cluster.
3. Run the benchmark by invoking `run_scala_benchmark.sh`:
```shell
run_scala_benchmark.sh \
<num_executors> \
<driver_memory> \
<executor_memory> \
<executor_cores> \
</path/to/microsoft-spark-benchmark-<version>.jar> \
</path/to/dataset> \
<number of iterations> \
<true for sql tests, false for functional tests>
```
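Each run prints comma-separated `TPCH_Result` lines to stdout. A small sketch of post-processing them — the sample lines and the field meanings (runtime, mode, query, iteration, total ms, functional-only ms) are assumptions inferred from the runner code, not documented output:

```shell
# Parse TPCH_Result lines into a readable per-iteration summary.
# The two sample lines below are fabricated for illustration only.
printf '%s\n' \
  'TPCH_Result,DotNet,Functional,1,0,5234,4102' \
  'TPCH_Result,DotNet,Functional,1,1,2110,1987' |
awk -F, '$1 == "TPCH_Result" { printf "query %s iter %s: %s ms\n", $4, $5, $6 }'
```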
19 changes: 11 additions & 8 deletions benchmark/csharp/Tpch/Program.cs
```diff
@@ -25,18 +25,21 @@ private static void Main(string[] args)
             var numIteration = int.Parse(args[2]);
             var isSQL = bool.Parse(args[3]);
 
+            SparkSession spark = SparkSession
+                .Builder()
+                .AppName("TPC-H Benchmark for DotNet")
+                .GetOrCreate();
+
             for (var i = 0; i < numIteration; ++i)
             {
-                SparkSession spark = SparkSession
-                    .Builder()
-                    .AppName("TPC-H Benchmark for DotNet")
-                    .GetOrCreate();
-
                 Stopwatch sw = Stopwatch.StartNew();
+                Stopwatch swFunc = new Stopwatch();
                 if (!isSQL)
                 {
                     var tpchFunctional = new TpchFunctionalQueries(tpchRoot, spark);
+                    swFunc.Start();
                     tpchFunctional.Run(queryNumber.ToString());
+                    swFunc.Stop();
                 }
                 else
                 {
@@ -46,10 +49,10 @@ private static void Main(string[] args)
                 sw.Stop();
 
                 var typeStr = isSQL ? "SQL" : "Functional";
-                Console.WriteLine($"TPCH_Result,DotNet,{typeStr},{queryNumber},{i},{sw.ElapsedMilliseconds}");
-
-                spark.Stop();
+                Console.WriteLine($"TPCH_Result,DotNet,{typeStr},{queryNumber},{i},{sw.ElapsedMilliseconds},{swFunc.ElapsedMilliseconds}");
             }
+
+            spark.Stop();
         }
     }
 }
```
12 changes: 7 additions & 5 deletions benchmark/python/tpch.py
```diff
@@ -19,25 +19,27 @@ def main():
     input_dir = sys.argv[1]
     query_number = sys.argv[2]
     num_iterations = int(sys.argv[3])
-    is_sql = sys.argv[4].lower() in ("true")
+    is_sql = sys.argv[4].lower() == "true"
 
+    spark = SparkSession.builder.appName('TPCH Benchmark for Python').getOrCreate()
     for iter in range(0, num_iterations):
         print("TPCH Starting iteration {0} with query #{1}".format(iter, query_number))
-        spark = SparkSession.builder.appName('TPCH Benchmark for Python').getOrCreate()
 
-        start = time.time()
+        start = startFunc = endFunc = time.time()
         if (is_sql == False):
             queries = TpchFunctionalQueries(spark, input_dir)
+            startFunc = time.time()
             getattr(queries, "q" + query_number)()
+            endFunc = time.time()
         else:
             queries = TpchSqlQueries(spark, input_dir)
             getattr(queries, "q" + query_number)()
         end = time.time()
 
         typeStr = "SQL" if is_sql else "Functional"
-        print("TPCH_Result,Python,%s,%s,%d,%d" % (typeStr, query_number, iter, (end-start) * 1000))
+        print("TPCH_Result,Python,%s,%s,%d,%d,%d" % (typeStr, query_number, iter, (end-start) * 1000, (endFunc-startFunc) * 1000))
 
-        spark.stop()
+    spark.stop()
 
 if __name__ == '__main__':
     main()
```
17 changes: 17 additions & 0 deletions benchmark/run_csharp_benchmark.sh
```shell
#!/bin/bash

NUM_EXECUTORS=$1
DRIVER_MEMORY=$2
EXECUTOR_MEMORY=$3
EXECUTOR_CORES=$4
CSHARP_DLL=$5
JAR_PATH=$6
CSHARP_EXECUTABLE=$7
DATA_PATH=$8
NUM_ITERATION=$9
IS_SQL=${10}

for i in {1..22}
do
  $SPARK_HOME/bin/spark-submit --master yarn --num-executors $NUM_EXECUTORS --driver-memory $DRIVER_MEMORY --executor-memory $EXECUTOR_MEMORY --executor-cores $EXECUTOR_CORES --files $CSHARP_DLL --class org.apache.spark.deploy.DotnetRunner $JAR_PATH $CSHARP_EXECUTABLE $DATA_PATH $i $NUM_ITERATION $IS_SQL
done
```
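One bash detail worth noting: this script takes ten arguments, and positional parameters past `$9` require braces — `$10` expands as `$1` followed by a literal `0`, so the tenth argument must be written `${10}`. A quick demonstration:

```shell
# Set ten positional parameters, then compare the two expansions.
set -- a b c d e f g h i j
echo "$10"    # expands $1 then a literal "0"
echo "${10}"  # the tenth parameter
```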
15 changes: 15 additions & 0 deletions benchmark/run_python_benchmark.sh
```shell
#!/bin/bash

NUM_EXECUTORS=$1
DRIVER_MEMORY=$2
EXECUTOR_MEMORY=$3
EXECUTOR_CORES=$4
PYTHON_SCRIPT=$5
DATA_PATH=$6
NUM_ITERATION=$7
IS_SQL=$8

for i in {1..22}
do
  $SPARK_HOME/bin/spark-submit --master yarn --num-executors $NUM_EXECUTORS --driver-memory $DRIVER_MEMORY --executor-memory $EXECUTOR_MEMORY --executor-cores $EXECUTOR_CORES $PYTHON_SCRIPT $DATA_PATH $i $NUM_ITERATION $IS_SQL
done
```
15 changes: 15 additions & 0 deletions benchmark/run_scala_benchmark.sh
```shell
#!/bin/bash

NUM_EXECUTORS=$1
DRIVER_MEMORY=$2
EXECUTOR_MEMORY=$3
EXECUTOR_CORES=$4
JAR_PATH=$5
DATA_PATH=$6
NUM_ITERATION=$7
IS_SQL=$8

for i in {1..22}
do
  $SPARK_HOME/bin/spark-submit --master yarn --num-executors $NUM_EXECUTORS --driver-memory $DRIVER_MEMORY --executor-memory $EXECUTOR_MEMORY --executor-cores $EXECUTOR_CORES --class com.microsoft.tpch.App $JAR_PATH $DATA_PATH $i $NUM_ITERATION $IS_SQL
done
```
21 changes: 13 additions & 8 deletions benchmark/scala/src/main/scala/com/microsoft/tpch/App.scala
```diff
@@ -23,30 +23,35 @@ object App {
     val numIteration = args(2).toInt
     val isSql = Try(args(3).toBoolean).getOrElse(false)
 
-    for (i <- 0 until numIteration) {
-      val spark = SparkSession
-        .builder()
-        .appName("TPC-H Benchmark for Scala")
-        .getOrCreate()
+    val spark = SparkSession
+      .builder()
+      .appName("TPC-H Benchmark for Scala")
+      .getOrCreate()
 
+    for (i <- 0 until numIteration) {
       val startTs = System.currentTimeMillis
+      var startFunc = System.currentTimeMillis
+      var endFunc = System.currentTimeMillis
 
       if (!isSql) {
         val tpchFunctional = new TpchFunctionalQueries(spark, tpchRoot)
+        startFunc = System.currentTimeMillis
         tpchFunctional.run(queryNumber.toString)
+        endFunc = System.currentTimeMillis
       }
       else {
       }
 
       val endTs = System.currentTimeMillis
       val totalTime = endTs - startTs
+      val totalTimeFunc = endFunc - startFunc
 
       val typeStr = if (isSql) "SQL"
       else "Functional"
 
-      println(s"TPCH_Result,Scala,$typeStr,$queryNumber,$i,$totalTime")
-
-      spark.stop()
+      println(s"TPCH_Result,Scala,$typeStr,$queryNumber,$i,$totalTime,$totalTimeFunc")
     }
+
+    spark.stop()
   }
 }
```