diff --git a/routes/api_schedules.py b/routes/api_schedules.py index fd22ba8..1d81026 100644 --- a/routes/api_schedules.py +++ b/routes/api_schedules.py @@ -188,6 +188,9 @@ def toggle_schedule_api(schedule_id): def run_schedule_now_api(schedule_id): """立即执行定时任务""" import json + import threading + import time as time_mod + import uuid schedule = database.get_schedule_by_id(schedule_id) if not schedule: @@ -210,27 +213,104 @@ def run_schedule_now_api(schedule_id): if not safe_get_user_accounts_snapshot(user_id): load_user_accounts(user_id) - started = [] + from services.state import safe_create_batch, safe_finalize_batch_after_dispatch + from services.task_batches import _send_batch_task_email_if_configured + + execution_start_time = time_mod.time() + log_id = database.create_schedule_execution_log( + schedule_id=schedule_id, + user_id=user_id, + schedule_name=schedule.get("name", "未命名任务"), + ) + + batch_id = f"batch_{uuid.uuid4().hex[:12]}" + now_ts = time_mod.time() + safe_create_batch( + batch_id, + { + "user_id": user_id, + "browse_type": browse_type, + "schedule_name": schedule.get("name", "未命名任务"), + "screenshots": [], + "total_accounts": 0, + "completed": 0, + "created_at": now_ts, + "updated_at": now_ts, + }, + ) + + started_count = 0 + skipped_count = 0 + completion_lock = threading.Lock() + remaining = {"count": 0, "done": False} + + def on_browse_done(): + with completion_lock: + remaining["count"] -= 1 + if remaining["done"] or remaining["count"] > 0: + return + remaining["done"] = True + execution_duration = int(time_mod.time() - execution_start_time) + database.update_schedule_execution_log( + log_id, + total_accounts=len(account_ids), + success_accounts=started_count, + failed_accounts=len(account_ids) - started_count, + duration_seconds=execution_duration, + status="completed", + ) + for account_id in account_ids: account = safe_get_account(user_id, account_id) if not account: + skipped_count += 1 continue if account.is_running: + skipped_count += 1 continue + task_source = f"user_scheduled:{batch_id}" + with completion_lock: + remaining["count"] += 1 ok, msg = submit_account_task( user_id=user_id, account_id=account_id, browse_type=browse_type, enable_screenshot=enable_screenshot, - source="user_scheduled", + source=task_source, + done_callback=on_browse_done, ) if ok: - started.append(account_id) + started_count += 1 + else: + with completion_lock: + remaining["count"] -= 1 + skipped_count += 1 + + batch_info = safe_finalize_batch_after_dispatch(batch_id, started_count, now_ts=time_mod.time()) + if batch_info: + _send_batch_task_email_if_configured(batch_info) database.update_schedule_last_run(schedule_id) - return jsonify({"success": True, "started_count": len(started), "message": f"已启动 {len(started)} 个账号"}) + if started_count <= 0: + database.update_schedule_execution_log( + log_id, + total_accounts=len(account_ids), + success_accounts=0, + failed_accounts=len(account_ids), + duration_seconds=0, + status="completed", + ) + + return jsonify( + { + "success": True, + "started_count": started_count, + "skipped_count": skipped_count, + "message": f"已启动 {started_count} 个账号", + } + ) @api_schedules_bp.route("/api/schedules//logs", methods=["GET"]) diff --git a/services/schedule_utils.py b/services/schedule_utils.py index 3e1f928..7fda7bf 100644 --- a/services/schedule_utils.py +++ b/services/schedule_utils.py @@ -91,8 +91,18 @@ def compute_next_run_at( if random_delay: window_start = candidate_base - timedelta(minutes=15) - random_minutes = random.randint(0, 30) - candidate = window_start + timedelta(minutes=random_minutes) + window_end = candidate_base + timedelta(minutes=15) + + # 只从“未来窗口”中抽样,避免抽到过去时间导致整天被跳过 + delta_seconds = (now - window_start).total_seconds() + if delta_seconds < 0: + min_offset = 0 + else: + min_offset = int(delta_seconds // 60) + 1 + max_offset = int((window_end - window_start).total_seconds() // 60) + if min_offset > max_offset: + continue + candidate = window_start + timedelta(minutes=random.randint(min_offset, max_offset)) else: candidate = candidate_base @@ -110,4 +120,3 @@ def format_cst(dt: datetime) -> str: dt = BEIJING_TZ.localize(dt) dt = dt.astimezone(BEIJING_TZ) return dt.strftime("%Y-%m-%d %H:%M:%S") - diff --git a/services/scheduler.py b/services/scheduler.py index 295f32e..8c915f3 100644 --- a/services/scheduler.py +++ b/services/scheduler.py @@ -30,6 +30,30 @@ config = get_config() SCREENSHOTS_DIR = config.SCREENSHOTS_DIR os.makedirs(SCREENSHOTS_DIR, exist_ok=True) +_HHMM_RE = None + + +def _normalize_hhmm(value: object, *, default: str) -> str: + """Normalize scheduler time to HH:MM; fallback to default when invalid.""" + global _HHMM_RE + if _HHMM_RE is None: + import re + + _HHMM_RE = re.compile(r"^(\d{1,2}):(\d{2})$") + + text = str(value or "").strip() + match = _HHMM_RE.match(text) + if not match: + return default + try: + hour = int(match.group(1)) + minute = int(match.group(2)) + except Exception: + return default + if hour < 0 or hour > 23 or minute < 0 or minute > 59: + return default + return f"{hour:02d}:{minute:02d}" + def run_scheduled_task(skip_weekday_check: bool = False) -> None: """执行所有账号的浏览任务(可被手动调用,过滤重复账号)""" @@ -333,7 +357,10 @@ def scheduled_task_worker() -> None: def check_and_schedule(force: bool = False): config_data = database.get_system_config() schedule_enabled = bool(config_data.get("schedule_enabled")) - schedule_time_cst = config_data.get("schedule_time", "02:00") + schedule_time_raw = config_data.get("schedule_time", "02:00") + schedule_time_cst = _normalize_hhmm(schedule_time_raw, default="02:00") + if schedule_time_cst != str(schedule_time_raw or "").strip(): + logger.warning(f"[定时任务] 系统定时时间格式无效,已回退到 {schedule_time_cst} (原值: {schedule_time_raw!r})") signature = (schedule_enabled, schedule_time_cst) config_changed = schedule_state.get("signature") != signature @@ -364,18 +391,28 @@ def scheduled_task_worker() -> None: elif config_changed and not is_first_run: logger.info("[定时任务] 浏览任务已禁用") - check_and_schedule(force=True) + try: + check_and_schedule(force=True) + except Exception as e: + logger.exception(f"[定时任务] 初始化系统定时任务失败: {e}") last_config_check = time.time() last_user_schedule_minute = None while True: try: - schedule.run_pending() + try: + schedule.run_pending() + except Exception as e: + logger.exception(f"[定时任务] 系统调度执行出错: {e}") now_ts = time.time() if now_ts - last_config_check >= config_check_interval: - check_and_schedule() - last_config_check = now_ts + try: + check_and_schedule() + except Exception as e: + logger.exception(f"[定时任务] 系统调度配置刷新失败: {e}") + finally: + last_config_check = now_ts now_beijing = get_beijing_now() minute_key = now_beijing.strftime("%Y-%m-%d %H:%M") @@ -385,5 +422,5 @@ def scheduled_task_worker() -> None: time.sleep(1) except Exception as e: - logger.exception(f"[定时任务] 调度器出错: {str(e)}") + logger.exception(f"[定时任务] 调度器主循环出错: {e}") time.sleep(5)