""" 品牌和供应商CRUD操作 """ from typing import List, Optional, Tuple from sqlalchemy import and_, or_, func from sqlalchemy.orm import Session from app.models.brand_supplier import Brand, Supplier from app.schemas.brand_supplier import ( BrandCreate, BrandUpdate, SupplierCreate, SupplierUpdate ) class BrandCRUD: """品牌CRUD操作类""" def get(self, db: Session, id: int) -> Optional[Brand]: """根据ID获取品牌""" return db.query(Brand).filter( and_( Brand.id == id, Brand.deleted_at.is_(None) ) ).first() def get_by_code(self, db: Session, code: str) -> Optional[Brand]: """根据代码获取品牌""" return db.query(Brand).filter( and_( Brand.brand_code == code, Brand.deleted_at.is_(None) ) ).first() def get_multi( self, db: Session, skip: int = 0, limit: int = 20, status: Optional[str] = None, keyword: Optional[str] = None ) -> Tuple[List[Brand], int]: """获取品牌列表""" query = db.query(Brand).filter(Brand.deleted_at.is_(None)) if status: query = query.filter(Brand.status == status) if keyword: query = query.filter( or_( Brand.brand_code.ilike(f"%{keyword}%"), Brand.brand_name.ilike(f"%{keyword}%") ) ) query = query.order_by(Brand.sort_order.asc(), Brand.id.desc()) total = query.count() items = query.offset(skip).limit(limit).all() return items, total def create(self, db: Session, obj_in: BrandCreate, creator_id: Optional[int] = None) -> Brand: """创建品牌""" if self.get_by_code(db, obj_in.brand_code): raise ValueError(f"品牌代码 '{obj_in.brand_code}' 已存在") db_obj = Brand(**obj_in.model_dump(), created_by=creator_id) db.add(db_obj) db.commit() db.refresh(db_obj) return db_obj def update( self, db: Session, db_obj: Brand, obj_in: BrandUpdate, updater_id: Optional[int] = None ) -> Brand: """更新品牌""" obj_data = obj_in.model_dump(exclude_unset=True) for field, value in obj_data.items(): setattr(db_obj, field, value) db_obj.updated_by = updater_id db.add(db_obj) db.commit() db.refresh(db_obj) return db_obj def delete(self, db: Session, id: int, deleter_id: Optional[int] = None) -> bool: """删除品牌(软删除)""" obj = self.get(db, id) if not obj: return False obj.deleted_at = func.now() obj.deleted_by = deleter_id db.add(obj) db.commit() return True class SupplierCRUD: """供应商CRUD操作类""" def get(self, db: Session, id: int) -> Optional[Supplier]: """根据ID获取供应商""" return db.query(Supplier).filter( and_( Supplier.id == id, Supplier.deleted_at.is_(None) ) ).first() def get_by_code(self, db: Session, code: str) -> Optional[Supplier]: """根据代码获取供应商""" return db.query(Supplier).filter( and_( Supplier.supplier_code == code, Supplier.deleted_at.is_(None) ) ).first() def get_multi( self, db: Session, skip: int = 0, limit: int = 20, status: Optional[str] = None, keyword: Optional[str] = None ) -> Tuple[List[Supplier], int]: """获取供应商列表""" query = db.query(Supplier).filter(Supplier.deleted_at.is_(None)) if status: query = query.filter(Supplier.status == status) if keyword: query = query.filter( or_( Supplier.supplier_code.ilike(f"%{keyword}%"), Supplier.supplier_name.ilike(f"%{keyword}%") ) ) query = query.order_by(Supplier.id.desc()) total = query.count() items = query.offset(skip).limit(limit).all() return items, total def create(self, db: Session, obj_in: SupplierCreate, creator_id: Optional[int] = None) -> Supplier: """创建供应商""" if self.get_by_code(db, obj_in.supplier_code): raise ValueError(f"供应商代码 '{obj_in.supplier_code}' 已存在") db_obj = Supplier(**obj_in.model_dump(), created_by=creator_id) db.add(db_obj) db.commit() db.refresh(db_obj) return db_obj def update( self, db: Session, db_obj: Supplier, obj_in: SupplierUpdate, updater_id: Optional[int] = None ) -> Supplier: """更新供应商""" obj_data = obj_in.model_dump(exclude_unset=True) for field, value in obj_data.items(): setattr(db_obj, field, value) db_obj.updated_by = updater_id db.add(db_obj) db.commit() db.refresh(db_obj) return db_obj def delete(self, db: Session, id: int, deleter_id: Optional[int] = None) -> bool: """删除供应商(软删除)""" obj = self.get(db, id) if not obj: return False obj.deleted_at = func.now() obj.deleted_by = deleter_id db.add(obj) db.commit() return True # 创建全局实例 brand = BrandCRUD() supplier = SupplierCRUD()