toolkit / components.py
lgaleana's picture
SEO example
2ecf740
import json
import re
import traceback
from concurrent.futures import ThreadPoolExecutor
from abc import ABC, abstractmethod
from typing import Any, Dict, List, Optional, Union
import gradio as gr
import ai
class Component(ABC):
def __init__(self, id_: int):
# Internal state
self._id = id_
self._source = self.__class__.__name__
self.vname: str
# Gradio state
self.component_id: gr.Number
self.gr_component: Union[gr.Box, gr.Textbox]
self.output: gr.Textbox
self.visible: gr.Number
def render(self) -> None:
self.component_id = gr.Number(value=self._id, visible=False)
self.visible = gr.Number(0, visible=False)
self.gr_component = self._render()
@abstractmethod
def _render(self) -> Union[gr.Box, gr.Textbox]:
...
class Input(Component):
vname = "v"
def _render(self) -> gr.Textbox:
self.output = gr.Textbox(
label=f"Input: {{{self.vname}{self._id}}}",
interactive=True,
placeholder="Variable value",
visible=False,
)
return self.output
class TaskComponent(Component, ABC):
vname = "t"
def __init__(self, id_: int, value: str = "", visible: bool = False):
super().__init__(id_)
self._initial_value = value
self._initial_visbility = visible
self.name: str
self.input: gr.Textbox
def format_input(self, input: str, vars_in_scope: Dict[str, Any]) -> str:
try:
json.loads(input)
return input
except:
input = input.strip()
prompt_vars = [v for v in re.findall("{(.*)}", input)]
undefined_vars = prompt_vars - vars_in_scope.keys()
if len(undefined_vars) > 0:
raise KeyError(
f"The variables :: {undefined_vars} in task :: {self._id} are being used before being defined."
)
return input.format(**vars_in_scope)
@property
def n_inputs(self) -> int:
return len(self.inputs)
@property
@abstractmethod
def inputs(self) -> List[gr.Textbox]:
...
@abstractmethod
def execute(self, *args, vars_in_scope: Dict[str, Any]):
...
class AITask(TaskComponent):
name = "AI Task"
def _render(self) -> gr.Box:
with gr.Box(visible=self._initial_visbility) as gr_component:
with gr.Row():
self.input = gr.Textbox(
label="Instructions",
lines=10,
interactive=True,
placeholder="What would you like ChatGPT to do?",
value=self._initial_value,
)
self.output = gr.Textbox(
label=f"Output: {{{self.vname}{self._id}}}",
lines=10,
interactive=True,
)
return gr_component
@property
def inputs(self) -> List[gr.Textbox]:
return [self.input]
def execute(self, prompt: str, vars_in_scope: Dict[str, Any]) -> Optional[str]:
formatted_prompt = self.format_input(prompt, vars_in_scope)
if formatted_prompt:
return ai.llm.next([{"role": "user", "content": formatted_prompt}])
class CodeTask(TaskComponent):
name = "Code Task"
def __init__(
self, id_: int, value: str = "", visible: bool = False, code_value: str = ""
):
super().__init__(id_, value, visible)
self._initial_code_value = code_value
def _render(self) -> gr.Box:
with gr.Box(visible=self._initial_visbility) as gr_component:
with gr.Row():
with gr.Column():
self.code_prompt = gr.Textbox(
label="What would you like the code to do?",
interactive=True,
value=self._initial_code_value,
lines=3,
)
generate_code = gr.Button("Generate code")
with gr.Accordion(label="Generated code", open=False) as accordion:
self.accordion = accordion
self.raw_output = gr.Textbox(
label="Raw output",
lines=5,
interactive=False,
)
self.packages = gr.Textbox(
label="The following packages will be installed",
interactive=True,
)
self.script = gr.Textbox(
label="Code to be executed",
lines=10,
interactive=True,
)
self.error_message = gr.HighlightedText(
value=None, visible=False
)
self.input = gr.Textbox(
label="Input", interactive=True, value=self._initial_value
)
with gr.Column():
self.output = gr.Textbox(
label=f"Output: {{{self.vname}{self._id}}}",
lines=14,
interactive=True,
)
generate_code.click(
self.generate_code,
inputs=[self.code_prompt],
outputs=[
self.raw_output,
self.packages,
self.script,
self.error_message,
self.accordion,
],
)
return gr_component
@staticmethod
def generate_code(code_prompt: str):
raw_output = ""
packages = ""
script = ""
error_message = gr.HighlightedText.update(None, visible=False)
accordion = gr.Accordion.update()
if not code_prompt:
return (
raw_output,
packages,
script,
error_message,
accordion,
)
def llm_call(prompt):
return ai.llm.next([{"role": "user", "content": prompt}], temperature=0)
print(f"Generating code.")
try:
raw_output = llm_call(
f"""Write a python function to:
{code_prompt}
Write the code for the function. Name the function toolkit.
Use pip packages where available.
Include the necessary imports.
Instead of printing or saving to disk, the function should return the data."""
)
with ThreadPoolExecutor() as executor:
packages, script = tuple(
executor.map(
llm_call,
[
f"""The following text has some python code:
{raw_output}
Find the pip packages that need to be installed and get their corresponsing names in pip.
Package names in the imports and in pip might be different. Use the correct pip names.
Include only the packages that need to be installed with pip.
Put them in a valid JSON:
```
{{
"packages": List of packages. If no packages, empty list.
}}
```""",
f"""The following text has some python code:
{raw_output}
Extract it. Remove anything after the function definition.""",
],
)
)
for packages in re.findall("{.*}", packages, re.DOTALL):
try:
packages = json.loads(packages)
packages = packages["packages"]
except:
print(packages)
traceback.print_exc()
packages = "ERROR"
except Exception as e:
traceback.print_exc()
error_message = gr.HighlightedText.update(
value=[(str(e), "ERROR")], visible=True
)
accordion = gr.Accordion.update(open=True)
return (
raw_output,
packages,
script.replace("```python", "").replace("```", "").strip(),
error_message,
accordion,
)
@property
def inputs(self) -> List[gr.Textbox]:
return [self.packages, self.script, self.input]
def execute(
self, packages: str, script: str, input: str, vars_in_scope: Dict[str, Any]
):
if not script:
return None
script = script.strip()
formatted_input = self.format_input(input, vars_in_scope)
import inspect
import subprocess
import sys
def run(toolkit_func):
if len(inspect.getfullargspec(toolkit_func)[0]) > 0:
if formatted_input:
try:
return toolkit_func(eval(formatted_input))
except:
return toolkit_func(formatted_input)
raise ValueError(f"Code for task :: {self._id} needs an input.")
return toolkit_func()
for p in eval(packages):
subprocess.check_call([sys.executable, "-m", "pip", "install", p])
script = f"import os\nos.environ = {{}}\n\n{script}"
exec(script, locals())
locals_ = locals()
if "toolkit" in locals_:
toolkit_func = locals_["toolkit"]
return run(toolkit_func)
for var in reversed(locals_.values()):
# Try to run all local functions
if callable(var):
try:
return run(var)
except:
continue
raise RuntimeError(f"Unable to run the code for task :: {self._id}")
class Task(Component):
available_tasks = [AITask, CodeTask]
vname = "t"
def __init__(self, id_: int):
super().__init__(id_)
self._inner_tasks = [t(id_) for t in self.available_tasks]
self.gr_component: gr.Box
def _render(self) -> gr.Box:
with gr.Box(visible=False) as gr_component:
self.active_index = gr.Dropdown(
[AITask.name, CodeTask.name],
label="Pick a new Task",
type="index",
)
for t in self._inner_tasks:
t.render()
self.active_index.select(
self.pick_task,
inputs=[self.active_index],
outputs=[t.gr_component for t in self._inner_tasks],
)
return gr_component
@staticmethod
def pick_task(idx: int) -> List[Dict]:
update = [gr.Box.update(visible=False)] * len(Task.available_tasks)
update[idx] = gr.Box.update(visible=True)
return update
@property
def inputs(self) -> List[gr.Textbox]:
return [i for t in self._inner_tasks for i in t.inputs]
@property
def outputs(self) -> List[gr.Textbox]:
return [t.output for t in self._inner_tasks]
@property
def inner_n_inputs(self) -> List[int]:
return [t.n_inputs for t in self._inner_tasks]
def execute(self, active_index, *args, vars_in_scope: Dict[str, Any]):
inner_task = self._inner_tasks[active_index]
print(f"Executing {self._source}: {self._id}")
return inner_task.execute(*args, vars_in_scope)
MAX_TASKS = 10
all_tasks = {i: Task(i) for i in range(MAX_TASKS)}
class Tasks:
@classmethod
def visibilities(cls) -> List[gr.Number]:
return [t.visible for t in all_tasks.values()]
@classmethod
def active_indexes(cls) -> List[gr.Dropdown]:
return [t.active_index for t in all_tasks.values()]
@classmethod
def gr_components(cls) -> List[gr.Box]:
return [t.gr_component for t in all_tasks.values()]