mongodb源码分析--查询
本文内容纲要:
在之前的一篇文章中,介绍了mongodb的主程序入口main()的执行流程,其实main只是实始化一些参数信息并做了些后台线程任务的启动工作(包括数据准备和恢复),并最终启动一个线程进行循环侦听。今天将会介绍在mongodb中数据查询(find)的流程,以了解mongodb是如果对message进行拆包分析,以及数据进行表扫描及索引使用的。
好了,开始今天的正文吧!
这里继续昨天的代码浏览过程,从connThread函数说起,看了上一篇文章的朋友都清楚了该函数主要工作就是不断循环[while(1)]获取当前客户端发来的信息(上面已封装成了message)并将其信息进行分析,并根据相应操作标志位确定当前操作是CRUD或构建索引等[assembleResponse()],如果一些正常,则向客户端发送应答信息。而如果客户端连接提交了一个查询操作(也包括CUD及其它操作)的话,那么它就会调用assembleResponse方法来进行相关操作的处理,该方法声明如下(instance.cpp第224行):
//直接请求包括'end'字符,则返回false
voidassembleResponse(Message&m/*客户端传来的(操作)信息*/,
DbResponse&dbresponse,/*响应结构体,用于绑定要响应的数据及状态*/
constSockAddr&client){
//获取操作符枚举信息
intop=m.operation();
注:枚举定义如下
enumOperations{
opReply=1,/*reply.responseToisset.*/
dbMsg=1000,/*genericmsgcommandfollowedbyastring*/
dbUpdate=2001,/*updateobject*/
dbInsert=2002,//dbGetByOID=2003,
dbQuery=2004,
dbGetMore=2005,
dbDelete=2006,
dbKillCursors=2007
};
接着它会判断是否为$cmd命令,即以.$cmd为开头,形如db.$cmd.findOne({getlasterror:1}),并对一些特殊指令进行单独处理,包括inprog,killop,unlock。
boolisCommand=false;
constchar*ns=m.singleData()->_data+4;
if(op==dbQuery){
if(strstr(ns,".$cmd")){
isCommand=true;
opwrite(m);
if(strstr(ns,".$cmd.sys.")){
if(strstr(ns,"$cmd.sys.inprog")){
inProgCmd(m,dbresponse);
return;
}
if(strstr(ns,"$cmd.sys.killop")){
killOp(m,dbresponse);
return;
}
if(strstr(ns,"$cmd.sys.unlock")){
unlockFsync(ns,m,dbresponse);
return;
}
}
}
else{
opread(m);
}
}
elseif(op==dbGetMore){
opread(m);
}
接着就是获取当前线程连接的客户端对象,如下:
Client&c=cc();
该方法实现代码如下:
/**gettheClientobjectforthisthread.*/
inlineClient&cc(){
Client*c=currentClient.get();
assert(c);
return*c;
}
其主要用内联函数方式获取当前客户端操作的线程信息,而该线程默认就是上一篇文章中所创建的那个:
Client::initThread("initandlisten");
因为mongodb会为每一个客户端DB操作创建一个线程Client对象,我个人把它理解为服务端持有的对应(每)客户端的操作对象。其主体函数如下:
boost::thread_specific_ptr
/*eachthreadwhichdoesdboperationshasaClientobjectinTLS.
callthiswhenyourthreadstarts.
*/
Client&Client::initThread(constchar*desc,MessagingPort*mp){
assert(currentClient.get()==0);
Client*c=newClient(desc,mp);
currentClient.reset(c);
mongo::lastError.initThread();
return*c;
}
我们再回到assembleResponse函数,接下来的代码就是使用CurOp(一个提供了内部锁机制来保存当前客户端操作状态的对象)来把当前Client对象及相应操作(CRUD等)封装于其中,这样当以访问该对象进行原子操作时(Atomic)就可以通过其内置支持多线程并发访问和锁保护了。
CurOp*currentOpP=c.curop();
......
CurOp¤tOp=*currentOpP;
currentOp.reset(client,op);
OpDebug&debug=currentOp.debug();
StringBuilder&ss=debug.str;
ss<<opToString(op)<<"";
intlogThreshold=cmdLine.slowMS;
boollog=logLevel>=1;
接着就是执行查询语句,也就是我们今天的主角“隆重登场”了,如下:
if(op==dbQuery){
if(handlePossibleShardedMessage(m,&dbresponse))/*查看是不是sharding状态(查询),如果是则返回啊*/
return;
receivedQuery(c,dbresponse,m);/*执行查询*/
}
elseif(op==dbGetMore){
......
else{
constchar*ns=m.singleData()->_data+4;
charcl[256];
nsToDatabase(ns,cl);
//进行权限认证
if(!c.getAuthenticationInfo()->isAuthorized(cl)){
uassert_nothrow("unauthorized");
}
else{
try{
if(op==dbInsert){//添加记录操作
receivedInsert(m,currentOp);
}
elseif(op==dbUpdate){//更新记录
receivedUpdate(m,currentOp);
}
elseif(op==dbDelete){//删除记录
receivedDelete(m,currentOp);
}
elseif(op==dbKillCursors){//删除Cursors(游标)对象
currentOp.ensureStarted();
logThreshold=10;
ss<<"killcursors";
receivedKillCursors(m);
}
else{
mongo::log()<<"operationisn'tsupported:"<<op<<endl;
currentOp.done();
log=true;
}
}
.....
}
}
上面代码中receivedQuery(c,dbresponse,m)就是执行查询功能的,而其它else分支特别是cud操作等我会专门再写文章加以解释,因为今天主要介绍查询功能,所以我们接着会分析该方法的执行逻辑,如下:
staticboolreceivedQuery(Client&c,DbResponse&dbresponse,Message&m){
boolok=true;
MSGIDresponseTo=m.header()->id;/*从message提取id信息用于绑定到dbresponse.responseTo*/
DbMessaged(m);/*对Message进行封装,从而初始化DbMessage实例*/
QueryMessageq(d);
auto_ptr<Message>resp(newMessage());
CurOp&op=*(c.curop());//获取当前Client线程对象执行的操作(支持线程安全)
try{
dbresponse.exhaust=runQuery(m,q,op,*resp);/*执行查询*/
assert(!resp->empty());
}
catch(AssertionException&e){
......
}
......
dbresponse.response=resp.release();
dbresponse.responseTo=responseTo;
returnok;
}
上面代码主要实始化一些查询对象,包括数据库操作消息(用于数据库/服务协议),查询结果消息(QueryMessage,即运行查询的请求所接收到的来自数据库的信息)等。最后运行runQuery(m,q,op,*resp)开始查询(方法位于query.cpp),该方法代码较长,主要功能如下:
constchar*runQuery(Message&m,QueryMessage&q,CurOp&curop,Message&result){
StringBuilder&ss=curop.debug().str;
//构造ParsedQuery查询对象,该对象包括查询记录数字,以及记录跳转偏移量等信息,这些值会在访问磁盘查询时使用,用法参见:query.cpp662行的virtualvoid_init()方法
shared_ptr
ParsedQuery&pq(*pq_shared);
......
//对查询命令判断,指令形如abc.$cmd.findOne({ismaster:1})
if(pq.couldBeCommand()){
BufBuilderbb;
bb.skip(sizeof(QueryResult));
BSONObjBuildercmdResBuf;
//对查询权限判断,并执行相应查询指令
if(runCommands(ns,jsobj,curop,bb,cmdResBuf,false,queryOptions)){
ss<<"command:";
jsobj.toString(ss);
curop.markCommand();
auto_ptr<QueryResult>qr;
qr.reset((QueryResult*)bb.buf());
bb.decouple();
qr->setResultFlagsToOk();
qr->len=bb.len();
ss<<"reslen:"<<bb.len();
qr->setOperation(opReply);
qr->cursorId=0;
qr->startingFrom=0;
qr->nReturned=1;
result.setData(qr.release(),true);
}
else{
uasserted(13530,"badormalformedcommandrequest?");
}
return0;
}
/*普通查询分支(非指令式操作,也就是我们用c#客户端链接查询方式)*/
......
BSONObjorder=pq.getOrder();
BSONObjquery=pq.getFilter();
/*对查询对象大小进行判断,过滤错误的查询对象(为0)*/
if(query.objsize()==0){
out()<<"Badqueryobject?\njsobj:";
out()<<jsobj.toString()<<"\nquery:";
out()<<query.toString()<<endl;
uassert(10110,"badqueryobject",false);
}
/*声明读锁*/
mongolocklk(false);
Client::Contextctx(ns,dbpath,&lk);
......
/*对查询对象及选项进行过滤,比如:ns,_id值,索引等*/
if(!(explain||pq.showDiskLoc())&&isSimpleIdQuery(query)&&!pq.hasOption(QueryOption_CursorTailable)){
boolnsFound=false;
boolindexFound=false;
BSONObjresObject;
Client&c=cc();
boolfound=Helpers::findById(c,ns,query,resObject,&nsFound,&indexFound);
if(nsFound==false||indexFound==true){
......
returnfalse;
}
}
.....
//定义扫描器(类定义位于queryoptimizer.h),在看代码过程中发现MultiPlanScanner主要用于提供$or查询支持,
//语法形如:db.foo.find({name:"bob",$or:[{a:1},{b:2}]})
//更多内容参见:http://www.mongodb.org/display/DOCS/OR+operations+in+query+expressions
auto_ptr<MultiPlanScanner>mps(newMultiPlanScanner(ns,query,order,&hint,!explain,pq.getMin(),pq.getMax(),false,true));
.....
ExplainBuildereb;
UserQueryOporiginal(pq,result,eb,curop);
shared_ptr<UserQueryOp>o=mps->runOp(original);/*执行查询*/
UserQueryOp&dqo=*o;
if(!dqo.complete())
throwMsgAssertionException(dqo.exception());
if(explain){
dqo.finishExplain(explainSuffix);
}
......
/*设置查询结果*/
QueryResult*qr=(QueryResult*)result.header();
qr->cursorId=cursorid;
qr->setResultFlagsToOk();
//qr->lenisupdatedautomaticallybyappendData()
ss<<"reslen:"<<qr->len;
qr->setOperation(opReply);
qr->startingFrom=0;
qr->nReturned=n;
/*查询耗时统计*/
intduration=curop.elapsedMillis();
booldbprofile=curop.shouldDBProfile(duration);
if(dbprofile||duration>=cmdLine.slowMS){
ss<<"nscanned:"<<nscanned<<'';
if(ntoskip)
ss<<"ntoskip:"<<ntoskip;
if(dbprofile)
ss<<"\nquery:";
ss<<jsobj.toString()<<'';
}
ss<<"nreturned:"<<n;
returnexhaust;
}
读到这里,发现系统除了把查询消息之类的信息一股脑塞给了MultiPlanScanner之后就运行了runOP方法之外,竟然还没看到mongodb是如果查询数据库文件的,看来mongodb还真挺卖关子,没变法,只有继续往下挖代码,下面是runOP方法代码(位于queryoptimizer.cpp文件730行):
shared_ptr<QueryOp>MultiPlanScanner::runOp(QueryOp&op){
shared_ptr<QueryOp>ret=runOpOnce(op);/*先运行一次查询*/
while(!ret->stopRequested()&&mayRunMore()){/*当前查询请求未停止并且有$or查询关键字时*/
ret=runOpOnce(*ret);//再次运行查询
}
returnret;
}
看来runOpOnce方法是用于进行单次非or查询的,看一下代码就明白了,如下:
shared_ptr<QueryOp>MultiPlanScanner::runOpOnce(QueryOp&op){
massert(13271,"can'trunmoreops",mayRunMore());
if(!_or){/*如当前查询不是or,则运行*/
++_i;
return_currentQps->runOp(op);
}
++_i;
auto_ptr<FieldRangeSet>frs(_fros.topFrs());/*(表)字段对象集合*/
auto_ptr<FieldRangeSet>originalFrs(_fros.topFrsOriginal());
BSONElementhintElt=_hint.firstElement();
//创建查询计划集合
_currentQps.reset(newQueryPlanSet(_ns,frs,originalFrs,_query,BSONObj(),&hintElt,_honorRecordedPlan,BSONObj(),BSONObj(),_bestGuessOnly,_mayYield));
//设置查询计划要调用的查询方法及相关参数
shared_ptr<QueryOp>ret(_currentQps->runOp(op));
if(ret->qp().willScanTable()){/*设置表扫描标识*/
_tableScanned=true;
}
//pop出or谓词/子句
_fros.popOrClause(ret->qp().indexed()?ret->qp().indexKey():BSONObj());
returnret;
}
上面方面最终都是调用_currentQps->runOp(op)来执行查询操作,下面就是方法的代码:
shared_ptr<QueryOp>QueryPlanSet::runOp(QueryOp&op){
if(_usingPrerecordedPlan){/*该变量貌似“是否使用预先记录的计划”,也就是索引*/
Runnerr(*this,op);
shared_ptr<QueryOp>res=r.run();
......
}
Runnerr(*this,op);
returnr.run();
}
上面代码主要是定义声明Runner实例并运行它,Runner本身为strcut类型,主要是用于对执行步骤进行封装(形成依次执行的操作流),这里不再多述了。下面是其r.run()方法的定义:
shared_ptr<QueryOp>QueryPlanSet::Runner::run(){
......
for(vector<shared_ptr<QueryOp>>::iteratori=ops.begin();i!=ops.end();++i){
initOp(**i);//初始化操作,声明如下
if((*i)->complete())
return*i;
}
......
}
voidQueryPlanSet::Runner::initOp(QueryOp&op){
GUARD_OP_EXCEPTION(op,op.init());
}
上面op.init操作主要最终会执行下面方法(位于query.cpp662行),该方法会用查询条件构造一个游标,该游标记录着遍历数据集方式,查询起始位置等信息等
virtualvoid_init(){
......
_c=qp().newCursor(DiskLoc(),_pq.getNumToReturn()+_pq.getSkip());/*构造*/
_capped=_c->capped();
//setupcheckforifwecanonlyuseindextoextract
if(_c->modifiedKeys()==false&&_c->isMultiKey()==false&&_pq.getFields()){
_keyFieldsOnly.reset(_pq.getFields()->checkKey(_c->indexKeyPattern()));
}
}
......
}
下面是其函数的代码(queryoptimizer.cpp168行):
shared_ptr
.....
if(!_index){//非索引扫描
if(_fbs.nNontrivialRanges())
checkTableScanAllowed(_fbs.ns());
returnfindTableScan(_fbs.ns(),_order,startLoc);/*进行表扫描*/
}
.....
}
findTableScan方法(pdfile.cpp687行)即开始表扫描指定磁盘位置信息,并根据相关条件指定相应类型的游标信息。
shared_ptr
BSONElementel=order.getField("$natural");//e.g.,{$natural:-1}
if(el.number()>=0)
returnDataFileMgr::findAll(ns,startLoc);/*startLoc开始位置*/
......
}
返回的游标类型为Cursor,但findAll方法里构造的是BasicCursor,相应代码(pdfile.cpp639行):
shared_ptr
NamespaceDetails*d=nsdetails(ns);
if(!d)
returnshared_ptr
.....
returnshared_ptr
}
BasicCursor构造函数比较有意思,其引入了AdvanceStrategy对象指针,这个策略指针定义访问物理磁盘文件的方式,其操作单元是DiskLoc(DiskLoc实例对象实际是一个双向链接),访问方法虽然只有next一种,但mongodb却用它实现了向前和后转两种访问方式(详情参见cursor.cpp),如下:
BasicCursor(DiskLocdl,constAdvanceStrategy*_s=forward()):curr(dl),s(_s),_nscanned(){
incNscanned();
init();
}
/*thesewillbeusedoutsideofmutexes-reallyfunctors-thustheconst*/
classForward:publicAdvanceStrategy{
virtualDiskLocnext(constDiskLoc&prev)const{
returnprev.rec()->getNext(prev);
}
}_forward;
classReverse:publicAdvanceStrategy{
virtualDiskLocnext(constDiskLoc&prev)const{
returnprev.rec()->getPrev(prev);
}
}_reverse;
上面的prev.rec()方法调用最终会执行下面函数调用流程:
//pdfile.h
inlineRecord*DiskLoc::rec()const{
returnDataFileMgr::getRecord(*this);
}
inlineRecord*DataFileMgr::getRecord(constDiskLoc&dl){
assert(dl.a()!=-1);
returncc().database()->getFile(dl.a())->recordAt(dl);
}
而最后“cc().database()->getFile(dl.a())->recordAt(dl)”方法会最终从数据库文件mongodfile中获取记录信息(详见database.cpp):
//pdfile.h
inlineRecord*MongoDataFile::recordAt(DiskLocdl){
intofs=dl.getOfs();
if(ofs<DataFileHeader::HeaderSize)badOfs(ofs);//willuassert-externalcalltokeepoutofthenormalcodepath
return(Record*)(p()+ofs);
}
兜了一大圈,头都快大了,不是吗?呵呵。另外Record,DiskLoc这两个与数据访问/存储相关类以后会抽时间介绍。
好了,今天的内容到这里就告一段落了,在接下来的文章中,将会介绍客户端发起Insert操作时,Mongodb的执行流程和B树的相应部分实现。
原文链接:http://www.cnblogs.com/daizhj/archive/2011/03/18/1988288.html
作者:daizhj,代震军
微博:http://t.sina.com.cn/daizhj
Tags:mongodb,c++,sourcecode
本文内容总结:
原文链接:https://www.cnblogs.com/daizhj/archive/2011/03/18/1988288.html