Skip to content

Commit

Permalink
use Channel.TryWrite when re-queuing messages
Browse files Browse the repository at this point in the history
  • Loading branch information
williamdenton committed Jun 11, 2024
1 parent 91d2e78 commit f1fa2ed
Show file tree
Hide file tree
Showing 3 changed files with 22 additions and 2 deletions.
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
using System.Data;
using System.Threading.Channels;
using HotChocolate.Subscriptions.Diagnostics;
using Npgsql;
using NpgsqlTypes;
using static HotChocolate.Subscriptions.Postgres.PostgresResources;

namespace HotChocolate.Subscriptions.Postgres;
Expand Down Expand Up @@ -136,9 +136,20 @@ private async Task HandleMessage(NpgsqlConnection connection, CancellationToken
_diagnosticEvents.ProviderInfo(msg);

// if we cannot send the message we put it back into the channel
// however as the channel is bounded, we might not able to requeue the message and will be forced to drop them if they can't be written
var failedCount = 0;

foreach (var message in messages)
{
await _channel.Writer.WriteAsync(message, CancellationToken.None);
if (!_channel.Writer.TryWrite(message))
{
failedCount++;
}
}

if (failedCount > 0)
{
_diagnosticEvents.ProviderInfo(string.Format(ChannelWriter_FailedToRequeueMessage, failedCount));
}
}
}
Expand Down

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

Original file line number Diff line number Diff line change
Expand Up @@ -55,4 +55,7 @@
<data name="PostgresMessageEnvelope_PayloadTooLarge" xml:space="preserve">
<value>Payload is too long to we written to Postgres. Serialized message is {0} bytes but limit is {1} bytes</value>
</data>
<data name="ChannelWriter_FailedToRequeueMessage" xml:space="preserve">
<value>The postgres writer was unable to requeue messages. {0} messages have been lost</value>
</data>
</root>

0 comments on commit f1fa2ed

Please sign in to comment.