7
7
use Http \Client \Exception \HttpException ;
8
8
use Http \Client \Exception \RequestException ;
9
9
use Http \Discovery \MessageFactoryDiscovery ;
10
+ use Http \Discovery \StreamFactoryDiscovery ;
10
11
use Http \Message \ResponseFactory ;
12
+ use Http \Message \StreamFactory ;
11
13
use Psr \Http \Message \RequestInterface ;
12
14
use Psr \Http \Message \ResponseInterface ;
13
15
use Psr \Http \Message \StreamInterface ;
@@ -43,17 +45,24 @@ class Client implements HttpClient, HttpAsyncClient
43
45
*/
44
46
private $ responseFactory ;
45
47
48
+ /**
49
+ * @var StreamFactory
50
+ */
51
+ private $ streamFactory ;
52
+
46
53
/**
47
54
* Initialize the React client.
48
55
*
49
56
* @param ResponseFactory|null $responseFactory
50
57
* @param LoopInterface|null $loop
51
58
* @param ReactClient|null $client
59
+ * @param StreamFactory|null $streamFactory
52
60
*/
53
61
public function __construct (
54
62
ResponseFactory $ responseFactory = null ,
55
63
LoopInterface $ loop = null ,
56
- ReactClient $ client = null
64
+ ReactClient $ client = null ,
65
+ StreamFactory $ streamFactory = null
57
66
) {
58
67
if (null !== $ client && null === $ loop ) {
59
68
throw new \RuntimeException (
@@ -65,6 +74,7 @@ public function __construct(
65
74
$ this ->client = $ client ?: ReactFactory::buildHttpClient ($ this ->loop );
66
75
67
76
$ this ->responseFactory = $ responseFactory ?: MessageFactoryDiscovery::find ();
77
+ $ this ->streamFactory = $ streamFactory ?: StreamFactoryDiscovery::find ();
68
78
}
69
79
70
80
/**
@@ -94,17 +104,12 @@ public function sendAsyncRequest(RequestInterface $request)
94
104
});
95
105
96
106
$ reactRequest ->on ('response ' , function (ReactResponse $ reactResponse = null ) use ($ deferred , $ reactRequest , $ request ) {
97
- $ bodyStream = null ;
107
+ $ bodyStream = $ this -> streamFactory -> createStream () ;
98
108
$ reactResponse ->on ('data ' , function ($ data ) use (&$ bodyStream ) {
99
- if ($ data instanceof StreamInterface) {
100
- $ bodyStream = $ data ;
101
- } else {
102
- $ bodyStream ->write ($ data );
103
- }
109
+ $ bodyStream ->write ((string ) $ data );
104
110
});
105
111
106
112
$ reactResponse ->on ('end ' , function (\Exception $ error = null ) use ($ deferred , $ request , $ reactResponse , &$ bodyStream ) {
107
- $ bodyStream ->rewind ();
108
113
$ response = $ this ->buildResponse (
109
114
$ reactResponse ,
110
115
$ bodyStream
@@ -158,7 +163,8 @@ private function buildReactRequest(RequestInterface $request)
158
163
/**
159
164
* Transform a React Response to a valid PSR7 ResponseInterface instance.
160
165
*
161
- * @param ReactResponse $response
166
+ * @param ReactResponse $response
167
+ * @param StreamInterface $body
162
168
*
163
169
* @return ResponseInterface
164
170
*/
0 commit comments