From b03910f654fc2d7707e5e88cd2c5d3ff841239c7 Mon Sep 17 00:00:00 2001 From: wood Date: Wed, 4 Sep 2024 21:27:59 +0800 Subject: [PATCH] =?UTF-8?q?refactor(bot=5Fcommands):=20=E7=A7=BB=E9=99=A4?= =?UTF-8?q?=E6=9C=AA=E4=BD=BF=E7=94=A8=E7=9A=84=E5=AF=BC=E5=85=A5=E5=92=8C?= =?UTF-8?q?=E5=87=BD=E6=95=B0=E4=BB=A5=E7=AE=80=E5=8C=96=E7=BB=93=E6=9E=84?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit 从`bot_commands.py`中移除未使用的导入和函数,例如`handle_command`,因为这些元素在当前实现中未被调用或不再需要。更新`__all__`列表以反映`register_commands`函数的唯一导出。通过消除冗余代码,提高了代码库的清晰度和维护性。 --- src/bot_commands.py | 88 +-------------------------------------------- src/guard.py | 19 ++++++++-- src/link_filter.py | 51 ++++++++++++++++++++++++++ 3 files changed, 69 insertions(+), 89 deletions(-) diff --git a/src/bot_commands.py b/src/bot_commands.py index 628a5e1..377b8a8 100644 --- a/src/bot_commands.py +++ b/src/bot_commands.py @@ -1,15 +1,6 @@ -import os from telethon.tl.types import InputPeerUser from telethon.tl.functions.bots import SetBotCommandsRequest from telethon.tl.types import BotCommand -from link_filter import LinkFilter - -KEYWORDS_FILE = '/app/data/keywords.json' -WHITELIST_FILE = '/app/data/whitelist.json' -ADMIN_ID = int(os.environ.get('ADMIN_ID')) - -# 创建 LinkFilter 实例 -link_filter = LinkFilter(KEYWORDS_FILE, WHITELIST_FILE) async def register_commands(client, admin_id): commands = [ @@ -31,81 +22,4 @@ async def register_commands(client, admin_id): except Exception as e: print(f"Failed to register bot commands: {str(e)}") -async def handle_command(event, client): - sender = await event.get_sender() - if sender.id != ADMIN_ID: - return - - link_filter.load_data_from_file() # 在处理命令前重新加载数据 - - command, *args = event.message.text.split() - command = command.lower() - - if command in ['/add', '/delete', '/list']: - await handle_keyword_command(event, command, args) - elif command in ['/addwhite', '/delwhite', '/listwhite']: - await handle_whitelist_command(event, command, args) - -async def handle_keyword_command(event, command, args): - if command == '/list': - link_filter.load_data_from_file() # 确保使用最新数据 - keywords = link_filter.keywords - await event.reply("当前关键词列表:\n" + "\n".join(keywords) if keywords else "关键词列表为空。") - elif command == '/add' and args: - keyword = ' '.join(args) - if keyword not in link_filter.keywords: - link_filter.add_keyword(keyword) - await event.reply(f"关键词 '{keyword}' 已添加。") - else: - await event.reply(f"关键词 '{keyword}' 已存在。") - elif command == '/delete' and args: - keyword = ' '.join(args) - if link_filter.remove_keyword(keyword): - await event.reply(f"关键词 '{keyword}' 已删除。") - else: - # 如果没有精确匹配,尝试查找部分匹配的关键词 - similar_keywords = [k for k in link_filter.keywords if keyword.lower() in k.lower()] - if similar_keywords: - await event.reply(f"未找到精确匹配的关键词 '{keyword}'。\n\n以下是相似的关键词:\n" + "\n".join(similar_keywords)) - else: - await event.reply(f"关键词 '{keyword}' 不存在。") - else: - await event.reply("无效的命令或参数。") - - -async def handle_whitelist_command(event, command, args): - if command == '/listwhite': - link_filter.load_data_from_file() # 确保使用最新数据 - whitelist = link_filter.whitelist - await event.reply("白名单域名列表:\n" + "\n".join(whitelist) if whitelist else "白名单为空。") - elif command == '/addwhite' and args: - domain = args[0].lower() - if domain not in link_filter.whitelist: - link_filter.whitelist.append(domain) - link_filter.save_whitelist() - link_filter.load_data_from_file() # 重新加载以确保数据同步 - await event.reply(f"域名 '{domain}' 已添加到白名单。") - else: - await event.reply(f"域名 '{domain}' 已在白名单中。") - elif command == '/delwhite' and args: - domain = args[0].lower() - if domain in link_filter.whitelist: - link_filter.whitelist.remove(domain) - link_filter.save_whitelist() - link_filter.load_data_from_file() # 重新加载以确保数据同步 - await event.reply(f"域名 '{domain}' 已从白名单中删除。") - else: - await event.reply(f"域名 '{domain}' 不在白名单中。") - else: - await event.reply("无效的命令或参数。") - - -def get_keywords(): - link_filter.load_data_from_file() - return link_filter.keywords - -def get_whitelist(): - link_filter.load_data_from_file() - return link_filter.whitelist - -__all__ = ['handle_command', 'get_keywords', 'get_whitelist', 'register_commands'] +__all__ = ['register_commands'] diff --git a/src/guard.py b/src/guard.py index 80cb2c0..e3fc620 100644 --- a/src/guard.py +++ b/src/guard.py @@ -6,8 +6,7 @@ from telethon import TelegramClient, events from collections import deque import time from link_filter import LinkFilter -from bot_commands import handle_command -import logging +from bot_commands import register_commands @@ -86,10 +85,26 @@ async def message_handler(event, link_filter, rate_limiter): async with rate_limiter: await process_message(event, event.client) +async def command_handler(event, link_filter): + if event.is_private and event.sender_id == ADMIN_ID: + link_filter.load_data_from_file() + + command, *args = event.message.text.split() + command = command.lower() + + if command in ['/add', '/delete', '/list']: + await link_filter.handle_keyword_command(event, command, args) + elif command in ['/addwhite', '/delwhite', '/listwhite']: + await link_filter.handle_whitelist_command(event, command, args) + + if event.raw_text.startswith(('/add', '/delete', '/list', '/addwhite', '/delwhite', '/listwhite')): + link_filter.load_data_from_file() async def start_bot(): async with TelegramClient('bot', api_id=6, api_hash='eb06d4abfb49dc3eeb1aeb98ae0f581e') as client: await client.start(bot_token=BOT_TOKEN) + await register_commands(client, ADMIN_ID) + client.add_event_handler( partial(command_handler, link_filter=link_filter), events.NewMessage(pattern='/add|/delete|/list|/addwhite|/delwhite|/listwhite') diff --git a/src/link_filter.py b/src/link_filter.py index c6105f5..bf0ac55 100644 --- a/src/link_filter.py +++ b/src/link_filter.py @@ -111,3 +111,54 @@ class LinkFilter: if new_non_whitelisted_links: logger.info(f"New non-whitelisted links found: {new_non_whitelisted_links}") return False, new_non_whitelisted_links + + async def handle_keyword_command(self, event, command, args): + if command == '/list': + self.load_data_from_file() + keywords = self.keywords + await event.reply("当前关键词列表:\n" + "\n".join(keywords) if keywords else "关键词列表为空。") + elif command == '/add' and args: + keyword = ' '.join(args) + if keyword not in self.keywords: + self.add_keyword(keyword) + await event.reply(f"关键词 '{keyword}' 已添加。") + else: + await event.reply(f"关键词 '{keyword}' 已存在。") + elif command == '/delete' and args: + keyword = ' '.join(args) + if self.remove_keyword(keyword): + await event.reply(f"关键词 '{keyword}' 已删除。") + else: + similar_keywords = [k for k in self.keywords if keyword.lower() in k.lower()] + if similar_keywords: + await event.reply(f"未找到精确匹配的关键词 '{keyword}'。\n\n以下是相似的关键词:\n" + "\n".join(similar_keywords)) + else: + await event.reply(f"关键词 '{keyword}' 不存在。") + else: + await event.reply("无效的命令或参数。") + + async def handle_whitelist_command(self, event, command, args): + if command == '/listwhite': + self.load_data_from_file() + whitelist = self.whitelist + await event.reply("白名单域名列表:\n" + "\n".join(whitelist) if whitelist else "白名单为空。") + elif command == '/addwhite' and args: + domain = args[0].lower() + if domain not in self.whitelist: + self.whitelist.append(domain) + self.save_whitelist() + self.load_data_from_file() + await event.reply(f"域名 '{domain}' 已添加到白名单。") + else: + await event.reply(f"域名 '{domain}' 已在白名单中。") + elif command == '/delwhite' and args: + domain = args[0].lower() + if domain in self.whitelist: + self.whitelist.remove(domain) + self.save_whitelist() + self.load_data_from_file() + await event.reply(f"域名 '{domain}' 已从白名单中删除。") + else: + await event.reply(f"域名 '{domain}' 不在白名单中。") + else: + await event.reply("无效的命令或参数。") \ No newline at end of file