5
5
using System . IO ;
6
6
using System . IO . Pipelines ;
7
7
using System . Threading . Tasks ;
8
- using Microsoft . Extensions . Logging ;
9
8
10
9
namespace Microsoft . AspNetCore . Server . Kestrel . Core . Internal
11
10
{
@@ -15,127 +14,36 @@ namespace Microsoft.AspNetCore.Server.Kestrel.Core.Internal
15
14
/// <typeparam name="TStream"></typeparam>
16
15
internal class DuplexPipeStreamAdapter < TStream > : DuplexPipeStream , IDuplexPipe where TStream : Stream
17
16
{
18
- private readonly Pipe _output ;
19
- private Task _outputTask ;
20
- private bool _disposed ;
21
- private readonly object _disposeLock = new object ( ) ;
22
-
23
17
public DuplexPipeStreamAdapter ( IDuplexPipe duplexPipe , Func < Stream , TStream > createStream ) :
24
18
this ( duplexPipe , new StreamPipeReaderOptions ( leaveOpen : true ) , new StreamPipeWriterOptions ( leaveOpen : true ) , createStream )
25
19
{
26
20
}
27
21
28
- public DuplexPipeStreamAdapter ( IDuplexPipe duplexPipe , StreamPipeReaderOptions readerOptions , StreamPipeWriterOptions writerOptions , Func < Stream , TStream > createStream ) :
29
- base ( duplexPipe . Input , duplexPipe . Output )
22
+ public DuplexPipeStreamAdapter ( IDuplexPipe duplexPipe , StreamPipeReaderOptions readerOptions , StreamPipeWriterOptions writerOptions , Func < Stream , TStream > createStream ) : base ( duplexPipe . Input , duplexPipe . Output )
30
23
{
31
24
Stream = createStream ( this ) ;
32
-
33
- var outputOptions = new PipeOptions ( pool : writerOptions . Pool ,
34
- readerScheduler : PipeScheduler . Inline ,
35
- writerScheduler : PipeScheduler . Inline ,
36
- pauseWriterThreshold : 1 ,
37
- resumeWriterThreshold : 1 ,
38
- minimumSegmentSize : writerOptions . MinimumBufferSize ,
39
- useSynchronizationContext : false ) ;
40
-
41
25
Input = PipeReader . Create ( Stream , readerOptions ) ;
42
-
43
- // We're using a pipe here because the HTTP/2 stack in Kestrel currently makes assumptions
44
- // about when it is ok to write to the PipeWriter. This should be reverted back to PipeWriter.Create once
45
- // those patterns are fixed.
46
- _output = new Pipe ( outputOptions ) ;
26
+ Output = PipeWriter . Create ( Stream , writerOptions ) ;
47
27
}
48
28
49
- public ILogger Log { get ; set ; }
50
-
51
29
public TStream Stream { get ; }
52
30
53
31
public PipeReader Input { get ; }
54
32
55
- public PipeWriter Output
56
- {
57
- get
58
- {
59
- if ( _outputTask == null )
60
- {
61
- _outputTask = WriteOutputAsync ( ) ;
62
- }
63
-
64
- return _output . Writer ;
65
- }
66
- }
33
+ public PipeWriter Output { get ; }
67
34
68
- public override ValueTask DisposeAsync ( )
35
+ protected override void Dispose ( bool disposing )
69
36
{
70
- lock ( _disposeLock )
71
- {
72
- if ( _disposed )
73
- {
74
- return default ;
75
- }
76
- _disposed = true ;
77
- }
78
-
79
37
Input . Complete ( ) ;
80
- _output . Writer . Complete ( ) ;
81
-
82
- if ( _outputTask == null || _outputTask . IsCompletedSuccessfully )
83
- {
84
- // Wait for the output task to complete, this ensures that we've copied
85
- // the application data to the underlying stream
86
- return default ;
87
- }
88
-
89
- return new ValueTask ( _outputTask ) ;
38
+ Output . Complete ( ) ;
39
+ base . Dispose ( disposing ) ;
90
40
}
91
41
92
- private async Task WriteOutputAsync ( )
42
+ public override ValueTask DisposeAsync ( )
93
43
{
94
- try
95
- {
96
- while ( true )
97
- {
98
- var result = await _output . Reader . ReadAsync ( ) ;
99
- var buffer = result . Buffer ;
100
-
101
- try
102
- {
103
- if ( buffer . IsEmpty )
104
- {
105
- if ( result . IsCompleted )
106
- {
107
- break ;
108
- }
109
-
110
- await Stream . FlushAsync ( ) ;
111
- }
112
- else if ( buffer . IsSingleSegment )
113
- {
114
- await Stream . WriteAsync ( buffer . First ) ;
115
- }
116
- else
117
- {
118
- foreach ( var memory in buffer )
119
- {
120
- await Stream . WriteAsync ( memory ) ;
121
- }
122
- }
123
- }
124
- finally
125
- {
126
- _output . Reader . AdvanceTo ( buffer . End ) ;
127
- }
128
- }
129
- }
130
- catch ( Exception ex )
131
- {
132
- Log ? . LogCritical ( 0 , ex , $ "{ GetType ( ) . Name } .{ nameof ( WriteOutputAsync ) } ") ;
133
- }
134
- finally
135
- {
136
- _output . Reader . Complete ( ) ;
137
- }
44
+ Input . Complete ( ) ;
45
+ Output . Complete ( ) ;
46
+ return base . DisposeAsync ( ) ;
138
47
}
139
48
}
140
49
}
141
-
0 commit comments