Files
zcglxt/backend/app/core/security.py

179 lines
4.8 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
"""
安全相关工具模块
"""
from datetime import datetime, timedelta
from typing import Optional, Dict, Any
from jose import JWTError, jwt
from passlib.context import CryptContext
from fastapi import HTTPException, status
from app.core.config import settings
# 密码加密上下文
pwd_context = CryptContext(schemes=["bcrypt"], deprecated="auto")
class SecurityManager:
"""安全管理器"""
def __init__(self):
self.secret_key = settings.SECRET_KEY
self.algorithm = settings.ALGORITHM
self.access_token_expire_minutes = settings.ACCESS_TOKEN_EXPIRE_MINUTES
self.refresh_token_expire_days = settings.REFRESH_TOKEN_EXPIRE_DAYS
def verify_password(self, plain_password: str, hashed_password: str) -> bool:
"""
验证密码
Args:
plain_password: 明文密码
hashed_password: 哈希密码
Returns:
bool: 密码是否匹配
"""
return pwd_context.verify(plain_password, hashed_password)
def get_password_hash(self, password: str) -> str:
"""
获取密码哈希值
Args:
password: 明文密码
Returns:
str: 哈希后的密码
"""
return pwd_context.hash(password)
def create_access_token(
self,
data: Dict[str, Any],
expires_delta: Optional[timedelta] = None
) -> str:
"""
创建访问令牌
Args:
data: 要编码的数据
expires_delta: 过期时间增量
Returns:
str: JWT令牌
"""
to_encode = data.copy()
if expires_delta:
expire = datetime.utcnow() + expires_delta
else:
expire = datetime.utcnow() + timedelta(minutes=self.access_token_expire_minutes)
to_encode.update({
"exp": expire,
"type": "access"
})
encoded_jwt = jwt.encode(to_encode, self.secret_key, algorithm=self.algorithm)
return encoded_jwt
def create_refresh_token(
self,
data: Dict[str, Any],
expires_delta: Optional[timedelta] = None
) -> str:
"""
创建刷新令牌
Args:
data: 要编码的数据
expires_delta: 过期时间增量
Returns:
str: JWT令牌
"""
to_encode = data.copy()
if expires_delta:
expire = datetime.utcnow() + expires_delta
else:
expire = datetime.utcnow() + timedelta(days=self.refresh_token_expire_days)
to_encode.update({
"exp": expire,
"type": "refresh"
})
encoded_jwt = jwt.encode(to_encode, self.secret_key, algorithm=self.algorithm)
return encoded_jwt
def decode_token(self, token: str) -> Dict[str, Any]:
"""
解码令牌
Args:
token: JWT令牌
Returns:
Dict: 解码后的数据
Raises:
HTTPException: 令牌无效或过期
"""
try:
payload = jwt.decode(token, self.secret_key, algorithms=[self.algorithm])
return payload
except JWTError as e:
raise HTTPException(
status_code=status.HTTP_401_UNAUTHORIZED,
detail="无效的认证凭据",
headers={"WWW-Authenticate": "Bearer"}
)
def verify_token(self, token: str, token_type: str = "access") -> Dict[str, Any]:
"""
验证令牌
Args:
token: JWT令牌
token_type: 令牌类型access/refresh
Returns:
Dict: 解码后的数据
Raises:
HTTPException: 令牌无效或类型不匹配
"""
payload = self.decode_token(token)
if payload.get("type") != token_type:
raise HTTPException(
status_code=status.HTTP_401_UNAUTHORIZED,
detail=f"令牌类型不匹配,期望{token_type}"
)
return payload
# 创建全局安全管理器实例
security_manager = SecurityManager()
def get_password_hash(password: str) -> str:
"""获取密码哈希值(便捷函数)"""
return security_manager.get_password_hash(password)
def verify_password(plain_password: str, hashed_password: str) -> bool:
"""验证密码(便捷函数)"""
return security_manager.verify_password(plain_password, hashed_password)
def create_access_token(data: Dict[str, Any], expires_delta: Optional[timedelta] = None) -> str:
"""创建访问令牌(便捷函数)"""
return security_manager.create_access_token(data, expires_delta)
def create_refresh_token(data: Dict[str, Any], expires_delta: Optional[timedelta] = None) -> str:
"""创建刷新令牌(便捷函数)"""
return security_manager.create_refresh_token(data, expires_delta)