This commit is contained in:
wood chen 2024-09-18 02:17:45 +08:00
parent 800a60c659
commit c0cf68a834
11 changed files with 3 additions and 685 deletions

View File

@ -24,10 +24,10 @@ jobs:
go-version: '1.22' # 使用你项目需要的 Go 版本 go-version: '1.22' # 使用你项目需要的 Go 版本
- name: Build for amd64 - name: Build for amd64
run: CGO_ENABLED=0 GOOS=linux GOARCH=amd64 go build -o main-amd64 . run: CGO_ENABLED=1 GOOS=linux GOARCH=amd64 go build -o main-amd64 .
- name: Build for arm64 - name: Build for arm64
run: CGO_ENABLED=0 GOOS=linux GOARCH=arm64 go build -o main-arm64 . run: CGO_ENABLED=1 GOOS=linux GOARCH=arm64 go build -o main-arm64 .
- name: Set up QEMU - name: Set up QEMU
uses: docker/setup-qemu-action@v3 uses: docker/setup-qemu-action@v3

46
main.go
View File

@ -7,8 +7,6 @@ import (
"time" "time"
"github.com/woodchen-ink/Q58Bot/service" "github.com/woodchen-ink/Q58Bot/service"
tgbotapi "github.com/go-telegram-bot-api/telegram-bot-api/v5"
) )
var ( var (
@ -39,47 +37,6 @@ func initializeVariables() {
} }
} }
// setupBot runs a long-polling Telegram bot loop answering a few basic
// commands. Only messages from the chat whose ID equals ADMIN_ID are
// handled; everything else is silently skipped.
func setupBot() {
	bot, err := tgbotapi.NewBotAPI(BOT_TOKEN)
	if err != nil {
		log.Panic(err) // cannot run without a working bot handle
	}

	// Debug mode logs every API request/response.
	bot.Debug = true

	log.Printf("Authorized on account %s", bot.Self.UserName)

	// Long-poll for updates with a 60-second timeout.
	u := tgbotapi.NewUpdate(0)
	u.Timeout = 60

	updates := bot.GetUpdatesChan(u)

	for update := range updates {
		if update.Message == nil { // ignore non-message updates
			continue
		}
		// Only the admin chat may talk to this bot.
		if update.Message.Chat.ID != ADMIN_ID {
			continue
		}

		msg := tgbotapi.NewMessage(update.Message.Chat.ID, "")

		switch update.Message.Command() {
		case "start":
			msg.Text = "Hello! I'm your bot."
		case "help":
			msg.Text = "I can help you with various tasks."
		default:
			msg.Text = "I don't know that command"
		}

		if _, err := bot.Send(msg); err != nil {
			log.Panic(err)
		}
	}
}
func runGuard() { func runGuard() {
for { for {
try(func() { try(func() {
@ -110,8 +67,7 @@ func try(fn func(), name string) {
func main() { func main() {
log.SetFlags(log.Ldate | log.Ltime | log.Lshortfile) log.SetFlags(log.Ldate | log.Ltime | log.Lshortfile)
// 使用 goroutines 运行 bot、guard 和 binance 服务 // 使用 goroutines 运行 guard 和 binance 服务
go setupBot()
go runGuard() go runGuard()
go runBinance() go runBinance()

View File

@ -1,5 +0,0 @@
telethon
ccxt
pyTelegramBotAPI
schedule
pytz

View File

@ -1,92 +0,0 @@
import os
import ccxt
import telebot
import schedule
import time
from datetime import datetime, timedelta
import pytz
# All timestamps shown to users are rendered in Singapore time.
singapore_tz = pytz.timezone('Asia/Singapore')

# Public Binance REST client (ticker endpoints need no API key).
exchange = ccxt.binance()

# Required configuration; a missing variable raises KeyError at startup.
BOT_TOKEN = os.environ['BOT_TOKEN']
CHAT_ID = os.environ['CHAT_ID']

bot = telebot.TeleBot(BOT_TOKEN)

# Comma-separated trading pairs, e.g. "BTC/USDT,ETH/USDT".
SYMBOLS = os.environ['SYMBOLS'].split(',')

# ID of the previously posted market update (deleted before each new one).
last_message_id = None
def get_ticker_info(symbol):
    """Fetch the latest ticker for *symbol* from Binance.

    Returns a dict with the trading pair, its last trade price, and the
    24h percentage change.
    """
    data = exchange.fetch_ticker(symbol)
    return {
        'symbol': symbol,
        'last': data['last'],
        'change_percent': data['percentage'],
    }
def format_change(change_percent):
    """Render a 24h percentage change with a direction marker."""
    if change_percent == 0:
        return f"◀▶ {change_percent:.2f}%"
    prefix = "🔼 +" if change_percent > 0 else "🔽 "
    return f"{prefix}{change_percent:.2f}%"
def send_price_update():
    """Build a market summary for every configured symbol and post it.

    The previous update message (if any) is deleted first so the chat only
    ever shows the latest snapshot.
    """
    global last_message_id
    now = datetime.now(singapore_tz)
    message = f"市场更新 - {now.strftime('%Y-%m-%d %H:%M:%S')} (SGT)\n\n"
    for symbol in SYMBOLS:
        info = get_ticker_info(symbol)
        change_str = format_change(info['change_percent'])
        message += f"*{info['symbol']}*\n"
        message += f"价格: ${info['last']:.7f}\n"
        message += f"24h 涨跌: {change_str}\n"
    # Delete the previous update, if one exists; a failure here is not
    # fatal (the message may have been removed manually).
    if last_message_id:
        try:
            bot.delete_message(chat_id=CHAT_ID, message_id=last_message_id)
        except Exception as e:
            print(f"Failed to delete previous message: {e}")
    # Send the new update and remember its ID for the next round.
    sent_message = bot.send_message(CHAT_ID, message, parse_mode='Markdown')
    last_message_id = sent_message.message_id
def run():
    """Entry point: send one update immediately, then hourly on the hour."""
    # Send an initial update right away so the chat isn't empty until the
    # next full hour.
    print("Sending initial price update...")
    send_price_update()
    # Schedule one job per hour of the day (00:00, 01:00, ..., 23:00).
    # NOTE(review): schedule's at() uses the process-local clock while the
    # displayed timestamps use Singapore time — confirm the host timezone.
    for hour in range(24):
        schedule.every().day.at(f"{hour:02d}:00").do(send_price_update)
    print("Scheduled tasks set. Waiting for next hour...")
    # Sleep until the next full hour before entering the polling loop.
    now = datetime.now(singapore_tz)
    next_hour = (now + timedelta(hours=1)).replace(minute=0, second=0, microsecond=0)
    time.sleep((next_hour - now).total_seconds())
    print("Starting main loop...")
    while True:
        schedule.run_pending()
        time.sleep(30)  # poll every 30 seconds; adjust as needed


if __name__ == '__main__':
    run()

View File

@ -1,26 +0,0 @@
from telethon.tl.types import InputPeerUser
from telethon.tl.functions.bots import SetBotCommandsRequest
from telethon.tl.types import BotCommand
async def register_commands(client, admin_id):
    """Register the bot's command menu, scoped to the admin user.

    Args:
        client: a connected Telethon client authorized as the bot.
        admin_id: Telegram user ID the command scope is limited to.

    Failures are logged and swallowed: command registration is cosmetic
    and must not prevent the bot from starting.
    """
    commands = [
        BotCommand('add', '添加新的关键词'),
        BotCommand('delete', '删除现有的关键词'),
        BotCommand('list', '列出所有当前的关键词'),
        BotCommand('deletecontaining', '删除所有包含指定词语的关键词'),
        BotCommand('addwhite', '添加域名到白名单'),
        BotCommand('delwhite', '从白名单移除域名'),
        BotCommand('listwhite', '列出白名单域名'),
    ]
    try:
        # NOTE(review): InputPeerUser with access_hash=0 — confirm this is
        # accepted for a user the bot has not yet interacted with.
        await client(SetBotCommandsRequest(
            commands=commands,
            scope=InputPeerUser(admin_id, 0),
            lang_code=''
        ))
        print("Bot commands registered successfully.")
    except Exception as e:
        print(f"Failed to register bot commands: {str(e)}")


__all__ = ['register_commands']

View File

@ -1,95 +0,0 @@
import sqlite3
import logging
import os
import time
logger = logging.getLogger(__name__)
class Database:
    """SQLite-backed store for filter keywords and whitelisted domains.

    Listings are cached in memory for up to ``CACHE_TTL`` seconds per
    table; every mutation invalidates both caches.
    """

    CACHE_TTL = 300  # seconds a cached listing stays valid

    def __init__(self, db_file):
        """Open (creating tables if needed) the database at *db_file*."""
        self.db_file = db_file
        # Only create a directory when the path actually has one; the
        # original crashed on bare filenames because makedirs('') raises.
        parent = os.path.dirname(db_file)
        if parent:
            os.makedirs(parent, exist_ok=True)
        self._keywords_cache = None
        self._whitelist_cache = None
        # One timestamp per cache: the original shared a single timestamp,
        # so refreshing one table marked the other table's stale cache fresh.
        self._keywords_cache_time = 0
        self._whitelist_cache_time = 0
        self.create_tables()

    def create_tables(self):
        """Create tables, indexes and the FTS mirror if absent (idempotent)."""
        with sqlite3.connect(self.db_file) as conn:
            cursor = conn.cursor()
            # Keyword table with an index for fast exact lookups.
            cursor.execute('''
                CREATE TABLE IF NOT EXISTS keywords
                (id INTEGER PRIMARY KEY, keyword TEXT UNIQUE)
            ''')
            cursor.execute('CREATE INDEX IF NOT EXISTS idx_keyword ON keywords(keyword)')
            # Whitelist table with an index on the domain column.
            cursor.execute('''
                CREATE TABLE IF NOT EXISTS whitelist
                (id INTEGER PRIMARY KEY, domain TEXT UNIQUE)
            ''')
            cursor.execute('CREATE INDEX IF NOT EXISTS idx_domain ON whitelist(domain)')
            # Full-text-search mirror used by search_keywords().
            cursor.execute('''
                CREATE VIRTUAL TABLE IF NOT EXISTS keywords_fts USING fts5(keyword)
            ''')
            conn.commit()

    def execute_query(self, query, params=None):
        """Run one statement, commit, and return all fetched rows."""
        with sqlite3.connect(self.db_file) as conn:
            cursor = conn.cursor()
            if params:
                cursor.execute(query, params)
            else:
                cursor.execute(query)
            conn.commit()
            return cursor.fetchall()

    def add_keyword(self, keyword):
        """Insert *keyword* into the table and FTS mirror (no-op if present)."""
        self.execute_query("INSERT OR IGNORE INTO keywords (keyword) VALUES (?)", (keyword,))
        self.execute_query("INSERT OR IGNORE INTO keywords_fts (keyword) VALUES (?)", (keyword,))
        self._invalidate_cache()

    def remove_keyword(self, keyword):
        """Delete an exact keyword from the table and its FTS mirror."""
        self.execute_query("DELETE FROM keywords WHERE keyword = ?", (keyword,))
        self.execute_query("DELETE FROM keywords_fts WHERE keyword = ?", (keyword,))
        self._invalidate_cache()

    def get_all_keywords(self):
        """Return all keywords, served from cache when younger than CACHE_TTL."""
        now = time.time()
        if self._keywords_cache is None or now - self._keywords_cache_time > self.CACHE_TTL:
            self._keywords_cache = [row[0] for row in self.execute_query("SELECT keyword FROM keywords")]
            self._keywords_cache_time = now
        return self._keywords_cache

    def remove_keywords_containing(self, substring):
        """Delete every keyword containing *substring* and return them.

        Fix: the original returned the DELETE statement's (always empty)
        result set, so callers could never report which keywords were
        actually removed. We now select the matches first and return them.
        """
        pattern = f"%{substring}%"
        removed = [row[0] for row in self.execute_query(
            "SELECT keyword FROM keywords WHERE keyword LIKE ?", (pattern,))]
        self.execute_query("DELETE FROM keywords WHERE keyword LIKE ?", (pattern,))
        self.execute_query("DELETE FROM keywords_fts WHERE keyword LIKE ?", (pattern,))
        self._invalidate_cache()
        return removed

    def add_whitelist(self, domain):
        """Insert *domain* into the whitelist (no-op if already present)."""
        self.execute_query("INSERT OR IGNORE INTO whitelist (domain) VALUES (?)", (domain,))
        self._invalidate_cache()

    def remove_whitelist(self, domain):
        """Delete *domain* from the whitelist."""
        self.execute_query("DELETE FROM whitelist WHERE domain = ?", (domain,))
        self._invalidate_cache()

    def get_all_whitelist(self):
        """Return all whitelisted domains, cached like get_all_keywords()."""
        now = time.time()
        if self._whitelist_cache is None or now - self._whitelist_cache_time > self.CACHE_TTL:
            self._whitelist_cache = [row[0] for row in self.execute_query("SELECT domain FROM whitelist")]
            self._whitelist_cache_time = now
        return self._whitelist_cache

    def search_keywords(self, pattern):
        """Fuzzy search over keywords via the FTS5 mirror (MATCH syntax)."""
        return [row[0] for row in self.execute_query("SELECT keyword FROM keywords_fts WHERE keyword MATCH ?", (pattern,))]

    def _invalidate_cache(self):
        """Drop both caches so the next read hits the database."""
        self._keywords_cache = None
        self._whitelist_cache = None
        self._keywords_cache_time = 0
        self._whitelist_cache_time = 0

View File

@ -1,10 +0,0 @@
# Send a long listing as several replies, each kept under Telegram's
# message-size limit.
async def send_long_message(event, prefix, items):
    """Reply with *prefix* followed by a numbered list of *items*,
    splitting into multiple messages near the 4096-character limit."""
    chunk = prefix + "\n"
    for index, item in enumerate(items, 1):
        # Flush before the chunk would exceed ~4000 chars (headroom
        # below the hard limit).
        if len(chunk) + len(item) > 4000:
            await event.reply(chunk)
            chunk = ""
        chunk += f"{index}. {item}\n"
    if chunk:
        await event.reply(chunk)

View File

@ -1,128 +0,0 @@
import os
import logging
import asyncio
from functools import partial
from telethon import TelegramClient, events
from collections import deque
import time
from link_filter import LinkFilter
from bot_commands import register_commands
# Required environment configuration.
BOT_TOKEN = os.environ.get('BOT_TOKEN')
ADMIN_ID = int(os.environ.get('ADMIN_ID'))
DB_FILE = '/app/data/q58.db'  # SQLite database path (new location)

# Logging: DEBUG when DEBUG_MODE=true, INFO otherwise.
DEBUG_MODE = os.environ.get('DEBUG_MODE', 'False').lower() == 'true'
logging.basicConfig(level=logging.INFO if not DEBUG_MODE else logging.DEBUG,
                    format='%(asctime)s - %(name)s - %(levelname)s - %(message)s')
logger = logging.getLogger('TeleGuard')
link_filter_logger = logging.getLogger('TeleGuard.LinkFilter')
link_filter_logger.setLevel(logging.DEBUG if DEBUG_MODE else logging.INFO)

# Quieten the third-party client library.
logging.getLogger('telethon').setLevel(logging.WARNING)

# Single shared LinkFilter instance backed by the SQLite database.
link_filter = LinkFilter(DB_FILE)
class RateLimiter:
    """Sliding-window rate limiter usable as ``async with``.

    Permits at most *max_calls* entries within any rolling *period*
    seconds; excess entries sleep until the oldest timestamp expires.
    """

    def __init__(self, max_calls, period):
        self.max_calls = max_calls
        self.period = period
        self.calls = deque()  # timestamps of recent entries

    async def __aenter__(self):
        entered = time.time()
        # Evict timestamps that have aged out of the rolling window.
        while self.calls and entered - self.calls[0] >= self.period:
            self.calls.popleft()
        # Window full: wait until the oldest entry falls out of it.
        if len(self.calls) >= self.max_calls:
            wait = self.period - (entered - self.calls[0])
            await asyncio.sleep(wait)
        self.calls.append(time.time())
        return self

    async def __aexit__(self, *exc_info):
        pass
# Shared limiter: at most 10 message-processing entries per second.
rate_limiter = RateLimiter(max_calls=10, period=1)


async def delete_message_after_delay(client, chat, message, delay):
    """Wait *delay* seconds, then best-effort delete *message* in *chat*."""
    await asyncio.sleep(delay)
    try:
        await client.delete_messages(chat, message)
    except Exception as e:
        # Deletion is best-effort: the message may already be gone or
        # permissions may have changed. Log and move on.
        logger.error(f"Failed to delete message: {e}")
async def process_message(event, client):
    """Filter one group message; delete it when it trips the link filter.

    Admin messages are never deleted. The temporary "retracted" notice is
    itself removed after three minutes.
    """
    if not event.is_private:
        logger.debug(f"Processing message: {event.message.text}")
        should_filter, new_links = link_filter.should_filter(event.message.text)
        if should_filter:
            logger.info(f"Message should be filtered: {event.message.text}")
            if event.sender_id != ADMIN_ID:
                await event.delete()
                notification = await event.respond("已撤回该消息。注:一个链接不能发两次.")
                # Remove the notice after 3 minutes to keep the chat clean.
                asyncio.create_task(delete_message_after_delay(client, event.chat_id, notification, 3 * 60))
            return
        if new_links:
            logger.info(f"New non-whitelisted links found: {new_links}")
async def message_handler(event, link_filter, rate_limiter):
    """Rate-limited entry point for all messages except the admin's DMs."""
    if not event.is_private or event.sender_id != ADMIN_ID:
        async with rate_limiter:
            await process_message(event, event.client)
# Handle admin commands sent in private chat.
async def command_handler(event, link_filter):
    """Dispatch keyword/whitelist management commands from the admin."""
    if event.is_private and event.sender_id == ADMIN_ID:
        # Refresh the filter's snapshots so the command sees current data.
        link_filter.load_data_from_file()
        command, *args = event.message.text.split()
        command = command.lower()
        if command in ['/add', '/delete', '/list', '/deletecontaining']:
            await link_filter.handle_keyword_command(event, command, args)
        elif command in ['/addwhite', '/delwhite', '/listwhite']:
            await link_filter.handle_whitelist_command(event, command, args)
        # Reload once more after a (potentially mutating) command so
        # subsequent message filtering sees the changes.
        if event.raw_text.startswith(('/add', '/delete', '/deletecontaining','/list', '/addwhite', '/delwhite', '/listwhite')):
            link_filter.load_data_from_file()
async def start_bot():
    """Connect the Telethon client, wire up handlers, run until disconnect."""
    # NOTE(review): api_id/api_hash are hard-coded here — confirm moving
    # them to environment variables is not required.
    async with TelegramClient('bot', api_id=6, api_hash='eb06d4abfb49dc3eeb1aeb98ae0f581e') as client:
        await client.start(bot_token=BOT_TOKEN)
        await register_commands(client, ADMIN_ID)
        # Command handler is pattern-restricted; the catch-all message
        # handler below sees every new message.
        client.add_event_handler(
            partial(command_handler, link_filter=link_filter),
            events.NewMessage(pattern='/add|/delete|/list|/addwhite|/delwhite|/listwhite')
        )
        client.add_event_handler(
            partial(message_handler, link_filter=link_filter, rate_limiter=rate_limiter),
            events.NewMessage()
        )
        logger.info("TeleGuard is running...")
        await client.run_until_disconnected()
async def main():
    """Run the bot forever, restarting 60s after any unexpected crash."""
    while True:
        try:
            await start_bot()
        except (KeyboardInterrupt, SystemExit):
            logger.info("TeleGuard is shutting down...")
            break
        except Exception as e:
            logger.error(f"An error occurred in TeleGuard: {str(e)}")
            logger.info("Attempting to restart TeleGuard in 60 seconds...")
            await asyncio.sleep(60)
def run():
    """Synchronous entry point used by the process supervisor."""
    asyncio.run(main())


if __name__ == '__main__':
    run()

View File

@ -1,183 +0,0 @@
import re
import urllib.parse
import logging
from database import Database
from functions import send_long_message
logger = logging.getLogger("TeleGuard.LinkFilter")
class LinkFilter:
    """Keyword/link filter backed by the SQLite ``Database``.

    A message is filtered when it contains a stored keyword, or when it
    repeats a previously seen non-whitelisted link. First-seen
    non-whitelisted links are auto-recorded as keywords, so posting the
    same link a second time gets filtered.
    """

    def __init__(self, db_file):
        self.db = Database(db_file)
        self.load_data_from_file()
        # Matches bare domains, http(s) URLs and t.me/telegram.me links,
        # with an optional path/query suffix.
        self.link_pattern = re.compile(
            r"""
            \b
            (?:
                (?:https?://)?          # http:// or https:// (optional)
                (?:(?:www\.)?           # www. (optional)
                    (?:[a-zA-Z0-9-]+\.)+  # domain
                    [a-zA-Z]{2,}          # TLD
                |                       # or
                    (?:t\.me|telegram\.me)  # Telegram links
                )
                (?:/[^\s]*)?            # optional path and query string
            )
            \b
            """,
            re.VERBOSE | re.IGNORECASE,
        )

    def load_data_from_file(self):
        """Refresh the in-memory keyword/whitelist snapshots from the DB."""
        self.keywords = self.db.get_all_keywords()
        self.whitelist = self.db.get_all_whitelist()
        logger.info(
            f"Loaded {len(self.keywords)} keywords and {len(self.whitelist)} whitelist entries from database"
        )

    def normalize_link(self, link):
        """Canonicalize a link: drop scheme, leading slashes, trailing slash."""
        link = re.sub(r"^https?://", "", link)
        link = link.lstrip("/")
        parsed = urllib.parse.urlparse(f"http://{link}")
        normalized = urllib.parse.urlunparse(
            ("", parsed.netloc, parsed.path, parsed.params, parsed.query, "")
        )
        result = normalized.rstrip("/")
        logger.debug(f"Normalized link: {link} -> {result}")
        return result

    def extract_domain(self, url):
        """Return the lower-cased last-two-label domain of *url*."""
        parsed = urllib.parse.urlparse(url)
        domain = parsed.netloc or parsed.path
        domain = domain.split(':')[0]  # Remove port if present
        parts = domain.split('.')
        # NOTE(review): keeping only the last two labels misclassifies
        # multi-part TLDs such as .co.uk — confirm acceptable.
        if len(parts) > 2:
            domain = '.'.join(parts[-2:])
        return domain.lower()

    def is_whitelisted(self, link):
        """True when the link's domain is whitelisted (cached DB listing)."""
        domain = self.extract_domain(link)
        result = domain in self.db.get_all_whitelist()  # cached listing
        logger.debug(f"Whitelist check for {link}: {'Passed' if result else 'Failed'}")
        return result

    def add_keyword(self, keyword):
        """Store a keyword; link-shaped keywords are normalized first."""
        if self.link_pattern.match(keyword):
            keyword = self.normalize_link(keyword)
        keyword = keyword.lstrip("/")
        if keyword not in self.db.get_all_keywords():  # cached listing
            self.db.add_keyword(keyword)
            logger.info(f"New keyword added: {keyword}")
            self.load_data_from_file()
        else:
            logger.debug(f"Keyword already exists: {keyword}")

    def remove_keyword(self, keyword):
        """Delete an exact keyword; returns True if it existed."""
        if keyword in self.db.get_all_keywords():  # cached listing
            self.db.remove_keyword(keyword)
            self.load_data_from_file()
            return True
        return False

    def remove_keywords_containing(self, substring):
        """Delete all keywords containing *substring*; returns DB result."""
        removed_keywords = self.db.remove_keywords_containing(substring)
        self.load_data_from_file()
        return removed_keywords

    def should_filter(self, text):
        """Decide whether *text* must be filtered.

        Returns ``(True, [])`` when the text contains a known keyword or a
        repeated non-whitelisted link; otherwise ``(False, new_links)``
        where *new_links* are first-seen non-whitelisted links that were
        just recorded as keywords.
        """
        logger.debug(f"Checking text: {text}")
        if any(keyword.lower() in text.lower() for keyword in self.db.get_all_keywords()):  # cached listing
            logger.info(f"Text contains keyword: {text}")
            return True, []
        links = self.link_pattern.findall(text)
        logger.debug(f"Found links: {links}")
        new_non_whitelisted_links = []
        for link in links:
            normalized_link = self.normalize_link(link)
            normalized_link = normalized_link.lstrip("/")
            if not self.is_whitelisted(normalized_link):
                logger.debug(f"Link not whitelisted: {normalized_link}")
                if normalized_link not in self.db.get_all_keywords():  # cached listing
                    new_non_whitelisted_links.append(normalized_link)
                    self.add_keyword(normalized_link)
                else:
                    # Repeated non-whitelisted link: filter the message.
                    logger.info(f"Existing keyword found: {normalized_link}")
                    return True, []
        if new_non_whitelisted_links:
            logger.info(f"New non-whitelisted links found: {new_non_whitelisted_links}")
        return False, new_non_whitelisted_links

    async def handle_keyword_command(self, event, command, args):
        """Handle /list, /add, /delete, /deletecontaining and /search."""
        if command == "/list":
            keywords = self.db.get_all_keywords()  # cached listing
            if not keywords:
                await event.reply("关键词列表为空。")
            else:
                await send_long_message(event, "当前关键词列表:", keywords)
        elif command == "/add" and args:
            keyword = " ".join(args)
            if keyword not in self.db.get_all_keywords():  # cached listing
                self.add_keyword(keyword)
                await event.reply(f"关键词 '{keyword}' 已添加。")
            else:
                await event.reply(f"关键词 '{keyword}' 已存在。")
        elif command == "/delete" and args:
            keyword = " ".join(args)
            if self.remove_keyword(keyword):
                await event.reply(f"关键词 '{keyword}' 已删除。")
            else:
                # Fall back to fuzzy FTS search to suggest similar entries.
                similar_keywords = self.db.search_keywords(keyword)
                if similar_keywords:
                    await send_long_message(
                        event,
                        f"未找到精确匹配的关键词 '{keyword}'\n\n以下是相似的关键词:",
                        similar_keywords,
                    )
                else:
                    await event.reply(f"关键词 '{keyword}' 不存在。")
        elif command == "/deletecontaining" and args:
            substring = " ".join(args)
            removed_keywords = self.remove_keywords_containing(substring)
            if removed_keywords:
                await send_long_message(
                    event, f"已删除包含 '{substring}' 的以下关键词:", removed_keywords
                )
            else:
                await event.reply(f"没有找到包含 '{substring}' 的关键词。")
        elif command == "/search" and args:
            pattern = " ".join(args)
            search_results = self.db.search_keywords(pattern)
            if search_results:
                await send_long_message(event, f"搜索 '{pattern}' 的结果:", search_results)
            else:
                await event.reply(f"没有找到匹配 '{pattern}' 的关键词。")
        else:
            await event.reply("无效的命令或参数。")

    async def handle_whitelist_command(self, event, command, args):
        """Handle /listwhite, /addwhite and /delwhite."""
        if command == "/listwhite":
            whitelist = self.db.get_all_whitelist()  # cached listing
            await event.reply(
                "白名单域名列表:\n" + "\n".join(whitelist)
                if whitelist
                else "白名单为空。"
            )
        elif command == "/addwhite" and args:
            domain = args[0].lower()
            if domain not in self.db.get_all_whitelist():  # cached listing
                self.db.add_whitelist(domain)
                await event.reply(f"域名 '{domain}' 已添加到白名单。")
            else:
                await event.reply(f"域名 '{domain}' 已在白名单中。")
        elif command == "/delwhite" and args:
            domain = args[0].lower()
            if domain in self.db.get_all_whitelist():  # cached listing
                self.db.remove_whitelist(domain)
                await event.reply(f"域名 '{domain}' 已从白名单中删除。")
            else:
                await event.reply(f"域名 '{domain}' 不在白名单中。")
        else:
            await event.reply("无效的命令或参数。")

View File

@ -1,54 +0,0 @@
import asyncio
import logging
import multiprocessing
import os
import time

from telethon import TelegramClient

import binance
import guard
from bot_commands import register_commands
BOT_TOKEN = os.environ.get('BOT_TOKEN')
ADMIN_ID = int(os.environ.get('ADMIN_ID'))
async def setup_bot():
    """Connect once as the bot, (re)register its command menu, disconnect."""
    client = TelegramClient('bot', api_id=6, api_hash='eb06d4abfb49dc3eeb1aeb98ae0f581e')
    await client.start(bot_token=BOT_TOKEN)
    await register_commands(client, ADMIN_ID)
    await client.disconnect()
def run_setup_bot():
    """Synchronous wrapper so command registration runs before forking."""
    asyncio.run(setup_bot())
def run_guard():
    """Run the guard service forever, restarting it if it crashes.

    Fix: the original restarted immediately in a tight loop, so a crash
    at startup would spin the CPU and flood the logs. A short pause
    between restarts makes the crash loop harmless.
    """
    while True:
        try:
            guard.run()
        except Exception as e:
            logging.error(f"Guard process crashed: {str(e)}")
            logging.info("Restarting Guard process...")
            time.sleep(5)  # back off briefly before restarting
def run_binance():
    """Run the binance price service forever, restarting it if it crashes.

    Fix: the original restarted immediately in a tight loop, so a crash
    at startup would spin the CPU and flood the logs. A short pause
    between restarts makes the crash loop harmless.
    """
    while True:
        try:
            binance.run()
        except Exception as e:
            logging.error(f"Binance process crashed: {str(e)}")
            logging.info("Restarting Binance process...")
            time.sleep(5)  # back off briefly before restarting
if __name__ == '__main__':
    logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(name)s - %(levelname)s - %(message)s')
    # Register the bot's command menu once, before starting the workers.
    run_setup_bot()
    # Run the guard and binance services in separate processes.
    guard_process = multiprocessing.Process(target=run_guard)
    binance_process = multiprocessing.Process(target=run_binance)
    # Start both workers.
    guard_process.start()
    binance_process.start()
    # Block until both exit (normally they run forever).
    guard_process.join()
    binance_process.join()

View File

@ -1,45 +0,0 @@
import json
import os
import logging
from database import Database
# 设置日志
logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(levelname)s - %(message)s')
logger = logging.getLogger(__name__)
def migrate_data(json_file, db_file):
    """One-off migration: load keywords from a JSON list into the SQLite DB.

    Any failure is logged and re-raised so the caller sees a non-zero exit.
    """
    try:
        # Make sure the target data directory exists.
        os.makedirs(os.path.dirname(db_file), exist_ok=True)
        logger.info(f"Ensuring directory exists: {os.path.dirname(db_file)}")
        # Open (and if needed initialize) the database.
        db = Database(db_file)
        logger.info(f"Database connection created: {db_file}")
        # The legacy file is expected to hold a JSON array of strings.
        with open(json_file, 'r') as f:
            keywords = json.load(f)
        logger.info(f"JSON file loaded: {json_file}")
        if not isinstance(keywords, list):
            raise ValueError(f"Expected a list in JSON file, but got {type(keywords)}")
        # Copy every keyword across (duplicates are ignored by the DB's
        # INSERT OR IGNORE).
        for keyword in keywords:
            db.add_keyword(keyword)
        logger.info(f"Migration complete. Migrated {len(keywords)} keywords.")
        # Read the table back as a sanity check.
        migrated_keywords = db.get_all_keywords()
        logger.info(f"Verified {len(migrated_keywords)} keywords in the database.")
    except Exception as e:
        logger.error(f"An error occurred during migration: {str(e)}")
        raise


if __name__ == "__main__":
    json_file = '/app/data/keywords.json'  # legacy JSON file path
    db_file = '/app/data/q58.db'  # new SQLite database path
    migrate_data(json_file, db_file)