Spaces:
Running
Running
import os | |
DEMO_MODE = False | |
MEMORY_STORAGE_TYPE = "RAM" | |
HF_DATASET_MEMORY_REPO = "broadfield-dev/ai-brain" | |
HF_DATASET_RULES_REPO = "broadfield-dev/ai-rules" | |
os.environ['STORAGE_BACKEND'] = MEMORY_STORAGE_TYPE | |
if MEMORY_STORAGE_TYPE == "HF_DATASET": | |
os.environ['HF_MEMORY_DATASET_REPO'] = HF_DATASET_MEMORY_REPO | |
os.environ['HF_RULES_DATASET_REPO'] = HF_DATASET_RULES_REPO | |
import json | |
import re | |
import logging | |
from datetime import datetime | |
from dotenv import load_dotenv | |
import gradio as gr | |
import time | |
import tempfile | |
import xml.etree.ElementTree as ET | |
from PIL import Image, ImageDraw | |
from model_logic import ( | |
get_available_providers, get_model_display_names_for_provider, | |
get_default_model_display_name_for_provider, call_model_stream, MODELS_BY_PROVIDER | |
) | |
from memory_logic import ( | |
initialize_memory_system, | |
add_memory_entry, retrieve_memories_semantic, get_all_memories_cached, clear_all_memory_data_backend, | |
add_rule_entry, retrieve_rules_semantic, remove_rule_entry, get_all_rules_cached, clear_all_rules_data_backend, | |
save_faiss_indices_to_disk, STORAGE_BACKEND as MEMORY_STORAGE_BACKEND, SQLITE_DB_PATH as MEMORY_SQLITE_PATH, | |
HF_MEMORY_DATASET_REPO as MEMORY_HF_MEM_REPO, HF_RULES_DATASET_REPO as MEMORY_HF_RULES_REPO, | |
load_rules_from_file, load_memories_from_file, process_rules_from_text_blob, import_kb_from_kv_dict | |
) | |
from websearch_logic import scrape_url, search_and_scrape_duckduckgo, search_and_scrape_google | |
from image_kb_logic import ( | |
set_pil_image_format_to_png, | |
extract_data_from_image, | |
decrypt_data, | |
InvalidTag, | |
parse_kv_string_to_dict, | |
convert_kb_to_kv_string, | |
generate_brain_carrier_image, | |
draw_key_list_dropdown_overlay, | |
encrypt_data, | |
embed_data_in_image, | |
_get_font, | |
PREFERRED_FONTS, | |
) | |
from prompts import ( | |
DEFAULT_SYSTEM_PROMPT, | |
METRIC_GENERATION_SYSTEM_PROMPT, | |
METRIC_GENERATION_USER_PROMPT_TEMPLATE, | |
PLAN_GENERATION_SYSTEM_PROMPT, | |
PLAN_GENERATION_USER_PROMPT_TEMPLATE, | |
INSIGHT_GENERATION_SYSTEM_PROMPT, | |
INSIGHT_GENERATION_USER_PROMPT_TEMPLATE | |
) | |
from gradio_client import Client | |
load_dotenv() | |
logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(name)s - %(levelname)s - %(threadName)s - %(message)s') | |
logger = logging.getLogger(__name__) | |
for lib_name in ["urllib3", "requests", "huggingface_hub", "PIL.PngImagePlugin", "matplotlib", "gradio_client.client", "multipart.multipart", "httpx", "sentence_transformers", "faiss", "datasets"]: | |
if logging.getLogger(lib_name): logging.getLogger(lib_name).setLevel(logging.WARNING) | |
WEB_SEARCH_ENABLED = os.getenv("WEB_SEARCH_ENABLED", "true").lower() == "true" | |
MAX_HISTORY_TURNS = int(os.getenv("MAX_HISTORY_TURNS", 7)) | |
current_chat_session_history = [] | |
LOAD_RULES_FILE = os.getenv("LOAD_RULES_FILE") | |
LOAD_MEMORIES_FILE = os.getenv("LOAD_MEMORIES_FILE") | |
logger.info(f"App Config: WebSearch={WEB_SEARCH_ENABLED}, MemoryBackend={MEMORY_STORAGE_BACKEND}") | |
logger.info(f"Startup loading: Rules from {LOAD_RULES_FILE or 'None'}, Memories from {LOAD_MEMORIES_FILE or 'None'}") | |
def format_insights_for_prompt(retrieved_insights_list: list[str]) -> tuple[str, list[dict]]: | |
if not retrieved_insights_list: | |
return "No specific guiding principles or learned insights retrieved.", [] | |
parsed = [] | |
for text in retrieved_insights_list: | |
match = re.match(r"\[(CORE_RULE|RESPONSE_PRINCIPLE|BEHAVIORAL_ADJUSTMENT|GENERAL_LEARNING)\|([\d\.]+?)\](.*)", text.strip(), re.DOTALL | re.IGNORECASE) | |
if match: | |
parsed.append({"type": match.group(1).upper().replace(" ", "_"), "score": match.group(2), "text": match.group(3).strip(), "original": text.strip()}) | |
else: | |
parsed.append({"type": "GENERAL_LEARNING", "score": "0.5", "text": text.strip(), "original": text.strip()}) | |
try: | |
parsed.sort(key=lambda x: float(x["score"]) if x["score"].replace('.', '', 1).isdigit() else -1.0, reverse=True) | |
except ValueError: logger.warning("FORMAT_INSIGHTS: Sort error due to invalid score format.") | |
grouped = {"CORE_RULE": [], "RESPONSE_PRINCIPLE": [], "BEHAVIORAL_ADJUSTMENT": [], "GENERAL_LEARNING": []} | |
for p_item in parsed: grouped.get(p_item["type"], grouped["GENERAL_LEARNING"]).append(f"- (Score: {p_item['score']}) {p_item['text']}") | |
sections = [f"{k.replace('_', ' ').title()}:\n" + "\n".join(v) for k, v in grouped.items() if v] | |
return "\n\n".join(sections) if sections else "No guiding principles retrieved.", parsed | |
def generate_interaction_metrics(user_input: str, bot_response: str, provider: str, model_display_name: str, api_key_override: str = None) -> dict: | |
metric_start_time = time.time() | |
logger.info(f"Generating metrics with: {provider}/{model_display_name}") | |
metric_prompt_content = METRIC_GENERATION_USER_PROMPT_TEMPLATE.format(user_input=user_input, bot_response=bot_response) | |
metric_messages = [{"role": "system", "content": METRIC_GENERATION_SYSTEM_PROMPT}, {"role": "user", "content": metric_prompt_content}] | |
try: | |
metrics_provider_final, metrics_model_display_final = provider, model_display_name | |
metrics_model_env = os.getenv("METRICS_MODEL") | |
if metrics_model_env and "/" in metrics_model_env: | |
m_prov, m_id = metrics_model_env.split('/', 1) | |
m_disp_name = next((dn for dn, mid in MODELS_BY_PROVIDER.get(m_prov.lower(), {}).get("models", {}).items() if mid == m_id), None) | |
if m_disp_name: metrics_provider_final, metrics_model_display_final = m_prov, m_disp_name | |
else: logger.warning(f"METRICS_MODEL '{metrics_model_env}' not found, using interaction model.") | |
response_chunks = list(call_model_stream(provider=metrics_provider_final, model_display_name=metrics_model_display_final, messages=metric_messages, api_key_override=api_key_override, temperature=0.05, max_tokens=200)) | |
resp_str = "".join(response_chunks).strip() | |
json_match = re.search(r"```json\s*(\{.*?\})\s*```", resp_str, re.DOTALL | re.IGNORECASE) or re.search(r"(\{.*?\})", resp_str, re.DOTALL) | |
if json_match: metrics_data = json.loads(json_match.group(1)) | |
else: | |
logger.warning(f"METRICS_GEN: Non-JSON response from {metrics_provider_final}/{metrics_model_display_final}: '{resp_str}'") | |
return {"takeaway": "N/A", "response_success_score": 0.5, "future_confidence_score": 0.5, "error": "metrics format error"} | |
parsed_metrics = {"takeaway": metrics_data.get("takeaway", "N/A"), "response_success_score": float(metrics_data.get("response_success_score", 0.5)), "future_confidence_score": float(metrics_data.get("future_confidence_score", 0.5)), "error": metrics_data.get("error")} | |
logger.info(f"METRICS_GEN: Generated in {time.time() - metric_start_time:.2f}s. Data: {parsed_metrics}") | |
return parsed_metrics | |
except Exception as e: | |
logger.error(f"METRICS_GEN Error: {e}", exc_info=False) | |
return {"takeaway": "N/A", "response_success_score": 0.5, "future_confidence_score": 0.5, "error": str(e)} | |
def _generate_action_plan( | |
original_query: str, provider_name: str, model_display_name: str, ui_api_key_override: str | None, chat_history: list[dict] | |
) -> dict: | |
history_str = "\n".join([f"{msg['role']}: {msg['content'][:150]}" for msg in chat_history[-4:]]) | |
plan_user_prompt = PLAN_GENERATION_USER_PROMPT_TEMPLATE.format(history_str=history_str, original_query=original_query) | |
plan_messages = [{"role": "system", "content": PLAN_GENERATION_SYSTEM_PROMPT}, {"role": "user", "content": plan_user_prompt}] | |
try: | |
response_chunks = list(call_model_stream( | |
provider=provider_name, | |
model_display_name=model_display_name, | |
messages=plan_messages, | |
api_key_override=ui_api_key_override, | |
temperature=0.0, | |
max_tokens=1000 | |
)) | |
resp_str = "".join(response_chunks).strip() | |
json_match = re.search(r"\{.*\}", resp_str, re.DOTALL) | |
if json_match: | |
plan_data = json.loads(json_match.group(0)) | |
return plan_data | |
except Exception as e: | |
logger.error(f"PLAN_GEN: Failed to generate or parse action plan: {e}") | |
return { | |
"action_type": "multi_step_plan", | |
"plan": [ | |
{"tool": "web_search", "task": original_query}, | |
{"tool": "respond", "task": "Synthesize all information from the scratchpad and provide a comprehensive final answer to the user."} | |
] | |
} | |
def process_user_interaction_gradio( | |
user_input: str, | |
max_research_steps: int, | |
provider_name: str, | |
model_display_name: str, | |
chat_history: list[dict], | |
custom_system_prompt: str = None, | |
ui_api_key_override: str = None, | |
): | |
process_start_time = time.time() | |
request_id = os.urandom(4).hex() | |
logger.info(f"PUI_GRADIO [{request_id}] Start. User: '{user_input[:50]}...' Max Steps: {max_research_steps}") | |
yield "status", "<i>[Deciding on an action plan...]</i>" | |
action_plan_data = _generate_action_plan(user_input, provider_name, model_display_name, ui_api_key_override, chat_history) | |
action_type = action_plan_data.get("action_type") | |
if action_type == "fast_response": | |
yield "status", "<i>[Executing fast response...]</i>" | |
yield "plan", [{"tool": "fast_response", "task": action_plan_data.get("reason", "Direct answer.")}] | |
now_str = datetime.now().strftime('%Y-%m-%d %H:%M:%S') | |
final_sys_prompt = custom_system_prompt or DEFAULT_SYSTEM_PROMPT | |
final_sys_prompt = f"Current Date/Time: {now_str}.\n\n" + final_sys_prompt | |
messages_for_llm = [{"role": "system", "content": final_sys_prompt}] + chat_history + [{"role": "user", "content": user_input}] | |
streamed_response = "" | |
try: | |
for chunk in call_model_stream(provider=provider_name, model_display_name=model_display_name, messages=messages_for_llm, api_key_override=ui_api_key_override, temperature=0.7, max_tokens=3000): | |
streamed_response += chunk | |
yield "response_chunk", chunk | |
except Exception as e: | |
streamed_response = f"\n\n(Error during fast response: {str(e)[:150]})" | |
yield "response_chunk", streamed_response | |
final_bot_text = streamed_response.strip() | |
yield "final_response", {"response": final_bot_text} | |
return | |
plan = action_plan_data.get("plan", []) | |
if not plan: | |
plan = [{"tool": "web_search", "task": user_input}, {"tool": "respond", "task": "Synthesize a response."}] | |
yield "plan", plan | |
research_scratchpad = "" | |
now_str = datetime.now().strftime('%Y-%m-%d %H:%M:%S') | |
for i, step_action in enumerate(plan): | |
tool = step_action.get("tool") | |
task = step_action.get("task") | |
if tool == 'respond': | |
break | |
if i + 1 > max_research_steps: | |
research_scratchpad += f"\n\n---NOTE: Maximum research step budget of {max_research_steps} reached. Proceeding to final response.---\n" | |
logger.warning(f"PUI_GRADIO [{request_id}]: Max research steps ({max_research_steps}) reached.") | |
break | |
task_for_display = str(task) if isinstance(task, dict) else task | |
yield "status", f"<i>[Executing Step {i+1}/{len(plan)-1}: {tool} -> {task_for_display[:70]}...]</i>" | |
step_findings = f"Step {i+1} ({tool}: '{task_for_display[:1000]}'): " | |
if tool == 'web_search': | |
try: | |
web_results = search_and_scrape_duckduckgo(task, num_results=5) | |
scraped_content = "\n".join([f"Source:\nURL:{r.get('url','N/A')}\nContent:\n{(r.get('content') or r.get('error') or 'N/A')[:1500]}\n---" for r in web_results]) if web_results else "No results found." | |
synthesis_prompt = f"Relevant web content for the task '{task}':\n\n{scraped_content}\n\nConcisely summarize the findings from the content." | |
summary = "".join(list(call_model_stream(provider=provider_name, model_display_name=model_display_name, messages=[{"role": "user", "content": synthesis_prompt}], api_key_override=ui_api_key_override, temperature=0.1, max_tokens=400))) | |
step_findings += summary | |
except Exception as e: | |
step_findings += f"Error during web search: {e}" | |
elif tool == 'web_scrape': | |
try: | |
web_results = scrape_url(task) | |
scraped_content = "\n".join([f"Source:\nURL:{r.get('url','N/A')}\nContent:\n{(r.get('content') or r.get('error') or 'N/A')[:1500]}\n---" for r in web_results]) if web_results else "No results found." | |
synthesis_prompt = f"Relevant web content for the task '{task}':\n\n{scraped_content}\n\nConcisely summarize the findings from the content." | |
summary = "".join(list(call_model_stream(provider=provider_name, model_display_name=model_display_name, messages=[{"role": "user", "content": synthesis_prompt}], api_key_override=ui_api_key_override, temperature=0.1, max_tokens=400))) | |
step_findings += summary | |
except Exception as e: | |
step_findings += f"Error during web scrape: {e}" | |
elif tool == 'gradio_view_api': | |
try: | |
client = Client(task) | |
api_info = client.view_api(all_endpoints=True) | |
summary = str(api_info) | |
if summary and summary.strip(): | |
step_findings += f"Successfully retrieved API endpoints for space '{task}':\n{summary}" | |
else: | |
step_findings += f"Could not retrieve valid API endpoint information for space '{task}'." | |
except Exception as e: | |
error_message = f"Error viewing Gradio API for space '{task}': {e}" | |
logger.error(f"GRADIO_VIEW_API_TOOL Error: {e}\nTask was: {task}", exc_info=True) | |
step_findings += error_message | |
elif tool == 'gradio_client': | |
try: | |
if isinstance(task, str): | |
try: | |
params = json.loads(task) | |
except json.JSONDecodeError: | |
json_match = re.search(r"\{.*\}", task, re.DOTALL) | |
if json_match: | |
params = json.loads(json_match.group(0)) | |
else: | |
raise ValueError("Task is not a valid JSON string or does not contain a JSON object.") | |
elif isinstance(task, dict): | |
params = task | |
else: | |
raise TypeError(f"Unsupported task type for gradio_client: {type(task)}") | |
space_id = params.get("space_id") | |
api_name = params.get("api_name") | |
parameters = params.get("parameters", {}) | |
if not space_id or not api_name: | |
raise ValueError("Missing 'space_id' or 'api_name' in task JSON.") | |
if not isinstance(parameters, dict): | |
raise TypeError("The 'parameters' field in the task must be a JSON object (dictionary).") | |
client = Client(space_id) | |
result = client.predict(**parameters, api_name=api_name) | |
if isinstance(result, (str, int, float, bool)): | |
result_str = str(result) | |
elif isinstance(result, (dict, list)): | |
result_str = json.dumps(result, indent=2) | |
else: | |
result_str = f"Received result of type {type(result)}." | |
step_findings += f"Successfully called Gradio API {api_name} on space {space_id}. Result:\n{result_str}" | |
except Exception as e: | |
error_message = f"Error during Gradio Client operation: {e}" | |
logger.error(f"GRADIO_CLIENT_TOOL Error: {e}\nTask was: {task}", exc_info=True) | |
step_findings += error_message | |
elif tool == 'memory_search': | |
try: | |
retrieved_mems = retrieve_memories_semantic(task, k=3) | |
if retrieved_mems: | |
memory_context = "\n".join([f"- User: {m.get('user_input','')} -> AI: {m.get('bot_response','')} (Takeaway: {m.get('metrics',{}).get('takeaway','N/A')})" for m in retrieved_mems]) | |
step_findings += f"Found relevant memories:\n{memory_context}" | |
else: | |
step_findings += "No relevant memories found." | |
except Exception as e: | |
step_findings += f"Error during memory search: {e}" | |
elif tool == 'think': | |
try: | |
think_prompt = f"Original Query: '{user_input}'\n\nResearch Scratchpad:\n```\n{research_scratchpad}\n```\n\nMy current thinking task is: '{task}'. Based on the scratchpad, what is the conclusion of this thinking step?" | |
thought = "".join(list(call_model_stream(provider=provider_name, model_display_name=model_display_name, messages=[{"role": "user", "content": think_prompt}], api_key_override=ui_api_key_override, temperature=0.3, max_tokens=500))) | |
step_findings += f"Conclusion: {thought}" | |
except Exception as e: | |
step_findings += f"Error during thinking step: {e}" | |
else: | |
step_findings += "Unknown tool specified in plan." | |
research_scratchpad += f"\n\n---\n{step_findings}\n---" | |
yield "step_result", {"step": i + 1, "tool": tool, "task": task_for_display, "result": step_findings} | |
yield "status", "<i>[Synthesizing final report...]</i>" | |
final_sys_prompt = custom_system_prompt or DEFAULT_SYSTEM_PROMPT | |
final_sys_prompt += f"\n\nCurrent Date/Time: {now_str}. You have just completed a research plan. Synthesize the information in the 'Research Scratchpad' into a final, comprehensive answer. Cite sources by including URLs if available." | |
final_user_prompt = f"Original user query: \"{user_input}\"\n\nResearch Scratchpad:\n```\n{research_scratchpad}\n```\n\nNow, provide the final, synthesized answer to the user." | |
final_messages = [{"role": "system", "content": final_sys_prompt}, {"role": "user", "content": final_user_prompt}] | |
streamed_response = "" | |
try: | |
for chunk in call_model_stream(provider=provider_name, model_display_name=model_display_name, messages=final_messages, api_key_override=ui_api_key_override, temperature=0.6, max_tokens=3000): | |
streamed_response += chunk | |
yield "response_chunk", chunk | |
except Exception as e: | |
error_msg = f"\n\n(Error during final synthesis: {str(e)[:150]})" | |
streamed_response += error_msg | |
yield "response_chunk", error_msg | |
final_bot_text = streamed_response.strip() or "(No response or error during synthesis.)" | |
logger.info(f"PUI_GRADIO [{request_id}]: Finished. Total: {time.time() - process_start_time:.2f}s. Resp len: {len(final_bot_text)}") | |
yield "final_response", {"response": final_bot_text} | |
def perform_post_interaction_learning(user_input: str, bot_response: str, provider: str, model_disp_name: str, api_key_override: str = None): | |
task_id = os.urandom(4).hex() | |
logger.info(f"POST_INTERACTION_LEARNING [{task_id}]: START User='{user_input[:40]}...', Bot='{bot_response[:40]}...'") | |
learning_start_time = time.time() | |
significant_learnings_summary = [] | |
try: | |
metrics = generate_interaction_metrics(user_input, bot_response, provider, model_disp_name, api_key_override) | |
logger.info(f"POST_INTERACTION_LEARNING [{task_id}]: Metrics: {metrics}") | |
add_memory_entry(user_input, metrics, bot_response) | |
summary = f"User:\"{user_input}\"\nAI:\"{bot_response}\"\nMetrics(takeaway):{metrics.get('takeaway','N/A')},Success:{metrics.get('response_success_score','N/A')}" | |
existing_rules_ctx = "\n".join([f"- \"{r}\"" for r in retrieve_rules_semantic(f"{summary}\n{user_input}", k=10)]) or "No existing rules context." | |
insight_user_prompt = INSIGHT_GENERATION_USER_PROMPT_TEMPLATE.format(summary=summary, existing_rules_ctx=existing_rules_ctx) | |
insight_msgs = [{"role":"system", "content":INSIGHT_GENERATION_SYSTEM_PROMPT}, {"role":"user", "content":insight_user_prompt}] | |
insight_prov, insight_model_disp = provider, model_disp_name | |
insight_env_model = os.getenv("INSIGHT_MODEL_OVERRIDE") | |
if insight_env_model and "/" in insight_env_model: | |
i_p, i_id = insight_env_model.split('/', 1) | |
i_d_n = next((dn for dn, mid in MODELS_BY_PROVIDER.get(i_p.lower(), {}).get("models", {}).items() if mid == i_id), None) | |
if i_d_n: insight_prov, insight_model_disp = i_p, i_d_n | |
logger.info(f"POST_INTERACTION_LEARNING [{task_id}]: Generating insights with {insight_prov}/{insight_model_disp} (expecting XML)") | |
raw_ops_xml_full = "".join(list(call_model_stream(provider=insight_prov, model_display_name=insight_model_disp, messages=insight_msgs, api_key_override=api_key_override, temperature=0.0, max_tokens=3500))).strip() | |
ops_data_list, processed_count = [], 0 | |
xml_match = re.search(r"```xml\s*(<operations_list>.*</operations_list>)\s*```", raw_ops_xml_full, re.DOTALL | re.IGNORECASE) or \ | |
re.search(r"(<operations_list>.*</operations_list>)", raw_ops_xml_full, re.DOTALL | re.IGNORECASE) | |
if xml_match: | |
xml_content_str = xml_match.group(1) | |
try: | |
root = ET.fromstring(xml_content_str) | |
if root.tag == "operations_list": | |
for op_element in root.findall("operation"): | |
action_el = op_element.find("action") | |
insight_el = op_element.find("insight") | |
old_insight_el = op_element.find("old_insight_to_replace") | |
action = action_el.text.strip().lower() if action_el is not None and action_el.text else None | |
insight_text = insight_el.text.strip() if insight_el is not None and insight_el.text else None | |
old_insight_text = old_insight_el.text.strip() if old_insight_el is not None and old_insight_el.text else None | |
if action and insight_text: | |
ops_data_list.append({ | |
"action": action, | |
"insight": insight_text, | |
"old_insight_to_replace": old_insight_text | |
}) | |
else: | |
logger.warning(f"POST_INTERACTION_LEARNING [{task_id}]: Skipped XML operation due to missing action or insight text. Action: {action}, Insight: {insight_text}") | |
else: | |
logger.error(f"POST_INTERACTION_LEARNING [{task_id}]: XML root tag is not <operations_list>. Found: {root.tag}. XML content:\n{xml_content_str}") | |
except ET.ParseError as e: | |
logger.error(f"POST_INTERACTION_LEARNING [{task_id}]: XML parsing error: {e}. XML content that failed:\n{xml_content_str}") | |
except Exception as e_xml_proc: | |
logger.error(f"POST_INTERACTION_LEARNING [{task_id}]: Error processing parsed XML: {e_xml_proc}. XML content:\n{xml_content_str}") | |
else: | |
logger.info(f"POST_INTERACTION_LEARNING [{task_id}]: No <operations_list> XML structure found in LLM output. Full raw output:\n{raw_ops_xml_full}") | |
if ops_data_list: | |
logger.info(f"POST_INTERACTION_LEARNING [{task_id}]: LLM provided {len(ops_data_list)} insight ops from XML.") | |
for op_idx, op_data in enumerate(ops_data_list): | |
action = op_data["action"] | |
insight_text = op_data["insight"] | |
old_insight = op_data["old_insight_to_replace"] | |
if not re.match(r"\[(CORE_RULE|RESPONSE_PRINCIPLE|BEHAVIORAL_ADJUSTMENT|GENERAL_LEARNING)\|([\d\.]+?)\]", insight_text, re.I|re.DOTALL): | |
logger.warning(f"POST_INTERACTION_LEARNING [{task_id}]: Op {op_idx}: Skipped op due to invalid insight_text format from XML: '{insight_text[:100]}...'") | |
continue | |
if action == "add": | |
success, status_msg = add_rule_entry(insight_text) | |
if success: | |
processed_count +=1 | |
if insight_text.upper().startswith("[CORE_RULE"): | |
significant_learnings_summary.append(f"New Core Rule Added: {insight_text}") | |
else: logger.warning(f"POST_INTERACTION_LEARNING [{task_id}]: Op {op_idx} (add from XML): Failed to add rule '{insight_text[:50]}...'. Status: {status_msg}") | |
elif action == "update": | |
if old_insight and old_insight != insight_text: | |
remove_success = remove_rule_entry(old_insight) | |
if not remove_success: | |
logger.warning(f"POST_INTERACTION_LEARNING [{task_id}]: Op {op_idx} (update from XML): Failed to remove old rule '{old_insight[:50]}...' before adding new.") | |
success, status_msg = add_rule_entry(insight_text) | |
if success: | |
processed_count +=1 | |
if insight_text.upper().startswith("[CORE_RULE"): | |
significant_learnings_summary.append(f"Core Rule Updated to: {insight_text}") | |
else: logger.warning(f"POST_INTERACTION_LEARNING [{task_id}]: Op {op_idx} (update from XML): Failed to add/update rule '{insight_text[:50]}...'. Status: {status_msg}") | |
else: | |
logger.warning(f"POST_INTERACTION_LEARNING [{task_id}]: Op {op_idx}: Skipped op due to unknown action '{action}' from XML.") | |
if significant_learnings_summary: | |
learning_digest = "SYSTEM CORE LEARNING DIGEST:\n" + "\n".join(significant_learnings_summary) | |
system_metrics = { | |
"takeaway": "Core knowledge refined.", | |
"response_success_score": 1.0, | |
"future_confidence_score": 1.0, | |
"type": "SYSTEM_REFLECTION" | |
} | |
add_memory_entry( | |
user_input="SYSTEM_INTERNAL_REFLECTION_TRIGGER", | |
metrics=system_metrics, | |
bot_response=learning_digest | |
) | |
logger.info(f"POST_INTERACTION_LEARNING [{task_id}]: Added CORE_LEARNING_DIGEST to memories: {learning_digest[:100]}...") | |
logger.info(f"POST_INTERACTION_LEARNING [{task_id}]: Processed {processed_count} insight ops out of {len(ops_data_list)} received from XML.") | |
else: | |
logger.info(f"POST_INTERACTION_LEARNING [{task_id}]: No valid insight operations derived from LLM's XML output.") | |
except Exception as e: logger.error(f"POST_INTERACTION_LEARNING [{task_id}]: CRITICAL ERROR in learning task: {e}", exc_info=True) | |
logger.info(f"POST_INTERACTION_LEARNING [{task_id}]: END. Total: {time.time() - learning_start_time:.2f}s") | |
def handle_gradio_chat_submit(user_msg_txt: str, max_research_steps: int, gr_hist_list: list, sel_prov_name: str, sel_model_disp_name: str, ui_api_key: str|None, cust_sys_prompt: str): | |
global current_chat_session_history | |
cleared_input, updated_gr_hist, status_txt = "", list(gr_hist_list), "Initializing..." | |
updated_rules_text = ui_refresh_rules_display_fn() | |
updated_mems_json = ui_refresh_memories_display_fn() | |
log_html_output = gr.HTML("<p><i>Research Log will appear here.</i></p>") | |
final_report_tb = gr.Textbox(value="*Waiting...*", interactive=True, show_copy_button=True) | |
dl_report_btn = gr.DownloadButton(interactive=False, value=None, visible=False) | |
if not user_msg_txt.strip(): | |
status_txt = "Error: Empty message." | |
updated_gr_hist.append((user_msg_txt or "(Empty)", status_txt)) | |
yield (cleared_input, updated_gr_hist, status_txt, log_html_output, final_report_tb, dl_report_btn, updated_rules_text, updated_mems_json) | |
return | |
updated_gr_hist.append((user_msg_txt, "<i>Thinking... See Research Log below for progress.</i>")) | |
yield (cleared_input, updated_gr_hist, status_txt, log_html_output, final_report_tb, dl_report_btn, updated_rules_text, updated_mems_json) | |
internal_hist = list(current_chat_session_history) | |
final_bot_resp_acc = "" | |
temp_dl_file_path = None | |
try: | |
processor_gen = process_user_interaction_gradio( | |
user_input=user_msg_txt, | |
max_research_steps=max_research_steps, | |
provider_name=sel_prov_name, | |
model_display_name=sel_model_disp_name, | |
chat_history=internal_hist, | |
custom_system_prompt=cust_sys_prompt.strip() or None, | |
ui_api_key_override=ui_api_key.strip() if ui_api_key else None | |
) | |
curr_bot_disp_msg = "" | |
full_plan = [] | |
log_html_parts = [] | |
for upd_type, upd_data in processor_gen: | |
if upd_type == "status": | |
status_txt = upd_data | |
if "Deciding" in status_txt or "Executing" in status_txt: | |
log_html_output = gr.HTML(f"<p><i>{status_txt}</i></p>") | |
elif upd_type == "plan": | |
full_plan = upd_data | |
log_html_parts = ["<h3>Action Plan</h3><ol>"] | |
for i, step in enumerate(full_plan): | |
log_html_parts.append(f'<li id="log-step-{i+1}"><strong>{step.get("tool")}</strong>: {step.get("task")} <span style="color:gray;">(Pending)</span></li>') | |
log_html_parts.append("</ol><hr><h3>Log</h3>") | |
log_html_output = gr.HTML("".join(log_html_parts)) | |
elif upd_type == "step_result": | |
step_num = upd_data["step"] | |
sanitized_result = upd_data["result"].replace('<', '<').replace('>', '>').replace('\n', '<br>') | |
log_html_parts[step_num] = f'<li id="log-step-{step_num}"><strong>{upd_data.get("tool")}</strong>: {upd_data.get("task")} <span style="color:green;">(Done)</span></li>' | |
log_html_parts.append(f'<div style="margin-left: 20px; padding: 5px; border-left: 2px solid #ccc;"><small style="color: #555;">{sanitized_result}</small></div>') | |
next_step_index_in_list = step_num + 1 | |
if next_step_index_in_list < len(full_plan) + 1: | |
next_step_action = full_plan[step_num] | |
if next_step_action.get("tool") != "respond": | |
log_html_parts[next_step_index_in_list] = f'<li id="log-step-{next_step_index_in_list}"><strong>{next_step_action.get("tool")}</strong>: {next_step_action.get("task")} <span style="color:blue;">(In Progress...)</span></li>' | |
log_html_output = gr.HTML("".join(log_html_parts)) | |
elif upd_type == "response_chunk": | |
curr_bot_disp_msg += upd_data | |
if updated_gr_hist and updated_gr_hist[-1][0] == user_msg_txt: | |
updated_gr_hist[-1] = (user_msg_txt, curr_bot_disp_msg) | |
elif upd_type == "final_response": | |
final_bot_resp_acc = upd_data["response"] | |
status_txt = "Response generated. Processing learning..." | |
if not curr_bot_disp_msg and final_bot_resp_acc: curr_bot_disp_msg = final_bot_resp_acc | |
if updated_gr_hist and updated_gr_hist[-1][0] == user_msg_txt: | |
updated_gr_hist[-1] = (user_msg_txt, curr_bot_disp_msg or "(No text)") | |
final_report_tb = gr.Textbox(value=curr_bot_disp_msg, interactive=True, show_copy_button=True) | |
if curr_bot_disp_msg and not curr_bot_disp_msg.startswith("Error:"): | |
try: | |
with tempfile.NamedTemporaryFile(mode="w", delete=False, suffix=".md", encoding='utf-8') as tmpfile: | |
tmpfile.write(curr_bot_disp_msg) | |
temp_dl_file_path = tmpfile.name | |
dl_report_btn = gr.DownloadButton(value=temp_dl_file_path, visible=True, interactive=True) | |
except Exception as e: | |
logger.error(f"Error creating temp file for download: {e}", exc_info=False) | |
dl_report_btn = gr.DownloadButton(interactive=False, value=None, visible=False, label="Download Error") | |
else: | |
dl_report_btn = gr.DownloadButton(interactive=False, value=None, visible=False) | |
yield (cleared_input, updated_gr_hist, status_txt, log_html_output, final_report_tb, dl_report_btn, updated_rules_text, updated_mems_json) | |
if upd_type == "final_response": break | |
except Exception as e: | |
logger.error(f"Chat handler error during main processing: {e}", exc_info=True) | |
status_txt = f"Error: {str(e)[:100]}" | |
error_message_for_chat = f"Sorry, an error occurred: {str(e)[:100]}" | |
if updated_gr_hist and updated_gr_hist[-1][0] == user_msg_txt: | |
updated_gr_hist[-1] = (user_msg_txt, error_message_for_chat) | |
final_report_tb = gr.Textbox(value=error_message_for_chat, interactive=True) | |
dl_report_btn = gr.DownloadButton(interactive=False, value=None, visible=False) | |
log_html_output = gr.HTML(f'<p style="color:red;"><strong>Error processing request.</strong></p>') | |
current_rules_text_on_error = ui_refresh_rules_display_fn() | |
current_mems_json_on_error = ui_refresh_memories_display_fn() | |
yield (cleared_input, updated_gr_hist, status_txt, log_html_output, final_report_tb, dl_report_btn, current_rules_text_on_error, current_mems_json_on_error) | |
if temp_dl_file_path and os.path.exists(temp_dl_file_path): | |
try: os.unlink(temp_dl_file_path) | |
except Exception as e_unlink: logger.error(f"Error deleting temp download file {temp_dl_file_path} after error: {e_unlink}") | |
return | |
if final_bot_resp_acc and not final_bot_resp_acc.startswith("Error:"): | |
current_chat_session_history.extend([{"role": "user", "content": user_msg_txt}, {"role": "assistant", "content": final_bot_resp_acc}]) | |
status_txt = "<i>[Performing post-interaction learning...]</i>" | |
current_rules_text_before_learn = ui_refresh_rules_display_fn() | |
current_mems_json_before_learn = ui_refresh_memories_display_fn() | |
yield (cleared_input, updated_gr_hist, status_txt, log_html_output, final_report_tb, dl_report_btn, current_rules_text_before_learn, current_mems_json_before_learn) | |
try: | |
perform_post_interaction_learning( | |
user_input=user_msg_txt, | |
bot_response=final_bot_resp_acc, | |
provider=sel_prov_name, | |
model_disp_name=sel_model_disp_name, | |
api_key_override=ui_api_key.strip() if ui_api_key else None | |
) | |
status_txt = "Response & Learning Complete." | |
except Exception as e_learn: | |
logger.error(f"Error during post-interaction learning: {e_learn}", exc_info=True) | |
status_txt = "Response complete. Error during learning." | |
else: | |
status_txt = "Processing finished; no valid response or error occurred." | |
updated_rules_text = ui_refresh_rules_display_fn() | |
updated_mems_json = ui_refresh_memories_display_fn() | |
yield (cleared_input, updated_gr_hist, status_txt, log_html_output, final_report_tb, dl_report_btn, updated_rules_text, updated_mems_json) | |
if temp_dl_file_path and os.path.exists(temp_dl_file_path): | |
try: os.unlink(temp_dl_file_path) | |
except Exception as e_unlink: logger.error(f"Error deleting temp download file {temp_dl_file_path}: {e_unlink}") | |
def ui_refresh_rules_display_fn(): return "\n\n---\n\n".join(get_all_rules_cached()) or "No rules found." | |
def ui_refresh_memories_display_fn(): return get_all_memories_cached() or [] | |
def ui_download_rules_action_fn(): | |
rules_content = "\n\n---\n\n".join(get_all_rules_cached()) | |
if not rules_content.strip(): | |
gr.Warning("No rules to download.") | |
return gr.DownloadButton(value=None, interactive=False, label="No Rules") | |
try: | |
with tempfile.NamedTemporaryFile(mode="w", delete=False, suffix=".txt", encoding='utf-8') as tmpfile: | |
tmpfile.write(rules_content) | |
return tmpfile.name | |
except Exception as e: | |
logger.error(f"Error creating rules download file: {e}") | |
gr.Error(f"Failed to prepare rules for download: {e}") | |
return gr.DownloadButton(value=None, interactive=False, label="Error") | |
def ui_upload_rules_action_fn(uploaded_file_obj, progress=gr.Progress()): | |
if not uploaded_file_obj: return "No file provided for rules upload." | |
try: | |
with open(uploaded_file_obj.name, 'r', encoding='utf-8') as f: content = f.read() | |
except Exception as e_read: return f"Error reading file: {e_read}" | |
if not content.strip(): return "Uploaded rules file is empty." | |
added_count, skipped_count, error_count = 0,0,0 | |
potential_rules = [] | |
file_name_lower = uploaded_file_obj.name.lower() | |
if file_name_lower.endswith(".txt"): | |
potential_rules = content.split("\n\n---\n\n") | |
if len(potential_rules) == 1 and "\n" in content: | |
potential_rules = [r.strip() for r in content.splitlines() if r.strip()] | |
elif file_name_lower.endswith(".jsonl"): | |
for line_num, line in enumerate(content.splitlines()): | |
line = line.strip() | |
if line: | |
try: | |
rule_text_in_json_string = json.loads(line) | |
if isinstance(rule_text_in_json_string, str): | |
potential_rules.append(rule_text_in_json_string) | |
else: | |
logger.warning(f"Rule Upload (JSONL): Line {line_num+1} did not contain a string value. Got: {type(rule_text_in_json_string)}") | |
error_count +=1 | |
except json.JSONDecodeError: | |
logger.warning(f"Rule Upload (JSONL): Line {line_num+1} failed to parse as JSON: {line[:100]}") | |
error_count +=1 | |
else: | |
return "Unsupported file type for rules. Please use .txt or .jsonl." | |
valid_potential_rules = [r.strip() for r in potential_rules if r.strip()] | |
total_to_process = len(valid_potential_rules) | |
if total_to_process == 0 and error_count == 0: return "No valid rules found in file to process." | |
elif total_to_process == 0 and error_count > 0: return f"No valid rules found to process. Encountered {error_count} parsing/format errors." | |
progress(0, desc="Starting rules upload...") | |
for idx, rule_text in enumerate(valid_potential_rules): | |
success, status_msg = add_rule_entry(rule_text) | |
if success: added_count += 1 | |
elif status_msg == "duplicate": skipped_count += 1 | |
else: error_count += 1 | |
progress((idx + 1) / total_to_process, desc=f"Processed {idx+1}/{total_to_process} rules...") | |
msg = f"Rules Upload: Total valid rule segments processed: {total_to_process}. Added: {added_count}, Skipped (duplicates): {skipped_count}, Errors (parsing/add): {error_count}." | |
logger.info(msg); return msg | |
def ui_download_memories_action_fn(): | |
memories = get_all_memories_cached() | |
if not memories: | |
gr.Warning("No memories to download.") | |
return gr.DownloadButton(value=None, interactive=False, label="No Memories") | |
jsonl_content = "" | |
for mem_dict in memories: | |
try: jsonl_content += json.dumps(mem_dict) + "\n" | |
except Exception as e: logger.error(f"Error serializing memory for download: {mem_dict}, Error: {e}") | |
if not jsonl_content.strip(): | |
gr.Warning("No valid memories to serialize for download.") | |
return gr.DownloadButton(value=None, interactive=False, label="No Data") | |
try: | |
with tempfile.NamedTemporaryFile(mode="w", delete=False, suffix=".jsonl", encoding='utf-8') as tmpfile: | |
tmpfile.write(jsonl_content) | |
return tmpfile.name | |
except Exception as e: | |
logger.error(f"Error creating memories download file: {e}") | |
gr.Error(f"Failed to prepare memories for download: {e}") | |
return gr.DownloadButton(value=None, interactive=False, label="Error") | |
def ui_upload_memories_action_fn(uploaded_file_obj, progress=gr.Progress()): | |
if not uploaded_file_obj: return "No file provided for memories upload." | |
try: | |
with open(uploaded_file_obj.name, 'r', encoding='utf-8') as f: content = f.read() | |
except Exception as e_read: return f"Error reading file: {e_read}" | |
if not content.strip(): return "Uploaded memories file is empty." | |
added_count, format_error_count, save_error_count = 0,0,0 | |
memory_objects_to_process = [] | |
file_ext = os.path.splitext(uploaded_file_obj.name.lower())[1] | |
if file_ext == ".json": | |
try: | |
parsed_json = json.loads(content) | |
if isinstance(parsed_json, list): memory_objects_to_process = parsed_json | |
elif isinstance(parsed_json, dict): memory_objects_to_process = [parsed_json] | |
else: | |
logger.warning(f"Memories Upload (.json): File content is not a JSON list or object. Type: {type(parsed_json)}"); format_error_count = 1 | |
except json.JSONDecodeError as e: | |
logger.warning(f"Memories Upload (.json): Invalid JSON file. Error: {e}"); format_error_count = 1 | |
elif file_ext == ".jsonl": | |
for line_num, line in enumerate(content.splitlines()): | |
line = line.strip() | |
if line: | |
try: memory_objects_to_process.append(json.loads(line)) | |
except json.JSONDecodeError: | |
logger.warning(f"Memories Upload (.jsonl): Line {line_num+1} parse error: {line[:100]}"); format_error_count += 1 | |
else: return "Unsupported file type for memories. Please use .json or .jsonl." | |
if not memory_objects_to_process and format_error_count > 0 : return f"Memories Upload: File parsing failed. Found {format_error_count} format errors and no processable objects." | |
elif not memory_objects_to_process: return "No valid memory objects found in the uploaded file." | |
total_to_process = len(memory_objects_to_process) | |
if total_to_process == 0: return "No memory objects to process (after parsing)." | |
progress(0, desc="Starting memories upload...") | |
for idx, mem_data in enumerate(memory_objects_to_process): | |
if isinstance(mem_data, dict) and all(k in mem_data for k in ["user_input", "bot_response", "metrics"]): | |
success, _ = add_memory_entry(mem_data["user_input"], mem_data["metrics"], mem_data["bot_response"]) | |
if success: added_count += 1 | |
else: save_error_count += 1 | |
else: | |
logger.warning(f"Memories Upload: Skipped invalid memory object structure: {str(mem_data)[:100]}"); format_error_count += 1 | |
progress((idx + 1) / total_to_process, desc=f"Processed {idx+1}/{total_to_process} memories...") | |
msg = f"Memories Upload: Processed {total_to_process} objects. Added: {added_count}, Format/Structure Errors: {format_error_count}, Save Errors: {save_error_count}." | |
logger.info(msg); return msg | |
def save_edited_rules_action_fn(edited_rules_text: str, progress=gr.Progress()): | |
if DEMO_MODE: | |
gr.Warning("Saving edited rules is disabled in Demo Mode.") | |
return "Saving edited rules is disabled in Demo Mode." | |
if not edited_rules_text.strip(): | |
return "No rules text to save." | |
stats = process_rules_from_text_blob(edited_rules_text, progress) | |
return f"Editor Save: Added: {stats['added']}, Skipped (duplicates): {stats['skipped']}, Errors/Invalid: {stats['errors']} from {stats['total']} unique rules in text." | |
def ui_upload_kb_from_image_fn(uploaded_image_filepath: str, password: str, progress=gr.Progress()): | |
if DEMO_MODE: | |
gr.Warning("Uploading is disabled in Demo Mode.") | |
return "Upload disabled in Demo Mode." | |
if not uploaded_image_filepath: | |
return "No image file provided or pasted." | |
progress(0, desc="Loading and standardizing image...") | |
try: | |
img_temp = Image.open(uploaded_image_filepath) | |
img = set_pil_image_format_to_png(img_temp) | |
except Exception as e: | |
logger.error(f"KB ImgUL: Open/Standardize fail: {e}") | |
return f"Error: Could not open or process image file: {e}" | |
progress(0.2, desc="Extracting data from image...") | |
try: | |
extracted_bytes = extract_data_from_image(img) | |
if not extracted_bytes: return "No data found embedded in the image." | |
except ValueError as e: | |
logger.error(f"KB ImgUL: Extract fail: {e}") | |
return f"Error extracting data: {e}" | |
except Exception as e: | |
logger.error(f"KB ImgUL: Extract error: {e}", exc_info=True) | |
return f"Unexpected extraction error: {e}" | |
kv_string = "" | |
try: | |
if extracted_bytes[:20].decode('utf-8', errors='ignore').strip().startswith("# iLearn"): | |
kv_string = extracted_bytes.decode('utf-8') | |
progress(0.4, desc="Parsing data...") | |
elif password and password.strip(): | |
progress(0.3, desc="Attempting decryption...") | |
kv_string = decrypt_data(extracted_bytes, password.strip()).decode('utf-8') | |
progress(0.4, desc="Parsing decrypted data...") | |
else: return "Data appears encrypted, but no password was provided." | |
except (UnicodeDecodeError, InvalidTag, ValueError) as e: | |
if "decryption" in str(e).lower() or isinstance(e, InvalidTag): | |
return f"Decryption Failed. Check password or file integrity. Details: {e}" | |
return "Data is binary and requires a password for decryption." | |
except Exception as e: | |
logger.error(f"KB ImgUL: Decrypt/Parse error: {e}", exc_info=True) | |
return f"Unexpected error during decryption or parsing: {e}" | |
if not kv_string: return "Could not get data from image (after potential decryption)." | |
try: | |
kv_dict = parse_kv_string_to_dict(kv_string) | |
except Exception as e: | |
logger.error(f"KB ImgUL: Parse fail: {e}") | |
return f"Error parsing data: {e}" | |
if not kv_dict: return "Parsed data is empty." | |
stats = import_kb_from_kv_dict(kv_dict, progress) | |
msg = f"Upload Complete. Rules - Add: {stats['rules_added']}, Skip: {stats['rules_skipped']}, Err: {stats['rules_errors']}. Mems - Add: {stats['mems_added']}, Err: {stats['mems_errors']}." | |
logger.info(f"Image KB Upload: {msg}") | |
return msg | |
def app_load_fn(): | |
logger.info("App loading. Initializing systems...") | |
initialize_memory_system() | |
logger.info("Memory system initialized.") | |
rules_added, rules_skipped, rules_errors = load_rules_from_file(LOAD_RULES_FILE) | |
rules_load_msg = f"Rules: Added {rules_added}, Skipped {rules_skipped}, Errors {rules_errors} from {LOAD_RULES_FILE or 'None'}." | |
logger.info(rules_load_msg) | |
mems_added, mems_format_errors, mems_save_errors = load_memories_from_file(LOAD_MEMORIES_FILE) | |
mems_load_msg = f"Memories: Added {mems_added}, Format Errors {mems_format_errors}, Save Errors {mems_save_errors} from {LOAD_MEMORIES_FILE or 'None'}." | |
logger.info(mems_load_msg) | |
final_status = f"AI Systems Initialized. {rules_load_msg} {mems_load_msg} Ready." | |
rules_on_load, mems_on_load = ui_refresh_rules_display_fn(), ui_refresh_memories_display_fn() | |
return (final_status, rules_on_load, mems_on_load, gr.HTML("<p><i>Research Log will appear here.</i></p>"), | |
gr.Textbox(value="*Waiting...*", interactive=True, show_copy_button=True), | |
gr.DownloadButton(interactive=False, value=None, visible=False)) | |
placeholder_filename = "placeholder_image.png" | |
try: | |
if not os.path.exists(placeholder_filename): | |
img = Image.new('RGB', (200, 100), color='darkblue') | |
draw = Image.Draw(img) | |
try: | |
font = _get_font(PREFERRED_FONTS, 14) | |
draw.text((10, 45), "Placeholder KB Image", font=font, fill='white') | |
except Exception: | |
draw.text((10, 45), "Placeholder", fill='white') | |
img.save(placeholder_filename) | |
logger.info(f"Created '{placeholder_filename}' for Gradio examples.") | |
except Exception as e: | |
logger.error(f"Could not create placeholder image. The examples may not load correctly. Error: {e}") | |
def ui_create_kb_image_fn(password: str, content_to_include: list, progress=gr.Progress()): | |
include_rules = "Include Rules" in content_to_include | |
include_memories = "Include Memories" in content_to_include | |
if not include_rules and not include_memories: | |
gr.Warning("Nothing selected to save.") | |
return gr.update(value=None, visible=False), gr.update(value=None, visible=False), "Nothing selected to save." | |
progress(0.1, desc="Fetching knowledge base...") | |
rules = get_all_rules_cached() if include_rules else [] | |
memories = get_all_memories_cached() if include_memories else [] | |
if not rules and not memories: | |
gr.Warning("Knowledge base is empty or selected content is empty.") | |
return gr.update(value=None, visible=False), gr.update(value=None, visible=False), "No content to save." | |
progress(0.2, desc="Serializing data...") | |
kv_string = convert_kb_to_kv_string(rules, memories, include_rules, include_memories) | |
data_bytes = kv_string.encode('utf-8') | |
if password and password.strip(): | |
progress(0.3, desc="Encrypting data...") | |
try: | |
data_bytes = encrypt_data(data_bytes, password.strip()) | |
except Exception as e: | |
logger.error(f"KB ImgDL: Encrypt failed: {e}") | |
return gr.update(value=None, visible=False), gr.update(value=None, visible=False), f"Error: {e}" | |
progress(0.5, desc="Generating carrier image...") | |
carrier_image = generate_brain_carrier_image(w=800, h=800) | |
progress(0.6, desc="Adding visual overlay...") | |
keys_for_overlay = [] | |
if include_rules: keys_for_overlay.append(f"Rule Count: {len(rules)}") | |
if include_memories: keys_for_overlay.append(f"Memory Count: {len(memories)}") | |
title_overlay = "Encrypted Knowledge Base" if password and password.strip() else "iLearn Knowledge Base" | |
image_with_overlay = draw_key_list_dropdown_overlay(carrier_image, keys=keys_for_overlay, title=title_overlay) | |
try: | |
progress(0.8, desc="Embedding data into final image...") | |
final_image_with_data = embed_data_in_image(image_with_overlay, data_bytes) | |
except ValueError as e: | |
logger.error(f"KB ImgDL: Embed failed: {e}") | |
return gr.update(value=None, visible=False), gr.update(value=None, visible=False), f"Error: {e}" | |
progress(0.9, desc="Preparing final image and download file...") | |
try: | |
with tempfile.NamedTemporaryFile(delete=False, suffix=".png") as tmpfile: | |
final_image_with_data.save(tmpfile, format="PNG") | |
tmp_path = tmpfile.name | |
progress(1.0, desc="Image created!") | |
return gr.update(value=tmp_path, visible=True), gr.update(value=tmp_path, visible=True), "Success! Image created." | |
except Exception as e: | |
logger.error(f"KB ImgDL: Save failed: {e}") | |
return gr.update(value=None, visible=False), gr.update(value=None, visible=False), f"Error: {e}" | |
def ui_load_from_sources_fn(image_filepath: str, rules_file_obj: object, mems_file_obj: object, password: str, progress=gr.Progress()): | |
if image_filepath: | |
progress(0.1, desc="Image source detected. Starting image processing...") | |
return ui_upload_kb_from_image_fn(image_filepath, password, progress) | |
if rules_file_obj: | |
progress(0.1, desc="Rules file detected. Starting rules import...") | |
return ui_upload_rules_action_fn(rules_file_obj, progress) | |
if mems_file_obj: | |
progress(0.1, desc="Memories file detected. Starting memories import...") | |
return ui_upload_memories_action_fn(mems_file_obj, progress) | |
return "No file or image uploaded. Please provide a source file to load." | |
with gr.Blocks(theme=gr.themes.Soft(), css=".gr-button { margin: 5px; } .gr-textbox, .gr-text-area, .gr-dropdown, .gr-json { border-radius: 8px; } .gr-group { border: 1px solid #e0e0e0; border-radius: 8px; padding: 10px; } .gr-row { gap: 10px; } .gr-tab { border-radius: 8px; } .status-text { font-size: 0.9em; color: #555; } .gr-json { max-height: 400px; overflow-y: auto; }") as demo: | |
gr.Markdown(f"# ๐ค iLearn: An Autonomous Learning Agent {'(DEMO MODE)' if DEMO_MODE else ''}", elem_classes=["header"]) | |
is_sqlite, is_hf_dataset = (MEMORY_STORAGE_BACKEND == "SQLITE"), (MEMORY_STORAGE_BACKEND == "HF_DATASET") | |
with gr.Row(variant="compact"): | |
agent_stat_tb = gr.Textbox(label="Agent Status", value="Initializing systems...", interactive=False, elem_classes=["status-text"], scale=4) | |
with gr.Column(scale=1, min_width=150): | |
memory_backend_info_tb = gr.Textbox(label="Memory Backend", value=MEMORY_STORAGE_BACKEND, interactive=False, elem_classes=["status-text"]) | |
sqlite_path_display = gr.Textbox(label="SQLite Path", value=MEMORY_SQLITE_PATH, interactive=False, visible=is_sqlite, elem_classes=["status-text"]) | |
hf_repos_display = gr.Textbox(label="HF Repos", value=f"M: {MEMORY_HF_MEM_REPO}, R: {MEMORY_HF_RULES_REPO}", interactive=False, visible=is_hf_dataset, elem_classes=["status-text"]) | |
with gr.Sidebar(): | |
gr.Markdown("## โ๏ธ Configuration") | |
with gr.Group(): | |
gr.Markdown("### AI Model Settings") | |
api_key_tb = gr.Textbox(label="AI Provider API Key (Override)", type="password", placeholder="Uses .env if blank") | |
available_providers = get_available_providers(); default_provider = available_providers[0] if "groq" not in available_providers else "groq" | |
prov_sel_dd = gr.Dropdown(label="AI Provider", choices=available_providers, value=default_provider, interactive=True) | |
default_model_display = get_default_model_display_name_for_provider(default_provider) if default_provider else None | |
model_sel_dd = gr.Dropdown(label="AI Model", choices=get_model_display_names_for_provider(default_provider) if default_provider else [], value=default_model_display, interactive=True) | |
research_steps_slider = gr.Slider(label="Max Research Steps", minimum=1, maximum=10, step=1, value=3, interactive=True) | |
with gr.Group(): | |
gr.Markdown("### System Prompt"); sys_prompt_tb = gr.Textbox(label="System Prompt Base", lines=8, value=DEFAULT_SYSTEM_PROMPT, interactive=True) | |
with gr.Tabs(): | |
with gr.TabItem("๐ฌ Chat & Research"): | |
with gr.Row(): | |
with gr.Column(scale=3): | |
gr.Markdown("### AI Chat Interface") | |
main_chat_disp = gr.Chatbot(label=None, height=450, bubble_full_width=False,avatar_images=(None, "https://raw.githubusercontent.com/gradio-app/gradio/main/guides/assets/logo.png"), show_copy_button=True, render_markdown=True, sanitize_html=True) | |
with gr.Row(variant="compact"): | |
user_msg_tb = gr.Textbox(show_label=False, placeholder="Ask your research question...", scale=7, lines=1, max_lines=3) | |
send_btn = gr.Button("Send", variant="primary", scale=1, min_width=100) | |
with gr.Accordion("๐ Detailed Response & Research Log", open=True): | |
research_log_html = gr.HTML(label="Research Log", value="<div class='log-container'><p><i>Waiting for a new task to begin...</i></p></div>") | |
fmt_report_tb = gr.Textbox(label="Full AI Response", lines=8, interactive=True, show_copy_button=True) | |
dl_report_btn = gr.DownloadButton("Download Report", value=None, interactive=False, visible=False) | |
with gr.TabItem("๐ง Knowledge Base"): | |
with gr.Tabs(): | |
with gr.TabItem("๐๏ธ System"): | |
gr.Markdown("View and directly manage the current rules and memories in the system.") | |
with gr.Row(equal_height=False, variant='compact'): | |
with gr.Column(): | |
gr.Markdown("### ๐ Current Rules") | |
rules_disp_ta = gr.TextArea(label=None, lines=15, placeholder="Rules will appear here.", interactive=True) | |
save_edited_rules_btn = gr.Button("๐พ Save Edited Rules", variant="primary", interactive=not DEMO_MODE) | |
clear_rules_btn = gr.Button("๐๏ธ Clear All Rules", variant="stop", visible=not DEMO_MODE) | |
with gr.Column(): | |
gr.Markdown("### ๐ Current Memories") | |
mems_disp_json = gr.JSON(label=None, value=[], scale=1) | |
clear_mems_btn = gr.Button("๐๏ธ Clear All Memories", variant="stop", visible=not DEMO_MODE) | |
with gr.TabItem("๐พ Save KB"): | |
gr.Markdown("Export the current knowledge base as text files or as a single, portable PNG image.") | |
with gr.Row(): | |
rules_stat_tb = gr.Textbox(label="Rules Status", interactive=False, lines=1, elem_classes=["status-text"]) | |
mems_stat_tb = gr.Textbox(label="Memories Status", interactive=False, lines=1, elem_classes=["status-text"]) | |
with gr.Row(): | |
with gr.Column(): | |
gr.Markdown("### Text File Export") | |
dl_rules_btn = gr.DownloadButton("โฌ๏ธ Download Rules (.txt)", value=None) | |
dl_mems_btn = gr.DownloadButton("โฌ๏ธ Download Memories (.jsonl)", value=None) | |
gr.Row() | |
if MEMORY_STORAGE_BACKEND == "RAM": save_faiss_sidebar_btn = gr.Button("Save FAISS Indices", variant="secondary") | |
with gr.Column(): | |
gr.Markdown("### Image Export") | |
with gr.Group(): | |
save_kb_password_tb = gr.Textbox(label="Password (optional for encryption)", type="password") | |
save_kb_include_cbg = gr.CheckboxGroup(label="Content to Include", choices=["Include Rules", "Include Memories"], value=["Include Rules", "Include Memories"]) | |
create_kb_img_btn = gr.Button("โจ Create KB Image", variant="secondary") | |
kb_image_display_output = gr.Image(label="Generated Image (Right-click to copy)", type="filepath", visible=False) | |
kb_image_download_output = gr.DownloadButton("โฌ๏ธ Download Image File", visible=False) | |
with gr.TabItem("๐ Load KB"): | |
gr.Markdown("Import rules, memories, or a full KB from local files or a portable PNG image.") | |
load_status_tb = gr.Textbox(label="Load Operation Status", interactive=False, lines=2) | |
load_kb_password_tb = gr.Textbox(label="Password (for decrypting images)", type="password") | |
with gr.Group(): | |
gr.Markdown("#### Sources (Priority: Image > Rules File > Memories File)") | |
with gr.Row(): | |
upload_kb_img_fobj = gr.Image(label="1. Image Source", type="filepath", sources=["upload", "clipboard"], interactive=not DEMO_MODE) | |
upload_rules_fobj = gr.File(label="2. Rules File Source (.txt/.jsonl)", file_types=[".txt", ".jsonl"], interactive=not DEMO_MODE) | |
upload_mems_fobj = gr.File(label="3. Memories File Source (.json/.jsonl)", file_types=[".jsonl", ".json"], interactive=not DEMO_MODE) | |
load_master_btn = gr.Button("โฌ๏ธ Load from Sources", variant="primary", interactive=not DEMO_MODE) | |
gr.Examples( | |
examples=[ | |
["https://huggingface.co/spaces/Agents-MCP-Hackathon/iLearn/resolve/main/evolutions/e0.01.01.png", ""], | |
], | |
inputs=[upload_kb_img_fobj, load_kb_password_tb], | |
label="Click an Example to Load Data" | |
) | |
def dyn_upd_model_dd(sel_prov_dyn: str): | |
models_dyn = get_model_display_names_for_provider(sel_prov_dyn); def_model_dyn = get_default_model_display_name_for_provider(sel_prov_dyn) | |
return gr.Dropdown(choices=models_dyn, value=def_model_dyn, interactive=True) | |
prov_sel_dd.change(fn=dyn_upd_model_dd, inputs=prov_sel_dd, outputs=model_sel_dd) | |
chat_ins = [user_msg_tb, research_steps_slider, main_chat_disp, prov_sel_dd, model_sel_dd, api_key_tb, sys_prompt_tb] | |
chat_outs = [user_msg_tb, main_chat_disp, agent_stat_tb, research_log_html, fmt_report_tb, dl_report_btn, rules_disp_ta, mems_disp_json] | |
chat_event_args = {"fn": handle_gradio_chat_submit, "inputs": chat_ins, "outputs": chat_outs} | |
send_btn.click(**chat_event_args); user_msg_tb.submit(**chat_event_args) | |
save_edited_rules_btn.click(fn=save_edited_rules_action_fn, inputs=[rules_disp_ta], outputs=[rules_stat_tb], show_progress="full").then(fn=ui_refresh_rules_display_fn, outputs=rules_disp_ta, show_progress=False) | |
clear_rules_btn.click(fn=lambda: ("All rules cleared." if clear_all_rules_data_backend() else "Error clearing rules."), outputs=rules_stat_tb, show_progress=False).then(fn=ui_refresh_rules_display_fn, outputs=rules_disp_ta, show_progress=False) | |
clear_mems_btn.click(fn=lambda: ("All memories cleared." if clear_all_memory_data_backend() else "Error clearing memories."), outputs=mems_stat_tb, show_progress=False).then(fn=ui_refresh_memories_display_fn, outputs=mems_disp_json, show_progress=False) | |
dl_rules_btn.click(fn=ui_download_rules_action_fn, inputs=None, outputs=dl_rules_btn, show_progress=False) | |
dl_mems_btn.click(fn=ui_download_memories_action_fn, inputs=None, outputs=dl_mems_btn, show_progress=False) | |
create_kb_img_btn.click( | |
fn=ui_create_kb_image_fn, | |
inputs=[save_kb_password_tb, save_kb_include_cbg], | |
outputs=[kb_image_display_output, kb_image_download_output, load_status_tb], | |
show_progress="full" | |
) | |
load_master_btn.click( | |
fn=ui_load_from_sources_fn, | |
inputs=[upload_kb_img_fobj, upload_rules_fobj, upload_mems_fobj, load_kb_password_tb], | |
outputs=[load_status_tb], | |
show_progress="full" | |
).then( | |
fn=ui_refresh_rules_display_fn, outputs=rules_disp_ta | |
).then( | |
fn=ui_refresh_memories_display_fn, outputs=mems_disp_json | |
) | |
if MEMORY_STORAGE_BACKEND == "RAM" and 'save_faiss_sidebar_btn' in locals(): | |
def save_faiss_action_with_feedback_sidebar_fn(): | |
try: save_faiss_indices_to_disk(); gr.Info("Attempted to save FAISS indices to disk.") | |
except Exception as e: logger.error(f"Error saving FAISS indices: {e}", exc_info=True); gr.Error(f"Error saving FAISS indices: {e}") | |
save_faiss_sidebar_btn.click(fn=save_faiss_action_with_feedback_sidebar_fn, inputs=None, outputs=None, show_progress=False) | |
app_load_outputs = [agent_stat_tb, rules_disp_ta, mems_disp_json, research_log_html, fmt_report_tb, dl_report_btn] | |
demo.load(fn=app_load_fn, inputs=None, outputs=app_load_outputs, show_progress="full") | |
if __name__ == "__main__": | |
logger.info(f"Starting Gradio AI Research Mega Agent (v9.1 - Correct 1-Click JS Download, Memory: {MEMORY_STORAGE_BACKEND})...") | |
app_port = int(os.getenv("GRADIO_PORT", 7860)) | |
app_server = os.getenv("GRADIO_SERVER_NAME", "127.0.0.1") | |
app_debug = os.getenv("GRADIO_DEBUG", "False").lower() == "false" | |
app_share = os.getenv("GRADIO_SHARE", "False").lower() == "true" | |
logger.info(f"Launching Gradio server: http://{app_server}:{app_port}. Debug: {app_debug}, Share: {app_share}") | |
demo.queue().launch(server_name=app_server, server_port=app_port, debug=app_debug, share=app_share, mcp_server=True, max_threads=40) | |
logger.info("Gradio application shut down.") |