Skip to content

Commit 6bfaabd

Browse files
Allow empty streams to return an empty schema asynchronously.
1 parent 710f6e4 commit 6bfaabd

File tree

7 files changed

+42
-13
lines changed

7 files changed

+42
-13
lines changed

src/Apache.Arrow.Flight/Internal/RecordBatchReaderImplementation.cs

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -67,7 +67,7 @@ public override void ReadSchema()
6767
ReadSchemaAsync(CancellationToken.None).AsTask().Wait();
6868
}
6969

70-
public override async ValueTask ReadSchemaAsync(CancellationToken cancellationToken)
70+
public override async ValueTask<Schema> ReadSchemaAsync(CancellationToken cancellationToken)
7171
{
7272
while (!HasReadSchema)
7373
{
@@ -107,6 +107,7 @@ public override async ValueTask ReadSchemaAsync(CancellationToken cancellationTo
107107
throw new Exception($"Expected schema as the first message, but got: {message.HeaderType.ToString()}");
108108
}
109109
}
110+
return _schema;
110111
}
111112

112113
public override async ValueTask<RecordBatch> ReadNextRecordBatchAsync(CancellationToken cancellationToken)

src/Apache.Arrow/Ipc/ArrowFileReaderImplementation.cs

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -52,11 +52,11 @@ public async ValueTask<int> RecordBatchCountAsync(CancellationToken cancellation
5252
return _footer.RecordBatchCount;
5353
}
5454

55-
public override async ValueTask ReadSchemaAsync(CancellationToken cancellationToken = default)
55+
public override async ValueTask<Schema> ReadSchemaAsync(CancellationToken cancellationToken = default)
5656
{
5757
if (HasReadSchema)
5858
{
59-
return;
59+
return _schema;
6060
}
6161

6262
await ValidateFileAsync(cancellationToken).ConfigureAwait(false);
@@ -82,6 +82,8 @@ public override async ValueTask ReadSchemaAsync(CancellationToken cancellationTo
8282
EnsureFullRead(buffer, bytesRead);
8383

8484
ReadSchema(buffer);
85+
86+
return _schema;
8587
}
8688
}
8789

src/Apache.Arrow/Ipc/ArrowMemoryReaderImplementation.cs

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -33,11 +33,11 @@ public ArrowMemoryReaderImplementation(ReadOnlyMemory<byte> buffer, ICompression
3333
_buffer = buffer;
3434
}
3535

36-
// Async-shaped entry point that does its work synchronously: it checks the
// token, calls ReadSchema(), and hands back a ValueTask without awaiting anything.
public override ValueTask ReadSchemaAsync(CancellationToken cancellationToken)
36+
public override ValueTask<Schema> ReadSchemaAsync(CancellationToken cancellationToken)
3737
{
3838
// Cancellation is only observed here, before the synchronous read starts.
cancellationToken.ThrowIfCancellationRequested();
3939
ReadSchema();
40-
return default;
40+
// Wrap the _schema field in an already-completed ValueTask.
// NOTE(review): presumably _schema was populated by ReadSchema() above — confirm.
return new ValueTask<Schema>(_schema);
4141
}
4242

4343
public override ValueTask<RecordBatch> ReadNextRecordBatchAsync(CancellationToken cancellationToken)

src/Apache.Arrow/Ipc/ArrowReaderImplementation.cs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -69,7 +69,7 @@ protected virtual void Dispose(bool disposing)
6969
{
7070
}
7171

72-
public abstract ValueTask ReadSchemaAsync(CancellationToken cancellationToken);
72+
public abstract ValueTask<Schema> ReadSchemaAsync(CancellationToken cancellationToken);
7373
public abstract void ReadSchema();
7474

7575
public abstract ValueTask<RecordBatch> ReadNextRecordBatchAsync(CancellationToken cancellationToken);

src/Apache.Arrow/Ipc/ArrowStreamReader.cs

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -102,11 +102,11 @@ protected virtual void Dispose(bool disposing)
102102

103103
// Returns the reader's schema: the implementation's cached Schema when it has
// already been read, otherwise whatever ReadSchemaAsync produces.
public async ValueTask<Schema> GetSchema(CancellationToken cancellationToken = default)
104104
{
105-
if (!_implementation.HasReadSchema)
105+
if (_implementation.HasReadSchema)
106106
{
107-
await _implementation.ReadSchemaAsync(cancellationToken);
107+
return _implementation.Schema;
108108
}
109-
return _implementation.Schema;
109+
// The awaited result is returned directly, so it can be null:
// ArrowStreamReaderImplementation.ReadSchemaAsync returns null when the
// schema message length is 0.
// NOTE(review): this await has no ConfigureAwait(false), unlike the awaits in
// the file/stream reader implementations — confirm whether that is intended.
return await _implementation.ReadSchemaAsync(cancellationToken);
110110
}
111111

112112
public ValueTask<RecordBatch> ReadNextRecordBatchAsync(CancellationToken cancellationToken = default)

src/Apache.Arrow/Ipc/ArrowStreamReaderImplementation.cs

Lines changed: 4 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -151,19 +151,19 @@ protected ReadResult ReadMessage()
151151
return new ReadResult(messageLength, result);
152152
}
153153

154-
public override async ValueTask ReadSchemaAsync(CancellationToken cancellationToken = default)
154+
public override async ValueTask<Schema> ReadSchemaAsync(CancellationToken cancellationToken = default)
155155
{
156156
if (HasReadSchema)
157157
{
158-
return;
158+
return _schema;
159159
}
160160

161161
// Figure out length of schema
162162
int schemaMessageLength = await ReadMessageLengthAsync(throwOnFullRead: true, returnOnEmptyStream: true, cancellationToken)
163163
.ConfigureAwait(false);
164164
if (schemaMessageLength == 0)
165165
{
166-
return;
166+
return null;
167167
}
168168

169169
using (ArrayPool<byte>.Shared.RentReturn(schemaMessageLength, out Memory<byte> buff))
@@ -174,6 +174,7 @@ public override async ValueTask ReadSchemaAsync(CancellationToken cancellationTo
174174

175175
Google.FlatBuffers.ByteBuffer schemabb = CreateByteBuffer(buff);
176176
_schema = MessageSerializer.GetSchema(ReadMessage<Flatbuf.Schema>(schemabb), ref _dictionaryMemo);
177+
return _schema;
177178
}
178179
}
179180

test/Apache.Arrow.Tests/ArrowStreamReaderTests.cs

Lines changed: 26 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -240,6 +240,17 @@ private static async Task TestReaderFromPartialReadStream(Func<ArrowStreamReader
240240
}
241241
}
242242

243+
/// <summary>
/// Calls GetSchema on a reader over an EmptyAsyncOnlyStream and expects a null schema.
/// The stream's synchronous Read throws NotSupportedException, so this test fails
/// if the schema read touches the synchronous path.
/// </summary>
[Fact]
244+
public async Task EmptyStreamNoSyncRead()
245+
{
246+
using (var stream = new EmptyAsyncOnlyStream())
247+
{
248+
var reader = new ArrowStreamReader(stream);
249+
var schema = await reader.GetSchema();
250+
// No schema message is available from the stream, so null is expected.
Assert.Null(schema);
251+
}
252+
}
253+
243254
/// <summary>
244255
/// A stream class that only returns a part of the data at a time.
245256
/// </summary>
@@ -280,6 +291,20 @@ public override Task<int> ReadAsync(byte[] buffer, int offset, int length, Cance
280291
}
281292
#endif
282293
}
294+
295+
/// <summary>
/// A read-only, non-seekable stream with no data: the array-based ReadAsync
/// always completes with 0 bytes, and the synchronous Read throws.
/// </summary>
private class EmptyAsyncOnlyStream : Stream
296+
{
297+
public override bool CanRead => true;
298+
public override bool CanSeek => false;
299+
public override bool CanWrite => false;
300+
public override long Length => 0;
301+
public override long Position { get => 0; set => throw new NotSupportedException(); }
302+
public override void Flush() { }
303+
// Synchronous reads are deliberately unsupported so any caller that uses them fails.
public override int Read(byte[] buffer, int offset, int count) => throw new NotSupportedException();
304+
public override long Seek(long offset, SeekOrigin origin) => throw new NotSupportedException();
305+
public override void SetLength(long value) => throw new NotSupportedException();
306+
public override void Write(byte[] buffer, int offset, int count) => throw new NotSupportedException();
307+
// Always reports 0 bytes read; the buffer is never written to.
public override Task<int> ReadAsync(byte[] buffer, int offset, int count, CancellationToken cancellationToken) => Task.FromResult(0);
308+
}
283309
}
284310
}
285-

0 commit comments

Comments
 (0)