Python实现大文件排序的方法
本文实例讲述了Python实现大文件排序的方法。分享给大家供大家参考。具体实现方法如下:
"""Sort/merge/diff very large text files with multiprocessing.

Pipeline:
  1. ``split_sort_file`` streams a (possibly gzipped) file in big chunks,
     fans the chunks out to ``sort_worker`` processes, and a ``write_worker``
     writes each sorted, de-duplicated chunk to a numbered bulk file.
  2. ``file_heap`` / ``merge_file`` k-way merge the sorted bulk files with a
     heap, dropping consecutive duplicates.
  3. ``diff_file`` runs the split+sort on two files and streams a merge-diff
     ('>' = only in old, '<' = only in new) to an output file.

NOTE(review): reconstructed from a whitespace-mangled listing; original code
was Python 2 (``print`` statements, ``file()`` builtin) and is modernized to
Python 3 here.
"""
import gzip
import os
import shutil
from datetime import datetime
from heapq import heappush, heappop
# Pipe / current_process / freeze_support were imported by the original
# listing; kept for compatibility even though this module does not use them.
from multiprocessing import Process, Queue, Pipe, current_process, freeze_support


def sort_worker(input, output):
    """Consume text chunks from *input*, emit sorted unique first fields.

    Runs until a chunk containing the line ``STOP`` is received.
    Each chunk is split into lines; the first space-separated field of every
    line is collected (assumes fields are separated by a single space — the
    original separator was lost in formatting; TODO confirm), de-duplicated,
    sorted, and pushed to *output* as one newline-joined string.
    """
    while True:
        lines = input.get().splitlines()
        elements = set()  # set replaces the original dict-as-set idiom
        for line in lines:
            if line.strip() == 'STOP':
                return
            # original was line.split('')[0], which raises ValueError and was
            # swallowed by a bare except; ' ' is the intended separator
            elements.add(line.split(' ', 1)[0])
        output.put('\n'.join(sorted(elements)))


def write_worker(input, pre):
    """Consume sorted chunks from *input* and write each to ``pre/<i>``.

    Terminates when the sentinel string ``STOP`` is received.
    """
    # original shelled out with os.system('mkdir %s' % pre)
    os.makedirs(pre, exist_ok=True)
    i = 0
    while True:
        content = input.get()
        if content.strip() == 'STOP':
            return
        write_sorted_bulk(content, '%s/%s' % (pre, i))
        i += 1


def write_sorted_bulk(content, filename):
    """Write *content* to *filename* (original used the Py2 ``file()`` builtin)."""
    with open(filename, 'w') as f:
        f.write(content)


def split_sort_file(filename, num_sort=3, buf_size=65536 * 64 * 4):
    """Split *filename* into sorted bulk files under ``<stem>/``.

    Reads the file (transparently gunzipping ``.gz``) in *buf_size* chunks
    cut at line boundaries, distributes them to *num_sort* sorter processes,
    and has one writer process persist the results.

    Returns the number of chunks dispatched.
    """
    t = datetime.now()
    pre, ext = os.path.splitext(filename)
    if ext == '.gz':
        # 'rt' so the rest of the pipeline works on str, matching plain files
        file_file = gzip.open(filename, 'rt')
    else:
        file_file = open(filename)

    bulk_queue = Queue(10)
    sorted_queue = Queue(10)

    sort_worker_pool = []
    for _ in range(num_sort):
        p = Process(target=sort_worker, args=(bulk_queue, sorted_queue))
        sort_worker_pool.append(p)
        p.start()

    num_write = 1
    write_worker_pool = []
    for _ in range(num_write):
        p = Process(target=write_worker, args=(sorted_queue, pre))
        write_worker_pool.append(p)
        p.start()

    buf = file_file.read(buf_size)
    sorted_count = 0
    while len(buf):
        end_line = buf.rfind('\n')
        bulk_queue.put(buf[:end_line + 1])
        sorted_count += 1
        if end_line != -1:
            # carry the partial last line over into the next chunk
            buf = buf[end_line + 1:] + file_file.read(buf_size)
        else:
            buf = file_file.read(buf_size)
    file_file.close()

    for _ in range(num_sort):
        bulk_queue.put('STOP')
    for p in sort_worker_pool:
        p.join()
    for _ in range(num_write):
        sorted_queue.put('STOP')
    for p in write_worker_pool:
        p.join()
    print('elapsed', datetime.now() - t)  # original typo: 'elasped'
    return sorted_count


class file_heap:
    """K-way merge over the sorted files inside directory *dir*.

    Keeps one open file and one read buffer per input; the heap holds
    ``(next_element, file_index)`` pairs so ``poppush`` yields elements in
    globally sorted order. ``idx``/``count`` select a deterministic 1/count
    partition of the files so several processes can split the work.
    """

    def __init__(self, dir, idx=0, count=1):
        files = os.listdir(dir)
        self.heap = []
        self.files = {}
        self.bulks = {}
        self.pre_element = None  # last element emitted by poppush_uniq
        # original used hash(name) % count, which is non-deterministic under
        # Python 3 hash randomization; index over a sorted listing is stable.
        for i, name in enumerate(sorted(files)):
            if i % count != idx:
                continue
            self.files[i] = open(os.path.join(dir, name))
            self.bulks[i] = ''
            heappush(self.heap, (self.get_next_element_buffered(i), i))

    def get_next_element_buffered(self, i):
        """Return the next line of file *i*, refilling its buffer as needed.

        Returns '' once the file is exhausted (the empty string is the
        end-of-stream marker used by poppush).
        """
        if len(self.bulks[i]) < 256:
            if self.files[i] is not None:
                buf = self.files[i].read(65536)
                if buf:
                    self.bulks[i] += buf
                else:
                    self.files[i].close()
                    self.files[i] = None
        end_line = self.bulks[i].find('\n')
        if end_line == -1:
            end_line = len(self.bulks[i])
        element = self.bulks[i][:end_line]
        self.bulks[i] = self.bulks[i][end_line + 1:]
        return element

    def poppush_uniq(self):
        """Like ``poppush`` but skip consecutive duplicates; None when done."""
        while True:
            element = self.poppush()
            if element is None:
                return None
            if element != self.pre_element:
                self.pre_element = element
                return element

    def poppush(self):
        """Pop the globally smallest element; refill from its source file.

        Returns None when every source is exhausted.
        """
        try:
            element, index = heappop(self.heap)
        except IndexError:
            return None
        new_element = self.get_next_element_buffered(index)
        if new_element:  # '' means source *index* is drained
            heappush(self.heap, (new_element, index))
        return element


def heappoppush(dir, queue, idx=0, count=1):
    """Process target: stream this partition's merged elements into *queue*.

    A final ``None`` marks end of stream.
    """
    heap = file_heap(dir, idx, count)
    while True:
        d = heap.poppush_uniq()
        queue.put(d)
        if d is None:
            return


def heappoppush2(dir, queue, count=1):
    """Merge *count* parallel ``heappoppush`` partitions into *queue*.

    Spawns one worker per partition, then heap-merges their output streams,
    dropping consecutive duplicates. Emits a final ``None`` sentinel.
    """
    heap = []
    procs = []
    queues = []
    pre_element = None
    for i in range(count):
        q = Queue(1024)
        q_buf = queue_buffer(q)
        queues.append(q_buf)
        p = Process(target=heappoppush, args=(dir, q_buf, i, count))
        procs.append(p)
        p.start()
    queues = tuple(queues)
    for i in range(count):
        first = queues[i].get()
        # never push None: (None, i) vs (str, i) raises TypeError in Py3
        if first is not None:
            heappush(heap, (first, i))
    while True:
        try:
            d, i = heappop(heap)
        except IndexError:
            queue.put(None)
            for p in procs:
                p.join()
            return
        nxt = queues[i].get()
        if nxt is not None:
            heappush(heap, (nxt, i))
        if d != pre_element:
            pre_element = d
            queue.put(d)


def merge_file(dir):
    """Merge the sorted files under *dir* into ``<dir>.merge``, one per line."""
    heap = file_heap(dir)
    out_name = dir + '.merge'
    # original shelled out with os.system('rm -f ...')
    if os.path.exists(out_name):
        os.remove(out_name)
    with open(out_name, 'a') as fmerge:
        element = heap.poppush_uniq()
        # original wrote AFTER fetching, so it crashed writing None + '\n'
        # at end of stream; fetch/test/write order fixed here.
        while element is not None:
            fmerge.write(element + '\n')
            element = heap.poppush_uniq()


class queue_buffer:
    """Batch wrapper around a Queue: ship lists of items to cut IPC overhead.

    ``put`` buffers up to ~1024 items (flushing immediately on ``None``,
    the end-of-stream sentinel); ``get`` unpacks one buffered batch at a time.
    """

    def __init__(self, queue):
        self.q = queue
        self.rbuf = []  # items already received, not yet handed out
        self.wbuf = []  # items waiting to be flushed as one batch

    def get(self):
        if len(self.rbuf) == 0:
            self.rbuf = self.q.get()
        r = self.rbuf[0]
        del self.rbuf[0]
        return r

    def put(self, d):
        self.wbuf.append(d)
        if d is None or len(self.wbuf) > 1024:
            self.q.put(self.wbuf)
            self.wbuf = []


def diff_file(file_old, file_new, file_diff, buf=268435456):
    """Write a line-level diff of two huge files to *file_diff*.

    Both inputs are split+sorted, then merge-compared: lines only in
    *file_new* are written prefixed ``<``, lines only in *file_old*
    prefixed ``>``.
    """
    print('buffer size', buf)
    # NOTE(review): the original imported split_sort_file from a separate
    # ``file_split`` module; the same function is defined above in this file.
    old_dir = os.path.splitext(file_old)[0]
    new_dir = os.path.splitext(file_new)[0]
    # original shelled out with os.system('rm -rf ...')
    shutil.rmtree(old_dir, ignore_errors=True)
    shutil.rmtree(new_dir, ignore_errors=True)
    t = datetime.now()
    split_sort_file(file_old, 5, buf)
    split_sort_file(file_new, 5, buf)
    print('split elapsed', datetime.now() - t)
    os.system('cat %s/* | wc -l' % old_dir)
    os.system('cat %s/* | wc -l' % new_dir)
    if os.path.exists(file_diff):
        os.remove(file_diff)
    t = datetime.now()
    old_q = Queue(1024)
    new_q = Queue(1024)
    old_queue = queue_buffer(old_q)
    new_queue = queue_buffer(new_q)
    h1 = Process(target=heappoppush2, args=(old_dir, old_queue, 3))
    h2 = Process(target=heappoppush2, args=(new_dir, new_queue, 3))
    h1.start()
    h2.start()
    old = old_queue.get()
    new = new_queue.get()
    old_count, new_count = 0, 0
    with open(file_diff, 'a') as zdiff:
        while old is not None or new is not None:
            # None checks must come FIRST: the original evaluated
            # ``old > new`` before ``old is None`` (TypeError in Py3).
            if old is None or (new is not None and old > new):
                zdiff.write('<' + new + '\n')
                new = new_queue.get()
                new_count += 1
            elif new is None or old < new:
                zdiff.write('>' + old + '\n')
                old = old_queue.get()
                old_count += 1
            else:  # equal: present in both, not a diff line
                old = old_queue.get()
                new = new_queue.get()
    print('new_count:', new_count)
    print('old_count:', old_count)
    print('diff elapsed', datetime.now() - t)
    h1.join()
    h2.join()
希望本文所述对大家的Python程序设计有所帮助。