File size: 3,673 Bytes
146bdba
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
import { PassThrough } from 'stream';
import { createLogger } from '../utils/logger.js';

const logger = createLogger('StreamManager');

/**

 * 流管理器 - 负责管理和跟踪活跃的流

 */
export class StreamManager {
  constructor() {
    this.activeStreams = new Map();
  }
  
  /**

   * 创建新的流

   * @returns {PassThrough} 新创建的流

   */
  createStream() {
    const stream = new PassThrough();
    let streamClosed = false;
    
    // 重写stream.end方法,确保安全关闭
    const originalEnd = stream.end;
    stream.end = function(...args) {
      if (streamClosed) return;
      streamClosed = true;
      return originalEnd.apply(this, args);
    };
    
    // 添加状态检查方法
    stream.isClosed = () => streamClosed;
    
    return stream;
  }
  
  /**

   * 注册并管理流

   * @param {string} clientId - 客户端ID

   * @param {Stream} stream - 要管理的流

   * @returns {Stream} 返回被管理的流

   */
  register(clientId, stream) {
    // 如果该客户端已有活跃流,先关闭它
    if (this.activeStreams.has(clientId)) {
      this.close(clientId);
    }
    
    // 注册新流
    this.activeStreams.set(clientId, stream);
    logger.debug(`注册客户端 ${clientId} 的新流`);
    
    // 设置流事件监听器
    stream.on('end', () => {
      if (this.activeStreams.get(clientId) === stream) {
        this.activeStreams.delete(clientId);
        logger.debug(`客户端 ${clientId} 的流已结束并移除`);
      }
    });
    
    stream.on('error', (error) => {
      logger.error(`客户端 ${clientId} 的流错误: ${error.message}`);
      if (this.activeStreams.get(clientId) === stream) {
        this.activeStreams.delete(clientId);
      }
    });
    
    return stream;
  }
  
  /**

   * 关闭指定客户端的流

   * @param {string} clientId - 客户端ID

   */
  close(clientId) {
    const stream = this.activeStreams.get(clientId);
    if (stream) {
      try {
        logger.debug(`关闭客户端 ${clientId} 的流`);
        stream.end();
        this.activeStreams.delete(clientId);
      } catch (error) {
        logger.error(`关闭流时出错: ${error.message}`);
      }
    }
  }
  
  /**

   * 获取指定客户端的流

   * @param {string} clientId - 客户端ID

   * @returns {Stream|null} 流对象或null

   */
  get(clientId) {
    return this.activeStreams.get(clientId) || null;
  }
  
  /**

   * 检查客户端是否有活跃流

   * @param {string} clientId - 客户端ID

   * @returns {boolean}

   */
  has(clientId) {
    return this.activeStreams.has(clientId);
  }
  
  /**

   * 获取活跃流的数量

   * @returns {number}

   */
  getActiveCount() {
    return this.activeStreams.size;
  }
  
  /**

   * 关闭所有流

   */
  closeAll() {
    logger.info(`关闭所有活跃流 (共 ${this.activeStreams.size} 个)`);
    for (const [clientId, stream] of this.activeStreams) {
      this.close(clientId);
    }
  }
  
  /**

   * 安全写入数据到流

   * @param {Stream} stream - 目标流

   * @param {string|Buffer} data - 要写入的数据

   * @returns {boolean} 写入是否成功

   */
  safeWrite(stream, data) {
    if (!stream || stream.destroyed || (stream.isClosed && stream.isClosed())) {
      return false;
    }
    
    try {
      return stream.write(data);
    } catch (error) {
      logger.error(`流写入错误: ${error.message}`);
      return false;
    }
  }
}

// 创建全局流管理器实例
export const streamManager = new StreamManager();