You can not select more than 25 topics
Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
300 lines
11 KiB
300 lines
11 KiB
import subprocess
|
|
import platform
|
|
import locale
|
|
import re
|
|
|
|
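'''
Network information retrieval and configuration helpers for Windows and Linux.
Interface names are not fixed and a machine may have several network adapters, so real deployments must name the interface explicitly (i.e. pin the code to a known hardware setup).
'''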
|
|
|
|
class IPManager:
    """Query and configure network settings on Windows and Linux.

    Public methods dispatch on ``platform.system()`` to a ``_<name>_windows``
    or ``_<name>_linux`` implementation and raise ``NotImplementedError`` on
    any other platform.  The implementations run ``netsh`` / ``ipconfig`` on
    Windows and ``ip`` / ``nmcli`` / ``iwgetid`` on Linux and parse their
    text output.

    Interface names differ between machines, so callers must pass the name
    of the adapter they want to act on.

    Interface information is reported as a dict with the keys ``"IP"``,
    ``"Subnet"``, ``"Gateway"`` (strings or ``None``) and ``"DNS"`` (list of
    strings).  ``"Subnet"`` is a dotted mask on Windows and a prefix length
    (for example ``"24"``) on Linux.
    """

    # platform.system() value -> suffix of the implementing methods.
    _OS_SUFFIX = {"Windows": "windows", "Linux": "linux"}

    _IPV4_RE = re.compile(r"\d+\.\d+\.\d+\.\d+")

    # ipconfig labels (Chinese and English locales) -> result key.
    _IPCONFIG_FIELDS = {
        "IPv4 地址": "IP",
        "IPv4 Address": "IP",
        "子网掩码": "Subnet",
        "Subnet Mask": "Subnet",
        "默认网关": "Gateway",
        "Default Gateway": "Gateway",
        "DNS 服务器": "DNS",
        "DNS Servers": "DNS",
    }
    _IPCONFIG_LABEL_RE = re.compile(
        r"^\s*(IPv4 地址|IPv4 Address|子网掩码|Subnet Mask|默认网关"
        r"|Default Gateway|DNS 服务器|DNS Servers)[^:]*:\s*(.*)$"
    )
    # Adapter headings start in column 0 and end with a colon; requiring
    # column 0 keeps indented "label :" lines with an empty value from being
    # mistaken for a new adapter.
    _IPCONFIG_HEADER_RE = re.compile(r"^(\S.*):\s*$")

    def __init__(self):
        """Record the host OS name and the preferred text encoding."""
        self.os_type = platform.system()
        self.encoding = locale.getpreferredencoding()

    # ------------------------------------------------------------------
    # Public API
    # ------------------------------------------------------------------
    def set_ip(self, interface_name, ip_address, subnet_mask, gateway):
        """Assign a static IPv4 address, mask and default gateway.

        :param interface_name: name of the adapter to configure.
        :param ip_address: IPv4 address to assign.
        :param subnet_mask: subnet mask passed through to the OS tool.
        :param gateway: default gateway address.
        :raises NotImplementedError: on platforms other than Windows/Linux.
        """
        self._dispatch("set_ip", interface_name, ip_address, subnet_mask, gateway)

    def connect_wifi(self, ssid, password):
        """Connect to the Wi-Fi network ``ssid``.

        :param ssid: network name.
        :param password: passphrase; only used on Linux (see
            ``_connect_wifi_windows``).
        :raises NotImplementedError: on platforms other than Windows/Linux.
        """
        self._dispatch("connect_wifi", ssid, password)

    def get_wifi_list(self):
        """Return the SSIDs of the visible Wi-Fi networks as a list.

        :raises NotImplementedError: on platforms other than Windows/Linux.
        """
        return self._dispatch("get_wifi_list")

    def get_connected_wifi(self):
        """Return the SSID currently connected to, or ``None``.

        :raises NotImplementedError: on platforms other than Windows/Linux.
        """
        return self._dispatch("get_connected_wifi")

    def get_network_info(self):
        """Return ``{interface name: info dict}`` for all interfaces.

        :raises NotImplementedError: on platforms other than Windows/Linux.
        """
        return self._dispatch("get_network_info")

    def get_interface_info(self, interface_name):
        """Return the info dict for a single named interface.

        :param interface_name: name of the adapter to query.
        :raises NotImplementedError: on platforms other than Windows/Linux.
        """
        return self._dispatch("get_interface_info", interface_name)

    def _dispatch(self, operation, *args):
        """Call ``_<operation>_<os>`` for the current OS and return its result."""
        suffix = self._OS_SUFFIX.get(self.os_type)
        if suffix is None:
            raise NotImplementedError("Unsupported OS")
        return getattr(self, f"_{operation}_{suffix}")(*args)

    @staticmethod
    def _new_info():
        """Return an empty interface info dict."""
        return {"IP": None, "Subnet": None, "Gateway": None, "DNS": []}

    # ------------------------------------------------------------------
    # Static IP configuration
    # ------------------------------------------------------------------
    def _set_ip_windows(self, interface_name, ip_address, subnet_mask, gateway):
        """Set a static address with ``netsh`` and print the outcome."""
        result = self._run_cmd([
            "netsh", "interface", "ip", "set", "address",
            f"name={interface_name}", "static",
            str(ip_address), str(subnet_mask), str(gateway), "1",
        ])
        print(result.stdout)

        if result.returncode == 0:
            print("IP address set successfully.")
        else:
            print("Failed to set IP address. Please check the output for errors.")

    def _set_ip_linux(self, interface_name, ip_address, subnet_mask, gateway):
        """Flush the interface, add the address and add a default route.

        All three ``sudo ip`` commands are always attempted; the error output
        of any command that fails is printed instead of being discarded.
        """
        steps = (
            ["sudo", "ip", "addr", "flush", "dev", str(interface_name)],
            ["sudo", "ip", "addr", "add", f"{ip_address}/{subnet_mask}",
             "dev", str(interface_name)],
            ["sudo", "ip", "route", "add", "default", "via", str(gateway)],
        )
        for step in steps:
            result = self._run_cmd(step)
            if result.returncode != 0:
                print(result.stderr)

    # ------------------------------------------------------------------
    # Wi-Fi
    # ------------------------------------------------------------------
    def _connect_wifi_windows(self, ssid, password):
        """Connect using the saved WLAN profile named ``ssid``.

        ``netsh wlan connect`` takes no passphrase, so ``password`` is unused
        here and a profile for the network must already exist.
        """
        self._run_cmd(["netsh", "wlan", "connect", f"name={ssid}"])

    def _connect_wifi_linux(self, ssid, password):
        """Connect through NetworkManager (``nmcli``)."""
        # Argument list, no shell: quotes or shell metacharacters in the
        # SSID or passphrase cannot break or alter the command.
        self._run_cmd(
            ["nmcli", "dev", "wifi", "connect", str(ssid), "password", str(password)]
        )

    def _get_wifi_list_windows(self):
        """List visible SSIDs from ``netsh wlan show networks``."""
        result = self._run_cmd(["netsh", "wlan", "show", "networks"])
        return self._parse_netsh_networks(result.stdout)

    @staticmethod
    def _parse_netsh_networks(output):
        """Extract SSID values from ``netsh wlan show networks`` output."""
        wifi_list = []
        for line in output.splitlines():
            # Anchoring the label excludes "BSSID n :" lines; capturing
            # everything after the first colon keeps SSIDs that contain ':'.
            match = re.match(r"^\s*SSID(?:\s+\d+)?\s*:(.*)$", line)
            if match:
                wifi_list.append(match.group(1).strip())
        return wifi_list

    def _get_wifi_list_linux(self):
        """List visible SSIDs using ``nmcli``."""
        # Terse, unescaped, SSID-only output gives one SSID per line.  The
        # default table starts with the IN-USE/BSSID columns and pads SSIDs
        # with spaces, so it cannot be split on whitespace reliably.
        result = self._run_cmd(
            ["nmcli", "-t", "-e", "no", "-f", "SSID", "dev", "wifi", "list"]
        )
        return self._parse_nmcli_ssids(result.stdout)

    @staticmethod
    def _parse_nmcli_ssids(output):
        """Return the unique non-blank lines of ``output`` in first-seen order."""
        names = (line for line in output.splitlines() if line.strip())
        return list(dict.fromkeys(names))

    def _get_connected_wifi_windows(self):
        """Return the connected SSID from ``netsh wlan show interfaces``."""
        result = self._run_cmd(["netsh", "wlan", "show", "interfaces"])
        return self._parse_netsh_connected_ssid(result.stdout)

    @staticmethod
    def _parse_netsh_connected_ssid(output):
        """Return the value of the first ``SSID :`` line, or ``None``."""
        # Line-anchored so that "BSSID :" can never be matched.
        match = re.search(r"(?m)^[ \t]*SSID[ \t]*:[ \t]*(.+)$", output)
        return match.group(1).strip() if match else None

    def _get_connected_wifi_linux(self):
        """Return the connected SSID reported by ``iwgetid -r``, or ``None``."""
        ssid = self._run_cmd(["iwgetid", "-r"]).stdout.strip()
        return ssid or None

    # ------------------------------------------------------------------
    # Network information for all interfaces
    # ------------------------------------------------------------------
    def _get_network_info_windows(self):
        """Return per-adapter information parsed from ``ipconfig /all``."""
        result = self._run_cmd(["ipconfig", "/all"])
        return self._parse_ipconfig(result.stdout)

    @classmethod
    def _parse_ipconfig(cls, output):
        """Parse ``ipconfig /all`` text into ``{adapter heading: info dict}``.

        Chinese and English labels are recognised.  A bare address on a
        continuation line is attributed to the label it follows (additional
        DNS servers, or an IPv4 gateway listed after an IPv6 one).
        """
        interfaces = {}
        entry = None    # info dict of the adapter currently being read
        pending = None  # result key that continuation lines belong to
        for line in output.splitlines():
            header = cls._IPCONFIG_HEADER_RE.match(line)
            if header:
                entry = cls._new_info()
                interfaces[header.group(1).strip()] = entry
                pending = None
                continue
            if entry is None:
                continue

            labelled = cls._IPCONFIG_LABEL_RE.match(line)
            if labelled:
                pending = cls._IPCONFIG_FIELDS[labelled.group(1)]
                address = cls._IPV4_RE.match(labelled.group(2))
                if address:
                    if pending == "DNS":
                        entry["DNS"].append(address.group(0))
                    else:
                        entry[pending] = address.group(0)
            elif re.search(r"\s:(?:\s|$)", line):
                # Some other "label : value" line ends any continuation.
                pending = None
            elif pending in ("Gateway", "DNS"):
                address = cls._IPV4_RE.match(line.strip())
                if address is None:
                    continue
                if pending == "DNS":
                    entry["DNS"].append(address.group(0))
                elif entry["Gateway"] is None:
                    entry["Gateway"] = address.group(0)
        return interfaces

    def _get_network_info_linux(self):
        """Return per-interface information from ``ip`` and ``nmcli``."""
        ip_info = self._run_cmd(["ip", "addr", "show"]).stdout
        route_info = self._run_cmd(["ip", "route", "show"]).stdout
        # Unfiltered output keeps the GENERAL.DEVICE lines needed to tell
        # which device each DNS entry belongs to.
        dns_info = self._run_cmd(["nmcli", "dev", "show"]).stdout
        return self._parse_linux_network_info(ip_info, route_info, dns_info)

    @staticmethod
    def _parse_linux_network_info(ip_info, route_info, dns_info):
        """Combine ``ip addr``, ``ip route`` and ``nmcli dev show`` output.

        :param ip_info: text of ``ip addr show``.
        :param route_info: text of ``ip route show``.
        :param dns_info: text of ``nmcli dev show``.
        :return: ``{interface name: info dict}``.
        """
        interfaces = {}
        current = None
        for line in ip_info.splitlines():
            # "2: eth0: <...>"; any "@peer" suffix is dropped so the name
            # matches the "dev NAME" used in the routing table.
            head = re.match(r"^\d+: ([^:@\s]+)[^:\s]*:", line)
            if head:
                current = head.group(1)
                interfaces[current] = IPManager._new_info()
            if current:
                inet = re.search(r"^\s*inet (\d+\.\d+\.\d+\.\d+)/(\d+)", line)
                if inet:
                    interfaces[current]["IP"] = inet.group(1)
                    interfaces[current]["Subnet"] = inet.group(2)

        for line in route_info.splitlines():
            if "default via" not in line:
                continue
            gateway = re.search(r"default via (\d+\.\d+\.\d+\.\d+)", line)
            device = re.search(r"dev (\S+)", line)
            if gateway and device and device.group(1) in interfaces:
                interfaces[device.group(1)]["Gateway"] = gateway.group(1)

        device_name = None
        for line in dns_info.splitlines():
            device = re.match(r"^GENERAL\.DEVICE:\s*(\S+)", line)
            if device:
                device_name = device.group(1)
                continue
            dns = re.search(r"DNS\[\d+\]:\s*(\d+\.\d+\.\d+\.\d+)", line)
            if dns and device_name in interfaces:
                interfaces[device_name]["DNS"].append(dns.group(1))

        return interfaces

    # ------------------------------------------------------------------
    # Network information for one interface
    # ------------------------------------------------------------------
    def _get_interface_info_windows(self, interface_name):
        """Return the info dict for one adapter via ``netsh``.

        Only the English labels printed by ``netsh`` are recognised.
        """
        result = self._run_cmd(
            ["netsh", "interface", "ip", "show", "config", f"name={interface_name}"]
        )
        return self._parse_netsh_ip_config(result.stdout)

    @classmethod
    def _parse_netsh_ip_config(cls, output):
        """Parse ``netsh interface ip show config`` text into an info dict."""
        info = cls._new_info()

        ip_match = re.search(r"IP Address:\s*(\d+\.\d+\.\d+\.\d+)", output)
        if ip_match:
            info["IP"] = ip_match.group(1)

        # The line reads "Subnet Prefix: <network>/<len> (mask <mask>)";
        # prefer the mask and fall back to the first address on the line.
        subnet_match = re.search(
            r"Subnet Prefix:[^\n]*\(mask\s+(\d+\.\d+\.\d+\.\d+)\)", output
        ) or re.search(r"Subnet Prefix:\s*(\d+\.\d+\.\d+\.\d+)", output)
        if subnet_match:
            info["Subnet"] = subnet_match.group(1)

        gateway_match = re.search(r"Default Gateway:\s*(\d+\.\d+\.\d+\.\d+)", output)
        if gateway_match:
            info["Gateway"] = gateway_match.group(1)

        # DNS servers may be static or DHCP-provided; extra servers follow
        # on continuation lines that contain only an address.
        collecting = False
        for line in output.splitlines():
            label = re.search(
                r"(?:Statically Configured DNS Servers"
                r"|DNS servers configured through DHCP):\s*(.*)$",
                line,
            )
            if label:
                collecting = True
                address = cls._IPV4_RE.match(label.group(1))
                if address:
                    info["DNS"].append(address.group(0))
            elif collecting:
                address = cls._IPV4_RE.fullmatch(line.strip())
                if address:
                    info["DNS"].append(address.group(0))
                else:
                    collecting = False

        return info

    def _get_interface_info_linux(self, interface_name):
        """Return the info dict for one interface via ``ip`` and ``nmcli``."""
        info = self._new_info()
        name = str(interface_name)

        ip_result = self._run_cmd(["ip", "addr", "show", "dev", name])
        ip_match = re.search(r"inet (\d+\.\d+\.\d+\.\d+)/(\d+)", ip_result.stdout)
        if ip_match:
            info["IP"] = ip_match.group(1)
            info["Subnet"] = ip_match.group(2)

        route_result = self._run_cmd(["ip", "route", "show", "dev", name])
        gateway_match = re.search(
            r"default via (\d+\.\d+\.\d+\.\d+)", route_result.stdout
        )
        if gateway_match:
            info["Gateway"] = gateway_match.group(1)

        dns_result = self._run_cmd(["nmcli", "dev", "show", name])
        info["DNS"].extend(
            re.findall(r"DNS\[\d+\]:\s*(\d+\.\d+\.\d+\.\d+)", dns_result.stdout)
        )

        return info

    # ------------------------------------------------------------------
    # Command execution
    # ------------------------------------------------------------------
    def _run_cmd(self, command):
        """Run an external command and capture its output as text.

        Every command goes through this one function so that all callers get
        the same result object.

        :param command: an argument list (run directly, without a shell) or
            a single string (run through the shell; kept for backward
            compatibility).
        :return: ``subprocess.CompletedProcess`` whose ``stdout``/``stderr``
            are decoded with ``self.encoding``; undecodable bytes are
            replaced rather than raising.  If the program cannot be started
            the result has return code 127, empty ``stdout`` and the error
            text in ``stderr``.
        """
        try:
            return subprocess.run(
                command,
                shell=isinstance(command, str),
                capture_output=True,
                text=True,
                encoding=self.encoding,
                errors="replace",
            )
        except OSError as err:
            # With shell=True a missing tool gives a non-zero exit code;
            # mirror that for argument lists so callers need not catch.
            return subprocess.CompletedProcess(command, 127, stdout="", stderr=str(err))
|
|
|
|
# Shared manager instance, created when the module is imported.
IPM = IPManager()


if __name__ == "__main__":  # WLAN
    # Demo: print the Wi-Fi networks that are currently visible.
    # Other calls that can be tried here (interface names depend on the
    # machine; "以太网 2" is "Ethernet 2" on a Chinese-locale Windows):
    #   manager.set_ip("以太网 2", "192.168.1.100", "255.255.255.0", "192.168.1.1")
    #   manager.set_ip("WLAN", "192.168.3.100", "255.255.255.0", "192.168.3.1")
    #   manager.connect_wifi("YourSSID", "YourPassword")
    manager = IPManager()
    try:
        visible_networks = manager.get_wifi_list()
        print(visible_networks)
    except Exception as err:
        print(err)
|
|
|