同步本地更改

This commit is contained in:
2025-12-15 10:48:58 +08:00
parent dab29347bd
commit a619e96e73
3 changed files with 139 additions and 13 deletions

View File

@@ -188,6 +188,9 @@ def toggle_schedule_api(schedule_id):
def run_schedule_now_api(schedule_id): def run_schedule_now_api(schedule_id):
"""立即执行定时任务""" """立即执行定时任务"""
import json import json
import threading
import time as time_mod
import uuid
schedule = database.get_schedule_by_id(schedule_id) schedule = database.get_schedule_by_id(schedule_id)
if not schedule: if not schedule:
@@ -210,27 +213,104 @@ def run_schedule_now_api(schedule_id):
if not safe_get_user_accounts_snapshot(user_id): if not safe_get_user_accounts_snapshot(user_id):
load_user_accounts(user_id) load_user_accounts(user_id)
started = [] from services.state import safe_create_batch, safe_finalize_batch_after_dispatch
from services.task_batches import _send_batch_task_email_if_configured
execution_start_time = time_mod.time()
log_id = database.create_schedule_execution_log(
schedule_id=schedule_id,
user_id=user_id,
schedule_name=schedule.get("name", "未命名任务"),
)
batch_id = f"batch_{uuid.uuid4().hex[:12]}"
now_ts = time_mod.time()
safe_create_batch(
batch_id,
{
"user_id": user_id,
"browse_type": browse_type,
"schedule_name": schedule.get("name", "未命名任务"),
"screenshots": [],
"total_accounts": 0,
"completed": 0,
"created_at": now_ts,
"updated_at": now_ts,
},
)
started_count = 0
skipped_count = 0
completion_lock = threading.Lock()
remaining = {"count": 0, "done": False}
def on_browse_done():
with completion_lock:
remaining["count"] -= 1
if remaining["done"] or remaining["count"] > 0:
return
remaining["done"] = True
execution_duration = int(time_mod.time() - execution_start_time)
database.update_schedule_execution_log(
log_id,
total_accounts=len(account_ids),
success_accounts=started_count,
failed_accounts=len(account_ids) - started_count,
duration_seconds=execution_duration,
status="completed",
)
for account_id in account_ids: for account_id in account_ids:
account = safe_get_account(user_id, account_id) account = safe_get_account(user_id, account_id)
if not account: if not account:
skipped_count += 1
continue continue
if account.is_running: if account.is_running:
skipped_count += 1
continue continue
task_source = f"user_scheduled:{batch_id}"
with completion_lock:
remaining["count"] += 1
ok, msg = submit_account_task( ok, msg = submit_account_task(
user_id=user_id, user_id=user_id,
account_id=account_id, account_id=account_id,
browse_type=browse_type, browse_type=browse_type,
enable_screenshot=enable_screenshot, enable_screenshot=enable_screenshot,
source="user_scheduled", source=task_source,
done_callback=on_browse_done,
) )
if ok: if ok:
started.append(account_id) started_count += 1
else:
with completion_lock:
remaining["count"] -= 1
skipped_count += 1
batch_info = safe_finalize_batch_after_dispatch(batch_id, started_count, now_ts=time_mod.time())
if batch_info:
_send_batch_task_email_if_configured(batch_info)
database.update_schedule_last_run(schedule_id) database.update_schedule_last_run(schedule_id)
return jsonify({"success": True, "started_count": len(started), "message": f"已启动 {len(started)} 个账号"}) if started_count <= 0:
database.update_schedule_execution_log(
log_id,
total_accounts=len(account_ids),
success_accounts=0,
failed_accounts=len(account_ids),
duration_seconds=0,
status="completed",
)
return jsonify(
{
"success": True,
"started_count": started_count,
"skipped_count": skipped_count,
"message": f"已启动 {started_count} 个账号",
}
)
@api_schedules_bp.route("/api/schedules/<int:schedule_id>/logs", methods=["GET"]) @api_schedules_bp.route("/api/schedules/<int:schedule_id>/logs", methods=["GET"])

View File

@@ -91,8 +91,18 @@ def compute_next_run_at(
if random_delay: if random_delay:
window_start = candidate_base - timedelta(minutes=15) window_start = candidate_base - timedelta(minutes=15)
random_minutes = random.randint(0, 30) window_end = candidate_base + timedelta(minutes=15)
candidate = window_start + timedelta(minutes=random_minutes)
# 只从“未来窗口”中抽样,避免抽到过去时间导致整天被跳过
delta_seconds = (now - window_start).total_seconds()
if delta_seconds < 0:
min_offset = 0
else:
min_offset = int(delta_seconds // 60) + 1
max_offset = int((window_end - window_start).total_seconds() // 60)
if min_offset > max_offset:
continue
candidate = window_start + timedelta(minutes=random.randint(min_offset, max_offset))
else: else:
candidate = candidate_base candidate = candidate_base
@@ -110,4 +120,3 @@ def format_cst(dt: datetime) -> str:
dt = BEIJING_TZ.localize(dt) dt = BEIJING_TZ.localize(dt)
dt = dt.astimezone(BEIJING_TZ) dt = dt.astimezone(BEIJING_TZ)
return dt.strftime("%Y-%m-%d %H:%M:%S") return dt.strftime("%Y-%m-%d %H:%M:%S")

View File

@@ -30,6 +30,30 @@ config = get_config()
SCREENSHOTS_DIR = config.SCREENSHOTS_DIR SCREENSHOTS_DIR = config.SCREENSHOTS_DIR
os.makedirs(SCREENSHOTS_DIR, exist_ok=True) os.makedirs(SCREENSHOTS_DIR, exist_ok=True)
_HHMM_RE = None
def _normalize_hhmm(value: object, *, default: str) -> str:
"""Normalize scheduler time to HH:MM; fallback to default when invalid."""
global _HHMM_RE
if _HHMM_RE is None:
import re
_HHMM_RE = re.compile(r"^(\d{1,2}):(\d{2})$")
text = str(value or "").strip()
match = _HHMM_RE.match(text)
if not match:
return default
try:
hour = int(match.group(1))
minute = int(match.group(2))
except Exception:
return default
if hour < 0 or hour > 23 or minute < 0 or minute > 59:
return default
return f"{hour:02d}:{minute:02d}"
def run_scheduled_task(skip_weekday_check: bool = False) -> None: def run_scheduled_task(skip_weekday_check: bool = False) -> None:
"""执行所有账号的浏览任务(可被手动调用,过滤重复账号)""" """执行所有账号的浏览任务(可被手动调用,过滤重复账号)"""
@@ -333,7 +357,10 @@ def scheduled_task_worker() -> None:
def check_and_schedule(force: bool = False): def check_and_schedule(force: bool = False):
config_data = database.get_system_config() config_data = database.get_system_config()
schedule_enabled = bool(config_data.get("schedule_enabled")) schedule_enabled = bool(config_data.get("schedule_enabled"))
schedule_time_cst = config_data.get("schedule_time", "02:00") schedule_time_raw = config_data.get("schedule_time", "02:00")
schedule_time_cst = _normalize_hhmm(schedule_time_raw, default="02:00")
if schedule_time_cst != str(schedule_time_raw or "").strip():
logger.warning(f"[定时任务] 系统定时时间格式无效,已回退到 {schedule_time_cst} (原值: {schedule_time_raw!r})")
signature = (schedule_enabled, schedule_time_cst) signature = (schedule_enabled, schedule_time_cst)
config_changed = schedule_state.get("signature") != signature config_changed = schedule_state.get("signature") != signature
@@ -364,17 +391,27 @@ def scheduled_task_worker() -> None:
elif config_changed and not is_first_run: elif config_changed and not is_first_run:
logger.info("[定时任务] 浏览任务已禁用") logger.info("[定时任务] 浏览任务已禁用")
try:
check_and_schedule(force=True) check_and_schedule(force=True)
except Exception as e:
logger.exception(f"[定时任务] 初始化系统定时任务失败: {e}")
last_config_check = time.time() last_config_check = time.time()
last_user_schedule_minute = None last_user_schedule_minute = None
while True: while True:
try:
try: try:
schedule.run_pending() schedule.run_pending()
except Exception as e:
logger.exception(f"[定时任务] 系统调度执行出错: {e}")
now_ts = time.time() now_ts = time.time()
if now_ts - last_config_check >= config_check_interval: if now_ts - last_config_check >= config_check_interval:
try:
check_and_schedule() check_and_schedule()
except Exception as e:
logger.exception(f"[定时任务] 系统调度配置刷新失败: {e}")
finally:
last_config_check = now_ts last_config_check = now_ts
now_beijing = get_beijing_now() now_beijing = get_beijing_now()
@@ -385,5 +422,5 @@ def scheduled_task_worker() -> None:
time.sleep(1) time.sleep(1)
except Exception as e: except Exception as e:
logger.exception(f"[定时任务] 调度器出错: {str(e)}") logger.exception(f"[定时任务] 调度器主循环出错: {e}")
time.sleep(5) time.sleep(5)