重构Telegram机器人处理消息和命令的逻辑。

重构message_handler和command_handler以使用部分函数,从而传递额外的参数。更新了start_bot函数,将client上下文管理与启动逻辑结合,以确保资源正确释放。整个修改旨在提高代码的模块化和异常处理能力。
This commit is contained in:
wood 2024-09-04 19:04:15 +08:00
parent cc2ff1ac9f
commit 8a2478553a

View File

@ -1,13 +1,13 @@
import os import os
import logging import logging
import asyncio import asyncio
import time from functools import partial
from telethon import TelegramClient, events from telethon import TelegramClient, events
from collections import deque from collections import deque
import time
from link_filter import LinkFilter from link_filter import LinkFilter
from bot_commands import handle_command from bot_commands import handle_command
# 环境变量 # 环境变量
BOT_TOKEN = os.environ.get('BOT_TOKEN') BOT_TOKEN = os.environ.get('BOT_TOKEN')
ADMIN_ID = int(os.environ.get('ADMIN_ID')) ADMIN_ID = int(os.environ.get('ADMIN_ID'))
@ -21,14 +21,10 @@ logger = logging.getLogger('TeleGuard')
# 创建 LinkFilter 实例 # 创建 LinkFilter 实例
link_filter = LinkFilter(KEYWORDS_FILE, WHITELIST_FILE) link_filter = LinkFilter(KEYWORDS_FILE, WHITELIST_FILE)
# 限速器
class RateLimiter: class RateLimiter:
def __init__(self, max_calls, period): def __init__(self, max_calls, period):
# 限制的最大调用次数
self.max_calls = max_calls self.max_calls = max_calls
# 限定的时间周期(秒)
self.period = period self.period = period
# 用于存储调用时间的双端队列
self.calls = deque() self.calls = deque()
async def __aenter__(self): async def __aenter__(self):
@ -43,73 +39,66 @@ class RateLimiter:
async def __aexit__(self, *args): async def __aexit__(self, *args):
pass pass
rate_limiter = RateLimiter(max_calls=10, period=1) # 每秒最多处理10条消息 rate_limiter = RateLimiter(max_calls=10, period=1)
# 延迟删除消息函数
async def delete_message_after_delay(client, chat, message, delay): async def delete_message_after_delay(client, chat, message, delay):
# 延迟指定的时间
await asyncio.sleep(delay) await asyncio.sleep(delay)
# 尝试删除消息
try: try:
await client.delete_messages(chat, message) await client.delete_messages(chat, message)
except Exception as e: except Exception as e:
# 如果删除失败,记录错误
logger.error(f"Failed to delete message: {e}") logger.error(f"Failed to delete message: {e}")
# 处理消息函数
async def process_message(event, client): async def process_message(event, client):
if not event.is_private: if not event.is_private:
# 检查消息是否应该被过滤
should_filter, new_links = link_filter.should_filter(event.message.text) should_filter, new_links = link_filter.should_filter(event.message.text)
if should_filter: if should_filter:
if event.sender_id != ADMIN_ID: if event.sender_id != ADMIN_ID:
await event.delete() await event.delete()
notification = await event.respond("已撤回该消息。注:包含关键词或重复发送的非白名单链接会被自动撤回。") notification = await event.respond("已撤回该消息。注:包含关键词或重复发送的非白名单链接会被自动撤回。")
asyncio.create_task(delete_message_after_delay(client, event.chat_id, notification, 3 * 60)) asyncio.create_task(delete_message_after_delay(client, event.chat_id, notification, 3 * 60))
return return
if new_links: if new_links:
# 这是第一次发送这些非白名单链接,我们允许消息通过,不发送任何警告 # 可以在这里添加日志记录或其他操作
# 如果需要,可以在这里添加日志记录或其他操作
pass pass
async def command_handler(event, link_filter):
async def command_handler(event):
if event.is_private and event.sender_id == ADMIN_ID: if event.is_private and event.sender_id == ADMIN_ID:
await handle_command(event, event.client) await handle_command(event, event.client)
if event.raw_text.startswith(('/add', '/delete', '/list')): if event.raw_text.startswith(('/add', '/delete', '/list', '/addwhite', '/delwhite', '/listwhite')):
link_filter.reload_keywords() link_filter.load_data_from_file()
elif event.raw_text.startswith(('/addwhite', '/delwhite', '/listwhite')):
link_filter.reload_whitelist()
async def message_handler(event, link_filter, rate_limiter):
async def message_handler(event):
if not event.is_private or event.sender_id != ADMIN_ID: if not event.is_private or event.sender_id != ADMIN_ID:
async with rate_limiter: async with rate_limiter:
await process_message(event, event.client) await process_message(event, event.client)
# 启动机器人函数
async def start_bot(): async def start_bot():
client = TelegramClient('bot', api_id=6, api_hash='eb06d4abfb49dc3eeb1aeb98ae0f581e') async with TelegramClient('bot', api_id=6, api_hash='eb06d4abfb49dc3eeb1aeb98ae0f581e') as client:
await client.start(bot_token=BOT_TOKEN) await client.start(bot_token=BOT_TOKEN)
client.add_event_handler(command_handler, events.NewMessage(pattern='/add|/delete|/list|/addwhite|/delwhite|/listwhite')) client.add_event_handler(
client.add_event_handler(message_handler, events.NewMessage()) partial(command_handler, link_filter=link_filter),
events.NewMessage(pattern='/add|/delete|/list|/addwhite|/delwhite|/listwhite')
)
client.add_event_handler(
partial(message_handler, link_filter=link_filter, rate_limiter=rate_limiter),
events.NewMessage()
)
logger.info("TeleGuard is running...") logger.info("TeleGuard is running...")
await client.run_until_disconnected() await client.run_until_disconnected()
# 主函数 async def main():
def run():
while True: while True:
try: try:
asyncio.get_event_loop().run_until_complete(start_bot()) await start_bot()
except (KeyboardInterrupt, SystemExit):
logger.info("TeleGuard is shutting down...")
break
except Exception as e: except Exception as e:
logger.error(f"An error occurred in TeleGuard: {str(e)}") logger.error(f"An error occurred in TeleGuard: {str(e)}")
logger.info("Attempting to restart TeleGuard in 60 seconds...") logger.info("Attempting to restart TeleGuard in 60 seconds...")
time.sleep(60) # 等待60秒后重试 await asyncio.sleep(60)
if __name__ == '__main__': if __name__ == '__main__':
run() asyncio.run(main())