#!/usr/bin/env python3
# -*- coding: utf-8 -*-
from __future__ import annotations

import json
import re
import threading
import time as time_mod
import uuid

import database
from flask import Blueprint, jsonify, request
from flask_login import current_user, login_required
from services.accounts_service import load_user_accounts
from services.browse_types import BROWSE_TYPE_SHOULD_READ, normalize_browse_type, validate_browse_type
from services.state import safe_get_account, safe_get_user_accounts_snapshot
from services.tasks import submit_account_task

api_schedules_bp = Blueprint("api_schedules", __name__)

# One or two hour digits, a colon, exactly two minute digits.
_HHMM_RE = re.compile(r"^(\d{1,2}):(\d{2})$")


def _request_json(default=None):
    """Return the request's JSON body if it is a dict, otherwise ``default``.

    A missing, unparsable, or non-dict body yields ``default``; when
    ``default`` is ``None`` a fresh empty dict is used instead.
    """
    if default is None:
        default = {}
    data = request.get_json(silent=True)
    return data if isinstance(data, dict) else default


def _normalize_hhmm(value: object) -> str | None:
    """Normalize a time value to zero-padded ``HH:MM``.

    Returns ``None`` when the stringified, stripped value does not match
    ``H:MM``/``HH:MM`` or when the hour/minute is out of range.
    """
    match = _HHMM_RE.match(str(value or "").strip())
    if not match:
        return None

    hour = int(match.group(1))
    minute = int(match.group(2))
    # The pattern only admits digits, so only the upper bounds need checking.
    if hour > 23 or minute > 59:
        return None

    return f"{hour:02d}:{minute:02d}"


def _normalize_random_delay(value) -> tuple[int | None, str | None]:
    """Coerce ``value`` to the integer flag 0 or 1.

    Returns ``(flag, None)`` on success or ``(None, message)`` when the value
    cannot be converted by ``int()`` or is not 0/1. Falsy values count as 0.
    """
    error_message = "random_delay必须是0或1"
    try:
        normalized = int(value or 0)
    except (TypeError, ValueError, OverflowError):
        # TypeError: containers/objects; ValueError: non-numeric text or NaN;
        # OverflowError: infinite floats.
        return None, error_message

    if normalized not in (0, 1):
        return None, error_message

    return normalized, None


def _parse_schedule_account_ids(raw_value) -> list:
    """Decode a JSON-encoded account id list.

    Returns an empty list when the value is empty, is not valid JSON, is not
    a type ``json.loads`` accepts, or does not decode to a list.
    """
    try:
        parsed = json.loads(raw_value or "[]")
    except (json.JSONDecodeError, TypeError):
        return []
    return parsed if isinstance(parsed, list) else []


def _get_owned_schedule_or_error(schedule_id: int):
    """Fetch a schedule and check that it belongs to the current user.

    Returns ``(schedule, None)`` on success, or ``(None, (response, status))``
    with 404 when the schedule is not found and 403 when its ``user_id``
    differs from ``current_user.id``.
    """
    schedule = database.get_schedule_by_id(schedule_id)
    if not schedule:
        return None, (jsonify({"error": "定时任务不存在"}), 404)
    if schedule.get("user_id") != current_user.id:
        return None, (jsonify({"error": "无权访问"}), 403)
    return schedule, None


def _ensure_user_accounts_loaded(user_id: int) -> None:
    """Call ``load_user_accounts`` unless the user's account snapshot is truthy."""
    if safe_get_user_accounts_snapshot(user_id):
        return
    load_user_accounts(user_id)


def _parse_browse_type_or_error(raw_value, *, default=BROWSE_TYPE_SHOULD_READ):
    """Validate a browse type via ``validate_browse_type``.

    Returns ``(browse_type, None)`` when validation yields a truthy value,
    otherwise ``(None, (response, 400))``.
    """
    browse_type = validate_browse_type(raw_value, default=default)
    if not browse_type:
        return None, (jsonify({"error": "浏览类型无效"}), 400)
    return browse_type, None


@api_schedules_bp.route("/api/schedules", methods=["GET"])
@login_required
def get_user_schedules_api():
    """List all schedules of the current user with ``account_ids`` decoded."""
    schedules = database.get_user_schedules(current_user.id)
    for schedule in schedules:
        schedule["account_ids"] = _parse_schedule_account_ids(schedule.get("account_ids"))
    return jsonify(schedules)


@api_schedules_bp.route("/api/schedules", methods=["POST"])
@login_required
def create_user_schedule_api():
    """Create a schedule for the current user from the JSON request body.

    Responds 400 for an invalid browse type, ``random_delay`` or time format
    (checked in that order), 500 when the database returns a falsy id, and
    ``{"success": True, "id": ...}`` otherwise.
    """
    data = _request_json()

    name = data.get("name", "我的定时任务")
    schedule_time = data.get("schedule_time", "08:00")
    weekdays = data.get("weekdays", "1,2,3,4,5")

    browse_type, browse_error = _parse_browse_type_or_error(data.get("browse_type", BROWSE_TYPE_SHOULD_READ))
    if browse_error:
        return browse_error

    enable_screenshot = data.get("enable_screenshot", 1)

    random_delay, delay_error = _normalize_random_delay(data.get("random_delay", 0))
    if delay_error:
        return jsonify({"error": delay_error}), 400

    account_ids = data.get("account_ids", [])

    normalized_time = _normalize_hhmm(schedule_time)
    if not normalized_time:
        return jsonify({"error": "时间格式不正确,应为 HH:MM"}), 400

    schedule_id = database.create_user_schedule(
        user_id=current_user.id,
        name=name,
        schedule_time=normalized_time,
        weekdays=weekdays,
        browse_type=browse_type,
        enable_screenshot=enable_screenshot,
        random_delay=random_delay,
        account_ids=account_ids,
    )
    if schedule_id:
        return jsonify({"success": True, "id": schedule_id})
    return jsonify({"error": "创建失败"}), 500


# The URL must carry the id: the view takes ``schedule_id`` as an argument.
@api_schedules_bp.route("/api/schedules/<int:schedule_id>", methods=["GET"])
@login_required
def get_schedule_detail_api(schedule_id):
    """Return one schedule owned by the current user with ``account_ids`` decoded."""
    schedule, error_response = _get_owned_schedule_or_error(schedule_id)
    if error_response:
        return error_response

    schedule["account_ids"] = _parse_schedule_account_ids(schedule.get("account_ids"))
    return jsonify(schedule)
@api_schedules_bp.route("/api/schedules/<int:schedule_id>", methods=["PUT"])
@login_required
def update_schedule_api(schedule_id):
    """Update the whitelisted fields of a schedule owned by the current user.

    Keys outside the whitelist are ignored. ``schedule_time``,
    ``random_delay`` and ``browse_type`` are normalized when present and
    produce a 400 response when invalid. Responds 500 when the database
    update reports failure.
    """
    _, error_response = _get_owned_schedule_or_error(schedule_id)
    if error_response:
        return error_response

    data = _request_json()
    allowed_fields = {
        "name",
        "schedule_time",
        "weekdays",
        "browse_type",
        "enable_screenshot",
        "random_delay",
        "account_ids",
        "enabled",
    }
    update_data = {key: value for key, value in data.items() if key in allowed_fields}

    if "schedule_time" in update_data:
        normalized_time = _normalize_hhmm(update_data["schedule_time"])
        if not normalized_time:
            return jsonify({"error": "时间格式不正确,应为 HH:MM"}), 400
        update_data["schedule_time"] = normalized_time

    if "random_delay" in update_data:
        random_delay, delay_error = _normalize_random_delay(update_data.get("random_delay"))
        if delay_error:
            return jsonify({"error": delay_error}), 400
        update_data["random_delay"] = random_delay

    if "browse_type" in update_data:
        normalized_browse_type, browse_error = _parse_browse_type_or_error(update_data.get("browse_type"))
        if browse_error:
            return browse_error
        update_data["browse_type"] = normalized_browse_type

    success = database.update_user_schedule(schedule_id, **update_data)
    if success:
        return jsonify({"success": True})
    return jsonify({"error": "更新失败"}), 500


@api_schedules_bp.route("/api/schedules/<int:schedule_id>", methods=["DELETE"])
@login_required
def delete_schedule_api(schedule_id):
    """Delete a schedule owned by the current user; 500 if the delete fails."""
    _, error_response = _get_owned_schedule_or_error(schedule_id)
    if error_response:
        return error_response

    success = database.delete_user_schedule(schedule_id)
    if success:
        return jsonify({"success": True})
    return jsonify({"error": "删除失败"}), 500


@api_schedules_bp.route("/api/schedules/<int:schedule_id>/toggle", methods=["POST"])
@login_required
def toggle_schedule_api(schedule_id):
    """Enable or disable a schedule owned by the current user.

    Uses the ``enabled`` value from the JSON body when given; otherwise
    inverts the schedule's current ``enabled`` value.
    """
    schedule, error_response = _get_owned_schedule_or_error(schedule_id)
    if error_response:
        return error_response

    data = _request_json()
    enabled = data.get("enabled", not schedule["enabled"])

    success = database.toggle_user_schedule(schedule_id, enabled)
    if success:
        return jsonify({"success": True, "enabled": enabled})
    return jsonify({"error": "操作失败"}), 500


@api_schedules_bp.route("/api/schedules/<int:schedule_id>/run", methods=["POST"])
@login_required
def run_schedule_now_api(schedule_id):
    """Run a schedule owned by the current user immediately.

    Creates an execution log and a batch record, submits one task per
    configured account (accounts that are missing, already running, or whose
    submission fails are counted as skipped), finalizes the batch, and
    records the last-run time. The execution log is marked completed once
    every submitted task has invoked its done-callback, or right away when
    nothing could be started.

    Responds 400 when the schedule has no account ids; otherwise returns the
    started and skipped counts.
    """
    schedule, error_response = _get_owned_schedule_or_error(schedule_id)
    if error_response:
        return error_response

    account_ids = _parse_schedule_account_ids(schedule.get("account_ids"))
    if not account_ids:
        return jsonify({"error": "没有配置账号"}), 400

    user_id = current_user.id
    browse_type = normalize_browse_type(schedule.get("browse_type", BROWSE_TYPE_SHOULD_READ))
    enable_screenshot = schedule["enable_screenshot"]

    _ensure_user_accounts_loaded(user_id)

    # NOTE(review): imported locally, presumably to avoid an import cycle — confirm.
    from services.state import safe_create_batch, safe_finalize_batch_after_dispatch
    from services.task_batches import _send_batch_task_email_if_configured

    execution_start_time = time_mod.time()
    log_id = database.create_schedule_execution_log(
        schedule_id=schedule_id,
        user_id=user_id,
        schedule_name=schedule.get("name", "未命名任务"),
    )

    batch_id = f"batch_{uuid.uuid4().hex[:12]}"
    now_ts = time_mod.time()
    safe_create_batch(
        batch_id,
        {
            "user_id": user_id,
            "browse_type": browse_type,
            "schedule_name": schedule.get("name", "未命名任务"),
            "screenshots": [],
            "total_accounts": 0,
            "completed": 0,
            "created_at": now_ts,
            "updated_at": now_ts,
        },
    )

    started_count = 0
    skipped_count = 0
    completion_lock = threading.Lock()
    # The counter starts at 1: the dispatcher holds one slot until every
    # account has been submitted. Without it, a task that finishes before the
    # next one is submitted would drop the counter to 0 and finalize the
    # execution log early with partial counts.
    remaining = {"count": 1, "done": False}

    def on_browse_done():
        """Release one pending slot; finalize the log when none remain."""
        with completion_lock:
            remaining["count"] -= 1
            if remaining["done"] or remaining["count"] > 0:
                return
            remaining["done"] = True

        # Written outside the lock so the database call does not block
        # other completions.
        execution_duration = int(time_mod.time() - execution_start_time)
        database.update_schedule_execution_log(
            log_id,
            total_accounts=len(account_ids),
            success_accounts=started_count,
            failed_accounts=len(account_ids) - started_count,
            duration_seconds=execution_duration,
            status="completed",
        )

    task_source = f"user_scheduled:{batch_id}"
    for account_id in account_ids:
        account = safe_get_account(user_id, account_id)
        if (not account) or account.is_running:
            skipped_count += 1
            continue

        # Reserve the slot before submitting so a fast completion cannot
        # observe a counter that does not yet include this task.
        with completion_lock:
            remaining["count"] += 1

        ok, _ = submit_account_task(
            user_id=user_id,
            account_id=account_id,
            browse_type=browse_type,
            enable_screenshot=enable_screenshot,
            source=task_source,
            done_callback=on_browse_done,
        )
        if ok:
            started_count += 1
        else:
            # Submission failed: give the reserved slot back.
            with completion_lock:
                remaining["count"] -= 1
            skipped_count += 1

    if started_count > 0:
        # Dispatch is over: release the dispatcher's slot. If every task has
        # already finished, this call finalizes the log with final counts.
        on_browse_done()

    batch_info = safe_finalize_batch_after_dispatch(batch_id, started_count, now_ts=time_mod.time())
    if batch_info:
        _send_batch_task_email_if_configured(batch_info)

    database.update_schedule_last_run(schedule_id)

    if started_count <= 0:
        # No callback will ever fire, so close the log here.
        database.update_schedule_execution_log(
            log_id,
            total_accounts=len(account_ids),
            success_accounts=0,
            failed_accounts=len(account_ids),
            duration_seconds=0,
            status="completed",
        )

    return jsonify(
        {
            "success": True,
            "started_count": started_count,
            "skipped_count": skipped_count,
            "message": f"已启动 {started_count} 个账号",
        }
    )


@api_schedules_bp.route("/api/schedules/<int:schedule_id>/logs", methods=["GET"])
@login_required
def get_schedule_logs_api(schedule_id):
    """Return execution logs of a schedule owned by the current user.

    Accepts an optional integer ``limit`` query argument (default 20).
    Responds with an empty list when the schedule is missing, belongs to
    another user, has no logs, or when any exception is raised.
    """
    try:
        schedule = database.get_schedule_by_id(schedule_id)
        if not schedule or schedule["user_id"] != current_user.id:
            return jsonify([])

        limit = request.args.get("limit", 20, type=int)
        logs = database.get_schedule_execution_logs(schedule_id, limit)
        return jsonify(logs if logs else [])
    except Exception:
        # Deliberate best-effort: this endpoint always answers with a list.
        return jsonify([])


@api_schedules_bp.route("/api/schedules/<int:schedule_id>/logs", methods=["DELETE"])
@login_required
def delete_schedule_logs_api(schedule_id):
    """Delete all execution logs of a schedule owned by the current user.

    Responds 403 when the schedule is missing or belongs to another user,
    and 500 with the exception text when the operation raises.
    """
    try:
        schedule = database.get_schedule_by_id(schedule_id)
        if not schedule or schedule["user_id"] != current_user.id:
            return jsonify({"error": "无权限"}), 403

        deleted = database.delete_schedule_logs(schedule_id, current_user.id)
        return jsonify({"success": True, "deleted": deleted})
    except Exception as e:
        return jsonify({"error": str(e)}), 500