refactor(bot_commands): 简化命令处理并重构对话管理

重构了命令注册和处理函数,简化了与用户之间的对话管理。通过将关键词和白名单命令处理分离到独立的异步函数中,提高了代码可读性和可维护性。此外,优化了参数处理,允许更直接的命令交互,并对错误情况给出了更清晰的反馈。
This commit is contained in:
wood 2024-09-04 16:35:22 +08:00
parent 3746b4743b
commit 7b73013749
3 changed files with 46 additions and 109 deletions

View File

@ -11,6 +11,8 @@
1. TeleGuard一个 Telegram 机器人,用于管理群组中的关键词并自动删除包含这些关键词的消息。 1. TeleGuard一个 Telegram 机器人,用于管理群组中的关键词并自动删除包含这些关键词的消息。
2. 币安价格更新器:定期获取并发送指定加密货币的价格信息。 2. 币安价格更新器:定期获取并发送指定加密货币的价格信息。
3. 链接拦截:拦截并撤回非白名单域名链接的第二次发送。
这些功能被整合到一个 Docker 容器中,可以同时运行。 这些功能被整合到一个 Docker 容器中,可以同时运行。
@ -26,6 +28,10 @@
- 发送详细的价格更新包括当前价格、24小时变化、高低点等 - 发送详细的价格更新包括当前价格、24小时变化、高低点等
- 可自定义更新频率和货币对 - 可自定义更新频率和货币对
### 链接拦截
- 非白名单域名链接, 在发送第二次会被拦截撤回
## 安装与配置 ## 安装与配置
1. 克隆此仓库到本地 1. 克隆此仓库到本地

View File

@ -1,14 +1,6 @@
import os import os
from telethon import events
from telethon.tl.types import InputPeerUser
from telethon.tl.functions.bots import SetBotCommandsRequest
from telethon.tl.types import BotCommand
import logging
import json import json
__all__ = ['register_commands', 'handle_command', 'get_keywords', 'get_whitelist']
KEYWORDS_FILE = '/app/data/keywords.json' KEYWORDS_FILE = '/app/data/keywords.json'
WHITELIST_FILE = '/app/data/whitelist.json' WHITELIST_FILE = '/app/data/whitelist.json'
ADMIN_ID = int(os.environ.get('ADMIN_ID')) ADMIN_ID = int(os.environ.get('ADMIN_ID'))
@ -24,128 +16,69 @@ def save_json(file_path, data):
with open(file_path, 'w') as f: with open(file_path, 'w') as f:
json.dump(data, f) json.dump(data, f)
async def register_commands(client, admin_id): async def handle_command(event, client):
commands = [
BotCommand('add', '添加新的关键词'),
BotCommand('delete', '删除现有的关键词'),
BotCommand('list', '列出所有当前的关键词'),
BotCommand('addwhite', '添加域名到白名单'),
BotCommand('delwhite', '从白名单移除域名'),
BotCommand('listwhite', '列出白名单域名'),
]
try:
await client(SetBotCommandsRequest(
commands=commands,
scope=InputPeerUser(admin_id, 0),
lang_code=''
))
logging.info("Bot commands registered successfully.")
except Exception as e:
logging.error(f"Failed to register bot commands: {str(e)}")
async def handle_keyword_command(event, client):
sender = await event.get_sender() sender = await event.get_sender()
if sender.id != ADMIN_ID: if sender.id != ADMIN_ID:
return return
command = event.message.text.split(maxsplit=1) command, *args = event.message.text.split()
command = command.lower()
if command[0].lower() == '/list': if command in ['/add', '/delete', '/list']:
await execute_keyword_command(event, '/list', '') await handle_keyword_command(event, command, args)
elif len(command) > 1: elif command in ['/addwhite', '/delwhite', '/listwhite']:
await execute_keyword_command(event, command[0], command[1]) await handle_whitelist_command(event, command, args)
else:
await event.reply(f"请输入你要{command[0][1:]}的关键词:")
async with client.conversation(sender) as conv:
response = await conv.get_response()
await execute_keyword_command(event, command[0], response.text)
async def execute_keyword_command(event, command, keyword): async def handle_keyword_command(event, command, args):
keywords = load_json(KEYWORDS_FILE) keywords = load_json(KEYWORDS_FILE)
if command.lower() == '/list': if command == '/list':
if keywords: await event.reply("当前关键词列表:\n" + "\n".join(keywords) if keywords else "关键词列表为空。")
await event.reply(f"当前关键词和语句列表:\n" + "\n".join(keywords)) elif command == '/add' and args:
else: keyword = args[0].lower()
await event.reply("关键词列表为空。") if keyword not in keywords:
return keywords.append(keyword)
if command.lower() == '/add':
if keyword.lower() not in keywords:
keywords.append(keyword.lower())
save_json(KEYWORDS_FILE, keywords) save_json(KEYWORDS_FILE, keywords)
await event.reply(f"关键词或语句 '{keyword}' 已添加到列表中") await event.reply(f"关键词 '{keyword}' 已添加。")
else: else:
await event.reply(f"关键词或语句 '{keyword}'经在列表中") await event.reply(f"关键词 '{keyword}' 已存在。")
elif command == '/delete' and args:
elif command.lower() == '/delete': keyword = args[0].lower()
if keyword.lower() in keywords: if keyword in keywords:
keywords.remove(keyword.lower()) keywords.remove(keyword)
save_json(KEYWORDS_FILE, keywords) save_json(KEYWORDS_FILE, keywords)
await event.reply(f"关键词或语句 '{keyword}'从列表中删除。") await event.reply(f"关键词 '{keyword}'删除。")
else: else:
await event.reply(f"关键词或语句 '{keyword}' 不在列表中。") await event.reply(f"关键词 '{keyword}' 不存在。")
async def handle_whitelist_command(event, client):
sender = await event.get_sender()
if sender.id != ADMIN_ID:
return
command = event.message.text.split(maxsplit=1)
if command[0].lower() == '/listwhite':
await execute_whitelist_command(event, '/listwhite', '')
elif len(command) > 1:
await execute_whitelist_command(event, command[0], command[1])
else: else:
await event.reply(f"请输入你要{command[0][1:]}的域名:") await event.reply("无效的命令或参数。")
async with client.conversation(sender) as conv:
response = await conv.get_response()
await execute_whitelist_command(event, command[0], response.text)
async def execute_whitelist_command(event, command, domain): async def handle_whitelist_command(event, command, args):
whitelist = load_json(WHITELIST_FILE) whitelist = load_json(WHITELIST_FILE)
if command.lower() == '/listwhite': if command == '/listwhite':
if whitelist: await event.reply("白名单域名列表:\n" + "\n".join(whitelist) if whitelist else "白名单为空。")
await event.reply("白名单域名列表:\n" + "\n".join(whitelist)) elif command == '/addwhite' and args:
else: domain = args[0].lower()
await event.reply("白名单为空。") if domain not in whitelist:
return whitelist.append(domain)
if command.lower() == '/addwhite':
if domain.lower() not in whitelist:
whitelist.append(domain.lower())
save_json(WHITELIST_FILE, whitelist) save_json(WHITELIST_FILE, whitelist)
await event.reply(f"域名 '{domain}' 已添加到白名单。") await event.reply(f"域名 '{domain}' 已添加到白名单。")
else: else:
await event.reply(f"域名 '{domain}'在白名单中。") await event.reply(f"域名 '{domain}' 已在白名单中。")
elif command == '/delwhite' and args:
elif command.lower() == '/delwhite': domain = args[0].lower()
if domain.lower() in whitelist: if domain in whitelist:
whitelist.remove(domain.lower()) whitelist.remove(domain)
save_json(WHITELIST_FILE, whitelist) save_json(WHITELIST_FILE, whitelist)
await event.reply(f"域名 '{domain}' 已从白名单中除。") await event.reply(f"域名 '{domain}' 已从白名单中除。")
else: else:
await event.reply(f"域名 '{domain}' 不在白名单中。") await event.reply(f"域名 '{domain}' 不在白名单中。")
else:
await event.reply("无效的命令或参数。")
def get_keywords(): def get_keywords():
return load_json(KEYWORDS_FILE) return load_json(KEYWORDS_FILE)
def get_whitelist(): def get_whitelist():
return load_json(WHITELIST_FILE) return load_json(WHITELIST_FILE)
async def handle_command(event, client):
sender = await event.get_sender()
if sender.id != ADMIN_ID:
return
command = event.message.text.split(maxsplit=1)
if command[0].lower() in ['/add', '/delete', '/list']:
await handle_keyword_command(event, client)
elif command[0].lower() in ['/addwhite', '/delwhite', '/listwhite']:
await handle_whitelist_command(event, client)

View File

@ -84,6 +84,7 @@ async def command_handler(event):
elif event.raw_text.startswith(('/addwhite', '/delwhite', '/listwhite')): elif event.raw_text.startswith(('/addwhite', '/delwhite', '/listwhite')):
link_filter.reload_whitelist() link_filter.reload_whitelist()
async def message_handler(event): async def message_handler(event):
if not event.is_private or event.sender_id != ADMIN_ID: if not event.is_private or event.sender_id != ADMIN_ID:
async with rate_limiter: async with rate_limiter:
@ -99,9 +100,6 @@ async def start_bot():
logger.info("TeleGuard is running...") logger.info("TeleGuard is running...")
await client.run_until_disconnected() await client.run_until_disconnected()
# 主函数 # 主函数
def run(): def run():
while True: while True: