Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
47 changes: 29 additions & 18 deletions src/Hangfire.PostgreSql/PostgreSqlJobQueue.cs
Original file line number Diff line number Diff line change
Expand Up @@ -75,14 +75,10 @@ public IFetchedJob Dequeue(string[] queues, CancellationToken cancellationToken)
cancelListenSource?.Cancel();
listenTask?.Wait();
}
catch (AggregateException ex)
catch (AggregateException)
{
if (ex.InnerException is not TaskCanceledException)
{
throw;
}

// Otherwise do nothing, cancel exception is expected.
// Swallow all exceptions from listen task cleanup.
// The main dequeue operation already succeeded or failed independently.
}
finally
{
Expand Down Expand Up @@ -282,30 +278,45 @@ private Task ListenForNotificationsAsync(CancellationToken cancellationToken)
// CreateAnOpenConnection can return the same connection over and over if an existing connection
// is passed in the constructor of PostgreSqlStorage. We must use a separate dedicated
// connection to listen for notifications.
NpgsqlConnection clonedConnection = connection.CloneWith(connection.ConnectionString);
string connectionString = connection.ConnectionString;
NpgsqlConnection clonedConnection = connection.CloneWith(connectionString);

return Task.Run(async () => {
NpgsqlConnection currentConnection = clonedConnection;
try
{
if (clonedConnection.State != ConnectionState.Open)
{
await clonedConnection.OpenAsync(cancellationToken); // Open so that Dapper doesn't auto-close.
}

while (!cancellationToken.IsCancellationRequested)
{
await clonedConnection.ExecuteAsync($"LISTEN {JobNotificationChannel}");
await clonedConnection.WaitAsync(cancellationToken);
JobQueueNotification.Set();
try
{
if (currentConnection.State != ConnectionState.Open)
{
await currentConnection.OpenAsync(cancellationToken);
}

await currentConnection.ExecuteAsync($"LISTEN {JobNotificationChannel}");
await currentConnection.WaitAsync(cancellationToken);
JobQueueNotification.Set();
}
catch (OperationCanceledException)
{
throw;
}
catch (Exception ex) when (ex.IsCatchableExceptionType())
{
currentConnection?.Dispose();
await Task.Delay(TimeSpan.FromSeconds(1), cancellationToken);
currentConnection = new NpgsqlConnection(connectionString);
}
}
}
catch (TaskCanceledException)
catch (OperationCanceledException)
{
// Do nothing, cancellation requested so just end.
}
finally
{
_storage.ReleaseConnection(clonedConnection);
currentConnection?.Dispose();
}

}, cancellationToken);
Expand Down
49 changes: 49 additions & 0 deletions tests/Hangfire.PostgreSql.Tests/PostgreSqlJobQueueFacts.cs
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@
using Hangfire.PostgreSql.Tests.Utils;
using Hangfire.PostgreSql.Utils;
using Hangfire.Storage;
using Npgsql;
using Xunit;

namespace Hangfire.PostgreSql.Tests
Expand Down Expand Up @@ -57,9 +58,9 @@
queue.Enqueue(connection, "2", "2");
queue.Enqueue(connection, "3", "3");

Assert.Equal("1", queue.Dequeue(new[] { "1", "2", "3" }, token).JobId);

Check warning on line 61 in tests/Hangfire.PostgreSql.Tests/PostgreSqlJobQueueFacts.cs

View workflow job for this annotation

GitHub Actions / build-and-test

Prefer 'static readonly' fields over constant array arguments if the called method is called repeatedly and is not mutating the passed array (https://learn.microsoft.com/dotnet/fundamentals/code-analysis/quality-rules/ca1861)
Assert.Equal("2", queue.Dequeue(new[] { "2", "3", "1" }, token).JobId);

Check warning on line 62 in tests/Hangfire.PostgreSql.Tests/PostgreSqlJobQueueFacts.cs

View workflow job for this annotation

GitHub Actions / build-and-test

Prefer 'static readonly' fields over constant array arguments if the called method is called repeatedly and is not mutating the passed array (https://learn.microsoft.com/dotnet/fundamentals/code-analysis/quality-rules/ca1861)
Assert.Equal("3", queue.Dequeue(new[] { "3", "1", "2" }, token).JobId);

Check warning on line 63 in tests/Hangfire.PostgreSql.Tests/PostgreSqlJobQueueFacts.cs

View workflow job for this annotation

GitHub Actions / build-and-test

Prefer 'static readonly' fields over constant array arguments if the called method is called repeatedly and is not mutating the passed array (https://learn.microsoft.com/dotnet/fundamentals/code-analysis/quality-rules/ca1861)
});
}

Expand Down Expand Up @@ -538,6 +539,54 @@
});
}

[Fact]
[CleanDatabase]
public void Dequeue_ShouldSelfHeal_WhenListenConnectionFails()
{
UseConnection((_, storage) => {
storage.Options.QueuePollInterval = TimeSpan.FromMilliseconds(500);
PostgreSqlJobQueue queue = CreateJobQueue(storage, false, true);
Exception thrownException = null;
IFetchedJob job = null;

CancellationTokenSource cts = new(TimeSpan.FromSeconds(10));

Task dequeueTask = Task.Run(() => {
try
{
job = queue.Dequeue(new[] { "default" }, cts.Token);
}
catch (Exception ex) when (ex is not OperationCanceledException)
{
thrownException = ex;
}
});

Thread.Sleep(1000);

using (NpgsqlConnection adminConnection = ConnectionUtils.CreateMasterConnection())
{
adminConnection.Execute(@"
SELECT pg_terminate_backend(pid)
FROM pg_stat_activity
WHERE query LIKE '%LISTEN%'
AND pid <> pg_backend_pid()");
}

Thread.Sleep(500);

using (NpgsqlConnection enqueueConnection = ConnectionUtils.CreateConnection())
{
queue.Enqueue(enqueueConnection, "default", "1");
}

dequeueTask.Wait(TimeSpan.FromSeconds(5));

Assert.Null(thrownException);
Assert.NotNull(job);
});
}

private static CancellationToken CreateTimingOutCancellationToken()
{
CancellationTokenSource source = new CancellationTokenSource(TimeSpan.FromSeconds(10));
Expand Down
Loading