feat(server): 新增云端缓存与同步服务端骨架

Ultraworked with [Sisyphus](https://github.com/code-yeongyu/oh-my-openagent)

Co-authored-by: Sisyphus <clio-agent@sisyphuslabs.ai>
This commit is contained in:
Developer
2026-03-18 00:27:04 +08:00
parent df12a6ac72
commit 6764f4c53b
14 changed files with 2183 additions and 0 deletions

146
server/src/auth.js Normal file
View File

@@ -0,0 +1,146 @@
const crypto = require('crypto');
const db = require('./db');
const TOKEN_TTL_DAYS = 30;
function parseJsonMaybe(value) {
if (!value) {
return null;
}
if (typeof value === 'object') {
return value;
}
try {
return JSON.parse(value);
} catch (error) {
return null;
}
}
function sha256(value) {
return crypto.createHash('sha256').update(String(value || '')).digest('hex');
}
function randomToken(size) {
return crypto.randomBytes(size).toString('base64').replace(/\+/g, '-').replace(/\//g, '_').replace(/=+$/g, '');
}
function normalizeEmail(email) {
return String(email || '').trim().toLowerCase();
}
function hashPassword(password) {
return new Promise(function(resolve, reject) {
var salt = crypto.randomBytes(16).toString('base64');
crypto.scrypt(String(password), salt, 64, function(error, derivedKey) {
if (error) {
reject(error);
return;
}
resolve('scrypt$' + salt + '$' + derivedKey.toString('base64'));
});
});
}
function verifyPassword(password, storedHash) {
return new Promise(function(resolve, reject) {
var parts = String(storedHash || '').split('$');
if (parts.length !== 3 || parts[0] !== 'scrypt') {
resolve(false);
return;
}
var salt = parts[1];
var expected = Buffer.from(parts[2], 'base64');
crypto.scrypt(String(password), salt, expected.length, function(error, derivedKey) {
if (error) {
reject(error);
return;
}
resolve(crypto.timingSafeEqual(expected, derivedKey));
});
});
}
async function createAuthToken(userId, deviceFingerprint) {
var rawToken = randomToken(32);
var expiresAt = new Date(Date.now() + TOKEN_TTL_DAYS * 24 * 60 * 60 * 1000);
await db.execute(
'INSERT INTO auth_tokens (user_id, token_hash, device_fingerprint, expires_at, last_seen_at) VALUES (?, ?, ?, ?, CURRENT_TIMESTAMP)',
[userId, sha256(rawToken), deviceFingerprint || null, expiresAt]
);
return rawToken;
}
async function touchAuthToken(tokenHash) {
await db.execute('UPDATE auth_tokens SET last_seen_at = CURRENT_TIMESTAMP WHERE token_hash = ?', [tokenHash]);
}
async function revokeAuthToken(tokenHash) {
await db.execute('UPDATE auth_tokens SET revoked_at = CURRENT_TIMESTAMP WHERE token_hash = ? AND revoked_at IS NULL', [tokenHash]);
}
async function getAuthContextFromToken(rawToken) {
if (!rawToken) {
return null;
}
var tokenHash = sha256(rawToken);
var rows = await db.query(
'SELECT t.id AS token_id, t.user_id, t.token_hash, t.device_fingerprint, t.expires_at, t.revoked_at, u.email, u.status, k.wrapped_dek, k.kdf_salt, k.kdf_params, k.key_version FROM auth_tokens t INNER JOIN users u ON u.id = t.user_id LEFT JOIN user_keyrings k ON k.user_id = u.id WHERE t.token_hash = ? LIMIT 1',
[tokenHash]
);
if (!rows.length) {
return null;
}
var row = rows[0];
if (row.revoked_at) {
return null;
}
if (new Date(row.expires_at).getTime() <= Date.now()) {
return null;
}
if (Number(row.status || 0) !== 1) {
return null;
}
await touchAuthToken(tokenHash);
return {
tokenHash: tokenHash,
tokenId: row.token_id,
user: {
id: row.user_id,
email: row.email
},
keyring: row.wrapped_dek ? {
wrappedDek: row.wrapped_dek,
kdfSalt: row.kdf_salt,
kdfParams: parseJsonMaybe(row.kdf_params),
keyVersion: Number(row.key_version || 1)
} : null
};
}
async function requireAuth(request, reply) {
var header = request.headers.authorization || '';
var match = String(header).match(/^Bearer\s+(.+)$/i);
if (!match) {
reply.code(401);
throw new Error('未登录');
}
var authContext = await getAuthContextFromToken(match[1]);
if (!authContext) {
reply.code(401);
throw new Error('登录已失效');
}
request.authContext = authContext;
}
module.exports = {
createAuthToken,
getAuthContextFromToken,
hashPassword,
normalizeEmail,
parseJsonMaybe,
requireAuth,
revokeAuthToken,
sha256,
verifyPassword
};

38
server/src/config.js Normal file
View File

@@ -0,0 +1,38 @@
const fs = require('fs');
const path = require('path');
const dotenv = require('dotenv');
const envPath = path.join(__dirname, '..', '.env');
if (fs.existsSync(envPath)) {
dotenv.config({ path: envPath });
}
function numberFromEnv(name, fallback) {
const value = Number(process.env[name]);
return Number.isFinite(value) && value > 0 ? value : fallback;
}
module.exports = {
app: {
host: process.env.HOST || '127.0.0.1',
port: numberFromEnv('PORT', 3200),
logLevel: process.env.LOG_LEVEL || 'info'
},
db: {
host: process.env.MYSQL_HOST || '127.0.0.1',
port: numberFromEnv('MYSQL_PORT', 3306),
user: process.env.MYSQL_USER || '',
password: process.env.MYSQL_PASSWORD || '',
database: process.env.MYSQL_DATABASE || ''
},
crypto: {
secret: process.env.APP_ENCRYPTION_SECRET || ''
},
sharedCache: {
writeToken: process.env.SHARED_CACHE_WRITE_TOKEN || ''
},
redis: {
enabled: String(process.env.REDIS_ENABLED || 'true').toLowerCase() !== 'false',
url: process.env.REDIS_URL || 'redis://127.0.0.1:6379'
}
};

49
server/src/crypto.js Normal file
View File

@@ -0,0 +1,49 @@
const crypto = require('crypto');
const config = require('./config');
function getKey() {
if (!config.crypto.secret) {
throw new Error('APP_ENCRYPTION_SECRET 未配置');
}
return crypto.createHash('sha256').update(String(config.crypto.secret)).digest();
}
function encryptJson(value) {
const plainText = JSON.stringify(value);
const iv = crypto.randomBytes(12);
const cipher = crypto.createCipheriv('aes-256-gcm', getKey(), iv);
const encrypted = Buffer.concat([cipher.update(plainText, 'utf8'), cipher.final()]);
const tag = cipher.getAuthTag();
const payloadHash = crypto.createHash('sha256').update(plainText).digest('hex');
return {
ciphertext: encrypted.toString('base64'),
iv: iv.toString('base64'),
tag: tag.toString('base64'),
payloadHash
};
}
function decryptJson(record) {
const decipher = crypto.createDecipheriv(
'aes-256-gcm',
getKey(),
Buffer.from(record.iv, 'base64')
);
decipher.setAuthTag(Buffer.from(record.tag, 'base64'));
const decrypted = Buffer.concat([
decipher.update(Buffer.from(record.ciphertext, 'base64')),
decipher.final()
]);
return JSON.parse(decrypted.toString('utf8'));
}
function sha256(value) {
return crypto.createHash('sha256').update(String(value || '')).digest('hex');
}
module.exports = {
encryptJson,
decryptJson,
sha256
};

49
server/src/db.js Normal file
View File

@@ -0,0 +1,49 @@
const mysql = require('mysql2/promise');
const config = require('./config');
let pool = null;
function getPool() {
if (pool) {
return pool;
}
if (!config.db.host || !config.db.user || !config.db.database) {
throw new Error('MySQL 配置不完整');
}
pool = mysql.createPool({
host: config.db.host,
port: config.db.port,
user: config.db.user,
password: config.db.password,
database: config.db.database,
waitForConnections: true,
connectionLimit: 10,
queueLimit: 0,
charset: 'utf8mb4'
});
return pool;
}
async function query(sql, params) {
const [rows] = await getPool().query(sql, params || []);
return rows;
}
async function execute(sql, params) {
const [result] = await getPool().execute(sql, params || []);
return result;
}
async function ping() {
await query('SELECT 1 AS ok');
}
module.exports = {
getPool,
query,
execute,
ping
};

61
server/src/index.js Normal file
View File

@@ -0,0 +1,61 @@
const Fastify = require('fastify');
const cors = require('@fastify/cors');
const rateLimit = require('@fastify/rate-limit');
const config = require('./config');
const authRoutes = require('./routes/auth');
const sharedCacheRoutes = require('./routes/shared-cache');
const vaultRoutes = require('./routes/vault');
async function buildApp() {
const app = Fastify({
logger: {
level: config.app.logLevel
},
bodyLimit: 1024 * 1024
});
await app.register(cors, {
origin: true,
credentials: false
});
await app.register(rateLimit, {
max: 120,
timeWindow: '1 minute'
});
app.get('/', async function () {
return {
ok: true,
service: 'magnet-cloud-cache-server',
version: '0.1.0'
};
});
await app.register(authRoutes);
await app.register(sharedCacheRoutes);
await app.register(vaultRoutes);
return app;
}
async function start() {
const app = await buildApp();
try {
await app.listen({
host: config.app.host,
port: config.app.port
});
app.log.info('server started');
} catch (error) {
app.log.error(error);
process.exit(1);
}
}
if (require.main === module) {
start();
}
module.exports = {
buildApp
};

78
server/src/redis.js Normal file
View File

@@ -0,0 +1,78 @@
const { createClient } = require('redis');
let clientPromise = null;
async function getRedisClient(config) {
if (!config || !config.redis || !config.redis.enabled || !config.redis.url) {
return null;
}
if (clientPromise) {
return clientPromise;
}
clientPromise = (async function() {
const client = createClient({ url: config.redis.url });
client.on('error', function () {
return null;
});
await client.connect();
return client;
})().catch(function() {
clientPromise = null;
return null;
});
return clientPromise;
}
async function getJson(config, key) {
const client = await getRedisClient(config);
if (!client) {
return null;
}
try {
const value = await client.get(key);
return value ? JSON.parse(value) : null;
} catch (error) {
return null;
}
}
async function setJson(config, key, value, ttlSeconds) {
const client = await getRedisClient(config);
if (!client) {
return false;
}
try {
await client.set(key, JSON.stringify(value), {
EX: Math.max(1, Number(ttlSeconds) || 60)
});
return true;
} catch (error) {
return false;
}
}
async function delByPattern(config, pattern) {
const client = await getRedisClient(config);
if (!client) {
return 0;
}
try {
const keys = [];
for await (const key of client.scanIterator({ MATCH: pattern, COUNT: 100 })) {
keys.push(key);
}
if (keys.length > 0) {
await client.del(keys);
}
return keys.length;
} catch (error) {
return 0;
}
}
module.exports = {
getRedisClient,
getJson,
setJson,
delByPattern
};

169
server/src/routes/auth.js Normal file
View File

@@ -0,0 +1,169 @@
const db = require('../db');
const {
createAuthToken,
getAuthContextFromToken,
hashPassword,
normalizeEmail,
parseJsonMaybe,
revokeAuthToken,
verifyPassword
} = require('../auth');
function normalizeString(value, limit) {
return String(value || '').trim().slice(0, limit);
}
function normalizeDeviceFingerprint(value) {
return normalizeString(value, 191);
}
function normalizeDeviceName(value) {
return normalizeString(value, 191) || 'Chrome Extension';
}
async function upsertDevice(userId, deviceName, deviceFingerprint) {
if (!deviceFingerprint) {
return;
}
await db.execute(
'INSERT INTO devices (user_id, device_name, device_fingerprint, last_seen_at) VALUES (?, ?, ?, CURRENT_TIMESTAMP) ON DUPLICATE KEY UPDATE device_name = VALUES(device_name), last_seen_at = CURRENT_TIMESTAMP',
[userId, deviceName, deviceFingerprint]
);
}
async function routes(fastify) {
fastify.post('/api/auth/register', async function (request, reply) {
var body = request.body || {};
var email = normalizeEmail(body.email);
var password = String(body.password || '');
var wrappedDek = normalizeString(body.wrappedDek, 5000);
var kdfSalt = normalizeString(body.kdfSalt, 255);
var keyVersion = Math.max(1, Number(body.keyVersion) || 1);
var deviceName = normalizeDeviceName(body.deviceName);
var deviceFingerprint = normalizeDeviceFingerprint(body.deviceFingerprint);
var kdfParams = body.kdfParams && typeof body.kdfParams === 'object' ? body.kdfParams : {};
if (!email || email.indexOf('@') === -1) {
reply.code(400);
return { ok: false, error: '请检查邮箱格式后再试' };
}
if (password.length < 6) {
reply.code(400);
return { ok: false, error: '密码长度还不够,请再确认一下' };
}
if (!wrappedDek || !kdfSalt) {
reply.code(400);
return { ok: false, error: '当前初始化信息不完整,请稍后再试' };
}
var existingRows = await db.query('SELECT id FROM users WHERE email = ? LIMIT 1', [email]);
if (existingRows.length > 0) {
reply.code(409);
return { ok: false, error: '这个邮箱已经可以直接登录了' };
}
var passwordHash = await hashPassword(password);
var userResult = await db.execute(
'INSERT INTO users (email, password_hash, status) VALUES (?, ?, 1)',
[email, passwordHash]
);
var userId = Number(userResult.insertId);
await db.execute(
'INSERT INTO user_keyrings (user_id, wrapped_dek, kdf_salt, kdf_params, key_version) VALUES (?, ?, ?, ?, ?)',
[userId, wrappedDek, kdfSalt, JSON.stringify(kdfParams), keyVersion]
);
await upsertDevice(userId, deviceName, deviceFingerprint);
var token = await createAuthToken(userId, deviceFingerprint);
return {
ok: true,
token: token,
user: { id: userId, email: email },
keyring: {
wrappedDek: wrappedDek,
kdfSalt: kdfSalt,
kdfParams: kdfParams,
keyVersion: keyVersion
}
};
});
fastify.post('/api/auth/login', async function (request, reply) {
var body = request.body || {};
var email = normalizeEmail(body.email);
var password = String(body.password || '');
var deviceName = normalizeDeviceName(body.deviceName);
var deviceFingerprint = normalizeDeviceFingerprint(body.deviceFingerprint);
var rows = await db.query(
'SELECT u.id, u.email, u.password_hash, u.status, k.wrapped_dek, k.kdf_salt, k.kdf_params, k.key_version FROM users u LEFT JOIN user_keyrings k ON k.user_id = u.id WHERE u.email = ? LIMIT 1',
[email]
);
if (!rows.length) {
reply.code(401);
return { ok: false, error: '账号信息没有对上,请再确认一下' };
}
var row = rows[0];
if (Number(row.status || 0) !== 1) {
reply.code(403);
return { ok: false, error: '当前账号暂时无法使用' };
}
var valid = await verifyPassword(password, row.password_hash);
if (!valid) {
reply.code(401);
return { ok: false, error: '账号信息没有对上,请再确认一下' };
}
await upsertDevice(row.id, deviceName, deviceFingerprint);
var token = await createAuthToken(row.id, deviceFingerprint);
return {
ok: true,
token: token,
user: { id: row.id, email: row.email },
keyring: {
wrappedDek: row.wrapped_dek,
kdfSalt: row.kdf_salt,
kdfParams: parseJsonMaybe(row.kdf_params) || {},
keyVersion: Number(row.key_version || 1)
}
};
});
fastify.get('/api/auth/me', async function (request, reply) {
var header = request.headers.authorization || '';
var match = String(header).match(/^Bearer\s+(.+)$/i);
if (!match) {
reply.code(401);
return { ok: false, error: '先登录后就可以继续了' };
}
var authContext = await getAuthContextFromToken(match[1]);
if (!authContext) {
reply.code(401);
return { ok: false, error: '登录状态需要重新确认一下' };
}
return {
ok: true,
user: authContext.user,
keyring: authContext.keyring
};
});
fastify.post('/api/auth/logout', async function (request, reply) {
var header = request.headers.authorization || '';
var match = String(header).match(/^Bearer\s+(.+)$/i);
if (!match) {
reply.code(400);
return { ok: false, error: '当前登录信息还没带上' };
}
var authContext = await getAuthContextFromToken(match[1]);
if (!authContext) {
return { ok: true };
}
await revokeAuthToken(authContext.tokenHash);
return { ok: true };
});
}
module.exports = routes;

View File

@@ -0,0 +1,477 @@
const db = require('../db');
const config = require('../config');
const { encryptJson, decryptJson, sha256 } = require('../crypto');
const redisCache = require('../redis');
function requireSharedCacheWrite(request, reply) {
const token = String(request.headers['x-shared-cache-write-token'] || '');
if (!config.sharedCache.writeToken || token !== config.sharedCache.writeToken) {
reply.code(401);
return { ok: false, error: 'shared cache write unauthorized' };
}
return null;
}
function normalizeThread(thread) {
if (!thread || typeof thread !== 'object') return null;
const forumKey = typeof thread.forumKey === 'string' ? thread.forumKey.trim() : '';
const threadKey = typeof thread.threadKey === 'string' ? thread.threadKey.trim() : '';
const url = typeof thread.url === 'string' ? thread.url.trim() : '';
if (!forumKey || !threadKey) return null;
return {
forumKey,
threadKey,
url,
title: typeof thread.title === 'string' ? thread.title : '',
magnets: Array.isArray(thread.magnets) ? thread.magnets.filter(Boolean) : [],
lastSeenAt: Number(thread.lastSeenAt) || Date.now()
};
}
function normalizeCoverage(payload) {
const forumKey = typeof payload.forumKey === 'string' ? payload.forumKey.trim() : '';
const startPage = Math.max(1, Number(payload.startPage) || 1);
const endPage = Math.max(startPage, Number(payload.endPage) || startPage);
if (!forumKey) return null;
return {
forumKey,
startPage,
endPage,
strategy: typeof payload.strategy === 'string' && payload.strategy ? payload.strategy : 'full_live',
crawledAt: Number(payload.crawledAt) || Date.now(),
threads: Array.isArray(payload.threads) ? payload.threads : []
};
}
function normalizePageCoverage(payload) {
const forumKey = typeof payload.forumKey === 'string' ? payload.forumKey.trim() : '';
const page = Math.max(1, Number(payload.page) || 1);
if (!forumKey) return null;
return {
forumKey,
page,
crawledAt: Number(payload.crawledAt) || Date.now(),
threads: Array.isArray(payload.threads) ? payload.threads : []
};
}
function normalizePlanPayload(payload) {
const forumKey = typeof payload.forumKey === 'string' ? payload.forumKey.trim() : '';
const startPage = Math.max(1, Number(payload.startPage) || 1);
const endPage = Math.max(startPage, Number(payload.endPage) || startPage);
const frontRefreshPages = Math.max(0, Number(payload.frontRefreshPages) || 0);
if (!forumKey) return null;
return { forumKey, startPage, endPage, frontRefreshPages };
}
function buildPlanCacheKey(payload) {
return [
'coverage-plan',
payload.forumKey,
payload.startPage,
payload.endPage,
payload.frontRefreshPages
].join(':');
}
async function getThreadMapForKeys(threadKeys) {
if (!Array.isArray(threadKeys) || threadKeys.length === 0) {
return [];
}
const placeholders = threadKeys.map(function () { return '?'; }).join(',');
const rows = await db.query(
'SELECT thread_key, payload_ciphertext, payload_iv, payload_tag FROM shared_thread_cache WHERE thread_key IN (' + placeholders + ')',
threadKeys
);
return rows.map(function (row) {
return decryptJson({
ciphertext: row.payload_ciphertext,
iv: row.payload_iv,
tag: row.payload_tag
});
}).filter(Boolean);
}
async function buildHydratedCoverageBlock(record, clippedStart, clippedEnd) {
const payload = decryptJson({
ciphertext: record.payload_ciphertext,
iv: record.payload_iv,
tag: record.payload_tag
});
const threads = Array.isArray(payload.threads) ? payload.threads : [];
if (!threads.length) {
return null;
}
return {
forumKey: record.forum_key,
startPage: clippedStart,
endPage: clippedEnd,
crawledAt: new Date(record.crawled_at).getTime(),
strategy: record.strategy || 'full_live',
frontRefreshPages: 0,
threads: threads
};
}
async function buildCoveragePlan(payload) {
const exactRows = await db.query(
'SELECT forum_key, start_page, end_page, strategy, thread_count, crawled_at, payload_ciphertext, payload_iv, payload_tag FROM shared_coverage_cache WHERE forum_key = ? AND start_page = ? AND end_page = ? ORDER BY crawled_at DESC LIMIT 1',
[payload.forumKey, payload.startPage, payload.endPage]
);
if (exactRows.length) {
const exactCoverage = await buildHydratedCoverageBlock(exactRows[0], payload.startPage, payload.endPage);
if (exactCoverage) {
return { ok: true, exactCoverage, cachedBlocks: [], shiftedCoverage: null, source: 'server_exact' };
}
}
const pageRows = await db.query(
'SELECT forum_key, page, crawled_at, payload_ciphertext, payload_iv, payload_tag FROM shared_page_cache WHERE forum_key = ? AND page BETWEEN ? AND ? ORDER BY page ASC',
[payload.forumKey, payload.startPage, payload.endPage]
);
const pageBlocks = [];
let currentBlock = null;
pageRows.forEach(function (row) {
const page = Number(row.page || 0);
if (!currentBlock || page !== currentBlock.endPage + 1) {
if (currentBlock) {
pageBlocks.push(currentBlock);
}
currentBlock = { startPage: page, endPage: page, rows: [row] };
return;
}
currentBlock.endPage = page;
currentBlock.rows.push(row);
});
if (currentBlock) {
pageBlocks.push(currentBlock);
}
const hydratedPageBlocks = pageBlocks.map(function (block) {
const merged = Object.create(null);
const threads = [];
let latestCrawledAt = 0;
block.rows.forEach(function (row) {
latestCrawledAt = Math.max(latestCrawledAt, new Date(row.crawled_at).getTime());
const payloadData = decryptJson({
ciphertext: row.payload_ciphertext,
iv: row.payload_iv,
tag: row.payload_tag
});
(Array.isArray(payloadData.threads) ? payloadData.threads : []).forEach(function (thread) {
const key = String(thread.threadKey || thread.url || '');
if (!key || merged[key]) {
return;
}
merged[key] = true;
threads.push(thread);
});
});
return {
forumKey: payload.forumKey,
startPage: block.startPage,
endPage: block.endPage,
crawledAt: latestCrawledAt,
strategy: 'assembled_pages',
frontRefreshPages: 0,
threads: threads
};
}).filter(function (block) {
return Array.isArray(block.threads) && block.threads.length > 0;
});
const coverageRows = await db.query(
'SELECT forum_key, start_page, end_page, strategy, thread_count, crawled_at, payload_ciphertext, payload_iv, payload_tag FROM shared_coverage_cache WHERE forum_key = ? AND start_page <= ? AND end_page >= ? ORDER BY start_page ASC, end_page DESC',
[payload.forumKey, payload.endPage, payload.startPage]
);
const hydratedCoverageBlocks = [];
for (const row of coverageRows) {
const clippedStart = Math.max(payload.startPage, Number(row.start_page || payload.startPage));
const clippedEnd = Math.min(payload.endPage, Number(row.end_page || payload.endPage));
if (clippedStart > clippedEnd) {
continue;
}
const block = await buildHydratedCoverageBlock(row, clippedStart, clippedEnd);
if (block) {
hydratedCoverageBlocks.push(block);
}
}
const allBlocks = hydratedPageBlocks.concat(hydratedCoverageBlocks).sort(function (a, b) {
const startDiff = Number(a.startPage || 0) - Number(b.startPage || 0);
if (startDiff !== 0) return startDiff;
return Number(b.endPage || 0) - Number(a.endPage || 0);
});
let shiftedCoverage = null;
if (payload.startPage === 1 && payload.frontRefreshPages > 0) {
const anchorRows = await db.query(
'SELECT forum_key, start_page, end_page, strategy, thread_count, crawled_at, payload_ciphertext, payload_iv, payload_tag FROM shared_coverage_cache WHERE forum_key = ? AND start_page = 1 ORDER BY crawled_at DESC, end_page DESC LIMIT 1',
[payload.forumKey]
);
if (anchorRows.length) {
const anchor = await buildHydratedCoverageBlock(anchorRows[0], 1, Math.min(payload.endPage, Number(anchorRows[0].end_page || payload.endPage)));
if (anchor) {
shiftedCoverage = {
forumKey: anchor.forumKey,
sourceStartPage: anchor.startPage,
sourceEndPage: Number(anchorRows[0].end_page || anchor.endPage),
crawledAt: anchor.crawledAt,
strategy: anchor.strategy,
threads: anchor.threads,
reusedStartPage: Math.min(payload.endPage, payload.startPage + payload.frontRefreshPages),
reusedEndPage: Math.min(payload.endPage, Number(anchorRows[0].end_page || payload.endPage) + payload.frontRefreshPages)
};
if (shiftedCoverage.reusedStartPage > shiftedCoverage.reusedEndPage) {
shiftedCoverage = null;
}
}
}
}
return {
ok: true,
exactCoverage: null,
cachedBlocks: allBlocks,
shiftedCoverage: shiftedCoverage,
source: 'server_plan'
};
}
async function getCloudStats() {
const countsRows = await db.query(
"SELECT (SELECT COUNT(*) FROM shared_thread_cache) AS thread_count, (SELECT COUNT(*) FROM shared_thread_cache WHERE magnet_count > 0) AS magnet_thread_count, (SELECT COUNT(*) FROM shared_coverage_cache) AS coverage_count, (SELECT COUNT(*) FROM shared_page_cache) AS page_count, (SELECT COUNT(*) FROM vault_items) AS vault_count, (SELECT COUNT(*) FROM users) AS user_count"
);
const latestRows = await db.query(
"SELECT (SELECT MAX(updated_at) FROM shared_thread_cache) AS latest_thread_time, (SELECT MAX(updated_at) FROM shared_coverage_cache) AS latest_coverage_time, (SELECT MAX(updated_at) FROM shared_page_cache) AS latest_page_time"
);
const sizeRows = await db.query(
"SELECT table_name, table_rows, ROUND((data_length + index_length) / 1024 / 1024, 2) AS size_mb FROM information_schema.tables WHERE table_schema = DATABASE() ORDER BY (data_length + index_length) DESC"
);
return {
ok: true,
counts: {
threads: Number(countsRows[0].thread_count || 0),
magnetThreads: Number(countsRows[0].magnet_thread_count || 0),
coverages: Number(countsRows[0].coverage_count || 0),
pages: Number(countsRows[0].page_count || 0),
vaultItems: Number(countsRows[0].vault_count || 0),
users: Number(countsRows[0].user_count || 0)
},
latest: {
threads: latestRows[0].latest_thread_time,
coverages: latestRows[0].latest_coverage_time,
pages: latestRows[0].latest_page_time
},
tables: sizeRows.map(function(row) {
return {
tableName: row.table_name,
rowCount: Number(row.table_rows || 0),
sizeMb: Number(row.size_mb || 0)
};
})
};
}
async function routes(fastify) {
fastify.get('/health', async function () {
return { ok: true, service: 'magnet-cloud-cache-server' };
});
fastify.get('/ready', async function (_, reply) {
try {
await db.ping();
return { ok: true };
} catch (error) {
reply.code(500);
return { ok: false, error: error.message };
}
});
fastify.get('/api/shared-cache/stats', async function () {
return getCloudStats();
});
fastify.post('/api/shared-cache/threads/lookup', async function (request) {
const items = Array.isArray(request.body && request.body.threads) ? request.body.threads : [];
const normalized = items.map(normalizeThread).filter(Boolean);
if (normalized.length === 0) {
return { ok: true, threads: [] };
}
const forumKey = normalized[0].forumKey;
const threadKeys = normalized.map(function (item) { return item.threadKey; });
const placeholders = threadKeys.map(function () { return '?'; }).join(',');
const rows = await db.query(
'SELECT forum_key, thread_key, payload_ciphertext, payload_iv, payload_tag, updated_at FROM shared_thread_cache WHERE forum_key = ? AND thread_key IN (' + placeholders + ')',
[forumKey].concat(threadKeys)
);
const result = rows.map(function (row) {
return decryptJson({
ciphertext: row.payload_ciphertext,
iv: row.payload_iv,
tag: row.payload_tag
});
});
return { ok: true, threads: result };
});
fastify.post('/api/shared-cache/threads/upsert', async function (request, reply) {
const denied = requireSharedCacheWrite(request, reply);
if (denied) return denied;
const items = Array.isArray(request.body && request.body.threads) ? request.body.threads : [];
const normalized = items.map(normalizeThread).filter(Boolean);
let saved = 0;
for (const item of normalized) {
const encrypted = encryptJson(item);
await db.execute(
'INSERT INTO shared_thread_cache (forum_key, thread_key, url_hash, title_hash, magnet_count, payload_ciphertext, payload_iv, payload_tag, payload_hash, last_seen_at) VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, FROM_UNIXTIME(? / 1000)) ON DUPLICATE KEY UPDATE magnet_count = IF(VALUES(magnet_count) > 0 AND VALUES(last_seen_at) >= last_seen_at, VALUES(magnet_count), magnet_count), payload_ciphertext = IF(VALUES(magnet_count) > 0 AND VALUES(last_seen_at) >= last_seen_at, VALUES(payload_ciphertext), payload_ciphertext), payload_iv = IF(VALUES(magnet_count) > 0 AND VALUES(last_seen_at) >= last_seen_at, VALUES(payload_iv), payload_iv), payload_tag = IF(VALUES(magnet_count) > 0 AND VALUES(last_seen_at) >= last_seen_at, VALUES(payload_tag), payload_tag), payload_hash = IF(VALUES(magnet_count) > 0 AND VALUES(last_seen_at) >= last_seen_at, VALUES(payload_hash), payload_hash), last_seen_at = GREATEST(last_seen_at, VALUES(last_seen_at)), updated_at = IF(VALUES(magnet_count) > 0 AND VALUES(last_seen_at) >= last_seen_at, CURRENT_TIMESTAMP, updated_at)',
[
item.forumKey,
item.threadKey,
sha256(item.url),
sha256(item.title),
item.magnets.length,
encrypted.ciphertext,
encrypted.iv,
encrypted.tag,
encrypted.payloadHash,
item.lastSeenAt
]
);
saved += 1;
}
return { ok: true, savedCount: saved };
});
fastify.post('/api/shared-cache/coverages/lookup', async function (request) {
const payload = normalizeCoverage(request.body || {});
if (!payload) {
return { ok: true, coverage: null };
}
const rows = await db.query(
'SELECT payload_ciphertext, payload_iv, payload_tag FROM shared_coverage_cache WHERE forum_key = ? AND start_page = ? AND end_page = ? AND strategy = ? LIMIT 1',
[payload.forumKey, payload.startPage, payload.endPage, payload.strategy]
);
if (!rows.length) {
return { ok: true, coverage: null };
}
return {
ok: true,
coverage: decryptJson({
ciphertext: rows[0].payload_ciphertext,
iv: rows[0].payload_iv,
tag: rows[0].payload_tag
})
};
});
fastify.post('/api/shared-cache/coverages/upsert', async function (request, reply) {
const denied = requireSharedCacheWrite(request, reply);
if (denied) return denied;
const payload = normalizeCoverage(request.body || {});
if (!payload) {
throw new Error('无效的 coverage 参数');
}
const encrypted = encryptJson(payload);
await db.execute(
'INSERT INTO shared_coverage_cache (forum_key, start_page, end_page, strategy, thread_count, crawled_at, payload_ciphertext, payload_iv, payload_tag, payload_hash) VALUES (?, ?, ?, ?, ?, FROM_UNIXTIME(? / 1000), ?, ?, ?, ?) ON DUPLICATE KEY UPDATE thread_count = IF(VALUES(crawled_at) >= crawled_at, VALUES(thread_count), thread_count), crawled_at = GREATEST(crawled_at, VALUES(crawled_at)), payload_ciphertext = IF(VALUES(crawled_at) >= crawled_at, VALUES(payload_ciphertext), payload_ciphertext), payload_iv = IF(VALUES(crawled_at) >= crawled_at, VALUES(payload_iv), payload_iv), payload_tag = IF(VALUES(crawled_at) >= crawled_at, VALUES(payload_tag), payload_tag), payload_hash = IF(VALUES(crawled_at) >= crawled_at, VALUES(payload_hash), payload_hash), updated_at = IF(VALUES(crawled_at) >= crawled_at, CURRENT_TIMESTAMP, updated_at)',
[
payload.forumKey,
payload.startPage,
payload.endPage,
payload.strategy,
payload.threads.length,
payload.crawledAt,
encrypted.ciphertext,
encrypted.iv,
encrypted.tag,
encrypted.payloadHash
]
);
redisCache.delByPattern(config, 'coverage-plan:' + payload.forumKey + ':*').catch(function () {});
return { ok: true };
});
fastify.post('/api/shared-cache/pages/lookup', async function (request) {
const payload = normalizePageCoverage(request.body || {});
if (!payload) {
return { ok: true, coverage: null };
}
const rows = await db.query(
'SELECT payload_ciphertext, payload_iv, payload_tag FROM shared_page_cache WHERE forum_key = ? AND page = ? LIMIT 1',
[payload.forumKey, payload.page]
);
if (!rows.length) {
return { ok: true, coverage: null };
}
return {
ok: true,
coverage: decryptJson({
ciphertext: rows[0].payload_ciphertext,
iv: rows[0].payload_iv,
tag: rows[0].payload_tag
})
};
});
fastify.post('/api/shared-cache/pages/upsert', async function (request, reply) {
const denied = requireSharedCacheWrite(request, reply);
if (denied) return denied;
const payload = normalizePageCoverage(request.body || {});
if (!payload) {
throw new Error('无效的 page coverage 参数');
}
const encrypted = encryptJson(payload);
await db.execute(
'INSERT INTO shared_page_cache (forum_key, page, thread_count, crawled_at, payload_ciphertext, payload_iv, payload_tag, payload_hash) VALUES (?, ?, ?, FROM_UNIXTIME(? / 1000), ?, ?, ?, ?) ON DUPLICATE KEY UPDATE thread_count = IF(VALUES(crawled_at) >= crawled_at, VALUES(thread_count), thread_count), crawled_at = GREATEST(crawled_at, VALUES(crawled_at)), payload_ciphertext = IF(VALUES(crawled_at) >= crawled_at, VALUES(payload_ciphertext), payload_ciphertext), payload_iv = IF(VALUES(crawled_at) >= crawled_at, VALUES(payload_iv), payload_iv), payload_tag = IF(VALUES(crawled_at) >= crawled_at, VALUES(payload_tag), payload_tag), payload_hash = IF(VALUES(crawled_at) >= crawled_at, VALUES(payload_hash), payload_hash), updated_at = IF(VALUES(crawled_at) >= crawled_at, CURRENT_TIMESTAMP, updated_at)',
[
payload.forumKey,
payload.page,
payload.threads.length,
payload.crawledAt,
encrypted.ciphertext,
encrypted.iv,
encrypted.tag,
encrypted.payloadHash
]
);
redisCache.delByPattern(config, 'coverage-plan:' + payload.forumKey + ':*').catch(function () {});
return { ok: true };
});
fastify.post('/api/shared-cache/coverages/plan', async function (request) {
const payload = normalizePlanPayload(request.body || {});
const cacheKey = payload ? buildPlanCacheKey(payload) : '';
let cachedPlan = null;
if (!payload) {
return { ok: false, error: 'invalid coverage plan payload' };
}
cachedPlan = await redisCache.getJson(config, cacheKey);
if (cachedPlan) {
return Object.assign({}, cachedPlan, { source: 'redis_plan' });
}
const plan = await buildCoveragePlan(payload);
await redisCache.setJson(config, cacheKey, plan, 90);
return plan;
});
}
module.exports = routes;

View File

@@ -0,0 +1,96 @@
const db = require('../db');
const { requireAuth } = require('../auth');
function normalizeVaultItem(item) {
if (!item || typeof item !== 'object') {
return null;
}
var itemType = String(item.itemType || '').trim().slice(0, 64);
var itemKey = String(item.itemKey || '').trim().slice(0, 191);
var payloadCiphertext = String(item.payloadCiphertext || '').trim();
var payloadIv = String(item.payloadIv || '').trim().slice(0, 128);
var payloadTag = String(item.payloadTag || '').trim().slice(0, 128);
var payloadHash = String(item.payloadHash || '').trim().slice(0, 64);
var keyVersion = Math.max(1, Number(item.keyVersion) || 1);
if (!itemType || !itemKey || !payloadCiphertext || !payloadIv || !payloadTag || !payloadHash) {
return null;
}
return {
itemType: itemType,
itemKey: itemKey,
payloadCiphertext: payloadCiphertext,
payloadIv: payloadIv,
payloadTag: payloadTag,
payloadHash: payloadHash,
keyVersion: keyVersion
};
}
async function routes(fastify) {
fastify.addHook('preHandler', requireAuth);
fastify.post('/api/vault/push', async function (request, reply) {
var items = Array.isArray(request.body && request.body.items) ? request.body.items : [];
var normalized = items.map(normalizeVaultItem).filter(Boolean);
var index = 0;
var item = null;
if (normalized.length === 0) {
reply.code(400);
return { ok: false, error: '没有可保存的保险柜项目' };
}
for (index = 0; index < normalized.length; index++) {
item = normalized[index];
await db.execute(
'INSERT INTO vault_items (user_id, item_type, item_key, payload_ciphertext, payload_iv, payload_tag, payload_hash, key_version) VALUES (?, ?, ?, ?, ?, ?, ?, ?) ON DUPLICATE KEY UPDATE payload_ciphertext = VALUES(payload_ciphertext), payload_iv = VALUES(payload_iv), payload_tag = VALUES(payload_tag), payload_hash = VALUES(payload_hash), key_version = VALUES(key_version), updated_at = CURRENT_TIMESTAMP',
[
request.authContext.user.id,
item.itemType,
item.itemKey,
item.payloadCiphertext,
item.payloadIv,
item.payloadTag,
item.payloadHash,
item.keyVersion
]
);
}
return { ok: true, savedCount: normalized.length };
});
fastify.post('/api/vault/pull', async function (request) {
var itemTypes = Array.isArray(request.body && request.body.itemTypes) ? request.body.itemTypes.map(function (itemType) {
return String(itemType || '').trim().slice(0, 64);
}).filter(Boolean) : [];
var sql = 'SELECT item_type, item_key, payload_ciphertext, payload_iv, payload_tag, payload_hash, key_version, updated_at FROM vault_items WHERE user_id = ?';
var params = [request.authContext.user.id];
if (itemTypes.length > 0) {
sql += ' AND item_type IN (' + itemTypes.map(function () { return '?'; }).join(',') + ')';
params = params.concat(itemTypes);
}
sql += ' ORDER BY updated_at DESC';
var rows = await db.query(sql, params);
return {
ok: true,
items: rows.map(function (row) {
return {
itemType: row.item_type,
itemKey: row.item_key,
payloadCiphertext: row.payload_ciphertext,
payloadIv: row.payload_iv,
payloadTag: row.payload_tag,
payloadHash: row.payload_hash,
keyVersion: Number(row.key_version || 1),
updatedAt: row.updated_at
};
})
};
});
}
module.exports = routes;