重构并增强消息处理和关键词管理功能

重构bot_commands模块以使用新的处理函数和命令执行逻辑。
添加处理关键词和白名单命令的新函数。
在guard模块中实现带有速率限制的消息处理函数。
删除了guard模块中过时的关键词加载和保存逻辑。
现在通过新的处理函数处理关键词和白名单命令,提高了代码的整洁度和功能的明确性。
```
This commit is contained in:
wood 2024-09-04 15:28:15 +08:00
parent dc6a40d96a
commit 467cf4b9f1
5 changed files with 242 additions and 50 deletions

5
data/whitelist.json Normal file
View File

@ -0,0 +1,5 @@
[
"q58.org",
"czl.net",
"woodchen.ink"
]

View File

@ -3,3 +3,4 @@ ccxt
pyTelegramBotAPI pyTelegramBotAPI
schedule schedule
pytz pytz
tldextract

View File

@ -1,17 +1,32 @@
from telethon import events
from telethon.tl.types import InputPeerUser from telethon.tl.types import InputPeerUser
from telethon.tl.functions.bots import SetBotCommandsRequest from telethon.tl.functions.bots import SetBotCommandsRequest
from telethon.tl.types import BotCommand from telethon.tl.types import BotCommand
import logging import logging
import json
KEYWORDS_FILE = '/app/data/keywords.json'
WHITELIST_FILE = '/app/data/whitelist.json'
def load_json(file_path):
try:
with open(file_path, 'r') as f:
return json.load(f)
except FileNotFoundError:
return []
def save_json(file_path, data):
with open(file_path, 'w') as f:
json.dump(data, f)
async def register_commands(client, admin_id): async def register_commands(client, admin_id):
commands = [ commands = [
# TeleGuard 命令
BotCommand('add', '添加新的关键词'), BotCommand('add', '添加新的关键词'),
BotCommand('delete', '删除现有的关键词'), BotCommand('delete', '删除现有的关键词'),
BotCommand('list', '列出所有当前的关键词'), BotCommand('list', '列出所有当前的关键词'),
BotCommand('addwhite', '添加域名到白名单'),
# 这里可以添加其他功能的命令 BotCommand('delwhite', '从白名单移除域名'),
# 例如BotCommand('price', '获取当前价格'), BotCommand('listwhite', '列出白名单域名'),
] ]
try: try:
@ -24,4 +39,91 @@ async def register_commands(client, admin_id):
except Exception as e: except Exception as e:
logging.error(f"Failed to register bot commands: {str(e)}") logging.error(f"Failed to register bot commands: {str(e)}")
# 如果有其他功能需要注册命令,可以在这里添加新的函数 async def handle_keyword_command(event, client):
sender = await event.get_sender()
if sender.id != int(os.environ.get('ADMIN_ID')):
return
command = event.message.text.split(maxsplit=1)
if len(command) > 1:
# 直接执行命令
await execute_keyword_command(event, command[0], command[1])
else:
# 进入交互模式
await event.reply(f"请输入你要{command[0][1:]}的关键词:")
async with client.conversation(sender) as conv:
response = await conv.get_response()
await execute_keyword_command(event, command[0], response.text)
async def execute_keyword_command(event, command, keyword):
keywords = load_json(KEYWORDS_FILE)
if command.lower() == '/add':
if keyword.lower() not in keywords:
keywords.append(keyword.lower())
save_json(KEYWORDS_FILE, keywords)
await event.reply(f"关键词或语句 '{keyword}' 已添加到列表中。")
else:
await event.reply(f"关键词或语句 '{keyword}' 已经在列表中。")
elif command.lower() == '/delete':
if keyword.lower() in keywords:
keywords.remove(keyword.lower())
save_json(KEYWORDS_FILE, keywords)
await event.reply(f"关键词或语句 '{keyword}' 已从列表中删除。")
else:
await event.reply(f"关键词或语句 '{keyword}' 不在列表中。")
elif command.lower() == '/list':
if keywords:
await event.reply(f"当前关键词和语句列表:\n" + "\n".join(keywords))
else:
await event.reply("关键词列表为空。")
async def handle_whitelist_command(event, client):
sender = await event.get_sender()
if sender.id != int(os.environ.get('ADMIN_ID')):
return
command = event.message.text.split(maxsplit=1)
if len(command) > 1:
# 直接执行命令
await execute_whitelist_command(event, command[0], command[1])
else:
# 进入交互模式
await event.reply(f"请输入你要{command[0][1:]}的域名:")
async with client.conversation(sender) as conv:
response = await conv.get_response()
await execute_whitelist_command(event, command[0], response.text)
async def execute_whitelist_command(event, command, domain):
whitelist = load_json(WHITELIST_FILE)
if command.lower() == '/addwhite':
if domain.lower() not in whitelist:
whitelist.append(domain.lower())
save_json(WHITELIST_FILE, whitelist)
await event.reply(f"域名 '{domain}' 已添加到白名单。")
else:
await event.reply(f"域名 '{domain}' 已经在白名单中。")
elif command.lower() == '/delwhite':
if domain.lower() in whitelist:
whitelist.remove(domain.lower())
save_json(WHITELIST_FILE, whitelist)
await event.reply(f"域名 '{domain}' 已从白名单中移除。")
else:
await event.reply(f"域名 '{domain}' 不在白名单中。")
elif command.lower() == '/listwhite':
if whitelist:
await event.reply("白名单域名列表:\n" + "\n".join(whitelist))
else:
await event.reply("白名单为空。")
def get_keywords():
return load_json(KEYWORDS_FILE)

View File

@ -1,36 +1,43 @@
import os import os
import json
import logging import logging
import asyncio import asyncio
import time import time
from telethon import TelegramClient, events from telethon import TelegramClient, events
from collections import deque from collections import deque
from link_filter import LinkFilter
from bot_commands import handle_keyword_command, handle_whitelist_command, get_keywords
# 环境变量 # 环境变量
BOT_TOKEN = os.environ.get('BOT_TOKEN') BOT_TOKEN = os.environ.get('BOT_TOKEN')
ADMIN_ID = int(os.environ.get('ADMIN_ID')) ADMIN_ID = int(os.environ.get('ADMIN_ID'))
KEYWORDS_FILE = '/app/data/keywords.json' KEYWORDS_FILE = '/app/data/keywords.json'
WHITELIST_FILE = '/app/data/whitelist.json'
# 设置日志 # 设置日志
logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(name)s - %(levelname)s - %(message)s') logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(name)s - %(levelname)s - %(message)s')
logger = logging.getLogger('TeleGuard') logger = logging.getLogger('TeleGuard')
def load_keywords(): # 创建 LinkFilter 实例
if os.path.exists(KEYWORDS_FILE): link_filter = LinkFilter('/app/data/keywords.json', '/app/data/whitelist.json')
with open(KEYWORDS_FILE, 'r') as f:
return json.load(f)
return ['推广', '广告', 'ad', 'promotion']
def save_keywords(keywords):
with open(KEYWORDS_FILE, 'w') as f:
json.dump(keywords, f)
KEYWORDS = load_keywords()
# 限速器
class RateLimiter: class RateLimiter:
def __init__(self, max_calls, period): def __init__(self, max_calls, period):
"""
初始化RateLimiter类的实例
参数:
max_calls (int): 限制的最大调用次数
period (float): 限定的时间周期
该构造函数设置了速率限制器的基本参数并初始化了一个双端队列
用于跟踪调用的时间点 enforcement of the rate limiting policy
"""
# 限制的最大调用次数
self.max_calls = max_calls self.max_calls = max_calls
# 限定的时间周期(秒)
self.period = period self.period = period
# 用于存储调用时间的双端队列
self.calls = deque() self.calls = deque()
async def __aenter__(self): async def __aenter__(self):
@ -45,50 +52,64 @@ class RateLimiter:
async def __aexit__(self, *args): async def __aexit__(self, *args):
pass pass
rate_limiter = RateLimiter(max_calls=10, period=1) # 每秒最多处理10条消息 rate_limiter = RateLimiter(max_calls=10, period=1) # 每秒最多处理10条消息
async def process_message(event): # 延迟删除消息函数
if not event.is_private and any(keyword in event.message.text.lower() for keyword in KEYWORDS): async def delete_message_after_delay(client, chat, message, delay):
# 延迟指定的时间
await asyncio.sleep(delay)
# 尝试删除消息
try:
await client.delete_messages(chat, message)
except Exception as e:
# 如果删除失败,记录错误
logger.error(f"Failed to delete message: {e}")
# 处理消息函数
async def process_message(event, client):
if not event.is_private:
# 检查消息是否包含需要过滤的链接
if link_filter.should_filter(event.message.text):
if event.sender_id != ADMIN_ID: if event.sender_id != ADMIN_ID:
await event.delete() await event.delete()
await event.respond("已撤回该消息。注:已发送的推广链接不要多次发送,置顶已有项目的推广链接也会自动撤回。") notification = await event.respond("已撤回该消息。注:重复发送的链接会被自动撤回。")
asyncio.create_task(delete_message_after_delay(client, event.chat_id, notification, 30 * 60))
return
# 检查关键词
keywords = get_keywords()
if any(keyword in event.message.text.lower() for keyword in keywords):
if event.sender_id != ADMIN_ID:
await event.delete()
notification = await event.respond("已撤回该消息。注:已发送的推广链接不要多次发送,置顶已有项目的推广链接也会自动撤回。")
asyncio.create_task(delete_message_after_delay(client, event.chat_id, notification, 30 * 60))
# 启动机器人函数
async def start_bot(): async def start_bot():
client = TelegramClient('bot', api_id=6, api_hash='eb06d4abfb49dc3eeb1aeb98ae0f581e') client = TelegramClient('bot', api_id=6, api_hash='eb06d4abfb49dc3eeb1aeb98ae0f581e')
await client.start(bot_token=BOT_TOKEN) await client.start(bot_token=BOT_TOKEN)
@client.on(events.NewMessage(pattern='')) @client.on(events.NewMessage(pattern='/add|/delete|/list'))
async def handler(event): async def keyword_handler(event):
global KEYWORDS await handle_keyword_command(event, client)
if event.is_private and event.sender_id == ADMIN_ID: link_filter.reload_keywords() # 重新加载关键词
command = event.message.text.split(maxsplit=1)
if command[0].lower() == '/add' and len(command) > 1:
new_keyword = command[1].lower()
if new_keyword not in KEYWORDS:
KEYWORDS.append(new_keyword)
save_keywords(KEYWORDS)
await event.respond(f"关键词或语句 '{new_keyword}' 已添加到列表中。")
else:
await event.respond(f"关键词或语句 '{new_keyword}' 已经在列表中。")
elif command[0].lower() == '/delete' and len(command) > 1:
keyword_to_delete = command[1].lower()
if keyword_to_delete in KEYWORDS:
KEYWORDS.remove(keyword_to_delete)
save_keywords(KEYWORDS)
await event.respond(f"关键词或语句 '{keyword_to_delete}' 已从列表中删除。")
else:
await event.respond(f"关键词或语句 '{keyword_to_delete}' 不在列表中。")
elif command[0].lower() == '/list':
await event.respond(f"当前关键词和语句列表:\n" + "\n".join(KEYWORDS))
return
@client.on(events.NewMessage(pattern='/addwhite|/delwhite|/listwhite'))
async def whitelist_handler(event):
await handle_whitelist_command(event, client)
link_filter.reload_whitelist() # 重新加载白名单
@client.on(events.NewMessage(pattern=''))
async def message_handler(event):
if not event.is_private or event.sender_id != ADMIN_ID:
async with rate_limiter: async with rate_limiter:
await process_message(event) await process_message(event, client)
logger.info("TeleGuard is running...") logger.info("TeleGuard is running...")
await client.run_until_disconnected() await client.run_until_disconnected()
# 主函数
def run(): def run():
while True: while True:
try: try:

63
src/link_filter.py Normal file
View File

@ -0,0 +1,63 @@
import re
import json
import tldextract
class LinkFilter:
def __init__(self, keywords_file, whitelist_file):
self.keywords_file = keywords_file
self.whitelist_file = whitelist_file
self.keywords = self.load_json(keywords_file)
self.whitelist = self.load_json(whitelist_file)
# 正则表达式匹配各种链接格式
self.link_pattern = re.compile(r'''
\b
(?:
(?:https?://)? # http:// or https:// (optional)
(?:(?:www\.)? # www. (optional)
(?:[a-zA-Z0-9-]+\.)+ # domain
[a-zA-Z]{2,} # TLD
| # or
(?:t\.me|telegram\.me) # Telegram links
)
(?:/[^\s]*)? # optional path
)
\b
''', re.VERBOSE | re.IGNORECASE)
def load_json(self, file_path):
try:
with open(file_path, 'r') as f:
return json.load(f)
except FileNotFoundError:
return []
def save_keywords(self):
with open(self.keywords_file, 'w') as f:
json.dump(self.keywords, f)
def is_whitelisted(self, link):
extracted = tldextract.extract(link)
domain = f"{extracted.domain}.{extracted.suffix}"
return domain in self.whitelist
def add_keyword(self, link):
if link not in self.keywords:
self.keywords.append(link)
self.save_keywords()
def should_filter(self, text):
links = self.link_pattern.findall(text)
for link in links:
if not self.is_whitelisted(link):
if link in self.keywords:
return True
else:
self.add_keyword(link)
return False
def reload_keywords(self):
self.keywords = self.load_json(self.keywords_file)
def reload_whitelist(self):
self.whitelist = self.load_json(self.whitelist_file)