import threading import socketserver from DataStruct import * from myutils.MyLogger_logging import MyLogger from core.DBManager import DBManager import time m_BRun = False #线程运行标识 ''' 客户端SOCKET类 ''' class ClientSocket(): def __init__(self,strIP,strPort): self.strIP = strIP self.strPort = strPort self.isOnline = True self.strUserID = "" # userID 可以在字典层 ''' 服务端SOCKET--Handler类--目前验证效果是一个客户端就是一个handler 使用一些全局变量 ''' class SSocketHandler(socketserver.BaseRequestHandler): #重写handle,把上级类的对象传递到每个socket def __init__(self, request, client_address, server,sc_manager): #自定义初始化内容 self.sc_manager = sc_manager #把管理类的对象传递到每个socket连接 self.db_manager = sc_manager.myDBM #数据库连接类 self.myID = "" #socket ID self.log = MyLogger("SSocketHandler") super().__init__(request, client_address, server) def getSendHead(self,itype): if itype ==0: data_to_send = CSHeadMessage() data_to_send.Flag = b"TFTF" data_to_send.iDataType = 0 data_to_send.iDataLen = 0 return data_to_send #对接收到的数据进行分析处理 --- 数据协议还没定好 #-1非法数据,会断开连接 def data_analyse(self,head,buffer): if head.iDataType == 0: #心跳包 #回个心跳包 senddata = self.getSendHead(0) self.request.sendall(senddata) elif head.iDataType == 1: #设备端登录 -- 32个字节 -- 登录失败也退出 # 处理登录信息,buffer应该为32长度的ID值,在数据库中查询已售设备清单,存在的则上线。 初步为此机制,后期要加强安全机制 # 目前设备和用户使用同一个字典,后期可以分开 2024-04-07 cID = self.db_manager.checkDevID(buffer) self.log.debug("处理登录信息:{}".format(buffer)) if cID: self.myID = cID #验证通过则添加设备客户端节点 self.sc_manager.update_client_dict(1,cID,self) self.log.debug("设备{}登录成功".format(cID)) elif head.iDataType == 2: #app端等 --用户名和密码 -- 登录失败也退出 user_data = CSAPPLogin_Data.from_buffer_copy(buffer) self.log.debug("处理APP登录信息:{}--{}".format(user_data.username,user_data.passwd)) else: if self.myID == "": #在接收除了1和2的数据时,若没有登录,则关闭连接 return -1 elif head.iDataType == 3: pass elif head.iDataType == 4: pass else: self.log.error("数据类型错误") return -1 return 1 def handle(self): #接受到客户端连接会触发handle -- handle已经多线程 print("conn is :", self.request) # conn print("addr is :", self.client_address) # addr self.request.settimeout(10*60) #单位秒 -- 客户端需要5分钟发送一次心跳包 while self.sc_manager.m_BRun: try: # 先接收数据包头 head_data = self.request.recv(ctypes.sizeof(CSHeadMessage)) #12个字节 if not head_data or len(head_data) != ctypes.sizeof(CSHeadMessage): self.log.debug("包头接收不完整,或客户端断开") break head = CSHeadMessage.from_buffer_copy(head_data) if head.Flag == b"TFTF": iDLen = head.iDataLen irecvlen= 0 buffer = b'' while irecvlen < iDLen: data = self.request.recv(1024) if not data: self.log.debug("客户端断开") break irecvlen += len(data) buffer += data #对数据进行分析处理 ret = self.data_analyse(head,buffer) if ret == -1: #未登录处理其他数据或登录失败,则进行报错和关闭socket连接 self.log.info("该连接还未登录:{}".format(head_data)) break else: #非法的数据头,需要关闭socket连接 self.log.info("接收到的数据非法:{}".format(head_data)) break except TimeoutError: self.log.debug("socket接收数据超时") break except Exception as e: self.log.error("Socket异常:{}".format(e)) break #需要更新客户端列表 -- 删除socket节点 --只有登录过维护过连接的需要删除 if(self.myID != ""): self.sc_manager.update_client_dict(0,self.myID,self) self.log.info("handle结束")#?handle结束后,资源是否释放需要确认 class SocketManager(): def __init__(self,iPort): self.mylog = MyLogger("SocketManager") self.client_dict = {} #socket列表 self.lock = threading.Lock() #互斥锁 self.m_BRun = True self.iPort = iPort def __del__(self): self.m_BRun = False time.sleep(3) #itype:0-删除socket节点,1-添加socket节点 #设备ID和用户ID都为不重复值 def update_client_dict(self,itype,cID,csHandle): with self.lock: if itype == 0: #删除节点 self.client_dict[cID].request.close() del self.client_dict[cID] elif itype == 1: #添加节点,先删后加 if cID in self.client_dict: self.client_dict[cID].request.close() del self.client_dict[cID] self.client_dict[cID] = csHandle else: self.mylog.error("有非预设值传入!") def startListen(self): print("开始socket实例化") ip_port = ("127.0.0.1", self.iPort) self.s = socketserver.ThreadingTCPServer(ip_port, lambda *args, **kwargs: SSocketHandler(*args, **kwargs, sc_manager=self)) # s = socketserver.ThreadingTCPServer(ip_port, SSocketHandler) self.s.serve_forever() # ?待完善项,如何停止Server_socket的运行。 #创建socket对象,并执行工作 def doWork(self): #实例化数据库连接 self.myDBM = DBManager("ip","prot","username","passwd") if not self.myDBM: self.mylog.error("建立数据库连接失败!请检查程序") return #开始 thread = threading.Thread(target=self.startListen) # 如果目标函数带括号,则代表已调研,返回的目标对象是函数执行的结果。 print("开始启动线程了") thread.start() # 等待线程结束 thread.join() if __name__ == "__main__": SocketM = SocketManager(18000) SocketM.doWork()