消息处理和链接过滤增强

- 优化了消息处理逻辑:包含已知关键词(含此前已记录的非白名单链接)的消息会被删除;首次出现的非白名单链接仅被记录,消息本身不会被删除。
- 扩展了链接过滤器的白名单匹配,支持完整域名(含子域名)、主域名以及通配符条目(如 *.example.com),提高了过滤器的灵活性和有效性。
- 修正了 should_filter 方法:现在返回新出现的非白名单链接列表(而非布尔值);首次发送的非白名单链接不会触发消息删除,仅记录链接以供后续检查。
- 调整了代码以使用环境变量加载关键词和白名单文件路径,提高了配置的灵活性。
- 对限速器和消息删除逻辑进行了优化,并将通知消息的自动删除延迟从 30 分钟缩短为 3 分钟。
This commit is contained in:
wood 2024-09-04 17:08:28 +08:00
parent 645db7cb5b
commit f6fc01c26a
2 changed files with 34 additions and 19 deletions

View File

@ -5,7 +5,7 @@ import time
from telethon import TelegramClient, events from telethon import TelegramClient, events
from collections import deque from collections import deque
from link_filter import LinkFilter from link_filter import LinkFilter
from bot_commands import handle_command, get_keywords from bot_commands import handle_command
# 环境变量 # 环境变量
@ -19,7 +19,7 @@ logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(name)s - %(level
logger = logging.getLogger('TeleGuard') logger = logging.getLogger('TeleGuard')
# 创建 LinkFilter 实例 # 创建 LinkFilter 实例
link_filter = LinkFilter('/app/data/keywords.json', '/app/data/whitelist.json') link_filter = LinkFilter(KEYWORDS_FILE, WHITELIST_FILE)
# 限速器 # 限速器
class RateLimiter: class RateLimiter:
@ -60,21 +60,20 @@ async def delete_message_after_delay(client, chat, message, delay):
# 处理消息函数 # 处理消息函数
async def process_message(event, client): async def process_message(event, client):
if not event.is_private: if not event.is_private:
# 检查消息是否包含需要过滤的链接 # 检查消息是否包含已知的关键词(包括之前添加的非白名单链接)
if link_filter.should_filter(event.message.text): if any(keyword in event.message.text for keyword in link_filter.keywords):
if event.sender_id != ADMIN_ID: if event.sender_id != ADMIN_ID:
await event.delete() await event.delete()
notification = await event.respond("已撤回该消息。注:重复发送的链接会被自动撤回。") notification = await event.respond("已撤回该消息。注:重复发送的推广链接会被自动撤回。")
asyncio.create_task(delete_message_after_delay(client, event.chat_id, notification, 30 * 60)) asyncio.create_task(delete_message_after_delay(client, event.chat_id, notification, 3 * 60))
return return
# 检查关键词 # 检查是否有新的非白名单链接
keywords = get_keywords() new_links = link_filter.should_filter(event.message.text)
if any(keyword in event.message.text.lower() for keyword in keywords): if new_links:
if event.sender_id != ADMIN_ID: # 这是第一次发送这些非白名单链接,我们允许消息通过,不发送任何警告
await event.delete() pass
notification = await event.respond("已撤回该消息。注:已发送的推广链接不要多次发送,置顶已有项目的推广链接也会自动撤回。")
asyncio.create_task(delete_message_after_delay(client, event.chat_id, notification, 30 * 60))
async def command_handler(event): async def command_handler(event):
if event.is_private and event.sender_id == ADMIN_ID: if event.is_private and event.sender_id == ADMIN_ID:

View File

@ -38,8 +38,24 @@ class LinkFilter:
def is_whitelisted(self, link): def is_whitelisted(self, link):
extracted = tldextract.extract(link) extracted = tldextract.extract(link)
domain = f"{extracted.domain}.{extracted.suffix}" full_domain = '.'.join(part for part in [extracted.subdomain, extracted.domain, extracted.suffix] if part)
return domain in self.whitelist main_domain = f"{extracted.domain}.{extracted.suffix}"
# 检查完整域名(包括子域名)
if full_domain in self.whitelist:
return True
# 检查主域名
if main_domain in self.whitelist:
return True
# 检查是否有通配符匹配
wildcard_domain = f"*.{main_domain}"
if wildcard_domain in self.whitelist:
return True
return False
def add_keyword(self, link): def add_keyword(self, link):
if link not in self.keywords: if link not in self.keywords:
@ -48,13 +64,13 @@ class LinkFilter:
def should_filter(self, text): def should_filter(self, text):
links = self.link_pattern.findall(text) links = self.link_pattern.findall(text)
new_non_whitelisted_links = []
for link in links: for link in links:
if not self.is_whitelisted(link): if not self.is_whitelisted(link):
if link in self.keywords: if link not in self.keywords:
return True new_non_whitelisted_links.append(link)
else:
self.add_keyword(link) self.add_keyword(link)
return False return new_non_whitelisted_links
def reload_keywords(self): def reload_keywords(self):
self.keywords = self.load_json(self.keywords_file) self.keywords = self.load_json(self.keywords_file)