queue_id | queue_title | status | start_time | worker | cursor_id | params | runkey |
---|
queue_id 队列ID 在运行队列的时候是唯一标识 queue_title 队列标题,直观区分队列 status 队列状态(running/运行中,hibernate/休眠中,paused/已暂停,failure/执行失败) start_time 队列产生时间 worker 在执行队列的时候会根据worker这个字段表示该队列在哪个方法执行 cursor_id 游标--对于一个队列中有多个任务需要用到此字段 params 参数(参数,通常就是filter) runkey 队列运行的Key,通常不需要自定义
http://localhost/ecstore-bugfix/index.php/shopadmin/index.php?ctl=default&act=status
在desktop_ctl_default的status方法中会调
function status(){ ... app::get('base')->model('queue')->flush(); ... }
<?php
class base_mdl_queue extends base_db_model{
var $limit = 100; //最大任务并发
var $task_timeout = 300; //单次任务超时
function flush(){
$base_url = kernel::base_url();
foreach($this->db->select('select queue_id from sdb_base_queue limit '.$this->limit) as $r){
$this->runtask($r['queue_id']);
}
}
function runtask($task_id){
$http = new base_httpclient;
$_POST['task_id'] = $task_id;
$url = kernel::openapi_url('openapi.queue','worker',array('task_id'=>$task_id));
kernel::log('Spawn [Task-'.$task_id.'] '.$url);
//99%概率不会有问题
$http->hostaddr = $_SERVER["SERVER_ADDR"]?$_SERVER["SERVER_ADDR"]:'127.0.0.1';
$http->hostport = $_SERVER["SERVER_PORT"]?$_SERVER["SERVER_PORT"]:'80';
$http->timeout = 2;
kernel::log($http->post($url,$_POST));
}
}
从代码中可以看到在这获取到存放在队列表中的队列任务,进行运行
根据 kernel::openapi_url('openapi.queue','worker',array('task_id'=>$task_id)); 知道调用了base_service_queue类的worker方法
function worker(){ ... list($worker,$method) = explode('.',$task_info['worker']); $errmsg = null; $obj_work = kernel::single($worker); $remaining = call_user_func_array( array( $obj_work ,$method),array(&$task_info['cursor_id'],$task_info['params'], &$errmsg)); ... }
从以上代码可以看出:执行队列的方法是队列worker字段的值。
eg
'worker'=>'b2c_queue.send_msg',表示在b2c_queue类中的send_msg方法中执行此队列
从worker方法中可以看出,在向执行队列的方法中会传入cursor_id,params两个参数
如果在一个队列中只有一个任务,如上则在执行完一个任务后需要返回一个false,则会删除此队列.
如果有一个队列中有多个任务,则会根据游标(cursor_id)进行完成任务,只要队列还没执行完成,
返回true则会跟新游标(cursor_id)直到返回false,再删除队列
如下是ECOS后台中群发消息的一个队列
在ECOS中每个队列都是放在sdb_base_queue这个队列表中,所以在插入队列的时候需要把队列信息插入到
队列表中即可
//可循环插入队列 $data = array( 'queue_title'=>app::get('b2c')->_('到货通知站内信'), 'start_time'=>time(), 'params'=>array( 'member_id'=>$data['member_id'], 'data' =>$aTmp, 'name' => $login_name, 'gnotify_id' => $gnid, ), 'worker'=>'b2c_queue.send_msg', ); if(!$queue->insert($data)){ $this->end(false,app::get('b2c')->_('操作失败!')); }
新建b2c_queue类,写send_msg方法来执行此队列
function send_msg(&$cursor_id,$params){ $obj_memmsg = kernel::single('b2c_message_msg'); $aData = $params['data']; $aData['member_id'] = 1; $aData['uname'] = app::get('b2c')->_('管理员'); $aData['to_id'] = $params['member_id']; $aData['msg_to'] = $params['name']; $aData['subject'] = $aData['title']; $aData['comment'] = $aData['content']; $aData['has_sent'] = 'true'; $obj_memmsg->send($aData); }
注意执行完一个队列,需要返回一个false