"use client" import protoRoot from "@/protobuf/SttMessage_es6.js" import AgoraRTC, { IAgoraRTCClient, IMicrophoneAudioTrack, IRemoteAudioTrack, UID, ICameraVideoTrack, } from "agora-rtc-sdk-ng" import { EMessageDataType, EMessageType, IChatItem, ITextItem } from "@/types" import { AGEventEmitter } from "../events" import { RtcEvents, IUserTracks } from "./types" import { apiGenAgoraData } from "@/common/request" import { VideoSourceType } from "@/common/constant" const TIMEOUT_MS = 5000; // Timeout for incomplete messages interface TextDataChunk { message_id: string; part_index: number; total_parts: number; content: string; } export class RtcManager extends AGEventEmitter { private _joined client: IAgoraRTCClient localTracks: IUserTracks appId: string | null = null token: string | null = null userId: number | null = null constructor() { super() this._joined = false this.localTracks = {} this.client = AgoraRTC.createClient({ mode: "rtc", codec: "vp8" }) this._listenRtcEvents() } async join({ channel, userId }: { channel: string; userId: number }) { if (!this._joined) { const res = await apiGenAgoraData({ channel, userId }) const { code, data } = res if (code != 0) { throw new Error("Failed to get Agora token") } const { appId, token } = data this.appId = appId this.token = token this.userId = userId await this.client?.join(appId, channel, token, userId) this._joined = true } } async createCameraTracks() { try { const videoTrack = await AgoraRTC.createCameraVideoTrack(); this.localTracks.videoTrack = videoTrack; } catch (err) { console.error("Failed to create video track", err); } this.emit("localTracksChanged", this.localTracks); } async createMicrophoneTracks() { try { const audioTrack = await AgoraRTC.createMicrophoneAudioTrack(); this.localTracks.audioTrack = audioTrack; } catch (err) { console.error("Failed to create audio track", err); } this.emit("localTracksChanged", this.localTracks); } async createScreenShareTrack() { try { const screenTrack = await AgoraRTC.createScreenVideoTrack({ encoderConfig: { width: 1200, height: 800, frameRate: 5 } }, "disable"); this.localTracks.screenTrack = screenTrack; } catch (err) { console.error("Failed to create screen track", err); } this.emit("localTracksChanged", this.localTracks); } async switchVideoSource(type: VideoSourceType) { if (type === VideoSourceType.SCREEN) { await this.createScreenShareTrack(); if (this.localTracks.screenTrack) { this.client.unpublish(this.localTracks.videoTrack); this.localTracks.videoTrack?.close(); this.localTracks.videoTrack = undefined; this.client.publish(this.localTracks.screenTrack); this.emit("localTracksChanged", this.localTracks); } } else if (type === VideoSourceType.CAMERA) { await this.createCameraTracks(); if (this.localTracks.videoTrack) { this.client.unpublish(this.localTracks.screenTrack); this.localTracks.screenTrack?.close(); this.localTracks.screenTrack = undefined; this.client.publish(this.localTracks.videoTrack); this.emit("localTracksChanged", this.localTracks); } } } async publish() { const tracks = []; if (this.localTracks.videoTrack) { tracks.push(this.localTracks.videoTrack); } if (this.localTracks.audioTrack) { tracks.push(this.localTracks.audioTrack); } if (tracks.length) { await this.client.publish(tracks); } } async destroy() { this.localTracks?.audioTrack?.close() this.localTracks?.videoTrack?.close() if (this._joined) { await this.client?.leave() } this._resetData() } // ----------- public methods ------------ // -------------- private methods -------------- private _listenRtcEvents() 
{ this.client.on("network-quality", (quality) => { this.emit("networkQuality", quality) }) this.client.on("user-published", async (user, mediaType) => { await this.client.subscribe(user, mediaType) if (mediaType === "audio") { this._playAudio(user.audioTrack) } this.emit("remoteUserChanged", { userId: user.uid, audioTrack: user.audioTrack, videoTrack: user.videoTrack, }) }) this.client.on("user-unpublished", async (user, mediaType) => { await this.client.unsubscribe(user, mediaType) this.emit("remoteUserChanged", { userId: user.uid, audioTrack: user.audioTrack, videoTrack: user.videoTrack, }) }) this.client.on("stream-message", (uid: UID, stream: any) => { this._parseData(stream) }) } private _parseData(data: any): ITextItem | void { let decoder = new TextDecoder('utf-8'); let decodedMessage = decoder.decode(data); console.log("[test] textstream raw data", decodedMessage); // const { stream_id, is_final, text, text_ts, data_type, message_id, part_number, total_parts } = textstream; // if (total_parts > 0) { // // If message is split, handle it accordingly // this._handleSplitMessage(message_id, part_number, total_parts, stream_id, is_final, text, text_ts); // } else { // // If there is no message_id, treat it as a complete message // this._handleCompleteMessage(stream_id, is_final, text, text_ts); // } this.handleChunk(decodedMessage); } private messageCache: { [key: string]: TextDataChunk[] } = {}; // Function to process received chunk via event emitter handleChunk(formattedChunk: string) { try { // Split the chunk by the delimiter "|" const [message_id, partIndexStr, totalPartsStr, content] = formattedChunk.split('|'); const part_index = parseInt(partIndexStr, 10); const total_parts = totalPartsStr === '???' ? -1 : parseInt(totalPartsStr, 10); // -1 means total parts unknown // Ensure total_parts is known before processing further if (total_parts === -1) { console.warn(`Total parts for message ${message_id} unknown, waiting for further parts.`); return; } const chunkData: TextDataChunk = { message_id, part_index, total_parts, content, }; // Check if we already have an entry for this message if (!this.messageCache[message_id]) { this.messageCache[message_id] = []; // Set a timeout to discard incomplete messages setTimeout(() => { if (this.messageCache[message_id]?.length !== total_parts) { console.warn(`Incomplete message with ID ${message_id} discarded`); delete this.messageCache[message_id]; // Discard incomplete message } }, TIMEOUT_MS); } // Cache this chunk by message_id this.messageCache[message_id].push(chunkData); // If all parts are received, reconstruct the message if (this.messageCache[message_id].length === total_parts) { const completeMessage = this.reconstructMessage(this.messageCache[message_id]); const { stream_id, is_final, text, text_ts, data_type } = JSON.parse(atob(completeMessage)); const isAgent = Number(stream_id) != Number(this.userId) let textItem: IChatItem = { type: isAgent ? 
  private messageCache: { [key: string]: TextDataChunk[] } = {}

  // Process a received chunk; once all parts of a message have arrived,
  // reconstruct it and emit the resulting chat item
  handleChunk(formattedChunk: string) {
    try {
      // Split the chunk by the "|" delimiter
      const [message_id, partIndexStr, totalPartsStr, content] =
        formattedChunk.split("|")
      const part_index = parseInt(partIndexStr, 10)
      const total_parts =
        totalPartsStr === "???" ? -1 : parseInt(totalPartsStr, 10) // -1 means total parts unknown

      // Ensure total_parts is known before processing further
      if (total_parts === -1) {
        console.warn(
          `Total parts for message ${message_id} unknown, waiting for further parts.`,
        )
        return
      }

      const chunkData: TextDataChunk = {
        message_id,
        part_index,
        total_parts,
        content,
      }

      // Check if we already have an entry for this message
      if (!this.messageCache[message_id]) {
        this.messageCache[message_id] = []
        // Set a timeout to discard incomplete messages
        setTimeout(() => {
          if (this.messageCache[message_id]?.length !== total_parts) {
            console.warn(`Incomplete message with ID ${message_id} discarded`)
            delete this.messageCache[message_id] // Discard incomplete message
          }
        }, TIMEOUT_MS)
      }

      // Cache this chunk by message_id
      this.messageCache[message_id].push(chunkData)

      // If all parts are received, reconstruct the message
      if (this.messageCache[message_id].length === total_parts) {
        const completeMessage = this.reconstructMessage(
          this.messageCache[message_id],
        )
        const { stream_id, is_final, text, text_ts, data_type } = JSON.parse(
          atob(completeMessage),
        )
        const isAgent = Number(stream_id) !== Number(this.userId)

        let textItem: IChatItem = {
          type: isAgent ? EMessageType.AGENT : EMessageType.USER,
          time: text_ts,
          text: text,
          data_type: EMessageDataType.TEXT,
          userId: stream_id,
          isFinal: is_final,
        }

        if (data_type === "raw") {
          // "raw" messages carry a nested JSON payload (image or reasoning content)
          const { data, type } = JSON.parse(text)
          if (type === "image_url") {
            textItem = {
              ...textItem,
              data_type: EMessageDataType.IMAGE,
              text: data.image_url,
            }
          } else if (type === "reasoning") {
            textItem = {
              ...textItem,
              data_type: EMessageDataType.REASON,
              text: data.text,
            }
          }
        }

        if (text.trim().length > 0) {
          this.emit("textChanged", textItem)
        }

        // Clean up the cache
        delete this.messageCache[message_id]
      }
    } catch (error) {
      console.error("Error processing chunk:", error)
    }
  }

  // Reconstruct the full message from its chunks
  reconstructMessage(chunks: TextDataChunk[]): string {
    // Sort chunks by their part index
    chunks.sort((a, b) => a.part_index - b.part_index)
    // Concatenate all chunks to form the full message
    return chunks.map((chunk) => chunk.content).join("")
  }

  _playAudio(
    audioTrack: IMicrophoneAudioTrack | IRemoteAudioTrack | undefined,
  ) {
    if (audioTrack && !audioTrack.isPlaying) {
      audioTrack.play()
    }
  }

  private _resetData() {
    this.localTracks = {}
    this._joined = false
  }
}

export const rtcManager = new RtcManager()
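// Illustrative usage sketch (not part of this module): channel/user values are
// placeholders, and the event subscription assumes AGEventEmitter exposes an `on()` method.
//
//   await rtcManager.join({ channel: "demo_channel", userId: 12345 })
//   await rtcManager.createMicrophoneTracks()
//   await rtcManager.createCameraTracks()
//   await rtcManager.publish()
//   rtcManager.on("textChanged", (item) => {
//     console.log("chat item", item.text)
//   })
//   // ...later, on teardown:
//   await rtcManager.destroy()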