You can not select more than 25 topics
Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
171 lines
7.2 KiB
171 lines
7.2 KiB
11 months ago
|
import threading
|
||
|
import socketserver
|
||
|
from DataStruct import *
|
||
10 months ago
|
from myutils.MyLogger_logging import MyLogger
|
||
|
from core.DBManager import DBManager
|
||
11 months ago
|
import time
|
||
|
|
||
|
m_BRun = False #线程运行标识
|
||
|
'''
|
||
|
客户端SOCKET类
|
||
|
'''
|
||
|
class ClientSocket():
|
||
|
def __init__(self,strIP,strPort):
|
||
|
self.strIP = strIP
|
||
|
self.strPort = strPort
|
||
|
self.isOnline = True
|
||
|
self.strUserID = "" # userID 可以在字典层
|
||
|
|
||
|
'''
|
||
|
服务端SOCKET--Handler类--目前验证效果是一个客户端就是一个handler
|
||
|
使用一些全局变量
|
||
|
'''
|
||
|
class SSocketHandler(socketserver.BaseRequestHandler):
|
||
|
|
||
|
#重写handle,把上级类的对象传递到每个socket
|
||
|
def __init__(self, request, client_address, server,sc_manager):
|
||
|
#自定义初始化内容
|
||
|
self.sc_manager = sc_manager #把管理类的对象传递到每个socket连接
|
||
|
self.db_manager = sc_manager.myDBM #数据库连接类
|
||
|
self.myID = "" #socket ID
|
||
|
self.log = MyLogger("SSocketHandler")
|
||
|
super().__init__(request, client_address, server)
|
||
|
|
||
|
def getSendHead(self,itype):
|
||
|
if itype ==0:
|
||
|
data_to_send = CSHeadMessage()
|
||
|
data_to_send.Flag = b"TFTF"
|
||
|
data_to_send.iDataType = 0
|
||
|
data_to_send.iDataLen = 0
|
||
|
return data_to_send
|
||
|
|
||
|
#对接收到的数据进行分析处理 --- 数据协议还没定好
|
||
|
#-1非法数据,会断开连接
|
||
|
def data_analyse(self,head,buffer):
|
||
|
if head.iDataType == 0: #心跳包
|
||
|
#回个心跳包
|
||
|
senddata = self.getSendHead(0)
|
||
|
self.request.sendall(senddata)
|
||
|
elif head.iDataType == 1: #设备端登录 -- 32个字节 -- 登录失败也退出
|
||
|
# 处理登录信息,buffer应该为32长度的ID值,在数据库中查询已售设备清单,存在的则上线。 初步为此机制,后期要加强安全机制
|
||
|
# 目前设备和用户使用同一个字典,后期可以分开 2024-04-07
|
||
|
cID = self.db_manager.checkDevID(buffer)
|
||
|
self.log.debug("处理登录信息:{}".format(buffer))
|
||
|
if cID:
|
||
|
self.myID = cID
|
||
|
#验证通过则添加设备客户端节点
|
||
|
self.sc_manager.update_client_dict(1,cID,self)
|
||
|
self.log.debug("设备{}登录成功".format(cID))
|
||
|
elif head.iDataType == 2: #app端等 --用户名和密码 -- 登录失败也退出
|
||
|
user_data = CSAPPLogin_Data.from_buffer_copy(buffer)
|
||
|
self.log.debug("处理APP登录信息:{}--{}".format(user_data.username,user_data.passwd))
|
||
|
else:
|
||
|
if self.myID == "": #在接收除了1和2的数据时,若没有登录,则关闭连接
|
||
|
return -1
|
||
|
elif head.iDataType == 3:
|
||
|
pass
|
||
|
elif head.iDataType == 4:
|
||
|
pass
|
||
|
else:
|
||
|
self.log.error("数据类型错误")
|
||
|
return -1
|
||
|
return 1
|
||
|
|
||
|
|
||
|
def handle(self): #接受到客户端连接会触发handle -- handle已经多线程
|
||
|
print("conn is :", self.request) # conn
|
||
|
print("addr is :", self.client_address) # addr
|
||
|
self.request.settimeout(10*60) #单位秒 -- 客户端需要5分钟发送一次心跳包
|
||
|
while self.sc_manager.m_BRun:
|
||
|
try:
|
||
|
# 先接收数据包头
|
||
|
head_data = self.request.recv(ctypes.sizeof(CSHeadMessage)) #12个字节
|
||
|
if not head_data or len(head_data) != ctypes.sizeof(CSHeadMessage):
|
||
|
self.log.debug("包头接收不完整,或客户端断开")
|
||
|
break
|
||
|
head = CSHeadMessage.from_buffer_copy(head_data)
|
||
|
if head.Flag == b"TFTF":
|
||
|
iDLen = head.iDataLen
|
||
|
irecvlen= 0
|
||
|
buffer = b''
|
||
|
while irecvlen < iDLen:
|
||
|
data = self.request.recv(1024)
|
||
|
if not data:
|
||
|
self.log.debug("客户端断开")
|
||
|
break
|
||
|
irecvlen += len(data)
|
||
|
buffer += data
|
||
|
#对数据进行分析处理
|
||
|
ret = self.data_analyse(head,buffer)
|
||
|
if ret == -1: #未登录处理其他数据或登录失败,则进行报错和关闭socket连接
|
||
|
self.log.info("该连接还未登录:{}".format(head_data))
|
||
|
break
|
||
|
else: #非法的数据头,需要关闭socket连接
|
||
|
self.log.info("接收到的数据非法:{}".format(head_data))
|
||
|
break
|
||
|
except TimeoutError:
|
||
|
self.log.debug("socket接收数据超时")
|
||
|
break
|
||
|
except Exception as e:
|
||
|
self.log.error("Socket异常:{}".format(e))
|
||
|
break
|
||
|
#需要更新客户端列表 -- 删除socket节点 --只有登录过维护过连接的需要删除
|
||
|
if(self.myID != ""):
|
||
|
self.sc_manager.update_client_dict(0,self.myID,self)
|
||
|
self.log.info("handle结束")#?handle结束后,资源是否释放需要确认
|
||
|
|
||
|
class SocketManager():
|
||
|
def __init__(self,iPort):
|
||
|
self.mylog = MyLogger("SocketManager")
|
||
|
self.client_dict = {} #socket列表
|
||
|
self.lock = threading.Lock() #互斥锁
|
||
|
self.m_BRun = True
|
||
|
self.iPort = iPort
|
||
|
|
||
|
def __del__(self):
|
||
|
self.m_BRun = False
|
||
|
time.sleep(3)
|
||
|
|
||
|
#itype:0-删除socket节点,1-添加socket节点
|
||
|
#设备ID和用户ID都为不重复值
|
||
|
def update_client_dict(self,itype,cID,csHandle):
|
||
|
with self.lock:
|
||
|
if itype == 0: #删除节点
|
||
|
self.client_dict[cID].request.close()
|
||
|
del self.client_dict[cID]
|
||
|
elif itype == 1: #添加节点,先删后加
|
||
|
if cID in self.client_dict:
|
||
|
self.client_dict[cID].request.close()
|
||
|
del self.client_dict[cID]
|
||
|
self.client_dict[cID] = csHandle
|
||
|
else:
|
||
|
self.mylog.error("有非预设值传入!")
|
||
|
|
||
|
def startListen(self):
|
||
|
print("开始socket实例化")
|
||
|
ip_port = ("127.0.0.1", self.iPort)
|
||
|
self.s = socketserver.ThreadingTCPServer(ip_port, lambda *args, **kwargs: SSocketHandler(*args, **kwargs,
|
||
|
sc_manager=self))
|
||
|
# s = socketserver.ThreadingTCPServer(ip_port, SSocketHandler)
|
||
|
self.s.serve_forever()
|
||
|
# ?待完善项,如何停止Server_socket的运行。
|
||
|
|
||
|
#创建socket对象,并执行工作
|
||
|
def doWork(self):
|
||
|
#实例化数据库连接
|
||
|
self.myDBM = DBManager("ip","prot","username","passwd")
|
||
|
|
||
|
if not self.myDBM:
|
||
|
self.mylog.error("建立数据库连接失败!请检查程序")
|
||
|
return
|
||
|
#开始
|
||
|
thread = threading.Thread(target=self.startListen) # 如果目标函数带括号,则代表已调研,返回的目标对象是函数执行的结果。
|
||
|
print("开始启动线程了")
|
||
|
thread.start()
|
||
|
# 等待线程结束
|
||
|
thread.join()
|
||
|
|
||
|
|
||
|
if __name__ == "__main__":
|
||
|
SocketM = SocketManager(18000)
|
||
|
SocketM.doWork()
|