import pymysql import sqlite3 import threading import os import json from myutils.ConfigManager import myCongif from myutils.MyLogger_logger import LogHandler class DBManager(): #实例化数据库管理对象,并连接数据库 #itype=0 使用mysql数据库,1-使用sqlite数据库 def __init__(self): self.logger = LogHandler().get_logger("DBManager") self.lock = threading.Lock() self.itype = myCongif.get_data("DBType") if self.itype ==0: self.host = myCongif.get_data('mysql.host') self.port = myCongif.get_data('mysql.port') self.user = myCongif.get_data('mysql.user') self.passwd = myCongif.get_data('mysql.passwd') self.database = myCongif.get_data('mysql.database') self.connection = None self.cursor = None elif self.itype ==1: self.dbfile = myCongif.get_data("sqlite") if not os.path.exists(self.dbfile): self.dbfile = "../" + self.dbfile #直接运行DBManager时初始路径不是在根目录 if not os.path.exists(self.dbfile): raise FileNotFoundError(f"Database file {self.dbfile} does not exist.") else: self.logger.error("错误的数据库类型,请检查") def __del__(self): if self.ok: self.cursor.close() self.connection.close() self.cursor = None self.connection = None self.logger.debug("DBManager销毁") def connect(self): try: if self.itype ==0: self.connection = pymysql.connect(host=self.host, port=self.port, user=self.user, passwd=self.passwd, db=self.database,charset='utf8') self.cursor = self.connection.cursor() elif self.itype ==1: self.connection = sqlite3.connect(self.dbfile) self.cursor = self.connection.cursor() self.ok = True return True except: self.logger.error("服务器端数据库连接失败") return False # 判断数据库连接是否正常,若不正常则重连接 def Retest_conn(self): if self.itype == 0: #除了mysql,sqlite3不需要判断连接状态 try: self.connection.ping() except: return self.connect() return True # 执行数据库查询操作 1-只查询一条记录,其他所有记录 def do_select(self, strsql, itype=0): # self.conn.begin() self.lock.acquire() data = None if self.Retest_conn(): try: self.cursor.execute(strsql) self.connection.commit() # select要commit提交事务,是存在获取不到最新数据的问题(innoDB事务机制) except Exception as e: self.logger.error("do_select异常报错:%s" % str(e)) self.lock.release() return None if itype == 1: data = self.cursor.fetchone() else: data = self.cursor.fetchall() self.lock.release() return data # 执行数据库语句 def do_sql(self, strsql, data=None): self.lock.acquire() bok = False if self.Retest_conn(): try: # self.conn.begin() if data: iret = self.cursor.executemany(strsql, data) #批量执行sql语句 else: iret = self.cursor.execute(strsql) self.connection.commit() if iret.rowcount > 0: # 要有修改成功的记录才返回true bok = True except Exception as e: self.logger.error("执行数据库语句%s出错:%s" % (strsql, str(e))) self.connection.rollback() self.lock.release() return bok #---------------------特定数据库操作函数--------------------- #根据通道ID或者模型ID删除通道和模型间的关联数据 1-通道ID,2-模型ID def delC2M(self,ID,itype): #channel2model if itype ==1: strsql = f"select ID from channel2model where channel_id={ID};" datas = self.do_select(strsql) strsql = f"delete from channel2model where channel_id={ID};" ret = self.do_sql(strsql) if ret == False: return False elif itype ==2: strsql = f"select ID from channel2model where model_id={ID};" datas = self.do_select(strsql) strsql = f"delete from channel2model where model_id={ID};" ret = self.do_sql(strsql) if ret == False: return False else: return False #schedule for data in datas: c2m_id = data[0] strsql = f"delete from schedule where channel2model_id={c2m_id};" ret = self.do_sql(strsql) if ret == False: return False return True #删除通道,需要关联删除布防时间,通道和算法的关联表 def delchannel(self,ID): ret = self.delC2M(ID) if ret == False: return False #channel strsql = f"delete from channel where ID={ID};" ret = self.do_sql(strsql) if ret == False: return False return True #修改视频通道和算法间的关联关系 #channel_id 通道ID #modell_list 最新配置的模型id list def updateC2M(self,channel_id,model_list): strsql = f"select model_id from channel2model where channel_id={channel_id};" datas = set(self.do_select(strsql)) data_new = set(model_list) #计算要新增和修改的 need_add = data_new - datas need_del = datas-data_new #新增 for one in need_add: strsql = f"insert into channel2model (channel_id,model_id) values ({channel_id},{one});" if self.do_sql(strsql) == False: return False #初始化布防时间 -- 全1 strsql = f"select ID from channel2model where channel_id={channel_id} and model_id={one};" data = mDBM.do_select(strsql,1) schedule_data_str = ("{'6': [1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1], " "'0': [1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1], " "'1': [1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1], " "'2': [1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1], " "'3': [1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1]," "'4': [1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1]," "'5': [1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1]}") schedule_data = json.loads(schedule_data_str.replace("'", '"')) for day, hours in schedule_data.items(): for hour, status in enumerate(hours): strsql = ( f"insert into schedule (channel2model_id,day,hour,status) values ({ID},'{day}',{hour},{status})" f" on conflict(channel2model_id,day,hour) do update set status=excluded.status;") ret = mDBM.do_sql(strsql) if not ret: return ret #差删除 for one in need_del: strsql = f"select ID from channel2model where channel_id={channel_id} and model_id={one};" c2m_id = mDBM.do_select(strsql,1)[0] #删除布防计划数据 strsql = f"delete from schedule where channel2model_id={c2m_id};" if self.do_sql(strsql) == False: return False #删除关联记录 strsql = f"delete from channel2model where ID = {c2m_id};" if self.do_sql(strsql) == False: return False return True #检查设备ID是否在数据库? def checkDevID(self,cID): pass def test(self): # 建立数据库连接 conn = pymysql.connect( host='localhost', port=3306, user='username', password='password', database='database_name' ) # 创建游标对象 cursor = conn.cursor() # 执行 SQL 查询 query = "SELECT * FROM table_name" cursor.execute(query) # 获取查询结果 result = cursor.fetchall() # 输出结果 for row in result: print(row) # 关闭游标和连接 cursor.close() conn.close() mDBM = DBManager() mDBM.connect() if __name__ == "__main__": lista = set([1,2,3,4,5,6,7]) listb = set([4,6,8,9,10]) nadd = lista -listb ndel = listb -lista for one in nadd: print(one)