Skip to content

Implement VectorUdf and use it in Queries 1 and 8 of TPCH benchmarks. #127

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 10 commits into from
Jun 7, 2019
7 changes: 6 additions & 1 deletion benchmark/csharp/Microsoft.Spark.Benchmark.sln
Original file line number Diff line number Diff line change
@@ -1,4 +1,3 @@

Microsoft Visual Studio Solution File, Format Version 12.00
# Visual Studio 15
VisualStudioVersion = 15.0.28307.168
Expand All @@ -9,6 +8,8 @@ Project("{9A19103F-16F7-4668-BE54-9A1E7A4F7556}") = "Microsoft.Spark", "..\..\sr
EndProject
Project("{9A19103F-16F7-4668-BE54-9A1E7A4F7556}") = "Microsoft.Spark.Worker", "..\..\src\csharp\Microsoft.Spark.Worker\Microsoft.Spark.Worker.csproj", "{A267D1A0-8EF6-475F-B118-67DDACD4373A}"
EndProject
Project("{9A19103F-16F7-4668-BE54-9A1E7A4F7556}") = "Microsoft.Spark.Experimental", "..\..\src\csharp\Microsoft.Spark.Experimental\Microsoft.Spark.Experimental.csproj", "{3F56D109-C4B8-4AC6-B87E-D2E8B73E3FB4}"
EndProject
Global
GlobalSection(SolutionConfigurationPlatforms) = preSolution
Debug|Any CPU = Debug|Any CPU
Expand All @@ -27,6 +28,10 @@ Global
{A267D1A0-8EF6-475F-B118-67DDACD4373A}.Debug|Any CPU.Build.0 = Debug|Any CPU
{A267D1A0-8EF6-475F-B118-67DDACD4373A}.Release|Any CPU.ActiveCfg = Release|Any CPU
{A267D1A0-8EF6-475F-B118-67DDACD4373A}.Release|Any CPU.Build.0 = Release|Any CPU
{3F56D109-C4B8-4AC6-B87E-D2E8B73E3FB4}.Debug|Any CPU.ActiveCfg = Debug|Any CPU
{3F56D109-C4B8-4AC6-B87E-D2E8B73E3FB4}.Debug|Any CPU.Build.0 = Debug|Any CPU
{3F56D109-C4B8-4AC6-B87E-D2E8B73E3FB4}.Release|Any CPU.ActiveCfg = Release|Any CPU
{3F56D109-C4B8-4AC6-B87E-D2E8B73E3FB4}.Release|Any CPU.Build.0 = Release|Any CPU
EndGlobalSection
GlobalSection(SolutionProperties) = preSolution
HideSolutionNode = FALSE
Expand Down
2 changes: 1 addition & 1 deletion benchmark/csharp/Tpch/Program.cs
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@ private static void Main(string[] args)
}

var tpchRoot = args[0];
var queryNumber = int.Parse(args[1]);
var queryNumber = args[1];
var numIteration = int.Parse(args[2]);
var isSQL = bool.Parse(args[3]);

Expand Down
3 changes: 2 additions & 1 deletion benchmark/csharp/Tpch/Tpch.csproj
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
<Project Sdk="Microsoft.NET.Sdk">
<Project Sdk="Microsoft.NET.Sdk">

<PropertyGroup>
<OutputType>Exe</OutputType>
Expand All @@ -9,6 +9,7 @@
</PropertyGroup>

<ItemGroup>
<ProjectReference Include="..\..\..\src\csharp\Microsoft.Spark.Experimental\Microsoft.Spark.Experimental.csproj" />
<ProjectReference Include="..\..\..\src\csharp\Microsoft.Spark.Worker\Microsoft.Spark.Worker.csproj" />
<ProjectReference Include="..\..\..\src\csharp\Microsoft.Spark\Microsoft.Spark.csproj" />
</ItemGroup>
Expand Down
60 changes: 58 additions & 2 deletions benchmark/csharp/Tpch/TpchFunctionalQueries.cs
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,9 @@
using System.Diagnostics;
using System.Reflection;
using System.Text.RegularExpressions;
using Apache.Arrow;
using Microsoft.Spark.Sql;
using static Microsoft.Spark.Sql.ExperimentalFunctions;
using static Microsoft.Spark.Sql.Functions;

namespace Tpch
Expand Down Expand Up @@ -57,6 +59,28 @@ internal void Q1()
.Show();
}

internal void Q1v()
{
Func<Column, Column, Column> discPrice = VectorUdf<DoubleArray, DoubleArray, DoubleArray>(
(price, discount) => VectorFunctions.ComputeDiscountPrice(price, discount));

Func<Column, Column, Column, Column> total = VectorUdf<DoubleArray, DoubleArray, DoubleArray, DoubleArray>(
(price, discount, tax) => VectorFunctions.ComputeTotal(price, discount, tax));

_lineitem.Filter(Col("l_shipdate") <= "1998-09-02")
.GroupBy(Col("l_returnflag"), Col("l_linestatus"))
.Agg(Sum(Col("l_quantity")).As("sum_qty"), Sum(Col("l_extendedprice")).As("sum_base_price"),
Sum(discPrice(Col("l_extendedprice"), Col("l_discount"))).As("sum_disc_price"),
Sum(total(Col("l_extendedprice"), Col("l_discount"), Col("l_tax"))).As("sum_charge"),
Avg(Col("l_quantity")).As("avg_qty"),
Avg(Col("l_extendedprice")).As("avg_price"),
Avg(Col("l_discount")).As("avg_disc"),
Count(Col("l_quantity")).As("count_order")
)
.Sort(Col("l_returnflag"), Col("l_linestatus"))
.Show();
}

internal void Q2()
{
DataFrame europe = _region.Filter(Col("r_name") == "EUROPE")
Expand Down Expand Up @@ -202,6 +226,40 @@ internal void Q8()
.Show();
}

internal void Q8v()
{
Func<Column, Column> getYear = Udf<string, string>(x => x.Substring(0, 4));
Func<Column, Column, Column> discPrice = VectorUdf<DoubleArray, DoubleArray, DoubleArray>(
(price, discount) => VectorFunctions.ComputeDiscountPrice(price, discount));

Func<Column, Column, Column> isBrazil = Udf<string, double, double>((x, y) => x == "BRAZIL" ? y : 0);

DataFrame fregion = _region.Filter(Col("r_name") == "AMERICA");
DataFrame forder = _orders.Filter(Col("o_orderdate") <= "1996-12-31" & Col("o_orderdate") >= "1995-01-01");
DataFrame fpart = _part.Filter(Col("p_type") == "ECONOMY ANODIZED STEEL");

DataFrame nat = _nation.Join(_supplier, Col("n_nationkey") == _supplier["s_nationkey"]);

DataFrame line = _lineitem.Select(Col("l_partkey"), Col("l_suppkey"), Col("l_orderkey"),
discPrice(Col("l_extendedprice"), Col("l_discount")).As("volume"))
.Join(fpart, Col("l_partkey") == fpart["p_partkey"])
.Join(nat, Col("l_suppkey") == nat["s_suppkey"]);

_nation.Join(fregion, Col("n_regionkey") == fregion["r_regionkey"])
.Select(Col("n_nationkey"))
.Join(_customer, Col("n_nationkey") == _customer["c_nationkey"])
.Select(Col("c_custkey"))
.Join(forder, Col("c_custkey") == forder["o_custkey"])
.Select(Col("o_orderkey"), Col("o_orderdate"))
.Join(line, Col("o_orderkey") == line["l_orderkey"])
.Select(getYear(Col("o_orderdate")).As("o_year"), Col("volume"),
isBrazil(Col("n_name"), Col("volume")).As("case_volume"))
.GroupBy(Col("o_year"))
.Agg((Sum(Col("case_volume")) / Sum("volume")).As("mkt_share"))
.Sort(Col("o_year"))
.Show();
}

internal void Q9()
{
Func<Column, Column> getYear = Udf<string, string>(x => x.Substring(0, 4));
Expand Down Expand Up @@ -333,8 +391,6 @@ internal void Q15()
private static readonly Regex s_q16NumbersRegex = new Regex("^(49|14|23|45|19|3|36|9)$", RegexOptions.Compiled);
internal void Q16()
{
Func<Column, Column, Column> decrease = Udf<double, double, double>((x, y) => x * (1 - y));

Func<Column, Column> complains = Udf<string, bool>((x) => s_q16CompainsRegex.Match(x).Success);

Func<Column, Column> polished = Udf<string, bool>((x) => x.StartsWith("MEDIUM POLISHED"));
Expand Down
61 changes: 61 additions & 0 deletions benchmark/csharp/Tpch/VectorFunctions.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,61 @@
// Licensed to the .NET Foundation under one or more agreements.
// The .NET Foundation licenses this file to you under the MIT license.
// See the LICENSE file in the project root for more information.

using System;
using Apache.Arrow;

namespace Tpch
{
internal static class VectorFunctions
{
internal static DoubleArray ComputeTotal(DoubleArray price, DoubleArray discount, DoubleArray tax)
{
if ((price.Length != discount.Length) || (price.Length != tax.Length))
{
throw new ArgumentException("Arrays need to be the same length");
}

int length = price.Length;
var builder = new ArrowBuffer.Builder<double>(length);
ReadOnlySpan<double> prices = price.Values;
ReadOnlySpan<double> discounts = discount.Values;
ReadOnlySpan<double> taxes = tax.Values;
for (int i = 0; i < length; ++i)
{
builder.Append(prices[i] * (1 - discounts[i]) * (1 + taxes[i]));
}

return new DoubleArray(
builder.Build(),
nullBitmapBuffer: ArrowBuffer.Empty,
length: length,
nullCount: 0,
offset: 0);
}

internal static DoubleArray ComputeDiscountPrice(DoubleArray price, DoubleArray discount)
{
if (price.Length != discount.Length)
{
throw new ArgumentException("Arrays need to be the same length");
}

int length = price.Length;
var builder = new ArrowBuffer.Builder<double>(length);
ReadOnlySpan<double> prices = price.Values;
ReadOnlySpan<double> discounts = discount.Values;
for (int i = 0; i < length; ++i)
{
builder.Append(prices[i] * (1 - discounts[i]));
}

return new DoubleArray(
builder.Build(),
nullBitmapBuffer: ArrowBuffer.Empty,
length: length,
nullCount: 0,
offset: 0);
}
}
}
Loading