使用Supervisor实现基于队列的并行任务处理程序

技术文档网 2021-04-19

考虑用PHP实现以下场景: 有一个抓站的URL列表保存在队列里,后台程序读取这个队列,然后转交给子进程去抓取HTML存放到文件里。 为了提高效率,允许多任务并行执行,但为了避免机器负载过高,限制了最大的并行任务数(为了测试方便,我们把这个数设为3),当队列中取到 END标记时,程序结束运行。

这个场景用QPM的Supervisor::taskFactoryMode()实现,非常简单。

为了,简化测试环境,我们可以用一个文本文件来模拟队列的数据。完整的例子文件看这里:spider_task_factory_data.txt

http://news.sina.com.cn/
http://news.ifeng.com/
http://news.163.com/
http://news.sohu.com/
http://ent.sina.com.cn/
http://ent.ifeng.com/
...
END

使用QPM的taskFactoryMode之前,我们需要准备一个TaskFactory类。 我们将其命名为 SpiderTaskFactory,SpdierTaskFactory 的工厂方法fetchTask 正常返回 Runnable的子类的实例。当碰到END或文件结束,则throw StopSignal,这样程序就会终止。

以下是组装 Supervisor 并执行的代码片段。完整的例子见:spider_task_factory.php

    //如果没有从参数指定输入,把spider_task_factory_data.txt作为数据源
    $input = isset($argv[1]) ? $argv[1] : __DIR__.'/spider_task_factory_data.txt';

    $spiderTaskFactory = new SpiderTaskFactory($input);
    $config = [
        //指定taskFactory对象和工厂方法
        'factory'=>[$spiderTaskFactory, 'fetchTask'],
        //指定最大并发数量为3
        'quantity' => 3,
    ];
    //启动Supervisor
    Comos\Qpm\Supervision\Supervisor::taskFactoryMode($config)->start();

SpiderTaskFactory 的实现如下:

    /**
     * 任务工厂,必须实现 fetchTask方法。
     * 该方法正常返回
     *
    */
    class SpiderTaskFactory {
    private $_fh;
    public function __construct($input) {
        $this->_input = $input;
        $this->_fh = fopen($input, 'r');
        if ($this->_fh === false) {
            throw new Exception('fopen failed:'.$input);
        }
    }
    public function fetchTask() {
        while (true) {
            if (feof($this->_fh)) {
                throw new Comos\Qpm\supervisor\StopSignal();
            }
            $line = trim(fgets($this->_fh));
            if ($line == 'END') {
                throw new Comos\Qpm\supervisor\StopSignal();
            }

            if (empty($line)) {
                continue;
            }

            break;
        }

        return new SpiderTask($line);
    }
    }

SpiderTask 的实现如下:

    /**
     * 在子进程中执行任务的类
     * 必须实现 Runnable 接口
     */
    class SpiderTask implements Comos\Qpm\Process\Runnable {
    private $_target;

    public function __construct($target) {
        $this->_target = $target;
    }
    //在子进程中执行的部分
    public function run() {
        $r = @file_get_contents($this->_target);
        if ($r===false) {
            throw new Exception('fail to crawl url:'.$this->_target);
        }
        file_put_contents($this->getLocalFilename(), $r);
    }

    private function getLocalFilename() {
        $filename = str_replace('/', '~', $this->_target);
        $filename = str_replace(':', '_', $filename);
        $filename = $filename.'-'.date('YmdHis');
        return __DIR__.'/_spider/'.$filename.'.html';
    }
    }

真实的生产环境,用队列替换文件输入,即可实现持久运行的生产者/消费者模型的程序。

相关文章

  1. supervisor使用教程

    一、安装 1:easy_install 安装: easy_install supervisor 2:pip 安装: pip install supervisor 3:Debian / Ubuntu

  2. gitlab的ssh key不生效的问题

    在用 gitlab 的管理代码时发现一个问题:如果用 http 协议,每次 push 的时候都需要输入用户名和密码,如果 用 ssh 协议,先要生成公钥: ssh-keygen -t rsa -C

  3. UOJ安装指南

    这是一个UOJ的docker版本。在安装之前,请确认Docker已经安装在您的操作系统中。这个docker的映像是64位的版本,在32位的系统上安装可能会出现错误。 安装 请先下载 JDK7u76

  4. untu14.04下创建用户并赋予执行sudo命令的权限

    untu14.04下创建用户并赋予执行sudo命令的权限 创建用户:adduser +用户名(该命令在home下生成用户目录并创建用户) 1. 切换到root用户下 2. /etc/sud

  5. 如何查看当前apache的工作模式prefork worker还是event模式?

    查看apache工作模式 $ apachectl -V (注:apachectl可理解为apache control,其实是一段bash脚本) Server version: Apache/2.4.

随机推荐

  1. supervisor使用教程

    一、安装 1:easy_install 安装: easy_install supervisor 2:pip 安装: pip install supervisor 3:Debian / Ubuntu

  2. gitlab的ssh key不生效的问题

    在用 gitlab 的管理代码时发现一个问题:如果用 http 协议,每次 push 的时候都需要输入用户名和密码,如果 用 ssh 协议,先要生成公钥: ssh-keygen -t rsa -C

  3. UOJ安装指南

    这是一个UOJ的docker版本。在安装之前,请确认Docker已经安装在您的操作系统中。这个docker的映像是64位的版本,在32位的系统上安装可能会出现错误。 安装 请先下载 JDK7u76

  4. untu14.04下创建用户并赋予执行sudo命令的权限

    untu14.04下创建用户并赋予执行sudo命令的权限 创建用户:adduser +用户名(该命令在home下生成用户目录并创建用户) 1. 切换到root用户下 2. /etc/sud

  5. 如何查看当前apache的工作模式prefork worker还是event模式?

    查看apache工作模式 $ apachectl -V (注:apachectl可理解为apache control,其实是一段bash脚本) Server version: Apache/2.4.