#!/usr/bin/env python3 # -*- coding: utf-8 -*- from __future__ import annotations import os import threading import time from datetime import timedelta import database import email_service from app_config import get_config from app_logger import get_logger from services.accounts_service import load_user_accounts from services.browse_types import BROWSE_TYPE_SHOULD_READ, normalize_browse_type from services.state import ( safe_cleanup_expired_captcha, safe_create_batch, safe_finalize_batch_after_dispatch, safe_get_account, safe_get_user_accounts_snapshot, ) from services.task_batches import _send_batch_task_email_if_configured from services.tasks import submit_account_task from services.time_utils import get_beijing_now logger = get_logger("app") config = get_config() try: _SCHEDULE_SUBMIT_DELAY_SECONDS = float(os.environ.get("SCHEDULE_SUBMIT_DELAY_SECONDS", "0.2")) except Exception: _SCHEDULE_SUBMIT_DELAY_SECONDS = 0.2 _SCHEDULE_SUBMIT_DELAY_SECONDS = max(0.0, _SCHEDULE_SUBMIT_DELAY_SECONDS) SCREENSHOTS_DIR = config.SCREENSHOTS_DIR os.makedirs(SCREENSHOTS_DIR, exist_ok=True) _HHMM_RE = None def _normalize_hhmm(value: object, *, default: str) -> str: """Normalize scheduler time to HH:MM; fallback to default when invalid.""" global _HHMM_RE if _HHMM_RE is None: import re _HHMM_RE = re.compile(r"^(\d{1,2}):(\d{2})$") text = str(value or "").strip() match = _HHMM_RE.match(text) if not match: return default try: hour = int(match.group(1)) minute = int(match.group(2)) except Exception: return default if hour < 0 or hour > 23 or minute < 0 or minute > 59: return default return f"{hour:02d}:{minute:02d}" def _safe_recompute_schedule_next_run(schedule_id: int) -> None: try: database.recompute_schedule_next_run(schedule_id) except Exception: pass def _load_accounts_for_users(approved_users: list[dict]) -> tuple[dict[int, dict], list[str]]: """批量加载用户账号快照。""" user_accounts: dict[int, dict] = {} account_ids: list[str] = [] for user in approved_users: user_id = user["id"] accounts = safe_get_user_accounts_snapshot(user_id) if not accounts: load_user_accounts(user_id) accounts = safe_get_user_accounts_snapshot(user_id) if accounts: user_accounts[user_id] = accounts account_ids.extend(list(accounts.keys())) return user_accounts, account_ids def _should_skip_suspended_account(account_status_info, account, username: str) -> bool: """判断是否应跳过暂停账号,并输出日志。""" if not account_status_info: return False status = account_status_info["status"] if "status" in account_status_info.keys() else "active" if status != "suspended": return False fail_count = account_status_info["login_fail_count"] if "login_fail_count" in account_status_info.keys() else 0 logger.info( f"[定时任务] 跳过暂停账号: {account.username} (用户:{username}) - 连续{fail_count}次密码错误,需修改密码" ) return True def _parse_schedule_account_ids(schedule_config: dict, schedule_id: int): import json try: account_ids_raw = schedule_config.get("account_ids", "[]") or "[]" account_ids = json.loads(account_ids_raw) except Exception as e: logger.warning(f"[定时任务] 任务#{schedule_id} 解析account_ids失败: {e}") return [] if isinstance(account_ids, list): return account_ids return [] def _create_user_schedule_batch(*, batch_id: str, user_id: int, browse_type: str, schedule_name: str, now_ts: float) -> None: safe_create_batch( batch_id, { "user_id": user_id, "browse_type": browse_type, "schedule_name": schedule_name, "screenshots": [], "total_accounts": 0, "completed": 0, "created_at": now_ts, "updated_at": now_ts, }, ) def _build_user_schedule_done_callback( *, completion_lock: threading.Lock, remaining: dict, counters: dict, execution_start_time: float, log_id: int, schedule_id: int, total_accounts: int, ): def on_browse_done(): with completion_lock: remaining["count"] -= 1 if remaining["done"] or remaining["count"] > 0: return remaining["done"] = True execution_duration = int(time.time() - execution_start_time) started_count = int(counters.get("started", 0) or 0) database.update_schedule_execution_log( log_id, total_accounts=total_accounts, success_accounts=started_count, failed_accounts=total_accounts - started_count, duration_seconds=execution_duration, status="completed", ) logger.info(f"[用户定时任务] 任务#{schedule_id}浏览阶段完成,耗时{execution_duration}秒,等待截图完成后发送邮件") return on_browse_done def _submit_user_schedule_accounts( *, user_id: int, account_ids: list, browse_type: str, enable_screenshot, task_source: str, done_callback, completion_lock: threading.Lock, remaining: dict, counters: dict, ) -> tuple[int, int]: started_count = 0 skipped_count = 0 for account_id in account_ids: account = safe_get_account(user_id, account_id) if (not account) or account.is_running: skipped_count += 1 continue with completion_lock: remaining["count"] += 1 ok, msg = submit_account_task( user_id=user_id, account_id=account_id, browse_type=browse_type, enable_screenshot=enable_screenshot, source=task_source, done_callback=done_callback, ) if ok: started_count += 1 counters["started"] = started_count else: with completion_lock: remaining["count"] -= 1 skipped_count += 1 logger.warning(f"[用户定时任务] 账号 {account.username} 启动失败: {msg}") return started_count, skipped_count def run_scheduled_task(skip_weekday_check: bool = False) -> None: """执行所有账号的浏览任务(可被手动调用,过滤重复账号)""" try: config_data = database.get_system_config() browse_type = normalize_browse_type(config_data.get("schedule_browse_type", BROWSE_TYPE_SHOULD_READ)) if not skip_weekday_check: now_beijing = get_beijing_now() current_weekday = now_beijing.isoweekday() schedule_weekdays = config_data.get("schedule_weekdays", "1,2,3,4,5,6,7") allowed_weekdays = [int(d.strip()) for d in schedule_weekdays.split(",") if d.strip()] if current_weekday not in allowed_weekdays: weekday_names = ["", "周一", "周二", "周三", "周四", "周五", "周六", "周日"] logger.info(f"[定时任务] 今天是{weekday_names[current_weekday]},不在执行日期内,跳过执行") return else: logger.info("[立即执行] 跳过星期检查,强制执行任务") logger.info(f"[定时任务] 开始执行 - 浏览类型: {browse_type}") all_users = database.get_all_users() approved_users = [u for u in all_users if u["status"] == "approved"] executed_usernames = set() total_accounts = 0 skipped_duplicates = 0 executed_accounts = 0 cfg = database.get_system_config() enable_screenshot_scheduled = cfg.get("enable_screenshot", 0) == 1 user_accounts, account_ids = _load_accounts_for_users(approved_users) account_statuses = database.get_account_status_batch(account_ids) for user in approved_users: user_id = user["id"] accounts = user_accounts.get(user_id, {}) if not accounts: continue for account_id, account in accounts.items(): total_accounts += 1 if account.is_running: continue account_status_info = account_statuses.get(str(account_id)) if _should_skip_suspended_account(account_status_info, account, user["username"]): continue if account.username in executed_usernames: skipped_duplicates += 1 logger.info( f"[定时任务] 跳过重复账号: {account.username} (用户:{user['username']}) - 该账号已被其他用户执行" ) continue executed_usernames.add(account.username) logger.info(f"[定时任务] 启动账号: {account.username} (用户:{user['username']})") ok, msg = submit_account_task( user_id=user_id, account_id=account_id, browse_type=browse_type, enable_screenshot=enable_screenshot_scheduled, source="scheduled", ) if ok: executed_accounts += 1 else: logger.warning(f"[定时任务] 启动失败({account.username}): {msg}") if _SCHEDULE_SUBMIT_DELAY_SECONDS > 0: time.sleep(_SCHEDULE_SUBMIT_DELAY_SECONDS) logger.info( f"[定时任务] 执行完成 - 总账号数:{total_accounts}, 已执行:{executed_accounts}, 跳过重复:{skipped_duplicates}" ) except Exception as e: logger.exception(f"[定时任务] 执行出错: {str(e)}") def scheduled_task_worker() -> None: """定时任务工作线程""" import schedule def decay_risk_scores(): """风险分衰减:每天定时执行一次""" try: from security.risk_scorer import RiskScorer RiskScorer().decay_scores() logger.info("[定时任务] 风险分衰减已执行") except Exception as e: logger.exception(f"[定时任务] 风险分衰减执行失败: {e}") def cleanup_expired_captcha(): try: deleted_count = safe_cleanup_expired_captcha() if deleted_count > 0: logger.info(f"[定时清理] 已清理 {deleted_count} 个过期验证码") except Exception as e: logger.warning(f"[定时清理] 清理验证码出错: {str(e)}") def cleanup_old_data(): """清理旧数据:7天前截图和任务日志,30天前操作日志和定时任务执行日志""" try: logger.info("[定时清理] 开始清理旧数据...") deleted_logs = database.delete_old_task_logs(7) logger.info(f"[定时清理] 已删除 {deleted_logs} 条任务日志") deleted_operation_logs = database.clean_old_operation_logs(30) logger.info(f"[定时清理] 已删除 {deleted_operation_logs} 条操作日志") deleted_schedule_logs = database.clean_old_schedule_logs(30) logger.info(f"[定时清理] 已删除 {deleted_schedule_logs} 条定时任务执行日志") deleted_screenshots = 0 if os.path.exists(SCREENSHOTS_DIR): cutoff_time = time.time() - (7 * 24 * 60 * 60) with os.scandir(SCREENSHOTS_DIR) as entries: for entry in entries: if (not entry.is_file()) or (not entry.name.lower().endswith((".png", ".jpg", ".jpeg"))): continue try: if entry.stat().st_mtime < cutoff_time: os.remove(entry.path) deleted_screenshots += 1 except Exception as e: logger.warning(f"[定时清理] 删除截图失败 {entry.name}: {str(e)}") logger.info(f"[定时清理] 已删除 {deleted_screenshots} 个截图文件") logger.info("[定时清理] 清理完成!") except Exception as e: logger.exception(f"[定时清理] 清理任务出错: {str(e)}") def _parse_due_schedule_weekdays(schedule_config: dict, schedule_id: int): weekdays_str = schedule_config.get("weekdays", "1,2,3,4,5") try: return [int(d) for d in weekdays_str.split(",") if d.strip()] except Exception as e: logger.warning(f"[定时任务] 任务#{schedule_id} 解析weekdays失败: {e}") _safe_recompute_schedule_next_run(schedule_id) return None def _execute_due_user_schedule(schedule_config: dict) -> None: schedule_name = schedule_config.get("name", "未命名任务") schedule_id = schedule_config["id"] user_id = schedule_config["user_id"] browse_type = normalize_browse_type(schedule_config.get("browse_type", BROWSE_TYPE_SHOULD_READ)) enable_screenshot = schedule_config.get("enable_screenshot", 1) account_ids = _parse_schedule_account_ids(schedule_config, schedule_id) if not account_ids: _safe_recompute_schedule_next_run(schedule_id) return if not safe_get_user_accounts_snapshot(user_id): load_user_accounts(user_id) import uuid execution_start_time = time.time() log_id = database.create_schedule_execution_log( schedule_id=schedule_id, user_id=user_id, schedule_name=schedule_name, ) batch_id = f"batch_{uuid.uuid4().hex[:12]}" now_ts = time.time() _create_user_schedule_batch( batch_id=batch_id, user_id=user_id, browse_type=browse_type, schedule_name=schedule_name, now_ts=now_ts, ) completion_lock = threading.Lock() remaining = {"count": 0, "done": False} counters = {"started": 0} on_browse_done = _build_user_schedule_done_callback( completion_lock=completion_lock, remaining=remaining, counters=counters, execution_start_time=execution_start_time, log_id=log_id, schedule_id=schedule_id, total_accounts=len(account_ids), ) task_source = f"user_scheduled:{batch_id}" started_count, skipped_count = _submit_user_schedule_accounts( user_id=user_id, account_ids=account_ids, browse_type=browse_type, enable_screenshot=enable_screenshot, task_source=task_source, done_callback=on_browse_done, completion_lock=completion_lock, remaining=remaining, counters=counters, ) batch_info = safe_finalize_batch_after_dispatch(batch_id, started_count, now_ts=time.time()) if batch_info: _send_batch_task_email_if_configured(batch_info) database.update_schedule_last_run(schedule_id) logger.info(f"[用户定时任务] 已启动 {started_count} 个账号,跳过 {skipped_count} 个账号,批次ID: {batch_id}") if started_count <= 0: database.update_schedule_execution_log( log_id, total_accounts=len(account_ids), success_accounts=0, failed_accounts=len(account_ids), duration_seconds=0, status="completed", ) if started_count == 0 and len(account_ids) > 0: logger.warning("[用户定时任务] ⚠️ 警告:所有账号都被跳过了!请检查user_accounts状态") def check_user_schedules(): """检查并执行用户定时任务(O-08:next_run_at 索引驱动)。""" try: now = get_beijing_now() now_str = now.strftime("%Y-%m-%d %H:%M:%S") current_weekday = now.isoweekday() due_schedules = database.get_due_user_schedules(now_str, limit=50) or [] for schedule_config in due_schedules: schedule_id = schedule_config["id"] schedule_name = schedule_config.get("name", "未命名任务") allowed_weekdays = _parse_due_schedule_weekdays(schedule_config, schedule_id) if allowed_weekdays is None: continue if current_weekday not in allowed_weekdays: _safe_recompute_schedule_next_run(schedule_id) continue logger.info( f"[用户定时任务] 任务#{schedule_id} '{schedule_name}' 到期,开始执行 " f"(next_run_at={schedule_config.get('next_run_at')})" ) _execute_due_user_schedule(schedule_config) except Exception as e: logger.exception(f"[用户定时任务] 检查出错: {str(e)}") try: config_check_interval = float(os.environ.get("SCHEDULER_CONFIG_CHECK_SECONDS", "30")) except Exception: config_check_interval = 30.0 config_check_interval = max(5.0, config_check_interval) schedule_state = {"signature": None} def check_and_schedule(force: bool = False): config_data = database.get_system_config() schedule_enabled = bool(config_data.get("schedule_enabled")) schedule_time_raw = config_data.get("schedule_time", "02:00") schedule_time_cst = _normalize_hhmm(schedule_time_raw, default="02:00") if schedule_time_cst != str(schedule_time_raw or "").strip(): logger.warning(f"[定时任务] 系统定时时间格式无效,已回退到 {schedule_time_cst} (原值: {schedule_time_raw!r})") risk_decay_time_raw = os.environ.get("RISK_SCORE_DECAY_TIME_CST", "04:00") risk_decay_time_cst = _normalize_hhmm(risk_decay_time_raw, default="04:00") if risk_decay_time_cst != str(risk_decay_time_raw or "").strip(): logger.warning(f"[定时任务] 风险分衰减时间格式无效,已回退到 {risk_decay_time_cst} (原值: {risk_decay_time_raw!r})") signature = (schedule_enabled, schedule_time_cst, risk_decay_time_cst) config_changed = schedule_state.get("signature") != signature is_first_run = schedule_state.get("signature") is None if (not force) and (not config_changed): return schedule_state["signature"] = signature schedule.clear() cleanup_time_cst = "03:00" schedule.every().day.at(cleanup_time_cst).do(cleanup_old_data) schedule.every().day.at(risk_decay_time_cst).do(decay_risk_scores) schedule.every().hour.do(cleanup_expired_captcha) quota_reset_time_cst = "00:00" schedule.every().day.at(quota_reset_time_cst).do(email_service.reset_smtp_daily_quota) if is_first_run: logger.info(f"[定时任务] 已设置数据清理任务: 每天 CST {cleanup_time_cst}") logger.info(f"[定时任务] 已设置风险分衰减: 每天 CST {risk_decay_time_cst}") logger.info(f"[定时任务] 已设置验证码清理任务: 每小时执行一次") logger.info(f"[定时任务] 已设置SMTP配额重置: 每天 CST {quota_reset_time_cst}") if schedule_enabled: schedule.every().day.at(schedule_time_cst).do(run_scheduled_task) if is_first_run or config_changed: logger.info(f"[定时任务] 已设置浏览任务: 每天 CST {schedule_time_cst}") elif config_changed and not is_first_run: logger.info("[定时任务] 浏览任务已禁用") try: check_and_schedule(force=True) except Exception as e: logger.exception(f"[定时任务] 初始化系统定时任务失败: {e}") last_config_check = time.time() last_user_schedule_minute = None while True: try: try: schedule.run_pending() except Exception as e: logger.exception(f"[定时任务] 系统调度执行出错: {e}") now_ts = time.time() if now_ts - last_config_check >= config_check_interval: try: check_and_schedule() except Exception as e: logger.exception(f"[定时任务] 系统调度配置刷新失败: {e}") finally: last_config_check = now_ts now_beijing = get_beijing_now() minute_key = now_beijing.strftime("%Y-%m-%d %H:%M") if minute_key != last_user_schedule_minute: check_user_schedules() last_user_schedule_minute = minute_key time.sleep(1) except Exception as e: logger.exception(f"[定时任务] 调度器主循环出错: {e}") time.sleep(5)