Skip to content

Commit 6dfa777

Browse files
committed
Bugfixes and added a way to neatly stop the worker-tenders
1 parent d6618a6 commit 6dfa777

File tree

6 files changed

+109
-13
lines changed

6 files changed

+109
-13
lines changed

Command/AbstractWorker.php

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -94,7 +94,7 @@ abstract class AbstractWorker extends ContainerAwareCommand
9494
*/
9595
protected function addDefaultConfiguration()
9696
{
97-
$this->addOption('use-tube', null, InputOption::VALUE_OPTIONAL, 'What tube to send to');
97+
$this->addOption('use-tube', null, InputOption::VALUE_OPTIONAL, 'What tube to work with');
9898
}
9999

100100
/**

Command/StopWorkerTenderCommand.php

Lines changed: 93 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,93 @@
1+
<?php
2+
3+
namespace Webdevvie\PheanstalkTaskQueueBundle\Command;
4+
5+
use Webdevvie\PheanstalkTaskQueueBundle\Command\AbstractWorker;
6+
use Symfony\Component\Console\Input\InputInterface;
7+
use Symfony\Component\Console\Input\InputOption;
8+
use Symfony\Component\Console\Output\OutputInterface;
9+
use Webdevvie\PheanstalkTaskQueueBundle\Entity\Task;
10+
11+
/**
12+
* Class StopWorkerTenderCommand
13+
* Sends a signal to the worker tender to stop
14+
*
15+
* @package Webdevvie\PheanstalkTaskQueueBundle\Command
16+
* @author John Bakker <me@johnbakker.name>
17+
*/
18+
class StopWorkerTenderCommand extends AbstractWorker
19+
{
20+
/**
21+
* {@inheritDoc}
22+
*
23+
* @return void
24+
*/
25+
protected function configure()
26+
{
27+
$this->setName('taskqueue:stop-worker-tender')
28+
->setDescription('Signals the worker tender to stop and waits for it to stop');
29+
$this->addDefaultConfiguration();
30+
}
31+
32+
/**
33+
* {@inheritDoc}
34+
*
35+
* @param InputInterface $input
36+
* @param OutputInterface $output
37+
* @return void
38+
* @throws \InvalidArgumentException
39+
*/
40+
protected function execute(InputInterface $input, OutputInterface $output)
41+
{
42+
$this->initialiseWorker($input, $output);
43+
$processes = $this->findWorkerTenderProcessesForTube($this->tube);
44+
if (count($processes) == 0) {
45+
$output->writeln("<info>Found no processes</info>");
46+
return;
47+
}
48+
$output->writeln('<info>Found ' . count($processes) . ' process(es)');
49+
foreach ($processes as $process) {
50+
$output->writeln('<info>sending kill to:</info> ' . $process);
51+
exec("kill $process");
52+
}
53+
$output->write('<info>Waiting on processes to stop</info> ');
54+
while (count($processes) > 0) {
55+
$processes = $this->findWorkerTenderProcessesForTube($this->tube);
56+
$output->write(".");
57+
sleep(1);
58+
59+
}
60+
$output->writeln("Done!");
61+
}
62+
63+
/**
64+
* Returns a list of processes that match the worker-tender
65+
*
66+
* @param string $tube
67+
* @return array
68+
*/
69+
private function findWorkerTenderProcessesForTube($tube)
70+
{
71+
$processes = array();
72+
$command = "ps ax ";
73+
$command .= "|grep php ";
74+
$command .= "|grep -v stop ";
75+
$command .= "|grep worker-tender ";
76+
if ($tube !== '') {
77+
$command .= "|grep " . escapeshellarg('use-tube=' . $tube);
78+
}
79+
$data = exec($command);
80+
$lines = explode("\n", $data);
81+
foreach ($lines as $line) {
82+
$line = trim($line);
83+
$parts = explode(" ", $line);
84+
if ($parts[0] === '') {
85+
continue;
86+
}
87+
$processes[] = intval($parts[0]);
88+
}
89+
90+
return $processes;
91+
92+
}
93+
}

Command/Tender/ChildProcessContainer.php

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -197,7 +197,7 @@ public function isCleanedUp()
197197
*/
198198
public function getOutputLinesAvailable()
199199
{
200-
$outputLines = [];
200+
$outputLines = array();
201201
return $outputLines;
202202
}
203203

@@ -301,9 +301,9 @@ private function sendTERMSignal()
301301
*/
302302
public function getReadyForBed()
303303
{
304-
$busyStatusses = [ChildProcessContainer::STATUS_SLEEPY,
304+
$busyStatusses = array(ChildProcessContainer::STATUS_SLEEPY,
305305
ChildProcessContainer::STATUS_BUSY_BUT_SLEEPY
306-
];
306+
);
307307
if (in_array($this->status, $busyStatusses)) {
308308
//ready for bed
309309
return;

Command/WorkerTenderCommand.php

Lines changed: 5 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -18,7 +18,7 @@
1818
* one time per "tube"* You can supply several parameters to this command. Please refer to those parameters
1919
* for more information about customising your worker-tender experience
2020
*
21-
* @author John Bakker <me@johnbakker.name
21+
* @author John Bakker <me@johnbakker.name>
2222
*/
2323
class WorkerTenderCommand extends AbstractWorker
2424
{
@@ -66,7 +66,7 @@ protected function configure()
6666
'Max time to keep a worker in seconds',
6767
6000000
6868
);
69-
$this->addDefaultConfiguration();
69+
$this->addOption('use-tube', null, InputOption::VALUE_REQUIRED, 'What tube to work with');
7070
}
7171

7272
/**
@@ -235,8 +235,10 @@ public function moreOrLess($total, $busy, $available)
235235
*/
236236
private function findDisposableWorkers()
237237
{
238+
$disposableStatusses = array(ChildProcessContainer::STATUS_READY, ChildProcessContainer::STATUS_ALIVE);
238239
foreach ($this->family as &$child) {
239-
if (in_array($child->status, array(ChildProcessContainer::STATUS_READY, ChildProcessContainer::STATUS_ALIVE))) {
240+
241+
if (in_array($child->status, $disposableStatusses)) {
240242
if ($child->getAge() < 10) {
241243
//less than ten seconds old keep it alive a bit to let its do its job
242244
continue;

DependencyInjection/Configuration.php

Lines changed: 0 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -7,8 +7,6 @@
77

88
/**
99
* This is the class that validates and merges configuration from your app/config files
10-
*
11-
* To learn more see {@link http://symfony.com/doc/current/cookbook/bundles/extension.html#cookbook-bundles-extension-config-class}
1210
*/
1311
class Configuration implements ConfigurationInterface
1412
{

Service/TaskQueueService.php

Lines changed: 7 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -86,8 +86,7 @@ public function getDefaultTube()
8686
public function getStatusOfTaskWithId($taskId)
8787
{
8888
$task = $this->taskRepo->find($taskId);
89-
if(!($task instanceof Task))
90-
{
89+
if (!($task instanceof Task)) {
9190
//the task was not found
9291
return Task::STATUS_GONE;
9392
}
@@ -169,7 +168,7 @@ public function cleanUpTasks($timePeriod)
169168
t.created <= :older'
170169
);
171170
$query->setParameter('status', array(Task::STATUS_DONE));
172-
$query->setParameter('older', date("Y-m-d H:i:s", time()-$timePeriod));
171+
$query->setParameter('older', date("Y-m-d H:i:s", time() - $timePeriod));
173172
$query->execute();
174173
}
175174

@@ -206,7 +205,9 @@ public function reserveTask($tube = null)
206205
throw new TaskQueueServiceException("Invalid format in TaskQueue {$tube} ");
207206
} catch (\ReflectionException $exception) {
208207
$this->beanstalk->delete($inTask);
209-
throw new TaskQueueServiceException("Invalid format in TaskQueue {$tube} class ".$parts[0].' is unknown');
208+
throw new TaskQueueServiceException(
209+
"Invalid format in TaskQueue {$tube} class " . $parts[0] . ' is unknown'
210+
);
210211
}
211212
if (!($taskObject instanceof TaskDescriptionInterface)) {
212213
$this->beanstalk->delete($inTask);
@@ -254,6 +255,8 @@ public function markFailed(WorkPackage $task)
254255
}
255256

256257
/**
258+
* Updates the task status
259+
*
257260
* @param WorkPackage $task
258261
* @param string $status
259262
* @return void

0 commit comments

Comments
 (0)