Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

GH-44361: [C#][Integration] Include .NET in Flight integration tests #44377

Merged
merged 8 commits into from
Oct 15, 2024
Merged
Show file tree
Hide file tree
Changes from 7 commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 6 additions & 0 deletions csharp/Apache.Arrow.sln
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,8 @@ Project("{FAE04EC0-301F-11D3-BF4B-00C04F79EFBC}") = "Apache.Arrow.Flight.Sql.Tes
EndProject
Project("{FAE04EC0-301F-11D3-BF4B-00C04F79EFBC}") = "Apache.Arrow.Flight.Sql", "src\Apache.Arrow.Flight.Sql\Apache.Arrow.Flight.Sql.csproj", "{2ADE087A-B424-4895-8CC5-10170D10BA62}"
EndProject
Project("{FAE04EC0-301F-11D3-BF4B-00C04F79EFBC}") = "Apache.Arrow.Flight.IntegrationTest", "test\Apache.Arrow.Flight.IntegrationTest\Apache.Arrow.Flight.IntegrationTest.csproj", "{7E66CBB4-D921-41E7-A98A-7C6DEA521696}"
EndProject
Global
GlobalSection(SolutionConfigurationPlatforms) = preSolution
Debug|Any CPU = Debug|Any CPU
Expand Down Expand Up @@ -81,6 +83,10 @@ Global
{2ADE087A-B424-4895-8CC5-10170D10BA62}.Debug|Any CPU.Build.0 = Debug|Any CPU
{2ADE087A-B424-4895-8CC5-10170D10BA62}.Release|Any CPU.ActiveCfg = Release|Any CPU
{2ADE087A-B424-4895-8CC5-10170D10BA62}.Release|Any CPU.Build.0 = Release|Any CPU
{7E66CBB4-D921-41E7-A98A-7C6DEA521696}.Debug|Any CPU.ActiveCfg = Debug|Any CPU
{7E66CBB4-D921-41E7-A98A-7C6DEA521696}.Debug|Any CPU.Build.0 = Debug|Any CPU
{7E66CBB4-D921-41E7-A98A-7C6DEA521696}.Release|Any CPU.ActiveCfg = Release|Any CPU
{7E66CBB4-D921-41E7-A98A-7C6DEA521696}.Release|Any CPU.Build.0 = Release|Any CPU
EndGlobalSection
GlobalSection(SolutionProperties) = preSolution
HideSolutionNode = FALSE
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -64,7 +64,7 @@ protected virtual void Dispose(bool disposing)
{
if (!_disposed)
{
_flightDataStream.Dispose();
_flightDataStream?.Dispose();
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This isn't required for any of the included tests, but before disabling the primitive_no_batches test, it would crash here with a NullReferenceException due to the writer being disposed before the stream was created.

With this fix, writing doesn't crash, but the data isn't found when trying to retrieve it.

_disposed = true;
}
}
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,18 @@
<?xml version="1.0" encoding="utf-8"?>
<Project Sdk="Microsoft.NET.Sdk">

<PropertyGroup>
<OutputType>Exe</OutputType>
<TargetFramework>net8.0</TargetFramework>
<RootNamespace>Apache.Arrow.Flight.IntegrationTest</RootNamespace>
</PropertyGroup>

<ItemGroup>
<PackageReference Include="System.CommandLine" Version="2.0.0-beta4.22272.1" />
<PackageReference Include="System.Text.Json" Version="8.0.5" />
<ProjectReference Include="..\..\src\Apache.Arrow.Flight\Apache.Arrow.Flight.csproj" />
<ProjectReference Include="..\Apache.Arrow.Flight.TestWeb\Apache.Arrow.Flight.TestWeb.csproj" />
<ProjectReference Include="..\Apache.Arrow.IntegrationTest\Apache.Arrow.IntegrationTest.csproj" />
</ItemGroup>

</Project>
Original file line number Diff line number Diff line change
@@ -0,0 +1,51 @@
// Licensed to the Apache Software Foundation (ASF) under one or more
// contributor license agreements. See the NOTICE file distributed with
// this work for additional information regarding copyright ownership.
// The ASF licenses this file to You under the Apache License, Version 2.0
// (the "License"); you may not use this file except in compliance with
// the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

using System;
using System.IO;
using System.Threading.Tasks;

namespace Apache.Arrow.Flight.IntegrationTest;

public class FlightClientCommand
{
private readonly int _port;
private readonly string _scenario;
private readonly FileInfo _jsonFileInfo;

public FlightClientCommand(int port, string scenario, FileInfo jsonFileInfo)
{
_port = port;
_scenario = scenario;
_jsonFileInfo = jsonFileInfo;
}

public async Task Execute()
{
if (!string.IsNullOrEmpty(_scenario))
{
// No named scenarios are currently implemented
throw new Exception($"Scenario '{_scenario}' is not supported.");
}

if (!(_jsonFileInfo?.Exists ?? false))
{
throw new Exception($"Invalid JSON file path '{_jsonFileInfo?.FullName}'");
}

var scenario = new JsonTestScenario(_port, _jsonFileInfo);
await scenario.RunClient().ConfigureAwait(false);
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,68 @@
// Licensed to the Apache Software Foundation (ASF) under one or more
// contributor license agreements. See the NOTICE file distributed with
// this work for additional information regarding copyright ownership.
// The ASF licenses this file to You under the Apache License, Version 2.0
// (the "License"); you may not use this file except in compliance with
// the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

using System;
using System.Net;
using System.Threading.Tasks;
using Apache.Arrow.Flight.TestWeb;
using Microsoft.AspNetCore.Hosting;
using Microsoft.AspNetCore.Hosting.Server;
using Microsoft.AspNetCore.Hosting.Server.Features;
using Microsoft.AspNetCore.Server.Kestrel.Core;
using Microsoft.Extensions.DependencyInjection;
using Microsoft.Extensions.Hosting;

namespace Apache.Arrow.Flight.IntegrationTest;

public class FlightServerCommand
{
private readonly string _scenario;

public FlightServerCommand(string scenario)
{
_scenario = scenario;
}

public async Task Execute()
{
if (!string.IsNullOrEmpty(_scenario))
{
// No named scenarios are currently implemented
throw new Exception($"Scenario '{_scenario}' is not supported.");
}

var host = Host.CreateDefaultBuilder()
.ConfigureWebHostDefaults(webBuilder =>
{
webBuilder
.ConfigureKestrel(options =>
{
options.Listen(IPEndPoint.Parse("127.0.0.1:0"), l => l.Protocols = HttpProtocols.Http2);
})
.UseStartup<Startup>();
})
.Build();

await host.StartAsync().ConfigureAwait(false);

var addresses = host.Services.GetService<IServer>().Features.Get<IServerAddressesFeature>().Addresses;
foreach (var address in addresses)
{
Console.WriteLine($"Server listening on {address}");
}

await host.WaitForShutdownAsync().ConfigureAwait(false);
}
}
34 changes: 34 additions & 0 deletions csharp/test/Apache.Arrow.Flight.IntegrationTest/GrpcResolver.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,34 @@
// Licensed to the Apache Software Foundation (ASF) under one or more
// contributor license agreements. See the NOTICE file distributed with
// this work for additional information regarding copyright ownership.
// The ASF licenses this file to You under the Apache License, Version 2.0
// (the "License"); you may not use this file except in compliance with
// the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

using Grpc.Net.Client.Balancer;

namespace Apache.Arrow.Flight.IntegrationTest;

/// <summary>
/// The Grpc.Net.Client library doesn't know how to handle the "grpc+tcp" scheme used by Arrow Flight.
/// This ResolverFactory passes these through to the standard Static Resolver used for the http scheme.
/// </summary>
public class GrpcTcpResolverFactory : ResolverFactory
{
public override string Name => "grpc+tcp";

public override Resolver Create(ResolverOptions options)
{
return new StaticResolverFactory(
uri => new[] { new BalancerAddress(options.Address.Host, options.Address.Port) })
.Create(options);
}
}
160 changes: 160 additions & 0 deletions csharp/test/Apache.Arrow.Flight.IntegrationTest/JsonTestScenario.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,160 @@
// Licensed to the Apache Software Foundation (ASF) under one or more
// contributor license agreements. See the NOTICE file distributed with
// this work for additional information regarding copyright ownership.
// The ASF licenses this file to You under the Apache License, Version 2.0
// (the "License"); you may not use this file except in compliance with
// the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

using System;
using System.IO;
using System.Linq;
using System.Threading.Tasks;
using Apache.Arrow.Flight.Client;
using Apache.Arrow.IntegrationTest;
using Apache.Arrow.Tests;
using Apache.Arrow.Types;
using Google.Protobuf;
using Grpc.Net.Client;
using Grpc.Core;
using Grpc.Net.Client.Balancer;
using Microsoft.Extensions.DependencyInjection;

namespace Apache.Arrow.Flight.IntegrationTest;

/// <summary>
/// A test scenario defined using a JSON data file
/// </summary>
internal class JsonTestScenario
{
private readonly int _serverPort;
private readonly FileInfo _jsonFile;
private readonly ServiceProvider _serviceProvider;

public JsonTestScenario(int serverPort, FileInfo jsonFile)
{
_serverPort = serverPort;
_jsonFile = jsonFile;

var services = new ServiceCollection();
services.AddSingleton<ResolverFactory>(new GrpcTcpResolverFactory());
_serviceProvider = services.BuildServiceProvider();
}

public async Task RunClient()
{
var address = $"grpc+tcp://localhost:{_serverPort}";
using var channel = GrpcChannel.ForAddress(
address,
new GrpcChannelOptions
{
ServiceProvider = _serviceProvider,
Credentials = ChannelCredentials.Insecure
});
var client = new FlightClient(channel);

var descriptor = FlightDescriptor.CreatePathDescriptor(_jsonFile.FullName);

var jsonFile = await JsonFile.ParseAsync(_jsonFile).ConfigureAwait(false);
var schema = jsonFile.GetSchemaAndDictionaries(out Func<DictionaryType, IArrowArray> dictionaries);
var batches = jsonFile.Batches.Select(batch => batch.ToArrow(schema, dictionaries)).ToArray();

// 1. Put the data to the server.
await UploadBatches(client, descriptor, batches).ConfigureAwait(false);

// 2. Get the ticket for the data.
var info = await client.GetInfo(descriptor).ConfigureAwait(false);
if (info.Endpoints.Count == 0)
{
throw new Exception("No endpoints received");
}

// 3. Stream data from the server, comparing individual batches.
foreach (var endpoint in info.Endpoints)
{
var locations = endpoint.Locations.ToArray();
if (locations.Length == 0)
{
// Can read with existing client
await ConsumeFlightLocation(client, endpoint.Ticket, batches).ConfigureAwait(false);
}
else
{
foreach (var location in locations)
{
using var readChannel = GrpcChannel.ForAddress(
location.Uri,
new GrpcChannelOptions
{
ServiceProvider = _serviceProvider,
Credentials = ChannelCredentials.Insecure
});
var readClient = new FlightClient(readChannel);
await ConsumeFlightLocation(readClient, endpoint.Ticket, batches).ConfigureAwait(false);
}
}
}
}

private static async Task UploadBatches(FlightClient client, FlightDescriptor descriptor, RecordBatch[] batches)
{
using var putCall = client.StartPut(descriptor);
using var writer = putCall.RequestStream;

try
{
var counter = 0;
foreach (var batch in batches)
{
var metadata = $"{counter}";

await writer.WriteAsync(batch, ByteString.CopyFromUtf8(metadata)).ConfigureAwait(false);

// Verify server has acknowledged the write request
await putCall.ResponseStream.MoveNext().ConfigureAwait(false);
var responseString = putCall.ResponseStream.Current.ApplicationMetadata.ToStringUtf8();

if (responseString != metadata)
{
throw new Exception($"Response metadata '{responseString}' does not match expected metadata '{metadata}'");
}

counter++;
}
}
finally
{
await writer.CompleteAsync().ConfigureAwait(false);
}
}

private static async Task ConsumeFlightLocation(FlightClient client, FlightTicket ticket, RecordBatch[] batches)
{
using var readStream = client.GetStream(ticket);
var counter = 0;
foreach (var originalBatch in batches)
{
if (!await readStream.ResponseStream.MoveNext().ConfigureAwait(false))
{
throw new Exception($"Expected {batches.Length} batches but received {counter}");
}

var batch = readStream.ResponseStream.Current;
ArrowReaderVerifier.CompareBatches(originalBatch, batch, strictCompare: false);

counter++;
}

if (await readStream.ResponseStream.MoveNext().ConfigureAwait(false))
{
throw new Exception($"Expected to reach the end of the response stream after {batches.Length} batches");
}
}
}
Loading
Loading