swoole_process实现进程池的方法示例
swoole——重新定义PHP
swoole的进程之间有两种通信方式,一种是消息队列(queue),另一种是管道(pipe),对swoole_process的研究在swoole中显得尤为重要。
预备知识
IO多路复用
swoole中的io多路复用表现为底层的epoll进程模型,在C语言中表现为epoll函数。
- epoll模型下会持续监听自己名下的素有socket描述符fd
- 当触发了socket监听的事件时,epoll函数才会响应,并返回所有监听该时间的socket集合
- epoll的本质是阻塞IO,它的优点在于能同事处理大量socket连接
Eventloop事件循环
swoole对epoll实现了一个Reactor线程模型封装,设置了read事件和write事件的监听回调函数。(详见swoole_event_add)
- Eventloop是一个Reactor线程,其中运行了一个epoll实例。
- 通过swoole_event_add将socket描述符的一个事件添加到epoll监听中,事件发生时将执行回调函数
- 不可用于fpm环境下,因为fpm在任务结束时可能会关掉进程。
swoole_process
- 基于C语言封装的进程管理模块,方便php来调用
- 内置管道、消息队列接口,方便实现进程间通信
我们在php-fpm.conf配置文件中发现,php-fpm中有两种进程池管理设置。
- 静态模式即初始化固定的进程数,当来了一个请求时,从中选取一个进程来处理。
- 动态模式指定最小、最大进程数,当请求量过大,进程数不超过最大限制时,新增线程去处理请求
接下来用swoole代码来实现,这里只是为理解swoole_process、进程间通信、定时器等使用,实际情况使用封装好的swoole_server来实现task任务队列池会更方便。
假如有个定时投递的任务队列:
pool=newswoole_process(function(){
//循环建立worker进程
for($i=0;$i<$this->start_worker_num;$i++){
$this->createWorker();
}
echo'初始化进程数:'.$this->curr_num.PHP_EOL;
//每秒定时往闲置的worker的管道中投递任务
swoole_timer_tick(1000,function($timer_id){
static$count=0;
$count++;
$need_create=true;
foreach($this->used_workersas$pid=>$used){
if($used==0){
$need_create=false;
$this->workers[$pid]->write($count.'job');
//标记使用中
$this->used_workers[$pid]=1;
$this->active_time[$pid]=time();
break;
}
}
foreach($this->used_workersas$pid=>$used)
//如果所有worker队列都没有闲置的,则新建一个worker来处理
if($need_create&&$this->curr_num<$this->max_woker_num){
$new_pid=$this->createWorker();
$this->workers[$new_pid]->write($count.'job');
$this->used_workers[$new_pid]=1;
$this->active_time[$new_pid]=time();
}
//闲置超过一段时间则销毁进程
foreach($this->active_timeas$pid=>$timestamp){
if((time()-$timestamp)>$this->idle_seconds&&$this->curr_num>$this->min_woker_num){
//销毁该进程
if(isset($this->workers[$pid])&&$this->workers[$pid]instanceofswoole_process){
$this->workers[$pid]->write('exit');
unset($this->workers[$pid]);
$this->curr_num=count($this->workers);
unset($this->used_workers[$pid]);
unset($this->active_time[$pid]);
echo"{$pid}destroyed\n";
break;
}
}
}
echo"任务{$count}/{$this->curr_num}\n";
if($count==20){
foreach($this->workersas$pid=>$worker){
$worker->write('exit');
}
//关闭定时器
swoole_timer_clear($timer_id);
//退出进程池
$this->pool->exit(0);
exit();
}
});
});
$master_pid=$this->pool->start();
echo"Master$master_pidstart\n";
while($ret=swoole_process::wait()){
$pid=$ret['pid'];
echo"process{$pid}existed\n";
}
}
/**
*创建一个新进程
*@returnint新进程的pid
*/
publicfunctioncreateWorker()
{
$worker_process=newswoole_process(function(swoole_process$worker){
//给子进程管道绑定事件
swoole_event_add($worker->pipe,function($pipe)use($worker){
$data=trim($worker->read());
if($data=='exit'){
$worker->exit(0);
exit();
}
echo"{$worker->pid}正在处理{$data}\n";
sleep(5);
//返回结果,表示空闲
$worker->write("complete");
});
});
$worker_pid=$worker_process->start();
//给父进程管道绑定事件
swoole_event_add($worker_process->pipe,function($pipe)use($worker_process){
$data=trim($worker_process->read());
if($data=='complete'){
//标记为空闲
//echo"{$worker_process->pid}空闲了\n";
$this->used_workers[$worker_process->pid]=0;
}
});
//保存process对象
$this->workers[$worker_pid]=$worker_process;
//标记为空闲
$this->used_workers[$worker_pid]=0;
$this->active_time[$worker_pid]=time();
$this->curr_num=count($this->workers);
return$worker_pid;
}
}
newprocessPool();
以上就是本文的全部内容,希望对大家的学习有所帮助,也希望大家多多支持毛票票。