Spaces:
Running
Running
File size: 3,673 Bytes
146bdba |
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 |
import { PassThrough } from 'stream';
import { createLogger } from '../utils/logger.js';
const logger = createLogger('StreamManager');
/**
* 流管理器 - 负责管理和跟踪活跃的流
*/
export class StreamManager {
constructor() {
this.activeStreams = new Map();
}
/**
* 创建新的流
* @returns {PassThrough} 新创建的流
*/
createStream() {
const stream = new PassThrough();
let streamClosed = false;
// 重写stream.end方法,确保安全关闭
const originalEnd = stream.end;
stream.end = function(...args) {
if (streamClosed) return;
streamClosed = true;
return originalEnd.apply(this, args);
};
// 添加状态检查方法
stream.isClosed = () => streamClosed;
return stream;
}
/**
* 注册并管理流
* @param {string} clientId - 客户端ID
* @param {Stream} stream - 要管理的流
* @returns {Stream} 返回被管理的流
*/
register(clientId, stream) {
// 如果该客户端已有活跃流,先关闭它
if (this.activeStreams.has(clientId)) {
this.close(clientId);
}
// 注册新流
this.activeStreams.set(clientId, stream);
logger.debug(`注册客户端 ${clientId} 的新流`);
// 设置流事件监听器
stream.on('end', () => {
if (this.activeStreams.get(clientId) === stream) {
this.activeStreams.delete(clientId);
logger.debug(`客户端 ${clientId} 的流已结束并移除`);
}
});
stream.on('error', (error) => {
logger.error(`客户端 ${clientId} 的流错误: ${error.message}`);
if (this.activeStreams.get(clientId) === stream) {
this.activeStreams.delete(clientId);
}
});
return stream;
}
/**
* 关闭指定客户端的流
* @param {string} clientId - 客户端ID
*/
close(clientId) {
const stream = this.activeStreams.get(clientId);
if (stream) {
try {
logger.debug(`关闭客户端 ${clientId} 的流`);
stream.end();
this.activeStreams.delete(clientId);
} catch (error) {
logger.error(`关闭流时出错: ${error.message}`);
}
}
}
/**
* 获取指定客户端的流
* @param {string} clientId - 客户端ID
* @returns {Stream|null} 流对象或null
*/
get(clientId) {
return this.activeStreams.get(clientId) || null;
}
/**
* 检查客户端是否有活跃流
* @param {string} clientId - 客户端ID
* @returns {boolean}
*/
has(clientId) {
return this.activeStreams.has(clientId);
}
/**
* 获取活跃流的数量
* @returns {number}
*/
getActiveCount() {
return this.activeStreams.size;
}
/**
* 关闭所有流
*/
closeAll() {
logger.info(`关闭所有活跃流 (共 ${this.activeStreams.size} 个)`);
for (const [clientId, stream] of this.activeStreams) {
this.close(clientId);
}
}
/**
* 安全写入数据到流
* @param {Stream} stream - 目标流
* @param {string|Buffer} data - 要写入的数据
* @returns {boolean} 写入是否成功
*/
safeWrite(stream, data) {
if (!stream || stream.destroyed || (stream.isClosed && stream.isClosed())) {
return false;
}
try {
return stream.write(data);
} catch (error) {
logger.error(`流写入错误: ${error.message}`);
return false;
}
}
}
// 创建全局流管理器实例
export const streamManager = new StreamManager();
|