Skip to content

Commit

Permalink
Make sure to dispose HttpContentStream when done reading module logs.… (
Browse files Browse the repository at this point in the history
#5245)

… (#5241)

This is another attempt to find and fix a potential issue where we don't dispose sockets in time.

For `GetModuleLogs` operation, we provide `HttpCompletionOption.ResponseHeadersRead` when calling `HttpClient.SendAsync()`. In this case, the response is not loaded into memory at once, but instead can be streamed. It implies, that we also need to properly dispose the stream so the connection is released ASAP.
  • Loading branch information
vadim-kovalyov authored Jul 14, 2021
1 parent 95b4441 commit 47011b1
Show file tree
Hide file tree
Showing 3 changed files with 24 additions and 16 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -31,11 +31,13 @@ public async Task<byte[]> GetLogs(string id, ModuleLogOptions logOptions, Cancel
logOptions.Filter.IncludeTimestamp = Option.Some(true);
}

Stream logsStream = await this.runtimeInfoProvider.GetModuleLogs(id, false, logOptions.Filter.Tail, logOptions.Filter.Since, logOptions.Filter.Until, logOptions.Filter.IncludeTimestamp, cancellationToken);
Events.ReceivedStream(id);
using (Stream logsStream = await this.runtimeInfoProvider.GetModuleLogs(id, false, logOptions.Filter.Tail, logOptions.Filter.Since, logOptions.Filter.Until, logOptions.Filter.IncludeTimestamp, cancellationToken))
{
Events.ReceivedStream(id);

byte[] logBytes = await this.GetProcessedLogs(id, logsStream, logOptions);
return logBytes;
byte[] logBytes = await this.GetProcessedLogs(id, logsStream, logOptions);
return logBytes;
}
}

// The id parameter is a regex. Logs for all modules that match this regex are processed.
Expand Down Expand Up @@ -65,10 +67,12 @@ internal async Task GetLogsStreamInternal(string id, ModuleLogOptions logOptions
Preconditions.CheckNotNull(logOptions, nameof(logOptions));
Preconditions.CheckNotNull(callback, nameof(callback));

Stream logsStream = await this.runtimeInfoProvider.GetModuleLogs(id, logOptions.Follow, logOptions.Filter.Tail, logOptions.Filter.Since, logOptions.Filter.Until, logOptions.Filter.IncludeTimestamp, cancellationToken);
Events.ReceivedStream(id);
using (Stream logsStream = await this.runtimeInfoProvider.GetModuleLogs(id, logOptions.Follow, logOptions.Filter.Tail, logOptions.Filter.Since, logOptions.Filter.Until, logOptions.Filter.IncludeTimestamp, cancellationToken))
{
Events.ReceivedStream(id);

await this.logsProcessor.ProcessLogsStream(id, logsStream, logOptions, callback);
await this.logsProcessor.ProcessLogsStream(id, logsStream, logOptions, callback);
}
}

static byte[] ProcessByContentEncoding(byte[] bytes, LogsContentEncoding contentEncoding) =>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -81,14 +81,16 @@ async Task<string> ScrapeEndpoint(string endpoint, CancellationToken cancellatio
// Temporary. Only needed until edgeHub starts using asp.net to expose endpoints
endpoint = this.GetUriWithIpAddress(endpoint);

HttpResponseMessage result = await this.httpClient.GetAsync(endpoint, cancellationToken);
if (result.IsSuccessStatusCode)
using (HttpResponseMessage result = await this.httpClient.GetAsync(endpoint, cancellationToken))
{
return await result.Content.ReadAsStringAsync();
}
else
{
Log.LogInformation($"Error connecting to {endpoint} with result error code {result.StatusCode}");
if (result.IsSuccessStatusCode)
{
return await result.Content.ReadAsStringAsync();
}
else
{
Log.LogInformation($"Error connecting to {endpoint} with result error code {result.StatusCode}");
}
}
}
catch (Exception e)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,8 +25,10 @@ async Task<ProxyReadiness> CheckAsync(CancellationToken token)
{
try
{
HttpResponseMessage response = await this.client.GetAsync($"/systeminfo?api-version={this.apiVersion}", token);
return response.StatusCode == HttpStatusCode.OK ? ProxyReadiness.Ready : ProxyReadiness.Failed;
using (HttpResponseMessage response = await this.client.GetAsync($"/systeminfo?api-version={this.apiVersion}", token))
{
return response.StatusCode == HttpStatusCode.OK ? ProxyReadiness.Ready : ProxyReadiness.Failed;
}
}
catch
{
Expand Down

0 comments on commit 47011b1

Please sign in to comment.