Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 8 additions & 0 deletions docs/guide/durability/mysql.md
Original file line number Diff line number Diff line change
Expand Up @@ -131,6 +131,14 @@ builder.UseWolverine(opts =>
}
```

You can also override the polling interval for a specific queue:

```cs
opts.ListenToMySqlQueue("inbound").PollingInterval(2.Seconds());
```

When not set, the queue falls back to the global `DurabilitySettings.ScheduledJobPollingTime`.

::: info Control queue
Wolverine has an internal control queue (`dbcontrol`) used for internal operations.
This queue is hardcoded to poll every second and should not be changed to ensure the stability of the application.
Expand Down
8 changes: 8 additions & 0 deletions docs/guide/durability/oracle.md
Original file line number Diff line number Diff line change
Expand Up @@ -133,6 +133,14 @@ builder.UseWolverine(opts =>
}
```

You can also override the polling interval for a specific queue:

```cs
opts.ListenToOracleQueue("inbound").PollingInterval(2.Seconds());
```

When not set, the queue falls back to the global `DurabilitySettings.ScheduledJobPollingTime`.

::: info Control queue
Wolverine has an internal control queue (`dbcontrol`) used for internal operations.
This queue is hardcoded to poll every second and should not be changed to ensure the stability of the application.
Expand Down
12 changes: 10 additions & 2 deletions docs/guide/durability/postgresql.md
Original file line number Diff line number Diff line change
Expand Up @@ -161,8 +161,16 @@ builder.UseWolverine(opts =>
}
```

::: info Control queue
Wolverine has an internal control queue (`dbcontrol`) used for internal operations.
You can also override the polling interval for a specific queue:

```cs
opts.ListenToPostgresqlQueue("inbound").PollingInterval(2.Seconds());
```

When not set, the queue falls back to the global `DurabilitySettings.ScheduledJobPollingTime`.

::: info Control queue
Wolverine has an internal control queue (`dbcontrol`) used for internal operations.
This queue is hardcoded to poll every second and should not be changed to ensure the stability of the application.
:::

Expand Down
8 changes: 8 additions & 0 deletions docs/guide/durability/sqlite.md
Original file line number Diff line number Diff line change
Expand Up @@ -148,6 +148,14 @@ builder.UseWolverine(opts =>
<sup><a href='https://github.com/JasperFx/wolverine/blob/main/src/Persistence/SqliteTests/DocumentationSamples.cs#L160-L175' title='Snippet source file'>snippet source</a> | <a href='#snippet-sample_sqlite_polling_configuration' title='Start of snippet'>anchor</a></sup>
<!-- endSnippet -->

You can also override the polling interval for a specific queue:

```cs
opts.ListenToSqliteQueue("inbound").PollingInterval(2.Seconds());
```

When not set, the queue falls back to the global `DurabilitySettings.ScheduledJobPollingTime`.

::: info Control queue
Wolverine has an internal control queue (`dbcontrol`) used for internal operations.
This queue is hardcoded to poll every second and should not be changed to ensure the stability of the application.
Expand Down
16 changes: 14 additions & 2 deletions docs/guide/durability/sqlserver.md
Original file line number Diff line number Diff line change
Expand Up @@ -94,9 +94,21 @@ opts.ListenToSqlServerQueue("sender").BufferedInMemory();
<sup><a href='https://github.com/JasperFx/wolverine/blob/main/src/Persistence/SqlServerTests/Transport/compliance_tests.cs#L67-L71' title='Snippet source file'>snippet source</a> | <a href='#snippet-sample_setting_sql_server_queue_to_buffered' title='Start of snippet'>anchor</a></sup>
<!-- endSnippet -->

Using this option just means that the Sql Server queues can be used for both sending or receiving with no integration
Using this option just means that the Sql Server queues can be used for both sending or receiving with no integration
with the transactional inbox or outbox. This is a little more performant, but less safe as messages could be
lost if held in memory when the application shuts down unexpectedly.
lost if held in memory when the application shuts down unexpectedly.

### Polling
The Sql Server transport polls queues on a configured interval. The default interval is controlled globally by
`DurabilitySettings.ScheduledJobPollingTime` (default: 5 seconds).

You can override the polling interval for a specific queue:

```cs
opts.ListenToSqlServerQueue("inbound").PollingInterval(2.Seconds());
```

When not set, the queue falls back to the global `DurabilitySettings.ScheduledJobPollingTime`.

If you want to use Sql Server as a queueing mechanism between multiple applications, you'll need:

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,16 @@ public MySqlListenerConfiguration MaximumMessagesToReceive(int maximum)
return this;
}

/// <summary>
/// Configure how often to poll for new messages when the queue is idle.
/// If not set, falls back to DurabilitySettings.ScheduledJobPollingTime (default 5s).
/// </summary>
public MySqlListenerConfiguration PollingInterval(TimeSpan interval)
{
add(e => e.PollingInterval = interval);
return this;
}

/// <summary>
/// Add circuit breaker exception handling to this listener
/// </summary>
Expand Down
6 changes: 6 additions & 0 deletions src/Persistence/MySql/Wolverine.MySql/Transport/MySqlQueue.cs
Original file line number Diff line number Diff line change
Expand Up @@ -62,6 +62,12 @@ protected override bool supportsMode(EndpointMode mode)
/// </summary>
public int MaximumMessagesToReceive { get; set; } = 20;

/// <summary>
/// How often to poll for new messages when the queue is idle.
/// If null, falls back to DurabilitySettings.ScheduledJobPollingTime (default 5s).
/// </summary>
public TimeSpan? PollingInterval { get; set; }

public override async ValueTask<IListener> BuildListenerAsync(IWolverineRuntime runtime, IReceiver receiver)
{
if (Parent.AutoProvision)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@ internal class MySqlQueueListener : IListener
private readonly string _queueName;
private readonly string _schemaName;
private readonly string _scheduledTableName;
private readonly TimeSpan _pollingInterval;

public MySqlQueueListener(MySqlQueue queue, IWolverineRuntime runtime, IReceiver receiver,
MySqlDataSource dataSource, string? databaseName)
Expand All @@ -38,6 +39,7 @@ public MySqlQueueListener(MySqlQueue queue, IWolverineRuntime runtime, IReceiver
_databaseName = databaseName;
_logger = runtime.LoggerFactory.CreateLogger<MySqlQueueListener>();
_settings = runtime.DurabilitySettings;
_pollingInterval = queue.PollingInterval ?? _settings.ScheduledJobPollingTime;

_sender = new MySqlQueueSender(queue, _dataSource, databaseName);

Expand Down Expand Up @@ -108,7 +110,7 @@ private async Task lookForScheduledMessagesAsync()

failedCount = 0;

await Task.Delay(_settings.ScheduledJobPollingTime);
await Task.Delay(_pollingInterval);
}
catch (Exception e)
{
Expand Down Expand Up @@ -223,13 +225,13 @@ private async Task listenForMessagesAsync()
}
else
{
await Task.Delay(_settings.ScheduledJobPollingTime);
await Task.Delay(_pollingInterval);
}
}
else
{
// Slow down if this is a periodically used queue
await Task.Delay(_settings.ScheduledJobPollingTime);
await Task.Delay(_pollingInterval);
}
}
catch (Exception e)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,16 @@ public OracleListenerConfiguration MaximumMessagesToReceive(int maximum)
return this;
}

/// <summary>
/// Configure how often to poll for new messages when the queue is idle.
/// If not set, falls back to DurabilitySettings.ScheduledJobPollingTime (default 5s).
/// </summary>
public OracleListenerConfiguration PollingInterval(TimeSpan interval)
{
add(e => e.PollingInterval = interval);
return this;
}

/// <summary>
/// Add circuit breaker exception handling to this listener
/// </summary>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -62,6 +62,12 @@ protected override bool supportsMode(EndpointMode mode)
/// </summary>
public int MaximumMessagesToReceive { get; set; } = 20;

/// <summary>
/// How often to poll for new messages when the queue is idle.
/// If null, falls back to DurabilitySettings.ScheduledJobPollingTime (default 5s).
/// </summary>
public TimeSpan? PollingInterval { get; set; }

public override async ValueTask<IListener> BuildListenerAsync(IWolverineRuntime runtime, IReceiver receiver)
{
if (Parent.AutoProvision)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@ internal class OracleQueueListener : IListener
private readonly string _queueName;
private readonly string _schemaName;
private readonly string _scheduledTableName;
private readonly TimeSpan _pollingInterval;

public OracleQueueListener(OracleQueue queue, IWolverineRuntime runtime, IReceiver receiver,
OracleDataSource dataSource, string? databaseName)
Expand All @@ -39,6 +40,7 @@ public OracleQueueListener(OracleQueue queue, IWolverineRuntime runtime, IReceiv
_databaseName = databaseName;
_logger = runtime.LoggerFactory.CreateLogger<OracleQueueListener>();
_settings = runtime.DurabilitySettings;
_pollingInterval = queue.PollingInterval ?? _settings.ScheduledJobPollingTime;

_sender = new OracleQueueSender(queue, _dataSource, databaseName);

Expand Down Expand Up @@ -105,7 +107,7 @@ private async Task lookForScheduledMessagesAsync()

failedCount = 0;

await Task.Delay(_settings.ScheduledJobPollingTime);
await Task.Delay(_pollingInterval);
}
catch (Exception e)
{
Expand Down Expand Up @@ -226,12 +228,12 @@ private async Task listenForMessagesAsync()
}
else
{
await Task.Delay(_settings.ScheduledJobPollingTime);
await Task.Delay(_pollingInterval);
}
}
else
{
await Task.Delay(_settings.ScheduledJobPollingTime);
await Task.Delay(_pollingInterval);
}
}
catch (Exception e)
Expand Down
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
using JasperFx.Core;
using Microsoft.Extensions.Configuration;
using Microsoft.Extensions.Hosting;
using Wolverine;
Expand Down Expand Up @@ -40,7 +41,10 @@ public static async Task Bootstrapping()

// Optionally specify how many messages to
// fetch into the listener at any one time
.MaximumMessagesToReceive(50);
.MaximumMessagesToReceive(50)

// Override how often to poll for new messages when the queue is idle.
.PollingInterval(1.Seconds());
});

using var host = builder.Build();
Expand Down
6 changes: 5 additions & 1 deletion src/Persistence/PostgresqlTests/DocumentationSamples.cs
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
using JasperFx.Core;
using Microsoft.Extensions.Configuration;
using Microsoft.Extensions.Hosting;
using Wolverine;
Expand Down Expand Up @@ -52,7 +53,10 @@ public static async Task Bootstrapping()

// Optionally specify how many messages to
// fetch into the listener at any one time
.MaximumMessagesToReceive(50);
.MaximumMessagesToReceive(50)

// Override how often to poll for new messages when the queue is idle.
.PollingInterval(1.Seconds());
});

using var host = builder.Build();
Expand Down
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
using JasperFx.Core;
using Shouldly;
using Wolverine.Configuration;
using Wolverine.Postgresql.Transport;
Expand Down Expand Up @@ -47,4 +48,12 @@ public void mode_cannot_be_inline()
queue.Mode = EndpointMode.Inline;
});
}

[Fact]
public void polling_interval_defaults_to_null()
{
var queue = new PostgresqlQueue("one", theTransport);
queue.PollingInterval.ShouldBeNull();
}

}
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
using JasperFx.Core;
using Microsoft.Extensions.Configuration;
using Microsoft.Extensions.Hosting;
using Wolverine;
Expand Down Expand Up @@ -39,7 +40,10 @@ public static async Task Bootstrapping()

// Optionally specify how many messages to
// fetch into the listener at any one time
.MaximumMessagesToReceive(50);
.MaximumMessagesToReceive(50)

// Override how often to poll for new messages when the queue is idle.
.PollingInterval(1.Seconds());
});

using var host = builder.Build();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -53,4 +53,11 @@ public void mode_cannot_be_inline()
queue.Mode = EndpointMode.Inline;
});
}

[Fact]
public void polling_interval_defaults_to_null()
{
var queue = new SqlServerQueue("one", theTransport);
queue.PollingInterval.ShouldBeNull();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,16 @@ public PostgresqlListenerConfiguration MaximumMessagesToReceive(int maximum)
return this;
}

/// <summary>
/// Configure how often to poll for new messages when the queue is idle.
/// If not set, falls back to DurabilitySettings.ScheduledJobPollingTime (default 5s).
/// </summary>
public PostgresqlListenerConfiguration PollingInterval(TimeSpan interval)
{
add(e => e.PollingInterval = interval);
return this;
}

/// <summary>
/// Add circuit breaker exception handling to this listener
/// </summary>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -63,6 +63,12 @@ protected override bool supportsMode(EndpointMode mode)
/// </summary>
public int MaximumMessagesToReceive { get; set; } = 20;

/// <summary>
/// How often to poll for new messages when the queue is idle.
/// If null, falls back to DurabilitySettings.ScheduledJobPollingTime (default 5s).
/// </summary>
public TimeSpan? PollingInterval { get; set; }

public override async ValueTask<IListener> BuildListenerAsync(IWolverineRuntime runtime, IReceiver receiver)
{
if (Parent.AutoProvision)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@ internal class PostgresqlQueueListener : IListener
private readonly string _queueName;
private readonly string _schemaName;
private readonly string _scheduledTableName;
private readonly TimeSpan _pollingInterval;

public PostgresqlQueueListener(PostgresqlQueue queue, IWolverineRuntime runtime, IReceiver receiver,
NpgsqlDataSource dataSource, string? databaseName)
Expand All @@ -40,6 +41,7 @@ public PostgresqlQueueListener(PostgresqlQueue queue, IWolverineRuntime runtime,
_databaseName = databaseName;
_logger = runtime.LoggerFactory.CreateLogger<PostgresqlQueueListener>();
_settings = runtime.DurabilitySettings;
_pollingInterval = queue.PollingInterval ?? _settings.ScheduledJobPollingTime;

_sender = new PostgresqlQueueSender(queue, _dataSource, databaseName);

Expand Down Expand Up @@ -111,7 +113,7 @@ private async Task lookForScheduledMessagesAsync()

failedCount = 0;

await Task.Delay(_settings.ScheduledJobPollingTime);
await Task.Delay(_pollingInterval);

}
catch (Exception e)
Expand Down Expand Up @@ -189,13 +191,13 @@ private async Task listenForMessagesAsync()
}
else
{
await Task.Delay(_settings.ScheduledJobPollingTime);
await Task.Delay(_pollingInterval);
}
}
else
{
// Slow down if this is a periodically used queue
await Task.Delay(_settings.ScheduledJobPollingTime);
await Task.Delay(_pollingInterval);
}
}
catch (Exception e)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,16 @@ public SqlServerListenerConfiguration MaximumMessagesToReceive(int maximum)
return this;
}

/// <summary>
/// Configure how often to poll for new messages when the queue is idle.
/// If not set, falls back to DurabilitySettings.ScheduledJobPollingTime (default 5s).
/// </summary>
public SqlServerListenerConfiguration PollingInterval(TimeSpan interval)
{
add(e => e.PollingInterval = interval);
return this;
}

/// <summary>
/// Add circuit breaker exception handling to this listener
/// </summary>
Expand Down
Loading
Loading