Files
zsglpt/db/admin.py
yuyx b0fe325154 feat: KDocs 上传增强 + 离线监控 + Bug修复
KDocs 上传功能增强:
- 搜索优化:只用姓名搜索 + C列验证,避免匹配到错误单元格
- 有效行范围:支持配置起始行/结束行,限制上传区域
- 图片覆盖:支持覆盖单元格已有图片(Escape + Delete)
- 配置持久化:kdocs_row_start/row_end 保存到数据库(v18迁移)

二次登录功能:
- 登录后立即再次登录,让"上次登录时间"显示为刚刚

KDocs 离线监控:
- 每5分钟检测金山文档登录状态
- 离线时发送邮件通知管理员(每次掉线只通知一次)
- 恢复在线后重置通知状态

Bug 修复:
- 任务日志搜索账号关键词报错500:添加异常处理

🤖 Generated with [Claude Code](https://claude.com/claude-code)

Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com>
2026-01-07 23:40:46 +08:00

405 lines
14 KiB
Python
Raw Permalink Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
#!/usr/bin/env python3
# -*- coding: utf-8 -*-
from __future__ import annotations
import sqlite3
from datetime import datetime, timedelta
import pytz
import db_pool
from db.utils import get_cst_now_str
from password_utils import (
hash_password_bcrypt,
is_sha256_hash,
verify_password_bcrypt,
verify_password_sha256,
)
def ensure_default_admin() -> bool:
    """Create the default ``admin`` account when the admins table is empty.

    A random 12-character password (ASCII letters and digits) is generated
    with ``secrets``, hashed via ``hash_password_bcrypt`` and inserted under
    the username ``admin``. The plaintext password is printed once to stdout
    so the operator can log in and change it.

    Returns:
        True if a default admin was created, False if the admins table
        already contained at least one row.
    """
    import secrets
    import string

    with db_pool.get_db() as conn:
        cur = conn.cursor()
        cur.execute("SELECT COUNT(*) as count FROM admins")
        # Guard clause: an existing admin means there is nothing to bootstrap.
        if cur.fetchone()["count"] != 0:
            return False

        charset = string.ascii_letters + string.digits
        initial_password = "".join([secrets.choice(charset) for _ in range(12)])
        cur.execute(
            "INSERT INTO admins (username, password_hash, created_at) VALUES (?, ?, ?)",
            ("admin", hash_password_bcrypt(initial_password), get_cst_now_str()),
        )
        conn.commit()

        # One-time banner: this is the only place the plaintext is ever shown.
        banner = "=" * 60
        print(banner)
        print("安全提醒:已创建默认管理员账号")
        print("用户名: admin")
        print(f"密码: {initial_password}")
        print("请立即登录后修改密码!")
        print(banner)
        return True
def verify_admin(username: str, password: str):
    """Check an administrator's credentials.

    The admin row is looked up by username and the password is verified
    against the stored hash. When ``is_sha256_hash`` recognises the stored
    hash and the password matches it, a new hash is computed with
    ``hash_password_bcrypt`` and written back before returning.

    Args:
        username: Admin login name.
        password: Plaintext password to verify.

    Returns:
        The admin row as a dict on success, otherwise None. After a hash
        upgrade the returned dict still holds the hash value that was read
        before the update.
    """
    with db_pool.get_db() as conn:
        cur = conn.cursor()
        cur.execute("SELECT * FROM admins WHERE username = ?", (username,))
        row = cur.fetchone()
        if not row:
            return None

        admin_info = dict(row)
        stored_hash = admin_info["password_hash"]

        # Common path: hash is not in the legacy format, verify directly.
        if not is_sha256_hash(stored_hash):
            return admin_info if verify_password_bcrypt(password, stored_hash) else None

        # Legacy path: verify with the old scheme, then upgrade in place.
        if not verify_password_sha256(password, stored_hash):
            return None

        cur.execute(
            "UPDATE admins SET password_hash = ? WHERE username = ?",
            (hash_password_bcrypt(password), username),
        )
        conn.commit()
        print(f"管理员 {username} 密码已自动升级到bcrypt")
        return admin_info
def update_admin_password(username: str, new_password: str) -> bool:
    """Replace the stored password hash of the given administrator.

    Args:
        username: Admin whose password is being changed.
        new_password: New plaintext password; hashed with
            ``hash_password_bcrypt`` before it is stored.

    Returns:
        True when the UPDATE affected at least one row, False otherwise.
    """
    with db_pool.get_db() as conn:
        cur = conn.cursor()
        hashed = hash_password_bcrypt(new_password)
        cur.execute(
            "UPDATE admins SET password_hash = ? WHERE username = ?",
            (hashed, username),
        )
        conn.commit()
        was_updated = cur.rowcount > 0
        return was_updated
def update_admin_username(old_username: str, new_username: str) -> bool:
    """Rename an administrator account.

    Args:
        old_username: Current login name of the admin.
        new_username: Login name to switch to.

    Returns:
        True if a row was actually renamed. False if no admin is named
        ``old_username`` or if the UPDATE raised ``sqlite3.IntegrityError``
        (presumably a UNIQUE constraint on ``username`` - verify against the
        schema).
    """
    with db_pool.get_db() as conn:
        cursor = conn.cursor()
        try:
            cursor.execute(
                "UPDATE admins SET username = ? WHERE username = ?",
                (new_username, old_username),
            )
            conn.commit()
        except sqlite3.IntegrityError:
            # NOTE(review): no rollback is issued here; confirm that
            # db_pool.get_db() resets the connection's transaction state
            # when it takes the connection back.
            return False
        # Report success only when a row matched, consistent with
        # update_admin_password; a zero rowcount means old_username is unknown.
        return cursor.rowcount > 0
def get_system_stats() -> dict:
    """Collect user and account counters.

    Each counter is a single ``COUNT(*)`` query; the queries run one after
    another on the same cursor, in the order listed below.

    Returns:
        A dict with the keys ``total_users``, ``approved_users``,
        ``new_users_today``, ``new_users_7d``, ``total_accounts`` and
        ``vip_users``.
    """
    # Result key -> query. Dict order is the execution order.
    counter_queries = {
        "total_users": "SELECT COUNT(*) as count FROM users",
        "approved_users": "SELECT COUNT(*) as count FROM users WHERE status = 'approved'",
        "new_users_today": """
            SELECT COUNT(*) as count
            FROM users
            WHERE date(created_at) = date('now', 'localtime')
            """,
        "new_users_7d": """
            SELECT COUNT(*) as count
            FROM users
            WHERE datetime(created_at) >= datetime('now', 'localtime', '-7 days')
            """,
        "total_accounts": "SELECT COUNT(*) as count FROM accounts",
        "vip_users": """
            SELECT COUNT(*) as count FROM users
            WHERE vip_expire_time IS NOT NULL
            AND datetime(vip_expire_time) > datetime('now', 'localtime')
            """,
    }

    with db_pool.get_db() as conn:
        cur = conn.cursor()
        stats = {}
        for label, sql in counter_queries.items():
            cur.execute(sql)
            stats[label] = cur.fetchone()["count"]
        return stats
def get_system_config_raw() -> dict:
    """Read the system configuration straight from the database (uncached).

    Caching and invalidation are left to the facade layer that calls this.

    Returns:
        The ``system_config`` row with ``id = 1`` as a dict, or a dict of
        built-in defaults when that row does not exist.
    """
    with db_pool.get_db() as conn:
        cur = conn.cursor()
        cur.execute("SELECT * FROM system_config WHERE id = 1")
        stored = cur.fetchone()

        if not stored:
            # No config row: fall back to the hard-coded defaults.
            fallback = {
                "max_concurrent_global": 2,
                "max_concurrent_per_account": 1,
                "max_screenshot_concurrent": 3,
                "schedule_enabled": 0,
                "schedule_time": "02:00",
                "schedule_browse_type": "应读",
                "schedule_weekdays": "1,2,3,4,5,6,7",
                "proxy_enabled": 0,
                "proxy_api_url": "",
                "proxy_expire_minutes": 3,
                "enable_screenshot": 1,
                "auto_approve_enabled": 0,
                "auto_approve_hourly_limit": 10,
                "auto_approve_vip_days": 7,
                "kdocs_enabled": 0,
                "kdocs_doc_url": "",
                "kdocs_default_unit": "",
                "kdocs_sheet_name": "",
                "kdocs_sheet_index": 0,
                "kdocs_unit_column": "A",
                "kdocs_image_column": "D",
                "kdocs_admin_notify_enabled": 0,
                "kdocs_admin_notify_email": "",
                "kdocs_row_start": 0,
                "kdocs_row_end": 0,
            }
            return fallback

        return dict(stored)
def update_system_config(
    *,
    max_concurrent=None,
    schedule_enabled=None,
    schedule_time=None,
    schedule_browse_type=None,
    schedule_weekdays=None,
    max_concurrent_per_account=None,
    max_screenshot_concurrent=None,
    enable_screenshot=None,
    proxy_enabled=None,
    proxy_api_url=None,
    proxy_expire_minutes=None,
    auto_approve_enabled=None,
    auto_approve_hourly_limit=None,
    auto_approve_vip_days=None,
    kdocs_enabled=None,
    kdocs_doc_url=None,
    kdocs_default_unit=None,
    kdocs_sheet_name=None,
    kdocs_sheet_index=None,
    kdocs_unit_column=None,
    kdocs_image_column=None,
    kdocs_admin_notify_enabled=None,
    kdocs_admin_notify_email=None,
    kdocs_row_start=None,
    kdocs_row_end=None,
) -> bool:
    """Update the ``system_config`` row with ``id = 1`` (database only, no caching).

    Every keyword argument maps to the column of the same name, except
    ``max_concurrent`` which is stored in ``max_concurrent_global``.
    Arguments left as ``None`` are not touched; ``updated_at`` is refreshed
    with ``get_cst_now_str()`` whenever at least one column is written.

    Returns:
        False when every argument is ``None`` (nothing to write; no database
        connection is requested in that case), True once the UPDATE has been
        executed and committed.
    """
    # Column/value pairs in SET-clause order. Column names are fixed
    # literals, so only whitelisted identifiers ever reach the SQL text;
    # values always go through "?" placeholders.
    candidates = (
        ("max_concurrent_global", max_concurrent),
        ("schedule_enabled", schedule_enabled),
        ("schedule_time", schedule_time),
        ("schedule_browse_type", schedule_browse_type),
        ("max_concurrent_per_account", max_concurrent_per_account),
        ("max_screenshot_concurrent", max_screenshot_concurrent),
        ("enable_screenshot", enable_screenshot),
        ("schedule_weekdays", schedule_weekdays),
        ("proxy_enabled", proxy_enabled),
        ("proxy_api_url", proxy_api_url),
        ("proxy_expire_minutes", proxy_expire_minutes),
        ("auto_approve_enabled", auto_approve_enabled),
        ("auto_approve_hourly_limit", auto_approve_hourly_limit),
        ("auto_approve_vip_days", auto_approve_vip_days),
        ("kdocs_enabled", kdocs_enabled),
        ("kdocs_doc_url", kdocs_doc_url),
        ("kdocs_default_unit", kdocs_default_unit),
        ("kdocs_sheet_name", kdocs_sheet_name),
        ("kdocs_sheet_index", kdocs_sheet_index),
        ("kdocs_unit_column", kdocs_unit_column),
        ("kdocs_image_column", kdocs_image_column),
        ("kdocs_admin_notify_enabled", kdocs_admin_notify_enabled),
        ("kdocs_admin_notify_email", kdocs_admin_notify_email),
        ("kdocs_row_start", kdocs_row_start),
        ("kdocs_row_end", kdocs_row_end),
    )
    assignments = [(column, value) for column, value in candidates if value is not None]

    # Nothing to change: bail out before borrowing a pooled connection.
    if not assignments:
        return False

    assignments.append(("updated_at", get_cst_now_str()))
    set_clause = ", ".join(f"{column} = ?" for column, _ in assignments)
    params = [value for _, value in assignments]

    with db_pool.get_db() as conn:
        cursor = conn.cursor()
        cursor.execute(f"UPDATE system_config SET {set_clause} WHERE id = 1", params)
        conn.commit()
        # NOTE(review): True is returned even if no row with id = 1 exists
        # (rowcount 0); kept as-is so existing callers see the same result.
        return True
def get_hourly_registration_count() -> int:
    """Count the users registered within the last hour.

    The cutoff is computed inside SQLite as
    ``datetime('now', 'localtime', '-1 hour')`` and compared against
    ``users.created_at``.

    Returns:
        The number of matching rows.
    """
    recent_signups_sql = """
        SELECT COUNT(*) FROM users
        WHERE created_at >= datetime('now', 'localtime', '-1 hour')
        """
    with db_pool.get_db() as conn:
        cur = conn.cursor()
        cur.execute(recent_signups_sql)
        first_row = cur.fetchone()
        # The COUNT(*) column has no alias, so it is read by position.
        return first_row[0]
# ==================== 密码重置(管理员) ====================
def admin_reset_user_password(user_id: int, new_password: str) -> bool:
    """Overwrite a user's password on behalf of an administrator.

    Args:
        user_id: Primary key of the user whose password is reset.
        new_password: New plaintext password; hashed with
            ``hash_password_bcrypt`` before it is stored.

    Returns:
        True when the UPDATE affected at least one row. False when no row
        matched or when the database call raised; the error is printed, not
        propagated.
    """
    with db_pool.get_db() as conn:
        cur = conn.cursor()
        hashed = hash_password_bcrypt(new_password)
        try:
            cur.execute(
                "UPDATE users SET password_hash = ? WHERE id = ?",
                (hashed, user_id),
            )
            conn.commit()
            succeeded = cur.rowcount > 0
        except Exception as exc:
            # Best-effort: report the failure and signal it via the return value.
            print(f"管理员重置密码失败: {exc}")
            succeeded = False
        return succeeded
def clean_old_operation_logs(days: int = 30) -> int:
    """Delete operation-log rows older than ``days`` days, if the table exists.

    The ``operation_logs`` table is looked up in ``sqlite_master`` first;
    when it is absent the function does nothing.

    Args:
        days: Age threshold in days; rows whose ``created_at`` is earlier
            than "now (localtime) minus ``days`` days" are removed.

    Returns:
        The number of rows deleted, or 0 when the table is missing or the
        delete failed (the error is printed, not propagated).
    """
    table_probe_sql = """
        SELECT name FROM sqlite_master
        WHERE type='table' AND name='operation_logs'
        """
    purge_sql = """
        DELETE FROM operation_logs
        WHERE created_at < datetime('now', 'localtime', '-' || ? || ' days')
        """
    with db_pool.get_db() as conn:
        cur = conn.cursor()
        cur.execute(table_probe_sql)
        if cur.fetchone() is None:
            return 0

        try:
            cur.execute(purge_sql, (days,))
            removed = cur.rowcount
            conn.commit()
            print(f"已清理 {removed} 条旧操作日志 (>{days}天)")
            return removed
        except Exception as err:
            # Best-effort cleanup: log and report zero rather than raise.
            print(f"清理旧操作日志失败: {err}")
            return 0