You cannot select more than 25 topics. Topics must start with a letter or number, can include dashes ('-'), and can be up to 35 characters long.

239 lines
9.1 KiB

10 months ago
import pymysql
import sqlite3
import threading
import os
import json
10 months ago
from myutils.ConfigManager import myCongif
from myutils.MyLogger_logger import LogHandler
class DBManager():
    """Small helper around a single MySQL or SQLite connection.

    The backend is chosen by the ``DBType`` configuration value:
    ``0`` uses MySQL (through ``pymysql``), ``1`` uses a SQLite database file.
    Every query runs while holding ``self.lock`` so that the one shared
    connection/cursor pair is never used by two callers at the same time.
    """

    def __init__(self):
        """Read the connection settings from the configuration; does not connect.

        Raises:
            FileNotFoundError: the SQLite backend is selected and the database
                file exists neither at the configured path nor one directory up.
        """
        # Set the state attributes first so that __del__ and the query helpers
        # never hit a missing attribute, even if configuration loading fails.
        self.ok = False
        self.connection = None
        self.cursor = None
        self.logger = LogHandler().get_logger("DBManager")
        self.lock = threading.Lock()
        self.itype = myCongif.get_data("DBType")
        if self.itype == 0:
            self.host = myCongif.get_data('mysql.host')
            self.port = myCongif.get_data('mysql.port')
            self.user = myCongif.get_data('mysql.user')
            self.passwd = myCongif.get_data('mysql.passwd')
            self.database = myCongif.get_data('mysql.database')
        elif self.itype == 1:
            self.dbfile = myCongif.get_data("sqlite")
            if not os.path.exists(self.dbfile):
                # When this module is run directly the working directory is
                # not the project root, so retry one level up.
                self.dbfile = "../" + self.dbfile
            if not os.path.exists(self.dbfile):
                raise FileNotFoundError(f"Database file {self.dbfile} does not exist.")
        else:
            self.logger.error("错误的数据库类型,请检查")

    def __del__(self):
        """Close the cursor and connection if a connection was established."""
        if getattr(self, "ok", False):
            try:
                if self.cursor is not None:
                    self.cursor.close()
                if self.connection is not None:
                    self.connection.close()
            except Exception:
                # Nothing useful can be done about a close failure while the
                # object is being destroyed.
                pass
            self.cursor = None
            self.connection = None
            self.ok = False
        logger = getattr(self, "logger", None)
        if logger is not None:
            try:
                logger.debug("DBManager销毁")
            except Exception:
                # Logging may already be torn down at interpreter shutdown.
                pass

    def connect(self):
        """Open the connection and cursor for the configured backend.

        Returns:
            True on success; False if the backend type is unknown or the
            driver raised while connecting.
        """
        try:
            if self.itype == 0:
                self.connection = pymysql.connect(host=self.host, port=self.port, user=self.user,
                                                  passwd=self.passwd, db=self.database, charset='utf8')
            elif self.itype == 1:
                # NOTE(review): sqlite3 connections refuse use from other
                # threads by default (check_same_thread=True) -- confirm that
                # all callers run on the thread that called connect().
                self.connection = sqlite3.connect(self.dbfile)
            else:
                self.logger.error("错误的数据库类型,请检查")
                self.ok = False
                return False
            self.cursor = self.connection.cursor()
            self.ok = True
            return True
        except Exception as e:
            self.ok = False
            self.logger.error("服务器端数据库连接失败:%s" % str(e))
            return False

    def Retest_conn(self):
        """Make sure a usable connection exists, (re)connecting when needed.

        Only MySQL connections are pinged; a SQLite connection is assumed to
        stay valid once opened.

        Returns:
            True if a connection is available, otherwise the result of the
            reconnect attempt.
        """
        if self.connection is None or self.cursor is None:
            return self.connect()
        if self.itype == 0:
            try:
                self.connection.ping()
            except Exception:
                return self.connect()
        return True

    def _placeholder(self):
        """Return the parameter marker of the active driver ('%s' or '?')."""
        return "%s" if self.itype == 0 else "?"

    def _execute(self, strsql, params=None, many=False):
        """Run one write statement under the lock and commit it.

        Args:
            strsql: SQL text, optionally containing driver placeholders.
            params: parameters for ``strsql``; a sequence of parameter tuples
                when ``many`` is true.
            many: use ``executemany`` instead of ``execute``.

        Returns:
            The cursor's ``rowcount`` after the statement, or None when the
            connection is unavailable or the statement failed (the
            transaction is rolled back in that case).
        """
        with self.lock:
            if not self.Retest_conn():
                return None
            try:
                if many:
                    self.cursor.executemany(strsql, params)
                elif params is not None:
                    self.cursor.execute(strsql, params)
                else:
                    self.cursor.execute(strsql)
                self.connection.commit()
                # Read rowcount from the cursor: pymysql's execute() returns
                # an int while sqlite3's returns the cursor, so the return
                # value itself cannot be used portably.
                return self.cursor.rowcount
            except Exception as e:
                self.logger.error("执行数据库语句%s出错:%s" % (strsql, str(e)))
                try:
                    self.connection.rollback()
                except Exception as rollback_error:
                    self.logger.error("数据库回滚失败:%s" % str(rollback_error))
                return None

    def do_select(self, strsql, itype=0, params=None):
        """Run a query and return its rows.

        Args:
            strsql: SQL text of the query.
            itype: ``1`` returns only the first row (``fetchone``); any other
                value returns all rows (``fetchall``).
            params: optional parameters bound to the driver placeholders in
                ``strsql``.

        Returns:
            The fetched row(s), or None if the connection is unavailable or
            the query failed.
        """
        with self.lock:
            if not self.Retest_conn():
                return None
            try:
                if params is None:
                    self.cursor.execute(strsql)
                else:
                    self.cursor.execute(strsql, params)
                if itype == 1:
                    data = self.cursor.fetchone()
                else:
                    data = self.cursor.fetchall()
                # Commit after reading so the next select starts a fresh
                # transaction and sees the latest data (InnoDB snapshot
                # behaviour noted by the original author).
                self.connection.commit()
                return data
            except Exception as e:
                self.logger.error("do_select异常报错:%s" % str(e))
                return None

    def do_sql(self, strsql, data=None):
        """Execute a write statement and commit it.

        Args:
            strsql: SQL text.
            data: optional non-empty sequence of parameter tuples; when given
                the statement is run with ``executemany``.

        Returns:
            True only if the statement succeeded and affected at least one
            row; False otherwise.
        """
        if data:
            affected = self._execute(strsql, data, many=True)
        else:
            affected = self._execute(strsql)
        return affected is not None and affected > 0

    # --------------------- table-specific operations ---------------------

    def delC2M(self, ID, itype):
        """Delete channel/model associations together with their schedules.

        Args:
            ID: a channel ID when ``itype`` is 1, a model ID when it is 2.
            itype: 1 selects by channel, 2 selects by model.

        Returns:
            False for an unknown ``itype`` or when a statement fails;
            True otherwise, including when there was nothing to delete.
        """
        if itype == 1:
            column = "channel_id"
        elif itype == 2:
            column = "model_id"
        else:
            return False
        ph = self._placeholder()
        rows = self.do_select(f"select ID from channel2model where {column}={ph};", params=(ID,))
        if rows is None:
            return False
        if not rows:
            # No associations: nothing to clean up, and this is not an error.
            return True
        # Remove the dependent schedule rows in one batch. Zero affected rows
        # is fine here, so only a real failure (None) aborts.
        c2m_ids = [(row[0],) for row in rows]
        if self._execute(f"delete from schedule where channel2model_id={ph};", c2m_ids, many=True) is None:
            return False
        if self._execute(f"delete from channel2model where {column}={ph};", (ID,)) is None:
            return False
        return True

    def delchannel(self, ID):
        """Delete a channel plus its model associations and schedules.

        Returns:
            True if the channel row was deleted; False if cleaning up the
            associations failed, the delete failed, or no such channel exists.
        """
        if not self.delC2M(ID, 1):
            return False
        affected = self._execute(f"delete from channel where ID={self._placeholder()};", (ID,))
        return affected is not None and affected > 0

    def _init_schedule(self, c2m_id):
        """Create the default schedule (every hour of every day enabled).

        Writes 7 x 24 rows with ``status`` 1 for the given association,
        overwriting the status of rows that already exist.
        """
        ph = self._placeholder()
        if self.itype == 0:
            # MySQL has no ON CONFLICT clause; use its upsert syntax instead.
            upsert = " on duplicate key update status=values(status);"
        else:
            upsert = " on conflict(channel2model_id,day,hour) do update set status=excluded.status;"
        strsql = (f"insert into schedule (channel2model_id,day,hour,status) values ({ph},{ph},{ph},{ph})"
                  + upsert)
        # day is stored as text ('0'..'6'), hour as an integer 0..23.
        rows = [(c2m_id, str(day), hour, 1) for day in range(7) for hour in range(24)]
        return self._execute(strsql, rows, many=True) is not None

    def updateC2M(self, channel_id, model_list):
        """Synchronise the models associated with a channel.

        Associations missing from the database are inserted (with a default
        all-enabled schedule); associations no longer listed are removed
        together with their schedule rows.

        Args:
            channel_id: ID of the channel being configured.
            model_list: iterable of the model IDs the channel should have.
                NOTE(review): elements are compared with the ``model_id``
                values returned by the database, so they are presumably the
                same type (ints) -- confirm against the caller.

        Returns:
            True if every statement succeeded, otherwise False.
        """
        ph = self._placeholder()
        rows = self.do_select(f"select model_id from channel2model where channel_id={ph};",
                              params=(channel_id,))
        if rows is None:
            return False
        # Rows come back as 1-tuples; unwrap them before diffing.
        current = {row[0] for row in rows}
        wanted = set(model_list)
        need_add = wanted - current
        need_del = current - wanted
        select_c2m = f"select ID from channel2model where channel_id={ph} and model_id={ph};"

        # Add the new associations.
        for one in need_add:
            affected = self._execute(
                f"insert into channel2model (channel_id,model_id) values ({ph},{ph});",
                (channel_id, one))
            if not affected or affected < 0:
                return False
            c2m = self.do_select(select_c2m, 1, params=(channel_id, one))
            if c2m is None:
                return False
            if not self._init_schedule(c2m[0]):
                return False

        # Remove the associations that are no longer wanted.
        for one in need_del:
            c2m = self.do_select(select_c2m, 1, params=(channel_id, one))
            if c2m is None:
                return False
            c2m_id = c2m[0]
            # Schedule rows first; having none to delete is not an error.
            if self._execute(f"delete from schedule where channel2model_id={ph};", (c2m_id,)) is None:
                return False
            affected = self._execute(f"delete from channel2model where ID = {ph};", (c2m_id,))
            if not affected or affected < 0:
                return False
        return True

    def checkDevID(self, cID):
        """Check whether a device ID exists in the database.

        Not implemented yet: currently does nothing and returns None.
        """
        pass

    def test(self):
        """Sample code: connect to a local MySQL server and print a table.

        Uses hard-coded placeholder credentials and table name; it does not
        use this object's configuration or connection.
        """
        conn = pymysql.connect(
            host='localhost',
            port=3306,
            user='username',
            password='password',
            database='database_name'
        )
        try:
            cursor = conn.cursor()
            try:
                query = "SELECT * FROM table_name"
                cursor.execute(query)
                result = cursor.fetchall()
                for row in result:
                    print(row)
            finally:
                cursor.close()
        finally:
            conn.close()
# Module-wide DBManager instance, created and connected at import time.
mDBM = DBManager()
mDBM.connect()

if __name__ == "__main__":
    # Scratch check of the set-difference semantics: print the elements that
    # are only in the first set.
    lista = {1, 2, 3, 4, 5, 6, 7}
    listb = {4, 6, 8, 9, 10}
    nadd = lista.difference(listb)
    ndel = listb.difference(lista)
    for element in nadd:
        print(element)