diff --git a/realtime/status_push.py b/realtime/status_push.py index 665dfdf..b3b662c 100644 --- a/realtime/status_push.py +++ b/realtime/status_push.py @@ -22,9 +22,11 @@ def status_push_worker() -> None: from services.tasks import get_task_scheduler scheduler = get_task_scheduler() + last_queued_emit: dict[str, dict] = {} while True: try: + now_ts = time.time() queue_snapshot = scheduler.get_queue_state_snapshot() pending_total = int(queue_snapshot.get("pending_total", 0) or 0) running_total = int(queue_snapshot.get("running_total", 0) or 0) @@ -32,6 +34,7 @@ def status_push_worker() -> None: positions = queue_snapshot.get("positions") or {} status_items = safe_iter_task_status_items() + active_queued_ids: set[str] = set() for account_id, status_info in status_items: status = status_info.get("status") if status not in ("运行中", "排队中"): @@ -39,11 +42,46 @@ def status_push_worker() -> None: user_id = status_info.get("user_id") if not user_id: continue + pos = positions.get(account_id) or {} + + # 排队中的账号不需要每秒全量推送:仅在队列位置/总数变化或到达保底间隔时推送,避免拖慢整体性能 + if status != "运行中": + active_queued_ids.add(account_id) + running_user = int(running_by_user.get(int(user_id), 0) or 0) + prev = last_queued_emit.get(account_id) or {} + prev_ts = float(prev.get("ts", 0) or 0) + should_emit = False + if not prev: + should_emit = True + elif ( + prev.get("queue_ahead") != pos.get("queue_ahead") + or prev.get("queue_position") != pos.get("queue_position") + or prev.get("pending_total") != pending_total + or prev.get("running_total") != running_total + or prev.get("running_user") != running_user + ): + should_emit = True + elif now_ts - prev_ts >= max(push_interval, 2.0): + should_emit = True + + if not should_emit: + continue + + last_queued_emit[account_id] = { + "ts": now_ts, + "queue_ahead": pos.get("queue_ahead"), + "queue_position": pos.get("queue_position"), + "pending_total": pending_total, + "running_total": running_total, + "running_user": running_user, + } + else: + last_queued_emit.pop(account_id, None) + account = safe_get_account(user_id, account_id) if not account: continue account_data = account.to_dict() - pos = positions.get(account_id) or {} account_data.update( { "queue_pending_total": pending_total, @@ -78,6 +116,12 @@ def status_push_worker() -> None: } socketio.emit("task_progress", progress_data, room=f"user_{user_id}") + # 清理已不在排队的记录,避免 dict 无限制增长 + if last_queued_emit: + for acc_id in list(last_queued_emit.keys()): + if acc_id not in active_queued_ids: + last_queued_emit.pop(acc_id, None) + time.sleep(push_interval) except Exception as e: logger.debug(f"状态推送出错: {e}") diff --git a/services/tasks.py b/services/tasks.py index a757d1d..3a82432 100644 --- a/services/tasks.py +++ b/services/tasks.py @@ -603,7 +603,7 @@ def run_task(user_id, account_id, browse_type, enable_screenshot=True, source="m nonlocal last_progress_ts try: now_ts = time_module.time() - if now_ts - last_progress_ts < 0.5: + if now_ts - last_progress_ts < 1.0: return last_progress_ts = now_ts @@ -621,6 +621,22 @@ def run_task(user_id, account_id, browse_type, enable_screenshot=True, source="m safe_update_task_status( account_id, {"progress": {"items": browsed_items, "attachments": viewed_attachments}} ) + account_data = account.to_dict() + _emit( + "task_progress", + { + "account_id": account_id, + "stage": account_data.get("detail_status", ""), + "total_items": int(account.total_items or 0), + "browsed_items": browsed_items, + "total_attachments": int(account.total_attachments or 0), + "viewed_attachments": viewed_attachments, + "start_time": account_data.get("start_time", 0), + "elapsed_seconds": account_data.get("elapsed_seconds", 0), + "elapsed_display": account_data.get("elapsed_display", ""), + }, + room=f"user_{user_id}", + ) except Exception: pass