Skip to content

Commit c0bca61

Browse files
author
David García Vives
authored
Squash commit (#487)
1 parent 0d13c41 commit c0bca61

File tree

12 files changed

+1367
-286
lines changed

12 files changed

+1367
-286
lines changed

src/Docker.DotNet/Docker.DotNet.csproj

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -7,5 +7,6 @@
77
<ItemGroup>
88
<PackageReference Include="Newtonsoft.Json" Version="12.0.3" />
99
<PackageReference Include="System.Buffers" Version="4.5.1" />
10+
<PackageReference Include="System.Threading.Tasks.Extensions" Version="4.5.4" />
1011
</ItemGroup>
1112
</Project>

src/Docker.DotNet/DockerClient.cs

Lines changed: 10 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,4 @@
1-
using System;
1+
using System;
22
using System.Collections.Generic;
33
using System.IO;
44
using System.IO.Pipes;
@@ -346,22 +346,22 @@ private async Task<HttpResponseMessage> PrivateMakeRequestAsync(
346346
IRequestContent data,
347347
CancellationToken cancellationToken)
348348
{
349-
// If there is a timeout, we turn it into a cancellation token. At the same time, we need to link to the caller's
350-
// cancellation token. To avoid leaking objects, we must then also dispose of the CancellationTokenSource. To keep
351-
// code flow simple, we treat it as re-entering the same method with a different CancellationToken and no timeout.
349+
var request = PrepareRequest(method, path, queryString, headers, data);
350+
352351
if (timeout != s_InfiniteTimeout)
353352
{
354353
using (var timeoutTokenSource = CancellationTokenSource.CreateLinkedTokenSource(cancellationToken))
355354
{
356355
timeoutTokenSource.CancelAfter(timeout);
357-
358-
// We must await here because we need to dispose of the CTS only after the work has been completed.
359-
return await PrivateMakeRequestAsync(s_InfiniteTimeout, completionOption, method, path, queryString, headers, data, timeoutTokenSource.Token).ConfigureAwait(false);
356+
return await _client.SendAsync(request, completionOption, timeoutTokenSource.Token).ConfigureAwait(false);
360357
}
361358
}
362359

363-
var request = PrepareRequest(method, path, queryString, headers, data);
364-
return await _client.SendAsync(request, completionOption, cancellationToken).ConfigureAwait(false);
360+
var tcs = new TaskCompletionSource<HttpResponseMessage>();
361+
using (cancellationToken.Register(() => tcs.SetCanceled()))
362+
{
363+
return await await Task.WhenAny(tcs.Task, _client.SendAsync(request, completionOption, cancellationToken)).ConfigureAwait(false);
364+
}
365365
}
366366

367367
private async Task HandleIfErrorResponseAsync(HttpStatusCode statusCode, HttpResponseMessage response, IEnumerable<ApiResponseErrorHandlingDelegate> handlers)
@@ -452,4 +452,4 @@ public void Dispose()
452452
}
453453

454454
internal delegate void ApiResponseErrorHandlingDelegate(HttpStatusCode statusCode, string responseBody);
455-
}
455+
}
Lines changed: 55 additions & 97 deletions
Original file line numberDiff line numberDiff line change
@@ -1,97 +1,55 @@
1-
using System;
2-
using System.IO;
3-
using System.Net.Http;
4-
using System.Text;
5-
using System.Threading;
6-
using System.Threading.Tasks;
7-
using Newtonsoft.Json;
8-
9-
namespace Docker.DotNet.Models
10-
{
11-
internal static class StreamUtil
12-
{
13-
private static Newtonsoft.Json.JsonSerializer _serializer = new Newtonsoft.Json.JsonSerializer();
14-
15-
internal static async Task MonitorStreamAsync(Task<Stream> streamTask, DockerClient client, CancellationToken cancel, IProgress<string> progress)
16-
{
17-
using (var stream = await streamTask)
18-
{
19-
// ReadLineAsync must be cancelled by closing the whole stream.
20-
using (cancel.Register(() => stream.Dispose()))
21-
{
22-
using (var reader = new StreamReader(stream, new UTF8Encoding(false)))
23-
{
24-
string line;
25-
while ((line = await reader.ReadLineAsync()) != null)
26-
{
27-
progress.Report(line);
28-
}
29-
}
30-
}
31-
}
32-
}
33-
34-
internal static async Task MonitorStreamForMessagesAsync<T>(Task<Stream> streamTask, DockerClient client, CancellationToken cancel, IProgress<T> progress)
35-
{
36-
using (var stream = await streamTask)
37-
using (var reader = new StreamReader(stream, new UTF8Encoding(false)))
38-
using (var jsonReader = new JsonTextReader(reader) { SupportMultipleContent = true })
39-
{
40-
while (await jsonReader.ReadAsync().WithCancellation(cancel))
41-
{
42-
var ev = _serializer.Deserialize<T>(jsonReader);
43-
progress?.Report(ev);
44-
}
45-
}
46-
}
47-
48-
internal static async Task MonitorResponseForMessagesAsync<T>(Task<HttpResponseMessage> responseTask, DockerClient client, CancellationToken cancel, IProgress<T> progress)
49-
{
50-
using (var response = await responseTask)
51-
{
52-
await client.HandleIfErrorResponseAsync(response.StatusCode, response);
53-
54-
using (var stream = await response.Content.ReadAsStreamAsync())
55-
{
56-
// ReadLineAsync must be cancelled by closing the whole stream.
57-
using (cancel.Register(() => stream.Dispose()))
58-
{
59-
using (var reader = new StreamReader(stream, new UTF8Encoding(false)))
60-
{
61-
string line;
62-
try
63-
{
64-
while ((line = await reader.ReadLineAsync()) != null)
65-
{
66-
var prog = client.JsonSerializer.DeserializeObject<T>(line);
67-
if (prog == null) continue;
68-
69-
progress.Report(prog);
70-
}
71-
}
72-
catch (ObjectDisposedException)
73-
{
74-
// The subsequent call to reader.ReadLineAsync() after cancellation
75-
// will fail because we disposed the stream. Just ignore here.
76-
}
77-
}
78-
}
79-
}
80-
}
81-
}
82-
83-
private static async Task<T> WithCancellation<T>(this Task<T> task, CancellationToken cancellationToken)
84-
{
85-
var tcs = new TaskCompletionSource<bool>();
86-
using (cancellationToken.Register(s => ((TaskCompletionSource<bool>)s).TrySetResult(true), tcs))
87-
{
88-
if (task != await Task.WhenAny(task, tcs.Task))
89-
{
90-
throw new OperationCanceledException(cancellationToken);
91-
}
92-
}
93-
94-
return await task;
95-
}
96-
}
97-
}
1+
using System;
2+
using System.Diagnostics;
3+
using System.IO;
4+
using System.Net.Http;
5+
using System.Text;
6+
using System.Threading;
7+
using System.Threading.Tasks;
8+
using Newtonsoft.Json;
9+
10+
namespace Docker.DotNet.Models
11+
{
12+
internal static class StreamUtil
13+
{
14+
internal static async Task MonitorStreamAsync(Task<Stream> streamTask, DockerClient client, CancellationToken cancellationToken, IProgress<string> progress)
15+
{
16+
var tcs = new TaskCompletionSource<string>();
17+
18+
using (var stream = await streamTask)
19+
using (var reader = new StreamReader(stream, new UTF8Encoding(false)))
20+
using (cancellationToken.Register(() => tcs.TrySetCanceled(cancellationToken)))
21+
{
22+
string line;
23+
while ((line = await await Task.WhenAny(reader.ReadLineAsync(), tcs.Task)) != null)
24+
{
25+
progress.Report(line);
26+
}
27+
}
28+
}
29+
30+
internal static async Task MonitorStreamForMessagesAsync<T>(Task<Stream> streamTask, DockerClient client, CancellationToken cancellationToken, IProgress<T> progress)
31+
{
32+
var tcs = new TaskCompletionSource<bool>();
33+
34+
using (var stream = await streamTask)
35+
using (var reader = new StreamReader(stream, new UTF8Encoding(false)))
36+
using (var jsonReader = new JsonTextReader(reader) { SupportMultipleContent = true })
37+
using (cancellationToken.Register(() => tcs.TrySetCanceled(cancellationToken)))
38+
{
39+
while (await await Task.WhenAny(jsonReader.ReadAsync(cancellationToken), tcs.Task))
40+
{
41+
var ev = await client.JsonSerializer.Deserialize<T>(jsonReader, cancellationToken);
42+
progress.Report(ev);
43+
}
44+
}
45+
}
46+
47+
internal static async Task MonitorResponseForMessagesAsync<T>(Task<HttpResponseMessage> responseTask, DockerClient client, CancellationToken cancel, IProgress<T> progress)
48+
{
49+
using (var response = await responseTask)
50+
{
51+
await MonitorStreamForMessagesAsync<T>(response.Content.ReadAsStreamAsync(), client, cancel, progress);
52+
}
53+
}
54+
}
55+
}

src/Docker.DotNet/JsonSerializer.cs

Lines changed: 23 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,6 @@
1-
using Newtonsoft.Json;
1+
using System.Threading;
2+
using System.Threading.Tasks;
3+
using Newtonsoft.Json;
24
using Newtonsoft.Json.Converters;
35

46
namespace Docker.DotNet
@@ -8,6 +10,8 @@ namespace Docker.DotNet
810
/// </summary>
911
internal class JsonSerializer
1012
{
13+
private readonly Newtonsoft.Json.JsonSerializer _serializer;
14+
1115
private readonly JsonSerializerSettings _settings = new JsonSerializerSettings
1216
{
1317
NullValueHandling = NullValueHandling.Ignore,
@@ -24,6 +28,23 @@ internal class JsonSerializer
2428

2529
public JsonSerializer()
2630
{
31+
_serializer = Newtonsoft.Json.JsonSerializer.CreateDefault(this._settings);
32+
}
33+
34+
public Task<T> Deserialize<T>(JsonReader jsonReader, CancellationToken cancellationToken)
35+
{
36+
var tcs = new TaskCompletionSource<T>();
37+
using (cancellationToken.Register(() => tcs.TrySetCanceled(cancellationToken)))
38+
{
39+
Task.Factory.StartNew(
40+
() => tcs.TrySetResult(_serializer.Deserialize<T>(jsonReader)),
41+
cancellationToken,
42+
TaskCreationOptions.LongRunning,
43+
TaskScheduler.Default
44+
);
45+
46+
return tcs.Task;
47+
}
2748
}
2849

2950
public T DeserializeObject<T>(string json)
@@ -36,4 +57,4 @@ public string SerializeObject<T>(T value)
3657
return JsonConvert.SerializeObject(value, this._settings);
3758
}
3859
}
39-
}
60+
}

src/Docker.DotNet/Microsoft.Net.Http.Client/ChunkedReadStream.cs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -79,7 +79,7 @@ public override int WriteTimeout
7979

8080
public override int Read(byte[] buffer, int offset, int count)
8181
{
82-
return ReadAsync(buffer, offset, count, CancellationToken.None).Result;
82+
return ReadAsync(buffer, offset, count, CancellationToken.None).GetAwaiter().GetResult();
8383
}
8484

8585
public async override Task<int> ReadAsync(byte[] buffer, int offset, int count, CancellationToken cancellationToken)

0 commit comments

Comments
 (0)