232 lines
6.8 KiB
Python
232 lines
6.8 KiB
Python
"""
|
||
User-related Pydantic schemas (users, authentication, roles, and permissions).
|
||
"""
|
||
from datetime import datetime
from typing import List, Optional

from pydantic import BaseModel, ConfigDict, EmailStr, Field, field_validator
|
||
|
||
|
||
# ===== 用户Schema =====
|
||
|
||
class UserBase(BaseModel):
    """Base user schema."""

    # Profile fields inherited by the schemas that subclass this model.
    # Only the real name is required; email and phone default to None.
    real_name: str = Field(
        ...,
        min_length=1,
        max_length=100,
        description="真实姓名",
    )
    email: Optional[EmailStr] = Field(default=None, description="邮箱")
    phone: Optional[str] = Field(
        default=None,
        max_length=20,
        description="手机号",
    )
|
||
|
||
|
||
class UserCreate(UserBase):
    """Schema for creating a user."""

    username: str = Field(..., min_length=4, max_length=50, description="用户名")
    password: str = Field(..., min_length=8, max_length=100, description="密码")
    # At least one role ID is required. ``min_length`` is the Pydantic v2
    # spelling of the list-size constraint; ``min_items`` is deprecated.
    role_ids: List[int] = Field(..., min_length=1, description="角色ID列表")

    @field_validator("username")
    @classmethod
    def validate_username(cls, v: str) -> str:
        """Validate the username format.

        Args:
            v: The candidate username.

        Returns:
            The username unchanged when it passes the check.

        Raises:
            ValueError: If the username, with underscores removed, is not
                alphanumeric according to ``str.isalnum``.
        """
        # NOTE(review): str.isalnum() is Unicode-aware, and a name made up
        # solely of underscores is rejected because "".isalnum() is False.
        # Confirm both behaviors are intended.
        if not v.replace("_", "").isalnum():
            raise ValueError("用户名只能包含字母、数字和下划线")
        return v

    @field_validator("password")
    @classmethod
    def validate_password(cls, v: str) -> str:
        """Validate password strength.

        Args:
            v: The candidate password.

        Returns:
            The password unchanged when every rule is satisfied.

        Raises:
            ValueError: If the password lacks an uppercase letter, a lowercase
                letter, or a digit. The first unmet rule, in that order,
                determines the message.
        """
        # Rules are evaluated in order so the reported error stays stable.
        rules = (
            (str.isupper, "密码必须包含至少一个大写字母"),
            (str.islower, "密码必须包含至少一个小写字母"),
            (str.isdigit, "密码必须包含至少一个数字"),
        )
        for is_match, message in rules:
            if not any(is_match(ch) for ch in v):
                raise ValueError(message)
        return v
|
||
|
||
|
||
class UserUpdate(BaseModel):
    """Schema for updating a user."""

    # Every field defaults to None, so a payload may carry any subset of them.
    real_name: Optional[str] = Field(
        default=None,
        min_length=1,
        max_length=100,
    )
    email: Optional[EmailStr] = None
    phone: Optional[str] = Field(default=None, max_length=20)
    # The pattern restricts status to "active", "disabled" or "locked".
    status: Optional[str] = Field(
        default=None,
        pattern="^(active|disabled|locked)$",
    )
    role_ids: Optional[List[int]] = None
|
||
|
||
|
||
class UserInDB(BaseModel):
    """User schema as stored in the database."""

    # Pydantic v2 configuration; replaces the deprecated class-based ``Config``.
    # ``from_attributes`` lets the model be built from an object's attributes.
    model_config = ConfigDict(from_attributes=True)

    id: int
    username: str
    real_name: str
    # The Optional fields below have no default, so they must be supplied
    # (possibly as None) when the model is validated.
    email: Optional[str]
    phone: Optional[str]
    avatar_url: Optional[str]
    status: str
    is_admin: bool
    last_login_at: Optional[datetime]
    created_at: datetime
|
||
|
||
|
||
class UserResponse(UserInDB):
    """User response schema."""

    # Pydantic v2 configuration; replaces the deprecated class-based ``Config``.
    model_config = ConfigDict(from_attributes=True)

    # "RoleResponse" is a forward reference: that class is declared further
    # down in this module and is resolved by the model_rebuild() call at the
    # end of the file.
    roles: List["RoleResponse"] = []
|
||
|
||
|
||
class UserInfo(BaseModel):
    """User info schema (without sensitive information)."""

    # Pydantic v2 configuration; replaces the deprecated class-based ``Config``.
    # ``from_attributes`` lets the model be built from an object's attributes.
    model_config = ConfigDict(from_attributes=True)

    id: int
    username: str
    real_name: str
    # Optional without a default: must be supplied, but may be None.
    email: Optional[str]
    avatar_url: Optional[str]
    is_admin: bool
    status: str
|
||
|
||
|
||
# ===== 登录认证Schema =====
|
||
|
||
class LoginRequest(BaseModel):
    """Login request schema."""

    # All four fields are required.
    username: str = Field(
        ...,
        min_length=1,
        description="用户名",
    )
    password: str = Field(
        ...,
        min_length=1,
        description="密码",
    )
    # The captcha answer must be at least four characters long.
    captcha: str = Field(
        ...,
        min_length=4,
        description="验证码",
    )
    captcha_key: str = Field(
        ...,
        description="验证码UUID",
    )
|
||
|
||
|
||
class LoginResponse(BaseModel):
    """Login response schema."""

    access_token: str = Field(
        ...,
        description="访问令牌",
    )
    refresh_token: str = Field(
        ...,
        description="刷新令牌",
    )
    # The only field with a default: the token type falls back to "Bearer".
    token_type: str = Field(
        default="Bearer",
        description="令牌类型",
    )
    expires_in: int = Field(
        ...,
        description="过期时间(秒)",
    )
    user: UserInfo = Field(
        ...,
        description="用户信息",
    )
|
||
|
||
|
||
class RefreshTokenRequest(BaseModel):
    """Refresh-token request schema."""

    # The single required field of this request.
    refresh_token: str = Field(
        ...,
        description="刷新令牌",
    )
|
||
|
||
|
||
class RefreshTokenResponse(BaseModel):
    """Refresh-token response schema."""

    # Both fields are required; this schema declares no refresh_token field.
    access_token: str = Field(
        ...,
        description="新的访问令牌",
    )
    expires_in: int = Field(
        ...,
        description="过期时间(秒)",
    )
|
||
|
||
|
||
class ChangePasswordRequest(BaseModel):
    """Change-password request schema."""

    old_password: str = Field(
        ...,
        min_length=1,
        description="旧密码",
    )
    # NOTE(review): only length limits apply to the new password here; no
    # strength validator is attached in this class. Confirm this is intended.
    new_password: str = Field(
        ...,
        min_length=8,
        max_length=100,
        description="新密码",
    )
    confirm_password: str = Field(
        ...,
        min_length=8,
        max_length=100,
        description="确认密码",
    )

    @field_validator("confirm_password")
    @classmethod
    def validate_passwords_match(cls, v: str, info) -> str:
        """Check that the confirmation equals the new password.

        Args:
            v: The confirmation password.
            info: Validation info whose ``data`` mapping is consulted for the
                ``new_password`` value.

        Returns:
            The confirmation password unchanged.

        Raises:
            ValueError: If ``new_password`` is present in ``info.data`` and
                differs from the confirmation.
        """
        # When "new_password" is absent from info.data the comparison is
        # skipped and the confirmation is accepted as-is.
        expected = info.data.get("new_password")
        if expected is not None and expected != v:
            raise ValueError("两次输入的密码不一致")
        return v
|
||
|
||
|
||
class ResetPasswordRequest(BaseModel):
    """Reset-password request schema."""

    # NOTE(review): only length limits are enforced; no strength validator is
    # attached in this class. Confirm this is intended.
    new_password: str = Field(
        ...,
        min_length=8,
        max_length=100,
        description="新密码",
    )
|
||
|
||
|
||
# ===== 角色Schema =====
|
||
|
||
class RoleBase(BaseModel):
    """Base role schema."""

    # Name and code are required, each limited to 1-50 characters.
    role_name: str = Field(
        ...,
        min_length=1,
        max_length=50,
        description="角色名称",
    )
    role_code: str = Field(
        ...,
        min_length=1,
        max_length=50,
        description="角色代码",
    )
    description: Optional[str] = Field(default=None, description="角色描述")
|
||
|
||
|
||
class RoleCreate(RoleBase):
    """Schema for creating a role."""

    # Defaults to a new empty list when the caller supplies no permission IDs.
    permission_ids: List[int] = Field(
        default_factory=list,
        description="权限ID列表",
    )
|
||
|
||
|
||
class RoleUpdate(BaseModel):
    """Schema for updating a role."""

    # Every field defaults to None; this schema declares no role_code field.
    role_name: Optional[str] = Field(
        default=None,
        min_length=1,
        max_length=50,
    )
    description: Optional[str] = None
    permission_ids: Optional[List[int]] = None
|
||
|
||
|
||
class RoleInDB(BaseModel):
    """Role schema as stored in the database."""

    # Pydantic v2 configuration; replaces the deprecated class-based ``Config``.
    # ``from_attributes`` lets the model be built from an object's attributes.
    model_config = ConfigDict(from_attributes=True)

    id: int
    role_name: str
    role_code: str
    # Optional without a default: must be supplied, but may be None.
    description: Optional[str]
    status: str
    sort_order: int
    created_at: datetime
|
||
|
||
|
||
class RoleResponse(RoleInDB):
    """Role response schema."""

    # Pydantic v2 configuration; replaces the deprecated class-based ``Config``.
    model_config = ConfigDict(from_attributes=True)

    # "PermissionResponse" is a forward reference: that class is declared
    # further down in this module and is resolved by the model_rebuild() call
    # at the end of the file.
    permissions: List["PermissionResponse"] = []
|
||
|
||
|
||
class RoleWithUserCount(RoleResponse):
    """Role response schema with a user count."""

    # Required integer added on top of the inherited RoleResponse fields.
    user_count: int = Field(
        ...,
        description="用户数量",
    )
|
||
|
||
|
||
# ===== 权限Schema =====
|
||
|
||
class PermissionBase(BaseModel):
    """Base permission schema."""

    # Required, non-empty identifying fields.
    permission_name: str = Field(
        ...,
        min_length=1,
        max_length=100,
    )
    permission_code: str = Field(
        ...,
        min_length=1,
        max_length=100,
    )
    module: str = Field(
        ...,
        min_length=1,
        max_length=50,
    )
    # Optional fields; each defaults to None.
    resource: Optional[str] = Field(default=None, max_length=50)
    action: Optional[str] = Field(default=None, max_length=50)
    description: Optional[str] = None
|
||
|
||
|
||
class PermissionCreate(PermissionBase):
    """Schema for creating a permission."""

    # Declares no fields of its own; everything is inherited from PermissionBase.
|
||
|
||
|
||
class PermissionUpdate(BaseModel):
    """Schema for updating a permission."""

    # Only the name and description are declared here; both default to None.
    permission_name: Optional[str] = Field(
        default=None,
        min_length=1,
        max_length=100,
    )
    description: Optional[str] = None
|
||
|
||
|
||
class PermissionResponse(PermissionBase):
    """Permission response schema."""

    # Pydantic v2 configuration; replaces the deprecated class-based ``Config``.
    # ``from_attributes`` lets the model be built from an object's attributes.
    model_config = ConfigDict(from_attributes=True)

    # Fields added on top of those inherited from PermissionBase.
    id: int
    created_at: datetime
|
||
|
||
|
||
class PermissionTreeNode(PermissionResponse):
    """Permission tree node schema."""

    # Self-referencing field: each node holds a list of nodes of its own type,
    # empty by default.
    children: List["PermissionTreeNode"] = []
|
||
|
||
|
||
# Resolve forward references now that every referenced class is defined.
UserResponse.model_rebuild()
RoleResponse.model_rebuild()
# RoleWithUserCount subclasses RoleResponse and was declared before
# PermissionResponse existed, so it carries the same unresolved reference;
# rebuild it explicitly, in line with the other models.
RoleWithUserCount.model_rebuild()
PermissionTreeNode.model_rebuild()
|