File size: 11,002 Bytes
69dbdbd
 
 
 
 
 
e0dd43c
69dbdbd
e0dd43c
69dbdbd
 
 
 
 
 
e0dd43c
 
 
69dbdbd
e0dd43c
69dbdbd
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
e0dd43c
69dbdbd
 
 
 
 
 
 
 
 
 
 
e0dd43c
69dbdbd
 
 
e0dd43c
 
69dbdbd
e0dd43c
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
69dbdbd
 
 
 
 
e0dd43c
69dbdbd
 
 
 
 
 
 
 
 
e0dd43c
69dbdbd
 
 
 
 
 
 
 
 
 
 
 
e0dd43c
69dbdbd
 
 
 
 
e0dd43c
 
 
 
 
 
 
 
 
 
 
 
69dbdbd
897173a
69dbdbd
e0dd43c
69dbdbd
 
897173a
 
e0dd43c
69dbdbd
 
 
 
e0dd43c
 
 
 
 
 
 
 
 
 
 
 
69dbdbd
 
 
 
 
 
e0dd43c
69dbdbd
e0dd43c
69dbdbd
e0dd43c
69dbdbd
 
 
 
897173a
69dbdbd
e0dd43c
897173a
401be78
69dbdbd
401be78
69dbdbd
401be78
 
897173a
401be78
 
 
69dbdbd
 
 
 
1b0c4df
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
e0dd43c
 
 
 
 
69dbdbd
e0dd43c
 
69dbdbd
e0dd43c
366b8c6
6492525
401be78
69dbdbd
 
 
 
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
#!/usr/bin/env python
# coding=utf-8
# Copyright 2024 The HuggingFace Inc. team. All rights reserved.
#
import os
import re
import json
from typing import Optional
import logging

from smolagents.agent_types import AgentAudio, AgentImage, AgentText, handle_agent_output_types
from smolagents.agents import ActionStep, MultiStepAgent
from smolagents.memory import MemoryStep
from smolagents.utils import _is_package_available

# Set up logging
# NOTE(review): logging.basicConfig at import time configures the *root* logger
# for the whole process at DEBUG; importing this module as a library forces
# verbose output everywhere — consider moving this into the entry point.
logging.basicConfig(level=logging.DEBUG)
logger = logging.getLogger(__name__)

def pull_messages_from_step(step_log: MemoryStep):
    """Extract ChatMessage objects from agent steps with proper nesting.

    For an ``ActionStep`` this yields, in order: a step header, the cleaned
    model output, the first tool call (code calls re-fenced as ```python),
    execution logs (rendered as a markdown table when they parse as a JSON
    list-of-lists), any error, a footnote with token/duration stats, and
    screenshot/image/text messages parsed from the observations.
    Other step types yield nothing.

    Args:
        step_log: A single step from the agent's memory.

    Yields:
        gr.ChatMessage: messages suitable for a ``gr.Chatbot(type="messages")``.
    """
    import gradio as gr

    if isinstance(step_log, ActionStep):
        step_number = f"Step {step_log.step_number}" if step_log.step_number is not None else ""
        yield gr.ChatMessage(role="assistant", content=f"**{step_number}**")

        if hasattr(step_log, "model_output") and step_log.model_output is not None:
            # Strip the <end_code> markers the model emits around code fences.
            model_output = step_log.model_output.strip()
            model_output = re.sub(r"```\s*<end_code>", "```", model_output)
            model_output = re.sub(r"<end_code>\s*```", "```", model_output)
            model_output = re.sub(r"```\s*\n\s*<end_code>", "```", model_output)
            model_output = model_output.strip()
            yield gr.ChatMessage(role="assistant", content=model_output)

        if hasattr(step_log, "tool_calls") and step_log.tool_calls is not None:
            first_tool_call = step_log.tool_calls[0]
            used_code = first_tool_call.name == "python_interpreter"
            # Synthetic id used to nest child log/error messages under the tool call.
            parent_id = f"call_{len(step_log.tool_calls)}"
            args = first_tool_call.arguments
            content = str(args.get("answer", str(args))) if isinstance(args, dict) else str(args).strip()

            if used_code:
                # Normalize the payload to a single ```python fenced block.
                content = re.sub(r"```.*?\n", "", content)
                content = re.sub(r"\s*<end_code>\s*", "", content)
                content = content.strip()
                if not content.startswith("```python"):
                    content = f"```python\n{content}\n```"

            parent_message_tool = gr.ChatMessage(
                role="assistant",
                content=content,
                metadata={"title": f"🛠️ Used tool {first_tool_call.name}", "id": parent_id, "status": "pending"}
            )
            yield parent_message_tool

            if hasattr(step_log, "observations") and step_log.observations and step_log.observations.strip():
                log_content = re.sub(r"^Execution logs:\s*", "", step_log.observations.strip())
                if log_content:
                    try:
                        # Try to parse as JSON for table data
                        data = json.loads(log_content)
                        if isinstance(data, list) and data and isinstance(data[0], list):
                            # Format as markdown table
                            headers = data[0]
                            rows = data[1:]
                            table_md = "| " + " | ".join(headers) + " |\n"
                            table_md += "| " + " | ".join(["---"] * len(headers)) + " |\n"
                            for row in rows:
                                table_md += "| " + " | ".join(str(cell) for cell in row) + " |\n"
                            yield gr.ChatMessage(
                                role="assistant",
                                content=table_md,
                                metadata={"title": "📊 Table Data", "parent_id": parent_id, "status": "done"}
                            )
                        else:
                            yield gr.ChatMessage(
                                role="assistant",
                                content=log_content,
                                metadata={"title": "📝 Execution Logs", "parent_id": parent_id, "status": "done"}
                            )
                    except json.JSONDecodeError:
                        yield gr.ChatMessage(
                            role="assistant",
                            content=log_content,
                            metadata={"title": "📝 Execution Logs", "parent_id": parent_id, "status": "done"}
                        )

            if hasattr(step_log, "error") and step_log.error is not None:
                yield gr.ChatMessage(
                    role="assistant",
                    content=str(step_log.error),
                    metadata={"title": "💥 Error", "parent_id": parent_id, "status": "done"}
                )

            # Flip the parent's status once all nested children have been yielded.
            parent_message_tool.metadata["status"] = "done"

        elif hasattr(step_log, "error") and step_log.error is not None:
            yield gr.ChatMessage(role="assistant", content=str(step_log.error), metadata={"title": "💥 Error"})

        # Footnote summarizing token counts and duration for this step.
        step_footnote = f"{step_number}"
        if hasattr(step_log, "input_token_count") and hasattr(step_log, "output_token_count"):
            token_str = f" | Input-tokens:{step_log.input_token_count:,} | Output-tokens:{step_log.output_token_count:,}"
            step_footnote += token_str
        if hasattr(step_log, "duration") and step_log.duration:
            # BUGFIX: previous code set the duration string to None for falsy
            # durations and then did `step_footnote += None`, raising TypeError.
            step_footnote += f" | Duration: {round(float(step_log.duration), 2)}"
        step_footnote = f"""<span style="color: #bbbbc2; font-size: 12px;">{step_footnote}</span> """
        yield gr.ChatMessage(role="assistant", content=f"{step_footnote}")
        yield gr.ChatMessage(role="assistant", content="-----")

        if hasattr(step_log, "observations") and step_log.observations:
            # Surface artifacts (screenshots, detection images, scraped text)
            # referenced line-by-line in the observations.
            for line in step_log.observations.split("\n"):
                if line.startswith("Screenshot saved at:"):
                    screenshot_path = line.replace("Screenshot saved at: ", "").strip()
                    logger.debug(f"Yielding screenshot: {screenshot_path}")
                    yield gr.ChatMessage(
                        role="assistant",
                        content={"path": screenshot_path, "mime_type": "image/png"},
                        metadata={"title": "📸 Screenshot"}
                    )
                elif line.endswith("_detected.png"):
                    yield gr.ChatMessage(
                        role="assistant",
                        content={"path": line.strip(), "mime_type": "image/png"},
                        metadata={"title": "🖼️ Detected Elements"}
                    )
                elif line and not line.startswith("Current url:"):
                    yield gr.ChatMessage(
                        role="assistant",
                        content=line,
                        metadata={"title": "📝 Scraped Text"}
                    )

def stream_to_gradio(
    initialize_agent,
    task: str,
    api_key: Optional[str] = None,
    reset_agent_memory: bool = False,
    additional_args: Optional[dict] = None,
):
    """Run an agent on ``task`` and yield gradio ChatMessages as it progresses.

    Args:
        initialize_agent: Factory callable taking an API key and returning an agent.
        task: The user prompt / task description to run.
        api_key: Optional user-supplied API key forwarded to the factory.
            (BUGFIX: previously annotated ``str`` despite defaulting to None.)
        reset_agent_memory: Whether to reset the agent's memory before running.
        additional_args: Extra arguments forwarded to ``agent.run``.

    Yields:
        gr.ChatMessage: intermediate step messages, then the final answer.

    Raises:
        ModuleNotFoundError: If gradio is not installed.
    """
    if not _is_package_available("gradio"):
        raise ModuleNotFoundError("Please install 'gradio' extra to use the GradioUI: `pip install 'smolagents[gradio]'`")
    import gradio as gr

    logger.debug(f"Received api_key: {'****' if api_key else 'None'}")
    agent = initialize_agent(api_key)

    # Aggregated counts kept for debugging/inspection; not yielded to the UI.
    total_input_tokens = 0
    total_output_tokens = 0
    step_log = None  # BUGFIX: avoid NameError below if agent.run yields nothing

    for step_log in agent.run(task, stream=True, reset=reset_agent_memory, additional_args=additional_args):
        input_tokens = agent.model.last_input_token_count
        output_tokens = agent.model.last_output_token_count
        logger.debug(f"Input tokens: {input_tokens}, Output tokens: {output_tokens}")
        if input_tokens is not None:
            total_input_tokens += input_tokens
        if output_tokens is not None:
            total_output_tokens += output_tokens
        if isinstance(step_log, ActionStep):
            # Attach counts so pull_messages_from_step can render the footnote.
            step_log.input_token_count = input_tokens if input_tokens is not None else 0
            step_log.output_token_count = output_tokens if output_tokens is not None else 0

        for message in pull_messages_from_step(step_log):
            yield message

    # The final item yielded by agent.run carries the final answer.
    final_answer = handle_agent_output_types(step_log)

    if isinstance(final_answer, AgentText):
        yield gr.ChatMessage(role="assistant", content=f"**Final answer:**\n{final_answer.to_string()}\n")
    elif isinstance(final_answer, AgentImage):
        yield gr.ChatMessage(role="assistant", content={"path": final_answer.to_string(), "mime_type": "image/png"})
    elif isinstance(final_answer, AgentAudio):
        yield gr.ChatMessage(role="assistant", content={"path": final_answer.to_string(), "mime_type": "audio/wav"})
    else:
        yield gr.ChatMessage(role="assistant", content=f"**Final answer:** {str(final_answer)}")

class GradioUI:
    """Minimal Gradio chat front-end for a web-navigation agent.

    The agent is created per interaction via the ``initialize_agent`` factory
    so a user-supplied API key (entered in the UI) can be applied.
    """

    def __init__(self, initialize_agent):
        # `initialize_agent` is a callable taking an optional API key and
        # returning a ready-to-run agent (see stream_to_gradio).
        if not _is_package_available("gradio"):
            raise ModuleNotFoundError("Please install 'gradio' extra to use the GradioUI: `pip install 'smolagents[gradio]'`")
        self.initialize_agent = initialize_agent
        # NOTE(review): this history lives on the instance, so all concurrent
        # browser sessions served by one GradioUI share the same message list —
        # confirm whether per-session state (gr.State) is needed.
        self.messages = []  # Initialize messages as a class attribute

    def interact_with_agent(self, prompt, api_key):
        """Generator wired to the textbox submit: streams the growing chat history.

        Yields the full message list after the user turn, after each streamed
        agent message, and once more at the end so the Chatbot re-renders.
        """
        import gradio as gr
        self.messages.append(gr.ChatMessage(role="user", content=prompt))
        yield self.messages
        for msg in stream_to_gradio(self.initialize_agent, task=prompt, api_key=api_key, reset_agent_memory=False):
            self.messages.append(msg)
            yield self.messages
        yield self.messages

    def launch(self, **kwargs):
        """Build the Blocks layout and start the Gradio server (blocking).

        Extra keyword arguments are forwarded to ``demo.launch``.
        """
        import gradio as gr

        # Round avatar styling for the chatbot component.
        css = """
        .chatbot .avatar-container {
            display: flex !important;
            justify-content: center !important;
            align-items: center !important;
            width: 40px !important;
            height: 40px !important;
            overflow: hidden !important;
        }
        .chatbot .avatar-container img {
            width: 100% !important;
            height: 100% !important;
            object-fit: cover !important;
            border-radius: 50% !important;
        }
        """

        with gr.Blocks(fill_height=True, css=css) as demo:
            gr.Markdown("**Note**: Please provide your own Gemini API key below. The default key may run out of quota.")
            api_key_input = gr.Textbox(
                lines=1, label="Gemini API Key (optional)", placeholder="Enter your Gemini API key here", type="password"
            )
            chatbot = gr.Chatbot(
                label="Web Navigation Agent", type="messages",
                avatar_images=(None, "./icon.png"), scale=1, height=600
            )
            text_input = gr.Textbox(
                lines=1, label="Enter URL and request (e.g., navigate to https://en.wikipedia.org/wiki/Nvidia, and provide me info on its history)"
            )
            # Wire submit -> streaming generator; gradio re-renders the chatbot
            # on every yielded history list.
            text_input.submit(self.interact_with_agent, [text_input, api_key_input], [chatbot])

        demo.launch(debug=True, **kwargs)

# Public API of this module; pull_messages_from_step is intentionally internal.
__all__ = ["stream_to_gradio", "GradioUI"]