2 changes: 1 addition & 1 deletion README.md
@@ -39,7 +39,7 @@
<tbody align="center">
<tr>
<td >2.3.*</td>
<td rowspan=6><a href="https://github.com/dotnet/spark/releases/tag/v0.11.0">v0.11.0</a></td>
<td rowspan=6><a href="https://github.com/dotnet/spark/releases/tag/v0.12.0">v0.12.0</a></td>
</tr>
<tr>
<td>2.4.0</td>
2 changes: 1 addition & 1 deletion benchmark/scala/pom.xml
@@ -3,7 +3,7 @@
<modelVersion>4.0.0</modelVersion>
<groupId>com.microsoft.spark</groupId>
<artifactId>microsoft-spark-benchmark</artifactId>
<version>0.11.0</version>
<version>0.12.0</version>
<inceptionYear>2019</inceptionYear>
<properties>
<encoding>UTF-8</encoding>
115 changes: 115 additions & 0 deletions docs/release-notes/0.12/release-0.12.md
@@ -0,0 +1,115 @@
# .NET for Apache Spark 0.12 Release Notes

### New Features/Improvements and Bug Fixes

* Expose `DataStreamWriter.ForeachBatch` API ([#549](https://github.com/dotnet/spark/pull/549)); a usage sketch follows this list
* Support for [dotnet-interactive](https://github.com/dotnet/interactive) ([#515](https://github.com/dotnet/spark/pull/515)) ([#517](https://github.com/dotnet/spark/pull/517)) ([#554](https://github.com/dotnet/spark/pull/554))
* Support for [Hyperspace v0.1.0](https://github.com/microsoft/hyperspace) APIs ([#555](https://github.com/dotnet/spark/pull/555))
* Support for Spark 2.4.6 ([#547](https://github.com/dotnet/spark/pull/547))
* Bug fixes:
  * UDF bug caused by `BroadcastVariablesRegistry` ([#551](https://github.com/dotnet/spark/pull/551))
  * Null checks for `TimestampType` and `DateType` ([#530](https://github.com/dotnet/spark/pull/530))
* Update `Microsoft.Data.Analysis` to v0.4.0 ([#528](https://github.com/dotnet/spark/pull/528))
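
A minimal sketch of the new `ForeachBatch` API, adapted from the `DataStreamWriter` test added in this PR (the source/sink paths and the session setup are illustrative assumptions, not part of the change):

```csharp
using Microsoft.Spark.Sql;
using Microsoft.Spark.Sql.Streaming;

SparkSession spark = SparkSession.Builder().GetOrCreate();

StreamingQuery query = spark
    .ReadStream()
    .Schema("id INT")
    .Csv("/tmp/stream-input")            // assumed source directory
    .WriteStream()
    .ForeachBatch((df, batchId) =>
    {
        // Called once per micro-batch; the batch arrives as a regular
        // DataFrame, so any batch sink can be used inside the callback.
        df.Write().Csv($"/tmp/stream-output/{batchId}");
    })
    .Start();

query.ProcessAllAvailable();             // drain all currently available input
query.Stop();
```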

### Infrastructure / Documentation / Etc.

* Improve build pipeline ([#510](https://github.com/dotnet/spark/pull/510)) ([#511](https://github.com/dotnet/spark/pull/511)) ([#512](https://github.com/dotnet/spark/pull/512)) ([#513](https://github.com/dotnet/spark/pull/513)) ([#524](https://github.com/dotnet/spark/pull/524))
* Update AppName for the C# Spark Examples ([#548](https://github.com/dotnet/spark/pull/548))
* Update maven links in build documentation ([#558](https://github.com/dotnet/spark/pull/558)) ([#560](https://github.com/dotnet/spark/pull/560))

### Breaking Changes

* None

### Known Issues

* Broadcast variables do not work with [dotnet-interactive](https://github.com/dotnet/interactive) ([#561](https://github.com/dotnet/spark/pull/561))

### Compatibility

#### Backward compatibility

The following table describes the oldest Microsoft.Spark.Worker version that the current release is compatible with, along with the new features that are incompatible with that worker version.

<table>
<thead>
<tr>
<th>Oldest compatible Microsoft.Spark.Worker version</th>
<th>Incompatible features</th>
</tr>
</thead>
<tbody align="center">
<tr>
<td rowspan=4>v0.9.0</td>
<td>DataFrame with Grouped Map UDF <a href="https://github.com/dotnet/spark/pull/277">(#277)</a></td>
</tr>
<tr>
<td>DataFrame with Vector UDF <a href="https://github.com/dotnet/spark/pull/277">(#277)</a></td>
</tr>
<tr>
<td>Support for Broadcast Variables <a href="https://github.com/dotnet/spark/pull/414">(#414)</a></td>
</tr>
<tr>
<td>Support for TimestampType <a href="https://github.com/dotnet/spark/pull/428">(#428)</a></td>
</tr>
</tbody>
</table>

#### Forward compatibility

The following table describes the oldest .NET for Apache Spark release that the current worker is compatible with.

<table>
<thead>
<tr>
<th>Oldest compatible .NET for Apache Spark release version</th>
</tr>
</thead>
<tbody align="center">
<tr>
<td>v0.9.0</td>
</tr>
</tbody>
</table>

### Supported Spark Versions

The following table outlines the supported Spark versions along with the microsoft-spark JAR to use with each:

<table>
<thead>
<tr>
<th>Spark Version</th>
<th>microsoft-spark JAR</th>
</tr>
</thead>
<tbody align="center">
<tr>
<td>2.3.*</td>
<td>microsoft-spark-2.3.x-0.12.0.jar</td>
</tr>
<tr>
<td>2.4.0</td>
<td rowspan=6>microsoft-spark-2.4.x-0.12.0.jar</td>
</tr>
<tr>
<td>2.4.1</td>
</tr>
<tr>
<td>2.4.3</td>
</tr>
<tr>
<td>2.4.4</td>
</tr>
<tr>
<td>2.4.5</td>
</tr>
<tr>
<td>2.4.6</td>
</tr>
<tr>
<td>2.4.2</td>
<td><a href="https://github.com/dotnet/spark/issues/60">Not supported</a></td>
</tr>
</tbody>
</table>
2 changes: 1 addition & 1 deletion eng/Versions.props
@@ -1,7 +1,7 @@
<?xml version="1.0" encoding="utf-8"?>
<Project ToolsVersion="4.0" xmlns="http://schemas.microsoft.com/developer/msbuild/2003">
<PropertyGroup>
<VersionPrefix>0.11.0</VersionPrefix>
<VersionPrefix>0.12.0</VersionPrefix>
<PreReleaseVersionLabel>prerelease</PreReleaseVersionLabel>
<RestoreSources>
$(RestoreSources);
@@ -45,6 +45,8 @@ public Task OnLoadAsync(IKernel kernel)

kernelBase.AddMiddleware(async (command, context, next) =>
{
await next(command, context);

if ((context.HandlingKernel is CSharpKernel kernel) &&
(command is SubmitCode) &&
TryGetSparkSession(out SparkSession sparkSession) &&
@@ -57,8 +59,6 @@ public Task OnLoadAsync(IKernel kernel)
sparkSession.SparkContext.AddFile(filePath);
}
}

await next(command, context);
});
}

@@ -94,7 +94,7 @@ public void TestCreateDataFrame()

// Calling CreateDataFrame(IEnumerable<string> _) without schema
{
var data = new List<string>(new string[] { "Alice", "Bob" });
var data = new string[] { "Alice", "Bob", null };
StructType schema = SchemaWithSingleColumn(new StringType());

DataFrame df = _spark.CreateDataFrame(data);
@@ -103,7 +103,16 @@ public void TestCreateDataFrame()

// Calling CreateDataFrame(IEnumerable<int> _) without schema
{
var data = new List<int>(new int[] { 1, 2 });
var data = new int[] { 1, 2 };
StructType schema = SchemaWithSingleColumn(new IntegerType(), false);

DataFrame df = _spark.CreateDataFrame(data);
ValidateDataFrame(df, data.Select(a => new object[] { a }), schema);
}

// Calling CreateDataFrame(IEnumerable<int?> _) without schema
{
var data = new int?[] { 1, 2, null };
StructType schema = SchemaWithSingleColumn(new IntegerType());

DataFrame df = _spark.CreateDataFrame(data);
@@ -112,7 +121,16 @@ public void TestCreateDataFrame()

// Calling CreateDataFrame(IEnumerable<double> _) without schema
{
var data = new List<double>(new double[] { 1.2, 2.3 });
var data = new double[] { 1.2, 2.3 };
StructType schema = SchemaWithSingleColumn(new DoubleType(), false);

DataFrame df = _spark.CreateDataFrame(data);
ValidateDataFrame(df, data.Select(a => new object[] { a }), schema);
}

// Calling CreateDataFrame(IEnumerable<double?> _) without schema
{
var data = new double?[] { 1.2, 2.3, null };
StructType schema = SchemaWithSingleColumn(new DoubleType());

DataFrame df = _spark.CreateDataFrame(data);
@@ -121,19 +139,29 @@ public void TestCreateDataFrame()

// Calling CreateDataFrame(IEnumerable<bool> _) without schema
{
var data = new List<bool>(new bool[] { true, false });
var data = new bool[] { true, false };
StructType schema = SchemaWithSingleColumn(new BooleanType(), false);

DataFrame df = _spark.CreateDataFrame(data);
ValidateDataFrame(df, data.Select(a => new object[] { a }), schema);
}

// Calling CreateDataFrame(IEnumerable<bool?> _) without schema
{
var data = new bool?[] { true, false, null };
StructType schema = SchemaWithSingleColumn(new BooleanType());

DataFrame df = _spark.CreateDataFrame(data);
ValidateDataFrame(df, data.Select(a => new object[] { a }), schema);
}

// Calling CreateDataFrame(IEnumerable<Date> _) without schema
{
var data = new Date[]
{
new Date(2020, 1, 1),
new Date(2020, 1, 2)
new Date(2020, 1, 2),
null
};
StructType schema = SchemaWithSingleColumn(new DateType());

@@ -151,7 +179,8 @@ public void TestCreateDataFrameWithTimestamp()
var data = new Timestamp[]
{
new Timestamp(2020, 1, 1, 0, 0, 0, 0),
new Timestamp(2020, 1, 2, 15, 30, 30, 0)
new Timestamp(2020, 1, 2, 15, 30, 30, 0),
null
};
StructType schema = SchemaWithSingleColumn(new TimestampType());

@@ -172,8 +201,9 @@ private void ValidateDataFrame(
/// Returns a single column schema of the given datatype.
/// </summary>
/// <param name="dataType">Datatype of the column</param>
/// <param name="isNullable">Indicates if values of the column can be null</param>
/// <returns>Schema as StructType</returns>
private StructType SchemaWithSingleColumn(DataType dataType) =>
new StructType(new[] { new StructField("_1", dataType) });
private StructType SchemaWithSingleColumn(DataType dataType, bool isNullable = true) =>
new StructType(new[] { new StructField("_1", dataType, isNullable) });
}
}
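
As a side note, a minimal sketch of how the nullable `CreateDataFrame` overloads exercised above behave from user code (the session setup is an assumption; the `_1` column name and nullability flags follow the test expectations):

```csharp
using Microsoft.Spark.Sql;

SparkSession spark = SparkSession.Builder().GetOrCreate();

// A nullable element type yields a nullable single-column schema.
DataFrame withNulls = spark.CreateDataFrame(new int?[] { 1, 2, null });
withNulls.PrintSchema();   // _1: integer (nullable = true)

// A non-nullable element type marks the column as non-nullable.
DataFrame noNulls = spark.CreateDataFrame(new int[] { 1, 2 });
noNulls.PrintSchema();     // _1: integer (nullable = false)
```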
@@ -6,6 +6,7 @@
using System.Collections.Generic;
using System.IO;
using System.Linq;
using System.Threading;
using Microsoft.Spark.E2ETest.Utils;
using Microsoft.Spark.Sql;
using Microsoft.Spark.Sql.Streaming;
@@ -67,6 +68,69 @@ public void TestSignaturesV2_3_X()
Assert.IsType<DataStreamWriter>(dsw.Trigger(Trigger.Once()));
}

[SkipIfSparkVersionIsLessThan(Versions.V2_4_0)]
public void TestForeachBatch()
{
// Temporary folder to put our test stream input.
using var srcTempDirectory = new TemporaryDirectory();
// Temporary folder to write ForeachBatch output.
using var dstTempDirectory = new TemporaryDirectory();

Func<Column, Column> outerUdf = Udf<int, int>(i => i + 100);

// id column: [0, 1, ..., 9]
WriteCsv(0, 10, Path.Combine(srcTempDirectory.Path, "input1.csv"));

DataStreamWriter dsw = _spark
.ReadStream()
.Schema("id INT")
.Csv(srcTempDirectory.Path)
.WriteStream()
.ForeachBatch((df, id) =>
{
Func<Column, Column> innerUdf = Udf<int, int>(i => i + 200);
df.Select(outerUdf(innerUdf(Col("id"))))
.Write()
.Csv(Path.Combine(dstTempDirectory.Path, id.ToString()));
});

StreamingQuery sq = dsw.Start();

// Process until all available data in the source has been processed and committed
// to the ForeachBatch sink.
sq.ProcessAllAvailable();

// Add new file to the source path. The spark stream will read any new files
// added to the source path.
// id column: [10, 11, ..., 19]
WriteCsv(10, 10, Path.Combine(srcTempDirectory.Path, "input2.csv"));

// Process until all available data in the source has been processed and committed
// to the ForeachBatch sink.
sq.ProcessAllAvailable();
sq.Stop();

// Verify folders in the destination path.
string[] csvPaths =
Directory.GetDirectories(dstTempDirectory.Path).OrderBy(s => s).ToArray();
var expectedPaths = new string[]
{
Path.Combine(dstTempDirectory.Path, "0"),
Path.Combine(dstTempDirectory.Path, "1"),
};
Assert.True(expectedPaths.SequenceEqual(csvPaths));

// Read the generated csv paths and verify contents.
DataFrame df = _spark
.Read()
.Schema("id INT")
.Csv(csvPaths[0], csvPaths[1])
.Sort("id");

IEnumerable<int> actualIds = df.Collect().Select(r => r.GetAs<int>("id"));
Assert.True(Enumerable.Range(300, 20).SequenceEqual(actualIds));
}

[SkipIfSparkVersionIsLessThan(Versions.V2_4_0)]
public void TestForeach()
{
@@ -200,6 +264,15 @@ private void TestAndValidateForeach(
foreachWriterOutputDF.Collect().Select(r => r.Values));
}

private void WriteCsv(int start, int count, string path)
{
using var streamWriter = new StreamWriter(path);
foreach (int i in Enumerable.Range(start, count))
{
streamWriter.WriteLine(i);
}
}

[Serializable]
private class TestForeachWriter : IForeachWriter
{
25 changes: 25 additions & 0 deletions src/csharp/Microsoft.Spark.E2ETest/UdfTests/UdfSimpleTypesTests.cs
@@ -5,6 +5,7 @@
using System;
using System.Collections.Generic;
using System.Linq;
using System.Threading;
using Microsoft.Spark.Sql;
using Microsoft.Spark.Sql.Types;
using Xunit;
@@ -166,5 +167,29 @@ public void TestUdfWithReturnAsTimestampType()
}
}
}

/// <summary>
/// Test to validate UDFs defined in separate threads work.
/// </summary>
[Fact]
public void TestUdfWithMultipleThreads()
{
try
{
void DefineUdf() => Udf<string, string>(str => str);

// Define a UDF in the main thread.
Udf<string, string>(str => str);

// Verify a UDF can be defined in a separate thread.
Thread t = new Thread(DefineUdf);
t.Start();
t.Join();
}
catch (Exception)
{
Assert.True(false);
}
}
}
}