Skip to content

Commit f30fb12

Browse files
authored
Merge pull request #346 from clue-labs/reuse-parser
Internal refactoring to reuse single request parser for all requests
2 parents 809a514 + 6e7dd66 commit f30fb12

File tree

3 files changed

+236
-140
lines changed

3 files changed

+236
-140
lines changed

src/Io/RequestHeaderParser.php

Lines changed: 65 additions & 37 deletions
Original file line numberDiff line numberDiff line change
@@ -3,6 +3,8 @@
33
namespace React\Http\Io;
44

55
use Evenement\EventEmitter;
6+
use Psr\Http\Message\ServerRequestInterface;
7+
use React\Socket\ConnectionInterface;
68
use RingCentral\Psr7 as g7;
79
use Exception;
810

@@ -19,47 +21,73 @@
1921
*/
2022
class RequestHeaderParser extends EventEmitter
2123
{
22-
private $buffer = '';
2324
private $maxSize = 8192;
2425

25-
private $localSocketUri;
26-
private $remoteSocketUri;
27-
28-
public function __construct($localSocketUri = null, $remoteSocketUri = null)
26+
public function handle(ConnectionInterface $conn)
2927
{
30-
$this->localSocketUri = $localSocketUri;
31-
$this->remoteSocketUri = $remoteSocketUri;
32-
}
28+
$buffer = '';
29+
$maxSize = $this->maxSize;
30+
$that = $this;
31+
$conn->on('data', $fn = function ($data) use (&$buffer, &$fn, $conn, $maxSize, $that) {
32+
// append chunk of data to buffer and look for end of request headers
33+
$buffer .= $data;
34+
$endOfHeader = \strpos($buffer, "\r\n\r\n");
35+
36+
// reject request if buffer size is exceeded
37+
if ($endOfHeader > $maxSize || ($endOfHeader === false && isset($buffer[$maxSize]))) {
38+
$conn->removeListener('data', $fn);
39+
$fn = null;
40+
41+
$that->emit('error', array(
42+
new \OverflowException("Maximum header size of {$maxSize} exceeded.", 431),
43+
$conn
44+
));
45+
return;
46+
}
3347

34-
public function feed($data)
35-
{
36-
$this->buffer .= $data;
37-
$endOfHeader = \strpos($this->buffer, "\r\n\r\n");
48+
// ignore incomplete requests
49+
if ($endOfHeader === false) {
50+
return;
51+
}
3852

39-
if ($endOfHeader > $this->maxSize || ($endOfHeader === false && isset($this->buffer[$this->maxSize]))) {
40-
$this->emit('error', array(new \OverflowException("Maximum header size of {$this->maxSize} exceeded.", 431), $this));
41-
$this->removeAllListeners();
42-
return;
43-
}
53+
// request headers received => try to parse request
54+
$conn->removeListener('data', $fn);
55+
$fn = null;
4456

45-
if (false !== $endOfHeader) {
4657
try {
47-
$this->parseAndEmitRequest($endOfHeader);
58+
$request = $that->parseRequest(
59+
(string)\substr($buffer, 0, $endOfHeader),
60+
$conn->getRemoteAddress(),
61+
$conn->getLocalAddress()
62+
);
4863
} catch (Exception $exception) {
49-
$this->emit('error', array($exception));
64+
$buffer = '';
65+
$that->emit('error', array(
66+
$exception,
67+
$conn
68+
));
69+
return;
5070
}
51-
$this->removeAllListeners();
52-
}
53-
}
5471

55-
private function parseAndEmitRequest($endOfHeader)
56-
{
57-
$request = $this->parseRequest((string)\substr($this->buffer, 0, $endOfHeader));
58-
$bodyBuffer = isset($this->buffer[$endOfHeader + 4]) ? \substr($this->buffer, $endOfHeader + 4) : '';
59-
$this->emit('headers', array($request, $bodyBuffer));
72+
$bodyBuffer = isset($buffer[$endOfHeader + 4]) ? \substr($buffer, $endOfHeader + 4) : '';
73+
$buffer = '';
74+
$that->emit('headers', array($request, $bodyBuffer, $conn));
75+
});
76+
77+
$conn->on('close', function () use (&$buffer, &$fn) {
78+
$fn = $buffer = null;
79+
});
6080
}
6181

62-
private function parseRequest($headers)
82+
/**
83+
* @param string $headers buffer string containing request headers only
84+
* @param ?string $remoteSocketUri
85+
* @param ?string $localSocketUri
86+
* @return ServerRequestInterface
87+
* @throws \InvalidArgumentException
88+
* @internal
89+
*/
90+
public function parseRequest($headers, $remoteSocketUri, $localSocketUri)
6391
{
6492
// additional, stricter safe-guard for request line
6593
// because request parser doesn't properly cope with invalid ones
@@ -99,22 +127,22 @@ private function parseRequest($headers)
99127

100128
// apply REMOTE_ADDR and REMOTE_PORT if source address is known
101129
// address should always be known, unless this is over Unix domain sockets (UDS)
102-
if ($this->remoteSocketUri !== null) {
103-
$remoteAddress = \parse_url($this->remoteSocketUri);
130+
if ($remoteSocketUri !== null) {
131+
$remoteAddress = \parse_url($remoteSocketUri);
104132
$serverParams['REMOTE_ADDR'] = $remoteAddress['host'];
105133
$serverParams['REMOTE_PORT'] = $remoteAddress['port'];
106134
}
107135

108136
// apply SERVER_ADDR and SERVER_PORT if server address is known
109137
// address should always be known, even for Unix domain sockets (UDS)
110138
// but skip UDS as it doesn't have a concept of host/port.s
111-
if ($this->localSocketUri !== null) {
112-
$localAddress = \parse_url($this->localSocketUri);
139+
if ($localSocketUri !== null) {
140+
$localAddress = \parse_url($localSocketUri);
113141
if (isset($localAddress['host'], $localAddress['port'])) {
114142
$serverParams['SERVER_ADDR'] = $localAddress['host'];
115143
$serverParams['SERVER_PORT'] = $localAddress['port'];
116144
}
117-
if (isset($localAddress['scheme']) && $localAddress['scheme'] === 'https') {
145+
if (isset($localAddress['scheme']) && $localAddress['scheme'] === 'tls') {
118146
$serverParams['HTTPS'] = 'on';
119147
}
120148
}
@@ -173,7 +201,7 @@ private function parseRequest($headers)
173201

174202
// set URI components from socket address if not already filled via Host header
175203
if ($request->getUri()->getHost() === '') {
176-
$parts = \parse_url($this->localSocketUri);
204+
$parts = \parse_url($localSocketUri);
177205
if (!isset($parts['host'], $parts['port'])) {
178206
$parts = array('host' => '127.0.0.1', 'port' => 80);
179207
}
@@ -194,8 +222,8 @@ private function parseRequest($headers)
194222
}
195223

196224
// Update request URI to "https" scheme if the connection is encrypted
197-
$parts = \parse_url($this->localSocketUri);
198-
if (isset($parts['scheme']) && $parts['scheme'] === 'https') {
225+
$parts = \parse_url($localSocketUri);
226+
if (isset($parts['scheme']) && $parts['scheme'] === 'tls') {
199227
// The request URI may omit default ports here, so try to parse port
200228
// from Host header field (if possible)
201229
$port = $request->getUri()->getPort();

src/StreamingServer.php

Lines changed: 23 additions & 41 deletions
Original file line numberDiff line numberDiff line change
@@ -87,6 +87,7 @@
8787
final class StreamingServer extends EventEmitter
8888
{
8989
private $callback;
90+
private $parser;
9091

9192
/**
9293
* Creates an HTTP server that invokes the given callback for each incoming HTTP request
@@ -108,6 +109,27 @@ public function __construct($requestHandler)
108109
}
109110

110111
$this->callback = $requestHandler;
112+
$this->parser = new RequestHeaderParser();
113+
114+
$that = $this;
115+
$this->parser->on('headers', function (ServerRequestInterface $request, $bodyBuffer, ConnectionInterface $conn) use ($that) {
116+
$that->handleRequest($conn, $request);
117+
118+
if ($bodyBuffer !== '') {
119+
$conn->emit('data', array($bodyBuffer));
120+
}
121+
});
122+
123+
$this->parser->on('error', function(\Exception $e, ConnectionInterface $conn) use ($that) {
124+
$that->emit('error', array($e));
125+
126+
// parsing failed => assume dummy request and send appropriate error
127+
$that->writeError(
128+
$conn,
129+
$e->getCode() !== 0 ? $e->getCode() : 400,
130+
new ServerRequest('GET', '/')
131+
);
132+
});
111133
}
112134

113135
/**
@@ -154,47 +176,7 @@ public function __construct($requestHandler)
154176
*/
155177
public function listen(ServerInterface $socket)
156178
{
157-
$socket->on('connection', array($this, 'handleConnection'));
158-
}
159-
160-
/** @internal */
161-
public function handleConnection(ConnectionInterface $conn)
162-
{
163-
$uriLocal = $conn->getLocalAddress();
164-
if ($uriLocal !== null) {
165-
// local URI known, so translate transport scheme to application scheme
166-
$uriLocal = \strtr($uriLocal, array('tcp://' => 'http://', 'tls://' => 'https://'));
167-
}
168-
169-
$uriRemote = $conn->getRemoteAddress();
170-
171-
$that = $this;
172-
$parser = new RequestHeaderParser($uriLocal, $uriRemote);
173-
174-
$listener = array($parser, 'feed');
175-
$parser->on('headers', function (ServerRequestInterface $request, $bodyBuffer) use ($conn, $listener, $that) {
176-
// parsing request completed => stop feeding parser
177-
$conn->removeListener('data', $listener);
178-
179-
$that->handleRequest($conn, $request);
180-
181-
if ($bodyBuffer !== '') {
182-
$conn->emit('data', array($bodyBuffer));
183-
}
184-
});
185-
186-
$conn->on('data', $listener);
187-
$parser->on('error', function(\Exception $e) use ($conn, $listener, $that) {
188-
$conn->removeListener('data', $listener);
189-
$that->emit('error', array($e));
190-
191-
// parsing failed => assume dummy request and send appropriate error
192-
$that->writeError(
193-
$conn,
194-
$e->getCode() !== 0 ? $e->getCode() : 400,
195-
new ServerRequest('GET', '/')
196-
);
197-
});
179+
$socket->on('connection', array($this->parser, 'handle'));
198180
}
199181

200182
/** @internal */

0 commit comments

Comments
 (0)