Files
zsglpt/email_service.py
yuyx 2f762db337 feat: 添加邮件功能第一阶段 - 邮件基础设施
新增功能:
- 创建 email_service.py 邮件服务模块
  - 支持多SMTP配置(主备切换、故障转移)
  - 发送纯文本/HTML邮件
  - 发送带附件邮件(支持ZIP压缩)
  - 异步发送队列(多线程工作池)
  - 每日发送限额控制
  - 发送日志记录和统计

- 数据库表结构
  - smtp_configs: 多SMTP配置表
  - email_settings: 全局邮件设置
  - email_tokens: 邮件验证Token
  - email_logs: 邮件发送日志
  - email_stats: 邮件发送统计

- API接口
  - GET/POST /yuyx/api/email/settings: 全局邮件设置
  - CRUD /yuyx/api/smtp/configs: SMTP配置管理
  - POST /yuyx/api/smtp/configs/<id>/test: 测试SMTP连接
  - POST /yuyx/api/smtp/configs/<id>/primary: 设为主配置
  - GET /yuyx/api/email/stats: 邮件统计
  - GET /yuyx/api/email/logs: 邮件日志
  - POST /yuyx/api/email/logs/cleanup: 清理日志

- 后台管理页面
  - 新增"邮件配置"Tab
  - 全局邮件开关、故障转移开关
  - SMTP配置列表管理
  - 添加/编辑SMTP配置弹窗
  - 邮件发送统计展示
  - 邮件日志查询和清理

🤖 Generated with [Claude Code](https://claude.com/claude-code)

Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com>
2025-12-11 21:38:28 +08:00

1255 lines
41 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
#!/usr/bin/env python3
# -*- coding: utf-8 -*-
"""
Email service module.

Features:
1. Multiple SMTP configurations (primary/backup switching, failover)
2. Sending plain-text / HTML email
3. Sending email with attachments (ZIP compression supported)
4. Asynchronous send queue
5. Daily send-limit control
6. Send logging
"""
import os
import smtplib
import threading
import queue
import time
import zipfile
import secrets
from datetime import datetime, timedelta
from email.mime.text import MIMEText
from email.mime.multipart import MIMEMultipart
from email.mime.base import MIMEBase
from email.mime.image import MIMEImage
from email import encoders
from email.header import Header
from email.utils import formataddr
from typing import Optional, List, Dict, Any, Callable
from io import BytesIO
import db_pool
from crypto_utils import encrypt_password, decrypt_password, is_encrypted
# ============ Constants ============
# Email types
EMAIL_TYPE_REGISTER = 'register'  # registration verification
EMAIL_TYPE_RESET = 'reset'  # password reset
EMAIL_TYPE_BIND = 'bind'  # email binding
EMAIL_TYPE_TASK_COMPLETE = 'task_complete'  # task-completion notification
# Token lifetimes, in seconds
TOKEN_EXPIRE_REGISTER = 24 * 60 * 60  # registration verification: 24 hours
TOKEN_EXPIRE_RESET = 30 * 60  # password reset: 30 minutes
TOKEN_EXPIRE_BIND = 60 * 60  # email binding: 1 hour
# Send rate limits (seconds)
RATE_LIMIT_REGISTER = 60  # registration email: 1 minute
RATE_LIMIT_RESET = 5 * 60  # reset email: 5 minutes
RATE_LIMIT_BIND = 60  # binding email: 1 minute
# Async queue configuration (overridable through environment variables)
QUEUE_WORKERS = int(os.environ.get('EMAIL_QUEUE_WORKERS', '2'))
QUEUE_MAX_SIZE = int(os.environ.get('EMAIL_QUEUE_MAX_SIZE', '100'))
# ============ 数据库操作 ============
def init_email_tables():
    """Create the email tables, their indexes and the singleton seed rows.

    Every statement uses ``IF NOT EXISTS`` / ``INSERT OR IGNORE``, so the
    statements are run in order on one connection and committed once.
    """
    schema_statements = (
        # 1. SMTP configuration table (several configurations may coexist)
        """
        CREATE TABLE IF NOT EXISTS smtp_configs (
            id INTEGER PRIMARY KEY AUTOINCREMENT,
            name TEXT NOT NULL DEFAULT '默认配置',
            enabled INTEGER DEFAULT 1,
            is_primary INTEGER DEFAULT 0,
            priority INTEGER DEFAULT 0,
            host TEXT NOT NULL DEFAULT '',
            port INTEGER DEFAULT 465,
            username TEXT NOT NULL DEFAULT '',
            password TEXT DEFAULT '',
            use_ssl INTEGER DEFAULT 1,
            use_tls INTEGER DEFAULT 0,
            sender_name TEXT DEFAULT '知识管理平台',
            sender_email TEXT DEFAULT '',
            daily_limit INTEGER DEFAULT 0,
            daily_sent INTEGER DEFAULT 0,
            daily_reset_date TEXT DEFAULT '',
            last_success_at TIMESTAMP,
            last_error TEXT DEFAULT '',
            success_count INTEGER DEFAULT 0,
            fail_count INTEGER DEFAULT 0,
            created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
            updated_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP
        )
        """,
        # Indexes for the SMTP configuration table
        """
        CREATE INDEX IF NOT EXISTS idx_smtp_configs_enabled
        ON smtp_configs(enabled, priority)
        """,
        """
        CREATE INDEX IF NOT EXISTS idx_smtp_configs_primary
        ON smtp_configs(is_primary)
        """,
        # 2. Global email settings table (single row, id = 1)
        """
        CREATE TABLE IF NOT EXISTS email_settings (
            id INTEGER PRIMARY KEY DEFAULT 1,
            enabled INTEGER DEFAULT 0,
            failover_enabled INTEGER DEFAULT 1,
            updated_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP
        )
        """,
        # Seed the default settings row
        """
        INSERT OR IGNORE INTO email_settings (id, enabled, failover_enabled)
        VALUES (1, 0, 1)
        """,
        # 3. Email verification token table
        """
        CREATE TABLE IF NOT EXISTS email_tokens (
            id INTEGER PRIMARY KEY AUTOINCREMENT,
            user_id INTEGER,
            email TEXT NOT NULL,
            token TEXT UNIQUE NOT NULL,
            token_type TEXT NOT NULL,
            expires_at TIMESTAMP NOT NULL,
            used INTEGER DEFAULT 0,
            created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
            FOREIGN KEY (user_id) REFERENCES users(id) ON DELETE CASCADE
        )
        """,
        "CREATE INDEX IF NOT EXISTS idx_email_tokens_token ON email_tokens(token)",
        "CREATE INDEX IF NOT EXISTS idx_email_tokens_email ON email_tokens(email)",
        "CREATE INDEX IF NOT EXISTS idx_email_tokens_expires ON email_tokens(expires_at)",
        # 4. Email send log table
        """
        CREATE TABLE IF NOT EXISTS email_logs (
            id INTEGER PRIMARY KEY AUTOINCREMENT,
            user_id INTEGER,
            smtp_config_id INTEGER,
            email_to TEXT NOT NULL,
            email_type TEXT NOT NULL,
            subject TEXT NOT NULL,
            status TEXT NOT NULL,
            error_message TEXT,
            attachment_count INTEGER DEFAULT 0,
            attachment_size INTEGER DEFAULT 0,
            created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
            FOREIGN KEY (user_id) REFERENCES users(id) ON DELETE SET NULL,
            FOREIGN KEY (smtp_config_id) REFERENCES smtp_configs(id) ON DELETE SET NULL
        )
        """,
        "CREATE INDEX IF NOT EXISTS idx_email_logs_user ON email_logs(user_id)",
        "CREATE INDEX IF NOT EXISTS idx_email_logs_status ON email_logs(status)",
        "CREATE INDEX IF NOT EXISTS idx_email_logs_type ON email_logs(email_type)",
        "CREATE INDEX IF NOT EXISTS idx_email_logs_created ON email_logs(created_at)",
        # 5. Email send statistics table (single row, id = 1)
        """
        CREATE TABLE IF NOT EXISTS email_stats (
            id INTEGER PRIMARY KEY DEFAULT 1,
            total_sent INTEGER DEFAULT 0,
            total_success INTEGER DEFAULT 0,
            total_failed INTEGER DEFAULT 0,
            register_sent INTEGER DEFAULT 0,
            reset_sent INTEGER DEFAULT 0,
            bind_sent INTEGER DEFAULT 0,
            task_complete_sent INTEGER DEFAULT 0,
            last_updated TIMESTAMP DEFAULT CURRENT_TIMESTAMP
        )
        """,
        # Seed the statistics row
        """
        INSERT OR IGNORE INTO email_stats (id) VALUES (1)
        """,
    )
    with db_pool.get_db() as conn:
        cur = conn.cursor()
        for statement in schema_statements:
            cur.execute(statement)
        conn.commit()
        print("[邮件服务] 数据库表初始化完成")
# ============ SMTP配置管理 ============
def get_email_settings() -> Dict[str, Any]:
    """Read the global email settings (the ``email_settings`` row with id 1).

    Returns:
        A dict with ``enabled`` (bool), ``failover_enabled`` (bool) and
        ``updated_at``. If the row is missing, the defaults
        disabled / failover on / ``None`` are returned.
    """
    with db_pool.get_db() as conn:
        cur = conn.cursor()
        cur.execute("SELECT enabled, failover_enabled, updated_at FROM email_settings WHERE id = 1")
        record = cur.fetchone()
        if not record:
            return {'enabled': False, 'failover_enabled': True, 'updated_at': None}
        return {
            'enabled': bool(record[0]),
            'failover_enabled': bool(record[1]),
            'updated_at': record[2],
        }
def update_email_settings(enabled: bool, failover_enabled: bool) -> bool:
    """Persist the global email switches on the settings row (id 1).

    Args:
        enabled: Whether email sending is globally enabled.
        failover_enabled: Whether failover to other SMTP configs is allowed.

    Returns:
        Always ``True`` once the update has been committed.
    """
    flags = (int(enabled), int(failover_enabled))
    with db_pool.get_db() as conn:
        cur = conn.cursor()
        cur.execute("""
            UPDATE email_settings
            SET enabled = ?, failover_enabled = ?, updated_at = CURRENT_TIMESTAMP
            WHERE id = 1
        """, flags)
        conn.commit()
        return True
def get_smtp_configs(include_password: bool = False) -> List[Dict[str, Any]]:
    """List every SMTP configuration, primary first, then by priority and id.

    Args:
        include_password: When true the stored password is decrypted and
            returned; otherwise a stored password is masked as ``'******'``.
            A missing password is always returned as ``''``.

    Returns:
        One dict per configuration, including a computed ``success_rate``
        percentage (0 when nothing has been attempted yet).
    """
    with db_pool.get_db() as conn:
        cur = conn.cursor()
        cur.execute("""
            SELECT id, name, enabled, is_primary, priority, host, port,
                   username, password, use_ssl, use_tls, sender_name, sender_email,
                   daily_limit, daily_sent, daily_reset_date,
                   last_success_at, last_error, success_count, fail_count,
                   created_at, updated_at
            FROM smtp_configs
            ORDER BY is_primary DESC, priority ASC, id ASC
        """)
        results = []
        for record in cur.fetchall():
            stored_password = record[8]
            if not stored_password:
                shown_password = ''
            elif include_password:
                shown_password = decrypt_password(stored_password)
            else:
                shown_password = '******'
            entry = {
                'id': record[0],
                'name': record[1],
                'enabled': bool(record[2]),
                'is_primary': bool(record[3]),
                'priority': record[4],
                'host': record[5],
                'port': record[6],
                'username': record[7],
                'password': shown_password,
                'has_password': bool(stored_password),
                'use_ssl': bool(record[9]),
                'use_tls': bool(record[10]),
                'sender_name': record[11],
                'sender_email': record[12],
                'daily_limit': record[13],
                'daily_sent': record[14],
                'daily_reset_date': record[15],
                'last_success_at': record[16],
                'last_error': record[17],
                'success_count': record[18],
                'fail_count': record[19],
                'created_at': record[20],
                'updated_at': record[21],
            }
            # Success rate as a percentage of all recorded attempts
            attempts = entry['success_count'] + entry['fail_count']
            if attempts > 0:
                entry['success_rate'] = round(entry['success_count'] / attempts * 100, 1)
            else:
                entry['success_rate'] = 0
            results.append(entry)
        return results
def get_smtp_config(config_id: int, include_password: bool = False) -> Optional[Dict[str, Any]]:
    """Fetch one SMTP configuration by id.

    Args:
        config_id: Primary key of the configuration.
        include_password: When true the stored password is decrypted and
            returned; otherwise a stored password is masked as ``'******'``.
            A missing password is always returned as ``''``.

    Returns:
        The configuration as a dict, or ``None`` when the id does not exist.
    """
    with db_pool.get_db() as conn:
        cur = conn.cursor()
        cur.execute("""
            SELECT id, name, enabled, is_primary, priority, host, port,
                   username, password, use_ssl, use_tls, sender_name, sender_email,
                   daily_limit, daily_sent, daily_reset_date,
                   last_success_at, last_error, success_count, fail_count,
                   created_at, updated_at
            FROM smtp_configs WHERE id = ?
        """, (config_id,))
        record = cur.fetchone()
        if not record:
            return None
        stored_password = record[8]
        if not stored_password:
            shown_password = ''
        elif include_password:
            shown_password = decrypt_password(stored_password)
        else:
            shown_password = '******'
        return {
            'id': record[0],
            'name': record[1],
            'enabled': bool(record[2]),
            'is_primary': bool(record[3]),
            'priority': record[4],
            'host': record[5],
            'port': record[6],
            'username': record[7],
            'password': shown_password,
            'has_password': bool(stored_password),
            'use_ssl': bool(record[9]),
            'use_tls': bool(record[10]),
            'sender_name': record[11],
            'sender_email': record[12],
            'daily_limit': record[13],
            'daily_sent': record[14],
            'daily_reset_date': record[15],
            'last_success_at': record[16],
            'last_error': record[17],
            'success_count': record[18],
            'fail_count': record[19],
            'created_at': record[20],
            'updated_at': record[21],
        }
def create_smtp_config(data: Dict[str, Any]) -> int:
    """Insert a new SMTP configuration.

    Args:
        data: Field values; missing keys fall back to the defaults used
            below. A non-empty ``password`` is encrypted before storage.

    Returns:
        The row id of the newly inserted configuration.
    """
    with db_pool.get_db() as conn:
        cur = conn.cursor()
        # Only encrypt when a password was actually supplied
        raw_password = data.get('password')
        stored_password = encrypt_password(data.get('password', '')) if raw_password else ''
        values = (
            data.get('name', '默认配置'),
            int(data.get('enabled', True)),
            int(data.get('is_primary', False)),
            data.get('priority', 0),
            data.get('host', ''),
            data.get('port', 465),
            data.get('username', ''),
            stored_password,
            int(data.get('use_ssl', True)),
            int(data.get('use_tls', False)),
            data.get('sender_name', '知识管理平台'),
            data.get('sender_email', ''),
            data.get('daily_limit', 0),
        )
        cur.execute("""
            INSERT INTO smtp_configs
            (name, enabled, is_primary, priority, host, port, username, password,
             use_ssl, use_tls, sender_name, sender_email, daily_limit)
            VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)
        """, values)
        new_id = cur.lastrowid
        conn.commit()
        return new_id
def update_smtp_config(config_id: int, data: Dict[str, Any]) -> bool:
    """Update the supplied fields of an SMTP configuration.

    Only keys present in ``data`` are written. ``is_primary`` is not
    updatable through this function. The password is replaced only when a
    non-empty value other than the ``'******'`` mask is supplied.

    Returns:
        ``True`` if a row was updated; ``False`` when there was nothing to
        update or the id matched no row.
    """
    # Column names are fixed here, so interpolating them into SQL is safe.
    updatable_columns = (
        'name', 'enabled', 'priority', 'host', 'port', 'username',
        'use_ssl', 'use_tls', 'sender_name', 'sender_email', 'daily_limit',
    )
    flag_columns = ('enabled', 'use_ssl', 'use_tls')
    with db_pool.get_db() as conn:
        cur = conn.cursor()
        assignments = []
        values = []
        for column in updatable_columns:
            if column not in data:
                continue
            new_value = data[column]
            if column in flag_columns:
                new_value = int(new_value)  # booleans are stored as 0/1
            assignments.append(f"{column} = ?")
            values.append(new_value)
        # Password: only written when a real new password was provided
        new_password = data.get('password')
        if new_password and new_password != '******':
            assignments.append("password = ?")
            values.append(encrypt_password(data['password']))
        if not assignments:
            return False
        assignments.append("updated_at = CURRENT_TIMESTAMP")
        values.append(config_id)
        cur.execute(f"""
            UPDATE smtp_configs SET {', '.join(assignments)} WHERE id = ?
        """, values)
        conn.commit()
        return cur.rowcount > 0
def delete_smtp_config(config_id: int) -> bool:
    """Delete an SMTP configuration by id.

    Returns:
        ``True`` if a row was deleted, ``False`` if the id matched nothing.
    """
    with db_pool.get_db() as conn:
        cur = conn.cursor()
        cur.execute("DELETE FROM smtp_configs WHERE id = ?", (config_id,))
        conn.commit()
        removed = cur.rowcount > 0
        return removed
def set_primary_smtp_config(config_id: int) -> bool:
    """Mark one SMTP configuration as the primary one.

    The target id is checked first so that an unknown id leaves the current
    primary untouched. Without this check every primary flag would be
    cleared and committed before discovering the target does not exist.

    Args:
        config_id: Id of the configuration to promote.

    Returns:
        ``True`` if the configuration exists and was promoted, else ``False``.
    """
    with db_pool.get_db() as conn:
        cursor = conn.cursor()
        cursor.execute("SELECT 1 FROM smtp_configs WHERE id = ?", (config_id,))
        if cursor.fetchone() is None:
            return False
        # Clear the primary flag everywhere first
        cursor.execute("UPDATE smtp_configs SET is_primary = 0")
        # Then promote the requested configuration
        cursor.execute("""
            UPDATE smtp_configs
            SET is_primary = 1, updated_at = CURRENT_TIMESTAMP
            WHERE id = ?
        """, (config_id,))
        promoted = cursor.rowcount > 0
        conn.commit()
        return promoted
def _get_available_smtp_config(failover: bool = True) -> Optional[Dict[str, Any]]:
    """Pick the first usable SMTP configuration.

    Order: primary configuration first, then enabled configurations by
    ascending ``priority`` and id. Configurations that have reached their
    daily limit are skipped. Daily counters whose reset date is not today
    are zeroed before selection.

    Args:
        failover: Accepted for interface compatibility; not consulted here.

    Returns:
        A dict with the connection fields and the decrypted password, or
        ``None`` when no enabled configuration is under its limit.
    """
    today = datetime.now().strftime('%Y-%m-%d')
    with db_pool.get_db() as conn:
        cursor = conn.cursor()
        # Reset daily counters that belong to a previous day
        cursor.execute("""
            UPDATE smtp_configs
            SET daily_sent = 0, daily_reset_date = ?
            WHERE daily_reset_date != ? OR daily_reset_date IS NULL OR daily_reset_date = ''
        """, (today, today))
        conn.commit()
        # All enabled configurations in selection order
        cursor.execute("""
            SELECT id, name, host, port, username, password, use_ssl, use_tls,
                   sender_name, sender_email, daily_limit, daily_sent, is_primary
            FROM smtp_configs
            WHERE enabled = 1
            ORDER BY is_primary DESC, priority ASC, id ASC
        """)
        for row in cursor.fetchall():
            (config_id, name, host, port, username, password, use_ssl, use_tls,
             sender_name, sender_email, daily_limit, daily_sent, is_primary) = row
            # NULL columns are treated as 0 so a NULL limit means "unlimited"
            # instead of raising TypeError on the comparison.
            limit = daily_limit or 0
            if limit > 0 and (daily_sent or 0) >= limit:
                continue  # over the daily limit, try the next configuration
            return {
                'id': config_id,
                'name': name,
                'host': host,
                'port': port,
                'username': username,
                'password': decrypt_password(password) if password else '',
                'use_ssl': bool(use_ssl),
                'use_tls': bool(use_tls),
                'sender_name': sender_name,
                'sender_email': sender_email,
                'is_primary': bool(is_primary)
            }
    return None
def _update_smtp_stats(config_id: int, success: bool, error: str = ''):
    """Record the outcome of one send attempt on an SMTP configuration.

    On success the daily and total success counters are incremented and the
    last error is cleared. On failure the failure counter is incremented and
    the error text (truncated to 500 characters) is stored.
    """
    if success:
        statement = """
            UPDATE smtp_configs
            SET daily_sent = daily_sent + 1,
                success_count = success_count + 1,
                last_success_at = CURRENT_TIMESTAMP,
                last_error = '',
                updated_at = CURRENT_TIMESTAMP
            WHERE id = ?
        """
        arguments = (config_id,)
    else:
        statement = """
            UPDATE smtp_configs
            SET fail_count = fail_count + 1,
                last_error = ?,
                updated_at = CURRENT_TIMESTAMP
            WHERE id = ?
        """
        arguments = (error[:500], config_id)
    with db_pool.get_db() as conn:
        cur = conn.cursor()
        cur.execute(statement, arguments)
        conn.commit()
# ============ 邮件发送核心 ============
class EmailSender:
    """Sends email through a single SMTP configuration.

    ``config`` is a dict with the keys ``name``, ``host``, ``port``,
    ``username``, ``password``, ``use_ssl``, ``use_tls``, ``sender_name``
    and ``sender_email``.
    """

    def __init__(self, config: Dict[str, Any]):
        self.config = config
        self.server = None  # live smtplib connection, or None when closed

    def connect(self) -> bool:
        """Open the SMTP connection and log in.

        Uses implicit SSL when ``use_ssl`` is set; otherwise a plain
        connection that is upgraded with STARTTLS when ``use_tls`` is set.

        Returns:
            ``True`` on success.

        Raises:
            Exception: Whatever ``smtplib`` raised. The half-open connection
                is closed first so a failed STARTTLS/login does not leak
                the socket.
        """
        server = None
        try:
            if self.config['use_ssl']:
                server = smtplib.SMTP_SSL(
                    self.config['host'],
                    self.config['port'],
                    timeout=30
                )
            else:
                server = smtplib.SMTP(
                    self.config['host'],
                    self.config['port'],
                    timeout=30
                )
                if self.config['use_tls']:
                    server.starttls()
            server.login(self.config['username'], self.config['password'])
        except Exception as e:
            print(f"[邮件服务] SMTP连接失败 [{self.config['name']}]: {e}")
            if server is not None:
                try:
                    server.close()
                except Exception:
                    pass  # best effort: the original error is what matters
            self.server = None
            raise
        self.server = server
        return True

    def disconnect(self):
        """Close the SMTP connection, ignoring errors raised by QUIT."""
        if self.server:
            try:
                self.server.quit()
            except Exception:
                pass  # the connection is being discarded anyway
            self.server = None

    def send(self, to_email: str, subject: str, body: str,
             html_body: str = None, attachments: List[Dict] = None) -> bool:
        """Build and send one message, connecting first if necessary.

        Args:
            to_email: Recipient address.
            subject: Message subject.
            body: Plain-text body.
            html_body: Optional HTML body, sent as an alternative to ``body``.
            attachments: Optional list of
                ``{'filename': str, 'data': bytes, 'mime_type': str}``.
                ``mime_type`` is optional and defaults to
                ``application/octet-stream``.

        Returns:
            ``True`` when the server accepted the message.

        Raises:
            Exception: Any connection or send error, after it is printed.
        """
        try:
            if not self.server:
                self.connect()
            # Build the message
            if html_body or attachments:
                msg = MIMEMultipart('mixed')
                # Body
                if html_body:
                    text_part = MIMEMultipart('alternative')
                    text_part.attach(MIMEText(body, 'plain', 'utf-8'))
                    text_part.attach(MIMEText(html_body, 'html', 'utf-8'))
                    msg.attach(text_part)
                else:
                    msg.attach(MIMEText(body, 'plain', 'utf-8'))
                # Attachments
                for att in attachments or []:
                    # Honour the documented 'mime_type' key, falling back to
                    # a generic binary type when it is absent or malformed.
                    mime_type = att.get('mime_type') or 'application/octet-stream'
                    maintype, _, subtype = mime_type.partition('/')
                    if not maintype or not subtype:
                        maintype, subtype = 'application', 'octet-stream'
                    part = MIMEBase(maintype, subtype)
                    part.set_payload(att['data'])
                    encoders.encode_base64(part)
                    # RFC 2231 tuple form keeps non-ASCII filenames intact
                    part.add_header(
                        'Content-Disposition',
                        'attachment',
                        filename=('utf-8', '', att['filename'])
                    )
                    msg.attach(part)
            else:
                msg = MIMEText(body, 'plain', 'utf-8')
            # Headers; the login name doubles as sender when none is configured
            from_address = self.config['sender_email'] or self.config['username']
            msg['Subject'] = Header(subject, 'utf-8')
            msg['From'] = formataddr((self.config['sender_name'], from_address))
            msg['To'] = to_email
            # Send
            self.server.sendmail(from_address, [to_email], msg.as_string())
            return True
        except Exception as e:
            print(f"[邮件服务] 发送失败: {e}")
            raise
def send_email(
    to_email: str,
    subject: str,
    body: str,
    html_body: str = None,
    attachments: List[Dict] = None,
    email_type: str = '',
    user_id: int = None,
    log_callback: Callable = None
) -> Dict[str, Any]:
    """Send an email synchronously, failing over between SMTP configurations.

    Delivery and bookkeeping are kept apart: once a server has accepted the
    message, an error while recording statistics, logs or calling
    ``log_callback`` is only printed. It must not be treated as a send
    failure, because failover would then deliver the message a second time.

    Args:
        to_email: Recipient address.
        subject: Message subject.
        body: Plain-text body.
        html_body: Optional HTML body.
        attachments: Optional list of ``{'filename', 'data', ...}`` dicts.
        email_type: One of the ``EMAIL_TYPE_*`` values, used for statistics.
        user_id: Optional user id recorded in the send log.
        log_callback: Optional callable receiving one progress message.

    Returns:
        ``{'success': bool, 'error': str, 'config_id': int | None}``. On
        failure ``config_id`` is the first configuration that was tried.
    """
    # Global switch
    settings = get_email_settings()
    if not settings['enabled']:
        return {'success': False, 'error': '邮件功能未启用', 'config_id': None}
    # First candidate configuration
    config = _get_available_smtp_config(settings['failover_enabled'])
    if not config:
        return {'success': False, 'error': '没有可用的SMTP配置', 'config_id': None}
    tried_configs = []
    last_error = ''
    while config:
        tried_configs.append(config['id'])
        sender = EmailSender(config)
        failure = None
        try:
            sender.connect()
            sender.send(to_email, subject, body, html_body, attachments)
        except Exception as e:
            failure = e
        finally:
            sender.disconnect()
        if failure is None:
            # Delivered: bookkeeping is best effort from here on.
            try:
                _update_smtp_stats(config['id'], True)
                _log_email_send(user_id, config['id'], to_email, email_type, subject, 'success', '', attachments)
                _update_email_stats(email_type, True)
                if log_callback:
                    log_callback(f"[邮件服务] 发送成功: {to_email} (使用: {config['name']})")
            except Exception as e:
                print(f"[邮件服务] 发送成功但记录统计失败: {e}")
            return {'success': True, 'error': '', 'config_id': config['id']}
        last_error = str(failure)
        # Record the failure on this configuration
        _update_smtp_stats(config['id'], False, last_error)
        if log_callback:
            log_callback(f"[邮件服务] 发送失败 [{config['name']}]: {failure}")
        # Failover: move on to the next untried configuration
        if settings['failover_enabled']:
            config = _get_next_available_smtp_config(tried_configs)
        else:
            config = None
    # Every candidate failed
    first_tried = tried_configs[0] if tried_configs else None
    _log_email_send(user_id, first_tried,
                    to_email, email_type, subject, 'failed', last_error, attachments)
    _update_email_stats(email_type, False)
    return {'success': False, 'error': last_error, 'config_id': first_tried}
def _get_next_available_smtp_config(exclude_ids: List[int]) -> Optional[Dict[str, Any]]:
    """Return the next usable SMTP configuration, skipping the given ids.

    Candidates are the enabled configurations not in ``exclude_ids``,
    ordered primary first, then by priority and id. Configurations at or
    over their daily limit are skipped. All candidates are read in one query
    and scanned in order, so no recursion or nested connection is needed.

    Args:
        exclude_ids: Ids of configurations that were already tried.

    Returns:
        A dict with the connection fields and the decrypted password, or
        ``None`` when no candidate is left.
    """
    with db_pool.get_db() as conn:
        cursor = conn.cursor()
        placeholders = ','.join('?' for _ in exclude_ids)
        cursor.execute(f"""
            SELECT id, name, host, port, username, password, use_ssl, use_tls,
                   sender_name, sender_email, daily_limit, daily_sent, is_primary
            FROM smtp_configs
            WHERE enabled = 1 AND id NOT IN ({placeholders})
            ORDER BY is_primary DESC, priority ASC, id ASC
        """, list(exclude_ids))
        for row in cursor.fetchall():
            (config_id, name, host, port, username, password, use_ssl, use_tls,
             sender_name, sender_email, daily_limit, daily_sent, is_primary) = row
            # NULL columns are treated as 0 (no limit / nothing sent)
            limit = daily_limit or 0
            if limit > 0 and (daily_sent or 0) >= limit:
                continue  # over the daily limit, try the next candidate
            return {
                'id': config_id,
                'name': name,
                'host': host,
                'port': port,
                'username': username,
                'password': decrypt_password(password) if password else '',
                'use_ssl': bool(use_ssl),
                'use_tls': bool(use_tls),
                'sender_name': sender_name,
                'sender_email': sender_email,
                'is_primary': bool(is_primary)
            }
    return None
def test_smtp_config(config_id: int, test_email: str) -> Dict[str, Any]:
    """Send a test message through one SMTP configuration.

    The outcome is recorded on the configuration row: ``last_success_at`` is
    set and ``last_error`` cleared on success, ``last_error`` is set on
    failure. The SMTP connection is closed on every path, including a
    failure after the connection was established.

    Args:
        config_id: Id of the configuration to test.
        test_email: Address that receives the test message.

    Returns:
        ``{'success': bool, 'error': str}``.
    """
    config = get_smtp_config(config_id, include_password=True)
    if not config:
        return {'success': False, 'error': '配置不存在'}
    sender = EmailSender({
        'name': config['name'],
        'host': config['host'],
        'port': config['port'],
        'username': config['username'],
        'password': config['password'],
        'use_ssl': config['use_ssl'],
        'use_tls': config['use_tls'],
        'sender_name': config['sender_name'],
        'sender_email': config['sender_email']
    })
    try:
        sender.connect()
        sender.send(
            test_email,
            '知识管理平台 - SMTP配置测试',
            f'这是一封测试邮件。\n\n配置名称: {config["name"]}\nSMTP服务器: {config["host"]}:{config["port"]}\n\n如果您收到此邮件说明SMTP配置正确。',
            None,
            None
        )
        # Record the successful test
        with db_pool.get_db() as conn:
            cursor = conn.cursor()
            cursor.execute("""
                UPDATE smtp_configs
                SET last_success_at = CURRENT_TIMESTAMP, last_error = ''
                WHERE id = ?
            """, (config_id,))
            conn.commit()
        return {'success': True, 'error': ''}
    except Exception as e:
        # Record the error on the configuration
        with db_pool.get_db() as conn:
            cursor = conn.cursor()
            cursor.execute("""
                UPDATE smtp_configs SET last_error = ? WHERE id = ?
            """, (str(e)[:500], config_id))
            conn.commit()
        return {'success': False, 'error': str(e)}
    finally:
        # disconnect() is a no-op when no connection is open
        sender.disconnect()
# ============ 邮件日志 ============
def _log_email_send(user_id: int, smtp_config_id: int, email_to: str,
                    email_type: str, subject: str, status: str,
                    error_message: str, attachments: List[Dict] = None):
    """Insert one row into ``email_logs`` describing a send attempt.

    The attachment count and the summed byte length of each attachment's
    ``data`` are stored alongside the message details.
    """
    if attachments:
        attachment_count = len(attachments)
        attachment_size = sum(len(item.get('data', b'')) for item in attachments)
    else:
        attachment_count = 0
        attachment_size = 0
    record = (user_id, smtp_config_id, email_to, email_type, subject, status,
              error_message, attachment_count, attachment_size)
    with db_pool.get_db() as conn:
        cur = conn.cursor()
        cur.execute("""
            INSERT INTO email_logs
            (user_id, smtp_config_id, email_to, email_type, subject, status,
             error_message, attachment_count, attachment_size)
            VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?)
        """, record)
        conn.commit()
def _update_email_stats(email_type: str, success: bool):
    """Increment the global counters in ``email_stats`` for one attempt.

    ``total_sent`` is always incremented. A success also increments
    ``total_success`` and, for a known email type, that type's counter.
    A failure increments ``total_failed`` only.
    """
    with db_pool.get_db() as conn:
        cur = conn.cursor()
        # Column names come from this fixed mapping, never from the caller.
        per_type_columns = {
            EMAIL_TYPE_REGISTER: 'register_sent',
            EMAIL_TYPE_RESET: 'reset_sent',
            EMAIL_TYPE_BIND: 'bind_sent',
            EMAIL_TYPE_TASK_COMPLETE: 'task_complete_sent'
        }
        type_column = per_type_columns.get(email_type, '')
        if not success:
            statement = """
                UPDATE email_stats
                SET total_sent = total_sent + 1,
                    total_failed = total_failed + 1,
                    last_updated = CURRENT_TIMESTAMP
                WHERE id = 1
            """
        elif type_column:
            statement = f"""
                UPDATE email_stats
                SET total_sent = total_sent + 1,
                    total_success = total_success + 1,
                    {type_column} = {type_column} + 1,
                    last_updated = CURRENT_TIMESTAMP
                WHERE id = 1
            """
        else:
            statement = """
                UPDATE email_stats
                SET total_sent = total_sent + 1,
                    total_success = total_success + 1,
                    last_updated = CURRENT_TIMESTAMP
                WHERE id = 1
            """
        cur.execute(statement)
        conn.commit()
def get_email_stats() -> Dict[str, Any]:
    """Read the global send statistics (the ``email_stats`` row with id 1).

    Returns:
        The counters plus a computed ``success_rate`` percentage (0 when
        nothing has been sent), or an empty dict when the row is missing.
    """
    with db_pool.get_db() as conn:
        cur = conn.cursor()
        cur.execute("""
            SELECT total_sent, total_success, total_failed,
                   register_sent, reset_sent, bind_sent, task_complete_sent,
                   last_updated
            FROM email_stats WHERE id = 1
        """)
        record = cur.fetchone()
        if not record:
            return {}
        sent_total = record[0]
        succeeded = record[1]
        if sent_total > 0:
            rate = round(succeeded / sent_total * 100, 1)
        else:
            rate = 0
        return {
            'total_sent': sent_total,
            'total_success': succeeded,
            'total_failed': record[2],
            'register_sent': record[3],
            'reset_sent': record[4],
            'bind_sent': record[5],
            'task_complete_sent': record[6],
            'last_updated': record[7],
            'success_rate': rate,
        }
def get_email_logs(page: int = 1, page_size: int = 20,
                   email_type: str = None, status: str = None) -> Dict[str, Any]:
    """Return one page of send logs, newest first.

    ``page`` and ``page_size`` are clamped to a minimum of 1. A zero page
    size would otherwise divide by zero when computing ``total_pages``, and
    non-positive values would produce a negative OFFSET or LIMIT.

    Args:
        page: 1-based page number.
        page_size: Rows per page.
        email_type: Optional filter on ``email_type``.
        status: Optional filter on ``status``.

    Returns:
        ``{'logs': [...], 'total': int, 'page': int, 'page_size': int,
        'total_pages': int}`` where each log also carries the joined
        ``username`` and ``smtp_name``.
    """
    page = max(page, 1)
    page_size = max(page_size, 1)
    with db_pool.get_db() as conn:
        cursor = conn.cursor()
        where_clauses = []
        params = []
        if email_type:
            where_clauses.append("l.email_type = ?")
            params.append(email_type)
        if status:
            where_clauses.append("l.status = ?")
            params.append(status)
        where_sql = f"WHERE {' AND '.join(where_clauses)}" if where_clauses else ""
        # Total number of matching rows
        cursor.execute(f"SELECT COUNT(*) FROM email_logs l {where_sql}", params)
        total = cursor.fetchone()[0]
        # Requested page
        offset = (page - 1) * page_size
        cursor.execute(f"""
            SELECT l.id, l.user_id, l.smtp_config_id, l.email_to, l.email_type,
                   l.subject, l.status, l.error_message, l.attachment_count,
                   l.attachment_size, l.created_at, u.username, s.name as smtp_name
            FROM email_logs l
            LEFT JOIN users u ON l.user_id = u.id
            LEFT JOIN smtp_configs s ON l.smtp_config_id = s.id
            {where_sql}
            ORDER BY l.created_at DESC
            LIMIT ? OFFSET ?
        """, params + [page_size, offset])
        columns = (
            'id', 'user_id', 'smtp_config_id', 'email_to', 'email_type',
            'subject', 'status', 'error_message', 'attachment_count',
            'attachment_size', 'created_at', 'username', 'smtp_name',
        )
        logs = [dict(zip(columns, row)) for row in cursor.fetchall()]
        return {
            'logs': logs,
            'total': total,
            'page': page,
            'page_size': page_size,
            'total_pages': (total + page_size - 1) // page_size
        }
def cleanup_email_logs(days: int = 30) -> int:
    """Delete send-log rows older than ``days`` days.

    Returns:
        The number of rows deleted.
    """
    with db_pool.get_db() as conn:
        cur = conn.cursor()
        cur.execute("""
            DELETE FROM email_logs
            WHERE datetime(created_at) < datetime('now', '-' || ? || ' days')
        """, (days,))
        removed_rows = cur.rowcount
        conn.commit()
        return removed_rows
# ============ Token管理 ============
def generate_email_token(email: str, token_type: str, user_id: int = None) -> str:
    """Create and store a single-use verification token.

    The expiry is stored as a ``'%Y-%m-%d %H:%M:%S'`` string in local time.
    Binding a ``datetime`` object instead would let the sqlite3 default
    adapter write fractional seconds, which the ``strptime`` format used when
    verifying tokens cannot parse.

    Args:
        email: Address the token is issued for.
        token_type: One of the ``EMAIL_TYPE_*`` values; unknown types get
            the registration lifetime.
        user_id: Optional owning user id.

    Returns:
        The URL-safe token string.
    """
    token = secrets.token_urlsafe(32)
    # Lifetime depends on the token type
    expire_seconds = {
        EMAIL_TYPE_REGISTER: TOKEN_EXPIRE_REGISTER,
        EMAIL_TYPE_RESET: TOKEN_EXPIRE_RESET,
        EMAIL_TYPE_BIND: TOKEN_EXPIRE_BIND
    }.get(token_type, TOKEN_EXPIRE_REGISTER)
    expires_at = datetime.now() + timedelta(seconds=expire_seconds)
    expires_text = expires_at.strftime('%Y-%m-%d %H:%M:%S')
    with db_pool.get_db() as conn:
        cursor = conn.cursor()
        cursor.execute("""
            INSERT INTO email_tokens (user_id, email, token, token_type, expires_at)
            VALUES (?, ?, ?, ?, ?)
        """, (user_id, email, token, token_type, expires_text))
        conn.commit()
    return token
def verify_email_token(token: str, token_type: str) -> Optional[Dict[str, Any]]:
    """Validate a token and consume it.

    A token is valid when it exists for ``token_type``, has not been used
    and has not expired (compared against local ``datetime.now()``). The
    stored expiry is parsed leniently, with or without fractional seconds,
    and an unparseable expiry is treated as invalid. The token is consumed
    with a conditional update so two concurrent calls cannot both succeed.

    Returns:
        ``{'user_id': int, 'email': str}`` on success, otherwise ``None``.
    """
    with db_pool.get_db() as conn:
        cursor = conn.cursor()
        cursor.execute("""
            SELECT id, user_id, email, expires_at, used
            FROM email_tokens
            WHERE token = ? AND token_type = ?
        """, (token, token_type))
        row = cursor.fetchone()
        if not row:
            return None
        token_id, user_id, email, expires_at, used = row
        # Already consumed
        if used:
            return None
        # Expired? The driver may hand back a string or a datetime.
        if not isinstance(expires_at, datetime):
            try:
                expires_at = datetime.fromisoformat(str(expires_at))
            except ValueError:
                return None
        if expires_at < datetime.now():
            return None
        # Consume the token only if nobody else consumed it in the meantime
        cursor.execute("""
            UPDATE email_tokens SET used = 1 WHERE id = ? AND used = 0
        """, (token_id,))
        claimed = cursor.rowcount > 0
        conn.commit()
        if not claimed:
            return None
        return {'user_id': user_id, 'email': email}
def check_rate_limit(email: str, token_type: str) -> bool:
    """Check whether another email of this type may be sent to ``email``.

    The newest token's ``created_at`` is filled by SQLite's
    ``CURRENT_TIMESTAMP``, which is UTC, so the elapsed time is measured
    against the current UTC time rather than local time.

    Returns:
        ``True`` when sending is allowed, ``False`` when rate limited.
    """
    from datetime import timezone  # not part of the module-level imports

    limit_seconds = {
        EMAIL_TYPE_REGISTER: RATE_LIMIT_REGISTER,
        EMAIL_TYPE_RESET: RATE_LIMIT_RESET,
        EMAIL_TYPE_BIND: RATE_LIMIT_BIND
    }.get(token_type, 60)
    with db_pool.get_db() as conn:
        cursor = conn.cursor()
        cursor.execute("""
            SELECT created_at FROM email_tokens
            WHERE email = ? AND token_type = ?
            ORDER BY created_at DESC
            LIMIT 1
        """, (email, token_type))
        row = cursor.fetchone()
        if not row:
            return True
        last_sent = row[0]
        # The driver may hand back a string or a datetime
        if not isinstance(last_sent, datetime):
            last_sent = datetime.fromisoformat(str(last_sent))
        # Naive UTC "now" to match the naive UTC value stored by SQLite
        now_utc = datetime.now(timezone.utc).replace(tzinfo=None)
        elapsed = (now_utc - last_sent).total_seconds()
        return elapsed >= limit_seconds
def cleanup_expired_tokens() -> int:
    """Delete tokens whose expiry time has passed.

    ``expires_at`` is written from local ``datetime.now()``, so it is
    compared with SQLite's local time. A plain ``datetime('now')`` is UTC
    and would delete still-valid tokens in time zones west of UTC.

    Returns:
        The number of rows deleted.
    """
    with db_pool.get_db() as conn:
        cursor = conn.cursor()
        cursor.execute("""
            DELETE FROM email_tokens
            WHERE datetime(expires_at) < datetime('now', 'localtime')
        """)
        deleted = cursor.rowcount
        conn.commit()
        return deleted
# ============ 异步发送队列 ============
class EmailQueue:
    """Bounded queue of email tasks processed by daemon worker threads."""

    def __init__(self, workers: int = QUEUE_WORKERS, max_size: int = QUEUE_MAX_SIZE):
        self.queue = queue.Queue(maxsize=max_size)
        self.workers = []  # started worker threads
        self.running = False
        self.worker_count = workers

    def start(self):
        """Start the worker threads; does nothing if already running."""
        if self.running:
            return
        self.running = True
        for i in range(self.worker_count):
            worker = threading.Thread(target=self._worker, daemon=True, name=f"EmailWorker-{i+1}")
            worker.start()
            self.workers.append(worker)
        print(f"[邮件服务] 异步队列已启动 ({self.worker_count}个工作线程)")

    def stop(self):
        """Stop the workers, waiting up to 5 seconds for each to exit."""
        self.running = False
        # One stop sentinel per worker; skipped when the queue stays full
        for _ in self.workers:
            try:
                self.queue.put(None, timeout=1)
            except queue.Full:
                pass
        # Wait for the workers to finish
        for worker in self.workers:
            worker.join(timeout=5)
        self.workers.clear()
        print("[邮件服务] 异步队列已停止")

    def _worker(self):
        """Worker loop: take tasks until stopped or a ``None`` sentinel arrives."""
        while self.running:
            try:
                task = self.queue.get(timeout=5)
                if task is None:
                    break
                self._process_task(task)
            except queue.Empty:
                continue  # wake up periodically to re-check self.running
            except Exception as e:
                print(f"[邮件服务] 队列工作线程错误: {e}")

    def _process_task(self, task: Dict):
        """Send one queued email and report the result to its callback.

        The callback is invoked exactly once. An exception raised by the
        callback is printed and never leads to a second invocation with a
        fabricated failure result.
        """
        try:
            result = send_email(
                to_email=task['to_email'],
                subject=task['subject'],
                body=task['body'],
                html_body=task.get('html_body'),
                attachments=task.get('attachments'),
                email_type=task.get('email_type', ''),
                user_id=task.get('user_id'),
                log_callback=task.get('log_callback')
            )
        except Exception as e:
            print(f"[邮件服务] 处理邮件任务失败: {e}")
            result = {'success': False, 'error': str(e)}
        # Run the completion callback, isolated from the send itself
        callback = task.get('callback')
        if callback:
            try:
                callback(result)
            except Exception as e:
                print(f"[邮件服务] 邮件回调执行失败: {e}")

    def enqueue(self, to_email: str, subject: str, body: str,
                html_body: str = None, attachments: List[Dict] = None,
                email_type: str = '', user_id: int = None,
                callback: Callable = None, log_callback: Callable = None) -> bool:
        """Add an email task to the queue, waiting up to 5 seconds for room.

        Returns:
            ``True`` if the task was queued, ``False`` if the queue stayed full.
        """
        try:
            self.queue.put({
                'to_email': to_email,
                'subject': subject,
                'body': body,
                'html_body': html_body,
                'attachments': attachments,
                'email_type': email_type,
                'user_id': user_id,
                'callback': callback,
                'log_callback': log_callback
            }, timeout=5)
            return True
        except queue.Full:
            print("[邮件服务] 邮件队列已满")
            return False

    @property
    def pending_count(self) -> int:
        """Approximate number of tasks waiting in the queue."""
        return self.queue.qsize()
# Global queue instance, created lazily by get_email_queue()
_email_queue: Optional[EmailQueue] = None
# Guards creation and shutdown of the global queue instance
_queue_lock = threading.Lock()
def get_email_queue() -> EmailQueue:
    """Return the process-wide email queue, creating and starting it on first use."""
    global _email_queue
    with _queue_lock:
        if _email_queue is not None:
            return _email_queue
        _email_queue = EmailQueue()
        _email_queue.start()
        return _email_queue
def shutdown_email_queue():
    """Stop the global email queue, if one exists, and forget it."""
    global _email_queue
    with _queue_lock:
        if not _email_queue:
            return
        _email_queue.stop()
        _email_queue = None
# ============ 便捷函数 ============
def send_email_async(to_email: str, subject: str, body: str,
                     html_body: str = None, attachments: List[Dict] = None,
                     email_type: str = '', user_id: int = None,
                     callback: Callable = None, log_callback: Callable = None) -> bool:
    """Queue an email on the global queue for background sending.

    Returns:
        ``True`` if the task was queued, ``False`` if the queue was full.
    """
    task_fields = {
        'to_email': to_email,
        'subject': subject,
        'body': body,
        'html_body': html_body,
        'attachments': attachments,
        'email_type': email_type,
        'user_id': user_id,
        'callback': callback,
        'log_callback': log_callback,
    }
    mail_queue = get_email_queue()
    return mail_queue.enqueue(**task_fields)
def create_zip_attachment(files: List[Dict[str, Any]], zip_filename: str = 'screenshots.zip') -> Dict:
    """Pack files into an in-memory ZIP archive usable as an attachment.

    Repeated filenames get a numeric suffix before the extension
    (``a.png``, ``a_1.png``, ...). Writing the same name twice would create
    duplicate archive members, of which extractors typically keep only one.

    Args:
        files: ``[{'filename': 'xxx.png', 'data': bytes}, ...]``.
        zip_filename: Name reported for the resulting archive.

    Returns:
        ``{'filename': zip_filename, 'data': <zip bytes>}``.
    """
    buffer = BytesIO()
    used_names = set()
    with zipfile.ZipFile(buffer, 'w', zipfile.ZIP_DEFLATED) as zf:
        for f in files:
            member_name = f['filename']
            if member_name in used_names:
                stem, ext = os.path.splitext(member_name)
                counter = 1
                while f"{stem}_{counter}{ext}" in used_names:
                    counter += 1
                member_name = f"{stem}_{counter}{ext}"
            used_names.add(member_name)
            zf.writestr(member_name, f['data'])
    # getvalue() is read after the archive is closed so the central
    # directory is included.
    return {
        'filename': zip_filename,
        'data': buffer.getvalue()
    }
# ============ 初始化 ============
def init_email_service():
    """Initialise the email service: create the tables, then start the queue."""
    init_email_tables()
    # First access creates and starts the global queue
    get_email_queue()
    print("[邮件服务] 初始化完成")
if __name__ == '__main__':
    # Manual smoke run: only creates the database tables
    init_email_tables()
    print("邮件服务模块测试完成")