Files
zcglxt/backend_new/app/crud/maintenance.py

248 lines
7.6 KiB
Python

"""
维修管理相关CRUD操作
"""
from typing import List, Optional, Tuple
from sqlalchemy.orm import Session
from sqlalchemy import and_, or_, func
from app.models.maintenance import MaintenanceRecord
from app.schemas.maintenance import MaintenanceRecordCreate, MaintenanceRecordUpdate
class MaintenanceRecordCRUD:
    """CRUD operations for maintenance records.

    The class keeps no state of its own: every method receives the
    SQLAlchemy session to operate on as ``db``. Methods that write commit
    the session themselves.
    """

    @staticmethod
    def _utc_now():
        """Return the current UTC time as a naive ``datetime``.

        Same value as the deprecated ``datetime.utcnow()``: the time is
        taken in UTC and the tzinfo is dropped, so the timestamps written
        by this class keep their naive-UTC form.
        """
        from datetime import datetime, timezone

        return datetime.now(timezone.utc).replace(tzinfo=None)

    @staticmethod
    def _commit(db: Session) -> None:
        """Commit ``db``; on failure roll back and re-raise the error."""
        try:
            db.commit()
        except Exception:
            # Reset the failed transaction so the session stays usable for
            # the caller, then let the original error propagate unchanged.
            db.rollback()
            raise

    def _save(self, db: Session, db_obj: MaintenanceRecord) -> MaintenanceRecord:
        """Add ``db_obj`` to the session, commit, refresh it and return it."""
        db.add(db_obj)
        self._commit(db)
        db.refresh(db_obj)
        return db_obj

    def get(self, db: Session, id: int) -> Optional[MaintenanceRecord]:
        """Return the record whose ``id`` matches, or ``None`` if there is none."""
        return (
            db.query(MaintenanceRecord)
            .filter(MaintenanceRecord.id == id)
            .first()
        )

    def get_by_code(self, db: Session, record_code: str) -> Optional[MaintenanceRecord]:
        """Return the record whose ``record_code`` matches, or ``None``."""
        return (
            db.query(MaintenanceRecord)
            .filter(MaintenanceRecord.record_code == record_code)
            .first()
        )

    def get_multi(
        self,
        db: Session,
        skip: int = 0,
        limit: int = 20,
        asset_id: Optional[int] = None,
        status: Optional[str] = None,
        fault_type: Optional[str] = None,
        priority: Optional[str] = None,
        maintenance_type: Optional[str] = None,
        keyword: Optional[str] = None
    ) -> Tuple[List[MaintenanceRecord], int]:
        """Return one page of maintenance records plus the total match count.

        Args:
            db: Session to query.
            skip: Number of matching rows to skip.
            limit: Maximum number of rows to return.
            asset_id, status, fault_type, priority, maintenance_type:
                Equality filters on the column of the same name. A falsy
                value (``None``, ``0``, ``""``) leaves that filter off.
            keyword: Substring searched for in ``record_code``,
                ``asset_code`` and ``fault_description``; a row matches if
                any of the three contains it.

        Returns:
            ``(items, total)`` where ``items`` is the requested page, newest
            ``report_time`` first, and ``total`` is the number of rows that
            match the filters regardless of ``skip``/``limit``.
        """
        query = db.query(MaintenanceRecord)

        equality_filters = (
            (MaintenanceRecord.asset_id, asset_id),
            (MaintenanceRecord.status, status),
            (MaintenanceRecord.fault_type, fault_type),
            (MaintenanceRecord.priority, priority),
            (MaintenanceRecord.maintenance_type, maintenance_type),
        )
        for column, wanted in equality_filters:
            if wanted:
                query = query.filter(column == wanted)

        if keyword:
            # autoescape makes "%" and "_" in the user's keyword match
            # literally instead of acting as LIKE wildcards.
            query = query.filter(
                or_(
                    MaintenanceRecord.record_code.contains(keyword, autoescape=True),
                    MaintenanceRecord.asset_code.contains(keyword, autoescape=True),
                    MaintenanceRecord.fault_description.contains(keyword, autoescape=True),
                )
            )

        # Count before ordering: the sort is irrelevant to the total.
        total = query.count()

        # The id tie-breaker keeps rows with equal report_time in a fixed
        # order, so consecutive pages neither repeat nor drop records.
        items = (
            query.order_by(
                MaintenanceRecord.report_time.desc(),
                MaintenanceRecord.id.desc(),
            )
            .offset(skip)
            .limit(limit)
            .all()
        )
        return items, total

    def create(
        self,
        db: Session,
        obj_in: MaintenanceRecordCreate,
        record_code: str,
        asset_code: str,
        report_user_id: int,
        creator_id: int
    ) -> MaintenanceRecord:
        """Insert a new maintenance record with status ``"pending"``.

        Args:
            db: Session to write to.
            obj_in: Payload supplying the asset, fault and maintenance fields.
            record_code: Value stored as the record's ``record_code``.
            asset_code: Value stored as the record's ``asset_code``.
            report_user_id: Value stored as ``report_user_id``.
            creator_id: Value stored as ``created_by``.

        Returns:
            The committed and refreshed record.
        """
        new_record = MaintenanceRecord(
            record_code=record_code,
            asset_id=obj_in.asset_id,
            asset_code=asset_code,
            fault_description=obj_in.fault_description,
            fault_type=obj_in.fault_type,
            report_user_id=report_user_id,
            priority=obj_in.priority,
            maintenance_type=obj_in.maintenance_type,
            vendor_id=obj_in.vendor_id,
            maintenance_cost=obj_in.maintenance_cost,
            maintenance_result=obj_in.maintenance_result,
            replaced_parts=obj_in.replaced_parts,
            images=obj_in.images,
            remark=obj_in.remark,
            status="pending",
            created_by=creator_id,
        )
        return self._save(db, new_record)

    def update(
        self,
        db: Session,
        db_obj: MaintenanceRecord,
        obj_in: MaintenanceRecordUpdate,
        updater_id: int
    ) -> MaintenanceRecord:
        """Apply the explicitly-set fields of ``obj_in`` to ``db_obj``.

        Only fields present in ``obj_in.model_dump(exclude_unset=True)`` are
        written; ``updated_by`` is always set to ``updater_id``.

        Returns:
            The committed and refreshed record.
        """
        changes = obj_in.model_dump(exclude_unset=True)
        for field_name, new_value in changes.items():
            setattr(db_obj, field_name, new_value)
        db_obj.updated_by = updater_id
        return self._save(db, db_obj)

    def start_maintenance(
        self,
        db: Session,
        db_obj: MaintenanceRecord,
        maintenance_type: str,
        maintenance_user_id: int,
        vendor_id: Optional[int] = None
    ) -> MaintenanceRecord:
        """Mark the record as ``"in_progress"`` and stamp ``start_time``.

        ``vendor_id`` is only written when it is truthy; otherwise the
        record's existing vendor is left untouched.

        Returns:
            The committed and refreshed record.
        """
        db_obj.status = "in_progress"
        db_obj.start_time = self._utc_now()
        db_obj.maintenance_type = maintenance_type
        db_obj.maintenance_user_id = maintenance_user_id
        if vendor_id:
            db_obj.vendor_id = vendor_id
        return self._save(db, db_obj)

    def complete_maintenance(
        self,
        db: Session,
        db_obj: MaintenanceRecord,
        maintenance_result: str,
        maintenance_cost: Optional[float] = None,
        replaced_parts: Optional[str] = None,
        images: Optional[str] = None
    ) -> MaintenanceRecord:
        """Mark the record as ``"completed"`` and stamp ``complete_time``.

        ``maintenance_cost`` is written whenever it is not ``None`` (so a
        cost of ``0`` is stored); ``replaced_parts`` and ``images`` are
        written only when truthy.

        Returns:
            The committed and refreshed record.
        """
        db_obj.status = "completed"
        db_obj.complete_time = self._utc_now()
        db_obj.maintenance_result = maintenance_result
        if maintenance_cost is not None:
            db_obj.maintenance_cost = maintenance_cost
        if replaced_parts:
            db_obj.replaced_parts = replaced_parts
        if images:
            db_obj.images = images
        return self._save(db, db_obj)

    def cancel_maintenance(
        self,
        db: Session,
        db_obj: MaintenanceRecord
    ) -> MaintenanceRecord:
        """Set the record's status to ``"cancelled"`` and persist it."""
        db_obj.status = "cancelled"
        return self._save(db, db_obj)

    def delete(self, db: Session, id: int) -> bool:
        """Delete the record with the given ``id``.

        Returns:
            ``True`` if a record was found and deleted, ``False`` if no
            record has that ``id``.
        """
        record = self.get(db, id)
        if record is None:
            return False
        db.delete(record)
        self._commit(db)
        return True

    def get_statistics(
        self,
        db: Session,
        asset_id: Optional[int] = None
    ) -> dict:
        """Return record counts per status and the total completed cost.

        Args:
            db: Session to query.
            asset_id: When truthy, restrict the statistics to that asset.

        Returns:
            A dict with the keys ``total``, ``pending``, ``in_progress``,
            ``completed``, ``cancelled`` (row counts) and ``total_cost``
            (sum of ``maintenance_cost`` over completed records, or
            ``Decimal("0.00")`` when that sum is empty or zero).
        """
        from decimal import Decimal

        # One grouped query yields every per-status count and cost sum, so
        # all figures come from the same snapshot of the table.
        grouped = db.query(
            MaintenanceRecord.status,
            func.count(MaintenanceRecord.id),
            func.sum(MaintenanceRecord.maintenance_cost),
        )
        if asset_id:
            grouped = grouped.filter(MaintenanceRecord.asset_id == asset_id)
        rows = grouped.group_by(MaintenanceRecord.status).all()

        count_by_status = {}
        cost_by_status = {}
        for status, record_count, cost_sum in rows:
            count_by_status[status] = record_count
            cost_by_status[status] = cost_sum

        # SUM skips NULL costs and is NULL when no completed row has a cost.
        completed_cost = cost_by_status.get("completed")
        total_cost = completed_cost if completed_cost else Decimal("0.00")

        return {
            "total": sum(count_by_status.values()),
            "pending": count_by_status.get("pending", 0),
            "in_progress": count_by_status.get("in_progress", 0),
            "completed": count_by_status.get("completed", 0),
            "cancelled": count_by_status.get("cancelled", 0),
            "total_cost": total_cost,
        }

    def get_by_asset(
        self,
        db: Session,
        asset_id: int,
        skip: int = 0,
        limit: int = 50
    ) -> List[MaintenanceRecord]:
        """Return one page of an asset's records, newest ``report_time`` first.

        Args:
            db: Session to query.
            asset_id: Asset whose records are returned.
            skip: Number of matching rows to skip.
            limit: Maximum number of rows to return.
        """
        return (
            db.query(MaintenanceRecord)
            .filter(MaintenanceRecord.asset_id == asset_id)
            # id breaks report_time ties so paging order is stable.
            .order_by(
                MaintenanceRecord.report_time.desc(),
                MaintenanceRecord.id.desc(),
            )
            .offset(skip)
            .limit(limit)
            .all()
        )
# Create the module-level instance
maintenance_record = MaintenanceRecordCRUD()