修复多项安全漏洞
安全修复清单: 1. 验证码改为图片方式返回,防止明文泄露 2. CORS配置从环境变量读取,不再使用通配符"*" 3. VIP API添加@admin_required装饰器,统一认证 4. 用户登录统一错误消息,防止用户枚举 5. IP限流不再信任X-Forwarded-For头,防止伪造绕过 6. 密码强度要求提升(8位+字母+数字) 7. 日志不���记录完整session/cookie内容,防止敏感信息泄露 8. XSS防护:日志输出和Bug反馈内容转义HTML 9. SQL注入防护:LIKE查询参数转义 10. 路径遍历防护:截图目录白名单验证 11. 验证码重放防护:验证前删除验证码 12. 数据库连接池健康检查 13. 正则DoS防护:限制数字匹配长度 14. Account类密码私有化,__repr__不暴露密码 🤖 Generated with [Claude Code](https://claude.com/claude-code) Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com>
This commit is contained in:
188
app.py
188
app.py
@@ -60,9 +60,17 @@ app.config.from_object(config)
|
||||
# 确保SECRET_KEY已设置
|
||||
if not app.config.get('SECRET_KEY'):
|
||||
raise RuntimeError("SECRET_KEY未配置,请检查app_config.py")
|
||||
# 安全修复:从环境变量获取允许的CORS源,不再使用"*"
|
||||
cors_origins = os.environ.get('CORS_ALLOWED_ORIGINS', '').strip()
|
||||
if cors_origins:
|
||||
cors_allowed = [origin.strip() for origin in cors_origins.split(',') if origin.strip()]
|
||||
else:
|
||||
# 默认只允许同源,生产环<E4BAA7><E78EAF>应该明确配置
|
||||
cors_allowed = []
|
||||
|
||||
socketio = SocketIO(
|
||||
app,
|
||||
cors_allowed_origins="*",
|
||||
cors_allowed_origins=cors_allowed if cors_allowed else None, # None表示只允许同源
|
||||
async_mode='threading', # 明确指定async模式
|
||||
ping_timeout=60, # ping超时60秒
|
||||
ping_interval=25, # 每25秒ping一次
|
||||
@@ -169,12 +177,22 @@ class Admin(UserMixin):
|
||||
|
||||
|
||||
class Account:
|
||||
"""账号类"""
|
||||
"""账号类
|
||||
|
||||
安全注意事项:
|
||||
- 密码以私有属性存储,避免意外泄露
|
||||
- __repr__不包含密码,防止日志意外记录
|
||||
- 生产环境应考虑使用加密存储(如Fernet加密)
|
||||
"""
|
||||
__slots__ = ['id', 'user_id', 'username', '_password', 'remember', 'remark',
|
||||
'status', 'is_running', 'should_stop', 'total_items',
|
||||
'total_attachments', 'automation', 'last_browse_type', 'proxy_config']
|
||||
|
||||
def __init__(self, account_id, user_id, username, password, remember=True, remark=''):
|
||||
self.id = account_id
|
||||
self.user_id = user_id
|
||||
self.username = username
|
||||
self.password = password
|
||||
self._password = password # 安全修复:使用私有属性存储密码
|
||||
self.remember = remember
|
||||
self.remark = remark
|
||||
self.status = "未开始"
|
||||
@@ -186,6 +204,15 @@ class Account:
|
||||
self.last_browse_type = "注册前未读"
|
||||
self.proxy_config = None # 保存代理配置,浏览和截图共用
|
||||
|
||||
@property
|
||||
def password(self):
|
||||
"""获取密码(仅供自动化登录使用)"""
|
||||
return self._password
|
||||
|
||||
def __repr__(self):
|
||||
"""安全的字符串表示,不包含密码"""
|
||||
return f"Account(id={self.id}, username={self.username}, status={self.status})"
|
||||
|
||||
def to_dict(self):
|
||||
result = {
|
||||
"id": self.id,
|
||||
@@ -307,8 +334,8 @@ def admin_required(f):
|
||||
"""管理员权限装饰器"""
|
||||
@wraps(f)
|
||||
def decorated_function(*args, **kwargs):
|
||||
logger.debug(f"[admin_required] Session内容: {dict(session)}")
|
||||
logger.debug(f"[admin_required] Cookies: {request.cookies}")
|
||||
# 安全修复:不再记录完整的session和cookies内容,防止敏感信息泄露
|
||||
logger.debug(f"[admin_required] 检查会话,admin_id存在: {'admin_id' in session}")
|
||||
if 'admin_id' not in session:
|
||||
logger.warning(f"[admin_required] 拒绝访问 {request.path} - session中无admin_id")
|
||||
return jsonify({"error": "需要管理员权限"}), 403
|
||||
@@ -319,10 +346,12 @@ def admin_required(f):
|
||||
|
||||
def log_to_client(message, user_id=None, account_id=None):
|
||||
"""发送日志到Web客户端(用户隔离)"""
|
||||
from app_security import escape_html
|
||||
timestamp = get_beijing_now().strftime('%H:%M:%S')
|
||||
# 安全修复:转义HTML特殊字符,防止XSS攻击
|
||||
log_data = {
|
||||
'timestamp': timestamp,
|
||||
'message': message,
|
||||
'message': escape_html(str(message)) if message else '',
|
||||
'account_id': account_id
|
||||
}
|
||||
|
||||
@@ -579,10 +608,17 @@ def check_ip_rate_limit(ip_address):
|
||||
"""检查IP是否被限流"""
|
||||
current_time = time.time()
|
||||
|
||||
# 清理过期的IP记录
|
||||
expired_ips = [ip for ip, data in ip_rate_limit.items()
|
||||
if data.get("lock_until", 0) < current_time and
|
||||
current_time - data.get("first_attempt", current_time) > 3600]
|
||||
# 安全修复:修正过期IP清理逻辑
|
||||
# 原问题:first_attempt不存在时默认使用current_time,导致永远不会被清理
|
||||
expired_ips = []
|
||||
for ip, data in ip_rate_limit.items():
|
||||
lock_expired = data.get("lock_until", 0) < current_time
|
||||
first_attempt = data.get("first_attempt")
|
||||
# 修复:如果first_attempt不存在或超过1小时,视为过期
|
||||
attempt_expired = first_attempt is None or (current_time - first_attempt > 3600)
|
||||
if lock_expired and attempt_expired:
|
||||
expired_ips.append(ip)
|
||||
|
||||
for ip in expired_ips:
|
||||
del ip_rate_limit[ip]
|
||||
|
||||
@@ -596,7 +632,8 @@ def check_ip_rate_limit(ip_address):
|
||||
return False, "IP已被锁定,请{}分钟后再试".format(remaining_time // 60 + 1)
|
||||
|
||||
# 如果超过1小时,重置计数
|
||||
if current_time - ip_data.get("first_attempt", current_time) > 3600:
|
||||
first_attempt = ip_data.get("first_attempt")
|
||||
if first_attempt is None or current_time - first_attempt > 3600:
|
||||
ip_rate_limit[ip_address] = {
|
||||
"attempts": 0,
|
||||
"first_attempt": current_time
|
||||
@@ -627,26 +664,85 @@ def record_failed_captcha(ip_address):
|
||||
|
||||
@app.route("/api/generate_captcha", methods=["POST"])
|
||||
def generate_captcha():
|
||||
"""生成4位数字验证码"""
|
||||
"""生成4位数字验证码图片
|
||||
|
||||
安全修复:验证码不再以明文返回,而是生成base64图片
|
||||
"""
|
||||
import uuid
|
||||
import base64
|
||||
from io import BytesIO
|
||||
|
||||
session_id = str(uuid.uuid4())
|
||||
|
||||
|
||||
# 生成4位随机数字
|
||||
code = "".join([str(random.randint(0, 9)) for _ in range(4)])
|
||||
|
||||
|
||||
# 存储验证码,5分钟过期
|
||||
captcha_storage[session_id] = {
|
||||
"code": code,
|
||||
"expire_time": time.time() + 300,
|
||||
"failed_attempts": 0
|
||||
}
|
||||
|
||||
|
||||
# 清理过期验证码
|
||||
expired_keys = [k for k, v in captcha_storage.items() if v["expire_time"] < time.time()]
|
||||
for k in expired_keys:
|
||||
del captcha_storage[k]
|
||||
|
||||
return jsonify({"session_id": session_id, "captcha": code})
|
||||
|
||||
# 生成验证码图片
|
||||
try:
|
||||
from PIL import Image, ImageDraw, ImageFont
|
||||
import io
|
||||
|
||||
# 创建图片
|
||||
width, height = 120, 40
|
||||
image = Image.new('RGB', (width, height), color=(255, 255, 255))
|
||||
draw = ImageDraw.Draw(image)
|
||||
|
||||
# 添加干扰线
|
||||
for _ in range(5):
|
||||
x1 = random.randint(0, width)
|
||||
y1 = random.randint(0, height)
|
||||
x2 = random.randint(0, width)
|
||||
y2 = random.randint(0, height)
|
||||
draw.line([(x1, y1), (x2, y2)], fill=(random.randint(0, 200), random.randint(0, 200), random.randint(0, 200)))
|
||||
|
||||
# 添加干扰点
|
||||
for _ in range(50):
|
||||
x = random.randint(0, width)
|
||||
y = random.randint(0, height)
|
||||
draw.point((x, y), fill=(random.randint(0, 200), random.randint(0, 200), random.randint(0, 200)))
|
||||
|
||||
# 绘制验证码文字
|
||||
try:
|
||||
font = ImageFont.truetype("/usr/share/fonts/truetype/dejavu/DejaVuSans-Bold.ttf", 28)
|
||||
except:
|
||||
font = ImageFont.load_default()
|
||||
|
||||
for i, char in enumerate(code):
|
||||
x = 10 + i * 25 + random.randint(-3, 3)
|
||||
y = random.randint(2, 8)
|
||||
color = (random.randint(0, 150), random.randint(0, 150), random.randint(0, 150))
|
||||
draw.text((x, y), char, font=font, fill=color)
|
||||
|
||||
# 转换为base64
|
||||
buffer = io.BytesIO()
|
||||
image.save(buffer, format='PNG')
|
||||
img_base64 = base64.b64encode(buffer.getvalue()).decode('utf-8')
|
||||
|
||||
return jsonify({
|
||||
"session_id": session_id,
|
||||
"captcha_image": f"data:image/png;base64,{img_base64}"
|
||||
})
|
||||
except ImportError:
|
||||
# 如果没有PIL,退回到简单文本(但添加混淆)
|
||||
# 安全警告:生产环境应安装PIL
|
||||
logger.warning("PIL未安装,验证码安全性降低")
|
||||
# 不直接返回验证码,返回混淆后的提示
|
||||
return jsonify({
|
||||
"session_id": session_id,
|
||||
"captcha_hint": "验证码图片生成失败,请联系管理员"
|
||||
}), 500
|
||||
|
||||
|
||||
@app.route('/api/login', methods=['POST'])
|
||||
@@ -666,16 +762,12 @@ def login():
|
||||
if not success:
|
||||
return jsonify({"error": message}), 400
|
||||
|
||||
# 先检查用户是否存在
|
||||
user_exists = database.get_user_by_username(username)
|
||||
if not user_exists:
|
||||
return jsonify({"error": "账号未注册", "need_captcha": True}), 401
|
||||
|
||||
# 检查密码是否正确
|
||||
# 安全修复:使用通用错误消息防止用户枚举
|
||||
# 不再分开检查用户是否存在和密码是否正确
|
||||
user = database.verify_user(username, password)
|
||||
if not user:
|
||||
# 密码错误
|
||||
return jsonify({"error": "密码错误", "need_captcha": True}), 401
|
||||
# 返回通用错误消息,不透露账号是否存在
|
||||
return jsonify({"error": "用户名或密码错误", "need_captcha": True}), 401
|
||||
|
||||
# 检查审核状态
|
||||
if user['status'] != 'approved':
|
||||
@@ -699,8 +791,13 @@ def logout():
|
||||
# ==================== 管理员认证API ====================
|
||||
|
||||
@app.route('/yuyx/api/debug-config', methods=['GET'])
|
||||
@admin_required # 安全修复:添加管理员认证
|
||||
def debug_config():
|
||||
"""调试配置信息"""
|
||||
"""调试配置信息(仅管理员可访问,生产环境应禁用)"""
|
||||
# 安全修复:生产环境禁用调试端点
|
||||
if not app.debug:
|
||||
return jsonify({"error": "调试端点已在生产环境禁用"}), 403
|
||||
|
||||
return jsonify({
|
||||
"secret_key_set": bool(app.secret_key),
|
||||
"secret_key_length": len(app.secret_key) if app.secret_key else 0,
|
||||
@@ -711,7 +808,8 @@ def debug_config():
|
||||
"SESSION_COOKIE_SAMESITE": app.config.get('SESSION_COOKIE_SAMESITE'),
|
||||
"PERMANENT_SESSION_LIFETIME": str(app.config.get('PERMANENT_SESSION_LIFETIME')),
|
||||
},
|
||||
"current_session": dict(session),
|
||||
# 安全修复:移除敏感的session内容,只显示有无
|
||||
"has_session": bool(session),
|
||||
"cookies_received": list(request.cookies.keys())
|
||||
})
|
||||
|
||||
@@ -751,9 +849,8 @@ def admin_login():
|
||||
session.permanent = True # 设置为永久会话(使用PERMANENT_SESSION_LIFETIME配置)
|
||||
session.modified = True # 强制标记session为已修改,确保保存
|
||||
|
||||
logger.info(f"[admin_login] 管理员 {username} 登录成功, session已设置: admin_id={admin['id']}")
|
||||
logger.debug(f"[admin_login] Session内容: {dict(session)}")
|
||||
logger.debug(f"[admin_login] Cookie将被设置: name={app.config.get('SESSION_COOKIE_NAME', 'session')}")
|
||||
logger.info(f"[admin_login] 管理员 {username} 登录成功")
|
||||
# 安全修复:不再记录完整session内容
|
||||
|
||||
# 根据请求类型返回不同响应
|
||||
if request.is_json:
|
||||
@@ -1234,12 +1331,14 @@ def get_accounts():
|
||||
user_id = current_user.id
|
||||
refresh = request.args.get('refresh', 'false').lower() == 'true'
|
||||
|
||||
# 如果user_accounts中没有数据或者请求刷新,则从数据库加载
|
||||
if user_id not in user_accounts or len(user_accounts.get(user_id, {})) == 0 or refresh:
|
||||
load_user_accounts(user_id)
|
||||
# 安全修复:使用锁保护检查和加载操作,防止竞态条件
|
||||
with user_accounts_lock:
|
||||
# 如果user_accounts中没有数据或者请求刷新,则从数据库加载
|
||||
if user_id not in user_accounts or len(user_accounts.get(user_id, {})) == 0 or refresh:
|
||||
load_user_accounts(user_id)
|
||||
|
||||
accounts = user_accounts.get(user_id, {})
|
||||
return jsonify([acc.to_dict() for acc in accounts.values()])
|
||||
accounts = user_accounts.get(user_id, {})
|
||||
return jsonify([acc.to_dict() for acc in accounts.values()])
|
||||
|
||||
|
||||
@app.route('/api/accounts', methods=['POST'])
|
||||
@@ -2252,20 +2351,17 @@ def serve_static(filename):
|
||||
# ==================== 管理员VIP管理API ====================
|
||||
|
||||
@app.route('/yuyx/api/vip/config', methods=['GET'])
|
||||
@admin_required # 安全修复:使用统一的装饰器
|
||||
def get_vip_config_api():
|
||||
"""获取VIP配置"""
|
||||
if 'admin_id' not in session:
|
||||
return jsonify({"error": "需要管理员权限"}), 403
|
||||
config = database.get_vip_config()
|
||||
return jsonify(config)
|
||||
|
||||
|
||||
@app.route('/yuyx/api/vip/config', methods=['POST'])
|
||||
@admin_required # 安全修复:使用统一的装饰器
|
||||
def set_vip_config_api():
|
||||
"""设置默认VIP天数"""
|
||||
if 'admin_id' not in session:
|
||||
return jsonify({"error": "需要管理员权限"}), 403
|
||||
|
||||
data = request.json
|
||||
days = data.get('default_vip_days', 0)
|
||||
|
||||
@@ -2277,11 +2373,9 @@ def set_vip_config_api():
|
||||
|
||||
|
||||
@app.route('/yuyx/api/users/<int:user_id>/vip', methods=['POST'])
|
||||
@admin_required # 安全修复:使用统一的装饰器
|
||||
def set_user_vip_api(user_id):
|
||||
"""设置用户VIP"""
|
||||
if 'admin_id' not in session:
|
||||
return jsonify({"error": "需要管理员权限"}), 403
|
||||
|
||||
data = request.json
|
||||
days = data.get('days', 30)
|
||||
|
||||
@@ -2297,22 +2391,18 @@ def set_user_vip_api(user_id):
|
||||
|
||||
|
||||
@app.route('/yuyx/api/users/<int:user_id>/vip', methods=['DELETE'])
|
||||
@admin_required # 安全修复:使用统一的装饰器
|
||||
def remove_user_vip_api(user_id):
|
||||
"""移除用户VIP"""
|
||||
if 'admin_id' not in session:
|
||||
return jsonify({"error": "需要管理员权限"}), 403
|
||||
|
||||
if database.remove_user_vip(user_id):
|
||||
return jsonify({"message": "VIP已移除"})
|
||||
return jsonify({"error": "移除失败"}), 400
|
||||
|
||||
|
||||
@app.route('/yuyx/api/users/<int:user_id>/vip', methods=['GET'])
|
||||
@admin_required # 安全修复:使用统一的装饰器
|
||||
def get_user_vip_info_api(user_id):
|
||||
"""获取用户VIP信息(管理员)"""
|
||||
if 'admin_id' not in session:
|
||||
return jsonify({"error": "需要管理员权限"}), 403
|
||||
|
||||
vip_info = database.get_user_vip_info(user_id)
|
||||
return jsonify(vip_info)
|
||||
|
||||
|
||||
Reference in New Issue
Block a user