lifekline / server /analyzeParallelStream.js
miounet11
升级
25fe4a1
/**
* 并行分析流处理器
* 支持6个Agent并行执行,渐进式SSE推送
* K线分为过去(出生到今年)和未来(今年到100岁)两个并行请求
* 支持缓存命中时直接返回
*/
import { nanoid } from 'nanoid';
import {
updateUserPoints,
saveUserInput,
saveAnalysis,
logEvent,
} from './database.js';
import { calculateLifeTimeline } from './baziCalculator.js';
import { runParallelAgents, mergeAgentResults, sendSSE } from './parallelAnalyzer.js';
import {
computeBaziHash,
getCachedAnalysis,
cacheAnalysis,
extractCoreData,
mergeCachedWithFresh,
hasCachedAnalysis,
} from './cacheManager.js';
const COST_PER_ANALYSIS = process.env.COST_PER_ANALYSIS ? parseInt(process.env.COST_PER_ANALYSIS, 10) : 50;
/**
* 并行分析流处理器
*/
export const handleParallelAnalyzeStream = async (req, res) => {
// 设置SSE响应头
res.setHeader('Content-Type', 'text/event-stream');
res.setHeader('Cache-Control', 'no-cache');
res.setHeader('Connection', 'keep-alive');
res.setHeader('X-Accel-Buffering', 'no');
const body = req.body || {};
const useCustomApi = Boolean(body.useCustomApi);
const skipCache = Boolean(body.skipCache); // 是否跳过缓存
let authedInfo = req.__authedInfo || null;
const input = {
name: body.name || '',
birthPlace: body.birthPlace || '',
gender: body.gender,
birthYear: body.birthYear,
yearPillar: body.yearPillar,
monthPillar: body.monthPillar,
dayPillar: body.dayPillar,
hourPillar: body.hourPillar,
startAge: body.startAge,
firstDaYun: body.firstDaYun,
};
const inputId = nanoid();
const startTime = Date.now();
// 进度回调
const onProgress = (message) => {
sendSSE(res, 'progress', { message, timestamp: Date.now() });
};
// 启动心跳保活
const keepAliveInterval = setInterval(() => {
if (!res.writableEnded) {
res.write(': keep-alive\n\n');
}
}, 10000);
const cleanup = () => clearInterval(keepAliveInterval);
res.on('close', cleanup);
res.on('finish', cleanup);
sendSSE(res, 'progress', { message: '正在初始化并行分析系统...', phase: 'init' });
// 计算八字哈希
const baziHash = computeBaziHash(
input.yearPillar,
input.monthPillar,
input.dayPillar,
input.hourPillar
);
const genderKey = input.gender === 'Male' ? 'male' : 'female';
sendSSE(res, 'progress', { message: `八字哈希: ${baziHash}`, phase: 'hash' });
// 检查缓存
if (!skipCache && !useCustomApi) {
const cachedData = getCachedAnalysis(baziHash, genderKey);
if (cachedData) {
sendSSE(res, 'cache_hit', {
message: '✓ 命中永久缓存,直接返回一致性结果',
baziHash,
cachedAt: cachedData.createdAt,
});
// 从缓存构建结果
const cachedResult = mergeCachedWithFresh(cachedData);
const finalResult = {
chartData: cachedData.klineData || [],
analysis: cachedResult,
};
// 处理用户积分(缓存命中也扣分)
let user = null;
let cost = 0;
let isGuest = false;
if (authedInfo) {
const newPoints = Math.max(0, authedInfo.user.points - COST_PER_ANALYSIS);
updateUserPoints(authedInfo.user.id, newPoints);
cost = COST_PER_ANALYSIS;
user = { id: authedInfo.user.id, email: authedInfo.user.email, points: newPoints };
logEvent('info', '缓存命中分析', { baziHash, cost }, authedInfo.user.id, req.ip);
} else {
isGuest = true;
logEvent('info', '游客缓存命中', { baziHash }, null, req.ip);
}
// 发送完成事件
sendSSE(res, 'complete', {
result: finalResult,
user,
cost,
isGuest,
fromCache: true,
processingTimeMs: Date.now() - startTime,
});
return res.end();
} else {
sendSSE(res, 'cache_miss', { message: '缓存未命中,启动并行分析...', baziHash });
}
}
// 预计算生命周期骨架
let skeletonData = null;
try {
skeletonData = calculateLifeTimeline(input);
sendSSE(res, 'progress', { message: '✓ 已生成100年流年骨架', phase: 'skeleton' });
} catch (err) {
console.error('骨架计算失败:', err);
sendSSE(res, 'error', {
error: 'SKELETON_CALC_FAILED',
message: '流年骨架计算失败,请检查输入数据'
});
return res.end();
}
// 执行并行Agent分析
sendSSE(res, 'parallel_start', {
message: '🚀 启动6个专业Agent并行分析...',
agents: ['core', 'kline_past', 'kline_future', 'career', 'marriage', 'crypto'],
});
const parallelResult = await runParallelAgents(input, skeletonData, res, onProgress);
if (!parallelResult.success) {
sendSSE(res, 'error', {
error: 'ALL_AGENTS_FAILED',
message: '所有Agent分析均失败,请稍后重试'
});
return res.end();
}
// 合并Agent结果
sendSSE(res, 'progress', { message: '正在合并分析结果...', phase: 'merge' });
const mergedAnalysis = mergeAgentResults(parallelResult.results, skeletonData);
const finalResult = {
chartData: mergedAnalysis.chartPoints || [],
analysis: {
bazi: mergedAnalysis.bazi || [],
summary: mergedAnalysis.summary || '命理分析完成',
summaryScore: mergedAnalysis.summaryScore || 5,
personality: mergedAnalysis.personality || '',
personalityScore: mergedAnalysis.personalityScore || 5,
industry: mergedAnalysis.industry || '',
industryScore: mergedAnalysis.industryScore || 5,
fengShui: mergedAnalysis.fengShui || '',
fengShuiScore: mergedAnalysis.fengShuiScore || 5,
wealth: mergedAnalysis.wealth || '',
wealthScore: mergedAnalysis.wealthScore || 5,
marriage: mergedAnalysis.marriage || '',
marriageScore: mergedAnalysis.marriageScore || 5,
health: mergedAnalysis.health || '',
healthScore: mergedAnalysis.healthScore || 5,
family: mergedAnalysis.family || '',
familyScore: mergedAnalysis.familyScore || 5,
crypto: mergedAnalysis.crypto || '',
cryptoScore: mergedAnalysis.cryptoScore || 5,
cryptoYear: mergedAnalysis.cryptoYear || '待定',
cryptoStyle: mergedAnalysis.cryptoStyle || '现货定投',
// 扩展字段
appearance: mergedAnalysis.appearance,
bodyType: mergedAnalysis.bodyType,
skin: mergedAnalysis.skin,
characterSummary: mergedAnalysis.characterSummary,
monthlyFortune: mergedAnalysis.monthlyFortune,
monthlyHighlights: mergedAnalysis.monthlyHighlights,
yearlyFortune: mergedAnalysis.yearlyFortune,
yearlyKeyEvents: mergedAnalysis.yearlyKeyEvents,
luckyColors: mergedAnalysis.luckyColors,
luckyDirections: mergedAnalysis.luckyDirections,
luckyZodiac: mergedAnalysis.luckyZodiac,
luckyNumbers: mergedAnalysis.luckyNumbers,
keyDatesThisMonth: mergedAnalysis.keyDatesThisMonth,
keyDatesThisYear: mergedAnalysis.keyDatesThisYear,
pastEvents: mergedAnalysis.pastEvents,
futureEvents: mergedAnalysis.futureEvents,
keyYears: mergedAnalysis.keyYears,
peakYears: mergedAnalysis.peakYears,
troughYears: mergedAnalysis.troughYears,
healthBodyParts: mergedAnalysis.healthBodyParts,
},
};
// 保存到缓存(永久缓存)
if (!useCustomApi && !skipCache) {
try {
const coreData = extractCoreData(finalResult.analysis, finalResult.chartData);
cacheAnalysis({
baziHash,
gender: genderKey,
...coreData,
modelUsed: 'parallel-agents',
version: 1,
});
sendSSE(res, 'progress', { message: '✓ 结果已存入永久缓存', phase: 'cache' });
} catch (cacheErr) {
console.error('缓存保存失败:', cacheErr);
// 缓存失败不影响主流程
}
}
// 处理用户数据
let user = null;
let cost = 0;
let isGuest = false;
if (!useCustomApi) {
// 保存用户输入
saveUserInput({
id: inputId,
userId: authedInfo ? authedInfo.user.id : null,
name: input.name,
gender: input.gender,
birthYear: input.birthYear,
yearPillar: input.yearPillar,
monthPillar: input.monthPillar,
dayPillar: input.dayPillar,
hourPillar: input.hourPillar,
startAge: input.startAge,
firstDaYun: input.firstDaYun,
modelName: 'parallel-agents',
apiBaseUrl: '',
useCustomApi: false,
ipAddress: req.ip,
userAgent: req.get('User-Agent'),
});
const analysisId = nanoid();
if (authedInfo) {
const newPoints = Math.max(0, authedInfo.user.points - COST_PER_ANALYSIS);
updateUserPoints(authedInfo.user.id, newPoints);
cost = COST_PER_ANALYSIS;
saveAnalysis({
id: analysisId,
userId: authedInfo.user.id,
inputId: inputId,
cost,
modelUsed: 'parallel-agents',
chartData: finalResult.chartData,
analysisData: finalResult.analysis,
processingTimeMs: Date.now() - startTime,
status: 'completed',
});
logEvent('info', '并行分析完成', {
analysisId,
cost,
agents: parallelResult.completedAgents,
successCount: parallelResult.successCount,
}, authedInfo.user.id, req.ip);
user = { id: authedInfo.user.id, email: authedInfo.user.email, points: newPoints };
} else {
isGuest = true;
saveAnalysis({
id: analysisId,
userId: null,
inputId: inputId,
cost: 0,
modelUsed: 'parallel-agents',
chartData: finalResult.chartData,
analysisData: finalResult.analysis,
processingTimeMs: Date.now() - startTime,
status: 'completed',
});
logEvent('info', '游客并行分析', {
analysisId,
agents: parallelResult.completedAgents,
}, null, req.ip);
}
}
// 发送完成事件
sendSSE(res, 'complete', {
result: finalResult,
user,
cost,
isGuest,
fromCache: false,
processingTimeMs: Date.now() - startTime,
agentsUsed: parallelResult.completedAgents,
successCount: parallelResult.successCount,
totalAgents: parallelResult.totalAgents,
});
res.end();
};
export default handleParallelAnalyzeStream;