Files
zcglxt/backend_new/app/services/statistics_service.py

547 lines
18 KiB
Python
Raw Permalink Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
"""
Statistics and analytics service layer.
"""
from typing import Optional, List, Dict, Any
from datetime import datetime, date, timedelta
from decimal import Decimal
from sqlalchemy import select, func, and_, or_, case, text
from sqlalchemy.ext.asyncio import AsyncSession
from app.models.asset import Asset
from app.models.allocation import AssetAllocationOrder
from app.models.maintenance import MaintenanceRecord
from app.models.organization import Organization
from app.models.brand_supplier import Supplier
from app.models.device_type import DeviceType
class StatisticsService:
    """Aggregate statistics over assets, maintenance records and allocation orders.

    Every public method is a coroutine that runs ``SELECT`` statements through
    the ``AsyncSession`` it is given and returns a plain ``dict``. Several
    result keys are placeholders (empty lists or fixed numbers) that are not
    computed yet; each method's docstring says which.
    """

    @staticmethod
    def _filtered(statement: Any, conditions: List[Any]) -> Any:
        """Return *statement* restricted by *conditions* (ANDed), or unchanged if empty.

        The emptiness test is made on the Python list, never on a SQL
        expression. SQLAlchemy clause objects have no usable truth value: an
        ``==`` comparison normally evaluates to ``False`` and most other
        expressions raise ``TypeError``, so a check such as
        ``if where_clause:`` either silently drops the filter or crashes.

        Args:
            statement: A selectable exposing ``where(*criteria)``.
            conditions: Filter expressions; may be empty.

        Returns:
            The statement with every condition applied.
        """
        if not conditions:
            return statement
        # Multiple criteria passed to ``where()`` are joined with AND.
        return statement.where(*conditions)

    async def get_overview(
        self,
        db: AsyncSession,
        organization_id: Optional[int] = None
    ) -> Dict[str, Any]:
        """Return the headline counters for the overview page.

        Args:
            db: Database session used to run the queries.
            organization_id: When truthy, restrict all asset figures to this
                organization. The organization and supplier totals are
                always global.

        Returns:
            A dict with ``total_assets``, ``total_value``, ``in_stock_count``,
            ``in_use_count``, ``maintenance_count``, ``scrapped_count``
            (``scrapped`` plus ``pending_scrap``), ``today_purchase_count``,
            ``this_month_purchase_count``, ``organization_count`` and
            ``supplier_count``.
        """
        asset_filters = []
        if organization_id:
            asset_filters.append(Asset.organization_id == organization_id)

        # Total number of assets.
        total_result = await db.execute(
            self._filtered(select(func.count(Asset.id)), asset_filters)
        )
        total_assets = total_result.scalar() or 0

        # Total purchase value; COALESCE keeps the sum at 0 when no row matches.
        value_result = await db.execute(
            self._filtered(
                select(func.coalesce(func.sum(Asset.purchase_price), 0)),
                asset_filters,
            )
        )
        total_value = value_result.scalar() or Decimal("0")

        # Number of assets per status.
        status_query = select(
            Asset.status,
            func.count(Asset.id).label('count')
        ).group_by(Asset.status)
        status_result = await db.execute(self._filtered(status_query, asset_filters))
        status_counts = {row[0]: row[1] for row in status_result}

        # "Today" and "this month" are derived from the current UTC date and
        # compared as naive datetimes against ``created_at`` (not
        # ``purchase_date``).
        # NOTE(review): assumes created_at is stored as naive UTC - confirm.
        today = datetime.utcnow().date()
        today_start = datetime.combine(today, datetime.min.time())
        month_start = datetime(today.year, today.month, 1)

        today_query = select(func.count(Asset.id)).where(Asset.created_at >= today_start)
        today_result = await db.execute(self._filtered(today_query, asset_filters))
        today_purchase_count = today_result.scalar() or 0

        month_query = select(func.count(Asset.id)).where(Asset.created_at >= month_start)
        month_result = await db.execute(self._filtered(month_query, asset_filters))
        this_month_purchase_count = month_result.scalar() or 0

        # Number of organizations (never filtered).
        org_result = await db.execute(select(func.count(Organization.id)))
        organization_count = org_result.scalar() or 0

        # Number of suppliers (never filtered).
        supplier_result = await db.execute(select(func.count(Supplier.id)))
        supplier_count = supplier_result.scalar() or 0

        return {
            "total_assets": total_assets,
            "total_value": float(total_value),
            "in_stock_count": status_counts.get("in_stock", 0),
            "in_use_count": status_counts.get("in_use", 0),
            "maintenance_count": status_counts.get("maintenance", 0),
            "scrapped_count": status_counts.get("scrapped", 0) + status_counts.get("pending_scrap", 0),
            "today_purchase_count": today_purchase_count,
            "this_month_purchase_count": this_month_purchase_count,
            "organization_count": organization_count,
            "supplier_count": supplier_count,
        }

    async def get_purchase_statistics(
        self,
        db: AsyncSession,
        start_date: Optional[date] = None,
        end_date: Optional[date] = None,
        organization_id: Optional[int] = None
    ) -> Dict[str, Any]:
        """Return purchase totals, a monthly trend and the supplier distribution.

        Args:
            db: Database session used to run the queries.
            start_date: Inclusive lower bound on ``Asset.purchase_date``.
            end_date: Inclusive upper bound on ``Asset.purchase_date``.
            organization_id: When truthy, restrict the totals and the monthly
                trend to this organization.

        Returns:
            A dict with ``total_purchase_count``, ``total_purchase_value``,
            ``monthly_trend`` (``month``/``count``/``value`` rows),
            ``supplier_distribution`` (``supplier_id``/``supplier_name``/
            ``count``/``value`` rows, most assets first) and
            ``category_distribution`` (always an empty list for now).
        """
        # The date bounds are kept in their own list because the supplier
        # distribution below applies them without the organization filter.
        date_filters = []
        if start_date:
            date_filters.append(Asset.purchase_date >= start_date)
        if end_date:
            date_filters.append(Asset.purchase_date <= end_date)

        all_filters = list(date_filters)
        if organization_id:
            all_filters.append(Asset.organization_id == organization_id)

        # Total purchase count and value.
        count_result = await db.execute(
            self._filtered(select(func.count(Asset.id)), all_filters)
        )
        value_result = await db.execute(
            self._filtered(
                select(func.coalesce(func.sum(Asset.purchase_price), 0)),
                all_filters,
            )
        )
        total_purchase_count = count_result.scalar() or 0
        total_purchase_value = value_result.scalar() or Decimal("0")

        # Monthly trend. The 'YYYY-MM' bucket relies on the database
        # providing a TO_CHAR function.
        monthly_query = select(
            func.to_char(Asset.purchase_date, 'YYYY-MM').label('month'),
            func.count(Asset.id).label('count'),
            func.coalesce(func.sum(Asset.purchase_price), 0).label('value')
        ).group_by('month').order_by('month')
        monthly_result = await db.execute(self._filtered(monthly_query, all_filters))
        monthly_trend = [
            {
                "month": row[0],
                "count": row[1],
                "value": float(row[2]) if row[2] else 0
            }
            for row in monthly_result
        ]

        # Supplier distribution, ordered by number of assets.
        # NOTE(review): the previous implementation removed the organization
        # condition here by matching 'organization_id' in the rendered SQL
        # text. That date-only scoping is preserved; confirm whether the
        # distribution should follow organization_id as well.
        supplier_query = select(
            Supplier.id.label('supplier_id'),
            Supplier.name.label('supplier_name'),
            func.count(Asset.id).label('count'),
            func.coalesce(func.sum(Asset.purchase_price), 0).label('value')
        ).join(
            Asset, Asset.supplier_id == Supplier.id
        ).group_by(
            Supplier.id, Supplier.name
        ).order_by(func.count(Asset.id).desc())
        supplier_result = await db.execute(self._filtered(supplier_query, date_filters))
        supplier_distribution = [
            {
                "supplier_id": row[0],
                "supplier_name": row[1],
                "count": row[2],
                "value": float(row[3]) if row[3] else 0
            }
            for row in supplier_result
        ]

        return {
            "total_purchase_count": total_purchase_count,
            "total_purchase_value": float(total_purchase_value),
            "monthly_trend": monthly_trend,
            "supplier_distribution": supplier_distribution,
            "category_distribution": [],
        }

    async def get_depreciation_statistics(
        self,
        db: AsyncSession,
        organization_id: Optional[int] = None
    ) -> Dict[str, Any]:
        """Return depreciation statistics (placeholder implementation).

        No query is executed: the result is a fixed structure and both
        arguments are currently ignored. A real implementation would have to
        apply the depreciation rules.

        Args:
            db: Database session (unused).
            organization_id: Organization filter (unused).

        Returns:
            A dict with ``total_depreciation_value`` (0.0),
            ``average_depreciation_rate`` (0.05) and the empty lists
            ``depreciation_by_category`` and ``assets_near_end_life``.
        """
        return {
            "total_depreciation_value": 0.0,
            "average_depreciation_rate": 0.05,
            "depreciation_by_category": [],
            "assets_near_end_life": [],
        }

    async def get_value_statistics(
        self,
        db: AsyncSession,
        organization_id: Optional[int] = None
    ) -> Dict[str, Any]:
        """Return asset value totals and breakdowns.

        Args:
            db: Database session used to run the queries.
            organization_id: When truthy, restrict every figure to this
                organization.

        Returns:
            A dict with ``total_value``, ``net_value`` and
            ``depreciation_value`` (fixed 80% / 20% shares of the total, a
            simplification), ``value_by_category``,
            ``value_by_organization`` (both ordered by value, highest first)
            and ``high_value_assets`` (at most 10 rows).
        """
        asset_filters = []
        if organization_id:
            asset_filters.append(Asset.organization_id == organization_id)

        # Total purchase value.
        total_result = await db.execute(
            self._filtered(
                select(func.coalesce(func.sum(Asset.purchase_price), 0)),
                asset_filters,
            )
        )
        total_value = total_result.scalar() or Decimal("0")

        # Count and value per device type.
        category_query = select(
            DeviceType.id.label('device_type_id'),
            DeviceType.name.label('device_type_name'),
            func.count(Asset.id).label('count'),
            func.coalesce(func.sum(Asset.purchase_price), 0).label('value')
        ).join(
            Asset, Asset.device_type_id == DeviceType.id
        ).group_by(
            DeviceType.id, DeviceType.name
        ).order_by(func.coalesce(func.sum(Asset.purchase_price), 0).desc())
        category_result = await db.execute(self._filtered(category_query, asset_filters))
        value_by_category = [
            {
                "device_type_id": row[0],
                "device_type_name": row[1],
                "count": row[2],
                "value": float(row[3]) if row[3] else 0
            }
            for row in category_result
        ]

        # Count and value per organization.
        org_query = select(
            Organization.id.label('organization_id'),
            Organization.name.label('organization_name'),
            func.count(Asset.id).label('count'),
            func.coalesce(func.sum(Asset.purchase_price), 0).label('value')
        ).join(
            Asset, Asset.organization_id == Organization.id
        ).group_by(
            Organization.id, Organization.name
        ).order_by(func.coalesce(func.sum(Asset.purchase_price), 0).desc())
        org_result = await db.execute(self._filtered(org_query, asset_filters))
        value_by_organization = [
            {
                "organization_id": row[0],
                "organization_name": row[1],
                "count": row[2],
                "value": float(row[3]) if row[3] else 0
            }
            for row in org_result
        ]

        # Ten most expensive assets. Ordering on COALESCE(price, 0) ranks
        # assets without a price as 0, matching how they are reported below;
        # a plain descending sort would put NULL prices first on databases
        # that sort NULLs high (e.g. PostgreSQL).
        high_value_query = select(
            Asset.id,
            Asset.asset_code,
            Asset.asset_name,
            Asset.purchase_price,
            DeviceType.name.label('device_type_name')
        ).join(
            DeviceType, Asset.device_type_id == DeviceType.id
        ).order_by(
            func.coalesce(Asset.purchase_price, 0).desc()
        ).limit(10)
        high_value_result = await db.execute(self._filtered(high_value_query, asset_filters))
        high_value_assets = [
            {
                "asset_id": row[0],
                "asset_code": row[1],
                "asset_name": row[2],
                "purchase_price": float(row[3]) if row[3] else 0,
                "device_type_name": row[4]
            }
            for row in high_value_result
        ]

        return {
            "total_value": float(total_value),
            "net_value": float(total_value * Decimal("0.8")),  # simplified estimate
            "depreciation_value": float(total_value * Decimal("0.2")),
            "value_by_category": value_by_category,
            "value_by_organization": value_by_organization,
            "high_value_assets": high_value_assets,
        }

    async def get_trend_analysis(
        self,
        db: AsyncSession,
        start_date: Optional[date] = None,
        end_date: Optional[date] = None,
        organization_id: Optional[int] = None
    ) -> Dict[str, Any]:
        """Return monthly asset count and value trends based on ``created_at``.

        Args:
            db: Database session used to run the queries.
            start_date: First day of the window; defaults to 365 days before
                ``end_date``.
            end_date: Last day of the window (inclusive); defaults to the
                current UTC date.
            organization_id: When truthy, restrict the trends to this
                organization.

        Returns:
            A dict with ``asset_trend`` (``month``/``count`` rows),
            ``value_trend`` (``month``/``value`` rows) and the currently
            empty lists ``purchase_trend``, ``maintenance_trend`` and
            ``allocation_trend``.
        """
        # Default window: the 365 days ending today (UTC).
        if not end_date:
            end_date = datetime.utcnow().date()
        if not start_date:
            start_date = end_date - timedelta(days=365)

        # The window covers both boundary days in full.
        window_filters = [
            Asset.created_at >= datetime.combine(start_date, datetime.min.time()),
            Asset.created_at <= datetime.combine(end_date, datetime.max.time())
        ]
        if organization_id:
            window_filters.append(Asset.organization_id == organization_id)

        # Number of assets created per month.
        asset_trend_query = select(
            func.to_char(Asset.created_at, 'YYYY-MM').label('month'),
            func.count(Asset.id).label('count')
        ).group_by('month').order_by('month')
        asset_trend_result = await db.execute(
            self._filtered(asset_trend_query, window_filters)
        )
        asset_trend = [
            {"month": row[0], "count": row[1]}
            for row in asset_trend_result
        ]

        # Purchase value of the assets created per month.
        value_trend_query = select(
            func.to_char(Asset.created_at, 'YYYY-MM').label('month'),
            func.coalesce(func.sum(Asset.purchase_price), 0).label('value')
        ).group_by('month').order_by('month')
        value_trend_result = await db.execute(
            self._filtered(value_trend_query, window_filters)
        )
        value_trend = [
            {"month": row[0], "value": float(row[1]) if row[1] else 0}
            for row in value_trend_result
        ]

        return {
            "asset_trend": asset_trend,
            "value_trend": value_trend,
            "purchase_trend": [],
            "maintenance_trend": [],
            "allocation_trend": [],
        }

    async def get_maintenance_statistics(
        self,
        db: AsyncSession,
        start_date: Optional[date] = None,
        end_date: Optional[date] = None,
        organization_id: Optional[int] = None
    ) -> Dict[str, Any]:
        """Return maintenance totals and per-status counts.

        Args:
            db: Database session used to run the queries.
            start_date: First day (inclusive) matched against
                ``MaintenanceRecord.created_at``.
            end_date: Last day (inclusive) matched against
                ``MaintenanceRecord.created_at``.
            organization_id: When truthy, restrict the figures to this
                organization.

        Returns:
            A dict with ``total_maintenance_count``,
            ``total_maintenance_cost``, ``pending_count``,
            ``in_progress_count``, ``completed_count`` and the currently
            empty lists ``monthly_trend``, ``type_distribution`` and
            ``cost_by_category``.
        """
        record_filters = []
        if start_date:
            record_filters.append(
                MaintenanceRecord.created_at >= datetime.combine(start_date, datetime.min.time())
            )
        if end_date:
            record_filters.append(
                MaintenanceRecord.created_at <= datetime.combine(end_date, datetime.max.time())
            )
        if organization_id:
            record_filters.append(MaintenanceRecord.organization_id == organization_id)

        # Total number of records and total cost.
        count_result = await db.execute(
            self._filtered(select(func.count(MaintenanceRecord.id)), record_filters)
        )
        cost_result = await db.execute(
            self._filtered(
                select(func.coalesce(func.sum(MaintenanceRecord.cost), 0)),
                record_filters,
            )
        )
        total_maintenance_count = count_result.scalar() or 0
        total_maintenance_cost = cost_result.scalar() or Decimal("0")

        # Number of records per status.
        status_query = select(
            MaintenanceRecord.status,
            func.count(MaintenanceRecord.id).label('count')
        ).group_by(MaintenanceRecord.status)
        status_result = await db.execute(self._filtered(status_query, record_filters))
        status_counts = {row[0]: row[1] for row in status_result}

        return {
            "total_maintenance_count": total_maintenance_count,
            "total_maintenance_cost": float(total_maintenance_cost),
            "pending_count": status_counts.get("pending", 0),
            "in_progress_count": status_counts.get("in_progress", 0),
            "completed_count": status_counts.get("completed", 0),
            "monthly_trend": [],
            "type_distribution": [],
            "cost_by_category": [],
        }

    async def get_allocation_statistics(
        self,
        db: AsyncSession,
        start_date: Optional[date] = None,
        end_date: Optional[date] = None,
        organization_id: Optional[int] = None
    ) -> Dict[str, Any]:
        """Return allocation order totals and per-status counts.

        Args:
            db: Database session used to run the queries.
            start_date: First day (inclusive) matched against
                ``AssetAllocationOrder.created_at``.
            end_date: Last day (inclusive) matched against
                ``AssetAllocationOrder.created_at``.
            organization_id: Accepted for interface parity but not applied
                to any query.

        Returns:
            A dict with ``total_allocation_count``, ``pending_count``,
            ``approved_count``, ``rejected_count`` and the currently empty
            lists ``monthly_trend``, ``by_organization`` and
            ``transfer_statistics``.
        """
        # NOTE(review): organization_id is ignored here; which order column
        # it should filter on is not visible from this file - confirm.
        order_filters = []
        if start_date:
            order_filters.append(
                AssetAllocationOrder.created_at >= datetime.combine(start_date, datetime.min.time())
            )
        if end_date:
            order_filters.append(
                AssetAllocationOrder.created_at <= datetime.combine(end_date, datetime.max.time())
            )

        # Total number of allocation orders.
        count_result = await db.execute(
            self._filtered(select(func.count(AssetAllocationOrder.id)), order_filters)
        )
        total_allocation_count = count_result.scalar() or 0

        # Number of orders per status.
        status_query = select(
            AssetAllocationOrder.status,
            func.count(AssetAllocationOrder.id).label('count')
        ).group_by(AssetAllocationOrder.status)
        status_result = await db.execute(self._filtered(status_query, order_filters))
        status_counts = {row[0]: row[1] for row in status_result}

        return {
            "total_allocation_count": total_allocation_count,
            "pending_count": status_counts.get("pending", 0),
            "approved_count": status_counts.get("approved", 0),
            "rejected_count": status_counts.get("rejected", 0),
            "monthly_trend": [],
            "by_organization": [],
            "transfer_statistics": [],
        }
# Create the module-level service instance.
statistics_service = StatisticsService()