@@ -25,8 +25,7 @@ namespace Elasticsearch.Net
25
25
internal class RequestDataContent : HttpContent
26
26
{
27
27
private readonly RequestData _requestData ;
28
- private readonly Func < PostData , CompleteTaskOnCloseStream , RequestDataContent , TransportContext , Task > _onStreamAvailable ;
29
-
28
+ private readonly Func < RequestData , CompleteTaskOnCloseStream , RequestDataContent , TransportContext , Task > _onStreamAvailable ;
30
29
31
30
public RequestDataContent ( RequestData requestData )
32
31
{
@@ -35,12 +34,17 @@ public RequestDataContent(RequestData requestData)
35
34
if ( requestData . HttpCompression )
36
35
Headers . ContentEncoding . Add ( "gzip" ) ;
37
36
38
- Task OnStreamAvailable ( PostData data , Stream stream , HttpContent content , TransportContext context )
37
+ Task OnStreamAvailable ( RequestData data , Stream stream , HttpContent content , TransportContext context )
39
38
{
39
+ if ( data . HttpCompression )
40
+ stream = new GZipStream ( stream , CompressionMode . Compress , false ) ;
41
+
40
42
using ( stream )
41
- data . Write ( stream , requestData . ConnectionSettings ) ;
43
+ data . PostData . Write ( stream , data . ConnectionSettings ) ;
44
+
42
45
return Task . CompletedTask ;
43
46
}
47
+
44
48
_onStreamAvailable = OnStreamAvailable ;
45
49
}
46
50
public RequestDataContent ( RequestData requestData , CancellationToken token )
@@ -50,11 +54,15 @@ public RequestDataContent(RequestData requestData, CancellationToken token)
50
54
if ( requestData . HttpCompression )
51
55
Headers . ContentEncoding . Add ( "gzip" ) ;
52
56
53
- async Task OnStreamAvailable ( PostData data , Stream stream , HttpContent content , TransportContext context )
57
+ async Task OnStreamAvailable ( RequestData data , Stream stream , HttpContent content , TransportContext context )
54
58
{
59
+ if ( data . HttpCompression )
60
+ stream = new GZipStream ( stream , CompressionMode . Compress , false ) ;
61
+
55
62
using ( stream )
56
- await data . WriteAsync ( stream , requestData . ConnectionSettings , token ) . ConfigureAwait ( false ) ;
63
+ await data . PostData . WriteAsync ( stream , data . ConnectionSettings , token ) . ConfigureAwait ( false ) ;
57
64
}
65
+
58
66
_onStreamAvailable = OnStreamAvailable ;
59
67
}
60
68
@@ -69,16 +77,9 @@ async Task OnStreamAvailable(PostData data, Stream stream, HttpContent content,
69
77
[ SuppressMessage ( "Microsoft.Design" , "CA1031:DoNotCatchGeneralExceptionTypes" , Justification = "Exception is passed as task result." ) ]
70
78
protected override async Task SerializeToStreamAsync ( Stream stream , TransportContext context )
71
79
{
72
-
73
- var data = _requestData . PostData ;
74
- if ( data == null ) return ;
75
-
76
80
var serializeToStreamTask = new TaskCompletionSource < bool > ( ) ;
77
-
78
- if ( _requestData . HttpCompression )
79
- stream = new GZipStream ( stream , CompressionMode . Compress , false ) ;
80
81
var wrappedStream = new CompleteTaskOnCloseStream ( stream , serializeToStreamTask ) ;
81
- await _onStreamAvailable ( data , wrappedStream , this , context ) . ConfigureAwait ( false ) ;
82
+ await _onStreamAvailable ( _requestData , wrappedStream , this , context ) . ConfigureAwait ( false ) ;
82
83
await serializeToStreamTask . Task . ConfigureAwait ( false ) ;
83
84
}
84
85
@@ -111,7 +112,6 @@ protected override void Dispose(bool disposing)
111
112
base . Dispose ( ) ;
112
113
}
113
114
114
-
115
115
public override void Close ( ) => _serializeToStreamTask . TrySetResult ( true ) ;
116
116
}
117
117
@@ -193,6 +193,8 @@ public override IAsyncResult BeginWrite(byte[] buffer, int offset, int count, As
193
193
public override void EndWrite ( IAsyncResult asyncResult ) => _innerStream . EndWrite ( asyncResult ) ;
194
194
195
195
public override void WriteByte ( byte value ) => _innerStream . WriteByte ( value ) ;
196
+
197
+ public override void Close ( ) => _innerStream . Close ( ) ;
196
198
}
197
199
}
198
200
}
0 commit comments