Files
zsglpt/app_utils.py
Yu Yon 0fd7137cea Initial commit: 知识管理平台
主要功能:
- 多用户管理系统
- 浏览器自动化(Playwright)
- 任务编排和执行
- Docker容器化部署
- 数据持久化和日志管理

技术栈:
- Flask 3.0.0
- Playwright 1.40.0
- SQLite with connection pooling
- Docker + Docker Compose

部署说明详见README.md
2025-11-16 19:03:07 +08:00

303 lines
8.3 KiB
Python
Executable File
Raw Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
#!/usr/bin/env python3
# -*- coding: utf-8 -*-
"""
应用工具模块
提取重复的业务逻辑
"""
from typing import Dict, Any, Optional, Tuple
from flask import session, jsonify
from app_logger import get_logger, audit_logger
from app_security import get_client_ip
import database
logger = get_logger('app_utils')
class ValidationError(Exception):
"""验证错误异常"""
pass
def verify_user_file_permission(user_id: int, filename: str) -> Tuple[bool, Optional[str]]:
"""
验证用户文件访问权限
Args:
user_id: 用户ID
filename: 文件名
Returns:
(是否有权限, 错误消息)
"""
# 获取用户信息
user = database.get_user_by_id(user_id)
if not user:
return False, "用户不存在"
username = user['username']
# 检查文件名是否以用户名开头
if not filename.startswith(f"{username}_"):
logger.warning(f"用户 {username} (ID:{user_id}) 尝试访问未授权文件: {filename}")
return False, "无权访问此文件"
return True, None
def log_task_event(account_id: int, status: str, message: str,
browse_type: Optional[str] = None,
screenshot_path: Optional[str] = None) -> bool:
"""
记录任务日志(统一接口)
Args:
account_id: 账号ID
status: 状态running/completed/failed/stopped
message: 消息
browse_type: 浏览类型
screenshot_path: 截图路径
Returns:
是否成功
"""
try:
return database.create_task_log(
account_id=account_id,
status=status,
message=message,
browse_type=browse_type,
screenshot_path=screenshot_path
)
except Exception as e:
logger.error(f"记录任务日志失败: {e}", exc_info=True)
return False
def update_account_status(account_id: int, status: str,
error_message: Optional[str] = None) -> bool:
"""
更新账号状态(统一接口)
Args:
account_id: 账号ID
status: 状态idle/running/error/stopped
error_message: 错误消息仅当status=error时
Returns:
是否成功
"""
try:
return database.update_account_status(
account_id=account_id,
status=status,
error_message=error_message
)
except Exception as e:
logger.error(f"更新账号状态失败 (account_id={account_id}): {e}", exc_info=True)
return False
def get_or_create_config_cache() -> Optional[Dict[str, Any]]:
"""
获取或创建系统配置缓存
缓存存储在session中避免重复查询数据库
Returns:
配置字典失败返回None
"""
# 尝试从session获取缓存
if '_system_config' in session:
return session['_system_config']
# 从数据库加载
try:
config = database.get_system_config()
if config:
# 存入session缓存
session['_system_config'] = config
return config
return None
except Exception as e:
logger.error(f"获取系统配置失败: {e}", exc_info=True)
return None
def clear_config_cache():
"""清除配置缓存(配置变更时调用)"""
if '_system_config' in session:
del session['_system_config']
logger.debug("已清除系统配置缓存")
def safe_close_browser(automation_obj, account_id: int):
"""
安全关闭浏览器(统一错误处理)
Args:
automation_obj: PlaywrightAutomation对象
account_id: 账号ID
"""
if automation_obj:
try:
automation_obj.close()
logger.info(f"账号 {account_id} 的浏览器已关闭")
except Exception as e:
logger.error(f"关闭账号 {account_id} 的浏览器失败: {e}", exc_info=True)
def format_error_response(error: str, status_code: int = 400,
need_captcha: bool = False,
extra_data: Optional[Dict] = None) -> Tuple[Any, int]:
"""
格式化错误响应(统一接口)
Args:
error: 错误消息
status_code: HTTP状态码
need_captcha: 是否需要验证码
extra_data: 额外数据
Returns:
(jsonify响应, 状态码)
"""
response_data = {"error": error}
if need_captcha:
response_data["need_captcha"] = True
if extra_data:
response_data.update(extra_data)
return jsonify(response_data), status_code
def format_success_response(message: str = "操作成功",
extra_data: Optional[Dict] = None) -> Any:
"""
格式化成功响应(统一接口)
Args:
message: 成功消息
extra_data: 额外数据
Returns:
jsonify响应
"""
response_data = {"success": True, "message": message}
if extra_data:
response_data.update(extra_data)
return jsonify(response_data)
def log_user_action(action: str, user_id: int, username: str,
success: bool, details: Optional[str] = None):
"""
记录用户操作到审计日志(统一接口)
Args:
action: 操作类型login/register/logout等
user_id: 用户ID
username: 用户名
success: 是否成功
details: 详细信息
"""
ip = get_client_ip()
if action == 'login':
audit_logger.log_user_login(user_id, username, ip, success)
elif action == 'logout':
audit_logger.log_user_logout(user_id, username, ip)
elif action == 'register':
audit_logger.log_user_created(user_id, username, created_by='self')
if details:
logger.info(f"用户操作: {action}, 用户={username}, 成功={success}, 详情={details}")
def validate_pagination(page: Any, page_size: Any,
max_page_size: int = 100) -> Tuple[int, int, Optional[str]]:
"""
验证分页参数
Args:
page: 页码
page_size: 每页大小
max_page_size: 最大每页大小
Returns:
(页码, 每页大小, 错误消息)
"""
try:
page = int(page) if page else 1
page_size = int(page_size) if page_size else 20
except (ValueError, TypeError):
return 1, 20, "无效的分页参数"
if page < 1:
return 1, 20, "页码必须大于0"
if page_size < 1 or page_size > max_page_size:
return page, 20, f"每页大小必须在1-{max_page_size}之间"
return page, page_size, None
def check_user_ownership(user_id: int, resource_type: str,
resource_id: int) -> Tuple[bool, Optional[str]]:
"""
检查用户是否拥有资源
Args:
user_id: 用户ID
resource_type: 资源类型account/task等
resource_id: 资源ID
Returns:
(是否拥有, 错误消息)
"""
try:
if resource_type == 'account':
account = database.get_account_by_id(resource_id)
if not account:
return False, "账号不存在"
if account['user_id'] != user_id:
return False, "无权访问此账号"
return True, None
elif resource_type == 'task':
# 通过account查询所属用户
# 这里需要根据实际数据库结构实现
pass
return False, "不支持的资源类型"
except Exception as e:
logger.error(f"检查资源所有权失败: {e}", exc_info=True)
return False, "系统错误"
if __name__ == '__main__':
# 测试代码
print("测试应用工具模块...")
print("=" * 60)
# 测试分页验证
print("\n1. 测试分页验证:")
page, page_size, error = validate_pagination("2", "50")
print(f" 页码={page}, 每页={page_size}, 错误={error}")
page, page_size, error = validate_pagination("invalid", "50")
print(f" 无效输入: 页码={page}, 每页={page_size}, 错误={error}")
# 测试响应格式化
print("\n2. 测试响应格式化:")
print(f" 错误响应: {format_error_response('测试错误', need_captcha=True)}")
print(f" 成功响应: {format_success_response('测试成功', {'data': [1, 2, 3]})}")
print("\n" + "=" * 60)
print("✓ 工具模块加载成功!")