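# iLearn app entry point: wires together model providers, the rules/memories
# knowledge base, web research tools, and the Gradio UI.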
import os
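# Deployment flags: DEMO_MODE disables destructive UI actions; MEMORY_STORAGE_TYPE
# selects the memory backend ("RAM", "SQLITE", or "HF_DATASET").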
DEMO_MODE = False
MEMORY_STORAGE_TYPE = "RAM"
HF_DATASET_MEMORY_REPO = "broadfield-dev/ai-brain"
HF_DATASET_RULES_REPO = "broadfield-dev/ai-rules"
os.environ['STORAGE_BACKEND'] = MEMORY_STORAGE_TYPE
if MEMORY_STORAGE_TYPE == "HF_DATASET":
os.environ['HF_MEMORY_DATASET_REPO'] = HF_DATASET_MEMORY_REPO
os.environ['HF_RULES_DATASET_REPO'] = HF_DATASET_RULES_REPO
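# Optional .env overrides consumed below via load_dotenv(); the values shown
# here are illustrative, not required:
#   WEB_SEARCH_ENABLED=true
#   MAX_HISTORY_TURNS=7
#   LOAD_RULES_FILE=path/to/rules.txt
#   LOAD_MEMORIES_FILE=path/to/memories.jsonl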
import json
import re
import logging
from datetime import datetime
from dotenv import load_dotenv
import gradio as gr
import time
import tempfile
import xml.etree.ElementTree as ET
from PIL import Image, ImageDraw
from model_logic import (
get_available_providers, get_model_display_names_for_provider,
get_default_model_display_name_for_provider, call_model_stream, MODELS_BY_PROVIDER
)
from memory_logic import (
initialize_memory_system,
add_memory_entry, retrieve_memories_semantic, get_all_memories_cached, clear_all_memory_data_backend,
add_rule_entry, retrieve_rules_semantic, remove_rule_entry, get_all_rules_cached, clear_all_rules_data_backend,
save_faiss_indices_to_disk, STORAGE_BACKEND as MEMORY_STORAGE_BACKEND, SQLITE_DB_PATH as MEMORY_SQLITE_PATH,
HF_MEMORY_DATASET_REPO as MEMORY_HF_MEM_REPO, HF_RULES_DATASET_REPO as MEMORY_HF_RULES_REPO,
load_rules_from_file, load_memories_from_file, process_rules_from_text_blob, import_kb_from_kv_dict
)
from websearch_logic import scrape_url, search_and_scrape_duckduckgo, search_and_scrape_google
from image_kb_logic import (
set_pil_image_format_to_png,
extract_data_from_image,
decrypt_data,
InvalidTag,
parse_kv_string_to_dict,
convert_kb_to_kv_string,
generate_brain_carrier_image,
draw_key_list_dropdown_overlay,
encrypt_data,
embed_data_in_image,
_get_font,
PREFERRED_FONTS,
)
from prompts import (
DEFAULT_SYSTEM_PROMPT,
METRIC_GENERATION_SYSTEM_PROMPT,
METRIC_GENERATION_USER_PROMPT_TEMPLATE,
PLAN_GENERATION_SYSTEM_PROMPT,
PLAN_GENERATION_USER_PROMPT_TEMPLATE,
INSIGHT_GENERATION_SYSTEM_PROMPT,
INSIGHT_GENERATION_USER_PROMPT_TEMPLATE
)
from gradio_client import Client
load_dotenv()
logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(name)s - %(levelname)s - %(threadName)s - %(message)s')
logger = logging.getLogger(__name__)
for lib_name in ["urllib3", "requests", "huggingface_hub", "PIL.PngImagePlugin", "matplotlib", "gradio_client.client", "multipart.multipart", "httpx", "sentence_transformers", "faiss", "datasets"]:
    logging.getLogger(lib_name).setLevel(logging.WARNING)
WEB_SEARCH_ENABLED = os.getenv("WEB_SEARCH_ENABLED", "true").lower() == "true"
MAX_HISTORY_TURNS = int(os.getenv("MAX_HISTORY_TURNS", 7))
current_chat_session_history = []
LOAD_RULES_FILE = os.getenv("LOAD_RULES_FILE")
LOAD_MEMORIES_FILE = os.getenv("LOAD_MEMORIES_FILE")
logger.info(f"App Config: WebSearch={WEB_SEARCH_ENABLED}, MemoryBackend={MEMORY_STORAGE_BACKEND}")
logger.info(f"Startup loading: Rules from {LOAD_RULES_FILE or 'None'}, Memories from {LOAD_MEMORIES_FILE or 'None'}")
def format_insights_for_prompt(retrieved_insights_list: list[str]) -> tuple[str, list[dict]]:
if not retrieved_insights_list:
return "No specific guiding principles or learned insights retrieved.", []
parsed = []
for text in retrieved_insights_list:
match = re.match(r"\[(CORE_RULE|RESPONSE_PRINCIPLE|BEHAVIORAL_ADJUSTMENT|GENERAL_LEARNING)\|([\d\.]+?)\](.*)", text.strip(), re.DOTALL | re.IGNORECASE)
if match:
parsed.append({"type": match.group(1).upper().replace(" ", "_"), "score": match.group(2), "text": match.group(3).strip(), "original": text.strip()})
else:
parsed.append({"type": "GENERAL_LEARNING", "score": "0.5", "text": text.strip(), "original": text.strip()})
try:
parsed.sort(key=lambda x: float(x["score"]) if x["score"].replace('.', '', 1).isdigit() else -1.0, reverse=True)
except ValueError: logger.warning("FORMAT_INSIGHTS: Sort error due to invalid score format.")
grouped = {"CORE_RULE": [], "RESPONSE_PRINCIPLE": [], "BEHAVIORAL_ADJUSTMENT": [], "GENERAL_LEARNING": []}
for p_item in parsed: grouped.get(p_item["type"], grouped["GENERAL_LEARNING"]).append(f"- (Score: {p_item['score']}) {p_item['text']}")
sections = [f"{k.replace('_', ' ').title()}:\n" + "\n".join(v) for k, v in grouped.items() if v]
return "\n\n".join(sections) if sections else "No guiding principles retrieved.", parsed
def generate_interaction_metrics(user_input: str, bot_response: str, provider: str, model_display_name: str, api_key_override: str = None) -> dict:
metric_start_time = time.time()
logger.info(f"Generating metrics with: {provider}/{model_display_name}")
metric_prompt_content = METRIC_GENERATION_USER_PROMPT_TEMPLATE.format(user_input=user_input, bot_response=bot_response)
metric_messages = [{"role": "system", "content": METRIC_GENERATION_SYSTEM_PROMPT}, {"role": "user", "content": metric_prompt_content}]
try:
metrics_provider_final, metrics_model_display_final = provider, model_display_name
metrics_model_env = os.getenv("METRICS_MODEL")
if metrics_model_env and "/" in metrics_model_env:
m_prov, m_id = metrics_model_env.split('/', 1)
m_disp_name = next((dn for dn, mid in MODELS_BY_PROVIDER.get(m_prov.lower(), {}).get("models", {}).items() if mid == m_id), None)
if m_disp_name: metrics_provider_final, metrics_model_display_final = m_prov, m_disp_name
else: logger.warning(f"METRICS_MODEL '{metrics_model_env}' not found, using interaction model.")
response_chunks = list(call_model_stream(provider=metrics_provider_final, model_display_name=metrics_model_display_final, messages=metric_messages, api_key_override=api_key_override, temperature=0.05, max_tokens=200))
resp_str = "".join(response_chunks).strip()
json_match = re.search(r"```json\s*(\{.*?\})\s*```", resp_str, re.DOTALL | re.IGNORECASE) or re.search(r"(\{.*?\})", resp_str, re.DOTALL)
if json_match: metrics_data = json.loads(json_match.group(1))
else:
logger.warning(f"METRICS_GEN: Non-JSON response from {metrics_provider_final}/{metrics_model_display_final}: '{resp_str}'")
return {"takeaway": "N/A", "response_success_score": 0.5, "future_confidence_score": 0.5, "error": "metrics format error"}
parsed_metrics = {"takeaway": metrics_data.get("takeaway", "N/A"), "response_success_score": float(metrics_data.get("response_success_score", 0.5)), "future_confidence_score": float(metrics_data.get("future_confidence_score", 0.5)), "error": metrics_data.get("error")}
logger.info(f"METRICS_GEN: Generated in {time.time() - metric_start_time:.2f}s. Data: {parsed_metrics}")
return parsed_metrics
except Exception as e:
logger.error(f"METRICS_GEN Error: {e}", exc_info=False)
return {"takeaway": "N/A", "response_success_score": 0.5, "future_confidence_score": 0.5, "error": str(e)}
def _generate_action_plan(
original_query: str, provider_name: str, model_display_name: str, ui_api_key_override: str | None, chat_history: list[dict]
) -> dict:
history_str = "\n".join([f"{msg['role']}: {msg['content'][:150]}" for msg in chat_history[-4:]])
plan_user_prompt = PLAN_GENERATION_USER_PROMPT_TEMPLATE.format(history_str=history_str, original_query=original_query)
plan_messages = [{"role": "system", "content": PLAN_GENERATION_SYSTEM_PROMPT}, {"role": "user", "content": plan_user_prompt}]
try:
response_chunks = list(call_model_stream(
provider=provider_name,
model_display_name=model_display_name,
messages=plan_messages,
api_key_override=ui_api_key_override,
temperature=0.0,
max_tokens=1000
))
resp_str = "".join(response_chunks).strip()
json_match = re.search(r"\{.*\}", resp_str, re.DOTALL)
if json_match:
plan_data = json.loads(json_match.group(0))
return plan_data
except Exception as e:
logger.error(f"PLAN_GEN: Failed to generate or parse action plan: {e}")
return {
"action_type": "multi_step_plan",
"plan": [
{"tool": "web_search", "task": original_query},
{"tool": "respond", "task": "Synthesize all information from the scratchpad and provide a comprehensive final answer to the user."}
]
}
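# Core generator: choose between a fast direct answer and a multi-step research
# plan, execute each tool step into a scratchpad, and stream updates to the UI.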
def process_user_interaction_gradio(
user_input: str,
max_research_steps: int,
provider_name: str,
model_display_name: str,
chat_history: list[dict],
custom_system_prompt: str = None,
ui_api_key_override: str = None,
):
process_start_time = time.time()
request_id = os.urandom(4).hex()
logger.info(f"PUI_GRADIO [{request_id}] Start. User: '{user_input[:50]}...' Max Steps: {max_research_steps}")
yield "status", "<i>[Deciding on an action plan...]</i>"
action_plan_data = _generate_action_plan(user_input, provider_name, model_display_name, ui_api_key_override, chat_history)
action_type = action_plan_data.get("action_type")
if action_type == "fast_response":
yield "status", "<i>[Executing fast response...]</i>"
yield "plan", [{"tool": "fast_response", "task": action_plan_data.get("reason", "Direct answer.")}]
now_str = datetime.now().strftime('%Y-%m-%d %H:%M:%S')
final_sys_prompt = custom_system_prompt or DEFAULT_SYSTEM_PROMPT
final_sys_prompt = f"Current Date/Time: {now_str}.\n\n" + final_sys_prompt
messages_for_llm = [{"role": "system", "content": final_sys_prompt}] + chat_history + [{"role": "user", "content": user_input}]
streamed_response = ""
try:
for chunk in call_model_stream(provider=provider_name, model_display_name=model_display_name, messages=messages_for_llm, api_key_override=ui_api_key_override, temperature=0.7, max_tokens=3000):
streamed_response += chunk
yield "response_chunk", chunk
except Exception as e:
streamed_response = f"\n\n(Error during fast response: {str(e)[:150]})"
yield "response_chunk", streamed_response
final_bot_text = streamed_response.strip()
yield "final_response", {"response": final_bot_text}
return
plan = action_plan_data.get("plan", [])
if not plan:
plan = [{"tool": "web_search", "task": user_input}, {"tool": "respond", "task": "Synthesize a response."}]
yield "plan", plan
research_scratchpad = ""
now_str = datetime.now().strftime('%Y-%m-%d %H:%M:%S')
for i, step_action in enumerate(plan):
tool = step_action.get("tool")
task = step_action.get("task")
if tool == 'respond':
break
if i + 1 > max_research_steps:
research_scratchpad += f"\n\n---NOTE: Maximum research step budget of {max_research_steps} reached. Proceeding to final response.---\n"
logger.warning(f"PUI_GRADIO [{request_id}]: Max research steps ({max_research_steps}) reached.")
break
        task_for_display = task if isinstance(task, str) else str(task)
yield "status", f"<i>[Executing Step {i+1}/{len(plan)-1}: {tool} -> {task_for_display[:70]}...]</i>"
step_findings = f"Step {i+1} ({tool}: '{task_for_display[:1000]}'): "
if tool == 'web_search':
try:
web_results = search_and_scrape_duckduckgo(task, num_results=5)
scraped_content = "\n".join([f"Source:\nURL:{r.get('url','N/A')}\nContent:\n{(r.get('content') or r.get('error') or 'N/A')[:1500]}\n---" for r in web_results]) if web_results else "No results found."
synthesis_prompt = f"Relevant web content for the task '{task}':\n\n{scraped_content}\n\nConcisely summarize the findings from the content."
summary = "".join(list(call_model_stream(provider=provider_name, model_display_name=model_display_name, messages=[{"role": "user", "content": synthesis_prompt}], api_key_override=ui_api_key_override, temperature=0.1, max_tokens=400)))
step_findings += summary
except Exception as e:
step_findings += f"Error during web search: {e}"
elif tool == 'web_scrape':
try:
web_results = scrape_url(task)
scraped_content = "\n".join([f"Source:\nURL:{r.get('url','N/A')}\nContent:\n{(r.get('content') or r.get('error') or 'N/A')[:1500]}\n---" for r in web_results]) if web_results else "No results found."
synthesis_prompt = f"Relevant web content for the task '{task}':\n\n{scraped_content}\n\nConcisely summarize the findings from the content."
summary = "".join(list(call_model_stream(provider=provider_name, model_display_name=model_display_name, messages=[{"role": "user", "content": synthesis_prompt}], api_key_override=ui_api_key_override, temperature=0.1, max_tokens=400)))
step_findings += summary
except Exception as e:
step_findings += f"Error during web scrape: {e}"
elif tool == 'gradio_view_api':
try:
client = Client(task)
api_info = client.view_api(all_endpoints=True)
summary = str(api_info)
if summary and summary.strip():
step_findings += f"Successfully retrieved API endpoints for space '{task}':\n{summary}"
else:
step_findings += f"Could not retrieve valid API endpoint information for space '{task}'."
except Exception as e:
error_message = f"Error viewing Gradio API for space '{task}': {e}"
logger.error(f"GRADIO_VIEW_API_TOOL Error: {e}\nTask was: {task}", exc_info=True)
step_findings += error_message
elif tool == 'gradio_client':
try:
if isinstance(task, str):
try:
params = json.loads(task)
except json.JSONDecodeError:
json_match = re.search(r"\{.*\}", task, re.DOTALL)
if json_match:
params = json.loads(json_match.group(0))
else:
raise ValueError("Task is not a valid JSON string or does not contain a JSON object.")
elif isinstance(task, dict):
params = task
else:
raise TypeError(f"Unsupported task type for gradio_client: {type(task)}")
space_id = params.get("space_id")
api_name = params.get("api_name")
parameters = params.get("parameters", {})
if not space_id or not api_name:
raise ValueError("Missing 'space_id' or 'api_name' in task JSON.")
if not isinstance(parameters, dict):
raise TypeError("The 'parameters' field in the task must be a JSON object (dictionary).")
client = Client(space_id)
result = client.predict(**parameters, api_name=api_name)
if isinstance(result, (str, int, float, bool)):
result_str = str(result)
elif isinstance(result, (dict, list)):
result_str = json.dumps(result, indent=2)
else:
result_str = f"Received result of type {type(result)}."
step_findings += f"Successfully called Gradio API {api_name} on space {space_id}. Result:\n{result_str}"
except Exception as e:
error_message = f"Error during Gradio Client operation: {e}"
logger.error(f"GRADIO_CLIENT_TOOL Error: {e}\nTask was: {task}", exc_info=True)
step_findings += error_message
elif tool == 'memory_search':
try:
retrieved_mems = retrieve_memories_semantic(task, k=3)
if retrieved_mems:
memory_context = "\n".join([f"- User: {m.get('user_input','')} -> AI: {m.get('bot_response','')} (Takeaway: {m.get('metrics',{}).get('takeaway','N/A')})" for m in retrieved_mems])
step_findings += f"Found relevant memories:\n{memory_context}"
else:
step_findings += "No relevant memories found."
except Exception as e:
step_findings += f"Error during memory search: {e}"
elif tool == 'think':
try:
think_prompt = f"Original Query: '{user_input}'\n\nResearch Scratchpad:\n```\n{research_scratchpad}\n```\n\nMy current thinking task is: '{task}'. Based on the scratchpad, what is the conclusion of this thinking step?"
thought = "".join(list(call_model_stream(provider=provider_name, model_display_name=model_display_name, messages=[{"role": "user", "content": think_prompt}], api_key_override=ui_api_key_override, temperature=0.3, max_tokens=500)))
step_findings += f"Conclusion: {thought}"
except Exception as e:
step_findings += f"Error during thinking step: {e}"
else:
step_findings += "Unknown tool specified in plan."
research_scratchpad += f"\n\n---\n{step_findings}\n---"
yield "step_result", {"step": i + 1, "tool": tool, "task": task_for_display, "result": step_findings}
yield "status", "<i>[Synthesizing final report...]</i>"
final_sys_prompt = custom_system_prompt or DEFAULT_SYSTEM_PROMPT
final_sys_prompt += f"\n\nCurrent Date/Time: {now_str}. You have just completed a research plan. Synthesize the information in the 'Research Scratchpad' into a final, comprehensive answer. Cite sources by including URLs if available."
final_user_prompt = f"Original user query: \"{user_input}\"\n\nResearch Scratchpad:\n```\n{research_scratchpad}\n```\n\nNow, provide the final, synthesized answer to the user."
final_messages = [{"role": "system", "content": final_sys_prompt}, {"role": "user", "content": final_user_prompt}]
streamed_response = ""
try:
for chunk in call_model_stream(provider=provider_name, model_display_name=model_display_name, messages=final_messages, api_key_override=ui_api_key_override, temperature=0.6, max_tokens=3000):
streamed_response += chunk
yield "response_chunk", chunk
except Exception as e:
error_msg = f"\n\n(Error during final synthesis: {str(e)[:150]})"
streamed_response += error_msg
yield "response_chunk", error_msg
final_bot_text = streamed_response.strip() or "(No response or error during synthesis.)"
logger.info(f"PUI_GRADIO [{request_id}]: Finished. Total: {time.time() - process_start_time:.2f}s. Resp len: {len(final_bot_text)}")
yield "final_response", {"response": final_bot_text}
def perform_post_interaction_learning(user_input: str, bot_response: str, provider: str, model_disp_name: str, api_key_override: str = None):
task_id = os.urandom(4).hex()
logger.info(f"POST_INTERACTION_LEARNING [{task_id}]: START User='{user_input[:40]}...', Bot='{bot_response[:40]}...'")
learning_start_time = time.time()
significant_learnings_summary = []
try:
metrics = generate_interaction_metrics(user_input, bot_response, provider, model_disp_name, api_key_override)
logger.info(f"POST_INTERACTION_LEARNING [{task_id}]: Metrics: {metrics}")
add_memory_entry(user_input, metrics, bot_response)
summary = f"User:\"{user_input}\"\nAI:\"{bot_response}\"\nMetrics(takeaway):{metrics.get('takeaway','N/A')},Success:{metrics.get('response_success_score','N/A')}"
existing_rules_ctx = "\n".join([f"- \"{r}\"" for r in retrieve_rules_semantic(f"{summary}\n{user_input}", k=10)]) or "No existing rules context."
insight_user_prompt = INSIGHT_GENERATION_USER_PROMPT_TEMPLATE.format(summary=summary, existing_rules_ctx=existing_rules_ctx)
insight_msgs = [{"role":"system", "content":INSIGHT_GENERATION_SYSTEM_PROMPT}, {"role":"user", "content":insight_user_prompt}]
insight_prov, insight_model_disp = provider, model_disp_name
insight_env_model = os.getenv("INSIGHT_MODEL_OVERRIDE")
if insight_env_model and "/" in insight_env_model:
i_p, i_id = insight_env_model.split('/', 1)
i_d_n = next((dn for dn, mid in MODELS_BY_PROVIDER.get(i_p.lower(), {}).get("models", {}).items() if mid == i_id), None)
if i_d_n: insight_prov, insight_model_disp = i_p, i_d_n
logger.info(f"POST_INTERACTION_LEARNING [{task_id}]: Generating insights with {insight_prov}/{insight_model_disp} (expecting XML)")
raw_ops_xml_full = "".join(list(call_model_stream(provider=insight_prov, model_display_name=insight_model_disp, messages=insight_msgs, api_key_override=api_key_override, temperature=0.0, max_tokens=3500))).strip()
ops_data_list, processed_count = [], 0
xml_match = re.search(r"```xml\s*(<operations_list>.*</operations_list>)\s*```", raw_ops_xml_full, re.DOTALL | re.IGNORECASE) or \
re.search(r"(<operations_list>.*</operations_list>)", raw_ops_xml_full, re.DOTALL | re.IGNORECASE)
if xml_match:
xml_content_str = xml_match.group(1)
try:
root = ET.fromstring(xml_content_str)
if root.tag == "operations_list":
for op_element in root.findall("operation"):
action_el = op_element.find("action")
insight_el = op_element.find("insight")
old_insight_el = op_element.find("old_insight_to_replace")
action = action_el.text.strip().lower() if action_el is not None and action_el.text else None
insight_text = insight_el.text.strip() if insight_el is not None and insight_el.text else None
old_insight_text = old_insight_el.text.strip() if old_insight_el is not None and old_insight_el.text else None
if action and insight_text:
ops_data_list.append({
"action": action,
"insight": insight_text,
"old_insight_to_replace": old_insight_text
})
else:
logger.warning(f"POST_INTERACTION_LEARNING [{task_id}]: Skipped XML operation due to missing action or insight text. Action: {action}, Insight: {insight_text}")
else:
logger.error(f"POST_INTERACTION_LEARNING [{task_id}]: XML root tag is not <operations_list>. Found: {root.tag}. XML content:\n{xml_content_str}")
except ET.ParseError as e:
logger.error(f"POST_INTERACTION_LEARNING [{task_id}]: XML parsing error: {e}. XML content that failed:\n{xml_content_str}")
except Exception as e_xml_proc:
logger.error(f"POST_INTERACTION_LEARNING [{task_id}]: Error processing parsed XML: {e_xml_proc}. XML content:\n{xml_content_str}")
else:
logger.info(f"POST_INTERACTION_LEARNING [{task_id}]: No <operations_list> XML structure found in LLM output. Full raw output:\n{raw_ops_xml_full}")
if ops_data_list:
logger.info(f"POST_INTERACTION_LEARNING [{task_id}]: LLM provided {len(ops_data_list)} insight ops from XML.")
for op_idx, op_data in enumerate(ops_data_list):
action = op_data["action"]
insight_text = op_data["insight"]
old_insight = op_data["old_insight_to_replace"]
if not re.match(r"\[(CORE_RULE|RESPONSE_PRINCIPLE|BEHAVIORAL_ADJUSTMENT|GENERAL_LEARNING)\|([\d\.]+?)\]", insight_text, re.I|re.DOTALL):
logger.warning(f"POST_INTERACTION_LEARNING [{task_id}]: Op {op_idx}: Skipped op due to invalid insight_text format from XML: '{insight_text[:100]}...'")
continue
if action == "add":
success, status_msg = add_rule_entry(insight_text)
if success:
processed_count +=1
if insight_text.upper().startswith("[CORE_RULE"):
significant_learnings_summary.append(f"New Core Rule Added: {insight_text}")
else: logger.warning(f"POST_INTERACTION_LEARNING [{task_id}]: Op {op_idx} (add from XML): Failed to add rule '{insight_text[:50]}...'. Status: {status_msg}")
elif action == "update":
if old_insight and old_insight != insight_text:
remove_success = remove_rule_entry(old_insight)
if not remove_success:
logger.warning(f"POST_INTERACTION_LEARNING [{task_id}]: Op {op_idx} (update from XML): Failed to remove old rule '{old_insight[:50]}...' before adding new.")
success, status_msg = add_rule_entry(insight_text)
if success:
processed_count +=1
if insight_text.upper().startswith("[CORE_RULE"):
significant_learnings_summary.append(f"Core Rule Updated to: {insight_text}")
else: logger.warning(f"POST_INTERACTION_LEARNING [{task_id}]: Op {op_idx} (update from XML): Failed to add/update rule '{insight_text[:50]}...'. Status: {status_msg}")
else:
logger.warning(f"POST_INTERACTION_LEARNING [{task_id}]: Op {op_idx}: Skipped op due to unknown action '{action}' from XML.")
if significant_learnings_summary:
learning_digest = "SYSTEM CORE LEARNING DIGEST:\n" + "\n".join(significant_learnings_summary)
system_metrics = {
"takeaway": "Core knowledge refined.",
"response_success_score": 1.0,
"future_confidence_score": 1.0,
"type": "SYSTEM_REFLECTION"
}
add_memory_entry(
user_input="SYSTEM_INTERNAL_REFLECTION_TRIGGER",
metrics=system_metrics,
bot_response=learning_digest
)
logger.info(f"POST_INTERACTION_LEARNING [{task_id}]: Added CORE_LEARNING_DIGEST to memories: {learning_digest[:100]}...")
logger.info(f"POST_INTERACTION_LEARNING [{task_id}]: Processed {processed_count} insight ops out of {len(ops_data_list)} received from XML.")
else:
logger.info(f"POST_INTERACTION_LEARNING [{task_id}]: No valid insight operations derived from LLM's XML output.")
except Exception as e: logger.error(f"POST_INTERACTION_LEARNING [{task_id}]: CRITICAL ERROR in learning task: {e}", exc_info=True)
logger.info(f"POST_INTERACTION_LEARNING [{task_id}]: END. Total: {time.time() - learning_start_time:.2f}s")
def handle_gradio_chat_submit(user_msg_txt: str, max_research_steps: int, gr_hist_list: list, sel_prov_name: str, sel_model_disp_name: str, ui_api_key: str|None, cust_sys_prompt: str):
global current_chat_session_history
cleared_input, updated_gr_hist, status_txt = "", list(gr_hist_list), "Initializing..."
updated_rules_text = ui_refresh_rules_display_fn()
updated_mems_json = ui_refresh_memories_display_fn()
log_html_output = gr.HTML("<p><i>Research Log will appear here.</i></p>")
final_report_tb = gr.Textbox(value="*Waiting...*", interactive=True, show_copy_button=True)
dl_report_btn = gr.DownloadButton(interactive=False, value=None, visible=False)
if not user_msg_txt.strip():
status_txt = "Error: Empty message."
updated_gr_hist.append((user_msg_txt or "(Empty)", status_txt))
yield (cleared_input, updated_gr_hist, status_txt, log_html_output, final_report_tb, dl_report_btn, updated_rules_text, updated_mems_json)
return
updated_gr_hist.append((user_msg_txt, "<i>Thinking... See Research Log below for progress.</i>"))
yield (cleared_input, updated_gr_hist, status_txt, log_html_output, final_report_tb, dl_report_btn, updated_rules_text, updated_mems_json)
internal_hist = list(current_chat_session_history)
final_bot_resp_acc = ""
temp_dl_file_path = None
try:
processor_gen = process_user_interaction_gradio(
user_input=user_msg_txt,
max_research_steps=max_research_steps,
provider_name=sel_prov_name,
model_display_name=sel_model_disp_name,
chat_history=internal_hist,
custom_system_prompt=cust_sys_prompt.strip() or None,
ui_api_key_override=ui_api_key.strip() if ui_api_key else None
)
curr_bot_disp_msg = ""
full_plan = []
log_html_parts = []
for upd_type, upd_data in processor_gen:
if upd_type == "status":
status_txt = upd_data
if "Deciding" in status_txt or "Executing" in status_txt:
log_html_output = gr.HTML(f"<p><i>{status_txt}</i></p>")
elif upd_type == "plan":
full_plan = upd_data
log_html_parts = ["<h3>Action Plan</h3><ol>"]
for i, step in enumerate(full_plan):
log_html_parts.append(f'<li id="log-step-{i+1}"><strong>{step.get("tool")}</strong>: {step.get("task")} <span style="color:gray;">(Pending)</span></li>')
log_html_parts.append("</ol><hr><h3>Log</h3>")
log_html_output = gr.HTML("".join(log_html_parts))
elif upd_type == "step_result":
step_num = upd_data["step"]
                sanitized_result = upd_data["result"].replace('<', '&lt;').replace('>', '&gt;').replace('\n', '<br>')
log_html_parts[step_num] = f'<li id="log-step-{step_num}"><strong>{upd_data.get("tool")}</strong>: {upd_data.get("task")} <span style="color:green;">(Done)</span></li>'
log_html_parts.append(f'<div style="margin-left: 20px; padding: 5px; border-left: 2px solid #ccc;"><small style="color: #555;">{sanitized_result}</small></div>')
next_step_index_in_list = step_num + 1
if next_step_index_in_list < len(full_plan) + 1:
next_step_action = full_plan[step_num]
if next_step_action.get("tool") != "respond":
log_html_parts[next_step_index_in_list] = f'<li id="log-step-{next_step_index_in_list}"><strong>{next_step_action.get("tool")}</strong>: {next_step_action.get("task")} <span style="color:blue;">(In Progress...)</span></li>'
log_html_output = gr.HTML("".join(log_html_parts))
elif upd_type == "response_chunk":
curr_bot_disp_msg += upd_data
if updated_gr_hist and updated_gr_hist[-1][0] == user_msg_txt:
updated_gr_hist[-1] = (user_msg_txt, curr_bot_disp_msg)
elif upd_type == "final_response":
final_bot_resp_acc = upd_data["response"]
status_txt = "Response generated. Processing learning..."
if not curr_bot_disp_msg and final_bot_resp_acc: curr_bot_disp_msg = final_bot_resp_acc
if updated_gr_hist and updated_gr_hist[-1][0] == user_msg_txt:
updated_gr_hist[-1] = (user_msg_txt, curr_bot_disp_msg or "(No text)")
final_report_tb = gr.Textbox(value=curr_bot_disp_msg, interactive=True, show_copy_button=True)
if curr_bot_disp_msg and not curr_bot_disp_msg.startswith("Error:"):
try:
with tempfile.NamedTemporaryFile(mode="w", delete=False, suffix=".md", encoding='utf-8') as tmpfile:
tmpfile.write(curr_bot_disp_msg)
temp_dl_file_path = tmpfile.name
dl_report_btn = gr.DownloadButton(value=temp_dl_file_path, visible=True, interactive=True)
except Exception as e:
logger.error(f"Error creating temp file for download: {e}", exc_info=False)
dl_report_btn = gr.DownloadButton(interactive=False, value=None, visible=False, label="Download Error")
else:
dl_report_btn = gr.DownloadButton(interactive=False, value=None, visible=False)
yield (cleared_input, updated_gr_hist, status_txt, log_html_output, final_report_tb, dl_report_btn, updated_rules_text, updated_mems_json)
if upd_type == "final_response": break
except Exception as e:
logger.error(f"Chat handler error during main processing: {e}", exc_info=True)
status_txt = f"Error: {str(e)[:100]}"
error_message_for_chat = f"Sorry, an error occurred: {str(e)[:100]}"
if updated_gr_hist and updated_gr_hist[-1][0] == user_msg_txt:
updated_gr_hist[-1] = (user_msg_txt, error_message_for_chat)
final_report_tb = gr.Textbox(value=error_message_for_chat, interactive=True)
dl_report_btn = gr.DownloadButton(interactive=False, value=None, visible=False)
log_html_output = gr.HTML(f'<p style="color:red;"><strong>Error processing request.</strong></p>')
current_rules_text_on_error = ui_refresh_rules_display_fn()
current_mems_json_on_error = ui_refresh_memories_display_fn()
yield (cleared_input, updated_gr_hist, status_txt, log_html_output, final_report_tb, dl_report_btn, current_rules_text_on_error, current_mems_json_on_error)
if temp_dl_file_path and os.path.exists(temp_dl_file_path):
try: os.unlink(temp_dl_file_path)
except Exception as e_unlink: logger.error(f"Error deleting temp download file {temp_dl_file_path} after error: {e_unlink}")
return
if final_bot_resp_acc and not final_bot_resp_acc.startswith("Error:"):
current_chat_session_history.extend([{"role": "user", "content": user_msg_txt}, {"role": "assistant", "content": final_bot_resp_acc}])
status_txt = "<i>[Performing post-interaction learning...]</i>"
current_rules_text_before_learn = ui_refresh_rules_display_fn()
current_mems_json_before_learn = ui_refresh_memories_display_fn()
yield (cleared_input, updated_gr_hist, status_txt, log_html_output, final_report_tb, dl_report_btn, current_rules_text_before_learn, current_mems_json_before_learn)
try:
perform_post_interaction_learning(
user_input=user_msg_txt,
bot_response=final_bot_resp_acc,
provider=sel_prov_name,
model_disp_name=sel_model_disp_name,
api_key_override=ui_api_key.strip() if ui_api_key else None
)
status_txt = "Response & Learning Complete."
except Exception as e_learn:
logger.error(f"Error during post-interaction learning: {e_learn}", exc_info=True)
status_txt = "Response complete. Error during learning."
else:
status_txt = "Processing finished; no valid response or error occurred."
updated_rules_text = ui_refresh_rules_display_fn()
updated_mems_json = ui_refresh_memories_display_fn()
yield (cleared_input, updated_gr_hist, status_txt, log_html_output, final_report_tb, dl_report_btn, updated_rules_text, updated_mems_json)
if temp_dl_file_path and os.path.exists(temp_dl_file_path):
try: os.unlink(temp_dl_file_path)
except Exception as e_unlink: logger.error(f"Error deleting temp download file {temp_dl_file_path}: {e_unlink}")
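# UI helpers for displaying, downloading, and uploading rules and memories.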
def ui_refresh_rules_display_fn(): return "\n\n---\n\n".join(get_all_rules_cached()) or "No rules found."
def ui_refresh_memories_display_fn(): return get_all_memories_cached() or []
def ui_download_rules_action_fn():
rules_content = "\n\n---\n\n".join(get_all_rules_cached())
if not rules_content.strip():
gr.Warning("No rules to download.")
return gr.DownloadButton(value=None, interactive=False, label="No Rules")
try:
with tempfile.NamedTemporaryFile(mode="w", delete=False, suffix=".txt", encoding='utf-8') as tmpfile:
tmpfile.write(rules_content)
return tmpfile.name
except Exception as e:
logger.error(f"Error creating rules download file: {e}")
gr.Error(f"Failed to prepare rules for download: {e}")
return gr.DownloadButton(value=None, interactive=False, label="Error")
def ui_upload_rules_action_fn(uploaded_file_obj, progress=gr.Progress()):
if not uploaded_file_obj: return "No file provided for rules upload."
try:
with open(uploaded_file_obj.name, 'r', encoding='utf-8') as f: content = f.read()
except Exception as e_read: return f"Error reading file: {e_read}"
if not content.strip(): return "Uploaded rules file is empty."
added_count, skipped_count, error_count = 0,0,0
potential_rules = []
file_name_lower = uploaded_file_obj.name.lower()
if file_name_lower.endswith(".txt"):
potential_rules = content.split("\n\n---\n\n")
if len(potential_rules) == 1 and "\n" in content:
potential_rules = [r.strip() for r in content.splitlines() if r.strip()]
elif file_name_lower.endswith(".jsonl"):
for line_num, line in enumerate(content.splitlines()):
line = line.strip()
if line:
try:
rule_text_in_json_string = json.loads(line)
if isinstance(rule_text_in_json_string, str):
potential_rules.append(rule_text_in_json_string)
else:
logger.warning(f"Rule Upload (JSONL): Line {line_num+1} did not contain a string value. Got: {type(rule_text_in_json_string)}")
error_count +=1
except json.JSONDecodeError:
logger.warning(f"Rule Upload (JSONL): Line {line_num+1} failed to parse as JSON: {line[:100]}")
error_count +=1
else:
return "Unsupported file type for rules. Please use .txt or .jsonl."
valid_potential_rules = [r.strip() for r in potential_rules if r.strip()]
total_to_process = len(valid_potential_rules)
if total_to_process == 0 and error_count == 0: return "No valid rules found in file to process."
elif total_to_process == 0 and error_count > 0: return f"No valid rules found to process. Encountered {error_count} parsing/format errors."
progress(0, desc="Starting rules upload...")
for idx, rule_text in enumerate(valid_potential_rules):
success, status_msg = add_rule_entry(rule_text)
if success: added_count += 1
elif status_msg == "duplicate": skipped_count += 1
else: error_count += 1
progress((idx + 1) / total_to_process, desc=f"Processed {idx+1}/{total_to_process} rules...")
msg = f"Rules Upload: Total valid rule segments processed: {total_to_process}. Added: {added_count}, Skipped (duplicates): {skipped_count}, Errors (parsing/add): {error_count}."
logger.info(msg); return msg
def ui_download_memories_action_fn():
memories = get_all_memories_cached()
if not memories:
gr.Warning("No memories to download.")
return gr.DownloadButton(value=None, interactive=False, label="No Memories")
jsonl_content = ""
for mem_dict in memories:
try: jsonl_content += json.dumps(mem_dict) + "\n"
except Exception as e: logger.error(f"Error serializing memory for download: {mem_dict}, Error: {e}")
if not jsonl_content.strip():
gr.Warning("No valid memories to serialize for download.")
return gr.DownloadButton(value=None, interactive=False, label="No Data")
try:
with tempfile.NamedTemporaryFile(mode="w", delete=False, suffix=".jsonl", encoding='utf-8') as tmpfile:
tmpfile.write(jsonl_content)
return tmpfile.name
except Exception as e:
logger.error(f"Error creating memories download file: {e}")
gr.Error(f"Failed to prepare memories for download: {e}")
return gr.DownloadButton(value=None, interactive=False, label="Error")
def ui_upload_memories_action_fn(uploaded_file_obj, progress=gr.Progress()):
if not uploaded_file_obj: return "No file provided for memories upload."
try:
with open(uploaded_file_obj.name, 'r', encoding='utf-8') as f: content = f.read()
except Exception as e_read: return f"Error reading file: {e_read}"
if not content.strip(): return "Uploaded memories file is empty."
added_count, format_error_count, save_error_count = 0,0,0
memory_objects_to_process = []
file_ext = os.path.splitext(uploaded_file_obj.name.lower())[1]
if file_ext == ".json":
try:
parsed_json = json.loads(content)
if isinstance(parsed_json, list): memory_objects_to_process = parsed_json
elif isinstance(parsed_json, dict): memory_objects_to_process = [parsed_json]
else:
logger.warning(f"Memories Upload (.json): File content is not a JSON list or object. Type: {type(parsed_json)}"); format_error_count = 1
except json.JSONDecodeError as e:
logger.warning(f"Memories Upload (.json): Invalid JSON file. Error: {e}"); format_error_count = 1
elif file_ext == ".jsonl":
for line_num, line in enumerate(content.splitlines()):
line = line.strip()
if line:
try: memory_objects_to_process.append(json.loads(line))
except json.JSONDecodeError:
logger.warning(f"Memories Upload (.jsonl): Line {line_num+1} parse error: {line[:100]}"); format_error_count += 1
else: return "Unsupported file type for memories. Please use .json or .jsonl."
if not memory_objects_to_process and format_error_count > 0 : return f"Memories Upload: File parsing failed. Found {format_error_count} format errors and no processable objects."
elif not memory_objects_to_process: return "No valid memory objects found in the uploaded file."
total_to_process = len(memory_objects_to_process)
if total_to_process == 0: return "No memory objects to process (after parsing)."
progress(0, desc="Starting memories upload...")
for idx, mem_data in enumerate(memory_objects_to_process):
if isinstance(mem_data, dict) and all(k in mem_data for k in ["user_input", "bot_response", "metrics"]):
success, _ = add_memory_entry(mem_data["user_input"], mem_data["metrics"], mem_data["bot_response"])
if success: added_count += 1
else: save_error_count += 1
else:
logger.warning(f"Memories Upload: Skipped invalid memory object structure: {str(mem_data)[:100]}"); format_error_count += 1
progress((idx + 1) / total_to_process, desc=f"Processed {idx+1}/{total_to_process} memories...")
msg = f"Memories Upload: Processed {total_to_process} objects. Added: {added_count}, Format/Structure Errors: {format_error_count}, Save Errors: {save_error_count}."
logger.info(msg); return msg
def save_edited_rules_action_fn(edited_rules_text: str, progress=gr.Progress()):
if DEMO_MODE:
gr.Warning("Saving edited rules is disabled in Demo Mode.")
return "Saving edited rules is disabled in Demo Mode."
if not edited_rules_text.strip():
return "No rules text to save."
stats = process_rules_from_text_blob(edited_rules_text, progress)
return f"Editor Save: Added: {stats['added']}, Skipped (duplicates): {stats['skipped']}, Errors/Invalid: {stats['errors']} from {stats['total']} unique rules in text."
def ui_upload_kb_from_image_fn(uploaded_image_filepath: str, password: str, progress=gr.Progress()):
if DEMO_MODE:
gr.Warning("Uploading is disabled in Demo Mode.")
return "Upload disabled in Demo Mode."
if not uploaded_image_filepath:
return "No image file provided or pasted."
progress(0, desc="Loading and standardizing image...")
try:
img_temp = Image.open(uploaded_image_filepath)
img = set_pil_image_format_to_png(img_temp)
except Exception as e:
logger.error(f"KB ImgUL: Open/Standardize fail: {e}")
return f"Error: Could not open or process image file: {e}"
progress(0.2, desc="Extracting data from image...")
try:
extracted_bytes = extract_data_from_image(img)
if not extracted_bytes: return "No data found embedded in the image."
except ValueError as e:
logger.error(f"KB ImgUL: Extract fail: {e}")
return f"Error extracting data: {e}"
except Exception as e:
logger.error(f"KB ImgUL: Extract error: {e}", exc_info=True)
return f"Unexpected extraction error: {e}"
kv_string = ""
try:
if extracted_bytes[:20].decode('utf-8', errors='ignore').strip().startswith("# iLearn"):
kv_string = extracted_bytes.decode('utf-8')
progress(0.4, desc="Parsing data...")
elif password and password.strip():
progress(0.3, desc="Attempting decryption...")
kv_string = decrypt_data(extracted_bytes, password.strip()).decode('utf-8')
progress(0.4, desc="Parsing decrypted data...")
else: return "Data appears encrypted, but no password was provided."
except (UnicodeDecodeError, InvalidTag, ValueError) as e:
if "decryption" in str(e).lower() or isinstance(e, InvalidTag):
return f"Decryption Failed. Check password or file integrity. Details: {e}"
return "Data is binary and requires a password for decryption."
except Exception as e:
logger.error(f"KB ImgUL: Decrypt/Parse error: {e}", exc_info=True)
return f"Unexpected error during decryption or parsing: {e}"
if not kv_string: return "Could not get data from image (after potential decryption)."
try:
kv_dict = parse_kv_string_to_dict(kv_string)
except Exception as e:
logger.error(f"KB ImgUL: Parse fail: {e}")
return f"Error parsing data: {e}"
if not kv_dict: return "Parsed data is empty."
stats = import_kb_from_kv_dict(kv_dict, progress)
msg = f"Upload Complete. Rules - Add: {stats['rules_added']}, Skip: {stats['rules_skipped']}, Err: {stats['rules_errors']}. Mems - Add: {stats['mems_added']}, Err: {stats['mems_errors']}."
logger.info(f"Image KB Upload: {msg}")
return msg
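# Startup hook: initialize the memory system and optionally preload rules and
# memories from the files named by LOAD_RULES_FILE / LOAD_MEMORIES_FILE.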
def app_load_fn():
logger.info("App loading. Initializing systems...")
initialize_memory_system()
logger.info("Memory system initialized.")
rules_added, rules_skipped, rules_errors = load_rules_from_file(LOAD_RULES_FILE)
rules_load_msg = f"Rules: Added {rules_added}, Skipped {rules_skipped}, Errors {rules_errors} from {LOAD_RULES_FILE or 'None'}."
logger.info(rules_load_msg)
mems_added, mems_format_errors, mems_save_errors = load_memories_from_file(LOAD_MEMORIES_FILE)
mems_load_msg = f"Memories: Added {mems_added}, Format Errors {mems_format_errors}, Save Errors {mems_save_errors} from {LOAD_MEMORIES_FILE or 'None'}."
logger.info(mems_load_msg)
final_status = f"AI Systems Initialized. {rules_load_msg} {mems_load_msg} Ready."
rules_on_load, mems_on_load = ui_refresh_rules_display_fn(), ui_refresh_memories_display_fn()
return (final_status, rules_on_load, mems_on_load, gr.HTML("<p><i>Research Log will appear here.</i></p>"),
gr.Textbox(value="*Waiting...*", interactive=True, show_copy_button=True),
gr.DownloadButton(interactive=False, value=None, visible=False))
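# Create a small placeholder PNG once at import time so the Gradio examples
# section has a local image to reference.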
placeholder_filename = "placeholder_image.png"
try:
if not os.path.exists(placeholder_filename):
img = Image.new('RGB', (200, 100), color='darkblue')
        draw = ImageDraw.Draw(img)
try:
font = _get_font(PREFERRED_FONTS, 14)
draw.text((10, 45), "Placeholder KB Image", font=font, fill='white')
except Exception:
draw.text((10, 45), "Placeholder", fill='white')
img.save(placeholder_filename)
logger.info(f"Created '{placeholder_filename}' for Gradio examples.")
except Exception as e:
logger.error(f"Could not create placeholder image. The examples may not load correctly. Error: {e}")
def ui_create_kb_image_fn(password: str, content_to_include: list, progress=gr.Progress()):
include_rules = "Include Rules" in content_to_include
include_memories = "Include Memories" in content_to_include
if not include_rules and not include_memories:
gr.Warning("Nothing selected to save.")
return gr.update(value=None, visible=False), gr.update(value=None, visible=False), "Nothing selected to save."
progress(0.1, desc="Fetching knowledge base...")
rules = get_all_rules_cached() if include_rules else []
memories = get_all_memories_cached() if include_memories else []
if not rules and not memories:
gr.Warning("Knowledge base is empty or selected content is empty.")
return gr.update(value=None, visible=False), gr.update(value=None, visible=False), "No content to save."
progress(0.2, desc="Serializing data...")
kv_string = convert_kb_to_kv_string(rules, memories, include_rules, include_memories)
data_bytes = kv_string.encode('utf-8')
if password and password.strip():
progress(0.3, desc="Encrypting data...")
try:
data_bytes = encrypt_data(data_bytes, password.strip())
except Exception as e:
logger.error(f"KB ImgDL: Encrypt failed: {e}")
return gr.update(value=None, visible=False), gr.update(value=None, visible=False), f"Error: {e}"
progress(0.5, desc="Generating carrier image...")
carrier_image = generate_brain_carrier_image(w=800, h=800)
progress(0.6, desc="Adding visual overlay...")
keys_for_overlay = []
if include_rules: keys_for_overlay.append(f"Rule Count: {len(rules)}")
if include_memories: keys_for_overlay.append(f"Memory Count: {len(memories)}")
title_overlay = "Encrypted Knowledge Base" if password and password.strip() else "iLearn Knowledge Base"
image_with_overlay = draw_key_list_dropdown_overlay(carrier_image, keys=keys_for_overlay, title=title_overlay)
try:
progress(0.8, desc="Embedding data into final image...")
final_image_with_data = embed_data_in_image(image_with_overlay, data_bytes)
except ValueError as e:
logger.error(f"KB ImgDL: Embed failed: {e}")
return gr.update(value=None, visible=False), gr.update(value=None, visible=False), f"Error: {e}"
progress(0.9, desc="Preparing final image and download file...")
try:
with tempfile.NamedTemporaryFile(delete=False, suffix=".png") as tmpfile:
final_image_with_data.save(tmpfile, format="PNG")
tmp_path = tmpfile.name
progress(1.0, desc="Image created!")
return gr.update(value=tmp_path, visible=True), gr.update(value=tmp_path, visible=True), "Success! Image created."
except Exception as e:
logger.error(f"KB ImgDL: Save failed: {e}")
return gr.update(value=None, visible=False), gr.update(value=None, visible=False), f"Error: {e}"
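# Route a load request to the appropriate importer; an image source takes
# priority over a rules file, which takes priority over a memories file.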
def ui_load_from_sources_fn(image_filepath: str, rules_file_obj: object, mems_file_obj: object, password: str, progress=gr.Progress()):
if image_filepath:
progress(0.1, desc="Image source detected. Starting image processing...")
return ui_upload_kb_from_image_fn(image_filepath, password, progress)
if rules_file_obj:
progress(0.1, desc="Rules file detected. Starting rules import...")
return ui_upload_rules_action_fn(rules_file_obj, progress)
if mems_file_obj:
progress(0.1, desc="Memories file detected. Starting memories import...")
return ui_upload_memories_action_fn(mems_file_obj, progress)
return "No file or image uploaded. Please provide a source file to load."
with gr.Blocks(theme=gr.themes.Soft(), css=".gr-button { margin: 5px; } .gr-textbox, .gr-text-area, .gr-dropdown, .gr-json { border-radius: 8px; } .gr-group { border: 1px solid #e0e0e0; border-radius: 8px; padding: 10px; } .gr-row { gap: 10px; } .gr-tab { border-radius: 8px; } .status-text { font-size: 0.9em; color: #555; } .gr-json { max-height: 400px; overflow-y: auto; }") as demo:
gr.Markdown(f"# ๐Ÿค– iLearn: An Autonomous Learning Agent {'(DEMO MODE)' if DEMO_MODE else ''}", elem_classes=["header"])
is_sqlite, is_hf_dataset = (MEMORY_STORAGE_BACKEND == "SQLITE"), (MEMORY_STORAGE_BACKEND == "HF_DATASET")
with gr.Row(variant="compact"):
agent_stat_tb = gr.Textbox(label="Agent Status", value="Initializing systems...", interactive=False, elem_classes=["status-text"], scale=4)
with gr.Column(scale=1, min_width=150):
memory_backend_info_tb = gr.Textbox(label="Memory Backend", value=MEMORY_STORAGE_BACKEND, interactive=False, elem_classes=["status-text"])
sqlite_path_display = gr.Textbox(label="SQLite Path", value=MEMORY_SQLITE_PATH, interactive=False, visible=is_sqlite, elem_classes=["status-text"])
hf_repos_display = gr.Textbox(label="HF Repos", value=f"M: {MEMORY_HF_MEM_REPO}, R: {MEMORY_HF_RULES_REPO}", interactive=False, visible=is_hf_dataset, elem_classes=["status-text"])
with gr.Sidebar():
gr.Markdown("## โš™๏ธ Configuration")
with gr.Group():
gr.Markdown("### AI Model Settings")
api_key_tb = gr.Textbox(label="AI Provider API Key (Override)", type="password", placeholder="Uses .env if blank")
            available_providers = get_available_providers(); default_provider = "groq" if "groq" in available_providers else (available_providers[0] if available_providers else None)
prov_sel_dd = gr.Dropdown(label="AI Provider", choices=available_providers, value=default_provider, interactive=True)
default_model_display = get_default_model_display_name_for_provider(default_provider) if default_provider else None
model_sel_dd = gr.Dropdown(label="AI Model", choices=get_model_display_names_for_provider(default_provider) if default_provider else [], value=default_model_display, interactive=True)
research_steps_slider = gr.Slider(label="Max Research Steps", minimum=1, maximum=10, step=1, value=3, interactive=True)
with gr.Group():
gr.Markdown("### System Prompt"); sys_prompt_tb = gr.Textbox(label="System Prompt Base", lines=8, value=DEFAULT_SYSTEM_PROMPT, interactive=True)
with gr.Tabs():
with gr.TabItem("๐Ÿ’ฌ Chat & Research"):
with gr.Row():
with gr.Column(scale=3):
gr.Markdown("### AI Chat Interface")
                    main_chat_disp = gr.Chatbot(label=None, height=450, bubble_full_width=False, avatar_images=(None, "https://raw.githubusercontent.com/gradio-app/gradio/main/guides/assets/logo.png"), show_copy_button=True, render_markdown=True, sanitize_html=True)
with gr.Row(variant="compact"):
user_msg_tb = gr.Textbox(show_label=False, placeholder="Ask your research question...", scale=7, lines=1, max_lines=3)
send_btn = gr.Button("Send", variant="primary", scale=1, min_width=100)
with gr.Accordion("๐Ÿ“ Detailed Response & Research Log", open=True):
research_log_html = gr.HTML(label="Research Log", value="<div class='log-container'><p><i>Waiting for a new task to begin...</i></p></div>")
fmt_report_tb = gr.Textbox(label="Full AI Response", lines=8, interactive=True, show_copy_button=True)
dl_report_btn = gr.DownloadButton("Download Report", value=None, interactive=False, visible=False)
with gr.TabItem("๐Ÿง  Knowledge Base"):
with gr.Tabs():
with gr.TabItem("๐ŸŽ›๏ธ System"):
gr.Markdown("View and directly manage the current rules and memories in the system.")
with gr.Row(equal_height=False, variant='compact'):
with gr.Column():
gr.Markdown("### ๐Ÿ“œ Current Rules")
rules_disp_ta = gr.TextArea(label=None, lines=15, placeholder="Rules will appear here.", interactive=True)
save_edited_rules_btn = gr.Button("๐Ÿ’พ Save Edited Rules", variant="primary", interactive=not DEMO_MODE)
clear_rules_btn = gr.Button("๐Ÿ—‘๏ธ Clear All Rules", variant="stop", visible=not DEMO_MODE)
with gr.Column():
gr.Markdown("### ๐Ÿ“š Current Memories")
mems_disp_json = gr.JSON(label=None, value=[], scale=1)
clear_mems_btn = gr.Button("๐Ÿ—‘๏ธ Clear All Memories", variant="stop", visible=not DEMO_MODE)
with gr.TabItem("๐Ÿ’พ Save KB"):
gr.Markdown("Export the current knowledge base as text files or as a single, portable PNG image.")
with gr.Row():
rules_stat_tb = gr.Textbox(label="Rules Status", interactive=False, lines=1, elem_classes=["status-text"])
mems_stat_tb = gr.Textbox(label="Memories Status", interactive=False, lines=1, elem_classes=["status-text"])
with gr.Row():
with gr.Column():
gr.Markdown("### Text File Export")
                            dl_rules_btn = gr.DownloadButton("⬇️ Download Rules (.txt)", value=None)
                            dl_mems_btn = gr.DownloadButton("⬇️ Download Memories (.jsonl)", value=None)
gr.Row()
if MEMORY_STORAGE_BACKEND == "RAM": save_faiss_sidebar_btn = gr.Button("Save FAISS Indices", variant="secondary")
with gr.Column():
gr.Markdown("### Image Export")
with gr.Group():
save_kb_password_tb = gr.Textbox(label="Password (optional for encryption)", type="password")
save_kb_include_cbg = gr.CheckboxGroup(label="Content to Include", choices=["Include Rules", "Include Memories"], value=["Include Rules", "Include Memories"])
                                create_kb_img_btn = gr.Button("✨ Create KB Image", variant="secondary")
                                kb_image_display_output = gr.Image(label="Generated Image (Right-click to copy)", type="filepath", visible=False)
                                kb_image_download_output = gr.DownloadButton("⬇️ Download Image File", visible=False)
                with gr.TabItem("📂 Load KB"):
gr.Markdown("Import rules, memories, or a full KB from local files or a portable PNG image.")
load_status_tb = gr.Textbox(label="Load Operation Status", interactive=False, lines=2)
load_kb_password_tb = gr.Textbox(label="Password (for decrypting images)", type="password")
with gr.Group():
gr.Markdown("#### Sources (Priority: Image > Rules File > Memories File)")
with gr.Row():
upload_kb_img_fobj = gr.Image(label="1. Image Source", type="filepath", sources=["upload", "clipboard"], interactive=not DEMO_MODE)
upload_rules_fobj = gr.File(label="2. Rules File Source (.txt/.jsonl)", file_types=[".txt", ".jsonl"], interactive=not DEMO_MODE)
upload_mems_fobj = gr.File(label="3. Memories File Source (.json/.jsonl)", file_types=[".jsonl", ".json"], interactive=not DEMO_MODE)
                    load_master_btn = gr.Button("⬆️ Load from Sources", variant="primary", interactive=not DEMO_MODE)
gr.Examples(
examples=[
["https://huggingface.co/spaces/Agents-MCP-Hackathon/iLearn/resolve/main/evolutions/e0.01.01.png", ""],
],
inputs=[upload_kb_img_fobj, load_kb_password_tb],
label="Click an Example to Load Data"
)
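    # Event wiring: keep the model dropdown in sync with the selected provider,
    # and hook up chat submission and knowledge-base actions.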
def dyn_upd_model_dd(sel_prov_dyn: str):
models_dyn = get_model_display_names_for_provider(sel_prov_dyn); def_model_dyn = get_default_model_display_name_for_provider(sel_prov_dyn)
return gr.Dropdown(choices=models_dyn, value=def_model_dyn, interactive=True)
prov_sel_dd.change(fn=dyn_upd_model_dd, inputs=prov_sel_dd, outputs=model_sel_dd)
chat_ins = [user_msg_tb, research_steps_slider, main_chat_disp, prov_sel_dd, model_sel_dd, api_key_tb, sys_prompt_tb]
chat_outs = [user_msg_tb, main_chat_disp, agent_stat_tb, research_log_html, fmt_report_tb, dl_report_btn, rules_disp_ta, mems_disp_json]
chat_event_args = {"fn": handle_gradio_chat_submit, "inputs": chat_ins, "outputs": chat_outs}
send_btn.click(**chat_event_args); user_msg_tb.submit(**chat_event_args)
save_edited_rules_btn.click(fn=save_edited_rules_action_fn, inputs=[rules_disp_ta], outputs=[rules_stat_tb], show_progress="full").then(fn=ui_refresh_rules_display_fn, outputs=rules_disp_ta, show_progress=False)
clear_rules_btn.click(fn=lambda: ("All rules cleared." if clear_all_rules_data_backend() else "Error clearing rules."), outputs=rules_stat_tb, show_progress=False).then(fn=ui_refresh_rules_display_fn, outputs=rules_disp_ta, show_progress=False)
clear_mems_btn.click(fn=lambda: ("All memories cleared." if clear_all_memory_data_backend() else "Error clearing memories."), outputs=mems_stat_tb, show_progress=False).then(fn=ui_refresh_memories_display_fn, outputs=mems_disp_json, show_progress=False)
dl_rules_btn.click(fn=ui_download_rules_action_fn, inputs=None, outputs=dl_rules_btn, show_progress=False)
dl_mems_btn.click(fn=ui_download_memories_action_fn, inputs=None, outputs=dl_mems_btn, show_progress=False)
create_kb_img_btn.click(
fn=ui_create_kb_image_fn,
inputs=[save_kb_password_tb, save_kb_include_cbg],
outputs=[kb_image_display_output, kb_image_download_output, load_status_tb],
show_progress="full"
)
load_master_btn.click(
fn=ui_load_from_sources_fn,
inputs=[upload_kb_img_fobj, upload_rules_fobj, upload_mems_fobj, load_kb_password_tb],
outputs=[load_status_tb],
show_progress="full"
).then(
fn=ui_refresh_rules_display_fn, outputs=rules_disp_ta
).then(
fn=ui_refresh_memories_display_fn, outputs=mems_disp_json
)
if MEMORY_STORAGE_BACKEND == "RAM" and 'save_faiss_sidebar_btn' in locals():
def save_faiss_action_with_feedback_sidebar_fn():
try: save_faiss_indices_to_disk(); gr.Info("Attempted to save FAISS indices to disk.")
except Exception as e: logger.error(f"Error saving FAISS indices: {e}", exc_info=True); gr.Error(f"Error saving FAISS indices: {e}")
save_faiss_sidebar_btn.click(fn=save_faiss_action_with_feedback_sidebar_fn, inputs=None, outputs=None, show_progress=False)
app_load_outputs = [agent_stat_tb, rules_disp_ta, mems_disp_json, research_log_html, fmt_report_tb, dl_report_btn]
demo.load(fn=app_load_fn, inputs=None, outputs=app_load_outputs, show_progress="full")
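# Launch settings are read from the GRADIO_PORT, GRADIO_SERVER_NAME,
# GRADIO_DEBUG, and GRADIO_SHARE environment variables,
# e.g. (illustrative): GRADIO_PORT=7860 python app.py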
if __name__ == "__main__":
logger.info(f"Starting Gradio AI Research Mega Agent (v9.1 - Correct 1-Click JS Download, Memory: {MEMORY_STORAGE_BACKEND})...")
app_port = int(os.getenv("GRADIO_PORT", 7860))
app_server = os.getenv("GRADIO_SERVER_NAME", "127.0.0.1")
    app_debug = os.getenv("GRADIO_DEBUG", "False").lower() == "true"
app_share = os.getenv("GRADIO_SHARE", "False").lower() == "true"
logger.info(f"Launching Gradio server: http://{app_server}:{app_port}. Debug: {app_debug}, Share: {app_share}")
demo.queue().launch(server_name=app_server, server_port=app_port, debug=app_debug, share=app_share, mcp_server=True, max_threads=40)
logger.info("Gradio application shut down.")