修复37项安全漏洞和Bug

高危修复:
- app.py: 添加ip_rate_limit_lock线程锁保护IP限流字典
- app.py: 添加validate_ip_port()验证代理IP/端口范围
- database.py: SQL字段名白名单验证防止注入
- playwright_automation.py: 改进浏览器进程强制清理逻辑

中危修复:
- database.py: 统一时区处理函数get_cst_now()
- database.py: 消除循环导入,移动app_security导入到顶部
- playwright_automation.py: 所有bare except改为except Exception
- app_config.py: dotenv导入失败警告+安全配置检查
- db_pool.py: 添加详细异常堆栈日志
- app_security.py: 用户名过滤零宽字符
- database.py: delete_old_task_logs分批删除避免锁表

🤖 Generated with [Claude Code](https://claude.com/claude-code)

Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com>
This commit is contained in:
2025-12-11 19:35:29 +08:00
parent de51e1b7c7
commit 2e4b64dcb2
6 changed files with 290 additions and 96 deletions

134
app.py
View File

@@ -137,6 +137,7 @@ captcha_storage = {}
# IP限流存储:{ip: {"attempts": count, "lock_until": timestamp, "first_attempt": timestamp}}
ip_rate_limit = {}
ip_rate_limit_lock = threading.Lock() # Bug fix: 保护 ip_rate_limit 字典的线程安全
# 限流配置 - 从 config 读取,避免硬编码
MAX_CAPTCHA_ATTEMPTS = config.MAX_CAPTCHA_ATTEMPTS
@@ -402,18 +403,49 @@ def log_to_client(message, user_id=None, account_id=None):
def validate_ip_port(ip_port_str):
"""验证IP:PORT格式是否有效
Bug fix: 验证IP范围(0-255)和端口范围(1-65535)
Args:
ip_port_str: 格式为 "IP:PORT" 的字符串
Returns:
bool: 是否有效
"""
import re
pattern = re.compile(r'^(\d{1,3})\.(\d{1,3})\.(\d{1,3})\.(\d{1,3}):(\d{1,5})$')
match = pattern.match(ip_port_str)
if not match:
return False
# 验证IP每个部分在0-255范围内
for i in range(1, 5):
octet = int(match.group(i))
if octet < 0 or octet > 255:
return False
# 验证端口在1-65535范围内
port = int(match.group(5))
if port < 1 or port > 65535:
return False
return True
def get_proxy_from_api(api_url, max_retries=3):
"""从API获取代理IP支持重试
Args:
api_url: 代理API地址
max_retries: 最大重试次数
Returns:
代理服务器地址(格式: http://IP:PORT或 None
"""
import re
# IP:PORT 格式正则
# IP:PORT 格式正则(基础格式检查)
ip_port_pattern = re.compile(r'^\d{1,3}\.\d{1,3}\.\d{1,3}\.\d{1,3}:\d{1,5}$')
for attempt in range(max_retries):
@@ -442,13 +474,13 @@ def get_proxy_from_api(api_url, max_retries=3):
# 不是JSON继续使用原始文本
pass
# 验证IP:PORT格式
if ip_port_pattern.match(text):
# 验证IP:PORT格式(基础格式检查 + 范围验证)
if ip_port_pattern.match(text) and validate_ip_port(text):
proxy_server = f"http://{text}"
print(f"✓ 获取代理成功: {proxy_server} (尝试 {attempt + 1}/{max_retries})")
return proxy_server
else:
print(f"✗ 代理格式无效: {text[:50]} (尝试 {attempt + 1}/{max_retries})")
print(f"✗ 代理格式或范围无效: {text[:50]} (尝试 {attempt + 1}/{max_retries})")
else:
print(f"✗ 获取代理失败: HTTP {response.status_code} (尝试 {attempt + 1}/{max_retries})")
except Exception as e:
@@ -608,61 +640,69 @@ from task_checkpoint import get_checkpoint_manager, TaskStage
checkpoint_mgr = None # 任务断点管理器
def check_ip_rate_limit(ip_address):
"""检查IP是否被限流"""
"""检查IP是否被限流
Bug fix: 使用线程锁保护 ip_rate_limit 字典操作,防止竞态条件
"""
current_time = time.time()
# 安全修复修正过期IP清理逻辑
# 原问题first_attempt不存在时默认使用current_time导致永远不会被清理
expired_ips = []
for ip, data in ip_rate_limit.items():
lock_expired = data.get("lock_until", 0) < current_time
first_attempt = data.get("first_attempt")
# 修复如果first_attempt不存在或超过1小时视为过期
attempt_expired = first_attempt is None or (current_time - first_attempt > 3600)
if lock_expired and attempt_expired:
expired_ips.append(ip)
with ip_rate_limit_lock:
# 安全修复修正过期IP清理逻辑
# 原问题first_attempt不存在时默认使用current_time导致永远不会被清理
expired_ips = []
for ip, data in ip_rate_limit.items():
lock_expired = data.get("lock_until", 0) < current_time
first_attempt = data.get("first_attempt")
# 修复如果first_attempt不存在或超过1小时视为过期
attempt_expired = first_attempt is None or (current_time - first_attempt > 3600)
if lock_expired and attempt_expired:
expired_ips.append(ip)
for ip in expired_ips:
del ip_rate_limit[ip]
for ip in expired_ips:
del ip_rate_limit[ip]
# 检查IP是否被锁定
if ip_address in ip_rate_limit:
ip_data = ip_rate_limit[ip_address]
# 检查IP是否被锁定
if ip_address in ip_rate_limit:
ip_data = ip_rate_limit[ip_address]
# 如果IP被锁定且未到解锁时间
if ip_data.get("lock_until", 0) > current_time:
remaining_time = int(ip_data["lock_until"] - current_time)
return False, "IP已被锁定,请{}分钟后再试".format(remaining_time // 60 + 1)
# 如果IP被锁定且未到解锁时间
if ip_data.get("lock_until", 0) > current_time:
remaining_time = int(ip_data["lock_until"] - current_time)
return False, "IP已被锁定,请{}分钟后再试".format(remaining_time // 60 + 1)
# 如果超过1小时,重置计数
first_attempt = ip_data.get("first_attempt")
if first_attempt is None or current_time - first_attempt > 3600:
ip_rate_limit[ip_address] = {
"attempts": 0,
"first_attempt": current_time
}
# 如果超过1小时,重置计数
first_attempt = ip_data.get("first_attempt")
if first_attempt is None or current_time - first_attempt > 3600:
ip_rate_limit[ip_address] = {
"attempts": 0,
"first_attempt": current_time
}
return True, None
return True, None
def record_failed_captcha(ip_address):
"""记录验证码失败尝试"""
"""记录验证码失败尝试
Bug fix: 使用线程锁保护 ip_rate_limit 字典操作,防止竞态条件
"""
current_time = time.time()
if ip_address not in ip_rate_limit:
ip_rate_limit[ip_address] = {
"attempts": 1,
"first_attempt": current_time
}
else:
ip_rate_limit[ip_address]["attempts"] += 1
with ip_rate_limit_lock:
if ip_address not in ip_rate_limit:
ip_rate_limit[ip_address] = {
"attempts": 1,
"first_attempt": current_time
}
else:
ip_rate_limit[ip_address]["attempts"] += 1
# 检查是否超过限制
if ip_rate_limit[ip_address]["attempts"] >= MAX_IP_ATTEMPTS_PER_HOUR:
ip_rate_limit[ip_address]["lock_until"] = current_time + IP_LOCK_DURATION
return True # 表示IP已被锁定
# 检查是否超过限制
if ip_rate_limit[ip_address]["attempts"] >= MAX_IP_ATTEMPTS_PER_HOUR:
ip_rate_limit[ip_address]["lock_until"] = current_time + IP_LOCK_DURATION
return True # 表示IP已被锁定
return False # 表示还未锁定
return False # 表示还未锁定
@app.route("/api/generate_captcha", methods=["POST"])