From 10a3f099521cdf3adb20bd2f68ecddadb5dcee46 Mon Sep 17 00:00:00 2001 From: yuyx <237899745@qq.com> Date: Tue, 17 Feb 2026 18:12:33 +0800 Subject: [PATCH] feat: switch OSS download quota to reservation plus log reconcile --- backend/database.js | 193 +++++++++++++++++ backend/server.js | 506 ++++++++++++++++++++++++++++++++++++++++---- frontend/app.js | 20 +- 3 files changed, 669 insertions(+), 50 deletions(-) diff --git a/backend/database.js b/backend/database.js index 770be8b..d86d113 100644 --- a/backend/database.js +++ b/backend/database.js @@ -380,6 +380,41 @@ function initDatabase() { ) `); + // 下载流量预扣保留表(直连下载签发时占用额度,不计入已用) + db.exec(` + CREATE TABLE IF NOT EXISTS user_download_traffic_reservations ( + id INTEGER PRIMARY KEY AUTOINCREMENT, + user_id INTEGER NOT NULL, + source TEXT NOT NULL DEFAULT 'direct', -- direct/share_direct + object_key TEXT, + reserved_bytes INTEGER NOT NULL DEFAULT 0, + remaining_bytes INTEGER NOT NULL DEFAULT 0, + status TEXT NOT NULL DEFAULT 'pending', -- pending/confirmed/expired/cancelled + created_at DATETIME DEFAULT CURRENT_TIMESTAMP, + expires_at DATETIME NOT NULL, + finalized_at DATETIME, + updated_at DATETIME DEFAULT CURRENT_TIMESTAMP, + FOREIGN KEY (user_id) REFERENCES users (id) ON DELETE CASCADE + ) + `); + + // OSS 日志已处理记录(避免重复入账) + db.exec(` + CREATE TABLE IF NOT EXISTS download_traffic_ingested_logs ( + id INTEGER PRIMARY KEY AUTOINCREMENT, + bucket TEXT NOT NULL, + log_key TEXT NOT NULL, + etag TEXT, + file_size INTEGER DEFAULT 0, + line_count INTEGER DEFAULT 0, + bytes_count INTEGER DEFAULT 0, + status TEXT NOT NULL DEFAULT 'success', + error_message TEXT, + processed_at DATETIME DEFAULT CURRENT_TIMESTAMP, + UNIQUE(bucket, log_key, etag) + ) + `); + // 日志表索引 db.exec(` CREATE INDEX IF NOT EXISTS idx_logs_created_at ON system_logs(created_at); @@ -395,6 +430,14 @@ function initDatabase() { -- 下载流量报表索引(按用户+日期查询) CREATE INDEX IF NOT EXISTS idx_download_traffic_user_date ON user_download_traffic_daily(user_id, date_key); + + -- 下载预扣索引(按用户+状态+到期时间) + CREATE INDEX IF NOT EXISTS idx_download_reservation_user_status_expires + ON user_download_traffic_reservations(user_id, status, expires_at); + + -- 已处理日志索引(按处理时间回溯) + CREATE INDEX IF NOT EXISTS idx_ingested_logs_processed_at + ON download_traffic_ingested_logs(processed_at); `); console.log('[数据库性能优化] ✓ 日志表复合索引已创建'); @@ -1537,6 +1580,154 @@ const DownloadTrafficReportDB = { } }; +const DownloadTrafficReservationDB = { + create({ userId, source = 'direct', objectKey = null, reservedBytes, expiresAt }) { + const uid = Number(userId); + const bytes = Math.floor(Number(reservedBytes)); + const expiresAtValue = typeof expiresAt === 'string' ? expiresAt : null; + + if (!Number.isFinite(uid) || uid <= 0 || !Number.isFinite(bytes) || bytes <= 0 || !expiresAtValue) { + return null; + } + + const result = db.prepare(` + INSERT INTO user_download_traffic_reservations ( + user_id, source, object_key, reserved_bytes, remaining_bytes, + status, expires_at, created_at, updated_at + ) VALUES (?, ?, ?, ?, ?, 'pending', ?, datetime('now', 'localtime'), datetime('now', 'localtime')) + `).run(uid, source, objectKey, bytes, bytes, expiresAtValue); + + return db.prepare('SELECT * FROM user_download_traffic_reservations WHERE id = ?').get(result.lastInsertRowid); + }, + + getPendingReservedBytes(userId) { + const uid = Number(userId); + if (!Number.isFinite(uid) || uid <= 0) { + return 0; + } + + const row = db.prepare(` + SELECT COALESCE(SUM(remaining_bytes), 0) AS reserved + FROM user_download_traffic_reservations + WHERE user_id = ? + AND status = 'pending' + AND expires_at > datetime('now', 'localtime') + `).get(uid); + + return Number(row?.reserved || 0); + }, + + expirePendingReservations() { + return db.prepare(` + UPDATE user_download_traffic_reservations + SET status = 'expired', + remaining_bytes = 0, + finalized_at = datetime('now', 'localtime'), + updated_at = datetime('now', 'localtime') + WHERE status = 'pending' + AND expires_at <= datetime('now', 'localtime') + `).run(); + }, + + consumePendingBytes(userId, bytes) { + const uid = Number(userId); + let remaining = Math.floor(Number(bytes)); + if (!Number.isFinite(uid) || uid <= 0 || !Number.isFinite(remaining) || remaining <= 0) { + return { consumed: 0, finalizedCount: 0 }; + } + + const pendingRows = db.prepare(` + SELECT id, remaining_bytes + FROM user_download_traffic_reservations + WHERE user_id = ? + AND status = 'pending' + AND remaining_bytes > 0 + ORDER BY created_at ASC, id ASC + `).all(uid); + + let consumed = 0; + let finalizedCount = 0; + + for (const row of pendingRows) { + if (remaining <= 0) break; + + const rowRemaining = Number(row.remaining_bytes || 0); + if (!Number.isFinite(rowRemaining) || rowRemaining <= 0) continue; + + const useBytes = Math.min(remaining, rowRemaining); + const nextRemaining = rowRemaining - useBytes; + const nextStatus = nextRemaining <= 0 ? 'confirmed' : 'pending'; + + db.prepare(` + UPDATE user_download_traffic_reservations + SET remaining_bytes = ?, + status = ?, + finalized_at = CASE WHEN ? = 'confirmed' THEN datetime('now', 'localtime') ELSE finalized_at END, + updated_at = datetime('now', 'localtime') + WHERE id = ? + `).run(nextRemaining, nextStatus, nextStatus, row.id); + + consumed += useBytes; + remaining -= useBytes; + if (nextStatus === 'confirmed') { + finalizedCount += 1; + } + } + + return { consumed, finalizedCount }; + } +}; + +const DownloadTrafficIngestDB = { + isProcessed(bucket, logKey, etag = null) { + const row = db.prepare(` + SELECT id + FROM download_traffic_ingested_logs + WHERE bucket = ? + AND log_key = ? + AND COALESCE(etag, '') = COALESCE(?, '') + LIMIT 1 + `).get(bucket, logKey, etag); + + return !!row; + }, + + markProcessed({ + bucket, + logKey, + etag = null, + fileSize = 0, + lineCount = 0, + bytesCount = 0, + status = 'success', + errorMessage = null + }) { + return db.prepare(` + INSERT INTO download_traffic_ingested_logs ( + bucket, log_key, etag, file_size, line_count, bytes_count, + status, error_message, processed_at + ) VALUES (?, ?, ?, ?, ?, ?, ?, ?, datetime('now', 'localtime')) + ON CONFLICT(bucket, log_key, etag) + DO UPDATE SET + file_size = excluded.file_size, + line_count = excluded.line_count, + bytes_count = excluded.bytes_count, + status = excluded.status, + error_message = excluded.error_message, + processed_at = datetime('now', 'localtime') + `).run( + bucket, + logKey, + etag || '', + Number(fileSize) || 0, + Number(lineCount) || 0, + Number(bytesCount) || 0, + status || 'success', + errorMessage || null + ); + } +}; + // 系统日志操作 const SystemLogDB = { // 日志级别常量 @@ -1733,6 +1924,8 @@ module.exports = { VerificationDB, PasswordResetTokenDB, DownloadTrafficReportDB, + DownloadTrafficReservationDB, + DownloadTrafficIngestDB, SystemLogDB, TransactionDB, WalManager diff --git a/backend/server.js b/backend/server.js index 0295bd0..e0b17e4 100644 --- a/backend/server.js +++ b/backend/server.js @@ -10,6 +10,7 @@ const multer = require('multer'); const nodemailer = require('nodemailer'); const path = require('path'); const fs = require('fs'); +const zlib = require('zlib'); const { body, validationResult } = require('express-validator'); const archiver = require('archiver'); const crypto = require('crypto'); @@ -62,7 +63,20 @@ function clearOssUsageCache(userId) { console.log(`[OSS缓存] 已清除: 用户 ${userId}`); } -const { db, UserDB, ShareDB, SettingsDB, VerificationDB, PasswordResetTokenDB, DownloadTrafficReportDB, SystemLogDB, TransactionDB, WalManager } = require('./database'); +const { + db, + UserDB, + ShareDB, + SettingsDB, + VerificationDB, + PasswordResetTokenDB, + DownloadTrafficReportDB, + DownloadTrafficReservationDB, + DownloadTrafficIngestDB, + SystemLogDB, + TransactionDB, + WalManager +} = require('./database'); const StorageUsageCache = require('./utils/storage-cache'); const { JWT_SECRET, generateToken, generateRefreshToken, refreshAccessToken, authMiddleware, adminMiddleware, isJwtSecretSecure } = require('./auth'); const { StorageInterface, LocalStorageClient, OssStorageClient, formatFileSize, formatOssError } = require('./storage'); @@ -76,6 +90,10 @@ const DEFAULT_LOCAL_STORAGE_QUOTA_BYTES = 1024 * 1024 * 1024; // 1GB const DEFAULT_OSS_STORAGE_QUOTA_BYTES = 1024 * 1024 * 1024; // 1GB const MAX_DOWNLOAD_TRAFFIC_BYTES = 10 * 1024 * 1024 * 1024 * 1024; // 10TB const DOWNLOAD_POLICY_SWEEP_INTERVAL_MS = 30 * 60 * 1000; // 30分钟 +const DOWNLOAD_RESERVATION_TTL_MS = Number(process.env.DOWNLOAD_RESERVATION_TTL_MS || (30 * 60 * 1000)); // 30分钟 +const DOWNLOAD_LOG_RECONCILE_INTERVAL_MS = Number(process.env.DOWNLOAD_LOG_RECONCILE_INTERVAL_MS || (5 * 60 * 1000)); // 5分钟 +const DOWNLOAD_LOG_MAX_FILES_PER_SWEEP = Number(process.env.DOWNLOAD_LOG_MAX_FILES_PER_SWEEP || 40); +const DOWNLOAD_LOG_LIST_MAX_KEYS = Number(process.env.DOWNLOAD_LOG_LIST_MAX_KEYS || 200); const SHARE_CODE_REGEX = /^[A-Za-z0-9]{6,32}$/; const COOKIE_SECURE_MODE = String(process.env.COOKIE_SECURE || '').toLowerCase(); const SHOULD_USE_SECURE_COOKIES = @@ -660,6 +678,10 @@ function getDownloadTrafficState(user) { }; } +function getBusyDownloadMessage() { + return '当前网络繁忙,请稍后再试'; +} + function parseDateTimeValue(value) { if (!value || typeof value !== 'string') { return null; @@ -893,7 +915,7 @@ function applyDownloadTrafficUsage(userId, bytesToAdd) { return applyDownloadTrafficUsageTransaction(userId, Math.floor(parsedBytes)); } -const reserveDirectDownloadTrafficTransaction = db.transaction((userId, bytesToReserve) => { +const reserveDirectDownloadTrafficTransaction = db.transaction((userId, bytesToReserve, reservationOptions = {}) => { const policyState = enforceDownloadTrafficPolicyTransaction(userId, 'direct_download_reserve'); const user = policyState?.user || UserDB.findById(userId); if (!user) { @@ -902,52 +924,116 @@ const reserveDirectDownloadTrafficTransaction = db.transaction((userId, bytesToR const reserveBytes = Math.floor(Number(bytesToReserve)); if (!Number.isFinite(reserveBytes) || reserveBytes <= 0) { - return { - ok: true, - quota: normalizeDownloadTrafficQuota(user.download_traffic_quota), - usedBefore: normalizeDownloadTrafficUsed(user.download_traffic_used, normalizeDownloadTrafficQuota(user.download_traffic_quota)), - usedAfter: normalizeDownloadTrafficUsed(user.download_traffic_used, normalizeDownloadTrafficQuota(user.download_traffic_quota)), - reserved: 0 - }; + return { ok: true, reserved: 0, isUnlimited: true }; } const trafficState = getDownloadTrafficState(user); - if (!trafficState.isUnlimited && reserveBytes > trafficState.remaining) { + if (trafficState.isUnlimited) { + return { ok: true, reserved: 0, isUnlimited: true }; + } + + const pendingReserved = Number(DownloadTrafficReservationDB.getPendingReservedBytes(userId) || 0); + const available = Math.max(0, trafficState.remaining - pendingReserved); + if (reserveBytes > available) { return { ok: false, - reason: 'insufficient', + reason: 'insufficient_available', quota: trafficState.quota, usedBefore: trafficState.used, - remaining: trafficState.remaining + pendingReserved, + remaining: trafficState.remaining, + available }; } - const nextUsed = trafficState.used + reserveBytes; - UserDB.update(userId, { download_traffic_used: nextUsed }); - DownloadTrafficReportDB.addUsage(userId, reserveBytes, 1, new Date()); + const ttlMs = Math.max(60 * 1000, Number(reservationOptions.ttlMs || DOWNLOAD_RESERVATION_TTL_MS)); + const expiresAt = new Date(Date.now() + ttlMs); + const reservation = DownloadTrafficReservationDB.create({ + userId, + source: reservationOptions.source || 'direct', + objectKey: reservationOptions.objectKey || null, + reservedBytes: reserveBytes, + expiresAt: formatDateTimeForSqlite(expiresAt) + }); + + if (!reservation) { + return { ok: false, reason: 'create_failed' }; + } return { ok: true, + isUnlimited: false, quota: trafficState.quota, usedBefore: trafficState.used, - usedAfter: nextUsed, - reserved: reserveBytes + pendingReserved, + availableBefore: available, + reserved: reserveBytes, + reservation }; }); -function reserveDirectDownloadTraffic(userId, bytesToReserve) { +function reserveDirectDownloadTraffic(userId, bytesToReserve, reservationOptions = {}) { const parsedBytes = Number(bytesToReserve); if (!Number.isFinite(parsedBytes) || parsedBytes <= 0) { return { ok: true, - quota: 0, - usedBefore: 0, - usedAfter: 0, - reserved: 0 + reserved: 0, + isUnlimited: true }; } - return reserveDirectDownloadTrafficTransaction(userId, Math.floor(parsedBytes)); + return reserveDirectDownloadTrafficTransaction(userId, Math.floor(parsedBytes), reservationOptions); +} + +const applyConfirmedDownloadTrafficFromLogTransaction = db.transaction((userId, confirmedBytes, downloadCount = 0, eventDate = new Date()) => { + const policyState = enforceDownloadTrafficPolicyTransaction(userId, 'log_confirm'); + const user = policyState?.user || UserDB.findById(userId); + if (!user) { + return null; + } + + const bytes = Math.floor(Number(confirmedBytes)); + const count = Math.floor(Number(downloadCount)); + if (!Number.isFinite(bytes) || bytes <= 0) { + return { + userId, + confirmed: 0, + added: 0, + consumedReserved: 0 + }; + } + + const trafficState = getDownloadTrafficState(user); + const nextUsed = trafficState.isUnlimited + ? (trafficState.used + bytes) + : Math.min(trafficState.quota, trafficState.used + bytes); + + UserDB.update(userId, { download_traffic_used: nextUsed }); + DownloadTrafficReportDB.addUsage(userId, bytes, count > 0 ? count : 1, eventDate); + const consumeResult = DownloadTrafficReservationDB.consumePendingBytes(userId, bytes); + + return { + userId, + confirmed: bytes, + added: Math.max(0, nextUsed - trafficState.used), + usedBefore: trafficState.used, + usedAfter: nextUsed, + consumedReserved: Number(consumeResult?.consumed || 0), + finalizedReservations: Number(consumeResult?.finalizedCount || 0) + }; +}); + +function applyConfirmedDownloadTrafficFromLog(userId, confirmedBytes, downloadCount = 0, eventDate = new Date()) { + const parsed = Number(confirmedBytes); + if (!Number.isFinite(parsed) || parsed <= 0) { + return null; + } + return applyConfirmedDownloadTrafficFromLogTransaction( + userId, + Math.floor(parsed), + Math.floor(Number(downloadCount || 0)), + eventDate + ); } function runDownloadTrafficPolicySweep(trigger = 'scheduled') { @@ -974,6 +1060,328 @@ function runDownloadTrafficPolicySweep(trigger = 'scheduled') { } } +function getDownloadTrafficLogIngestConfig(baseBucket = '') { + const configuredBucket = (SettingsDB.get('download_traffic_log_bucket') || process.env.DOWNLOAD_TRAFFIC_LOG_BUCKET || '').trim(); + const configuredPrefixRaw = SettingsDB.get('download_traffic_log_prefix') || process.env.DOWNLOAD_TRAFFIC_LOG_PREFIX || ''; + const configuredPrefix = String(configuredPrefixRaw).replace(/^\/+/, ''); + + return { + bucket: configuredBucket || baseBucket || '', + prefix: configuredPrefix + }; +} + +async function readS3BodyToBuffer(body) { + if (!body) return Buffer.alloc(0); + + if (typeof body.transformToByteArray === 'function') { + const arr = await body.transformToByteArray(); + return Buffer.from(arr); + } + + if (typeof body.transformToString === 'function') { + const text = await body.transformToString(); + return Buffer.from(text, 'utf8'); + } + + if (Buffer.isBuffer(body)) { + return body; + } + + if (typeof body === 'string') { + return Buffer.from(body, 'utf8'); + } + + if (typeof body[Symbol.asyncIterator] === 'function') { + const chunks = []; + for await (const chunk of body) { + chunks.push(Buffer.isBuffer(chunk) ? chunk : Buffer.from(chunk)); + } + return Buffer.concat(chunks); + } + + return Buffer.alloc(0); +} + +function parseDownloadTrafficLogTime(line) { + if (!line || typeof line !== 'string') return null; + const match = line.match(/\[(\d{2})\/([A-Za-z]{3})\/(\d{4}):(\d{2}):(\d{2}):(\d{2})\s*([+-]\d{4})?\]/); + if (!match) return null; + + const monthMap = { + Jan: 0, Feb: 1, Mar: 2, Apr: 3, May: 4, Jun: 5, + Jul: 6, Aug: 7, Sep: 8, Oct: 9, Nov: 10, Dec: 11 + }; + const month = monthMap[match[2]]; + if (month === undefined) return null; + + const year = Number(match[3]); + const day = Number(match[1]); + const hour = Number(match[4]); + const minute = Number(match[5]); + const second = Number(match[6]); + + if ([year, day, hour, minute, second].some(v => !Number.isFinite(v))) { + return null; + } + + // 先按 UTC 构造,再应用时区偏移(若存在) + let utcMillis = Date.UTC(year, month, day, hour, minute, second); + const tzRaw = match[7]; + if (tzRaw && /^[+-]\d{4}$/.test(tzRaw)) { + const sign = tzRaw[0] === '+' ? 1 : -1; + const tzHour = Number(tzRaw.slice(1, 3)); + const tzMin = Number(tzRaw.slice(3, 5)); + const offsetMinutes = sign * (tzHour * 60 + tzMin); + utcMillis -= offsetMinutes * 60 * 1000; + } + + const parsed = new Date(utcMillis); + return Number.isNaN(parsed.getTime()) ? null : parsed; +} + +function parseDownloadTrafficLine(line) { + if (!line || typeof line !== 'string') { + return null; + } + + const trimmed = line.trim(); + if (!trimmed) return null; + + // 仅处理 GET 请求(HEAD/PUT/POST 等不计下载流量) + if (!/\bGET\b/i.test(trimmed)) { + return null; + } + + let statusCode = 0; + let bytesSent = 0; + const statusMatch = trimmed.match(/"\s*(\d{3})\s+(\d+|-)\b/); + if (statusMatch) { + statusCode = Number(statusMatch[1]); + bytesSent = statusMatch[2] === '-' ? 0 : Number(statusMatch[2]); + } + + if (![200, 206].includes(statusCode) || !Number.isFinite(bytesSent) || bytesSent <= 0) { + return null; + } + + // 尝试从请求路径提取 object key + let objectKey = null; + const requestMatch = trimmed.match(/"(?:GET|HEAD)\s+([^" ]+)\s+HTTP\//i); + if (requestMatch && requestMatch[1]) { + let requestPath = requestMatch[1]; + const qIndex = requestPath.indexOf('?'); + if (qIndex >= 0) { + requestPath = requestPath.slice(0, qIndex); + } + requestPath = requestPath.replace(/^https?:\/\/[^/]+/i, ''); + requestPath = requestPath.replace(/^\/+/, ''); + try { + requestPath = decodeURIComponent(requestPath); + } catch { + // ignore decode error + } + objectKey = requestPath || null; + } + + if (!objectKey) { + const keyMatch = trimmed.match(/\buser_(\d+)\/[^\s"]+/); + if (keyMatch && keyMatch[0]) { + objectKey = keyMatch[0]; + } + } + + if (!objectKey) { + return null; + } + + const userMatch = objectKey.match(/(?:^|\/)user_(\d+)\//); + if (!userMatch) { + return null; + } + + const userId = Number(userMatch[1]); + if (!Number.isFinite(userId) || userId <= 0) { + return null; + } + + return { + userId, + bytes: Math.floor(bytesSent), + objectKey, + eventAt: parseDownloadTrafficLogTime(trimmed) || new Date() + }; +} + +function extractLogLinesFromBuffer(buffer, logKey = '') { + let content = Buffer.isBuffer(buffer) ? buffer : Buffer.from(buffer || ''); + if ((logKey || '').toLowerCase().endsWith('.gz')) { + try { + content = zlib.gunzipSync(content); + } catch (error) { + console.warn(`[下载流量日志] 解压失败: ${logKey}`, error.message); + } + } + + const text = content.toString('utf8'); + return text.split(/\r?\n/); +} + +async function runDownloadTrafficLogReconcile(trigger = 'interval') { + try { + const expiredResult = DownloadTrafficReservationDB.expirePendingReservations(); + if ((expiredResult?.changes || 0) > 0) { + console.log(`[下载流量预扣] 已释放过期保留额度: ${expiredResult.changes} 条 (trigger=${trigger})`); + } + + if (!SettingsDB.hasUnifiedOssConfig()) { + return; + } + + const serviceUser = { + id: 0, + has_oss_config: 0, + current_storage_type: 'oss' + }; + const { client, bucket: defaultBucket } = createS3ClientContextForUser(serviceUser); + const ingestConfig = getDownloadTrafficLogIngestConfig(defaultBucket); + + if (!ingestConfig.bucket) { + return; + } + + const { ListObjectsV2Command, GetObjectCommand } = require('@aws-sdk/client-s3'); + let continuationToken = null; + let listed = 0; + const candidates = []; + + do { + const listResp = await client.send(new ListObjectsV2Command({ + Bucket: ingestConfig.bucket, + Prefix: ingestConfig.prefix || undefined, + ContinuationToken: continuationToken || undefined, + MaxKeys: DOWNLOAD_LOG_LIST_MAX_KEYS + })); + + const contents = Array.isArray(listResp?.Contents) ? listResp.Contents : []; + for (const item of contents) { + listed += 1; + const key = item?.Key; + if (!key) continue; + + const size = Number(item?.Size || 0); + if (!Number.isFinite(size) || size <= 0) continue; + + const etag = (item?.ETag || '').replace(/"/g, ''); + const processed = DownloadTrafficIngestDB.isProcessed(ingestConfig.bucket, key, etag); + if (processed) continue; + + // 仅处理常见日志文件后缀 + const lowerKey = key.toLowerCase(); + if (!lowerKey.endsWith('.log') && !lowerKey.endsWith('.txt') && !lowerKey.endsWith('.gz')) { + continue; + } + + candidates.push({ key, etag, size }); + if (candidates.length >= DOWNLOAD_LOG_MAX_FILES_PER_SWEEP) { + break; + } + } + + if (candidates.length >= DOWNLOAD_LOG_MAX_FILES_PER_SWEEP) { + break; + } + + continuationToken = listResp?.IsTruncated ? listResp?.NextContinuationToken : null; + } while (continuationToken); + + if (candidates.length === 0) { + return; + } + + let processedFiles = 0; + let processedLines = 0; + let confirmedBytes = 0; + + for (const candidate of candidates) { + const { key, etag, size } = candidate; + + try { + const getResp = await client.send(new GetObjectCommand({ + Bucket: ingestConfig.bucket, + Key: key + })); + const bodyBuffer = await readS3BodyToBuffer(getResp?.Body); + const lines = extractLogLinesFromBuffer(bodyBuffer, key); + const aggregateByUser = new Map(); + let parsedLineCount = 0; + + for (const line of lines) { + const parsed = parseDownloadTrafficLine(line); + if (!parsed) continue; + parsedLineCount += 1; + const existing = aggregateByUser.get(parsed.userId) || { + bytes: 0, + count: 0, + eventAt: parsed.eventAt + }; + existing.bytes += parsed.bytes; + existing.count += 1; + if (parsed.eventAt && existing.eventAt && parsed.eventAt < existing.eventAt) { + existing.eventAt = parsed.eventAt; + } + aggregateByUser.set(parsed.userId, existing); + } + + let fileBytes = 0; + for (const [uid, stat] of aggregateByUser.entries()) { + if (!stat || !Number.isFinite(stat.bytes) || stat.bytes <= 0) continue; + const result = applyConfirmedDownloadTrafficFromLog(uid, stat.bytes, stat.count, stat.eventAt || new Date()); + if (result) { + fileBytes += stat.bytes; + } + } + + processedFiles += 1; + processedLines += parsedLineCount; + confirmedBytes += fileBytes; + + DownloadTrafficIngestDB.markProcessed({ + bucket: ingestConfig.bucket, + logKey: key, + etag, + fileSize: size, + lineCount: parsedLineCount, + bytesCount: fileBytes, + status: 'success', + errorMessage: null + }); + } catch (error) { + console.error(`[下载流量日志] 处理失败: ${key}`, error); + DownloadTrafficIngestDB.markProcessed({ + bucket: ingestConfig.bucket, + logKey: key, + etag, + fileSize: size, + lineCount: 0, + bytesCount: 0, + status: 'failed', + errorMessage: String(error?.message || error).slice(0, 500) + }); + } + } + + if (processedFiles > 0) { + console.log( + `[下载流量日志] 扫描完成 (trigger=${trigger}) ` + + `listed=${listed}, files=${processedFiles}, lines=${processedLines}, bytes=${confirmedBytes}` + ); + } + } catch (error) { + console.error(`[下载流量日志] 扫描失败 (trigger=${trigger}):`, error); + } +} + const downloadPolicySweepTimer = setInterval(() => { runDownloadTrafficPolicySweep('interval'); }, DOWNLOAD_POLICY_SWEEP_INTERVAL_MS); @@ -986,6 +1394,18 @@ setTimeout(() => { runDownloadTrafficPolicySweep('startup'); }, 10 * 1000); +const downloadTrafficLogReconcileTimer = setInterval(() => { + runDownloadTrafficLogReconcile('interval'); +}, DOWNLOAD_LOG_RECONCILE_INTERVAL_MS); + +if (downloadTrafficLogReconcileTimer && typeof downloadTrafficLogReconcileTimer.unref === 'function') { + downloadTrafficLogReconcileTimer.unref(); +} + +setTimeout(() => { + runDownloadTrafficLogReconcile('startup'); +}, 30 * 1000); + // 构建用于存储客户端的用户对象(自动尝试解密 OSS Secret) function buildStorageUserContext(user, overrides = {}) { if (!user) { @@ -4153,7 +4573,7 @@ app.get('/api/files/download-url', authMiddleware, async (req, res) => { const objectKey = ossClient.getObjectKey(normalizedPath); let fileSize = 0; - // 启用下载流量限制时,签发前先校验文件大小与剩余额度 + // 启用下载流量限制时,签发前先获取文件大小(用于预扣保留额度) if (!trafficState.isUnlimited) { let headResponse; try { @@ -4176,11 +4596,10 @@ app.get('/api/files/download-url', authMiddleware, async (req, res) => { fileSize = Number.isFinite(contentLength) && contentLength > 0 ? Math.floor(contentLength) : 0; - - if (fileSize > trafficState.remaining) { - return res.status(403).json({ + if (fileSize <= 0) { + return res.status(503).json({ success: false, - message: `下载流量不足:文件 ${formatFileSize(fileSize)},剩余 ${formatFileSize(trafficState.remaining)}` + message: getBusyDownloadMessage() }); } } @@ -4195,14 +4614,17 @@ app.get('/api/files/download-url', authMiddleware, async (req, res) => { // 生成签名 URL(1小时有效) const signedUrl = await getSignedUrl(client, command, { expiresIn: 3600 }); - // 直连模式下无法精确获知真实下载字节;限流时在签发前预扣文件大小 + // 直连模式:先预扣保留额度(不写入已用),实际用量由 OSS 日志异步确认入账 if (!trafficState.isUnlimited && fileSize > 0) { - const reserveResult = reserveDirectDownloadTraffic(latestUser.id, fileSize); + const reserveResult = reserveDirectDownloadTraffic(latestUser.id, fileSize, { + source: 'direct', + objectKey, + ttlMs: DOWNLOAD_RESERVATION_TTL_MS + }); if (!reserveResult?.ok) { - const remaining = Number(reserveResult?.remaining || 0); - return res.status(403).json({ + return res.status(503).json({ success: false, - message: `下载流量不足:文件 ${formatFileSize(fileSize)},剩余 ${formatFileSize(remaining)}` + message: getBusyDownloadMessage() }); } } @@ -5527,11 +5949,10 @@ app.post('/api/share/:code/download-url', shareRateLimitMiddleware, async (req, fileSize = Number.isFinite(contentLength) && contentLength > 0 ? Math.floor(contentLength) : 0; - - if (fileSize > ownerTrafficState.remaining) { - return res.status(403).json({ + if (fileSize <= 0) { + return res.status(503).json({ success: false, - message: `下载流量不足:文件 ${formatFileSize(fileSize)},剩余 ${formatFileSize(ownerTrafficState.remaining)}` + message: getBusyDownloadMessage() }); } } @@ -5547,12 +5968,15 @@ app.post('/api/share/:code/download-url', shareRateLimitMiddleware, async (req, const signedUrl = await getSignedUrl(client, command, { expiresIn: 3600 }); if (!ownerTrafficState.isUnlimited && fileSize > 0) { - const reserveResult = reserveDirectDownloadTraffic(shareOwner.id, fileSize); + const reserveResult = reserveDirectDownloadTraffic(shareOwner.id, fileSize, { + source: 'share_direct', + objectKey, + ttlMs: DOWNLOAD_RESERVATION_TTL_MS + }); if (!reserveResult?.ok) { - const remaining = Number(reserveResult?.remaining || 0); - return res.status(403).json({ + return res.status(503).json({ success: false, - message: `下载流量不足:文件 ${formatFileSize(fileSize)},剩余 ${formatFileSize(remaining)}` + message: getBusyDownloadMessage() }); } } diff --git a/frontend/app.js b/frontend/app.js index a628e71..80b1500 100644 --- a/frontend/app.js +++ b/frontend/app.js @@ -1554,10 +1554,8 @@ handleDragLeave(e) { // OSS 下载优先使用直连(避免服务器中转导致带宽与费用双重损耗) if (canDirectOssDownload) { - const directResult = await this.downloadFromOSS(filePath); - if (directResult) { - return; - } + await this.downloadFromOSS(filePath); + return; } // 其他场景走后端下载接口(支持下载流量计量/权限控制) @@ -1584,7 +1582,12 @@ handleDragLeave(e) { document.body.removeChild(link); return true; } catch (error) { - console.error('OSS直连下载失败,将回退到后端下载:', error); + console.error('OSS直连下载失败:', error); + const message = error.response?.data?.message || '下载失败,请稍后重试'; + this.showToast('error', '下载失败', message); + if (error.response?.status === 401) { + this.logout(); + } return false; } }, @@ -1888,10 +1891,8 @@ handleDragLeave(e) { ? `/${file.name}` : `${this.currentPath}/${file.name}`; - const hasDownloadTrafficLimit = Number(this.user?.download_traffic_quota || 0) > 0; - - // OSS 模式且未启用下载流量限制时,返回签名 URL(用于媒体预览) - if (this.storageType === 'oss' && this.user?.oss_config_source !== 'none' && !hasDownloadTrafficLimit) { + // OSS 模式优先直连,避免限流场景回退为后端中转 + if (this.storageType === 'oss' && this.user?.oss_config_source !== 'none') { try { const { data } = await axios.get(`${this.apiBase}/api/files/download-url`, { params: { path: filePath } @@ -1902,6 +1903,7 @@ handleDragLeave(e) { } catch (error) { console.error('获取媒体URL失败:', error); } + return null; } // 本地存储模式:通过后端 API