Skip to content

Commit 57cb83e

Browse files
authored
Merge pull request #139 from clue-labs/closing
Fix separately removing readable and writable side of stream when closing
2 parents 3f4ea9a + 88b5c2a commit 57cb83e

File tree

3 files changed

+144
-172
lines changed

3 files changed

+144
-172
lines changed

src/ExtEventLoop.php

Lines changed: 40 additions & 88 deletions
Original file line numberDiff line numberDiff line change
@@ -27,11 +27,12 @@ final class ExtEventLoop implements LoopInterface
2727
private $timerCallback;
2828
private $timerEvents;
2929
private $streamCallback;
30-
private $streamEvents = [];
31-
private $streamFlags = [];
32-
private $streamRefs = [];
30+
private $readEvents = [];
31+
private $writeEvents = [];
3332
private $readListeners = [];
3433
private $writeListeners = [];
34+
private $readRefs = [];
35+
private $writeRefs = [];
3536
private $running;
3637
private $signals;
3738
private $signalEvents = [];
@@ -70,56 +71,65 @@ function ($signal) {
7071
public function addReadStream($stream, callable $listener)
7172
{
7273
$key = (int) $stream;
74+
if (isset($this->readListeners[$key])) {
75+
return;
76+
}
7377

74-
if (!isset($this->readListeners[$key])) {
75-
$this->readListeners[$key] = $listener;
76-
$this->subscribeStreamEvent($stream, Event::READ);
78+
$event = new Event($this->eventBase, $stream, Event::PERSIST | Event::READ, $this->streamCallback);
79+
$event->add();
80+
$this->readEvents[$key] = $event;
81+
$this->readListeners[$key] = $listener;
82+
83+
// ext-event does not increase refcount on stream resources for PHP 7+
84+
// manually keep track of stream resource to prevent premature garbage collection
85+
if (PHP_VERSION_ID >= 70000) {
86+
$this->readRefs[$key] = $stream;
7787
}
7888
}
7989

8090
public function addWriteStream($stream, callable $listener)
8191
{
8292
$key = (int) $stream;
83-
84-
if (!isset($this->writeListeners[$key])) {
85-
$this->writeListeners[$key] = $listener;
86-
$this->subscribeStreamEvent($stream, Event::WRITE);
93+
if (isset($this->writeListeners[$key])) {
94+
return;
8795
}
88-
}
8996

90-
public function removeReadStream($stream)
91-
{
92-
$key = (int) $stream;
97+
$event = new Event($this->eventBase, $stream, Event::PERSIST | Event::WRITE, $this->streamCallback);
98+
$event->add();
99+
$this->writeEvents[$key] = $event;
100+
$this->writeListeners[$key] = $listener;
93101

94-
if (isset($this->readListeners[$key])) {
95-
unset($this->readListeners[$key]);
96-
$this->unsubscribeStreamEvent($stream, Event::READ);
102+
// ext-event does not increase refcount on stream resources for PHP 7+
103+
// manually keep track of stream resource to prevent premature garbage collection
104+
if (PHP_VERSION_ID >= 70000) {
105+
$this->writeRefs[$key] = $stream;
97106
}
98107
}
99108

100-
public function removeWriteStream($stream)
109+
public function removeReadStream($stream)
101110
{
102111
$key = (int) $stream;
103112

104-
if (isset($this->writeListeners[$key])) {
105-
unset($this->writeListeners[$key]);
106-
$this->unsubscribeStreamEvent($stream, Event::WRITE);
113+
if (isset($this->readEvents[$key])) {
114+
$this->readEvents[$key]->free();
115+
unset(
116+
$this->readEvents[$key],
117+
$this->readListeners[$key],
118+
$this->readRefs[$key]
119+
);
107120
}
108121
}
109122

110-
private function removeStream($stream)
123+
public function removeWriteStream($stream)
111124
{
112125
$key = (int) $stream;
113126

114-
if (isset($this->streamEvents[$key])) {
115-
$this->streamEvents[$key]->free();
116-
127+
if (isset($this->writeEvents[$key])) {
128+
$this->writeEvents[$key]->free();
117129
unset(
118-
$this->streamFlags[$key],
119-
$this->streamEvents[$key],
120-
$this->readListeners[$key],
130+
$this->writeEvents[$key],
121131
$this->writeListeners[$key],
122-
$this->streamRefs[$key]
132+
$this->writeRefs[$key]
123133
);
124134
}
125135
}
@@ -175,7 +185,7 @@ public function run()
175185
$flags = EventBase::LOOP_ONCE;
176186
if (!$this->running || !$this->futureTickQueue->isEmpty()) {
177187
$flags |= EventBase::LOOP_NONBLOCK;
178-
} elseif (!$this->streamEvents && !$this->timerEvents->count()) {
188+
} elseif (!$this->readEvents && !$this->writeEvents && !$this->timerEvents->count()) {
179189
break;
180190
}
181191

@@ -207,64 +217,6 @@ private function scheduleTimer(TimerInterface $timer)
207217
$event->add($timer->getInterval());
208218
}
209219

210-
/**
211-
* Create a new ext-event Event object, or update the existing one.
212-
*
213-
* @param resource $stream
214-
* @param integer $flag Event::READ or Event::WRITE
215-
*/
216-
private function subscribeStreamEvent($stream, $flag)
217-
{
218-
$key = (int) $stream;
219-
220-
if (isset($this->streamEvents[$key])) {
221-
$event = $this->streamEvents[$key];
222-
$flags = ($this->streamFlags[$key] |= $flag);
223-
224-
$event->del();
225-
$event->set($this->eventBase, $stream, Event::PERSIST | $flags, $this->streamCallback);
226-
} else {
227-
$event = new Event($this->eventBase, $stream, Event::PERSIST | $flag, $this->streamCallback);
228-
229-
$this->streamEvents[$key] = $event;
230-
$this->streamFlags[$key] = $flag;
231-
232-
// ext-event does not increase refcount on stream resources for PHP 7+
233-
// manually keep track of stream resource to prevent premature garbage collection
234-
if (PHP_VERSION_ID >= 70000) {
235-
$this->streamRefs[$key] = $stream;
236-
}
237-
}
238-
239-
$event->add();
240-
}
241-
242-
/**
243-
* Update the ext-event Event object for this stream to stop listening to
244-
* the given event type, or remove it entirely if it's no longer needed.
245-
*
246-
* @param resource $stream
247-
* @param integer $flag Event::READ or Event::WRITE
248-
*/
249-
private function unsubscribeStreamEvent($stream, $flag)
250-
{
251-
$key = (int) $stream;
252-
253-
$flags = $this->streamFlags[$key] &= ~$flag;
254-
255-
if (0 === $flags) {
256-
$this->removeStream($stream);
257-
258-
return;
259-
}
260-
261-
$event = $this->streamEvents[$key];
262-
263-
$event->del();
264-
$event->set($this->eventBase, $stream, Event::PERSIST | $flags, $this->streamCallback);
265-
$event->add();
266-
}
267-
268220
/**
269221
* Create a callback used as the target of timer events.
270222
*

src/ExtLibeventLoop.php

Lines changed: 33 additions & 84 deletions
Original file line numberDiff line numberDiff line change
@@ -43,8 +43,8 @@ final class ExtLibeventLoop implements LoopInterface
4343
private $timerCallback;
4444
private $timerEvents;
4545
private $streamCallback;
46-
private $streamEvents = [];
47-
private $streamFlags = [];
46+
private $readEvents = [];
47+
private $writeEvents = [];
4848
private $readListeners = [];
4949
private $writeListeners = [];
5050
private $running;
@@ -88,30 +88,48 @@ function ($signal) {
8888
public function addReadStream($stream, callable $listener)
8989
{
9090
$key = (int) $stream;
91-
92-
if (!isset($this->readListeners[$key])) {
93-
$this->readListeners[$key] = $listener;
94-
$this->subscribeStreamEvent($stream, EV_READ);
91+
if (isset($this->readListeners[$key])) {
92+
return;
9593
}
94+
95+
$event = event_new();
96+
event_set($event, $stream, EV_PERSIST | EV_READ, $this->streamCallback);
97+
event_base_set($event, $this->eventBase);
98+
event_add($event);
99+
100+
$this->readEvents[$key] = $event;
101+
$this->readListeners[$key] = $listener;
96102
}
97103

98104
public function addWriteStream($stream, callable $listener)
99105
{
100106
$key = (int) $stream;
101-
102-
if (!isset($this->writeListeners[$key])) {
103-
$this->writeListeners[$key] = $listener;
104-
$this->subscribeStreamEvent($stream, EV_WRITE);
107+
if (isset($this->writeListeners[$key])) {
108+
return;
105109
}
110+
111+
$event = event_new();
112+
event_set($event, $stream, EV_PERSIST | EV_WRITE, $this->streamCallback);
113+
event_base_set($event, $this->eventBase);
114+
event_add($event);
115+
116+
$this->writeEvents[$key] = $event;
117+
$this->writeListeners[$key] = $listener;
106118
}
107119

108120
public function removeReadStream($stream)
109121
{
110122
$key = (int) $stream;
111123

112124
if (isset($this->readListeners[$key])) {
113-
unset($this->readListeners[$key]);
114-
$this->unsubscribeStreamEvent($stream, EV_READ);
125+
$event = $this->readEvents[$key];
126+
event_del($event);
127+
event_free($event);
128+
129+
unset(
130+
$this->readEvents[$key],
131+
$this->readListeners[$key]
132+
);
115133
}
116134
}
117135

@@ -120,25 +138,12 @@ public function removeWriteStream($stream)
120138
$key = (int) $stream;
121139

122140
if (isset($this->writeListeners[$key])) {
123-
unset($this->writeListeners[$key]);
124-
$this->unsubscribeStreamEvent($stream, EV_WRITE);
125-
}
126-
}
127-
128-
private function removeStream($stream)
129-
{
130-
$key = (int) $stream;
131-
132-
if (isset($this->streamEvents[$key])) {
133-
$event = $this->streamEvents[$key];
134-
141+
$event = $this->writeEvents[$key];
135142
event_del($event);
136143
event_free($event);
137144

138145
unset(
139-
$this->streamFlags[$key],
140-
$this->streamEvents[$key],
141-
$this->readListeners[$key],
146+
$this->writeEvents[$key],
142147
$this->writeListeners[$key]
143148
);
144149
}
@@ -166,7 +171,6 @@ public function cancelTimer(TimerInterface $timer)
166171
{
167172
if ($this->timerEvents->contains($timer)) {
168173
$event = $this->timerEvents[$timer];
169-
170174
event_del($event);
171175
event_free($event);
172176

@@ -199,7 +203,7 @@ public function run()
199203
$flags = EVLOOP_ONCE;
200204
if (!$this->running || !$this->futureTickQueue->isEmpty()) {
201205
$flags |= EVLOOP_NONBLOCK;
202-
} elseif (!$this->streamEvents && !$this->timerEvents->count()) {
206+
} elseif (!$this->readEvents && !$this->writeEvents && !$this->timerEvents->count()) {
203207
break;
204208
}
205209

@@ -226,61 +230,6 @@ private function scheduleTimer(TimerInterface $timer)
226230
event_add($event, $timer->getInterval() * self::MICROSECONDS_PER_SECOND);
227231
}
228232

229-
/**
230-
* Create a new ext-libevent event resource, or update the existing one.
231-
*
232-
* @param resource $stream
233-
* @param integer $flag EV_READ or EV_WRITE
234-
*/
235-
private function subscribeStreamEvent($stream, $flag)
236-
{
237-
$key = (int) $stream;
238-
239-
if (isset($this->streamEvents[$key])) {
240-
$event = $this->streamEvents[$key];
241-
$flags = $this->streamFlags[$key] |= $flag;
242-
243-
event_del($event);
244-
event_set($event, $stream, EV_PERSIST | $flags, $this->streamCallback);
245-
} else {
246-
$event = event_new();
247-
248-
event_set($event, $stream, EV_PERSIST | $flag, $this->streamCallback);
249-
event_base_set($event, $this->eventBase);
250-
251-
$this->streamEvents[$key] = $event;
252-
$this->streamFlags[$key] = $flag;
253-
}
254-
255-
event_add($event);
256-
}
257-
258-
/**
259-
* Update the ext-libevent event resource for this stream to stop listening to
260-
* the given event type, or remove it entirely if it's no longer needed.
261-
*
262-
* @param resource $stream
263-
* @param integer $flag EV_READ or EV_WRITE
264-
*/
265-
private function unsubscribeStreamEvent($stream, $flag)
266-
{
267-
$key = (int) $stream;
268-
269-
$flags = $this->streamFlags[$key] &= ~$flag;
270-
271-
if (0 === $flags) {
272-
$this->removeStream($stream);
273-
274-
return;
275-
}
276-
277-
$event = $this->streamEvents[$key];
278-
279-
event_del($event);
280-
event_set($event, $stream, EV_PERSIST | $flags, $this->streamCallback);
281-
event_add($event);
282-
}
283-
284233
/**
285234
* Create a callback used as the target of timer events.
286235
*

0 commit comments

Comments
 (0)