Python实现大文件排序的方法
本文实例讲述了Python实现大文件排序的方法。分享给大家供大家参考。具体实现方法如下:
# Standard-library imports for the split/sort stage.
import gzip
import os
from datetime import datetime
from multiprocessing import Process, Queue, Pipe, current_process, freeze_support
def sort_worker(input, output):
    """Worker loop: pull text bulks from *input*, emit sorted unique keys.

    Each bulk is a newline-joined string; the first space-separated field
    of every line is the sort key.  A bulk containing the line 'STOP'
    terminates the worker.
    """
    while True:
        lines = input.get().splitlines()
        element_set = {}
        for line in lines:
            if line.strip() == 'STOP':
                return
            try:
                # First space-separated field is the key.  The source had
                # line.split('') which raises ValueError on every line;
                # a space separator was clearly intended.
                element = line.split(' ')[0]
                if not element_set.get(element):
                    element_set[element] = ''
            except Exception:
                # Best-effort: skip malformed lines instead of killing
                # the worker (narrowed from the original bare except).
                pass
        output.put('\n'.join(sorted(element_set)))
def write_worker(input, pre):
    """Worker loop: pull sorted bulks from *input* and write each one to a
    numbered file under directory *pre*.

    A bulk equal to 'STOP' terminates the worker.
    """
    # os.makedirs replaces the original os.system('mkdir ...'): portable,
    # no shell, and safe when the directory already exists.
    os.makedirs(pre, exist_ok=True)
    i = 0
    while True:
        content = input.get()
        if content.strip() == 'STOP':
            return
        write_sorted_bulk(content, '%s/%s' % (pre, i))
        i += 1
def write_sorted_bulk(content, filename):
    """Write *content* to *filename*, creating or truncating the file.

    Uses a context manager (instead of the Python 2 ``file()`` builtin)
    so the handle is closed even if the write raises.
    """
    with open(filename, 'w') as f:
        f.write(content)
def split_sort_file(filename, num_sort=3, buf_size=65536 * 64 * 4):
    """Split *filename* into sorted bulk files under a directory named
    after the file (extension stripped).

    The file is read in ``buf_size`` chunks, cut at line boundaries, and
    fanned out to *num_sort* sorting processes whose output a single
    writer process persists.  Returns the number of bulks dispatched.
    """
    t = datetime.now()
    pre, ext = os.path.splitext(filename)
    # 'rt' so the gzip branch yields text, matching the plain-file branch
    # (under 'rb' it would yield bytes and the '\n' searches would fail).
    if ext == '.gz':
        file_file = gzip.open(filename, 'rt')
    else:
        file_file = open(filename)
    bulk_queue = Queue(10)
    sorted_queue = Queue(10)

    NUM_SORT = num_sort
    sort_worker_pool = []
    for i in range(NUM_SORT):
        sort_worker_pool.append(
            Process(target=sort_worker, args=(bulk_queue, sorted_queue)))
        sort_worker_pool[i].start()

    NUM_WRITE = 1
    write_worker_pool = []
    for i in range(NUM_WRITE):
        write_worker_pool.append(
            Process(target=write_worker, args=(sorted_queue, pre)))
        write_worker_pool[i].start()

    buf = file_file.read(buf_size)
    sorted_count = 0
    while buf:
        end_line = buf.rfind('\n')
        if end_line == -1:
            # No complete line in the buffer: keep accumulating.  The
            # original discarded the partial buffer here, silently losing
            # data whenever a single line exceeded buf_size.
            more = file_file.read(buf_size)
            if not more:
                # Final unterminated line: flush it as-is.
                bulk_queue.put(buf)
                sorted_count += 1
                break
            buf += more
            continue
        bulk_queue.put(buf[:end_line + 1])
        sorted_count += 1
        buf = buf[end_line + 1:] + file_file.read(buf_size)
    file_file.close()

    for i in range(NUM_SORT):
        bulk_queue.put('STOP')
    for i in range(NUM_SORT):
        sort_worker_pool[i].join()
    for i in range(NUM_WRITE):
        sorted_queue.put('STOP')
    for i in range(NUM_WRITE):
        write_worker_pool[i].join()
    print('elapsed', datetime.now() - t)
    return sorted_count
# Standard-library imports for the merge/diff stage (the original post
# pasted a second source file starting here; duplicates are intentional).
import os
from datetime import datetime
from heapq import heappush, heappop
from multiprocessing import Process, Queue, Pipe, current_process, freeze_support
class file_heap:
    """K-way merge over the sorted bulk files in a directory.

    Each file holds sorted lines; a heap of ``(next_line, file_index)``
    pairs yields the globally smallest line on every poppush().  With
    *idx*/*count* an instance handles only its 1/count shard of the files.
    """

    def __init__(self, dir, idx=0, count=1):
        names = os.listdir(dir)
        self.heap = []
        self.files = {}    # file index -> open handle (None once drained)
        self.bulks = {}    # file index -> read-ahead text buffer
        self.pre_element = None  # last element poppush_uniq() returned
        for i in range(len(names)):
            # Deterministic round-robin sharding.  The original used
            # hash(name) % count, which is unstable across Python 3
            # processes (per-process hash randomization) and could route
            # a file to no shard or to several.
            if i % count != idx:
                continue
            self.files[i] = open(os.path.join(dir, names[i]))
            self.bulks[i] = ''
            heappush(self.heap, (self.get_next_element_buffered(i), i))

    def get_next_element_buffered(self, i):
        """Return the next line of file *i* (without its newline),
        refilling the read-ahead buffer as needed; '' once drained."""
        if len(self.bulks[i]) < 256:
            if self.files[i] is not None:
                buf = self.files[i].read(65536)
                if buf:
                    self.bulks[i] += buf
                else:
                    self.files[i].close()
                    self.files[i] = None
        end_line = self.bulks[i].find('\n')
        if end_line == -1:
            # Unterminated tail (or empty buffer): take everything left.
            end_line = len(self.bulks[i])
        element = self.bulks[i][:end_line]
        self.bulks[i] = self.bulks[i][end_line + 1:]
        return element

    def poppush_uniq(self):
        """Like poppush(), but skip consecutive duplicates; None when done."""
        while True:
            element = self.poppush()
            if element is None:
                return None
            if element != self.pre_element:
                self.pre_element = element
                return element

    def poppush(self):
        """Pop the smallest pending line and push its file's successor.
        Returns None once every file in this shard is exhausted."""
        try:
            element, index = heappop(self.heap)
        except IndexError:
            return None
        new_element = self.get_next_element_buffered(index)
        if new_element:
            heappush(self.heap, (new_element, index))
        return element
def heappoppush(dir, queue, idx=0, count=1):
    """Stream this worker's shard of *dir* into *queue*.

    Emits the merged, de-duplicated elements in sorted order, followed by
    a None sentinel marking end-of-stream.
    """
    heap = file_heap(dir, idx, count)
    while True:
        d = heap.poppush_uniq()
        queue.put(d)
        if d is None:
            return
def heappoppush2(dir, queue, count=1):
    """Merge *count* parallel heappoppush() shard workers over *dir*.

    Spawns one process per shard, k-way merges their streams while
    dropping consecutive duplicates, and writes the result to *queue*,
    ending with a None sentinel.
    """
    heap = []
    procs = []
    queues = []
    pre_element = None
    for i in range(count):
        q = Queue(1024)
        q_buf = queue_buffer(q)
        queues.append(q_buf)
        p = Process(target=heappoppush, args=(dir, q_buf, i, count))
        procs.append(p)
        p.start()
    queues = tuple(queues)
    # Prime the heap with each shard's first element.  Only real elements
    # are pushed: the original pushed None sentinels too, and comparing
    # (None, i) against (str, j) tuples raises TypeError on Python 3.
    for i in range(count):
        d = queues[i].get()
        if d is not None:
            heappush(heap, (d, i))
    while True:
        try:
            d, i = heappop(heap)
        except IndexError:
            # All shards exhausted: forward the sentinel, reap children.
            queue.put(None)
            for p in procs:
                p.join()
            return
        nd = queues[i].get()
        if nd is not None:
            heappush(heap, (nd, i))
        if d != pre_element:
            pre_element = d
            queue.put(d)
def merge_file(dir):
    """Merge the sorted bulk files in *dir* into a single de-duplicated
    sorted file named ``<dir>.merge``."""
    heap = file_heap(dir)
    merge_path = dir + '.merge'
    # Replaces os.system('rm -f ...'): no shell, and exact error surface.
    if os.path.exists(merge_path):
        os.remove(merge_path)
    with open(merge_path, 'a') as fmerge:
        # Check for the None sentinel BEFORE writing: the original wrote
        # first and tested afterwards, so it crashed with a TypeError
        # (None + '\n') at end-of-stream and on an empty directory.
        element = heap.poppush_uniq()
        while element is not None:
            fmerge.write(element + '\n')
            element = heap.poppush_uniq()
class queue_buffer:
    """Batching wrapper around a queue: items travel in lists to cut the
    per-item IPC overhead of a multiprocessing Queue.

    put() buffers until more than 1024 items accumulate (or a None
    end-of-stream sentinel arrives) and then flushes the whole list;
    get() refills its read buffer one list at a time.
    """

    def __init__(self, queue):
        self.q = queue    # underlying queue carrying lists of items
        self.rbuf = []    # items received but not yet handed out
        self.wbuf = []    # items accepted but not yet flushed

    def get(self):
        """Return the next buffered item, fetching a new batch if needed."""
        if not self.rbuf:
            self.rbuf = self.q.get()
        return self.rbuf.pop(0)

    def put(self, d):
        """Buffer *d*; flush the batch when full or when *d* is the None
        sentinel (which must always be delivered promptly)."""
        self.wbuf.append(d)
        if d is None or len(self.wbuf) > 1024:
            self.q.put(self.wbuf)
            self.wbuf = []
def diff_file(file_old, file_new, file_diff, buf=268435456):
    """Compute the line-level difference between two huge files.

    Both inputs are split into sorted bulks (using *buf* bytes of read
    buffer), each side is stream-merged in a child process, and the two
    sorted streams are compared.  Lines only in *file_old* are written to
    *file_diff* prefixed '>', lines only in *file_new* prefixed '<'.
    """
    print('buffer size', buf)
    from file_split import split_sort_file
    os.system('rm -rf ' + os.path.splitext(file_old)[0])
    os.system('rm -rf ' + os.path.splitext(file_new)[0])
    t = datetime.now()
    split_sort_file(file_old, 5, buf)
    split_sort_file(file_new, 5, buf)
    print('split elapsed', datetime.now() - t)
    # Sanity counters: total lines in each bulk directory.
    os.system('cat %s/* | wc -l' % os.path.splitext(file_old)[0])
    os.system('cat %s/* | wc -l' % os.path.splitext(file_new)[0])
    os.system('rm -f ' + file_diff)
    t = datetime.now()
    old_q = Queue(1024)
    new_q = Queue(1024)
    old_queue = queue_buffer(old_q)
    new_queue = queue_buffer(new_q)
    h1 = Process(target=heappoppush2,
                 args=(os.path.splitext(file_old)[0], old_queue, 3))
    h2 = Process(target=heappoppush2,
                 args=(os.path.splitext(file_new)[0], new_queue, 3))
    h1.start()
    h2.start()
    old = old_queue.get()
    new = new_queue.get()
    old_count, new_count = 0, 0
    with open(file_diff, 'a') as zdiff:
        while old is not None or new is not None:
            # The None checks must short-circuit FIRST: Python 3 refuses
            # to order-compare None with str, so the original
            # `old > new or old is None` raised TypeError at end-of-stream.
            if old is None or (new is not None and old > new):
                zdiff.write('<' + new + '\n')
                new = new_queue.get()
                new_count += 1
            elif new is None or old < new:
                zdiff.write('>' + old + '\n')
                old = old_queue.get()
                old_count += 1
            else:
                # Present in both: advance both streams.
                old = old_queue.get()
                new = new_queue.get()
    print('new_count:', new_count)
    print('old_count:', old_count)
    print('diff elapsed', datetime.now() - t)
    h1.join()
    h2.join()
希望本文所述对大家的Python程序设计有所帮助。