添加长消息发送机制

This commit is contained in:
wood 2024-09-09 19:43:25 +08:00
parent 9bed2a55b6
commit c4c558ce1d
3 changed files with 1690 additions and 46 deletions

1612
.gitignore vendored Normal file

File diff suppressed because it is too large Load Diff

10
src/functions.py Normal file
View File

@ -0,0 +1,10 @@
async def send_long_message(event, prefix, items, max_length=4000):
    """Reply with a numbered list, split across several messages if needed.

    The output is ``prefix`` on its own line followed by one ``"<n>. <item>"``
    line per element of ``items`` (numbered from 1). Lines are packed into
    as few replies as possible without any reply exceeding ``max_length``
    characters.

    Args:
        event: Object exposing an awaitable ``reply(text)`` method.
        prefix: Heading placed at the start of the first message.
        items: Iterable of entries to list; each is formatted with ``str()``.
        max_length: Maximum number of characters per reply. The default of
            4000 keeps the margin the original code reserved.

    Raises:
        ValueError: If ``max_length`` is smaller than 1.
    """
    if max_length < 1:
        raise ValueError("max_length must be a positive integer")

    # Format every line up front so the size check below measures exactly
    # what will be sent (numbering and newline included), and so that
    # non-string items are accepted.
    lines = [f"{prefix}\n"]
    lines.extend(f"{index}. {item}\n" for index, item in enumerate(items, 1))

    message = ""
    for line in lines:
        # Flush the buffer when appending this line would overflow it.
        if message and len(message) + len(line) > max_length:
            if message.strip():
                await event.reply(message)
            message = ""
        # A single line longer than the limit is sent in fixed-size pieces;
        # the remainder (if any) starts the next buffer.
        while len(line) > max_length:
            await event.reply(line[:max_length])
            line = line[max_length:]
        message += line

    # Whitespace-only leftovers carry no content and are not sent.
    if message.strip():
        await event.reply(message)

View File

@ -3,8 +3,9 @@ import json
import tldextract import tldextract
import urllib.parse import urllib.parse
import logging import logging
from functions import send_long_message
logger = logging.getLogger('TeleGuard.LinkFilter') logger = logging.getLogger("TeleGuard.LinkFilter")
class LinkFilter: class LinkFilter:
@ -15,7 +16,8 @@ class LinkFilter:
self.whitelist = [] self.whitelist = []
self.load_data_from_file() self.load_data_from_file()
self.link_pattern = re.compile(r''' self.link_pattern = re.compile(
r"""
\b \b
(?: (?:
(?:https?://)? # http:// or https:// (optional) (?:https?://)? # http:// or https:// (optional)
@ -28,18 +30,19 @@ class LinkFilter:
(?:/[^\s]*)? # optional path and query string (?:/[^\s]*)? # optional path and query string
) )
\b \b
''', re.VERBOSE | re.IGNORECASE) """,
re.VERBOSE | re.IGNORECASE,
)
def load_json(self, file_path):
    """Load and return the JSON document stored at ``file_path``.

    Args:
        file_path: Path of the JSON file to read.

    Returns:
        The decoded JSON value, or an empty list when the file does not
        exist.
    """
    try:
        # Read as UTF-8 explicitly instead of relying on the platform's
        # locale encoding, so the same file loads identically everywhere.
        with open(file_path, "r", encoding="utf-8") as f:
            return json.load(f)
    except FileNotFoundError:
        return []
def save_json(self, file_path, data):
    """Serialize ``data`` as JSON and write it to ``file_path``.

    Args:
        file_path: Path of the file to (over)write.
        data: JSON-serializable value to store.

    Raises:
        TypeError: If ``data`` contains a value ``json`` cannot serialize.
            The existing file is left untouched in that case.
    """
    # Serialize before opening: open(..., "w") truncates immediately, so a
    # serialization error raised afterwards would wipe the stored data.
    payload = json.dumps(data)
    with open(file_path, "w", encoding="utf-8") as f:
        f.write(payload)
def save_keywords(self): def save_keywords(self):
@ -51,23 +54,26 @@ class LinkFilter:
def load_data_from_file(self):
    """Reload the keyword and whitelist lists from their JSON files.

    Reads ``self.keywords_file`` and ``self.whitelist_file`` through
    ``load_json``, stores the results on ``self.keywords`` and
    ``self.whitelist``, and logs how many entries each now holds.
    """
    self.keywords = self.load_json(self.keywords_file)
    self.whitelist = self.load_json(self.whitelist_file)

    keyword_count = len(self.keywords)
    whitelist_count = len(self.whitelist)
    logger.info(
        f"Reloaded {keyword_count} keywords and {whitelist_count} whitelist entries"
    )
def normalize_link(self, link):
    """Return ``link`` without its scheme, fragment and trailing slashes.

    Args:
        link: URL-like text, with or without a leading ``http://`` or
            ``https://``.

    Returns:
        The rebuilt link made of netloc, path, params and query only,
        with trailing ``/`` characters removed. Callers in this file
        strip leading slashes from the result themselves.
    """
    # Drop the protocol part (http:// or https://), then any leading
    # slashes that are left over.
    link = re.sub(r"^https?://", "", link).lstrip("/")

    # Parse under a placeholder scheme so the host is recognised as the
    # netloc, then rebuild with the scheme and fragment blanked out.
    parts = urllib.parse.urlparse(f"http://{link}")
    rebuilt = parts._replace(scheme="", fragment="").geturl()

    result = rebuilt.rstrip("/")
    logger.debug(f"Normalized link: {link} -> {result}")
    return result
def is_whitelisted(self, link): def is_whitelisted(self, link):
extracted = tldextract.extract(link) extracted = tldextract.extract(link)
domain = f"{extracted.domain}.{extracted.suffix}" domain = f"{extracted.domain}.{extracted.suffix}"
@ -79,7 +85,7 @@ class LinkFilter:
if self.link_pattern.match(keyword): if self.link_pattern.match(keyword):
keyword = self.normalize_link(keyword) keyword = self.normalize_link(keyword)
# 确保在这里去掉开头的双斜杠 # 确保在这里去掉开头的双斜杠
keyword = keyword.lstrip('/') keyword = keyword.lstrip("/")
if keyword not in self.keywords: if keyword not in self.keywords:
self.keywords.append(keyword) self.keywords.append(keyword)
self.save_keywords() self.save_keywords()
@ -101,10 +107,14 @@ class LinkFilter:
original_count = len(self.keywords) original_count = len(self.keywords)
# 创建一个列表,包含所有需要移除的关键词 # 创建一个列表,包含所有需要移除的关键词
removed_keywords = [kw for kw in self.keywords if substring.lower() in kw.lower()] removed_keywords = [
kw for kw in self.keywords if substring.lower() in kw.lower()
]
# 修改关键词列表,仅保留不包含指定子字符串的关键词 # 修改关键词列表,仅保留不包含指定子字符串的关键词
self.keywords = [kw for kw in self.keywords if substring.lower() not in kw.lower()] self.keywords = [
kw for kw in self.keywords if substring.lower() not in kw.lower()
]
# 如果有关键词被移除,则保存关键词列表并重新加载数据 # 如果有关键词被移除,则保存关键词列表并重新加载数据
if removed_keywords: if removed_keywords:
@ -114,8 +124,6 @@ class LinkFilter:
# 返回被移除的关键词列表 # 返回被移除的关键词列表
return removed_keywords return removed_keywords
def should_filter(self, text): def should_filter(self, text):
logger.debug(f"Checking text: {text}") logger.debug(f"Checking text: {text}")
if any(keyword.lower() in text.lower() for keyword in self.keywords): if any(keyword.lower() in text.lower() for keyword in self.keywords):
@ -127,7 +135,7 @@ class LinkFilter:
new_non_whitelisted_links = [] new_non_whitelisted_links = []
for link in links: for link in links:
normalized_link = self.normalize_link(link) normalized_link = self.normalize_link(link)
normalized_link = normalized_link.lstrip('/') # 去除开头的双斜杠 normalized_link = normalized_link.lstrip("/") # 去除开头的双斜杠
if not self.is_whitelisted(normalized_link): if not self.is_whitelisted(normalized_link):
logger.debug(f"Link not whitelisted: {normalized_link}") logger.debug(f"Link not whitelisted: {normalized_link}")
if normalized_link not in self.keywords: if normalized_link not in self.keywords:
@ -141,45 +149,59 @@ class LinkFilter:
logger.info(f"New non-whitelisted links found: {new_non_whitelisted_links}") logger.info(f"New non-whitelisted links found: {new_non_whitelisted_links}")
return False, new_non_whitelisted_links return False, new_non_whitelisted_links
async def handle_keyword_command(self, event, command, args):
    """Run a keyword-management command and reply with its outcome.

    Supported commands:
        ``/list``: reload data from disk and list all keywords.
        ``/add <words>``: add the space-joined ``args`` as a keyword.
        ``/delete <words>``: remove that exact keyword, or list the
            keywords containing it when no exact match was removed.
        ``/deletecontaining <words>``: remove every keyword containing
            the space-joined ``args`` (case-insensitive).

    Any other command, or a command missing its arguments, gets an
    "invalid command" reply.

    Args:
        event: Object exposing an awaitable ``reply(text)`` method.
        command: The command word, including its leading slash.
        args: Sequence of argument words following the command.
    """
    if command == "/list":
        # Refresh from disk so the listing reflects the stored state.
        self.load_data_from_file()
        if self.keywords:
            await send_long_message(event, "当前关键词列表:", self.keywords)
        else:
            await event.reply("关键词列表为空。")
        return

    if command == "/add" and args:
        keyword = " ".join(args)
        if keyword in self.keywords:
            await event.reply(f"关键词 '{keyword}' 已存在。")
        else:
            self.add_keyword(keyword)
            await event.reply(f"关键词 '{keyword}' 已添加。")
        return

    if command == "/delete" and args:
        keyword = " ".join(args)
        if self.remove_keyword(keyword):
            await event.reply(f"关键词 '{keyword}' 已删除。")
            return

        # Nothing was removed: offer keywords that contain the text.
        needle = keyword.lower()
        similar_keywords = [k for k in self.keywords if needle in k.lower()]
        if similar_keywords:
            await send_long_message(
                event,
                f"未找到精确匹配的关键词 '{keyword}'\n\n以下是相似的关键词:",
                similar_keywords,
            )
        else:
            await event.reply(f"关键词 '{keyword}' 不存在。")
        return

    if command == "/deletecontaining" and args:
        substring = " ".join(args)
        removed_keywords = self.remove_keywords_containing(substring)
        if removed_keywords:
            await send_long_message(
                event, f"已删除包含 '{substring}' 的以下关键词:", removed_keywords
            )
        else:
            await event.reply(f"没有找到包含 '{substring}' 的关键词。")
        return

    # Unknown command, or a known command without its required arguments.
    await event.reply("无效的命令或参数。")
async def handle_whitelist_command(self, event, command, args): async def handle_whitelist_command(self, event, command, args):
if command == '/listwhite': if command == "/listwhite":
self.load_data_from_file() self.load_data_from_file()
whitelist = self.whitelist whitelist = self.whitelist
await event.reply("白名单域名列表:\n" + "\n".join(whitelist) if whitelist else "白名单为空。") await event.reply(
elif command == '/addwhite' and args: "白名单域名列表:\n" + "\n".join(whitelist)
if whitelist
else "白名单为空。"
)
elif command == "/addwhite" and args:
domain = args[0].lower() domain = args[0].lower()
if domain not in self.whitelist: if domain not in self.whitelist:
self.whitelist.append(domain) self.whitelist.append(domain)
@ -188,7 +210,7 @@ class LinkFilter:
await event.reply(f"域名 '{domain}' 已添加到白名单。") await event.reply(f"域名 '{domain}' 已添加到白名单。")
else: else:
await event.reply(f"域名 '{domain}' 已在白名单中。") await event.reply(f"域名 '{domain}' 已在白名单中。")
elif command == '/delwhite' and args: elif command == "/delwhite" and args:
domain = args[0].lower() domain = args[0].lower()
if domain in self.whitelist: if domain in self.whitelist:
self.whitelist.remove(domain) self.whitelist.remove(domain)