Skip to content

Commit 656d7f9

Browse files
NinoFloris authored and JonasWestman committed
Redo binary exporter column reading (npgsql#5464)
Fixes npgsql#5457 (cherry picked from commit cdf9841) Signed-off-by: monjowe <jonas.westman@monitor.se>
1 parent 7c8f60a commit 656d7f9

File tree

3 files changed

+148
-80
lines changed

3 files changed

+148
-80
lines changed

src/Npgsql/Internal/PgReader.cs

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -12,6 +12,9 @@ namespace Npgsql.Internal;
1212

1313
public class PgReader
1414
{
15+
// We don't want to add a ton of memory pressure for large strings.
16+
internal const int MaxPreparedTextReaderSize = 1024 * 64;
17+
1518
readonly NpgsqlReadBuffer _buffer;
1619

1720
bool _resumable;
@@ -210,11 +213,8 @@ public ValueTask<TextReader> GetTextReaderAsync(Encoding encoding, CancellationT
210213

211214
async ValueTask<TextReader> GetTextReader(bool async, Encoding encoding, CancellationToken cancellationToken)
212215
{
213-
// We don't want to add a ton of memory pressure for large strings.
214-
const int maxPreparedSize = 1024 * 64;
215-
216216
_requiresCleanup = true;
217-
if (CurrentRemaining > _buffer.ReadBytesLeft || CurrentRemaining > maxPreparedSize)
217+
if (CurrentRemaining > _buffer.ReadBytesLeft || CurrentRemaining > MaxPreparedTextReaderSize)
218218
return new StreamReader(GetColumnStream(), encoding, detectEncodingFromByteOrderMarks: false);
219219

220220
if (_preparedTextReader is { IsDisposed: false })

src/Npgsql/NpgsqlBinaryExporter.cs

Lines changed: 77 additions & 76 deletions
Original file line numberDiff line numberDiff line change
@@ -35,7 +35,7 @@ public sealed class NpgsqlBinaryExporter : ICancelable
3535
/// <summary>
3636
/// The number of columns, as returned from the backend in the CopyOutResponse.
3737
/// </summary>
38-
internal int NumColumns { get; private set; }
38+
int NumColumns { get; set; }
3939

4040
PgConverterInfo[] _columnInfoCache;
4141

@@ -140,16 +140,18 @@ async Task ReadHeader(bool async)
140140

141141
async ValueTask<int> StartRow(bool async, CancellationToken cancellationToken = default)
142142
{
143-
144-
CheckDisposed();
143+
ThrowIfDisposed();
145144
if (_isConsumed)
146145
return -1;
147146

148147
using var registration = _connector.StartNestedCancellableOperation(cancellationToken);
149148

150149
// Consume and advance any active column.
151150
if (_column >= 0)
152-
await Commit(async, resumableOp: false).ConfigureAwait(false);
151+
{
152+
await Commit(async).ConfigureAwait(false);
153+
_column++;
154+
}
153155

154156
// The very first row (i.e. _column == -1) is included in the header's CopyData message.
155157
// Otherwise we need to read in a new CopyData row (the docs specify that there's a CopyData
@@ -210,29 +212,6 @@ public ValueTask<T> ReadAsync<T>(CancellationToken cancellationToken = default)
210212
ValueTask<T> Read<T>(bool async, CancellationToken cancellationToken = default)
211213
=> Read<T>(async, null, cancellationToken);
212214

213-
PgConverterInfo CreateConverterInfo(Type type, NpgsqlDbType? npgsqlDbType = null)
214-
{
215-
var options = _connector.SerializerOptions;
216-
PgTypeId? pgTypeId = null;
217-
if (npgsqlDbType.HasValue)
218-
{
219-
pgTypeId = npgsqlDbType.Value.ToDataTypeName() is { } name
220-
? options.GetCanonicalTypeId(name)
221-
// Handle plugin types via lookup.
222-
: GetRepresentationalOrDefault(npgsqlDbType.Value.ToUnqualifiedDataTypeNameOrThrow());
223-
}
224-
var info = options.GetTypeInfo(type, pgTypeId)
225-
?? throw new NotSupportedException($"Reading is not supported for type '{type}'{(npgsqlDbType is null ? "" : $" and NpgsqlDbType '{npgsqlDbType}'")}");
226-
// Binary export has no type info so we only do caller-directed interpretation of data.
227-
return info.Bind(new Field("?", info.PgTypeId!.Value, -1), DataFormat.Binary);
228-
229-
PgTypeId GetRepresentationalOrDefault(string dataTypeName)
230-
{
231-
var type = options.DatabaseInfo.GetPostgresType(dataTypeName);
232-
return options.ToCanonicalTypeId(type.GetRepresentationalType());
233-
}
234-
}
235-
236215
/// <summary>
237216
/// Reads the current column, returns its value according to <paramref name="type"/> and
238217
/// moves ahead to the next column.
@@ -269,39 +248,22 @@ public ValueTask<T> ReadAsync<T>(NpgsqlDbType type, CancellationToken cancellati
269248

270249
async ValueTask<T> Read<T>(bool async, NpgsqlDbType? type, CancellationToken cancellationToken)
271250
{
272-
CheckDisposed();
273-
if (_column is BeforeRow)
274-
ThrowHelper.ThrowInvalidOperationException("Not reading a row");
251+
ThrowIfNotOnRow();
275252

276253
using var registration = _connector.StartNestedCancellableOperation(cancellationToken, attemptPgCancellation: false);
277254

278-
// Allow one more read if the field is a db null.
279-
// We cannot allow endless rereads otherwise it becomes quite unclear when a column advance happens.
280-
if (PgReader is { Initialized: true, Resumable: true, FieldSize: -1 })
281-
{
282-
await Commit(async, resumableOp: false).ConfigureAwait(false);
283-
return DbNullOrThrow();
284-
}
255+
if (!IsInitializedAndAtStart)
256+
await MoveNextColumn(async, resumableOp: false).ConfigureAwait(false);
285257

286-
// We must commit the current column before reading the next one unless it was an IsNull call.
287-
PgConverterInfo info;
288-
bool asObject;
289-
if (!PgReader.Initialized || !PgReader.Resumable || PgReader.CurrentRemaining != PgReader.FieldSize)
258+
if (PgReader.FieldSize is (-1 or 0) and var fieldSize)
290259
{
291-
await Commit(async, resumableOp: false).ConfigureAwait(false);
292-
info = GetInfo(type, out asObject);
293-
294-
// We need to get info after potential I/O as we don't know beforehand at what column we're at.
295-
var columnLen = await ReadColumnLenIfNeeded(async, resumableOp: false).ConfigureAwait(false);
296-
if (_column == NumColumns)
297-
ThrowHelper.ThrowInvalidOperationException("No more columns left in the current row");
298-
299-
if (columnLen is -1)
260+
// Commit, otherwise we'll have no way of knowing this column is finished.
261+
await Commit(async).ConfigureAwait(false);
262+
if (fieldSize is -1)
300263
return DbNullOrThrow();
301-
302264
}
303-
else
304-
info = GetInfo(type, out asObject);
265+
266+
var info = GetInfo(type, out var asObject);
305267

306268
T result;
307269
if (async)
@@ -323,6 +285,14 @@ async ValueTask<T> Read<T>(bool async, NpgsqlDbType? type, CancellationToken can
323285

324286
return result;
325287

288+
static T DbNullOrThrow()
289+
{
290+
// When T is a Nullable<T>, we support returning null
291+
if (default(T) is null && typeof(T).IsValueType)
292+
return default!;
293+
throw new InvalidCastException("Column is null");
294+
}
295+
326296
PgConverterInfo GetInfo(NpgsqlDbType? type, out bool asObject)
327297
{
328298
ref var cachedInfo = ref _columnInfoCache[_column];
@@ -331,12 +301,27 @@ PgConverterInfo GetInfo(NpgsqlDbType? type, out bool asObject)
331301
return converterInfo;
332302
}
333303

334-
T DbNullOrThrow()
304+
PgConverterInfo CreateConverterInfo(Type type, NpgsqlDbType? npgsqlDbType = null)
335305
{
336-
// When T is a Nullable<T>, we support returning null
337-
if (default(T) is null && typeof(T).IsValueType)
338-
return default!;
339-
throw new InvalidCastException("Column is null");
306+
var options = _connector.SerializerOptions;
307+
PgTypeId? pgTypeId = null;
308+
if (npgsqlDbType.HasValue)
309+
{
310+
pgTypeId = npgsqlDbType.Value.ToDataTypeName() is { } name
311+
? options.GetCanonicalTypeId(name)
312+
// Handle plugin types via lookup.
313+
: GetRepresentationalOrDefault(npgsqlDbType.Value.ToUnqualifiedDataTypeNameOrThrow());
314+
}
315+
var info = options.GetTypeInfo(type, pgTypeId)
316+
?? throw new NotSupportedException($"Reading is not supported for type '{type}'{(npgsqlDbType is null ? "" : $" and NpgsqlDbType '{npgsqlDbType}'")}");
317+
// Binary export has no type info so we only do caller-directed interpretation of data.
318+
return info.Bind(new Field("?", info.PgTypeId!.Value, -1), DataFormat.Binary);
319+
320+
PgTypeId GetRepresentationalOrDefault(string dataTypeName)
321+
{
322+
var type = options.DatabaseInfo.GetPostgresType(dataTypeName);
323+
return options.ToCanonicalTypeId(type.GetRepresentationalType());
324+
}
340325
}
341326
}
342327

@@ -347,8 +332,11 @@ public bool IsNull
347332
{
348333
get
349334
{
350-
Commit(async: false, resumableOp: true);
351-
return ReadColumnLenIfNeeded(async: false, resumableOp: true).GetAwaiter().GetResult() is -1;
335+
ThrowIfNotOnRow();
336+
if (!IsInitializedAndAtStart)
337+
return MoveNextColumn(async: false, resumableOp: true).GetAwaiter().GetResult() is -1;
338+
339+
return PgReader.FieldSize is - 1;
352340
}
353341
}
354342

@@ -365,46 +353,59 @@ public Task SkipAsync(CancellationToken cancellationToken = default)
365353

366354
async Task Skip(bool async, CancellationToken cancellationToken = default)
367355
{
368-
CheckDisposed();
356+
ThrowIfNotOnRow();
369357

370358
using var registration = _connector.StartNestedCancellableOperation(cancellationToken);
371359

372-
// We allow IsNull to have been called before skip.
373-
if (PgReader.Initialized && PgReader is not { Resumable: true, FieldSize: -1 })
374-
await Commit(async, resumableOp: false).ConfigureAwait(false);
375-
await ReadColumnLenIfNeeded(async, resumableOp: false).ConfigureAwait(false);
360+
if (!IsInitializedAndAtStart)
361+
await MoveNextColumn(async, resumableOp: false).ConfigureAwait(false);
362+
376363
await PgReader.Consume(async, cancellationToken: cancellationToken).ConfigureAwait(false);
364+
365+
// Commit, otherwise we'll have no way of knowing this column is finished.
366+
if (PgReader.FieldSize is -1 or 0)
367+
await Commit(async).ConfigureAwait(false);
377368
}
378369

379370
#endregion
380371

381372
#region Utilities
382373

383-
ValueTask Commit(bool async, bool resumableOp)
384-
{
385-
var resuming = PgReader is { Initialized: true, Resumable: true } && resumableOp;
386-
if (!resuming)
387-
_column++;
374+
bool IsInitializedAndAtStart => PgReader.Initialized && (PgReader.FieldSize is -1 || PgReader.FieldOffset is 0);
388375

376+
ValueTask Commit(bool async)
377+
{
389378
if (async)
390-
return PgReader.CommitAsync(resuming);
379+
return PgReader.CommitAsync(resuming: false);
391380

392-
PgReader.Commit(resuming);
381+
PgReader.Commit(resuming: false);
393382
return new();
394383
}
395384

396-
async ValueTask<int> ReadColumnLenIfNeeded(bool async, bool resumableOp)
385+
async ValueTask<int> MoveNextColumn(bool async, bool resumableOp)
397386
{
398-
if (PgReader is { Initialized: true, Resumable: true, FieldSize: -1 })
399-
return -1;
387+
if (async)
388+
await PgReader.CommitAsync(resuming: false).ConfigureAwait(false);
389+
else
390+
PgReader.Commit(resuming: false);
400391

392+
if (_column + 1 == NumColumns)
393+
ThrowHelper.ThrowInvalidOperationException("No more columns left in the current row");
394+
_column++;
401395
await _buf.Ensure(4, async).ConfigureAwait(false);
402396
var columnLen = _buf.ReadInt32();
403397
PgReader.Init(columnLen, DataFormat.Binary, resumableOp);
404398
return PgReader.FieldSize;
405399
}
406400

407-
void CheckDisposed()
401+
void ThrowIfNotOnRow()
402+
{
403+
ThrowIfDisposed();
404+
if (_column is BeforeRow)
405+
ThrowHelper.ThrowInvalidOperationException("Not reading a row");
406+
}
407+
408+
void ThrowIfDisposed()
408409
{
409410
if (_isDisposed)
410411
ThrowHelper.ThrowObjectDisposedException(nameof(NpgsqlBinaryExporter), "The COPY operation has already ended.");

test/Npgsql.Tests/CopyTests.cs

Lines changed: 67 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -510,6 +510,73 @@ public async Task Wrong_table_definition_binary_export()
510510
Assert.That(await conn.ExecuteScalarAsync("SELECT 1"), Is.EqualTo(1));
511511
}
512512

513+
[Test, IssueLink("https://github.com/npgsql/npgsql/issues/5457")]
514+
public async Task MixedOperations()
515+
{
516+
if (IsMultiplexing)
517+
Assert.Ignore("Multiplexing: fails");
518+
using var conn = await OpenConnectionAsync();
519+
520+
var reader = conn.BeginBinaryExport("""
521+
COPY (values ('foo', 1), ('bar', null), (null, 2)) TO STDOUT BINARY
522+
""");
523+
while(reader.StartRow() != -1)
524+
{
525+
string? col1 = null;
526+
if (reader.IsNull)
527+
reader.Skip();
528+
else
529+
col1 = reader.Read<string>();
530+
int? col2 = null;
531+
if (reader.IsNull)
532+
reader.Skip();
533+
else
534+
col2 = reader.Read<int>();
535+
}
536+
}
537+
538+
[Test]
539+
public async Task ReadMoreColumnsThanExist()
540+
{
541+
if (IsMultiplexing)
542+
Assert.Ignore("Multiplexing: fails");
543+
using var conn = await OpenConnectionAsync();
544+
545+
var reader = conn.BeginBinaryExport("""
546+
COPY (values ('foo', 1), ('bar', null), (null, 2)) TO STDOUT BINARY
547+
""");
548+
while(reader.StartRow() != -1)
549+
{
550+
string? col1 = null;
551+
if (reader.IsNull)
552+
reader.Skip();
553+
else
554+
col1 = reader.Read<string>();
555+
int? col2 = null;
556+
if (reader.IsNull)
557+
reader.Skip();
558+
else
559+
col2 = reader.Read<int>();
560+
561+
Assert.Throws<InvalidOperationException>(() => _ = reader.IsNull);
562+
}
563+
}
564+
565+
[Test]
566+
public async Task StreamingRead()
567+
{
568+
if (IsMultiplexing)
569+
Assert.Ignore("Multiplexing: fails");
570+
using var conn = await OpenConnectionAsync();
571+
572+
var str = new string('a', PgReader.MaxPreparedTextReaderSize + 1);
573+
var reader = conn.BeginBinaryExport($"""COPY (values ('{str}')) TO STDOUT BINARY""");
574+
while (reader.StartRow() != -1)
575+
{
576+
using var _ = reader.Read<TextReader>(NpgsqlDbType.Text);
577+
}
578+
}
579+
513580
[Test, IssueLink("https://github.com/npgsql/npgsql/issues/2330")]
514581
public async Task Wrong_format_binary_export()
515582
{

0 commit comments

Comments
 (0)