Skip to content

Commit 58dd8da

Browse files
committed
Different more sane approach
1 parent 1c02184 commit 58dd8da

File tree

4 files changed

+136
-104
lines changed

4 files changed

+136
-104
lines changed

LiveControlGateway/Controllers/HubControllerBase.cs

Lines changed: 0 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -146,7 +146,6 @@ protected override async Task<bool> TryRegisterConnection()
146146
/// </summary>
147147
protected override async Task UnregisterConnection()
148148
{
149-
if (_newConnection) return; // We dont want to call this here, as it would lead to a deadlock, this is already taken care of in the manager
150149
await _hubLifetimeManager.RemoveDeviceConnection(this);
151150
}
152151

@@ -159,21 +158,6 @@ protected override async Task UnregisterConnection()
159158
/// <inheritdoc />
160159
public abstract ValueTask OtaInstall(SemVersion version);
161160

162-
/// <summary>
163-
/// When we are disposing the controller because there is a new connection already
164-
/// </summary>
165-
private bool _newConnection = false;
166-
167-
/// <summary>
168-
/// Called by the hub lifetime manager to dispose the connection because there is a new connection
169-
/// This is a direct replacement for DisposeAsync
170-
/// </summary>
171-
/// <returns></returns>
172-
public ValueTask DisposeForNewConnection()
173-
{
174-
_newConnection = true;
175-
return DisposeAsync();
176-
}
177161

178162
/// <summary>
179163
/// Keep the hub online

LiveControlGateway/Controllers/IHubController.cs

Lines changed: 0 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -34,10 +34,4 @@ public interface IHubController : IAsyncDisposable
3434
/// <param name="version"></param>
3535
/// <returns></returns>
3636
public ValueTask OtaInstall(SemVersion version);
37-
38-
/// <summary>
39-
/// Dispose the current hub controller because a new connection has been established
40-
/// </summary>
41-
/// <returns></returns>
42-
public ValueTask DisposeForNewConnection();
4337
}

LiveControlGateway/LifetimeManager/HubLifetime.cs

Lines changed: 75 additions & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -26,13 +26,24 @@ namespace OpenShock.LiveControlGateway.LifetimeManager;
2626
/// </summary>
2727
public sealed class HubLifetime : IAsyncDisposable
2828
{
29+
30+
public enum HubLifetimeState
31+
{
32+
Idle,
33+
SettingUp,
34+
Swapping,
35+
Removing
36+
}
37+
38+
private volatile HubLifetimeState _state = HubLifetimeState.SettingUp;
39+
public IHubController HubController { get; private set; }
40+
2941
private readonly TimeSpan _waitBetweenTicks;
3042
private readonly ushort _commandDuration;
3143

3244
private Dictionary<Guid, ShockerState> _shockerStates = new();
3345
private readonly byte _tps;
34-
public IHubController HubController { get; }
35-
private readonly CancellationToken _cancellationToken;
46+
private readonly CancellationTokenSource _cancellationToken;
3647

3748
private readonly IDbContextFactory<OpenShockContext> _dbContextFactory;
3849
private readonly IRedisConnectionProvider _redisConnectionProvider;
@@ -49,17 +60,15 @@ public sealed class HubLifetime : IAsyncDisposable
4960
/// <param name="redisConnectionProvider"></param>
5061
/// <param name="redisPubService"></param>
5162
/// <param name="logger"></param>
52-
/// <param name="cancellationToken"></param>
5363
public HubLifetime([Range(1, 10)] byte tps, IHubController hubController,
5464
IDbContextFactory<OpenShockContext> dbContextFactory,
5565
IRedisConnectionProvider redisConnectionProvider,
5666
IRedisPubService redisPubService,
57-
ILogger<HubLifetime> logger,
58-
CancellationToken cancellationToken = default)
67+
ILogger<HubLifetime> logger)
5968
{
6069
_tps = tps;
6170
HubController = hubController;
62-
_cancellationToken = cancellationToken;
71+
_cancellationToken = new CancellationTokenSource();
6372
_dbContextFactory = dbContextFactory;
6473
_redisConnectionProvider = redisConnectionProvider;
6574
_redisPubService = redisPubService;
@@ -72,21 +81,62 @@ public HubLifetime([Range(1, 10)] byte tps, IHubController hubController,
7281
/// <summary>
7382
/// Call on creation to setup shockers for the first time
7483
/// </summary>
75-
/// <param name="db"></param>
76-
public async Task InitAsync(OpenShockContext db)
84+
public async Task InitAsync()
7785
{
86+
await using var db = await _dbContextFactory.CreateDbContextAsync(_cancellationToken.Token);
7887
await UpdateShockers(db);
7988
#pragma warning disable CS4014
8089
LucTask.Run(UpdateLoop);
8190
#pragma warning restore CS4014
91+
92+
_state = HubLifetimeState.Idle; // We are fully setup, we can go back to idle state
93+
}
94+
95+
/// <summary>
96+
/// Swap to a new underlying controller
97+
/// </summary>
98+
/// <param name="newController"></param>
99+
public async Task Swap(IHubController newController)
100+
{
101+
var oldController = HubController;
102+
103+
await oldController.DisposeAsync();
104+
105+
HubController = newController;
106+
await UpdateDevice();
107+
108+
_state = HubLifetimeState.Idle; // Swap is done, return to (~~monke~~) idle
109+
}
110+
111+
/// <summary>
112+
/// Try to mark the lifetime as swapping
113+
/// This needs external synchronization
114+
/// </summary>
115+
/// <returns>true if the lifetime is not busy, false if it is. Consider rejecting the connection</returns>
116+
public bool TryMarkSwapping()
117+
{
118+
if (_state != HubLifetimeState.Idle) return false;
119+
_state = HubLifetimeState.Swapping;
120+
return true;
121+
}
122+
123+
/// <summary>
124+
/// Mark the lifetime as removing
125+
/// This needs external synchronization
126+
/// </summary>
127+
/// <returns>true if the lifetime is not swapping or already removing</returns>
128+
public bool TryMarkRemoving()
129+
{
130+
if (_state is HubLifetimeState.Swapping or HubLifetimeState.Removing) return false;
131+
_state = HubLifetimeState.Removing;
132+
return true;
82133
}
83134

84135
private async Task UpdateLoop()
85136
{
86-
var stopwatch = Stopwatch.StartNew();
87137
while (!_cancellationToken.IsCancellationRequested)
88138
{
89-
stopwatch.Restart();
139+
var startingTime = Stopwatch.GetTimestamp();
90140

91141
try
92142
{
@@ -97,15 +147,16 @@ private async Task UpdateLoop()
97147
_logger.LogError(e, "Error in Update()");
98148
}
99149

100-
var elapsed = stopwatch.Elapsed;
150+
151+
var elapsed = Stopwatch.GetElapsedTime(startingTime);
101152
var waitTime = _waitBetweenTicks - elapsed;
102153
if (waitTime.TotalMilliseconds < 1)
103154
{
104155
_logger.LogWarning("Update loop running behind for device [{DeviceId}]", HubController.Id);
105156
continue;
106157
}
107158

108-
await Task.Delay(waitTime, _cancellationToken);
159+
await Task.Delay(waitTime, _cancellationToken.Token);
109160
}
110161
}
111162

@@ -138,7 +189,7 @@ private async Task Update()
138189
/// </summary>
139190
public async Task UpdateDevice()
140191
{
141-
await using var db = await _dbContextFactory.CreateDbContextAsync(_cancellationToken);
192+
await using var db = await _dbContextFactory.CreateDbContextAsync(_cancellationToken.Token);
142193
await UpdateShockers(db);
143194

144195
foreach (var websocketController in
@@ -158,7 +209,7 @@ private async Task UpdateShockers(OpenShockContext db)
158209
Id = x.Id,
159210
Model = x.Model,
160211
RfId = x.RfId
161-
}).ToDictionaryAsync(x => x.Id, x => x, cancellationToken: _cancellationToken);
212+
}).ToDictionaryAsync(x => x.Id, x => x, cancellationToken: _cancellationToken.Token);
162213
}
163214

164215
/// <summary>
@@ -294,11 +345,17 @@ await deviceOnline.InsertAsync(new DeviceOnline
294345
return new Success();
295346
}
296347

348+
private bool _disposed = false;
349+
297350
/// <inheritdoc />
298-
public ValueTask DisposeAsync() => HubController.DisposeAsync();
299-
300-
/// <inheritdoc />
301-
public ValueTask DisposeForNewConnection() => HubController.DisposeForNewConnection();
351+
public async ValueTask DisposeAsync()
352+
{
353+
if(_disposed) return;
354+
_disposed = true;
355+
356+
await _cancellationToken.CancelAsync();
357+
}
358+
302359
}
303360

304361
/// <summary>

LiveControlGateway/LifetimeManager/HubLifetimeManager.cs

Lines changed: 61 additions & 64 deletions
Original file line numberDiff line numberDiff line change
@@ -24,11 +24,9 @@ public sealed class HubLifetimeManager
2424
private readonly IRedisPubService _redisPubService;
2525
private readonly ILoggerFactory _loggerFactory;
2626
private readonly ILogger<HubLifetimeManager> _logger;
27-
private readonly ConcurrentDictionary<Guid, HubLifetime> _lifetimes = new();
28-
private readonly ConcurrentDictionary<Guid, SemaphoreSlim> _hubLocks = new();
29-
30-
private readonly HashSet<Guid> _pendingHubs = [];
31-
private readonly SemaphoreSlim _pendingHubsLock = new(1);
27+
private readonly Dictionary<Guid, HubLifetime> _lifetimes = new();
28+
29+
private readonly SemaphoreSlim _lifetimesLock = new(1);
3230

3331
/// <summary>
3432
/// DI constructor
@@ -62,100 +60,99 @@ ILoggerFactory loggerFactory
6260
public async Task<bool> TryAddDeviceConnection(byte tps, IHubController hubController,
6361
CancellationToken cancellationToken)
6462
{
65-
// Try finally for _pendingHubs Add <-> Remove scope
66-
try
63+
var isSwapping = false;
64+
HubLifetime? hubLifetime;
65+
66+
using (await _lifetimesLock.LockAsyncScoped(cancellationToken))
6767
{
68-
using (await _pendingHubsLock.LockAsyncScoped(cancellationToken))
69-
{
70-
if (!_pendingHubs.Add(hubController.Id)) return false; // Another hub is already queued here
71-
}
72-
73-
using (await _hubLocks.GetOrAdd(hubController.Id, new SemaphoreSlim(1)).LockAsyncScoped(cancellationToken)) // Any only one thread is allowed here, per hub, and that is what we want.
68+
if (_lifetimes.TryGetValue(hubController.Id, out hubLifetime))
7469
{
75-
if (_lifetimes.TryGetValue(hubController.Id, out var oldControllerLifetime))
70+
// There already is a hub lifetime, lets swap!
71+
if (!hubLifetime.TryMarkSwapping())
7672
{
77-
_logger.LogDebug("Disposing old hub controller");
78-
await oldControllerLifetime
79-
.DisposeForNewConnection(); // Use this to not call the remove connection method from the controller
80-
81-
foreach (var websocketController in WebsocketManager.LiveControlUsers.GetConnections(hubController
82-
.Id))
83-
await websocketController.UpdateConnectedState(false);
73+
return
74+
false; // Tell the controller that we are busy right now TODO: Tell the connecting client why it failed
8475
}
8576

86-
var hubLifetime = GetLifetime(tps, hubController, cancellationToken);
77+
isSwapping = true;
78+
}
79+
else
80+
{
81+
// This is a fresh connection with no existing lifetime, create one!
82+
hubLifetime = CreateNewLifetime(tps, hubController);
8783
_lifetimes[hubController.Id] = hubLifetime;
84+
}
85+
}
8886

89-
await using var db = await _dbContextFactory.CreateDbContextAsync(cancellationToken);
90-
91-
await hubLifetime.InitAsync(db);
92-
93-
foreach (var websocketController in WebsocketManager.LiveControlUsers.GetConnections(hubController.Id))
94-
await websocketController.UpdateConnectedState(true);
9587

96-
return true;
97-
}
88+
if (isSwapping)
89+
{
90+
await hubLifetime.Swap(hubController);
9891
}
99-
finally
92+
else
10093
{
101-
using (await _pendingHubsLock.LockAsyncScoped(cancellationToken))
102-
{
103-
_pendingHubs.Remove(hubController.Id);
104-
}
94+
await hubLifetime.InitAsync();
95+
96+
foreach (var websocketController in WebsocketManager.LiveControlUsers.GetConnections(hubLifetime
97+
.HubController.Id))
98+
await websocketController.UpdateConnectedState(true);
10599
}
100+
101+
return true;
106102
}
107103

108-
private HubLifetime GetLifetime(byte tps, IHubController hubController, CancellationToken cancellationToken)
104+
private HubLifetime CreateNewLifetime(byte tps, IHubController hubController)
109105
{
110-
_logger.LogInformation("New device connected, creating lifetime [{DeviceId}]", hubController.Id);
106+
_logger.LogInformation("New hub connected, creating lifetime [{DeviceId}]", hubController.Id);
111107

112108
var deviceLifetime = new HubLifetime(
113109
tps,
114110
hubController,
115111
_dbContextFactory,
116112
_redisConnectionProvider,
117113
_redisPubService,
118-
_loggerFactory.CreateLogger<HubLifetime>(),
119-
cancellationToken);
114+
_loggerFactory.CreateLogger<HubLifetime>());
120115

121116
return deviceLifetime;
122117
}
123118

124119
/// <summary>
125-
/// Remove device from Lifetime Manager, called on dispose of device controller, this is the actual end of life of the hub
120+
/// Remove device from Lifetime Manager, called on dispose of device controller,
121+
/// this is the actual end of life of the hub
126122
/// </summary>
127123
/// <param name="hubController"></param>
128124
public async Task RemoveDeviceConnection(IHubController hubController)
129125
{
130-
if (!_hubLocks.TryGetValue(hubController.Id, out var hubLock))
131-
{
132-
// Our lock doesnt exist, this shouldn't happen
133-
_logger.LogWarning("Hub lock not found for hub [{HubId}]", hubController.Id);
134-
return;
135-
}
136-
137-
using (await hubLock.LockAsyncScoped())
126+
HubLifetime? hubLifetime;
127+
128+
using (await _lifetimesLock.LockAsyncScoped())
138129
{
139-
try
130+
if (!_lifetimes.TryGetValue(hubController.Id, out hubLifetime))
140131
{
141-
if(!_lifetimes.TryGetValue(hubController.Id, out var oldControllerLifetime)) return;
142-
143-
if(oldControllerLifetime.HubController != hubController) return;
144-
145-
_lifetimes.TryRemove(hubController.Id, out _);
132+
_logger.LogError("Hub lifetime not found for hub [{HubId}]", hubController.Id);
133+
return;
134+
}
146135

147-
foreach (var websocketController in WebsocketManager.LiveControlUsers.GetConnections(hubController.Id))
148-
await websocketController.UpdateConnectedState(false);
136+
// Dont remove a hub lifetime that has a different hub controller,
137+
// this might happen when remove is called after a swap has been fully done
138+
if(hubLifetime.HubController != hubController) return;
139+
140+
if (hubLifetime.TryMarkRemoving())
141+
{
142+
return;
149143
}
150-
finally
144+
}
145+
146+
foreach (var websocketController in WebsocketManager.LiveControlUsers.GetConnections(hubController.Id))
147+
await websocketController.UpdateConnectedState(false);
148+
149+
await hubLifetime.DisposeAsync();
150+
151+
using (await _lifetimesLock.LockAsyncScoped())
152+
{
153+
if (!_lifetimes.Remove(hubController.Id))
151154
{
152-
using (await _pendingHubsLock.LockAsyncScoped())
153-
{
154-
if (!_pendingHubs.Contains(hubController.Id))
155-
{
156-
_hubLocks.TryRemove(hubController.Id, out _);
157-
}
158-
}
155+
_logger.LogError("Failed to remove hub lifetime [{HubId}], this shouldnt happen WTF?!", hubController.Id);
159156
}
160157
}
161158
}

0 commit comments

Comments
 (0)