Files
zcglxt/backend/app/services/auth_service.py

286 lines
8.1 KiB
Python
Raw Permalink Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
"""
认证服务
"""
from datetime import datetime, timedelta
from typing import Optional
from fastapi import status
from sqlalchemy.ext.asyncio import AsyncSession
from app.core.security import security_manager
from app.core.exceptions import (
InvalidCredentialsException,
UserLockedException,
UserDisabledException,
CaptchaException
)
from app.crud.user import user_crud
from app.models.user import User
from app.schemas.user import LoginResponse, UserInfo
from app.core.config import settings
import uuid
class AuthService:
    """Authentication service.

    Provides captcha-protected login, access-token refresh, password
    change/reset, and captcha generation/verification backed by Redis.
    """

    # Lifetime, in seconds, of a captcha answer stored in Redis.
    CAPTCHA_TTL_SECONDS = 300
    # Number of digits in a captcha answer.
    CAPTCHA_LENGTH = 4
    # Number of characters in the random key that identifies a captcha.
    CAPTCHA_KEY_LENGTH = 32

    def __init__(self):
        # NOTE(review): these two limits are stored but not read by any
        # method of this class; account lockout is not enforced here.
        self.max_login_failures = 5
        self.lock_duration_minutes = 30

    @staticmethod
    def _token_claims(user: User) -> dict:
        """Return a fresh claims dict (``sub`` and ``username``) for *user*."""
        return {"sub": str(user.id), "username": user.username}

    async def login(
        self,
        db: AsyncSession,
        username: str,
        password: str,
        captcha: str,
        captcha_key: str
    ) -> LoginResponse:
        """Authenticate a user and issue access and refresh tokens.

        Checks run in this order: captcha, user lookup, active flag,
        password.

        Args:
            db: Database session.
            username: Login name.
            password: Plain-text password to verify.
            captcha: Captcha answer typed by the user.
            captcha_key: Key identifying the captcha that was issued.

        Returns:
            LoginResponse carrying both tokens, the token type, the access
            token lifetime in seconds and the user info.

        Raises:
            CaptchaException: The captcha is missing, expired or wrong.
            InvalidCredentialsException: Unknown username or wrong password.
            UserDisabledException: The account is not active.

        NOTE(review): ``UserLockedException`` is imported by this module but
        no code path in this method raises it; failed attempts are not
        counted here.
        """
        if not await self._verify_captcha(captcha_key, captcha):
            raise CaptchaException()

        user = await user_crud.get_by_username(db, username)
        if not user:
            # Same message as for a wrong password, so the response does not
            # say which of the two was wrong.
            raise InvalidCredentialsException("用户名或密码错误")

        if not user.is_active:
            raise UserDisabledException()

        if not security_manager.verify_password(password, user.hashed_password):
            raise InvalidCredentialsException("用户名或密码错误")

        # Credentials are valid: let the CRUD layer record this login.
        await user_crud.update_last_login(db, user)

        access_token = security_manager.create_access_token(
            data=self._token_claims(user)
        )
        refresh_token = security_manager.create_refresh_token(
            data=self._token_claims(user)
        )

        user_info = await self._build_user_info(db, user)
        return LoginResponse(
            access_token=access_token,
            refresh_token=refresh_token,
            token_type="Bearer",
            expires_in=settings.ACCESS_TOKEN_EXPIRE_MINUTES * 60,
            user=user_info
        )

    async def refresh_token(self, db: AsyncSession, refresh_token: str) -> dict:
        """Exchange a refresh token for a new access token.

        Args:
            db: Database session.
            refresh_token: Refresh token presented by the client.

        Returns:
            dict with ``access_token`` and ``expires_in`` (seconds).

        Raises:
            InvalidCredentialsException: The token has no usable ``sub``
                claim, or the user is missing or not active.
        """
        payload = security_manager.verify_token(refresh_token, token_type="refresh")
        try:
            user_id = int(payload.get("sub"))
        except (AttributeError, TypeError, ValueError) as exc:
            # A missing or non-numeric subject is an authentication failure,
            # not an internal error.
            raise InvalidCredentialsException("无效的刷新令牌") from exc

        user = await user_crud.get(db, user_id)
        if not user or not user.is_active:
            raise InvalidCredentialsException("用户不存在或已被禁用")

        access_token = security_manager.create_access_token(
            data=self._token_claims(user)
        )
        return {
            "access_token": access_token,
            "expires_in": settings.ACCESS_TOKEN_EXPIRE_MINUTES * 60
        }

    async def change_password(
        self,
        db: AsyncSession,
        user: User,
        old_password: str,
        new_password: str
    ) -> bool:
        """Change the password of *user* after verifying the old one.

        Args:
            db: Database session.
            user: The user whose password is being changed.
            old_password: Current plain-text password.
            new_password: New plain-text password.

        Returns:
            Whatever ``user_crud.update_password`` returns (annotated bool).

        Raises:
            InvalidCredentialsException: ``old_password`` does not match.
        """
        if not security_manager.verify_password(old_password, user.hashed_password):
            raise InvalidCredentialsException("旧密码错误")
        return await user_crud.update_password(db, user, new_password)

    async def reset_password(
        self,
        db: AsyncSession,
        user_id: int,
        new_password: str
    ) -> bool:
        """Set a new password for a user without checking the old one.

        Intended as an administrator operation; this method performs no
        authorization check itself.

        Args:
            db: Database session.
            user_id: ID of the user to update.
            new_password: New plain-text password.

        Returns:
            False when no user has this ID, otherwise the result of
            ``user_crud.update_password``.
        """
        user = await user_crud.get(db, user_id)
        if not user:
            return False
        return await user_crud.update_password(db, user, new_password)

    async def _generate_captcha(self) -> dict:
        """Create a numeric captcha image and store its answer in Redis.

        Returns:
            dict with ``captcha_key`` (random alphanumeric key) and
            ``captcha_base64`` (PNG image as a ``data:`` URI).
        """
        # Imports are kept function-scoped, as in the original code.
        import base64
        import random
        import secrets
        import string
        from io import BytesIO

        from PIL import Image, ImageDraw, ImageFont

        from app.utils.redis_client import redis_client

        # The answer is a secret, so draw it from the OS CSPRNG rather than
        # the predictable ``random`` generator.
        captcha_text = "".join(
            secrets.choice(string.digits) for _ in range(self.CAPTCHA_LENGTH)
        )

        width, height = 120, 40
        image = Image.new("RGB", (width, height), color="white")
        draw = ImageDraw.Draw(image)

        # Use the first candidate font that can be opened.
        font = None
        font_paths = (
            "/usr/share/fonts/truetype/dejavu/DejaVuSans-Bold.ttf",
            "/usr/share/fonts/truetype/dejavu/DejaVuSans.ttf",
            "arial.ttf",
            "/usr/share/fonts/TTF/DejaVuSans-Bold.ttf",
        )
        for font_path in font_paths:
            try:
                font = ImageFont.truetype(font_path, 28)
            except OSError:
                # Font file missing or unreadable: try the next candidate.
                continue
            break
        if font is None:
            font = ImageFont.load_default(size=28)

        draw.text((10, 5), captcha_text, fill="black", font=font)

        # Interference lines are visual noise only, so ``random`` is fine.
        for _ in range(5):
            start = (random.randint(0, width), random.randint(0, height))
            end = (random.randint(0, width), random.randint(0, height))
            draw.line([start, end], fill="gray", width=1)

        buffer = BytesIO()
        image.save(buffer, format="PNG")
        image_base64 = base64.b64encode(buffer.getvalue()).decode("utf-8")

        # The key must be unguessable too: it selects which answer is checked.
        key_alphabet = string.ascii_letters + string.digits
        captcha_key = "".join(
            secrets.choice(key_alphabet) for _ in range(self.CAPTCHA_KEY_LENGTH)
        )

        await redis_client.setex(
            f"captcha:{captcha_key}",
            self.CAPTCHA_TTL_SECONDS,
            captcha_text
        )
        return {
            "captcha_key": captcha_key,
            "captcha_base64": f"data:image/png;base64,{image_base64}"
        }

    async def _verify_captcha(self, captcha_key: str, captcha: str) -> bool:
        """Check a captcha answer against the value stored in Redis.

        A captcha is single-use: once a stored answer has been read it is
        invalidated, whether or not the supplied answer was correct, so one
        key cannot be guessed or replayed repeatedly.

        Args:
            captcha_key: Key returned when the captcha was generated.
            captcha: Answer typed by the user.

        Returns:
            True only when a stored answer exists and matches
            case-insensitively; False for empty input, an unknown or expired
            key, or a mismatch.
        """
        # Reject empty input before touching Redis.
        if not captcha_key or not captcha:
            return False

        from app.utils.redis_client import redis_client

        redis_key = f"captcha:{captcha_key}"
        stored_captcha = await redis_client.get(redis_key)
        if not stored_captcha:
            return False

        # Invalidate the answer using only the client calls this module
        # already relies on: an empty value is treated as "not found" above
        # and expires after one second.
        await redis_client.setex(redis_key, 1, "")

        return self._captcha_matches(stored_captcha, captcha)

    @staticmethod
    def _captcha_matches(stored: object, supplied: object) -> bool:
        """Compare a stored captcha answer with user input, ignoring case.

        ``stored`` may be ``bytes`` when the Redis client does not decode
        responses; it is decoded first so the comparison is text to text.
        """
        if isinstance(stored, (bytes, bytearray)):
            stored = bytes(stored).decode("utf-8", errors="replace")
        return str(stored).lower() == str(supplied).lower()

    async def _build_user_info(self, db: AsyncSession, user: User) -> UserInfo:
        """Map a ``User`` model onto the ``UserInfo`` response schema.

        Args:
            db: Database session (accepted but not used by this method).
            user: User model instance.

        Returns:
            UserInfo whose ``real_name`` falls back to the username when
            ``full_name`` is empty, and whose ``status`` is ``"active"`` or
            ``"disabled"`` depending on ``is_active``.
        """
        return UserInfo(
            id=user.id,
            username=user.username,
            real_name=user.full_name or user.username,
            email=user.email,
            avatar_url=user.avatar_url,
            is_admin=user.is_superuser,
            status="active" if user.is_active else "disabled"
        )
# Module-level AuthService instance, created once at import time.
auth_service: AuthService = AuthService()