Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
42 changes: 21 additions & 21 deletions Config/Schema/schema.php
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@ class QueueSchema extends CakeSchema {
* @param array $event Schema object properties
* @return bool Always true
*/
public function before($event = array()) {
public function before($event = []) {
return true;
}

Expand All @@ -17,27 +17,27 @@ public function before($event = array()) {
* @param array $event Schema object properties
* @return void
*/
public function after($event = array()) {
public function after($event = []) {
}

public $queued_tasks = array(
'id' => array('type' => 'integer', 'null' => false, 'default' => null, 'length' => 10, 'unsigned' => true, 'key' => 'primary'),
'task' => array('type' => 'string', 'null' => false, 'default' => null, 'key' => 'index', 'collate' => 'utf8_general_ci', 'charset' => 'utf8'),
'data' => array('type' => 'text', 'null' => true, 'default' => null, 'collate' => 'utf8_general_ci', 'charset' => 'utf8'),
'not_before' => array('type' => 'datetime', 'null' => false, 'default' => null),
'fetched' => array('type' => 'datetime', 'null' => true, 'default' => null),
'completed' => array('type' => 'datetime', 'null' => true, 'default' => null, 'key' => 'index'),
'failed_count' => array('type' => 'integer', 'null' => false, 'default' => '0', 'length' => 10, 'unsigned' => true),
'failure_message' => array('type' => 'text', 'null' => true, 'default' => null, 'collate' => 'utf8_general_ci', 'charset' => 'utf8'),
'worker_key' => array('type' => 'string', 'null' => true, 'default' => null, 'length' => 40, 'key' => 'index', 'collate' => 'utf8_general_ci', 'charset' => 'utf8'),
'created' => array('type' => 'datetime', 'null' => false, 'default' => null),
'indexes' => array(
'PRIMARY' => array('column' => 'id', 'unique' => 1),
'completed' => array('column' => 'completed', 'unique' => 0),
'worker_key' => array('column' => 'worker_key', 'unique' => 0),
'task' => array('column' => 'task', 'unique' => 0)
),
'tableParameters' => array('charset' => 'utf8', 'collate' => 'utf8_general_ci', 'engine' => 'InnoDB')
);
public $queued_tasks = [
'id' => ['type' => 'integer', 'null' => false, 'default' => null, 'length' => 10, 'unsigned' => true, 'key' => 'primary'],
'task' => ['type' => 'string', 'null' => false, 'default' => null, 'key' => 'index', 'collate' => 'utf8_general_ci', 'charset' => 'utf8'],
'data' => ['type' => 'text', 'null' => true, 'default' => null, 'collate' => 'utf8_general_ci', 'charset' => 'utf8'],
'not_before' => ['type' => 'datetime', 'null' => false, 'default' => null],
'fetched' => ['type' => 'datetime', 'null' => true, 'default' => null],
'completed' => ['type' => 'datetime', 'null' => true, 'default' => null, 'key' => 'index'],
'failed_count' => ['type' => 'integer', 'null' => false, 'default' => '0', 'length' => 10, 'unsigned' => true],
'failure_message' => ['type' => 'text', 'null' => true, 'default' => null, 'collate' => 'utf8_general_ci', 'charset' => 'utf8'],
'worker_key' => ['type' => 'string', 'null' => true, 'default' => null, 'length' => 40, 'key' => 'index', 'collate' => 'utf8_general_ci', 'charset' => 'utf8'],
'created' => ['type' => 'datetime', 'null' => false, 'default' => null],
'indexes' => [
'PRIMARY' => ['column' => 'id', 'unique' => 1],
'completed' => ['column' => 'completed', 'unique' => 0],
'worker_key' => ['column' => 'worker_key', 'unique' => 0],
'task' => ['column' => 'task', 'unique' => 0]
],
'tableParameters' => ['charset' => 'utf8', 'collate' => 'utf8_general_ci', 'engine' => 'InnoDB']
];

}
60 changes: 30 additions & 30 deletions Console/Command/QueueShell.php
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@ class QueueShell extends AppShell {
*
* @var array
*/
public $uses = array('Queue.QueuedTask');
public $uses = ['Queue.QueuedTask'];

/**
* A list of available queue tasks and their individual configurations.
Expand Down Expand Up @@ -68,12 +68,12 @@ public function initialize() {

$conf = Configure::read('Queue');
if (!is_array($conf)) {
$conf = array();
$conf = [];
}

// Merge with default configuration vars.
Configure::write('Queue', array_merge(
array(
[
'workers' => 3,
'sleepTime' => 10,
'gcprop' => 10,
Expand All @@ -82,7 +82,7 @@ public function initialize() {
'workerMaxRuntime' => 0,
'cleanupTimeout' => DAY,
'exitWhenNothingToDo' => false
),
],
$conf
)
);
Expand All @@ -97,42 +97,42 @@ public function initialize() {
*/
public function getOptionParser() {
$parser = parent::getOptionParser();
$parser->addSubcommand('add', array(
$parser->addSubcommand('add', [
'help' => __d('queue', 'Tries to call the cli `add()` function on a task.'),
'parser' => array(
'description' => array(
'parser' => [
'description' => [
__d('queue', 'Tries to call the cli `add()` function on a task.'),
__d('queue', 'Tasks may or may not provide this functionality.')
),
'arguments' => array(
'taskname' => array(
],
'arguments' => [
'taskname' => [
'help' => __d('queue', 'Name of the task.'),
'required' => true,
'choices' => $this->taskNames
)
)
)
))->addSubcommand('runworker', array(
]
]
]
])->addSubcommand('runworker', [
'help' => __d('queue', 'Run a queue worker.'),
'parser' => array(
'description' => array(__d('queue', 'Run a queue worker, which will look for a pending task it can execute.'))
)
))->addSubcommand('stats', array(
'parser' => [
'description' => [__d('queue', 'Run a queue worker, which will look for a pending task it can execute.')]
]
])->addSubcommand('stats', [
'help' => __d('queue', 'Display general statistics.'),
'parser' => array(
'parser' => [
'description' => __d('queue', 'Display general statistics.'),
)
))->addSubcommand('clean', array(
]
])->addSubcommand('clean', [
'help' => __d('queue', 'Manually call cleanup function to delete task data of completed tasks.'),
'parser' => array(
'parser' => [
'description' => __d('queue', 'Manually call cleanup function to delete task data of completed tasks.')
)
))->addSubcommand('clean_failed', array(
]
])->addSubcommand('clean_failed', [
'help' => __d('queue', 'Manually call cleanup function to delete task data of failed tasks.'),
'parser' => array(
'parser' => [
'description' => __d('queue', 'Manually call cleanup function to delete task data of failed tasks.')
)
))->description(__d('queue', 'CakePHP Queue Plugin.'));
]
])->description(__d('queue', 'CakePHP Queue Plugin.'));

return $parser;
}
Expand Down Expand Up @@ -191,8 +191,8 @@ public function runworker() {

// Register signal handler(s) if possible
if (function_exists('pcntl_signal')) {
pcntl_signal(SIGTERM, array($this, 'signalHandler'));
pcntl_signal(SIGINT, array($this, 'signalHandler'));
pcntl_signal(SIGTERM, [$this, 'signalHandler']);
pcntl_signal(SIGINT, [$this, 'signalHandler']);
} else {
$this->err(__d('queue', 'Signal handler(s) could not be registered.'));
}
Expand Down Expand Up @@ -328,7 +328,7 @@ public function stats() {
*/
protected function _getTaskConf() {
if (!is_array($this->_taskConf)) {
$this->_taskConf = array();
$this->_taskConf = [];
foreach ($this->tasks as $task) {
list($pluginName, $taskName) = pluginSplit($task);

Expand Down
2 changes: 1 addition & 1 deletion Console/Command/Task/QueueExampleTask.php
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@ class QueueExampleTask extends Shell {
*
* @var array
*/
public $uses = array('Queue.QueuedTask');
public $uses = ['Queue.QueuedTask'];

/**
* Timeout for run, after which the task is reassigned to a new worker.
Expand Down
70 changes: 35 additions & 35 deletions Model/QueuedTask.php
Original file line number Diff line number Diff line change
Expand Up @@ -23,11 +23,11 @@ class QueuedTask extends AppModel {
* @return mixed On success `Model::$data` if its not empty or true, false on failure
*/
public function createJob($taskName, $data, $notBefore = null) {
$data = array(
$data = [
'task' => $taskName,
'data' => serialize($data),
'not_before' => date('Y-m-d H:i:s'),
);
];

if (!empty($notBefore)) {
$data['not_before'] = date('Y-m-d H:i:s', strtotime($notBefore));
Expand All @@ -45,41 +45,41 @@ public function createJob($taskName, $data, $notBefore = null) {
* @return mixed Job data or false.
*/
public function requestJob($capabilities) {
$idlist = array();
$wasFetched = array();
$idlist = [];
$wasFetched = [];

$this->virtualFields['age'] = 'IFNULL(TIMESTAMPDIFF(SECOND, NOW(), not_before), 0)';
$conditions = array(
$conditions = [
'completed' => null,
'OR' => array()
);
$fields = array(
'OR' => []
];
$fields = [
'id',
'fetched',
'age'
);
$order = array(
];
$order = [
'age' => 'ASC',
'id' => 'ASC'
);
];
$limit = Configure::read('Queue.workers');

// Generate the job specific conditions.
foreach ($capabilities as $task) {
list($plugin, $name) = pluginSplit($task['name']);
$tmp = array(
$tmp = [
'task' => $name,
'AND' => array(
'AND' => [
'not_before <=' => date('Y-m-d H:i:s'),
array(
'OR' => array(
[
'OR' => [
'fetched <' => date('Y-m-d H:i:s', time() - $task['timeout']),
'fetched' => null
)
)
),
]
]
],
'failed_count <' => ($task['retries'] + 1)
);
];
$conditions['OR'][] = $tmp;
}

Expand Down Expand Up @@ -108,7 +108,7 @@ public function requestJob($capabilities) {
);

// Read which one actually got updated, which is the job we are supposed to execute.
$conditions = array('worker_key' => $key);
$conditions = ['worker_key' => $key];
$data = $this->find('first', compact('conditions'));
if (!empty($data)) {
// If the job had an existing fetched timestamp, increment the failure counter.
Expand Down Expand Up @@ -147,10 +147,10 @@ public function markJobDone($id) {
*/
public function markJobFailed($id, $failureMessage = null) {
$conditions = compact('id');
$fields = array(
$fields = [
'failed_count' => 'failed_count + 1',
'failure_message' => $this->getDataSource()->value($failureMessage, 'failure_message')
);
];

return $this->updateAll($fields, $conditions);
}
Expand All @@ -164,7 +164,7 @@ public function markJobFailed($id, $failureMessage = null) {
* @return int The number of pending jobs
*/
public function getLength($taskName = null) {
$conditions = array('completed' => null);
$conditions = ['completed' => null];
if (!empty($taskName)) {
$conditions['task'] = $taskName;
}
Expand All @@ -178,8 +178,8 @@ public function getLength($taskName = null) {
* @return array A list of task names
*/
public function getTypes() {
$fields = array('task');
$group = array('task');
$fields = ['task'];
$group = ['task'];

return $this->find('list', compact('fields', 'group'));
}
Expand All @@ -190,15 +190,15 @@ public function getTypes() {
* @return array An array with statistics
*/
public function getStats() {
$fields = array(
$fields = [
'task',
'COUNT(id) AS num',
'AVG(UNIX_TIMESTAMP(completed) - UNIX_TIMESTAMP(created)) AS alltime',
'AVG(UNIX_TIMESTAMP(completed) - UNIX_TIMESTAMP(fetched)) AS runtime',
'AVG(UNIX_TIMESTAMP(fetched) - IF(not_before IS NULL, UNIX_TIMESTAMP(created), UNIX_TIMESTAMP(not_before))) AS fetchdelay'
);
$conditions = array('NOT' => array('completed' => null));
$group = array('task');
];
$conditions = ['NOT' => ['completed' => null]];
$group = ['task'];

return $this->find('all', compact('fields', 'conditions', 'group'));
}
Expand All @@ -210,15 +210,15 @@ public function getStats() {
* @return bool Success
*/
public function cleanOldJobs($capabilities) {
$conditions = array();
$conditions = [];

// Generate the job specific conditions
foreach ($capabilities as $task) {
list($plugin, $name) = pluginSplit($task['name']);
$conditions['OR'][] = array(
$conditions['OR'][] = [
'task' => $name,
'completed <' => date('Y-m-d H:i:s', time() - $task['cleanupTimeout'])
);
];
}

return $this->deleteAll($conditions, false);
Expand All @@ -231,15 +231,15 @@ public function cleanOldJobs($capabilities) {
* @return bool Success
*/
public function cleanFailedJobs($capabilities) {
$conditions = array();
$conditions = [];

// Generate the job specific conditions.
foreach ($capabilities as $task) {
list($plugin, $name) = pluginSplit($task['name']);
$conditions['OR'][] = array(
$conditions['OR'][] = [
'task' => $name,
'failed_count >' => $task['retries']
);
];
}

return $this->deleteAll($conditions, false);
Expand Down
Loading