| |
| |
| |
| |
|
|
| import os |
| import json |
| import threading |
| import re |
| from typing import Dict, Any |
|
|
| import gradio as gr |
| from fastapi import FastAPI, Depends, HTTPException |
| from fastapi.security import HTTPBearer, HTTPAuthorizationCredentials |
| import uvicorn |
|
|
| from transformers import pipeline |
|
|
| |
| from langchain_community.llms import HuggingFacePipeline |
| from langchain_community.utilities import SerpAPIWrapper |
| from langchain_community.vectorstores import Chroma |
| from langchain_community.embeddings import HuggingFaceEmbeddings |
|
|
| from langchain.memory import ConversationBufferMemory |
| from langchain.agents import initialize_agent, Tool |
| from langchain.chains import LLMChain |
| from langchain.prompts import PromptTemplate |
| from langchain.docstore.document import Document |
|
|
| |
| from langchain.tools.python.tool import PythonAstREPLTool |
|
|
| |
| |
| |
# Runtime configuration, read once from the environment at import time.

# Hugging Face access token passed to the model loader; None when unset.
HF_TOKEN = os.getenv("HF_TOKEN")
# SerpAPI key handed to the web-search wrapper; None when unset.
SERPAPI_KEY = os.getenv("SERPAPI_API_KEY")

# Shared secret that API clients must present as their bearer token.
# SECURITY: the fallback value is hard-coded in this file and therefore public
# knowledge. It is kept so existing deployments keep starting, but running
# with it means anyone can authenticate, so say so loudly.
_DEFAULT_JWT_SECRET = "changeme123"
JWT_SECRET = os.getenv("JWT_SECRET", _DEFAULT_JWT_SECRET)
if JWT_SECRET == _DEFAULT_JWT_SECRET:
    print(
        "Warning: JWT_SECRET is unset or still the built-in default; "
        "the API is effectively unauthenticated. Set JWT_SECRET to a strong secret."
    )
|
|
| |
| |
| |
# Dependency that extracts the bearer credentials from the request for
# protected routes.
security = HTTPBearer()


def verify_jwt(credentials: HTTPAuthorizationCredentials = Depends(security)):
    """Authorize a request by checking its bearer token against ``JWT_SECRET``.

    Despite the name, no JWT is decoded or validated here: the presented token
    must simply be equal to the shared secret.

    Args:
        credentials: Bearer credentials injected by the ``security`` dependency.

    Returns:
        ``True`` when the token matches.

    Raises:
        HTTPException: With status 403 when the token does not match.
    """
    import hmac  # local import keeps this fix self-contained

    presented = credentials.credentials.encode("utf-8")
    expected = JWT_SECRET.encode("utf-8")
    # Constant-time comparison: a plain ``!=`` can leak, through response
    # timing, how many leading characters of the secret were guessed correctly.
    # Bytes are compared because ``compare_digest`` rejects non-ASCII ``str``.
    if not hmac.compare_digest(presented, expected):
        raise HTTPException(status_code=403, detail="Invalid token")
    return True
|
|
| |
| |
| |
# Hub identifier of the preferred text-generation model.
MODEL_ID: str = "PuruAI/Medini_Intelligence"
# Model loaded instead when the preferred one cannot be loaded.
FALLBACK_MODEL: str = "gpt2"
|
|
def load_llm():
    """Build the LangChain LLM wrapper around a text-generation pipeline.

    Tries ``MODEL_ID`` first (authenticated with ``HF_TOKEN``); if that fails
    for any reason, loads ``FALLBACK_MODEL`` instead so the app still starts.

    Returns:
        A ``HuggingFacePipeline`` wrapping whichever pipeline was loaded.

    Raises:
        Exception: Whatever the fallback load raises; it is not caught.
    """
    pipeline_kwargs = {"max_new_tokens": 512, "temperature": 0.7}
    try:
        # NOTE(review): newer transformers releases deprecate `use_auth_token`
        # in favour of `token` -- confirm against the pinned version.
        model_pipeline = pipeline(
            "text-generation",
            model=MODEL_ID,
            use_auth_token=HF_TOKEN,
            **pipeline_kwargs,
        )
    except Exception as exc:
        # Deliberately broad: any load failure (missing repo, bad token, no
        # network, ...) should fall back. Report the cause so the fallback
        # is diagnosable instead of silent.
        print(f"Warning: Failed to load {MODEL_ID}. Falling back to {FALLBACK_MODEL}.")
        print(f"Reason: {type(exc).__name__}: {exc}")
        model_pipeline = pipeline("text-generation", model=FALLBACK_MODEL, **pipeline_kwargs)

    return HuggingFacePipeline(pipeline=model_pipeline)
|
|
# Single shared language model, loaded once at import time.
llm = load_llm()

# Vector memory: a Chroma collection stored under _MEMORY_DIR, plus the
# retriever used to query it.
_MEMORY_DIR = "./medini_memory"
embeddings = HuggingFaceEmbeddings()
chroma_db = Chroma(embedding_function=embeddings, persist_directory=_MEMORY_DIR)
retriever = chroma_db.as_retriever()
|
|
# Prompt for answering a question strictly from retrieved memory.
# Built from one literal per template line; the resulting string is identical
# to the original triple-quoted block, including leading and trailing newlines.
qa_prompt_template = (
    "\n"
    "You are a question-answering system. Use the following context, which contains information retrieved from memory, to answer the user's question.\n"
    "If the context is empty or does not contain the answer, state clearly that the information is not in memory.\n"
    "Context:\n"
    "{context}\n"
    "Question: {question}\n"
    "Answer:\n"
)
QA_PROMPT = PromptTemplate(
    input_variables=["context", "question"],
    template=qa_prompt_template,
)
# Chain that fills QA_PROMPT and sends it to the shared LLM.
qa_chain = LLMChain(prompt=QA_PROMPT, llm=llm)
|
|
def retrieve_and_answer(question: str) -> str:
    """Answer *question* from documents retrieved out of the vector memory.

    Args:
        question: The question, used both as the retrieval query and in the prompt.

    Returns:
        The QA chain's answer text.
    """
    matches = retriever.get_relevant_documents(question)
    # Retrieved passages are separated by a "---" line inside the prompt.
    separator = "\n---\n"
    context = separator.join(doc.page_content for doc in matches)
    answer = qa_chain.run(question=question, context=context)
    return answer
|
|
| |
| |
| |
# Web search backend, authenticated with SERPAPI_KEY.
search = SerpAPIWrapper(serpapi_api_key=SERPAPI_KEY)
# SECURITY: this tool executes Python code it is handed. Its input here comes
# from LLM output (agent actions and planner step descriptions), which is in
# turn driven by user input -- treat it as arbitrary code execution and
# sandbox or remove it before exposing the service to untrusted users.
python_tool = PythonAstREPLTool()


# Tools offered to the agent; the descriptions are what the LLM sees when
# choosing between them.
tools = [
    Tool(
        name="Knowledge Recall",
        func=retrieve_and_answer,
        description="Retrieve info from Medini memory (Chroma DB).",
    ),
    Tool(
        name="Web Search",
        func=search.run,
        description="Search the web for up-to-date information.",
    ),
    Tool(
        name="Python REPL",
        func=python_tool.run,
        description="Execute Python code for math/data manipulation.",
    ),
]


def _tool_key(name):
    """Return *name* lower-cased with spaces removed ("Web Search" -> "websearch")."""
    return name.lower().replace(" ", "")


# Direct-dispatch table: normalised tool name -> callable.
TOOL_MAP = {_tool_key(entry.name): entry.func for entry in tools}
|
|
| |
| |
| |
# One module-level conversation buffer: every caller of `agent` shares the
# same chat history.
memory = ConversationBufferMemory(return_messages=True, memory_key="chat_history")

# Conversational ReAct agent wired to the tools, the shared LLM and the
# shared memory above.
agent = initialize_agent(
    tools,
    llm,
    agent="conversational-react-description",
    memory=memory,
    verbose=True,
)
|
|
| |
| |
| |
# Planner instructions. Built from one literal per template line; the
# resulting string is identical to the original triple-quoted block,
# including leading and trailing newlines.
_PLAN_TEMPLATE = (
    "\n"
    "You are Medini Planner. Decompose the high-level goal into a JSON object containing a 'steps' array (max 6 steps). Each step must have: id (integer), name (short string), description (detailed instruction), and tool_hint (either 'recall', 'search', 'python', or 'agent').\n"
    "Return JSON only.\n"
    "Goal: {goal}\n"
)
plan_prompt = PromptTemplate(template=_PLAN_TEMPLATE, input_variables=["goal"])
# Chain that turns a goal into the planner's raw (hopefully JSON) text.
planner_chain = LLMChain(prompt=plan_prompt, llm=llm)
|
|
def create_plan(goal: str) -> Dict[str, Any]:
    """Ask the planner chain for a plan and parse it into a dict.

    The model's reply may wrap the JSON in Markdown fences or surround it with
    extra text, so the first decodable JSON object in the reply is used.

    Args:
        goal: High-level goal to decompose.

    Returns:
        The parsed plan; guaranteed to be a dict whose ``"steps"`` is a list.

    Raises:
        ValueError: If no JSON object can be decoded from the reply, if the
            object has no ``"steps"`` key, or if ``"steps"`` is not a list.
    """
    raw = planner_chain.run(goal=goal)
    # Drop Markdown code fences the model may have put around its answer.
    text = raw.replace("```json", "").replace("```", "").strip()

    # Try each "{" as a possible start of the plan. raw_decode stops at the
    # end of the first complete object, so trailing text (even text containing
    # "}") no longer breaks parsing the way a greedy "{.*}" match does.
    decoder = json.JSONDecoder()
    plan = None
    first_error = None
    start = text.find("{")
    while start != -1:
        try:
            plan, _ = decoder.raw_decode(text, start)
            break
        except json.JSONDecodeError as exc:
            if first_error is None:
                first_error = exc
            start = text.find("{", start + 1)

    if plan is None:
        reason = first_error if first_error is not None else "no JSON object found"
        print(f"JSON Parsing Error: {reason} in string: {text[:200]}...")
        raise ValueError("Planner returned malformed JSON.") from first_error

    if "steps" not in plan:
        raise ValueError("Parsed JSON missing 'steps'.")
    # Callers iterate over the steps; reject shapes that would blow up later.
    if not isinstance(plan["steps"], list):
        raise ValueError("Planner 'steps' must be a JSON array.")
    return plan
|
|
def execute_step(step: Dict[str, Any]) -> Dict[str, Any]:
    """Run one plan step with the tool its ``tool_hint`` points to.

    The hint is matched by substring ("recall", "search", "python", in that
    order); anything else -- or a hint whose tool is missing from ``TOOL_MAP``
    -- is handed to the conversational agent. The outcome is also written to
    the vector memory on a best-effort basis.

    Args:
        step: A planner step; expected keys are ``id``, ``name``,
            ``description`` and ``tool_hint``. Missing keys are tolerated
            because the step comes from LLM output.

    Returns:
        ``{"id", "name", "status", "output"}`` where ``status`` is ``"ok"`` or
        ``"error"``. Tool failures are reported in ``output``, never raised.
    """
    if not isinstance(step, dict):
        # Malformed planner output: nothing to run and nothing worth storing.
        return {"id": None, "name": "", "status": "error", "output": "Execution skipped."}

    step_id = step.get("id")
    step_name = step.get("name", "")
    # str() guards against a non-string hint (e.g. a number) from the model.
    hint = str(step.get("tool_hint") or "").lower()
    description = step.get("description")
    input_text = str(description).strip() if description is not None else ""
    output, status = "Execution skipped.", "error"

    if input_text:
        try:
            # SECURITY: a "python" hint sends model-generated text to the
            # Python REPL tool, i.e. executes it. Sandbox before exposing
            # this to untrusted users.
            hint_to_tool = (
                ("recall", "knowledgerecall"),
                ("search", "websearch"),
                ("python", "pythonrepl"),
            )
            tool_key = next((key for word, key in hint_to_tool if word in hint), None)
            tool_func = TOOL_MAP.get(tool_key) if tool_key else None

            if tool_func:
                output = tool_func(input_text)
            else:
                output = agent.run(input_text)

            status = "ok"
        except Exception as e:
            # Boundary catch: one failing step must not abort the whole plan.
            output = f"Execution Error: {str(e)}"

    try:
        chroma_db.add_documents(
            [Document(page_content=f"Step {step_id} - {step_name} Result: {output}")]
        )
    except Exception as e:
        # Losing the memory write should not discard a computed result.
        print(f"Warning: could not store result of step {step_id} in memory: {e}")

    return {"id": step_id, "name": step_name, "status": status, "output": output}
|
|
def execute_plan(goal: str) -> Dict[str, Any]:
    """Plan *goal* and execute every step of the resulting plan in order.

    Args:
        goal: High-level goal text.

    Returns:
        On success ``{"goal", "plan", "results"}``; when the goal is blank or
        no usable plan is produced, ``{"goal", "error"}``.
    """
    # Don't spend an LLM call on an empty request.
    if not isinstance(goal, str) or not goal.strip():
        return {"goal": goal, "error": "Goal must be a non-empty string."}

    try:
        plan = create_plan(goal)
    except ValueError as e:
        return {"goal": goal, "error": str(e)}

    # The plan originates from LLM output; verify its shape before iterating.
    steps = plan.get("steps", []) if isinstance(plan, dict) else None
    if not isinstance(steps, list):
        return {"goal": goal, "error": "Planner returned a plan without a valid 'steps' list."}

    results = []
    for step in steps:
        if isinstance(step, dict):
            results.append(execute_step(step))
        else:
            # Keep one result per planned step even when the step is malformed.
            results.append(
                {"id": None, "name": "", "status": "error", "output": "Execution skipped."}
            )
    return {"goal": goal, "plan": plan, "results": results}
|
|
| |
| |
| |
# HTTP API; both routes below require the bearer-token dependency.
app = FastAPI(title="Medini Agent API")


@app.post("/chat")
def chat_endpoint(message: str, auth: bool = Depends(verify_jwt)):
    """Send *message* to the shared agent and return its reply.

    Args:
        message: User message forwarded to the agent unchanged.
        auth: Result of the ``verify_jwt`` dependency; present only to
            enforce authorization.

    Returns:
        ``{"response": <agent reply>}``.
    """
    return {"response": agent.run(message)}
|
|
@app.post("/goal")
def goal_endpoint(goal: str, auth: bool = Depends(verify_jwt)):
    """Plan and execute *goal*, returning the execution report.

    Args:
        goal: High-level goal passed to ``execute_plan``.
        auth: Result of the ``verify_jwt`` dependency; present only to
            enforce authorization.

    Returns:
        Whatever ``execute_plan`` returns for the goal.
    """
    report = execute_plan(goal)
    return report
|
|
| |
| |
| |
def gradio_chat(message, history):
    """Gradio handler: run one chat turn through the agent.

    Args:
        message: Text from the chat input box (may be empty or ``None``).
        history: Existing list of ``(user, bot)`` pairs, or ``None`` when the
            chat is still empty.

    Returns:
        ``(updated_history, "")`` -- the new history for the chatbot and an
        empty string that clears the input box. Agent failures appear in the
        chat as an ``Error: ...`` reply instead of being raised.
    """
    # Work on a copy: tolerates a None history and leaves the caller's list untouched.
    updated = list(history or [])

    # Blank submissions are ignored rather than sent to the agent.
    if not (message or "").strip():
        return updated, ""

    try:
        reply = agent.run(message)
    except Exception as e:
        # UI boundary: show the failure in the chat rather than crashing the handler.
        reply = f"Error: {str(e)}"
    updated.append((message, reply))
    return updated, ""
|
|
def gradio_execute_plan(goal):
    """Gradio handler: run ``execute_plan`` and never raise into the UI.

    Args:
        goal: Goal text from the goal input box.

    Returns:
        The ``execute_plan`` report, or ``{"error": ...}`` if it raised.
    """
    try:
        report = execute_plan(goal)
    except Exception as exc:
        report = {"error": f"Failed to execute plan: {str(exc)}"}
    return report
|
|
# Gradio UI. Components are created in the same order as before, because
# creation order inside the layout contexts determines what is rendered where.
with gr.Blocks(theme=gr.themes.Soft()) as demo:
    gr.Markdown("# 🤖 Medini Autonomous Agent")
    gr.Markdown("Chat or submit high-level goals. Agentic AI handles reasoning, memory, and tool use.")

    with gr.Row():
        # Wider column: free-form chat with the agent.
        with gr.Column(scale=2):
            gr.Markdown("## Conversational Chat")
            chatbot = gr.Chatbot(height=400)
            msg = gr.Textbox(label="Chat Input", placeholder="Type your message...")
            clear_btn = gr.Button("Clear Chat")
            # Submitting sends (text, history) and receives (history, cleared text).
            msg.submit(fn=gradio_chat, inputs=[msg, chatbot], outputs=[chatbot, msg])
            # Clearing resets only the chatbot component to an empty history.
            clear_btn.click(fn=lambda: [], inputs=None, outputs=chatbot, queue=False)

        # Narrower column: goal planner and its JSON report.
        with gr.Column(scale=1):
            gr.Markdown("## Autonomous Goal Planner")
            goal_input = gr.Textbox(label="Goal", placeholder="Enter high-level goal...")
            run_goal_btn = gr.Button("Run Goal", variant="primary")
            gr.Markdown("---")
            gr.Markdown("### Execution Report")
            goal_output = gr.JSON(label="Plan and Results")
            run_goal_btn.click(fn=gradio_execute_plan, inputs=[goal_input], outputs=goal_output)
|
|
| |
| |
| |
if __name__ == "__main__":
    # Serve the FastAPI app from a daemon thread so it stops when the process
    # exits; the Gradio UI then runs in the main thread.
    # NOTE: the API listens on all interfaces (0.0.0.0), port 8000, and logs
    # only critical messages.
    api_thread = threading.Thread(
        target=uvicorn.run,
        args=(app,),
        kwargs={"host": "0.0.0.0", "port": 8000, "log_level": "critical"},
        daemon=True,
    )
    api_thread.start()
    demo.launch(share=False)
|
|