nodejs / src /services /StreamManager.js
clash-linux's picture
Upload 27 files
146bdba verified
import { PassThrough } from 'stream';
import { createLogger } from '../utils/logger.js';
const logger = createLogger('StreamManager');
/**
* 流管理器 - 负责管理和跟踪活跃的流
*/
export class StreamManager {
constructor() {
this.activeStreams = new Map();
}
/**
* 创建新的流
* @returns {PassThrough} 新创建的流
*/
createStream() {
const stream = new PassThrough();
let streamClosed = false;
// 重写stream.end方法,确保安全关闭
const originalEnd = stream.end;
stream.end = function(...args) {
if (streamClosed) return;
streamClosed = true;
return originalEnd.apply(this, args);
};
// 添加状态检查方法
stream.isClosed = () => streamClosed;
return stream;
}
/**
* 注册并管理流
* @param {string} clientId - 客户端ID
* @param {Stream} stream - 要管理的流
* @returns {Stream} 返回被管理的流
*/
register(clientId, stream) {
// 如果该客户端已有活跃流,先关闭它
if (this.activeStreams.has(clientId)) {
this.close(clientId);
}
// 注册新流
this.activeStreams.set(clientId, stream);
logger.debug(`注册客户端 ${clientId} 的新流`);
// 设置流事件监听器
stream.on('end', () => {
if (this.activeStreams.get(clientId) === stream) {
this.activeStreams.delete(clientId);
logger.debug(`客户端 ${clientId} 的流已结束并移除`);
}
});
stream.on('error', (error) => {
logger.error(`客户端 ${clientId} 的流错误: ${error.message}`);
if (this.activeStreams.get(clientId) === stream) {
this.activeStreams.delete(clientId);
}
});
return stream;
}
/**
* 关闭指定客户端的流
* @param {string} clientId - 客户端ID
*/
close(clientId) {
const stream = this.activeStreams.get(clientId);
if (stream) {
try {
logger.debug(`关闭客户端 ${clientId} 的流`);
stream.end();
this.activeStreams.delete(clientId);
} catch (error) {
logger.error(`关闭流时出错: ${error.message}`);
}
}
}
/**
* 获取指定客户端的流
* @param {string} clientId - 客户端ID
* @returns {Stream|null} 流对象或null
*/
get(clientId) {
return this.activeStreams.get(clientId) || null;
}
/**
* 检查客户端是否有活跃流
* @param {string} clientId - 客户端ID
* @returns {boolean}
*/
has(clientId) {
return this.activeStreams.has(clientId);
}
/**
* 获取活跃流的数量
* @returns {number}
*/
getActiveCount() {
return this.activeStreams.size;
}
/**
* 关闭所有流
*/
closeAll() {
logger.info(`关闭所有活跃流 (共 ${this.activeStreams.size} 个)`);
for (const [clientId, stream] of this.activeStreams) {
this.close(clientId);
}
}
/**
* 安全写入数据到流
* @param {Stream} stream - 目标流
* @param {string|Buffer} data - 要写入的数据
* @returns {boolean} 写入是否成功
*/
safeWrite(stream, data) {
if (!stream || stream.destroyed || (stream.isClosed && stream.isClosed())) {
return false;
}
try {
return stream.write(data);
} catch (error) {
logger.error(`流写入错误: ${error.message}`);
return false;
}
}
}
// 创建全局流管理器实例
export const streamManager = new StreamManager();