Files
zsglpt/email_service.py
yuyx a7976bcdfc fix: SMTP配额重置使用北京时间
- 添加pytz时区支持
- 配额重置日期改为使用北京时间(UTC+8)
- 确保配额在北京时间凌晨0点重置,而不是UTC时间

🤖 Generated with [Claude Code](https://claude.com/claude-code)

Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com>
2025-12-13 03:57:44 +08:00

2140 lines
68 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
#!/usr/bin/env python3
# -*- coding: utf-8 -*-
"""
邮件服务模块
功能:
1. 多SMTP配置管理主备切换、故障转移
2. 发送纯文本/HTML邮件
3. 发送带附件邮件支持ZIP压缩
4. 异步发送队列
5. 每日发送限额控制
6. 发送日志记录
"""
import os
import smtplib
import threading
import queue
import time
import zipfile
import secrets
from datetime import datetime, timedelta
import pytz
# 北京时区
BEIJING_TZ = pytz.timezone('Asia/Shanghai')
def get_beijing_today():
    """Return the current calendar date in Beijing time as a ``YYYY-MM-DD`` string."""
    beijing_now = datetime.now(BEIJING_TZ)
    return beijing_now.strftime('%Y-%m-%d')
from email.mime.text import MIMEText
from email.mime.multipart import MIMEMultipart
from email.mime.base import MIMEBase
from email.mime.image import MIMEImage
from email import encoders
from email.header import Header
from email.utils import formataddr
from typing import Optional, List, Dict, Any, Callable
from io import BytesIO
import db_pool
from crypto_utils import encrypt_password, decrypt_password, is_encrypted
def parse_datetime(dt_str: str) -> datetime:
    """Parse a timestamp string read from the database.

    Two layouts are accepted: ``YYYY-MM-DD HH:MM:SS.ffffff`` and
    ``YYYY-MM-DD HH:MM:SS``.

    Args:
        dt_str: The stored timestamp. A ``datetime`` instance is returned
            unchanged, so the function keeps working if the DB layer already
            converted the column.

    Returns:
        The parsed ``datetime``. ``datetime.min`` is returned for empty,
        non-string or unparseable input, so callers comparing against "now"
        treat such values as long expired.
    """
    if isinstance(dt_str, datetime):
        return dt_str
    if not dt_str or not isinstance(dt_str, str):
        return datetime.min
    # Try the more specific layout (with microseconds) first.
    for fmt in ('%Y-%m-%d %H:%M:%S.%f', '%Y-%m-%d %H:%M:%S'):
        try:
            return datetime.strptime(dt_str, fmt)
        except ValueError:
            continue
    # Nothing matched: fail closed (treated as expired by callers).
    return datetime.min
# ============ Constants ============
# Email types (stored in email_tokens.token_type and email_logs.email_type)
EMAIL_TYPE_REGISTER = 'register'  # registration verification
EMAIL_TYPE_RESET = 'reset'  # password reset
EMAIL_TYPE_BIND = 'bind'  # email binding
EMAIL_TYPE_TASK_COMPLETE = 'task_complete'  # task-completion notification
# Token lifetimes, in seconds
TOKEN_EXPIRE_REGISTER = 24 * 60 * 60  # registration verification: 24 hours
TOKEN_EXPIRE_RESET = 30 * 60  # password reset: 30 minutes
TOKEN_EXPIRE_BIND = 60 * 60  # email binding: 1 hour
# Minimum interval between two emails of the same type to one address, in seconds
RATE_LIMIT_REGISTER = 60  # registration email: 1 minute
RATE_LIMIT_RESET = 5 * 60  # reset email: 5 minutes
RATE_LIMIT_BIND = 60  # binding email: 1 minute
# Async queue configuration (overridable through environment variables;
# a non-integer value raises ValueError at import time)
QUEUE_WORKERS = int(os.environ.get('EMAIL_QUEUE_WORKERS', '2'))
QUEUE_MAX_SIZE = int(os.environ.get('EMAIL_QUEUE_MAX_SIZE', '100'))
# Attachment size limit, in bytes.
# Most mail providers cap the attachments of a single message at 10-25MB,
# so the default is a conservative 10MB. Per the original note, larger payloads
# are sent in several batches (the splitting code is not part of this section).
MAX_ATTACHMENT_SIZE = int(os.environ.get('EMAIL_MAX_ATTACHMENT_SIZE', str(10 * 1024 * 1024)))  # 10MB
# ============ 数据库操作 ============
def init_email_tables():
    """Create every email-related table and index if it does not exist yet.

    The statements are executed in a fixed order on one connection and
    committed together; the two single-row tables (``email_settings`` and
    ``email_stats``) are seeded with their ``id = 1`` row.
    """
    schema_statements = [
        # 1. SMTP configurations (several may coexist)
        """
        CREATE TABLE IF NOT EXISTS smtp_configs (
            id INTEGER PRIMARY KEY AUTOINCREMENT,
            name TEXT NOT NULL DEFAULT '默认配置',
            enabled INTEGER DEFAULT 1,
            is_primary INTEGER DEFAULT 0,
            priority INTEGER DEFAULT 0,
            host TEXT NOT NULL DEFAULT '',
            port INTEGER DEFAULT 465,
            username TEXT NOT NULL DEFAULT '',
            password TEXT DEFAULT '',
            use_ssl INTEGER DEFAULT 1,
            use_tls INTEGER DEFAULT 0,
            sender_name TEXT DEFAULT '自动化学习',
            sender_email TEXT DEFAULT '',
            daily_limit INTEGER DEFAULT 0,
            daily_sent INTEGER DEFAULT 0,
            daily_reset_date TEXT DEFAULT '',
            last_success_at TIMESTAMP,
            last_error TEXT DEFAULT '',
            success_count INTEGER DEFAULT 0,
            fail_count INTEGER DEFAULT 0,
            created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
            updated_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP
        )
        """,
        """
        CREATE INDEX IF NOT EXISTS idx_smtp_configs_enabled
        ON smtp_configs(enabled, priority)
        """,
        """
        CREATE INDEX IF NOT EXISTS idx_smtp_configs_primary
        ON smtp_configs(is_primary)
        """,
        # 2. Global email settings (single row, id = 1)
        """
        CREATE TABLE IF NOT EXISTS email_settings (
            id INTEGER PRIMARY KEY DEFAULT 1,
            enabled INTEGER DEFAULT 0,
            failover_enabled INTEGER DEFAULT 1,
            register_verify_enabled INTEGER DEFAULT 0,
            task_notify_enabled INTEGER DEFAULT 0,
            base_url TEXT DEFAULT '',
            updated_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP
        )
        """,
        """
        INSERT OR IGNORE INTO email_settings (id, enabled, failover_enabled)
        VALUES (1, 0, 1)
        """,
        # 3. Email verification tokens
        """
        CREATE TABLE IF NOT EXISTS email_tokens (
            id INTEGER PRIMARY KEY AUTOINCREMENT,
            user_id INTEGER,
            email TEXT NOT NULL,
            token TEXT UNIQUE NOT NULL,
            token_type TEXT NOT NULL,
            expires_at TIMESTAMP NOT NULL,
            used INTEGER DEFAULT 0,
            created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
            FOREIGN KEY (user_id) REFERENCES users(id) ON DELETE CASCADE
        )
        """,
        "CREATE INDEX IF NOT EXISTS idx_email_tokens_token ON email_tokens(token)",
        "CREATE INDEX IF NOT EXISTS idx_email_tokens_email ON email_tokens(email)",
        "CREATE INDEX IF NOT EXISTS idx_email_tokens_expires ON email_tokens(expires_at)",
        # 4. Send log
        """
        CREATE TABLE IF NOT EXISTS email_logs (
            id INTEGER PRIMARY KEY AUTOINCREMENT,
            user_id INTEGER,
            smtp_config_id INTEGER,
            email_to TEXT NOT NULL,
            email_type TEXT NOT NULL,
            subject TEXT NOT NULL,
            status TEXT NOT NULL,
            error_message TEXT,
            attachment_count INTEGER DEFAULT 0,
            attachment_size INTEGER DEFAULT 0,
            created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
            FOREIGN KEY (user_id) REFERENCES users(id) ON DELETE SET NULL,
            FOREIGN KEY (smtp_config_id) REFERENCES smtp_configs(id) ON DELETE SET NULL
        )
        """,
        "CREATE INDEX IF NOT EXISTS idx_email_logs_user ON email_logs(user_id)",
        "CREATE INDEX IF NOT EXISTS idx_email_logs_status ON email_logs(status)",
        "CREATE INDEX IF NOT EXISTS idx_email_logs_type ON email_logs(email_type)",
        "CREATE INDEX IF NOT EXISTS idx_email_logs_created ON email_logs(created_at)",
        # 5. Aggregate send statistics (single row, id = 1)
        """
        CREATE TABLE IF NOT EXISTS email_stats (
            id INTEGER PRIMARY KEY DEFAULT 1,
            total_sent INTEGER DEFAULT 0,
            total_success INTEGER DEFAULT 0,
            total_failed INTEGER DEFAULT 0,
            register_sent INTEGER DEFAULT 0,
            reset_sent INTEGER DEFAULT 0,
            bind_sent INTEGER DEFAULT 0,
            task_complete_sent INTEGER DEFAULT 0,
            last_updated TIMESTAMP DEFAULT CURRENT_TIMESTAMP
        )
        """,
        """
        INSERT OR IGNORE INTO email_stats (id) VALUES (1)
        """,
    ]
    with db_pool.get_db() as conn:
        cursor = conn.cursor()
        for statement in schema_statements:
            cursor.execute(statement)
        conn.commit()
    print("[邮件服务] 数据库表初始化完成")
# ============ SMTP配置管理 ============
def get_email_settings() -> Dict[str, Any]:
    """Read the global email settings row (``email_settings.id = 1``).

    Before reading, the function makes sure the columns introduced after the
    first release exist, adding them when a probe ``SELECT`` fails. This keeps
    databases created by older versions working.

    Returns:
        A dict with the keys ``enabled``, ``failover_enabled``,
        ``register_verify_enabled``, ``base_url``, ``task_notify_enabled`` and
        ``updated_at``. When the row is missing, defaults are returned
        (everything off except ``failover_enabled``).
    """
    # (column name, column definition) for each column added after the first release.
    migrated_columns = (
        ('register_verify_enabled', 'INTEGER DEFAULT 0'),
        ('base_url', "TEXT DEFAULT ''"),
        ('task_notify_enabled', 'INTEGER DEFAULT 0'),
    )
    with db_pool.get_db() as conn:
        cursor = conn.cursor()
        for column, definition in migrated_columns:
            try:
                cursor.execute(f"SELECT {column} FROM email_settings LIMIT 1")
            except Exception:
                # The probe failed, presumably because the column is missing:
                # add it. `Exception` rather than a bare `except` so that
                # KeyboardInterrupt/SystemExit are never swallowed.
                cursor.execute(
                    f"ALTER TABLE email_settings ADD COLUMN {column} {definition}"
                )
                conn.commit()
        cursor.execute("""
            SELECT enabled, failover_enabled, register_verify_enabled, base_url,
                   task_notify_enabled, updated_at
            FROM email_settings WHERE id = 1
        """)
        row = cursor.fetchone()
        if row:
            return {
                'enabled': bool(row[0]),
                'failover_enabled': bool(row[1]),
                'register_verify_enabled': bool(row[2]) if row[2] is not None else False,
                'base_url': row[3] or '',
                'task_notify_enabled': bool(row[4]) if row[4] is not None else False,
                'updated_at': row[5]
            }
        return {
            'enabled': False,
            'failover_enabled': True,
            'register_verify_enabled': False,
            'base_url': '',
            'task_notify_enabled': False,
            'updated_at': None
        }
def update_email_settings(
    enabled: bool,
    failover_enabled: bool,
    register_verify_enabled: bool = None,
    base_url: str = None,
    task_notify_enabled: bool = None
) -> bool:
    """Update the global email settings row (``id = 1``).

    ``enabled`` and ``failover_enabled`` are always written. The three
    optional arguments are written only when they are not ``None``.

    Returns:
        Always ``True``.
    """
    set_clauses = ['enabled = ?', 'failover_enabled = ?', 'updated_at = CURRENT_TIMESTAMP']
    values = [int(enabled), int(failover_enabled)]
    # (column, supplied value, converter) for the optional settings, in SQL order.
    optional_settings = (
        ('register_verify_enabled', register_verify_enabled, int),
        ('base_url', base_url, lambda raw: raw),
        ('task_notify_enabled', task_notify_enabled, int),
    )
    for column, supplied, convert in optional_settings:
        if supplied is None:
            continue
        set_clauses.append(f'{column} = ?')
        values.append(convert(supplied))
    with db_pool.get_db() as conn:
        cursor = conn.cursor()
        cursor.execute(f"""
            UPDATE email_settings
            SET {', '.join(set_clauses)}
            WHERE id = 1
        """, values)
        conn.commit()
    return True
def get_smtp_configs(include_password: bool = False) -> List[Dict[str, Any]]:
    """List every SMTP configuration, primary first, then by priority and id.

    Args:
        include_password: When true, the stored password is decrypted and
            returned; otherwise a stored password is shown as ``'******'``.
            A configuration without a password always yields ``''``.

    Returns:
        One dict per configuration, including a computed ``success_rate``
        (percentage rounded to one decimal, ``0`` when nothing was sent).
    """
    with db_pool.get_db() as conn:
        cursor = conn.cursor()
        cursor.execute("""
            SELECT id, name, enabled, is_primary, priority, host, port,
                   username, password, use_ssl, use_tls, sender_name, sender_email,
                   daily_limit, daily_sent, daily_reset_date,
                   last_success_at, last_error, success_count, fail_count,
                   created_at, updated_at
            FROM smtp_configs
            ORDER BY is_primary DESC, priority ASC, id ASC
        """)
        results = []
        for record in cursor.fetchall():
            stored_password = record[8]
            if not stored_password:
                shown_password = ''
            elif include_password:
                shown_password = decrypt_password(stored_password)
            else:
                shown_password = '******'
            entry = {
                'id': record[0],
                'name': record[1],
                'enabled': bool(record[2]),
                'is_primary': bool(record[3]),
                'priority': record[4],
                'host': record[5],
                'port': record[6],
                'username': record[7],
                'password': shown_password,
                'has_password': bool(stored_password),
                'use_ssl': bool(record[9]),
                'use_tls': bool(record[10]),
                'sender_name': record[11],
                'sender_email': record[12],
                'daily_limit': record[13],
                'daily_sent': record[14],
                'daily_reset_date': record[15],
                'last_success_at': record[16],
                'last_error': record[17],
                'success_count': record[18],
                'fail_count': record[19],
                'created_at': record[20],
                'updated_at': record[21]
            }
            # Success rate over all recorded attempts.
            attempts = entry['success_count'] + entry['fail_count']
            if attempts > 0:
                entry['success_rate'] = round(entry['success_count'] / attempts * 100, 1)
            else:
                entry['success_rate'] = 0
            results.append(entry)
        return results
def get_smtp_config(config_id: int, include_password: bool = False) -> Optional[Dict[str, Any]]:
    """Fetch one SMTP configuration by id.

    Args:
        config_id: Primary key of the configuration.
        include_password: When true, the stored password is decrypted and
            returned; otherwise a stored password is shown as ``'******'``.

    Returns:
        The configuration as a dict, or ``None`` when the id does not exist.
        Unlike the list function, no ``success_rate`` key is added.
    """
    with db_pool.get_db() as conn:
        cursor = conn.cursor()
        cursor.execute("""
            SELECT id, name, enabled, is_primary, priority, host, port,
                   username, password, use_ssl, use_tls, sender_name, sender_email,
                   daily_limit, daily_sent, daily_reset_date,
                   last_success_at, last_error, success_count, fail_count,
                   created_at, updated_at
            FROM smtp_configs WHERE id = ?
        """, (config_id,))
        record = cursor.fetchone()
        if not record:
            return None
        stored_password = record[8]
        if not stored_password:
            shown_password = ''
        elif include_password:
            shown_password = decrypt_password(stored_password)
        else:
            shown_password = '******'
        return {
            'id': record[0],
            'name': record[1],
            'enabled': bool(record[2]),
            'is_primary': bool(record[3]),
            'priority': record[4],
            'host': record[5],
            'port': record[6],
            'username': record[7],
            'password': shown_password,
            'has_password': bool(stored_password),
            'use_ssl': bool(record[9]),
            'use_tls': bool(record[10]),
            'sender_name': record[11],
            'sender_email': record[12],
            'daily_limit': record[13],
            'daily_sent': record[14],
            'daily_reset_date': record[15],
            'last_success_at': record[16],
            'last_error': record[17],
            'success_count': record[18],
            'fail_count': record[19],
            'created_at': record[20],
            'updated_at': record[21]
        }
def create_smtp_config(data: Dict[str, Any]) -> int:
    """Insert a new SMTP configuration and return its id.

    Args:
        data: Field values; missing keys fall back to the defaults visible in
            the body below. A non-empty ``password`` is encrypted before it is
            stored; an empty or missing one is stored as ``''``.

    Returns:
        The row id of the new configuration.
    """
    plain_password = data.get('password')
    stored_password = encrypt_password(plain_password) if plain_password else ''
    values = (
        data.get('name', '默认配置'),
        int(data.get('enabled', True)),
        int(data.get('is_primary', False)),
        data.get('priority', 0),
        data.get('host', ''),
        data.get('port', 465),
        data.get('username', ''),
        stored_password,
        int(data.get('use_ssl', True)),
        int(data.get('use_tls', False)),
        data.get('sender_name', '自动化学习'),
        data.get('sender_email', ''),
        data.get('daily_limit', 0),
    )
    with db_pool.get_db() as conn:
        cursor = conn.cursor()
        cursor.execute("""
            INSERT INTO smtp_configs
            (name, enabled, is_primary, priority, host, port, username, password,
             use_ssl, use_tls, sender_name, sender_email, daily_limit)
            VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)
        """, values)
        new_id = cursor.lastrowid
        conn.commit()
        return new_id
def update_smtp_config(config_id: int, data: Dict[str, Any]) -> bool:
    """Update the fields of one SMTP configuration that are present in ``data``.

    Only whitelisted columns are written. The password is replaced only when a
    non-empty value other than the ``'******'`` mask is supplied, and it is
    encrypted first.

    Returns:
        ``False`` when ``data`` contains nothing to update or no row matched
        ``config_id``; ``True`` otherwise.
    """
    # Columns a caller may change; the key in `data` equals the column name.
    editable_columns = (
        'name', 'enabled', 'priority', 'host', 'port', 'username',
        'use_ssl', 'use_tls', 'sender_name', 'sender_email', 'daily_limit',
    )
    # Boolean-like columns are stored as integers.
    flag_columns = ('enabled', 'use_ssl', 'use_tls')
    with db_pool.get_db() as conn:
        cursor = conn.cursor()
        set_clauses = []
        values = []
        for column in editable_columns:
            if column not in data:
                continue
            supplied = data[column]
            set_clauses.append(f"{column} = ?")
            values.append(int(supplied) if column in flag_columns else supplied)
        # Password: only touch it when a real new password was supplied.
        new_password = data.get('password')
        if new_password and new_password != '******':
            set_clauses.append("password = ?")
            values.append(encrypt_password(new_password))
        if not set_clauses:
            return False
        set_clauses.append("updated_at = CURRENT_TIMESTAMP")
        values.append(config_id)
        cursor.execute(f"""
            UPDATE smtp_configs SET {', '.join(set_clauses)} WHERE id = ?
        """, values)
        conn.commit()
        return cursor.rowcount > 0
def delete_smtp_config(config_id: int) -> bool:
    """Delete one SMTP configuration; return whether a row was removed."""
    with db_pool.get_db() as conn:
        cur = conn.cursor()
        cur.execute("DELETE FROM smtp_configs WHERE id = ?", (config_id,))
        conn.commit()
        removed_rows = cur.rowcount
        return removed_rows > 0
def set_primary_smtp_config(config_id: int) -> bool:
    """Mark one SMTP configuration as the primary one.

    The target is checked first, so an unknown ``config_id`` no longer strips
    the primary flag from the current primary configuration.

    Args:
        config_id: Id of the configuration to promote.

    Returns:
        ``True`` when the configuration exists and is now primary, ``False``
        when ``config_id`` does not exist (nothing is changed in that case).
    """
    with db_pool.get_db() as conn:
        cursor = conn.cursor()
        cursor.execute("SELECT 1 FROM smtp_configs WHERE id = ?", (config_id,))
        if cursor.fetchone() is None:
            return False
        # Demote every other configuration, then promote the target.
        cursor.execute(
            "UPDATE smtp_configs SET is_primary = 0 WHERE id != ?",
            (config_id,)
        )
        cursor.execute("""
            UPDATE smtp_configs
            SET is_primary = 1, updated_at = CURRENT_TIMESTAMP
            WHERE id = ?
        """, (config_id,))
        conn.commit()
        return cursor.rowcount > 0
def _get_available_smtp_config(failover: bool = True) -> Optional[Dict[str, Any]]:
    """Pick the first usable SMTP configuration.

    Candidates are the enabled configurations ordered primary first, then by
    ``priority`` and ``id``. Before selecting, the daily counters of every
    configuration whose ``daily_reset_date`` is not today (Beijing time) are
    reset to zero. Configurations that reached their daily limit are skipped;
    a ``daily_limit`` of 0 means unlimited.

    Args:
        failover: Accepted for interface compatibility; the body does not read it.

    Returns:
        A dict with the connection details (password decrypted), or ``None``
        when no enabled configuration has quota left.
    """
    beijing_today = get_beijing_today()
    with db_pool.get_db() as conn:
        cursor = conn.cursor()
        # Roll the daily counters over when the stored date is stale or unset.
        cursor.execute("""
            UPDATE smtp_configs
            SET daily_sent = 0, daily_reset_date = ?
            WHERE daily_reset_date != ? OR daily_reset_date IS NULL OR daily_reset_date = ''
        """, (beijing_today, beijing_today))
        conn.commit()
        cursor.execute("""
            SELECT id, name, host, port, username, password, use_ssl, use_tls,
                   sender_name, sender_email, daily_limit, daily_sent, is_primary
            FROM smtp_configs
            WHERE enabled = 1
            ORDER BY is_primary DESC, priority ASC, id ASC
        """)
        for candidate in cursor.fetchall():
            (cfg_id, cfg_name, cfg_host, cfg_port, cfg_user, cfg_secret,
             ssl_flag, tls_flag, from_name, from_email,
             quota, sent_today, primary_flag) = candidate
            quota_exhausted = quota > 0 and sent_today >= quota
            if quota_exhausted:
                continue
            plain_secret = decrypt_password(cfg_secret) if cfg_secret else ''
            return {
                'id': cfg_id,
                'name': cfg_name,
                'host': cfg_host,
                'port': cfg_port,
                'username': cfg_user,
                'password': plain_secret,
                'use_ssl': bool(ssl_flag),
                'use_tls': bool(tls_flag),
                'sender_name': from_name,
                'sender_email': from_email,
                'is_primary': bool(primary_flag)
            }
        return None
def _update_smtp_stats(config_id: int, success: bool, error: str = ''):
    """Record the outcome of one send attempt on an SMTP configuration.

    A success increments ``daily_sent`` and ``success_count``, stamps
    ``last_success_at`` and clears ``last_error``. A failure increments
    ``fail_count`` and stores the first 500 characters of ``error``.
    """
    with db_pool.get_db() as conn:
        cursor = conn.cursor()
        if not success:
            cursor.execute("""
                UPDATE smtp_configs
                SET fail_count = fail_count + 1,
                    last_error = ?,
                    updated_at = CURRENT_TIMESTAMP
                WHERE id = ?
            """, (error[:500], config_id))
        else:
            cursor.execute("""
                UPDATE smtp_configs
                SET daily_sent = daily_sent + 1,
                    success_count = success_count + 1,
                    last_success_at = CURRENT_TIMESTAMP,
                    last_error = '',
                    updated_at = CURRENT_TIMESTAMP
                WHERE id = ?
            """, (config_id,))
        conn.commit()
# ============ 邮件发送核心 ============
class EmailSender:
    """Thin wrapper around one SMTP connection built from a config dict.

    The config dict is read for the keys ``name``, ``host``, ``port``,
    ``username``, ``password``, ``use_ssl``, ``use_tls``, ``sender_name`` and
    ``sender_email``.
    """

    def __init__(self, config: Dict[str, Any]):
        self.config = config
        # Live smtplib connection, or None while disconnected.
        self.server = None

    def connect(self) -> bool:
        """Open the SMTP connection and log in.

        With ``use_ssl`` an implicit-TLS connection (``SMTP_SSL``) is opened;
        otherwise a plain connection is opened and upgraded with STARTTLS when
        ``use_tls`` is set.

        Returns:
            ``True`` on success.

        Raises:
            Exception: Whatever smtplib/socket raised. The failure is printed,
                the half-open connection is closed and ``self.server`` is
                reset to ``None`` before re-raising.
        """
        server = None
        try:
            if self.config['use_ssl']:
                server = smtplib.SMTP_SSL(
                    self.config['host'],
                    self.config['port'],
                    timeout=30
                )
            else:
                server = smtplib.SMTP(
                    self.config['host'],
                    self.config['port'],
                    timeout=30
                )
                if self.config['use_tls']:
                    server.starttls()
            server.login(self.config['username'], self.config['password'])
        except Exception as e:
            print(f"[邮件服务] SMTP连接失败 [{self.config['name']}]: {e}")
            # Close the socket if the failure happened after it was opened
            # (e.g. STARTTLS or login rejected); otherwise it would leak.
            if server is not None:
                try:
                    server.close()
                except Exception:
                    pass
            self.server = None
            raise
        self.server = server
        return True

    def disconnect(self):
        """Close the SMTP connection if one is open; never raises."""
        server, self.server = self.server, None
        if server is None:
            return
        try:
            server.quit()
        except Exception:
            # QUIT failed (connection already broken?): best-effort socket close.
            try:
                server.close()
            except Exception:
                pass

    def send(self, to_email: str, subject: str, body: str,
             html_body: str = None, attachments: List[Dict] = None) -> bool:
        """Build and send one message, connecting first if necessary.

        Args:
            to_email: Recipient address.
            subject: Message subject.
            body: Plain-text body.
            html_body: Optional HTML body; when given, the message carries a
                ``multipart/alternative`` part with both versions.
            attachments: Optional list of dicts with ``filename`` and ``data``
                (bytes). Every attachment is sent as
                ``application/octet-stream``; a ``mime_type`` key, if present,
                is not used.

        Returns:
            ``True`` when the server accepted the message.

        Raises:
            Exception: Any connection/build/send error, after printing it.
        """
        try:
            if not self.server:
                self.connect()
            if html_body or attachments:
                msg = MIMEMultipart('mixed')
                if html_body:
                    alternative = MIMEMultipart('alternative')
                    alternative.attach(MIMEText(body, 'plain', 'utf-8'))
                    alternative.attach(MIMEText(html_body, 'html', 'utf-8'))
                    msg.attach(alternative)
                else:
                    msg.attach(MIMEText(body, 'plain', 'utf-8'))
                for att in attachments or []:
                    part = MIMEBase('application', 'octet-stream')
                    part.set_payload(att['data'])
                    encoders.encode_base64(part)
                    # The 3-tuple form makes the filename RFC 2231 encoded,
                    # so non-ASCII names survive.
                    part.add_header(
                        'Content-Disposition',
                        'attachment',
                        filename=('utf-8', '', att['filename'])
                    )
                    msg.attach(part)
            else:
                msg = MIMEText(body, 'plain', 'utf-8')
            # The envelope sender falls back to the login name when no
            # explicit sender address is configured.
            from_address = self.config['sender_email'] or self.config['username']
            msg['Subject'] = Header(subject, 'utf-8')
            msg['From'] = formataddr((self.config['sender_name'], from_address))
            msg['To'] = to_email
            self.server.sendmail(from_address, [to_email], msg.as_string())
            return True
        except Exception as e:
            print(f"[邮件服务] 发送失败: {e}")
            raise
def send_email(
    to_email: str,
    subject: str,
    body: str,
    html_body: str = None,
    attachments: List[Dict] = None,
    email_type: str = '',
    user_id: int = None,
    log_callback: Callable = None
) -> Dict[str, Any]:
    """Send one email, falling back to other SMTP configurations on failure.

    Args:
        to_email: Recipient address.
        subject: Message subject.
        body: Plain-text body.
        html_body: Optional HTML body.
        attachments: Optional list of ``{'filename': ..., 'data': bytes}`` dicts.
        email_type: One of the ``EMAIL_TYPE_*`` values (used for logs and statistics).
        user_id: Id of the user the email belongs to, for the log.
        log_callback: Optional callable receiving a human-readable progress line.

    Returns:
        ``{'success': bool, 'error': str, 'config_id': int | None}``. On
        success ``config_id`` is the configuration that delivered the message;
        on failure it is the first configuration that was tried.
    """
    # Global switch
    settings = get_email_settings()
    if not settings['enabled']:
        return {'success': False, 'error': '邮件功能未启用', 'config_id': None}
    config = _get_available_smtp_config(settings['failover_enabled'])
    if not config:
        return {'success': False, 'error': '没有可用的SMTP配置', 'config_id': None}
    tried_configs = []
    last_error = ''
    while config:
        tried_configs.append(config['id'])
        sender = EmailSender(config)
        send_error = None
        # Only the SMTP work is guarded: a bookkeeping failure after a
        # successful send must not trigger failover and a duplicate email.
        try:
            sender.connect()
            sender.send(to_email, subject, body, html_body, attachments)
        except Exception as e:
            send_error = e
        finally:
            sender.disconnect()
        if send_error is None:
            try:
                _update_smtp_stats(config['id'], True)
                _log_email_send(user_id, config['id'], to_email, email_type, subject, 'success', '', attachments)
                _update_email_stats(email_type, True)
                if log_callback:
                    log_callback(f"[邮件服务] 发送成功: {to_email} (使用: {config['name']})")
            except Exception as e:
                # The message is already delivered; report it and keep the success.
                print(f"[邮件服务] 邮件已发送,但统计/日志记录失败: {e}")
            return {'success': True, 'error': '', 'config_id': config['id']}
        last_error = str(send_error)
        _update_smtp_stats(config['id'], False, last_error)
        if log_callback:
            log_callback(f"[邮件服务] 发送失败 [{config['name']}]: {send_error}")
        # Failover: move on to the next configuration that was not tried yet.
        if settings['failover_enabled']:
            config = _get_next_available_smtp_config(tried_configs)
        else:
            config = None
    # Every candidate failed.
    first_tried = tried_configs[0] if tried_configs else None
    _log_email_send(user_id, first_tried, to_email, email_type, subject,
                    'failed', last_error, attachments)
    _update_email_stats(email_type, False)
    return {'success': False, 'error': last_error, 'config_id': first_tried}
def _get_next_available_smtp_config(exclude_ids: List[int]) -> Optional[Dict[str, Any]]:
    """Pick the next usable SMTP configuration, skipping the ones already tried.

    Candidates are the enabled configurations whose id is not in
    ``exclude_ids``, ordered primary first, then by ``priority`` and ``id``.
    Configurations that reached their daily limit are skipped (a
    ``daily_limit`` of 0 means unlimited). All candidates are fetched with one
    query and scanned in order.

    Args:
        exclude_ids: Ids of the configurations that must not be returned.

    Returns:
        A dict with the connection details (password decrypted), or ``None``
        when nothing usable is left.
    """
    excluded = list(exclude_ids)
    placeholders = ','.join('?' for _ in excluded)
    with db_pool.get_db() as conn:
        cursor = conn.cursor()
        cursor.execute(f"""
            SELECT id, name, host, port, username, password, use_ssl, use_tls,
                   sender_name, sender_email, daily_limit, daily_sent, is_primary
            FROM smtp_configs
            WHERE enabled = 1 AND id NOT IN ({placeholders})
            ORDER BY is_primary DESC, priority ASC, id ASC
        """, excluded)
        for row in cursor.fetchall():
            config_id, name, host, port, username, password, use_ssl, use_tls, \
                sender_name, sender_email, daily_limit, daily_sent, is_primary = row
            # Daily quota exhausted: try the next candidate.
            if daily_limit > 0 and daily_sent >= daily_limit:
                continue
            return {
                'id': config_id,
                'name': name,
                'host': host,
                'port': port,
                'username': username,
                'password': decrypt_password(password) if password else '',
                'use_ssl': bool(use_ssl),
                'use_tls': bool(use_tls),
                'sender_name': sender_name,
                'sender_email': sender_email,
                'is_primary': bool(is_primary)
            }
    return None
def test_smtp_config(config_id: int, test_email: str) -> Dict[str, Any]:
    """Send a test message through one SMTP configuration.

    The outcome is written back to the configuration: ``last_success_at`` is
    stamped and ``last_error`` cleared on success; on failure the first 500
    characters of the error are stored in ``last_error``.

    Args:
        config_id: Id of the configuration to test.
        test_email: Address that receives the test message.

    Returns:
        ``{'success': bool, 'error': str}``.
    """
    config = get_smtp_config(config_id, include_password=True)
    if not config:
        return {'success': False, 'error': '配置不存在'}
    sender = EmailSender({
        'name': config['name'],
        'host': config['host'],
        'port': config['port'],
        'username': config['username'],
        'password': config['password'],
        'use_ssl': config['use_ssl'],
        'use_tls': config['use_tls'],
        'sender_name': config['sender_name'],
        'sender_email': config['sender_email']
    })
    try:
        try:
            sender.connect()
            sender.send(
                test_email,
                '自动化学习 - SMTP配置测试',
                f'这是一封测试邮件。\n\n配置名称: {config["name"]}\nSMTP服务器: {config["host"]}:{config["port"]}\n\n如果您收到此邮件说明SMTP配置正确。',
                None,
                None
            )
        finally:
            # Always release the connection, also when sending failed after
            # a successful login.
            sender.disconnect()
        # Record the successful test.
        with db_pool.get_db() as conn:
            cursor = conn.cursor()
            cursor.execute("""
                UPDATE smtp_configs
                SET last_success_at = CURRENT_TIMESTAMP, last_error = ''
                WHERE id = ?
            """, (config_id,))
            conn.commit()
        return {'success': True, 'error': ''}
    except Exception as e:
        # Record the error on the configuration.
        with db_pool.get_db() as conn:
            cursor = conn.cursor()
            cursor.execute("""
                UPDATE smtp_configs SET last_error = ? WHERE id = ?
            """, (str(e)[:500], config_id))
            conn.commit()
        return {'success': False, 'error': str(e)}
# ============ 邮件日志 ============
def _log_email_send(user_id: int, smtp_config_id: int, email_to: str,
                    email_type: str, subject: str, status: str,
                    error_message: str, attachments: List[Dict] = None):
    """Append one row to ``email_logs`` describing a send attempt.

    The attachment count and total size (sum of the ``data`` lengths) are
    derived from ``attachments``; both are 0 when there are none.
    """
    files = attachments or []
    attachment_count = len(files)
    attachment_size = sum(len(item.get('data', b'')) for item in files)
    log_row = (
        user_id, smtp_config_id, email_to, email_type, subject, status,
        error_message, attachment_count, attachment_size,
    )
    with db_pool.get_db() as conn:
        cursor = conn.cursor()
        cursor.execute("""
            INSERT INTO email_logs
            (user_id, smtp_config_id, email_to, email_type, subject, status,
             error_message, attachment_count, attachment_size)
            VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?)
        """, log_row)
        conn.commit()
def _update_email_stats(email_type: str, success: bool):
    """Bump the aggregate counters in ``email_stats`` (row ``id = 1``).

    ``total_sent`` always grows by one. A success also increments
    ``total_success`` and, for a known email type, that type's counter; a
    failure increments ``total_failed`` only.
    """
    per_type_counter = {
        EMAIL_TYPE_REGISTER: 'register_sent',
        EMAIL_TYPE_RESET: 'reset_sent',
        EMAIL_TYPE_BIND: 'bind_sent',
        EMAIL_TYPE_TASK_COMPLETE: 'task_complete_sent'
    }
    with db_pool.get_db() as conn:
        cursor = conn.cursor()
        # Column names come from the fixed mapping above, never from user input.
        counter_column = per_type_counter.get(email_type, '')
        assignments = ['total_sent = total_sent + 1']
        if success:
            assignments.append('total_success = total_success + 1')
            if counter_column:
                assignments.append(f'{counter_column} = {counter_column} + 1')
        else:
            assignments.append('total_failed = total_failed + 1')
        assignments.append('last_updated = CURRENT_TIMESTAMP')
        cursor.execute(f"""
            UPDATE email_stats
            SET {', '.join(assignments)}
            WHERE id = 1
        """)
        conn.commit()
def get_email_stats() -> Dict[str, Any]:
    """Return the aggregate send counters plus a computed success rate.

    Returns:
        A dict with the counters of ``email_stats`` row 1 and ``success_rate``
        (percentage rounded to one decimal, ``0`` when nothing was sent), or an
        empty dict when the row is missing.
    """
    with db_pool.get_db() as conn:
        cursor = conn.cursor()
        cursor.execute("""
            SELECT total_sent, total_success, total_failed,
                   register_sent, reset_sent, bind_sent, task_complete_sent,
                   last_updated
            FROM email_stats WHERE id = 1
        """)
        record = cursor.fetchone()
        if not record:
            return {}
        (total_sent, total_success, total_failed, register_sent,
         reset_sent, bind_sent, task_complete_sent, last_updated) = record
        if total_sent > 0:
            success_rate = round(total_success / total_sent * 100, 1)
        else:
            success_rate = 0
        return {
            'total_sent': total_sent,
            'total_success': total_success,
            'total_failed': total_failed,
            'register_sent': register_sent,
            'reset_sent': reset_sent,
            'bind_sent': bind_sent,
            'task_complete_sent': task_complete_sent,
            'last_updated': last_updated,
            'success_rate': success_rate
        }
def get_email_logs(page: int = 1, page_size: int = 20,
                   email_type: str = None, status: str = None) -> Dict[str, Any]:
    """Return one page of the send log, newest first.

    Args:
        page: 1-based page number; values below 1 are treated as 1.
        page_size: Rows per page; values below 1 are treated as 1 (this also
            avoids a division by zero when computing ``total_pages``).
        email_type: Optional filter on ``email_type``.
        status: Optional filter on ``status``.

    Returns:
        ``{'logs': [...], 'total': int, 'page': int, 'page_size': int,
        'total_pages': int}``. Each log entry also carries the joined
        ``username`` and ``smtp_name`` (``None`` when the referenced row is gone).
    """
    page = max(1, page)
    page_size = max(1, page_size)
    with db_pool.get_db() as conn:
        cursor = conn.cursor()
        where_clauses = []
        params = []
        if email_type:
            where_clauses.append("l.email_type = ?")
            params.append(email_type)
        if status:
            where_clauses.append("l.status = ?")
            params.append(status)
        where_sql = f"WHERE {' AND '.join(where_clauses)}" if where_clauses else ""
        # Total number of matching rows
        cursor.execute(f"SELECT COUNT(*) FROM email_logs l {where_sql}", params)
        total = cursor.fetchone()[0]
        # Requested page
        offset = (page - 1) * page_size
        cursor.execute(f"""
            SELECT l.id, l.user_id, l.smtp_config_id, l.email_to, l.email_type,
                   l.subject, l.status, l.error_message, l.attachment_count,
                   l.attachment_size, l.created_at, u.username, s.name as smtp_name
            FROM email_logs l
            LEFT JOIN users u ON l.user_id = u.id
            LEFT JOIN smtp_configs s ON l.smtp_config_id = s.id
            {where_sql}
            ORDER BY l.created_at DESC
            LIMIT ? OFFSET ?
        """, params + [page_size, offset])
        columns = (
            'id', 'user_id', 'smtp_config_id', 'email_to', 'email_type',
            'subject', 'status', 'error_message', 'attachment_count',
            'attachment_size', 'created_at', 'username', 'smtp_name',
        )
        logs = [dict(zip(columns, row)) for row in cursor.fetchall()]
        return {
            'logs': logs,
            'total': total,
            'page': page,
            'page_size': page_size,
            'total_pages': (total + page_size - 1) // page_size
        }
def cleanup_email_logs(days: int = 30) -> int:
    """Delete log rows older than ``days`` days; return how many were removed."""
    with db_pool.get_db() as conn:
        cur = conn.cursor()
        cur.execute("""
            DELETE FROM email_logs
            WHERE datetime(created_at) < datetime('now', '-' || ? || ' days')
        """, (days,))
        removed_rows = cur.rowcount
        conn.commit()
        return removed_rows
# ============ Token管理 ============
def generate_email_token(email: str, token_type: str, user_id: int = None) -> str:
    """Create, store and return a new URL-safe verification token.

    The lifetime depends on ``token_type``; unknown types get the registration
    lifetime. The expiry is computed from the naive local ``datetime.now()``.
    """
    lifetimes = {
        EMAIL_TYPE_REGISTER: TOKEN_EXPIRE_REGISTER,
        EMAIL_TYPE_RESET: TOKEN_EXPIRE_RESET,
        EMAIL_TYPE_BIND: TOKEN_EXPIRE_BIND
    }
    ttl_seconds = lifetimes.get(token_type, TOKEN_EXPIRE_REGISTER)
    token = secrets.token_urlsafe(32)
    expires_at = datetime.now() + timedelta(seconds=ttl_seconds)
    new_row = (user_id, email, token, token_type, expires_at)
    with db_pool.get_db() as conn:
        cursor = conn.cursor()
        cursor.execute("""
            INSERT INTO email_tokens (user_id, email, token, token_type, expires_at)
            VALUES (?, ?, ?, ?, ?)
        """, new_row)
        conn.commit()
    return token
def verify_email_token(token: str, token_type: str) -> Optional[Dict[str, Any]]:
    """Validate a token and consume it (single use).

    Args:
        token: The token string received from the user.
        token_type: Expected ``EMAIL_TYPE_*`` value of the token.

    Returns:
        ``{'user_id': int, 'email': str}`` on success; ``None`` when the token
        is unknown, already used, expired, or was consumed by a concurrent
        request in the meantime.
    """
    with db_pool.get_db() as conn:
        cursor = conn.cursor()
        cursor.execute("""
            SELECT id, user_id, email, expires_at, used
            FROM email_tokens
            WHERE token = ? AND token_type = ?
        """, (token, token_type))
        row = cursor.fetchone()
        if not row:
            return None
        token_id, user_id, email, expires_at, used = row
        if used:
            return None
        if parse_datetime(expires_at) < datetime.now():
            return None
        # Consume atomically: the `used = 0` guard means that of two
        # concurrent verifications only one can flip the flag.
        cursor.execute("""
            UPDATE email_tokens SET used = 1 WHERE id = ? AND used = 0
        """, (token_id,))
        conn.commit()
        if cursor.rowcount == 0:
            return None
        return {'user_id': user_id, 'email': email}
def check_rate_limit(email: str, token_type: str) -> bool:
    """Tell whether another email of this type may be sent to ``email`` now.

    The check looks at the ``created_at`` of the most recent token of the same
    type for the address. That column is filled by SQLite's
    ``CURRENT_TIMESTAMP``, which is UTC, so the elapsed time is measured
    against the current UTC time. Measuring against local time would add the
    host's UTC offset to the elapsed time and disable the limit on hosts east
    of UTC.

    Args:
        email: Recipient address.
        token_type: ``EMAIL_TYPE_*`` value; unknown types use a 60-second limit.

    Returns:
        ``True`` when sending is allowed, ``False`` when the caller is rate limited.
    """
    from datetime import timezone

    limit_seconds = {
        EMAIL_TYPE_REGISTER: RATE_LIMIT_REGISTER,
        EMAIL_TYPE_RESET: RATE_LIMIT_RESET,
        EMAIL_TYPE_BIND: RATE_LIMIT_BIND
    }.get(token_type, 60)
    with db_pool.get_db() as conn:
        cursor = conn.cursor()
        cursor.execute("""
            SELECT created_at FROM email_tokens
            WHERE email = ? AND token_type = ?
            ORDER BY created_at DESC
            LIMIT 1
        """, (email, token_type))
        row = cursor.fetchone()
        if not row:
            return True
        last_sent = parse_datetime(row[0])
        # Naive UTC "now", comparable with the naive UTC value from the DB.
        now_utc = datetime.now(timezone.utc).replace(tzinfo=None)
        elapsed = (now_utc - last_sent).total_seconds()
        return elapsed >= limit_seconds
def cleanup_expired_tokens() -> int:
    """Delete expired tokens; return how many rows were removed.

    ``expires_at`` is written by the token generator from the naive local
    ``datetime.now()``, so it is compared with SQLite's *local* current time.
    Plain ``datetime('now')`` is UTC and would shift the cut-off by the
    host's UTC offset.
    """
    with db_pool.get_db() as conn:
        cursor = conn.cursor()
        cursor.execute("""
            DELETE FROM email_tokens
            WHERE datetime(expires_at) < datetime('now', 'localtime')
        """)
        deleted = cursor.rowcount
        conn.commit()
        return deleted
# ============ 注册验证邮件 ============
def send_register_verification_email(
    email: str,
    username: str,
    user_id: int,
    base_url: str = None
) -> Dict[str, Any]:
    """Send the registration verification email.

    Args:
        email: Address of the new user.
        username: Name shown in the greeting. It is HTML-escaped before being
            placed into the HTML body because it is user-supplied.
        user_id: Id of the user the token belongs to.
        base_url: Site base URL. When empty, the ``base_url`` email setting is
            used, then ``app_config.Config.BASE_URL``, then a localhost default.

    Returns:
        ``{'success': bool, 'error': str, 'token': str | None}``.
    """
    import html

    # Rate limit
    if not check_rate_limit(email, EMAIL_TYPE_REGISTER):
        return {
            'success': False,
            'error': '发送太频繁,请稍后再试',
            'token': None
        }
    token = generate_email_token(email, EMAIL_TYPE_REGISTER, user_id)
    # Resolve the base URL
    if not base_url:
        settings = get_email_settings()
        base_url = settings.get('base_url', '')
    if not base_url:
        try:
            from app_config import Config
            base_url = Config.BASE_URL
        except Exception:
            # Best effort: the config module is missing or unusable.
            base_url = 'http://localhost:51233'
    verify_url = f"{base_url.rstrip('/')}/api/verify-email/{token}"
    # Load the HTML template, falling back to a minimal built-in one.
    template_path = os.path.join(os.path.dirname(__file__), 'templates', 'email', 'register.html')
    try:
        with open(template_path, 'r', encoding='utf-8') as f:
            html_template = f.read()
    except FileNotFoundError:
        html_template = """
<html>
<body>
<h1>邮箱验证</h1>
<p>您好,{{ username }}</p>
<p>请点击下面的链接验证您的邮箱地址:</p>
<p><a href="{{ verify_url }}">{{ verify_url }}</a></p>
<p>此链接24小时内有效。</p>
</body>
</html>
"""
    # Fill in the template; escape the user-controlled name to prevent
    # markup injection into the HTML part.
    html_body = html_template.replace('{{ username }}', html.escape(username))
    html_body = html_body.replace('{{ verify_url }}', verify_url)
    # Plain-text alternative
    text_body = f"""
您好,{username}
感谢您注册自动化学习。请点击下面的链接验证您的邮箱地址:
{verify_url}
此链接24小时内有效。
如果您没有注册过账号,请忽略此邮件。
"""
    result = send_email(
        to_email=email,
        subject='【自动化学习】邮箱验证',
        body=text_body,
        html_body=html_body,
        email_type=EMAIL_TYPE_REGISTER,
        user_id=user_id
    )
    if result['success']:
        return {'success': True, 'error': '', 'token': token}
    return {'success': False, 'error': result['error'], 'token': None}
def resend_register_verification_email(user_id: int, email: str, username: str) -> Dict[str, Any]:
    """Invalidate the user's pending registration tokens and send a new email.

    The rate limit is checked before anything is invalidated. Otherwise a
    throttled request would void the link the user already received without
    sending a replacement.

    Args:
        user_id: Id of the user.
        email: Address of the user.
        username: Name shown in the greeting.

    Returns:
        The result dict of the send function
        (``{'success': bool, 'error': str, 'token': str | None}``).
    """
    if not check_rate_limit(email, EMAIL_TYPE_REGISTER):
        return {
            'success': False,
            'error': '发送太频繁,请稍后再试',
            'token': None
        }
    with db_pool.get_db() as conn:
        cursor = conn.cursor()
        # Void every unused registration token of this user.
        cursor.execute("""
            UPDATE email_tokens SET used = 1
            WHERE user_id = ? AND token_type = ? AND used = 0
        """, (user_id, EMAIL_TYPE_REGISTER))
        conn.commit()
    return send_register_verification_email(email, username, user_id)
# ============ 密码重置邮件 ============
def send_password_reset_email(
    email: str,
    username: str,
    user_id: int,
    base_url: str = None
) -> Dict[str, Any]:
    """Send the password reset email.

    Unused reset tokens of the user are voided before a new one is issued.

    Args:
        email: Address of the user.
        username: Name shown in the greeting. It is HTML-escaped before being
            placed into the HTML body because it is user-supplied.
        user_id: Id of the user the token belongs to.
        base_url: Site base URL. When empty, the ``base_url`` email setting is
            used, then ``app_config.Config.BASE_URL``, then a localhost default.

    Returns:
        ``{'success': bool, 'error': str, 'token': str | None}``.
    """
    import html

    # Rate limit
    if not check_rate_limit(email, EMAIL_TYPE_RESET):
        return {
            'success': False,
            'error': '发送太频繁请5分钟后再试',
            'token': None
        }
    # Void the user's older reset tokens.
    with db_pool.get_db() as conn:
        cursor = conn.cursor()
        cursor.execute("""
            UPDATE email_tokens SET used = 1
            WHERE user_id = ? AND token_type = ? AND used = 0
        """, (user_id, EMAIL_TYPE_RESET))
        conn.commit()
    token = generate_email_token(email, EMAIL_TYPE_RESET, user_id)
    # Resolve the base URL
    if not base_url:
        settings = get_email_settings()
        base_url = settings.get('base_url', '')
    if not base_url:
        try:
            from app_config import Config
            base_url = Config.BASE_URL
        except Exception:
            # Best effort: the config module is missing or unusable.
            base_url = 'http://localhost:51233'
    reset_url = f"{base_url.rstrip('/')}/reset-password/{token}"
    # Load the HTML template, falling back to a minimal built-in one.
    template_path = os.path.join(os.path.dirname(__file__), 'templates', 'email', 'reset_password.html')
    try:
        with open(template_path, 'r', encoding='utf-8') as f:
            html_template = f.read()
    except FileNotFoundError:
        html_template = """
<html>
<body>
<h1>密码重置</h1>
<p>您好,{{ username }}</p>
<p>请点击下面的链接重置您的密码:</p>
<p><a href="{{ reset_url }}">{{ reset_url }}</a></p>
<p>此链接30分钟内有效。</p>
</body>
</html>
"""
    # Fill in the template; escape the user-controlled name to prevent
    # markup injection into the HTML part.
    html_body = html_template.replace('{{ username }}', html.escape(username))
    html_body = html_body.replace('{{ reset_url }}', reset_url)
    # Plain-text alternative
    text_body = f"""
您好,{username}
我们收到了您的密码重置请求。请点击下面的链接重置您的密码:
{reset_url}
此链接30分钟内有效。
如果您没有申请过密码重置,请忽略此邮件。
"""
    result = send_email(
        to_email=email,
        subject='【自动化学习】密码重置',
        body=text_body,
        html_body=html_body,
        email_type=EMAIL_TYPE_RESET,
        user_id=user_id
    )
    if result['success']:
        return {'success': True, 'error': '', 'token': token}
    return {'success': False, 'error': result['error'], 'token': None}
def verify_password_reset_token(token: str) -> Optional[Dict[str, Any]]:
    """
    Check a password-reset token without marking it as used.

    Returns:
        {'user_id': int, 'email': str, 'token_id': int} when the token exists,
        is unused and has not expired; None otherwise.
    """
    with db_pool.get_db() as conn:
        cur = conn.cursor()
        cur.execute("""
SELECT id, user_id, email, expires_at, used
FROM email_tokens
WHERE token = ? AND token_type = ?
""", (token, EMAIL_TYPE_RESET))
        record = cur.fetchone()

    # Unknown token
    if not record:
        return None

    row_id, owner_id, address, expiry, consumed = record

    # Already consumed, or past its expiry time
    if consumed or parse_datetime(expiry) < datetime.now():
        return None

    return {'user_id': owner_id, 'email': address, 'token_id': row_id}
def confirm_password_reset(token: str) -> Optional[Dict[str, Any]]:
    """
    Confirm a password reset by consuming the token (marks it as used).

    The token is first validated with verify_password_reset_token and then
    consumed with a conditional UPDATE, so that of several concurrent requests
    presenting the same token only one can succeed.

    Returns:
        {'user_id': int, 'email': str} on success, None when the token is
        invalid, expired, or was already consumed.
    """
    result = verify_password_reset_token(token)
    if not result:
        return None

    with db_pool.get_db() as conn:
        cursor = conn.cursor()
        # ``AND used = 0`` closes the check-then-update race: if another
        # request consumed the token after our verification, no row matches.
        cursor.execute("""
UPDATE email_tokens SET used = 1 WHERE id = ? AND used = 0
""", (result['token_id'],))
        # Only an explicit 0 means "lost the race"; drivers that cannot report
        # a count (-1) keep the previous always-succeed behaviour.
        lost_race = cursor.rowcount == 0
        conn.commit()

    if lost_race:
        return None
    return {'user_id': result['user_id'], 'email': result['email']}
# ============ 邮箱绑定验证 ============
def send_bind_email_verification(
    user_id: int,
    email: str,
    username: str,
    base_url: Optional[str] = None
) -> Dict[str, Any]:
    """
    Send the verification email for binding an address to an account.

    All still-unused bind tokens of the user are marked as used before the new
    token is generated, so only the most recent link remains valid.

    Args:
        user_id: ID of the user binding the address.
        email: Address to bind (also the recipient).
        username: User name shown in the message.
        base_url: Site base URL. When empty, the ``base_url`` email setting is
            used, then ``app_config.Config.BASE_URL``, then a localhost default.

    Returns:
        {'success': bool, 'error': str, 'token': str or None}
    """
    import html  # local import: only needed to escape values placed into HTML

    # Rate limit for bind mails (the user-facing message states 1 minute).
    if not check_rate_limit(email, EMAIL_TYPE_BIND):
        return {
            'success': False,
            'error': '发送太频繁请1分钟后再试',
            'token': None
        }

    # Invalidate previously issued, still-unused bind tokens of this user.
    with db_pool.get_db() as conn:
        cursor = conn.cursor()
        cursor.execute("""
UPDATE email_tokens SET used = 1
WHERE user_id = ? AND token_type = ? AND used = 0
""", (user_id, EMAIL_TYPE_BIND))
        conn.commit()

    # Generate the new token.
    token = generate_email_token(email, EMAIL_TYPE_BIND, user_id)

    # Resolve the base URL: argument -> email settings -> app config -> default.
    default_base_url = 'http://localhost:51233'
    if not base_url:
        settings = get_email_settings()
        base_url = settings.get('base_url', '')
    if not base_url:
        try:
            from app_config import Config
            # A missing/empty BASE_URL must not reach .rstrip() below.
            base_url = Config.BASE_URL or default_base_url
        except Exception:
            # Best-effort fallback when app_config is unavailable or broken;
            # unlike a bare ``except`` this lets KeyboardInterrupt/SystemExit
            # propagate.
            base_url = default_base_url

    verify_url = f"{base_url.rstrip('/')}/api/verify-bind-email/{token}"

    # Load the HTML template, falling back to a minimal built-in one.
    template_path = os.path.join(os.path.dirname(__file__), 'templates', 'email', 'bind_email.html')
    try:
        with open(template_path, 'r', encoding='utf-8') as f:
            html_template = f.read()
    except FileNotFoundError:
        html_template = """
<html>
<body>
<h1>邮箱绑定验证</h1>
<p>您好,{{ username }}</p>
<p>请点击下面的链接完成邮箱绑定:</p>
<p><a href="{{ verify_url }}">{{ verify_url }}</a></p>
<p>此链接1小时内有效。</p>
</body>
</html>
"""

    # Substitute template variables. Values are HTML-escaped because the user
    # name is user-controlled and must not be able to inject markup.
    html_body = html_template.replace('{{ username }}', html.escape(username))
    html_body = html_body.replace('{{ verify_url }}', html.escape(verify_url))

    # Plain-text alternative (no escaping needed).
    text_body = f"""
您好,{username}
您正在绑定此邮箱到您的账号。请点击下面的链接完成验证:
{verify_url}
此链接1小时内有效。
如果这不是您的操作,请忽略此邮件。
"""

    result = send_email(
        to_email=email,
        subject='【自动化学习】邮箱绑定验证',
        body=text_body,
        html_body=html_body,
        email_type=EMAIL_TYPE_BIND,
        user_id=user_id
    )
    if result['success']:
        return {'success': True, 'error': '', 'token': token}
    return {'success': False, 'error': result['error'], 'token': None}
def verify_bind_email_token(token: str) -> Optional[Dict[str, Any]]:
    """
    Validate an email-binding token.

    Thin wrapper that delegates to verify_email_token with the bind token
    type and returns its result unchanged.

    Returns:
        {'user_id': int, 'email': str} on success, None on failure
        (as produced by verify_email_token).
    """
    outcome = verify_email_token(token, EMAIL_TYPE_BIND)
    return outcome
# ============ 异步发送队列 ============
class EmailQueue:
    """Asynchronous email queue drained by a pool of daemon worker threads."""

    def __init__(self, workers: int = QUEUE_WORKERS, max_size: int = QUEUE_MAX_SIZE):
        """
        Args:
            workers: Number of worker threads created by start().
            max_size: Maximum number of pending tasks held by the queue.
        """
        self.queue = queue.Queue(maxsize=max_size)
        self.workers = []
        self.running = False
        self.worker_count = workers

    def start(self):
        """Start the worker threads; does nothing if already running."""
        if self.running:
            return
        self.running = True
        for i in range(self.worker_count):
            worker = threading.Thread(target=self._worker, daemon=True, name=f"EmailWorker-{i+1}")
            worker.start()
            self.workers.append(worker)
        print(f"[邮件服务] 异步队列已启动 ({self.worker_count}个工作线程)")

    def stop(self):
        """Stop the queue and wait (up to 5 seconds each) for the workers."""
        self.running = False
        # One ``None`` sentinel per worker wakes threads blocked in get().
        for _ in self.workers:
            try:
                self.queue.put(None, timeout=1)
            except queue.Full:
                # Workers still exit via the ``running`` flag once their
                # get() times out.
                pass
        for worker in self.workers:
            worker.join(timeout=5)
        self.workers.clear()
        print("[邮件服务] 异步队列已停止")

    def _worker(self):
        """Worker loop: process tasks until stopped or a sentinel arrives."""
        while self.running:
            try:
                # Timeout so the ``running`` flag is re-checked periodically.
                task = self.queue.get(timeout=5)
                if task is None:
                    break
                self._process_task(task)
            except queue.Empty:
                continue
            except Exception as e:
                print(f"[邮件服务] 队列工作线程错误: {e}")

    def _process_task(self, task: Dict):
        """
        Send one queued email and report the outcome to the task's callback.

        The callback is invoked exactly once: with send_email's result, or
        with ``{'success': False, 'error': ...}`` if sending raised. Errors
        raised by the callback itself are logged and never trigger a second
        invocation.
        """
        try:
            result = send_email(
                to_email=task['to_email'],
                subject=task['subject'],
                body=task['body'],
                html_body=task.get('html_body'),
                attachments=task.get('attachments'),
                email_type=task.get('email_type', ''),
                user_id=task.get('user_id'),
                log_callback=task.get('log_callback')
            )
        except Exception as e:
            print(f"[邮件服务] 处理邮件任务失败: {e}")
            result = {'success': False, 'error': str(e)}

        callback = task.get('callback')
        if callback:
            # Kept outside the send ``try`` so a failing callback is not
            # mistaken for a failed send and called again.
            try:
                callback(result)
            except Exception as e:
                print(f"[邮件服务] 邮件回调执行失败: {e}")

    def enqueue(self, to_email: str, subject: str, body: str,
                html_body: str = None, attachments: List[Dict] = None,
                email_type: str = '', user_id: int = None,
                callback: Callable = None, log_callback: Callable = None) -> bool:
        """
        Put an email task on the queue.

        Blocks for up to 5 seconds when the queue is full.

        Returns:
            True if the task was queued, False if the queue stayed full.
        """
        try:
            self.queue.put({
                'to_email': to_email,
                'subject': subject,
                'body': body,
                'html_body': html_body,
                'attachments': attachments,
                'email_type': email_type,
                'user_id': user_id,
                'callback': callback,
                'log_callback': log_callback
            }, timeout=5)
            return True
        except queue.Full:
            print("[邮件服务] 邮件队列已满")
            return False

    @property
    def pending_count(self) -> int:
        """Approximate number of tasks waiting in the queue (``qsize()``)."""
        return self.queue.qsize()
# Global queue instance (None until get_email_queue() creates it;
# reset to None by shutdown_email_queue()).
_email_queue: Optional[EmailQueue] = None
# Lock held while creating or shutting down the global queue instance.
_queue_lock = threading.Lock()
def get_email_queue() -> EmailQueue:
    """
    Return the process-wide email queue, creating and starting it on first use.

    Creation happens under the module lock, so concurrent callers share one
    instance.
    """
    global _email_queue
    with _queue_lock:
        if _email_queue is not None:
            return _email_queue
        _email_queue = fresh_queue = EmailQueue()
        fresh_queue.start()
        return fresh_queue
def shutdown_email_queue():
    """Stop the global email queue, if one exists, and discard the instance."""
    global _email_queue
    with _queue_lock:
        if not _email_queue:
            # Nothing was ever started (or it was already shut down).
            return
        _email_queue.stop()
        _email_queue = None
# ============ 便捷函数 ============
def send_email_async(to_email: str, subject: str, body: str,
                     html_body: str = None, attachments: List[Dict] = None,
                     email_type: str = '', user_id: int = None,
                     callback: Callable = None, log_callback: Callable = None) -> bool:
    """
    Queue an email for asynchronous delivery on the global email queue.

    Returns:
        The result of EmailQueue.enqueue: True if the task was queued,
        False if the queue was full.
    """
    task_fields = {
        'to_email': to_email,
        'subject': subject,
        'body': body,
        'html_body': html_body,
        'attachments': attachments,
        'email_type': email_type,
        'user_id': user_id,
        'callback': callback,
        'log_callback': log_callback,
    }
    return get_email_queue().enqueue(**task_fields)
def create_zip_attachment(files: List[Dict[str, Any]], zip_filename: str = 'screenshots.zip') -> Dict:
    """
    Pack in-memory files into a deflate-compressed ZIP attachment.

    Args:
        files: [{'filename': 'xxx.png', 'data': bytes}]
        zip_filename: Name given to the resulting ZIP attachment.

    Returns:
        {'filename': 'xxx.zip', 'data': bytes}
    """
    stream = BytesIO()
    with zipfile.ZipFile(stream, mode='w', compression=zipfile.ZIP_DEFLATED) as archive:
        for entry in files:
            archive.writestr(entry['filename'], entry['data'])
    # Read the bytes only after the archive is closed, so the central
    # directory has been written.
    payload = stream.getvalue()
    return {'filename': zip_filename, 'data': payload}
# ============ 任务完成通知邮件 ============
def send_task_complete_email(
    user_id: int,
    email: str,
    username: str,
    account_name: str,
    browse_type: str,
    total_items: int,
    total_attachments: int,
    screenshot_path: Optional[str] = None,
    log_callback: Optional[Callable] = None
) -> Dict[str, Any]:
    """
    Send the task-completion notification email.

    If the screenshot exceeds MAX_ATTACHMENT_SIZE, the notification is sent
    without attachment and the screenshot follows in a second email;
    otherwise a single email carries both.

    Args:
        user_id: User ID.
        email: Recipient address.
        username: User name.
        account_name: Account name.
        browse_type: Browse type.
        total_items: Number of browsed items.
        total_attachments: Number of attachments.
        screenshot_path: Path of the screenshot file (optional).
        log_callback: Optional callable receiving progress/log messages.

    Returns:
        {'success': bool, 'error': str, 'emails_sent': int}
        ``success`` is True as soon as at least one email was sent; ``error``
        is only filled in when nothing was sent.
    """
    import html  # local import: only needed to escape values placed into HTML

    # Bail out early when the feature is disabled or no recipient is known.
    settings = get_email_settings()
    if not settings.get('enabled', False):
        return {'success': False, 'error': '邮件功能未启用', 'emails_sent': 0}
    if not settings.get('task_notify_enabled', False):
        return {'success': False, 'error': '任务通知功能未启用', 'emails_sent': 0}
    if not email:
        return {'success': False, 'error': '用户未设置邮箱', 'emails_sent': 0}

    complete_time = datetime.now().strftime('%Y-%m-%d %H:%M:%S')

    # Read the screenshot; a read failure only drops the attachment.
    screenshot_data = None
    screenshot_filename = None
    if screenshot_path and os.path.exists(screenshot_path):
        try:
            with open(screenshot_path, 'rb') as f:
                screenshot_data = f.read()
            screenshot_filename = os.path.basename(screenshot_path)
        except Exception as e:
            if log_callback:
                log_callback(f"[邮件] 读取截图文件失败: {e}")

    # Load the HTML template, falling back to a minimal built-in one.
    template_path = os.path.join(os.path.dirname(__file__), 'templates', 'email', 'task_complete.html')
    try:
        with open(template_path, 'r', encoding='utf-8') as f:
            html_template = f.read()
    except FileNotFoundError:
        html_template = """
<html>
<body>
<h1>任务完成通知</h1>
<p>您好,{{ username }}</p>
<p>您的浏览任务已完成。</p>
<p>账号:{{ account_name }}</p>
<p>浏览类型:{{ browse_type }}</p>
<p>浏览条目:{{ total_items }} 条</p>
<p>附件数量:{{ total_attachments }} 个</p>
<p>完成时间:{{ complete_time }}</p>
</body>
</html>
"""

    # An oversized screenshot is sent in its own follow-up email, uncompressed.
    oversized = bool(screenshot_data) and len(screenshot_data) > MAX_ATTACHMENT_SIZE
    if oversized:
        batch_info = '(由于文件较大,截图将单独发送)'
        screenshot_note = '截图将在下一封邮件中发送。'
        attachments = None
    else:
        batch_info = ''
        screenshot_note = '截图已附在邮件中。' if screenshot_data else ''
        attachments = (
            [{'filename': screenshot_filename, 'data': screenshot_data}]
            if screenshot_data else None
        )

    # Render the HTML once for both cases. Caller-supplied values are
    # HTML-escaped so user-controlled text cannot inject markup.
    replacements = (
        ('{{ username }}', html.escape(username)),
        ('{{ account_name }}', html.escape(account_name)),
        ('{{ browse_type }}', html.escape(browse_type)),
        ('{{ total_items }}', html.escape(str(total_items))),
        ('{{ total_attachments }}', html.escape(str(total_attachments))),
        ('{{ complete_time }}', complete_time),
        ('{{ batch_info }}', batch_info),
    )
    html_body = html_template
    for placeholder, value in replacements:
        html_body = html_body.replace(placeholder, value)

    # Plain-text alternative (no escaping needed).
    text_body = f"""
您好,{username}
您的浏览任务已完成。
账号:{account_name}
浏览类型:{browse_type}
浏览条目:{total_items}
附件数量:{total_attachments}
完成时间:{complete_time}
{screenshot_note}
"""

    emails_sent = 0
    last_error = ''

    # Notification email (carries the screenshot unless it is oversized).
    result = send_email(
        to_email=email,
        subject=f'【自动化学习】任务完成 - {account_name}',
        body=text_body,
        html_body=html_body,
        attachments=attachments,
        email_type=EMAIL_TYPE_TASK_COMPLETE,
        user_id=user_id,
        log_callback=log_callback
    )
    if result['success']:
        emails_sent += 1
        if log_callback:
            log_callback("[邮件] 任务通知已发送")
    else:
        last_error = result['error']

    if oversized:
        # Follow-up email with only the screenshot; attempted even if the
        # notification above failed.
        result2 = send_email(
            to_email=email,
            subject=f'【自动化学习】任务截图 - {account_name}',
            body=f'这是 {account_name} 的任务截图。',
            attachments=[{'filename': screenshot_filename, 'data': screenshot_data}],
            email_type=EMAIL_TYPE_TASK_COMPLETE,
            user_id=user_id,
            log_callback=log_callback
        )
        if result2['success']:
            emails_sent += 1
            if log_callback:
                log_callback("[邮件] 截图附件已发送")
        else:
            last_error = result2['error']

    return {
        'success': emails_sent > 0,
        'error': last_error if emails_sent == 0 else '',
        'emails_sent': emails_sent
    }
def send_task_complete_email_async(
    user_id: int,
    email: str,
    username: str,
    account_name: str,
    browse_type: str,
    total_items: int,
    total_attachments: int,
    screenshot_path: str = None,
    log_callback: Callable = None
):
    """
    Fire-and-forget variant of send_task_complete_email.

    Runs the synchronous sender on a daemon thread; the send result is
    discarded and nothing is returned.
    """
    # ``threading`` is already imported at module level.
    positional = (
        user_id, email, username, account_name, browse_type,
        total_items, total_attachments, screenshot_path, log_callback,
    )
    sender = threading.Thread(target=send_task_complete_email, args=positional, daemon=True)
    sender.start()
def send_batch_task_complete_email(
    user_id: int,
    email: str,
    username: str,
    schedule_name: str,
    browse_type: str,
    screenshots: List[Dict[str, Any]]
) -> Dict[str, Any]:
    """
    Send the completion email for a scheduled batch (screenshots of all
    accounts packed into one ZIP attachment).

    Args:
        user_id: User ID.
        email: Recipient address.
        username: User name.
        schedule_name: Name of the scheduled task.
        browse_type: Browse type.
        screenshots: [{'account_name': x, 'path': y, 'items': n, 'attachments': m}, ...]

    Returns:
        {'success': bool, 'error': str} ('error' is '' on success)
    """
    import html  # local import: only needed to escape values placed into HTML

    # Bail out early when the feature is disabled or there is nothing to send.
    settings = get_email_settings()
    if not settings.get('enabled', False):
        return {'success': False, 'error': '邮件功能未启用'}
    if not settings.get('task_notify_enabled', False):
        return {'success': False, 'error': '任务通知功能未启用'}
    if not email:
        return {'success': False, 'error': '用户未设置邮箱'}
    if not screenshots:
        return {'success': False, 'error': '没有截图需要发送'}

    complete_time = datetime.now().strftime('%Y-%m-%d %H:%M:%S')

    # Aggregate statistics over all accounts.
    total_items_sum = sum(s.get('items', 0) for s in screenshots)
    total_attachments_sum = sum(s.get('attachments', 0) for s in screenshots)
    account_count = len(screenshots)

    # Caller-supplied text is HTML-escaped so it cannot inject markup.
    safe_username = html.escape(str(username))
    safe_schedule_name = html.escape(str(schedule_name))
    safe_browse_type = html.escape(str(browse_type))

    # One table row per account.
    accounts_html = "".join(
        f"""
<tr>
<td style="padding: 8px; border: 1px solid #ddd;">{html.escape(str(s.get('account_name', '未知')))}</td>
<td style="padding: 8px; border: 1px solid #ddd; text-align: center;">{s.get('items', 0)}</td>
<td style="padding: 8px; border: 1px solid #ddd; text-align: center;">{s.get('attachments', 0)}</td>
</tr>
"""
        for s in screenshots
    )

    html_content = f"""
<html>
<body style="font-family: Arial, sans-serif; line-height: 1.6; color: #333;">
<div style="max-width: 600px; margin: 0 auto; padding: 20px;">
<h2 style="color: #667eea;">定时任务完成通知</h2>
<p>您好,{safe_username}</p>
<p>您的定时任务 <strong>{safe_schedule_name}</strong> 已完成执行。</p>
<div style="background: #f8f9fa; padding: 15px; border-radius: 8px; margin: 15px 0;">
<p style="margin: 5px 0;"><strong>浏览类型:</strong>{safe_browse_type}</p>
<p style="margin: 5px 0;"><strong>执行账号:</strong>{account_count} 个</p>
<p style="margin: 5px 0;"><strong>总浏览条目:</strong>{total_items_sum} 条</p>
<p style="margin: 5px 0;"><strong>总附件数量:</strong>{total_attachments_sum} 个</p>
<p style="margin: 5px 0;"><strong>完成时间:</strong>{complete_time}</p>
</div>
<h3 style="color: #667eea; margin-top: 20px;">账号执行详情</h3>
<table style="width: 100%; border-collapse: collapse; margin: 10px 0;">
<tr style="background: #667eea; color: white;">
<th style="padding: 10px; border: 1px solid #ddd;">账号名称</th>
<th style="padding: 10px; border: 1px solid #ddd;">浏览条目</th>
<th style="padding: 10px; border: 1px solid #ddd;">附件数量</th>
</tr>
{accounts_html}
</table>
<p style="color: #666; font-size: 12px; margin-top: 20px;">
截图已打包为ZIP附件请查收。
</p>
</div>
</body>
</html>
"""

    # Collect the screenshot files that can actually be read.
    screenshot_files = []
    for s in screenshots:
        if s.get('path') and os.path.exists(s['path']):
            try:
                with open(s['path'], 'rb') as f:
                    screenshot_files.append({
                        'filename': f"{s.get('account_name', 'screenshot')}_{os.path.basename(s['path'])}",
                        'data': f.read()
                    })
            except Exception as e:
                print(f"[邮件] 读取截图文件失败: {e}")

    # Pack them with the module's shared ZIP helper; a packing failure only
    # drops the attachment, the notification is still sent.
    attachments = []
    if screenshot_files:
        try:
            zip_attachment = create_zip_attachment(
                screenshot_files,
                f"screenshots_{datetime.now().strftime('%Y%m%d_%H%M%S')}.zip"
            )
            zip_attachment['mime_type'] = 'application/zip'
            attachments.append(zip_attachment)
        except Exception as e:
            print(f"[邮件] 打包截图失败: {e}")

    result = send_email(
        to_email=email,
        subject=f'【自动化学习】定时任务完成 - {schedule_name}',
        body='',
        html_body=html_content,
        attachments=attachments,
        email_type='batch_task_complete',
        # Forwarded like in the other senders so the send is attributed to
        # the user.
        user_id=user_id
    )

    # NOTE(review): send_email may already write its own log entry - confirm
    # that this explicit log_email_send call does not duplicate it.
    if result['success']:
        log_email_send(
            email_type='batch_task_complete',
            to_email=email,
            subject=f'定时任务完成 - {schedule_name}',
            success=True
        )
        return {'success': True, 'error': ''}

    log_email_send(
        email_type='batch_task_complete',
        to_email=email,
        subject=f'定时任务完成 - {schedule_name}',
        success=False,
        error=result.get('error', '')
    )
    return {'success': False, 'error': result.get('error', '发送失败')}
def send_batch_task_complete_email_async(
    user_id: int,
    email: str,
    username: str,
    schedule_name: str,
    browse_type: str,
    screenshots: List[Dict[str, Any]]
):
    """
    Fire-and-forget variant of send_batch_task_complete_email.

    Runs the synchronous sender on a daemon thread; the send result is
    discarded and nothing is returned.
    """
    # ``threading`` is already imported at module level.
    positional = (user_id, email, username, schedule_name, browse_type, screenshots)
    sender = threading.Thread(target=send_batch_task_complete_email, args=positional, daemon=True)
    sender.start()
# ============ 初始化 ============
def init_email_service():
    """
    Initialise the email service.

    Runs the table initialisation first, then obtains the global queue
    (which creates and starts it on first use), and prints a confirmation.
    """
    for startup_step in (init_email_tables, get_email_queue):
        startup_step()
    print("[邮件服务] 初始化完成")
if __name__ == "__main__":
    # Manual smoke test: only runs the table initialisation, no mail is sent.
    init_email_tables()
    print("邮件服务模块测试完成")