Skip to content

Commit 14b23a1

Browse files
eerhardtdavidfowl
andauthored
Dashboard should request log streams only when the user gestures to display them (#3235) (#3343)
Change the AppHost to only subscribe to DCP's logs when there is a subscriber for the logs. Fix #2789 * Show logs * Remove the "StreamingLogger" because that causes subsequent subscribers to not see what was already written. Fix duplicate logs issue by clearing out the backlog when the last subscriber leaves. * Fix a concurrency issue with the backlog. Ensure the backlog is only snap shotted after subscribing to the log. Fix existing tests for new functionality and add an additional test. * PR feedback Only clear the backlog on containers and executables. * PR feedback. Move lock out of async iterator. * Clean up code. - Remove count and instead check if the delegate is null to indicate whether there are subscribers or not. - Remove the separate IAsyncEnumerable classes and just use `async IAsyncEnumerable` methods. * Fix a race at startup when logs aren't available. Employ a 2nd loop that listens for both "has subscribers" and "logs available". * Simplify ResourceNotificationService.WatchAsync. * Fix test build * Address PR feedback - Set SingleReader=true on the logInformationChannel. - Add comment and assert that LogsAvailable can only turn true and can't go back to false. --------- Co-authored-by: David Fowler <davidfowl@gmail.com>
1 parent af97f4d commit 14b23a1

File tree

6 files changed

+379
-113
lines changed

6 files changed

+379
-113
lines changed

src/Aspire.Hosting/ApplicationModel/ResourceLoggerService.cs

Lines changed: 208 additions & 57 deletions
Original file line numberDiff line numberDiff line change
@@ -2,6 +2,7 @@
22
// The .NET Foundation licenses this file to you under the MIT license.
33

44
using System.Collections.Concurrent;
5+
using System.Runtime.CompilerServices;
56
using System.Threading.Channels;
67
using Aspire.Dashboard.Otlp.Storage;
78
using Microsoft.Extensions.Logging;
@@ -15,11 +16,29 @@ public class ResourceLoggerService
1516
{
1617
private readonly ConcurrentDictionary<string, ResourceLoggerState> _loggers = new();
1718

19+
private Action<(string, ResourceLoggerState)>? _loggerAdded;
20+
private event Action<(string, ResourceLoggerState)> LoggerAdded
21+
{
22+
add
23+
{
24+
_loggerAdded += value;
25+
26+
foreach (var logger in _loggers)
27+
{
28+
value((logger.Key, logger.Value));
29+
}
30+
}
31+
remove
32+
{
33+
_loggerAdded -= value;
34+
}
35+
}
36+
1837
/// <summary>
1938
/// Gets the logger for the resource to write to.
2039
/// </summary>
2140
/// <param name="resource">The resource name</param>
22-
/// <returns>An <see cref="ILogger"/>.</returns>
41+
/// <returns>An <see cref="ILogger"/> which represents the resource.</returns>
2342
public ILogger GetLogger(IResource resource)
2443
{
2544
ArgumentNullException.ThrowIfNull(resource);
@@ -31,19 +50,31 @@ public ILogger GetLogger(IResource resource)
3150
/// Gets the logger for the resource to write to.
3251
/// </summary>
3352
/// <param name="resourceName">The name of the resource from the Aspire application model.</param>
34-
/// <returns>An <see cref="ILogger"/> which repesents the named resource.</returns>
53+
/// <returns>An <see cref="ILogger"/> which represents the named resource.</returns>
3554
public ILogger GetLogger(string resourceName)
3655
{
3756
ArgumentNullException.ThrowIfNull(resourceName);
3857

3958
return GetResourceLoggerState(resourceName).Logger;
4059
}
4160

61+
/// <summary>
62+
/// Watch for changes to the log stream for a resource.
63+
/// </summary>
64+
/// <param name="resource">The resource to watch for logs.</param>
65+
/// <returns>An async enumerable that returns the logs as they are written.</returns>
66+
public IAsyncEnumerable<IReadOnlyList<LogLine>> WatchAsync(IResource resource)
67+
{
68+
ArgumentNullException.ThrowIfNull(resource);
69+
70+
return WatchAsync(resource.Name);
71+
}
72+
4273
/// <summary>
4374
/// Watch for changes to the log stream for a resource.
4475
/// </summary>
4576
/// <param name="resourceName">The resource name</param>
46-
/// <returns></returns>
77+
/// <returns>An async enumerable that returns the logs as they are written.</returns>
4778
public IAsyncEnumerable<IReadOnlyList<LogLine>> WatchAsync(string resourceName)
4879
{
4980
ArgumentNullException.ThrowIfNull(resourceName);
@@ -52,15 +83,39 @@ public IAsyncEnumerable<IReadOnlyList<LogLine>> WatchAsync(string resourceName)
5283
}
5384

5485
/// <summary>
55-
/// Watch for changes to the log stream for a resource.
86+
/// Watch for subscribers to the log stream for a resource.
5687
/// </summary>
57-
/// <param name="resource">The resource to watch for logs.</param>
58-
/// <returns></returns>
59-
public IAsyncEnumerable<IReadOnlyList<LogLine>> WatchAsync(IResource resource)
88+
/// <returns>
89+
/// An async enumerable that returns when the first subscriber is added to a log,
90+
/// or when the last subscriber is removed.
91+
/// </returns>
92+
public async IAsyncEnumerable<LogSubscriber> WatchAnySubscribersAsync([EnumeratorCancellation] CancellationToken cancellationToken = default)
6093
{
61-
ArgumentNullException.ThrowIfNull(resource);
94+
var channel = Channel.CreateUnbounded<LogSubscriber>();
6295

63-
return WatchAsync(resource.Name);
96+
void OnLoggerAdded((string Name, ResourceLoggerState State) loggerItem)
97+
{
98+
var (name, state) = loggerItem;
99+
100+
state.OnSubscribersChanged += (hasSubscribers) =>
101+
{
102+
channel.Writer.TryWrite(new(name, hasSubscribers));
103+
};
104+
}
105+
106+
LoggerAdded += OnLoggerAdded;
107+
108+
try
109+
{
110+
await foreach (var entry in channel.Reader.ReadAllAsync(cancellationToken))
111+
{
112+
yield return entry;
113+
}
114+
}
115+
finally
116+
{
117+
LoggerAdded -= OnLoggerAdded;
118+
}
64119
}
65120

66121
/// <summary>
@@ -91,8 +146,27 @@ public void Complete(string name)
91146
}
92147
}
93148

149+
/// <summary>
150+
/// Clears the log stream's backlog for the resource.
151+
/// </summary>
152+
public void ClearBacklog(string resourceName)
153+
{
154+
ArgumentNullException.ThrowIfNull(resourceName);
155+
156+
if (_loggers.TryGetValue(resourceName, out var logger))
157+
{
158+
logger.ClearBacklog();
159+
}
160+
}
161+
94162
private ResourceLoggerState GetResourceLoggerState(string resourceName) =>
95-
_loggers.GetOrAdd(resourceName, _ => new ResourceLoggerState());
163+
_loggers.GetOrAdd(resourceName, (name, context) =>
164+
{
165+
var state = new ResourceLoggerState();
166+
context._loggerAdded?.Invoke((name, state));
167+
return state;
168+
},
169+
this);
96170

97171
/// <summary>
98172
/// A logger for the resource to write to.
@@ -102,7 +176,6 @@ private sealed class ResourceLoggerState
102176
private readonly ResourceLogger _logger;
103177
private readonly CancellationTokenSource _logStreamCts = new();
104178

105-
// History of logs, capped at 10000 entries.
106179
private readonly CircularBuffer<LogLine> _backlog = new(10000);
107180

108181
/// <summary>
@@ -113,21 +186,112 @@ public ResourceLoggerState()
113186
_logger = new ResourceLogger(this);
114187
}
115188

189+
private Action<bool>? _onSubscribersChanged;
190+
public event Action<bool> OnSubscribersChanged
191+
{
192+
add
193+
{
194+
_onSubscribersChanged += value;
195+
196+
var hasSubscribers = false;
197+
198+
lock (this)
199+
{
200+
if (_onNewLog is not null) // we have subscribers
201+
{
202+
hasSubscribers = true;
203+
}
204+
}
205+
206+
if (hasSubscribers)
207+
{
208+
value(hasSubscribers);
209+
}
210+
}
211+
remove
212+
{
213+
_onSubscribersChanged -= value;
214+
}
215+
}
216+
116217
/// <summary>
117218
/// Watch for changes to the log stream for a resource.
118219
/// </summary>
119220
/// <returns>The log stream for the resource.</returns>
120-
public IAsyncEnumerable<IReadOnlyList<LogLine>> WatchAsync()
221+
public async IAsyncEnumerable<IReadOnlyList<LogLine>> WatchAsync([EnumeratorCancellation] CancellationToken cancellationToken = default)
121222
{
122-
lock (_backlog)
223+
var channel = Channel.CreateUnbounded<LogLine>();
224+
225+
using var _ = _logStreamCts.Token.Register(() => channel.Writer.TryComplete());
226+
227+
void Log(LogLine log)
123228
{
124-
// REVIEW: Performance makes me very sad, but we can optimize this later.
125-
return new LogAsyncEnumerable(this, _backlog.ToList());
229+
channel.Writer.TryWrite(log);
230+
}
231+
232+
OnNewLog += Log;
233+
234+
// ensure the backlog snapshot is taken after subscribing to OnNewLog
235+
// to ensure the backlog snapshot contains the correct logs. The backlog
236+
// can get cleared when there are no subscribers, so we ensure we are subscribing first.
237+
238+
// REVIEW: Performance makes me very sad, but we can optimize this later.
239+
var backlogSnapshot = GetBacklogSnapshot();
240+
if (backlogSnapshot.Length > 0)
241+
{
242+
yield return backlogSnapshot;
243+
}
244+
245+
try
246+
{
247+
await foreach (var entry in channel.GetBatchesAsync(cancellationToken: cancellationToken))
248+
{
249+
yield return entry;
250+
}
251+
}
252+
finally
253+
{
254+
OnNewLog -= Log;
255+
256+
channel.Writer.TryComplete();
126257
}
127258
}
128259

129260
// This provides the fan out to multiple subscribers.
130-
private Action<LogLine>? OnNewLog { get; set; }
261+
private Action<LogLine>? _onNewLog;
262+
private event Action<LogLine> OnNewLog
263+
{
264+
add
265+
{
266+
bool raiseSubscribersChanged;
267+
lock (this)
268+
{
269+
raiseSubscribersChanged = _onNewLog is null; // is this the first subscriber?
270+
271+
_onNewLog += value;
272+
}
273+
274+
if (raiseSubscribersChanged)
275+
{
276+
_onSubscribersChanged?.Invoke(true);
277+
}
278+
}
279+
remove
280+
{
281+
bool raiseSubscribersChanged;
282+
lock (this)
283+
{
284+
_onNewLog -= value;
285+
286+
raiseSubscribersChanged = _onNewLog is null; // is this the last subscriber?
287+
}
288+
289+
if (raiseSubscribersChanged)
290+
{
291+
_onSubscribersChanged?.Invoke(false);
292+
}
293+
}
294+
}
131295

132296
/// <summary>
133297
/// The logger for the resource to write to. This will write updates to the live log stream for this resource.
@@ -143,7 +307,23 @@ public void Complete()
143307
_logStreamCts.Cancel();
144308
}
145309

146-
private sealed class ResourceLogger(ResourceLoggerState annotation) : ILogger
310+
public void ClearBacklog()
311+
{
312+
lock (_backlog)
313+
{
314+
_backlog.Clear();
315+
}
316+
}
317+
318+
private LogLine[] GetBacklogSnapshot()
319+
{
320+
lock (_backlog)
321+
{
322+
return [.. _backlog];
323+
}
324+
}
325+
326+
private sealed class ResourceLogger(ResourceLoggerState loggerState) : ILogger
147327
{
148328
private int _lineNumber;
149329

@@ -153,7 +333,7 @@ private sealed class ResourceLogger(ResourceLoggerState annotation) : ILogger
153333

154334
public void Log<TState>(LogLevel logLevel, EventId eventId, TState state, Exception? exception, Func<TState, Exception?, string> formatter)
155335
{
156-
if (annotation._logStreamCts.IsCancellationRequested)
336+
if (loggerState._logStreamCts.IsCancellationRequested)
157337
{
158338
// Noop if logging after completing the stream
159339
return;
@@ -163,52 +343,23 @@ public void Log<TState>(LogLevel logLevel, EventId eventId, TState state, Except
163343
var isErrorMessage = logLevel >= LogLevel.Error;
164344

165345
LogLine logLine;
166-
lock (annotation._backlog)
346+
lock (loggerState._backlog)
167347
{
168348
_lineNumber++;
169349
logLine = new LogLine(_lineNumber, log, isErrorMessage);
170350

171-
annotation._backlog.Add(logLine);
351+
loggerState._backlog.Add(logLine);
172352
}
173353

174-
annotation.OnNewLog?.Invoke(logLine);
175-
}
176-
}
177-
178-
private sealed class LogAsyncEnumerable(ResourceLoggerState annotation, List<LogLine> backlogSnapshot) : IAsyncEnumerable<IReadOnlyList<LogLine>>
179-
{
180-
public async IAsyncEnumerator<IReadOnlyList<LogLine>> GetAsyncEnumerator(CancellationToken cancellationToken = default)
181-
{
182-
if (backlogSnapshot.Count > 0)
183-
{
184-
yield return backlogSnapshot;
185-
}
186-
187-
var channel = Channel.CreateUnbounded<LogLine>();
188-
189-
using var _ = annotation._logStreamCts.Token.Register(() => channel.Writer.TryComplete());
190-
191-
void Log(LogLine log)
192-
{
193-
channel.Writer.TryWrite(log);
194-
}
195-
196-
annotation.OnNewLog += Log;
197-
198-
try
199-
{
200-
await foreach (var entry in channel.GetBatchesAsync(cancellationToken: cancellationToken))
201-
{
202-
yield return entry;
203-
}
204-
}
205-
finally
206-
{
207-
annotation.OnNewLog -= Log;
208-
209-
channel.Writer.TryComplete();
210-
}
354+
loggerState._onNewLog?.Invoke(logLine);
211355
}
212356
}
213357
}
214358
}
359+
360+
/// <summary>
361+
/// Represents a log subscriber for a resource.
362+
/// </summary>
363+
/// <param name="Name">The the resource name.</param>
364+
/// <param name="AnySubscribers">Determines if there are any subscribers.</param>
365+
public readonly record struct LogSubscriber(string Name, bool AnySubscribers);

0 commit comments

Comments
 (0)