| queue_id | queue_title | status | start_time | worker | cursor_id | params | runkey |
|---|
queue_id 队列ID 在运行队列的时候是唯一标识 queue_title 队列标题,直观区分队列 status 队列状态(running/运行中,hibernate/休眠中,paused/已暂停,failure/执行失败) start_time 队列产生时间 worker 在执行队列的时候会根据worker这个字段表示该队列在哪个方法执行 cursor_id 游标--对于一个队列中有多个任务需要用到此字段 params 参数(参数,通常就是filter) runkey 队列运行的Key,通常不需要自定义

http://localhost/ecstore-bugfix/index.php/shopadmin/index.php?ctl=default&act=status
在desktop_ctl_default的status方法中会调
function status(){
...
app::get('base')->model('queue')->flush();
...
}
<?php
class base_mdl_queue extends base_db_model{
var $limit = 100; //最大任务并发
var $task_timeout = 300; //单次任务超时
function flush(){
$base_url = kernel::base_url();
foreach($this->db->select('select queue_id from sdb_base_queue limit '.$this->limit) as $r){
$this->runtask($r['queue_id']);
}
}
function runtask($task_id){
$http = new base_httpclient;
$_POST['task_id'] = $task_id;
$url = kernel::openapi_url('openapi.queue','worker',array('task_id'=>$task_id));
kernel::log('Spawn [Task-'.$task_id.'] '.$url);
//99%概率不会有问题
$http->hostaddr = $_SERVER["SERVER_ADDR"]?$_SERVER["SERVER_ADDR"]:'127.0.0.1';
$http->hostport = $_SERVER["SERVER_PORT"]?$_SERVER["SERVER_PORT"]:'80';
$http->timeout = 2;
kernel::log($http->post($url,$_POST));
}
}
从代码中可以看到在这获取到存放在队列表中的队列任务,进行运行
根据 kernel::openapi_url('openapi.queue','worker',array('task_id'=>$task_id));
知道调用了base_service_queue类的worker方法
function worker(){
...
list($worker,$method) = explode('.',$task_info['worker']);
$errmsg = null;
$obj_work = kernel::single($worker);
$remaining = call_user_func_array( array( $obj_work ,$method),array(&$task_info['cursor_id'],$task_info['params'], &$errmsg));
...
}
从以上代码可以看出:执行队列的方法是队列worker字段的值。
eg
'worker'=>'b2c_queue.send_msg',表示在b2c_queue类中的send_msg方法中执行此队列
从worker方法中可以看出,在向执行队列的方法中会传入cursor_id,params两个参数
如果在一个队列中只有一个任务,如上则在执行完一个任务后需要返回一个false,则会删除此队列.
如果有一个队列中有多个任务,则会根据游标(cursor_id)进行完成任务,只要队列还没执行完成,
返回true则会跟新游标(cursor_id)直到返回false,再删除队列
如下是ECOS后台中群发消息的一个队列
在ECOS中每个队列都是放在sdb_base_queue这个队列表中,所以在插入队列的时候需要把队列信息插入到
队列表中即可
//可循环插入队列
$data = array(
'queue_title'=>app::get('b2c')->_('到货通知站内信'),
'start_time'=>time(),
'params'=>array(
'member_id'=>$data['member_id'],
'data' =>$aTmp,
'name' => $login_name,
'gnotify_id' => $gnid,
),
'worker'=>'b2c_queue.send_msg',
);
if(!$queue->insert($data)){
$this->end(false,app::get('b2c')->_('操作失败!'));
}
新建b2c_queue类,写send_msg方法来执行此队列
function send_msg(&$cursor_id,$params){
$obj_memmsg = kernel::single('b2c_message_msg');
$aData = $params['data'];
$aData['member_id'] = 1;
$aData['uname'] = app::get('b2c')->_('管理员');
$aData['to_id'] = $params['member_id'];
$aData['msg_to'] = $params['name'];
$aData['subject'] = $aData['title'];
$aData['comment'] = $aData['content'];
$aData['has_sent'] = 'true';
$obj_memmsg->send($aData);
}
注意执行完一个队列,需要返回一个false