Skip to content

Commit bedaf6d

Browse files
Added the mySparkBatchApp.
1 parent 05aa697 commit bedaf6d

File tree

5 files changed

+1113
-0
lines changed

5 files changed

+1113
-0
lines changed
Lines changed: 25 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,25 @@
1+
2+
Microsoft Visual Studio Solution File, Format Version 12.00
3+
# Visual Studio Version 16
4+
VisualStudioVersion = 16.0.30717.126
5+
MinimumVisualStudioVersion = 10.0.40219.1
6+
Project("{9A19103F-16F7-4668-BE54-9A1E7A4F7556}") = "mySparkBatchApp", "mySparkBatchApp\mySparkBatchApp.csproj", "{1C6EC355-954D-4090-812C-8A77856BE22E}"
7+
EndProject
8+
Global
9+
GlobalSection(SolutionConfigurationPlatforms) = preSolution
10+
Debug|Any CPU = Debug|Any CPU
11+
Release|Any CPU = Release|Any CPU
12+
EndGlobalSection
13+
GlobalSection(ProjectConfigurationPlatforms) = postSolution
14+
{1C6EC355-954D-4090-812C-8A77856BE22E}.Debug|Any CPU.ActiveCfg = Debug|Any CPU
15+
{1C6EC355-954D-4090-812C-8A77856BE22E}.Debug|Any CPU.Build.0 = Debug|Any CPU
16+
{1C6EC355-954D-4090-812C-8A77856BE22E}.Release|Any CPU.ActiveCfg = Release|Any CPU
17+
{1C6EC355-954D-4090-812C-8A77856BE22E}.Release|Any CPU.Build.0 = Release|Any CPU
18+
EndGlobalSection
19+
GlobalSection(SolutionProperties) = preSolution
20+
HideSolutionNode = FALSE
21+
EndGlobalSection
22+
GlobalSection(ExtensibilityGlobals) = postSolution
23+
SolutionGuid = {2105A144-A409-4E75-9511-D63CA8D55072}
24+
EndGlobalSection
25+
EndGlobal
Lines changed: 69 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,69 @@
1+
using System;
2+
using Microsoft.Spark.Sql;
3+
using static Microsoft.Spark.Sql.Functions;
4+
5+
namespace mySparkBatchApp
6+
{
7+
class Program
8+
{
9+
static void Main(string[] args)
10+
{
11+
//Console.WriteLine("Hello World!");
12+
13+
DateTime referenceDate = new DateTime(2015, 10, 20);
14+
15+
if (args.Length != 1)
16+
{
17+
Console.Error.WriteLine(
18+
"Usage: GitHubProjects <path to projects.csv>");
19+
Environment.Exit(1);
20+
}
21+
22+
string filePath = args[0];
23+
24+
SparkSession spark = SparkSession
25+
.Builder()
26+
.AppName("GitHub and Spark Batch")
27+
.GetOrCreate();
28+
29+
DataFrame projectsDf = spark
30+
.Read()
31+
.Schema("id INT, url STRING, owner_id INT, " +
32+
"name STRING, descriptor STRING, language STRING, " +
33+
"created_at STRING, forked_from INT, deleted STRING, " +
34+
"updated_at STRING")
35+
.Csv(filePath);
36+
37+
projectsDf.Show();
38+
39+
// Drop any rows with NA values
40+
DataFrameNaFunctions dropEmptyProjects = projectsDf.Na();
41+
DataFrame cleanedProjects = dropEmptyProjects.Drop("any");
42+
43+
// Remove unnecessary columns
44+
cleanedProjects = cleanedProjects.Drop("id", "url", "owner_id");
45+
cleanedProjects.Show();
46+
47+
// Average number of times each language has been forked
48+
DataFrame groupedDF = cleanedProjects
49+
.GroupBy("language")
50+
.Agg(Avg(cleanedProjects["forked_from"]));
51+
52+
// Sort by most forked languages first
53+
groupedDF.OrderBy(Desc("avg(forked_from)")).Show();
54+
55+
spark.Udf().Register<string, bool>(
56+
"MyUDF",
57+
(date) => DateTime.TryParse(date, out DateTime convertedDate) &&
58+
(convertedDate > referenceDate));
59+
60+
cleanedProjects.CreateOrReplaceTempView("dateView");
61+
62+
DataFrame dateDf = spark.Sql(
63+
"SELECT *, MyUDF(dateView.updated_at) AS datebefore FROM dateView");
64+
dateDf.Show();
65+
66+
spark.Stop();
67+
}
68+
}
69+
}
Lines changed: 18 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,18 @@
1+
<Project Sdk="Microsoft.NET.Sdk">
2+
3+
<PropertyGroup>
4+
<OutputType>Exe</OutputType>
5+
<TargetFramework>netcoreapp3.1</TargetFramework>
6+
</PropertyGroup>
7+
8+
<ItemGroup>
9+
<PackageReference Include="Microsoft.Spark" Version="1.0.0" />
10+
</ItemGroup>
11+
12+
<ItemGroup>
13+
<None Update="projects_smaller.csv">
14+
<CopyToOutputDirectory>Always</CopyToOutputDirectory>
15+
</None>
16+
</ItemGroup>
17+
18+
</Project>

0 commit comments

Comments
 (0)