Mirror of https://github.com/woodchen-ink/Q58Bot.git, synced 2025-07-18 13:52:07 +08:00
Implement logging to enhance debugging and monitoring

Integrate logging into the `guard.py` and `link_filter.py` modules to support better debugging and runtime monitoring. A `logger` instance is added to each module, and log entries are placed around key operations to record activities such as message processing, link-filtering decisions, and configuration loading.
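For orientation, here is a minimal sketch of the logging topology this commit sets up, condensed into a single snippet (in the repo these live in two separate modules; the logger names and format string below come from the diff, the log messages are illustrative):

import logging

# As in src/guard.py: configure the root handler once, then take a named logger.
logging.basicConfig(level=logging.DEBUG, format='%(asctime)s - %(name)s - %(levelname)s - %(message)s')
logger = logging.getLogger('TeleGuard')

# As in src/link_filter.py: the dotted name makes this a child of 'TeleGuard',
# so its records propagate up to the handler configured by basicConfig.
filter_logger = logging.getLogger('TeleGuard.LinkFilter')

logger.info("guard module ready")          # ... - TeleGuard - INFO - guard module ready
filter_logger.debug("filter module ready")  # ... - TeleGuard.LinkFilter - DEBUG - filter module ready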
parent b1cd9b3a2a
commit 8642737fa4

Files changed: src/guard.py (11 lines), src/link_filter.py
src/guard.py

@@ -7,6 +7,11 @@ from collections import deque
 import time
 from link_filter import LinkFilter
 from bot_commands import handle_command
+import logging
+
+# 设置日志
+logging.basicConfig(level=logging.DEBUG, format='%(asctime)s - %(name)s - %(levelname)s - %(message)s')
+logger = logging.getLogger('TeleGuard')
 
 # 环境变量
 BOT_TOKEN = os.environ.get('BOT_TOKEN')
@@ -50,16 +55,18 @@ async def delete_message_after_delay(client, chat, message, delay):
 
 async def process_message(event, client):
     if not event.is_private:
+        logger.debug(f"Processing message: {event.message.text}")
         should_filter, new_links = link_filter.should_filter(event.message.text)
         if should_filter:
+            logger.info(f"Message should be filtered: {event.message.text}")
             if event.sender_id != ADMIN_ID:
                 await event.delete()
                 notification = await event.respond("已撤回该消息。注:包含关键词或重复发送的非白名单链接会被自动撤回。")
                 asyncio.create_task(delete_message_after_delay(client, event.chat_id, notification, 3 * 60))
             return
         if new_links:
-            # 可以在这里添加日志记录或其他操作
-            pass
+            logger.info(f"New non-whitelisted links found: {new_links}")
 
 
 async def command_handler(event, link_filter):
     if event.is_private and event.sender_id == ADMIN_ID:
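Note that process_message depends on should_filter returning a (should_filter, new_links) pair. A hypothetical stub (the names and trigger strings here are illustrative, not from the repo) makes that contract explicit:

def should_filter_stub(text):
    # (True, [])      -> message contains a keyword or a repeated link; delete it.
    if "blockedword" in text.lower():
        return True, []
    # (False, [link]) -> first sighting of a non-whitelisted link; allow once but record it.
    if "unknown-site.com" in text:
        return False, ["unknown-site.com"]
    # (False, [])     -> nothing to do.
    return False, []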
src/link_filter.py

@@ -2,6 +2,10 @@ import re
 import json
 import tldextract
 import urllib.parse
+import logging
+
+logger = logging.getLogger('TeleGuard.LinkFilter')
+
 
 class LinkFilter:
     def __init__(self, keywords_file, whitelist_file):
@@ -16,15 +20,17 @@ class LinkFilter:
             (?:
                 (?:https?://)?              # http:// or https:// (optional)
                 (?:(?:www\.)?               # www. (optional)
                 (?:[a-zA-Z0-9-]+\.)+        # domain
                 [a-zA-Z]{2,}                # TLD
                 |                           # or
                 (?:t\.me|telegram\.me)      # Telegram links
                 )
                 (?:/[^\s]*)?                # optional path and query string
             )
             \b
         ''', re.VERBOSE | re.IGNORECASE)
+        logger.info(f"LinkFilter initialized with keywords file: {keywords_file} and whitelist file: {whitelist_file}")
+
 
     def load_json(self, file_path):
         try:
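As a standalone illustration of what the pattern above matches, the hunk can be reconstructed into a runnable snippet. The opening re.compile(r''' sits above the hunk and is an assumption; the pattern body is copied from the diff:

import re

link_pattern = re.compile(r'''
    (?:
        (?:https?://)?              # http:// or https:// (optional)
        (?:(?:www\.)?               # www. (optional)
        (?:[a-zA-Z0-9-]+\.)+        # domain
        [a-zA-Z]{2,}                # TLD
        |                           # or
        (?:t\.me|telegram\.me)      # Telegram links
        )
        (?:/[^\s]*)?                # optional path and query string
    )
    \b
''', re.VERBOSE | re.IGNORECASE)

# No capturing groups, so findall returns the full matched substrings.
print(link_pattern.findall("see https://example.com/page and t.me/channel"))
# ['https://example.com/page', 't.me/channel']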
@@ -50,12 +56,17 @@ class LinkFilter:
     def normalize_link(self, link):
         link = re.sub(r'^https?://', '', link)
         parsed = urllib.parse.urlparse(f"http://{link}")
-        return urllib.parse.urlunparse(('', parsed.netloc, parsed.path, parsed.params, parsed.query, '')).rstrip('/')
+        normalized = urllib.parse.urlunparse(('', parsed.netloc, parsed.path, parsed.params, parsed.query, ''))
+        result = normalized.rstrip('/')
+        logger.debug(f"Normalized link: {link} -> {result}")
+        return result
 
     def is_whitelisted(self, link):
         extracted = tldextract.extract(link)
         domain = f"{extracted.domain}.{extracted.suffix}"
-        return domain in self.whitelist
+        result = domain in self.whitelist
+        logger.debug(f"Whitelist check for {link}: {'Passed' if result else 'Failed'}")
+        return result
 
     def add_keyword(self, keyword):
         if self.link_pattern.match(keyword):
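The whitelist check above keys on the registered domain only, so a single whitelist entry covers every subdomain of that domain. A quick illustration of the tldextract reduction it relies on (the hostnames are examples):

import tldextract

for host in ("sub.example.com", "www.example.co.uk"):
    ext = tldextract.extract(host)
    # is_whitelisted compares f"{ext.domain}.{ext.suffix}" against the whitelist
    print(f"{ext.domain}.{ext.suffix}")
# example.com
# example.co.uk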
@@ -63,6 +74,9 @@ class LinkFilter:
             if keyword not in self.keywords:
                 self.keywords.append(keyword)
                 self.save_keywords()
+                logger.info(f"New keyword added: {keyword}")
+            else:
+                logger.debug(f"Keyword already exists: {keyword}")
 
     def remove_keyword(self, keyword):
         if self.link_pattern.match(keyword):
@@ -74,18 +88,25 @@ class LinkFilter:
             return False
 
     def should_filter(self, text):
+        logger.debug(f"Checking text: {text}")
         if any(keyword.lower() in text.lower() for keyword in self.keywords):
+            logger.info(f"Text contains keyword: {text}")
             return True, []
 
         links = self.link_pattern.findall(text)
+        logger.debug(f"Found links: {links}")
         new_non_whitelisted_links = []
         for link in links:
             normalized_link = self.normalize_link(link)
             if not self.is_whitelisted(normalized_link):
+                logger.debug(f"Link not whitelisted: {normalized_link}")
                 if normalized_link not in self.keywords:
                     new_non_whitelisted_links.append(normalized_link)
                     self.add_keyword(normalized_link)
                 else:
+                    logger.info(f"Existing keyword found: {normalized_link}")
                     return True, []
 
+        if new_non_whitelisted_links:
+            logger.info(f"New non-whitelisted links found: {new_non_whitelisted_links}")
         return False, new_non_whitelisted_links
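Putting it together, a quick smoke test of the new logging output (the data file paths and their JSON contents are assumptions; run from the src directory so link_filter imports):

import logging
logging.basicConfig(level=logging.DEBUG, format='%(asctime)s - %(name)s - %(levelname)s - %(message)s')

from link_filter import LinkFilter

# Hypothetical data files, e.g. keywords.json -> [] and whitelist.json -> ["example.org"]
lf = LinkFilter('keywords.json', 'whitelist.json')

# Expect an INFO line from TeleGuard.LinkFilter at construction, then DEBUG
# lines tracing the text check, link extraction, and whitelist lookups.
should_filter, new_links = lf.should_filter("check https://example.com/page")
print(should_filter, new_links)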