python3+PyQt5 创建多线程网络应用-TCP客户端和TCP服务器实例
本文在上文的基础上重新实现支持多线程的服务器。
以下为TCP客户端的程序代码:
#!/usr/bin/envpython3
importsys
fromPyQt5.QtCoreimport(QByteArray,QDataStream,QDate,QIODevice,
QRegExp,Qt)
fromPyQt5.QtWidgetsimport(QApplication,QDateEdit,QFrame,QGridLayout,
QHBoxLayout,QLabel,QLineEdit,QPushButton,
QWidget)
fromPyQt5.QtGuiimportQRegExpValidator
fromPyQt5.QtNetworkimport(QTcpSocket,)
MAC=True
try:
fromPyQt5.QtGuiimportqt_mac_set_native_menubar
exceptImportError:
MAC=False
PORT=9407
SIZEOF_UINT16=2
classBuildingServicesClient(QWidget):
def__init__(self,parent=None):
super(BuildingServicesClient,self).__init__(parent)
self.socket=QTcpSocket()
self.nextBlockSize=0
self.request=None
roomLabel=QLabel("&Room")
self.roomEdit=QLineEdit()
roomLabel.setBuddy(self.roomEdit)
regex=QRegExp(r"[0-9](?:0[1-9]|[12][0-9]|3[0-4])")
self.roomEdit.setValidator(QRegExpValidator(regex,self))
self.roomEdit.setAlignment(Qt.AlignRight|Qt.AlignVCenter)
dateLabel=QLabel("&Date")
self.dateEdit=QDateEdit()
dateLabel.setBuddy(self.dateEdit)
self.dateEdit.setAlignment(Qt.AlignRight|Qt.AlignVCenter)
self.dateEdit.setDate(QDate.currentDate().addDays(1))
self.dateEdit.setDisplayFormat("yyyy-MM-dd")
responseLabel=QLabel("Response")
self.responseLabel=QLabel()
self.responseLabel.setFrameStyle(QFrame.StyledPanel|QFrame.Sunken)
self.bookButton=QPushButton("&Book")
self.bookButton.setEnabled(False)
self.unBookButton=QPushButton("&Unbook")
self.unBookButton.setEnabled(False)
quitButton=QPushButton("&Quit")
ifnotMAC:
self.bookButton.setFocusPolicy(Qt.NoFocus)
self.unBookButton.setFocusPolicy(Qt.NoFocus)
buttonLayout=QHBoxLayout()
buttonLayout.addWidget(self.bookButton)
buttonLayout.addWidget(self.unBookButton)
buttonLayout.addStretch()
buttonLayout.addWidget(quitButton)
layout=QGridLayout()
layout.addWidget(roomLabel,0,0)
layout.addWidget(self.roomEdit,0,1)
layout.addWidget(dateLabel,0,2)
layout.addWidget(self.dateEdit,0,3)
layout.addWidget(responseLabel,1,0)
layout.addWidget(self.responseLabel,1,1,1,3)
layout.addLayout(buttonLayout,2,1,1,4)
self.setLayout(layout)
self.socket.connected.connect(self.sendRequest)
self.socket.readyRead.connect(self.readResponse)
self.socket.disconnected.connect(self.serverHasStopped)
#self.connect(self.socket,
#SIGNAL("error(QAbstractSocket::SocketError)"),
#self.serverHasError)
self.socket.error.connect(self.serverHasError)
self.roomEdit.textEdited.connect(self.updateUi)
self.dateEdit.dateChanged.connect(self.updateUi)
self.bookButton.clicked.connect(self.book)
self.unBookButton.clicked.connect(self.unBook)
quitButton.clicked.connect(self.close)
self.setWindowTitle("BuildingServices")
defupdateUi(self):
enabled=False
if(self.roomEdit.text()and
self.dateEdit.date()>QDate.currentDate()):
enabled=True
ifself.requestisnotNone:
enabled=False
self.bookButton.setEnabled(enabled)
self.unBookButton.setEnabled(enabled)
defcloseEvent(self,event):
self.socket.close()
event.accept()
defbook(self):
self.issueRequest("BOOK",self.roomEdit.text(),
self.dateEdit.date())
defunBook(self):
self.issueRequest("UNBOOK",self.roomEdit.text(),
self.dateEdit.date())
defissueRequest(self,action,room,date):
self.request=QByteArray()
stream=QDataStream(self.request,QIODevice.WriteOnly)
stream.setVersion(QDataStream.Qt_5_7)
stream.writeUInt16(0)
stream.writeQString(action)
stream.writeQString(room)
stream<>action>>room
action=stream.readQString()
room=stream.readQString()
ifaction!="ERROR":
stream>>date
ifaction=="ERROR":
msg="Error:{0}".format(room)
elifaction=="BOOK":
msg="Bookedroom{0}for{1}".format(room,date.toString(Qt.ISODate))
elifaction=="UNBOOK":
msg="Unbookedroom{0}for{1}".format(room,date.toString(Qt.ISODate))
self.responseLabel.setText(msg)
self.updateUi()
self.nextBlockSize=0
defserverHasStopped(self):
self.responseLabel.setText(
"Error:Connectionclosedbyserver")
self.socket.close()
defserverHasError(self,error):
self.responseLabel.setText("Error:{0}".format(self.socket.errorString()))
self.socket.close()
app=QApplication(sys.argv)
form=BuildingServicesClient()
form.show()
app.exec_()
以下为TCP服务端的程序代码:
#!/usr/bin/envpython3
importbisect
importcollections
importsys
fromPyQt5.QtCoreimport(QByteArray,QDataStream,QDate,QReadWriteLock,QThread,QIODevice,Qt)
fromPyQt5.QtWidgetsimport(QApplication,QMessageBox,QPushButton)
fromPyQt5.QtNetworkimport(QAbstractSocket,QHostAddress,QTcpServer,QTcpSocket)
PORT=9407
SIZEOF_UINT16=2
MAX_BOOKINGS_PER_DAY=5
#Key=date,value=listofroomIDs
Bookings=collections.defaultdict(list)
defprintBookings():
forkeyinsorted(Bookings):
print(key,Bookings[key])
print()
classThread(QThread):
lock=QReadWriteLock()
def__init__(self,socketId,parent):
super(Thread,self).__init__(parent)
self.socketId=socketId
defrun(self):
socket=QTcpSocket()
ifnotsocket.setSocketDescriptor(self.socketId):
#self.emit(SIGNAL("error(int)"),socket.error())
self.error.connect(socket.error)
return
whilesocket.state()==QAbstractSocket.ConnectedState:
nextBlockSize=0
stream=QDataStream(socket)
stream.setVersion(QDataStream.Qt_5_7)
if(socket.waitForReadyRead()and
socket.bytesAvailable()>=SIZEOF_UINT16):
nextBlockSize=stream.readUInt16()
else:
self.sendError(socket,"Cannotreadclientrequest")
return
ifsocket.bytesAvailable()>date
try:
Thread.lock.lockForRead()
bookings=Bookings.get(date.toPyDate())
finally:
Thread.lock.unlock()
uroom=str(room)
ifaction=="BOOK":
newlist=False
try:
Thread.lock.lockForRead()
ifbookingsisNone:
newlist=True
finally:
Thread.lock.unlock()
ifnewlist:
try:
Thread.lock.lockForWrite()
bookings=Bookings[date.toPyDate()]
finally:
Thread.lock.unlock()
error=None
insert=False
try:
Thread.lock.lockForRead()
iflen(bookings)=MAX_BOOKINGS_PER_DAY:
continue
bisect.insort(bookings,"{0:1d}{1:02d}".format(
floor,room))
printBookings()
app=QApplication(sys.argv)
form=BuildingServicesDlg()
form.show()
form.move(0,0)
app.exec_()
以上这篇python3+PyQt5创建多线程网络应用-TCP客户端和TCP服务器实例就是小编分享给大家的全部内容了,希望能给大家一个参考,也希望大家多多支持毛票票。