Files
zsglpt/app_utils.py
Yu Yon b5344cd55e 修复所有bug并添加新功能
- 修复添加账号按钮无反应问题
- 添加账号备注字段(可选)
- 添加账号设置按钮(修改密码/备注)
- 修复用户反馈���能
- 添加定时任务执行日志
- 修复容器重启后账号加载问题
- 修复所有JavaScript语法错误
- 优化账号加载机制(4层保障)

🤖 Generated with Claude Code
2025-12-10 11:19:16 +08:00

356 lines
10 KiB
Python
Executable File
Raw Permalink Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
#!/usr/bin/env python3
# -*- coding: utf-8 -*-
"""
应用工具模块
提取重复的业务逻辑
"""
from typing import Dict, Any, Optional, Tuple
from flask import session, jsonify
from app_logger import get_logger, audit_logger
from app_security import get_client_ip
import database
logger = get_logger('app_utils')
class ValidationError(Exception):
"""验证错误异常"""
pass
def verify_user_file_permission(user_id: int, filename: str) -> Tuple[bool, Optional[str]]:
"""
验证用户文件访问权限
Args:
user_id: 用户ID
filename: 文件名
Returns:
(是否有权限, 错误消息)
"""
# 获取用户信息
user = database.get_user_by_id(user_id)
if not user:
return False, "用户不存在"
username = user['username']
# 检查文件名是否以用户名开头
if not filename.startswith(f"{username}_"):
logger.warning(f"用户 {username} (ID:{user_id}) 尝试访问未授权文件: {filename}")
return False, "无权访问此文件"
return True, None
def log_task_event(account_id: int, status: str, message: str,
browse_type: Optional[str] = None,
screenshot_path: Optional[str] = None) -> bool:
"""
记录任务日志(统一接口)
Args:
account_id: 账号ID
status: 状态running/completed/failed/stopped
message: 消息
browse_type: 浏览类型
screenshot_path: 截图路径
Returns:
是否成功
"""
try:
return database.create_task_log(
account_id=account_id,
status=status,
message=message,
browse_type=browse_type,
screenshot_path=screenshot_path
)
except Exception as e:
logger.error(f"记录任务日志失败: {e}", exc_info=True)
return False
def update_account_status(account_id: int, status: str,
error_message: Optional[str] = None) -> bool:
"""
更新账号状态(统一接口)
Args:
account_id: 账号ID
status: 状态idle/running/error/stopped
error_message: 错误消息仅当status=error时
Returns:
是否成功
"""
try:
return database.update_account_status(
account_id=account_id,
status=status,
error_message=error_message
)
except Exception as e:
logger.error(f"更新账号状态失败 (account_id={account_id}): {e}", exc_info=True)
return False
def get_or_create_config_cache() -> Optional[Dict[str, Any]]:
"""
获取或创建系统配置缓存
缓存存储在session中避免重复查询数据库
Returns:
配置字典失败返回None
"""
# 尝试从session获取缓存
if '_system_config' in session:
return session['_system_config']
# 从数据库加载
try:
config = database.get_system_config()
if config:
# 存入session缓存
session['_system_config'] = config
return config
return None
except Exception as e:
logger.error(f"获取系统配置失败: {e}", exc_info=True)
return None
def clear_config_cache():
"""清除配置缓存(配置变更时调用)"""
if '_system_config' in session:
del session['_system_config']
logger.debug("已清除系统配置缓存")
def safe_close_browser(automation_obj, account_id: int):
"""
安全关闭浏览器(统一错误处理)
Args:
automation_obj: PlaywrightAutomation对象
account_id: 账号ID
"""
if automation_obj:
try:
automation_obj.close()
logger.info(f"账号 {account_id} 的浏览器已关闭")
except Exception as e:
logger.error(f"关闭账号 {account_id} 的浏览器失败: {e}", exc_info=True)
def format_error_response(error: str, status_code: int = 400,
need_captcha: bool = False,
extra_data: Optional[Dict] = None) -> Tuple[Any, int]:
"""
格式化错误响应(统一接口)
Args:
error: 错误消息
status_code: HTTP状态码
need_captcha: 是否需要验证码
extra_data: 额外数据
Returns:
(jsonify响应, 状态码)
"""
response_data = {"error": error}
if need_captcha:
response_data["need_captcha"] = True
if extra_data:
response_data.update(extra_data)
return jsonify(response_data), status_code
def format_success_response(message: str = "操作成功",
extra_data: Optional[Dict] = None) -> Any:
"""
格式化成功响应(统一接口)
Args:
message: 成功消息
extra_data: 额外数据
Returns:
jsonify响应
"""
response_data = {"success": True, "message": message}
if extra_data:
response_data.update(extra_data)
return jsonify(response_data)
def log_user_action(action: str, user_id: int, username: str,
success: bool, details: Optional[str] = None):
"""
记录用户操作到审计日志(统一接口)
Args:
action: 操作类型login/register/logout等
user_id: 用户ID
username: 用户名
success: 是否成功
details: 详细信息
"""
ip = get_client_ip()
if action == 'login':
audit_logger.log_user_login(user_id, username, ip, success)
elif action == 'logout':
audit_logger.log_user_logout(user_id, username, ip)
elif action == 'register':
audit_logger.log_user_created(user_id, username, created_by='self')
if details:
logger.info(f"用户操作: {action}, 用户={username}, 成功={success}, 详情={details}")
def validate_pagination(page: Any, page_size: Any,
max_page_size: int = 100) -> Tuple[int, int, Optional[str]]:
"""
验证分页参数
Args:
page: 页码
page_size: 每页大小
max_page_size: 最大每页大小
Returns:
(页码, 每页大小, 错误消息)
"""
try:
page = int(page) if page else 1
page_size = int(page_size) if page_size else 20
except (ValueError, TypeError):
return 1, 20, "无效的分页参数"
if page < 1:
return 1, 20, "页码必须大于0"
if page_size < 1 or page_size > max_page_size:
return page, 20, f"每页大小必须在1-{max_page_size}之间"
return page, page_size, None
def check_user_ownership(user_id: int, resource_type: str,
resource_id: int) -> Tuple[bool, Optional[str]]:
"""
检查用户是否拥有资源
Args:
user_id: 用户ID
resource_type: 资源类型account/task等
resource_id: 资源ID
Returns:
(是否拥有, 错误消息)
"""
try:
if resource_type == 'account':
account = database.get_account_by_id(resource_id)
if not account:
return False, "账号不存在"
if account['user_id'] != user_id:
return False, "无权访问此账号"
return True, None
elif resource_type == 'task':
# 通过account查询所属用户
# 这里需要根据实际数据库结构实现
pass
return False, "不支持的资源类型"
except Exception as e:
logger.error(f"检查资源所有权失败: {e}", exc_info=True)
return False, "系统错误"
def verify_and_consume_captcha(session_id: str, code: str, captcha_storage: dict, max_attempts: int = 5) -> Tuple[bool, str]:
"""
验证并消费验证码
Args:
session_id: 验证码会话ID
code: 用户输入的验证码
captcha_storage: 验证码存储字典
max_attempts: 最大尝试次数默认5次
Returns:
Tuple[bool, str]: (是否成功, 消息)
- 成功时返回 (True, "验证成功")
- 失败时返回 (False, 错误消息)
Example:
success, message = verify_and_consume_captcha(
captcha_session,
captcha_code,
captcha_storage,
max_attempts=5
)
if not success:
return jsonify({"error": message}), 400
"""
import time
# 检查验证码是否存在
if session_id not in captcha_storage:
return False, "验证码已过期或不存在,请重新获取"
captcha_data = captcha_storage[session_id]
# 检查过期时间
if captcha_data["expire_time"] < time.time():
del captcha_storage[session_id]
return False, "验证码已过期,请重新获取"
# 检查尝试次数
if captcha_data.get("failed_attempts", 0) >= max_attempts:
del captcha_storage[session_id]
return False, f"验证码错误次数过多({max_attempts}次),请重新获取"
# 验证代码(不区分大小写)
if captcha_data["code"].lower() != code.lower():
captcha_data["failed_attempts"] = captcha_data.get("failed_attempts", 0) + 1
return False, "验证码错误"
# 验证成功,删除验证码(防止重复使用)
del captcha_storage[session_id]
return True, "验证成功"
if __name__ == '__main__':
# 测试代码
print("测试应用工具模块...")
print("=" * 60)
# 测试分页验证
print("\n1. 测试分页验证:")
page, page_size, error = validate_pagination("2", "50")
print(f" 页码={page}, 每页={page_size}, 错误={error}")
page, page_size, error = validate_pagination("invalid", "50")
print(f" 无效输入: 页码={page}, 每页={page_size}, 错误={error}")
# 测试响应格式化
print("\n2. 测试响应格式化:")
print(f" 错误响应: {format_error_response('测试错误', need_captcha=True)}")
print(f" 成功响应: {format_success_response('测试成功', {'data': [1, 2, 3]})}")
print("\n" + "=" * 60)
print("✓ 工具模块加载成功!")