实例分析python3实现并发访问水平切分表
场景说明
假设有一个mysql表被水平切分,分散到多个host中,每个host拥有n个切分表。
如果需要并发去访问这些表,快速得到查询结果,应该怎么做呢?
这里提供一种方案,利用python3的asyncio异步io库及aiomysql异步库去实现这个需求。
代码演示
importlogging
importrandom
importasynciofromaiomysql
importcreate_pool
#假设mysql表分散在8个host,每个host有16张子表
TBLES={"192.168.1.01":"table_000-015",
#000-015表示该ip下的表明从table_000一直连续到table_015
"192.168.1.02":"table_016-031",
"192.168.1.03":"table_032-047",
"192.168.1.04":"table_048-063",
"192.168.1.05":"table_064-079",
"192.168.1.06":"table_080-095",
"192.168.1.07":"table_096-0111",
"192.168.1.08":"table_112-0127",
}
USER="xxx"PASSWD="xxxx"#wrapper函数,用于捕捉异常defquery_wrapper(func):
asyncdefwrapper(*args,**kwargs):
try:
awaitfunc(*args,**kwargs)exceptExceptionase:
print(e)returnwrapper
#实际的sql访问处理函数,通过aiomysql实现异步非阻塞请求@
query_wrapperasyncdefquery_do_something(ip,db,table):
asyncwithcreate_pool(host=ip,db=db,user=USER,password=PASSWD)aspool:
asyncwithpool.get()asconn:
asyncwithconn.cursor()ascur:
sql=("selectxxxfrom{}wherexxxx")
awaitcur.execute(sql.format(table))
res=awaitcur.fetchall()
#thendosomething...#生成sql访问队列,队列的每个元素包含要对某个表进行访问的函数及参数defgen_tasks():
tasks=[]forip,tblsinTBLES.items():
cols=re.split('_|-',tbls)
tblpre="_".join(cols[:-2])
min_num=int(cols[-2])
max_num=int(cols[-1])
fornuminrange(min_num,max_num+1):
tasks.append(
(query_do_something,ip,'your_dbname','{}_{}'.format(tblpre,num))
)
random.shuffle(tasks)
returntasks#按批量运行sql访问请求队列defrun_tasks(tasks,batch_len):
try:
foridxinrange(0,len(tasks),batch_len):
batch_tasks=tasks[idx:idx+batch_len]
logging.info("currentbatch,start_idx:%slen:%s"%(idx,len(batch_tasks)))
foriinrange(0,len(batch_tasks)):
l=batch_tasks[i]
batch_tasks[i]=asyncio.ensure_future(
l[0](*l[1:])
)
loop.run_until_complete(asyncio.gather(*batch_tasks))
exceptExceptionase:
logging.warn(e)#main方法,通过asyncio实现函数异步调用defmain():
loop=asyncio.get_event_loop()
tasks=gen_tasks()
batch_len=len(TBLES.keys())*5#alluptoyou
run_tasks(tasks,batch_len)
loop.close()
以上就是本次相关内容的全部实例代码,大家可以本地测试以下,感谢你对毛票票的支持。