diff --git "a/app.py" "b/app.py"
--- "a/app.py"
+++ "b/app.py"
@@ -1,5 +1,4 @@
-# Import libraries and references:
-import anthropic
+# %% ───────────── IMPORTS ─────────────
import base64
import glob
import hashlib
@@ -7,11 +6,6 @@ import json
import os
import pandas as pd
import pytz
-
-
-
-
-
import random
import re
import shutil
@@ -25,24 +19,15 @@ from azure.cosmos import CosmosClient, exceptions
from datetime import datetime
from git import Repo
from github import Github
-
-from gradio_client import Client
 from urllib.parse import quote
-
-
-# Add these imports at the top of your file
from gradio_client import Client, handle_file
import tempfile
 from PIL import Image
import io
import requests
import numpy as np
-
-
-# 🎭 App Configuration - Because every app needs a good costume!
-Site_Name = '🐙GitCosmos🌌 - AI Azure Cosmos DB and Github Agent'
-title = "🐙GitCosmos🌌 - AI Azure Cosmos DB and Github Agent"
+# %% ───────────── APP CONFIGURATION ─────────────
+Site_Name = '🐙 GitCosmos'
+title = "🐙 GitCosmos"
helpURL = 'https://huggingface.co/awacke1'
bugURL = 'https://huggingface.co/spaces/awacke1/AzureCosmosDBUI/'
icons = '🐙🌌💫'
@@ -58,68 +43,55 @@ st.set_page_config(
}
)
-
-# 🌌 Cosmos DB configuration - Where data goes to party!
+# Cosmos DB & App URLs
ENDPOINT = "https://acae-afd.documents.azure.com:443/"
DATABASE_NAME = os.environ.get("COSMOS_DATABASE_NAME")
CONTAINER_NAME = os.environ.get("COSMOS_CONTAINER_NAME")
Key = os.environ.get("Key")
-
-# 🌐 Your local app URL - Home sweet home
LOCAL_APP_URL = "https://huggingface.co/spaces/awacke1/AzureCosmosDBUI"
CosmosDBUrl = 'https://portal.azure.com/#@AaronCWackergmail.onmicrosoft.com/resource/subscriptions/003fba60-5b3f-48f4-ab36-3ed11bc40816/resourceGroups/datasets/providers/Microsoft.DocumentDB/databaseAccounts/acae-afd/dataExplorer'
-# 🤖 Anthropic configuration - Teaching machines to be more human (and funnier)
-anthropicclient = anthropic.Anthropic(api_key=os.environ.get("ANTHROPIC_API_KEY"))
-
-# 🧠 Initialize session state - Because even apps need a good memory
-if "chat_history" not in st.session_state:
- st.session_state.chat_history = []
-
-
-
-# 🛠️ Helper Functions - The unsung heroes of our code
-
-# 📎 Get a file download link - Making file sharing as easy as stealing candy from a baby
+# %% ───────────── HELPER FUNCTIONS ─────────────
+# 📎 Get download link from file
def get_download_link(file_path):
with open(file_path, "rb") as file:
contents = file.read()
b64 = base64.b64encode(contents).decode()
file_name = os.path.basename(file_path)
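+    # return an inline HTML anchor that embeds the file as a base64 data URI (no server-side storage needed)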
-    return f'<a href="data:file/txt;base64,{b64}" download="{file_name}">Download {file_name}📂</a>'
+    return f'<a href="data:file/txt;base64,{b64}" download="{file_name}">Download {file_name} 📂</a>'
-# 🎲 Generate a unique ID - Because being unique is important (just ask your mother)
+# 🆔 Generate a unique ID
def generate_unique_id():
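+    # timestamp prefix keeps ids roughly time-sortable; the uuid suffix guarantees uniqueness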
timestamp = datetime.utcnow().strftime('%Y%m%d%H%M%S%f')
unique_uuid = str(uuid.uuid4())
- returnValue = f"{timestamp}-{unique_uuid}"
- st.write('New Unique ID:' + returnValue)
- return
+ return_value = f"{timestamp}-{unique_uuid}"
+ st.write('New ID: ' + return_value)
+ return return_value
-# 📝 Generate a filename - Naming files like a pro (or a very confused librarian)
+# 📄 Generate a filename from a prompt
def generate_filename(prompt, file_type):
central = pytz.timezone('US/Central')
safe_date_time = datetime.now(central).strftime("%m%d_%H%M")
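+    # keep only word characters and cap at 90 chars so the prompt-derived filename stays filesystem-safe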
safe_prompt = re.sub(r'\W+', '', prompt)[:90]
return f"{safe_date_time}{safe_prompt}.{file_type}"
-# 💾 Create and save a file - Because data hoarding is a legitimate hobby
+# 💾 Create and save a file with prompt & response
def create_file(filename, prompt, response, should_save=True):
if not should_save:
return
with open(filename, 'w', encoding='utf-8') as file:
file.write(prompt + "\n\n" + response)
-# 📖 Load file content - Bringing words back from the digital grave
+# 📖 Load file content from disk
def load_file(file_name):
with open(file_name, "r", encoding='utf-8') as file:
content = file.read()
return content
-# 🔍 Display glossary entity - Making search fun again (as if it ever was)
+# 🔍 Display glossary entity with quick links
def display_glossary_entity(k):
search_urls = {
- "🚀🌌ArXiv": lambda k: f"/?q={quote(k)}",
+        "🚀": lambda k: f"/?q={quote(k)}",
"📖": lambda k: f"https://en.wikipedia.org/wiki/{quote(k)}",
"🔍": lambda k: f"https://www.google.com/search?q={quote(k)}",
"🎥": lambda k: f"https://www.youtube.com/results?search_query={quote(k)}",
@@ -127,7 +99,7 @@ def display_glossary_entity(k):
    links_md = ' '.join([f'<a href="{url(k)}" target="_blank">{emoji}</a>' for emoji, url in search_urls.items()])
st.markdown(f"{k} {links_md}", unsafe_allow_html=True)
-# 🗜️ Create zip of files - Squeezing files together like sardines in a can
+# 🗜️ Create zip of multiple files
def create_zip_of_files(files):
zip_name = "all_files.zip"
with zipfile.ZipFile(zip_name, 'w') as zipf:
@@ -135,157 +107,143 @@ def create_zip_of_files(files):
zipf.write(file)
return zip_name
-# 🎬 Get video HTML - Making videos play nice (or at least trying to)
+# 🎬 Get HTML for video playback
def get_video_html(video_path, width="100%"):
video_url = f"data:video/mp4;base64,{base64.b64encode(open(video_path, 'rb').read()).decode()}"
    return f'''
    <video width="{width}" controls autoplay loop>
        <source src="{video_url}" type="video/mp4">
        Your browser does not support the video tag.
    </video>
    '''
-# 🎵 Get audio HTML - Let the music play (and hope it's not Baby Shark)
+# 🎵 Get HTML for audio playback
def get_audio_html(audio_path, width="100%"):
audio_url = f"data:audio/mpeg;base64,{base64.b64encode(open(audio_path, 'rb').read()).decode()}"
    return f'''
    <audio controls style="width: {width};">
        <source src="{audio_url}" type="audio/mpeg">
        Your browser does not support the audio element.
    </audio>
    '''
-# 🌌 Cosmos DB functions - Where data goes to live its best life
+# 📝 Preprocess text for JSON safety
+def preprocess_text(text):
+ text = text.replace('\r\n', '\\n').replace('\r', '\\n').replace('\n', '\\n')
+ text = text.replace('"', '\\"')
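+    # normalize tabs to spaces and drop non-ASCII characters so the payload stays JSON-safe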
+ text = re.sub(r'[\t]', ' ', text)
+ text = re.sub(r'[^\x00-\x7F]+', '', text)
+ return text.strip()
-# 📚 Get databases - Collecting databases like Pokemon cards
+# %% ───────────── COSMOS DB FUNCTIONS ─────────────
+# 📚 List all databases in Cosmos
def get_databases(client):
return [db['id'] for db in client.list_databases()]
-# 📦 Get containers - Finding where all the good stuff is hidden
+# 📦 List all containers in a database
def get_containers(database):
return [container['id'] for container in database.list_containers()]
-# 📄 Get documents - Retrieving the sacred texts (or just some JSON)
+# 📄 Query documents from a container (most recent first)
def get_documents(container, limit=None):
query = "SELECT * FROM c ORDER BY c._ts DESC"
items = list(container.query_items(query=query, enable_cross_partition_query=True, max_item_count=limit))
return items
-# 📥 Insert record - Adding new data (and crossing fingers it doesn't break anything)
+# 📥 Insert a new record into Cosmos
def insert_record(container, record):
try:
container.create_item(body=record)
- return True, "Record inserted successfully! 🎉"
+ return True, "Inserted! 🎉"
except exceptions.CosmosHttpResponseError as e:
- return False, f"HTTP error occurred: {str(e)} 🚨"
+ return False, f"HTTP error: {str(e)} 🚨"
except Exception as e:
- return False, f"An unexpected error occurred: {str(e)} 😱"
+ return False, f"Error: {str(e)} 😱"
-# 🔄 Update record - Giving data a makeover
+# 🔄 Update an existing Cosmos record
def update_record(container, updated_record):
try:
container.upsert_item(body=updated_record)
- return True, f"Record with id {updated_record['id']} successfully updated. 🛠️"
+ return True, f"Updated {updated_record['id']} 🛠️"
except exceptions.CosmosHttpResponseError as e:
- return False, f"HTTP error occurred: {str(e)} 🚨"
+ return False, f"HTTP error: {str(e)} 🚨"
except Exception as e:
- return False, f"An unexpected error occurred: {traceback.format_exc()} 😱"
+ return False, f"Error: {traceback.format_exc()} 😱"
-# 🗑️ Delete record - Saying goodbye to data (it's not you, it's me)
+# 🗑️ Delete a Cosmos record by id
def delete_record(container, record):
try:
container.delete_item(item=record['id'], partition_key=record['id'])
- return True, f"Record with id {record['id']} successfully deleted. 🗑️"
+ return True, f"Deleted {record['id']} 🗑️"
except exceptions.CosmosHttpResponseError as e:
- return False, f"HTTP error occurred: {str(e)} 🚨"
+ return False, f"HTTP error: {str(e)} 🚨"
except Exception as e:
- return False, f"An unexpected error occurred: {traceback.format_exc()} 😱"
+ return False, f"Error: {traceback.format_exc()} 😱"
-
-# 💾 Save to Cosmos DB - Preserving data for future generations (or just until the next update)
+# 💾 Save a new document to Cosmos DB with extra fields
def save_to_cosmos_db(container, query, response1, response2):
try:
if container:
- # Generate a unique ID that includes a timestamp
timestamp = datetime.utcnow().strftime('%Y%m%d%H%M%S%f')
unique_uuid = str(uuid.uuid4())
new_id = f"{timestamp}-{unique_uuid}"
-
- # Create new document with proper name field
record = {
"id": new_id,
- "name": new_id, # Set name equal to ID to avoid null name error
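+            # name mirrors the id so containers that require a non-null name field are satisfied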
+ "name": new_id,
"query": query,
"response1": response1,
"response2": response2,
"timestamp": datetime.utcnow().isoformat(),
- "type": "ai_response", # Add document type for better organization
+ "type": "ai_response",
"version": "1.0"
}
-
- try:
- # Create the new document
- container.create_item(body=record)
- st.success(f"Record saved successfully with ID: {record['id']}")
- # Refresh the documents display
- st.session_state.documents = get_documents(container)
- except exceptions.CosmosHttpResponseError as e:
- st.error(f"Error saving record to Cosmos DB: {e}")
+ container.create_item(body=record)
+ st.success(f"Saved: {record['id']}")
+ st.session_state.documents = get_documents(container)
else:
- st.error("Cosmos DB container is not initialized.")
+ st.error("Cosmos container not initialized.")
except Exception as e:
- st.error(f"An unexpected error occurred: {str(e)}")
-
-
-
-
-
-
-
-
-def save_to_cosmos_db_old(container, query, response1, response2):
+ st.error(f"Save error: {str(e)}")
+# 🗄️ Archive all documents in a container and provide a download link
+def archive_current_container(database_name, container_name, client):
try:
- if container:
- record = {
- "id": generate_unique_id(),
- "query": query,
- "response1": response1,
- "response2": response2,
- "timestamp": datetime.utcnow().isoformat()
- }
- try:
- container.create_item(body=record)
- st.success(f"Record saved successfully with ID: {record['id']}")
- # Refresh the documents display
- st.session_state.documents = get_documents(container)
- except exceptions.CosmosHttpResponseError as e:
- st.error(f"Error saving record to Cosmos DB: {e}")
- else:
- st.error("Cosmos DB container is not initialized.")
+ base_dir = "./cosmos_archive_current_container"
+ if os.path.exists(base_dir):
+ shutil.rmtree(base_dir)
+ os.makedirs(base_dir)
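+        # write each document to its own JSON file, then zip the folder and return a download link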
+ db_client = client.get_database_client(database_name)
+ container_client = db_client.get_container_client(container_name)
+ items = list(container_client.read_all_items())
+ container_dir = os.path.join(base_dir, container_name)
+ os.makedirs(container_dir)
+ for item in items:
+ item_id = item.get('id', f"unknown_{datetime.now().strftime('%Y%m%d%H%M%S')}")
+ with open(os.path.join(container_dir, f"{item_id}.json"), 'w') as f:
+ json.dump(item, f, indent=2)
+ archive_name = f"{container_name}_archive_{datetime.now().strftime('%Y%m%d%H%M%S')}"
+ shutil.make_archive(archive_name, 'zip', base_dir)
+ return get_download_link(f"{archive_name}.zip")
except Exception as e:
- st.error(f"An unexpected error occurred: {str(e)}")
-
+ return f"Archive error: {str(e)} 😢"
-
-# 🐙 GitHub functions - Where code goes to socialize
-
-# 📥 Download GitHub repo - Cloning repos like it's going out of style
+# %% ───────────── GITHUB FUNCTIONS ─────────────
+# 📥 Clone a GitHub repository locally
def download_github_repo(url, local_path):
if os.path.exists(local_path):
shutil.rmtree(local_path)
Repo.clone_from(url, local_path)
-# 🗜️ Create zip file - Squeezing files tighter than your budget
+# 🗜️ Zip a directory
def create_zip_file(source_dir, output_filename):
shutil.make_archive(output_filename, 'zip', source_dir)
-# 🏗️ Create repo - Building digital homes for lonely code
+# 🏗️ Create a new GitHub repo via API
def create_repo(g, repo_name):
user = g.get_user()
return user.create_repo(repo_name)
-# 🚀 Push to GitHub - Sending code to the cloud (hopefully not the rainy kind)
+# 🚀 Push local repo changes to GitHub
def push_to_github(local_path, repo, github_token):
repo_url = f"https://{github_token}@github.com/{repo.full_name}.git"
local_repo = Repo(local_path)
@@ -304,192 +262,18 @@ def push_to_github(local_path, repo, github_token):
local_repo.git.commit('-m', 'Initial commit')
origin.push(refspec=f'{current_branch}:{current_branch}')
-
-def save_or_clone_to_cosmos_db(container, document=None, clone_id=None):
- def generate_complex_unique_id():
- timestamp = datetime.utcnow().strftime('%Y%m%d%H%M%S%f')
- random_component = ''.join(random.choices('abcdefghijklmnopqrstuvwxyz0123456789', k=8))
- return f"{timestamp}-{random_component}-{str(uuid.uuid4())}"
- max_retries = 10
- base_delay = 0.1
- for attempt in range(max_retries):
- try:
- new_id = generate_complex_unique_id()
- if clone_id:
- try:
- existing_doc = container.read_item(item=clone_id, partition_key=clone_id)
- new_doc = {
- 'id': new_id,
- 'originalText': existing_doc.get('originalText', ''),
- 'qtPrompts': existing_doc.get('qtPrompts', []),
- 'cloned_from': clone_id,
- 'cloned_at': datetime.utcnow().isoformat()
- }
- except exceptions.CosmosResourceNotFoundError:
- return False, f"Document with ID {clone_id} not found for cloning."
- else:
- if document is None:
- return False, "No document provided for saving"
- document['id'] = new_id
- document['created_at'] = datetime.utcnow().isoformat()
- new_doc = document
- response = container.create_item(body=new_doc)
- return True, f"{'Cloned' if clone_id else 'New'} document saved successfully with ID: {response['id']}"
- except exceptions.CosmosHttpResponseError as e:
- if e.status_code == 409:
- delay = base_delay * (2 ** attempt) + random.uniform(0, 0.1)
- time.sleep(delay)
- continue
- return False, f"Error saving to Cosmos DB: {str(e)}"
- except Exception as e:
- return False, f"An unexpected error occurred: {str(e)}"
- return False, "Failed to save document after maximum retries."
-
-
-# 📦 Archive current container - Packing up data like you're moving to a new digital house
-def archive_current_container(database_name, container_name, client):
- try:
- base_dir = "./cosmos_archive_current_container"
- if os.path.exists(base_dir):
- shutil.rmtree(base_dir)
- os.makedirs(base_dir)
- db_client = client.get_database_client(database_name)
- container_client = db_client.get_container_client(container_name)
- items = list(container_client.read_all_items())
- container_dir = os.path.join(base_dir, container_name)
- os.makedirs(container_dir)
- for item in items:
- item_id = item.get('id', f"unknown_{datetime.now().strftime('%Y%m%d%H%M%S')}")
- with open(os.path.join(container_dir, f"{item_id}.json"), 'w') as f:
- json.dump(item, f, indent=2)
- archive_name = f"{container_name}_archive_{datetime.now().strftime('%Y%m%d%H%M%S')}"
- shutil.make_archive(archive_name, 'zip', base_dir)
- return get_download_link(f"{archive_name}.zip")
- except Exception as e:
- return f"An error occurred while archiving data: {str(e)} 😢"
-
-def gen_AI_IO_filename(display_query, output):
- # Get current time in Central Time Zone with milliseconds
- now_central = datetime.now(pytz.timezone("America/Chicago"))
- timestamp = now_central.strftime("%Y-%m-%d-%I-%M-%S-%f-%p")
-
- # Limit components to prevent excessive filename length
- display_query = display_query[:50] # Truncate display_query to 50 chars
- output_snippet = re.sub(r'[^A-Za-z0-9]+', '_', output[:100]) # Truncate output_snippet to 100 chars
-
- filename = f"{timestamp} - {display_query} - {output_snippet}.md"
- return filename
-
-# 🔍 Search glossary - Finding needles in digital haystacks
-def search_glossary(query):
- st.markdown(f"### 🔍 SearchGlossary for: {query}")
- model_options = ['mistralai/Mixtral-8x7B-Instruct-v0.1', 'mistralai/Mistral-7B-Instruct-v0.2']
- model_choice = st.selectbox('🧠 Select LLM Model', options=model_options, index=1, key=f"model_choice_{id(query)}")
- database_options = ['Semantic Search', 'Arxiv Search - Latest - (EXPERIMENTAL)']
- database_choice = st.selectbox('📚 Select Database', options=database_options, index=0, key=f"database_choice_{id(query)}")
-
- # 🕵️♂️ Searching the glossary for: query
- all_results = ""
- # Limit the query display to 80 characters
- display_query = query[:80] + "..." if len(query) > 80 else query
- st.markdown(f"🕵️♂️ Running ArXiV AI Analysis with Query: {display_query} - ML model: {model_choice} and Option: {database_options}")
-
- # 🔍 ArXiV RAG researcher expert ~-<>-~ Paper Summary & Ask LLM
- client = Client("awacke1/Arxiv-Paper-Search-And-QA-RAG-Pattern")
- # 🔍 ArXiv RAG researcher expert ~-<>-~ Paper Summary & Ask LLM - api_name: /ask_llm
- result = client.predict(
- prompt=query,
- llm_model_picked="mistralai/Mixtral-8x7B-Instruct-v0.1",
- stream_outputs=True,
- api_name="/ask_llm"
- )
- st.markdown("# Mixtral-8x7B-Instruct-v0.1")
- st.markdown(result)
- #st.code(result, language="python", line_numbers=True)
-
-
- # 🔍 ArXiv RAG researcher expert ~-<>-~ Paper Summary & Ask LLM - api_name: /ask_llm
- result2 = client.predict(
- prompt=query,
- llm_model_picked="mistralai/Mistral-7B-Instruct-v0.2",
- stream_outputs=True,
- api_name="/ask_llm"
- )
- st.markdown("# Mistral-7B-Instruct-v0.2")
- st.markdown(result2)
- #st.code(result2, language="python", line_numbers=True)
-
- # 🔍 ArXiv RAG researcher expert ~-<>-~ Paper Summary & Ask LLM - api_name: /update_with_rag_md
- response2 = client.predict(
- message=query, # str in 'parameter_13' Textbox component
- llm_results_use=10,
- database_choice="Semantic Search",
- llm_model_picked="mistralai/Mistral-7B-Instruct-v0.2",
- api_name="/update_with_rag_md"
- )
- st.markdown("# Mistral-7B-Instruct-v0.2 update_with_rag_md 0")
- st.markdown(response2[0])
- #st.code(response2[0], language="python", line_numbers=True, wrap_lines=True)
-
- st.markdown("# Mistral-7B-Instruct-v0.2 update_with_rag_md 1")
- st.markdown(response2[1])
- #st.code(response2[1], language="python", line_numbers=True, wrap_lines=True)
-
-
- # ✅ Persist AI Results to Markdown Files
- filename = gen_AI_IO_filename(display_query, result)
- create_file(filename, query, result)
- st.markdown(f"✅ File saved as: `{filename}`")
-
- filename = gen_AI_IO_filename(display_query, result2)
- create_file(filename, query, result2)
- st.markdown(f"✅ File saved as: `{filename}`")
-
- filename = gen_AI_IO_filename(display_query, response2[0])
- create_file(filename, query, response2[0])
- st.markdown(f"✅ File saved as: `{filename}`")
-
- filename = gen_AI_IO_filename(display_query, response2[1])
- create_file(filename, query, response2[1])
- st.markdown(f"✅ File saved as: `{filename}`")
-
- return result, result2, response2
-
-
-# 📝 Generate a safe filename from the first few lines of content
-def generate_filename_from_content(content, file_type="md"):
- # Extract the first few lines or sentences
- first_sentence = content.split('\n', 1)[0][:90] # Limit the length to 90 characters
- # Remove special characters to make it a valid filename
- safe_name = re.sub(r'[^\w\s-]', '', first_sentence)
- # Limit length to be compatible with Windows and Linux
- safe_name = safe_name[:50].strip() # Adjust length limit
- return f"{safe_name}.{file_type}"
-
-
-# 💾 Create and save a file
-def create_file_from_content(content, should_save=True):
- if not should_save:
- return
- filename = generate_filename_from_content(content)
- with open(filename, 'w', encoding='utf-8') as file:
- file.write(content)
- return filename
-
-
-# 📂 Display list of saved .md files in the sidebar
+# %% ───────────── FILE & MEDIA MANAGEMENT FUNCTIONS ─────────────
+# 📂 List saved Markdown files in sidebar with actions
def display_saved_files_in_sidebar():
- all_files = glob.glob("*.md")
- all_files.sort(reverse=True)
- all_files = [file for file in all_files if not file.lower().startswith('readme')] # Exclude README.md
- st.sidebar.markdown("## 📁 Saved Markdown Files")
+ all_files = sorted([f for f in glob.glob("*.md") if not f.lower().startswith('readme')], reverse=True)
+ st.sidebar.markdown("## 📁 Files")
for file in all_files:
col1, col2, col3 = st.sidebar.columns([6, 2, 1])
with col1:
st.markdown(f"📄 {file}")
with col2:
st.sidebar.download_button(
- label="⬇️ Download",
+ label="⬇️",
data=open(file, 'rb').read(),
file_name=file
)
@@ -498,287 +282,98 @@ def display_saved_files_in_sidebar():
os.remove(file)
st.rerun()
-def clone_record(container, clone_id):
- try:
- existing_doc = container.read_item(item=clone_id, partition_key=clone_id)
- new_doc = existing_doc.copy()
- new_doc['id'] = generate_unique_id() # Generate new unique ID with timestamp
- new_doc['name'] = new_doc['id'] # Generate new unique ID with timestamp
- new_doc['createdAt'] = datetime.utcnow().isoformat() # Update the creation time
- new_doc['_rid'] = None # Reset _rid or any system-managed fields
- new_doc['_self'] = None
- new_doc['_etag'] = None
- new_doc['_attachments'] = None
- new_doc['_ts'] = None # Reset timestamp to be updated by Cosmos DB automatically
- # Insert the cloned document
- response = container.create_item(body=new_doc)
- st.success(f"Cloned document saved successfully with ID: {new_doc['id']} 🎉")
- # Refresh the documents in session state
- st.session_state.documents = list(container.query_items(
- query="SELECT * FROM c ORDER BY c._ts DESC",
- enable_cross_partition_query=True
- ))
- except exceptions.CosmosResourceNotFoundError:
- st.error(f"Document with ID {clone_id} not found for cloning.")
- except exceptions.CosmosHttpResponseError as e:
- st.error(f"HTTP error occurred: {str(e)} 🚨")
- except Exception as e:
- st.error(f"An unexpected error occurred: {str(e)} 😱")
-
-
-def create_new_blank_record(container):
- try:
- # Get the structure of the latest document (to preserve schema)
- latest_doc = container.query_items(query="SELECT * FROM c ORDER BY c._ts DESC", enable_cross_partition_query=True, max_item_count=1)
- if latest_doc:
- new_doc_structure = latest_doc[0].copy()
- else:
- new_doc_structure = {}
- new_doc = {key: "" for key in new_doc_structure.keys()} # Set all fields to blank
- new_doc['id'] = generate_unique_id() # Generate new unique ID
- new_doc['createdAt'] = datetime.utcnow().isoformat() # Set creation time
- # Insert the new blank document
- response = container.create_item(body=new_doc)
- st.success(f"New blank document saved successfully with ID: {new_doc['id']} 🎉")
- # Refresh the documents in session state
- st.session_state.documents = list(container.query_items(
- query="SELECT * FROM c ORDER BY c._ts DESC",
- enable_cross_partition_query=True
- ))
- except exceptions.CosmosHttpResponseError as e:
- st.error(f"HTTP error occurred: {str(e)} 🚨")
- except Exception as e:
- st.error(f"An unexpected error occurred: {str(e)} 😱")
-
-
-# Function to preprocess the pasted content
-def preprocess_text(text):
- # Replace CRLF and other newline variations with the JSON newline escape sequence
- text = text.replace('\r\n', '\\n')
- text = text.replace('\r', '\\n')
- text = text.replace('\n', '\\n')
- # Escape double quotes inside the text
- text = text.replace('"', '\\"')
- # Optionally remove or handle other special characters that might not be JSON-safe
- # Here, we remove characters like tabs or non-ASCII characters (as an example)
- text = re.sub(r'[\t]', ' ', text) # Replace tabs with spaces
- text = re.sub(r'[^\x00-\x7F]+', '', text) # Remove non-ASCII characters
- # Normalize spaces (strip leading/trailing whitespace)
- text = text.strip()
- return text
-
-
-
-def load_file_content(file_path):
- """Load and return file content with error handling"""
- try:
- with open(file_path, 'r', encoding='utf-8') as file:
- return file.read()
-
-
-
- except Exception as e:
- st.error(f"Error loading file: {str(e)}")
- return None
-
-def save_file_content(file_path, content):
- """Save file content with error handling"""
- try:
- with open(file_path, 'w', encoding='utf-8') as file:
- file.write(content)
- return True
- except Exception as e:
- st.error(f"Error saving file: {str(e)}")
- return False
-
+# 👀 Display file viewer in main area
def display_file_viewer(file_path):
- """Display file content in markdown viewer"""
- content = load_file_content(file_path)
+ content = load_file(file_path)
if content:
st.markdown("### 📄 File Viewer")
- st.markdown(f"**Viewing:** {file_path}")
-
- # Add file metadata
+ st.markdown(f"**{file_path}**")
file_stats = os.stat(file_path)
- st.markdown(f"**Last modified:** {datetime.fromtimestamp(file_stats.st_mtime).strftime('%Y-%m-%d %H:%M:%S')}")
- st.markdown(f"**Size:** {file_stats.st_size} bytes")
-
- # Display content in markdown
+ st.markdown(f"**Mod:** {datetime.fromtimestamp(file_stats.st_mtime).strftime('%Y-%m-%d %H:%M:%S')} | **Size:** {file_stats.st_size} bytes")
st.markdown("---")
st.markdown(content)
-
- # Add download button
- st.download_button(
- label="⬇️ Download File",
- data=content,
- file_name=os.path.basename(file_path),
- mime="text/markdown"
- )
-
-
+ st.download_button("⬇️", data=content, file_name=os.path.basename(file_path), mime="text/markdown")
+# ✏️ Display file editor (Markdown & Code)
def display_file_editor(file_path):
- """Display file content in both Markdown and Code Editor views"""
- # Initialize file content in session state if not already present
if 'file_content' not in st.session_state:
st.session_state.file_content = {}
-
- # Load content if not in session state or if it's a different file
if file_path not in st.session_state.file_content:
- content = load_file_content(file_path)
+ content = load_file(file_path)
if content is not None:
st.session_state.file_content[file_path] = content
else:
return
-
- st.markdown("### ✏️ File Editor")
+ st.markdown("### ✏️ Edit File")
st.markdown(f"**Editing:** {file_path}")
-
- # Create tabs for different views
- markdown_tab, code_tab = st.tabs(["Markdown View", "Code Editor"])
-
- with markdown_tab:
- st.markdown("### 📄 Markdown Preview")
+ md_tab, code_tab = st.tabs(["Markdown", "Code"])
+ with md_tab:
st.markdown(st.session_state.file_content[file_path])
-
with code_tab:
- st.markdown("### 💻 Code Editor")
- # Create a unique key for the text area
- editor_key = f"editor_{hash(file_path)}"
-
- # Editor with syntax highlighting for markdown
- new_content = st.text_area(
- "Edit content below:",
- value=st.session_state.file_content[file_path],
- height=400,
- key=editor_key
- )
-
- # Add save and download buttons below both views
+ new_content = st.text_area("Edit:", value=st.session_state.file_content[file_path], height=400, key=f"editor_{hash(file_path)}")
col1, col2 = st.columns([1, 5])
with col1:
- if st.button("💾 Save Changes"):
+ if st.button("💾 Save"):
if save_file_content(file_path, new_content):
st.session_state.file_content[file_path] = new_content
- st.success("File saved successfully! 🎉")
+ st.success("Saved! 🎉")
time.sleep(1)
st.rerun()
-
with col2:
- st.download_button(
- label="⬇️ Download File",
- data=new_content,
- file_name=os.path.basename(file_path),
- mime="text/markdown"
- )
+ st.download_button("⬇️", data=new_content, file_name=os.path.basename(file_path), mime="text/markdown")
+# 💾 Save content to a file (with error handling)
+def save_file_content(file_path, content):
+ try:
+ with open(file_path, 'w', encoding='utf-8') as file:
+ file.write(content)
+ return True
+ except Exception as e:
+ st.error(f"Save error: {str(e)}")
+ return False
-
-
-def display_file_editor_old(file_path):
- """Display file content in editor with save functionality"""
- # Initialize file content in session state if not already present
- if 'file_content' not in st.session_state:
- st.session_state.file_content = {}
-
- # Load content if not in session state or if it's a different file
- if file_path not in st.session_state.file_content:
- content = load_file_content(file_path)
- if content is not None:
- st.session_state.file_content[file_path] = content
- else:
- return
-
- st.markdown("### ✏️ File Editor")
- st.markdown(f"**Editing:** {file_path}")
-
- # Create a unique key for the text area
- editor_key = f"editor_{hash(file_path)}"
-
- # Editor with syntax highlighting for markdown
- new_content = st.text_area(
- "Edit content below:",
- value=st.session_state.file_content[file_path],
- height=400,
- key=editor_key
- )
-
- col1, col2 = st.columns([1, 5])
- with col1:
- if st.button("💾 Save Changes"):
- if save_file_content(file_path, new_content):
- st.session_state.file_content[file_path] = new_content
- st.success("File saved successfully! 🎉")
- time.sleep(1)
- st.rerun()
-
- with col2:
- st.download_button(
- label="⬇️ Download File",
- data=new_content,
- file_name=os.path.basename(file_path),
- mime="text/markdown"
- )
-
+# 🗂️ Update file management UI section (view, edit, delete)
def update_file_management_section():
- # Initialize session state variables
if 'file_view_mode' not in st.session_state:
st.session_state.file_view_mode = None
if 'current_file' not in st.session_state:
st.session_state.current_file = None
if 'file_content' not in st.session_state:
st.session_state.file_content = {}
-
- all_files = glob.glob("*.md")
- all_files.sort(reverse=True)
-
- # File management buttons in sidebar
- st.sidebar.title("📁 File Management")
-
- if st.sidebar.button("🗑 Delete All Files"):
+ all_files = sorted(glob.glob("*.md"), reverse=True)
+ st.sidebar.title("📁 Files")
+ if st.sidebar.button("🗑 Delete All"):
for file in all_files:
os.remove(file)
- st.session_state.file_content = {} # Clear the file content cache
+ st.session_state.file_content = {}
st.session_state.current_file = None
st.session_state.file_view_mode = None
st.rerun()
-
- if st.sidebar.button("⬇️ Download All Files"):
+ if st.sidebar.button("⬇️ Download All"):
zip_file = create_zip_of_files(all_files)
st.sidebar.markdown(get_download_link(zip_file), unsafe_allow_html=True)
-
- # Display files in sidebar with action buttons
for file in all_files:
- col1, col2, col3, col4 = st.sidebar.columns([1,3,1,1])
-
+ col1, col2, col3, col4 = st.sidebar.columns([1, 3, 1, 1])
with col1:
if st.button("🌐", key=f"view_{file}"):
st.session_state.current_file = file
st.session_state.file_view_mode = 'view'
if file not in st.session_state.file_content:
- content = load_file_content(file)
+ content = load_file(file)
if content is not None:
st.session_state.file_content[file] = content
st.rerun()
-
with col2:
st.markdown(get_download_link(file), unsafe_allow_html=True)
-
-
-
-
-
-
with col3:
if st.button("📂", key=f"edit_{file}"):
st.session_state.current_file = file
st.session_state.file_view_mode = 'edit'
if file not in st.session_state.file_content:
- content = load_file_content(file)
+ content = load_file(file)
if content is not None:
st.session_state.file_content[file] = content
st.rerun()
-
with col4:
if st.button("🗑", key=f"delete_{file}"):
os.remove(file)
@@ -788,880 +383,364 @@ def update_file_management_section():
st.session_state.current_file = None
st.session_state.file_view_mode = None
st.rerun()
-
- # Display viewer or editor in main area based on mode
if st.session_state.current_file:
if st.session_state.file_view_mode == 'view':
display_file_viewer(st.session_state.current_file)
elif st.session_state.file_view_mode == 'edit':
display_file_editor(st.session_state.current_file)
-
-# Function to create HTML for autoplaying and looping video (for the full cinematic effect 🎥)
-def get_video_html(video_path, width="100%"):
- video_url = f"data:video/mp4;base64,{base64.b64encode(open(video_path, 'rb').read()).decode()}"
-    return f'''
-    <video width="{width}" controls autoplay loop>
-        <source src="{video_url}" type="video/mp4">
-        Your browser does not support the video tag.
-    </video>
-    '''
-
-
-
-
-# *********
+# %% ───────────── VIDEO & AUDIO UI FUNCTIONS ─────────────
+# 🖼️ Validate and preprocess an image for video generation
def validate_and_preprocess_image(file_data, target_size=(576, 1024)):
- """Validate and preprocess image for video generation with improved BytesIO handling"""
try:
- st.write("Starting image preprocessing...")
-
- # Handle different input types
+ st.write("Preprocessing image...")
if isinstance(file_data, bytes):
- st.write("Processing bytes input")
img = Image.open(io.BytesIO(file_data))
elif hasattr(file_data, 'read'):
- st.write("Processing file-like object")
- # Reset file pointer if possible
if hasattr(file_data, 'seek'):
file_data.seek(0)
img = Image.open(file_data)
elif isinstance(file_data, Image.Image):
- st.write("Processing PIL Image input")
img = file_data
else:
- raise ValueError(f"Unsupported input type: {type(file_data)}")
-
- st.write(f"Successfully loaded image: {img.format}, size={img.size}, mode={img.mode}")
-
- # Convert to RGB if necessary
+ raise ValueError(f"Unsupported input: {type(file_data)}")
if img.mode != 'RGB':
- st.write(f"Converting image from {img.mode} to RGB")
img = img.convert('RGB')
-
- # Calculate aspect ratio
aspect_ratio = img.size[0] / img.size[1]
- st.write(f"Original aspect ratio: {aspect_ratio:.2f}")
-
- # Determine target dimensions maintaining aspect ratio
- if aspect_ratio > target_size[0]/target_size[1]: # Wider than target
+ if aspect_ratio > target_size[0] / target_size[1]:
new_width = target_size[0]
new_height = int(new_width / aspect_ratio)
- else: # Taller than target
+ else:
new_height = target_size[1]
new_width = int(new_height * aspect_ratio)
-
- # Ensure dimensions are even numbers
new_width = (new_width // 2) * 2
new_height = (new_height // 2) * 2
-
- st.write(f"Resizing to: {new_width}x{new_height}")
-
- # Resize image using high-quality downsampling
resized_img = img.resize((new_width, new_height), Image.Resampling.LANCZOS)
-
- # Create white background image of target size
final_img = Image.new('RGB', target_size, (255, 255, 255))
-
- # Calculate position to paste resized image (center)
paste_x = (target_size[0] - new_width) // 2
paste_y = (target_size[1] - new_height) // 2
-
- # Paste resized image onto white background
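+        # center the resized image on a white canvas of the target size (letterboxing)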
final_img.paste(resized_img, (paste_x, paste_y))
-
- st.write(f"Final image size: {final_img.size}")
return final_img
-
except Exception as e:
- st.error(f"Error in image preprocessing: {str(e)}\nType of input: {type(file_data)}")
+ st.error(f"Image error: {str(e)}")
return None
+# ▶️ Add video generation UI with Gradio client
def add_video_generation_ui(container):
- """Enhanced video generation UI with improved file handling"""
- st.markdown("### 🎥 Video Generation")
-
+ st.markdown("### 🎥 Video Gen")
col1, col2 = st.columns([2, 1])
-
with col1:
- uploaded_file = st.file_uploader(
- "Upload Image for Video Generation 🖼️",
- type=['png', 'jpg', 'jpeg'],
- help="Upload a clear, well-lit image. Recommended size: 576x1024 pixels."
- )
-
+ uploaded_file = st.file_uploader("Upload Image 🖼️", type=['png', 'jpg', 'jpeg'])
with col2:
- st.markdown("#### Generation Parameters")
- motion_bucket_id = st.slider(
- "Motion Intensity 🌊",
- min_value=1,
- max_value=255,
- value=127,
- help="Lower values create subtle movement, higher values create more dramatic motion"
- )
- fps_id = st.slider(
- "Frames per Second 🎬",
- min_value=1,
- max_value=30,
- value=6,
- help="Higher values create smoother but potentially less stable videos"
- )
-
- with st.expander("Advanced Options"):
- use_custom_seed = st.checkbox("Use Custom Seed")
- if use_custom_seed:
- seed = st.number_input("Seed Value", value=int(time.time() * 1000))
- else:
- seed = None
-
+ st.markdown("#### Params")
+ motion = st.slider("🌊 Motion", 1, 255, 127)
+ fps = st.slider("🎬 FPS", 1, 30, 6)
+ with st.expander("Advanced"):
+ use_custom = st.checkbox("Custom Seed")
+ seed = st.number_input("Seed", value=int(time.time() * 1000)) if use_custom else None
if uploaded_file is not None:
try:
- # Read file data
file_data = uploaded_file.read()
-
- # Preview original image
- preview_col1, preview_col2 = st.columns(2)
- with preview_col1:
- st.write("Original Image:")
- original_img = Image.open(io.BytesIO(file_data))
- st.image(original_img, caption="Original", use_column_width=True)
-
- # Preview preprocessed image
- with preview_col2:
- # Create a new BytesIO object with the file data
- preprocessed = validate_and_preprocess_image(io.BytesIO(file_data))
- if preprocessed:
- st.write("Preprocessed Image:")
- st.image(preprocessed, caption="Preprocessed", use_column_width=True)
+ preview1, preview2 = st.columns(2)
+ with preview1:
+ st.write("Original")
+ st.image(Image.open(io.BytesIO(file_data)), use_column_width=True)
+ with preview2:
+ proc_img = validate_and_preprocess_image(io.BytesIO(file_data))
+ if proc_img:
+ st.write("Processed")
+ st.image(proc_img, use_column_width=True)
else:
- st.error("Failed to preprocess image")
+ st.error("Preprocess failed")
return
-
- if st.button("🎥 Generate Video", help="Start video generation process"):
- with st.spinner("Processing your video... This may take a few minutes 🎬"):
- # Save preprocessed image to temporary file
+ if st.button("🎥 Generate"):
+ with st.spinner("Generating video..."):
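+                        # delete=False keeps the temp image on disk for the Gradio client; it is removed in the finally block below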
with tempfile.NamedTemporaryFile(suffix='.png', delete=False) as temp_file:
- preprocessed.save(temp_file.name, format='PNG')
- st.write(f"Saved preprocessed image to temporary file: {temp_file.name}")
-
+ proc_img.save(temp_file.name, format='PNG')
try:
- # Initialize the Gradio client
- client = Client(
- "awacke1/stable-video-diffusion",
- hf_token=os.environ.get("HUGGINGFACE_TOKEN")
- )
-
- # Generate video
+ client = Client("awacke1/stable-video-diffusion", hf_token=os.environ.get("HUGGINGFACE_TOKEN"))
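+                            # fall back to a time-based seed and let the endpoint randomize when no custom seed was chosen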
result = client.predict(
image=temp_file.name,
seed=seed if seed is not None else int(time.time() * 1000),
randomize_seed=seed is None,
- motion_bucket_id=motion_bucket_id,
- fps_id=fps_id,
+ motion_bucket_id=motion,
+ fps_id=fps,
api_name="/video"
)
-
if result and isinstance(result, tuple) and len(result) >= 1:
video_path = result[0].get('video') if isinstance(result[0], dict) else None
if video_path and os.path.exists(video_path):
- # Save video locally
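+                                # copy the generated video out of the client's temporary location so it persists locally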
video_filename = f"generated_video_{datetime.now().strftime('%Y%m%d_%H%M%S')}.mp4"
shutil.copy(video_path, video_filename)
-
- st.success(f"""
- Video generated successfully! 🎉
- - Seed: {seed if seed is not None else 'Random'}
- - Motion Intensity: {motion_bucket_id}
- - FPS: {fps_id}
- """)
-
+ st.success(f"Video generated! 🎉")
st.video(video_filename)
-
- # Save to Cosmos DB
if container:
video_record = {
"id": generate_unique_id(),
"type": "generated_video",
"filename": video_filename,
"seed": seed if seed is not None else "random",
- "motion_bucket_id": motion_bucket_id,
- "fps_id": fps_id,
+ "motion": motion,
+ "fps": fps,
"timestamp": datetime.now().isoformat()
}
success, message = insert_record(container, video_record)
if success:
- st.success("Video record saved to database!")
+ st.success("DB record saved!")
else:
- st.error(f"Error saving video record: {message}")
+ st.error(f"DB error: {message}")
else:
- st.error("Failed to generate video: Invalid result format")
+ st.error("Invalid result format")
else:
- st.error("Failed to generate video: No result returned")
-
+ st.error("No result returned")
except Exception as e:
- st.error(f"Error generating video: {str(e)}")
+ st.error(f"Video gen error: {str(e)}")
finally:
- # Cleanup temporary file
try:
os.unlink(temp_file.name)
- st.write("Cleaned up temporary file")
+ st.write("Temp file removed")
except Exception as e:
- st.warning(f"Error cleaning up temporary file: {str(e)}")
-
+ st.warning(f"Cleanup error: {str(e)}")
except Exception as e:
- st.error(f"Error processing uploaded file: {str(e)}")
-
-
-# ******************************************
+ st.error(f"Upload error: {str(e)}")
-# Function to create HTML for audio player (when life needs a soundtrack 🎶)
-def get_audio_html(audio_path, width="100%"):
- audio_url = f"data:audio/mpeg;base64,{base64.b64encode(open(audio_path, 'rb').read()).decode()}"
-    return f'''
-    <audio controls style="width: {width};">
-        <source src="{audio_url}" type="audio/mpeg">
-        Your browser does not support the audio element.
-    </audio>
-    '''
-
-# 🎭 Main function - "All the world's a stage, and all the code merely players" -Shakespeare, probably
+# %% ───────────── MAIN FUNCTION ─────────────
+# 🚀 Main app entry point
def main():
- st.markdown("### 🐙Git🌌Cosmos💫 - Azure Cosmos DB and Github Agent")
-
- # 🎲 Session state vars - "Life is like a session state, you never know what you're gonna get"
- if 'logged_in' not in st.session_state:
- st.session_state.logged_in = False
- if 'selected_records' not in st.session_state:
- st.session_state.selected_records = []
- if 'client' not in st.session_state:
- st.session_state.client = None
- if 'selected_database' not in st.session_state:
- st.session_state.selected_database = None
- if 'selected_container' not in st.session_state:
- st.session_state.selected_container = None
- if 'selected_document_id' not in st.session_state:
- st.session_state.selected_document_id = None
- if 'current_index' not in st.session_state:
- st.session_state.current_index = 0
- if 'cloned_doc' not in st.session_state:
- st.session_state.cloned_doc = None
-
- # 🔍 Query processing - "To search or not to search, that is the query"
- try:
- query_params = st.query_params
- query = query_params.get('q') or query_params.get('query') or ''
- if query:
- result, result2, result3, response2 = search_glossary(query)
-
- # 💾 Save results - "Every file you save is a future you pave"
- try:
- if st.button("Save AI Output"):
- filename = create_file_from_content(result)
- st.success(f"File saved: {filename}")
- filename = create_file_from_content(result2)
- st.success(f"File saved: {filename}")
- filename = create_file_from_content(result3)
- st.success(f"File saved: {filename}")
- filename = create_file_from_content(response2)
- st.success(f"File saved: {filename}")
-
- display_saved_files_in_sidebar()
- except Exception as e:
- st.error(f"An unexpected error occurred: {str(e)} 😱")
-
- # 🌟 Cosmos DB operations - "In Cosmos DB we trust, but we still handle errors we must"
- try:
- save_to_cosmos_db(st.session_state.cosmos_container, query, result, result)
- save_to_cosmos_db(st.session_state.cosmos_container, query, result2, result2)
- save_to_cosmos_db(st.session_state.cosmos_container, query, result3, result3)
- save_to_cosmos_db(st.session_state.cosmos_container, query, response2[0], response2[0])
- save_to_cosmos_db(st.session_state.cosmos_container, query, response2[1], response2[1])
- except exceptions.CosmosHttpResponseError as e:
- st.error(f"HTTP error occurred: {str(e)} 🚨")
- except Exception as e:
- st.error(f"An unexpected error occurred: {str(e)} 😱")
- st.stop()
- except Exception as e:
- st.markdown(' ')
-
- # 🔐 Auth check - "With great keys come great connectivity"
+    st.markdown("### 🐙 GitCosmos - Cosmos & GitHub")
+ if "chat_history" not in st.session_state:
+ st.session_state.chat_history = []
+ # Auth & Cosmos client initialization
if Key:
st.session_state.primary_key = Key
st.session_state.logged_in = True
else:
- st.error("Cosmos DB Key is not set in environment variables. 🔑❌")
+ st.error("Missing Cosmos Key 🔑❌")
return
-
if st.session_state.logged_in:
- # 🌌 DB initialization - "In the beginning, there was connection string..."
try:
- if st.session_state.client is None:
+ if st.session_state.get("client") is None:
st.session_state.client = CosmosClient(ENDPOINT, credential=st.session_state.primary_key)
- # 📚 Navigation setup - "Navigation is not about where you are, but where you're going"
- st.sidebar.title("🐙Git🌌Cosmos💫🗄️Navigator")
+ st.sidebar.title("🐙 Navigator")
databases = get_databases(st.session_state.client)
- selected_db = st.sidebar.selectbox("🗃️ Select Database", databases)
+ selected_db = st.sidebar.selectbox("🗃️ DB", databases)
st.markdown(CosmosDBUrl)
-
- # 🔄 State management - "Change is the only constant in state management"
- if selected_db != st.session_state.selected_database:
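+                    # switching databases clears the selected container, document, and index before rerunning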
+ if selected_db != st.session_state.get("selected_database"):
st.session_state.selected_database = selected_db
st.session_state.selected_container = None
st.session_state.selected_document_id = None
st.session_state.current_index = 0
st.rerun()
-
if st.session_state.selected_database:
database = st.session_state.client.get_database_client(st.session_state.selected_database)
containers = get_containers(database)
- selected_container = st.sidebar.selectbox("📁 Select Container", containers)
-
- # 🔄 Container state handling - "Container changes, state arranges"
- if selected_container != st.session_state.selected_container:
+ selected_container = st.sidebar.selectbox("📁 Container", containers)
+ if selected_container != st.session_state.get("selected_container"):
st.session_state.selected_container = selected_container
st.session_state.selected_document_id = None
st.session_state.current_index = 0
st.rerun()
-
if st.session_state.selected_container:
container = database.get_container_client(st.session_state.selected_container)
- # 📦 Export functionality - "Pack it, zip it, ship it"
- if st.sidebar.button("📦 Export Container Data"):
- download_link = archive_current_container(st.session_state.selected_database,
- st.session_state.selected_container,
- st.session_state.client)
+ if st.sidebar.button("📦 Export"):
+                        download_link = archive_current_container(st.session_state.selected_database, st.session_state.selected_container, st.session_state.client)
+                        if download_link.startswith('<a'):
+                            st.sidebar.markdown(download_link, unsafe_allow_html=True)
if download_link.startswith(' num_docs_to_display:
- documents_to_display = documents[:num_docs_to_display]
- st.sidebar.info(f"Showing top {num_docs_to_display} most recent documents.")
- else:
- documents_to_display = documents
- st.sidebar.info(f"Showing all {len(documents_to_display)} documents.")
-
- if documents_to_display:
- # 🎨 View options - "Different strokes for different folks"
- view_options = ['Show as Markdown', 'Show as Code Editor', 'Show as Run AI', 'Clone Document', 'New Record']
- selected_view = st.sidebar.selectbox("Select Viewer/Editor", view_options, index=2)
-
-
- if selected_view == 'Show as Markdown':
- Label = '#### 📄 Markdown view - Mark it down, mark it up'
- st.markdown(Label)
- total_docs = len(documents)
+                    documents = get_documents(container)
+                    total_docs = len(documents)
+                    num_docs = st.slider("Docs", 1, 20, 1)
+ documents_to_display = documents[:num_docs] if total_docs > num_docs else documents
+ st.sidebar.info(f"Showing {len(documents_to_display)} docs")
+ # --- Document Viewer / Editor ---
+ view_options = ['Markdown', 'Code', 'Run AI', 'Clone', 'New']
+ selected_view = st.sidebar.selectbox("View", view_options, index=1)
+ if selected_view == 'Markdown':
+ st.markdown("#### 📄 Markdown")
+ if documents:
doc = documents[st.session_state.current_index]
- # st.markdown(f"#### Document ID: {doc.get('id', '')}")
-
- # 🕵️ Value extraction - "Finding spaces in all the right places"
- values_with_space = []
- def extract_values(obj):
- if isinstance(obj, dict):
- for k, v in obj.items():
- extract_values(v)
- elif isinstance(obj, list):
- for item in obj:
- extract_values(item)
- elif isinstance(obj, str):
- if ' ' in obj:
- values_with_space.append(obj)
-
- extract_values(doc)
- st.markdown("#### 🔗 Links for Extracted Texts")
- for term in values_with_space:
- display_glossary_entity(term)
-
content = json.dumps(doc, indent=2)
st.markdown(f"```json\n{content}\n```")
-
- # ⬅️➡️ Navigation - "Left and right, day and night"
- col_prev, col_next = st.columns([1, 1])
+ col_prev, col_next = st.columns(2)
with col_prev:
- if st.button("⬅️ Previous", key='prev_markdown'):
- if st.session_state.current_index > 0:
- st.session_state.current_index -= 1
- st.rerun()
+ if st.button("⬅️") and st.session_state.current_index > 0:
+ st.session_state.current_index -= 1
+ st.rerun()
with col_next:
- if st.button("➡️ Next", key='next_markdown'):
- if st.session_state.current_index < total_docs - 1:
- st.session_state.current_index += 1
- st.rerun()
-
- elif selected_view == 'Show as Code Editor':
- Label = '#### 💻 Code editor view'
- st.markdown(Label)
- total_docs = len(documents)
-
- if total_docs == 0:
- st.warning("No documents available.")
- return
-
+ if st.button("➡️") and st.session_state.current_index < total_docs - 1:
+ st.session_state.current_index += 1
+ st.rerun()
+ elif selected_view == 'Code':
+ st.markdown("#### 💻 Code Editor")
+ if documents:
doc = documents[st.session_state.current_index]
- doc_str = st.text_area("Edit Document",
- value=json.dumps(doc, indent=2),
- height=300,
- key=f'code_editor_{st.session_state.current_index}')
-
- col_prev, col_next = st.columns([1, 1])
+ doc_str = st.text_area("Edit JSON", value=json.dumps(doc, indent=2), height=300, key=f'code_{st.session_state.current_index}')
+ col_prev, col_next = st.columns(2)
with col_prev:
- if st.button("⬅️ Previous", key='prev_code'):
- if st.session_state.current_index > 0:
- st.session_state.current_index -= 1
- st.rerun()
+ if st.button("⬅️") and st.session_state.current_index > 0:
+ st.session_state.current_index -= 1
+ st.rerun()
with col_next:
- if st.button("➡️ Next", key='next_code'):
- if st.session_state.current_index < total_docs - 1:
- st.session_state.current_index += 1
- st.rerun()
-
- col_save, col_delete = st.columns([1, 1])
+ if st.button("➡️") and st.session_state.current_index < total_docs - 1:
+ st.session_state.current_index += 1
+ st.rerun()
+ col_save, col_delete = st.columns(2)
with col_save:
- if st.button("💾 Save Changes", key=f'save_button_{st.session_state.current_index}'):
+ if st.button("💾 Save", key=f'save_{st.session_state.current_index}'):
try:
updated_doc = json.loads(doc_str)
- response = container.upsert_item(body=updated_doc)
- if response:
- st.success(f"Document {updated_doc['id']} saved successfully.")
- st.session_state.selected_document_id = updated_doc['id']
- st.rerun()
- except json.JSONDecodeError:
- st.error("Invalid JSON format. Please check your edits.")
+ container.upsert_item(body=updated_doc)
+ st.success(f"Saved {updated_doc['id']}")
+ st.rerun()
except Exception as e:
- st.error(f"Error saving document: {str(e)}")
-
+ st.error(f"Save err: {str(e)}")
with col_delete:
- if st.button("🗑️ Delete", key=f'delete_button_{st.session_state.current_index}'):
+ if st.button("🗑️ Delete", key=f'delete_{st.session_state.current_index}'):
try:
current_doc = json.loads(doc_str)
doc_id = current_doc.get("id")
-
if not doc_id:
- st.error("Document ID not found.")
+ st.error("Missing ID")
return
-
- # Confirm deletion
- if 'confirm_delete' not in st.session_state:
- st.session_state.confirm_delete = False
-
- if not st.session_state.confirm_delete:
- if st.button("⚠️ Click to confirm deletion", key=f'confirm_delete_{st.session_state.current_index}'):
- st.session_state.confirm_delete = True
- st.rerun()
- else:
- try:
- # Delete the document
- container.delete_item(item=doc_id, partition_key=doc_id)
-
- # Update the session state
- st.session_state.confirm_delete = False
-
- # Update the current index if necessary
- if total_docs > 1:
- if st.session_state.current_index == total_docs - 1:
- st.session_state.current_index = max(0, total_docs - 2)
- documents.pop(st.session_state.current_index)
- else:
- st.session_state.current_index = 0
- documents.clear()
-
- st.success(f"Document {doc_id} deleted successfully.")
- st.rerun()
-
- except Exception as e:
- st.error(f"Error deleting document: {str(e)}")
- st.session_state.confirm_delete = False
-
- except json.JSONDecodeError:
- st.error("Invalid JSON format. Please check the document.")
+ container.delete_item(item=doc_id, partition_key=doc_id)
+ st.success(f"Deleted {doc_id}")
+ st.rerun()
except Exception as e:
- st.error(f"Error processing deletion: {str(e)}")
-
-
-
-
- elif selected_view == 'Show as Run AI':
- Label = '#### ✏️ Run AI with wisdom, save with precision'
- st.markdown(Label)
- num_cols = len(documents_to_display)
- cols = st.columns(num_cols)
-
- for idx, (col, doc) in enumerate(zip(cols, documents_to_display)):
- with col:
- # ID and Name fields
- editable_id = st.text_input("ID", value=doc.get('id', ''), key=f'edit_id_{idx}')
- editable_name = st.text_input("Name", value=doc.get('name', ''), key=f'edit_name_{idx}')
-
- # Create editable document copy without id and name
- editable_doc = doc.copy()
- editable_doc.pop('id', None)
- editable_doc.pop('name', None)
-
- doc_str = st.text_area("Document Content (in JSON format)",
- value=json.dumps(editable_doc, indent=2),
- height=300,
- key=f'doc_str_{idx}')
-
- # Save and AI operations columns
-
-
- # Video Generator call - the video generation UI for container:
- add_video_generation_ui(container)
-
-
-
- if st.button("🤖 Run AI", key=f'run_with_ai_button_{idx}'):
- # Your existing AI processing code here
- values_with_space = []
- def extract_values2(obj):
- if isinstance(obj, dict):
- for k, v in obj.items():
- extract_values2(v)
- elif isinstance(obj, list):
- for item in obj:
- extract_values2(item)
- elif isinstance(obj, str):
- if ' ' in obj:
- values_with_space.append(obj)
-
- extract_values2(doc)
- for term in values_with_space:
- display_glossary_entity(term)
- search_glossary(term)
-
- if st.button("💾 Save Changes", key=f'save_runai_{idx}'):
- try:
- updated_doc = json.loads(doc_str)
- # Reinsert ID and name from editable fields
- updated_doc['id'] = editable_id
- updated_doc['name'] = editable_name
- response = container.upsert_item(body=updated_doc)
- if response:
- st.success(f"Document {updated_doc['id']} saved successfully.")
- st.session_state.selected_document_id = updated_doc['id']
- st.rerun()
- except Exception as e:
- st.error(f"Error saving document: {str(e)}")
-
-
- # File Editor (When you need to tweak things ✏️)
- if hasattr(st.session_state, 'current_file'):
- st.subheader(f"Editing: {st.session_state.current_file} 🛠")
- new_content = st.text_area("File Content ✏️:", st.session_state.file_content, height=300)
- if st.button("Save Changes 💾"):
- with open(st.session_state.current_file, 'w', encoding='utf-8') as file:
- file.write(new_content)
- st.success("File updated successfully! 🎉")
-
- # Image Gallery (For your viewing pleasure 📸)
- st.subheader("Image Gallery 🖼")
- image_files = glob.glob("*.png") + glob.glob("*.jpg") + glob.glob("*.jpeg")
- image_cols = st.slider("Gallery Columns 🖼", min_value=1, max_value=15, value=5)
- cols = st.columns(image_cols)
- for idx, image_file in enumerate(image_files):
- with cols[idx % image_cols]:
- img = Image.open(image_file)
- #st.image(img, caption=image_file, use_column_width=True)
- st.image(img, use_column_width=True)
- display_glossary_entity(os.path.splitext(image_file)[0])
-
- # Video Gallery (Let’s roll the tapes 🎬)
- st.subheader("Video Gallery 🎥")
- video_files = glob.glob("*.mp4")
- video_cols = st.slider("Gallery Columns 🎬", min_value=1, max_value=5, value=3)
- cols = st.columns(video_cols)
- for idx, video_file in enumerate(video_files):
- with cols[idx % video_cols]:
- st.markdown(get_video_html(video_file, width="100%"), unsafe_allow_html=True)
- display_glossary_entity(os.path.splitext(video_file)[0])
-
- # Audio Gallery (Tunes for the mood 🎶)
- st.subheader("Audio Gallery 🎧")
- audio_files = glob.glob("*.mp3") + glob.glob("*.wav")
- audio_cols = st.slider("Gallery Columns 🎶", min_value=1, max_value=15, value=5)
- cols = st.columns(audio_cols)
- for idx, audio_file in enumerate(audio_files):
- with cols[idx % audio_cols]:
- st.markdown(get_audio_html(audio_file, width="100%"), unsafe_allow_html=True)
- display_glossary_entity(os.path.splitext(audio_file)[0])
-
-
-
-
- elif selected_view == 'Clone Document':
- st.markdown("#### 📄 Clone Document (Save As)")
-
- total_docs = len(documents)
+ st.error(f"Delete err: {str(e)}")
+ elif selected_view == 'Run AI':
+ st.markdown("#### 🤖 Run AI (stub)")
+ st.info("AI functionality not implemented.")
+ elif selected_view == 'Clone':
+ st.markdown("#### 📄 Clone")
+ if documents:
doc = documents[st.session_state.current_index]
-
- # Display current document info
- st.markdown(f"**Original Document ID:** {doc.get('id', '')}")
- st.markdown(f"**Original Document Name:** {doc.get('name', '')}")
-
- # Generate new unique ID and name
- unique_filename = gen_AI_IO_filename("Clone", doc.get('name', ''))
- new_id = st.text_input("New Document ID", value=unique_filename, key='new_clone_id')
- new_name = st.text_input("New Document Name", value=f"Clone_{unique_filename[:8]}", key='new_clone_name')
-
- # Create new document with all original content except system fields
- new_doc = {
- 'id': new_id,
- 'name': new_name,
- **{k: v for k, v in doc.items() if k not in ['id', 'name', '_rid', '_self', '_etag', '_attachments', '_ts']}
- }
-
- # Show editable preview of the new document
- doc_str = st.text_area(
- "Edit Document Content (in JSON format)",
- value=json.dumps(new_doc, indent=2),
- height=300,
- key='clone_preview'
- )
-
+ st.markdown(f"Original ID: {doc.get('id', '')}")
+ new_id = st.text_input("New ID", value=generate_unique_id(), key='new_clone_id')
+ new_name = st.text_input("New Name", value=f"Clone_{new_id[:8]}", key='new_clone_name')
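+                                    # carry over user fields but drop Cosmos system fields (_rid, _self, _etag, _attachments, _ts)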
+ new_doc = {'id': new_id, 'name': new_name, **{k: v for k, v in doc.items() if k not in ['id', 'name', '_rid', '_self', '_etag', '_attachments', '_ts']}}
+ doc_str = st.text_area("Edit JSON", value=json.dumps(new_doc, indent=2), height=300, key='clone_preview')
col1, col2 = st.columns(2)
-
with col1:
- if st.button("🔄 Generate New ID/Name", key='regenerate_id'):
- # Generate new unique filename
- new_unique_filename = gen_AI_IO_filename("Clone", doc.get('name', ''))
- st.session_state.new_clone_id = new_unique_filename
- st.session_state.new_clone_name = f"Clone_{new_unique_filename[:8]}"
+ if st.button("🔄 Regenerate"):
+ new_id = generate_unique_id()
+ st.session_state.new_clone_id = new_id
st.rerun()
-
with col2:
- if st.button("💾 Save As New Document", key='save_clone'):
+ if st.button("💾 Save Clone"):
try:
- # Parse the edited document content
final_doc = json.loads(doc_str)
-
- # Ensure the new ID and name are used
- final_doc['id'] = new_id
- final_doc['name'] = new_name
-
- # Remove any system fields that might have been copied
- system_fields = ['_rid', '_self', '_etag', '_attachments', '_ts']
- for field in system_fields:
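+                            # Drop any Cosmos-managed system fields left in the edited JSON before create_item.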
+ for field in ['_rid', '_self', '_etag', '_attachments', '_ts']:
final_doc.pop(field, None)
-
- # Create the new document
- response = container.create_item(body=final_doc)
-
- if response:
- st.success(f"""
- ✅ New document created successfully!
- - ID: {final_doc['id']}
- - Name: {final_doc['name']}
- """)
- # Update session state to show the new document
- st.session_state.selected_document_id = final_doc['id']
- st.rerun()
- else:
- st.error("Failed to create new document")
- except json.JSONDecodeError as e:
- st.error(f"Invalid JSON format: {str(e)}")
+ container.create_item(body=final_doc)
+ st.success(f"Cloned {final_doc['id']}")
+ st.rerun()
except Exception as e:
- st.error(f"Error creating document: {str(e)}")
-
- # Navigation buttons for viewing other documents to clone
- col_prev, col_next = st.columns([1, 1])
+ st.error(f"Clone err: {str(e)}")
+ col_prev, col_next = st.columns(2)
with col_prev:
- if st.button("⬅️ Previous", key='prev_clone'):
- if st.session_state.current_index > 0:
- st.session_state.current_index -= 1
- st.rerun()
+ if st.button("⬅️") and st.session_state.current_index > 0:
+ st.session_state.current_index -= 1
+ st.rerun()
with col_next:
- if st.button("➡️ Next", key='next_clone'):
- if st.session_state.current_index < total_docs - 1:
- st.session_state.current_index += 1
- st.rerun()
-
-
- elif selected_view == 'New Record':
- st.markdown("#### Create a new document:")
-
- if st.button("🤖 Insert Auto-Generated Record"):
- auto_doc = {
- "id": generate_unique_id(),
- "name": f"Auto-generated Record {datetime.now().strftime('%Y-%m-%d %H:%M:%S')}",
- "content": "This is an auto-generated record.",
- "timestamp": datetime.now().isoformat()
- }
- success, message = save_or_clone_to_cosmos_db(container, document=auto_doc)
- if success:
- st.success(message)
+ if st.button("➡️") and st.session_state.current_index < total_docs - 1:
+ st.session_state.current_index += 1
st.rerun()
- else:
- st.error(message)
+ elif selected_view == 'New':
+ st.markdown("#### ➕ New Doc")
+ if st.button("🤖 Auto-Gen"):
+ auto_doc = {
+ "id": generate_unique_id(),
+ "name": f"Auto {datetime.now().strftime('%Y-%m-%d %H:%M:%S')}",
+ "content": "Auto-generated record.",
+ "timestamp": datetime.now().isoformat()
+ }
+ success, message = insert_record(container, auto_doc)
+ if success:
+ st.success(message)
+ st.rerun()
else:
- new_id = st.text_input("ID", value=generate_unique_id(), key='new_id')
- default_doc = {
- "id": new_id,
- "name": "New Document",
- "content": "",
- "timestamp": datetime.now().isoformat()
- }
- new_doc_str = st.text_area("Document Content (in JSON format)",
- value=json.dumps(default_doc, indent=2),
- height=300)
-
- if st.button("➕ Create New Document"):
- try:
- # Preprocess the text before loading it into JSON
- cleaned_doc_str = preprocess_text(new_doc_str)
- new_doc = json.loads(cleaned_doc_str)
- new_doc['id'] = new_id # Ensure ID matches input field
-
- success, message = insert_record(container, new_doc)
- if success:
- st.success(f"New document created with id: {new_doc['id']} 🎉")
- st.session_state.selected_document_id = new_doc['id']
- st.rerun()
- else:
- st.error(message)
- except json.JSONDecodeError as e:
- st.error(f"Invalid JSON: {str(e)} 🚫")
-
- st.subheader(f"📊 Container: {st.session_state.selected_container}")
- if st.session_state.selected_container:
- if documents_to_display:
- Label = '#### 📊 Data display - Data tells tales that words cannot'
- st.markdown(Label)
- df = pd.DataFrame(documents_to_display)
- st.dataframe(df)
+ st.error(message)
+ else:
+ new_id = st.text_input("ID", value=generate_unique_id(), key='new_id')
+ default_doc = {
+ "id": new_id,
+ "name": "New Doc",
+ "content": "",
+ "timestamp": datetime.now().isoformat()
+ }
+ new_doc_str = st.text_area("JSON", value=json.dumps(default_doc, indent=2), height=300)
+ if st.button("➕ Create"):
+ try:
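+                            # preprocess_text cleans up copy/pasted text so it still parses as JSON.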
+ cleaned = preprocess_text(new_doc_str)
+ new_doc = json.loads(cleaned)
+ new_doc['id'] = new_id
+ success, message = insert_record(container, new_doc)
+ if success:
+ st.success(f"Created {new_doc['id']}")
+ st.rerun()
else:
- st.info("No documents to display. 🧐")
-
-
- Label = '#### 🐙 GitHub integration - Git happens'
- st.subheader("🐙 GitHub Operations")
- github_token = os.environ.get("GITHUB")
- source_repo = st.text_input("Source GitHub Repository URL",
- value="https://github.com/AaronCWacker/AIExamples-8-24-Streamlit")
- new_repo_name = st.text_input("New Repository Name (for cloning)",
- value=f"AIExample-Clone-{datetime.now().strftime('%Y%m%d_%H%M%S')}")
-
- col1, col2 = st.columns(2)
- with col1:
- if st.button("📥 Clone Repository"):
- if github_token and source_repo:
-
- st.markdown(Label)
- try:
- local_path = f"./temp_repo_{datetime.now().strftime('%Y%m%d%H%M%S')}"
- download_github_repo(source_repo, local_path)
- zip_filename = f"{new_repo_name}.zip"
- create_zip_file(local_path, zip_filename[:-4])
- st.markdown(get_download_link(zip_filename), unsafe_allow_html=True)
- st.success("Repository cloned successfully! 🎉")
- except Exception as e:
- st.error(f"An error occurred: {str(e)} 😢")
- finally:
- if os.path.exists(local_path):
- shutil.rmtree(local_path)
- if os.path.exists(zip_filename):
- os.remove(zip_filename)
- else:
- st.error("Please ensure GitHub token is set in environment variables and source repository URL is provided. 🔑❓")
-
- with col2:
- if st.button("📤 Push to New Repository"):
- if github_token and source_repo:
-
- st.markdown(Label)
- try:
- g = Github(github_token)
- new_repo = create_repo(g, new_repo_name)
- local_path = f"./temp_repo_{datetime.now().strftime('%Y%m%d%H%M%S')}"
- download_github_repo(source_repo, local_path)
- push_to_github(local_path, new_repo, github_token)
- st.success(f"Repository pushed successfully to {new_repo.html_url} 🚀")
- except Exception as e:
- st.error(f"An error occurred: {str(e)} 😢")
- finally:
- if os.path.exists(local_path):
- shutil.rmtree(local_path)
- else:
- st.error("Please ensure GitHub token is set in environment variables and source repository URL is provided. 🔑❓")
-
-
- st.subheader("💬 Chat with Claude")
- user_input = st.text_area("Message 📨:", height=100)
-
- if st.button("Send 📨"):
- Label = '#### 💬 Chat functionality - Every chat is a chance to learn'
- st.markdown(Label)
- if user_input:
- response = anthropicclient.messages.create(
- model="claude-3-sonnet-20240229",
- max_tokens=1000,
- messages=[
- {"role": "user", "content": user_input}
- ]
- )
- st.write("Claude's reply 🧠:")
- st.write(response.content[0].text)
- filename = generate_filename(user_input, "md")
- create_file(filename, user_input, response.content[0].text)
- st.session_state.chat_history.append({"user": user_input, "claude": response.content[0].text})
- # Save to Cosmos DB
- save_to_cosmos_db(container, user_input, response.content[0].text, "")
-
-
-
- # 📜 Chat history display - "History repeats itself, first as chat, then as wisdom"
- st.subheader("Past Conversations 📜")
- for chat in st.session_state.chat_history:
- st.text_area("You said 💬:", chat["user"], height=100, disabled=True)
- st.text_area("Claude replied 🤖:", chat["claude"], height=200, disabled=True)
- st.markdown("---")
-
-
-
-
-
-
-
-
-
-
-
- # 📝 File editor - "Edit with care, save with flair"
- if hasattr(st.session_state, 'current_file'):
- st.subheader(f"Editing: {st.session_state.current_file} 🛠")
- new_content = st.text_area("File Content ✏️:", st.session_state.file_content, height=300)
-
- # Preprocess the text before loading it into JSON - Added to protect copy paste into JSON to keep format.
- cleaned_doc_str = preprocess_text(new_content)
- new_doc = json.loads(cleaned_doc_str)
- new_content = cleaned_doc_str
-
- if st.button("Save Changes 💾"):
- with open(st.session_state.current_file, 'w', encoding='utf-8') as file:
- file.write(new_content)
- st.success("File updated successfully! 🎉")
-
- # 📂 File management - "Manage many, maintain order"
- update_file_management_section()
-
+ st.error(message)
+ except Exception as e:
+ st.error(f"Create err: {str(e)}")
+ st.subheader(f"📊 {st.session_state.selected_container}")
+ if documents_to_display:
+ df = pd.DataFrame(documents_to_display)
+ st.dataframe(df)
+ else:
+ st.info("No docs.")
+ # --- End of Document UI ---
+ st.subheader("🐙 GitHub Ops")
+ github_token = os.environ.get("GITHUB")
+ source_repo = st.text_input("Source Repo URL", value="https://github.com/AaronCWacker/AIExamples-8-24-Streamlit")
+ new_repo_name = st.text_input("New Repo Name", value=f"Clone-{datetime.now().strftime('%Y%m%d_%H%M%S')}")
+ col_g1, col_g2 = st.columns(2)
+ with col_g1:
+ if st.button("📥 Clone Repo"):
+ if github_token and source_repo:
+                    try:
+                        local_path = f"./temp_repo_{datetime.now().strftime('%Y%m%d%H%M%S')}"
+                        # Define zip_filename before the download so the cleanup in finally
+                        # never references an undefined name if the download fails.
+                        zip_filename = f"{new_repo_name}.zip"
+                        download_github_repo(source_repo, local_path)
+                        create_zip_file(local_path, zip_filename[:-4])
+                        st.markdown(get_download_link(zip_filename), unsafe_allow_html=True)
+                        st.success("Cloned! 🎉")
+                    except Exception as e:
+                        st.error(f"Clone err: {str(e)}")
+                    finally:
+                        if os.path.exists(local_path):
+                            shutil.rmtree(local_path)
+                        if os.path.exists(zip_filename):
+                            os.remove(zip_filename)
+ else:
+ st.error("Missing token or URL 🔑❓")
+ with col_g2:
+ if st.button("📤 Push Repo"):
+ if github_token and source_repo:
+                    try:
+                        # Assign local_path first so the cleanup in finally is always defined,
+                        # even if creating the new repository fails.
+                        local_path = f"./temp_repo_{datetime.now().strftime('%Y%m%d%H%M%S')}"
+                        g = Github(github_token)
+                        new_repo = create_repo(g, new_repo_name)
+                        download_github_repo(source_repo, local_path)
+                        push_to_github(local_path, new_repo, github_token)
+                        st.success(f"Pushed to {new_repo.html_url} 🚀")
+                    except Exception as e:
+                        st.error(f"Push err: {str(e)}")
+                    finally:
+                        if os.path.exists(local_path):
+                            shutil.rmtree(local_path)
+ else:
+ st.error("Missing token or URL 🔑❓")
+ # --- File Management Section ---
+ update_file_management_section()
except exceptions.CosmosHttpResponseError as e:
- st.error(f"Failed to connect to Cosmos DB. HTTP error: {str(e)} 🚨")
+ st.error(f"Cosmos error: {str(e)} 🚨")
except Exception as e:
- st.error(f"An unexpected error occurred: {str(e)} 😱")
-
+ st.error(f"Error: {str(e)} 😱")
if st.session_state.logged_in and st.sidebar.button("🚪 Logout"):
- Label = '#### 🚪 Logout - All good things must come to an end'
- st.markdown(Label)
+ st.markdown("#### 🚪 Logout")
st.session_state.logged_in = False
- st.session_state.selected_records.clear()
+ st.session_state.selected_records = []
st.session_state.client = None
st.session_state.selected_database = None
st.session_state.selected_container = None
@@ -1669,6 +748,5 @@ def main():
st.session_state.current_index = 0
st.rerun()
-
if __name__ == "__main__":
- main()
\ No newline at end of file
+ main()