@@ -20,6 +20,11 @@ class Watch implements WatchIteratorInterface
2020 */
2121 const DEFAULT_STREAM_READ_LENGTH = 8192 ;
2222
23+ /**
24+ * Default deadPeerDetectionTimeout
25+ */
26+ const DEFAULT_DEAD_PEER_DETECTION_TIMEOUT = 600 ;
27+
2328 /**
2429 * Client instance;
2530 *
@@ -99,6 +104,28 @@ class Watch implements WatchIteratorInterface
99104 */
100105 private $ streamReadLength = self ::DEFAULT_STREAM_READ_LENGTH ;
101106
107+ /**
108+ * Time after which no data has been received that the socket connection is reestablished
109+ *
110+ * @var int
111+ */
112+ private $ deadPeerDetectionTimeout = self ::DEFAULT_DEAD_PEER_DETECTION_TIMEOUT ;
113+
114+ /**
115+ * Used to keep track of the very last time data was successfully read from the socket.
116+ * If the time + deadPeerDetectionTimeout is < 'now' then the socket/connection is re-established
117+ *
118+ * @var int
119+ */
120+ private $ lastBytesReadTimestamp = 0 ;
121+
122+ /**
123+ * Used to track when a connection is made
124+ *
125+ * @var int
126+ */
127+ private $ handleStartTimestamp = 0 ;
128+
102129 /**
103130 * Watch constructor.
104131 *
@@ -178,6 +205,7 @@ private function getHandle()
178205 }
179206 stream_set_timeout ($ handle , 0 , $ this ->getStreamTimeout ());
180207 $ this ->handle = $ handle ;
208+ $ this ->setHandleStartTimestamp (time ());
181209 }
182210
183211 return $ this ->handle ;
@@ -266,6 +294,46 @@ public function getStreamReadLength()
266294 return $ this ->streamReadLength ;
267295 }
268296
297+ /**
298+ * Set deadPeerDetectionTimeout (seconds)
299+ *
300+ * @param $value
301+ */
302+ public function setDeadPeerDetectionTimeout ($ value )
303+ {
304+ $ this ->deadPeerDetectionTimeout = (int ) $ value ;
305+ }
306+
307+ /**
308+ * Get deadPeerDetectionTimeout (seconds)
309+ *
310+ * @return int
311+ */
312+ public function getDeadPeerDetectionTimeout ()
313+ {
314+ return $ this ->deadPeerDetectionTimeout ;
315+ }
316+
317+ /**
318+ * Get handleStartTimestamp
319+ *
320+ * @return int
321+ */
322+ private function getHandleStartTimestamp ()
323+ {
324+ return $ this ->handleStartTimestamp ;
325+ }
326+
327+ /**
328+ * Set handleStartTimestamp
329+ *
330+ * @param $value
331+ */
332+ private function setHandleStartTimestamp ($ value )
333+ {
334+ $ this ->handleStartTimestamp = (int ) $ value ;
335+ }
336+
269337 /**
270338 * Set resourceVersion
271339 *
@@ -310,6 +378,26 @@ private function getResourceVersionLastSuccess()
310378 return $ this ->resourceVersionLastSuccess ;
311379 }
312380
381+ /**
382+ * Set lastBytesReadTimestamp
383+ *
384+ * @param $value
385+ */
386+ private function setLastBytesReadTimestamp ($ value )
387+ {
388+ $ this ->lastBytesReadTimestamp = (int ) $ value ;
389+ }
390+
391+ /**
392+ * Get lastBytesReadTimestamp
393+ *
394+ * @return int
395+ */
396+ private function getLastBytesReadTimestamp ()
397+ {
398+ return $ this ->lastBytesReadTimestamp ;
399+ }
400+
313401 /**
314402 * Read and process event messages (closure/callback)
315403 *
@@ -326,6 +414,16 @@ private function internal_iterator($cycles = 0)
326414 return ;
327415 }
328416
417+ // detect dead peers
418+ $ now = time ();
419+ if ($ this ->getDeadPeerDetectionTimeout () > 0 &&
420+ $ now >= ($ this ->getHandleStartTimestamp () + $ this ->getDeadPeerDetectionTimeout ()) &&
421+ $ now >= ($ this ->getLastBytesReadTimestamp () + $ this ->getDeadPeerDetectionTimeout ())
422+ ) {
423+ $ this ->resetHandle ();
424+ $ handle = $ this ->getHandle ();
425+ }
426+
329427 //$meta = stream_get_meta_data($handle);
330428 if (feof ($ handle )) {
331429 if ($ this ->params ['timeoutSeconds ' ] > 0 ) {
@@ -341,6 +439,11 @@ private function internal_iterator($cycles = 0)
341439 if ($ data === false ) {
342440 throw new \Exception ('Failed to read bytes from stream: ' . $ this ->getClient ()->getConfig ()->getServer ());
343441 }
442+
443+ if (strlen ($ data ) > 0 ) {
444+ $ this ->setLastBytesReadTimestamp (time ());
445+ }
446+
344447 $ this ->buffer .= $ data ;
345448
346449 //break immediately if nothing is on the buffer
@@ -395,6 +498,7 @@ private function internal_iterator($cycles = 0)
395498 *
396499 * @param int $cycles
397500 * @throws \Exception
501+ * @return void
398502 */
399503 private function internal_generator ($ cycles = 0 )
400504 {
@@ -406,6 +510,16 @@ private function internal_generator($cycles = 0)
406510 return ;
407511 }
408512
513+ // detect dead peers
514+ $ now = time ();
515+ if ($ this ->getDeadPeerDetectionTimeout () > 0 &&
516+ $ now >= ($ this ->getHandleStartTimestamp () + $ this ->getDeadPeerDetectionTimeout ()) &&
517+ $ now >= ($ this ->getLastBytesReadTimestamp () + $ this ->getDeadPeerDetectionTimeout ())
518+ ) {
519+ $ this ->resetHandle ();
520+ $ handle = $ this ->getHandle ();
521+ }
522+
409523 //$meta = stream_get_meta_data($handle);
410524 if (feof ($ handle )) {
411525 if ($ this ->params ['timeoutSeconds ' ] > 0 ) {
@@ -421,6 +535,11 @@ private function internal_generator($cycles = 0)
421535 if ($ data === false ) {
422536 throw new \Exception ('Failed to read bytes from stream: ' . $ this ->getClient ()->getConfig ()->getServer ());
423537 }
538+
539+ if (strlen ($ data ) > 0 ) {
540+ $ this ->setLastBytesReadTimestamp (time ());
541+ }
542+
424543 $ this ->buffer .= $ data ;
425544
426545 //break immediately if nothing is on the buffer
@@ -539,6 +658,7 @@ public function start($cycles = 0)
539658 *
540659 * @param int $cycles
541660 * @throws \Exception
661+ * @return void
542662 */
543663 public function stream ($ cycles = 0 )
544664 {
0 commit comments