mirror of
https://github.com/woodchen-ink/Q58Bot.git
synced 2025-07-18 13:52:07 +08:00
refactor(bot_commands): 移除未使用的导入和函数以简化结构
从`bot_commands.py`中移除未使用的导入和函数,例如`handle_command`,因为这些元素在当前实现中未被调用或不再需要。更新`__all__`列表以反映`register_commands`函数的唯一导出。通过消除冗余代码,提高了代码库的清晰度和维护性。
This commit is contained in:
parent
9b8070594d
commit
b03910f654
@ -1,15 +1,6 @@
|
|||||||
import os
|
|
||||||
from telethon.tl.types import InputPeerUser
|
from telethon.tl.types import InputPeerUser
|
||||||
from telethon.tl.functions.bots import SetBotCommandsRequest
|
from telethon.tl.functions.bots import SetBotCommandsRequest
|
||||||
from telethon.tl.types import BotCommand
|
from telethon.tl.types import BotCommand
|
||||||
from link_filter import LinkFilter
|
|
||||||
|
|
||||||
KEYWORDS_FILE = '/app/data/keywords.json'
|
|
||||||
WHITELIST_FILE = '/app/data/whitelist.json'
|
|
||||||
ADMIN_ID = int(os.environ.get('ADMIN_ID'))
|
|
||||||
|
|
||||||
# 创建 LinkFilter 实例
|
|
||||||
link_filter = LinkFilter(KEYWORDS_FILE, WHITELIST_FILE)
|
|
||||||
|
|
||||||
async def register_commands(client, admin_id):
|
async def register_commands(client, admin_id):
|
||||||
commands = [
|
commands = [
|
||||||
@ -31,81 +22,4 @@ async def register_commands(client, admin_id):
|
|||||||
except Exception as e:
|
except Exception as e:
|
||||||
print(f"Failed to register bot commands: {str(e)}")
|
print(f"Failed to register bot commands: {str(e)}")
|
||||||
|
|
||||||
async def handle_command(event, client):
|
__all__ = ['register_commands']
|
||||||
sender = await event.get_sender()
|
|
||||||
if sender.id != ADMIN_ID:
|
|
||||||
return
|
|
||||||
|
|
||||||
link_filter.load_data_from_file() # 在处理命令前重新加载数据
|
|
||||||
|
|
||||||
command, *args = event.message.text.split()
|
|
||||||
command = command.lower()
|
|
||||||
|
|
||||||
if command in ['/add', '/delete', '/list']:
|
|
||||||
await handle_keyword_command(event, command, args)
|
|
||||||
elif command in ['/addwhite', '/delwhite', '/listwhite']:
|
|
||||||
await handle_whitelist_command(event, command, args)
|
|
||||||
|
|
||||||
async def handle_keyword_command(event, command, args):
|
|
||||||
if command == '/list':
|
|
||||||
link_filter.load_data_from_file() # 确保使用最新数据
|
|
||||||
keywords = link_filter.keywords
|
|
||||||
await event.reply("当前关键词列表:\n" + "\n".join(keywords) if keywords else "关键词列表为空。")
|
|
||||||
elif command == '/add' and args:
|
|
||||||
keyword = ' '.join(args)
|
|
||||||
if keyword not in link_filter.keywords:
|
|
||||||
link_filter.add_keyword(keyword)
|
|
||||||
await event.reply(f"关键词 '{keyword}' 已添加。")
|
|
||||||
else:
|
|
||||||
await event.reply(f"关键词 '{keyword}' 已存在。")
|
|
||||||
elif command == '/delete' and args:
|
|
||||||
keyword = ' '.join(args)
|
|
||||||
if link_filter.remove_keyword(keyword):
|
|
||||||
await event.reply(f"关键词 '{keyword}' 已删除。")
|
|
||||||
else:
|
|
||||||
# 如果没有精确匹配,尝试查找部分匹配的关键词
|
|
||||||
similar_keywords = [k for k in link_filter.keywords if keyword.lower() in k.lower()]
|
|
||||||
if similar_keywords:
|
|
||||||
await event.reply(f"未找到精确匹配的关键词 '{keyword}'。\n\n以下是相似的关键词:\n" + "\n".join(similar_keywords))
|
|
||||||
else:
|
|
||||||
await event.reply(f"关键词 '{keyword}' 不存在。")
|
|
||||||
else:
|
|
||||||
await event.reply("无效的命令或参数。")
|
|
||||||
|
|
||||||
|
|
||||||
async def handle_whitelist_command(event, command, args):
|
|
||||||
if command == '/listwhite':
|
|
||||||
link_filter.load_data_from_file() # 确保使用最新数据
|
|
||||||
whitelist = link_filter.whitelist
|
|
||||||
await event.reply("白名单域名列表:\n" + "\n".join(whitelist) if whitelist else "白名单为空。")
|
|
||||||
elif command == '/addwhite' and args:
|
|
||||||
domain = args[0].lower()
|
|
||||||
if domain not in link_filter.whitelist:
|
|
||||||
link_filter.whitelist.append(domain)
|
|
||||||
link_filter.save_whitelist()
|
|
||||||
link_filter.load_data_from_file() # 重新加载以确保数据同步
|
|
||||||
await event.reply(f"域名 '{domain}' 已添加到白名单。")
|
|
||||||
else:
|
|
||||||
await event.reply(f"域名 '{domain}' 已在白名单中。")
|
|
||||||
elif command == '/delwhite' and args:
|
|
||||||
domain = args[0].lower()
|
|
||||||
if domain in link_filter.whitelist:
|
|
||||||
link_filter.whitelist.remove(domain)
|
|
||||||
link_filter.save_whitelist()
|
|
||||||
link_filter.load_data_from_file() # 重新加载以确保数据同步
|
|
||||||
await event.reply(f"域名 '{domain}' 已从白名单中删除。")
|
|
||||||
else:
|
|
||||||
await event.reply(f"域名 '{domain}' 不在白名单中。")
|
|
||||||
else:
|
|
||||||
await event.reply("无效的命令或参数。")
|
|
||||||
|
|
||||||
|
|
||||||
def get_keywords():
|
|
||||||
link_filter.load_data_from_file()
|
|
||||||
return link_filter.keywords
|
|
||||||
|
|
||||||
def get_whitelist():
|
|
||||||
link_filter.load_data_from_file()
|
|
||||||
return link_filter.whitelist
|
|
||||||
|
|
||||||
__all__ = ['handle_command', 'get_keywords', 'get_whitelist', 'register_commands']
|
|
||||||
|
19
src/guard.py
19
src/guard.py
@ -6,8 +6,7 @@ from telethon import TelegramClient, events
|
|||||||
from collections import deque
|
from collections import deque
|
||||||
import time
|
import time
|
||||||
from link_filter import LinkFilter
|
from link_filter import LinkFilter
|
||||||
from bot_commands import handle_command
|
from bot_commands import register_commands
|
||||||
import logging
|
|
||||||
|
|
||||||
|
|
||||||
|
|
||||||
@ -86,10 +85,26 @@ async def message_handler(event, link_filter, rate_limiter):
|
|||||||
async with rate_limiter:
|
async with rate_limiter:
|
||||||
await process_message(event, event.client)
|
await process_message(event, event.client)
|
||||||
|
|
||||||
|
async def command_handler(event, link_filter):
|
||||||
|
if event.is_private and event.sender_id == ADMIN_ID:
|
||||||
|
link_filter.load_data_from_file()
|
||||||
|
|
||||||
|
command, *args = event.message.text.split()
|
||||||
|
command = command.lower()
|
||||||
|
|
||||||
|
if command in ['/add', '/delete', '/list']:
|
||||||
|
await link_filter.handle_keyword_command(event, command, args)
|
||||||
|
elif command in ['/addwhite', '/delwhite', '/listwhite']:
|
||||||
|
await link_filter.handle_whitelist_command(event, command, args)
|
||||||
|
|
||||||
|
if event.raw_text.startswith(('/add', '/delete', '/list', '/addwhite', '/delwhite', '/listwhite')):
|
||||||
|
link_filter.load_data_from_file()
|
||||||
async def start_bot():
|
async def start_bot():
|
||||||
async with TelegramClient('bot', api_id=6, api_hash='eb06d4abfb49dc3eeb1aeb98ae0f581e') as client:
|
async with TelegramClient('bot', api_id=6, api_hash='eb06d4abfb49dc3eeb1aeb98ae0f581e') as client:
|
||||||
await client.start(bot_token=BOT_TOKEN)
|
await client.start(bot_token=BOT_TOKEN)
|
||||||
|
|
||||||
|
await register_commands(client, ADMIN_ID)
|
||||||
|
|
||||||
client.add_event_handler(
|
client.add_event_handler(
|
||||||
partial(command_handler, link_filter=link_filter),
|
partial(command_handler, link_filter=link_filter),
|
||||||
events.NewMessage(pattern='/add|/delete|/list|/addwhite|/delwhite|/listwhite')
|
events.NewMessage(pattern='/add|/delete|/list|/addwhite|/delwhite|/listwhite')
|
||||||
|
@ -111,3 +111,54 @@ class LinkFilter:
|
|||||||
if new_non_whitelisted_links:
|
if new_non_whitelisted_links:
|
||||||
logger.info(f"New non-whitelisted links found: {new_non_whitelisted_links}")
|
logger.info(f"New non-whitelisted links found: {new_non_whitelisted_links}")
|
||||||
return False, new_non_whitelisted_links
|
return False, new_non_whitelisted_links
|
||||||
|
|
||||||
|
async def handle_keyword_command(self, event, command, args):
|
||||||
|
if command == '/list':
|
||||||
|
self.load_data_from_file()
|
||||||
|
keywords = self.keywords
|
||||||
|
await event.reply("当前关键词列表:\n" + "\n".join(keywords) if keywords else "关键词列表为空。")
|
||||||
|
elif command == '/add' and args:
|
||||||
|
keyword = ' '.join(args)
|
||||||
|
if keyword not in self.keywords:
|
||||||
|
self.add_keyword(keyword)
|
||||||
|
await event.reply(f"关键词 '{keyword}' 已添加。")
|
||||||
|
else:
|
||||||
|
await event.reply(f"关键词 '{keyword}' 已存在。")
|
||||||
|
elif command == '/delete' and args:
|
||||||
|
keyword = ' '.join(args)
|
||||||
|
if self.remove_keyword(keyword):
|
||||||
|
await event.reply(f"关键词 '{keyword}' 已删除。")
|
||||||
|
else:
|
||||||
|
similar_keywords = [k for k in self.keywords if keyword.lower() in k.lower()]
|
||||||
|
if similar_keywords:
|
||||||
|
await event.reply(f"未找到精确匹配的关键词 '{keyword}'。\n\n以下是相似的关键词:\n" + "\n".join(similar_keywords))
|
||||||
|
else:
|
||||||
|
await event.reply(f"关键词 '{keyword}' 不存在。")
|
||||||
|
else:
|
||||||
|
await event.reply("无效的命令或参数。")
|
||||||
|
|
||||||
|
async def handle_whitelist_command(self, event, command, args):
|
||||||
|
if command == '/listwhite':
|
||||||
|
self.load_data_from_file()
|
||||||
|
whitelist = self.whitelist
|
||||||
|
await event.reply("白名单域名列表:\n" + "\n".join(whitelist) if whitelist else "白名单为空。")
|
||||||
|
elif command == '/addwhite' and args:
|
||||||
|
domain = args[0].lower()
|
||||||
|
if domain not in self.whitelist:
|
||||||
|
self.whitelist.append(domain)
|
||||||
|
self.save_whitelist()
|
||||||
|
self.load_data_from_file()
|
||||||
|
await event.reply(f"域名 '{domain}' 已添加到白名单。")
|
||||||
|
else:
|
||||||
|
await event.reply(f"域名 '{domain}' 已在白名单中。")
|
||||||
|
elif command == '/delwhite' and args:
|
||||||
|
domain = args[0].lower()
|
||||||
|
if domain in self.whitelist:
|
||||||
|
self.whitelist.remove(domain)
|
||||||
|
self.save_whitelist()
|
||||||
|
self.load_data_from_file()
|
||||||
|
await event.reply(f"域名 '{domain}' 已从白名单中删除。")
|
||||||
|
else:
|
||||||
|
await event.reply(f"域名 '{domain}' 不在白名单中。")
|
||||||
|
else:
|
||||||
|
await event.reply("无效的命令或参数。")
|
Loading…
x
Reference in New Issue
Block a user