""" 资产回收相关CRUD操作 """ from typing import List, Optional, Tuple from sqlalchemy.orm import Session from sqlalchemy import and_, or_ from app.models.recovery import AssetRecoveryOrder, AssetRecoveryItem from app.models.asset import Asset from app.schemas.recovery import AssetRecoveryOrderCreate, AssetRecoveryOrderUpdate class AssetRecoveryOrderCRUD: """回收单CRUD操作""" def get(self, db: Session, id: int) -> Optional[AssetRecoveryOrder]: """根据ID获取回收单""" return db.query(AssetRecoveryOrder).filter( AssetRecoveryOrder.id == id ).first() def get_by_code(self, db: Session, order_code: str) -> Optional[AssetRecoveryOrder]: """根据单号获取回收单""" return db.query(AssetRecoveryOrder).filter( AssetRecoveryOrder.order_code == order_code ).first() def get_multi( self, db: Session, skip: int = 0, limit: int = 20, recovery_type: Optional[str] = None, approval_status: Optional[str] = None, execute_status: Optional[str] = None, keyword: Optional[str] = None ) -> Tuple[List[AssetRecoveryOrder], int]: """获取回收单列表""" query = db.query(AssetRecoveryOrder) # 筛选条件 if recovery_type: query = query.filter(AssetRecoveryOrder.recovery_type == recovery_type) if approval_status: query = query.filter(AssetRecoveryOrder.approval_status == approval_status) if execute_status: query = query.filter(AssetRecoveryOrder.execute_status == execute_status) if keyword: query = query.filter( or_( AssetRecoveryOrder.order_code.like(f"%{keyword}%"), AssetRecoveryOrder.title.like(f"%{keyword}%") ) ) # 排序 query = query.order_by(AssetRecoveryOrder.created_at.desc()) # 总数 total = query.count() # 分页 items = query.offset(skip).limit(limit).all() return items, total def create( self, db: Session, obj_in: AssetRecoveryOrderCreate, order_code: str, apply_user_id: int ) -> AssetRecoveryOrder: """创建回收单""" from datetime import datetime # 创建回收单 db_obj = AssetRecoveryOrder( order_code=order_code, recovery_type=obj_in.recovery_type, title=obj_in.title, asset_count=len(obj_in.asset_ids), apply_user_id=apply_user_id, apply_time=datetime.utcnow(), remark=obj_in.remark, approval_status="pending", execute_status="pending" ) db.add(db_obj) db.commit() db.refresh(db_obj) # 创建回收单明细 self._create_items( db=db, order_id=db_obj.id, asset_ids=obj_in.asset_ids ) return db_obj def update( self, db: Session, db_obj: AssetRecoveryOrder, obj_in: AssetRecoveryOrderUpdate ) -> AssetRecoveryOrder: """更新回收单""" update_data = obj_in.model_dump(exclude_unset=True) for field, value in update_data.items(): setattr(db_obj, field, value) db.add(db_obj) db.commit() db.refresh(db_obj) return db_obj def approve( self, db: Session, db_obj: AssetRecoveryOrder, approval_status: str, approval_user_id: int, approval_remark: Optional[str] = None ) -> AssetRecoveryOrder: """审批回收单""" from datetime import datetime db_obj.approval_status = approval_status db_obj.approval_user_id = approval_user_id db_obj.approval_time = datetime.utcnow() db_obj.approval_remark = approval_remark # 如果审批通过,自动设置为可执行状态 if approval_status == "approved": db_obj.execute_status = "pending" elif approval_status == "rejected": db_obj.execute_status = "cancelled" db.add(db_obj) db.commit() db.refresh(db_obj) return db_obj def start( self, db: Session, db_obj: AssetRecoveryOrder, execute_user_id: int ) -> AssetRecoveryOrder: """开始回收""" from datetime import datetime db_obj.execute_status = "executing" db_obj.execute_user_id = execute_user_id db_obj.execute_time = datetime.utcnow() db.add(db_obj) db.commit() db.refresh(db_obj) return db_obj def complete( self, db: Session, db_obj: AssetRecoveryOrder, execute_user_id: int ) -> AssetRecoveryOrder: """完成回收""" from datetime import datetime db_obj.execute_status = "completed" db_obj.execute_user_id = execute_user_id db_obj.execute_time = datetime.utcnow() db.add(db_obj) db.commit() db.refresh(db_obj) return db_obj def cancel(self, db: Session, db_obj: AssetRecoveryOrder) -> AssetRecoveryOrder: """取消回收单""" db_obj.approval_status = "cancelled" db_obj.execute_status = "cancelled" db.add(db_obj) db.commit() db.refresh(db_obj) return db_obj def delete(self, db: Session, id: int) -> bool: """删除回收单""" obj = self.get(db, id) if obj: db.delete(obj) db.commit() return True return False def get_statistics( self, db: Session ) -> dict: """获取回收单统计信息""" query = db.query(AssetRecoveryOrder) total = query.count() pending = query.filter(AssetRecoveryOrder.approval_status == "pending").count() approved = query.filter(AssetRecoveryOrder.approval_status == "approved").count() rejected = query.filter(AssetRecoveryOrder.approval_status == "rejected").count() executing = query.filter(AssetRecoveryOrder.execute_status == "executing").count() completed = query.filter(AssetRecoveryOrder.execute_status == "completed").count() return { "total": total, "pending": pending, "approved": approved, "rejected": rejected, "executing": executing, "completed": completed } def _create_items( self, db: Session, order_id: int, asset_ids: List[int] ): """创建回收单明细""" # 查询资产信息 assets = db.query(Asset).filter(Asset.id.in_(asset_ids)).all() for asset in assets: item = AssetRecoveryItem( order_id=order_id, asset_id=asset.id, asset_code=asset.asset_code, recovery_status="pending" ) db.add(item) db.commit() class AssetRecoveryItemCRUD: """回收单明细CRUD操作""" def get_by_order(self, db: Session, order_id: int) -> List[AssetRecoveryItem]: """根据回收单ID获取明细列表""" return db.query(AssetRecoveryItem).filter( AssetRecoveryItem.order_id == order_id ).order_by(AssetRecoveryItem.id).all() def get_multi( self, db: Session, skip: int = 0, limit: int = 20, order_id: Optional[int] = None, recovery_status: Optional[str] = None ) -> Tuple[List[AssetRecoveryItem], int]: """获取明细列表""" query = db.query(AssetRecoveryItem) if order_id: query = query.filter(AssetRecoveryItem.order_id == order_id) if recovery_status: query = query.filter(AssetRecoveryItem.recovery_status == recovery_status) total = query.count() items = query.offset(skip).limit(limit).all() return items, total def update_recovery_status( self, db: Session, item_id: int, recovery_status: str ) -> AssetRecoveryItem: """更新明细回收状态""" item = db.query(AssetRecoveryItem).filter( AssetRecoveryItem.id == item_id ).first() if item: item.recovery_status = recovery_status db.add(item) db.commit() db.refresh(item) return item def batch_update_recovery_status( self, db: Session, order_id: int, recovery_status: str ): """批量更新明细回收状态""" items = db.query(AssetRecoveryItem).filter( and_( AssetRecoveryItem.order_id == order_id, AssetRecoveryItem.recovery_status == "pending" ) ).all() for item in items: item.recovery_status = recovery_status db.add(item) db.commit() # 创建全局实例 recovery_order = AssetRecoveryOrderCRUD() recovery_item = AssetRecoveryItemCRUD()