import pymysql
import sqlite3
import threading
import os
import json
from myutils.ConfigManager import myCongif
from myutils.MyLogger_logger import LogHandler


class DBManager():
    """Thin, lock-serialized wrapper around a single MySQL or SQLite connection.

    The backend is chosen by the "DBType" config value:
    0 -> MySQL (via pymysql), 1 -> SQLite (via sqlite3).
    """

    def __init__(self):
        """Read the database settings from the config; does not connect yet.

        Raises:
            FileNotFoundError: in SQLite mode, when the database file is found
                neither at the configured path nor one directory up.
        """
        # Set the connection state first so that __del__ and the query helpers
        # never hit a missing attribute, even if the rest of __init__ raises.
        self.ok = False
        self.connection = None
        self.cursor = None
        self.logger = LogHandler().get_logger("DBManager")
        self.lock = threading.Lock()
        self.itype = myCongif.get_data("DBType")
        if self.itype == 0:
            self.host = myCongif.get_data('mysql.host')
            self.port = myCongif.get_data('mysql.port')
            self.user = myCongif.get_data('mysql.user')
            self.passwd = myCongif.get_data('mysql.passwd')
            self.database = myCongif.get_data('mysql.database')
        elif self.itype == 1:
            self.dbfile = myCongif.get_data("sqlite")
            if not os.path.exists(self.dbfile):
                # When DBManager is run directly, the working directory is not
                # the project root, so retry one level up.
                self.dbfile = "../" + self.dbfile
            if not os.path.exists(self.dbfile):
                raise FileNotFoundError(f"Database file {self.dbfile} does not exist.")
        else:
            self.logger.error("错误的数据库类型,请检查")

    def __del__(self):
        """Close the cursor and connection if a connection was established."""
        if getattr(self, "ok", False):
            try:
                if self.cursor is not None:
                    self.cursor.close()
                if self.connection is not None:
                    self.connection.close()
            except Exception:
                # Best effort: a finalizer must never raise.
                pass
            self.cursor = None
            self.connection = None
            self.ok = False
        logger = getattr(self, "logger", None)
        if logger is not None:
            logger.debug("DBManager销毁")

    def connect(self):
        """Open the connection and cursor for the configured backend.

        Returns:
            True on success, False if the connection failed or the configured
            database type is unknown.
        """
        try:
            if self.itype == 0:
                self.connection = pymysql.connect(host=self.host, port=self.port, user=self.user,
                                                  passwd=self.passwd, db=self.database, charset='utf8')
                self.cursor = self.connection.cursor()
            elif self.itype == 1:
                # NOTE(review): sqlite3 connections default to
                # check_same_thread=True; if this object is shared between
                # threads, confirm whether that is intended.
                self.connection = sqlite3.connect(self.dbfile)
                self.cursor = self.connection.cursor()
            else:
                # Unknown backend: there is nothing to connect to.
                self.logger.error("错误的数据库类型,请检查")
                return False
            self.ok = True
            return True
        except Exception as e:
            self.logger.error("服务器端数据库连接失败")
            self.logger.debug("DB connect error: %s" % str(e))
            return False

    # Check whether the connection is alive; reconnect if it is not.
    def Retest_conn(self):
        """Return True if the connection is usable (reconnecting MySQL if needed)."""
        if self.itype == 0:  # only MySQL needs a liveness check; sqlite3 does not
            try:
                self.connection.ping()
            except Exception:
                return self.connect()
        return True

    def _safe_rollback(self):
        """Roll back the current transaction, ignoring a missing/broken connection."""
        if self.connection is None:
            return
        try:
            self.connection.rollback()
        except Exception as e:
            self.logger.error("rollback failed: %s" % str(e))

    # Run a query. itype == 1 -> fetch one row, anything else -> fetch all rows.
    def do_select(self, strsql, itype=0):
        """Execute a SELECT statement and return its result.

        NOTE(review): strsql is executed verbatim; callers build it with string
        interpolation, so values must be trusted.

        Args:
            strsql: the SQL text to execute.
            itype: 1 to return a single row (fetchone), otherwise all rows.

        Returns:
            The fetched row / rows, or None if the connection is unavailable
            or the statement raised.
        """
        # `with` guarantees the lock is released on every exit path.
        with self.lock:
            if not self.Retest_conn():
                return None
            try:
                self.cursor.execute(strsql)
                # Commit after the SELECT: without it the latest data may not
                # be visible (InnoDB transaction behaviour).
                self.connection.commit()
            except Exception as e:
                self.logger.error("do_select异常报错:%s" % str(e))
                return None
            if itype == 1:
                return self.cursor.fetchone()
            return self.cursor.fetchall()

    # Execute a data-modifying statement.
    def do_sql(self, strsql, data=None):
        """Execute a statement (or a batch) and commit.

        Args:
            strsql: the SQL text to execute.
            data: optional sequence of parameter tuples; when truthy the
                statement is run with executemany.

        Returns:
            True only if the statement affected at least one row; False on
            error, on an unavailable connection, or when no row was affected.
        """
        with self.lock:
            if not self.Retest_conn():
                return False
            try:
                if data:
                    self.cursor.executemany(strsql, data)  # batch execution
                else:
                    self.cursor.execute(strsql)
                # Read the affected-row count from the cursor: pymysql's
                # execute() returns an int while sqlite3's returns the cursor,
                # so the return value cannot be used portably.
                affected = self.cursor.rowcount
                self.connection.commit()
                return affected > 0  # success requires at least one changed row
            except Exception as e:
                self.logger.error("执行数据库语句%s出错:%s" % (strsql, str(e)))
                self._safe_rollback()
                return False

    # --------------------- domain-specific database operations ---------------------
    def delC2M(self, ID, itype):
        """Delete channel<->model association rows and their schedules.

        It is normal for there to be nothing to delete; the results of the
        individual delete statements are not checked.

        Args:
            ID: the channel ID or model ID, depending on itype.
            itype: 1 -> ID is a channel ID, 2 -> ID is a model ID.

        Returns:
            False if itype is neither 1 nor 2, otherwise True.
        """
        if itype == 1:
            col_name = "channel_id"
        elif itype == 2:
            col_name = "model_id"
        else:
            return False

        # Fetch every matching association row: several rows can match (e.g.
        # one model bound to many channels) and each has its own schedule.
        strsql = f"select ID from channel2model where {col_name}={ID};"
        rows = self.do_select(strsql)
        if rows:
            strsql = f"delete from channel2model where {col_name}={ID};"
            self.do_sql(strsql)
            for row in rows:
                strsql = f"delete from schedule where channel2model_id={row[0]};"
                self.do_sql(strsql)
        return True

    # Delete a channel together with its schedule and channel/model association.
    def delchannel(self, ID):
        """Delete a channel and its dependent rows.

        Returns:
            True if the channel row was deleted, False otherwise.
        """
        if not self.delC2M(ID, 1):
            return False
        strsql = f"delete from channel where ID={ID};"
        return bool(self.do_sql(strsql))

    # Update the association between a video channel and its model.
    def updateC2M(self, channel_id, model_id, check_area, polygon_str, conf_thres, iou_thres):
        """Insert or update the channel2model row for a channel.

        NOTE(review): values are interpolated directly into the SQL text;
        polygon_str must not contain a single quote.

        Args:
            channel_id: the channel ID.
            model_id, check_area, polygon_str, conf_thres, iou_thres: values
                written to the columns of the same name (polygon_str goes to
                the ``polygon`` column).

        Returns:
            The ID of the channel2model row on success, 0 on failure.
        """
        lookup_sql = f"select ID from channel2model where channel_id={channel_id};"
        existing = self.do_select(lookup_sql, 1)
        if existing:  # row exists -> update it
            strsql = (f"update channel2model set model_id = {model_id},check_area={check_area},"
                      f"polygon='{polygon_str}',conf_thres={conf_thres},iou_thres={iou_thres} "
                      f"where channel_id={channel_id};")
        else:  # no row yet -> insert one
            strsql = (f"insert into channel2model (channel_id,model_id,check_area,polygon,conf_thres,iou_thres) "
                      f"values ({channel_id},{model_id},{check_area},'{polygon_str}',{conf_thres},{iou_thres});")

        if not self.do_sql(strsql):
            return 0
        if existing:
            return existing[0]

        # Freshly inserted: read back the generated ID.
        created = self.do_select(lookup_sql, 1)
        if created:
            return created[0]
        print("正常不会没有值!!")
        return 0

    def getschedule(self, c2m_id):
        """Return the arming schedule for a channel2model row.

        The query orders rows by hour, then day, and the result is indexed
        positionally as ``rows[hour * 7 + day]``; a non-empty result with fewer
        than 7 * 24 rows raises IndexError.

        Args:
            c2m_id: the channel2model ID.

        Returns:
            A list of 7 lists (one per day) of 24 status values (one per
            hour). When no rows exist, every value is 1.
        """
        strsql = f"select day,hour,status from schedule where channel2model_id ={c2m_id} order by hour asc,day asc;"
        datas = self.do_select(strsql)
        if datas:
            return [[datas[hour * 7 + day][2] for hour in range(24)] for day in range(7)]
        self.logger.debug(f"没有数据--{c2m_id}")
        return [[1] * 24 for _ in range(7)]

    # Check whether the device ID exists in the database?
    def checkDevID(self, cID):
        """Placeholder; not implemented (returns None)."""
        pass

    def test(self):
        """Sample pymysql round trip using placeholder credentials."""
        # Open the connection.
        conn = pymysql.connect(
            host='localhost',
            port=3306,
            user='username',
            password='password',
            database='database_name'
        )
        # Create a cursor.
        cursor = conn.cursor()
        # Run the query.
        query = "SELECT * FROM table_name"
        cursor.execute(query)
        # Fetch and print the result rows.
        result = cursor.fetchall()
        for row in result:
            print(row)
        # Close the cursor and the connection.
        cursor.close()
        conn.close()


# Module-level shared instance, connected at import time.
mDBM = DBManager()
mDBM.connect()

if __name__ == "__main__":
    lista = {1, 2, 3, 4, 5, 6, 7}
    listb = {4, 6, 8, 9, 10}
    nadd = lista - listb
    ndel = listb - lista
    for one in nadd:
        print(one)