297 lines
9.1 KiB
Python
297 lines
9.1 KiB
Python
"""
|
|
资产管理业务服务层
|
|
"""
|
|
import logging
from typing import List, Optional, Tuple, Dict, Any

from sqlalchemy import func
from sqlalchemy.orm import Session

from app.core.exceptions import NotFoundException, AlreadyExistsException, StateTransitionException
from app.crud.asset import asset, asset_status_history
from app.schemas.asset import (
    AssetCreate,
    AssetUpdate,
    AssetStatusTransition
)
from app.services.state_machine_service import state_machine_service
from app.utils.asset_code import generate_asset_code
from app.utils.qrcode import generate_qr_code, delete_qr_code
|
|
|
|
|
|
class AssetService:
    """Service layer for asset management.

    Wraps the ``asset`` and ``asset_status_history`` CRUD objects and adds
    existence checks, serial-number uniqueness checks, state-machine
    validation of status changes and status-history recording.
    """

    # Maps an (old_status, new_status) pair to the operation type stored in
    # the status history. Pairs that are not listed fall back to
    # "status_change" in ``_get_operation_type``.
    _OPERATION_TYPES = {
        ("pending", "in_stock"): "in_stock",
        ("in_stock", "in_use"): "allocate",
        ("in_use", "in_stock"): "recover",
        ("in_stock", "transferring"): "transfer",
        ("in_use", "transferring"): "transfer",
        ("transferring", "in_use"): "transfer_complete",
        ("in_stock", "maintenance"): "maintenance",
        ("in_use", "maintenance"): "maintenance",
        ("maintenance", "in_stock"): "maintenance_complete",
        ("maintenance", "in_use"): "maintenance_complete",
        ("in_stock", "pending_scrap"): "pending_scrap",
        ("in_use", "pending_scrap"): "pending_scrap",
        ("pending_scrap", "scrapped"): "scrap",
        ("pending_scrap", "in_stock"): "cancel_scrap",
    }

    def __init__(self):
        # Shared state-machine service used to validate status transitions.
        self.state_machine = state_machine_service

    async def get_asset(self, db: Session, asset_id: int):
        """Return the asset identified by ``asset_id``.

        Raises:
            NotFoundException: if ``asset.get`` finds nothing for the id.
        """
        obj = asset.get(db, asset_id)
        if not obj:
            raise NotFoundException("资产")

        return self._load_relations(db, obj)

    def get_assets(
        self,
        db: Session,
        skip: int = 0,
        limit: int = 20,
        keyword: Optional[str] = None,
        device_type_id: Optional[int] = None,
        organization_id: Optional[int] = None,
        status: Optional[str] = None,
        purchase_date_start: Optional[Any] = None,
        purchase_date_end: Optional[Any] = None
    ) -> Tuple[List, int]:
        """Return a page of assets by forwarding all filters to ``asset.get_multi``.

        Every argument is passed through unchanged; filtering and paging are
        implemented by the CRUD layer.
        """
        return asset.get_multi(
            db=db,
            skip=skip,
            limit=limit,
            keyword=keyword,
            device_type_id=device_type_id,
            organization_id=organization_id,
            status=status,
            purchase_date_start=purchase_date_start,
            purchase_date_end=purchase_date_end,
        )

    async def create_asset(
        self,
        db: Session,
        obj_in: AssetCreate,
        creator_id: int
    ):
        """Create an asset, attach a QR code and record the initial status.

        Args:
            db: Database session.
            obj_in: Creation payload; ``serial_number`` is checked for
                uniqueness only when it is set.
            creator_id: Id of the user creating the asset; also used as the
                operator of the initial history entry.

        Returns:
            The object returned by ``asset.create``.

        Raises:
            AlreadyExistsException: if the serial number is already in use.
        """
        if obj_in.serial_number:
            existing = asset.get_by_serial_number(db, obj_in.serial_number)
            if existing:
                raise AlreadyExistsException("该序列号已被使用")

        asset_code = await generate_asset_code(db)
        db_obj = asset.create(db, obj_in, asset_code, creator_id)

        # QR code handling is best-effort: a failure here must not abort the
        # asset creation, but it is logged instead of being silently dropped.
        try:
            db_obj.qr_code_url = generate_qr_code(asset_code)
            db.add(db_obj)
            db.commit()
            db.refresh(db_obj)
        except Exception:
            logging.getLogger(__name__).exception(
                "Failed to generate or store QR code for asset %s", asset_code
            )

        await self._record_status_change(
            db=db,
            asset_id=db_obj.id,
            old_status=None,
            new_status="pending",
            operation_type="create",
            operator_id=creator_id,
            operator_name=None,  # could be looked up from the user table
            remark="资产创建",
        )

        return db_obj

    def update_asset(
        self,
        db: Session,
        asset_id: int,
        obj_in: AssetUpdate,
        updater_id: int
    ):
        """Update an existing asset.

        Raises:
            NotFoundException: if the asset does not exist.
            AlreadyExistsException: if the payload changes the serial number
                to one that ``asset.get_by_serial_number`` already finds.
        """
        db_obj = asset.get(db, asset_id)
        if not db_obj:
            raise NotFoundException("资产")

        # Only check for duplicates when the serial number actually changes.
        if obj_in.serial_number and obj_in.serial_number != db_obj.serial_number:
            existing = asset.get_by_serial_number(db, obj_in.serial_number)
            if existing:
                raise AlreadyExistsException("该序列号已被使用")

        return asset.update(db, db_obj, obj_in, updater_id)

    def delete_asset(
        self,
        db: Session,
        asset_id: int,
        deleter_id: int
    ) -> bool:
        """Delete an asset and return the result of ``asset.delete``.

        Raises:
            NotFoundException: if the asset does not exist.
        """
        if not asset.get(db, asset_id):
            raise NotFoundException("资产")
        return asset.delete(db, asset_id, deleter_id)

    async def change_asset_status(
        self,
        db: Session,
        asset_id: int,
        status_transition: AssetStatusTransition,
        operator_id: int,
        operator_name: Optional[str] = None
    ):
        """Move an asset to a new status and record the change in its history.

        Args:
            db: Database session.
            asset_id: Id of the asset to change.
            status_transition: Carries ``new_status``, ``remark`` and
                ``extra_data`` for the transition.
            operator_id: Id of the user performing the change.
            operator_name: Optional display name stored with the history entry.

        Returns:
            The refreshed asset object.

        Raises:
            NotFoundException: if the asset does not exist.
            StateTransitionException: if the state machine reports an error
                for the requested transition.
        """
        db_obj = asset.get(db, asset_id)
        if not db_obj:
            raise NotFoundException("资产")

        # Capture the current status before the update overwrites it.
        old_status = db_obj.status
        new_status = status_transition.new_status

        # A truthy return value from the validator means the transition is rejected.
        error = self.state_machine.validate_transition(old_status, new_status)
        if error:
            raise StateTransitionException(old_status, new_status)

        asset.update_status(
            db=db,
            asset_id=asset_id,
            new_status=new_status,
            updater_id=operator_id,
        )

        await self._record_status_change(
            db=db,
            asset_id=asset_id,
            old_status=old_status,
            new_status=new_status,
            operation_type=self._get_operation_type(old_status, new_status),
            operator_id=operator_id,
            operator_name=operator_name,
            remark=status_transition.remark,
            extra_data=status_transition.extra_data,
        )

        # Reload so the returned object reflects the updated status.
        db.refresh(db_obj)
        return db_obj

    def get_asset_status_history(
        self,
        db: Session,
        asset_id: int,
        skip: int = 0,
        limit: int = 50
    ) -> List:
        """Return status-history entries for an asset.

        Raises:
            NotFoundException: if the asset does not exist.
        """
        if not asset.get(db, asset_id):
            raise NotFoundException("资产")

        return asset_status_history.get_by_asset(db, asset_id, skip, limit)

    def scan_asset_by_code(
        self,
        db: Session,
        asset_code: str
    ):
        """Look up an asset by its asset code (used for QR-code scans).

        Raises:
            NotFoundException: if no asset has the given code.
        """
        obj = asset.get_by_code(db, asset_code)
        if not obj:
            raise NotFoundException("资产")

        return self._load_relations(db, obj)

    def get_statistics(
        self,
        db: Session,
        organization_id: Optional[int] = None
    ) -> Dict[str, Any]:
        """Return aggregate statistics over assets whose ``deleted_at`` is NULL.

        Args:
            db: Database session.
            organization_id: When truthy, restricts both queries to that
                organization.

        Returns:
            A dict with ``total`` (row count), ``total_value`` (sum of
            ``purchase_price`` as a float, 0 when there are no rows) and
            ``status_distribution`` (mapping of status to row count).
        """
        # The original code referenced ``Asset`` without importing it.
        # NOTE(review): the model path is assumed from the project layout
        # (app.crud.asset / app.schemas.asset) - confirm before merging.
        from app.models.asset import Asset

        totals_query = db.query(
            func.count(Asset.id).label("total"),
            func.sum(Asset.purchase_price).label("total_value"),
        ).filter(Asset.deleted_at.is_(None))

        status_query = db.query(
            Asset.status,
            func.count(Asset.id).label("count"),
        ).filter(Asset.deleted_at.is_(None))

        if organization_id:
            totals_query = totals_query.filter(Asset.organization_id == organization_id)
            status_query = status_query.filter(Asset.organization_id == organization_id)

        result = totals_query.first()

        status_query = status_query.group_by(Asset.status)
        status_distribution = {row.status: row.count for row in status_query.all()}

        return {
            "total": result.total or 0,
            # SUM over zero rows yields NULL/None, hence the ``or 0``.
            "total_value": float(result.total_value or 0),
            "status_distribution": status_distribution,
        }

    def _load_relations(self, db: Session, obj):
        """Hook for loading related objects; currently returns ``obj`` unchanged."""
        # Related objects (e.g. device type, brand, organization) could be
        # preloaded here.
        return obj

    async def _record_status_change(
        self,
        db: Session,
        asset_id: int,
        old_status: Optional[str],
        new_status: str,
        operation_type: str,
        operator_id: int,
        operator_name: Optional[str] = None,
        organization_id: Optional[int] = None,
        remark: Optional[str] = None,
        extra_data: Optional[Dict[str, Any]] = None
    ):
        """Create a status-history entry via ``asset_status_history.create``.

        All arguments are forwarded unchanged. ``old_status`` is ``None`` for
        the entry written when an asset is created.
        """
        asset_status_history.create(
            db=db,
            asset_id=asset_id,
            old_status=old_status,
            new_status=new_status,
            operation_type=operation_type,
            operator_id=operator_id,
            operator_name=operator_name,
            organization_id=organization_id,
            remark=remark,
            extra_data=extra_data,
        )

    def _get_operation_type(self, old_status: str, new_status: str) -> str:
        """Return the operation type for a status transition.

        Falls back to ``"status_change"`` for pairs not in the lookup table.
        """
        return self._OPERATION_TYPES.get((old_status, new_status), "status_change")
|
|
|
|
|
|
# Module-level AssetService instance, created once when this module is imported.
asset_service = AssetService()
|