dnspod-yxip/main.py
2025-02-23 00:50:22 +08:00

470 lines
19 KiB
Python
Raw Permalink Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

import json
import time
import requests
import schedule
from loguru import logger
from typing import Dict, List, Optional, Tuple
import config
from tencentcloud.common import credential
from tencentcloud.common.profile.client_profile import ClientProfile
from tencentcloud.common.profile.http_profile import HttpProfile
from tencentcloud.dnspod.v20210323 import dnspod_client, models
import subprocess
import os
from datetime import datetime, timedelta
# 配置日志
logger.add("logs/dnspod.log", rotation="10 MB", level=config.LOG_LEVEL)
class DNSPodManager:
def __init__(self):
# 实例化一个认证对象
cred = credential.Credential(config.SECRET_ID, config.SECRET_KEY)
# 实例化一个http选项可选的没有特殊需求可以跳过
httpProfile = HttpProfile()
httpProfile.endpoint = "dnspod.tencentcloudapi.com"
# 实例化一个client选项可选的没有特殊需求可以跳过
clientProfile = ClientProfile()
clientProfile.httpProfile = httpProfile
# 实例化要请求产品的client对象
self.client = dnspod_client.DnspodClient(cred, "", clientProfile)
# 记录当前使用的IP
self.current_ips = (
{}
) # 格式: {domain: {'默认': {'A': ip, 'AAAA': ip}, '移动': {...}}}
# IP可用性缓存格式{ip: {'available': bool, 'last_check': datetime}}
self.ip_availability_cache = {}
# IP可用性缓存时间设置为检查间隔的1/3
self.cache_duration = max(1, config.check_interval // 3)
# 初始化时获取所有域名当前的记录
self.init_current_records()
def init_current_records(self):
"""初始化时获取所有域名当前的解析记录"""
logger.info("正在获取所有域名当前的解析记录...")
for domain_config in config.DOMAINS:
if not domain_config["enabled"]:
continue
domain = domain_config["domain"]
sub_domain = domain_config["sub_domain"]
# 获取当前记录
current_records = self.get_current_records(domain, sub_domain)
if current_records:
self.current_ips[domain] = current_records
logger.info(f"域名 {domain} - {sub_domain} 当前记录:")
for line, records in current_records.items():
for record_type, ip in records.items():
logger.info(f" - {line} - {record_type}: {ip}")
else:
logger.warning(f"域名 {domain} - {sub_domain} 暂无解析记录")
def get_optimal_ips(self) -> Dict:
"""获取优选IP"""
try:
response = requests.get(config.API_URL)
data = response.json()
if data.get("success"):
return data["data"]
raise Exception("API返回数据格式错误")
except Exception as e:
logger.error(f"获取优选IP失败: {str(e)}")
return None
def find_best_ip(self, ip_data: Dict, ip_version: str) -> Optional[Tuple[str, int]]:
"""查找延迟最低的IP返回(IP, 延迟)"""
best_ip = None
min_latency = float("inf")
# 遍历所有线路
for line_key in ["CM", "CU", "CT"]:
if line_key in ip_data[ip_version]:
ips = ip_data[ip_version][line_key]
for ip_info in ips:
if ip_info["latency"] < min_latency:
min_latency = ip_info["latency"]
best_ip = (ip_info["ip"], ip_info["latency"])
return best_ip
def find_line_best_ip(
self, ip_data: Dict, ip_version: str, line_key: str
) -> Optional[Tuple[str, int]]:
"""查找指定线路延迟最低的IP"""
if line_key not in ip_data[ip_version]:
return None
ips = ip_data[ip_version][line_key]
if not ips:
return None
best_ip = min(ips, key=lambda x: x["latency"])
return (best_ip["ip"], best_ip["latency"])
def get_record_list(
self, domain: str, sub_domain: str = None, record_type: str = None
) -> List:
"""获取域名记录列表"""
try:
# 实例化一个请求对象
req = models.DescribeRecordListRequest()
req.Domain = domain
if sub_domain:
req.Subdomain = sub_domain
if record_type:
req.RecordType = record_type
# 通过client对象调用DescribeRecordList接口
resp = self.client.DescribeRecordList(req)
return resp.RecordList
except Exception as e:
logger.error(f"获取记录列表失败: {str(e)}")
return []
def delete_record(self, domain: str, record_id: int) -> bool:
"""删除DNS记录"""
try:
req = models.DeleteRecordRequest()
req.Domain = domain
req.RecordId = record_id
self.client.DeleteRecord(req)
return True
except Exception as e:
logger.error(f"删除记录失败: {str(e)}")
return False
def clean_existing_records(
self, domain: str, sub_domain: str, record_type: str, line: str
) -> None:
"""清理指定类型和线路的现有记录"""
try:
# 定义我们要管理的线路
managed_lines = ["默认", "移动", "联通", "电信"]
# 如果不是我们管理的线路,直接返回
if line not in managed_lines:
return
records = self.get_record_list(domain, sub_domain, record_type)
for record in records:
# 只删除我们管理的线路中的记录
if record.Line == line and record.Line in managed_lines:
logger.info(
f"删除旧记录: {domain} - {sub_domain} - {line} - {record.Value}"
)
self.delete_record(domain, record.RecordId)
time.sleep(1) # 添加短暂延时
except Exception as e:
logger.error(f"清理记录时出错: {str(e)}")
def update_record(
self,
domain: str,
sub_domain: str,
record_type: str,
line: str,
value: str,
ttl: int,
remark: str = None,
) -> bool:
"""更新或创建DNS记录"""
try:
# 先清理现有记录
self.clean_existing_records(domain, sub_domain, record_type, line)
time.sleep(1) # 添加短暂延时
# 创建新记录
req = models.CreateRecordRequest()
req.Domain = domain
req.SubDomain = sub_domain
req.RecordType = record_type
req.RecordLine = line
req.Value = value
req.TTL = ttl
if remark:
req.Remark = remark
self.client.CreateRecord(req)
return True
except Exception as e:
logger.error(f"更新DNS记录失败: {str(e)}")
return False
def get_current_records(self, domain: str, sub_domain: str) -> Dict:
"""获取当前域名的所有记录"""
try:
records = self.get_record_list(domain, sub_domain)
current_records = {}
for record in records:
if record.Line not in current_records:
current_records[record.Line] = {}
current_records[record.Line][record.Type] = record.Value
return current_records
except Exception as e:
logger.error(f"获取当前记录失败: {str(e)}")
return {}
def check_ip_availability(self, ip: str) -> bool:
"""检查IP是否可以ping通"""
# 检查缓存
now = datetime.now()
if ip in self.ip_availability_cache:
cache_info = self.ip_availability_cache[ip]
if now - cache_info['last_check'] < timedelta(minutes=self.cache_duration):
return cache_info['available']
try:
# 根据操作系统选择ping命令参数
if os.name == 'nt': # Windows系统
ping_args = ['ping', '-n', '1', '-w', '1000', ip]
else: # Linux/Unix系统
ping_args = ['ping', '-c', '1', '-W', '1', ip]
# 最多尝试3次ping
max_retries = 3
for attempt in range(max_retries):
result = subprocess.run(ping_args,
capture_output=True,
text=True)
if result.returncode == 0:
# 更新缓存
self.ip_availability_cache[ip] = {
'available': True,
'last_check': now
}
return True
# 如果不是最后一次尝试,等待短暂时间后重试
if attempt < max_retries - 1:
time.sleep(1) # 等待1秒后重试
# 所有尝试都失败
logger.warning(f"IP {ip} ping测试失败尝试{max_retries}次),最后一次命令输出:{result.stdout if result.stdout else result.stderr}")
# 更新缓存
self.ip_availability_cache[ip] = {
'available': False,
'last_check': now
}
return False
except Exception as e:
logger.error(f"Ping测试出错 - IP: {ip}, 错误信息: {str(e)}, 命令参数: {ping_args}")
return False
def find_best_available_ip(self, ip_data: Dict, ip_version: str) -> Optional[Tuple[str, int]]:
"""查找可用且延迟最低的IP返回(IP, 延迟)"""
if ip_version != "v4": # 只检查IPv4地址
return self.find_best_ip(ip_data, ip_version)
# 按延迟排序所有IP地址并获取优选IP检查IP可用性更新不同线路的记录等功能。
all_ips = []
for line_key in ["CM", "CU", "CT"]:
if line_key in ip_data[ip_version]:
all_ips.extend(ip_data[ip_version][line_key])
# 按延迟排序
all_ips.sort(key=lambda x: x["latency"])
# 查找第一个可用的IP
for ip_info in all_ips:
if self.check_ip_availability(ip_info["ip"]):
return (ip_info["ip"], ip_info["latency"])
return None
def find_line_best_available_ip(
self, ip_data: Dict, ip_version: str, line_key: str
) -> Optional[Tuple[str, int]]:
"""查找指定线路可用且延迟最低的IP"""
if ip_version != "v4": # 只检查IPv4地址
return self.find_line_best_ip(ip_data, ip_version, line_key)
if line_key not in ip_data[ip_version]:
return None
ips = ip_data[ip_version][line_key]
if not ips:
return None
# 按延迟排序
ips.sort(key=lambda x: x["latency"])
# 查找第一个可用的IP
for ip_info in ips:
if self.check_ip_availability(ip_info["ip"]):
return (ip_info["ip"], ip_info["latency"])
return None
def update_domain_records(self, domain_config):
"""更新指定域名的记录"""
domain = domain_config["domain"]
sub_domain = domain_config["sub_domain"]
ttl = domain_config.get("ttl", 600)
remark = domain_config.get("remark")
# 获取当前记录
current_records = self.get_current_records(domain, sub_domain)
# 获取优选IP
ip_data = self.get_optimal_ips()
if not ip_data:
logger.error(f"无法获取优选IP跳过更新 {domain}")
return
# 处理IPv4记录
if domain_config["ipv4_enabled"] and "v4" in ip_data:
# 获取所有线路的最佳IPv4地址
line_mapping = {"移动": "CM", "联通": "CU", "电信": "CT"}
best_ips = {}
for line, line_key in line_mapping.items():
best_ip = self.find_line_best_available_ip(ip_data, "v4", line_key)
if best_ip:
ip, latency = best_ip
best_ips[line] = (ip, latency)
# 检查是否所有线路的IPv4地址都相同
if best_ips:
unique_ips = {ip for ip, _ in best_ips.values()}
if len(unique_ips) == 1:
# 所有线路的IPv4地址相同只添加默认线路
ip = list(unique_ips)[0]
min_latency = min(latency for _, latency in best_ips.values())
current_ip = current_records.get("默认", {}).get("A")
if current_ip != ip:
logger.info(
f"更新A记录: {domain} - {sub_domain} - 默认 - {ip} (延迟: {min_latency}ms) [所有线路IP相同]"
)
if self.update_record(
domain, sub_domain, "A", "默认", ip, ttl, remark
):
# 更新成功后更新缓存
if "默认" not in current_records:
current_records["默认"] = {}
current_records["默认"]["A"] = ip
time.sleep(1)
else:
# IPv4地址不同需要为每个线路添加记录
for line, (ip, latency) in best_ips.items():
current_ip = current_records.get(line, {}).get("A")
if current_ip != ip:
logger.info(
f"更新A记录: {domain} - {sub_domain} - {line} - {ip} (延迟: {latency}ms)"
)
if self.update_record(
domain, sub_domain, "A", line, ip, ttl, remark
):
# 更新成功后更新缓存
if line not in current_records:
current_records[line] = {}
current_records[line]["A"] = ip
time.sleep(1)
# 添加默认线路使用延迟最低的IP
best_ip = min(best_ips.items(), key=lambda x: x[1][1])
ip, latency = best_ip[1]
current_ip = current_records.get("默认", {}).get("A")
if current_ip != ip:
logger.info(
f"更新A记录: {domain} - {sub_domain} - 默认 - {ip} (延迟: {latency}ms)"
)
if self.update_record(
domain, sub_domain, "A", "默认", ip, ttl, remark
):
# 更新成功后更新缓存
if "默认" not in current_records:
current_records["默认"] = {}
current_records["默认"]["A"] = ip
time.sleep(1)
# 处理IPv6记录
if domain_config["ipv6_enabled"] and "v6" in ip_data:
# 获取所有线路的最佳IPv6地址
line_mapping = {"移动": "CM", "联通": "CU", "电信": "CT"}
best_ips = {}
for line, line_key in line_mapping.items():
best_ip = self.find_line_best_ip(ip_data, "v6", line_key)
if best_ip:
ip, latency = best_ip
best_ips[line] = (ip, latency)
# 检查是否所有线路的IPv6地址都相同
if best_ips:
unique_ips = {ip for ip, _ in best_ips.values()}
if len(unique_ips) == 1:
# 所有线路的IPv6地址相同只添加默认线路
ip = list(unique_ips)[0]
min_latency = min(latency for _, latency in best_ips.values())
current_ip = current_records.get("默认", {}).get("AAAA")
if current_ip != ip:
logger.info(
f"更新AAAA记录: {domain} - {sub_domain} - 默认 - {ip} (延迟: {min_latency}ms) [所有线路IP相同]"
)
if self.update_record(
domain, sub_domain, "AAAA", "默认", ip, ttl, remark
):
# 更新成功后更新缓存
if "默认" not in current_records:
current_records["默认"] = {}
current_records["默认"]["AAAA"] = ip
time.sleep(1)
else:
# IPv6地址不同需要为每个线路添加记录
for line, (ip, latency) in best_ips.items():
current_ip = current_records.get(line, {}).get("AAAA")
if current_ip != ip:
logger.info(
f"更新AAAA记录: {domain} - {sub_domain} - {line} - {ip} (延迟: {latency}ms)"
)
if self.update_record(
domain, sub_domain, "AAAA", line, ip, ttl, remark
):
# 更新成功后更新缓存
if line not in current_records:
current_records[line] = {}
current_records[line]["AAAA"] = ip
time.sleep(1)
# 添加默认线路使用延迟最低的IP
best_ip = min(best_ips.items(), key=lambda x: x[1][1])
ip, latency = best_ip[1]
current_ip = current_records.get("默认", {}).get("AAAA")
if current_ip != ip:
logger.info(
f"更新AAAA记录: {domain} - {sub_domain} - 默认 - {ip} (延迟: {latency}ms)"
)
if self.update_record(
domain, sub_domain, "AAAA", "默认", ip, ttl, remark
):
# 更新成功后更新缓存
if "默认" not in current_records:
current_records["默认"] = {}
current_records["默认"]["AAAA"] = ip
time.sleep(1)
def check_and_update(self):
"""检查并更新所有域名"""
for domain_config in config.DOMAINS:
if not domain_config["enabled"]:
continue
self.update_domain_records(domain_config)
time.sleep(1) # 添加延时
def main():
manager = DNSPodManager()
# 首次运行,更新所有域名
manager.check_and_update()
# 每5分钟检查一次是否需要更新
schedule.every(config.check_interval).minutes.do(manager.check_and_update)
logger.info(f"程序启动成功,开始监控更新(每{config.check_interval}分钟检查一次)...")
while True:
schedule.run_pending()
time.sleep(1)
if __name__ == "__main__":
main()