| |
|
|
| import pandas as pd |
| from typing import Any |
| from pydantic import ConfigDict |
|
|
| from llama_index.core import Settings |
| from llama_index.core.workflow import Workflow, Event, StartEvent as BaseStartEvent, StopEvent, step |
| from llama_index.llms.groq import Groq |
|
|
| |
| from config import pandas_prompt_str, instruction_str, RESPONSE_SYNTHESIS_PROMPT_STR |
| from utils import descricao_colunas, limpar_codigo_pandas |
|
|
| |
class StartEvent(BaseStartEvent):
    """Start event carrying the user's question and the DataFrame to query."""

    # pd.DataFrame is not a type pydantic validates natively, so arbitrary
    # types have to be allowed for the ``df`` field to be accepted.
    model_config = ConfigDict(arbitrary_types_allowed=True)

    # Natural-language question supplied by the caller.
    query: str
    # DataFrame the question refers to.
    df: pd.DataFrame
|
|
class CodeEvent(Event):
    """Event carrying the fully formatted code-generation prompt."""

    # Needed so pydantic accepts the pd.DataFrame field below.
    model_config = ConfigDict(arbitrary_types_allowed=True)

    # Prompt text that will be sent to the LLM to obtain pandas code.
    pandas_prompt: str
    # Original user question, passed along unchanged.
    query: str
    # DataFrame the question refers to, passed along unchanged.
    df: pd.DataFrame
|
|
class OutputEvent(Event):
    """Event carrying the pandas code produced by the LLM."""

    # Needed so pydantic accepts the pd.DataFrame field below.
    model_config = ConfigDict(arbitrary_types_allowed=True)

    # Cleaned pandas expression that is ready to be evaluated.
    pandas_code: str
    # Original user question, passed along unchanged.
    query: str
    # DataFrame the code will be evaluated against.
    df: pd.DataFrame
|
|
class ExecutedEvent(Event):
    """Event carrying the outcome of evaluating the generated pandas code."""

    # Needed so pydantic accepts the pd.DataFrame field below.
    model_config = ConfigDict(arbitrary_types_allowed=True)

    # Pandas expression that was evaluated.
    pandas_code: str
    # Value produced by the evaluation, or an error message string on failure.
    pandas_output: Any
    # Original user question, passed along unchanged.
    query: str
    # DataFrame the code was evaluated against.
    df: pd.DataFrame
|
|
| |
class PandasWorkflow(Workflow):
    """Answer a natural-language question about a DataFrame in four steps.

    The steps are chained through their event types: build the
    code-generation prompt, ask the LLM for a pandas expression, evaluate that
    expression against the DataFrame, and ask the LLM to phrase the final
    answer from the evaluation result.
    """

    # Prefix of the message produced when evaluating the generated code fails.
    # Both the execution and the synthesis step use it, so a failure is
    # recognised by this exact marker rather than by any string output that
    # merely happens to contain the word "Erro".
    _ERRO_EXECUCAO_PREFIXO = "Erro ao executar o código: "

    @step
    async def iniciar_processamento(self, ev: StartEvent) -> CodeEvent:
        """Build the code-generation prompt from the DataFrame and the query."""
        colunas_info = descricao_colunas(ev.df)
        prompt_text = pandas_prompt_str.format(
            colunas_detalhes=colunas_info,
            # Only the first five rows are shown to the model as a sample.
            df_str=ev.df.head(5).to_string(),
            instruction_str=instruction_str,
            query_str=ev.query,
        )
        return CodeEvent(pandas_prompt=prompt_text, query=ev.query, df=ev.df)

    @step
    async def gerar_codigo(self, ev: CodeEvent) -> OutputEvent:
        """Ask the configured LLM for pandas code and clean up its reply."""
        response = await Settings.llm.acomplete(ev.pandas_prompt)
        codigo_limpo = limpar_codigo_pandas(str(response).strip())
        print(f"✅ Código gerado: {codigo_limpo}")
        return OutputEvent(pandas_code=codigo_limpo, query=ev.query, df=ev.df)

    @step
    async def executar_codigo(self, ev: OutputEvent) -> ExecutedEvent:
        """Evaluate the generated expression with only ``df`` and ``pd`` in scope.

        Any exception raised by the evaluation is turned into an error string
        (starting with ``_ERRO_EXECUCAO_PREFIXO``) so the workflow can still
        finish with a readable message instead of aborting.
        """
        # SECURITY: eval() executes LLM-generated code. Emptying __builtins__
        # is NOT a sandbox: attribute access on ``df``/``pd`` can still reach
        # the filesystem and the interpreter. Only run this with trusted
        # models and data, or move execution into an isolated process.
        try:
            resultado = eval(
                ev.pandas_code,
                {"__builtins__": {}},
                {"df": ev.df, "pd": pd},
            )
        except Exception as e:
            resultado = f"{self._ERRO_EXECUCAO_PREFIXO}{e}"
        return ExecutedEvent(
            pandas_code=ev.pandas_code,
            pandas_output=resultado,
            query=ev.query,
            df=ev.df,
        )

    @step
    async def finalizar_e_sintetizar(self, ev: ExecutedEvent) -> StopEvent:
        """Produce the final answer, or report the execution failure.

        Returns a ``StopEvent`` whose result is a dict with the single key
        ``"resposta_final"``.
        """
        saida = ev.pandas_output
        # Match the exact failure marker: a legitimate string result that just
        # contains "Erro" must still go through response synthesis.
        falhou = isinstance(saida, str) and saida.startswith(
            self._ERRO_EXECUCAO_PREFIXO
        )
        if falhou:
            resposta_final = f"Não foi possível processar: {saida}"
        else:
            prompt = RESPONSE_SYNTHESIS_PROMPT_STR.format(
                query_str=ev.query,
                pandas_instructions=ev.pandas_code,
                pandas_output=str(saida),
            )
            response = await Settings.llm.acomplete(prompt)
            resposta_final = str(response).strip()
        return StopEvent(result={"resposta_final": resposta_final})
|
|
| |
async def executar_consulta(query: str, df_local: pd.DataFrame) -> dict:
    """Run the pandas workflow for one question and return its answer.

    Args:
        query: Natural-language question about the data.
        df_local: DataFrame to query; ``None`` is reported as "not loaded".

    Returns:
        A dict with a ``"resposta_final"`` key holding either the workflow's
        answer or an error message. Any ``Exception`` raised while running the
        workflow is caught, its traceback printed, and reported in that dict.
    """
    if df_local is None:
        return {"resposta_final": "Erro: DataFrame não carregado."}
    # A missing or non-string query is reported like an empty one instead of
    # raising AttributeError on .strip() before the try block is entered.
    if not isinstance(query, str) or not query.strip():
        return {"resposta_final": "Erro: Consulta vazia."}

    try:
        wf = PandasWorkflow()
        resultado_final = await wf.run(query=query, df=df_local)
        if not isinstance(resultado_final, dict):
            # Raised inside the try on purpose so it is reported through the
            # same error path as any other workflow failure.
            raise TypeError(
                f"Workflow retornou tipo inesperado: {type(resultado_final)}"
            )
        return resultado_final
    except Exception as e:
        import traceback

        traceback.print_exc()
        return {"resposta_final": f"Erro crítico no workflow: {str(e)}"}