Skip to content

Commit b34efbd

Browse files
author
Paul Banks
committed
Support multiplexing multiple requests over persistent socket and making asynchronous requests.
1 parent b64c934 commit b34efbd

File tree

1 file changed

+250
-15
lines changed

1 file changed

+250
-15
lines changed

src/Adoy/FastCGI/Client.php

Lines changed: 250 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -9,6 +9,8 @@
99
*/
1010
namespace Adoy\FastCGI;
1111

12+
class TimedOutException extends \Exception {}
13+
1214
/**
1315
* Handles communication with a FastCGI application
1416
*
@@ -47,6 +49,11 @@ class Client
4749

4850
const HEADER_LEN = 8;
4951

52+
const REQ_STATE_WRITTEN = 1;
53+
const REQ_STATE_OK = 2;
54+
const REQ_STATE_ERR = 3;
55+
const REQ_STATE_TIMED_OUT = 4;
56+
5057
/**
5158
* Socket
5259
* @var Resource
@@ -71,6 +78,38 @@ class Client
7178
*/
7279
private $_keepAlive = false;
7380

81+
/**
82+
* Outstanding request statuses keyed by request id
83+
*
84+
* Each request is an array with following form:
85+
*
86+
* array(
87+
* 'state' => REQ_STATE_*
88+
* 'response' => null | string
89+
* )
90+
*
91+
* @var array
92+
*/
93+
private $_requests = array();
94+
95+
/**
96+
* Use persistent sockets to connect to backend
97+
* @var Boolean
98+
*/
99+
private $_persistentSocket = false;
100+
101+
/**
102+
* Connect timeout in milliseconds
103+
* @var Integer
104+
*/
105+
private $_connectTimeout = 5000;
106+
107+
/**
108+
* Read/Write timeout in milliseconds
109+
* @var Integer
110+
*/
111+
private $_readWriteTimeout = 5000;
112+
74113
/**
75114
* Constructor
76115
*
@@ -107,15 +146,105 @@ public function getKeepAlive()
107146
return $this->_keepAlive;
108147
}
109148

149+
/**
150+
* Define whether or not PHP should attempt to re-use sockets opened by previous
151+
* request for efficiency
152+
*
153+
* @param Boolean $b true if persistent socket should be used, false otherwise
154+
*/
155+
public function setPersistentSocket($b)
156+
{
157+
$was_persistent = ($this->_sock && $this->_persistentSocket);
158+
$this->_persistentSocket = (boolean)$b;
159+
if (!$this->_persistentSocket && $was_persistent) {
160+
fclose($this->_sock);
161+
}
162+
}
163+
164+
/**
165+
* Get the pesistent socket status
166+
*
167+
* @return Boolean true if the socket should be persistent, false otherwise
168+
*/
169+
public function getPersistentSocket()
170+
{
171+
return $this->_persistentSocket;
172+
}
173+
174+
175+
/**
176+
* Set the connect timeout
177+
*
178+
* @param Integer number of milliseconds before connect will timeout
179+
*/
180+
public function setConnectTimeout($timeoutMs)
181+
{
182+
$this->_connectTimeout = $timeoutMs;
183+
}
184+
185+
/**
186+
* Get the connect timeout
187+
*
188+
* @return Integer number of milliseconds before connect will timeout
189+
*/
190+
public function getConnectTimeout()
191+
{
192+
return $this->_connectTimeout;
193+
}
194+
195+
/**
196+
* Set the read/write timeout
197+
*
198+
* @param Integer number of milliseconds before read or write call will timeout
199+
*/
200+
public function setReadWriteTimeout($timeoutMs)
201+
{
202+
$this->_readWriteTimeout = $timeoutMs;
203+
$this->set_ms_timeout($this->_readWriteTimeout);
204+
}
205+
206+
/**
207+
* Get the read timeout
208+
*
209+
* @return Integer number of milliseconds before read will timeout
210+
*/
211+
public function getReadWriteTimeout()
212+
{
213+
return $this->_readWriteTimeout;
214+
}
215+
216+
/**
217+
* Helper to avoid duplicating milliseconds to secs/usecs in a few places
218+
*
219+
* @param Integer millisecond timeout
220+
* @return Boolean
221+
*/
222+
private function set_ms_timeout($timeoutMs) {
223+
if (!$this->_sock) {
224+
return false;
225+
}
226+
return stream_set_timeout($this->_sock, floor($timeoutMs / 1000), ($timeoutMs % 1000) * 1000);
227+
}
228+
229+
110230
/**
111231
* Create a connection to the FastCGI application
112232
*/
113233
private function connect()
114234
{
115235
if (!$this->_sock) {
116-
$this->_sock = fsockopen($this->_host, $this->_port, $errno, $errstr, 5);
236+
if ($this->_persistentSocket) {
237+
$this->_sock = pfsockopen($this->_host, $this->_port, $errno, $errstr, $this->_connectTimeout/1000);
238+
} else {
239+
$this->_sock = fsockopen($this->_host, $this->_port, $errno, $errstr, $this->_connectTimeout/1000);
240+
}
241+
117242
if (!$this->_sock) {
118-
throw new \Exception('Unable to connect to FastCGI application');
243+
throw new \Exception('Unable to connect to FastCGI application: ' . $errstr);
244+
}
245+
246+
if (!$this->set_ms_timeout($this->_readWriteTimeout)) {
247+
throw new \Exception('Unable to set timeout on socket');
119248
}
120249
}
121250
}
@@ -245,7 +374,7 @@ private function readPacket()
245374
}
246375
}
247376
if ($resp['paddingLength']) {
248-
$buf=fread($this->_sock, $resp['paddingLength']);
377+
$buf = fread($this->_sock, $resp['paddingLength']);
249378
}
250379
return $resp;
251380
} else {
@@ -286,38 +415,144 @@ public function getValues(array $requestedInfo)
286415
*/
287416
public function request(array $params, $stdin)
288417
{
289-
$response = '';
418+
$id = $this->async_request($params, $stdin);
419+
return $this->wait_for_response($id);
420+
}
421+
422+
/**
423+
* Execute a request to the FastCGI application asyncronously
424+
*
425+
* This sends request to application and returns the assigned ID for that request.
426+
*
427+
* You should keep this id for later use with wait_for_response(). Ids are chosen randomly
428+
* rather than seqentially to guard against false-positives when using persistent sockets.
429+
* In that case it is possible that a delayed response to a request made by a previous script
430+
* invocation comes back on this socket and is mistaken for response to request made with same ID
431+
* during this request.
432+
*
433+
* @param array $params Array of parameters
434+
* @param String $stdin Content
435+
* @return Integer
436+
*/
437+
public function async_request(array $params, $stdin)
438+
{
290439
$this->connect();
291440

292-
$request = $this->buildPacket(self::BEGIN_REQUEST, chr(0) . chr(self::RESPONDER) . chr((int) $this->_keepAlive) . str_repeat(chr(0), 5));
441+
// Pick random number between 1 and max 16 bit unsigned int 65535
442+
$id = mt_rand(1, (1 << 16) - 1);
443+
444+
// Using persistent sockets implies you want them keept alive by server!
445+
$keepAlive = intval($this->_keepAlive || $this->_persistentSocket);
446+
447+
$request = $this->buildPacket(self::BEGIN_REQUEST
448+
,chr(0) . chr(self::RESPONDER) . chr($keepAlive) . str_repeat(chr(0), 5)
449+
,$id
450+
);
293451

294452
$paramsRequest = '';
295453
foreach ($params as $key => $value) {
296-
$paramsRequest .= $this->buildNvpair($key, $value);
454+
$paramsRequest .= $this->buildNvpair($key, $value, $id);
297455
}
298456
if ($paramsRequest) {
299-
$request .= $this->buildPacket(self::PARAMS, $paramsRequest);
457+
$request .= $this->buildPacket(self::PARAMS, $paramsRequest, $id);
300458
}
301-
$request .= $this->buildPacket(self::PARAMS, '');
459+
$request .= $this->buildPacket(self::PARAMS, '', $id);
302460

303461
if ($stdin) {
304-
$request .= $this->buildPacket(self::STDIN, $stdin);
462+
$request .= $this->buildPacket(self::STDIN, $stdin, $id);
463+
}
464+
$request .= $this->buildPacket(self::STDIN, '', $id);
465+
466+
if (fwrite($this->_sock, $request) === false || fflush($this->_sock) === false) {
467+
468+
$info = stream_get_meta_data($this->_sock);
469+
470+
if ($info['timed_out']) {
471+
throw new TimedOutException('Write timed out');
472+
}
473+
474+
// Broken pipe, tear down so future requests might succeed
475+
fclose($this->_sock);
476+
throw new \Exception('Failed to write request to socket');
477+
}
478+
479+
$this->_requests[$id] = array(
480+
'state' => self::REQ_STATE_WRITTEN,
481+
'response' => null
482+
);
483+
484+
return $id;
485+
}
486+
487+
/**
488+
* Blocking call that waits for response to specific request
489+
*
490+
* @param Integer $requestId
491+
* @param Integer $timeoutMs [optional] the number of milliseconds to wait. Defaults to the ReadWriteTimeout value set.
492+
* @return string response body
493+
*/
494+
public function wait_for_response($requestId, $timeoutMs = 0) {
495+
496+
if (!isset($this->_requests[$requestId])) {
497+
throw new \Exception('Invalid request id given');
498+
}
499+
500+
// If we already read the response during an earlier call for different id, just return it
501+
if ($this->_requests[$requestId]['state'] == self::REQ_STATE_OK
502+
|| $this->_requests[$requestId]['state'] == self::REQ_STATE_ERR
503+
) {
504+
return $this->_requests[$requestId]['response'];
505+
}
506+
507+
if ($timeoutMs > 0) {
508+
// Reset timeout on socket for now
509+
$this->set_ms_timeout($timeoutMs);
510+
} else {
511+
$timeoutMs = $this->_readWriteTimeout;
305512
}
306-
$request .= $this->buildPacket(self::STDIN, '');
307513

308-
fwrite($this->_sock, $request);
514+
// Need to manually check since we might do several reads none of which timeout themselves
515+
// but still not get the response requested
516+
$startTime = microtime(true);
309517

310518
do {
311519
$resp = $this->readPacket();
520+
312521
if ($resp['type'] == self::STDOUT || $resp['type'] == self::STDERR) {
313-
$response .= $resp['content'];
522+
if ($resp['type'] == self::STDERR) {
523+
$this->_requests[$resp['requestId']]['state'] = self::REQ_STATE_ERR;
524+
}
525+
$this->_requests[$resp['requestId']]['response'] .= $resp['content'];
526+
}
527+
if ($resp['type'] == self::END_REQUEST) {
528+
$this->_requests[$resp['requestId']]['state'] = self::REQ_STATE_OK;
529+
if ($resp['requestId'] == $requestId) {
530+
break;
531+
}
532+
}
533+
if (microtime(true) - $startTime >= ($timeoutMs * 1000)) {
534+
// Reset
535+
$this->set_ms_timeout($this->_readWriteTimeout);
536+
throw new \Exception('Timed out');
314537
}
315-
} while ($resp && $resp['type'] != self::END_REQUEST);
538+
} while ($resp);
316539

317540
if (!is_array($resp)) {
318-
throw new \Exception('Bad request');
541+
$info = stream_get_meta_data($this->_sock);
542+
543+
// We must reset timeout but it must be AFTER we get info
544+
$this->set_ms_timeout($this->_readWriteTimeout);
545+
546+
if ($info['timed_out']) {
547+
throw new TimedOutException('Read timed out');
548+
}
549+
550+
throw new \Exception('Read failed');
319551
}
320552

553+
// Reset timeout
554+
$this->set_ms_timeout($this->_readWriteTimeout);
555+
321556
switch (ord($resp['content']{4})) {
322557
case self::CANT_MPX_CONN:
323558
throw new \Exception('This app can\'t multiplex [CANT_MPX_CONN]');
@@ -329,7 +564,7 @@ public function request(array $params, $stdin)
329564
throw new \Exception('Role value not known [UNKNOWN_ROLE]');
330565
break;
331566
case self::REQUEST_COMPLETE:
332-
return $response;
567+
return $this->_requests[$requestId]['response'];
333568
}
334569
}
335570
}

0 commit comments

Comments
 (0)