python 串行执行和并行执行实例
我就废话不多说了,大家还是直接看代码吧!
# coding=utf-8
# Setup for the serial demo: one Oracle connection and one cursor are created
# at import time and shared by every queryNN function below.
import csv
import threading
import time
from pprint import pprint

import cx_Oracle

# Start timestamp; compared by eye with the one printed at the end of the run.
print(time.asctime())

table_name = "dbtest.csv"
# NOTE(review): this opens "dbtest.csv.csv" (the extension is appended twice)
# and the handle is never written to or closed in the visible script.
f = open(table_name + ".csv", "w")

# NOTE(review): credentials are hard-coded in the connect string.
conn = cx_Oracle.connect('tlcbuser/tlcbuser@20.5.101.31/tlyy')
cursor = conn.cursor()
def query01():
    """Fetch and print every TEST100 row whose ID is 10.

    Runs on the module-level ``cursor``. That cursor is shared by all query
    functions in this script, so it is deliberately left open here.

    Returns:
        The list of row tuples produced by ``cursor.fetchall()``.
    """
    row_id = 10
    # The ID is passed as a named bind variable rather than formatted into SQL.
    cursor.prepare("""SELECT * FROM TEST100 WHERE ID=:id""")
    cursor.execute(None, {'id': row_id})
    rows = cursor.fetchall()
    print(rows)
    return rows
def query02():
    """Fetch and print every TEST100 row whose ID is 20.

    Runs on the module-level ``cursor``. That cursor is shared by all query
    functions in this script, so it is deliberately left open here.

    Returns:
        The list of row tuples produced by ``cursor.fetchall()``.
    """
    row_id = 20
    # The ID is passed as a named bind variable rather than formatted into SQL.
    cursor.prepare("""SELECT * FROM TEST100 WHERE ID=:id""")
    cursor.execute(None, {'id': row_id})
    rows = cursor.fetchall()
    print(rows)
    return rows
def query03():
    """Fetch and print every TEST100 row whose ID is 30.

    Runs on the module-level ``cursor``. That cursor is shared by all query
    functions in this script, so it is deliberately left open here.

    Returns:
        The list of row tuples produced by ``cursor.fetchall()``.
    """
    row_id = 30
    # The ID is passed as a named bind variable rather than formatted into SQL.
    cursor.prepare("""SELECT * FROM TEST100 WHERE ID=:id""")
    cursor.execute(None, {'id': row_id})
    rows = cursor.fetchall()
    print(rows)
    return rows
def query04():
    """Fetch and print every TEST100 row whose ID is 40.

    Runs on the module-level ``cursor``. That cursor is shared by all query
    functions in this script, so it is deliberately left open here.

    Returns:
        The list of row tuples produced by ``cursor.fetchall()``.
    """
    row_id = 40
    # The ID is passed as a named bind variable rather than formatted into SQL.
    cursor.prepare("""SELECT * FROM TEST100 WHERE ID=:id""")
    cursor.execute(None, {'id': row_id})
    rows = cursor.fetchall()
    print(rows)
    return rows
def query05():
    """Fetch and print every TEST100 row whose ID is 50.

    Runs on the module-level ``cursor``. That cursor is shared by all query
    functions in this script, so it is deliberately left open here.

    Returns:
        The list of row tuples produced by ``cursor.fetchall()``.
    """
    row_id = 50
    # The ID is passed as a named bind variable rather than formatted into SQL.
    cursor.prepare("""SELECT * FROM TEST100 WHERE ID=:id""")
    cursor.execute(None, {'id': row_id})
    rows = cursor.fetchall()
    print(rows)
    return rows
def query06():
    """Fetch and print every TEST100 row whose ID is 60.

    Runs on the module-level ``cursor``. That cursor is shared by all query
    functions in this script, so it is deliberately left open here.

    Returns:
        The list of row tuples produced by ``cursor.fetchall()``.
    """
    row_id = 60
    # The ID is passed as a named bind variable rather than formatted into SQL.
    cursor.prepare("""SELECT * FROM TEST100 WHERE ID=:id""")
    cursor.execute(None, {'id': row_id})
    rows = cursor.fetchall()
    print(rows)
    return rows
def query07():
    """Fetch every TEST100 row whose ID is 70, without printing the result.

    Runs on the module-level ``cursor``. That cursor is shared by all query
    functions in this script, so it is deliberately left open here.

    Returns:
        The list of row tuples produced by ``cursor.fetchall()``.
    """
    row_id = 70
    # The ID is passed as a named bind variable rather than formatted into SQL.
    cursor.prepare("""SELECT * FROM TEST100 WHERE ID=:id""")
    cursor.execute(None, {'id': row_id})
    return cursor.fetchall()
def query08():
    """Fetch and print every TEST100 row whose ID is 80.

    Runs on the module-level ``cursor``. That cursor is shared by all query
    functions in this script, so it is deliberately left open here.

    Returns:
        The list of row tuples produced by ``cursor.fetchall()``.
    """
    row_id = 80
    # The ID is passed as a named bind variable rather than formatted into SQL.
    cursor.prepare("""SELECT * FROM TEST100 WHERE ID=:id""")
    cursor.execute(None, {'id': row_id})
    rows = cursor.fetchall()
    print(rows)
    return rows
def query09():
    """Fetch and print every TEST100 row whose ID is 90.

    Runs on the module-level ``cursor``. That cursor is shared by all query
    functions in this script, so it is deliberately left open here.

    Returns:
        The list of row tuples produced by ``cursor.fetchall()``.
    """
    row_id = 90
    # The ID is passed as a named bind variable rather than formatted into SQL.
    cursor.prepare("""SELECT * FROM TEST100 WHERE ID=:id""")
    cursor.execute(None, {'id': row_id})
    rows = cursor.fetchall()
    print(rows)
    return rows
def query10():
    """Fetch and print every TEST100 row whose ID is 100.

    Runs on the module-level ``cursor``. That cursor is shared by all query
    functions in this script, so it is deliberately left open here.

    Returns:
        The list of row tuples produced by ``cursor.fetchall()``.
    """
    row_id = 100
    # The ID is passed as a named bind variable rather than formatted into SQL.
    cursor.prepare("""SELECT * FROM TEST100 WHERE ID=:id""")
    cursor.execute(None, {'id': row_id})
    rows = cursor.fetchall()
    print(rows)
    return rows
# One (not yet started) thread per query function, in query01..query10 order.
# Each thread is named after its target, e.g. 'query01'.
threads = [
    threading.Thread(target=query, name=query.__name__)
    for query in (
        query01, query02, query03, query04, query05,
        query06, query07, query08, query09, query10,
    )
]
if __name__ == '__main__':
    # Serial run: every thread is joined before the next one is started, so
    # the queries execute one after another.
    for t in threads:
        t.start()
        # Printed from the main thread, hence always <_MainThread ...>.
        print(threading.current_thread())
        t.join()
    print("allover")
    # End timestamp; compare with the one printed at start-up.
    print(time.asctime())
C:\Python27\python.exe C:/Users/tlcb/PycharmProjects/untitled/a2.py
Wed Mar 28 11:08:19 2018
<_MainThread(MainThread,started18744)>
[(10,'10boobook10','10aaaaaaaaaaaa10','10bbbbbbbbbbbbbbbbb10'),(10,'10sssssssss10','tlcb','tlcb'),(10,'10boobook10','10aaaaaaaaaaaa10','10bbbbbbbbbbbbbbbbb10')]
<_MainThread(MainThread,started18744)>
[(20,'20boobook20','20aaaaaaaaaaaa20','20bbbbbbbbbbbbbbbbb20'),(20,'20boobook20','20aaaaaaaaaaaa20','20bbbbbbbbbbbbbbbbb20'),(20,'20boobook20','20aaaaaaaaaaaa20','20bbbbbbbbbbbbbbbbb20')]
<_MainThread(MainThread,started18744)>
[(30,'30boobook30','30aaaaaaaaaaaa30','30bbbbbbbbbbbbbbbbb30'),(30,'30boobook30','30aaaaaaaaaaaa30','30bbbbbbbbbbbbbbbbb30'),(30,'30boobook30','30aaaaaaaaaaaa30','30bbbbbbbbbbbbbbbbb30')]
<_MainThread(MainThread,started18744)>
[(40,'40boobook40','40aaaaaaaaaaaa40','40bbbbbbbbbbbbbbbbb40'),(40,'40boobook40','40aaaaaaaaaaaa40','40bbbbbbbbbbbbbbbbb40'),(40,'40boobook40','40aaaaaaaaaaaa40','40bbbbbbbbbbbbbbbbb40')]
<_MainThread(MainThread,started18744)>
[(50,'50boobook50','50aaaaaaaaaaaa50','50bbbbbbbbbbbbbbbbb50'),(50,'50boobook50','50aaaaaaaaaaaa50','50bbbbbbbbbbbbbbbbb50'),(50,'50boobook50','50aaaaaaaaaaaa50','50bbbbbbbbbbbbbbbbb50')]
<_MainThread(MainThread,started18744)>
[(60,'60boobook60','60aaaaaaaaaaaa60','60bbbbbbbbbbbbbbbbb60'),(60,'60boobook60','60aaaaaaaaaaaa60','60bbbbbbbbbbbbbbbbb60'),(60,'60boobook60','60aaaaaaaaaaaa60','60bbbbbbbbbbbbbbbbb60')]
<_MainThread(MainThread,started18744)>
<_MainThread(MainThread,started18744)>
[(80,'80boobook80','80aaaaaaaaaaaa80','80bbbbbbbbbbbbbbbbb80'),(80,'80boobook80','80aaaaaaaaaaaa80','80bbbbbbbbbbbbbbbbb80'),(80,'80boobook80','80aaaaaaaaaaaa80','80bbbbbbbbbbbbbbbbb80')]
<_MainThread(MainThread,started18744)>
[(90,'90boobook90','90aaaaaaaaaaaa90','90bbbbbbbbbbbbbbbbb90'),(90,'90boobook90','90aaaaaaaaaaaa90','90bbbbbbbbbbbbbbbbb90'),(90,'90boobook90','90aaaaaaaaaaaa90','90bbbbbbbbbbbbbbbbb90')]
<_MainThread(MainThread,started18744)>
[(100,'100boobook100','100aaaaaaaaaaaa100','100bbbbbbbbbbbbbbbbb100'),(100,'100boobook100','100aaaaaaaaaaaa100','100bbbbbbbbbbbbbbbbb100'),(100,'100boobook100','100aaaaaaaaaaaa100','100bbbbbbbbbbbbbbbbb100')]
allover
Wed Mar 28 11:08:34 2018
Process finished with exit code 0
这个时候是串行花费了15秒
多线程跑:
# coding=utf-8
# Setup for the multi-threaded demo: no shared connection is created here;
# every queryNN function below opens its own.
import csv
import threading
import time
from pprint import pprint

import cx_Oracle

# Start timestamp; compared by eye with the one printed at the end of the run.
print(time.asctime())

table_name = "dbtest.csv"
# NOTE(review): this opens "dbtest.csv.csv" (the extension is appended twice)
# and the handle is never written to or closed in the visible script.
f = open(table_name + ".csv", "w")
def query01():
    """Fetch and print every TEST100 row whose ID is 10 on a private connection.

    A new Oracle connection and cursor are opened for this call and are
    always closed again, whether the query succeeds or raises.

    Returns:
        The list of row tuples produced by ``cursor.fetchall()``.
    """
    row_id = 10
    # NOTE(review): credentials are hard-coded in the connect string.
    conn = cx_Oracle.connect('tlcbuser/tlcbuser@20.5.101.31/tlyy')
    try:
        cursor = conn.cursor()
        try:
            # The ID is passed as a named bind variable, not formatted into SQL.
            cursor.prepare("""SELECT * FROM TEST100 WHERE ID=:id""")
            cursor.execute(None, {'id': row_id})
            rows = cursor.fetchall()
            print(rows)
            return rows
        finally:
            cursor.close()
    finally:
        # Cleanup lives in ``finally`` because code placed after ``return``
        # never runs, which would leak the connection.
        conn.close()
def query02():
    """Fetch and print every TEST100 row whose ID is 20 on a private connection.

    A new Oracle connection and cursor are opened for this call and are
    always closed again, whether the query succeeds or raises.

    Returns:
        The list of row tuples produced by ``cursor.fetchall()``.
    """
    row_id = 20
    # NOTE(review): credentials are hard-coded in the connect string.
    conn = cx_Oracle.connect('tlcbuser/tlcbuser@20.5.101.31/tlyy')
    try:
        cursor = conn.cursor()
        try:
            # The ID is passed as a named bind variable, not formatted into SQL.
            cursor.prepare("""SELECT * FROM TEST100 WHERE ID=:id""")
            cursor.execute(None, {'id': row_id})
            rows = cursor.fetchall()
            print(rows)
            return rows
        finally:
            cursor.close()
    finally:
        # Cleanup lives in ``finally`` because code placed after ``return``
        # never runs, which would leak the connection.
        conn.close()
def query03():
    """Fetch and print every TEST100 row whose ID is 30 on a private connection.

    A new Oracle connection and cursor are opened for this call and are
    always closed again, whether the query succeeds or raises.

    Returns:
        The list of row tuples produced by ``cursor.fetchall()``.
    """
    row_id = 30
    # NOTE(review): credentials are hard-coded in the connect string.
    conn = cx_Oracle.connect('tlcbuser/tlcbuser@20.5.101.31/tlyy')
    try:
        cursor = conn.cursor()
        try:
            # The ID is passed as a named bind variable, not formatted into SQL.
            cursor.prepare("""SELECT * FROM TEST100 WHERE ID=:id""")
            cursor.execute(None, {'id': row_id})
            rows = cursor.fetchall()
            print(rows)
            return rows
        finally:
            cursor.close()
    finally:
        # Cleanup lives in ``finally`` because code placed after ``return``
        # never runs, which would leak the connection.
        conn.close()
def query04():
    """Fetch and print every TEST100 row whose ID is 40 on a private connection.

    A new Oracle connection and cursor are opened for this call and are
    always closed again, whether the query succeeds or raises.

    Returns:
        The list of row tuples produced by ``cursor.fetchall()``.
    """
    row_id = 40
    # NOTE(review): credentials are hard-coded in the connect string.
    conn = cx_Oracle.connect('tlcbuser/tlcbuser@20.5.101.31/tlyy')
    try:
        cursor = conn.cursor()
        try:
            # The ID is passed as a named bind variable, not formatted into SQL.
            cursor.prepare("""SELECT * FROM TEST100 WHERE ID=:id""")
            cursor.execute(None, {'id': row_id})
            rows = cursor.fetchall()
            print(rows)
            return rows
        finally:
            cursor.close()
    finally:
        # Cleanup lives in ``finally`` because code placed after ``return``
        # never runs, which would leak the connection.
        conn.close()
def query05():
    """Fetch and print every TEST100 row whose ID is 50 on a private connection.

    A new Oracle connection and cursor are opened for this call and are
    always closed again, whether the query succeeds or raises.

    Returns:
        The list of row tuples produced by ``cursor.fetchall()``.
    """
    row_id = 50
    # NOTE(review): credentials are hard-coded in the connect string.
    conn = cx_Oracle.connect('tlcbuser/tlcbuser@20.5.101.31/tlyy')
    try:
        cursor = conn.cursor()
        try:
            # The ID is passed as a named bind variable, not formatted into SQL.
            cursor.prepare("""SELECT * FROM TEST100 WHERE ID=:id""")
            cursor.execute(None, {'id': row_id})
            rows = cursor.fetchall()
            print(rows)
            return rows
        finally:
            cursor.close()
    finally:
        # Cleanup lives in ``finally`` because code placed after ``return``
        # never runs, which would leak the connection.
        conn.close()
def query06():
    """Fetch and print every TEST100 row whose ID is 60 on a private connection.

    A new Oracle connection and cursor are opened for this call and are
    always closed again, whether the query succeeds or raises.

    Returns:
        The list of row tuples produced by ``cursor.fetchall()``.
    """
    row_id = 60
    # NOTE(review): credentials are hard-coded in the connect string.
    conn = cx_Oracle.connect('tlcbuser/tlcbuser@20.5.101.31/tlyy')
    try:
        cursor = conn.cursor()
        try:
            # The ID is passed as a named bind variable, not formatted into SQL.
            cursor.prepare("""SELECT * FROM TEST100 WHERE ID=:id""")
            cursor.execute(None, {'id': row_id})
            rows = cursor.fetchall()
            print(rows)
            return rows
        finally:
            cursor.close()
    finally:
        # Cleanup lives in ``finally`` because code placed after ``return``
        # never runs, which would leak the connection.
        conn.close()
def query07():
    """Fetch every TEST100 row whose ID is 70 on a private connection, without printing.

    A new Oracle connection and cursor are opened for this call and are
    always closed again, whether the query succeeds or raises.

    Returns:
        The list of row tuples produced by ``cursor.fetchall()``.
    """
    row_id = 70
    # NOTE(review): credentials are hard-coded in the connect string.
    conn = cx_Oracle.connect('tlcbuser/tlcbuser@20.5.101.31/tlyy')
    try:
        cursor = conn.cursor()
        try:
            # The ID is passed as a named bind variable, not formatted into SQL.
            cursor.prepare("""SELECT * FROM TEST100 WHERE ID=:id""")
            cursor.execute(None, {'id': row_id})
            return cursor.fetchall()
        finally:
            cursor.close()
    finally:
        # Cleanup lives in ``finally`` because code placed after ``return``
        # never runs, which would leak the connection.
        conn.close()
def query08():
    """Fetch and print every TEST100 row whose ID is 80 on a private connection.

    A new Oracle connection and cursor are opened for this call and are
    always closed again, whether the query succeeds or raises.

    Returns:
        The list of row tuples produced by ``cursor.fetchall()``.
    """
    row_id = 80
    # NOTE(review): credentials are hard-coded in the connect string.
    conn = cx_Oracle.connect('tlcbuser/tlcbuser@20.5.101.31/tlyy')
    try:
        cursor = conn.cursor()
        try:
            # The ID is passed as a named bind variable, not formatted into SQL.
            cursor.prepare("""SELECT * FROM TEST100 WHERE ID=:id""")
            cursor.execute(None, {'id': row_id})
            rows = cursor.fetchall()
            print(rows)
            return rows
        finally:
            cursor.close()
    finally:
        # Cleanup lives in ``finally`` because code placed after ``return``
        # never runs, which would leak the connection.
        conn.close()
def query09():
    """Fetch and print every TEST100 row whose ID is 90 on a private connection.

    A new Oracle connection and cursor are opened for this call and are
    always closed again, whether the query succeeds or raises.

    Returns:
        The list of row tuples produced by ``cursor.fetchall()``.
    """
    row_id = 90
    # NOTE(review): credentials are hard-coded in the connect string.
    conn = cx_Oracle.connect('tlcbuser/tlcbuser@20.5.101.31/tlyy')
    try:
        cursor = conn.cursor()
        try:
            # The ID is passed as a named bind variable, not formatted into SQL.
            cursor.prepare("""SELECT * FROM TEST100 WHERE ID=:id""")
            cursor.execute(None, {'id': row_id})
            rows = cursor.fetchall()
            print(rows)
            return rows
        finally:
            cursor.close()
    finally:
        # Cleanup lives in ``finally`` because code placed after ``return``
        # never runs, which would leak the connection.
        conn.close()
def query10():
    """Fetch and print every TEST100 row whose ID is 100 on a private connection.

    A new Oracle connection and cursor are opened for this call and are
    always closed again, whether the query succeeds or raises.

    Returns:
        The list of row tuples produced by ``cursor.fetchall()``.
    """
    row_id = 100
    # NOTE(review): credentials are hard-coded in the connect string.
    conn = cx_Oracle.connect('tlcbuser/tlcbuser@20.5.101.31/tlyy')
    try:
        cursor = conn.cursor()
        try:
            # The ID is passed as a named bind variable, not formatted into SQL.
            cursor.prepare("""SELECT * FROM TEST100 WHERE ID=:id""")
            cursor.execute(None, {'id': row_id})
            rows = cursor.fetchall()
            print(rows)
            return rows
        finally:
            cursor.close()
    finally:
        # Cleanup lives in ``finally`` because code placed after ``return``
        # never runs, which would leak the connection.
        conn.close()
# One (not yet started) thread per query function, in query01..query10 order.
# Each thread is named after its target, e.g. 'query01'.
threads = [
    threading.Thread(target=query, name=query.__name__)
    for query in (
        query01, query02, query03, query04, query05,
        query06, query07, query08, query09, query10,
    )
]
if __name__ == '__main__':
    # Parallel run: start every thread first so the queries overlap.
    for t in threads:
        t.start()
        # Printed from the main thread, hence always <_MainThread ...>.
        print(threading.current_thread())
    # Wait for ALL workers, not only the last one started; otherwise the end
    # timestamp could be printed while queries are still running.
    for t in threads:
        t.join()
    print("allover")
    # End timestamp; compare with the one printed at start-up.
    print(time.asctime())
C:\Python27\python.exe C:/Users/tlcb/PycharmProjects/untitled/a2.py
Wed Mar 28 11:12:47 2018
<_MainThread(MainThread,started22500)>
<_MainThread(MainThread,started22500)>
<_MainThread(MainThread,started22500)>
<_MainThread(MainThread,started22500)>
<_MainThread(MainThread,started22500)>
<_MainThread(MainThread,started22500)>
<_MainThread(MainThread,started22500)>
<_MainThread(MainThread,started22500)>
<_MainThread(MainThread,started22500)>
<_MainThread(MainThread,started22500)>
[(40,'40boobook40','40aaaaaaaaaaaa40','40bbbbbbbbbbbbbbbbb40'),(40,'40boobook40','40aaaaaaaaaaaa40','40bbbbbbbbbbbbbbbbb40'),(40,'40boobook40','40aaaaaaaaaaaa40','40bbbbbbbbbbbbbbbbb40')]
[(60,'60boobook60','60aaaaaaaaaaaa60','60bbbbbbbbbbbbbbbbb60'),(60,'60boobook60','60aaaaaaaaaaaa60','60bbbbbbbbbbbbbbbbb60'),(60,'60boobook60','60aaaaaaaaaaaa60','60bbbbbbbbbbbbbbbbb60')]
[(80,'80boobook80','80aaaaaaaaaaaa80','80bbbbbbbbbbbbbbbbb80'),(80,'80boobook80','80aaaaaaaaaaaa80','80bbbbbbbbbbbbbbbbb80'),(80,'80boobook80','80aaaaaaaaaaaa80','80bbbbbbbbbbbbbbbbb80')]
[(50,'50boobook50','50aaaaaaaaaaaa50','50bbbbbbbbbbbbbbbbb50'),(50,'50boobook50','50aaaaaaaaaaaa50','50bbbbbbbbbbbbbbbbb50'),(50,'50boobook50','50aaaaaaaaaaaa50','50bbbbbbbbbbbbbbbbb50')]
[(10,'10boobook10','10aaaaaaaaaaaa10','10bbbbbbbbbbbbbbbbb10'),(10,'10sssssssss10','tlcb','tlcb'),(10,'10boobook10','10aaaaaaaaaaaa10','10bbbbbbbbbbbbbbbbb10')]
[(20,'20boobook20','20aaaaaaaaaaaa20','20bbbbbbbbbbbbbbbbb20'),(20,'20boobook20','20aaaaaaaaaaaa20','20bbbbbbbbbbbbbbbbb20'),(20,'20boobook20','20aaaaaaaaaaaa20','20bbbbbbbbbbbbbbbbb20')]
[(30,'30boobook30','30aaaaaaaaaaaa30','30bbbbbbbbbbbbbbbbb30'),(30,'30boobook30','30aaaaaaaaaaaa30','30bbbbbbbbbbbbbbbbb30'),(30,'30boobook30','30aaaaaaaaaaaa30','30bbbbbbbbbbbbbbbbb30')]
[(100,'100boobook100','100aaaaaaaaaaaa100','100bbbbbbbbbbbbbbbbb100'),(100,'100boobook100','100aaaaaaaaaaaa100','100bbbbbbbbbbbbbbbbb100'),(100,'100boobook100','100aaaaaaaaaaaa100','100bbbbbbbbbbbbbbbbb100')]
[(90,'90boobook90','90aaaaaaaaaaaa90','90bbbbbbbbbbbbbbbbb90'),(90,'90boobook90','90aaaaaaaaaaaa90','90bbbbbbbbbbbbbbbbb90'),(90,'90boobook90','90aaaaaaaaaaaa90','90bbbbbbbbbbbbbbbbb90')]
allover
Wed Mar 28 11:12:55 2018
Process finished with exit code 0
此时花了8秒
补充知识:python logging 定制 logstash 的 json 日志格式
最近一直在折腾日志的收集,现在算是收尾了。写一篇算python优化logstash的方案。
其实大家都知道logstash调用grok来解析日志的话,是要消耗cpu的成本的,毕竟是需要正则的匹配的。
根据logstash调优的方案,咱们可以预先生成json的格式。我这边基本是python的程序,怎么搞呢?
有两种方法,第一种方法是生成json后,直接打入logstash的端口。还有一种是生成json写入文件,让logstash做tail操作的时候,把一行的日志数据直接载入json就可以了。
python下的日志调试用的是logging,改成json也很好改。另外不少老外已经考虑到这样的需求,已经做了python-logstash的模块。
# Example: send log records to a logstash listener on localhost:5959 through
# python-logstash's LogstashHandler.
import logging
import sys

import logstash

host = 'localhost'

test_logger = logging.getLogger('python-logstash-logger')
test_logger.setLevel(logging.INFO)
test_logger.addHandler(logstash.LogstashHandler(host, 5959, version=1))
# Alternative handler from the same package:
# test_logger.addHandler(logstash.TCPLogstashHandler(host, 5959, version=1))

test_logger.error('python-logstash: test logstash error message.')
test_logger.info('python-logstash: test logstash info message.')
test_logger.warning('python-logstash: test logstash warning message.')

# Add extra fields to the logstash message.
extra = {
    'test_string': 'python version: ' + repr(sys.version_info),
    'test_boolean': True,
    'test_dict': {'a': 1, 'b': 'c'},
    'test_float': 1.23,
    'test_integer': 123,
    'test_list': [1, 2, '3'],
}
test_logger.info('python-logstash: test extra fields', extra=extra)
python-logstash自带了amqp的方案
# Example: send log records to logstash over AMQP through python-logstash's
# AMQPLogstashHandler.
import logging

import logstash

# AMQP parameters
host = 'localhost'
username = 'guest'
password = 'guest'
exchange = 'logstash.py'

# Get a logger and set the logging level.
test_logger = logging.getLogger('python-logstash-logger')
test_logger.setLevel(logging.INFO)

# Add the handler.
test_logger.addHandler(logstash.AMQPLogstashHandler(version=1,
                                                    host=host,
                                                    durable=True,
                                                    username=username,
                                                    password=password,
                                                    exchange=exchange))

# Log
test_logger.error('python-logstash: test logstash error message.')
test_logger.info('python-logstash: test logstash info message.')
test_logger.warning('python-logstash: test logstash warning message.')

try:
    1 / 0
except ZeroDivisionError:
    # Catch only the error provoked above; logger.exception() attaches the
    # current traceback to the record.
    test_logger.exception('python-logstash: test logstash exception with stack trace')
不管怎么说,最后生成的格式是这样就可以了。
{
"@source"=>"unknown",
"@type"=>"nginx",
"@tags"=>[],
"@fields"=>{
"remote_addr"=>"192.168.0.1",
"remote_user"=>"-",
"body_bytes_sent"=>"13988",
"request_time"=>"0.122",
"status"=>"200",
"request"=>"GET /some/url HTTP/1.1",
"request_method"=>"GET",
"http_referrer"=>"http://www.example.org/some/url",
"http_user_agent"=>"Mozilla/5.0 (X11; Linux x86_64) AppleWebKit/537.1 (KHTML, like Gecko) Chrome/21.0.1180.79 Safari/537.1"
},
"@timestamp"=>"2012-08-23T10:49:14+02:00"
}
我这里简单提一下,这个模块用的不是很满意,我在python下把日志打成了json字符串,我原本以为会像grok那样,在Es里面,我的这条日志是个字段的结构,而不是这个日志都在message里面….我想大家应该明白了我的意思,这样很是不容易在kibana的搜索…
在kibana搜索时,我经常用 source:xxx AND level:INFO 这样的查询,结果正像上面描述的那样,整条日志都在@message里面。
以上这篇python串行执行和并行执行实例就是小编分享给大家的全部内容了,希望能给大家一个参考,也希望大家多多支持毛票票。