#!/usr/bin/env python3 # -*- coding: utf-8 -*- """ 邮件服务模块 功能: 1. 多SMTP配置管理(主备切换、故障转移) 2. 发送纯文本/HTML邮件 3. 发送带附件邮件(支持ZIP压缩) 4. 异步发送队列 5. 每日发送限额控制 6. 发送日志记录 """ import os import smtplib import threading import queue import time import zipfile import secrets from datetime import datetime, timedelta from email.mime.text import MIMEText from email.mime.multipart import MIMEMultipart from email.mime.base import MIMEBase from email.mime.image import MIMEImage from email import encoders from email.header import Header from email.utils import formataddr from typing import Optional, List, Dict, Any, Callable from io import BytesIO import db_pool from crypto_utils import encrypt_password, decrypt_password, is_encrypted # ============ 常量配置 ============ # 邮件类型 EMAIL_TYPE_REGISTER = 'register' # 注册验证 EMAIL_TYPE_RESET = 'reset' # 密码重置 EMAIL_TYPE_BIND = 'bind' # 邮箱绑定 EMAIL_TYPE_TASK_COMPLETE = 'task_complete' # 任务完成通知 # Token有效期(秒) TOKEN_EXPIRE_REGISTER = 24 * 60 * 60 # 注册验证: 24小时 TOKEN_EXPIRE_RESET = 30 * 60 # 密码重置: 30分钟 TOKEN_EXPIRE_BIND = 60 * 60 # 邮箱绑定: 1小时 # 发送频率限制(秒) RATE_LIMIT_REGISTER = 60 # 注册邮件: 1分钟 RATE_LIMIT_RESET = 5 * 60 # 重置邮件: 5分钟 RATE_LIMIT_BIND = 60 # 绑定邮件: 1分钟 # 异步队列配置 QUEUE_WORKERS = int(os.environ.get('EMAIL_QUEUE_WORKERS', '2')) QUEUE_MAX_SIZE = int(os.environ.get('EMAIL_QUEUE_MAX_SIZE', '100')) # ============ 数据库操作 ============ def init_email_tables(): """初始化邮件相关数据库表""" with db_pool.get_db() as conn: cursor = conn.cursor() # 1. SMTP配置表(支持多配置) cursor.execute(""" CREATE TABLE IF NOT EXISTS smtp_configs ( id INTEGER PRIMARY KEY AUTOINCREMENT, name TEXT NOT NULL DEFAULT '默认配置', enabled INTEGER DEFAULT 1, is_primary INTEGER DEFAULT 0, priority INTEGER DEFAULT 0, host TEXT NOT NULL DEFAULT '', port INTEGER DEFAULT 465, username TEXT NOT NULL DEFAULT '', password TEXT DEFAULT '', use_ssl INTEGER DEFAULT 1, use_tls INTEGER DEFAULT 0, sender_name TEXT DEFAULT '知识管理平台', sender_email TEXT DEFAULT '', daily_limit INTEGER DEFAULT 0, daily_sent INTEGER DEFAULT 0, daily_reset_date TEXT DEFAULT '', last_success_at TIMESTAMP, last_error TEXT DEFAULT '', success_count INTEGER DEFAULT 0, fail_count INTEGER DEFAULT 0, created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP, updated_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP ) """) # 创建索引 cursor.execute(""" CREATE INDEX IF NOT EXISTS idx_smtp_configs_enabled ON smtp_configs(enabled, priority) """) cursor.execute(""" CREATE INDEX IF NOT EXISTS idx_smtp_configs_primary ON smtp_configs(is_primary) """) # 2. 全局邮件设置表 cursor.execute(""" CREATE TABLE IF NOT EXISTS email_settings ( id INTEGER PRIMARY KEY DEFAULT 1, enabled INTEGER DEFAULT 0, failover_enabled INTEGER DEFAULT 1, updated_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP ) """) # 初始化默认设置 cursor.execute(""" INSERT OR IGNORE INTO email_settings (id, enabled, failover_enabled) VALUES (1, 0, 1) """) # 3. 邮件验证Token表 cursor.execute(""" CREATE TABLE IF NOT EXISTS email_tokens ( id INTEGER PRIMARY KEY AUTOINCREMENT, user_id INTEGER, email TEXT NOT NULL, token TEXT UNIQUE NOT NULL, token_type TEXT NOT NULL, expires_at TIMESTAMP NOT NULL, used INTEGER DEFAULT 0, created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP, FOREIGN KEY (user_id) REFERENCES users(id) ON DELETE CASCADE ) """) cursor.execute("CREATE INDEX IF NOT EXISTS idx_email_tokens_token ON email_tokens(token)") cursor.execute("CREATE INDEX IF NOT EXISTS idx_email_tokens_email ON email_tokens(email)") cursor.execute("CREATE INDEX IF NOT EXISTS idx_email_tokens_expires ON email_tokens(expires_at)") # 4. 邮件发送日志表 cursor.execute(""" CREATE TABLE IF NOT EXISTS email_logs ( id INTEGER PRIMARY KEY AUTOINCREMENT, user_id INTEGER, smtp_config_id INTEGER, email_to TEXT NOT NULL, email_type TEXT NOT NULL, subject TEXT NOT NULL, status TEXT NOT NULL, error_message TEXT, attachment_count INTEGER DEFAULT 0, attachment_size INTEGER DEFAULT 0, created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP, FOREIGN KEY (user_id) REFERENCES users(id) ON DELETE SET NULL, FOREIGN KEY (smtp_config_id) REFERENCES smtp_configs(id) ON DELETE SET NULL ) """) cursor.execute("CREATE INDEX IF NOT EXISTS idx_email_logs_user ON email_logs(user_id)") cursor.execute("CREATE INDEX IF NOT EXISTS idx_email_logs_status ON email_logs(status)") cursor.execute("CREATE INDEX IF NOT EXISTS idx_email_logs_type ON email_logs(email_type)") cursor.execute("CREATE INDEX IF NOT EXISTS idx_email_logs_created ON email_logs(created_at)") # 5. 邮件发送统计表 cursor.execute(""" CREATE TABLE IF NOT EXISTS email_stats ( id INTEGER PRIMARY KEY DEFAULT 1, total_sent INTEGER DEFAULT 0, total_success INTEGER DEFAULT 0, total_failed INTEGER DEFAULT 0, register_sent INTEGER DEFAULT 0, reset_sent INTEGER DEFAULT 0, bind_sent INTEGER DEFAULT 0, task_complete_sent INTEGER DEFAULT 0, last_updated TIMESTAMP DEFAULT CURRENT_TIMESTAMP ) """) # 初始化统计记录 cursor.execute(""" INSERT OR IGNORE INTO email_stats (id) VALUES (1) """) conn.commit() print("[邮件服务] 数据库表初始化完成") # ============ SMTP配置管理 ============ def get_email_settings() -> Dict[str, Any]: """获取全局邮件设置""" with db_pool.get_db() as conn: cursor = conn.cursor() cursor.execute("SELECT enabled, failover_enabled, updated_at FROM email_settings WHERE id = 1") row = cursor.fetchone() if row: return { 'enabled': bool(row[0]), 'failover_enabled': bool(row[1]), 'updated_at': row[2] } return {'enabled': False, 'failover_enabled': True, 'updated_at': None} def update_email_settings(enabled: bool, failover_enabled: bool) -> bool: """更新全局邮件设置""" with db_pool.get_db() as conn: cursor = conn.cursor() cursor.execute(""" UPDATE email_settings SET enabled = ?, failover_enabled = ?, updated_at = CURRENT_TIMESTAMP WHERE id = 1 """, (int(enabled), int(failover_enabled))) conn.commit() return True def get_smtp_configs(include_password: bool = False) -> List[Dict[str, Any]]: """获取所有SMTP配置列表""" with db_pool.get_db() as conn: cursor = conn.cursor() cursor.execute(""" SELECT id, name, enabled, is_primary, priority, host, port, username, password, use_ssl, use_tls, sender_name, sender_email, daily_limit, daily_sent, daily_reset_date, last_success_at, last_error, success_count, fail_count, created_at, updated_at FROM smtp_configs ORDER BY is_primary DESC, priority ASC, id ASC """) configs = [] for row in cursor.fetchall(): config = { 'id': row[0], 'name': row[1], 'enabled': bool(row[2]), 'is_primary': bool(row[3]), 'priority': row[4], 'host': row[5], 'port': row[6], 'username': row[7], 'password': '******' if row[8] and not include_password else (decrypt_password(row[8]) if include_password and row[8] else ''), 'has_password': bool(row[8]), 'use_ssl': bool(row[9]), 'use_tls': bool(row[10]), 'sender_name': row[11], 'sender_email': row[12], 'daily_limit': row[13], 'daily_sent': row[14], 'daily_reset_date': row[15], 'last_success_at': row[16], 'last_error': row[17], 'success_count': row[18], 'fail_count': row[19], 'created_at': row[20], 'updated_at': row[21] } # 计算成功率 total = config['success_count'] + config['fail_count'] config['success_rate'] = round(config['success_count'] / total * 100, 1) if total > 0 else 0 configs.append(config) return configs def get_smtp_config(config_id: int, include_password: bool = False) -> Optional[Dict[str, Any]]: """获取单个SMTP配置""" with db_pool.get_db() as conn: cursor = conn.cursor() cursor.execute(""" SELECT id, name, enabled, is_primary, priority, host, port, username, password, use_ssl, use_tls, sender_name, sender_email, daily_limit, daily_sent, daily_reset_date, last_success_at, last_error, success_count, fail_count, created_at, updated_at FROM smtp_configs WHERE id = ? """, (config_id,)) row = cursor.fetchone() if not row: return None return { 'id': row[0], 'name': row[1], 'enabled': bool(row[2]), 'is_primary': bool(row[3]), 'priority': row[4], 'host': row[5], 'port': row[6], 'username': row[7], 'password': '******' if row[8] and not include_password else (decrypt_password(row[8]) if include_password and row[8] else ''), 'has_password': bool(row[8]), 'use_ssl': bool(row[9]), 'use_tls': bool(row[10]), 'sender_name': row[11], 'sender_email': row[12], 'daily_limit': row[13], 'daily_sent': row[14], 'daily_reset_date': row[15], 'last_success_at': row[16], 'last_error': row[17], 'success_count': row[18], 'fail_count': row[19], 'created_at': row[20], 'updated_at': row[21] } def create_smtp_config(data: Dict[str, Any]) -> int: """创建新的SMTP配置""" with db_pool.get_db() as conn: cursor = conn.cursor() # 加密密码 password = encrypt_password(data.get('password', '')) if data.get('password') else '' cursor.execute(""" INSERT INTO smtp_configs (name, enabled, is_primary, priority, host, port, username, password, use_ssl, use_tls, sender_name, sender_email, daily_limit) VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?) """, ( data.get('name', '默认配置'), int(data.get('enabled', True)), int(data.get('is_primary', False)), data.get('priority', 0), data.get('host', ''), data.get('port', 465), data.get('username', ''), password, int(data.get('use_ssl', True)), int(data.get('use_tls', False)), data.get('sender_name', '知识管理平台'), data.get('sender_email', ''), data.get('daily_limit', 0) )) config_id = cursor.lastrowid conn.commit() return config_id def update_smtp_config(config_id: int, data: Dict[str, Any]) -> bool: """更新SMTP配置""" with db_pool.get_db() as conn: cursor = conn.cursor() updates = [] params = [] field_mapping = { 'name': 'name', 'enabled': 'enabled', 'priority': 'priority', 'host': 'host', 'port': 'port', 'username': 'username', 'use_ssl': 'use_ssl', 'use_tls': 'use_tls', 'sender_name': 'sender_name', 'sender_email': 'sender_email', 'daily_limit': 'daily_limit' } for key, db_field in field_mapping.items(): if key in data: value = data[key] if key in ('enabled', 'use_ssl', 'use_tls'): value = int(value) updates.append(f"{db_field} = ?") params.append(value) # 密码特殊处理:只有当提供了新密码时才更新 if 'password' in data and data['password'] and data['password'] != '******': updates.append("password = ?") params.append(encrypt_password(data['password'])) if not updates: return False updates.append("updated_at = CURRENT_TIMESTAMP") params.append(config_id) cursor.execute(f""" UPDATE smtp_configs SET {', '.join(updates)} WHERE id = ? """, params) conn.commit() return cursor.rowcount > 0 def delete_smtp_config(config_id: int) -> bool: """删除SMTP配置""" with db_pool.get_db() as conn: cursor = conn.cursor() cursor.execute("DELETE FROM smtp_configs WHERE id = ?", (config_id,)) conn.commit() return cursor.rowcount > 0 def set_primary_smtp_config(config_id: int) -> bool: """设置主SMTP配置""" with db_pool.get_db() as conn: cursor = conn.cursor() # 先取消所有主配置标记 cursor.execute("UPDATE smtp_configs SET is_primary = 0") # 设置新的主配置 cursor.execute(""" UPDATE smtp_configs SET is_primary = 1, updated_at = CURRENT_TIMESTAMP WHERE id = ? """, (config_id,)) conn.commit() return cursor.rowcount > 0 def _get_available_smtp_config(failover: bool = True) -> Optional[Dict[str, Any]]: """ 获取可用的SMTP配置 优先级: 主配置 > 按priority排序的启用配置 """ today = datetime.now().strftime('%Y-%m-%d') with db_pool.get_db() as conn: cursor = conn.cursor() # 先重置过期的每日计数 cursor.execute(""" UPDATE smtp_configs SET daily_sent = 0, daily_reset_date = ? WHERE daily_reset_date != ? OR daily_reset_date IS NULL OR daily_reset_date = '' """, (today, today)) conn.commit() # 获取所有启用的配置,按优先级排序 cursor.execute(""" SELECT id, name, host, port, username, password, use_ssl, use_tls, sender_name, sender_email, daily_limit, daily_sent, is_primary FROM smtp_configs WHERE enabled = 1 ORDER BY is_primary DESC, priority ASC, id ASC """) configs = cursor.fetchall() for row in configs: config_id, name, host, port, username, password, use_ssl, use_tls, \ sender_name, sender_email, daily_limit, daily_sent, is_primary = row # 检查每日限额 if daily_limit > 0 and daily_sent >= daily_limit: continue # 超过限额,跳过此配置 # 解密密码 decrypted_password = decrypt_password(password) if password else '' return { 'id': config_id, 'name': name, 'host': host, 'port': port, 'username': username, 'password': decrypted_password, 'use_ssl': bool(use_ssl), 'use_tls': bool(use_tls), 'sender_name': sender_name, 'sender_email': sender_email, 'is_primary': bool(is_primary) } return None def _update_smtp_stats(config_id: int, success: bool, error: str = ''): """更新SMTP配置的统计信息""" with db_pool.get_db() as conn: cursor = conn.cursor() if success: cursor.execute(""" UPDATE smtp_configs SET daily_sent = daily_sent + 1, success_count = success_count + 1, last_success_at = CURRENT_TIMESTAMP, last_error = '', updated_at = CURRENT_TIMESTAMP WHERE id = ? """, (config_id,)) else: cursor.execute(""" UPDATE smtp_configs SET fail_count = fail_count + 1, last_error = ?, updated_at = CURRENT_TIMESTAMP WHERE id = ? """, (error[:500], config_id)) conn.commit() # ============ 邮件发送核心 ============ class EmailSender: """邮件发送器""" def __init__(self, config: Dict[str, Any]): self.config = config self.server = None def connect(self) -> bool: """连接SMTP服务器""" try: if self.config['use_ssl']: self.server = smtplib.SMTP_SSL( self.config['host'], self.config['port'], timeout=30 ) else: self.server = smtplib.SMTP( self.config['host'], self.config['port'], timeout=30 ) if self.config['use_tls']: self.server.starttls() self.server.login(self.config['username'], self.config['password']) return True except Exception as e: print(f"[邮件服务] SMTP连接失败 [{self.config['name']}]: {e}") self.server = None raise def disconnect(self): """断开SMTP连接""" if self.server: try: self.server.quit() except Exception: pass self.server = None def send(self, to_email: str, subject: str, body: str, html_body: str = None, attachments: List[Dict] = None) -> bool: """ 发送邮件 Args: to_email: 收件人邮箱 subject: 邮件主题 body: 纯文本正文 html_body: HTML正文(可选) attachments: 附件列表 [{'filename': 'xxx', 'data': bytes, 'mime_type': 'xxx'}] """ try: if not self.server: self.connect() # 构建邮件 if html_body or attachments: msg = MIMEMultipart('mixed') # 添加正文 if html_body: text_part = MIMEMultipart('alternative') text_part.attach(MIMEText(body, 'plain', 'utf-8')) text_part.attach(MIMEText(html_body, 'html', 'utf-8')) msg.attach(text_part) else: msg.attach(MIMEText(body, 'plain', 'utf-8')) # 添加附件 if attachments: for att in attachments: part = MIMEBase('application', 'octet-stream') part.set_payload(att['data']) encoders.encode_base64(part) part.add_header( 'Content-Disposition', 'attachment', filename=('utf-8', '', att['filename']) ) msg.attach(part) else: msg = MIMEText(body, 'plain', 'utf-8') # 设置邮件头 msg['Subject'] = Header(subject, 'utf-8') msg['From'] = formataddr(( self.config['sender_name'], self.config['sender_email'] or self.config['username'] )) msg['To'] = to_email # 发送 self.server.sendmail( self.config['sender_email'] or self.config['username'], [to_email], msg.as_string() ) return True except Exception as e: print(f"[邮件服务] 发送失败: {e}") raise def send_email( to_email: str, subject: str, body: str, html_body: str = None, attachments: List[Dict] = None, email_type: str = '', user_id: int = None, log_callback: Callable = None ) -> Dict[str, Any]: """ 发送邮件(带故障转移) Returns: {'success': bool, 'error': str, 'config_id': int} """ # 检查全局开关 settings = get_email_settings() if not settings['enabled']: return {'success': False, 'error': '邮件功能未启用', 'config_id': None} # 获取可用配置 config = _get_available_smtp_config(settings['failover_enabled']) if not config: return {'success': False, 'error': '没有可用的SMTP配置', 'config_id': None} tried_configs = [] last_error = '' while config: tried_configs.append(config['id']) sender = EmailSender(config) try: sender.connect() sender.send(to_email, subject, body, html_body, attachments) sender.disconnect() # 更新统计 _update_smtp_stats(config['id'], True) _log_email_send(user_id, config['id'], to_email, email_type, subject, 'success', '', attachments) _update_email_stats(email_type, True) if log_callback: log_callback(f"[邮件服务] 发送成功: {to_email} (使用: {config['name']})") return {'success': True, 'error': '', 'config_id': config['id']} except Exception as e: last_error = str(e) sender.disconnect() # 更新失败统计 _update_smtp_stats(config['id'], False, last_error) if log_callback: log_callback(f"[邮件服务] 发送失败 [{config['name']}]: {e}") # 故障转移:尝试下一个配置 if settings['failover_enabled']: config = _get_next_available_smtp_config(tried_configs) else: config = None # 所有配置都失败 _log_email_send(user_id, tried_configs[0] if tried_configs else None, to_email, email_type, subject, 'failed', last_error, attachments) _update_email_stats(email_type, False) return {'success': False, 'error': last_error, 'config_id': tried_configs[0] if tried_configs else None} def _get_next_available_smtp_config(exclude_ids: List[int]) -> Optional[Dict[str, Any]]: """获取下一个可用的SMTP配置(排除已尝试的)""" today = datetime.now().strftime('%Y-%m-%d') with db_pool.get_db() as conn: cursor = conn.cursor() placeholders = ','.join(['?' for _ in exclude_ids]) cursor.execute(f""" SELECT id, name, host, port, username, password, use_ssl, use_tls, sender_name, sender_email, daily_limit, daily_sent, is_primary FROM smtp_configs WHERE enabled = 1 AND id NOT IN ({placeholders}) ORDER BY is_primary DESC, priority ASC, id ASC LIMIT 1 """, exclude_ids) row = cursor.fetchone() if not row: return None config_id, name, host, port, username, password, use_ssl, use_tls, \ sender_name, sender_email, daily_limit, daily_sent, is_primary = row # 检查每日限额 if daily_limit > 0 and daily_sent >= daily_limit: return _get_next_available_smtp_config(exclude_ids + [config_id]) return { 'id': config_id, 'name': name, 'host': host, 'port': port, 'username': username, 'password': decrypt_password(password) if password else '', 'use_ssl': bool(use_ssl), 'use_tls': bool(use_tls), 'sender_name': sender_name, 'sender_email': sender_email, 'is_primary': bool(is_primary) } def test_smtp_config(config_id: int, test_email: str) -> Dict[str, Any]: """测试SMTP配置""" config = get_smtp_config(config_id, include_password=True) if not config: return {'success': False, 'error': '配置不存在'} sender = EmailSender({ 'name': config['name'], 'host': config['host'], 'port': config['port'], 'username': config['username'], 'password': config['password'], 'use_ssl': config['use_ssl'], 'use_tls': config['use_tls'], 'sender_name': config['sender_name'], 'sender_email': config['sender_email'] }) try: sender.connect() sender.send( test_email, '知识管理平台 - SMTP配置测试', f'这是一封测试邮件。\n\n配置名称: {config["name"]}\nSMTP服务器: {config["host"]}:{config["port"]}\n\n如果您收到此邮件,说明SMTP配置正确。', None, None ) sender.disconnect() # 更新最后成功时间 with db_pool.get_db() as conn: cursor = conn.cursor() cursor.execute(""" UPDATE smtp_configs SET last_success_at = CURRENT_TIMESTAMP, last_error = '' WHERE id = ? """, (config_id,)) conn.commit() return {'success': True, 'error': ''} except Exception as e: # 记录错误 with db_pool.get_db() as conn: cursor = conn.cursor() cursor.execute(""" UPDATE smtp_configs SET last_error = ? WHERE id = ? """, (str(e)[:500], config_id)) conn.commit() return {'success': False, 'error': str(e)} # ============ 邮件日志 ============ def _log_email_send(user_id: int, smtp_config_id: int, email_to: str, email_type: str, subject: str, status: str, error_message: str, attachments: List[Dict] = None): """记录邮件发送日志""" attachment_count = len(attachments) if attachments else 0 attachment_size = sum(len(a.get('data', b'')) for a in attachments) if attachments else 0 with db_pool.get_db() as conn: cursor = conn.cursor() cursor.execute(""" INSERT INTO email_logs (user_id, smtp_config_id, email_to, email_type, subject, status, error_message, attachment_count, attachment_size) VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?) """, (user_id, smtp_config_id, email_to, email_type, subject, status, error_message, attachment_count, attachment_size)) conn.commit() def _update_email_stats(email_type: str, success: bool): """更新邮件统计""" with db_pool.get_db() as conn: cursor = conn.cursor() type_field_map = { EMAIL_TYPE_REGISTER: 'register_sent', EMAIL_TYPE_RESET: 'reset_sent', EMAIL_TYPE_BIND: 'bind_sent', EMAIL_TYPE_TASK_COMPLETE: 'task_complete_sent' } type_field = type_field_map.get(email_type, '') if success: if type_field: cursor.execute(f""" UPDATE email_stats SET total_sent = total_sent + 1, total_success = total_success + 1, {type_field} = {type_field} + 1, last_updated = CURRENT_TIMESTAMP WHERE id = 1 """) else: cursor.execute(""" UPDATE email_stats SET total_sent = total_sent + 1, total_success = total_success + 1, last_updated = CURRENT_TIMESTAMP WHERE id = 1 """) else: cursor.execute(""" UPDATE email_stats SET total_sent = total_sent + 1, total_failed = total_failed + 1, last_updated = CURRENT_TIMESTAMP WHERE id = 1 """) conn.commit() def get_email_stats() -> Dict[str, Any]: """获取邮件统计""" with db_pool.get_db() as conn: cursor = conn.cursor() cursor.execute(""" SELECT total_sent, total_success, total_failed, register_sent, reset_sent, bind_sent, task_complete_sent, last_updated FROM email_stats WHERE id = 1 """) row = cursor.fetchone() if row: return { 'total_sent': row[0], 'total_success': row[1], 'total_failed': row[2], 'register_sent': row[3], 'reset_sent': row[4], 'bind_sent': row[5], 'task_complete_sent': row[6], 'last_updated': row[7], 'success_rate': round(row[1] / row[0] * 100, 1) if row[0] > 0 else 0 } return {} def get_email_logs(page: int = 1, page_size: int = 20, email_type: str = None, status: str = None) -> Dict[str, Any]: """获取邮件日志(分页)""" with db_pool.get_db() as conn: cursor = conn.cursor() where_clauses = [] params = [] if email_type: where_clauses.append("l.email_type = ?") params.append(email_type) if status: where_clauses.append("l.status = ?") params.append(status) where_sql = f"WHERE {' AND '.join(where_clauses)}" if where_clauses else "" # 获取总数 cursor.execute(f"SELECT COUNT(*) FROM email_logs l {where_sql}", params) total = cursor.fetchone()[0] # 获取分页数据 offset = (page - 1) * page_size cursor.execute(f""" SELECT l.id, l.user_id, l.smtp_config_id, l.email_to, l.email_type, l.subject, l.status, l.error_message, l.attachment_count, l.attachment_size, l.created_at, u.username, s.name as smtp_name FROM email_logs l LEFT JOIN users u ON l.user_id = u.id LEFT JOIN smtp_configs s ON l.smtp_config_id = s.id {where_sql} ORDER BY l.created_at DESC LIMIT ? OFFSET ? """, params + [page_size, offset]) logs = [] for row in cursor.fetchall(): logs.append({ 'id': row[0], 'user_id': row[1], 'smtp_config_id': row[2], 'email_to': row[3], 'email_type': row[4], 'subject': row[5], 'status': row[6], 'error_message': row[7], 'attachment_count': row[8], 'attachment_size': row[9], 'created_at': row[10], 'username': row[11], 'smtp_name': row[12] }) return { 'logs': logs, 'total': total, 'page': page, 'page_size': page_size, 'total_pages': (total + page_size - 1) // page_size } def cleanup_email_logs(days: int = 30) -> int: """清理过期邮件日志""" with db_pool.get_db() as conn: cursor = conn.cursor() cursor.execute(""" DELETE FROM email_logs WHERE datetime(created_at) < datetime('now', '-' || ? || ' days') """, (days,)) deleted = cursor.rowcount conn.commit() return deleted # ============ Token管理 ============ def generate_email_token(email: str, token_type: str, user_id: int = None) -> str: """生成邮件验证Token""" token = secrets.token_urlsafe(32) # 设置过期时间 expire_seconds = { EMAIL_TYPE_REGISTER: TOKEN_EXPIRE_REGISTER, EMAIL_TYPE_RESET: TOKEN_EXPIRE_RESET, EMAIL_TYPE_BIND: TOKEN_EXPIRE_BIND }.get(token_type, TOKEN_EXPIRE_REGISTER) expires_at = datetime.now() + timedelta(seconds=expire_seconds) with db_pool.get_db() as conn: cursor = conn.cursor() cursor.execute(""" INSERT INTO email_tokens (user_id, email, token, token_type, expires_at) VALUES (?, ?, ?, ?, ?) """, (user_id, email, token, token_type, expires_at)) conn.commit() return token def verify_email_token(token: str, token_type: str) -> Optional[Dict[str, Any]]: """ 验证Token Returns: 成功返回 {'user_id': int, 'email': str},失败返回 None """ with db_pool.get_db() as conn: cursor = conn.cursor() cursor.execute(""" SELECT id, user_id, email, expires_at, used FROM email_tokens WHERE token = ? AND token_type = ? """, (token, token_type)) row = cursor.fetchone() if not row: return None token_id, user_id, email, expires_at, used = row # 检查是否已使用 if used: return None # 检查是否过期 if datetime.strptime(expires_at, '%Y-%m-%d %H:%M:%S') < datetime.now(): return None # 标记为已使用 cursor.execute(""" UPDATE email_tokens SET used = 1 WHERE id = ? """, (token_id,)) conn.commit() return {'user_id': user_id, 'email': email} def check_rate_limit(email: str, token_type: str) -> bool: """ 检查发送频率限制 Returns: True = 可以发送, False = 受限 """ limit_seconds = { EMAIL_TYPE_REGISTER: RATE_LIMIT_REGISTER, EMAIL_TYPE_RESET: RATE_LIMIT_RESET, EMAIL_TYPE_BIND: RATE_LIMIT_BIND }.get(token_type, 60) with db_pool.get_db() as conn: cursor = conn.cursor() cursor.execute(""" SELECT created_at FROM email_tokens WHERE email = ? AND token_type = ? ORDER BY created_at DESC LIMIT 1 """, (email, token_type)) row = cursor.fetchone() if not row: return True last_sent = datetime.strptime(row[0], '%Y-%m-%d %H:%M:%S') elapsed = (datetime.now() - last_sent).total_seconds() return elapsed >= limit_seconds def cleanup_expired_tokens() -> int: """清理过期Token""" with db_pool.get_db() as conn: cursor = conn.cursor() cursor.execute(""" DELETE FROM email_tokens WHERE datetime(expires_at) < datetime('now') """) deleted = cursor.rowcount conn.commit() return deleted # ============ 异步发送队列 ============ class EmailQueue: """邮件异步发送队列""" def __init__(self, workers: int = QUEUE_WORKERS, max_size: int = QUEUE_MAX_SIZE): self.queue = queue.Queue(maxsize=max_size) self.workers = [] self.running = False self.worker_count = workers def start(self): """启动队列工作线程""" if self.running: return self.running = True for i in range(self.worker_count): worker = threading.Thread(target=self._worker, daemon=True, name=f"EmailWorker-{i+1}") worker.start() self.workers.append(worker) print(f"[邮件服务] 异步队列已启动 ({self.worker_count}个工作线程)") def stop(self): """停止队列""" self.running = False # 发送停止信号 for _ in self.workers: try: self.queue.put(None, timeout=1) except queue.Full: pass # 等待工作线程结束 for worker in self.workers: worker.join(timeout=5) self.workers.clear() print("[邮件服务] 异步队列已停止") def _worker(self): """工作线程""" while self.running: try: task = self.queue.get(timeout=5) if task is None: break self._process_task(task) except queue.Empty: continue except Exception as e: print(f"[邮件服务] 队列工作线程错误: {e}") def _process_task(self, task: Dict): """处理邮件任务""" try: result = send_email( to_email=task['to_email'], subject=task['subject'], body=task['body'], html_body=task.get('html_body'), attachments=task.get('attachments'), email_type=task.get('email_type', ''), user_id=task.get('user_id'), log_callback=task.get('log_callback') ) # 执行回调 if task.get('callback'): task['callback'](result) except Exception as e: print(f"[邮件服务] 处理邮件任务失败: {e}") if task.get('callback'): task['callback']({'success': False, 'error': str(e)}) def enqueue(self, to_email: str, subject: str, body: str, html_body: str = None, attachments: List[Dict] = None, email_type: str = '', user_id: int = None, callback: Callable = None, log_callback: Callable = None) -> bool: """ 将邮件任务加入队列 Returns: 是否成功加入队列 """ try: self.queue.put({ 'to_email': to_email, 'subject': subject, 'body': body, 'html_body': html_body, 'attachments': attachments, 'email_type': email_type, 'user_id': user_id, 'callback': callback, 'log_callback': log_callback }, timeout=5) return True except queue.Full: print("[邮件服务] 邮件队列已满") return False @property def pending_count(self) -> int: """队列中待处理的任务数""" return self.queue.qsize() # 全局队列实例 _email_queue: Optional[EmailQueue] = None _queue_lock = threading.Lock() def get_email_queue() -> EmailQueue: """获取全局邮件队列(单例)""" global _email_queue with _queue_lock: if _email_queue is None: _email_queue = EmailQueue() _email_queue.start() return _email_queue def shutdown_email_queue(): """关闭邮件队列""" global _email_queue with _queue_lock: if _email_queue: _email_queue.stop() _email_queue = None # ============ 便捷函数 ============ def send_email_async(to_email: str, subject: str, body: str, html_body: str = None, attachments: List[Dict] = None, email_type: str = '', user_id: int = None, callback: Callable = None, log_callback: Callable = None) -> bool: """异步发送邮件""" queue = get_email_queue() return queue.enqueue( to_email=to_email, subject=subject, body=body, html_body=html_body, attachments=attachments, email_type=email_type, user_id=user_id, callback=callback, log_callback=log_callback ) def create_zip_attachment(files: List[Dict[str, Any]], zip_filename: str = 'screenshots.zip') -> Dict: """ 创建ZIP附件 Args: files: [{'filename': 'xxx.png', 'data': bytes}] zip_filename: ZIP文件名 Returns: {'filename': 'xxx.zip', 'data': bytes} """ buffer = BytesIO() with zipfile.ZipFile(buffer, 'w', zipfile.ZIP_DEFLATED) as zf: for f in files: zf.writestr(f['filename'], f['data']) return { 'filename': zip_filename, 'data': buffer.getvalue() } # ============ 初始化 ============ def init_email_service(): """初始化邮件服务""" init_email_tables() get_email_queue() print("[邮件服务] 初始化完成") if __name__ == '__main__': # 测试代码 init_email_tables() print("邮件服务模块测试完成")