Files
zcglxt/backend_new/app/services/transfer_service.py

452 lines
15 KiB
Python
Raw Permalink Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
"""
资产调拨业务服务层
"""
from typing import List, Optional, Dict, Any
from sqlalchemy.orm import Session, selectinload
from app.crud.transfer import transfer_order, transfer_item
from app.crud.asset import asset
from app.schemas.transfer import (
AssetTransferOrderCreate,
AssetTransferOrderUpdate
)
from app.core.exceptions import NotFoundException, BusinessException
class TransferService:
"""资产调拨服务类"""
async def get_order(
self,
db: Session,
order_id: int
) -> Dict[str, Any]:
"""获取调拨单详情"""
# 使用selectinload预加载关联数据避免N+1查询
from app.models.transfer import AssetTransferOrder
from app.models.organization import Organization
from app.models.user import User
from app.models.transfer import AssetTransferItem
obj = db.query(
AssetTransferOrder
).options(
selectinload(AssetTransferOrder.items),
selectinload(AssetTransferOrder.source_org.of_type(Organization)),
selectinload(AssetTransferOrder.target_org.of_type(Organization)),
selectinload(AssetTransferOrder.applicant.of_type(User)),
selectinload(AssetTransferOrder.approver.of_type(User)),
selectinload(AssetTransferOrder.executor.of_type(User))
).filter(
AssetTransferOrder.id == order_id
).first()
if not obj:
raise NotFoundException("调拨单")
# 加载关联信息
return self._load_order_relations(db, obj)
def get_orders(
self,
db: Session,
skip: int = 0,
limit: int = 20,
transfer_type: Optional[str] = None,
approval_status: Optional[str] = None,
execute_status: Optional[str] = None,
source_org_id: Optional[int] = None,
target_org_id: Optional[int] = None,
keyword: Optional[str] = None
) -> tuple:
"""获取调拨单列表"""
items, total = transfer_order.get_multi(
db=db,
skip=skip,
limit=limit,
transfer_type=transfer_type,
approval_status=approval_status,
execute_status=execute_status,
source_org_id=source_org_id,
target_org_id=target_org_id,
keyword=keyword
)
# 加载关联信息
items_with_relations = [self._load_order_relations(db, item) for item in items]
return items_with_relations, total
async def create_order(
self,
db: Session,
obj_in: AssetTransferOrderCreate,
apply_user_id: int
):
"""创建调拨单"""
# 验证资产存在性和状态
assets = []
for asset_id in obj_in.asset_ids:
asset_obj = asset.get(db, asset_id)
if not asset_obj:
raise NotFoundException(f"资产ID {asset_id}")
assets.append(asset_obj)
# 验证资产状态是否允许调拨
for asset_obj in assets:
if asset_obj.status not in ["in_stock", "in_use"]:
raise BusinessException(
f"资产 {asset_obj.asset_code} 当前状态为 {asset_obj.status},不允许调拨操作"
)
# 验证资产所属机构是否为调出机构
for asset_obj in assets:
if asset_obj.organization_id != obj_in.source_org_id:
raise BusinessException(
f"资产 {asset_obj.asset_code} 所属机构与调出机构不一致"
)
# 生成调拨单号
order_code = await self._generate_order_code(db)
# 创建调拨单
db_obj = transfer_order.create(
db=db,
obj_in=obj_in,
order_code=order_code,
apply_user_id=apply_user_id
)
return self._load_order_relations(db, db_obj)
def update_order(
self,
db: Session,
order_id: int,
obj_in: AssetTransferOrderUpdate
):
"""更新调拨单"""
db_obj = transfer_order.get(db, order_id)
if not db_obj:
raise NotFoundException("调拨单")
# 只有待审批状态可以更新
if db_obj.approval_status != "pending":
raise BusinessException("只有待审批状态的调拨单可以更新")
return transfer_order.update(db, db_obj, obj_in)
def approve_order(
self,
db: Session,
order_id: int,
approval_status: str,
approval_user_id: int,
approval_remark: Optional[str] = None
):
"""审批调拨单"""
db_obj = transfer_order.get(db, order_id)
if not db_obj:
raise NotFoundException("调拨单")
# 检查状态
if db_obj.approval_status != "pending":
raise BusinessException("该调拨单已审批,无法重复审批")
# 审批
db_obj = transfer_order.approve(
db=db,
db_obj=db_obj,
approval_status=approval_status,
approval_user_id=approval_user_id,
approval_remark=approval_remark
)
return self._load_order_relations(db, db_obj)
def start_order(
self,
db: Session,
order_id: int,
execute_user_id: int
):
"""开始调拨"""
db_obj = transfer_order.get(db, order_id)
if not db_obj:
raise NotFoundException("调拨单")
# 检查状态
if db_obj.approval_status != "approved":
raise BusinessException("该调拨单未审批通过,无法开始执行")
if db_obj.execute_status != "pending":
raise BusinessException("该调拨单已开始或已完成")
# 开始调拨
db_obj = transfer_order.start(db, db_obj, execute_user_id)
# 更新明细状态为调拨中
transfer_item.batch_update_transfer_status(db, order_id, "transferring")
return self._load_order_relations(db, db_obj)
async def complete_order(
self,
db: Session,
order_id: int,
execute_user_id: int
):
"""完成调拨"""
db_obj = transfer_order.get(db, order_id)
if not db_obj:
raise NotFoundException("调拨单")
# 检查状态
if db_obj.execute_status not in ["pending", "executing"]:
raise BusinessException("该调拨单状态不允许完成操作")
# 完成调拨单
db_obj = transfer_order.complete(db, db_obj, execute_user_id)
# 更新资产机构和状态
await self._execute_transfer_logic(db, db_obj)
# 更新明细状态为完成
transfer_item.batch_update_transfer_status(db, order_id, "completed")
return self._load_order_relations(db, db_obj)
def cancel_order(
self,
db: Session,
order_id: int
) -> bool:
"""取消调拨单"""
db_obj = transfer_order.get(db, order_id)
if not db_obj:
raise NotFoundException("调拨单")
# 检查状态
if db_obj.execute_status == "completed":
raise BusinessException("已完成的调拨单无法取消")
transfer_order.cancel(db, db_obj)
return True
def delete_order(
self,
db: Session,
order_id: int
) -> bool:
"""删除调拨单"""
db_obj = transfer_order.get(db, order_id)
if not db_obj:
raise NotFoundException("调拨单")
# 只有已取消或已拒绝的可以删除
if db_obj.approval_status not in ["rejected", "cancelled"]:
raise BusinessException("只能删除已拒绝或已取消的调拨单")
return transfer_order.delete(db, order_id)
def get_order_items(
self,
db: Session,
order_id: int
) -> List:
"""获取调拨单明细"""
# 验证调拨单存在
if not transfer_order.get(db, order_id):
raise NotFoundException("调拨单")
return transfer_item.get_by_order(db, order_id)
def get_statistics(
self,
db: Session,
source_org_id: Optional[int] = None,
target_org_id: Optional[int] = None
) -> Dict[str, int]:
"""获取调拨单统计信息"""
return transfer_order.get_statistics(db, source_org_id, target_org_id)
async def _execute_transfer_logic(
self,
db: Session,
order_obj
):
"""执行调拨逻辑(完成调拨时自动执行)"""
# 获取明细
items = transfer_item.get_by_order(db, order_obj.id)
# 更新资产机构和状态
from app.services.asset_service import asset_service
from app.schemas.asset import AssetStatusTransition, AssetUpdate
for item in items:
try:
# 变更资产状态
await asset_service.change_asset_status(
db=db,
asset_id=item.asset_id,
status_transition=AssetStatusTransition(
new_status="transferring",
remark=f"调拨单: {order_obj.order_code},从{item.source_organization_id}{item.target_organization_id}"
),
operator_id=order_obj.execute_user_id
)
# 更新资产所属机构
asset_obj = asset.get(db, item.asset_id)
if asset_obj:
asset.update(
db=db,
db_obj=asset_obj,
obj_in=AssetUpdate(
organization_id=item.target_organization_id
),
updater_id=order_obj.execute_user_id
)
# 最终状态变更
target_status = "in_stock"
await asset_service.change_asset_status(
db=db,
asset_id=item.asset_id,
status_transition=AssetStatusTransition(
new_status=target_status,
remark=f"调拨完成: {order_obj.order_code}"
),
operator_id=order_obj.execute_user_id
)
except Exception as e:
# 记录失败日志
print(f"调拨资产 {item.asset_code} 失败: {str(e)}")
raise
def _load_order_relations(
self,
db: Session,
obj
) -> Dict[str, Any]:
"""加载调拨单关联信息"""
from app.models.user import User
from app.models.organization import Organization
result = {
"id": obj.id,
"order_code": obj.order_code,
"source_org_id": obj.source_org_id,
"target_org_id": obj.target_org_id,
"transfer_type": obj.transfer_type,
"title": obj.title,
"asset_count": obj.asset_count,
"apply_user_id": obj.apply_user_id,
"apply_time": obj.apply_time,
"approval_status": obj.approval_status,
"approval_user_id": obj.approval_user_id,
"approval_time": obj.approval_time,
"approval_remark": obj.approval_remark,
"execute_status": obj.execute_status,
"execute_user_id": obj.execute_user_id,
"execute_time": obj.execute_time,
"remark": obj.remark,
"created_at": obj.created_at,
"updated_at": obj.updated_at
}
# 加载调出机构
if obj.source_org_id:
source_org = db.query(Organization).filter(
Organization.id == obj.source_org_id
).first()
if source_org:
result["source_organization"] = {
"id": source_org.id,
"org_name": source_org.org_name,
"org_type": source_org.org_type
}
# 加载调入机构
if obj.target_org_id:
target_org = db.query(Organization).filter(
Organization.id == obj.target_org_id
).first()
if target_org:
result["target_organization"] = {
"id": target_org.id,
"org_name": target_org.org_name,
"org_type": target_org.org_type
}
# 加载申请人
if obj.apply_user_id:
apply_user = db.query(User).filter(User.id == obj.apply_user_id).first()
if apply_user:
result["apply_user"] = {
"id": apply_user.id,
"real_name": apply_user.real_name,
"username": apply_user.username
}
# 加载审批人
if obj.approval_user_id:
approval_user = db.query(User).filter(User.id == obj.approval_user_id).first()
if approval_user:
result["approval_user"] = {
"id": approval_user.id,
"real_name": approval_user.real_name,
"username": approval_user.username
}
# 加载执行人
if obj.execute_user_id:
execute_user = db.query(User).filter(User.id == obj.execute_user_id).first()
if execute_user:
result["execute_user"] = {
"id": execute_user.id,
"real_name": execute_user.real_name,
"username": execute_user.username
}
# 加载明细
items = transfer_item.get_by_order(db, obj.id)
result["items"] = [
{
"id": item.id,
"asset_id": item.asset_id,
"asset_code": item.asset_code,
"source_organization_id": item.source_organization_id,
"target_organization_id": item.target_organization_id,
"transfer_status": item.transfer_status
}
for item in items
]
return result
async def _generate_order_code(self, db: Session) -> str:
"""生成调拨单号"""
from datetime import datetime
import random
import string
# 日期部分
date_str = datetime.now().strftime("%Y%m%d")
# 序号部分5位随机数
sequence = "".join(random.choices(string.digits, k=5))
# 组合单号: TO-20250124-00001
order_code = f"TO-{date_str}-{sequence}"
# 检查是否重复,如果重复则重新生成
while transfer_order.get_by_code(db, order_code):
sequence = "".join(random.choices(string.digits, k=5))
order_code = f"TO-{date_str}-{sequence}"
return order_code
# 创建全局实例
transfer_service = TransferService()