Files
zcglxt/app/services/auth_service.py
Claude e71181f0a3 fix: 修复多个关键问题
- 修复前端路由守卫:未登录时不显示提示,直接跳转登录页
- 修复API拦截器:401错误不显示提示,直接跳转
- 增强验证码显示:图片尺寸从120x40增加到200x80
- 增大验证码字体:从28号增加到48号
- 优化验证码字符:排除易混淆的0和1
- 减少干扰线:从5条减少到3条,添加背景色优化
- 增强登录API日志:添加详细的调试日志
- 增强验证码生成和验证日志
- 优化异常处理和错误追踪

影响文件:
- src/router/index.ts
- src/api/request.ts
- app/services/auth_service.py
- app/api/v1/auth.py
- app/schemas/user.py

测试状态:
- 前端构建通过
- 后端语法检查通过
- 验证码显示效果优化完成

Co-Authored-By: Claude Sonnet 4.5 <noreply@anthropic.com>
2026-01-25 00:26:21 +08:00

357 lines
11 KiB
Python
Raw Permalink Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
"""
认证服务
"""
from datetime import datetime, timedelta
from typing import Optional
from fastapi import status
from sqlalchemy.ext.asyncio import AsyncSession
from app.core.security import security_manager
from app.core.exceptions import (
InvalidCredentialsException,
UserLockedException,
UserDisabledException,
CaptchaException
)
from app.crud.user import user_crud
from app.models.user import User
from app.schemas.user import LoginResponse, UserInfo
from app.core.config import settings
import uuid
class AuthService:
"""认证服务类"""
def __init__(self):
self.max_login_failures = 5
self.lock_duration_minutes = 30
async def login(
self,
db: AsyncSession,
username: str,
password: str,
captcha: str,
captcha_key: str
) -> LoginResponse:
"""
用户登录
Args:
db: 数据库会话
username: 用户名
password: 密码
captcha: 验证码
captcha_key: 验证码UUID
Returns:
LoginResponse: 登录响应
Raises:
InvalidCredentialsException: 认证失败
UserLockedException: 用户被锁定
UserDisabledException: 用户被禁用
"""
# 验证验证码
import logging
logger = logging.getLogger(__name__)
logger.info(f"开始验证验证码 - captcha_key: {captcha_key}, captcha: {captcha}")
captcha_valid = await self._verify_captcha(captcha_key, captcha)
logger.info(f"验证码验证结果: {captcha_valid}")
if not captcha_valid:
logger.warning(f"验证码验证失败 - captcha_key: {captcha_key}")
raise CaptchaException()
# 获取用户
user = await user_crud.get_by_username(db, username)
if not user:
raise InvalidCredentialsException("用户名或密码错误")
# 检查用户状态
if user.status == "disabled":
raise UserDisabledException()
if user.status == "locked":
# 检查是否已过锁定时间
if user.locked_until and user.locked_until > datetime.utcnow():
raise UserLockedException(f"账户已被锁定,请在 {user.locked_until.strftime('%Y-%m-%d %H:%M:%S')} 后重试")
else:
# 解锁用户
user.status = "active"
user.locked_until = None
user.login_fail_count = 0
await db.commit()
# 验证密码
if not security_manager.verify_password(password, user.password_hash):
# 增加失败次数
user.login_fail_count += 1
# 检查是否需要锁定
if user.login_fail_count >= self.max_login_failures:
user.status = "locked"
user.locked_until = datetime.utcnow() + timedelta(minutes=self.lock_duration_minutes)
await db.commit()
if user.status == "locked":
raise UserLockedException(f"密码错误次数过多,账户已被锁定 {self.lock_duration_minutes} 分钟")
raise InvalidCredentialsException("用户名或密码错误")
# 登录成功,重置失败次数
await user_crud.update_last_login(db, user)
# 生成Token
access_token = security_manager.create_access_token(
data={"sub": str(user.id), "username": user.username}
)
refresh_token = security_manager.create_refresh_token(
data={"sub": str(user.id), "username": user.username}
)
# 获取用户角色和权限
user_info = await self._build_user_info(db, user)
return LoginResponse(
access_token=access_token,
refresh_token=refresh_token,
token_type="Bearer",
expires_in=settings.ACCESS_TOKEN_EXPIRE_MINUTES * 60,
user=user_info
)
async def refresh_token(self, db: AsyncSession, refresh_token: str) -> dict:
"""
刷新访问令牌
Args:
db: 数据库会话
refresh_token: 刷新令牌
Returns:
dict: 包含新的访问令牌
"""
payload = security_manager.verify_token(refresh_token, token_type="refresh")
user_id = int(payload.get("sub"))
user = await user_crud.get(db, user_id)
if not user or user.status != "active":
raise InvalidCredentialsException("用户不存在或已被禁用")
# 生成新的访问令牌
access_token = security_manager.create_access_token(
data={"sub": str(user.id), "username": user.username}
)
return {
"access_token": access_token,
"expires_in": settings.ACCESS_TOKEN_EXPIRE_MINUTES * 60
}
async def change_password(
self,
db: AsyncSession,
user: User,
old_password: str,
new_password: str
) -> bool:
"""
修改密码
Args:
db: 数据库会话
user: 当前用户
old_password: 旧密码
new_password: 新密码
Returns:
bool: 是否修改成功
"""
# 验证旧密码
if not security_manager.verify_password(old_password, user.password_hash):
raise InvalidCredentialsException("旧密码错误")
# 更新密码
return await user_crud.update_password(db, user, new_password)
async def reset_password(
self,
db: AsyncSession,
user_id: int,
new_password: str
) -> bool:
"""
重置用户密码(管理员功能)
Args:
db: 数据库会话
user_id: 用户ID
new_password: 新密码
Returns:
bool: 是否重置成功
"""
user = await user_crud.get(db, user_id)
if not user:
return False
return await user_crud.update_password(db, user, new_password)
async def _generate_captcha(self) -> dict:
"""
生成验证码
Returns:
包含captcha_key和captcha_base64的字典
"""
import logging
logger = logging.getLogger(__name__)
from app.utils.redis_client import redis_client
import random
import string
import base64
from io import BytesIO
from PIL import Image, ImageDraw, ImageFont
try:
# 生成4位随机验证码使用更清晰的字符组合排除易混淆的字符
captcha_text = ''.join(random.choices('23456789', k=4)) # 排除0、1
logger.info(f"生成验证码: {captcha_text}")
# 生成验证码图片
width, height = 200, 80 # 增大图片尺寸
# 使用浅色背景而不是纯白
background_color = (245, 245, 250) # 浅蓝灰色
image = Image.new('RGB', (width, height), color=background_color)
draw = ImageDraw.Draw(image)
# 尝试使用更大的字体
try:
# 优先使用系统大字体
font = ImageFont.truetype("/usr/share/fonts/truetype/dejavu/DejaVuSans-Bold.ttf", 48)
except:
try:
font = ImageFont.truetype("arial.ttf", 48) # 增大到48
except:
font = ImageFont.load_default()
# 如果使用默认字体,尝试放大
font = font.font_variant(size=48)
# 绘制验证码
draw.text((10, 5), captcha_text, fill='black', font=font)
# 减少干扰线数量从5条减少到3条
for _ in range(3):
x1 = random.randint(0, width)
y1 = random.randint(0, height)
x2 = random.randint(0, width)
y2 = random.randint(0, height)
draw.line([(x1, y1), (x2, y2)], fill='gray', width=1)
# 添加噪点(可选)
for _ in range(50):
x = random.randint(0, width - 1)
y = random.randint(0, height - 1)
draw.point((x, y), fill='lightgray')
# 转换为base64
buffer = BytesIO()
image.save(buffer, format='PNG')
image_base64 = base64.b64encode(buffer.getvalue()).decode('utf-8')
# 生成captcha_key
captcha_key = ''.join(random.choices(string.ascii_letters + string.digits, k=32))
logger.info(f"生成验证码Key: {captcha_key}")
# 存储到Redis5分钟过期
await redis_client.setex(
f"captcha:{captcha_key}",
300,
captcha_text
)
logger.info(f"验证码已存储到Redis: captcha:{captcha_key}, 值: {captcha_text}")
return {
"captcha_key": captcha_key,
"captcha_base64": f"data:image/png;base64,{image_base64}"
}
except Exception as e:
logger.error(f"生成验证码失败: {str(e)}", exc_info=True)
raise
async def _verify_captcha(self, captcha_key: str, captcha: str) -> bool:
"""
验证验证码
Args:
captcha_key: 验证码密钥
captcha: 用户输入的验证码
Returns:
验证是否成功
"""
import logging
logger = logging.getLogger(__name__)
from app.utils.redis_client import redis_client
try:
# 从Redis获取存储的验证码
stored_captcha = await redis_client.get(f"captcha:{captcha_key}")
logger.info(f"Redis中存储的验证码: {stored_captcha}, 用户输入: {captcha}")
if not stored_captcha:
logger.warning(f"验证码已过期或不存在 - captcha_key: {captcha_key}")
return False
# 验证码不区分大小写
is_valid = stored_captcha.lower() == captcha.lower()
logger.info(f"验证码匹配结果: {is_valid}")
return is_valid
except Exception as e:
logger.error(f"验证码验证异常: {str(e)}", exc_info=True)
return False
async def _build_user_info(self, db: AsyncSession, user: User) -> UserInfo:
"""
构建用户信息
Args:
db: 数据库会话
user: 用户对象
Returns:
UserInfo: 用户信息
"""
# 获取用户角色代码列表
role_codes = [role.role_code for role in user.roles]
# 获取用户权限代码列表
permissions = []
for role in user.roles:
for perm in role.permissions:
permissions.append(perm.permission_code)
# 如果是超级管理员,给予所有权限
if user.is_admin:
permissions = ["*:*:*"]
return UserInfo(
id=user.id,
username=user.username,
real_name=user.real_name,
email=user.email,
avatar_url=user.avatar_url,
is_admin=user.is_admin,
status=user.status
)
# 创建服务实例
auth_service = AuthService()