Create a Queue-based Parallel Task Processing Program with Supervisor
Consider a scenario: a queue holds a list of URLs. A daemon fetches items from the queue one by one and dispatches each URL as a task to a child process. Each child crawls its URL and saves the result to a file. To improve throughput, the children run in parallel, but the number of concurrent children is capped to keep the servers from being overloaded.
Let’s try to implement the scenario with Supervisor::taskFactoryMode().
For demonstration purposes, we can use a text file to simulate the queue. See spider_task_factory_data.txt:
http://news.sina.com.cn/
http://news.ifeng.com/
http://news.163.com/
http://news.sohu.com/
http://ent.sina.com.cn/
http://ent.ifeng.com/
…
END
First, we need a SpiderTaskFactory. Its factory method fetchTask() reads the text file line by line and returns an instance of Comos\Qpm\Process\Runnable for each URL. When it reaches an END line or the end of the file, it throws a StopSignal, which causes the whole program to exit.
The factory looks like the following.
class SpiderTaskFactory {
    private $_input;
    private $_fh;

    public function __construct($input) {
        $this->_input = $input;
        $this->_fh = fopen($input, 'r');
        if ($this->_fh === false) {
            throw new Exception('fopen failed:'.$input);
        }
    }

    // Returns the next task; throws StopSignal at END or at end of file.
    public function fetchTask() {
        while (true) {
            if (feof($this->_fh)) {
                throw new Comos\Qpm\Supervision\StopSignal();
            }
            $line = trim((string) fgets($this->_fh));
            if ($line === 'END') {
                throw new Comos\Qpm\Supervision\StopSignal();
            }
            // Skip blank lines.
            if ($line === '') {
                continue;
            }
            break;
        }
        return new SpiderTask($line);
    }
}
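To see the stop condition in isolation, the reading loop can be sketched without the QPM library. This is only an illustration: `DemoStopSignal` and `fetchLine()` are hypothetical stand-ins (not part of QPM), an in-memory stream replaces the queue file, and the URL after END is made up to show that it is never read.

```php
<?php
// Hypothetical stand-in for QPM's StopSignal, so the sketch runs without the library.
class DemoStopSignal extends Exception {}

// Applies the same line rules as fetchTask(), but returns the raw URL string.
function fetchLine($fh) {
    while (true) {
        if (feof($fh)) {
            throw new DemoStopSignal();
        }
        $line = trim((string) fgets($fh));
        if ($line === 'END') {
            throw new DemoStopSignal();
        }
        if ($line === '') {
            continue; // skip blank lines
        }
        return $line;
    }
}

// An in-memory stream plays the role of the queue file.
$fh = fopen('php://memory', 'w+');
fwrite($fh, "http://news.sina.com.cn/\n\nhttp://news.ifeng.com/\nEND\nhttp://ignored.example/\n");
rewind($fh);

$fetched = [];
try {
    while (true) {
        $fetched[] = fetchLine($fh);
    }
} catch (DemoStopSignal $e) {
    // The caller treats the signal as "no more tasks".
}
fclose($fh);

print_r($fetched);
```

The blank line is skipped and everything after END is left unread, so only the first two URLs are collected.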
The task looks like the following.
class SpiderTask implements Comos\Qpm\Process\Runnable {
    private $_target;

    public function __construct($target) {
        $this->_target = $target;
    }

    // This method runs in the child process.
    public function run() {
        $r = @file_get_contents($this->_target);
        if ($r === false) {
            throw new Exception('fail to crawl url:'.$this->_target);
        }
        file_put_contents($this->getLocalFilename(), $r);
    }

    // Maps the URL to a timestamped file name under ./_spider/.
    private function getLocalFilename() {
        $filename = str_replace('/', '~', $this->_target);
        $filename = str_replace(':', '_', $filename);
        $filename = $filename.'-'.date('YmdHis');
        return __DIR__.'/_spider/'.$filename.'.html';
    }
}
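The naming rule in getLocalFilename() is easier to follow with a concrete value. The sketch below copies the two replacements into a stand-alone function; `localFilename()` is a hypothetical helper, the timestamp is passed in (rather than taken from date()) to keep the result deterministic, and the `_spider/` directory prefix is left out.

```php
<?php
// Stand-alone copy of the naming rule; not part of QPM or the original example.
function localFilename($target, $timestamp) {
    $filename = str_replace('/', '~', $target);   // slashes are not allowed in file names
    $filename = str_replace(':', '_', $filename); // avoid the colon after the scheme
    return $filename.'-'.$timestamp.'.html';
}

$name = localFilename('http://news.sina.com.cn/', '20240101120000');
echo $name, "\n"; // prints "http_~~news.sina.com.cn~-20240101120000.html"
```

Because the timestamp has one-second resolution, crawling the same URL twice within a second would write to the same file.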
The assembly process looks like the following.
$input = isset($argv[1]) ? $argv[1] : __DIR__.'/spider_task_factory_data.txt';
$spiderTaskFactory = new SpiderTaskFactory($input);
$config = [
    // The factory method is $spiderTaskFactory->fetchTask().
    'factory' => [$spiderTaskFactory, 'fetchTask'],
    // Run at most 3 child processes concurrently.
    'quantity' => 3,
];
// Launch...
Comos\Qpm\Supervision\Supervisor::taskFactoryMode($config)->start();
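As a rough mental model of what the 'quantity' cap means, the following single-process simulation pulls tasks from a factory callback and never keeps more than a fixed number in flight. It is not QPM's implementation: `SimStopSignal`, `simulate()`, and the tick-based durations are all hypothetical, no processes are forked, and letting in-flight tasks finish after the stop signal is an assumption of this sketch.

```php
<?php
// Hypothetical stand-in for a stop signal; not a QPM class.
class SimStopSignal extends Exception {}

// $factory returns a task duration in ticks, or throws SimStopSignal when the queue is empty.
function simulate(callable $factory, $quantity) {
    $running = [];     // remaining ticks of each in-flight task
    $peak = 0;         // highest concurrency observed
    $started = 0;      // total tasks dispatched
    $stopped = false;
    while (!$stopped || count($running) > 0) {
        // Fill free slots until the cap is reached or the factory signals stop.
        while (!$stopped && count($running) < $quantity) {
            try {
                $running[] = $factory();
                $started++;
            } catch (SimStopSignal $e) {
                $stopped = true;
            }
        }
        $peak = max($peak, count($running));
        // Advance one tick and drop the tasks that finished.
        $next = [];
        foreach ($running as $remaining) {
            if ($remaining > 1) {
                $next[] = $remaining - 1;
            }
        }
        $running = $next;
    }
    return ['started' => $started, 'peak' => $peak];
}

$durations = [2, 1, 3, 1, 2, 1, 1];
$factory = function () use (&$durations) {
    if (count($durations) === 0) {
        throw new SimStopSignal();
    }
    return array_shift($durations);
};

$result = simulate($factory, 3);
print_r($result); // started => 7, peak => 3
```

All seven tasks are dispatched, yet the number running at any tick never exceeds 3, which is the behaviour the 'quantity' setting is meant to provide.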
You can see the complete example here: spider_task_factory.php