nginx源码分析线程池详解
nginx源码分析线程池详解
一、前言
nginx是采用多进程模型,master和worker之间主要通过pipe管道的方式进行通信,多进程的优势就在于各个进程互不影响。但是经常会有人问道,nginx为什么不采用多线程模型(这个除了之前一篇文章讲到的情况,别的只有去问作者了,HAHA)。其实,nginx代码中提供了一个thread_pool(线程池)的核心模块来处理多任务的。下面就本人对该thread_pool这个模块的理解来跟大家做些分享(文中错误、不足还请大家指出,谢谢)
二、thread_pool线程池模块介绍
nginx的主要功能都是由一个个模块构成的,thread_pool也不例外。线程池主要用于读取、发送文件等IO操作,避免慢速IO影响worker的正常运行。先引用一段官方的配置示例
Syntax:thread_poolnamethreads=number[max_queue=number]; Default:thread_pooldefaultthreads=32max_queue=65536; Context:main
根据上述的配置说明,thread_pool是有名字的,上面的线程数目以及队列大小都是指每个worker进程中的线程,而不是所有worker中线程的总数。一个线程池中所有的线程共享一个队列,队列中的最大人数数量为上面定义的max_queue,如果队列满了的话,再往队列中添加任务就会报错。
根据之前讲到过的模块初始化流程(在master启动worker之前)create_conf-->command_set函数-->init_conf,下面就按照这个流程看看thread_pool模块的初始化
/*******************nginx/src/core/ngx_thread_pool.c************************/
//创建线程池所需的基础结构
staticvoid*ngx_thread_pool_create_conf(ngx_cycle_t*cycle)
{
ngx_thread_pool_conf_t*tcf;
//从cycle->pool指向的内存池中申请一块内存
tcf=ngx_pcalloc(cycle->pool,sizeof(ngx_thread_pool_conf_t));
if(tcf==NULL){
returnNULL;
}
//先申请包含4个ngx_thread_pool_t指针类型元素的数组
//ngx_thread_pool_t结构体中保存了一个线程池相关的信息
if(ngx_array_init(&tcf->pools,cycle->pool,4,
sizeof(ngx_thread_pool_t*))
!=NGX_OK)
{
returnNULL;
}
returntcf;
}
//解析处理配置文件中thread_pool的配置,并将相关信息保存的ngx_thread_pool_t中
staticchar*ngx_thread_pool(ngx_conf_t*cf,ngx_command_t*cmd,void*conf)
{
ngx_str_t*value;
ngx_uint_ti;
ngx_thread_pool_t*tp;
value=cf->args->elts;
//根据thread_pool配置中的name作为线程池的唯一标识(如果重名,只有第一个有效)
//申请ngx_thread_pool_t结构保存线程池的相关信息
//由此可见,nginx支持配置多个name不同的线程池
tp=ngx_thread_pool_add(cf,&value[1]);
.......
//处理thread_pool配置行的所有元素
for(i=2;iargs->nelts;i++){
//检查配置的线程数
if(ngx_strncmp(value[i].data,"threads=",8)==0){
.......
}
//检查配置的最大队列长度
if(ngx_strncmp(value[i].data,"max_queue=",10)==0){
.......
}
}
......
}
//判断包含多个线程池的数组中的各个线程池的配置是否正确
staticchar*ngx_thread_pool_init_conf(ngx_cycle_t*cycle,void*conf)
{
....
ngx_thread_pool_t**tpp;
tpp=tcf->pools.elts;
//遍历数组中所有的线程池配置,并检查其正确性
for(i=0;ipools.nelts;i++){
.....
}
returnNGX_CONF_OK;
}
在上述的流程走完之后,nginx的master就保存了一份所有线程池的配置(tcf->pools),这份配置在创建worker时也会被继承。然后每个worker中都调用各个核心模块的init_process函数(如果有的话)。
/*******************nginx/src/core/ngx_thread_pool.c************************/
//创建线程池所需的基础结构
staticngx_int_t
ngx_thread_pool_init_worker(ngx_cycle_t*cycle)
{
ngx_uint_ti;
ngx_thread_pool_t**tpp;
ngx_thread_pool_conf_t*tcf;
//如果不是worker或者只有一个worker就不起用线程池
if(ngx_process!=NGX_PROCESS_WORKER
&&ngx_process!=NGX_PROCESS_SINGLE)
{
returnNGX_OK;
}
//初始化任务队列
ngx_thread_pool_queue_init(&ngx_thread_pool_done);
tpp=tcf->pools.elts;
for(i=0;ipools.nelts;i++){
//初始化各个线程池
if(ngx_thread_pool_init(tpp[i],cycle->log,cycle->pool)!=NGX_OK){
returnNGX_ERROR;
}
}
returnNGX_OK;
}
//线程池初始化
staticngx_int_tngx_thread_pool_init(ngx_thread_pool_t*tp,ngx_log_t*log,ngx_pool_t*pool)
{
.....
//初始化任务队列
ngx_thread_pool_queue_init(&tp->queue);
//创建线程锁
if(ngx_thread_mutex_create(&tp->mtx,log)!=NGX_OK){
returnNGX_ERROR;
}
//创建线程条件变量
if(ngx_thread_cond_create(&tp->cond,log)!=NGX_OK){
(void)ngx_thread_mutex_destroy(&tp->mtx,log);
returnNGX_ERROR;
}
......
for(n=0;nthreads;n++){
//创建线程池中的每个线程
err=pthread_create(&tid,&attr,ngx_thread_pool_cycle,tp);
if(err){
ngx_log_error(NGX_LOG_ALERT,log,err,
"pthread_create()failed");
returnNGX_ERROR;
}
}
......
}
//线程池中线程处理主函数
staticvoid*ngx_thread_pool_cycle(void*data)
{
......
for(;;){
//阻塞的方式获取线程锁
if(ngx_thread_mutex_lock(&tp->mtx,tp->log)!=NGX_OK){
returnNULL;
}
/*thenumbermaybecomenegative*/
tp->waiting--;
//如果任务队列为空,就cond_wait阻塞等待有新任务时调用cond_signal/broadcast触发
while(tp->queue.first==NULL){
if(ngx_thread_cond_wait(&tp->cond,&tp->mtx,tp->log)
!=NGX_OK)
{
(void)ngx_thread_mutex_unlock(&tp->mtx,tp->log);
returnNULL;
}
}
//从任务队列中获取task,并将其从队列中移除
task=tp->queue.first;
tp->queue.first=task->next;
if(tp->queue.first==NULL){
tp->queue.last=&tp->queue.first;
}
if(ngx_thread_mutex_unlock(&tp->mtx,tp->log)!=NGX_OK){
returnNULL;
}
......
//task的处理函数
task->handler(task->ctx,tp->log);
.....
ngx_spinlock(&ngx_thread_pool_done_lock,1,2048);
//将经过预处理的任务添加到done队列中等待调用event的回调函数继续处理
*ngx_thread_pool_done.last=task;
ngx_thread_pool_done.last=&task->next;
//防止编译器优化,保证解锁操作是在上述语句执行完毕后再去执行的
ngx_memory_barrier();
ngx_unlock(&ngx_thread_pool_done_lock);
(void)ngx_notify(ngx_thread_pool_handler);
}
}
//处理pool_done队列上task中包含的每个event事件
staticvoidngx_thread_pool_handler(ngx_event_t*ev)
{
.....
ngx_spinlock(&ngx_thread_pool_done_lock,1,2048);
//获取任务链表的头部
task=ngx_thread_pool_done.first;
ngx_thread_pool_done.first=NULL;
ngx_thread_pool_done.last=&ngx_thread_pool_done.first;
ngx_memory_barrier();
ngx_unlock(&ngx_thread_pool_done_lock);
while(task){
ngx_log_debug1(NGX_LOG_DEBUG_CORE,ev->log,0,
"runcompletionhandlerfortask#%ui",task->id);
//遍历队列中的所有任务事件
event=&task->event;
task=task->next;
event->complete=1;
event->active=0;
//调用event对应的处理函数有针对性的进行处理
event->handler(event);
}
}
三、thread_pool线程池使用示例
根据之前所讲到的,nginx中的线程池主要是用于操作文件的IO操作。所以,在nginx中自带的模块ngx_http_file_cache.c文件中看到了线程池的使用。
/***********************nginx/src/os/unix/ngx_files.c**********************/
//file_cache模块的处理函数(涉及到了线程池)
staticssize_tngx_http_file_cache_aio_read(ngx_http_request_t*r,ngx_http_cache_t*c)
{
.......
#if(NGX_THREADS)
if(clcf->aio==NGX_HTTP_AIO_THREADS){
c->file.thread_task=c->thread_task;
//这里注册的函数在下面语句中的ngx_thread_read函数中被调用
c->file.thread_handler=ngx_http_cache_thread_handler;
c->file.thread_ctx=r;
//根据任务的属性,选择正确的线程池,并初始化task结构体中的各个成员
n=ngx_thread_read(&c->file,c->buf->pos,c->body_start,0,r->pool);
c->thread_task=c->file.thread_task;
c->reading=(n==NGX_AGAIN);
returnn;
}
#endif
returnngx_read_file(&c->file,c->buf->pos,c->body_start,0);
}
//task任务的处理函数
staticngx_int_tngx_http_cache_thread_handler(ngx_thread_task_t*task,ngx_file_t*file)
{
.......
tp=clcf->thread_pool;
.......
task->event.data=r;
//注册thread_event_handler函数,该函数在处理pool_done队列中event事件时被调用
task->event.handler=ngx_http_cache_thread_event_handler;
//将任务放到线程池的任务队列中
if(ngx_thread_task_post(tp,task)!=NGX_OK){
returnNGX_ERROR;
}
......
}
/***********************nginx/src/core/ngx_thread_pool.c**********************/
//添加任务到队列中
ngx_int_tngx_thread_task_post(ngx_thread_pool_t*tp,ngx_thread_task_t*task)
{
//如果当前的任务正在处理就退出
if(task->event.active){
ngx_log_error(NGX_LOG_ALERT,tp->log,0,
"task#%uialreadyactive",task->id);
returnNGX_ERROR;
}
if(ngx_thread_mutex_lock(&tp->mtx,tp->log)!=NGX_OK){
returnNGX_ERROR;
}
//判断当前线程池等待的任务数量与最大队列长度的关系
if(tp->waiting>=tp->max_queue){
(void)ngx_thread_mutex_unlock(&tp->mtx,tp->log);
ngx_log_error(NGX_LOG_ERR,tp->log,0,
"threadpool\"%V\"queueoverflow:%itaskswaiting",
&tp->name,tp->waiting);
returnNGX_ERROR;
}
//激活任务
task->event.active=1;
task->id=ngx_thread_pool_task_id++;
task->next=NULL;
//通知阻塞的线程有新事件加入,可以解除阻塞
if(ngx_thread_cond_signal(&tp->cond,tp->log)!=NGX_OK){
(void)ngx_thread_mutex_unlock(&tp->mtx,tp->log);
returnNGX_ERROR;
}
*tp->queue.last=task;
tp->queue.last=&task->next;
tp->waiting++;
(void)ngx_thread_mutex_unlock(&tp->mtx,tp->log);
ngx_log_debug2(NGX_LOG_DEBUG_CORE,tp->log,0,
"task#%uiaddedtothreadpool\"%V\"",
task->id,&tp->name);
returnNGX_OK;
}
上面示例基本展示了nginx目前对线程池的使用方法,采用线程池来处理IO这类慢速操作可以提升worker的主线程的执行效率。当然,用户自己在开发模块时,也可以参照file_cache模块中使用线程池的方法来调用多线程提升程序性能。(欢迎大家多多批评指正)
感谢阅读,希望能帮助到大家,谢谢大家对本站的支持!