Spark 3.5.3 support #1178

Merged 4 commits on Jan 2, 2025
14 changes: 5 additions & 9 deletions azure-pipelines-e2e-tests-template.yml
@@ -122,14 +122,14 @@ stages:
script: |
echo "Download Hadoop utils for Windows."
$hadoopBinaryUrl = "https://github.com/steveloughran/winutils/releases/download/tag_2017-08-29-hadoop-2.8.1-native/hadoop-2.8.1.zip"
# Spark 3.3.3 version binary use Hadoop3 dependency
if ("3.3.3" -contains "${{ test.version }}") {
# Spark 3.3.0+ version binary uses Hadoop3 dependency
if ([version]"3.3.0" -le [version]"${{ test.version }}") {
$hadoopBinaryUrl = "https://github.com/SparkSnail/winutils/releases/download/hadoop-3.3.5/hadoop-3.3.5.zip"
}
curl -k -L -o hadoop.zip $hadoopBinaryUrl
Expand-Archive -Path hadoop.zip -Destination .
New-Item -ItemType Directory -Force -Path hadoop\bin
if ("3.3.3" -contains "${{ test.version }}") {
if ([version]"3.3.0" -le [version]"${{ test.version }}") {
cp hadoop-3.3.5\winutils.exe hadoop\bin
# Hadoop 3.3 needs to add hadoop.dll to environment variables to avoid UnsatisfiedLinkError
cp hadoop-3.3.5\hadoop.dll hadoop\bin
@@ -142,12 +142,8 @@ stages:
- pwsh: |
echo "Downloading Spark ${{ test.version }}"
$sparkBinaryName = "spark-${{ test.version }}-bin-hadoop2.7"
# In spark 3.3.0, 3.3.1, 3.3.2, 3.3.4, the binary name with hadoop2 dependency has changed to spark-${{ test.version }}-bin-hadoop2.tgz
if ("3.3.0", "3.3.1", "3.3.2", "3.3.4" -contains "${{ test.version }}") {
$sparkBinaryName = "spark-${{ test.version }}-bin-hadoop2"
}
# In spark 3.3.3, the binary don't provide hadoop2 version, so we use hadoop3 version
if ("3.3.3" -contains "${{ test.version }}") {
# Spark 3.3.0+ uses Hadoop3
if ([version]"3.3.0" -le [version]"${{ test.version }}") {
$sparkBinaryName = "spark-${{ test.version }}-bin-hadoop3"
}
curl -k -L -o spark-${{ test.version }}.tgz https://archive.apache.org/dist/spark/spark-${{ test.version }}/${sparkBinaryName}.tgz
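A note on the version gating above: PowerShell's [version] accelerator maps to System.Version, so [version]"3.3.0" -le [version]"${{ test.version }}" compares version components numerically instead of matching an explicit list of patch releases. A minimal C# sketch of the same comparison (variable names here are illustrative, not part of the pipeline):

using System;

class VersionGateSketch
{
    static void Main()
    {
        // Same semantics as the pipeline's [version]"3.3.0" -le [version]"${{ test.version }}":
        // System.Version compares components numerically, so "3.10.0" sorts after "3.3.0".
        string testVersion = "3.5.3"; // stand-in for the ${{ test.version }} template parameter
        bool usesHadoop3 = Version.Parse("3.3.0") <= Version.Parse(testVersion);

        string sparkBinaryName = usesHadoop3
            ? $"spark-{testVersion}-bin-hadoop3"
            : $"spark-{testVersion}-bin-hadoop2.7";
        Console.WriteLine(sparkBinaryName); // spark-3.5.3-bin-hadoop3
    }
}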
12 changes: 11 additions & 1 deletion azure-pipelines-pr.yml
@@ -31,7 +31,7 @@ variables:
backwardCompatibleTestOptions_Linux_3_1: ""
forwardCompatibleTestOptions_Linux_3_1: ""

# Skip all forward/backward compatibility tests since Spark 3.2 is not supported before this release.
# Skip all forward/backward compatibility tests since Spark 3.2 and 3.5 are not supported before this release.
backwardCompatibleTestOptions_Windows_3_2: "--filter FullyQualifiedName=NONE"
forwardCompatibleTestOptions_Windows_3_2: $(backwardCompatibleTestOptions_Windows_3_2)
backwardCompatibleTestOptions_Linux_3_2: $(backwardCompatibleTestOptions_Windows_3_2)
@@ -41,6 +41,11 @@ variables:
forwardCompatibleTestOptions_Windows_3_3: $(backwardCompatibleTestOptions_Windows_3_3)
backwardCompatibleTestOptions_Linux_3_3: $(backwardCompatibleTestOptions_Windows_3_3)
forwardCompatibleTestOptions_Linux_3_3: $(backwardCompatibleTestOptions_Windows_3_3)

backwardCompatibleTestOptions_Windows_3_5: "--filter FullyQualifiedName=NONE"
forwardCompatibleTestOptions_Windows_3_5: $(backwardCompatibleTestOptions_Windows_3_5)
backwardCompatibleTestOptions_Linux_3_5: $(backwardCompatibleTestOptions_Windows_3_5)
forwardCompatibleTestOptions_Linux_3_5: $(backwardCompatibleTestOptions_Windows_3_5)

# Azure DevOps variables are transformed into environment variables, with these variables we
# avoid the first time experience and telemetry to speed up the build.
@@ -73,6 +78,11 @@ parameters:
- '3.3.2'
- '3.3.3'
- '3.3.4'
- '3.5.0'
- '3.5.1'
- '3.5.2'
- '3.5.3'

# List of OS types to run E2E tests, run each test in both 'Windows' and 'Linux' environments
- name: listOfE2ETestsPoolTypes
type: object
55 changes: 54 additions & 1 deletion azure-pipelines.yml
@@ -31,12 +31,17 @@ variables:
backwardCompatibleTestOptions_Linux_3_1: ""
forwardCompatibleTestOptions_Linux_3_1: ""

# Skip all forward/backward compatibility tests since Spark 3.2 is not supported before this release.
# Skip all forward/backward compatibility tests since Spark 3.2 and 3.5 are not supported before this release.
backwardCompatibleTestOptions_Windows_3_2: "--filter FullyQualifiedName=NONE"
forwardCompatibleTestOptions_Windows_3_2: $(backwardCompatibleTestOptions_Windows_3_2)
backwardCompatibleTestOptions_Linux_3_2: $(backwardCompatibleTestOptions_Windows_3_2)
forwardCompatibleTestOptions_Linux_3_2: $(backwardCompatibleTestOptions_Windows_3_2)

backwardCompatibleTestOptions_Windows_3_5: "--filter FullyQualifiedName=NONE"
forwardCompatibleTestOptions_Windows_3_5: $(backwardCompatibleTestOptions_Windows_3_5)
backwardCompatibleTestOptions_Linux_3_5: $(backwardCompatibleTestOptions_Windows_3_5)
forwardCompatibleTestOptions_Linux_3_5: $(backwardCompatibleTestOptions_Windows_3_5)

# Azure DevOps variables are transformed into environment variables, with these variables we
# avoid the first time experience and telemetry to speed up the build.
DOTNET_CLI_TELEMETRY_OPTOUT: 1
@@ -413,3 +418,51 @@ stages:
testOptions: ""
backwardCompatibleTestOptions: $(backwardCompatibleTestOptions_Linux_3_2)
forwardCompatibleTestOptions: $(forwardCompatibleTestOptions_Linux_3_2)
- version: '3.5.0'
enableForwardCompatibleTests: false
enableBackwardCompatibleTests: false
jobOptions:
- pool: 'Windows'
testOptions: ""
backwardCompatibleTestOptions: $(backwardCompatibleTestOptions_Windows_3_5)
forwardCompatibleTestOptions: $(forwardCompatibleTestOptions_Windows_3_5)
- pool: 'Linux'
testOptions: ""
backwardCompatibleTestOptions: $(backwardCompatibleTestOptions_Linux_3_5)
forwardCompatibleTestOptions: $(forwardCompatibleTestOptions_Linux_3_5)
- version: '3.5.1'
enableForwardCompatibleTests: false
enableBackwardCompatibleTests: false
jobOptions:
- pool: 'Windows'
testOptions: ""
backwardCompatibleTestOptions: $(backwardCompatibleTestOptions_Windows_3_5)
forwardCompatibleTestOptions: $(forwardCompatibleTestOptions_Windows_3_5)
- pool: 'Linux'
testOptions: ""
backwardCompatibleTestOptions: $(backwardCompatibleTestOptions_Linux_3_5)
forwardCompatibleTestOptions: $(forwardCompatibleTestOptions_Linux_3_5)
- version: '3.5.2'
enableForwardCompatibleTests: false
enableBackwardCompatibleTests: false
jobOptions:
- pool: 'Windows'
testOptions: ""
backwardCompatibleTestOptions: $(backwardCompatibleTestOptions_Windows_3_5)
forwardCompatibleTestOptions: $(forwardCompatibleTestOptions_Windows_3_5)
- pool: 'Linux'
testOptions: ""
backwardCompatibleTestOptions: $(backwardCompatibleTestOptions_Linux_3_5)
forwardCompatibleTestOptions: $(forwardCompatibleTestOptions_Linux_3_5)
- version: '3.5.3'
enableForwardCompatibleTests: false
enableBackwardCompatibleTests: false
jobOptions:
- pool: 'Windows'
testOptions: ""
backwardCompatibleTestOptions: $(backwardCompatibleTestOptions_Windows_3_5)
forwardCompatibleTestOptions: $(forwardCompatibleTestOptions_Windows_3_5)
- pool: 'Linux'
testOptions: ""
backwardCompatibleTestOptions: $(backwardCompatibleTestOptions_Linux_3_5)
forwardCompatibleTestOptions: $(forwardCompatibleTestOptions_Linux_3_5)
@@ -27,6 +27,7 @@ public DeltaFixture()
(3, 3, 2) => "delta-core_2.12:2.3.0",
(3, 3, 3) => "delta-core_2.12:2.3.0",
(3, 3, 4) => "delta-core_2.12:2.3.0",
(3, 5, _) => "delta-spark_2.12:3.2.0",
_ => throw new NotSupportedException($"Spark {sparkVersion} not supported.")
};

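Context for the fixture change above: Delta Lake 3.x publishes its Maven artifact as delta-spark rather than delta-core, which is why every Spark 3.5.x patch version maps to delta-spark_2.12:3.2.0 through a single (3, 5, _) arm. A trimmed sketch of the mapping technique, assuming a parsed System.Version (DeltaPackages.For is an illustrative name, not the fixture's API):

using System;

static class DeltaPackages
{
    public static string For(Version sparkVersion) =>
        (sparkVersion.Major, sparkVersion.Minor, sparkVersion.Build) switch
        {
            (3, 3, 2) or (3, 3, 3) or (3, 3, 4) => "delta-core_2.12:2.3.0", // Delta 2.x artifact name
            (3, 5, _) => "delta-spark_2.12:3.2.0", // artifact renamed to delta-spark in Delta Lake 3.x
            _ => throw new NotSupportedException($"Spark {sparkVersion} not supported.")
        };
}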
10 changes: 8 additions & 2 deletions src/csharp/Microsoft.Spark.E2ETest/IpcTests/SparkContextTests.cs
@@ -57,16 +57,22 @@ public void TestSignaturesV2_4_X()

/// <summary>
/// Test signatures for APIs introduced in Spark 3.1.*.
/// In Spark 3.5, Spark throws an exception when trying to delete
/// archive.zip from the temp folder, which causes other tests to fail.
/// </summary>
[SkipIfSparkVersionIsLessThan(Versions.V3_1_0)]
[SkipIfSparkVersionIsNotInRange(Versions.V3_1_0, Versions.V3_3_0)]
public void TestSignaturesV3_1_X()
{
SparkContext sc = SparkContext.GetOrCreate(new SparkConf());

string archivePath = $"{TestEnvironment.ResourceDirectory}archive.zip";

sc.AddArchive(archivePath);

Assert.IsType<string[]>(sc.ListArchives().ToArray());
var archives = sc.ListArchives().ToArray();

Assert.IsType<string[]>(archives);
Assert.NotEmpty(archives.Where(a => a.EndsWith("archive.zip")));
}
}
}
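The AddArchive test above is now gated on both ends: SkipIfSparkVersionIsLessThan becomes SkipIfSparkVersionIsNotInRange, so the test no longer runs on Spark 3.5, where deleting archive.zip from the temp folder throws. A hypothetical sketch of how such a range-gated xunit fact can be built; the repository's actual attribute and version lookup may differ:

using System;
using Xunit;

// Stand-in for however the test project detects the running Spark version.
public static class SparkEnv
{
    public static Version Version { get; } =
        Version.Parse(Environment.GetEnvironmentVariable("SPARK_VERSION") ?? "3.5.3");
}

public sealed class SkipIfSparkVersionIsNotInRangeAttribute : FactAttribute
{
    // Runs the test only when minInclusive <= version < maxExclusive (this sketch's convention).
    public SkipIfSparkVersionIsNotInRangeAttribute(string minInclusive, string maxExclusive)
    {
        Version spark = SparkEnv.Version;
        if (spark < new Version(minInclusive) || spark >= new Version(maxExclusive))
        {
            Skip = $"Requires Spark in [{minInclusive}, {maxExclusive}); detected {spark}.";
        }
    }
}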
@@ -59,7 +59,6 @@ public void TestSignaturesV2_4_X()
Assert.IsType<bool>(catalog.FunctionExists("functionname"));
Assert.IsType<Database>(catalog.GetDatabase("default"));
Assert.IsType<Function>(catalog.GetFunction("abs"));
Assert.IsType<Function>(catalog.GetFunction(null, "abs"));
Assert.IsType<Table>(catalog.GetTable("users"));
Assert.IsType<Table>(catalog.GetTable("default", "users"));
Assert.IsType<bool>(catalog.IsCached("users"));
1 change: 1 addition & 0 deletions src/csharp/Microsoft.Spark.UnitTest/TypeConverterTests.cs
@@ -20,6 +20,7 @@ public void TestBaseCase()
Assert.Equal((short)1, TypeConverter.ConvertTo<short>((short)1));
Assert.Equal((ushort)1, TypeConverter.ConvertTo<ushort>((ushort)1));
Assert.Equal(1, TypeConverter.ConvertTo<int>(1));
Assert.Equal(1L, TypeConverter.ConvertTo<long>(1));
Assert.Equal(1u, TypeConverter.ConvertTo<uint>(1u));
Assert.Equal(1L, TypeConverter.ConvertTo<long>(1L));
Assert.Equal(1ul, TypeConverter.ConvertTo<ulong>(1ul));
@@ -351,6 +351,7 @@ internal PayloadWriter Create(Version version = null)
new BroadcastVariableWriterV2_4_X(),
new CommandWriterV2_4_X());
case Versions.V3_3_0:
case Versions.V3_5_1:
return new PayloadWriter(
version,
new TaskContextWriterV3_3_X(),
1 change: 1 addition & 0 deletions src/csharp/Microsoft.Spark.Worker.UnitTest/TestData.cs
@@ -20,6 +20,7 @@ public static IEnumerable<object[]> VersionData() =>
new object[] { Versions.V3_0_0 },
new object[] { Versions.V3_2_0 },
new object[] { Versions.V3_3_0 },
new object[] { Versions.V3_5_1 },
};

internal static Payload GetDefaultPayload()
22 changes: 10 additions & 12 deletions src/csharp/Microsoft.Spark.Worker/Processor/TaskContextProcessor.cs
@@ -31,31 +31,32 @@ internal TaskContext Process(Stream stream)
private static TaskContext ReadTaskContext_2_x(Stream stream)
=> new()
{
IsBarrier = SerDe.ReadBool(stream),
Port = SerDe.ReadInt32(stream),
Secret = SerDe.ReadString(stream),

StageId = SerDe.ReadInt32(stream),
PartitionId = SerDe.ReadInt32(stream),
AttemptNumber = SerDe.ReadInt32(stream),
AttemptId = SerDe.ReadInt64(stream),
};

// Needed for 3.3.0+
// https://issues.apache.org/jira/browse/SPARK-36173
private static TaskContext ReadTaskContext_3_3(Stream stream)
[Review comment, Collaborator] I am wondering if ReadTaskContext_3_3 can rely on ReadTaskContext_2_x.
[Reply, Contributor (author)] Yep, it would be nice to reuse common logic; let's have a look at it after removing support for the obsolete Spark versions. (A sketch of this idea follows this file's diff.)
=> new()
{
IsBarrier = SerDe.ReadBool(stream),
Port = SerDe.ReadInt32(stream),
Secret = SerDe.ReadString(stream),

StageId = SerDe.ReadInt32(stream),
PartitionId = SerDe.ReadInt32(stream),
AttemptNumber = SerDe.ReadInt32(stream),
AttemptId = SerDe.ReadInt64(stream),
// CPUs field is added into TaskContext from 3.3.0 https://issues.apache.org/jira/browse/SPARK-36173
CPUs = SerDe.ReadInt32(stream)
};

private static void ReadBarrierInfo(Stream stream)
{
// Read barrier-related payload. Note that barrier is currently not supported.
SerDe.ReadBool(stream); // IsBarrier
SerDe.ReadInt32(stream); // BoundPort
SerDe.ReadString(stream); // Secret
}

private static void ReadTaskContextProperties(Stream stream, TaskContext taskContext)
{
int numProperties = SerDe.ReadInt32(stream);
Expand Down Expand Up @@ -87,7 +88,6 @@ private static class TaskContextProcessorV2_4_X
{
internal static TaskContext Process(Stream stream)
{
ReadBarrierInfo(stream);
TaskContext taskContext = ReadTaskContext_2_x(stream);
ReadTaskContextProperties(stream, taskContext);

@@ -99,7 +99,6 @@ private static class TaskContextProcessorV3_0_X
{
internal static TaskContext Process(Stream stream)
{
ReadBarrierInfo(stream);
TaskContext taskContext = ReadTaskContext_2_x(stream);
ReadTaskContextResources(stream);
ReadTaskContextProperties(stream, taskContext);
@@ -112,7 +111,6 @@ private static class TaskContextProcessorV3_3_X
{
internal static TaskContext Process(Stream stream)
{
ReadBarrierInfo(stream);
TaskContext taskContext = ReadTaskContext_3_3(stream);
ReadTaskContextResources(stream);
ReadTaskContextProperties(stream, taskContext);
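The TaskContext readers above now consume the barrier flag, port, and secret inline instead of through a separate ReadBarrierInfo helper, and the reviewer asked whether ReadTaskContext_3_3 could reuse ReadTaskContext_2_x. Since the 3.3 wire format is the 2.x fields followed only by CPUs (SPARK-36173), one possible follow-up could look like the fragment below (inside TaskContextProcessor, assuming TaskContext.CPUs has an accessible setter); this is a sketch of the review suggestion, not what the PR does:

// Fragment intended for TaskContextProcessor; sketch only.
private static TaskContext ReadTaskContext_3_3(Stream stream)
{
    // The 3.3 payload starts with the same fields as the 2.x payload...
    TaskContext taskContext = ReadTaskContext_2_x(stream);
    // ...followed by CPUs, added to TaskContext in Spark 3.3.0 (SPARK-36173).
    taskContext.CPUs = SerDe.ReadInt32(stream);
    return taskContext;
}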
10 changes: 9 additions & 1 deletion src/csharp/Microsoft.Spark/Interop/Ipc/JvmBridge.cs
@@ -8,6 +8,7 @@
using System.Diagnostics;
using System.IO;
using System.Net;
using System.Net.Sockets;
using System.Text;
using System.Threading;
using Microsoft.Spark.Network;
@@ -184,7 +185,7 @@ private object CallJavaMethod(
ISocketWrapper socket = null;

try
{
{
// Limit the number of connections to the JVM backend. Netty is configured
// to use a set number of threads to process incoming connections. Each
// new connection is delegated to these threads in a round robin fashion.
@@ -299,6 +300,13 @@ private object CallJavaMethod(
}
else
{
if (e.InnerException is SocketException)
{
_logger.LogError(
"Scala worker abandoned the connection, likely fatal crash on Java side. \n" +
"Ensure Spark runs with sufficient memory.");
}

// In rare cases we may hit the Netty connection thread deadlock.
// If max backend threads is 10 and we are currently using 10 active
// connections (0 in the _sockets queue). When we hit this exception,
14 changes: 8 additions & 6 deletions src/csharp/Microsoft.Spark/Sql/Catalog/Catalog.cs
@@ -248,20 +248,22 @@ public Database GetDatabase(string dbName) =>
new Database((JvmObjectReference)Reference.Invoke("getDatabase", dbName));

/// <summary>
/// Get the function with the specified name. If you are trying to get an in-built
/// function then use the unqualified name.
/// Get the function with the specified name. This can be a temporary function or a
/// function in the current database.
/// </summary>
/// <param name="functionName">Is either a qualified or unqualified name that designates a
/// function. If no database identifier is provided, it refers to a temporary function or
/// a function in the current database.</param>
/// function. It follows the same resolution rules as SQL: built-in and temporary functions
/// are searched first, then functions in the current database (namespace).</param>
/// <returns>`Function` object which includes the class name, database, description,
/// whether it is temporary and the name of the function.</returns>
public Function GetFunction(string functionName) =>
new Function((JvmObjectReference)Reference.Invoke("getFunction", functionName));

/// <summary>
/// Get the function with the specified name. If you are trying to get an in-built function
/// then pass null as the dbName.
/// Get the function with the specified name in the specified database under the Hive
/// Metastore.
/// To get built-in functions, or functions in other catalogs, use `GetFunction(functionName)`
/// with a qualified function name instead.
/// </summary>
/// <param name="dbName">Is a name that designates a database. Built-in functions will be
/// in database null rather than default.</param>
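To make the updated GetFunction semantics concrete: the single-argument overload resolves built-in and temporary functions first and also accepts a qualified name, while the two-argument overload searches only the named metastore database, which is why the GetFunction(null, "abs") test call was removed. A usage sketch under those semantics (the database and UDF names are illustrative, and an active .NET for Spark session launched via spark-submit is assumed):

using Microsoft.Spark.Sql;
using Microsoft.Spark.Sql.Catalog;

class CatalogGetFunctionExample
{
    static void Main()
    {
        SparkSession spark = SparkSession.Builder().GetOrCreate();

        // Unqualified name: built-in and temporary functions are resolved first.
        Function abs = spark.Catalog.GetFunction("abs");

        // A qualified name also works with the single-argument overload.
        Function qualified = spark.Catalog.GetFunction("default.my_udf");

        // Two-argument overload: searches only the given metastore database,
        // so it can no longer be used to reach built-ins (hence the removed
        // GetFunction(null, "abs") test call).
        Function scoped = spark.Catalog.GetFunction("default", "my_udf");
    }
}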
5 changes: 5 additions & 0 deletions src/csharp/Microsoft.Spark/Utils/TypeConverter.cs
@@ -33,6 +33,11 @@ private static object Convert(object obj, Type toType)
{
return ConvertToDictionary(hashtable, toType);
}
// Fails to convert int to long otherwise
else if (toType.IsPrimitive)
{
return System.Convert.ChangeType(obj, toType);
}

return obj;
}
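Why the IsPrimitive branch is needed: unboxing in .NET requires the exact runtime type, so a boxed int cannot be cast straight to long, and returning the boxed value unchanged made ConvertTo<long>(1) fail, as the diff comment notes; System.Convert.ChangeType performs the numeric widening instead. A small standalone illustration:

using System;

class UnboxingVsChangeType
{
    static void Main()
    {
        object boxedInt = 1;

        try
        {
            // Unboxing requires the exact runtime type, so a boxed int cannot be cast to long.
            long direct = (long)boxedInt;
        }
        catch (InvalidCastException)
        {
            Console.WriteLine("(long)boxedInt throws InvalidCastException.");
        }

        // Convert.ChangeType performs the numeric widening that the cast cannot,
        // which is the fallback TypeConverter now uses for primitive targets.
        long widened = (long)Convert.ChangeType(boxedInt, typeof(long));
        Console.WriteLine(widened); // 1
    }
}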
1 change: 1 addition & 0 deletions src/csharp/Microsoft.Spark/Versions.cs
@@ -13,5 +13,6 @@ internal static class Versions
internal const string V3_1_1 = "3.1.1";
internal const string V3_2_0 = "3.2.0";
internal const string V3_3_0 = "3.3.0";
internal const string V3_5_1 = "3.5.1";
}
}