swoole_process实现进程池的方法示例
swoole——重新定义PHP
swoole的进程之间有两种通信方式,一种是消息队列(queue),另一种是管道(pipe),对swoole_process的研究在swoole中显得尤为重要。
预备知识
IO多路复用
swoole中的io多路复用表现为底层的epoll进程模型,在C语言中表现为epoll函数。
- epoll模型下会持续监听自己名下的素有socket描述符fd
- 当触发了socket监听的事件时,epoll函数才会响应,并返回所有监听该时间的socket集合
- epoll的本质是阻塞IO,它的优点在于能同事处理大量socket连接
Eventloop事件循环
swoole对epoll实现了一个Reactor线程模型封装,设置了read事件和write事件的监听回调函数。(详见swoole_event_add)
- Eventloop是一个Reactor线程,其中运行了一个epoll实例。
- 通过swoole_event_add将socket描述符的一个事件添加到epoll监听中,事件发生时将执行回调函数
- 不可用于fpm环境下,因为fpm在任务结束时可能会关掉进程。
swoole_process
- 基于C语言封装的进程管理模块,方便php来调用
- 内置管道、消息队列接口,方便实现进程间通信
我们在php-fpm.conf配置文件中发现,php-fpm中有两种进程池管理设置。
- 静态模式即初始化固定的进程数,当来了一个请求时,从中选取一个进程来处理。
- 动态模式指定最小、最大进程数,当请求量过大,进程数不超过最大限制时,新增线程去处理请求
接下来用swoole代码来实现,这里只是为理解swoole_process、进程间通信、定时器等使用,实际情况使用封装好的swoole_server来实现task任务队列池会更方便。
假如有个定时投递的任务队列:
pool=newswoole_process(function(){ //循环建立worker进程 for($i=0;$i<$this->start_worker_num;$i++){ $this->createWorker(); } echo'初始化进程数:'.$this->curr_num.PHP_EOL; //每秒定时往闲置的worker的管道中投递任务 swoole_timer_tick(1000,function($timer_id){ static$count=0; $count++; $need_create=true; foreach($this->used_workersas$pid=>$used){ if($used==0){ $need_create=false; $this->workers[$pid]->write($count.'job'); //标记使用中 $this->used_workers[$pid]=1; $this->active_time[$pid]=time(); break; } } foreach($this->used_workersas$pid=>$used) //如果所有worker队列都没有闲置的,则新建一个worker来处理 if($need_create&&$this->curr_num<$this->max_woker_num){ $new_pid=$this->createWorker(); $this->workers[$new_pid]->write($count.'job'); $this->used_workers[$new_pid]=1; $this->active_time[$new_pid]=time(); } //闲置超过一段时间则销毁进程 foreach($this->active_timeas$pid=>$timestamp){ if((time()-$timestamp)>$this->idle_seconds&&$this->curr_num>$this->min_woker_num){ //销毁该进程 if(isset($this->workers[$pid])&&$this->workers[$pid]instanceofswoole_process){ $this->workers[$pid]->write('exit'); unset($this->workers[$pid]); $this->curr_num=count($this->workers); unset($this->used_workers[$pid]); unset($this->active_time[$pid]); echo"{$pid}destroyed\n"; break; } } } echo"任务{$count}/{$this->curr_num}\n"; if($count==20){ foreach($this->workersas$pid=>$worker){ $worker->write('exit'); } //关闭定时器 swoole_timer_clear($timer_id); //退出进程池 $this->pool->exit(0); exit(); } }); }); $master_pid=$this->pool->start(); echo"Master$master_pidstart\n"; while($ret=swoole_process::wait()){ $pid=$ret['pid']; echo"process{$pid}existed\n"; } } /** *创建一个新进程 *@returnint新进程的pid */ publicfunctioncreateWorker() { $worker_process=newswoole_process(function(swoole_process$worker){ //给子进程管道绑定事件 swoole_event_add($worker->pipe,function($pipe)use($worker){ $data=trim($worker->read()); if($data=='exit'){ $worker->exit(0); exit(); } echo"{$worker->pid}正在处理{$data}\n"; sleep(5); //返回结果,表示空闲 $worker->write("complete"); }); }); $worker_pid=$worker_process->start(); //给父进程管道绑定事件 swoole_event_add($worker_process->pipe,function($pipe)use($worker_process){ $data=trim($worker_process->read()); if($data=='complete'){ //标记为空闲 //echo"{$worker_process->pid}空闲了\n"; $this->used_workers[$worker_process->pid]=0; } }); //保存process对象 $this->workers[$worker_pid]=$worker_process; //标记为空闲 $this->used_workers[$worker_pid]=0; $this->active_time[$worker_pid]=time(); $this->curr_num=count($this->workers); return$worker_pid; } } newprocessPool();
以上就是本文的全部内容,希望对大家的学习有所帮助,也希望大家多多支持毛票票。