Python实现的HTTP并发测试完整示例
可通过修改变量 thread_count 来指定最大并发数量,即同时发起请求的线程数量。
全部请求完成之后,打印失败的次数,以及开始时间和结束时间(时间戳,单位是毫秒)。
主要是学习一下Python,仅供参考。
#!/usr/bin/python3
"""Minimal concurrent HTTP test.

Starts ``thread_count`` threads that each issue one GET request, counts the
requests that fail (an exception or a non-200 status) and, once every
request has finished, prints the failure count together with the start and
end timestamps in milliseconds.
"""
import sys
import time
import json
import _thread
import threading
import http.client
import urllib.parse

DEFAULT_HOST = "192.168.1.1"
DEFAULT_PORT = 80
DEFAULT_PATH = '/'
DEFAULT_TIMEOUT = 5  # seconds; keeps a dead target from hanging a worker

thread_count = 100  # concurrency level: one thread (and one request) each
now_count = 0  # requests finished so far, successful or not
error_count = 0  # requests that raised or returned a non-200 status
begin_time = ''  # ms timestamp of the first request; '' until a run starts
lock_obj = _thread.allocate_lock()  # guards the counters above and stdout


def _now_ms():
    """Return the current wall-clock time as an integer millisecond count."""
    return int(round(time.time() * 1000))


def _mark_start():
    """Record ``begin_time`` once, when the first request of a run begins."""
    global begin_time
    with lock_obj:
        if not begin_time:
            begin_time = _now_ms()


def _do_request(host, port, path, timeout):
    """Issue one GET request and return ``(status, body, error)``.

    Runs without holding ``lock_obj`` so that requests from different
    threads really overlap. On failure ``status`` and ``body`` are ``None``
    and ``error`` is the exception that was raised; on success ``error`` is
    ``None``.
    """
    conn = None
    try:
        conn = http.client.HTTPConnection(host, port, timeout=timeout)
        conn.request('GET', path)
        response = conn.getresponse()
        return response.status, response.read(), None
    except (OSError, http.client.HTTPException) as exc:
        # Socket errors and timeouts are OSError subclasses; protocol
        # errors are HTTPException subclasses.
        return None, None, exc
    finally:
        if conn is not None:
            conn.close()


def _record_result(status, body, error):
    """Print one request's outcome and update the shared counters.

    The caller must hold ``lock_obj``. Prints the final summary when the
    number of finished requests reaches ``thread_count``.
    """
    global now_count, error_count
    print(body if error is None else error)
    if status != 200:
        # A request that raised has status None, so it is counted too;
        # otherwise the summary below would never be reached.
        error_count += 1
        print('errorcount:' + str(error_count))
    sys.stdout.flush()
    now_count += 1
    if now_count == thread_count:
        print('###errorcount:' + str(error_count) + '###')
        print('###begintime:' + str(begin_time))
        print('###endtime:' + str(_now_ms()))


def test_http_engine(host=DEFAULT_HOST, port=DEFAULT_PORT,
                     path=DEFAULT_PATH, timeout=DEFAULT_TIMEOUT):
    """Perform one GET request and record its result.

    Args:
        host: Target host name or IP address.
        port: Target TCP port.
        path: Request path.
        timeout: Socket timeout in seconds.

    Network and protocol errors are caught, printed and counted as
    failures; they are not propagated to the caller.
    """
    _mark_start()
    outcome = _do_request(host, port, path, timeout)
    with lock_obj:
        _record_result(*outcome)


def test_thread_func(host=DEFAULT_HOST, port=DEFAULT_PORT,
                     path=DEFAULT_PATH, timeout=DEFAULT_TIMEOUT):
    """Worker body: perform one request, then print its report.

    The request itself runs outside the lock so workers execute
    concurrently; only the reporting and counter update are serialized,
    which keeps each request's output lines together.
    """
    _mark_start()
    outcome = _do_request(host, port, path, timeout)
    with lock_obj:
        print('')
        print('===Request:' + str(now_count) + '===')
        _record_result(*outcome)
        sys.stdout.flush()


def test_main(host=DEFAULT_HOST, port=DEFAULT_PORT,
              path=DEFAULT_PATH, timeout=DEFAULT_TIMEOUT):
    """Run ``thread_count`` concurrent requests and wait for all of them.

    Resets the counters first so the function can be called repeatedly,
    then starts one thread per request and joins them, so the summary has
    been printed by the time this function returns.
    """
    global now_count, error_count, begin_time
    with lock_obj:
        now_count = 0
        error_count = 0
        begin_time = ''

    workers = [
        threading.Thread(target=test_thread_func,
                         args=(host, port, path, timeout))
        for _ in range(thread_count)
    ]
    for worker in workers:
        worker.start()
    for worker in workers:
        worker.join()


if __name__ == '__main__':
    # test_main() joins its workers, so no keep-alive sleep loop is needed
    # and the process exits once the summary has been printed.
    test_main()