feat: switch OSS download quota to reservation plus log reconcile

This commit is contained in:
2026-02-17 18:12:33 +08:00
parent b171b41599
commit 10a3f09952
3 changed files with 669 additions and 50 deletions

View File

@@ -380,6 +380,41 @@ function initDatabase() {
) )
`); `);
// 下载流量预扣保留表(直连下载签发时占用额度,不计入已用)
db.exec(`
CREATE TABLE IF NOT EXISTS user_download_traffic_reservations (
id INTEGER PRIMARY KEY AUTOINCREMENT,
user_id INTEGER NOT NULL,
source TEXT NOT NULL DEFAULT 'direct', -- direct/share_direct
object_key TEXT,
reserved_bytes INTEGER NOT NULL DEFAULT 0,
remaining_bytes INTEGER NOT NULL DEFAULT 0,
status TEXT NOT NULL DEFAULT 'pending', -- pending/confirmed/expired/cancelled
created_at DATETIME DEFAULT CURRENT_TIMESTAMP,
expires_at DATETIME NOT NULL,
finalized_at DATETIME,
updated_at DATETIME DEFAULT CURRENT_TIMESTAMP,
FOREIGN KEY (user_id) REFERENCES users (id) ON DELETE CASCADE
)
`);
// OSS 日志已处理记录(避免重复入账)
db.exec(`
CREATE TABLE IF NOT EXISTS download_traffic_ingested_logs (
id INTEGER PRIMARY KEY AUTOINCREMENT,
bucket TEXT NOT NULL,
log_key TEXT NOT NULL,
etag TEXT,
file_size INTEGER DEFAULT 0,
line_count INTEGER DEFAULT 0,
bytes_count INTEGER DEFAULT 0,
status TEXT NOT NULL DEFAULT 'success',
error_message TEXT,
processed_at DATETIME DEFAULT CURRENT_TIMESTAMP,
UNIQUE(bucket, log_key, etag)
)
`);
// 日志表索引 // 日志表索引
db.exec(` db.exec(`
CREATE INDEX IF NOT EXISTS idx_logs_created_at ON system_logs(created_at); CREATE INDEX IF NOT EXISTS idx_logs_created_at ON system_logs(created_at);
@@ -395,6 +430,14 @@ function initDatabase() {
-- 下载流量报表索引(按用户+日期查询) -- 下载流量报表索引(按用户+日期查询)
CREATE INDEX IF NOT EXISTS idx_download_traffic_user_date ON user_download_traffic_daily(user_id, date_key); CREATE INDEX IF NOT EXISTS idx_download_traffic_user_date ON user_download_traffic_daily(user_id, date_key);
-- 下载预扣索引(按用户+状态+到期时间)
CREATE INDEX IF NOT EXISTS idx_download_reservation_user_status_expires
ON user_download_traffic_reservations(user_id, status, expires_at);
-- 已处理日志索引(按处理时间回溯)
CREATE INDEX IF NOT EXISTS idx_ingested_logs_processed_at
ON download_traffic_ingested_logs(processed_at);
`); `);
console.log('[数据库性能优化] ✓ 日志表复合索引已创建'); console.log('[数据库性能优化] ✓ 日志表复合索引已创建');
@@ -1537,6 +1580,154 @@ const DownloadTrafficReportDB = {
} }
}; };
const DownloadTrafficReservationDB = {
create({ userId, source = 'direct', objectKey = null, reservedBytes, expiresAt }) {
const uid = Number(userId);
const bytes = Math.floor(Number(reservedBytes));
const expiresAtValue = typeof expiresAt === 'string' ? expiresAt : null;
if (!Number.isFinite(uid) || uid <= 0 || !Number.isFinite(bytes) || bytes <= 0 || !expiresAtValue) {
return null;
}
const result = db.prepare(`
INSERT INTO user_download_traffic_reservations (
user_id, source, object_key, reserved_bytes, remaining_bytes,
status, expires_at, created_at, updated_at
) VALUES (?, ?, ?, ?, ?, 'pending', ?, datetime('now', 'localtime'), datetime('now', 'localtime'))
`).run(uid, source, objectKey, bytes, bytes, expiresAtValue);
return db.prepare('SELECT * FROM user_download_traffic_reservations WHERE id = ?').get(result.lastInsertRowid);
},
getPendingReservedBytes(userId) {
const uid = Number(userId);
if (!Number.isFinite(uid) || uid <= 0) {
return 0;
}
const row = db.prepare(`
SELECT COALESCE(SUM(remaining_bytes), 0) AS reserved
FROM user_download_traffic_reservations
WHERE user_id = ?
AND status = 'pending'
AND expires_at > datetime('now', 'localtime')
`).get(uid);
return Number(row?.reserved || 0);
},
expirePendingReservations() {
return db.prepare(`
UPDATE user_download_traffic_reservations
SET status = 'expired',
remaining_bytes = 0,
finalized_at = datetime('now', 'localtime'),
updated_at = datetime('now', 'localtime')
WHERE status = 'pending'
AND expires_at <= datetime('now', 'localtime')
`).run();
},
consumePendingBytes(userId, bytes) {
const uid = Number(userId);
let remaining = Math.floor(Number(bytes));
if (!Number.isFinite(uid) || uid <= 0 || !Number.isFinite(remaining) || remaining <= 0) {
return { consumed: 0, finalizedCount: 0 };
}
const pendingRows = db.prepare(`
SELECT id, remaining_bytes
FROM user_download_traffic_reservations
WHERE user_id = ?
AND status = 'pending'
AND remaining_bytes > 0
ORDER BY created_at ASC, id ASC
`).all(uid);
let consumed = 0;
let finalizedCount = 0;
for (const row of pendingRows) {
if (remaining <= 0) break;
const rowRemaining = Number(row.remaining_bytes || 0);
if (!Number.isFinite(rowRemaining) || rowRemaining <= 0) continue;
const useBytes = Math.min(remaining, rowRemaining);
const nextRemaining = rowRemaining - useBytes;
const nextStatus = nextRemaining <= 0 ? 'confirmed' : 'pending';
db.prepare(`
UPDATE user_download_traffic_reservations
SET remaining_bytes = ?,
status = ?,
finalized_at = CASE WHEN ? = 'confirmed' THEN datetime('now', 'localtime') ELSE finalized_at END,
updated_at = datetime('now', 'localtime')
WHERE id = ?
`).run(nextRemaining, nextStatus, nextStatus, row.id);
consumed += useBytes;
remaining -= useBytes;
if (nextStatus === 'confirmed') {
finalizedCount += 1;
}
}
return { consumed, finalizedCount };
}
};
const DownloadTrafficIngestDB = {
isProcessed(bucket, logKey, etag = null) {
const row = db.prepare(`
SELECT id
FROM download_traffic_ingested_logs
WHERE bucket = ?
AND log_key = ?
AND COALESCE(etag, '') = COALESCE(?, '')
LIMIT 1
`).get(bucket, logKey, etag);
return !!row;
},
markProcessed({
bucket,
logKey,
etag = null,
fileSize = 0,
lineCount = 0,
bytesCount = 0,
status = 'success',
errorMessage = null
}) {
return db.prepare(`
INSERT INTO download_traffic_ingested_logs (
bucket, log_key, etag, file_size, line_count, bytes_count,
status, error_message, processed_at
) VALUES (?, ?, ?, ?, ?, ?, ?, ?, datetime('now', 'localtime'))
ON CONFLICT(bucket, log_key, etag)
DO UPDATE SET
file_size = excluded.file_size,
line_count = excluded.line_count,
bytes_count = excluded.bytes_count,
status = excluded.status,
error_message = excluded.error_message,
processed_at = datetime('now', 'localtime')
`).run(
bucket,
logKey,
etag || '',
Number(fileSize) || 0,
Number(lineCount) || 0,
Number(bytesCount) || 0,
status || 'success',
errorMessage || null
);
}
};
// 系统日志操作 // 系统日志操作
const SystemLogDB = { const SystemLogDB = {
// 日志级别常量 // 日志级别常量
@@ -1733,6 +1924,8 @@ module.exports = {
VerificationDB, VerificationDB,
PasswordResetTokenDB, PasswordResetTokenDB,
DownloadTrafficReportDB, DownloadTrafficReportDB,
DownloadTrafficReservationDB,
DownloadTrafficIngestDB,
SystemLogDB, SystemLogDB,
TransactionDB, TransactionDB,
WalManager WalManager

View File

@@ -10,6 +10,7 @@ const multer = require('multer');
const nodemailer = require('nodemailer'); const nodemailer = require('nodemailer');
const path = require('path'); const path = require('path');
const fs = require('fs'); const fs = require('fs');
const zlib = require('zlib');
const { body, validationResult } = require('express-validator'); const { body, validationResult } = require('express-validator');
const archiver = require('archiver'); const archiver = require('archiver');
const crypto = require('crypto'); const crypto = require('crypto');
@@ -62,7 +63,20 @@ function clearOssUsageCache(userId) {
console.log(`[OSS缓存] 已清除: 用户 ${userId}`); console.log(`[OSS缓存] 已清除: 用户 ${userId}`);
} }
const { db, UserDB, ShareDB, SettingsDB, VerificationDB, PasswordResetTokenDB, DownloadTrafficReportDB, SystemLogDB, TransactionDB, WalManager } = require('./database'); const {
db,
UserDB,
ShareDB,
SettingsDB,
VerificationDB,
PasswordResetTokenDB,
DownloadTrafficReportDB,
DownloadTrafficReservationDB,
DownloadTrafficIngestDB,
SystemLogDB,
TransactionDB,
WalManager
} = require('./database');
const StorageUsageCache = require('./utils/storage-cache'); const StorageUsageCache = require('./utils/storage-cache');
const { JWT_SECRET, generateToken, generateRefreshToken, refreshAccessToken, authMiddleware, adminMiddleware, isJwtSecretSecure } = require('./auth'); const { JWT_SECRET, generateToken, generateRefreshToken, refreshAccessToken, authMiddleware, adminMiddleware, isJwtSecretSecure } = require('./auth');
const { StorageInterface, LocalStorageClient, OssStorageClient, formatFileSize, formatOssError } = require('./storage'); const { StorageInterface, LocalStorageClient, OssStorageClient, formatFileSize, formatOssError } = require('./storage');
@@ -76,6 +90,10 @@ const DEFAULT_LOCAL_STORAGE_QUOTA_BYTES = 1024 * 1024 * 1024; // 1GB
const DEFAULT_OSS_STORAGE_QUOTA_BYTES = 1024 * 1024 * 1024; // 1GB const DEFAULT_OSS_STORAGE_QUOTA_BYTES = 1024 * 1024 * 1024; // 1GB
const MAX_DOWNLOAD_TRAFFIC_BYTES = 10 * 1024 * 1024 * 1024 * 1024; // 10TB const MAX_DOWNLOAD_TRAFFIC_BYTES = 10 * 1024 * 1024 * 1024 * 1024; // 10TB
const DOWNLOAD_POLICY_SWEEP_INTERVAL_MS = 30 * 60 * 1000; // 30分钟 const DOWNLOAD_POLICY_SWEEP_INTERVAL_MS = 30 * 60 * 1000; // 30分钟
const DOWNLOAD_RESERVATION_TTL_MS = Number(process.env.DOWNLOAD_RESERVATION_TTL_MS || (30 * 60 * 1000)); // 30分钟
const DOWNLOAD_LOG_RECONCILE_INTERVAL_MS = Number(process.env.DOWNLOAD_LOG_RECONCILE_INTERVAL_MS || (5 * 60 * 1000)); // 5分钟
const DOWNLOAD_LOG_MAX_FILES_PER_SWEEP = Number(process.env.DOWNLOAD_LOG_MAX_FILES_PER_SWEEP || 40);
const DOWNLOAD_LOG_LIST_MAX_KEYS = Number(process.env.DOWNLOAD_LOG_LIST_MAX_KEYS || 200);
const SHARE_CODE_REGEX = /^[A-Za-z0-9]{6,32}$/; const SHARE_CODE_REGEX = /^[A-Za-z0-9]{6,32}$/;
const COOKIE_SECURE_MODE = String(process.env.COOKIE_SECURE || '').toLowerCase(); const COOKIE_SECURE_MODE = String(process.env.COOKIE_SECURE || '').toLowerCase();
const SHOULD_USE_SECURE_COOKIES = const SHOULD_USE_SECURE_COOKIES =
@@ -660,6 +678,10 @@ function getDownloadTrafficState(user) {
}; };
} }
function getBusyDownloadMessage() {
return '当前网络繁忙,请稍后再试';
}
function parseDateTimeValue(value) { function parseDateTimeValue(value) {
if (!value || typeof value !== 'string') { if (!value || typeof value !== 'string') {
return null; return null;
@@ -893,7 +915,7 @@ function applyDownloadTrafficUsage(userId, bytesToAdd) {
return applyDownloadTrafficUsageTransaction(userId, Math.floor(parsedBytes)); return applyDownloadTrafficUsageTransaction(userId, Math.floor(parsedBytes));
} }
const reserveDirectDownloadTrafficTransaction = db.transaction((userId, bytesToReserve) => { const reserveDirectDownloadTrafficTransaction = db.transaction((userId, bytesToReserve, reservationOptions = {}) => {
const policyState = enforceDownloadTrafficPolicyTransaction(userId, 'direct_download_reserve'); const policyState = enforceDownloadTrafficPolicyTransaction(userId, 'direct_download_reserve');
const user = policyState?.user || UserDB.findById(userId); const user = policyState?.user || UserDB.findById(userId);
if (!user) { if (!user) {
@@ -902,52 +924,116 @@ const reserveDirectDownloadTrafficTransaction = db.transaction((userId, bytesToR
const reserveBytes = Math.floor(Number(bytesToReserve)); const reserveBytes = Math.floor(Number(bytesToReserve));
if (!Number.isFinite(reserveBytes) || reserveBytes <= 0) { if (!Number.isFinite(reserveBytes) || reserveBytes <= 0) {
return { return { ok: true, reserved: 0, isUnlimited: true };
ok: true,
quota: normalizeDownloadTrafficQuota(user.download_traffic_quota),
usedBefore: normalizeDownloadTrafficUsed(user.download_traffic_used, normalizeDownloadTrafficQuota(user.download_traffic_quota)),
usedAfter: normalizeDownloadTrafficUsed(user.download_traffic_used, normalizeDownloadTrafficQuota(user.download_traffic_quota)),
reserved: 0
};
} }
const trafficState = getDownloadTrafficState(user); const trafficState = getDownloadTrafficState(user);
if (!trafficState.isUnlimited && reserveBytes > trafficState.remaining) { if (trafficState.isUnlimited) {
return { ok: true, reserved: 0, isUnlimited: true };
}
const pendingReserved = Number(DownloadTrafficReservationDB.getPendingReservedBytes(userId) || 0);
const available = Math.max(0, trafficState.remaining - pendingReserved);
if (reserveBytes > available) {
return { return {
ok: false, ok: false,
reason: 'insufficient', reason: 'insufficient_available',
quota: trafficState.quota, quota: trafficState.quota,
usedBefore: trafficState.used, usedBefore: trafficState.used,
remaining: trafficState.remaining pendingReserved,
remaining: trafficState.remaining,
available
}; };
} }
const nextUsed = trafficState.used + reserveBytes; const ttlMs = Math.max(60 * 1000, Number(reservationOptions.ttlMs || DOWNLOAD_RESERVATION_TTL_MS));
UserDB.update(userId, { download_traffic_used: nextUsed }); const expiresAt = new Date(Date.now() + ttlMs);
DownloadTrafficReportDB.addUsage(userId, reserveBytes, 1, new Date()); const reservation = DownloadTrafficReservationDB.create({
userId,
source: reservationOptions.source || 'direct',
objectKey: reservationOptions.objectKey || null,
reservedBytes: reserveBytes,
expiresAt: formatDateTimeForSqlite(expiresAt)
});
if (!reservation) {
return { ok: false, reason: 'create_failed' };
}
return { return {
ok: true, ok: true,
isUnlimited: false,
quota: trafficState.quota, quota: trafficState.quota,
usedBefore: trafficState.used, usedBefore: trafficState.used,
usedAfter: nextUsed, pendingReserved,
reserved: reserveBytes availableBefore: available,
reserved: reserveBytes,
reservation
}; };
}); });
function reserveDirectDownloadTraffic(userId, bytesToReserve) { function reserveDirectDownloadTraffic(userId, bytesToReserve, reservationOptions = {}) {
const parsedBytes = Number(bytesToReserve); const parsedBytes = Number(bytesToReserve);
if (!Number.isFinite(parsedBytes) || parsedBytes <= 0) { if (!Number.isFinite(parsedBytes) || parsedBytes <= 0) {
return { return {
ok: true, ok: true,
quota: 0, reserved: 0,
usedBefore: 0, isUnlimited: true
usedAfter: 0,
reserved: 0
}; };
} }
return reserveDirectDownloadTrafficTransaction(userId, Math.floor(parsedBytes)); return reserveDirectDownloadTrafficTransaction(userId, Math.floor(parsedBytes), reservationOptions);
}
const applyConfirmedDownloadTrafficFromLogTransaction = db.transaction((userId, confirmedBytes, downloadCount = 0, eventDate = new Date()) => {
const policyState = enforceDownloadTrafficPolicyTransaction(userId, 'log_confirm');
const user = policyState?.user || UserDB.findById(userId);
if (!user) {
return null;
}
const bytes = Math.floor(Number(confirmedBytes));
const count = Math.floor(Number(downloadCount));
if (!Number.isFinite(bytes) || bytes <= 0) {
return {
userId,
confirmed: 0,
added: 0,
consumedReserved: 0
};
}
const trafficState = getDownloadTrafficState(user);
const nextUsed = trafficState.isUnlimited
? (trafficState.used + bytes)
: Math.min(trafficState.quota, trafficState.used + bytes);
UserDB.update(userId, { download_traffic_used: nextUsed });
DownloadTrafficReportDB.addUsage(userId, bytes, count > 0 ? count : 1, eventDate);
const consumeResult = DownloadTrafficReservationDB.consumePendingBytes(userId, bytes);
return {
userId,
confirmed: bytes,
added: Math.max(0, nextUsed - trafficState.used),
usedBefore: trafficState.used,
usedAfter: nextUsed,
consumedReserved: Number(consumeResult?.consumed || 0),
finalizedReservations: Number(consumeResult?.finalizedCount || 0)
};
});
function applyConfirmedDownloadTrafficFromLog(userId, confirmedBytes, downloadCount = 0, eventDate = new Date()) {
const parsed = Number(confirmedBytes);
if (!Number.isFinite(parsed) || parsed <= 0) {
return null;
}
return applyConfirmedDownloadTrafficFromLogTransaction(
userId,
Math.floor(parsed),
Math.floor(Number(downloadCount || 0)),
eventDate
);
} }
function runDownloadTrafficPolicySweep(trigger = 'scheduled') { function runDownloadTrafficPolicySweep(trigger = 'scheduled') {
@@ -974,6 +1060,328 @@ function runDownloadTrafficPolicySweep(trigger = 'scheduled') {
} }
} }
function getDownloadTrafficLogIngestConfig(baseBucket = '') {
const configuredBucket = (SettingsDB.get('download_traffic_log_bucket') || process.env.DOWNLOAD_TRAFFIC_LOG_BUCKET || '').trim();
const configuredPrefixRaw = SettingsDB.get('download_traffic_log_prefix') || process.env.DOWNLOAD_TRAFFIC_LOG_PREFIX || '';
const configuredPrefix = String(configuredPrefixRaw).replace(/^\/+/, '');
return {
bucket: configuredBucket || baseBucket || '',
prefix: configuredPrefix
};
}
async function readS3BodyToBuffer(body) {
if (!body) return Buffer.alloc(0);
if (typeof body.transformToByteArray === 'function') {
const arr = await body.transformToByteArray();
return Buffer.from(arr);
}
if (typeof body.transformToString === 'function') {
const text = await body.transformToString();
return Buffer.from(text, 'utf8');
}
if (Buffer.isBuffer(body)) {
return body;
}
if (typeof body === 'string') {
return Buffer.from(body, 'utf8');
}
if (typeof body[Symbol.asyncIterator] === 'function') {
const chunks = [];
for await (const chunk of body) {
chunks.push(Buffer.isBuffer(chunk) ? chunk : Buffer.from(chunk));
}
return Buffer.concat(chunks);
}
return Buffer.alloc(0);
}
function parseDownloadTrafficLogTime(line) {
if (!line || typeof line !== 'string') return null;
const match = line.match(/\[(\d{2})\/([A-Za-z]{3})\/(\d{4}):(\d{2}):(\d{2}):(\d{2})\s*([+-]\d{4})?\]/);
if (!match) return null;
const monthMap = {
Jan: 0, Feb: 1, Mar: 2, Apr: 3, May: 4, Jun: 5,
Jul: 6, Aug: 7, Sep: 8, Oct: 9, Nov: 10, Dec: 11
};
const month = monthMap[match[2]];
if (month === undefined) return null;
const year = Number(match[3]);
const day = Number(match[1]);
const hour = Number(match[4]);
const minute = Number(match[5]);
const second = Number(match[6]);
if ([year, day, hour, minute, second].some(v => !Number.isFinite(v))) {
return null;
}
// 先按 UTC 构造,再应用时区偏移(若存在)
let utcMillis = Date.UTC(year, month, day, hour, minute, second);
const tzRaw = match[7];
if (tzRaw && /^[+-]\d{4}$/.test(tzRaw)) {
const sign = tzRaw[0] === '+' ? 1 : -1;
const tzHour = Number(tzRaw.slice(1, 3));
const tzMin = Number(tzRaw.slice(3, 5));
const offsetMinutes = sign * (tzHour * 60 + tzMin);
utcMillis -= offsetMinutes * 60 * 1000;
}
const parsed = new Date(utcMillis);
return Number.isNaN(parsed.getTime()) ? null : parsed;
}
function parseDownloadTrafficLine(line) {
if (!line || typeof line !== 'string') {
return null;
}
const trimmed = line.trim();
if (!trimmed) return null;
// 仅处理 GET 请求HEAD/PUT/POST 等不计下载流量)
if (!/\bGET\b/i.test(trimmed)) {
return null;
}
let statusCode = 0;
let bytesSent = 0;
const statusMatch = trimmed.match(/"\s*(\d{3})\s+(\d+|-)\b/);
if (statusMatch) {
statusCode = Number(statusMatch[1]);
bytesSent = statusMatch[2] === '-' ? 0 : Number(statusMatch[2]);
}
if (![200, 206].includes(statusCode) || !Number.isFinite(bytesSent) || bytesSent <= 0) {
return null;
}
// 尝试从请求路径提取 object key
let objectKey = null;
const requestMatch = trimmed.match(/"(?:GET|HEAD)\s+([^" ]+)\s+HTTP\//i);
if (requestMatch && requestMatch[1]) {
let requestPath = requestMatch[1];
const qIndex = requestPath.indexOf('?');
if (qIndex >= 0) {
requestPath = requestPath.slice(0, qIndex);
}
requestPath = requestPath.replace(/^https?:\/\/[^/]+/i, '');
requestPath = requestPath.replace(/^\/+/, '');
try {
requestPath = decodeURIComponent(requestPath);
} catch {
// ignore decode error
}
objectKey = requestPath || null;
}
if (!objectKey) {
const keyMatch = trimmed.match(/\buser_(\d+)\/[^\s"]+/);
if (keyMatch && keyMatch[0]) {
objectKey = keyMatch[0];
}
}
if (!objectKey) {
return null;
}
const userMatch = objectKey.match(/(?:^|\/)user_(\d+)\//);
if (!userMatch) {
return null;
}
const userId = Number(userMatch[1]);
if (!Number.isFinite(userId) || userId <= 0) {
return null;
}
return {
userId,
bytes: Math.floor(bytesSent),
objectKey,
eventAt: parseDownloadTrafficLogTime(trimmed) || new Date()
};
}
function extractLogLinesFromBuffer(buffer, logKey = '') {
let content = Buffer.isBuffer(buffer) ? buffer : Buffer.from(buffer || '');
if ((logKey || '').toLowerCase().endsWith('.gz')) {
try {
content = zlib.gunzipSync(content);
} catch (error) {
console.warn(`[下载流量日志] 解压失败: ${logKey}`, error.message);
}
}
const text = content.toString('utf8');
return text.split(/\r?\n/);
}
async function runDownloadTrafficLogReconcile(trigger = 'interval') {
try {
const expiredResult = DownloadTrafficReservationDB.expirePendingReservations();
if ((expiredResult?.changes || 0) > 0) {
console.log(`[下载流量预扣] 已释放过期保留额度: ${expiredResult.changes} 条 (trigger=${trigger})`);
}
if (!SettingsDB.hasUnifiedOssConfig()) {
return;
}
const serviceUser = {
id: 0,
has_oss_config: 0,
current_storage_type: 'oss'
};
const { client, bucket: defaultBucket } = createS3ClientContextForUser(serviceUser);
const ingestConfig = getDownloadTrafficLogIngestConfig(defaultBucket);
if (!ingestConfig.bucket) {
return;
}
const { ListObjectsV2Command, GetObjectCommand } = require('@aws-sdk/client-s3');
let continuationToken = null;
let listed = 0;
const candidates = [];
do {
const listResp = await client.send(new ListObjectsV2Command({
Bucket: ingestConfig.bucket,
Prefix: ingestConfig.prefix || undefined,
ContinuationToken: continuationToken || undefined,
MaxKeys: DOWNLOAD_LOG_LIST_MAX_KEYS
}));
const contents = Array.isArray(listResp?.Contents) ? listResp.Contents : [];
for (const item of contents) {
listed += 1;
const key = item?.Key;
if (!key) continue;
const size = Number(item?.Size || 0);
if (!Number.isFinite(size) || size <= 0) continue;
const etag = (item?.ETag || '').replace(/"/g, '');
const processed = DownloadTrafficIngestDB.isProcessed(ingestConfig.bucket, key, etag);
if (processed) continue;
// 仅处理常见日志文件后缀
const lowerKey = key.toLowerCase();
if (!lowerKey.endsWith('.log') && !lowerKey.endsWith('.txt') && !lowerKey.endsWith('.gz')) {
continue;
}
candidates.push({ key, etag, size });
if (candidates.length >= DOWNLOAD_LOG_MAX_FILES_PER_SWEEP) {
break;
}
}
if (candidates.length >= DOWNLOAD_LOG_MAX_FILES_PER_SWEEP) {
break;
}
continuationToken = listResp?.IsTruncated ? listResp?.NextContinuationToken : null;
} while (continuationToken);
if (candidates.length === 0) {
return;
}
let processedFiles = 0;
let processedLines = 0;
let confirmedBytes = 0;
for (const candidate of candidates) {
const { key, etag, size } = candidate;
try {
const getResp = await client.send(new GetObjectCommand({
Bucket: ingestConfig.bucket,
Key: key
}));
const bodyBuffer = await readS3BodyToBuffer(getResp?.Body);
const lines = extractLogLinesFromBuffer(bodyBuffer, key);
const aggregateByUser = new Map();
let parsedLineCount = 0;
for (const line of lines) {
const parsed = parseDownloadTrafficLine(line);
if (!parsed) continue;
parsedLineCount += 1;
const existing = aggregateByUser.get(parsed.userId) || {
bytes: 0,
count: 0,
eventAt: parsed.eventAt
};
existing.bytes += parsed.bytes;
existing.count += 1;
if (parsed.eventAt && existing.eventAt && parsed.eventAt < existing.eventAt) {
existing.eventAt = parsed.eventAt;
}
aggregateByUser.set(parsed.userId, existing);
}
let fileBytes = 0;
for (const [uid, stat] of aggregateByUser.entries()) {
if (!stat || !Number.isFinite(stat.bytes) || stat.bytes <= 0) continue;
const result = applyConfirmedDownloadTrafficFromLog(uid, stat.bytes, stat.count, stat.eventAt || new Date());
if (result) {
fileBytes += stat.bytes;
}
}
processedFiles += 1;
processedLines += parsedLineCount;
confirmedBytes += fileBytes;
DownloadTrafficIngestDB.markProcessed({
bucket: ingestConfig.bucket,
logKey: key,
etag,
fileSize: size,
lineCount: parsedLineCount,
bytesCount: fileBytes,
status: 'success',
errorMessage: null
});
} catch (error) {
console.error(`[下载流量日志] 处理失败: ${key}`, error);
DownloadTrafficIngestDB.markProcessed({
bucket: ingestConfig.bucket,
logKey: key,
etag,
fileSize: size,
lineCount: 0,
bytesCount: 0,
status: 'failed',
errorMessage: String(error?.message || error).slice(0, 500)
});
}
}
if (processedFiles > 0) {
console.log(
`[下载流量日志] 扫描完成 (trigger=${trigger}) ` +
`listed=${listed}, files=${processedFiles}, lines=${processedLines}, bytes=${confirmedBytes}`
);
}
} catch (error) {
console.error(`[下载流量日志] 扫描失败 (trigger=${trigger}):`, error);
}
}
const downloadPolicySweepTimer = setInterval(() => { const downloadPolicySweepTimer = setInterval(() => {
runDownloadTrafficPolicySweep('interval'); runDownloadTrafficPolicySweep('interval');
}, DOWNLOAD_POLICY_SWEEP_INTERVAL_MS); }, DOWNLOAD_POLICY_SWEEP_INTERVAL_MS);
@@ -986,6 +1394,18 @@ setTimeout(() => {
runDownloadTrafficPolicySweep('startup'); runDownloadTrafficPolicySweep('startup');
}, 10 * 1000); }, 10 * 1000);
const downloadTrafficLogReconcileTimer = setInterval(() => {
runDownloadTrafficLogReconcile('interval');
}, DOWNLOAD_LOG_RECONCILE_INTERVAL_MS);
if (downloadTrafficLogReconcileTimer && typeof downloadTrafficLogReconcileTimer.unref === 'function') {
downloadTrafficLogReconcileTimer.unref();
}
setTimeout(() => {
runDownloadTrafficLogReconcile('startup');
}, 30 * 1000);
// 构建用于存储客户端的用户对象(自动尝试解密 OSS Secret // 构建用于存储客户端的用户对象(自动尝试解密 OSS Secret
function buildStorageUserContext(user, overrides = {}) { function buildStorageUserContext(user, overrides = {}) {
if (!user) { if (!user) {
@@ -4153,7 +4573,7 @@ app.get('/api/files/download-url', authMiddleware, async (req, res) => {
const objectKey = ossClient.getObjectKey(normalizedPath); const objectKey = ossClient.getObjectKey(normalizedPath);
let fileSize = 0; let fileSize = 0;
// 启用下载流量限制时,签发前先校验文件大小与剩余额度 // 启用下载流量限制时,签发前先获取文件大小(用于预扣保留额度
if (!trafficState.isUnlimited) { if (!trafficState.isUnlimited) {
let headResponse; let headResponse;
try { try {
@@ -4176,11 +4596,10 @@ app.get('/api/files/download-url', authMiddleware, async (req, res) => {
fileSize = Number.isFinite(contentLength) && contentLength > 0 fileSize = Number.isFinite(contentLength) && contentLength > 0
? Math.floor(contentLength) ? Math.floor(contentLength)
: 0; : 0;
if (fileSize <= 0) {
if (fileSize > trafficState.remaining) { return res.status(503).json({
return res.status(403).json({
success: false, success: false,
message: `下载流量不足:文件 ${formatFileSize(fileSize)},剩余 ${formatFileSize(trafficState.remaining)}` message: getBusyDownloadMessage()
}); });
} }
} }
@@ -4195,14 +4614,17 @@ app.get('/api/files/download-url', authMiddleware, async (req, res) => {
// 生成签名 URL1小时有效 // 生成签名 URL1小时有效
const signedUrl = await getSignedUrl(client, command, { expiresIn: 3600 }); const signedUrl = await getSignedUrl(client, command, { expiresIn: 3600 });
// 直连模式下无法精确获知真实下载字节;限流时在签发前预扣文件大小 // 直连模式:先预扣保留额度(不写入已用),实际用量由 OSS 日志异步确认入账
if (!trafficState.isUnlimited && fileSize > 0) { if (!trafficState.isUnlimited && fileSize > 0) {
const reserveResult = reserveDirectDownloadTraffic(latestUser.id, fileSize); const reserveResult = reserveDirectDownloadTraffic(latestUser.id, fileSize, {
source: 'direct',
objectKey,
ttlMs: DOWNLOAD_RESERVATION_TTL_MS
});
if (!reserveResult?.ok) { if (!reserveResult?.ok) {
const remaining = Number(reserveResult?.remaining || 0); return res.status(503).json({
return res.status(403).json({
success: false, success: false,
message: `下载流量不足:文件 ${formatFileSize(fileSize)},剩余 ${formatFileSize(remaining)}` message: getBusyDownloadMessage()
}); });
} }
} }
@@ -5527,11 +5949,10 @@ app.post('/api/share/:code/download-url', shareRateLimitMiddleware, async (req,
fileSize = Number.isFinite(contentLength) && contentLength > 0 fileSize = Number.isFinite(contentLength) && contentLength > 0
? Math.floor(contentLength) ? Math.floor(contentLength)
: 0; : 0;
if (fileSize <= 0) {
if (fileSize > ownerTrafficState.remaining) { return res.status(503).json({
return res.status(403).json({
success: false, success: false,
message: `下载流量不足:文件 ${formatFileSize(fileSize)},剩余 ${formatFileSize(ownerTrafficState.remaining)}` message: getBusyDownloadMessage()
}); });
} }
} }
@@ -5547,12 +5968,15 @@ app.post('/api/share/:code/download-url', shareRateLimitMiddleware, async (req,
const signedUrl = await getSignedUrl(client, command, { expiresIn: 3600 }); const signedUrl = await getSignedUrl(client, command, { expiresIn: 3600 });
if (!ownerTrafficState.isUnlimited && fileSize > 0) { if (!ownerTrafficState.isUnlimited && fileSize > 0) {
const reserveResult = reserveDirectDownloadTraffic(shareOwner.id, fileSize); const reserveResult = reserveDirectDownloadTraffic(shareOwner.id, fileSize, {
source: 'share_direct',
objectKey,
ttlMs: DOWNLOAD_RESERVATION_TTL_MS
});
if (!reserveResult?.ok) { if (!reserveResult?.ok) {
const remaining = Number(reserveResult?.remaining || 0); return res.status(503).json({
return res.status(403).json({
success: false, success: false,
message: `下载流量不足:文件 ${formatFileSize(fileSize)},剩余 ${formatFileSize(remaining)}` message: getBusyDownloadMessage()
}); });
} }
} }

View File

@@ -1554,11 +1554,9 @@ handleDragLeave(e) {
// OSS 下载优先使用直连(避免服务器中转导致带宽与费用双重损耗) // OSS 下载优先使用直连(避免服务器中转导致带宽与费用双重损耗)
if (canDirectOssDownload) { if (canDirectOssDownload) {
const directResult = await this.downloadFromOSS(filePath); await this.downloadFromOSS(filePath);
if (directResult) {
return; return;
} }
}
// 其他场景走后端下载接口(支持下载流量计量/权限控制) // 其他场景走后端下载接口(支持下载流量计量/权限控制)
this.downloadFromLocal(filePath); this.downloadFromLocal(filePath);
@@ -1584,7 +1582,12 @@ handleDragLeave(e) {
document.body.removeChild(link); document.body.removeChild(link);
return true; return true;
} catch (error) { } catch (error) {
console.error('OSS直连下载失败,将回退到后端下载:', error); console.error('OSS直连下载失败:', error);
const message = error.response?.data?.message || '下载失败,请稍后重试';
this.showToast('error', '下载失败', message);
if (error.response?.status === 401) {
this.logout();
}
return false; return false;
} }
}, },
@@ -1888,10 +1891,8 @@ handleDragLeave(e) {
? `/${file.name}` ? `/${file.name}`
: `${this.currentPath}/${file.name}`; : `${this.currentPath}/${file.name}`;
const hasDownloadTrafficLimit = Number(this.user?.download_traffic_quota || 0) > 0; // OSS 模式优先直连,避免限流场景回退为后端中转
if (this.storageType === 'oss' && this.user?.oss_config_source !== 'none') {
// OSS 模式且未启用下载流量限制时,返回签名 URL用于媒体预览
if (this.storageType === 'oss' && this.user?.oss_config_source !== 'none' && !hasDownloadTrafficLimit) {
try { try {
const { data } = await axios.get(`${this.apiBase}/api/files/download-url`, { const { data } = await axios.get(`${this.apiBase}/api/files/download-url`, {
params: { path: filePath } params: { path: filePath }
@@ -1902,6 +1903,7 @@ handleDragLeave(e) {
} catch (error) { } catch (error) {
console.error('获取媒体URL失败:', error); console.error('获取媒体URL失败:', error);
} }
return null;
} }
// 本地存储模式:通过后端 API // 本地存储模式:通过后端 API