Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 4 additions & 0 deletions .travis.yml
Original file line number Diff line number Diff line change
@@ -1,5 +1,8 @@
language: php

services:
- mysql

php:
- 7.0
- 7.1
Expand Down Expand Up @@ -36,6 +39,7 @@ matrix:
branches:
only:
- master
- 2.x

before_script:
- git clone -b master https://github.com/Oefenweb/travis --depth 1 ../travis
Expand Down
37 changes: 35 additions & 2 deletions Console/Command/QueueShell.php
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@
App::uses('Folder', 'Utility');
App::uses('QueuedTask', 'Model');
App::uses('AppShell', 'Console/Command');
App::uses('CakeText', 'Utility');

declare(ticks = 1);

Expand Down Expand Up @@ -119,7 +120,14 @@ public function getOptionParser() {
])->addSubcommand('runworker', [
'help' => __d('queue', 'Run a queue worker.'),
'parser' => [
'description' => [__d('queue', 'Run a queue worker, which will look for a pending task it can execute.')]
'description' => [__d('queue', 'Run a queue worker, which will look for a pending task it can execute.')],
'options' => [
'type' => [
'short' => 't',
'help' => 'Type (comma separated list possible)',
'default' => null
]
]
]
])->addSubcommand('stats', [
'help' => __d('queue', 'Display general statistics.'),
Expand Down Expand Up @@ -204,10 +212,14 @@ public function runworker() {
$this->__exit = false;

$workerStartTime = time();

$typesParam = $this->param('type');
$types = is_string($typesParam) ? $this->_stringToArray($typesParam) : [];

while (!$this->__exit) {
$this->out(__d('queue', 'Looking for a job.'), 1, Shell::VERBOSE);

$data = $this->QueuedTask->requestJob($this->_getTaskConf());
$data = $this->QueuedTask->requestJob($this->_getTaskConf(), $types);
if ($this->QueuedTask->exit === true) {
$this->__exit = true;
} else {
Expand Down Expand Up @@ -381,4 +393,25 @@ public function signalHandler($signalNumber) {
}
}

/**
* Converts string to array
*
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Missing doc-comment

* @param string|null $param String to convert
* @return array
*/
protected function _stringToArray(string $param = null) : array {
if (!$param) {
return [];
}

$array = CakeText::tokenize($param);
if (is_string($array)) {
return [
$array
];
}

return array_filter($array);
}

}
38 changes: 36 additions & 2 deletions Model/QueuedTask.php
Original file line number Diff line number Diff line change
Expand Up @@ -42,9 +42,10 @@ public function createJob($taskName, $data, $notBefore = null) {
* Looks for a new job that can be processed with the current abilities
*
* @param array $capabilities Available queue worker tasks.
* @param array $types Request a job from these types (or exclude certain types), or any otherwise.
* @return mixed Job data or false.
*/
public function requestJob($capabilities) {
public function requestJob($capabilities, array $types = []) {
$idlist = [];
$wasFetched = [];

Expand All @@ -64,6 +65,10 @@ public function requestJob($capabilities) {
];
$limit = Configure::read('Queue.workers');

if ($types) {
$conditions = $this->_addFilter($conditions, 'task', $types);
}

// Generate the job specific conditions.
foreach ($capabilities as $task) {
list($plugin, $name) = pluginSplit($task['name']);
Expand Down Expand Up @@ -179,7 +184,7 @@ public function getLength($taskName = null) {
* @return array A list of task names
*/
public function getTypes() {
$fields = ['task'];
$fields = ['task', 'task'];
$group = ['task'];

return $this->find('list', compact('fields', 'group'));
Expand Down Expand Up @@ -246,4 +251,33 @@ public function cleanFailedJobs($capabilities) {
return $this->deleteAll($conditions, false);
}

/**
* Filters field `key` based on the provided values. Values prefixed with '-' are excluded.
*
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Missing doc-comment

* @param array $conditions Conditions
* @param string $key Key
* @param array $values Values
* @return array the conditions
*/
protected function _addFilter(array $conditions, $key, array $values) : array {
$include = [];
$exclude = [];
foreach ($values as $value) {
if (substr($value, 0, 1) === '-') {
$exclude[] = substr($value, 1);
} else {
$include[] = $value;
}
}

if ($include) {
$conditions[$key . ' IN'] = $include;
}
if ($exclude) {
$conditions[$key . ' NOT IN'] = $exclude;
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I don't think the IN and NOT IN are needed

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think this may have been done on purpose. If $conditions['NOT'] is added here, there's a greater chance it would be overwritten by the calling function. Not too clean, though

}

return $conditions;
}

}
13 changes: 13 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -73,3 +73,16 @@ Console/cake Queue.queue clean;
# Manually call cleanup_failed function to delete task data of failed tasks.
Console/cake Queue.queue clean_failed;
```

#### Running only specific tasks per worker
You can filter "running" by type:

```
Console/cake Queue.queue runworker -t MyType,AnotherType,-ThisOneToo
Console/cake Queue.queue runworker -t "-ThisOneNot"
```

Use `-` prefix to exclude. Note that you might need to use `""` around the value then to avoid it being seen as option key.

That can be helpful when migrating servers and you only want to execute certain ones on the new system or want to test specific servers.