Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 3 additions & 3 deletions src/Hangfire.PostgreSql/PostgreSqlJobQueueMonitoringApi.cs
Original file line number Diff line number Diff line change
Expand Up @@ -80,13 +80,13 @@ SELECT COUNT(*)
private IEnumerable<long> GetQueuedOrFetchedJobIds(string queue, bool fetched, int from, int perPage)
{
string sqlQuery = $@"
SELECT j.""id""
SELECT DISTINCT j.""id""
FROM ""{_storage.Options.SchemaName}"".""jobqueue"" jq
LEFT JOIN ""{_storage.Options.SchemaName}"".""job"" j ON jq.""jobid"" = j.""id""
WHERE jq.""queue"" = @Queue
WHERE jq.""queue"" = @Queue
AND jq.""fetchedat"" {(fetched ? "IS NOT NULL" : "IS NULL")}
AND j.""id"" IS NOT NULL
ORDER BY jq.""fetchedat"", jq.""jobid""
ORDER BY j.""id""
LIMIT @Limit OFFSET @Offset;
";

Expand Down
12 changes: 7 additions & 5 deletions src/Hangfire.PostgreSql/PostgreSqlMonitoringApi.cs
Original file line number Diff line number Diff line change
Expand Up @@ -433,13 +433,14 @@ private IPersistentJobQueueMonitoringApi GetQueueApi(string queueName)
private JobList<EnqueuedJobDto> EnqueuedJobs(IEnumerable<long> jobIds)
{
string enqueuedJobsSql = $@"
SELECT ""j"".""id"" ""Id"", ""j"".""invocationdata"" ""InvocationData"", ""j"".""arguments"" ""Arguments"", ""j"".""createdat"" ""CreatedAt"",
SELECT DISTINCT ON (""j"".""id"") ""j"".""id"" ""Id"", ""j"".""invocationdata"" ""InvocationData"", ""j"".""arguments"" ""Arguments"", ""j"".""createdat"" ""CreatedAt"",
""j"".""expireat"" ""ExpireAt"", ""s"".""name"" ""StateName"", ""s"".""reason"" ""StateReason"", ""s"".""data"" ""StateData""
FROM ""{_storage.Options.SchemaName}"".""job"" ""j""
LEFT JOIN ""{_storage.Options.SchemaName}"".""state"" ""s"" ON ""s"".""id"" = ""j"".""stateid""
LEFT JOIN ""{_storage.Options.SchemaName}"".""jobqueue"" ""jq"" ON ""jq"".""jobid"" = ""j"".""id""
WHERE ""j"".""id"" = ANY (@JobIds)
AND ""jq"".""fetchedat"" IS NULL;
AND ""jq"".""fetchedat"" IS NULL
ORDER BY ""j"".""id"";
";

List<SqlJob> jobs = UseConnection(connection => connection.Query<SqlJob>(enqueuedJobsSql,
Expand Down Expand Up @@ -528,14 +529,15 @@ private JobList<FetchedJobDto> FetchedJobs(
IEnumerable<long> jobIds)
{
string fetchedJobsSql = $@"
SELECT ""j"".""id"" ""Id"", ""j"".""invocationdata"" ""InvocationData"", ""j"".""arguments"" ""Arguments"",
""j"".""createdat"" ""CreatedAt"", ""j"".""expireat"" ""ExpireAt"", ""jq"".""fetchedat"" ""FetchedAt"",
SELECT DISTINCT ON (""j"".""id"") ""j"".""id"" ""Id"", ""j"".""invocationdata"" ""InvocationData"", ""j"".""arguments"" ""Arguments"",
""j"".""createdat"" ""CreatedAt"", ""j"".""expireat"" ""ExpireAt"", ""jq"".""fetchedat"" ""FetchedAt"",
""j"".""statename"" ""StateName"", ""s"".""reason"" ""StateReason"", ""s"".""data"" ""StateData""
FROM ""{_storage.Options.SchemaName}"".""job"" ""j""
LEFT JOIN ""{_storage.Options.SchemaName}"".""state"" ""s"" ON ""j"".""stateid"" = ""s"".""id""
LEFT JOIN ""{_storage.Options.SchemaName}"".""jobqueue"" ""jq"" ON ""jq"".""jobid"" = ""j"".""id""
WHERE ""j"".""id"" = ANY (@JobIds)
AND ""jq"".""fetchedat"" IS NOT NULL;
AND ""jq"".""fetchedat"" IS NOT NULL
ORDER BY ""j"".""id"", ""jq"".""fetchedat"" DESC;
";

List<SqlJob> jobs = UseConnection(connection => connection.Query<SqlJob>(fetchedJobsSql,
Expand Down
43 changes: 43 additions & 0 deletions tests/Hangfire.PostgreSql.Tests/PostgreSqlMonitoringApiFacts.cs
Original file line number Diff line number Diff line change
Expand Up @@ -103,6 +103,49 @@ private void Commit(
transaction.Commit();
}

[Fact]
[CleanDatabase]
public void FetchedJobs_WithDuplicateJobQueueEntries_DoesNotThrow()
{
string schemaName = ConnectionUtils.GetSchemaName();

string createJobSql = $@"
INSERT INTO ""{schemaName}"".""job"" (""invocationdata"", ""arguments"", ""createdat"")
VALUES (@InvocationData, @Arguments, NOW()) RETURNING ""id""";

string createJobQueueSql = $@"
INSERT INTO ""{schemaName}"".""jobqueue"" (""jobid"", ""queue"", ""fetchedat"")
VALUES (@JobId, @Queue, @FetchedAt)";

UseConnection(connection => {
Job job = Job.FromExpression(() => SampleMethod("test"));
InvocationData invocationData = InvocationData.SerializeJob(job);

long jobId = connection.QuerySingle<long>(createJobSql,
new {
InvocationData = new JsonParameter(SerializationHelper.Serialize(invocationData)),
Arguments = new JsonParameter(invocationData.Arguments, JsonParameter.ValueType.Array),
});

DateTime fetchedAt = DateTime.UtcNow;
connection.Execute(createJobQueueSql, new { JobId = jobId, Queue = "default", FetchedAt = fetchedAt });
connection.Execute(createJobQueueSql, new { JobId = jobId, Queue = "default", FetchedAt = fetchedAt.AddSeconds(1) });

PostgreSqlStorage storage = _fixture.SafeInit();
PostgreSqlStorageOptions options = new() { SchemaName = schemaName };

PostgreSqlJobQueueProvider provider = new(storage, options);
PersistentJobQueueProviderCollection providers = new(provider);
storage.QueueProviders = providers;

IMonitoringApi monitoringApi = storage.GetMonitoringApi();
JobList<FetchedJobDto> fetchedJobs = monitoringApi.FetchedJobs("default", 0, 10);

Assert.NotNull(fetchedJobs);
Assert.Single(fetchedJobs);
});
}

#pragma warning disable xUnit1013 // Public method should be marked as test
public static void SampleMethod(string arg)
#pragma warning restore xUnit1013 // Public method should be marked as test
Expand Down
Loading