Spaces:
Running
Running
import { PassThrough } from 'stream'; | |
import { createLogger } from '../utils/logger.js'; | |
const logger = createLogger('StreamManager'); | |
/** | |
* 流管理器 - 负责管理和跟踪活跃的流 | |
*/ | |
export class StreamManager { | |
constructor() { | |
this.activeStreams = new Map(); | |
} | |
/** | |
* 创建新的流 | |
* @returns {PassThrough} 新创建的流 | |
*/ | |
createStream() { | |
const stream = new PassThrough(); | |
let streamClosed = false; | |
// 重写stream.end方法,确保安全关闭 | |
const originalEnd = stream.end; | |
stream.end = function(...args) { | |
if (streamClosed) return; | |
streamClosed = true; | |
return originalEnd.apply(this, args); | |
}; | |
// 添加状态检查方法 | |
stream.isClosed = () => streamClosed; | |
return stream; | |
} | |
/** | |
* 注册并管理流 | |
* @param {string} clientId - 客户端ID | |
* @param {Stream} stream - 要管理的流 | |
* @returns {Stream} 返回被管理的流 | |
*/ | |
register(clientId, stream) { | |
// 如果该客户端已有活跃流,先关闭它 | |
if (this.activeStreams.has(clientId)) { | |
this.close(clientId); | |
} | |
// 注册新流 | |
this.activeStreams.set(clientId, stream); | |
logger.debug(`注册客户端 ${clientId} 的新流`); | |
// 设置流事件监听器 | |
stream.on('end', () => { | |
if (this.activeStreams.get(clientId) === stream) { | |
this.activeStreams.delete(clientId); | |
logger.debug(`客户端 ${clientId} 的流已结束并移除`); | |
} | |
}); | |
stream.on('error', (error) => { | |
logger.error(`客户端 ${clientId} 的流错误: ${error.message}`); | |
if (this.activeStreams.get(clientId) === stream) { | |
this.activeStreams.delete(clientId); | |
} | |
}); | |
return stream; | |
} | |
/** | |
* 关闭指定客户端的流 | |
* @param {string} clientId - 客户端ID | |
*/ | |
close(clientId) { | |
const stream = this.activeStreams.get(clientId); | |
if (stream) { | |
try { | |
logger.debug(`关闭客户端 ${clientId} 的流`); | |
stream.end(); | |
this.activeStreams.delete(clientId); | |
} catch (error) { | |
logger.error(`关闭流时出错: ${error.message}`); | |
} | |
} | |
} | |
/** | |
* 获取指定客户端的流 | |
* @param {string} clientId - 客户端ID | |
* @returns {Stream|null} 流对象或null | |
*/ | |
get(clientId) { | |
return this.activeStreams.get(clientId) || null; | |
} | |
/** | |
* 检查客户端是否有活跃流 | |
* @param {string} clientId - 客户端ID | |
* @returns {boolean} | |
*/ | |
has(clientId) { | |
return this.activeStreams.has(clientId); | |
} | |
/** | |
* 获取活跃流的数量 | |
* @returns {number} | |
*/ | |
getActiveCount() { | |
return this.activeStreams.size; | |
} | |
/** | |
* 关闭所有流 | |
*/ | |
closeAll() { | |
logger.info(`关闭所有活跃流 (共 ${this.activeStreams.size} 个)`); | |
for (const [clientId, stream] of this.activeStreams) { | |
this.close(clientId); | |
} | |
} | |
/** | |
* 安全写入数据到流 | |
* @param {Stream} stream - 目标流 | |
* @param {string|Buffer} data - 要写入的数据 | |
* @returns {boolean} 写入是否成功 | |
*/ | |
safeWrite(stream, data) { | |
if (!stream || stream.destroyed || (stream.isClosed && stream.isClosed())) { | |
return false; | |
} | |
try { | |
return stream.write(data); | |
} catch (error) { | |
logger.error(`流写入错误: ${error.message}`); | |
return false; | |
} | |
} | |
} | |
// 创建全局流管理器实例 | |
export const streamManager = new StreamManager(); | |