1. 添加任务完成邮件模板 (templates/email/task_complete.html) 2. 在email_service.py中添加: - task_notify_enabled 字段支持 - send_task_complete_email() 函数,支持附件大小限制和分批发送 - send_task_complete_email_async() 异步发送函数 - MAX_ATTACHMENT_SIZE 常量 (10MB) 3. 更新app.py: - 邮件设置API支持task_notify_enabled字段 - 截图回调中集成任务完成邮件发送 4. 更新admin.html: - 添加"启用任务完成通知"开关 - 更新loadEmailSettings/updateEmailSettings函数 附件超过10MB时会自动分两封邮件发送(通知+截图),作为容错机制 🤖 Generated with [Claude Code](https://claude.com/claude-code) Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com>
1825 lines
58 KiB
Python
1825 lines
58 KiB
Python
#!/usr/bin/env python3
|
||
# -*- coding: utf-8 -*-
|
||
"""
|
||
邮件服务模块
|
||
功能:
|
||
1. 多SMTP配置管理(主备切换、故障转移)
|
||
2. 发送纯文本/HTML邮件
|
||
3. 发送带附件邮件(支持ZIP压缩)
|
||
4. 异步发送队列
|
||
5. 每日发送限额控制
|
||
6. 发送日志记录
|
||
"""
|
||
|
||
import os
|
||
import smtplib
|
||
import threading
|
||
import queue
|
||
import time
|
||
import zipfile
|
||
import secrets
|
||
from datetime import datetime, timedelta
|
||
from email.mime.text import MIMEText
|
||
from email.mime.multipart import MIMEMultipart
|
||
from email.mime.base import MIMEBase
|
||
from email.mime.image import MIMEImage
|
||
from email import encoders
|
||
from email.header import Header
|
||
from email.utils import formataddr
|
||
from typing import Optional, List, Dict, Any, Callable
|
||
from io import BytesIO
|
||
|
||
import db_pool
|
||
from crypto_utils import encrypt_password, decrypt_password, is_encrypted
|
||
|
||
|
||
# ============ 常量配置 ============
|
||
|
||
# 邮件类型
|
||
EMAIL_TYPE_REGISTER = 'register' # 注册验证
|
||
EMAIL_TYPE_RESET = 'reset' # 密码重置
|
||
EMAIL_TYPE_BIND = 'bind' # 邮箱绑定
|
||
EMAIL_TYPE_TASK_COMPLETE = 'task_complete' # 任务完成通知
|
||
|
||
# Token有效期(秒)
|
||
TOKEN_EXPIRE_REGISTER = 24 * 60 * 60 # 注册验证: 24小时
|
||
TOKEN_EXPIRE_RESET = 30 * 60 # 密码重置: 30分钟
|
||
TOKEN_EXPIRE_BIND = 60 * 60 # 邮箱绑定: 1小时
|
||
|
||
# 发送频率限制(秒)
|
||
RATE_LIMIT_REGISTER = 60 # 注册邮件: 1分钟
|
||
RATE_LIMIT_RESET = 5 * 60 # 重置邮件: 5分钟
|
||
RATE_LIMIT_BIND = 60 # 绑定邮件: 1分钟
|
||
|
||
# 异步队列配置
|
||
QUEUE_WORKERS = int(os.environ.get('EMAIL_QUEUE_WORKERS', '2'))
|
||
QUEUE_MAX_SIZE = int(os.environ.get('EMAIL_QUEUE_MAX_SIZE', '100'))
|
||
|
||
# 附件大小限制(字节)
|
||
# 大多数邮件服务商限制单封邮件附件大小为10-25MB
|
||
# 为安全起见,设置为10MB,超过则分批发送
|
||
MAX_ATTACHMENT_SIZE = int(os.environ.get('EMAIL_MAX_ATTACHMENT_SIZE', str(10 * 1024 * 1024))) # 10MB
|
||
|
||
|
||
# ============ 数据库操作 ============
|
||
|
||
def init_email_tables():
|
||
"""初始化邮件相关数据库表"""
|
||
with db_pool.get_db() as conn:
|
||
cursor = conn.cursor()
|
||
|
||
# 1. SMTP配置表(支持多配置)
|
||
cursor.execute("""
|
||
CREATE TABLE IF NOT EXISTS smtp_configs (
|
||
id INTEGER PRIMARY KEY AUTOINCREMENT,
|
||
name TEXT NOT NULL DEFAULT '默认配置',
|
||
enabled INTEGER DEFAULT 1,
|
||
is_primary INTEGER DEFAULT 0,
|
||
priority INTEGER DEFAULT 0,
|
||
host TEXT NOT NULL DEFAULT '',
|
||
port INTEGER DEFAULT 465,
|
||
username TEXT NOT NULL DEFAULT '',
|
||
password TEXT DEFAULT '',
|
||
use_ssl INTEGER DEFAULT 1,
|
||
use_tls INTEGER DEFAULT 0,
|
||
sender_name TEXT DEFAULT '知识管理平台',
|
||
sender_email TEXT DEFAULT '',
|
||
daily_limit INTEGER DEFAULT 0,
|
||
daily_sent INTEGER DEFAULT 0,
|
||
daily_reset_date TEXT DEFAULT '',
|
||
last_success_at TIMESTAMP,
|
||
last_error TEXT DEFAULT '',
|
||
success_count INTEGER DEFAULT 0,
|
||
fail_count INTEGER DEFAULT 0,
|
||
created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
|
||
updated_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP
|
||
)
|
||
""")
|
||
|
||
# 创建索引
|
||
cursor.execute("""
|
||
CREATE INDEX IF NOT EXISTS idx_smtp_configs_enabled
|
||
ON smtp_configs(enabled, priority)
|
||
""")
|
||
cursor.execute("""
|
||
CREATE INDEX IF NOT EXISTS idx_smtp_configs_primary
|
||
ON smtp_configs(is_primary)
|
||
""")
|
||
|
||
# 2. 全局邮件设置表
|
||
cursor.execute("""
|
||
CREATE TABLE IF NOT EXISTS email_settings (
|
||
id INTEGER PRIMARY KEY DEFAULT 1,
|
||
enabled INTEGER DEFAULT 0,
|
||
failover_enabled INTEGER DEFAULT 1,
|
||
register_verify_enabled INTEGER DEFAULT 0,
|
||
task_notify_enabled INTEGER DEFAULT 0,
|
||
base_url TEXT DEFAULT '',
|
||
updated_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP
|
||
)
|
||
""")
|
||
|
||
# 初始化默认设置
|
||
cursor.execute("""
|
||
INSERT OR IGNORE INTO email_settings (id, enabled, failover_enabled)
|
||
VALUES (1, 0, 1)
|
||
""")
|
||
|
||
# 3. 邮件验证Token表
|
||
cursor.execute("""
|
||
CREATE TABLE IF NOT EXISTS email_tokens (
|
||
id INTEGER PRIMARY KEY AUTOINCREMENT,
|
||
user_id INTEGER,
|
||
email TEXT NOT NULL,
|
||
token TEXT UNIQUE NOT NULL,
|
||
token_type TEXT NOT NULL,
|
||
expires_at TIMESTAMP NOT NULL,
|
||
used INTEGER DEFAULT 0,
|
||
created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
|
||
FOREIGN KEY (user_id) REFERENCES users(id) ON DELETE CASCADE
|
||
)
|
||
""")
|
||
cursor.execute("CREATE INDEX IF NOT EXISTS idx_email_tokens_token ON email_tokens(token)")
|
||
cursor.execute("CREATE INDEX IF NOT EXISTS idx_email_tokens_email ON email_tokens(email)")
|
||
cursor.execute("CREATE INDEX IF NOT EXISTS idx_email_tokens_expires ON email_tokens(expires_at)")
|
||
|
||
# 4. 邮件发送日志表
|
||
cursor.execute("""
|
||
CREATE TABLE IF NOT EXISTS email_logs (
|
||
id INTEGER PRIMARY KEY AUTOINCREMENT,
|
||
user_id INTEGER,
|
||
smtp_config_id INTEGER,
|
||
email_to TEXT NOT NULL,
|
||
email_type TEXT NOT NULL,
|
||
subject TEXT NOT NULL,
|
||
status TEXT NOT NULL,
|
||
error_message TEXT,
|
||
attachment_count INTEGER DEFAULT 0,
|
||
attachment_size INTEGER DEFAULT 0,
|
||
created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
|
||
FOREIGN KEY (user_id) REFERENCES users(id) ON DELETE SET NULL,
|
||
FOREIGN KEY (smtp_config_id) REFERENCES smtp_configs(id) ON DELETE SET NULL
|
||
)
|
||
""")
|
||
cursor.execute("CREATE INDEX IF NOT EXISTS idx_email_logs_user ON email_logs(user_id)")
|
||
cursor.execute("CREATE INDEX IF NOT EXISTS idx_email_logs_status ON email_logs(status)")
|
||
cursor.execute("CREATE INDEX IF NOT EXISTS idx_email_logs_type ON email_logs(email_type)")
|
||
cursor.execute("CREATE INDEX IF NOT EXISTS idx_email_logs_created ON email_logs(created_at)")
|
||
|
||
# 5. 邮件发送统计表
|
||
cursor.execute("""
|
||
CREATE TABLE IF NOT EXISTS email_stats (
|
||
id INTEGER PRIMARY KEY DEFAULT 1,
|
||
total_sent INTEGER DEFAULT 0,
|
||
total_success INTEGER DEFAULT 0,
|
||
total_failed INTEGER DEFAULT 0,
|
||
register_sent INTEGER DEFAULT 0,
|
||
reset_sent INTEGER DEFAULT 0,
|
||
bind_sent INTEGER DEFAULT 0,
|
||
task_complete_sent INTEGER DEFAULT 0,
|
||
last_updated TIMESTAMP DEFAULT CURRENT_TIMESTAMP
|
||
)
|
||
""")
|
||
|
||
# 初始化统计记录
|
||
cursor.execute("""
|
||
INSERT OR IGNORE INTO email_stats (id) VALUES (1)
|
||
""")
|
||
|
||
conn.commit()
|
||
print("[邮件服务] 数据库表初始化完成")
|
||
|
||
|
||
# ============ SMTP配置管理 ============
|
||
|
||
def get_email_settings() -> Dict[str, Any]:
|
||
"""获取全局邮件设置"""
|
||
with db_pool.get_db() as conn:
|
||
cursor = conn.cursor()
|
||
# 先检查表结构,添加新字段(兼容旧版本数据库)
|
||
try:
|
||
cursor.execute("SELECT register_verify_enabled FROM email_settings LIMIT 1")
|
||
except:
|
||
cursor.execute("ALTER TABLE email_settings ADD COLUMN register_verify_enabled INTEGER DEFAULT 0")
|
||
conn.commit()
|
||
try:
|
||
cursor.execute("SELECT base_url FROM email_settings LIMIT 1")
|
||
except:
|
||
cursor.execute("ALTER TABLE email_settings ADD COLUMN base_url TEXT DEFAULT ''")
|
||
conn.commit()
|
||
try:
|
||
cursor.execute("SELECT task_notify_enabled FROM email_settings LIMIT 1")
|
||
except:
|
||
cursor.execute("ALTER TABLE email_settings ADD COLUMN task_notify_enabled INTEGER DEFAULT 0")
|
||
conn.commit()
|
||
|
||
cursor.execute("""
|
||
SELECT enabled, failover_enabled, register_verify_enabled, base_url,
|
||
task_notify_enabled, updated_at
|
||
FROM email_settings WHERE id = 1
|
||
""")
|
||
row = cursor.fetchone()
|
||
if row:
|
||
return {
|
||
'enabled': bool(row[0]),
|
||
'failover_enabled': bool(row[1]),
|
||
'register_verify_enabled': bool(row[2]) if row[2] is not None else False,
|
||
'base_url': row[3] or '',
|
||
'task_notify_enabled': bool(row[4]) if row[4] is not None else False,
|
||
'updated_at': row[5]
|
||
}
|
||
return {
|
||
'enabled': False,
|
||
'failover_enabled': True,
|
||
'register_verify_enabled': False,
|
||
'base_url': '',
|
||
'task_notify_enabled': False,
|
||
'updated_at': None
|
||
}
|
||
|
||
|
||
def update_email_settings(
|
||
enabled: bool,
|
||
failover_enabled: bool,
|
||
register_verify_enabled: bool = None,
|
||
base_url: str = None,
|
||
task_notify_enabled: bool = None
|
||
) -> bool:
|
||
"""更新全局邮件设置"""
|
||
with db_pool.get_db() as conn:
|
||
cursor = conn.cursor()
|
||
|
||
# 构建动态更新语句
|
||
updates = ['enabled = ?', 'failover_enabled = ?', 'updated_at = CURRENT_TIMESTAMP']
|
||
params = [int(enabled), int(failover_enabled)]
|
||
|
||
if register_verify_enabled is not None:
|
||
updates.append('register_verify_enabled = ?')
|
||
params.append(int(register_verify_enabled))
|
||
|
||
if base_url is not None:
|
||
updates.append('base_url = ?')
|
||
params.append(base_url)
|
||
|
||
if task_notify_enabled is not None:
|
||
updates.append('task_notify_enabled = ?')
|
||
params.append(int(task_notify_enabled))
|
||
|
||
cursor.execute(f"""
|
||
UPDATE email_settings
|
||
SET {', '.join(updates)}
|
||
WHERE id = 1
|
||
""", params)
|
||
conn.commit()
|
||
return True
|
||
|
||
|
||
def get_smtp_configs(include_password: bool = False) -> List[Dict[str, Any]]:
|
||
"""获取所有SMTP配置列表"""
|
||
with db_pool.get_db() as conn:
|
||
cursor = conn.cursor()
|
||
cursor.execute("""
|
||
SELECT id, name, enabled, is_primary, priority, host, port,
|
||
username, password, use_ssl, use_tls, sender_name, sender_email,
|
||
daily_limit, daily_sent, daily_reset_date,
|
||
last_success_at, last_error, success_count, fail_count,
|
||
created_at, updated_at
|
||
FROM smtp_configs
|
||
ORDER BY is_primary DESC, priority ASC, id ASC
|
||
""")
|
||
|
||
configs = []
|
||
for row in cursor.fetchall():
|
||
config = {
|
||
'id': row[0],
|
||
'name': row[1],
|
||
'enabled': bool(row[2]),
|
||
'is_primary': bool(row[3]),
|
||
'priority': row[4],
|
||
'host': row[5],
|
||
'port': row[6],
|
||
'username': row[7],
|
||
'password': '******' if row[8] and not include_password else (decrypt_password(row[8]) if include_password and row[8] else ''),
|
||
'has_password': bool(row[8]),
|
||
'use_ssl': bool(row[9]),
|
||
'use_tls': bool(row[10]),
|
||
'sender_name': row[11],
|
||
'sender_email': row[12],
|
||
'daily_limit': row[13],
|
||
'daily_sent': row[14],
|
||
'daily_reset_date': row[15],
|
||
'last_success_at': row[16],
|
||
'last_error': row[17],
|
||
'success_count': row[18],
|
||
'fail_count': row[19],
|
||
'created_at': row[20],
|
||
'updated_at': row[21]
|
||
}
|
||
|
||
# 计算成功率
|
||
total = config['success_count'] + config['fail_count']
|
||
config['success_rate'] = round(config['success_count'] / total * 100, 1) if total > 0 else 0
|
||
|
||
configs.append(config)
|
||
|
||
return configs
|
||
|
||
|
||
def get_smtp_config(config_id: int, include_password: bool = False) -> Optional[Dict[str, Any]]:
|
||
"""获取单个SMTP配置"""
|
||
with db_pool.get_db() as conn:
|
||
cursor = conn.cursor()
|
||
cursor.execute("""
|
||
SELECT id, name, enabled, is_primary, priority, host, port,
|
||
username, password, use_ssl, use_tls, sender_name, sender_email,
|
||
daily_limit, daily_sent, daily_reset_date,
|
||
last_success_at, last_error, success_count, fail_count,
|
||
created_at, updated_at
|
||
FROM smtp_configs WHERE id = ?
|
||
""", (config_id,))
|
||
|
||
row = cursor.fetchone()
|
||
if not row:
|
||
return None
|
||
|
||
return {
|
||
'id': row[0],
|
||
'name': row[1],
|
||
'enabled': bool(row[2]),
|
||
'is_primary': bool(row[3]),
|
||
'priority': row[4],
|
||
'host': row[5],
|
||
'port': row[6],
|
||
'username': row[7],
|
||
'password': '******' if row[8] and not include_password else (decrypt_password(row[8]) if include_password and row[8] else ''),
|
||
'has_password': bool(row[8]),
|
||
'use_ssl': bool(row[9]),
|
||
'use_tls': bool(row[10]),
|
||
'sender_name': row[11],
|
||
'sender_email': row[12],
|
||
'daily_limit': row[13],
|
||
'daily_sent': row[14],
|
||
'daily_reset_date': row[15],
|
||
'last_success_at': row[16],
|
||
'last_error': row[17],
|
||
'success_count': row[18],
|
||
'fail_count': row[19],
|
||
'created_at': row[20],
|
||
'updated_at': row[21]
|
||
}
|
||
|
||
|
||
def create_smtp_config(data: Dict[str, Any]) -> int:
|
||
"""创建新的SMTP配置"""
|
||
with db_pool.get_db() as conn:
|
||
cursor = conn.cursor()
|
||
|
||
# 加密密码
|
||
password = encrypt_password(data.get('password', '')) if data.get('password') else ''
|
||
|
||
cursor.execute("""
|
||
INSERT INTO smtp_configs
|
||
(name, enabled, is_primary, priority, host, port, username, password,
|
||
use_ssl, use_tls, sender_name, sender_email, daily_limit)
|
||
VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)
|
||
""", (
|
||
data.get('name', '默认配置'),
|
||
int(data.get('enabled', True)),
|
||
int(data.get('is_primary', False)),
|
||
data.get('priority', 0),
|
||
data.get('host', ''),
|
||
data.get('port', 465),
|
||
data.get('username', ''),
|
||
password,
|
||
int(data.get('use_ssl', True)),
|
||
int(data.get('use_tls', False)),
|
||
data.get('sender_name', '知识管理平台'),
|
||
data.get('sender_email', ''),
|
||
data.get('daily_limit', 0)
|
||
))
|
||
|
||
config_id = cursor.lastrowid
|
||
conn.commit()
|
||
return config_id
|
||
|
||
|
||
def update_smtp_config(config_id: int, data: Dict[str, Any]) -> bool:
|
||
"""更新SMTP配置"""
|
||
with db_pool.get_db() as conn:
|
||
cursor = conn.cursor()
|
||
|
||
updates = []
|
||
params = []
|
||
|
||
field_mapping = {
|
||
'name': 'name',
|
||
'enabled': 'enabled',
|
||
'priority': 'priority',
|
||
'host': 'host',
|
||
'port': 'port',
|
||
'username': 'username',
|
||
'use_ssl': 'use_ssl',
|
||
'use_tls': 'use_tls',
|
||
'sender_name': 'sender_name',
|
||
'sender_email': 'sender_email',
|
||
'daily_limit': 'daily_limit'
|
||
}
|
||
|
||
for key, db_field in field_mapping.items():
|
||
if key in data:
|
||
value = data[key]
|
||
if key in ('enabled', 'use_ssl', 'use_tls'):
|
||
value = int(value)
|
||
updates.append(f"{db_field} = ?")
|
||
params.append(value)
|
||
|
||
# 密码特殊处理:只有当提供了新密码时才更新
|
||
if 'password' in data and data['password'] and data['password'] != '******':
|
||
updates.append("password = ?")
|
||
params.append(encrypt_password(data['password']))
|
||
|
||
if not updates:
|
||
return False
|
||
|
||
updates.append("updated_at = CURRENT_TIMESTAMP")
|
||
params.append(config_id)
|
||
|
||
cursor.execute(f"""
|
||
UPDATE smtp_configs SET {', '.join(updates)} WHERE id = ?
|
||
""", params)
|
||
|
||
conn.commit()
|
||
return cursor.rowcount > 0
|
||
|
||
|
||
def delete_smtp_config(config_id: int) -> bool:
|
||
"""删除SMTP配置"""
|
||
with db_pool.get_db() as conn:
|
||
cursor = conn.cursor()
|
||
cursor.execute("DELETE FROM smtp_configs WHERE id = ?", (config_id,))
|
||
conn.commit()
|
||
return cursor.rowcount > 0
|
||
|
||
|
||
def set_primary_smtp_config(config_id: int) -> bool:
|
||
"""设置主SMTP配置"""
|
||
with db_pool.get_db() as conn:
|
||
cursor = conn.cursor()
|
||
|
||
# 先取消所有主配置标记
|
||
cursor.execute("UPDATE smtp_configs SET is_primary = 0")
|
||
|
||
# 设置新的主配置
|
||
cursor.execute("""
|
||
UPDATE smtp_configs
|
||
SET is_primary = 1, updated_at = CURRENT_TIMESTAMP
|
||
WHERE id = ?
|
||
""", (config_id,))
|
||
|
||
conn.commit()
|
||
return cursor.rowcount > 0
|
||
|
||
|
||
def _get_available_smtp_config(failover: bool = True) -> Optional[Dict[str, Any]]:
|
||
"""
|
||
获取可用的SMTP配置
|
||
优先级: 主配置 > 按priority排序的启用配置
|
||
"""
|
||
today = datetime.now().strftime('%Y-%m-%d')
|
||
|
||
with db_pool.get_db() as conn:
|
||
cursor = conn.cursor()
|
||
|
||
# 先重置过期的每日计数
|
||
cursor.execute("""
|
||
UPDATE smtp_configs
|
||
SET daily_sent = 0, daily_reset_date = ?
|
||
WHERE daily_reset_date != ? OR daily_reset_date IS NULL OR daily_reset_date = ''
|
||
""", (today, today))
|
||
conn.commit()
|
||
|
||
# 获取所有启用的配置,按优先级排序
|
||
cursor.execute("""
|
||
SELECT id, name, host, port, username, password, use_ssl, use_tls,
|
||
sender_name, sender_email, daily_limit, daily_sent, is_primary
|
||
FROM smtp_configs
|
||
WHERE enabled = 1
|
||
ORDER BY is_primary DESC, priority ASC, id ASC
|
||
""")
|
||
|
||
configs = cursor.fetchall()
|
||
|
||
for row in configs:
|
||
config_id, name, host, port, username, password, use_ssl, use_tls, \
|
||
sender_name, sender_email, daily_limit, daily_sent, is_primary = row
|
||
|
||
# 检查每日限额
|
||
if daily_limit > 0 and daily_sent >= daily_limit:
|
||
continue # 超过限额,跳过此配置
|
||
|
||
# 解密密码
|
||
decrypted_password = decrypt_password(password) if password else ''
|
||
|
||
return {
|
||
'id': config_id,
|
||
'name': name,
|
||
'host': host,
|
||
'port': port,
|
||
'username': username,
|
||
'password': decrypted_password,
|
||
'use_ssl': bool(use_ssl),
|
||
'use_tls': bool(use_tls),
|
||
'sender_name': sender_name,
|
||
'sender_email': sender_email,
|
||
'is_primary': bool(is_primary)
|
||
}
|
||
|
||
return None
|
||
|
||
|
||
def _update_smtp_stats(config_id: int, success: bool, error: str = ''):
|
||
"""更新SMTP配置的统计信息"""
|
||
with db_pool.get_db() as conn:
|
||
cursor = conn.cursor()
|
||
|
||
if success:
|
||
cursor.execute("""
|
||
UPDATE smtp_configs
|
||
SET daily_sent = daily_sent + 1,
|
||
success_count = success_count + 1,
|
||
last_success_at = CURRENT_TIMESTAMP,
|
||
last_error = '',
|
||
updated_at = CURRENT_TIMESTAMP
|
||
WHERE id = ?
|
||
""", (config_id,))
|
||
else:
|
||
cursor.execute("""
|
||
UPDATE smtp_configs
|
||
SET fail_count = fail_count + 1,
|
||
last_error = ?,
|
||
updated_at = CURRENT_TIMESTAMP
|
||
WHERE id = ?
|
||
""", (error[:500], config_id))
|
||
|
||
conn.commit()
|
||
|
||
|
||
# ============ 邮件发送核心 ============
|
||
|
||
class EmailSender:
|
||
"""邮件发送器"""
|
||
|
||
def __init__(self, config: Dict[str, Any]):
|
||
self.config = config
|
||
self.server = None
|
||
|
||
def connect(self) -> bool:
|
||
"""连接SMTP服务器"""
|
||
try:
|
||
if self.config['use_ssl']:
|
||
self.server = smtplib.SMTP_SSL(
|
||
self.config['host'],
|
||
self.config['port'],
|
||
timeout=30
|
||
)
|
||
else:
|
||
self.server = smtplib.SMTP(
|
||
self.config['host'],
|
||
self.config['port'],
|
||
timeout=30
|
||
)
|
||
if self.config['use_tls']:
|
||
self.server.starttls()
|
||
|
||
self.server.login(self.config['username'], self.config['password'])
|
||
return True
|
||
|
||
except Exception as e:
|
||
print(f"[邮件服务] SMTP连接失败 [{self.config['name']}]: {e}")
|
||
self.server = None
|
||
raise
|
||
|
||
def disconnect(self):
|
||
"""断开SMTP连接"""
|
||
if self.server:
|
||
try:
|
||
self.server.quit()
|
||
except Exception:
|
||
pass
|
||
self.server = None
|
||
|
||
def send(self, to_email: str, subject: str, body: str,
|
||
html_body: str = None, attachments: List[Dict] = None) -> bool:
|
||
"""
|
||
发送邮件
|
||
|
||
Args:
|
||
to_email: 收件人邮箱
|
||
subject: 邮件主题
|
||
body: 纯文本正文
|
||
html_body: HTML正文(可选)
|
||
attachments: 附件列表 [{'filename': 'xxx', 'data': bytes, 'mime_type': 'xxx'}]
|
||
"""
|
||
try:
|
||
if not self.server:
|
||
self.connect()
|
||
|
||
# 构建邮件
|
||
if html_body or attachments:
|
||
msg = MIMEMultipart('mixed')
|
||
|
||
# 添加正文
|
||
if html_body:
|
||
text_part = MIMEMultipart('alternative')
|
||
text_part.attach(MIMEText(body, 'plain', 'utf-8'))
|
||
text_part.attach(MIMEText(html_body, 'html', 'utf-8'))
|
||
msg.attach(text_part)
|
||
else:
|
||
msg.attach(MIMEText(body, 'plain', 'utf-8'))
|
||
|
||
# 添加附件
|
||
if attachments:
|
||
for att in attachments:
|
||
part = MIMEBase('application', 'octet-stream')
|
||
part.set_payload(att['data'])
|
||
encoders.encode_base64(part)
|
||
part.add_header(
|
||
'Content-Disposition',
|
||
'attachment',
|
||
filename=('utf-8', '', att['filename'])
|
||
)
|
||
msg.attach(part)
|
||
else:
|
||
msg = MIMEText(body, 'plain', 'utf-8')
|
||
|
||
# 设置邮件头
|
||
msg['Subject'] = Header(subject, 'utf-8')
|
||
msg['From'] = formataddr((
|
||
self.config['sender_name'],
|
||
self.config['sender_email'] or self.config['username']
|
||
))
|
||
msg['To'] = to_email
|
||
|
||
# 发送
|
||
self.server.sendmail(
|
||
self.config['sender_email'] or self.config['username'],
|
||
[to_email],
|
||
msg.as_string()
|
||
)
|
||
|
||
return True
|
||
|
||
except Exception as e:
|
||
print(f"[邮件服务] 发送失败: {e}")
|
||
raise
|
||
|
||
|
||
def send_email(
|
||
to_email: str,
|
||
subject: str,
|
||
body: str,
|
||
html_body: str = None,
|
||
attachments: List[Dict] = None,
|
||
email_type: str = '',
|
||
user_id: int = None,
|
||
log_callback: Callable = None
|
||
) -> Dict[str, Any]:
|
||
"""
|
||
发送邮件(带故障转移)
|
||
|
||
Returns:
|
||
{'success': bool, 'error': str, 'config_id': int}
|
||
"""
|
||
# 检查全局开关
|
||
settings = get_email_settings()
|
||
if not settings['enabled']:
|
||
return {'success': False, 'error': '邮件功能未启用', 'config_id': None}
|
||
|
||
# 获取可用配置
|
||
config = _get_available_smtp_config(settings['failover_enabled'])
|
||
if not config:
|
||
return {'success': False, 'error': '没有可用的SMTP配置', 'config_id': None}
|
||
|
||
tried_configs = []
|
||
last_error = ''
|
||
|
||
while config:
|
||
tried_configs.append(config['id'])
|
||
sender = EmailSender(config)
|
||
|
||
try:
|
||
sender.connect()
|
||
sender.send(to_email, subject, body, html_body, attachments)
|
||
sender.disconnect()
|
||
|
||
# 更新统计
|
||
_update_smtp_stats(config['id'], True)
|
||
_log_email_send(user_id, config['id'], to_email, email_type, subject, 'success', '', attachments)
|
||
_update_email_stats(email_type, True)
|
||
|
||
if log_callback:
|
||
log_callback(f"[邮件服务] 发送成功: {to_email} (使用: {config['name']})")
|
||
|
||
return {'success': True, 'error': '', 'config_id': config['id']}
|
||
|
||
except Exception as e:
|
||
last_error = str(e)
|
||
sender.disconnect()
|
||
|
||
# 更新失败统计
|
||
_update_smtp_stats(config['id'], False, last_error)
|
||
|
||
if log_callback:
|
||
log_callback(f"[邮件服务] 发送失败 [{config['name']}]: {e}")
|
||
|
||
# 故障转移:尝试下一个配置
|
||
if settings['failover_enabled']:
|
||
config = _get_next_available_smtp_config(tried_configs)
|
||
else:
|
||
config = None
|
||
|
||
# 所有配置都失败
|
||
_log_email_send(user_id, tried_configs[0] if tried_configs else None,
|
||
to_email, email_type, subject, 'failed', last_error, attachments)
|
||
_update_email_stats(email_type, False)
|
||
|
||
return {'success': False, 'error': last_error, 'config_id': tried_configs[0] if tried_configs else None}
|
||
|
||
|
||
def _get_next_available_smtp_config(exclude_ids: List[int]) -> Optional[Dict[str, Any]]:
|
||
"""获取下一个可用的SMTP配置(排除已尝试的)"""
|
||
today = datetime.now().strftime('%Y-%m-%d')
|
||
|
||
with db_pool.get_db() as conn:
|
||
cursor = conn.cursor()
|
||
|
||
placeholders = ','.join(['?' for _ in exclude_ids])
|
||
cursor.execute(f"""
|
||
SELECT id, name, host, port, username, password, use_ssl, use_tls,
|
||
sender_name, sender_email, daily_limit, daily_sent, is_primary
|
||
FROM smtp_configs
|
||
WHERE enabled = 1 AND id NOT IN ({placeholders})
|
||
ORDER BY is_primary DESC, priority ASC, id ASC
|
||
LIMIT 1
|
||
""", exclude_ids)
|
||
|
||
row = cursor.fetchone()
|
||
if not row:
|
||
return None
|
||
|
||
config_id, name, host, port, username, password, use_ssl, use_tls, \
|
||
sender_name, sender_email, daily_limit, daily_sent, is_primary = row
|
||
|
||
# 检查每日限额
|
||
if daily_limit > 0 and daily_sent >= daily_limit:
|
||
return _get_next_available_smtp_config(exclude_ids + [config_id])
|
||
|
||
return {
|
||
'id': config_id,
|
||
'name': name,
|
||
'host': host,
|
||
'port': port,
|
||
'username': username,
|
||
'password': decrypt_password(password) if password else '',
|
||
'use_ssl': bool(use_ssl),
|
||
'use_tls': bool(use_tls),
|
||
'sender_name': sender_name,
|
||
'sender_email': sender_email,
|
||
'is_primary': bool(is_primary)
|
||
}
|
||
|
||
|
||
def test_smtp_config(config_id: int, test_email: str) -> Dict[str, Any]:
|
||
"""测试SMTP配置"""
|
||
config = get_smtp_config(config_id, include_password=True)
|
||
if not config:
|
||
return {'success': False, 'error': '配置不存在'}
|
||
|
||
sender = EmailSender({
|
||
'name': config['name'],
|
||
'host': config['host'],
|
||
'port': config['port'],
|
||
'username': config['username'],
|
||
'password': config['password'],
|
||
'use_ssl': config['use_ssl'],
|
||
'use_tls': config['use_tls'],
|
||
'sender_name': config['sender_name'],
|
||
'sender_email': config['sender_email']
|
||
})
|
||
|
||
try:
|
||
sender.connect()
|
||
sender.send(
|
||
test_email,
|
||
'知识管理平台 - SMTP配置测试',
|
||
f'这是一封测试邮件。\n\n配置名称: {config["name"]}\nSMTP服务器: {config["host"]}:{config["port"]}\n\n如果您收到此邮件,说明SMTP配置正确。',
|
||
None,
|
||
None
|
||
)
|
||
sender.disconnect()
|
||
|
||
# 更新最后成功时间
|
||
with db_pool.get_db() as conn:
|
||
cursor = conn.cursor()
|
||
cursor.execute("""
|
||
UPDATE smtp_configs
|
||
SET last_success_at = CURRENT_TIMESTAMP, last_error = ''
|
||
WHERE id = ?
|
||
""", (config_id,))
|
||
conn.commit()
|
||
|
||
return {'success': True, 'error': ''}
|
||
|
||
except Exception as e:
|
||
# 记录错误
|
||
with db_pool.get_db() as conn:
|
||
cursor = conn.cursor()
|
||
cursor.execute("""
|
||
UPDATE smtp_configs SET last_error = ? WHERE id = ?
|
||
""", (str(e)[:500], config_id))
|
||
conn.commit()
|
||
|
||
return {'success': False, 'error': str(e)}
|
||
|
||
|
||
# ============ 邮件日志 ============
|
||
|
||
def _log_email_send(user_id: int, smtp_config_id: int, email_to: str,
|
||
email_type: str, subject: str, status: str,
|
||
error_message: str, attachments: List[Dict] = None):
|
||
"""记录邮件发送日志"""
|
||
attachment_count = len(attachments) if attachments else 0
|
||
attachment_size = sum(len(a.get('data', b'')) for a in attachments) if attachments else 0
|
||
|
||
with db_pool.get_db() as conn:
|
||
cursor = conn.cursor()
|
||
cursor.execute("""
|
||
INSERT INTO email_logs
|
||
(user_id, smtp_config_id, email_to, email_type, subject, status,
|
||
error_message, attachment_count, attachment_size)
|
||
VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?)
|
||
""", (user_id, smtp_config_id, email_to, email_type, subject, status,
|
||
error_message, attachment_count, attachment_size))
|
||
conn.commit()
|
||
|
||
|
||
def _update_email_stats(email_type: str, success: bool):
|
||
"""更新邮件统计"""
|
||
with db_pool.get_db() as conn:
|
||
cursor = conn.cursor()
|
||
|
||
type_field_map = {
|
||
EMAIL_TYPE_REGISTER: 'register_sent',
|
||
EMAIL_TYPE_RESET: 'reset_sent',
|
||
EMAIL_TYPE_BIND: 'bind_sent',
|
||
EMAIL_TYPE_TASK_COMPLETE: 'task_complete_sent'
|
||
}
|
||
|
||
type_field = type_field_map.get(email_type, '')
|
||
|
||
if success:
|
||
if type_field:
|
||
cursor.execute(f"""
|
||
UPDATE email_stats
|
||
SET total_sent = total_sent + 1,
|
||
total_success = total_success + 1,
|
||
{type_field} = {type_field} + 1,
|
||
last_updated = CURRENT_TIMESTAMP
|
||
WHERE id = 1
|
||
""")
|
||
else:
|
||
cursor.execute("""
|
||
UPDATE email_stats
|
||
SET total_sent = total_sent + 1,
|
||
total_success = total_success + 1,
|
||
last_updated = CURRENT_TIMESTAMP
|
||
WHERE id = 1
|
||
""")
|
||
else:
|
||
cursor.execute("""
|
||
UPDATE email_stats
|
||
SET total_sent = total_sent + 1,
|
||
total_failed = total_failed + 1,
|
||
last_updated = CURRENT_TIMESTAMP
|
||
WHERE id = 1
|
||
""")
|
||
|
||
conn.commit()
|
||
|
||
|
||
def get_email_stats() -> Dict[str, Any]:
|
||
"""获取邮件统计"""
|
||
with db_pool.get_db() as conn:
|
||
cursor = conn.cursor()
|
||
cursor.execute("""
|
||
SELECT total_sent, total_success, total_failed,
|
||
register_sent, reset_sent, bind_sent, task_complete_sent,
|
||
last_updated
|
||
FROM email_stats WHERE id = 1
|
||
""")
|
||
row = cursor.fetchone()
|
||
if row:
|
||
return {
|
||
'total_sent': row[0],
|
||
'total_success': row[1],
|
||
'total_failed': row[2],
|
||
'register_sent': row[3],
|
||
'reset_sent': row[4],
|
||
'bind_sent': row[5],
|
||
'task_complete_sent': row[6],
|
||
'last_updated': row[7],
|
||
'success_rate': round(row[1] / row[0] * 100, 1) if row[0] > 0 else 0
|
||
}
|
||
return {}
|
||
|
||
|
||
def get_email_logs(page: int = 1, page_size: int = 20,
|
||
email_type: str = None, status: str = None) -> Dict[str, Any]:
|
||
"""获取邮件日志(分页)"""
|
||
with db_pool.get_db() as conn:
|
||
cursor = conn.cursor()
|
||
|
||
where_clauses = []
|
||
params = []
|
||
|
||
if email_type:
|
||
where_clauses.append("l.email_type = ?")
|
||
params.append(email_type)
|
||
|
||
if status:
|
||
where_clauses.append("l.status = ?")
|
||
params.append(status)
|
||
|
||
where_sql = f"WHERE {' AND '.join(where_clauses)}" if where_clauses else ""
|
||
|
||
# 获取总数
|
||
cursor.execute(f"SELECT COUNT(*) FROM email_logs l {where_sql}", params)
|
||
total = cursor.fetchone()[0]
|
||
|
||
# 获取分页数据
|
||
offset = (page - 1) * page_size
|
||
cursor.execute(f"""
|
||
SELECT l.id, l.user_id, l.smtp_config_id, l.email_to, l.email_type,
|
||
l.subject, l.status, l.error_message, l.attachment_count,
|
||
l.attachment_size, l.created_at, u.username, s.name as smtp_name
|
||
FROM email_logs l
|
||
LEFT JOIN users u ON l.user_id = u.id
|
||
LEFT JOIN smtp_configs s ON l.smtp_config_id = s.id
|
||
{where_sql}
|
||
ORDER BY l.created_at DESC
|
||
LIMIT ? OFFSET ?
|
||
""", params + [page_size, offset])
|
||
|
||
logs = []
|
||
for row in cursor.fetchall():
|
||
logs.append({
|
||
'id': row[0],
|
||
'user_id': row[1],
|
||
'smtp_config_id': row[2],
|
||
'email_to': row[3],
|
||
'email_type': row[4],
|
||
'subject': row[5],
|
||
'status': row[6],
|
||
'error_message': row[7],
|
||
'attachment_count': row[8],
|
||
'attachment_size': row[9],
|
||
'created_at': row[10],
|
||
'username': row[11],
|
||
'smtp_name': row[12]
|
||
})
|
||
|
||
return {
|
||
'logs': logs,
|
||
'total': total,
|
||
'page': page,
|
||
'page_size': page_size,
|
||
'total_pages': (total + page_size - 1) // page_size
|
||
}
|
||
|
||
|
||
def cleanup_email_logs(days: int = 30) -> int:
|
||
"""清理过期邮件日志"""
|
||
with db_pool.get_db() as conn:
|
||
cursor = conn.cursor()
|
||
cursor.execute("""
|
||
DELETE FROM email_logs
|
||
WHERE datetime(created_at) < datetime('now', '-' || ? || ' days')
|
||
""", (days,))
|
||
deleted = cursor.rowcount
|
||
conn.commit()
|
||
return deleted
|
||
|
||
|
||
# ============ Token管理 ============
|
||
|
||
def generate_email_token(email: str, token_type: str, user_id: int = None) -> str:
|
||
"""生成邮件验证Token"""
|
||
token = secrets.token_urlsafe(32)
|
||
|
||
# 设置过期时间
|
||
expire_seconds = {
|
||
EMAIL_TYPE_REGISTER: TOKEN_EXPIRE_REGISTER,
|
||
EMAIL_TYPE_RESET: TOKEN_EXPIRE_RESET,
|
||
EMAIL_TYPE_BIND: TOKEN_EXPIRE_BIND
|
||
}.get(token_type, TOKEN_EXPIRE_REGISTER)
|
||
|
||
expires_at = datetime.now() + timedelta(seconds=expire_seconds)
|
||
|
||
with db_pool.get_db() as conn:
|
||
cursor = conn.cursor()
|
||
cursor.execute("""
|
||
INSERT INTO email_tokens (user_id, email, token, token_type, expires_at)
|
||
VALUES (?, ?, ?, ?, ?)
|
||
""", (user_id, email, token, token_type, expires_at))
|
||
conn.commit()
|
||
|
||
return token
|
||
|
||
|
||
def verify_email_token(token: str, token_type: str) -> Optional[Dict[str, Any]]:
|
||
"""
|
||
验证Token
|
||
|
||
Returns:
|
||
成功返回 {'user_id': int, 'email': str},失败返回 None
|
||
"""
|
||
with db_pool.get_db() as conn:
|
||
cursor = conn.cursor()
|
||
cursor.execute("""
|
||
SELECT id, user_id, email, expires_at, used
|
||
FROM email_tokens
|
||
WHERE token = ? AND token_type = ?
|
||
""", (token, token_type))
|
||
|
||
row = cursor.fetchone()
|
||
if not row:
|
||
return None
|
||
|
||
token_id, user_id, email, expires_at, used = row
|
||
|
||
# 检查是否已使用
|
||
if used:
|
||
return None
|
||
|
||
# 检查是否过期
|
||
if datetime.strptime(expires_at, '%Y-%m-%d %H:%M:%S') < datetime.now():
|
||
return None
|
||
|
||
# 标记为已使用
|
||
cursor.execute("""
|
||
UPDATE email_tokens SET used = 1 WHERE id = ?
|
||
""", (token_id,))
|
||
conn.commit()
|
||
|
||
return {'user_id': user_id, 'email': email}
|
||
|
||
|
||
def check_rate_limit(email: str, token_type: str) -> bool:
|
||
"""
|
||
检查发送频率限制
|
||
|
||
Returns:
|
||
True = 可以发送, False = 受限
|
||
"""
|
||
limit_seconds = {
|
||
EMAIL_TYPE_REGISTER: RATE_LIMIT_REGISTER,
|
||
EMAIL_TYPE_RESET: RATE_LIMIT_RESET,
|
||
EMAIL_TYPE_BIND: RATE_LIMIT_BIND
|
||
}.get(token_type, 60)
|
||
|
||
with db_pool.get_db() as conn:
|
||
cursor = conn.cursor()
|
||
cursor.execute("""
|
||
SELECT created_at FROM email_tokens
|
||
WHERE email = ? AND token_type = ?
|
||
ORDER BY created_at DESC
|
||
LIMIT 1
|
||
""", (email, token_type))
|
||
|
||
row = cursor.fetchone()
|
||
if not row:
|
||
return True
|
||
|
||
last_sent = datetime.strptime(row[0], '%Y-%m-%d %H:%M:%S')
|
||
elapsed = (datetime.now() - last_sent).total_seconds()
|
||
|
||
return elapsed >= limit_seconds
|
||
|
||
|
||
def cleanup_expired_tokens() -> int:
|
||
"""清理过期Token"""
|
||
with db_pool.get_db() as conn:
|
||
cursor = conn.cursor()
|
||
cursor.execute("""
|
||
DELETE FROM email_tokens
|
||
WHERE datetime(expires_at) < datetime('now')
|
||
""")
|
||
deleted = cursor.rowcount
|
||
conn.commit()
|
||
return deleted
|
||
|
||
|
||
# ============ 注册验证邮件 ============
|
||
|
||
def send_register_verification_email(
|
||
email: str,
|
||
username: str,
|
||
user_id: int,
|
||
base_url: str = None
|
||
) -> Dict[str, Any]:
|
||
"""
|
||
发送注册验证邮件
|
||
|
||
Args:
|
||
email: 用户邮箱
|
||
username: 用户名
|
||
user_id: 用户ID
|
||
base_url: 网站基础URL
|
||
|
||
Returns:
|
||
{'success': bool, 'error': str, 'token': str}
|
||
"""
|
||
# 检查发送频率限制
|
||
if not check_rate_limit(email, EMAIL_TYPE_REGISTER):
|
||
return {
|
||
'success': False,
|
||
'error': '发送太频繁,请稍后再试',
|
||
'token': None
|
||
}
|
||
|
||
# 生成验证Token
|
||
token = generate_email_token(email, EMAIL_TYPE_REGISTER, user_id)
|
||
|
||
# 获取base_url
|
||
if not base_url:
|
||
settings = get_email_settings()
|
||
base_url = settings.get('base_url', '')
|
||
|
||
if not base_url:
|
||
# 尝试从配置获取
|
||
try:
|
||
from app_config import Config
|
||
base_url = Config.BASE_URL
|
||
except:
|
||
base_url = 'http://localhost:51233'
|
||
|
||
# 生成验证链接
|
||
verify_url = f"{base_url.rstrip('/')}/api/verify-email/{token}"
|
||
|
||
# 读取邮件模板
|
||
template_path = os.path.join(os.path.dirname(__file__), 'templates', 'email', 'register.html')
|
||
try:
|
||
with open(template_path, 'r', encoding='utf-8') as f:
|
||
html_template = f.read()
|
||
except FileNotFoundError:
|
||
# 使用简单的HTML模板
|
||
html_template = """
|
||
<html>
|
||
<body>
|
||
<h1>邮箱验证</h1>
|
||
<p>您好,{{ username }}!</p>
|
||
<p>请点击下面的链接验证您的邮箱地址:</p>
|
||
<p><a href="{{ verify_url }}">{{ verify_url }}</a></p>
|
||
<p>此链接24小时内有效。</p>
|
||
</body>
|
||
</html>
|
||
"""
|
||
|
||
# 替换模板变量
|
||
html_body = html_template.replace('{{ username }}', username)
|
||
html_body = html_body.replace('{{ verify_url }}', verify_url)
|
||
|
||
# 纯文本版本
|
||
text_body = f"""
|
||
您好,{username}!
|
||
|
||
感谢您注册知识管理平台。请点击下面的链接验证您的邮箱地址:
|
||
|
||
{verify_url}
|
||
|
||
此链接24小时内有效。
|
||
|
||
如果您没有注册过账号,请忽略此邮件。
|
||
"""
|
||
|
||
# 发送邮件
|
||
result = send_email(
|
||
to_email=email,
|
||
subject='【知识管理平台】邮箱验证',
|
||
body=text_body,
|
||
html_body=html_body,
|
||
email_type=EMAIL_TYPE_REGISTER,
|
||
user_id=user_id
|
||
)
|
||
|
||
if result['success']:
|
||
return {'success': True, 'error': '', 'token': token}
|
||
else:
|
||
return {'success': False, 'error': result['error'], 'token': None}
|
||
|
||
|
||
def resend_register_verification_email(user_id: int, email: str, username: str) -> Dict[str, Any]:
|
||
"""
|
||
重发注册验证邮件
|
||
|
||
Args:
|
||
user_id: 用户ID
|
||
email: 用户邮箱
|
||
username: 用户名
|
||
|
||
Returns:
|
||
{'success': bool, 'error': str}
|
||
"""
|
||
# 检查是否有未过期的token
|
||
with db_pool.get_db() as conn:
|
||
cursor = conn.cursor()
|
||
# 先使旧token失效
|
||
cursor.execute("""
|
||
UPDATE email_tokens SET used = 1
|
||
WHERE user_id = ? AND token_type = ? AND used = 0
|
||
""", (user_id, EMAIL_TYPE_REGISTER))
|
||
conn.commit()
|
||
|
||
# 发送新的验证邮件
|
||
return send_register_verification_email(email, username, user_id)
|
||
|
||
|
||
# ============ 密码重置邮件 ============
|
||
|
||
def send_password_reset_email(
|
||
email: str,
|
||
username: str,
|
||
user_id: int,
|
||
base_url: str = None
|
||
) -> Dict[str, Any]:
|
||
"""
|
||
发送密码重置邮件
|
||
|
||
Args:
|
||
email: 用户邮箱
|
||
username: 用户名
|
||
user_id: 用户ID
|
||
base_url: 网站基础URL
|
||
|
||
Returns:
|
||
{'success': bool, 'error': str, 'token': str}
|
||
"""
|
||
# 检查发送频率限制(密码重置限制5分钟)
|
||
if not check_rate_limit(email, EMAIL_TYPE_RESET):
|
||
return {
|
||
'success': False,
|
||
'error': '发送太频繁,请5分钟后再试',
|
||
'token': None
|
||
}
|
||
|
||
# 使旧的重置token失效
|
||
with db_pool.get_db() as conn:
|
||
cursor = conn.cursor()
|
||
cursor.execute("""
|
||
UPDATE email_tokens SET used = 1
|
||
WHERE user_id = ? AND token_type = ? AND used = 0
|
||
""", (user_id, EMAIL_TYPE_RESET))
|
||
conn.commit()
|
||
|
||
# 生成新的验证Token
|
||
token = generate_email_token(email, EMAIL_TYPE_RESET, user_id)
|
||
|
||
# 获取base_url
|
||
if not base_url:
|
||
settings = get_email_settings()
|
||
base_url = settings.get('base_url', '')
|
||
|
||
if not base_url:
|
||
try:
|
||
from app_config import Config
|
||
base_url = Config.BASE_URL
|
||
except:
|
||
base_url = 'http://localhost:51233'
|
||
|
||
# 生成重置链接
|
||
reset_url = f"{base_url.rstrip('/')}/reset-password/{token}"
|
||
|
||
# 读取邮件模板
|
||
template_path = os.path.join(os.path.dirname(__file__), 'templates', 'email', 'reset_password.html')
|
||
try:
|
||
with open(template_path, 'r', encoding='utf-8') as f:
|
||
html_template = f.read()
|
||
except FileNotFoundError:
|
||
html_template = """
|
||
<html>
|
||
<body>
|
||
<h1>密码重置</h1>
|
||
<p>您好,{{ username }}!</p>
|
||
<p>请点击下面的链接重置您的密码:</p>
|
||
<p><a href="{{ reset_url }}">{{ reset_url }}</a></p>
|
||
<p>此链接30分钟内有效。</p>
|
||
</body>
|
||
</html>
|
||
"""
|
||
|
||
# 替换模板变量
|
||
html_body = html_template.replace('{{ username }}', username)
|
||
html_body = html_body.replace('{{ reset_url }}', reset_url)
|
||
|
||
# 纯文本版本
|
||
text_body = f"""
|
||
您好,{username}!
|
||
|
||
我们收到了您的密码重置请求。请点击下面的链接重置您的密码:
|
||
|
||
{reset_url}
|
||
|
||
此链接30分钟内有效。
|
||
|
||
如果您没有申请过密码重置,请忽略此邮件。
|
||
"""
|
||
|
||
# 发送邮件
|
||
result = send_email(
|
||
to_email=email,
|
||
subject='【知识管理平台】密码重置',
|
||
body=text_body,
|
||
html_body=html_body,
|
||
email_type=EMAIL_TYPE_RESET,
|
||
user_id=user_id
|
||
)
|
||
|
||
if result['success']:
|
||
return {'success': True, 'error': '', 'token': token}
|
||
else:
|
||
return {'success': False, 'error': result['error'], 'token': None}
|
||
|
||
|
||
def verify_password_reset_token(token: str) -> Optional[Dict[str, Any]]:
|
||
"""
|
||
验证密码重置Token(不标记为已使用)
|
||
|
||
Returns:
|
||
成功返回 {'user_id': int, 'email': str},失败返回 None
|
||
"""
|
||
with db_pool.get_db() as conn:
|
||
cursor = conn.cursor()
|
||
cursor.execute("""
|
||
SELECT id, user_id, email, expires_at, used
|
||
FROM email_tokens
|
||
WHERE token = ? AND token_type = ?
|
||
""", (token, EMAIL_TYPE_RESET))
|
||
|
||
row = cursor.fetchone()
|
||
if not row:
|
||
return None
|
||
|
||
token_id, user_id, email, expires_at, used = row
|
||
|
||
# 检查是否已使用
|
||
if used:
|
||
return None
|
||
|
||
# 检查是否过期
|
||
if datetime.strptime(expires_at, '%Y-%m-%d %H:%M:%S') < datetime.now():
|
||
return None
|
||
|
||
return {'user_id': user_id, 'email': email, 'token_id': token_id}
|
||
|
||
|
||
def confirm_password_reset(token: str) -> Optional[Dict[str, Any]]:
|
||
"""
|
||
确认密码重置(标记Token为已使用)
|
||
|
||
Returns:
|
||
成功返回 {'user_id': int, 'email': str},失败返回 None
|
||
"""
|
||
result = verify_password_reset_token(token)
|
||
if not result:
|
||
return None
|
||
|
||
# 标记为已使用
|
||
with db_pool.get_db() as conn:
|
||
cursor = conn.cursor()
|
||
cursor.execute("""
|
||
UPDATE email_tokens SET used = 1 WHERE id = ?
|
||
""", (result['token_id'],))
|
||
conn.commit()
|
||
|
||
return {'user_id': result['user_id'], 'email': result['email']}
|
||
|
||
|
||
# ============ 异步发送队列 ============
|
||
|
||
class EmailQueue:
|
||
"""邮件异步发送队列"""
|
||
|
||
def __init__(self, workers: int = QUEUE_WORKERS, max_size: int = QUEUE_MAX_SIZE):
|
||
self.queue = queue.Queue(maxsize=max_size)
|
||
self.workers = []
|
||
self.running = False
|
||
self.worker_count = workers
|
||
|
||
def start(self):
|
||
"""启动队列工作线程"""
|
||
if self.running:
|
||
return
|
||
|
||
self.running = True
|
||
for i in range(self.worker_count):
|
||
worker = threading.Thread(target=self._worker, daemon=True, name=f"EmailWorker-{i+1}")
|
||
worker.start()
|
||
self.workers.append(worker)
|
||
|
||
print(f"[邮件服务] 异步队列已启动 ({self.worker_count}个工作线程)")
|
||
|
||
def stop(self):
|
||
"""停止队列"""
|
||
self.running = False
|
||
|
||
# 发送停止信号
|
||
for _ in self.workers:
|
||
try:
|
||
self.queue.put(None, timeout=1)
|
||
except queue.Full:
|
||
pass
|
||
|
||
# 等待工作线程结束
|
||
for worker in self.workers:
|
||
worker.join(timeout=5)
|
||
|
||
self.workers.clear()
|
||
print("[邮件服务] 异步队列已停止")
|
||
|
||
def _worker(self):
|
||
"""工作线程"""
|
||
while self.running:
|
||
try:
|
||
task = self.queue.get(timeout=5)
|
||
if task is None:
|
||
break
|
||
|
||
self._process_task(task)
|
||
|
||
except queue.Empty:
|
||
continue
|
||
except Exception as e:
|
||
print(f"[邮件服务] 队列工作线程错误: {e}")
|
||
|
||
def _process_task(self, task: Dict):
|
||
"""处理邮件任务"""
|
||
try:
|
||
result = send_email(
|
||
to_email=task['to_email'],
|
||
subject=task['subject'],
|
||
body=task['body'],
|
||
html_body=task.get('html_body'),
|
||
attachments=task.get('attachments'),
|
||
email_type=task.get('email_type', ''),
|
||
user_id=task.get('user_id'),
|
||
log_callback=task.get('log_callback')
|
||
)
|
||
|
||
# 执行回调
|
||
if task.get('callback'):
|
||
task['callback'](result)
|
||
|
||
except Exception as e:
|
||
print(f"[邮件服务] 处理邮件任务失败: {e}")
|
||
if task.get('callback'):
|
||
task['callback']({'success': False, 'error': str(e)})
|
||
|
||
def enqueue(self, to_email: str, subject: str, body: str,
|
||
html_body: str = None, attachments: List[Dict] = None,
|
||
email_type: str = '', user_id: int = None,
|
||
callback: Callable = None, log_callback: Callable = None) -> bool:
|
||
"""
|
||
将邮件任务加入队列
|
||
|
||
Returns:
|
||
是否成功加入队列
|
||
"""
|
||
try:
|
||
self.queue.put({
|
||
'to_email': to_email,
|
||
'subject': subject,
|
||
'body': body,
|
||
'html_body': html_body,
|
||
'attachments': attachments,
|
||
'email_type': email_type,
|
||
'user_id': user_id,
|
||
'callback': callback,
|
||
'log_callback': log_callback
|
||
}, timeout=5)
|
||
return True
|
||
except queue.Full:
|
||
print("[邮件服务] 邮件队列已满")
|
||
return False
|
||
|
||
@property
|
||
def pending_count(self) -> int:
|
||
"""队列中待处理的任务数"""
|
||
return self.queue.qsize()
|
||
|
||
|
||
# 全局队列实例
|
||
_email_queue: Optional[EmailQueue] = None
|
||
_queue_lock = threading.Lock()
|
||
|
||
|
||
def get_email_queue() -> EmailQueue:
|
||
"""获取全局邮件队列(单例)"""
|
||
global _email_queue
|
||
|
||
with _queue_lock:
|
||
if _email_queue is None:
|
||
_email_queue = EmailQueue()
|
||
_email_queue.start()
|
||
return _email_queue
|
||
|
||
|
||
def shutdown_email_queue():
|
||
"""关闭邮件队列"""
|
||
global _email_queue
|
||
|
||
with _queue_lock:
|
||
if _email_queue:
|
||
_email_queue.stop()
|
||
_email_queue = None
|
||
|
||
|
||
# ============ 便捷函数 ============
|
||
|
||
def send_email_async(to_email: str, subject: str, body: str,
|
||
html_body: str = None, attachments: List[Dict] = None,
|
||
email_type: str = '', user_id: int = None,
|
||
callback: Callable = None, log_callback: Callable = None) -> bool:
|
||
"""异步发送邮件"""
|
||
queue = get_email_queue()
|
||
return queue.enqueue(
|
||
to_email=to_email,
|
||
subject=subject,
|
||
body=body,
|
||
html_body=html_body,
|
||
attachments=attachments,
|
||
email_type=email_type,
|
||
user_id=user_id,
|
||
callback=callback,
|
||
log_callback=log_callback
|
||
)
|
||
|
||
|
||
def create_zip_attachment(files: List[Dict[str, Any]], zip_filename: str = 'screenshots.zip') -> Dict:
|
||
"""
|
||
创建ZIP附件
|
||
|
||
Args:
|
||
files: [{'filename': 'xxx.png', 'data': bytes}]
|
||
zip_filename: ZIP文件名
|
||
|
||
Returns:
|
||
{'filename': 'xxx.zip', 'data': bytes}
|
||
"""
|
||
buffer = BytesIO()
|
||
with zipfile.ZipFile(buffer, 'w', zipfile.ZIP_DEFLATED) as zf:
|
||
for f in files:
|
||
zf.writestr(f['filename'], f['data'])
|
||
|
||
return {
|
||
'filename': zip_filename,
|
||
'data': buffer.getvalue()
|
||
}
|
||
|
||
|
||
# ============ 任务完成通知邮件 ============
|
||
|
||
def send_task_complete_email(
|
||
user_id: int,
|
||
email: str,
|
||
username: str,
|
||
account_name: str,
|
||
browse_type: str,
|
||
total_items: int,
|
||
total_attachments: int,
|
||
screenshot_path: str = None,
|
||
log_callback: Callable = None
|
||
) -> Dict[str, Any]:
|
||
"""
|
||
发送任务完成通知邮件(支持附件大小限制,超过则分批发送)
|
||
|
||
Args:
|
||
user_id: 用户ID
|
||
email: 收件人邮箱
|
||
username: 用户名
|
||
account_name: 账号名称
|
||
browse_type: 浏览类型
|
||
total_items: 浏览条目数
|
||
total_attachments: 附件数量
|
||
screenshot_path: 截图文件路径
|
||
log_callback: 日志回调函数
|
||
|
||
Returns:
|
||
{'success': bool, 'error': str, 'emails_sent': int}
|
||
"""
|
||
# 检查邮件功能是否启用
|
||
settings = get_email_settings()
|
||
if not settings.get('enabled', False):
|
||
return {'success': False, 'error': '邮件功能未启用', 'emails_sent': 0}
|
||
|
||
if not settings.get('task_notify_enabled', False):
|
||
return {'success': False, 'error': '任务通知功能未启用', 'emails_sent': 0}
|
||
|
||
if not email:
|
||
return {'success': False, 'error': '用户未设置邮箱', 'emails_sent': 0}
|
||
|
||
# 获取完成时间
|
||
complete_time = datetime.now().strftime('%Y-%m-%d %H:%M:%S')
|
||
|
||
# 读取截图文件
|
||
screenshot_data = None
|
||
screenshot_filename = None
|
||
if screenshot_path and os.path.exists(screenshot_path):
|
||
try:
|
||
with open(screenshot_path, 'rb') as f:
|
||
screenshot_data = f.read()
|
||
screenshot_filename = os.path.basename(screenshot_path)
|
||
except Exception as e:
|
||
if log_callback:
|
||
log_callback(f"[邮件] 读取截图文件失败: {e}")
|
||
|
||
# 读取邮件模板
|
||
template_path = os.path.join(os.path.dirname(__file__), 'templates', 'email', 'task_complete.html')
|
||
try:
|
||
with open(template_path, 'r', encoding='utf-8') as f:
|
||
html_template = f.read()
|
||
except FileNotFoundError:
|
||
html_template = """
|
||
<html>
|
||
<body>
|
||
<h1>任务完成通知</h1>
|
||
<p>您好,{{ username }}!</p>
|
||
<p>您的浏览任务已完成。</p>
|
||
<p>账号:{{ account_name }}</p>
|
||
<p>浏览类型:{{ browse_type }}</p>
|
||
<p>浏览条目:{{ total_items }} 条</p>
|
||
<p>附件数量:{{ total_attachments }} 个</p>
|
||
<p>完成时间:{{ complete_time }}</p>
|
||
</body>
|
||
</html>
|
||
"""
|
||
|
||
# 准备发送
|
||
emails_sent = 0
|
||
last_error = ''
|
||
|
||
# 检查附件大小,决定是否需要分批发送
|
||
if screenshot_data and len(screenshot_data) > MAX_ATTACHMENT_SIZE:
|
||
# 附件超过限制,不压缩直接发送(大图片压缩效果不大)
|
||
# 这种情况很少见,但作为容错处理
|
||
batch_info = '(由于文件较大,截图将单独发送)'
|
||
|
||
# 先发送不带附件的通知邮件
|
||
html_body = html_template.replace('{{ username }}', username)
|
||
html_body = html_body.replace('{{ account_name }}', account_name)
|
||
html_body = html_body.replace('{{ browse_type }}', browse_type)
|
||
html_body = html_body.replace('{{ total_items }}', str(total_items))
|
||
html_body = html_body.replace('{{ total_attachments }}', str(total_attachments))
|
||
html_body = html_body.replace('{{ complete_time }}', complete_time)
|
||
html_body = html_body.replace('{{ batch_info }}', batch_info)
|
||
|
||
text_body = f"""
|
||
您好,{username}!
|
||
|
||
您的浏览任务已完成。
|
||
|
||
账号:{account_name}
|
||
浏览类型:{browse_type}
|
||
浏览条目:{total_items} 条
|
||
附件数量:{total_attachments} 个
|
||
完成时间:{complete_time}
|
||
|
||
截图将在下一封邮件中发送。
|
||
"""
|
||
|
||
result = send_email(
|
||
to_email=email,
|
||
subject=f'【知识管理平台】任务完成 - {account_name}',
|
||
body=text_body,
|
||
html_body=html_body,
|
||
email_type=EMAIL_TYPE_TASK_COMPLETE,
|
||
user_id=user_id,
|
||
log_callback=log_callback
|
||
)
|
||
|
||
if result['success']:
|
||
emails_sent += 1
|
||
if log_callback:
|
||
log_callback(f"[邮件] 任务通知已发送")
|
||
else:
|
||
last_error = result['error']
|
||
|
||
# 单独发送截图附件
|
||
attachment = [{'filename': screenshot_filename, 'data': screenshot_data}]
|
||
result2 = send_email(
|
||
to_email=email,
|
||
subject=f'【知识管理平台】任务截图 - {account_name}',
|
||
body=f'这是 {account_name} 的任务截图。',
|
||
attachments=attachment,
|
||
email_type=EMAIL_TYPE_TASK_COMPLETE,
|
||
user_id=user_id,
|
||
log_callback=log_callback
|
||
)
|
||
|
||
if result2['success']:
|
||
emails_sent += 1
|
||
if log_callback:
|
||
log_callback(f"[邮件] 截图附件已发送")
|
||
else:
|
||
last_error = result2['error']
|
||
|
||
else:
|
||
# 正常情况:附件大小在限制内,一次性发送
|
||
batch_info = ''
|
||
attachments = None
|
||
|
||
if screenshot_data:
|
||
attachments = [{'filename': screenshot_filename, 'data': screenshot_data}]
|
||
|
||
html_body = html_template.replace('{{ username }}', username)
|
||
html_body = html_body.replace('{{ account_name }}', account_name)
|
||
html_body = html_body.replace('{{ browse_type }}', browse_type)
|
||
html_body = html_body.replace('{{ total_items }}', str(total_items))
|
||
html_body = html_body.replace('{{ total_attachments }}', str(total_attachments))
|
||
html_body = html_body.replace('{{ complete_time }}', complete_time)
|
||
html_body = html_body.replace('{{ batch_info }}', batch_info)
|
||
|
||
text_body = f"""
|
||
您好,{username}!
|
||
|
||
您的浏览任务已完成。
|
||
|
||
账号:{account_name}
|
||
浏览类型:{browse_type}
|
||
浏览条目:{total_items} 条
|
||
附件数量:{total_attachments} 个
|
||
完成时间:{complete_time}
|
||
|
||
{'截图已附在邮件中。' if screenshot_data else ''}
|
||
"""
|
||
|
||
result = send_email(
|
||
to_email=email,
|
||
subject=f'【知识管理平台】任务完成 - {account_name}',
|
||
body=text_body,
|
||
html_body=html_body,
|
||
attachments=attachments,
|
||
email_type=EMAIL_TYPE_TASK_COMPLETE,
|
||
user_id=user_id,
|
||
log_callback=log_callback
|
||
)
|
||
|
||
if result['success']:
|
||
emails_sent += 1
|
||
if log_callback:
|
||
log_callback(f"[邮件] 任务通知已发送")
|
||
else:
|
||
last_error = result['error']
|
||
|
||
return {
|
||
'success': emails_sent > 0,
|
||
'error': last_error if emails_sent == 0 else '',
|
||
'emails_sent': emails_sent
|
||
}
|
||
|
||
|
||
def send_task_complete_email_async(
|
||
user_id: int,
|
||
email: str,
|
||
username: str,
|
||
account_name: str,
|
||
browse_type: str,
|
||
total_items: int,
|
||
total_attachments: int,
|
||
screenshot_path: str = None,
|
||
log_callback: Callable = None
|
||
):
|
||
"""异步发送任务完成通知邮件"""
|
||
import threading
|
||
thread = threading.Thread(
|
||
target=send_task_complete_email,
|
||
args=(user_id, email, username, account_name, browse_type,
|
||
total_items, total_attachments, screenshot_path, log_callback),
|
||
daemon=True
|
||
)
|
||
thread.start()
|
||
|
||
|
||
# ============ 初始化 ============
|
||
|
||
def init_email_service():
|
||
"""初始化邮件服务"""
|
||
init_email_tables()
|
||
get_email_queue()
|
||
print("[邮件服务] 初始化完成")
|
||
|
||
|
||
if __name__ == '__main__':
|
||
# 测试代码
|
||
init_email_tables()
|
||
print("邮件服务模块测试完成")
|