""" 统计分析服务层 """ from typing import Optional, List, Dict, Any from datetime import datetime, date, timedelta from decimal import Decimal from sqlalchemy import select, func, and_, or_, case, text from sqlalchemy.ext.asyncio import AsyncSession from app.models.asset import Asset from app.models.allocation import AssetAllocationOrder from app.models.maintenance import MaintenanceRecord from app.models.organization import Organization from app.models.brand_supplier import Supplier from app.models.device_type import DeviceType class StatisticsService: """统计分析服务类""" async def get_overview( self, db: AsyncSession, organization_id: Optional[int] = None ) -> Dict[str, Any]: """ 获取总览统计 Args: db: 数据库会话 organization_id: 网点ID Returns: 总览统计数据 """ # 构建查询条件 conditions = [] if organization_id: conditions.append(Asset.organization_id == organization_id) where_clause = and_(*conditions) if conditions else None # 资产总数 total_query = select(func.count(Asset.id)) if where_clause: total_query = total_query.where(where_clause) total_result = await db.execute(total_query) total_assets = total_result.scalar() or 0 # 资产总价值 value_query = select(func.coalesce(func.sum(Asset.purchase_price), 0)) if where_clause: value_query = value_query.where(where_clause) value_result = await db.execute(value_query) total_value = value_result.scalar() or Decimal("0") # 各状态数量 status_query = select( Asset.status, func.count(Asset.id).label('count') ).group_by(Asset.status) if where_clause: status_query = status_query.where(where_clause) status_result = await db.execute(status_query) status_counts = {row[0]: row[1] for row in status_result} # 今日和本月采购数量 today = datetime.utcnow().date() today_start = datetime.combine(today, datetime.min.time()) month_start = datetime(today.year, today.month, 1) today_query = select(func.count(Asset.id)).where(Asset.created_at >= today_start) if where_clause: today_query = today_query.where(Asset.organization_id == organization_id) today_result = await db.execute(today_query) today_purchase_count = today_result.scalar() or 0 month_query = select(func.count(Asset.id)).where(Asset.created_at >= month_start) if where_clause: month_query = month_query.where(Asset.organization_id == organization_id) month_result = await db.execute(month_query) this_month_purchase_count = month_result.scalar() or 0 # 机构网点数 org_query = select(func.count(Organization.id)) org_result = await db.execute(org_query) organization_count = org_result.scalar() or 0 # 供应商数 supplier_query = select(func.count(Supplier.id)) supplier_result = await db.execute(supplier_query) supplier_count = supplier_result.scalar() or 0 return { "total_assets": total_assets, "total_value": float(total_value), "in_stock_count": status_counts.get("in_stock", 0), "in_use_count": status_counts.get("in_use", 0), "maintenance_count": status_counts.get("maintenance", 0), "scrapped_count": status_counts.get("scrapped", 0) + status_counts.get("pending_scrap", 0), "today_purchase_count": today_purchase_count, "this_month_purchase_count": this_month_purchase_count, "organization_count": organization_count, "supplier_count": supplier_count, } async def get_purchase_statistics( self, db: AsyncSession, start_date: Optional[date] = None, end_date: Optional[date] = None, organization_id: Optional[int] = None ) -> Dict[str, Any]: """ 获取采购统计 Args: db: 数据库会话 start_date: 开始日期 end_date: 结束日期 organization_id: 网点ID Returns: 采购统计数据 """ # 构建查询条件 conditions = [] if start_date: conditions.append(Asset.purchase_date >= start_date) if end_date: conditions.append(Asset.purchase_date <= end_date) if organization_id: conditions.append(Asset.organization_id == organization_id) where_clause = and_(*conditions) if conditions else None # 总采购数量和金额 count_query = select(func.count(Asset.id)) value_query = select(func.coalesce(func.sum(Asset.purchase_price), 0)) if where_clause: count_query = count_query.where(where_clause) value_query = value_query.where(where_clause) count_result = await db.execute(count_query) value_result = await db.execute(value_query) total_purchase_count = count_result.scalar() or 0 total_purchase_value = value_result.scalar() or Decimal("0") # 月度趋势 monthly_query = select( func.to_char(Asset.purchase_date, 'YYYY-MM').label('month'), func.count(Asset.id).label('count'), func.coalesce(func.sum(Asset.purchase_price), 0).label('value') ).group_by('month').order_by('month') if where_clause: monthly_query = monthly_query.where(where_clause) monthly_result = await db.execute(monthly_query) monthly_trend = [ { "month": row[0], "count": row[1], "value": float(row[2]) if row[2] else 0 } for row in monthly_result ] # 供应商分布 supplier_query = select( Supplier.id.label('supplier_id'), Supplier.name.label('supplier_name'), func.count(Asset.id).label('count'), func.coalesce(func.sum(Asset.purchase_price), 0).label('value') ).join( Asset, Asset.supplier_id == Supplier.id ).group_by( Supplier.id, Supplier.name ).order_by(func.count(Asset.id).desc()) if where_clause: supplier_query = supplier_query.where( and_(*[c for c in conditions if not any(x in str(c) for x in ['organization_id'])]) ) supplier_result = await db.execute(supplier_query) supplier_distribution = [ { "supplier_id": row[0], "supplier_name": row[1], "count": row[2], "value": float(row[3]) if row[3] else 0 } for row in supplier_result ] return { "total_purchase_count": total_purchase_count, "total_purchase_value": float(total_purchase_value), "monthly_trend": monthly_trend, "supplier_distribution": supplier_distribution, "category_distribution": [], } async def get_depreciation_statistics( self, db: AsyncSession, organization_id: Optional[int] = None ) -> Dict[str, Any]: """ 获取折旧统计 Args: db: 数据库会话 organization_id: 网点ID Returns: 折旧统计数据 """ # 简化实现,实际需要根据折旧规则计算 return { "total_depreciation_value": 0.0, "average_depreciation_rate": 0.05, "depreciation_by_category": [], "assets_near_end_life": [], } async def get_value_statistics( self, db: AsyncSession, organization_id: Optional[int] = None ) -> Dict[str, Any]: """ 获取价值统计 Args: db: 数据库会话 organization_id: 网点ID Returns: 价值统计数据 """ # 构建查询条件 conditions = [] if organization_id: conditions.append(Asset.organization_id == organization_id) where_clause = and_(*conditions) if conditions else None # 总价值 total_query = select(func.coalesce(func.sum(Asset.purchase_price), 0)) if where_clause: total_query = total_query.where(where_clause) total_result = await db.execute(total_query) total_value = total_result.scalar() or Decimal("0") # 按分类统计 category_query = select( DeviceType.id.label('device_type_id'), DeviceType.name.label('device_type_name'), func.count(Asset.id).label('count'), func.coalesce(func.sum(Asset.purchase_price), 0).label('value') ).join( Asset, Asset.device_type_id == DeviceType.id ).group_by( DeviceType.id, DeviceType.name ).order_by(func.coalesce(func.sum(Asset.purchase_price), 0).desc()) if where_clause: category_query = category_query.where(where_clause) category_result = await db.execute(category_query) value_by_category = [ { "device_type_id": row[0], "device_type_name": row[1], "count": row[2], "value": float(row[3]) if row[3] else 0 } for row in category_result ] # 按网点统计 org_query = select( Organization.id.label('organization_id'), Organization.name.label('organization_name'), func.count(Asset.id).label('count'), func.coalesce(func.sum(Asset.purchase_price), 0).label('value') ).join( Asset, Asset.organization_id == Organization.id ).group_by( Organization.id, Organization.name ).order_by(func.coalesce(func.sum(Asset.purchase_price), 0).desc()) if where_clause: org_query = org_query.where(where_clause) org_result = await db.execute(org_query) value_by_organization = [ { "organization_id": row[0], "organization_name": row[1], "count": row[2], "value": float(row[3]) if row[3] else 0 } for row in org_result ] # 高价值资产(价值前10) high_value_query = select( Asset.id, Asset.asset_code, Asset.asset_name, Asset.purchase_price, DeviceType.name.label('device_type_name') ).join( DeviceType, Asset.device_type_id == DeviceType.id ).order_by( Asset.purchase_price.desc() ).limit(10) if where_clause: high_value_query = high_value_query.where(where_clause) high_value_result = await db.execute(high_value_query) high_value_assets = [ { "asset_id": row[0], "asset_code": row[1], "asset_name": row[2], "purchase_price": float(row[3]) if row[3] else 0, "device_type_name": row[4] } for row in high_value_result ] return { "total_value": float(total_value), "net_value": float(total_value * Decimal("0.8")), # 简化计算 "depreciation_value": float(total_value * Decimal("0.2")), "value_by_category": value_by_category, "value_by_organization": value_by_organization, "high_value_assets": high_value_assets, } async def get_trend_analysis( self, db: AsyncSession, start_date: Optional[date] = None, end_date: Optional[date] = None, organization_id: Optional[int] = None ) -> Dict[str, Any]: """ 获取趋势分析 Args: db: 数据库会话 start_date: 开始日期 end_date: 结束日期 organization_id: 网点ID Returns: 趋势分析数据 """ # 默认查询最近12个月 if not end_date: end_date = datetime.utcnow().date() if not start_date: start_date = end_date - timedelta(days=365) # 构建查询条件 conditions = [ Asset.created_at >= datetime.combine(start_date, datetime.min.time()), Asset.created_at <= datetime.combine(end_date, datetime.max.time()) ] if organization_id: conditions.append(Asset.organization_id == organization_id) where_clause = and_(*conditions) # 资产数量趋势(按月) asset_trend_query = select( func.to_char(Asset.created_at, 'YYYY-MM').label('month'), func.count(Asset.id).label('count') ).group_by('month').order_by('month') asset_trend_result = await db.execute(asset_trend_query.where(where_clause)) asset_trend = [ {"month": row[0], "count": row[1]} for row in asset_trend_result ] # 资产价值趋势 value_trend_query = select( func.to_char(Asset.created_at, 'YYYY-MM').label('month'), func.coalesce(func.sum(Asset.purchase_price), 0).label('value') ).group_by('month').order_by('month') value_trend_result = await db.execute(value_trend_query.where(where_clause)) value_trend = [ {"month": row[0], "value": float(row[1]) if row[1] else 0} for row in value_trend_result ] return { "asset_trend": asset_trend, "value_trend": value_trend, "purchase_trend": [], "maintenance_trend": [], "allocation_trend": [], } async def get_maintenance_statistics( self, db: AsyncSession, start_date: Optional[date] = None, end_date: Optional[date] = None, organization_id: Optional[int] = None ) -> Dict[str, Any]: """ 获取维修统计 Args: db: 数据库会话 start_date: 开始日期 end_date: 结束日期 organization_id: 网点ID Returns: 维修统计数据 """ # 构建查询条件 conditions = [] if start_date: conditions.append(MaintenanceRecord.created_at >= datetime.combine(start_date, datetime.min.time())) if end_date: conditions.append(MaintenanceRecord.created_at <= datetime.combine(end_date, datetime.max.time())) if organization_id: conditions.append(MaintenanceRecord.organization_id == organization_id) where_clause = and_(*conditions) if conditions else None # 总维修次数和费用 count_query = select(func.count(MaintenanceRecord.id)) cost_query = select(func.coalesce(func.sum(MaintenanceRecord.cost), 0)) if where_clause: count_query = count_query.where(where_clause) cost_query = cost_query.where(where_clause) count_result = await db.execute(count_query) cost_result = await db.execute(cost_query) total_maintenance_count = count_result.scalar() or 0 total_maintenance_cost = cost_result.scalar() or Decimal("0") # 按状态统计 status_query = select( MaintenanceRecord.status, func.count(MaintenanceRecord.id).label('count') ).group_by(MaintenanceRecord.status) if where_clause: status_query = status_query.where(where_clause) status_result = await db.execute(status_query) status_counts = {row[0]: row[1] for row in status_result} return { "total_maintenance_count": total_maintenance_count, "total_maintenance_cost": float(total_maintenance_cost), "pending_count": status_counts.get("pending", 0), "in_progress_count": status_counts.get("in_progress", 0), "completed_count": status_counts.get("completed", 0), "monthly_trend": [], "type_distribution": [], "cost_by_category": [], } async def get_allocation_statistics( self, db: AsyncSession, start_date: Optional[date] = None, end_date: Optional[date] = None, organization_id: Optional[int] = None ) -> Dict[str, Any]: """ 获取分配统计 Args: db: 数据库会话 start_date: 开始日期 end_date: 结束日期 organization_id: 网点ID Returns: 分配统计数据 """ # 构建查询条件 conditions = [] if start_date: conditions.append(AssetAllocationOrder.created_at >= datetime.combine(start_date, datetime.min.time())) if end_date: conditions.append(AssetAllocationOrder.created_at <= datetime.combine(end_date, datetime.max.time())) where_clause = and_(*conditions) if conditions else None # 总分配次数 count_query = select(func.count(AssetAllocationOrder.id)) if where_clause: count_query = count_query.where(where_clause) count_result = await db.execute(count_query) total_allocation_count = count_result.scalar() or 0 # 按状态统计 status_query = select( AssetAllocationOrder.status, func.count(AssetAllocationOrder.id).label('count') ).group_by(AssetAllocationOrder.status) if where_clause: status_query = status_query.where(where_clause) status_result = await db.execute(status_query) status_counts = {row[0]: row[1] for row in status_result} return { "total_allocation_count": total_allocation_count, "pending_count": status_counts.get("pending", 0), "approved_count": status_counts.get("approved", 0), "rejected_count": status_counts.get("rejected", 0), "monthly_trend": [], "by_organization": [], "transfer_statistics": [], } # 创建全局实例 statistics_service = StatisticsService()