import gradio as gr
import tempfile
import imageio
import torch
import time
import uuid
import os
from openai import OpenAI
from transformers import pipeline
from diffusers import DiffusionPipeline

# ---------- Load OpenAI Key from Hugging Face Secret ----------
client = OpenAI(api_key=os.getenv("OPENAI_KEY"))
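# If the OPENAI_KEY secret is missing, os.getenv returns None and the client falls
# back to the OPENAI_API_KEY environment variable; if neither is set, OpenAI()
# raises at startup rather than at request time.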

# ---------- Configuration ----------
AVAILABLE_MODELS = {
    "Codette Fine-Tuned (v9)": "ft:gpt-4.1-2025-04-14:raiffs-bits:codette-final:BO907H7Z",
    "GPT-2 (small, fast)": "gpt2",
    "Falcon (TII UAE)": "tiiuae/falcon-7b-instruct",
    "Mistral (OpenAccess)": "mistralai/Mistral-7B-v0.1"
}
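# The first entry is routed to the OpenAI API; the others are loaded lazily as
# local transformers text-generation pipelines the first time they are selected.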

device = "cuda" if torch.cuda.is_available() else "cpu"
text_model_cache = {}
chat_memory = {}
last_usage_time = {}

MAX_PROMPTS_PER_SESSION = 5
THROTTLE_SECONDS = 30
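# The prompt cap and cooldown above apply only to the fine-tuned OpenAI model and
# are tracked per session_id, so each session gets its own budget.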

# ---------- Load Image Generator ----------
try:
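    # Assumption: if "runwayml/stable-diffusion-v1-5" is no longer available on the
    # Hub, the community mirror "stable-diffusion-v1-5/stable-diffusion-v1-5" should
    # be a drop-in replacement here.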
    image_generator = DiffusionPipeline.from_pretrained(
        "runwayml/stable-diffusion-v1-5",
        safety_checker=None,
        torch_dtype=torch.float16 if device == "cuda" else torch.float32
    )
    image_generator.to(device)
    image_enabled = True
except Exception as e:
    print(f"[Image Model Load Error]: {e}")
    image_generator = None
    image_enabled = False

# ---------- Load Video Generator ----------
try:
    video_pipeline = DiffusionPipeline.from_pretrained(
        "damo-vilab/text-to-video-ms-1.7b",
        safety_checker=None,
        torch_dtype=torch.float16 if device == "cuda" else torch.float32
    )
    video_pipeline.to(device)
    video_enabled = True
except Exception as e:
    print(f"[Video Model Load Error]: {e}")
    video_pipeline = None
    video_enabled = False
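# If a pipeline failed to load above (e.g. missing weights or not enough VRAM),
# the app still starts; the matching checkbox in the UI is simply rendered
# non-interactive instead of crashing the Space.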

# ---------- Main Terminal with Rate Limits ----------
def codette_terminal_limited(prompt, model_name, generate_image, generate_video,
                              session_id, batch_size, video_steps, fps):
    if session_id not in chat_memory:
        chat_memory[session_id] = []

    if prompt.strip().lower() in ["exit", "quit"]:
        chat_memory[session_id] = []
        yield "🧠 Codette signing off... Session reset.", None, None
        return

    if model_name == "Codette Fine-Tuned (v9)":
        count = sum(1 for line in chat_memory[session_id] if line.startswith("🖋️ You >"))
        if count >= MAX_PROMPTS_PER_SESSION:
            yield "[πŸ›‘ Limit] Max 5 prompts per session.", None, None
            return
        now = time.time()
        if now - last_usage_time.get(session_id, 0) < THROTTLE_SECONDS:
            wait = int(THROTTLE_SECONDS - (now - last_usage_time[session_id]))
            yield f"[⏳ Wait] Try again in {wait} sec.", None, None
            return
        last_usage_time[session_id] = now

        try:
            response = client.chat.completions.create(
                model=AVAILABLE_MODELS[model_name],
                messages=[{"role": "user", "content": prompt}],
                temperature=0.7,
                max_tokens=256
            )
            output = response.choices[0].message.content.strip()
        except Exception as e:
            yield f"[OpenAI error]: {e}", None, None
            return
    else:
        if model_name not in text_model_cache:
            try:
                text_model_cache[model_name] = pipeline(
                    "text-generation",
                    model=AVAILABLE_MODELS[model_name],
                    device=0 if device == "cuda" else -1
                )
            except Exception as e:
                yield f"[Text model error]: {e}", None, None
                return
        try:
            output = text_model_cache[model_name](
                prompt, max_length=100, do_sample=True, num_return_sequences=1
            )[0]['generated_text'].strip()
        except Exception as e:
            yield f"[Generation error]: {e}", None, None
            return

    # Stream the reply character by character; because this handler is a generator,
    # Gradio re-renders the output textbox on every yield.
    response_so_far = ""
    for char in output:
        response_so_far += char
        temp_log = chat_memory[session_id][:]
        temp_log.append(f"πŸ–‹οΈ You > {prompt}")
        temp_log.append(f"🧠 Codette > {response_so_far}")
        yield "\n".join(temp_log[-10:]), None, None
        time.sleep(0.01)

    chat_memory[session_id].append(f"πŸ–‹οΈ You > {prompt}")
    chat_memory[session_id].append(f"🧠 Codette > {output}")

    imgs, vid = None, None

    if generate_image and image_enabled:
        try:
            result = image_generator(prompt, num_images_per_prompt=int(batch_size))
            imgs = result.images
        except Exception as e:
            # Record the error in chat memory so it shows up in the final yield;
            # response_so_far is never displayed again at this point.
            chat_memory[session_id].append(f"[Image error]: {e}")

    if generate_video and video_enabled:
        try:
            result = video_pipeline(prompt, num_inference_steps=int(video_steps))
            # Note: the layout of .frames varies across diffusers versions (it may be
            # a nested list or a numpy array); this assumes a flat sequence of frames.
            # Writing .mp4 also requires imageio's ffmpeg backend (imageio-ffmpeg).
            frames = result.frames
            temp_video_path = tempfile.NamedTemporaryFile(suffix=".mp4", delete=False).name
            imageio.mimsave(temp_video_path, frames, fps=int(fps))
            vid = temp_video_path
        except Exception as e:
            chat_memory[session_id].append(f"[Video error]: {e}")

    yield "\n".join(chat_memory[session_id][-10:]), imgs, vid

# ---------- Gradio UI ----------
with gr.Blocks(title="🧬 Codette Terminal – Streamed AI Chat") as demo:
    gr.Markdown("## 🧬 Codette Terminal (Chat + Image + Video + Fine-Tuned AI)")
    gr.Markdown("Type a prompt, choose a model, and generate responses. Type `'exit'` to reset the session.")

    with gr.Row():
        # Give each visitor their own session id (Gradio calls the lambda on each
        # page load) so chat memory and rate limits are not shared across users.
        session_id = gr.Textbox(value=lambda: uuid.uuid4().hex, visible=False)
        model_dropdown = gr.Dropdown(choices=list(AVAILABLE_MODELS.keys()), value="GPT-2 (small, fast)", label="Language Model")

    with gr.Row():
        generate_image_toggle = gr.Checkbox(label="Generate Image(s)?", value=False, interactive=image_enabled)
        generate_video_toggle = gr.Checkbox(label="Generate Video?", value=False, interactive=video_enabled)

    with gr.Row():
        batch_size_slider = gr.Slider(label="Number of Images", minimum=1, maximum=4, step=1, value=1)
        video_steps_slider = gr.Slider(label="Video Inference Steps", minimum=10, maximum=100, step=10, value=50)
        fps_slider = gr.Slider(label="Video FPS", minimum=4, maximum=24, step=2, value=8)

    with gr.Row():
        user_input = gr.Textbox(
            label="Your Prompt",
            placeholder="e.g. A robot dreaming on Mars",
            lines=1
        )

    with gr.Row():
        output_text = gr.Textbox(label="Codette Output", lines=15, interactive=False)

    with gr.Row():
        output_image = gr.Gallery(label="Generated Image(s)", columns=2)
        output_video = gr.Video(label="Generated Video")

    user_input.submit(
        codette_terminal_limited,
        inputs=[
            user_input, model_dropdown, generate_image_toggle, generate_video_toggle,
            session_id, batch_size_slider, video_steps_slider, fps_slider
        ],
        outputs=[output_text, output_image, output_video]
    )

# ---------- Launch ----------
if __name__ == "__main__":
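    # mcp_server=True additionally exposes the app as an MCP server; this assumes a
    # recent Gradio with MCP support (pip install "gradio[mcp]"). Drop the flag for
    # a plain web launch.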
    demo.launch(mcp_server=True)