Skip to content

Commit

Permalink
Handle clients sending schema after the first flight data stream message
Browse files Browse the repository at this point in the history
  • Loading branch information
adamreeve committed Oct 15, 2024
1 parent aa6ec7d commit 734f81e
Show file tree
Hide file tree
Showing 2 changed files with 31 additions and 31 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -69,42 +69,43 @@ public override void ReadSchema()

public override async ValueTask ReadSchemaAsync(CancellationToken cancellationToken)
{
if (HasReadSchema)
while (!HasReadSchema)
{
return;
}

var moveNextResult = await _flightDataStream.MoveNext(cancellationToken).ConfigureAwait(false);

if (!moveNextResult)
{
throw new Exception("No records or schema in this flight");
}
var moveNextResult = await _flightDataStream.MoveNext(cancellationToken).ConfigureAwait(false);
if (!moveNextResult)
{
throw new Exception("No records or schema in this flight");
}

//AppMetadata will never be null, but length 0 if empty
//Those are skipped
if(_flightDataStream.Current.AppMetadata.Length > 0)
{
_applicationMetadatas.Add(_flightDataStream.Current.AppMetadata);
}
if (_flightDescriptor == null && _flightDataStream.Current.FlightDescriptor != null)
{
_flightDescriptor = new FlightDescriptor(_flightDataStream.Current.FlightDescriptor);
}

var header = _flightDataStream.Current.DataHeader.Memory;
Message message = Message.GetRootAsMessage(
ArrowReaderImplementation.CreateByteBuffer(header));
// AppMetadata will never be null, but length 0 if empty
// Those are skipped
if(_flightDataStream.Current.AppMetadata.Length > 0)
{
_applicationMetadatas.Add(_flightDataStream.Current.AppMetadata);
}

var header = _flightDataStream.Current.DataHeader.Memory;
if (header.IsEmpty)
{
// Clients may send a first message with a descriptor only and no schema
continue;
}

if(_flightDataStream.Current.FlightDescriptor != null)
{
_flightDescriptor = new FlightDescriptor(_flightDataStream.Current.FlightDescriptor);
}
Message message = Message.GetRootAsMessage(ArrowReaderImplementation.CreateByteBuffer(header));

switch (message.HeaderType)
{
case MessageHeader.Schema:
_schema = FlightMessageSerializer.DecodeSchema(message.ByteBuffer);
break;
default:
throw new Exception($"Expected schema as the first message, but got: {message.HeaderType.ToString()}");
switch (message.HeaderType)
{
case MessageHeader.Schema:
_schema = FlightMessageSerializer.DecodeSchema(message.ByteBuffer);
break;
default:
throw new Exception($"Expected schema as the first message, but got: {message.HeaderType.ToString()}");
}
}
}

Expand Down
1 change: 0 additions & 1 deletion csharp/test/Apache.Arrow.Flight.Tests/FlightTests.cs
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,6 @@
using Google.Protobuf;
using Grpc.Core;
using Grpc.Core.Utils;
using Python.Runtime;
using Xunit;

namespace Apache.Arrow.Flight.Tests
Expand Down

0 comments on commit 734f81e

Please sign in to comment.