Skip to content

Commit

Permalink
GH-36078: [C#] Flight SQL implementation for C# (#36079)
Browse files Browse the repository at this point in the history
Flight SQL implementation for C# that is compatible with the C++ and Java implementations.

Closes issue #36078 
* Closes: #36078

Lead-authored-by: Jeremy Osterhoudt <jeremy.osterhoudt@ge.com>
Co-authored-by: Weston Pace <weston.pace@gmail.com>
Signed-off-by: Weston Pace <weston.pace@gmail.com>
  • Loading branch information
jeremyosterhoudt and westonpace authored Aug 28, 2023
1 parent 78b0c10 commit be5ab86
Show file tree
Hide file tree
Showing 27 changed files with 1,508 additions and 32 deletions.
12 changes: 12 additions & 0 deletions csharp/Apache.Arrow.sln
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,10 @@ Project("{9A19103F-16F7-4668-BE54-9A1E7A4F7556}") = "Apache.Arrow.Compression",
EndProject
Project("{9A19103F-16F7-4668-BE54-9A1E7A4F7556}") = "Apache.Arrow.Compression.Tests", "test\Apache.Arrow.Compression.Tests\Apache.Arrow.Compression.Tests.csproj", "{5D7FF380-B7DF-4752-B415-7C08C70C9F06}"
EndProject
Project("{FAE04EC0-301F-11D3-BF4B-00C04F79EFBC}") = "Apache.Arrow.Flight.Sql.Tests", "test\Apache.Arrow.Flight.Sql.Tests\Apache.Arrow.Flight.Sql.Tests.csproj", "{DCC99EB1-4E60-4F0D-AEA9-C44A4C0C8B1D}"
EndProject
Project("{FAE04EC0-301F-11D3-BF4B-00C04F79EFBC}") = "Apache.Arrow.Flight.Sql", "src\Apache.Arrow.Flight.Sql\Apache.Arrow.Flight.Sql.csproj", "{2ADE087A-B424-4895-8CC5-10170D10BA62}"
EndProject
Global
GlobalSection(SolutionConfigurationPlatforms) = preSolution
Debug|Any CPU = Debug|Any CPU
Expand Down Expand Up @@ -69,6 +73,14 @@ Global
{5D7FF380-B7DF-4752-B415-7C08C70C9F06}.Debug|Any CPU.Build.0 = Debug|Any CPU
{5D7FF380-B7DF-4752-B415-7C08C70C9F06}.Release|Any CPU.ActiveCfg = Release|Any CPU
{5D7FF380-B7DF-4752-B415-7C08C70C9F06}.Release|Any CPU.Build.0 = Release|Any CPU
{DCC99EB1-4E60-4F0D-AEA9-C44A4C0C8B1D}.Debug|Any CPU.ActiveCfg = Debug|Any CPU
{DCC99EB1-4E60-4F0D-AEA9-C44A4C0C8B1D}.Debug|Any CPU.Build.0 = Debug|Any CPU
{DCC99EB1-4E60-4F0D-AEA9-C44A4C0C8B1D}.Release|Any CPU.ActiveCfg = Release|Any CPU
{DCC99EB1-4E60-4F0D-AEA9-C44A4C0C8B1D}.Release|Any CPU.Build.0 = Release|Any CPU
{2ADE087A-B424-4895-8CC5-10170D10BA62}.Debug|Any CPU.ActiveCfg = Debug|Any CPU
{2ADE087A-B424-4895-8CC5-10170D10BA62}.Debug|Any CPU.Build.0 = Debug|Any CPU
{2ADE087A-B424-4895-8CC5-10170D10BA62}.Release|Any CPU.ActiveCfg = Release|Any CPU
{2ADE087A-B424-4895-8CC5-10170D10BA62}.Release|Any CPU.Build.0 = Release|Any CPU
EndGlobalSection
GlobalSection(SolutionProperties) = preSolution
HideSolutionNode = FALSE
Expand Down
18 changes: 18 additions & 0 deletions csharp/src/Apache.Arrow.Flight.Sql/Apache.Arrow.Flight.Sql.csproj
Original file line number Diff line number Diff line change
@@ -0,0 +1,18 @@
<Project Sdk="Microsoft.NET.Sdk">
<PropertyGroup>
<TargetFramework>netstandard2.1</TargetFramework>
<Nullable>enable</Nullable>
</PropertyGroup>

<ItemGroup>
<PackageReference Include="Grpc.Tools" Version="2.42.0" PrivateAssets="All" />
</ItemGroup>

<ItemGroup>
<ProjectReference Include="..\Apache.Arrow.Flight\Apache.Arrow.Flight.csproj" />
</ItemGroup>

<ItemGroup>
<Protobuf Include="..\..\..\format\FlightSql.proto" Access="public" />
</ItemGroup>
</Project>
389 changes: 389 additions & 0 deletions csharp/src/Apache.Arrow.Flight.Sql/FlightSqlServer.cs

Large diffs are not rendered by default.

158 changes: 158 additions & 0 deletions csharp/src/Apache.Arrow.Flight.Sql/FlightSqlUtils.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,158 @@
// Licensed to the Apache Software Foundation (ASF) under one or more
// contributor license agreements. See the NOTICE file distributed with
// this work for additional information regarding copyright ownership.
// The ASF licenses this file to You under the Apache License, Version 2.0
// (the "License"); you may not use this file except in compliance with
// the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

using System;
using System.Buffers;
using System.Collections.Generic;
using Arrow.Flight.Protocol.Sql;
using Google.Protobuf;
using Google.Protobuf.WellKnownTypes;

namespace Apache.Arrow.Flight.Sql
{
/// <summary>
/// Helper methods for doing common Flight Sql tasks and conversions
/// </summary>
public class FlightSqlUtils
{
public static readonly FlightActionType FlightSqlCreatePreparedStatement = new("CreatePreparedStatement",
"Creates a reusable prepared statement resource on the server. \n" +
"Request Message: ActionCreatePreparedStatementRequest\n" +
"Response Message: ActionCreatePreparedStatementResult");

public static readonly FlightActionType FlightSqlClosePreparedStatement = new("ClosePreparedStatement",
"Closes a reusable prepared statement resource on the server. \n" +
"Request Message: ActionClosePreparedStatementRequest\n" +
"Response Message: N/A");

/// <summary>
/// List of possible actions
/// </summary>
public static readonly List<FlightActionType> FlightSqlActions = new()
{
FlightSqlCreatePreparedStatement,
FlightSqlClosePreparedStatement
};

/// <summary>
/// Helper to parse {@link com.google.protobuf.Any} objects to the specific protobuf object.
/// </summary>
/// <param name="source">the raw bytes source value.</param>
/// <returns>the materialized protobuf object.</returns>
public static Any Parse(ByteString source)
{
return Any.Parser.ParseFrom(source);
}

/// <summary>
/// Helper to unpack {@link com.google.protobuf.Any} objects to the specific protobuf object.
/// </summary>
/// <param name="source">the parsed Source value.</param>
/// <typeparam name="T">IMessage</typeparam>
/// <returns>the materialized protobuf object.</returns>
public static T Unpack<T>(Any source) where T : IMessage, new()
{
return source.Unpack<T>();
}

/// <summary>
/// Helper to parse and unpack {@link com.google.protobuf.Any} objects to the specific protobuf object.
/// </summary>
/// <param name="source">the raw bytes source value.</param>
/// <typeparam name="T">IMessage</typeparam>
/// <returns>the materialized protobuf object.</returns>
public static T ParseAndUnpack<T>(ByteString source) where T : IMessage, new()
{
return Unpack<T>(Parse(source));
}
}

/// <summary>
/// A set of helper functions for converting encoded commands to IMessage types
/// </summary>
public static class FlightSqlExtensions
{
private static Any ParsedCommand(this FlightDescriptor descriptor)
{
return FlightSqlUtils.Parse(descriptor.Command);
}

private static IMessage UnpackMessage(this Any command)
{
if (command.Is(CommandStatementQuery.Descriptor))
return FlightSqlUtils.Unpack<CommandStatementQuery>(command);
if (command.Is(CommandPreparedStatementQuery.Descriptor))
return FlightSqlUtils.Unpack<CommandPreparedStatementQuery>(command);
if (command.Is(CommandGetCatalogs.Descriptor))
return FlightSqlUtils.Unpack<CommandGetCatalogs>(command);
if (command.Is(CommandGetDbSchemas.Descriptor))
return FlightSqlUtils.Unpack<CommandGetDbSchemas>(command);
if (command.Is(CommandGetTables.Descriptor))
return FlightSqlUtils.Unpack<CommandGetTables>(command);
if (command.Is(CommandGetTableTypes.Descriptor))
return FlightSqlUtils.Unpack<CommandGetTableTypes>(command);
if (command.Is(CommandGetSqlInfo.Descriptor))
return FlightSqlUtils.Unpack<CommandGetSqlInfo>(command);
if (command.Is(CommandGetPrimaryKeys.Descriptor))
return FlightSqlUtils.Unpack<CommandGetPrimaryKeys>(command);
if (command.Is(CommandGetExportedKeys.Descriptor))
return FlightSqlUtils.Unpack<CommandGetExportedKeys>(command);
if (command.Is(CommandGetImportedKeys.Descriptor))
return FlightSqlUtils.Unpack<CommandGetImportedKeys>(command);
if (command.Is(CommandGetCrossReference.Descriptor))
return FlightSqlUtils.Unpack<CommandGetCrossReference>(command);
if (command.Is(CommandGetXdbcTypeInfo.Descriptor))
return FlightSqlUtils.Unpack<CommandGetXdbcTypeInfo>(command);
if (command.Is(TicketStatementQuery.Descriptor))
return FlightSqlUtils.Unpack<TicketStatementQuery>(command);
if (command.Is(TicketStatementQuery.Descriptor))
return FlightSqlUtils.Unpack<TicketStatementQuery>(command);
if (command.Is(CommandStatementUpdate.Descriptor))
return FlightSqlUtils.Unpack<CommandStatementUpdate>(command);
if (command.Is(CommandPreparedStatementUpdate.Descriptor))
return FlightSqlUtils.Unpack<CommandPreparedStatementUpdate>(command);
if (command.Is(CommandPreparedStatementQuery.Descriptor))
return FlightSqlUtils.Unpack<CommandPreparedStatementQuery>(command);

throw new ArgumentException("The defined request is invalid.");
}

/// <summary>
/// Extracts a command from a FlightDescriptor
/// </summary>
/// <param name="descriptor"></param>
/// <returns>An IMessage that has been parsed and unpacked</returns>
public static IMessage? ParsedAndUnpackedMessage(this FlightDescriptor descriptor)
{
try
{
return descriptor.ParsedCommand().UnpackMessage();
}
catch (ArgumentException)
{
return null;
}
}

public static ByteString Serialize(this IBufferMessage message)
{
int size = message.CalculateSize();
var writer = new ArrayBufferWriter<byte>(size);
message.WriteTo(writer);
var schemaBytes = writer.WrittenSpan;
return ByteString.CopyFrom(schemaBytes);
}
}
}
22 changes: 22 additions & 0 deletions csharp/src/Apache.Arrow.Flight.Sql/SqlActions.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,22 @@
// Licensed to the Apache Software Foundation (ASF) under one or more
// contributor license agreements. See the NOTICE file distributed with
// this work for additional information regarding copyright ownership.
// The ASF licenses this file to You under the Apache License, Version 2.0
// (the "License"); you may not use this file except in compliance with
// the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

namespace Apache.Arrow.Flight.Sql;

public static class SqlAction
{
public const string CreateRequest = "CreatePreparedStatement";
public const string CloseRequest = "ClosePreparedStatement";
}
20 changes: 18 additions & 2 deletions csharp/src/Apache.Arrow.Flight/Client/FlightClient.cs
Original file line number Diff line number Diff line change
Expand Up @@ -13,11 +13,11 @@
// See the License for the specific language governing permissions and
// limitations under the License.

using System;
using System.Collections.Generic;
using System.Threading.Tasks;
using Apache.Arrow.Flight.Internal;
using Apache.Arrow.Flight.Protocol;
using Apache.Arrow.Flight.Server;
using Apache.Arrow.Flight.Server.Internal;
using Grpc.Core;
using Grpc.Net.Client;

Expand Down Expand Up @@ -93,6 +93,22 @@ public FlightRecordBatchDuplexStreamingCall StartPut(FlightDescriptor flightDesc
channels.Dispose);
}

public AsyncDuplexStreamingCall<FlightHandshakeRequest, FlightHandshakeResponse> Handshake(Metadata headers = null)
{
var channel = _client.Handshake(headers);
var readStream = new StreamReader<HandshakeResponse, FlightHandshakeResponse>(channel.ResponseStream, response => new FlightHandshakeResponse(response));
var writeStream = new FlightHandshakeStreamWriterAdapter(channel.RequestStream);
var call = new AsyncDuplexStreamingCall<FlightHandshakeRequest, FlightHandshakeResponse>(
writeStream,
readStream,
channel.ResponseHeadersAsync,
channel.GetStatus,
channel.GetTrailers,
channel.Dispose);

return call;
}

public AsyncServerStreamingCall<FlightResult> DoAction(FlightAction action, Metadata headers = null)
{
var stream = _client.DoAction(action.ToProtocol(), headers);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@ namespace Apache.Arrow.Flight.Client
{
public class FlightClientRecordBatchStreamReader : FlightRecordBatchStreamReader
{
internal FlightClientRecordBatchStreamReader(IAsyncStreamReader<FlightData> flightDataStream) : base(flightDataStream)
internal FlightClientRecordBatchStreamReader(IAsyncStreamReader<Protocol.FlightData> flightDataStream) : base(flightDataStream)
{
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,9 +25,9 @@ namespace Apache.Arrow.Flight.Client
{
public class FlightClientRecordBatchStreamWriter : FlightRecordBatchStreamWriter, IClientStreamWriter<RecordBatch>
{
private readonly IClientStreamWriter<FlightData> _clientStreamWriter;
private readonly IClientStreamWriter<Protocol.FlightData> _clientStreamWriter;
private bool _completed = false;
internal FlightClientRecordBatchStreamWriter(IClientStreamWriter<FlightData> clientStreamWriter, FlightDescriptor flightDescriptor) : base(clientStreamWriter, flightDescriptor)
internal FlightClientRecordBatchStreamWriter(IClientStreamWriter<Protocol.FlightData> clientStreamWriter, FlightDescriptor flightDescriptor) : base(clientStreamWriter, flightDescriptor)
{
_clientStreamWriter = clientStreamWriter;
}
Expand Down
53 changes: 53 additions & 0 deletions csharp/src/Apache.Arrow.Flight/FlightData.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,53 @@
// Licensed to the Apache Software Foundation (ASF) under one or more
// contributor license agreements. See the NOTICE file distributed with
// this work for additional information regarding copyright ownership.
// The ASF licenses this file to You under the Apache License, Version 2.0
// (the "License"); you may not use this file except in compliance with
// the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

using Google.Protobuf;

namespace Apache.Arrow.Flight;

public class FlightData
{
public FlightDescriptor Descriptor { get; }
public ByteString AppMetadata { get; }
public ByteString DataBody { get; }
public ByteString DataHeader { get; }

public FlightData(FlightDescriptor descriptor, ByteString dataBody = null, ByteString dataHeader = null, ByteString appMetadata = null)
{
Descriptor = descriptor;
DataBody = dataBody;
DataHeader = dataHeader;
AppMetadata = appMetadata;
}

internal FlightData(Protocol.FlightData protocolFlightData)
{
Descriptor = protocolFlightData.FlightDescriptor == null ? null : new FlightDescriptor(protocolFlightData.FlightDescriptor);
DataBody = protocolFlightData.DataBody;
DataHeader = protocolFlightData.DataHeader;
AppMetadata = protocolFlightData.AppMetadata;
}

internal Protocol.FlightData ToProtocol()
{
return new Protocol.FlightData
{
FlightDescriptor = Descriptor?.ToProtocol(),
AppMetadata = AppMetadata,
DataBody = DataBody,
DataHeader = DataHeader
};
}
}
58 changes: 58 additions & 0 deletions csharp/src/Apache.Arrow.Flight/FlightHandshakeRequest.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,58 @@
// Licensed to the Apache Software Foundation (ASF) under one or more
// contributor license agreements. See the NOTICE file distributed with
// this work for additional information regarding copyright ownership.
// The ASF licenses this file to You under the Apache License, Version 2.0
// (the "License"); you may not use this file except in compliance with
// the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

using Google.Protobuf;

namespace Apache.Arrow.Flight;

public class FlightHandshakeRequest
{
private readonly Protocol.HandshakeRequest _result;
public ByteString Payload => _result.Payload;
public ulong ProtocolVersion => _result.ProtocolVersion;

internal FlightHandshakeRequest(Protocol.HandshakeRequest result)
{
_result = result;
}

public FlightHandshakeRequest(ByteString payload, ulong protocolVersion = 1)
{
_result = new Protocol.HandshakeRequest
{
Payload = payload,
ProtocolVersion = protocolVersion
};
}

internal Protocol.HandshakeRequest ToProtocol()
{
return _result;
}

public override bool Equals(object obj)
{
if(obj is FlightHandshakeRequest other)
{
return Equals(_result, other._result);
}
return false;
}

public override int GetHashCode()
{
return _result.GetHashCode();
}
}
Loading

0 comments on commit be5ab86

Please sign in to comment.