Skip to content

Commit c9d60a6

Browse files
committed
Migrated binary serialization to use MessagePack and extracted the serialization logic into a separate class
1 parent 91212b9 commit c9d60a6

File tree

19 files changed

+329
-98
lines changed

19 files changed

+329
-98
lines changed

src/csharp/Extensions/Microsoft.Spark.Extensions.DotNet.Interactive.UnitTest/Microsoft.Spark.Extensions.DotNet.Interactive.UnitTest.csproj

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -3,7 +3,6 @@
33
<PropertyGroup>
44
<TargetFramework>net8.0</TargetFramework>
55
<RootNamespace>Microsoft.Spark.Extensions.DotNet.Interactive.UnitTest</RootNamespace>
6-
<EnableUnsafeBinaryFormatterSerialization>true</EnableUnsafeBinaryFormatterSerialization>
76
</PropertyGroup>
87

98
<ItemGroup>

src/csharp/Microsoft.Spark.E2ETest.ExternalLibrary/ExternalClass.cs

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -9,11 +9,11 @@ namespace Microsoft.Spark.E2ETest.ExternalLibrary
99
[Serializable]
1010
public class ExternalClass
1111
{
12-
private string _s;
12+
private string s;
1313

1414
public ExternalClass(string s)
1515
{
16-
_s = s;
16+
this.s = s;
1717
}
1818

1919
public static string HelloWorld()
@@ -23,7 +23,7 @@ public static string HelloWorld()
2323

2424
public string Concat(string s)
2525
{
26-
return _s + s;
26+
return this.s + s;
2727
}
2828
}
2929
}

src/csharp/Microsoft.Spark.E2ETest/IpcTests/BroadcastTests.cs

Lines changed: 5 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,6 @@
11
using System;
22
using System.Linq;
3+
using MessagePack;
34
using Microsoft.Spark.Sql;
45
using Xunit;
56
using static Microsoft.Spark.Sql.Functions;
@@ -12,10 +13,11 @@ public class TestBroadcastVariable
1213
public int IntValue { get; private set; }
1314
public string StringValue { get; private set; }
1415

15-
public TestBroadcastVariable(int intVal, string stringVal)
16+
[SerializationConstructor]
17+
public TestBroadcastVariable(int intValue, string stringValue)
1618
{
17-
IntValue = intVal;
18-
StringValue = stringVal;
19+
IntValue = intValue;
20+
StringValue = stringValue;
1921
}
2022
}
2123

src/csharp/Microsoft.Spark.E2ETest/Microsoft.Spark.E2ETest.csproj

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,6 @@
11
<Project Sdk="Microsoft.NET.Sdk">
22
<PropertyGroup>
33
<TargetFramework>net8.0</TargetFramework>
4-
<EnableUnsafeBinaryFormatterSerialization>true</EnableUnsafeBinaryFormatterSerialization>
54
</PropertyGroup>
65
<ItemGroup>
76
<Content Include="Resources\*">
Lines changed: 154 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,154 @@
1+
// Licensed to the .NET Foundation under one or more agreements.
2+
// The .NET Foundation licenses this file to you under the MIT license.
3+
// See the LICENSE file in the project root for more information.
4+
5+
using System;
6+
using System.Collections.Generic;
7+
using System.IO;
8+
using MessagePack;
9+
using Microsoft.Spark.Utils;
10+
using Xunit;
11+
12+
namespace Microsoft.Spark.UnitTest;
13+
14+
[Collection("Spark Unit Tests")]
15+
public class BinarySerDeTests
16+
{
17+
[Theory]
18+
[InlineData(42)]
19+
[InlineData("Test")]
20+
[InlineData(99.99)]
21+
public void Serialize_ShouldWriteObjectToStream(object input)
22+
{
23+
using var memoryStream = new MemoryStream();
24+
BinarySerDe.Serialize(memoryStream, input);
25+
memoryStream.Position = 0;
26+
27+
var deserializedObject = MessagePackSerializer.Typeless.Deserialize(memoryStream);
28+
29+
Assert.Equal(input, deserializedObject);
30+
}
31+
32+
[Fact]
33+
public void Deserialize_ShouldReturnExpectedObject_WhenTypeMatches()
34+
{
35+
var employee = new Employee { Id = 101, Name = "John Doe" };
36+
using var memoryStream = new MemoryStream();
37+
MessagePackSerializer.Typeless.Serialize(memoryStream, employee);
38+
memoryStream.Position = 0;
39+
40+
var result = BinarySerDe.Deserialize<Employee>(memoryStream);
41+
42+
Assert.Equal(employee.Id, result.Id);
43+
Assert.Equal(employee.Name, result.Name);
44+
}
45+
46+
[Fact]
47+
public void Deserialize_ShouldThrowInvalidCastEx_WhenTypeDoesNotMatch()
48+
{
49+
var employee = new Employee { Id = 101, Name = "John Doe" };
50+
using var memoryStream = new MemoryStream();
51+
MessagePackSerializer.Typeless.Serialize(memoryStream, employee);
52+
memoryStream.Position = 0;
53+
54+
var action = () => BinarySerDe.Deserialize<Department>(memoryStream);
55+
56+
Assert.Throws<InvalidCastException>(action);
57+
}
58+
59+
[Fact]
60+
public void Serialize_CustomFunctionAndObject_ShouldBeSerializable()
61+
{
62+
var department = new Department { Name = "HR", EmployeeCount = 27 };
63+
var employeeStub = new Employee
64+
{
65+
EmbeddedObject = department,
66+
Id = 11,
67+
Name = "Derek",
68+
};
69+
using var memoryStream = new MemoryStream();
70+
MessagePackSerializer.Typeless.Serialize(memoryStream, employeeStub);
71+
memoryStream.Position = 0;
72+
73+
var deserializedCalculation = BinarySerDe.Deserialize<Employee>(memoryStream);
74+
75+
Assert.IsType<Department>(deserializedCalculation.EmbeddedObject);
76+
Assert.Equal(27, ((Department)deserializedCalculation.EmbeddedObject).EmployeeCount);
77+
Assert.Equal("HR", ((Department)deserializedCalculation.EmbeddedObject).Name);
78+
}
79+
80+
[Fact]
81+
public void Serialize_ClassWithoutSerializableAttribute_ShouldThrowException()
82+
{
83+
var nonSerializableClass = new NonSerializableClass { Value = 123 };
84+
using var memoryStream = new MemoryStream();
85+
BinarySerDe.Serialize(memoryStream, nonSerializableClass);
86+
memoryStream.Position = 0;
87+
88+
Assert.Throws<MessagePackSerializationException>(() => BinarySerDe.Deserialize<NonSerializableClass>(memoryStream));
89+
}
90+
91+
[Fact]
92+
public void Serialize_CollectionAndDictionary_ShouldBeSerializable()
93+
{
94+
var list = new List<int> { 1, 2, 3 };
95+
var dictionary = new Dictionary<string, int> { { "one", 1 }, { "two", 2 } };
96+
97+
using var memoryStream = new MemoryStream();
98+
BinarySerDe.Serialize(memoryStream, list);
99+
memoryStream.Position = 0;
100+
var deserializedList = MessagePackSerializer.Typeless.Deserialize(memoryStream) as List<int>;
101+
102+
Assert.Equal(list, deserializedList);
103+
104+
memoryStream.SetLength(0);
105+
BinarySerDe.Serialize(memoryStream, dictionary);
106+
memoryStream.Position = 0;
107+
var deserializedDictionary = MessagePackSerializer.Typeless.Deserialize(memoryStream) as Dictionary<string, int>;
108+
109+
Assert.Equal(dictionary, deserializedDictionary);
110+
}
111+
112+
[Fact]
113+
public void Serialize_PolymorphicObject_ShouldBeSerializable()
114+
{
115+
Employee manager = new Manager { Id = 1, Name = "Alice", Role = "Account manager" };
116+
using var memoryStream = new MemoryStream();
117+
BinarySerDe.Serialize(memoryStream, manager);
118+
memoryStream.Position = 0;
119+
120+
var deserializedEmployee = BinarySerDe.Deserialize<Employee>(memoryStream);
121+
122+
Assert.IsType<Manager>(deserializedEmployee);
123+
Assert.Equal("Alice", deserializedEmployee.Name);
124+
Assert.Equal("Account manager", ((Manager)deserializedEmployee).Role);
125+
}
126+
127+
[Serializable]
128+
private class Employee
129+
{
130+
public int Id { get; set; }
131+
132+
public string Name { get; set; }
133+
134+
public object EmbeddedObject { get; set; }
135+
}
136+
137+
[Serializable]
138+
private class Department
139+
{
140+
public string Name { get; set; }
141+
public int EmployeeCount { get; set; }
142+
}
143+
144+
[Serializable]
145+
private class Manager : Employee
146+
{
147+
public string Role { get; set; }
148+
}
149+
150+
private class NonSerializableClass
151+
{
152+
public int Value { get; init; }
153+
}
154+
}

src/csharp/Microsoft.Spark.UnitTest/Microsoft.Spark.UnitTest.csproj

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -3,7 +3,6 @@
33
<PropertyGroup>
44
<TargetFramework>net8.0</TargetFramework>
55
<RootNamespace>Microsoft.Spark.UnitTest</RootNamespace>
6-
<EnableUnsafeBinaryFormatterSerialization>true</EnableUnsafeBinaryFormatterSerialization>
76
</PropertyGroup>
87

98
<ItemGroup>

src/csharp/Microsoft.Spark.UnitTest/UdfSerDeTests.cs

Lines changed: 8 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -5,7 +5,6 @@
55
using System;
66
using System.IO;
77
using System.Reflection;
8-
using System.Runtime.Serialization.Formatters.Binary;
98
using Microsoft.Spark.Utils;
109
using Xunit;
1110

@@ -17,21 +16,21 @@ public class UdfSerDeTests
1716
[Serializable]
1817
private class TestClass
1918
{
20-
private readonly string _str;
19+
private readonly string str;
2120

22-
public TestClass(string s)
21+
public TestClass(string str)
2322
{
24-
_str = s;
23+
this.str = str;
2524
}
2625

2726
public string Concat(string s)
2827
{
29-
if (_str == null)
28+
if (str == null)
3029
{
3130
return s + s;
3231
}
3332

34-
return _str + s;
33+
return str + s;
3534
}
3635

3736
public override bool Equals(object obj)
@@ -43,7 +42,7 @@ public override bool Equals(object obj)
4342
return false;
4443
}
4544

46-
return _str == that._str;
45+
return str == that.str;
4746
}
4847

4948
public override int GetHashCode()
@@ -149,16 +148,13 @@ private Delegate SerDe(Delegate udf)
149148
return Deserialize(Serialize(udf));
150149
}
151150

152-
#pragma warning disable SYSLIB0011 // Type or member is obsolete
153-
// TODO: Replace BinaryFormatter with a new, secure serializer.
154151
private byte[] Serialize(Delegate udf)
155152
{
156153
UdfSerDe.UdfData udfData = UdfSerDe.Serialize(udf);
157154

158155
using (var ms = new MemoryStream())
159156
{
160-
var bf = new BinaryFormatter();
161-
bf.Serialize(ms, udfData);
157+
BinarySerDe.Serialize(ms, udfData);
162158
return ms.ToArray();
163159
}
164160
}
@@ -167,11 +163,9 @@ private Delegate Deserialize(byte[] serializedUdf)
167163
{
168164
using (var ms = new MemoryStream(serializedUdf, false))
169165
{
170-
var bf = new BinaryFormatter();
171-
UdfSerDe.UdfData udfData = (UdfSerDe.UdfData)bf.Deserialize(ms);
166+
var udfData = BinarySerDe.Deserialize<UdfSerDe.UdfData>(ms);
172167
return UdfSerDe.Deserialize(udfData);
173168
}
174169
}
175-
#pragma warning restore
176170
}
177171
}

src/csharp/Microsoft.Spark.Worker.UnitTest/CommandExecutorTests.cs

Lines changed: 3 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -8,7 +8,6 @@
88
using System.IO;
99
using System.Linq;
1010
using System.Reflection;
11-
using System.Runtime.Serialization.Formatters.Binary;
1211
using System.Threading;
1312
using System.Threading.Tasks;
1413
using Apache.Arrow;
@@ -1049,10 +1048,8 @@ public void TestRDDCommandExecutor(Version sparkVersion, IpcOptions ipcOptions)
10491048

10501049
using var inputStream = new MemoryStream();
10511050
using var outputStream = new MemoryStream();
1052-
#pragma warning disable SYSLIB0011 // Type or member is obsolete
1051+
10531052
// Write test data to the input stream.
1054-
var formatter = new BinaryFormatter();
1055-
#pragma warning restore SYSLIB0011 // Type or member is obsolete
10561053
var memoryStream = new MemoryStream();
10571054

10581055
var inputs = new int[] { 0, 1, 2, 3, 4 };
@@ -1061,10 +1058,7 @@ public void TestRDDCommandExecutor(Version sparkVersion, IpcOptions ipcOptions)
10611058
foreach (int input in inputs)
10621059
{
10631060
memoryStream.Position = 0;
1064-
#pragma warning disable SYSLIB0011 // Type or member is obsolete
1065-
// TODO: Replace BinaryFormatter with a new, secure serializer.
1066-
formatter.Serialize(memoryStream, input);
1067-
#pragma warning restore SYSLIB0011 // Type or member is obsolete
1061+
BinarySerDe.Serialize(memoryStream, input);
10681062
values.Add(memoryStream.ToArray());
10691063
}
10701064

@@ -1094,12 +1088,9 @@ public void TestRDDCommandExecutor(Version sparkVersion, IpcOptions ipcOptions)
10941088
for (int i = 0; i < inputs.Length; ++i)
10951089
{
10961090
Assert.True(SerDe.ReadInt32(outputStream) > 0);
1097-
#pragma warning disable SYSLIB0011 // Type or member is obsolete
1098-
// TODO: Replace BinaryFormatter with a new, secure serializer.
10991091
Assert.Equal(
11001092
mapUdf(i),
1101-
formatter.Deserialize(outputStream));
1102-
#pragma warning restore SYSLIB0011 // Type or member is obsolete
1093+
BinarySerDe.Deserialize<object>(outputStream));
11031094
}
11041095

11051096
// Validate all the data on the stream is read.

src/csharp/Microsoft.Spark.Worker.UnitTest/Microsoft.Spark.Worker.UnitTest.csproj

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,6 @@
11
<Project Sdk="Microsoft.NET.Sdk">
22
<PropertyGroup>
33
<TargetFramework>net8.0</TargetFramework>
4-
<EnableUnsafeBinaryFormatterSerialization>true</EnableUnsafeBinaryFormatterSerialization>
54
</PropertyGroup>
65
<ItemGroup>
76
<PackageReference Include="Moq" Version="4.20.70" />

src/csharp/Microsoft.Spark.Worker/Command/RDDCommandExecutor.cs

Lines changed: 1 addition & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -5,7 +5,6 @@
55
using System;
66
using System.Collections.Generic;
77
using System.IO;
8-
using System.Runtime.Serialization.Formatters.Binary;
98
using Microsoft.Spark.Interop.Ipc;
109
using Microsoft.Spark.Utils;
1110

@@ -19,10 +18,6 @@ internal class RDDCommandExecutor
1918
{
2019
[ThreadStatic]
2120
private static MemoryStream s_writeOutputStream;
22-
[ThreadStatic]
23-
#pragma warning disable SYSLIB0011 // Type or member is obsolete
24-
private static BinaryFormatter s_binaryFormatter;
25-
#pragma warning restore SYSLIB0011 // Type or member is obsolete
2621

2722
/// <summary>
2823
/// Executes the commands on the input data read from input stream
@@ -113,11 +108,7 @@ private void Serialize(
113108
switch (serializerMode)
114109
{
115110
case CommandSerDe.SerializedMode.Byte:
116-
#pragma warning disable SYSLIB0011 // Type or member is obsolete
117-
BinaryFormatter formatter = s_binaryFormatter ??= new BinaryFormatter();
118-
// TODO: Replace BinaryFormatter with a new, secure serializer.
119-
formatter.Serialize(stream, message);
120-
#pragma warning restore SYSLIB0011 // Type or member is obsolete
111+
BinarySerDe.Serialize(stream, message);
121112
break;
122113
case CommandSerDe.SerializedMode.None:
123114
case CommandSerDe.SerializedMode.String:

0 commit comments

Comments
 (0)