# app.py
import os
import asyncio
import base64
import io

import cv2
import pyaudio
import PIL.Image
import mss
import gradio as gr

from google import genai
from google.genai import types

# Audio and video capture config
FORMAT = pyaudio.paInt16
CHANNELS = 1
SEND_SAMPLE_RATE = 16000  # Live API expects 16 kHz mono PCM input
RECEIVE_SAMPLE_RATE = 24000  # model audio is returned at 24 kHz
CHUNK_SIZE = 1024

# Single shared PyAudio instance, used for microphone capture below
pya = pyaudio.PyAudio()

MODEL = "models/gemini-2.0-flash-live-001"

# Initialize the Google GenAI client. The Live API is served under the
# v1beta API version; the key is read from the GEMINI_API_KEY env var.
client = genai.Client(
    http_options={"api_version": "v1beta"},
    api_key=os.environ.get("GEMINI_API_KEY"),
)

# Live connect configuration
CONFIG = types.LiveConnectConfig(
    response_modalities=["AUDIO"],
    speech_config=types.SpeechConfig(
        voice_config=types.VoiceConfig(
            prebuilt_voice_config=types.PrebuiltVoiceConfig(voice_name="Puck")
        )
    ),
    system_instruction=types.Content(
        parts=[types.Part.from_text(text="You are Puck...")]
    ),
)

# AudioLoop class adapted for Gradio
class AudioLoop:
    """Bridges local microphone/camera capture and the Gemini Live session."""

    def __init__(self, mode="camera"):
        self.mode = mode
        self.out_queue = None  # chunks waiting to be sent to the model
        self.session = None

    async def _get_frame(self, cap):
        # cap.read() blocks, so run it off the event loop.
        ret, frame = await asyncio.to_thread(cap.read)
        if not ret:
            return None
        # OpenCV delivers BGR; convert before handing to PIL.
        frame_rgb = cv2.cvtColor(frame, cv2.COLOR_BGR2RGB)
        img = PIL.Image.fromarray(frame_rgb)
        img.thumbnail((640, 480))
        buf = io.BytesIO()
        img.save(buf, format="JPEG")
        return buf.getvalue()

    async def _video_stream(self):
        cap = await asyncio.to_thread(cv2.VideoCapture, 0)
        try:
            while True:
                frame = await self._get_frame(cap)
                if frame is None:
                    break
                await self.out_queue.put({"mime_type": "image/jpeg", "data": base64.b64encode(frame).decode()})
                await asyncio.sleep(0.1)
        finally:
            cap.release()
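
    # The UI below offers a "screen" source and mss is in the requirements,
    # but no screen capture is wired up. A minimal sketch of the missing
    # stream, assuming the primary monitor (sct.monitors[1] in mss):
    async def _screen_stream(self):
        def grab():
            with mss.mss() as sct:
                shot = sct.grab(sct.monitors[1])
                img = PIL.Image.frombytes("RGB", shot.size, shot.rgb)
                img.thumbnail((1024, 1024))
                buf = io.BytesIO()
                img.save(buf, format="JPEG")
                return buf.getvalue()

        while True:
            frame = await asyncio.to_thread(grab)
            await self.out_queue.put(
                {"mime_type": "image/jpeg", "data": base64.b64encode(frame).decode()}
            )
            await asyncio.sleep(1.0)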

    async def _audio_stream(self):
        mic_info = pya.get_default_input_device_info()
        stream = await asyncio.to_thread(
            pya.open,
            format=FORMAT,
            channels=CHANNELS,
            rate=SEND_SAMPLE_RATE,
            input=True,
            input_device_index=mic_info["index"],
            frames_per_buffer=CHUNK_SIZE,
        )
        while True:
            # Don't raise on input overflow: a dropped chunk is preferable
            # to killing the stream.
            data = await asyncio.to_thread(stream.read, CHUNK_SIZE, exception_on_overflow=False)
            await self.out_queue.put({"data": data, "mime_type": "audio/pcm"})

    async def send_realtime(self):
        # Pump queued audio/video chunks into the live session.
        while True:
            msg = await self.out_queue.get()
            await self.session.send(input=msg)

    async def receive_audio(self):
        while True:
            # Each session.receive() call yields the responses for one model turn.
            turn = self.session.receive()
            async for response in turn:
                if data := response.data:
                    yield (None, data)
                if text := response.text:
                    yield (text, None)

    async def run(self):
        async with client.aio.live.connect(model=MODEL, config=CONFIG) as session:
            self.session = session
            self.out_queue = asyncio.Queue(maxsize=5)

            tasks = [asyncio.create_task(self._audio_stream())]
            if self.mode == "camera":
                tasks.append(asyncio.create_task(self._video_stream()))
            elif self.mode == "screen":
                tasks.append(asyncio.create_task(self._screen_stream()))
            tasks.append(asyncio.create_task(self.send_realtime()))

            try:
                async for text, audio in self.receive_audio():
                    yield text, audio
            finally:
                # Cancel capture/send tasks even if the consumer stops early.
                for t in tasks:
                    t.cancel()

# Gradio interface
async def chat(mode, history):
    """Starts a live session and streams model text into the chatbot.

    Audio chunks (raw 24 kHz PCM) are discarded here; playing them back
    would need a separate gr.Audio output.
    """
    history = (history or []) + [[None, ""]]
    loop = AudioLoop(mode=mode)
    async for text, audio in loop.run():
        if text:
            history[-1][1] += text
            yield history

with gr.Blocks() as demo:
    gr.Markdown(
        "# Gemini Live API Web Chat\n"
        "Audio and video are captured on the machine running this app "
        "(PyAudio/OpenCV), not in the browser."
    )
    mode = gr.Radio(choices=["camera", "screen", "none"], value="camera", label="Video Source")
    chatbot = gr.Chatbot()
    with gr.Row():
        start = gr.Button("Start")
        stop = gr.Button("Stop")
    # An async-generator handler streams updates straight into the Chatbot.
    start_event = start.click(chat, inputs=[mode, chatbot], outputs=[chatbot])
    # Stop cancels the in-flight streaming event.
    stop.click(None, cancels=[start_event])

if __name__ == "__main__":
    demo.launch(server_name="0.0.0.0", share=True)
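
# For a quick headless check without the UI, the loop can be driven
# directly (a sketch; prints text and discards audio):
#
#   async def main():
#       async for text, audio in AudioLoop(mode="none").run():
#           if text:
#               print(text, end="", flush=True)
#
#   asyncio.run(main())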

# requirements.txt
#
# google-genai
# opencv-python
# pyaudio
# pillow
# mss
# gradio