PHP编程中尝试程序并发的几种方式总结
本文大约总结了PHP编程中的五种并发方式:
1.curl_multi_init
文档中说的是“Allows the processing of multiple cURL handles asynchronously.”，确实是异步。这里需要理解的是 select（即 curl_multi_select）这个方法，文档中是这么解释的：“Blocks until there is activity on any of the curl_multi connections.”。了解一下常见的异步模型就应该能理解，select、epoll 都很有名。
<?php
// Build the individual requests, but do not execute them yet.
$ch_1 = curl_init('https://www.nhooo.com/');
$ch_2 = curl_init('https://www.nhooo.com/');
curl_setopt($ch_1, CURLOPT_RETURNTRANSFER, true);
curl_setopt($ch_2, CURLOPT_RETURNTRANSFER, true);

// Build the multi-curl handle and register both requests on it.
$mh = curl_multi_init();
curl_multi_add_handle($mh, $ch_1);
curl_multi_add_handle($mh, $ch_2);

// Drive both transfers concurrently and stop as soon as the first one has
// completed; the remaining transfer is abandoned when its handle is removed.
$running = null;
do {
    $status = curl_multi_exec($mh, $running);
    if ($status !== CURLM_OK && $status !== CURLM_CALL_MULTI_PERFORM) {
        echo curl_multi_strerror($status) . "\n";
        break;
    }

    // Look at the message queue after every exec call, independently of what
    // select reports: a transfer can complete inside curl_multi_exec(), in
    // which case select has no activity left to signal.
    $info = curl_multi_info_read($mh);
    if ($info) {
        var_dump($info);
        $response_1 = curl_multi_getcontent($info['handle']);
        echo "$response_1\n";
        break;
    }

    // Block until there is activity on one of the connections. A return value
    // of -1 means select failed; pause briefly so the loop does not spin.
    if ($running > 0 && curl_multi_select($mh) === -1) {
        usleep(1000);
    }
} while ($running > 0);

// Detach the easy handles and release the multi handle.
curl_multi_remove_handle($mh, $ch_1);
curl_multi_remove_handle($mh, $ch_2);
curl_multi_close($mh);
这里我设置的是：只要 select 得到结果，就退出循环，并且移除 curl resource，从而达到取消 HTTP 请求的目的。
2.swoole_client
swoole_client提供了异步模式,我竟然把这个忘了。这里的sleep方法需要swoole版本大于等于1.7.21,我还没升到这个版本,所以直接exit也可以。
<?php
// Asynchronous TCP client: all work happens in the callbacks registered below.
$client = new swoole_client(SWOOLE_SOCK_TCP, SWOOLE_SOCK_ASYNC);

// Register the event callbacks.
$client->on("connect", function ($cli) {
    // Every header line ends with exactly CRLF and the header block ends with
    // an empty line. The request is built by concatenation so that no raw
    // newline from the source file leaks into the bytes sent on the wire.
    $req = "GET / HTTP/1.1\r\n"
        . "Host: www.nhooo.com\r\n"
        . "Connection: keep-alive\r\n"
        . "Cache-Control: no-cache\r\n"
        . "Pragma: no-cache\r\n\r\n";

    // Send the same request three times over the single connection.
    for ($i = 0; $i < 3; $i++) {
        $cli->send($req);
    }
});

$client->on("receive", function ($cli, $data) {
    echo "Received:" . $data . "\n";
    // Stop at the first chunk of data by terminating the script. With
    // swoole >= 1.7.21, $cli->sleep() is an alternative that stops receiving
    // without exiting the process.
    exit(0);
});

$client->on("error", function ($cli) {
    echo "Connectfailed\n";
});

$client->on("close", function ($cli) {
    echo "Connectionclose\n";
});

// Start the network connection (third argument is the timeout passed to connect).
$client->connect('183.207.95.145', 80, 1);
3.process
哎,竟然差点忘了swoole_process,这里就不用pcntl模块了。但是写完发现,这其实也不算是中断请求,而是哪个先到读哪个,忽视后面的返回值。
<?php
$workers = [];
$worker_num = 3; // number of child processes to create
$finished = false;
$lock = new swoole_lock(SWOOLE_MUTEX);

// Start the children; each one runs process() and writes its result to its pipe.
for ($i = 0; $i < $worker_num; $i++) {
    $process = new swoole_process('process');
    $pid = $process->start();
    $workers[$pid] = $process;
}

foreach ($workers as $pid => $process) {
    // Child processes would also contain this event.
    swoole_event_add($process->pipe, function ($pipe) use ($process, $lock, &$finished) {
        // Always consume the message and stop watching the pipe. Leaving data
        // unread (or leaving a finished pipe registered) would keep this
        // callback firing for a result that is going to be ignored anyway.
        $data = $process->read();
        swoole_event_del($pipe);

        // Only the first result to arrive is reported; later ones are dropped.
        $lock->lock();
        if (!$finished) {
            $finished = true;
            echo "RECV:" . $data . PHP_EOL;
        }
        $lock->unlock();
    });
}

/**
 * Child-process entry point: writes a canned response to the parent through
 * the process pipe and prints the child's pid and callback name.
 *
 * @param swoole_process $process The process object this callback runs in.
 */
function process(swoole_process $process)
{
    $response = 'httpresponse';
    $process->write($response);
    echo $process->pid, "\t", $process->callback . PHP_EOL;
}

// Reap every child so none is left behind as a zombie.
for ($i = 0; $i < $worker_num; $i++) {
    $ret = swoole_process::wait();
    if (!$ret) {
        // No child left to wait for.
        break;
    }
    $pid = $ret['pid'];
    echo "WorkerExit,PID=" . $pid . PHP_EOL;
}
4.pthreads
编译 pthreads 模块时，提示 PHP 编译时必须打开 ZTS，所以貌似必须是 thread safe 版本才能使用。wamp 中自带的多个 PHP 版本正好是 TS 的，于是直接下了个 dll，按文档中的说明复制到对应目录，就在 Windows 下测试了。还没完全理解，查到文章说 PHP 的 pthreads 和 POSIX pthreads 是完全不一样的。代码有些烂，还需要多看看文档，体会一下。
<?php
/**
 * Object shared between the worker threads: holds the target URL and the slot
 * the workers write their response into.
 */
class Foo extends Stackable
{
    /** @var string URL associated with this job. */
    public $url;

    /** @var string|null Response written by a worker; null until one has been stored. */
    public $response = null;

    public function __construct()
    {
        $this->url = 'https://www.nhooo.com';
    }

    /**
     * Intentionally empty: the object is only used as shared state here.
     */
    public function run()
    {
    }
}
/**
 * Worker thread that fills in the shared object's response if nobody has yet.
 */
class Process extends Worker
{
    /** @var string Label printed while the thread is running. */
    private $text = "";

    /** @var object Shared object whose `response` property is read and written. */
    public $object;

    /**
     * @param string $text   Label identifying this thread in the output.
     * @param object $object Shared object exposing a public `response` property.
     */
    public function __construct($text, $object)
    {
        $this->text = $text;
        $this->object = $object;
    }

    /**
     * Stores a response on the shared object while it is still null.
     *
     * The null check and the assignment are not synchronized, so more than
     * one thread may pass the check and write.
     */
    public function run()
    {
        while (is_null($this->object->response)) {
            print "Thread{$this->text}isrunning\n";
            $this->object->response = 'httpresponse';
            sleep(1);
        }
    }
}
// Start two workers that share the same Foo instance.
$foo = new Foo();
$a = new Process("A", $foo);
$a->start();
$b = new Process("B", $foo);
$b->start();

// Wait for both workers to finish before reading the shared result;
// echoing right after start() would race with the threads.
$a->shutdown();
$b->shutdown();

echo $foo->response;
5.yield
以同步方式书写异步代码:
<?php
/**
 * Single-threaded UDP server that drives generator-based request handlers.
 *
 * For every datagram received on the listening socket the handler is invoked.
 * If it returns a generator, the first yielded task (an object exposing
 * data, len, ip, port and timeout) is sent to its destination over a fresh
 * UDP socket. The reply is later passed back into the generator with send(),
 * or an Exception("Timeout") is thrown into it once the deadline has passed.
 *
 * Only the first value yielded by a handler is scheduled; this loop does not
 * inspect the generator again after resuming it.
 */
class AsyncServer
{
    /** @var callable Request handler, called as ($socket, $data, $len, $ip, $port). */
    protected $handler;

    /** @var resource|object Listening UDP socket. */
    protected $socket;

    /** @var array Pending tasks: socket key => [socket, generator, deadline]. */
    protected $tasks = [];

    /** @var array Deadlines: millisecond timestamp => [socket key => socket key]. */
    protected $timers = [];

    /**
     * Creates the non-blocking listening socket and binds it.
     * Terminates the script with the socket error message on failure.
     *
     * @param callable $handler Request handler.
     * @param string   $address Local address to bind to.
     * @param int      $port    Local UDP port to bind to.
     */
    public function __construct(callable $handler, $address = "0.0.0.0", $port = 1234)
    {
        $this->handler = $handler;
        $this->socket = socket_create(AF_INET, SOCK_DGRAM, SOL_UDP);
        if (!$this->socket) {
            die(socket_strerror(socket_last_error()) . "\n");
        }
        if (!socket_set_nonblock($this->socket)) {
            die(socket_strerror(socket_last_error()) . "\n");
        }
        if (!socket_bind($this->socket, $address, $port)) {
            die(socket_strerror(socket_last_error()) . "\n");
        }
    }

    /**
     * Runs the event loop forever: expires overdue tasks, waits up to one
     * millisecond for readable sockets, then dispatches each datagram either
     * as a new request or as the reply to a pending task.
     */
    public function Run()
    {
        while (true) {
            $now = microtime(true) * 1000;
            $this->expireTasks($now);

            // Watch the listening socket plus the socket of every pending task.
            $reads = [$this->socket];
            foreach ($this->tasks as $entry) {
                $reads[] = $entry[0];
            }
            $writes = null;
            $excepts = null;
            if (!socket_select($reads, $writes, $excepts, 0, 1000)) {
                continue;
            }

            foreach ($reads as $one) {
                $len = socket_recvfrom($one, $data, 65535, 0, $ip, $port);
                if (!$len) {
                    continue;
                }
                // Identity comparison: loose equality is not a reliable way to
                // tell two distinct socket objects apart.
                if ($one === $this->socket) {
                    $this->startRequest($one, $data, $len, $ip, $port, $now);
                } else {
                    $this->completeTask($one, $data, $len);
                }
            }
        }
    }

    /**
     * Throws Exception("Timeout") into every task whose deadline has passed.
     *
     * @param float $now Current time in milliseconds.
     */
    private function expireTasks($now)
    {
        foreach ($this->timers as $time => $keys) {
            // Deadlines are stored in insertion order, not sorted, so a later
            // entry may still be due: skip this one instead of stopping.
            if ($time > $now) {
                continue;
            }
            foreach ($keys as $key) {
                if (!isset($this->tasks[$key])) {
                    continue;
                }
                list($socket, $coroutine) = $this->tasks[$key];
                unset($this->tasks[$key]);
                socket_close($socket);
                $coroutine->throw(new Exception("Timeout"));
            }
            unset($this->timers[$time]);
        }
    }

    /**
     * Invokes the handler for a new request and, if it yields a task, sends
     * the task's payload and registers the task together with its deadline.
     *
     * @param resource|object $client Listening socket the request arrived on.
     * @param string          $data   Request payload.
     * @param int             $len    Payload length in bytes.
     * @param string          $ip     Sender address.
     * @param int             $port   Sender port.
     * @param float           $now    Current time in milliseconds.
     */
    private function startRequest($client, $data, $len, $ip, $port, $now)
    {
        $handler = $this->handler;
        $coroutine = $handler($client, $data, $len, $ip, $port);
        // Nothing to schedule when the handler is not a generator or finished
        // without yielding a task.
        if (!($coroutine instanceof Generator) || !$coroutine->valid()) {
            return;
        }
        $task = $coroutine->current();

        $socket = socket_create(AF_INET, SOCK_DGRAM, SOL_UDP);
        if (!$socket) {
            $coroutine->throw(new Exception(socket_strerror(socket_last_error()), socket_last_error()));
            return;
        }
        if (!socket_set_nonblock($socket)) {
            $error = socket_last_error();
            // Release the socket that was just created before reporting the failure.
            socket_close($socket);
            $coroutine->throw(new Exception(socket_strerror($error), $error));
            return;
        }
        socket_sendto($socket, $task->data, $task->len, 0, $task->ip, $task->port);

        $key = self::socketKey($socket);
        // Array keys must be integers or strings, so the deadline is truncated
        // to whole milliseconds explicitly.
        $deadline = (int) ($now + $task->timeout);
        $this->tasks[$key] = [$socket, $coroutine, $deadline];
        $this->timers[$deadline][$key] = $key;
    }

    /**
     * Hands a received reply to the generator waiting on the given socket and
     * forgets the task.
     *
     * @param resource|object $socket Socket the reply arrived on.
     * @param string          $data   Reply payload.
     * @param int             $len    Payload length in bytes.
     */
    private function completeTask($socket, $data, $len)
    {
        $key = self::socketKey($socket);
        if (!isset($this->tasks[$key])) {
            return;
        }
        list($taskSocket, $coroutine, $deadline) = $this->tasks[$key];
        unset($this->tasks[$key], $this->timers[$deadline][$key]);
        if (empty($this->timers[$deadline])) {
            unset($this->timers[$deadline]);
        }
        socket_close($taskSocket);
        $coroutine->send([$data, $len]);
    }

    /**
     * Returns an integer usable as an array key for a socket, whether the
     * sockets extension represents it as a resource or as an object.
     *
     * @param resource|object $socket
     * @return int
     */
    private static function socketKey($socket)
    {
        return is_object($socket) ? spl_object_id($socket) : (int) $socket;
    }
}
/**
 * Description of one outgoing request: the payload, where to send it and how
 * long to wait for the reply.
 */
class AsyncTask
{
    /** @var string Payload to send. */
    public $data;

    /** @var int Length of the payload. */
    public $len;

    /** @var string Destination address. */
    public $ip;

    /** @var int Destination port. */
    public $port;

    /** @var int|float Timeout value for the reply. */
    public $timeout;

    /**
     * @param string    $data    Payload to send.
     * @param int       $len     Length of the payload.
     * @param string    $ip      Destination address.
     * @param int       $port    Destination port.
     * @param int|float $timeout Timeout value for the reply.
     */
    public function __construct($data, $len, $ip, $port, $timeout)
    {
        $this->data = $data;
        $this->len = $len;
        $this->ip = $ip;
        $this->port = $port;
        $this->timeout = $timeout;
    }
}
/**
 * Builds the task object that a handler yields to request a send/receive
 * round trip.
 *
 * @param string    $req_buf Payload to send.
 * @param int       $req_len Length of the payload.
 * @param string    $ip      Destination address.
 * @param int       $port    Destination port.
 * @param int|float $timeout Timeout value for the reply.
 * @return AsyncTask
 */
function AsyncSendRecv($req_buf, $req_len, $ip, $port, $timeout)
{
    return new AsyncTask($req_buf, $req_len, $ip, $port, $timeout);
}
/**
 * Generator-based handler: forwards the request to 127.0.0.1:2345, waits for
 * the value sent back into the generator and relays it to the original sender.
 * If an exception is thrown into the generator instead, its message is sent
 * back as the reply.
 *
 * @param resource|object $socket  Socket used to answer the original sender.
 * @param string          $req_buf Request payload.
 * @param int             $req_len Length of the request payload.
 * @param string          $ip      Address of the original sender.
 * @param int             $port    Port of the original sender.
 * @return Generator Yields one AsyncTask and expects [payload, length] in return.
 */
function RequestHandler($socket, $req_buf, $req_len, $ip, $port)
{
    try {
        list($rsp_buf, $rsp_len) = (yield AsyncSendRecv($req_buf, $req_len, "127.0.0.1", 2345, 3000));
    } catch (Exception $ex) {
        // Report the failure to the caller in place of the real reply.
        $rsp_buf = $ex->getMessage();
        $rsp_len = strlen($rsp_buf);
    }
    socket_sendto($socket, $rsp_buf, $rsp_len, 0, $ip, $port);
}
// Pass the handler by name: a bare RequestHandler would be parsed as a
// constant lookup rather than a reference to the function.
$server = new AsyncServer('RequestHandler');
$server->Run();
?>
代码解读:
借助PHP内置array能力,实现简单的“超时管理”,以毫秒为精度作为时间分片;
封装 AsyncSendRecv 接口，调用形如 yield AsyncSendRecv()，更加自然；
添加Exception作为错误处理机制,添加ret_code亦可,仅为展示之用。