MySQL队列(默认队列)

队列表

  • 在队列表中主要用到的几个字段(队列表是在base/dbschema/queue.php中定义),具体的可以去查看
queue_id queue_title status start_time worker cursor_id params runkey
  • 主要字段说明
    queue_id        队列ID 在运行队列的时候是唯一标识
    queue_title     队列标题,直观区分队列
    status          队列状态(running/运行中,hibernate/休眠中,paused/已暂停,failure/执行失败)
    start_time      队列产生时间
    worker          在执行队列的时候会根据worker这个字段表示该队列在哪个方法执行
    cursor_id       游标--对于一个队列中有多个任务需要用到此字段
    params          参数(参数,通常就是filter)
    runkey          队列运行的Key,通常不需要自定义
    

队列调用流程


  • 在ECOS后台系统的后台会每30秒会调用如下地址
http://localhost/ecstore-bugfix/index.php/shopadmin/index.php?ctl=default&act=status

在desktop_ctl_default的status方法中会调

function status(){
    ...
    app::get('base')->model('queue')->flush();
    ...
}
  • 跳到base的queue model中
    <?php
    class base_mdl_queue extends base_db_model{

        var 
    $limit 100//最大任务并发
        
    var $task_timeout 300//单次任务超时

        
    function flush(){
            
    $base_url kernel::base_url();
            foreach(
    $this->db->select('select queue_id from sdb_base_queue limit '.$this->limit) as $r){
                
    $this->runtask($r['queue_id']);
            }
        }

        function 
    runtask($task_id){
            
    $http = new base_httpclient;
            
    $_POST['task_id'] = $task_id;
            
    $url =  kernel::openapi_url('openapi.queue','worker',array('task_id'=>$task_id));
            
    kernel::log('Spawn [Task-'.$task_id.'] '.$url);

            
    //99%概率不会有问题
            
    $http->hostaddr $_SERVER["SERVER_ADDR"]?$_SERVER["SERVER_ADDR"]:'127.0.0.1';
            
    $http->hostport $_SERVER["SERVER_PORT"]?$_SERVER["SERVER_PORT"]:'80';
            
    $http->timeout 2;
            
    kernel::log($http->post($url,$_POST));
        }

    }

从代码中可以看到在这获取到存放在队列表中的队列任务,进行运行

根据  kernel::openapi_url('openapi.queue','worker',array('task_id'=>$task_id));
知道调用了base_service_queue类的worker方法

  • 调用base_service_queue的worker方法
    function worker(){
        ...
        list($worker,$method) = explode('.',$task_info['worker']);
        $errmsg = null;
        $obj_work = kernel::single($worker);
        $remaining = call_user_func_array( array(  $obj_work ,$method),array(&$task_info['cursor_id'],$task_info['params'], &$errmsg));
        ...
    }
    

从以上代码可以看出:执行队列的方法是队列worker字段的值。

eg

'worker'=>'b2c_queue.send_msg',
表示在b2c_queue类中的send_msg方法中执行此队列

从worker方法中可以看出,在向执行队列的方法中会传入cursor_id,params两个参数

如果在一个队列中只有一个任务,如上则在执行完一个任务后需要返回一个false,则会删除此队列.

如果有一个队列中有多个任务,则会根据游标(cursor_id)进行完成任务,只要队列还没执行完成,

返回true则会跟新游标(cursor_id)直到返回false,再删除队列

队列应用

如下是ECOS后台中群发消息的一个队列

插入队列

在ECOS中每个队列都是放在sdb_base_queue这个队列表中,所以在插入队列的时候需要把队列信息插入到

队列表中即可

//可循环插入队列
$data = array(
    'queue_title'=>app::get('b2c')->_('到货通知站内信'),
    'start_time'=>time(),
    'params'=>array(
                'member_id'=>$data['member_id'],
                'data' =>$aTmp,
                'name' => $login_name,
                'gnotify_id' => $gnid,
        ),
    'worker'=>'b2c_queue.send_msg',
);
if(!$queue->insert($data)){
    $this->end(false,app::get('b2c')->_('操作失败!'));
}

执行队列

新建b2c_queue类,写send_msg方法来执行此队列

function send_msg(&$cursor_id,$params){
    $obj_memmsg = kernel::single('b2c_message_msg');
    $aData = $params['data'];
    $aData['member_id'] = 1;
    $aData['uname'] = app::get('b2c')->_('管理员');
    $aData['to_id'] = $params['member_id'];
    $aData['msg_to'] = $params['name'];
    $aData['subject'] = $aData['title'];
    $aData['comment'] = $aData['content'];
    $aData['has_sent'] = 'true';
    $obj_memmsg->send($aData);
}

注意执行完一个队列,需要返回一个false

內容目录

上一个主题

队列

下一个主题

RabbitMQ

快速搜索

输入相关的模块,术语,类或者函数名称进行搜索