Files
zsglpt/email_service.py
yuyx de8edcb3a6 feat: 添加邮件功能第二阶段 - 注册邮箱验证
实现注册时的邮箱验证功能:
- 修改注册API支持邮箱验证流程
- 新增邮箱验证API (/api/verify-email/<token>)
- 新增重发验证邮件API (/api/resend-verify-email)
- 新增邮箱验证状态查询API (/api/email/verify-status)

新增文件:
- templates/email/register.html - 注册验证邮件模板
- templates/verify_success.html - 验证成功页面
- templates/verify_failed.html - 验证失败页面

修改文件:
- email_service.py - 添加发送注册验证邮件函数
- app.py - 添加邮箱验证相关API
- database.py - 添加get_user_by_email函数
- app_config.py - 添加BASE_URL配置
- templates/register.html - 支持邮箱必填切换
- templates/login.html - 添加重发验证邮件功能
- templates/admin.html - 添加注册验证开关和BASE_URL设置

🤖 Generated with [Claude Code](https://claude.com/claude-code)

Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com>
2025-12-11 21:51:07 +08:00

1423 lines
45 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
#!/usr/bin/env python3
# -*- coding: utf-8 -*-
"""
邮件服务模块
功能:
1. 多SMTP配置管理主备切换、故障转移
2. 发送纯文本/HTML邮件
3. 发送带附件邮件支持ZIP压缩
4. 异步发送队列
5. 每日发送限额控制
6. 发送日志记录
"""
import os
import smtplib
import threading
import queue
import time
import zipfile
import secrets
from datetime import datetime, timedelta
from email.mime.text import MIMEText
from email.mime.multipart import MIMEMultipart
from email.mime.base import MIMEBase
from email.mime.image import MIMEImage
from email import encoders
from email.header import Header
from email.utils import formataddr
from typing import Optional, List, Dict, Any, Callable
from io import BytesIO
import db_pool
from crypto_utils import encrypt_password, decrypt_password, is_encrypted
# ============ Constants ============
# Email types
EMAIL_TYPE_REGISTER = 'register' # registration verification
EMAIL_TYPE_RESET = 'reset' # password reset
EMAIL_TYPE_BIND = 'bind' # email binding
EMAIL_TYPE_TASK_COMPLETE = 'task_complete' # task-completion notification
# Token lifetimes, in seconds
TOKEN_EXPIRE_REGISTER = 24 * 60 * 60 # registration verification: 24 hours
TOKEN_EXPIRE_RESET = 30 * 60 # password reset: 30 minutes
TOKEN_EXPIRE_BIND = 60 * 60 # email binding: 1 hour
# Minimum interval between two sends of the same kind, in seconds
RATE_LIMIT_REGISTER = 60 # registration email: 1 minute
RATE_LIMIT_RESET = 5 * 60 # reset email: 5 minutes
RATE_LIMIT_BIND = 60 # binding email: 1 minute
# Async queue configuration, overridable through environment variables
QUEUE_WORKERS = int(os.environ.get('EMAIL_QUEUE_WORKERS', '2'))
QUEUE_MAX_SIZE = int(os.environ.get('EMAIL_QUEUE_MAX_SIZE', '100'))
# ============ 数据库操作 ============
def init_email_tables():
    """Create the email subsystem's tables, indexes and seed rows.

    Every statement is guarded by ``IF NOT EXISTS`` or ``INSERT OR IGNORE``.
    All statements run on one connection obtained from ``db_pool`` and are
    committed together; a confirmation line is printed afterwards.
    """
    with db_pool.get_db() as db:
        cur = db.cursor()

        # 1. SMTP configurations (several configurations may coexist).
        cur.execute("""
CREATE TABLE IF NOT EXISTS smtp_configs (
id INTEGER PRIMARY KEY AUTOINCREMENT,
name TEXT NOT NULL DEFAULT '默认配置',
enabled INTEGER DEFAULT 1,
is_primary INTEGER DEFAULT 0,
priority INTEGER DEFAULT 0,
host TEXT NOT NULL DEFAULT '',
port INTEGER DEFAULT 465,
username TEXT NOT NULL DEFAULT '',
password TEXT DEFAULT '',
use_ssl INTEGER DEFAULT 1,
use_tls INTEGER DEFAULT 0,
sender_name TEXT DEFAULT '知识管理平台',
sender_email TEXT DEFAULT '',
daily_limit INTEGER DEFAULT 0,
daily_sent INTEGER DEFAULT 0,
daily_reset_date TEXT DEFAULT '',
last_success_at TIMESTAMP,
last_error TEXT DEFAULT '',
success_count INTEGER DEFAULT 0,
fail_count INTEGER DEFAULT 0,
created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
updated_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP
)
""")
        # Indexes used when picking an enabled / primary configuration.
        cur.execute("""
CREATE INDEX IF NOT EXISTS idx_smtp_configs_enabled
ON smtp_configs(enabled, priority)
""")
        cur.execute("""
CREATE INDEX IF NOT EXISTS idx_smtp_configs_primary
ON smtp_configs(is_primary)
""")

        # 2. Global email settings (single row, id = 1).
        cur.execute("""
CREATE TABLE IF NOT EXISTS email_settings (
id INTEGER PRIMARY KEY DEFAULT 1,
enabled INTEGER DEFAULT 0,
failover_enabled INTEGER DEFAULT 1,
register_verify_enabled INTEGER DEFAULT 0,
base_url TEXT DEFAULT '',
updated_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP
)
""")
        # Seed the settings row: disabled, failover on.
        cur.execute("""
INSERT OR IGNORE INTO email_settings (id, enabled, failover_enabled)
VALUES (1, 0, 1)
""")

        # 3. Email verification tokens.
        cur.execute("""
CREATE TABLE IF NOT EXISTS email_tokens (
id INTEGER PRIMARY KEY AUTOINCREMENT,
user_id INTEGER,
email TEXT NOT NULL,
token TEXT UNIQUE NOT NULL,
token_type TEXT NOT NULL,
expires_at TIMESTAMP NOT NULL,
used INTEGER DEFAULT 0,
created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
FOREIGN KEY (user_id) REFERENCES users(id) ON DELETE CASCADE
)
""")
        token_indexes = (
            "CREATE INDEX IF NOT EXISTS idx_email_tokens_token ON email_tokens(token)",
            "CREATE INDEX IF NOT EXISTS idx_email_tokens_email ON email_tokens(email)",
            "CREATE INDEX IF NOT EXISTS idx_email_tokens_expires ON email_tokens(expires_at)",
        )
        for statement in token_indexes:
            cur.execute(statement)

        # 4. Send log, one row per delivery attempt outcome.
        cur.execute("""
CREATE TABLE IF NOT EXISTS email_logs (
id INTEGER PRIMARY KEY AUTOINCREMENT,
user_id INTEGER,
smtp_config_id INTEGER,
email_to TEXT NOT NULL,
email_type TEXT NOT NULL,
subject TEXT NOT NULL,
status TEXT NOT NULL,
error_message TEXT,
attachment_count INTEGER DEFAULT 0,
attachment_size INTEGER DEFAULT 0,
created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
FOREIGN KEY (user_id) REFERENCES users(id) ON DELETE SET NULL,
FOREIGN KEY (smtp_config_id) REFERENCES smtp_configs(id) ON DELETE SET NULL
)
""")
        log_indexes = (
            "CREATE INDEX IF NOT EXISTS idx_email_logs_user ON email_logs(user_id)",
            "CREATE INDEX IF NOT EXISTS idx_email_logs_status ON email_logs(status)",
            "CREATE INDEX IF NOT EXISTS idx_email_logs_type ON email_logs(email_type)",
            "CREATE INDEX IF NOT EXISTS idx_email_logs_created ON email_logs(created_at)",
        )
        for statement in log_indexes:
            cur.execute(statement)

        # 5. Aggregate send counters (single row, id = 1).
        cur.execute("""
CREATE TABLE IF NOT EXISTS email_stats (
id INTEGER PRIMARY KEY DEFAULT 1,
total_sent INTEGER DEFAULT 0,
total_success INTEGER DEFAULT 0,
total_failed INTEGER DEFAULT 0,
register_sent INTEGER DEFAULT 0,
reset_sent INTEGER DEFAULT 0,
bind_sent INTEGER DEFAULT 0,
task_complete_sent INTEGER DEFAULT 0,
last_updated TIMESTAMP DEFAULT CURRENT_TIMESTAMP
)
""")
        # Seed the counters row.
        cur.execute("""
INSERT OR IGNORE INTO email_stats (id) VALUES (1)
""")

        db.commit()
    print("[邮件服务] 数据库表初始化完成")
# ============ SMTP配置管理 ============
def get_email_settings() -> Dict[str, Any]:
    """Return the global email settings stored in ``email_settings`` row 1.

    Before reading, the function probes for the ``register_verify_enabled``
    and ``base_url`` columns and adds whichever probe fails, so databases
    created before those columns existed keep working.

    Returns:
        A dict with the keys ``enabled``, ``failover_enabled``,
        ``register_verify_enabled``, ``base_url`` and ``updated_at``.
        When row 1 does not exist, defaults are returned: disabled,
        failover on, verification off, empty base URL, ``updated_at`` None.
    """
    # (probe query, migration statement) for each column an older database
    # may lack.
    column_migrations = (
        (
            "SELECT register_verify_enabled FROM email_settings LIMIT 1",
            "ALTER TABLE email_settings ADD COLUMN register_verify_enabled INTEGER DEFAULT 0",
        ),
        (
            "SELECT base_url FROM email_settings LIMIT 1",
            "ALTER TABLE email_settings ADD COLUMN base_url TEXT DEFAULT ''",
        ),
    )
    with db_pool.get_db() as conn:
        cursor = conn.cursor()
        for probe_sql, alter_sql in column_migrations:
            try:
                cursor.execute(probe_sql)
            except Exception:
                # The driver's exception class is not visible from this
                # module, so Exception is caught instead of a bare
                # ``except`` (which also swallows KeyboardInterrupt and
                # SystemExit).
                cursor.execute(alter_sql)
                conn.commit()
        cursor.execute("""
SELECT enabled, failover_enabled, register_verify_enabled, base_url, updated_at
FROM email_settings WHERE id = 1
""")
        row = cursor.fetchone()
    if row:
        return {
            'enabled': bool(row[0]),
            'failover_enabled': bool(row[1]),
            # NULL is possible for rows that predate the column.
            'register_verify_enabled': bool(row[2]) if row[2] is not None else False,
            'base_url': row[3] or '',
            'updated_at': row[4]
        }
    return {
        'enabled': False,
        'failover_enabled': True,
        'register_verify_enabled': False,
        'base_url': '',
        'updated_at': None
    }
def update_email_settings(
    enabled: bool,
    failover_enabled: bool,
    register_verify_enabled: bool = None,
    base_url: str = None
) -> bool:
    """Update the global email settings (row 1 of ``email_settings``).

    Args:
        enabled: Master switch for the email feature.
        failover_enabled: Whether sending may fall back to other configs.
        register_verify_enabled: Written only when not ``None``.
        base_url: Written only when not ``None``.

    Returns:
        Always ``True``.
    """
    with db_pool.get_db() as conn:
        cursor = conn.cursor()
        # The two switches and the timestamp are always written; the two
        # optional columns are appended only when the caller supplied them.
        set_clauses = ['enabled = ?', 'failover_enabled = ?', 'updated_at = CURRENT_TIMESTAMP']
        values = [int(enabled), int(failover_enabled)]
        if register_verify_enabled is not None:
            set_clauses.append('register_verify_enabled = ?')
            values.append(int(register_verify_enabled))
        if base_url is not None:
            set_clauses.append('base_url = ?')
            values.append(base_url)
        cursor.execute(f"""
UPDATE email_settings
SET {', '.join(set_clauses)}
WHERE id = 1
""", values)
        conn.commit()
    return True
def _smtp_config_row_to_dict(row, include_password: bool) -> Dict[str, Any]:
    """Map one ``smtp_configs`` row (22 columns, SELECT order) to a dict.

    The ``password`` entry is ``''`` when nothing is stored, the decrypted
    value when ``include_password`` is true, and the mask ``'******'``
    otherwise. ``has_password`` tells whether a password is stored at all.
    """
    stored_password = row[8]
    if not stored_password:
        password = ''
    elif include_password:
        password = decrypt_password(stored_password)
    else:
        password = '******'
    return {
        'id': row[0],
        'name': row[1],
        'enabled': bool(row[2]),
        'is_primary': bool(row[3]),
        'priority': row[4],
        'host': row[5],
        'port': row[6],
        'username': row[7],
        'password': password,
        'has_password': bool(stored_password),
        'use_ssl': bool(row[9]),
        'use_tls': bool(row[10]),
        'sender_name': row[11],
        'sender_email': row[12],
        'daily_limit': row[13],
        'daily_sent': row[14],
        'daily_reset_date': row[15],
        'last_success_at': row[16],
        'last_error': row[17],
        'success_count': row[18],
        'fail_count': row[19],
        'created_at': row[20],
        'updated_at': row[21]
    }


def get_smtp_configs(include_password: bool = False) -> List[Dict[str, Any]]:
    """Return every SMTP configuration, primary first, then by priority and id.

    Args:
        include_password: When true the decrypted password is returned;
            otherwise a stored password is masked as ``'******'``.

    Returns:
        A list of config dicts. Each also carries ``success_rate``, the
        percentage of successful sends rounded to one decimal (0 when the
        config has no recorded sends).
    """
    with db_pool.get_db() as conn:
        cursor = conn.cursor()
        cursor.execute("""
SELECT id, name, enabled, is_primary, priority, host, port,
username, password, use_ssl, use_tls, sender_name, sender_email,
daily_limit, daily_sent, daily_reset_date,
last_success_at, last_error, success_count, fail_count,
created_at, updated_at
FROM smtp_configs
ORDER BY is_primary DESC, priority ASC, id ASC
""")
        configs = []
        for row in cursor.fetchall():
            config = _smtp_config_row_to_dict(row, include_password)
            # Success rate over all recorded attempts.
            total = config['success_count'] + config['fail_count']
            config['success_rate'] = round(config['success_count'] / total * 100, 1) if total > 0 else 0
            configs.append(config)
        return configs


def get_smtp_config(config_id: int, include_password: bool = False) -> Optional[Dict[str, Any]]:
    """Return one SMTP configuration by id, or ``None`` when it does not exist.

    Args:
        config_id: Primary key of the ``smtp_configs`` row.
        include_password: When true the decrypted password is returned;
            otherwise a stored password is masked as ``'******'``.

    Returns:
        The config dict (without ``success_rate``) or ``None``.
    """
    with db_pool.get_db() as conn:
        cursor = conn.cursor()
        cursor.execute("""
SELECT id, name, enabled, is_primary, priority, host, port,
username, password, use_ssl, use_tls, sender_name, sender_email,
daily_limit, daily_sent, daily_reset_date,
last_success_at, last_error, success_count, fail_count,
created_at, updated_at
FROM smtp_configs WHERE id = ?
""", (config_id,))
        row = cursor.fetchone()
        if not row:
            return None
        return _smtp_config_row_to_dict(row, include_password)
def create_smtp_config(data: Dict[str, Any]) -> int:
    """Insert a new SMTP configuration and return its row id.

    Args:
        data: Field values. Missing keys fall back to the defaults used
            below (port 465, SSL on, TLS off, enabled, not primary, ...).
            A non-empty ``password`` is encrypted before being stored.

    Returns:
        The ``lastrowid`` reported by the cursor for the inserted row.
    """
    with db_pool.get_db() as conn:
        cursor = conn.cursor()
        # Only encrypt when a password was actually supplied.
        plain_password = data.get('password')
        stored_password = encrypt_password(plain_password) if plain_password else ''
        row_values = (
            data.get('name', '默认配置'),
            int(data.get('enabled', True)),
            int(data.get('is_primary', False)),
            data.get('priority', 0),
            data.get('host', ''),
            data.get('port', 465),
            data.get('username', ''),
            stored_password,
            int(data.get('use_ssl', True)),
            int(data.get('use_tls', False)),
            data.get('sender_name', '知识管理平台'),
            data.get('sender_email', ''),
            data.get('daily_limit', 0)
        )
        cursor.execute("""
INSERT INTO smtp_configs
(name, enabled, is_primary, priority, host, port, username, password,
use_ssl, use_tls, sender_name, sender_email, daily_limit)
VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)
""", row_values)
        new_id = cursor.lastrowid
        conn.commit()
        return new_id
def update_smtp_config(config_id: int, data: Dict[str, Any]) -> bool:
    """Update selected columns of one SMTP configuration.

    Only keys present in ``data`` are written. ``is_primary`` is not among
    the updatable columns. The password is replaced only when a non-empty
    value other than the mask ``'******'`` is supplied.

    Args:
        config_id: Primary key of the row to update.
        data: New field values keyed by column name.

    Returns:
        ``False`` when ``data`` contains nothing to update, otherwise
        whether the UPDATE touched at least one row.
    """
    # Columns a caller may set directly; the dict key equals the column name.
    updatable_columns = (
        'name', 'enabled', 'priority', 'host', 'port', 'username',
        'use_ssl', 'use_tls', 'sender_name', 'sender_email', 'daily_limit',
    )
    flag_columns = ('enabled', 'use_ssl', 'use_tls')
    with db_pool.get_db() as conn:
        cursor = conn.cursor()
        assignments = []
        values = []
        for column in updatable_columns:
            if column not in data:
                continue
            new_value = data[column]
            if column in flag_columns:
                # Flags are stored as 0/1 integers.
                new_value = int(new_value)
            assignments.append(f"{column} = ?")
            values.append(new_value)
        # Password: skip when absent, empty, or still the display mask.
        new_password = data.get('password')
        if new_password and new_password != '******':
            assignments.append("password = ?")
            values.append(encrypt_password(new_password))
        if not assignments:
            return False
        assignments.append("updated_at = CURRENT_TIMESTAMP")
        values.append(config_id)
        cursor.execute(f"""
UPDATE smtp_configs SET {', '.join(assignments)} WHERE id = ?
""", values)
        conn.commit()
        return cursor.rowcount > 0
def delete_smtp_config(config_id: int) -> bool:
    """Delete one SMTP configuration.

    Returns:
        ``True`` when a row with ``config_id`` was removed, else ``False``.
    """
    with db_pool.get_db() as conn:
        cur = conn.cursor()
        cur.execute("DELETE FROM smtp_configs WHERE id = ?", (config_id,))
        conn.commit()
        removed = cur.rowcount > 0
    return removed
def set_primary_smtp_config(config_id: int) -> bool:
    """Make ``config_id`` the single primary SMTP configuration.

    The target row is looked up first. Without that check, an unknown id
    would clear every ``is_primary`` flag and commit, leaving the system
    with no primary configuration at all.

    Returns:
        ``True`` when the configuration exists and was promoted,
        ``False`` (with nothing modified) when it does not exist.
    """
    with db_pool.get_db() as conn:
        cursor = conn.cursor()
        cursor.execute("SELECT 1 FROM smtp_configs WHERE id = ?", (config_id,))
        if cursor.fetchone() is None:
            return False
        # Clear the flag everywhere, then set it on the chosen row.
        cursor.execute("UPDATE smtp_configs SET is_primary = 0")
        cursor.execute("""
UPDATE smtp_configs
SET is_primary = 1, updated_at = CURRENT_TIMESTAMP
WHERE id = ?
""", (config_id,))
        conn.commit()
        return cursor.rowcount > 0
def _get_available_smtp_config(failover: bool = True) -> Optional[Dict[str, Any]]:
    """Pick the first enabled SMTP configuration that is under its daily limit.

    Candidates are ordered primary first, then by ascending ``priority``
    and ``id``. Before selecting, daily counters whose reset date is not
    today are zeroed. The ``failover`` argument is accepted but not read
    by this function.

    Returns:
        A dict with connection fields (password decrypted) for the chosen
        configuration, or ``None`` when no enabled configuration has
        quota left.
    """
    today = datetime.now().strftime('%Y-%m-%d')
    with db_pool.get_db() as conn:
        cursor = conn.cursor()
        # Start a fresh daily count for rows last reset on another day.
        cursor.execute("""
UPDATE smtp_configs
SET daily_sent = 0, daily_reset_date = ?
WHERE daily_reset_date != ? OR daily_reset_date IS NULL OR daily_reset_date = ''
""", (today, today))
        conn.commit()
        # All enabled configurations in selection order.
        cursor.execute("""
SELECT id, name, host, port, username, password, use_ssl, use_tls,
sender_name, sender_email, daily_limit, daily_sent, is_primary
FROM smtp_configs
WHERE enabled = 1
ORDER BY is_primary DESC, priority ASC, id ASC
""")
        candidates = cursor.fetchall()

    for candidate in candidates:
        (config_id, name, host, port, username, password, use_ssl, use_tls,
         sender_name, sender_email, daily_limit, daily_sent, is_primary) = candidate
        # A limit of 0 means unlimited; otherwise skip exhausted configs.
        quota_exhausted = daily_limit > 0 and daily_sent >= daily_limit
        if quota_exhausted:
            continue
        return {
            'id': config_id,
            'name': name,
            'host': host,
            'port': port,
            'username': username,
            'password': decrypt_password(password) if password else '',
            'use_ssl': bool(use_ssl),
            'use_tls': bool(use_tls),
            'sender_name': sender_name,
            'sender_email': sender_email,
            'is_primary': bool(is_primary)
        }
    return None
def _update_smtp_stats(config_id: int, success: bool, error: str = ''):
    """Record the outcome of one send attempt on an SMTP configuration.

    On success the daily and total success counters are incremented, the
    last-success timestamp is set and ``last_error`` is cleared. On
    failure ``fail_count`` is incremented and the first 500 characters of
    ``error`` are stored in ``last_error``.
    """
    if success:
        statement = """
UPDATE smtp_configs
SET daily_sent = daily_sent + 1,
success_count = success_count + 1,
last_success_at = CURRENT_TIMESTAMP,
last_error = '',
updated_at = CURRENT_TIMESTAMP
WHERE id = ?
"""
        arguments = (config_id,)
    else:
        statement = """
UPDATE smtp_configs
SET fail_count = fail_count + 1,
last_error = ?,
updated_at = CURRENT_TIMESTAMP
WHERE id = ?
"""
        arguments = (error[:500], config_id)
    with db_pool.get_db() as conn:
        conn.cursor().execute(statement, arguments)
        conn.commit()
# ============ 邮件发送核心 ============
class EmailSender:
    """Send mail through one SMTP configuration over an ``smtplib`` connection.

    ``config`` is a dict providing ``name``, ``host``, ``port``,
    ``username``, ``password``, ``use_ssl``, ``use_tls``, ``sender_name``
    and ``sender_email``.
    """

    def __init__(self, config: Dict[str, Any]):
        self.config = config
        # Live smtplib connection, or None while disconnected.
        self.server = None

    def connect(self) -> bool:
        """Open the SMTP connection and log in.

        ``use_ssl`` selects an implicit-TLS connection (``SMTP_SSL``);
        otherwise a plain connection is opened and upgraded with STARTTLS
        when ``use_tls`` is set. Both use a 30 second timeout.

        Returns:
            ``True`` on success.

        Raises:
            Exception: Whatever ``smtplib`` raised. The failure is printed
                and the half-open connection is closed first, so a failed
                STARTTLS or login does not leak a socket.
        """
        server = None
        try:
            if self.config['use_ssl']:
                server = smtplib.SMTP_SSL(
                    self.config['host'],
                    self.config['port'],
                    timeout=30
                )
            else:
                server = smtplib.SMTP(
                    self.config['host'],
                    self.config['port'],
                    timeout=30
                )
                if self.config['use_tls']:
                    server.starttls()
            server.login(self.config['username'], self.config['password'])
        except Exception as e:
            print(f"[邮件服务] SMTP连接失败 [{self.config['name']}]: {e}")
            if server is not None:
                try:
                    server.close()
                except Exception:
                    pass  # best effort: the connection is already unusable
            self.server = None
            raise
        self.server = server
        return True

    def disconnect(self):
        """Close the SMTP connection if one is open; errors are ignored."""
        if self.server:
            try:
                self.server.quit()
            except Exception:
                pass  # best effort: the server may already have hung up
            self.server = None

    def _build_message(self, to_email: str, subject: str, body: str,
                       html_body: str = None, attachments: List[Dict] = None):
        """Assemble the MIME message, including Subject/From/To headers."""
        if html_body or attachments:
            msg = MIMEMultipart('mixed')
            if html_body:
                # Plain text and HTML are offered as alternatives.
                alternatives = MIMEMultipart('alternative')
                alternatives.attach(MIMEText(body, 'plain', 'utf-8'))
                alternatives.attach(MIMEText(html_body, 'html', 'utf-8'))
                msg.attach(alternatives)
            else:
                msg.attach(MIMEText(body, 'plain', 'utf-8'))
            for att in attachments or []:
                part = MIMEBase('application', 'octet-stream')
                part.set_payload(att['data'])
                encoders.encode_base64(part)
                # RFC 2231 triple so non-ASCII filenames are encoded.
                part.add_header(
                    'Content-Disposition',
                    'attachment',
                    filename=('utf-8', '', att['filename'])
                )
                msg.attach(part)
        else:
            msg = MIMEText(body, 'plain', 'utf-8')
        msg['Subject'] = Header(subject, 'utf-8')
        msg['From'] = formataddr((
            self.config['sender_name'],
            self.config['sender_email'] or self.config['username']
        ))
        msg['To'] = to_email
        return msg

    def send(self, to_email: str, subject: str, body: str,
             html_body: str = None, attachments: List[Dict] = None) -> bool:
        """Send one message, connecting first when necessary.

        Args:
            to_email: Recipient address.
            subject: Message subject.
            body: Plain-text body.
            html_body: Optional HTML body, sent alongside the plain text.
            attachments: Optional list of dicts with ``filename`` and
                ``data`` (bytes). Every attachment is sent as
                ``application/octet-stream``; a ``mime_type`` key is not
                read.

        Returns:
            ``True`` on success.

        Raises:
            Exception: Any error from connecting, building or sending the
                message, after it has been printed.
        """
        try:
            if not self.server:
                self.connect()
            msg = self._build_message(to_email, subject, body, html_body, attachments)
            # The envelope sender falls back to the login name.
            self.server.sendmail(
                self.config['sender_email'] or self.config['username'],
                [to_email],
                msg.as_string()
            )
            return True
        except Exception as e:
            print(f"[邮件服务] 发送失败: {e}")
            raise
def send_email(
    to_email: str,
    subject: str,
    body: str,
    html_body: str = None,
    attachments: List[Dict] = None,
    email_type: str = '',
    user_id: int = None,
    log_callback: Callable = None
) -> Dict[str, Any]:
    """Send one email, failing over to other SMTP configurations if allowed.

    Args:
        to_email: Recipient address.
        subject: Message subject.
        body: Plain-text body.
        html_body: Optional HTML body.
        attachments: Optional list of ``{'filename', 'data'}`` dicts.
        email_type: One of the ``EMAIL_TYPE_*`` values, used for statistics.
        user_id: User the mail belongs to, stored in the send log.
        log_callback: Optional callable receiving human-readable progress
            messages.

    Returns:
        ``{'success': bool, 'error': str, 'config_id': int | None}``.
        On failure ``config_id`` is the first configuration that was tried
        and ``error`` is the last error message.
    """
    # Global switch.
    settings = get_email_settings()
    if not settings['enabled']:
        return {'success': False, 'error': '邮件功能未启用', 'config_id': None}
    config = _get_available_smtp_config(settings['failover_enabled'])
    if not config:
        return {'success': False, 'error': '没有可用的SMTP配置', 'config_id': None}

    tried_configs = []
    last_error = ''
    while config:
        tried_configs.append(config['id'])
        sender = EmailSender(config)
        # Only the SMTP conversation is treated as "delivery". Bookkeeping
        # is handled separately below so that a statistics/log error after
        # a successful send cannot trigger failover and deliver the same
        # message a second time.
        try:
            sender.connect()
            sender.send(to_email, subject, body, html_body, attachments)
            delivered = True
        except Exception as e:
            delivered = False
            last_error = str(e)
        finally:
            sender.disconnect()

        if delivered:
            try:
                _update_smtp_stats(config['id'], True)
                _log_email_send(user_id, config['id'], to_email, email_type, subject, 'success', '', attachments)
                _update_email_stats(email_type, True)
                if log_callback:
                    log_callback(f"[邮件服务] 发送成功: {to_email} (使用: {config['name']})")
            except Exception as e:
                # The mail is already out; report the bookkeeping problem
                # without turning the result into a failure.
                print(f"[邮件服务] 发送成功但记录统计失败: {e}")
            return {'success': True, 'error': '', 'config_id': config['id']}

        # Delivery failed on this configuration.
        _update_smtp_stats(config['id'], False, last_error)
        if log_callback:
            log_callback(f"[邮件服务] 发送失败 [{config['name']}]: {last_error}")
        # Failover: move on to the next untried configuration.
        if settings['failover_enabled']:
            config = _get_next_available_smtp_config(tried_configs)
        else:
            config = None

    # Every attempted configuration failed.
    first_tried = tried_configs[0] if tried_configs else None
    _log_email_send(user_id, first_tried,
                    to_email, email_type, subject, 'failed', last_error, attachments)
    _update_email_stats(email_type, False)
    return {'success': False, 'error': last_error, 'config_id': first_tried}
def _get_next_available_smtp_config(exclude_ids: List[int]) -> Optional[Dict[str, Any]]:
    """Return the next enabled SMTP configuration not listed in ``exclude_ids``.

    Candidates are taken in the usual order (primary first, then priority,
    then id). Configurations that have reached their daily limit are
    skipped. Skipping is done in a loop on a single connection rather than
    by recursing once per exhausted configuration.

    Args:
        exclude_ids: Ids of configurations already tried. Not modified.

    Returns:
        A dict with connection fields (password decrypted), or ``None``
        when no further configuration is usable.
    """
    skipped = list(exclude_ids)
    with db_pool.get_db() as conn:
        cursor = conn.cursor()
        while True:
            placeholders = ','.join(['?' for _ in skipped])
            cursor.execute(f"""
SELECT id, name, host, port, username, password, use_ssl, use_tls,
sender_name, sender_email, daily_limit, daily_sent, is_primary
FROM smtp_configs
WHERE enabled = 1 AND id NOT IN ({placeholders})
ORDER BY is_primary DESC, priority ASC, id ASC
LIMIT 1
""", skipped)
            row = cursor.fetchone()
            if not row:
                return None
            config_id, name, host, port, username, password, use_ssl, use_tls, \
                sender_name, sender_email, daily_limit, daily_sent, is_primary = row
            # Daily quota exhausted (0 means unlimited): exclude and retry.
            if daily_limit > 0 and daily_sent >= daily_limit:
                skipped.append(config_id)
                continue
            return {
                'id': config_id,
                'name': name,
                'host': host,
                'port': port,
                'username': username,
                'password': decrypt_password(password) if password else '',
                'use_ssl': bool(use_ssl),
                'use_tls': bool(use_tls),
                'sender_name': sender_name,
                'sender_email': sender_email,
                'is_primary': bool(is_primary)
            }
def test_smtp_config(config_id: int, test_email: str) -> Dict[str, Any]:
    """Send a test message through one SMTP configuration.

    The outcome is written back to the configuration: success sets
    ``last_success_at`` and clears ``last_error``; failure stores the
    first 500 characters of the error in ``last_error``.

    Args:
        config_id: Configuration to test.
        test_email: Address the test message is sent to.

    Returns:
        ``{'success': bool, 'error': str}``.
    """
    config = get_smtp_config(config_id, include_password=True)
    if not config:
        return {'success': False, 'error': '配置不存在'}
    sender = EmailSender({
        'name': config['name'],
        'host': config['host'],
        'port': config['port'],
        'username': config['username'],
        'password': config['password'],
        'use_ssl': config['use_ssl'],
        'use_tls': config['use_tls'],
        'sender_name': config['sender_name'],
        'sender_email': config['sender_email']
    })
    try:
        try:
            sender.connect()
            sender.send(
                test_email,
                '知识管理平台 - SMTP配置测试',
                f'这是一封测试邮件。\n\n配置名称: {config["name"]}\nSMTP服务器: {config["host"]}:{config["port"]}\n\n如果您收到此邮件说明SMTP配置正确。',
                None,
                None
            )
        finally:
            # Close the connection on failure too, not only on success.
            sender.disconnect()
        # Record the successful test.
        with db_pool.get_db() as conn:
            cursor = conn.cursor()
            cursor.execute("""
UPDATE smtp_configs
SET last_success_at = CURRENT_TIMESTAMP, last_error = ''
WHERE id = ?
""", (config_id,))
            conn.commit()
        return {'success': True, 'error': ''}
    except Exception as e:
        # Record the error on the configuration.
        with db_pool.get_db() as conn:
            cursor = conn.cursor()
            cursor.execute("""
UPDATE smtp_configs SET last_error = ? WHERE id = ?
""", (str(e)[:500], config_id))
            conn.commit()
        return {'success': False, 'error': str(e)}
# ============ 邮件日志 ============
def _log_email_send(user_id: int, smtp_config_id: int, email_to: str,
                    email_type: str, subject: str, status: str,
                    error_message: str, attachments: List[Dict] = None):
    """Insert one row into ``email_logs`` describing a send outcome.

    The attachment count and the summed byte length of every attachment's
    ``data`` entry are stored alongside the message fields.
    """
    files = attachments or []
    attachment_count = len(files)
    attachment_size = sum(len(item.get('data', b'')) for item in files)
    log_row = (user_id, smtp_config_id, email_to, email_type, subject, status,
               error_message, attachment_count, attachment_size)
    with db_pool.get_db() as conn:
        conn.cursor().execute("""
INSERT INTO email_logs
(user_id, smtp_config_id, email_to, email_type, subject, status,
error_message, attachment_count, attachment_size)
VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?)
""", log_row)
        conn.commit()
def _update_email_stats(email_type: str, success: bool):
    """Increment the global counters in ``email_stats`` row 1.

    ``total_sent`` is always incremented. A success also increments
    ``total_success`` and, for a known ``email_type``, that type's
    counter. A failure increments ``total_failed`` only.
    """
    # Column names come from this fixed table only, never from the caller,
    # so interpolating them into the statement is safe.
    counter_column = {
        EMAIL_TYPE_REGISTER: 'register_sent',
        EMAIL_TYPE_RESET: 'reset_sent',
        EMAIL_TYPE_BIND: 'bind_sent',
        EMAIL_TYPE_TASK_COMPLETE: 'task_complete_sent'
    }.get(email_type, '')

    if not success:
        statement = """
UPDATE email_stats
SET total_sent = total_sent + 1,
total_failed = total_failed + 1,
last_updated = CURRENT_TIMESTAMP
WHERE id = 1
"""
    elif counter_column:
        statement = f"""
UPDATE email_stats
SET total_sent = total_sent + 1,
total_success = total_success + 1,
{counter_column} = {counter_column} + 1,
last_updated = CURRENT_TIMESTAMP
WHERE id = 1
"""
    else:
        statement = """
UPDATE email_stats
SET total_sent = total_sent + 1,
total_success = total_success + 1,
last_updated = CURRENT_TIMESTAMP
WHERE id = 1
"""
    with db_pool.get_db() as conn:
        conn.cursor().execute(statement)
        conn.commit()
def get_email_stats() -> Dict[str, Any]:
    """Return the global send counters from ``email_stats`` row 1.

    Returns:
        A dict of the stored counters plus ``success_rate`` (percentage of
        successful sends, one decimal, 0 when nothing was sent), or an
        empty dict when the row does not exist.
    """
    with db_pool.get_db() as conn:
        cur = conn.cursor()
        cur.execute("""
SELECT total_sent, total_success, total_failed,
register_sent, reset_sent, bind_sent, task_complete_sent,
last_updated
FROM email_stats WHERE id = 1
""")
        record = cur.fetchone()
    if not record:
        return {}
    (total_sent, total_success, total_failed, register_sent, reset_sent,
     bind_sent, task_complete_sent, last_updated) = record
    if total_sent > 0:
        success_rate = round(total_success / total_sent * 100, 1)
    else:
        success_rate = 0
    return {
        'total_sent': total_sent,
        'total_success': total_success,
        'total_failed': total_failed,
        'register_sent': register_sent,
        'reset_sent': reset_sent,
        'bind_sent': bind_sent,
        'task_complete_sent': task_complete_sent,
        'last_updated': last_updated,
        'success_rate': success_rate
    }
def get_email_logs(page: int = 1, page_size: int = 20,
                   email_type: str = None, status: str = None) -> Dict[str, Any]:
    """Return one page of the email send log, newest first.

    Args:
        page: 1-based page number; values below 1 are treated as 1.
        page_size: Rows per page; values below 1 are treated as 1, which
            avoids a division by zero when computing ``total_pages``.
        email_type: Optional filter on the ``email_type`` column.
        status: Optional filter on the ``status`` column.

    Returns:
        ``{'logs': [...], 'total': int, 'page': int, 'page_size': int,
        'total_pages': int}``. Each log entry also carries the ``username``
        and ``smtp_name`` joined from the users and smtp_configs tables
        (``None`` when the referenced row is gone).
    """
    page = max(page, 1)
    page_size = max(page_size, 1)
    with db_pool.get_db() as conn:
        cursor = conn.cursor()
        where_clauses = []
        params = []
        if email_type:
            where_clauses.append("l.email_type = ?")
            params.append(email_type)
        if status:
            where_clauses.append("l.status = ?")
            params.append(status)
        where_sql = f"WHERE {' AND '.join(where_clauses)}" if where_clauses else ""
        # Total number of matching rows.
        cursor.execute(f"SELECT COUNT(*) FROM email_logs l {where_sql}", params)
        total = cursor.fetchone()[0]
        # The requested page.
        offset = (page - 1) * page_size
        cursor.execute(f"""
SELECT l.id, l.user_id, l.smtp_config_id, l.email_to, l.email_type,
l.subject, l.status, l.error_message, l.attachment_count,
l.attachment_size, l.created_at, u.username, s.name as smtp_name
FROM email_logs l
LEFT JOIN users u ON l.user_id = u.id
LEFT JOIN smtp_configs s ON l.smtp_config_id = s.id
{where_sql}
ORDER BY l.created_at DESC
LIMIT ? OFFSET ?
""", params + [page_size, offset])
        keys = (
            'id', 'user_id', 'smtp_config_id', 'email_to', 'email_type',
            'subject', 'status', 'error_message', 'attachment_count',
            'attachment_size', 'created_at', 'username', 'smtp_name',
        )
        logs = [dict(zip(keys, row)) for row in cursor.fetchall()]
        return {
            'logs': logs,
            'total': total,
            'page': page,
            'page_size': page_size,
            # Ceiling division.
            'total_pages': (total + page_size - 1) // page_size
        }
def cleanup_email_logs(days: int = 30) -> int:
    """Delete send-log rows older than ``days`` days.

    Returns:
        The number of rows deleted.
    """
    with db_pool.get_db() as conn:
        cur = conn.cursor()
        # The cutoff is computed inside the database from its own clock.
        cur.execute("""
DELETE FROM email_logs
WHERE datetime(created_at) < datetime('now', '-' || ? || ' days')
""", (days,))
        removed = cur.rowcount
        conn.commit()
    return removed
# ============ Token管理 ============
def generate_email_token(email: str, token_type: str, user_id: int = None) -> str:
    """Create, store and return a new single-use email token.

    Args:
        email: Address the token is issued for.
        token_type: One of the ``EMAIL_TYPE_*`` values. It selects the
            lifetime; unknown types get the registration lifetime.
        user_id: Optional owner of the token.

    Returns:
        The URL-safe token string.
    """
    token = secrets.token_urlsafe(32)
    # Lifetime in seconds for this token type.
    expire_seconds = {
        EMAIL_TYPE_REGISTER: TOKEN_EXPIRE_REGISTER,
        EMAIL_TYPE_RESET: TOKEN_EXPIRE_RESET,
        EMAIL_TYPE_BIND: TOKEN_EXPIRE_BIND
    }.get(token_type, TOKEN_EXPIRE_REGISTER)
    # Store the expiry as an explicit 'YYYY-MM-DD HH:MM:SS' string (local
    # time). Binding a datetime object would leave the format to the
    # driver, which can include microseconds that a
    # '%Y-%m-%d %H:%M:%S' parse of the stored value cannot read.
    expires_at = (datetime.now() + timedelta(seconds=expire_seconds)).strftime('%Y-%m-%d %H:%M:%S')
    with db_pool.get_db() as conn:
        cursor = conn.cursor()
        cursor.execute("""
INSERT INTO email_tokens (user_id, email, token, token_type, expires_at)
VALUES (?, ?, ?, ?, ?)
""", (user_id, email, token, token_type, expires_at))
        conn.commit()
    return token
def verify_email_token(token: str, token_type: str) -> Optional[Dict[str, Any]]:
    """Validate a token and consume it.

    A token is accepted when it exists for ``token_type``, has not been
    used and has not expired (compared against local ``datetime.now()``).
    An accepted token is marked as used.

    Returns:
        ``{'user_id': int, 'email': str}`` on success, ``None`` otherwise.
    """
    with db_pool.get_db() as conn:
        cursor = conn.cursor()
        cursor.execute("""
SELECT id, user_id, email, expires_at, used
FROM email_tokens
WHERE token = ? AND token_type = ?
""", (token, token_type))
        row = cursor.fetchone()
        if not row:
            return None
        token_id, user_id, email, expires_at, used = row
        # Already consumed.
        if used:
            return None
        # The stored expiry may come back as a datetime or as an ISO
        # string, with or without fractional seconds; accept all of them.
        if not isinstance(expires_at, datetime):
            expires_at = datetime.fromisoformat(str(expires_at))
        if expires_at < datetime.now():
            return None
        # Consume the token. The ``used = 0`` condition makes the claim
        # atomic: of two concurrent requests only one updates the row.
        cursor.execute("""
UPDATE email_tokens SET used = 1 WHERE id = ? AND used = 0
""", (token_id,))
        claimed = cursor.rowcount > 0
        conn.commit()
        if not claimed:
            return None
        return {'user_id': user_id, 'email': email}
def check_rate_limit(email: str, token_type: str) -> bool:
    """Tell whether another email of this type may be sent to ``email``.

    The time since the newest ``email_tokens`` row for the address and
    type is compared with that type's ``RATE_LIMIT_*`` interval (60
    seconds for unknown types).

    Returns:
        ``True`` when sending is allowed, ``False`` when rate-limited.
    """
    from datetime import timezone

    limit_seconds = {
        EMAIL_TYPE_REGISTER: RATE_LIMIT_REGISTER,
        EMAIL_TYPE_RESET: RATE_LIMIT_RESET,
        EMAIL_TYPE_BIND: RATE_LIMIT_BIND
    }.get(token_type, 60)
    with db_pool.get_db() as conn:
        cursor = conn.cursor()
        cursor.execute("""
SELECT created_at FROM email_tokens
WHERE email = ? AND token_type = ?
ORDER BY created_at DESC
LIMIT 1
""", (email, token_type))
        row = cursor.fetchone()
    if not row:
        return True
    last_sent = row[0]
    if not isinstance(last_sent, datetime):
        last_sent = datetime.fromisoformat(str(last_sent))
    # created_at is filled by the column default CURRENT_TIMESTAMP, which
    # SQLite evaluates in UTC. Comparing it with local time would add the
    # UTC offset to the elapsed time and disable the limit east of UTC.
    now_utc = datetime.now(timezone.utc).replace(tzinfo=None)
    elapsed = (now_utc - last_sent).total_seconds()
    return elapsed >= limit_seconds
def cleanup_expired_tokens() -> int:
    """Delete every token whose expiry time has passed.

    ``expires_at`` is written from local ``datetime.now()`` when a token
    is generated, so the cutoff is taken in local time as well. A plain
    ``datetime('now')`` is UTC and would delete still-valid tokens on
    servers whose local time is behind UTC.

    Returns:
        The number of rows deleted.
    """
    with db_pool.get_db() as conn:
        cursor = conn.cursor()
        cursor.execute("""
DELETE FROM email_tokens
WHERE datetime(expires_at) < datetime('now', 'localtime')
""")
        deleted = cursor.rowcount
        conn.commit()
    return deleted
# ============ 注册验证邮件 ============
def send_register_verification_email(
    email: str,
    username: str,
    user_id: int,
    base_url: str = None
) -> Dict[str, Any]:
    """Send the registration verification email to a new user.

    A fresh token is generated and embedded in a
    ``<base_url>/api/verify-email/<token>`` link. The base URL is taken
    from the argument, then the stored email settings, then
    ``app_config.Config.BASE_URL``, and finally falls back to
    ``http://localhost:51233`` when that import fails.

    Args:
        email: Recipient address.
        username: Name shown in the greeting. It is HTML-escaped before
            being placed in the HTML body.
        user_id: Id of the user the token belongs to.
        base_url: Optional site base URL overriding the configured one.

    Returns:
        ``{'success': bool, 'error': str, 'token': str | None}``. The
        token is only returned when the mail was sent.
    """
    import html

    # Per-address rate limit.
    if not check_rate_limit(email, EMAIL_TYPE_REGISTER):
        return {
            'success': False,
            'error': '发送太频繁,请稍后再试',
            'token': None
        }
    token = generate_email_token(email, EMAIL_TYPE_REGISTER, user_id)

    # Resolve the base URL.
    if not base_url:
        settings = get_email_settings()
        base_url = settings.get('base_url', '')
    if not base_url:
        try:
            from app_config import Config
            base_url = Config.BASE_URL
        except Exception:
            # Missing module or attribute: use the development default.
            base_url = 'http://localhost:51233'
    verify_url = f"{base_url.rstrip('/')}/api/verify-email/{token}"

    # Load the HTML template, falling back to a minimal built-in one.
    template_path = os.path.join(os.path.dirname(__file__), 'templates', 'email', 'register.html')
    try:
        with open(template_path, 'r', encoding='utf-8') as f:
            html_template = f.read()
    except FileNotFoundError:
        html_template = """
<html>
<body>
<h1>邮箱验证</h1>
<p>您好,{{ username }}</p>
<p>请点击下面的链接验证您的邮箱地址:</p>
<p><a href="{{ verify_url }}">{{ verify_url }}</a></p>
<p>此链接24小时内有效。</p>
</body>
</html>
"""
    # Fill in the placeholders. Both values are escaped because the
    # username is user-supplied and the URL lands inside an attribute.
    html_body = html_template.replace('{{ username }}', html.escape(username))
    html_body = html_body.replace('{{ verify_url }}', html.escape(verify_url))

    # Plain-text alternative.
    text_body = f"""
您好,{username}
感谢您注册知识管理平台。请点击下面的链接验证您的邮箱地址:
{verify_url}
此链接24小时内有效。
如果您没有注册过账号,请忽略此邮件。
"""
    result = send_email(
        to_email=email,
        subject='【知识管理平台】邮箱验证',
        body=text_body,
        html_body=html_body,
        email_type=EMAIL_TYPE_REGISTER,
        user_id=user_id
    )
    if result['success']:
        return {'success': True, 'error': '', 'token': token}
    return {'success': False, 'error': result['error'], 'token': None}
def resend_register_verification_email(user_id: int, email: str, username: str) -> Dict[str, Any]:
    """Invalidate pending registration tokens and send a fresh email.

    Args:
        user_id: Id of the user requesting the resend.
        email: Recipient address.
        username: Name shown in the greeting.

    Returns:
        The result dict of ``send_register_verification_email``.
    """
    # Mark every unused registration token of this user as used so that
    # only the link in the new email remains valid.
    with db_pool.get_db() as conn:
        conn.cursor().execute("""
UPDATE email_tokens SET used = 1
WHERE user_id = ? AND token_type = ? AND used = 0
""", (user_id, EMAIL_TYPE_REGISTER))
        conn.commit()
    outcome = send_register_verification_email(email, username, user_id)
    return outcome
# ============ 异步发送队列 ============
class EmailQueue:
    """Bounded in-memory queue whose worker threads send queued emails."""

    def __init__(self, workers: int = QUEUE_WORKERS, max_size: int = QUEUE_MAX_SIZE):
        self.queue = queue.Queue(maxsize=max_size)
        self.workers = []  # started worker threads
        self.running = False
        self.worker_count = workers

    def start(self):
        """Start the worker threads; does nothing when already running."""
        if self.running:
            return
        self.running = True
        for i in range(self.worker_count):
            worker = threading.Thread(target=self._worker, daemon=True, name=f"EmailWorker-{i+1}")
            worker.start()
            self.workers.append(worker)
        print(f"[邮件服务] 异步队列已启动 ({self.worker_count}个工作线程)")

    def stop(self):
        """Ask the workers to stop and wait up to 5 seconds for each."""
        self.running = False
        # One None sentinel per worker wakes up threads blocked on get().
        for _ in self.workers:
            try:
                self.queue.put(None, timeout=1)
            except queue.Full:
                pass  # workers also notice ``running`` on their next timeout
        for worker in self.workers:
            worker.join(timeout=5)
        self.workers.clear()
        print("[邮件服务] 异步队列已停止")

    def _worker(self):
        """Worker loop: process tasks until stopped or a sentinel arrives."""
        while self.running:
            try:
                task = self.queue.get(timeout=5)
                if task is None:
                    break
                self._process_task(task)
            except queue.Empty:
                continue
            except Exception as e:
                print(f"[邮件服务] 队列工作线程错误: {e}")

    def _process_task(self, task: Dict):
        """Send one queued email and report the result to its callback.

        Sending and the callback are handled separately, so the callback
        is invoked exactly once and an exception raised by the callback
        is not reported as a send failure.
        """
        try:
            result = send_email(
                to_email=task['to_email'],
                subject=task['subject'],
                body=task['body'],
                html_body=task.get('html_body'),
                attachments=task.get('attachments'),
                email_type=task.get('email_type', ''),
                user_id=task.get('user_id'),
                log_callback=task.get('log_callback')
            )
        except Exception as e:
            print(f"[邮件服务] 处理邮件任务失败: {e}")
            result = {'success': False, 'error': str(e)}
        callback = task.get('callback')
        if callback:
            try:
                callback(result)
            except Exception as e:
                print(f"[邮件服务] 邮件回调执行失败: {e}")

    def enqueue(self, to_email: str, subject: str, body: str,
                html_body: str = None, attachments: List[Dict] = None,
                email_type: str = '', user_id: int = None,
                callback: Callable = None, log_callback: Callable = None) -> bool:
        """Queue an email for asynchronous sending.

        Waits up to 5 seconds for a free slot.

        Args:
            callback: Optional callable invoked with the send result dict.
            log_callback: Optional callable passed through to ``send_email``.
            (The remaining arguments are passed through to ``send_email``.)

        Returns:
            ``True`` when the task was queued, ``False`` when the queue
            stayed full.
        """
        try:
            self.queue.put({
                'to_email': to_email,
                'subject': subject,
                'body': body,
                'html_body': html_body,
                'attachments': attachments,
                'email_type': email_type,
                'user_id': user_id,
                'callback': callback,
                'log_callback': log_callback
            }, timeout=5)
            return True
        except queue.Full:
            print("[邮件服务] 邮件队列已满")
            return False

    @property
    def pending_count(self) -> int:
        """Number of tasks currently waiting in the queue."""
        return self.queue.qsize()
# Module-wide queue instance and the lock that guards creating and tearing it down
_email_queue: Optional[EmailQueue] = None
_queue_lock = threading.Lock()
def get_email_queue() -> EmailQueue:
    """Return the shared email queue, creating and starting it on first use.

    Creation happens under ``_queue_lock``.
    """
    global _email_queue
    with _queue_lock:
        if _email_queue is not None:
            return _email_queue
        _email_queue = EmailQueue()
        _email_queue.start()
        return _email_queue
def shutdown_email_queue():
    """Stop the shared email queue, if any, and forget the instance."""
    global _email_queue
    with _queue_lock:
        if not _email_queue:
            return
        _email_queue.stop()
        _email_queue = None
# ============ 便捷函数 ============
def send_email_async(to_email: str, subject: str, body: str,
                     html_body: str = None, attachments: List[Dict] = None,
                     email_type: str = '', user_id: int = None,
                     callback: Callable = None, log_callback: Callable = None) -> bool:
    """Queue an email on the shared queue for background sending.

    All arguments are handed unchanged to ``EmailQueue.enqueue``.

    Returns:
        Whether the task was accepted by the queue.
    """
    task_fields = {
        'to_email': to_email,
        'subject': subject,
        'body': body,
        'html_body': html_body,
        'attachments': attachments,
        'email_type': email_type,
        'user_id': user_id,
        'callback': callback,
        'log_callback': log_callback,
    }
    mail_queue = get_email_queue()
    return mail_queue.enqueue(**task_fields)
def create_zip_attachment(files: List[Dict[str, Any]], zip_filename: str = 'screenshots.zip') -> Dict:
    """Pack several in-memory files into a single ZIP attachment.

    Args:
        files: Dicts with ``filename`` (name inside the archive) and
            ``data`` (the file content).
        zip_filename: Name given to the resulting attachment.

    Returns:
        ``{'filename': zip_filename, 'data': <zip bytes>}``.
    """
    archive = BytesIO()
    with zipfile.ZipFile(archive, mode='w', compression=zipfile.ZIP_DEFLATED) as bundle:
        for entry in files:
            bundle.writestr(entry['filename'], entry['data'])
    # The archive is complete once the ZipFile context has closed.
    payload = archive.getvalue()
    return {'filename': zip_filename, 'data': payload}
# ============ 初始化 ============
def init_email_service():
    """Prepare the email service: create the tables, then start the queue."""
    init_email_tables()
    # Fetching the shared queue creates and starts it on first use.
    get_email_queue()
    print("[邮件服务] 初始化完成")
if __name__ == '__main__':
    # Manual smoke test: create the tables, then print a confirmation.
    init_email_tables()
    print("邮件服务模块测试完成")