""" 认证服务 """ from datetime import datetime, timedelta from typing import Optional from fastapi import status from sqlalchemy.ext.asyncio import AsyncSession from app.core.security import security_manager from app.core.exceptions import ( InvalidCredentialsException, UserLockedException, UserDisabledException, CaptchaException ) from app.crud.user import user_crud from app.models.user import User from app.schemas.user import LoginResponse, UserInfo from app.core.config import settings import uuid class AuthService: """认证服务类""" def __init__(self): self.max_login_failures = 5 self.lock_duration_minutes = 30 async def login( self, db: AsyncSession, username: str, password: str, captcha: str, captcha_key: str ) -> LoginResponse: """ 用户登录 Args: db: 数据库会话 username: 用户名 password: 密码 captcha: 验证码 captcha_key: 验证码UUID Returns: LoginResponse: 登录响应 Raises: InvalidCredentialsException: 认证失败 UserLockedException: 用户被锁定 UserDisabledException: 用户被禁用 """ # 验证验证码 if not await self._verify_captcha(captcha_key, captcha): raise CaptchaException() # 获取用户 user = await user_crud.get_by_username(db, username) if not user: raise InvalidCredentialsException("用户名或密码错误") # 检查用户状态 - 使用实际的数据库字段 is_active if not user.is_active: raise UserDisabledException() # 验证密码 - 使用实际的数据库字段 hashed_password if not security_manager.verify_password(password, user.hashed_password): raise InvalidCredentialsException("用户名或密码错误") # 登录成功,重置失败次数 await user_crud.update_last_login(db, user) # 生成Token access_token = security_manager.create_access_token( data={"sub": str(user.id), "username": user.username} ) refresh_token = security_manager.create_refresh_token( data={"sub": str(user.id), "username": user.username} ) # 获取用户角色和权限 user_info = await self._build_user_info(db, user) return LoginResponse( access_token=access_token, refresh_token=refresh_token, token_type="Bearer", expires_in=settings.ACCESS_TOKEN_EXPIRE_MINUTES * 60, user=user_info ) async def refresh_token(self, db: AsyncSession, refresh_token: str) -> dict: """ 刷新访问令牌 Args: db: 数据库会话 refresh_token: 刷新令牌 Returns: dict: 包含新的访问令牌 """ payload = security_manager.verify_token(refresh_token, token_type="refresh") user_id = int(payload.get("sub")) user = await user_crud.get(db, user_id) if not user or not user.is_active: raise InvalidCredentialsException("用户不存在或已被禁用") # 生成新的访问令牌 access_token = security_manager.create_access_token( data={"sub": str(user.id), "username": user.username} ) return { "access_token": access_token, "expires_in": settings.ACCESS_TOKEN_EXPIRE_MINUTES * 60 } async def change_password( self, db: AsyncSession, user: User, old_password: str, new_password: str ) -> bool: """ 修改密码 Args: db: 数据库会话 user: 当前用户 old_password: 旧密码 new_password: 新密码 Returns: bool: 是否修改成功 """ # 验证旧密码 if not security_manager.verify_password(old_password, user.hashed_password): raise InvalidCredentialsException("旧密码错误") # 更新密码 return await user_crud.update_password(db, user, new_password) async def reset_password( self, db: AsyncSession, user_id: int, new_password: str ) -> bool: """ 重置用户密码(管理员功能) Args: db: 数据库会话 user_id: 用户ID new_password: 新密码 Returns: bool: 是否重置成功 """ user = await user_crud.get(db, user_id) if not user: return False return await user_crud.update_password(db, user, new_password) async def _generate_captcha(self) -> dict: """ 生成验证码 Returns: 包含captcha_key和captcha_base64的字典 """ from app.utils.redis_client import redis_client import random import string import base64 from io import BytesIO from PIL import Image, ImageDraw, ImageFont # 生成4位随机验证码 captcha_text = ''.join(random.choices(string.digits, k=4)) # 生成验证码图片 width, height = 120, 40 image = Image.new('RGB', (width, height), color='white') draw = ImageDraw.Draw(image) # 尝试使用系统字体 font = None font_paths = [ "/usr/share/fonts/truetype/dejavu/DejaVuSans-Bold.ttf", "/usr/share/fonts/truetype/dejavu/DejaVuSans.ttf", "arial.ttf", "/usr/share/fonts/TTF/DejaVuSans-Bold.ttf" ] for font_path in font_paths: try: font = ImageFont.truetype(font_path, 28) break except: continue if font is None: font = ImageFont.load_default(size=28) # 绘制验证码 draw.text((10, 5), captcha_text, fill='black', font=font) # 添加干扰线 for _ in range(5): x1 = random.randint(0, width) y1 = random.randint(0, height) x2 = random.randint(0, width) y2 = random.randint(0, height) draw.line([(x1, y1), (x2, y2)], fill='gray', width=1) # 转换为base64 buffer = BytesIO() image.save(buffer, format='PNG') image_base64 = base64.b64encode(buffer.getvalue()).decode('utf-8') # 生成captcha_key captcha_key = ''.join(random.choices(string.ascii_letters + string.digits, k=32)) # 存储到Redis,5分钟过期 await redis_client.setex( f"captcha:{captcha_key}", 300, captcha_text ) return { "captcha_key": captcha_key, "captcha_base64": f"data:image/png;base64,{image_base64}" } async def _verify_captcha(self, captcha_key: str, captcha: str) -> bool: """ 验证验证码 Args: captcha_key: 验证码密钥 captcha: 用户输入的验证码 Returns: 验证是否成功 """ from app.utils.redis_client import redis_client # 从Redis获取存储的验证码 stored_captcha = await redis_client.get(f"captcha:{captcha_key}") if not stored_captcha: return False # 验证码不区分大小写 return stored_captcha.lower() == captcha.lower() async def _build_user_info(self, db: AsyncSession, user: User) -> UserInfo: """ 构建用户信息 Args: db: 数据库会话 user: 用户对象 Returns: UserInfo: 用户信息 """ return UserInfo( id=user.id, username=user.username, real_name=user.full_name or user.username, email=user.email, avatar_url=user.avatar_url, is_admin=user.is_superuser, status="active" if user.is_active else "disabled" ) # 创建服务实例 auth_service = AuthService()