mirror of
https://github.com/woodchen-ink/dnspod-yxip.git
synced 2025-07-17 21:32:08 +08:00
470 lines
19 KiB
Python
470 lines
19 KiB
Python
import json
|
||
import time
|
||
import requests
|
||
import schedule
|
||
from loguru import logger
|
||
from typing import Dict, List, Optional, Tuple
|
||
import config
|
||
from tencentcloud.common import credential
|
||
from tencentcloud.common.profile.client_profile import ClientProfile
|
||
from tencentcloud.common.profile.http_profile import HttpProfile
|
||
from tencentcloud.dnspod.v20210323 import dnspod_client, models
|
||
import subprocess
|
||
import os
|
||
from datetime import datetime, timedelta
|
||
|
||
# 配置日志
|
||
logger.add("logs/dnspod.log", rotation="10 MB", level=config.LOG_LEVEL)
|
||
|
||
|
||
class DNSPodManager:
|
||
def __init__(self):
|
||
# 实例化一个认证对象
|
||
cred = credential.Credential(config.SECRET_ID, config.SECRET_KEY)
|
||
# 实例化一个http选项,可选的,没有特殊需求可以跳过
|
||
httpProfile = HttpProfile()
|
||
httpProfile.endpoint = "dnspod.tencentcloudapi.com"
|
||
# 实例化一个client选项,可选的,没有特殊需求可以跳过
|
||
clientProfile = ClientProfile()
|
||
clientProfile.httpProfile = httpProfile
|
||
# 实例化要请求产品的client对象
|
||
self.client = dnspod_client.DnspodClient(cred, "", clientProfile)
|
||
# 记录当前使用的IP
|
||
self.current_ips = (
|
||
{}
|
||
) # 格式: {domain: {'默认': {'A': ip, 'AAAA': ip}, '移动': {...}}}
|
||
# IP可用性缓存,格式:{ip: {'available': bool, 'last_check': datetime}}
|
||
self.ip_availability_cache = {}
|
||
# IP可用性缓存时间设置为检查间隔的1/3
|
||
self.cache_duration = max(1, config.check_interval // 3)
|
||
# 初始化时获取所有域名当前的记录
|
||
self.init_current_records()
|
||
|
||
def init_current_records(self):
|
||
"""初始化时获取所有域名当前的解析记录"""
|
||
logger.info("正在获取所有域名当前的解析记录...")
|
||
for domain_config in config.DOMAINS:
|
||
if not domain_config["enabled"]:
|
||
continue
|
||
|
||
domain = domain_config["domain"]
|
||
sub_domain = domain_config["sub_domain"]
|
||
|
||
# 获取当前记录
|
||
current_records = self.get_current_records(domain, sub_domain)
|
||
if current_records:
|
||
self.current_ips[domain] = current_records
|
||
logger.info(f"域名 {domain} - {sub_domain} 当前记录:")
|
||
for line, records in current_records.items():
|
||
for record_type, ip in records.items():
|
||
logger.info(f" - {line} - {record_type}: {ip}")
|
||
else:
|
||
logger.warning(f"域名 {domain} - {sub_domain} 暂无解析记录")
|
||
|
||
def get_optimal_ips(self) -> Dict:
|
||
"""获取优选IP"""
|
||
try:
|
||
response = requests.get(config.API_URL)
|
||
data = response.json()
|
||
if data.get("success"):
|
||
return data["data"]
|
||
raise Exception("API返回数据格式错误")
|
||
except Exception as e:
|
||
logger.error(f"获取优选IP失败: {str(e)}")
|
||
return None
|
||
|
||
def find_best_ip(self, ip_data: Dict, ip_version: str) -> Optional[Tuple[str, int]]:
|
||
"""查找延迟最低的IP,返回(IP, 延迟)"""
|
||
best_ip = None
|
||
min_latency = float("inf")
|
||
|
||
# 遍历所有线路
|
||
for line_key in ["CM", "CU", "CT"]:
|
||
if line_key in ip_data[ip_version]:
|
||
ips = ip_data[ip_version][line_key]
|
||
for ip_info in ips:
|
||
if ip_info["latency"] < min_latency:
|
||
min_latency = ip_info["latency"]
|
||
best_ip = (ip_info["ip"], ip_info["latency"])
|
||
|
||
return best_ip
|
||
|
||
def find_line_best_ip(
|
||
self, ip_data: Dict, ip_version: str, line_key: str
|
||
) -> Optional[Tuple[str, int]]:
|
||
"""查找指定线路延迟最低的IP"""
|
||
if line_key not in ip_data[ip_version]:
|
||
return None
|
||
|
||
ips = ip_data[ip_version][line_key]
|
||
if not ips:
|
||
return None
|
||
|
||
best_ip = min(ips, key=lambda x: x["latency"])
|
||
return (best_ip["ip"], best_ip["latency"])
|
||
|
||
def get_record_list(
|
||
self, domain: str, sub_domain: str = None, record_type: str = None
|
||
) -> List:
|
||
"""获取域名记录列表"""
|
||
try:
|
||
# 实例化一个请求对象
|
||
req = models.DescribeRecordListRequest()
|
||
req.Domain = domain
|
||
if sub_domain:
|
||
req.Subdomain = sub_domain
|
||
if record_type:
|
||
req.RecordType = record_type
|
||
|
||
# 通过client对象调用DescribeRecordList接口
|
||
resp = self.client.DescribeRecordList(req)
|
||
return resp.RecordList
|
||
except Exception as e:
|
||
logger.error(f"获取记录列表失败: {str(e)}")
|
||
return []
|
||
|
||
def delete_record(self, domain: str, record_id: int) -> bool:
|
||
"""删除DNS记录"""
|
||
try:
|
||
req = models.DeleteRecordRequest()
|
||
req.Domain = domain
|
||
req.RecordId = record_id
|
||
self.client.DeleteRecord(req)
|
||
return True
|
||
except Exception as e:
|
||
logger.error(f"删除记录失败: {str(e)}")
|
||
return False
|
||
|
||
def clean_existing_records(
|
||
self, domain: str, sub_domain: str, record_type: str, line: str
|
||
) -> None:
|
||
"""清理指定类型和线路的现有记录"""
|
||
try:
|
||
# 定义我们要管理的线路
|
||
managed_lines = ["默认", "移动", "联通", "电信"]
|
||
|
||
# 如果不是我们管理的线路,直接返回
|
||
if line not in managed_lines:
|
||
return
|
||
|
||
records = self.get_record_list(domain, sub_domain, record_type)
|
||
for record in records:
|
||
# 只删除我们管理的线路中的记录
|
||
if record.Line == line and record.Line in managed_lines:
|
||
logger.info(
|
||
f"删除旧记录: {domain} - {sub_domain} - {line} - {record.Value}"
|
||
)
|
||
self.delete_record(domain, record.RecordId)
|
||
time.sleep(1) # 添加短暂延时
|
||
except Exception as e:
|
||
logger.error(f"清理记录时出错: {str(e)}")
|
||
|
||
def update_record(
|
||
self,
|
||
domain: str,
|
||
sub_domain: str,
|
||
record_type: str,
|
||
line: str,
|
||
value: str,
|
||
ttl: int,
|
||
remark: str = None,
|
||
) -> bool:
|
||
"""更新或创建DNS记录"""
|
||
try:
|
||
# 先清理现有记录
|
||
self.clean_existing_records(domain, sub_domain, record_type, line)
|
||
time.sleep(1) # 添加短暂延时
|
||
|
||
# 创建新记录
|
||
req = models.CreateRecordRequest()
|
||
req.Domain = domain
|
||
req.SubDomain = sub_domain
|
||
req.RecordType = record_type
|
||
req.RecordLine = line
|
||
req.Value = value
|
||
req.TTL = ttl
|
||
if remark:
|
||
req.Remark = remark
|
||
|
||
self.client.CreateRecord(req)
|
||
return True
|
||
except Exception as e:
|
||
logger.error(f"更新DNS记录失败: {str(e)}")
|
||
return False
|
||
|
||
def get_current_records(self, domain: str, sub_domain: str) -> Dict:
|
||
"""获取当前域名的所有记录"""
|
||
try:
|
||
records = self.get_record_list(domain, sub_domain)
|
||
current_records = {}
|
||
for record in records:
|
||
if record.Line not in current_records:
|
||
current_records[record.Line] = {}
|
||
current_records[record.Line][record.Type] = record.Value
|
||
return current_records
|
||
except Exception as e:
|
||
logger.error(f"获取当前记录失败: {str(e)}")
|
||
return {}
|
||
|
||
def check_ip_availability(self, ip: str) -> bool:
|
||
"""检查IP是否可以ping通"""
|
||
# 检查缓存
|
||
now = datetime.now()
|
||
if ip in self.ip_availability_cache:
|
||
cache_info = self.ip_availability_cache[ip]
|
||
if now - cache_info['last_check'] < timedelta(minutes=self.cache_duration):
|
||
return cache_info['available']
|
||
|
||
try:
|
||
# 根据操作系统选择ping命令参数
|
||
if os.name == 'nt': # Windows系统
|
||
ping_args = ['ping', '-n', '1', '-w', '1000', ip]
|
||
else: # Linux/Unix系统
|
||
ping_args = ['ping', '-c', '1', '-W', '1', ip]
|
||
|
||
# 最多尝试3次ping
|
||
max_retries = 3
|
||
for attempt in range(max_retries):
|
||
result = subprocess.run(ping_args,
|
||
capture_output=True,
|
||
text=True)
|
||
if result.returncode == 0:
|
||
# 更新缓存
|
||
self.ip_availability_cache[ip] = {
|
||
'available': True,
|
||
'last_check': now
|
||
}
|
||
return True
|
||
|
||
# 如果不是最后一次尝试,等待短暂时间后重试
|
||
if attempt < max_retries - 1:
|
||
time.sleep(1) # 等待1秒后重试
|
||
|
||
# 所有尝试都失败
|
||
logger.warning(f"IP {ip} ping测试失败(尝试{max_retries}次),最后一次命令输出:{result.stdout if result.stdout else result.stderr}")
|
||
# 更新缓存
|
||
self.ip_availability_cache[ip] = {
|
||
'available': False,
|
||
'last_check': now
|
||
}
|
||
return False
|
||
except Exception as e:
|
||
logger.error(f"Ping测试出错 - IP: {ip}, 错误信息: {str(e)}, 命令参数: {ping_args}")
|
||
return False
|
||
|
||
def find_best_available_ip(self, ip_data: Dict, ip_version: str) -> Optional[Tuple[str, int]]:
|
||
"""查找可用且延迟最低的IP,返回(IP, 延迟)"""
|
||
if ip_version != "v4": # 只检查IPv4地址
|
||
return self.find_best_ip(ip_data, ip_version)
|
||
|
||
# 按延迟排序所有IP地址,并获取优选IP,检查IP可用性,更新不同线路的记录等功能。
|
||
all_ips = []
|
||
for line_key in ["CM", "CU", "CT"]:
|
||
if line_key in ip_data[ip_version]:
|
||
all_ips.extend(ip_data[ip_version][line_key])
|
||
|
||
# 按延迟排序
|
||
all_ips.sort(key=lambda x: x["latency"])
|
||
|
||
# 查找第一个可用的IP
|
||
for ip_info in all_ips:
|
||
if self.check_ip_availability(ip_info["ip"]):
|
||
return (ip_info["ip"], ip_info["latency"])
|
||
|
||
return None
|
||
|
||
def find_line_best_available_ip(
|
||
self, ip_data: Dict, ip_version: str, line_key: str
|
||
) -> Optional[Tuple[str, int]]:
|
||
"""查找指定线路可用且延迟最低的IP"""
|
||
if ip_version != "v4": # 只检查IPv4地址
|
||
return self.find_line_best_ip(ip_data, ip_version, line_key)
|
||
|
||
if line_key not in ip_data[ip_version]:
|
||
return None
|
||
|
||
ips = ip_data[ip_version][line_key]
|
||
if not ips:
|
||
return None
|
||
|
||
# 按延迟排序
|
||
ips.sort(key=lambda x: x["latency"])
|
||
|
||
# 查找第一个可用的IP
|
||
for ip_info in ips:
|
||
if self.check_ip_availability(ip_info["ip"]):
|
||
return (ip_info["ip"], ip_info["latency"])
|
||
|
||
return None
|
||
def update_domain_records(self, domain_config):
|
||
"""更新指定域名的记录"""
|
||
domain = domain_config["domain"]
|
||
sub_domain = domain_config["sub_domain"]
|
||
ttl = domain_config.get("ttl", 600)
|
||
remark = domain_config.get("remark")
|
||
|
||
# 获取当前记录
|
||
current_records = self.get_current_records(domain, sub_domain)
|
||
|
||
# 获取优选IP
|
||
ip_data = self.get_optimal_ips()
|
||
if not ip_data:
|
||
logger.error(f"无法获取优选IP,跳过更新 {domain}")
|
||
return
|
||
|
||
# 处理IPv4记录
|
||
if domain_config["ipv4_enabled"] and "v4" in ip_data:
|
||
# 获取所有线路的最佳IPv4地址
|
||
line_mapping = {"移动": "CM", "联通": "CU", "电信": "CT"}
|
||
best_ips = {}
|
||
for line, line_key in line_mapping.items():
|
||
best_ip = self.find_line_best_available_ip(ip_data, "v4", line_key)
|
||
if best_ip:
|
||
ip, latency = best_ip
|
||
best_ips[line] = (ip, latency)
|
||
|
||
# 检查是否所有线路的IPv4地址都相同
|
||
if best_ips:
|
||
unique_ips = {ip for ip, _ in best_ips.values()}
|
||
if len(unique_ips) == 1:
|
||
# 所有线路的IPv4地址相同,只添加默认线路
|
||
ip = list(unique_ips)[0]
|
||
min_latency = min(latency for _, latency in best_ips.values())
|
||
current_ip = current_records.get("默认", {}).get("A")
|
||
if current_ip != ip:
|
||
logger.info(
|
||
f"更新A记录: {domain} - {sub_domain} - 默认 - {ip} (延迟: {min_latency}ms) [所有线路IP相同]"
|
||
)
|
||
if self.update_record(
|
||
domain, sub_domain, "A", "默认", ip, ttl, remark
|
||
):
|
||
# 更新成功后更新缓存
|
||
if "默认" not in current_records:
|
||
current_records["默认"] = {}
|
||
current_records["默认"]["A"] = ip
|
||
time.sleep(1)
|
||
else:
|
||
# IPv4地址不同,需要为每个线路添加记录
|
||
for line, (ip, latency) in best_ips.items():
|
||
current_ip = current_records.get(line, {}).get("A")
|
||
if current_ip != ip:
|
||
logger.info(
|
||
f"更新A记录: {domain} - {sub_domain} - {line} - {ip} (延迟: {latency}ms)"
|
||
)
|
||
if self.update_record(
|
||
domain, sub_domain, "A", line, ip, ttl, remark
|
||
):
|
||
# 更新成功后更新缓存
|
||
if line not in current_records:
|
||
current_records[line] = {}
|
||
current_records[line]["A"] = ip
|
||
time.sleep(1)
|
||
|
||
# 添加默认线路(使用延迟最低的IP)
|
||
best_ip = min(best_ips.items(), key=lambda x: x[1][1])
|
||
ip, latency = best_ip[1]
|
||
current_ip = current_records.get("默认", {}).get("A")
|
||
if current_ip != ip:
|
||
logger.info(
|
||
f"更新A记录: {domain} - {sub_domain} - 默认 - {ip} (延迟: {latency}ms)"
|
||
)
|
||
if self.update_record(
|
||
domain, sub_domain, "A", "默认", ip, ttl, remark
|
||
):
|
||
# 更新成功后更新缓存
|
||
if "默认" not in current_records:
|
||
current_records["默认"] = {}
|
||
current_records["默认"]["A"] = ip
|
||
time.sleep(1)
|
||
|
||
# 处理IPv6记录
|
||
if domain_config["ipv6_enabled"] and "v6" in ip_data:
|
||
# 获取所有线路的最佳IPv6地址
|
||
line_mapping = {"移动": "CM", "联通": "CU", "电信": "CT"}
|
||
best_ips = {}
|
||
for line, line_key in line_mapping.items():
|
||
best_ip = self.find_line_best_ip(ip_data, "v6", line_key)
|
||
if best_ip:
|
||
ip, latency = best_ip
|
||
best_ips[line] = (ip, latency)
|
||
|
||
# 检查是否所有线路的IPv6地址都相同
|
||
if best_ips:
|
||
unique_ips = {ip for ip, _ in best_ips.values()}
|
||
if len(unique_ips) == 1:
|
||
# 所有线路的IPv6地址相同,只添加默认线路
|
||
ip = list(unique_ips)[0]
|
||
min_latency = min(latency for _, latency in best_ips.values())
|
||
current_ip = current_records.get("默认", {}).get("AAAA")
|
||
if current_ip != ip:
|
||
logger.info(
|
||
f"更新AAAA记录: {domain} - {sub_domain} - 默认 - {ip} (延迟: {min_latency}ms) [所有线路IP相同]"
|
||
)
|
||
if self.update_record(
|
||
domain, sub_domain, "AAAA", "默认", ip, ttl, remark
|
||
):
|
||
# 更新成功后更新缓存
|
||
if "默认" not in current_records:
|
||
current_records["默认"] = {}
|
||
current_records["默认"]["AAAA"] = ip
|
||
time.sleep(1)
|
||
else:
|
||
# IPv6地址不同,需要为每个线路添加记录
|
||
for line, (ip, latency) in best_ips.items():
|
||
current_ip = current_records.get(line, {}).get("AAAA")
|
||
if current_ip != ip:
|
||
logger.info(
|
||
f"更新AAAA记录: {domain} - {sub_domain} - {line} - {ip} (延迟: {latency}ms)"
|
||
)
|
||
if self.update_record(
|
||
domain, sub_domain, "AAAA", line, ip, ttl, remark
|
||
):
|
||
# 更新成功后更新缓存
|
||
if line not in current_records:
|
||
current_records[line] = {}
|
||
current_records[line]["AAAA"] = ip
|
||
time.sleep(1)
|
||
|
||
# 添加默认线路(使用延迟最低的IP)
|
||
best_ip = min(best_ips.items(), key=lambda x: x[1][1])
|
||
ip, latency = best_ip[1]
|
||
current_ip = current_records.get("默认", {}).get("AAAA")
|
||
if current_ip != ip:
|
||
logger.info(
|
||
f"更新AAAA记录: {domain} - {sub_domain} - 默认 - {ip} (延迟: {latency}ms)"
|
||
)
|
||
if self.update_record(
|
||
domain, sub_domain, "AAAA", "默认", ip, ttl, remark
|
||
):
|
||
# 更新成功后更新缓存
|
||
if "默认" not in current_records:
|
||
current_records["默认"] = {}
|
||
current_records["默认"]["AAAA"] = ip
|
||
time.sleep(1)
|
||
|
||
def check_and_update(self):
|
||
"""检查并更新所有域名"""
|
||
for domain_config in config.DOMAINS:
|
||
if not domain_config["enabled"]:
|
||
continue
|
||
self.update_domain_records(domain_config)
|
||
time.sleep(1) # 添加延时
|
||
|
||
|
||
def main():
|
||
manager = DNSPodManager()
|
||
|
||
# 首次运行,更新所有域名
|
||
manager.check_and_update()
|
||
|
||
# 每5分钟检查一次是否需要更新
|
||
schedule.every(config.check_interval).minutes.do(manager.check_and_update)
|
||
logger.info(f"程序启动成功,开始监控更新(每{config.check_interval}分钟检查一次)...")
|
||
while True:
|
||
schedule.run_pending()
|
||
time.sleep(1)
|
||
|
||
|
||
if __name__ == "__main__":
|
||
main()
|