PHP编程中尝试程序并发的几种方式总结
本文大约总结了PHP编程中的五种并发方式:
1.curl_multi_init
文档中说的是AllowstheprocessingofmultiplecURLhandlesasynchronously.确实是异步。这里需要理解的是select这个方法,文档中是这么解释的Blocksuntilthereisactivityonanyofthecurl_multiconnections.。了解一下常见的异步模型就应该能理解,select,epoll,都很有名
<?php //buildtheindividualrequestsasabove,butdonotexecutethem $ch_1=curl_init('https://www.nhooo.com/'); $ch_2=curl_init('https://www.nhooo.com/'); curl_setopt($ch_1,CURLOPT_RETURNTRANSFER,true); curl_setopt($ch_2,CURLOPT_RETURNTRANSFER,true); //buildthemulti-curlhandle,addingboth$ch $mh=curl_multi_init(); curl_multi_add_handle($mh,$ch_1); curl_multi_add_handle($mh,$ch_2); //executeallqueriessimultaneously,andcontinuewhenallarecomplete $running=null; do{ curl_multi_exec($mh,$running); $ch=curl_multi_select($mh); if($ch!==0){ $info=curl_multi_info_read($mh); if($info){ var_dump($info); $response_1=curl_multi_getcontent($info['handle']); echo"$response_1\n"; break; } } }while($running>0); //closethehandles curl_multi_remove_handle($mh,$ch_1); curl_multi_remove_handle($mh,$ch_2); curl_multi_close($mh);
这里我设置的是,select得到结果,就退出循环,并且删除curlresource,从而达到取消http请求的目的。
2.swoole_client
swoole_client提供了异步模式,我竟然把这个忘了。这里的sleep方法需要swoole版本大于等于1.7.21,我还没升到这个版本,所以直接exit也可以。
<?php $client=newswoole_client(SWOOLE_SOCK_TCP,SWOOLE_SOCK_ASYNC); //设置事件回调函数 $client->on("connect",function($cli){ $req="GET/HTTP/1.1\r\n Host:www.nhooo.com\r\n Connection:keep-alive\r\n Cache-Control:no-cache\r\n Pragma:no-cache\r\n\r\n"; for($i=0;$i<3;$i++){ $cli->send($req); } }); $client->on("receive",function($cli,$data){ echo"Received:".$data."\n"; exit(0); $cli->sleep();//swoole>=1.7.21 }); $client->on("error",function($cli){ echo"Connectfailed\n"; }); $client->on("close",function($cli){ echo"Connectionclose\n"; }); //发起网络连接 $client->connect('183.207.95.145',80,1);
3.process
哎,竟然差点忘了swoole_process,这里就不用pcntl模块了。但是写完发现,这其实也不算是中断请求,而是哪个先到读哪个,忽视后面的返回值。
<?php $workers=[]; $worker_num=3;//创建的进程数 $finished=false; $lock=newswoole_lock(SWOOLE_MUTEX); for($i=0;$i<$worker_num;$i++){ $process=newswoole_process('process'); //$process->useQueue(); $pid=$process->start(); $workers[$pid]=$process; } foreach($workersas$pid=>$process){ //子进程也会包含此事件 swoole_event_add($process->pipe,function($pipe)use($process,$lock,&$finished){ $lock->lock(); if(!$finished){ $finished=true; $data=$process->read(); echo"RECV:".$data.PHP_EOL; } $lock->unlock(); }); } functionprocess(swoole_process$process){ $response='httpresponse'; $process->write($response); echo$process->pid,"\t",$process->callback.PHP_EOL; } for($i=0;$i<$worker_num;$i++){ $ret=swoole_process::wait(); $pid=$ret['pid']; echo"WorkerExit,PID=".$pid.PHP_EOL; }
4.pthreads
编译pthreads模块时,提示php编译时必须打开ZTS,所以貌似必须threadsafe版本才能使用.wamp中多php正好是TS的,直接下了个dll,文档中的说明复制到对应目录,就在win下测试了。还没完全理解,查到文章说php的pthreads和POSIXpthreads是完全不一样的。代码有些烂,还需要多看看文档,体会一下。
<?php classFooextendsStackable{ public$url; public$response=null; publicfunction__construct(){ $this->url='https://www.nhooo.com'; } publicfunctionrun(){} } classProcessextendsWorker{ private$text=""; publicfunction__construct($text,$object){ $this->text=$text; $this->object=$object; } publicfunctionrun(){ while(is_null($this->object->response)){ print"Thread{$this->text}isrunning\n"; $this->object->response='httpresponse'; sleep(1); } } } $foo=newFoo(); $a=newProcess("A",$foo); $a->start(); $b=newProcess("B",$foo); $b->start(); echo$foo->response;
5.yield
以同步方式书写异步代码:
<?php classAsyncServer{ protected$handler; protected$socket; protected$tasks=[]; protected$timers=[]; publicfunction__construct(callable$handler){ $this->handler=$handler; $this->socket=socket_create(AF_INET,SOCK_DGRAM,SOL_UDP); if(!$this->socket){ die(socket_strerror(socket_last_error())."\n"); } if(!socket_set_nonblock($this->socket)){ die(socket_strerror(socket_last_error())."\n"); } if(!socket_bind($this->socket,"0.0.0.0",1234)){ die(socket_strerror(socket_last_error())."\n"); } } publicfunctionRun(){ while(true){ $now=microtime(true)*1000; foreach($this->timersas$time=>$sockets){ if($time>$now)break; foreach($socketsas$one){ list($socket,$coroutine)=$this->tasks[$one]; unset($this->tasks[$one]); socket_close($socket); $coroutine->throw(newException("Timeout")); } unset($this->timers[$time]); } $reads=array($this->socket); foreach($this->tasksaslist($socket)){ $reads[]=$socket; } $writes=NULL; $excepts=NULL; if(!socket_select($reads,$writes,$excepts,0,1000)){ continue; } foreach($readsas$one){ $len=socket_recvfrom($one,$data,65535,0,$ip,$port); if(!$len){ //echo"socket_recvfromfail.\n"; continue; } if($one==$this->socket){ //echo"[Run]requestrecvfromsucc.data=$dataip=$ipport=$port\n"; $handler=$this->handler; $coroutine=$handler($one,$data,$len,$ip,$port); if(!$coroutine){ //echo"[Run]everythingisdone.\n"; continue; } $task=$coroutine->current(); //echo"[Run]AsyncTaskrecv.data=$task->dataip=$task->ipport=$task->porttimeout=$task->timeout\n"; $socket=socket_create(AF_INET,SOCK_DGRAM,SOL_UDP); if(!$socket){ //echosocket_strerror(socket_last_error())."\n"; $coroutine->throw(newException(socket_strerror(socket_last_error()),socket_last_error())); continue; } if(!socket_set_nonblock($socket)){ //echosocket_strerror(socket_last_error())."\n"; $coroutine->throw(newException(socket_strerror(socket_last_error()),socket_last_error())); continue; } socket_sendto($socket,$task->data,$task->len,0,$task->ip,$task->port); $deadline=$now+$task->timeout; $this->tasks[$socket]=[$socket,$coroutine,$deadline]; $this->timers[$deadline][$socket]=$socket; }else{ //echo"[Run]responserecvfromsucc.data=$dataip=$ipport=$port\n"; list($socket,$coroutine,$deadline)=$this->tasks[$one]; unset($this->tasks[$one]); unset($this->timers[$deadline][$one]); socket_close($socket); $coroutine->send(array($data,$len)); } } } } } classAsyncTask{ public$data; public$len; public$ip; public$port; public$timeout; publicfunction__construct($data,$len,$ip,$port,$timeout){ $this->data=$data; $this->len=$len; $this->ip=$ip; $this->port=$port; $this->timeout=$timeout; } } functionAsyncSendRecv($req_buf,$req_len,$ip,$port,$timeout){ returnnewAsyncTask($req_buf,$req_len,$ip,$port,$timeout); } functionRequestHandler($socket,$req_buf,$req_len,$ip,$port){ //echo"[RequestHandler]beforeyieldAsyncTask.REQ=$req_buf\n"; try{ list($rsp_buf,$rsp_len)=(yieldAsyncSendRecv($req_buf,$req_len,"127.0.0.1",2345,3000)); }catch(Exception$ex){ $rsp_buf=$ex->getMessage(); $rsp_len=strlen($rsp_buf); //echo"[Exception]$rsp_buf\n"; } //echo"[RequestHandler]afteryieldAsyncTask.RSP=$rsp_buf\n"; socket_sendto($socket,$rsp_buf,$rsp_len,0,$ip,$port); } $server=newAsyncServer(RequestHandler); $server->Run(); ?>
代码解读:
借助PHP内置array能力,实现简单的“超时管理”,以毫秒为精度作为时间分片;
封装AsyncSendRecv接口,调用形如yieldAsyncSendRecv(),更加自然;
添加Exception作为错误处理机制,添加ret_code亦可,仅为展示之用。