Files
zsglpt/screenshot_worker.py
yuyx b9edc4aaa2 修复多项安全漏洞
安全修复清单:
1. 验证码改为图片方式返回,防止明文泄露
2. CORS配置从环境变量读取,不再使用通配符"*"
3. VIP API添加@admin_required装饰器,统一认证
4. 用户登录统一错误消息,防止用户枚举
5. IP限流不再信任X-Forwarded-For头,防止伪造绕过
6. 密码强度要求提升(8位+字母+数字)
7. 日志不���记录完整session/cookie内容,防止敏感信息泄露
8. XSS防护:日志输出和Bug反馈内容转义HTML
9. SQL注入防护:LIKE查询参数转义
10. 路径遍历防护:截图目录白名单验证
11. 验证码重放防护:验证前删除验证码
12. 数据库连接池健康检查
13. 正则DoS防护:限制数字匹配长度
14. Account类密码私有化,__repr__不暴露密码

🤖 Generated with [Claude Code](https://claude.com/claude-code)

Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com>
2025-12-11 17:53:48 +08:00

173 lines
5.2 KiB
Python

#!/usr/bin/env python3
# -*- coding: utf-8 -*-
"""
独立截图进程 - 使用已保存的Cookies直接截图
"""
import sys
import json
import os
import time
import traceback
def take_screenshot(config):
"""执行截图任务"""
from playwright.sync_api import sync_playwright
username = config['username']
browse_type = config.get('browse_type', '应读')
screenshot_path = config['screenshot_path']
cookies_file = config.get('cookies_file', '')
# 安全修复:验证截图路径在允许的目录内,防止路径遍历攻击
ALLOWED_SCREENSHOT_DIRS = [
'/root/zsglpt/screenshots',
'/root/zsglpt/static/screenshots',
'/tmp/zsglpt_screenshots'
]
def is_safe_screenshot_path(path):
"""验证截图路径是否安全"""
abs_path = os.path.abspath(path)
return any(abs_path.startswith(os.path.abspath(allowed_dir))
for allowed_dir in ALLOWED_SCREENSHOT_DIRS)
if not is_safe_screenshot_path(screenshot_path):
return {
'success': False,
'message': '非法截图路径',
'screenshot_path': ''
}
result = {
'success': False,
'message': '',
'screenshot_path': screenshot_path
}
playwright = None
browser = None
context = None
try:
print(f"[截图进程] 启动浏览器...", flush=True)
playwright = sync_playwright().start()
browser = playwright.chromium.launch(
headless=True,
args=[
'--no-sandbox',
'--disable-setuid-sandbox',
'--disable-dev-shm-usage',
'--disable-gpu'
]
)
# 创建 context
context = browser.new_context(
viewport={'width': 1920, 'height': 1080},
user_agent='Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/120.0.0.0 Safari/537.36'
)
page = context.new_page()
page.set_default_timeout(30000)
# 加载已保存的 Cookies
if cookies_file and os.path.exists(cookies_file):
print(f"[截图进程] 加载Cookies: {cookies_file}", flush=True)
with open(cookies_file, 'r', encoding='utf-8') as f:
cookies_data = json.load(f)
cookies = cookies_data.get('cookies', [])
if cookies:
context.add_cookies(cookies)
print(f"[截图进程] 已加载 {len(cookies)} 个Cookie", flush=True)
else:
print(f"[截图进程] 警告: Cookies文件不存在", flush=True)
# 根据浏览类型导航到对应页面
if browse_type == "应读":
url = "https://zsgl.gat.zj.gov.cn/web/learn/readList"
elif browse_type == "应学":
url = "https://zsgl.gat.zj.gov.cn/web/learn/learnList"
elif browse_type == "应考":
url = "https://zsgl.gat.zj.gov.cn/web/exam"
else:
url = "https://zsgl.gat.zj.gov.cn/web/learn/readList"
print(f"[截图进程] 导航到: {url}", flush=True)
page.goto(url, wait_until='networkidle', timeout=30000)
# 等待页面加载
time.sleep(3)
# 检查是否被重定向到登录页
if '/login' in page.url.lower() or '/web/' == page.url.rstrip('/').split('/')[-1]:
print(f"[截图进程] 登录已过期,需要重新登录", flush=True)
result['message'] = '登录已过期'
return result
# 确保截图目录存在
os.makedirs(os.path.dirname(screenshot_path), exist_ok=True)
# 截图
print(f"[截图进程] 截图保存到: {screenshot_path}", flush=True)
page.screenshot(path=screenshot_path, full_page=False, type='jpeg', quality=85)
# 验证截图文件
if os.path.exists(screenshot_path) and os.path.getsize(screenshot_path) > 1000:
result['success'] = True
result['message'] = '截图成功'
print(f"[截图进程] 截图成功!", flush=True)
else:
result['message'] = '截图文件异常'
except Exception as e:
result['message'] = f'截图出错: {str(e)}'
print(f"[截图进程] 错误: {traceback.format_exc()}", flush=True)
finally:
try:
if context:
context.close()
if browser:
browser.close()
if playwright:
playwright.stop()
except:
pass
return result
def main():
if len(sys.argv) < 2:
print("用法: python screenshot_worker.py <config_json_file>")
sys.exit(1)
config_file = sys.argv[1]
try:
with open(config_file, 'r', encoding='utf-8') as f:
config = json.load(f)
except Exception as e:
print(json.dumps({'success': False, 'message': f'读取配置失败: {e}'}))
sys.exit(1)
result = take_screenshot(config)
# 输出 JSON 结果
print("===RESULT===", flush=True)
print(json.dumps(result, ensure_ascii=False), flush=True)
# 清理配置文件
try:
os.remove(config_file)
except:
pass
sys.exit(0 if result['success'] else 1)
if __name__ == '__main__':
main()