Skip to content

Commit 4d800df

Browse files
authored
Merge pull request #25 from romikring/master
Split sockets to Producer and Consumer & Add sockets ping for keep connections active
2 parents 77de6f5 + a057d92 commit 4d800df

File tree

7 files changed

+378
-184
lines changed

7 files changed

+378
-184
lines changed

src/Queue/NsqQueue.php

Lines changed: 9 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -49,10 +49,10 @@ public function publish($message)
4949
{
5050
try
5151
{
52-
$tunnel = $this->pool->getTunnel();
53-
$tunnel->write(Writer::pub($this->topic, $message));
52+
$producer = $this->pool->getProducer();
53+
$producer->write(Writer::pub($this->topic, $message));
5454
}
55-
catch (Exception $e)
55+
catch (\Throwable $e)
5656
{
5757
$this->fallbackMessage($e, __METHOD__, $message);
5858
}
@@ -62,10 +62,10 @@ public function publishMulti(...$bodies)
6262
{
6363
try
6464
{
65-
$tunnel = $this->pool->getTunnel();
66-
$tunnel->write(Writer::mpub($this->topic, $bodies));
65+
$producer = $this->pool->getProducer();
66+
$producer->write(Writer::mpub($this->topic, $bodies));
6767
}
68-
catch (Exception $e)
68+
catch (\Throwable $e)
6969
{
7070
foreach ($bodies as $message)
7171
{
@@ -80,10 +80,10 @@ public function publishDefer($message, $deferTime)
8080
{
8181
$deferTime = Date::convertToInt($deferTime);
8282

83-
$tunnel = $this->pool->getTunnel();
84-
$tunnel->write(Writer::dpub($this->topic, $deferTime, $message));
83+
$producer = $this->pool->getProducer();
84+
$producer->write(Writer::dpub($this->topic, $deferTime, $message));
8585
}
86-
catch (Exception $e)
86+
catch (\Throwable $e)
8787
{
8888
$this->fallbackMessage($e, __METHOD__, $message);
8989
}

src/Tunnel/ConsumerTunnel.php

Lines changed: 158 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,158 @@
1+
<?php
2+
3+
namespace Merkeleon\Nsq\Tunnel;
4+
5+
6+
use Illuminate\Support\Arr;
7+
use Merkeleon\Nsq\Exception\NsqException;
8+
use Merkeleon\Nsq\Utility\Stream;
9+
use Merkeleon\Nsq\Wire\Writer;
10+
11+
class ConsumerTunnel extends ProducerTunnel
12+
{
13+
protected $subscribed;
14+
protected $reader;
15+
16+
/**
17+
* @return ConsumerTunnel
18+
* @throws NsqException
19+
*/
20+
public function subscribe(): ConsumerTunnel
21+
{
22+
if (!$this->subscribed)
23+
{
24+
try
25+
{
26+
$this->write(Writer::sub($this->config['queue'], $this->config['channel']));
27+
$this->subscribed = true;
28+
}
29+
catch (\Exception $e)
30+
{
31+
throw new NsqException($e->getMessage(), $e->getCode());
32+
}
33+
}
34+
35+
return $this;
36+
}
37+
38+
/**
39+
* @param string $queue
40+
* @return ConsumerTunnel
41+
*/
42+
public function setQueue($queue): ConsumerTunnel
43+
{
44+
$this->config['queue'] = $queue;
45+
46+
return $this;
47+
}
48+
49+
/**
50+
* @return ConsumerTunnel
51+
* @throws NsqException
52+
*/
53+
public function ready(): ConsumerTunnel
54+
{
55+
if ($this->subscribed)
56+
{
57+
$this->write(Writer::rdy($this->config['ready']));
58+
}
59+
60+
return $this;
61+
}
62+
63+
/**
64+
* @param string|null $queue
65+
* @return Tunnel
66+
*/
67+
public function connect($queue = null): Tunnel
68+
{
69+
if ($this->isConnected())
70+
{
71+
return $this;
72+
}
73+
74+
if ($queue)
75+
{
76+
$this->setQueue($queue);
77+
}
78+
79+
$host = $this->config['host'];
80+
$port = $this->config['port'];
81+
$timeout = $this->config['timeout.connection'];
82+
83+
$this->sock = Stream::pfopen($host, $port, $timeout);
84+
85+
stream_set_blocking($this->sock, (bool)$this->config['blocking']);
86+
87+
$this->write(Writer::MAGIC_V2);
88+
$this->write(Writer::identify($this->config['identify'] + [
89+
'client_id' => 'Consumer: ' . $this->config['queue'],
90+
]));
91+
92+
$this->subscribed = false;
93+
$this->subscribe()
94+
->ready();
95+
96+
$this->connected = true;
97+
98+
return $this;
99+
}
100+
101+
/**
102+
* @return Tunnel
103+
*/
104+
public function shutdown(): Tunnel
105+
{
106+
parent::shutdown();
107+
108+
$this->subscribed = false;
109+
110+
return $this;
111+
}
112+
113+
/**
114+
* @param int $len
115+
* @return string
116+
* @throws NsqException
117+
*/
118+
public function read($len = 0)
119+
{
120+
try
121+
{
122+
$data = '';
123+
$sock = $this->getSock();
124+
$reader = [$sock];
125+
$writer = null;
126+
$timeout = Arr::get($this->config, 'identify.heartbeat_interval', 20000) / 1000;
127+
128+
while (strlen($data) < $len)
129+
{
130+
$readable = Stream::select($reader, $writer, $timeout);
131+
if ($readable > 0)
132+
{
133+
$buffer = Stream::recvFrom($sock, $len);
134+
135+
$data .= $buffer;
136+
$len -= strlen($buffer);
137+
}
138+
else
139+
{
140+
// if timeout, let's reconnect anyway there is no available data in the stream
141+
$this->shutdown();
142+
$sock = $this->getSock();
143+
}
144+
}
145+
}
146+
catch (\Throwable $e)
147+
{
148+
logger()->error($e->getMessage(), ['exception' => $e]);
149+
report($e);
150+
151+
$this->shutdown();
152+
153+
throw new NsqException($e->getMessage(), $e->getCode());
154+
}
155+
156+
return $data;
157+
}
158+
}

src/Tunnel/Pool.php

Lines changed: 80 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -5,34 +5,43 @@
55

66
use Illuminate\Support\Arr;
77
use Merkeleon\Nsq\Exception\NsqException;
8+
use Merkeleon\Nsq\Wire\Writer;
89
use SplObjectStorage;
910

1011
class Pool
1112
{
1213
/** @var SplObjectStorage */
1314
private $pool;
15+
private $producerTunnel;
1416
private $nsq;
1517
private $size;
1618

19+
/**
20+
* Pool constructor.
21+
* @param $nsq
22+
*/
1723
public function __construct($nsq)
1824
{
1925
$this->pool = new SplObjectStorage;
2026
$this->size = 0;
2127
$this->nsq = $nsq;
2228

2329
$config = [
24-
'timeout.connection' => Arr::get($this->nsq, 'timeout.connection'),
25-
'timeout.read' => Arr::get($this->nsq, 'timeout.read'),
30+
'timeout.connection' => Arr::get($this->nsq, 'timeout.connection', 2),
2631
'timeout.requeue' => Arr::get($this->nsq, 'timeout.requeue'),
27-
'timeout.write' => Arr::get($this->nsq, 'timeout.write'),
2832
'identify' => Arr::get($this->nsq, 'identify'),
2933
'blocking' => Arr::get($this->nsq, 'blocking'),
3034
'ready' => Arr::get($this->nsq, 'ready'),
31-
'attempts.write' => Arr::get($this->nsq, 'attempts.write'),
35+
'attempts.write' => Arr::get($this->nsq, 'attempts.write', 3),
3236
'channel' => Arr::get($this->nsq, 'channel'),
3337
'queue' => null,
3438
];
3539

40+
41+
$addresses = Arr::get($nsq, 'nsq.addresses', []);
42+
[$host, $port] = explode(':', $addresses[array_rand($addresses)]);
43+
$this->producerTunnel = Tunnel::make($config + ['host' => $host, 'port' => $port], Tunnel::STATE_WRITE);
44+
3645
$nsqd = [];
3746
foreach (Arr::get($nsq, 'nsqlookup.addresses', []) as $lookup)
3847
{
@@ -44,24 +53,79 @@ public function __construct($nsq)
4453
$config['host'] = $url;
4554
$config['port'] = $port;
4655

47-
$this->addTunnel(new Tunnel($config));
56+
$this->addTunnel(Tunnel::make($config, Tunnel::STATE_READ));
4857
}
58+
59+
$this->periodicSocketsPing();
4960
}
5061

62+
/**
63+
* Function pings sockets periodically
64+
* to keep connections
65+
*/
66+
public function periodicSocketsPing(): void
67+
{
68+
$heardBeatTime = Arr::get($this->nsq, 'identify.heartbeat_interval', 20000) / 1000;
69+
70+
pcntl_async_signals(true);
71+
pcntl_alarm($heardBeatTime / 2);
72+
73+
$this->mask = pcntl_sigprocmask(SIG_BLOCK, [SIGALRM]);
74+
75+
pcntl_signal(SIGALRM, function ($signo, $diginfo) use ($heardBeatTime) {
76+
$size = $this->size();
77+
if ($size > 0)
78+
{
79+
$this->pool->rewind();
80+
81+
/** @var ConsumerTunnel $tunnel */
82+
while ($tunnel = $this->pool->current())
83+
{
84+
if ($tunnel->isConnected())
85+
{
86+
$tunnel->write(Writer::nop());
87+
}
88+
89+
$this->pool->next();
90+
}
91+
}
92+
93+
if ($this->producerTunnel->isConnected())
94+
{
95+
$this->producerTunnel->write(Writer::nop());
96+
}
97+
98+
pcntl_alarm($heardBeatTime);
99+
});
100+
101+
pcntl_sigprocmask(SIG_UNBLOCK, [SIGALRM]);
102+
}
103+
104+
/**
105+
* @return int
106+
*/
51107
public function size()
52108
{
53109
return $this->pool->count();
54110
}
55111

56-
public function addTunnel(Tunnel $tunnel)
112+
/**
113+
* @param Tunnel $tunnel
114+
* @return Pool
115+
*/
116+
public function addTunnel(Tunnel $tunnel): Pool
57117
{
58118
$this->pool->attach($tunnel);
59119
$this->size++;
60120

61121
return $this;
62122
}
63123

64-
public function removeTunnel(Tunnel $tunnel)
124+
/**
125+
* @param Tunnel $tunnel
126+
* @return Pool
127+
*/
128+
public function removeTunnel(Tunnel $tunnel): Pool
65129
{
66130
$tunnel->shutdown();
67131

@@ -71,6 +135,14 @@ public function removeTunnel(Tunnel $tunnel)
71135
return $this;
72136
}
73137

138+
/**
139+
* @return ProducerTunnel|null
140+
*/
141+
public function getProducer(): ?ProducerTunnel
142+
{
143+
return $this->producerTunnel;
144+
}
145+
74146
/**
75147
* @throws \Exception
76148
* @return Tunnel
@@ -79,7 +151,7 @@ public function getTunnel(): Tunnel
79151
{
80152
if ($this->size === 0)
81153
{
82-
throw new NsqException('Pool is empty');
154+
throw new NsqException('There are no more active tunnels');
83155
}
84156
if ($this->size === 1)
85157
{

0 commit comments

Comments
 (0)