python实现的文件同步服务器实例
本文实例讲述了python实现的文件同步服务器。分享给大家供大家参考。具体实现方法如下:
服务端使用asyncore,收到文件后保存到本地。
客户端使用pyinotify监视目录的变化,把变动的文件发送到服务端。
重点:
1.使用struct打包发送文件的信息,服务端收到后,根据文件信息来接收客户端传送过来的文件。
2.客户端使用多线程,pyinotify监视到文件变化,放到队列中,由另外一个线程发送。
上代码:
服务端:
# receive file from client and store them into file use asyncore. #
#!/usr/bin/python
# coding:utf-8
import asyncore
import socket
from socket import errno
import logging
import time
import sys
import struct
import os
import fcntl
import threading
from rrd_graph import MakeGraph
# Abort early with an install hint when the rrdtool binding is missing.
# NOTE: the original caught "ImportWarnning" (undefined name, would raise
# NameError) and its message wrongly mentioned pyinotify.
try:
    import rrdtool
except ImportError:
    print("Hope this information can help you:")
    print("Cannot find rrdtool module in sys path, just run [apt-get install python-rrdtool] in ubuntu.")
    sys.exit(1)
class RequestHandler(asyncore.dispatcher):
    """Per-connection handler: read a struct-packed header describing a
    file (path/name lengths, path, name, size), then receive exactly that
    many payload bytes and store the file under ``../perfdata``.

    The header layout must match the client side: "!LL128s128sL".
    """

    def __init__(self, sock, map=None, chunk_size=1024):
        # Logger is named after the class plus the local socket address so
        # concurrent connections can be told apart in the debug output.
        self.logger = logging.getLogger('%s-%s' % (self.__class__.__name__,
                                                   str(sock.getsockname())))
        self.chunk_size = chunk_size
        asyncore.dispatcher.__init__(self, sock, map)
        self.data_to_write = list()

    def readable(self):
        # Always willing to read.
        return True

    def writable(self):
        # Writable while not yet connected (so the connect event fires) or
        # while there is queued outgoing data.
        return (not self.connected) or len(self.data_to_write)

    def handle_write(self):
        # Send at most one chunk; requeue whatever did not fit.
        data = self.data_to_write.pop()
        sent = self.send(data[:self.chunk_size])
        if sent < len(data):
            self.data_to_write.append(data[sent:])

    def handle_read(self):
        """Read one header plus file payload and write the file to disk."""
        self.writen_size = 0
        nagios_perfdata = '../perfdata'
        head_packet_format = "!LL128s128sL"
        head_packet_size = struct.calcsize(head_packet_format)
        data = self.recv(head_packet_size)
        if not data:
            return
        filepath_len, filename_len, filepath, filename, filesize = \
            struct.unpack(head_packet_format, data)
        # The 128s fields are zero-padded; trim them to their real lengths.
        filepath = os.path.join(nagios_perfdata, filepath[:filepath_len])
        filename = filename[:filename_len]
        self.logger.debug("update file: %s" % (filepath + '/' + filename))
        try:
            if not os.path.exists(filepath):
                os.makedirs(filepath)
        except OSError:
            # Another handler may have created the directory between the
            # check and makedirs; best-effort is fine here.
            pass
        self.fd = open(os.path.join(filepath, filename), 'w')
        try:
            self._recv_payload(filesize)
        finally:
            # The original code leaked this descriptor; always close it.
            self.fd.close()
        self.logger.debug("File size: %s" % self.writen_size)

    def _recv_payload(self, filesize):
        """Receive exactly ``filesize`` bytes in chunks, writing to self.fd.

        Replaces three near-identical receive loops from the original code.
        Stops early if the peer closes the connection (empty recv), which
        previously caused an infinite busy loop.
        """
        while self.writen_size < filesize:
            want = min(self.chunk_size, filesize - self.writen_size)
            try:
                data = self.recv(want)
            except socket.error as e:
                if e.args[0] == errno.EWOULDBLOCK:
                    print("EWOULDBLOCK")
                    time.sleep(1)
                else:
                    break
            else:
                if not data:
                    break
                self.fd.write(data)
                self.fd.flush()
                self.writen_size += len(data)
class SyncServer(asyncore.dispatcher):
    """Listening socket: accepts connections and hands each one to a
    RequestHandler instance."""

    def __init__(self, host, port):
        asyncore.dispatcher.__init__(self)
        self.debug = True
        self.logger = logging.getLogger(self.__class__.__name__)
        self.create_socket(socket.AF_INET, socket.SOCK_STREAM)
        self.set_reuse_addr()
        self.bind((host, port))
        # Large backlog: many clients may reconnect at once after an outage.
        self.listen(2000)

    def handle_accept(self):
        client_socket = self.accept()
        # accept() can return None on a spurious wakeup; ignore that case.
        if client_socket is not None:
            sock, addr = client_socket
            RequestHandler(sock=sock)
class RunServer(threading.Thread):
    """Thread that owns the SyncServer and runs the asyncore event loop."""

    def __init__(self):
        super(RunServer, self).__init__()
        # Non-daemon: keep the process alive while the server runs.
        self.daemon = False

    def run(self):
        server = SyncServer('', 9999)
        asyncore.loop(use_poll=True)
def StartServer():
    """Configure debug logging and launch the sync server thread."""
    logging.basicConfig(level=logging.DEBUG,
                        format='%(name)s:%(message)s',
                        )
    RunServer().start()
    # MakeGraph().start()


if __name__ == '__main__':
    StartServer()
客户端:
# monitor path with inotify (python module), and send them to remote server. #
# use sendfile(2) instead of send function in socket, if we have python-sendfile installed. #
import socket
import time
import os
import sys
import struct
import threading
import Queue
# pyinotify is required; exit with an install hint when it is missing.
# NOTE: the original caught "ImportWarnning" (undefined name, would raise
# NameError when the except clause was evaluated).
try:
    import pyinotify
except ImportError:
    print("Hope this information can help you:")
    print("Cannot find pyinotify module in sys path, just run [apt-get install python-pyinotify] in ubuntu.")
    sys.exit(1)

# sendfile(2) support is optional; fall back to plain send() without it.
try:
    from sendfile import sendfile
except ImportError:
    pass
# File extensions that are worth syncing to the server.
filetype_filter = [".rrd", ".xml"]


def check_filetype(pathname):
    """Return True when ``pathname`` ends with a synced extension, else False.

    The original version returned implicit ``None`` on one path and had a
    dead numeric-suffix branch whose both outcomes were falsy; callers only
    test truthiness, so collapsing everything else to ``False`` is
    behavior-compatible.
    """
    for suffix_name in filetype_filter:
        if pathname.endswith(suffix_name):
            return True
    return False
class sync_file(threading.Thread):
    """Worker thread: pop inotify events from the queue and push each
    changed file to the remote sync server.

    Wire format: a "!LL128s128sL" header (path length, name length, path,
    name, size) followed by the raw file bytes — must match the server.
    """

    def __init__(self, addr, events_queue):
        super(sync_file, self).__init__()
        self.daemon = False
        self.queue = events_queue
        self.addr = addr          # (host, port) of the sync server
        self.chunk_size = 1024

    def run(self):
        while 1:
            event = self.queue.get()
            if not check_filetype(event.pathname):
                continue
            print("%s %s %s" % (time.asctime(), event.maskname, event.pathname))
            # Only the last path component travels in the header; the
            # server re-roots it under its own perfdata directory.
            filepath = event.path.split('/')[-1]
            filename = event.name
            filesize = os.stat(os.path.join(event.path, filename)).st_size
            sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
            try:
                sock.connect(self.addr)
                header = struct.pack("!LL128s128sL", len(filepath),
                                     len(filename), filepath, filename,
                                     filesize)
                fd = open(event.pathname, 'rb')
                try:
                    sock.sendall(header)
                    if "sendfile" in sys.modules:
                        # Zero-copy path when python-sendfile is installed.
                        offset = 0
                        while 1:
                            sent = sendfile(sock.fileno(), fd.fileno(),
                                            offset, self.chunk_size)
                            if sent == 0:
                                break
                            offset += sent
                    else:
                        while 1:
                            data = fd.read(self.chunk_size)
                            if not data:
                                break
                            sock.send(data)
                finally:
                    fd.close()
            finally:
                # Originally leaked on any transfer error; always close now.
                sock.close()
class EventHandler(pyinotify.ProcessEvent):
    """pyinotify handler: enqueue close-write and moved-to events for the
    sender threads to pick up."""

    def __init__(self, events_queue):
        super(EventHandler, self).__init__()
        self.events_queue = events_queue

    def my_init(self):
        # pyinotify's own init hook; state is set up in __init__ instead.
        pass

    def process_IN_CLOSE_WRITE(self, event):
        self.events_queue.put(event)

    def process_IN_MOVED_TO(self, event):
        self.events_queue.put(event)
def start_notify(path, mask, sync_server, worker_count=500):
    """Start the sender thread pool and block in the pyinotify event loop.

    path        -- directory tree to watch (recursively)
    mask        -- pyinotify event mask
    sync_server -- (host, port) of the receiving server
    worker_count -- size of the sender pool; default keeps the original
                    hard-coded value of 500
    """
    events_queue = Queue.Queue()
    sync_thread_pool = [sync_file(sync_server, events_queue)
                        for _ in range(worker_count)]
    for worker in sync_thread_pool:
        worker.start()
    wm = pyinotify.WatchManager()
    notifier = pyinotify.Notifier(wm, EventHandler(events_queue))
    wm.add_watch(path, mask, rec=True)
    notifier.loop()
def do_notify():
    """Watch the pnp4nagios perfdata tree and sync changes to the server."""
    perfdata_path = '/var/lib/pnp4nagios/perfdata'
    mask = pyinotify.IN_CLOSE_WRITE | pyinotify.IN_MOVED_TO
    sync_server = ('127.0.0.1', 9999)
    start_notify(perfdata_path, mask, sync_server)


if __name__ == '__main__':
    do_notify()
python监视线程池
#!/usr/bin/python
import threading
import time
class Monitor(threading.Thread):
    """Watch one or more thread pools and restart threads that died.

    kwargs maps pool-name -> tuple(pool_list, *constructor_args).  Every
    0.5s the monitor counts dead threads per pool, removes them from the
    pool and starts replacements built from the same constructor args.
    """

    def __init__(self, *args, **kwargs):
        super(Monitor, self).__init__()
        self.daemon = False
        self.args = args
        self.kwargs = kwargs
        # List of one-key dicts: [{pool_name: pool_list}, ...]
        self.pool_list = []

    def run(self):
        print(self.args)
        print(self.kwargs)
        for name, value in self.kwargs.items():
            pool = value[0]
            self.pool_list.append({name: pool})
        while 1:
            print(self.pool_list)
            for name, value in self.kwargs.items():
                pool = value[0]
                parameters = value[1:]
                # Capture the thread class BEFORE dead members are removed:
                # the original read pool[0] afterwards and raised IndexError
                # when every thread in the pool had died.
                thread_cls = pool[0].__class__ if pool else None
                died_threads = self.cal_died_thread(self.pool_list, name)
                print("died_threads %s" % died_threads)
                if died_threads > 0 and thread_cls is not None:
                    for _ in range(died_threads):
                        print("start %s thread..." % name)
                        t = thread_cls(*parameters)
                        t.start()
                        self.add_to_pool_list(t, name)
                # When nothing died, do nothing for this pool.  (The
                # original executed "break" here, which skipped monitoring
                # of the remaining pools for the rest of the cycle.)
            time.sleep(0.5)

    def cal_died_thread(self, pool_list, name):
        """Remove dead threads from the named pool; return how many died."""
        died = 0
        for item in self.pool_list:
            for k, v in item.items():
                if name == k:
                    # Iterate over a copy: we remove from v while scanning.
                    for t in list(v):
                        if not t.is_alive():
                            self.remove_from_pool_list(t)
                            died += 1
        return died

    def add_to_pool_list(self, obj, name):
        """Append a freshly started thread to the pool registered as name."""
        for item in self.pool_list:
            for k, v in item.items():
                if name == k:
                    v.append(obj)

    def remove_from_pool_list(self, obj):
        """Remove obj from whichever pool contains it; silently ignore misses."""
        for item in self.pool_list:
            for k, v in item.items():
                try:
                    v.remove(obj)
                except ValueError:
                    pass
                else:
                    return
使用方法:
rrds_queue = Queue.Queue()
make_rrds_pool = []
for i in range(5):
    make_rrds_pool.append(MakeRrds(rrds_queue))
for i in make_rrds_pool:
    i.start()
make_graph_pool = []
for i in range(5):
    make_graph_pool.append(MakeGraph(rrds_queue))
for i in make_graph_pool:
    i.start()
monitor = Monitor(make_rrds_pool=(make_rrds_pool, rrds_queue),
                  make_graph_pool=(make_graph_pool, rrds_queue))
monitor.start()
解析:
1.接受字典参数,value为一个元组,第一个元素是线程池,后面的都是参数。
2.每0.5秒监视线程池中的线程数量,如果线程死掉了,记录死掉线程的数目,再启动同样数量的线程。
3.如果没有线程死去,则什么也不做。
从外部调用Django模块
import os
import sys
sys.path.insert(0, '/data/cloud_manage')
from django.core.management import setup_environ
import settings
setup_environ(settings)
from common.monitor import Monitor
from django.db import connection, transaction
前提就是,要新建一个django的project,这里我们新建了一个cloud_manage.
这样不仅可以调用django自身的模块,还能调用project本身的东西。
希望本文所述对大家的Python程序设计有所帮助。