From 467cf4b9f1a277b1197cebe17a6170cf17113266 Mon Sep 17 00:00:00 2001 From: wood Date: Wed, 4 Sep 2024 15:28:15 +0800 Subject: [PATCH] =?UTF-8?q?```=20=E9=87=8D=E6=9E=84=E5=B9=B6=E5=A2=9E?= =?UTF-8?q?=E5=BC=BA=E6=B6=88=E6=81=AF=E5=A4=84=E7=90=86=E5=92=8C=E5=85=B3?= =?UTF-8?q?=E9=94=AE=E8=AF=8D=E7=AE=A1=E7=90=86=E5=8A=9F=E8=83=BD?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit 重构bot_commands模块以使用新的处理函数和命令执行逻辑。 添加处理关键词和白名单命令的新函数。 在guard模块中实现带有速率限制的消息处理函数。 删除了guard模块中过时的关键词加载和保存逻辑。 现在通过新的处理函数处理关键词和白名单命令,提高了代码的整洁度和功能的明确性。 ``` --- data/whitelist.json | 5 ++ requirements.txt | 1 + src/bot_commands.py | 112 ++++++++++++++++++++++++++++++++++++++++++-- src/guard.py | 111 +++++++++++++++++++++++++------------------ src/link_filter.py | 63 +++++++++++++++++++++++++ 5 files changed, 242 insertions(+), 50 deletions(-) create mode 100644 data/whitelist.json create mode 100644 src/link_filter.py diff --git a/data/whitelist.json b/data/whitelist.json new file mode 100644 index 0000000..9f3ba42 --- /dev/null +++ b/data/whitelist.json @@ -0,0 +1,5 @@ +[ + "q58.org", + "czl.net", + "woodchen.ink" +] diff --git a/requirements.txt b/requirements.txt index 924e2c4..0ff539c 100644 --- a/requirements.txt +++ b/requirements.txt @@ -3,3 +3,4 @@ ccxt pyTelegramBotAPI schedule pytz +tldextract diff --git a/src/bot_commands.py b/src/bot_commands.py index 7972543..fe1e308 100644 --- a/src/bot_commands.py +++ b/src/bot_commands.py @@ -1,17 +1,32 @@ +from telethon import events from telethon.tl.types import InputPeerUser from telethon.tl.functions.bots import SetBotCommandsRequest from telethon.tl.types import BotCommand import logging +import json + +KEYWORDS_FILE = '/app/data/keywords.json' +WHITELIST_FILE = '/app/data/whitelist.json' + +def load_json(file_path): + try: + with open(file_path, 'r') as f: + return json.load(f) + except FileNotFoundError: + return [] + +def save_json(file_path, data): + with open(file_path, 'w') as f: + json.dump(data, f) async def register_commands(client, admin_id): commands = [ - # TeleGuard 命令 BotCommand('add', '添加新的关键词'), BotCommand('delete', '删除现有的关键词'), BotCommand('list', '列出所有当前的关键词'), - - # 这里可以添加其他功能的命令 - # 例如:BotCommand('price', '获取当前价格'), + BotCommand('addwhite', '添加域名到白名单'), + BotCommand('delwhite', '从白名单移除域名'), + BotCommand('listwhite', '列出白名单域名'), ] try: @@ -24,4 +39,91 @@ async def register_commands(client, admin_id): except Exception as e: logging.error(f"Failed to register bot commands: {str(e)}") -# 如果有其他功能需要注册命令,可以在这里添加新的函数 +async def handle_keyword_command(event, client): + sender = await event.get_sender() + if sender.id != int(os.environ.get('ADMIN_ID')): + return + + command = event.message.text.split(maxsplit=1) + + if len(command) > 1: + # 直接执行命令 + await execute_keyword_command(event, command[0], command[1]) + else: + # 进入交互模式 + await event.reply(f"请输入你要{command[0][1:]}的关键词:") + + async with client.conversation(sender) as conv: + response = await conv.get_response() + await execute_keyword_command(event, command[0], response.text) + +async def execute_keyword_command(event, command, keyword): + keywords = load_json(KEYWORDS_FILE) + + if command.lower() == '/add': + if keyword.lower() not in keywords: + keywords.append(keyword.lower()) + save_json(KEYWORDS_FILE, keywords) + await event.reply(f"关键词或语句 '{keyword}' 已添加到列表中。") + else: + await event.reply(f"关键词或语句 '{keyword}' 已经在列表中。") + + elif command.lower() == '/delete': + if keyword.lower() in keywords: + keywords.remove(keyword.lower()) + save_json(KEYWORDS_FILE, keywords) + await event.reply(f"关键词或语句 '{keyword}' 已从列表中删除。") + else: + await event.reply(f"关键词或语句 '{keyword}' 不在列表中。") + + elif command.lower() == '/list': + if keywords: + await event.reply(f"当前关键词和语句列表:\n" + "\n".join(keywords)) + else: + await event.reply("关键词列表为空。") + +async def handle_whitelist_command(event, client): + sender = await event.get_sender() + if sender.id != int(os.environ.get('ADMIN_ID')): + return + + command = event.message.text.split(maxsplit=1) + + if len(command) > 1: + # 直接执行命令 + await execute_whitelist_command(event, command[0], command[1]) + else: + # 进入交互模式 + await event.reply(f"请输入你要{command[0][1:]}的域名:") + + async with client.conversation(sender) as conv: + response = await conv.get_response() + await execute_whitelist_command(event, command[0], response.text) + +async def execute_whitelist_command(event, command, domain): + whitelist = load_json(WHITELIST_FILE) + + if command.lower() == '/addwhite': + if domain.lower() not in whitelist: + whitelist.append(domain.lower()) + save_json(WHITELIST_FILE, whitelist) + await event.reply(f"域名 '{domain}' 已添加到白名单。") + else: + await event.reply(f"域名 '{domain}' 已经在白名单中。") + + elif command.lower() == '/delwhite': + if domain.lower() in whitelist: + whitelist.remove(domain.lower()) + save_json(WHITELIST_FILE, whitelist) + await event.reply(f"域名 '{domain}' 已从白名单中移除。") + else: + await event.reply(f"域名 '{domain}' 不在白名单中。") + + elif command.lower() == '/listwhite': + if whitelist: + await event.reply("白名单域名列表:\n" + "\n".join(whitelist)) + else: + await event.reply("白名单为空。") + +def get_keywords(): + return load_json(KEYWORDS_FILE) diff --git a/src/guard.py b/src/guard.py index 4aacaca..ccd49a6 100644 --- a/src/guard.py +++ b/src/guard.py @@ -1,36 +1,43 @@ import os -import json import logging import asyncio import time from telethon import TelegramClient, events from collections import deque +from link_filter import LinkFilter +from bot_commands import handle_keyword_command, handle_whitelist_command, get_keywords # 环境变量 BOT_TOKEN = os.environ.get('BOT_TOKEN') ADMIN_ID = int(os.environ.get('ADMIN_ID')) KEYWORDS_FILE = '/app/data/keywords.json' +WHITELIST_FILE = '/app/data/whitelist.json' # 设置日志 logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(name)s - %(levelname)s - %(message)s') logger = logging.getLogger('TeleGuard') -def load_keywords(): - if os.path.exists(KEYWORDS_FILE): - with open(KEYWORDS_FILE, 'r') as f: - return json.load(f) - return ['推广', '广告', 'ad', 'promotion'] - -def save_keywords(keywords): - with open(KEYWORDS_FILE, 'w') as f: - json.dump(keywords, f) - -KEYWORDS = load_keywords() +# 创建 LinkFilter 实例 +link_filter = LinkFilter('/app/data/keywords.json', '/app/data/whitelist.json') +# 限速器 class RateLimiter: def __init__(self, max_calls, period): + """ + 初始化RateLimiter类的实例。 + + 参数: + max_calls (int): 限制的最大调用次数。 + period (float): 限定的时间周期(秒)。 + + 该构造函数设置了速率限制器的基本参数,并初始化了一个双端队列, + 用于跟踪调用的时间点,以 enforcement of the rate limiting policy。 + """ + # 限制的最大调用次数 self.max_calls = max_calls + # 限定的时间周期(秒) self.period = period + # 用于存储调用时间的双端队列 self.calls = deque() async def __aenter__(self): @@ -45,50 +52,64 @@ class RateLimiter: async def __aexit__(self, *args): pass - rate_limiter = RateLimiter(max_calls=10, period=1) # 每秒最多处理10条消息 -async def process_message(event): - if not event.is_private and any(keyword in event.message.text.lower() for keyword in KEYWORDS): - if event.sender_id != ADMIN_ID: - await event.delete() - await event.respond("已撤回该消息。注:已发送的推广链接不要多次发送,置顶已有项目的推广链接也会自动撤回。") +# 延迟删除消息函数 +async def delete_message_after_delay(client, chat, message, delay): + # 延迟指定的时间 + await asyncio.sleep(delay) + + # 尝试删除消息 + try: + await client.delete_messages(chat, message) + except Exception as e: + # 如果删除失败,记录错误 + logger.error(f"Failed to delete message: {e}") +# 处理消息函数 +async def process_message(event, client): + if not event.is_private: + # 检查消息是否包含需要过滤的链接 + if link_filter.should_filter(event.message.text): + if event.sender_id != ADMIN_ID: + await event.delete() + notification = await event.respond("已撤回该消息。注:重复发送的链接会被自动撤回。") + asyncio.create_task(delete_message_after_delay(client, event.chat_id, notification, 30 * 60)) + return + + # 检查关键词 + keywords = get_keywords() + if any(keyword in event.message.text.lower() for keyword in keywords): + if event.sender_id != ADMIN_ID: + await event.delete() + notification = await event.respond("已撤回该消息。注:已发送的推广链接不要多次发送,置顶已有项目的推广链接也会自动撤回。") + asyncio.create_task(delete_message_after_delay(client, event.chat_id, notification, 30 * 60)) + +# 启动机器人函数 async def start_bot(): client = TelegramClient('bot', api_id=6, api_hash='eb06d4abfb49dc3eeb1aeb98ae0f581e') await client.start(bot_token=BOT_TOKEN) - @client.on(events.NewMessage(pattern='')) - async def handler(event): - global KEYWORDS - if event.is_private and event.sender_id == ADMIN_ID: - command = event.message.text.split(maxsplit=1) - if command[0].lower() == '/add' and len(command) > 1: - new_keyword = command[1].lower() - if new_keyword not in KEYWORDS: - KEYWORDS.append(new_keyword) - save_keywords(KEYWORDS) - await event.respond(f"关键词或语句 '{new_keyword}' 已添加到列表中。") - else: - await event.respond(f"关键词或语句 '{new_keyword}' 已经在列表中。") - elif command[0].lower() == '/delete' and len(command) > 1: - keyword_to_delete = command[1].lower() - if keyword_to_delete in KEYWORDS: - KEYWORDS.remove(keyword_to_delete) - save_keywords(KEYWORDS) - await event.respond(f"关键词或语句 '{keyword_to_delete}' 已从列表中删除。") - else: - await event.respond(f"关键词或语句 '{keyword_to_delete}' 不在列表中。") - elif command[0].lower() == '/list': - await event.respond(f"当前关键词和语句列表:\n" + "\n".join(KEYWORDS)) - return + @client.on(events.NewMessage(pattern='/add|/delete|/list')) + async def keyword_handler(event): + await handle_keyword_command(event, client) + link_filter.reload_keywords() # 重新加载关键词 - async with rate_limiter: - await process_message(event) + @client.on(events.NewMessage(pattern='/addwhite|/delwhite|/listwhite')) + async def whitelist_handler(event): + await handle_whitelist_command(event, client) + link_filter.reload_whitelist() # 重新加载白名单 + + @client.on(events.NewMessage(pattern='')) + async def message_handler(event): + if not event.is_private or event.sender_id != ADMIN_ID: + async with rate_limiter: + await process_message(event, client) logger.info("TeleGuard is running...") await client.run_until_disconnected() - + +# 主函数 def run(): while True: try: diff --git a/src/link_filter.py b/src/link_filter.py new file mode 100644 index 0000000..c197972 --- /dev/null +++ b/src/link_filter.py @@ -0,0 +1,63 @@ +import re +import json +import tldextract + +class LinkFilter: + def __init__(self, keywords_file, whitelist_file): + self.keywords_file = keywords_file + self.whitelist_file = whitelist_file + self.keywords = self.load_json(keywords_file) + self.whitelist = self.load_json(whitelist_file) + + # 正则表达式匹配各种链接格式 + self.link_pattern = re.compile(r''' + \b + (?: + (?:https?://)? # http:// or https:// (optional) + (?:(?:www\.)? # www. (optional) + (?:[a-zA-Z0-9-]+\.)+ # domain + [a-zA-Z]{2,} # TLD + | # or + (?:t\.me|telegram\.me) # Telegram links + ) + (?:/[^\s]*)? # optional path + ) + \b + ''', re.VERBOSE | re.IGNORECASE) + + def load_json(self, file_path): + try: + with open(file_path, 'r') as f: + return json.load(f) + except FileNotFoundError: + return [] + + def save_keywords(self): + with open(self.keywords_file, 'w') as f: + json.dump(self.keywords, f) + + def is_whitelisted(self, link): + extracted = tldextract.extract(link) + domain = f"{extracted.domain}.{extracted.suffix}" + return domain in self.whitelist + + def add_keyword(self, link): + if link not in self.keywords: + self.keywords.append(link) + self.save_keywords() + + def should_filter(self, text): + links = self.link_pattern.findall(text) + for link in links: + if not self.is_whitelisted(link): + if link in self.keywords: + return True + else: + self.add_keyword(link) + return False + + def reload_keywords(self): + self.keywords = self.load_json(self.keywords_file) + + def reload_whitelist(self): + self.whitelist = self.load_json(self.whitelist_file)