# NOTE: "Spaces: / Running / Running" is Hugging Face Spaces page chrome that
# was captured along with this source file; commented out so the module parses.
# Spaces:
# Running
# Running
# ============================================================================= | |
# βββββββββββββ IMPORTS βββββββββββββ | |
# ============================================================================= | |
import base64 | |
import glob | |
import hashlib | |
import json | |
import os | |
import pandas as pd | |
import pytz | |
import random | |
import re | |
import shutil | |
import streamlit as st | |
import time | |
import traceback | |
import uuid | |
import zipfile | |
from PIL import Image | |
from azure.cosmos import CosmosClient, PartitionKey, exceptions | |
from datetime import datetime | |
from git import Repo | |
from github import Github | |
from gradio_client import Client | |
import tempfile | |
import io | |
import requests | |
import numpy as np | |
from urllib.parse import quote | |
# ============================================================================= | |
# βββββββββββββ EXTERNAL HELP LINKS βββββββββββββ | |
# ============================================================================= | |
def _link(title, url, emoji="π»"):
    """Build one help-link entry for the sidebar (emoji defaults to the doc icon)."""
    return {"title": title, "url": url, "emoji": emoji}

# External documentation links rendered in the sidebar and in "Portal Links" records.
external_links = [
    _link("CosmosDB GenAI Full Text Search", "https://learn.microsoft.com/en-us/azure/cosmos-db/gen-ai/full-text-search"),
    _link("CosmosDB SQL API Client Library", "https://learn.microsoft.com/en-us/python/api/overview/azure/cosmos-readme?view=azure-python"),
    _link("CosmosDB Index and Query Vectors", "https://learn.microsoft.com/en-us/azure/cosmos-db/nosql/how-to-python-vector-index-query"),
    _link("CosmosDB NoSQL Materialized Views", "https://learn.microsoft.com/en-us/azure/cosmos-db/nosql/materialized-views"),
    _link("LangChain Vector Store Guide", "https://python.langchain.com/docs/integrations/vectorstores/azure_cosmos_db_no_sql/"),
    _link("Vector Database Prompt Engineering RAG for Python", "https://learn.microsoft.com/en-us/azure/cosmos-db/vector-database?source=recommendations"),
    _link("MergeKit Official GitHub", "https://github.com/arcee-ai/MergeKit"),
    _link("MergeKit Sample Usage", "https://github.com/arcee-ai/MergeKit#examples", "π"),
    _link("DistillKit Official GitHub", "https://github.com/arcee-ai/DistillKit"),
    _link("DistillKit Sample Usage", "https://github.com/arcee-ai/DistillKit#usage", "π"),
    _link("arcee.ai Official Website", "https://arcee.ai", "π"),
]
# ============================================================================= | |
# βββββββββββββ APP CONFIGURATION βββββββββββββ | |
# ============================================================================= | |
# --- App identity (browser tab title, icon, menu links) ---
Site_Name = 'π GitCosmos'
title = "π GitCosmos"
helpURL = 'https://huggingface.co/awacke1'
bugURL = 'https://huggingface.co/spaces/awacke1/AzureCosmosDBUI/'
icons = 'πππ«'
# Must run before any other Streamlit call in the script.
st.set_page_config(
    page_title=title,
    page_icon=icons,
    layout="wide",
    initial_sidebar_state="auto",
    menu_items={
        'Get Help': helpURL,
        'Report a bug': bugURL,
        'About': title
    }
)
# --- Cosmos DB connection settings ---
# The account endpoint is hard-coded; names and the account key come from
# environment variables. Key is required — main() aborts without it.
ENDPOINT = "https://acae-afd.documents.azure.com:443/"
DATABASE_NAME = os.environ.get("COSMOS_DATABASE_NAME")
CONTAINER_NAME = os.environ.get("COSMOS_CONTAINER_NAME")
Key = os.environ.get("Key")
# Deep link to this account's Data Explorer in the Azure portal.
CosmosDBUrl = 'https://portal.azure.com/#@AaronCWackergmail.onmicrosoft.com/resource/subscriptions/003fba60-5b3f-48f4-ab36-3ed11bc40816/resourceGroups/datasets/providers/Microsoft.DocumentDB/databaseAccounts/acae-afd/dataExplorer'
# ============================================================================= | |
# βββββββββββββ HELPER FUNCTIONS βββββββββββββ | |
# ============================================================================= | |
def get_download_link(file_path):
    """Return an HTML anchor that downloads *file_path* via a base64 data URI."""
    name = os.path.basename(file_path)
    with open(file_path, "rb") as fh:
        payload = base64.b64encode(fh.read()).decode()
    return f'<a href="data:file/txt;base64,{payload}" download="{name}">Download {name} π</a>'
def generate_unique_id():
    """Return '<UTC microsecond timestamp>-<uuid4>' for use as a Cosmos id/pk."""
    stamp = datetime.utcnow().strftime('%Y%m%d%H%M%S%f')
    return f"{stamp}-{uuid.uuid4()}"
def generate_filename(prompt, file_type):
    """Build '<MMDD_HHMM><sanitized prompt>.<file_type>' using US/Central time."""
    now = datetime.now(pytz.timezone('US/Central'))
    stem = re.sub(r'\W+', '', prompt)[:90]  # strip non-word chars, cap length
    return f"{now.strftime('%m%d_%H%M')}{stem}.{file_type}"
def create_file(filename, prompt, response, should_save=True):
    """Write prompt and response (blank-line separated) to *filename* unless disabled."""
    if not should_save:
        return
    with open(filename, 'w', encoding='utf-8') as fh:
        fh.write(f"{prompt}\n\n{response}")
def load_file(file_name):
    """Return the full UTF-8 text content of *file_name*."""
    with open(file_name, encoding='utf-8') as fh:
        content = fh.read()
    return content
def create_zip_of_files(files):
    """Bundle *files* into 'all_files.zip' in the cwd and return that name."""
    archive = "all_files.zip"
    with zipfile.ZipFile(archive, 'w') as bundle:
        for path in files:
            bundle.write(path)
    return archive
def preprocess_text(text):
    """Flatten newlines to literal '\\n', escape double quotes, collapse tabs
    to spaces, drop non-ASCII characters, and trim surrounding whitespace."""
    # Order matters: handle CRLF before the lone CR/LF forms.
    for raw in ('\r\n', '\r', '\n'):
        text = text.replace(raw, '\\n')
    text = text.replace('"', '\\"')
    text = re.sub(r'[\t]', ' ', text)
    text = re.sub(r'[^\x00-\x7F]+', '', text)
    return text.strip()
def sanitize_json_text(text):
    """Strip ASCII control characters that are illegal in JSON strings
    (keeps tab \\x09, LF \\x0A, CR \\x0D) and trim surrounding whitespace."""
    cleaned = re.sub(r'[\x00-\x08\x0B\x0C\x0E-\x1F]', '', text)
    return cleaned.strip()
# ============================================================================= | |
# βββββββββββββ COSMOS DB FUNCTIONS βββββββββββββ | |
# ============================================================================= | |
def get_databases(client):
    """Return the ids of every database visible to *client*."""
    return [entry['id'] for entry in client.list_databases()]
def get_containers(database):
    """Return the ids of every container in *database*."""
    return [entry['id'] for entry in database.list_containers()]
def get_documents(container, limit=None):
    """Fetch all documents newest-first (by Cosmos `_ts`).

    Args:
        container: Cosmos container proxy (or compatible) exposing query_items.
        limit: optional cap on the TOTAL number of documents returned.

    Returns:
        list of document dicts, at most *limit* long when a limit is given.
    """
    query = "SELECT * FROM c ORDER BY c._ts DESC"
    items = list(container.query_items(query=query, enable_cross_partition_query=True, max_item_count=limit))
    # BUG FIX: max_item_count only bounds the *page* size in the Cosmos SDK,
    # not the total result count — enforce the caller's limit explicitly.
    return items if limit is None else items[:limit]
def insert_record(container, record):
    """Create *record* in *container*; return (success, user-facing message)."""
    try:
        container.create_item(body=record)
    except exceptions.CosmosHttpResponseError as e:
        return False, f"HTTP error: {str(e)} π¨"
    except Exception as e:
        return False, f"Error: {str(e)} π±"
    return True, "Inserted! π"
def update_record(container, updated_record):
    """Upsert *updated_record* into *container*; return (success, message)."""
    try:
        container.upsert_item(body=updated_record)
    except exceptions.CosmosHttpResponseError as e:
        return False, f"HTTP error: {str(e)} π¨"
    except Exception as e:
        return False, f"Error: {str(e)} π±"
    return True, f"Updated {updated_record['id']} π οΈ"
def delete_record(container, record):
    """Delete *record* from *container* by id and partition key.

    Returns (success, message). A document that is already gone counts as
    success since the desired end state is achieved.
    """
    # BUG FIX: resolve identifiers *before* the try block — previously a
    # KeyError on record["id"] jumped to the generic handler, which then
    # raised NameError because doc_id was never bound.
    doc_id = record.get("id")
    if doc_id is None:
        return False, "Record has no 'id' field π±"
    partition_key_value = record.get("pk", doc_id)
    try:
        st.write(f"Deleting {doc_id} with partition key {partition_key_value}")
        container.delete_item(item=doc_id, partition_key=partition_key_value)
        return True, f"Record {doc_id} deleted. ποΈ"
    except exceptions.CosmosResourceNotFoundError:
        return True, f"Record {doc_id} not found (already deleted). ποΈ"
    except exceptions.CosmosHttpResponseError as e:
        return False, f"HTTP error deleting {doc_id}: {str(e)} π¨"
    except Exception as e:
        return False, f"Unexpected error deleting {doc_id}: {str(e)} π±"
def save_to_cosmos_db(container, query, response1, response2):
    """Persist a query plus two responses as a new 'ai_response' document,
    reporting the outcome through the Streamlit UI."""
    try:
        new_id = f"{datetime.utcnow().strftime('%Y%m%d%H%M%S%f')}-{uuid.uuid4()}"
        record = {
            "id": new_id,
            "pk": new_id,  # partition key mirrors the id (app convention)
            "name": new_id,
            "query": query,
            "response1": response1,
            "response2": response2,
            "timestamp": datetime.utcnow().isoformat(),
            "type": "ai_response",
            "version": "1.0",
        }
        container.create_item(body=record)
        st.success(f"Saved: {record['id']}")
    except Exception as e:
        st.error(f"Save error: {str(e)}")
def archive_current_container(database_name, container_name, client):
    """Dump every item of a container to per-document JSON files, zip them,
    and return an HTML download link — or an error string on failure."""
    try:
        base_dir = "./cosmos_archive"
        # Start from a clean scratch directory each time.
        if os.path.exists(base_dir):
            shutil.rmtree(base_dir)
        container_dir = os.path.join(base_dir, container_name)
        os.makedirs(container_dir)
        source = client.get_database_client(database_name).get_container_client(container_name)
        for item in source.read_all_items():
            item_id = item.get('id', f"unknown_{datetime.now().strftime('%Y%m%d%H%M%S')}")
            with open(os.path.join(container_dir, f"{item_id}.json"), 'w') as f:
                json.dump(item, f, indent=2)
        archive_name = f"{container_name}_archive_{datetime.now().strftime('%Y%m%d%H%M%S')}"
        shutil.make_archive(archive_name, 'zip', base_dir)
        return get_download_link(f"{archive_name}.zip")
    except Exception as e:
        return f"Archive error: {str(e)} π’"
def create_new_container(database, container_id, partition_key_path):
    """Create a container in *database*; return the existing one if the id is
    taken, or None (after surfacing the error) on any other HTTP failure."""
    try:
        return database.create_container(
            id=container_id,
            partition_key=PartitionKey(path=partition_key_path)
        )
    except exceptions.CosmosResourceExistsError:
        # Idempotent: reuse the container that is already there.
        return database.get_container_client(container_id)
    except exceptions.CosmosHttpResponseError as e:
        st.error(f"Error creating container: {str(e)}")
        return None
# ============================================================================= | |
# βββββββββββββ GITHUB FUNCTIONS βββββββββββββ | |
# ============================================================================= | |
def download_github_repo(url, local_path):
    """Clone *url* into *local_path*, replacing any existing checkout there."""
    if os.path.exists(local_path):
        shutil.rmtree(local_path)
    Repo.clone_from(url, local_path)
def create_zip_file(source_dir, output_filename):
    """Zip the contents of *source_dir* into '<output_filename>.zip'."""
    shutil.make_archive(output_filename, 'zip', source_dir)
def create_repo(g, repo_name):
    """Create *repo_name* under the authenticated GitHub user and return it."""
    return g.get_user().create_repo(repo_name)
def push_to_github(local_path, repo, github_token):
    """Push the Git repo at *local_path* to *repo* on GitHub using token auth.

    Ensures an 'origin' remote pointing at a token-authenticated URL, creates
    a 'main' branch when the repo has no heads yet, stages everything,
    commits if dirty, and pushes the current branch.
    """
    # Embed the token in the remote URL for HTTPS authentication.
    repo_url = f"https://{github_token}@github.com/{repo.full_name}.git"
    local_repo = Repo(local_path)
    # Reuse an existing 'origin' remote (updating its URL) or create one.
    if 'origin' in [remote.name for remote in local_repo.remotes]:
        origin = local_repo.remote('origin')
        origin.set_url(repo_url)
    else:
        origin = local_repo.create_remote('origin', repo_url)
    # A freshly-initialized repo has no branches; create 'main' first.
    if not local_repo.heads:
        local_repo.git.checkout('-b', 'main')
        current_branch = 'main'
    else:
        current_branch = local_repo.active_branch.name
    local_repo.git.add(A=True)
    # Only commit when there are changes to record.
    if local_repo.is_dirty():
        local_repo.git.commit('-m', 'Initial commit')
    origin.push(refspec=f'{current_branch}:{current_branch}')
# ============================================================================= | |
# βββββββββββββ FILE & MEDIA MANAGEMENT FUNCTIONS βββββββββββββ | |
# ============================================================================= | |
def display_file_viewer(file_path):
    """Render *file_path* read-only as markdown, with file metadata and a
    download button. Does nothing when the file is empty."""
    content = load_file(file_path)
    if content:
        st.markdown("### π File Viewer")
        st.markdown(f"**{file_path}**")
        file_stats = os.stat(file_path)
        # Show last-modified time and size above the content.
        st.markdown(f"**Mod:** {datetime.fromtimestamp(file_stats.st_mtime).strftime('%Y-%m-%d %H:%M:%S')} | **Size:** {file_stats.st_size} bytes")
        st.markdown("---")
        st.markdown(content)
        st.download_button("β¬οΈ", data=content, file_name=os.path.basename(file_path), mime="text/markdown")
def display_file_editor(file_path):
    """Show a text editor for *file_path* with save and download actions.

    Content is cached in st.session_state.file_content so edits survive
    Streamlit reruns; saving validates the text as JSON before writing.
    """
    if 'file_content' not in st.session_state:
        st.session_state.file_content = {}
    # Load from disk only the first time this file is opened.
    if file_path not in st.session_state.file_content:
        content = load_file(file_path)
        if content:
            st.session_state.file_content[file_path] = content
    st.markdown("### βοΈ Edit File")
    st.markdown(f"**Editing:** {file_path}")
    new_content = st.text_area("Edit:", value=st.session_state.file_content.get(file_path, ""), height=400, key=f"editor_{hash(file_path)}")
    col1, col2 = st.columns([1, 5])
    with col1:
        if st.button("πΎ Save"):
            sanitized = sanitize_json_text(new_content)
            try:
                # NOTE(review): the content must parse as JSON to be saved,
                # yet this editor is reached from the *.md file list —
                # confirm that rejecting plain markdown is intentional.
                json.loads(sanitized)
                with open(file_path, 'w', encoding='utf-8') as file:
                    file.write(sanitized)
                st.session_state.file_content[file_path] = sanitized
                st.success("Saved! π")
                time.sleep(1)
                st.rerun()
            except Exception as e:
                st.error(f"Save error: {str(e)}")
    with col2:
        st.download_button("β¬οΈ", data=new_content, file_name=os.path.basename(file_path), mime="text/markdown")
def update_file_management_section():
    """Sidebar manager for local *.md files: list, view, edit, delete, bulk
    download, plus the external help links; renders the selected file in the
    main area according to the current view mode."""
    # Initialize view-mode/current-file state on first run.
    if 'file_view_mode' not in st.session_state:
        st.session_state.file_view_mode = None
    if 'current_file' not in st.session_state:
        st.session_state.current_file = None
    all_files = sorted(glob.glob("*.md"), reverse=True)
    st.sidebar.subheader("π Files")
    if st.sidebar.button("π Delete All"):
        for file in all_files:
            os.remove(file)
        st.session_state.file_content = {}
        st.session_state.current_file = None
        st.session_state.file_view_mode = None
        st.rerun()
    if st.sidebar.button("β¬οΈ Download All"):
        zip_file = create_zip_of_files(all_files)
        st.sidebar.markdown(get_download_link(zip_file), unsafe_allow_html=True)
    # One sidebar row per file: view, download link, edit, delete.
    for file in all_files:
        col1, col2, col3, col4 = st.sidebar.columns([1, 3, 1, 1])
        with col1:
            if st.button("π", key=f"view_{file}"):
                st.session_state.current_file = file
                st.session_state.file_view_mode = 'view'
                st.rerun()
        with col2:
            st.markdown(get_download_link(file), unsafe_allow_html=True)
        with col3:
            if st.button("π", key=f"edit_{file}"):
                st.session_state.current_file = file
                st.session_state.file_view_mode = 'edit'
                st.rerun()
        with col4:
            if st.button("π", key=f"delete_{file}"):
                os.remove(file)
                # NOTE(review): file_content is only initialized by the
                # editor or "Delete All" — deleting here before either has
                # run would raise AttributeError; verify.
                if file in st.session_state.file_content:
                    del st.session_state.file_content[file]
                if st.session_state.current_file == file:
                    st.session_state.current_file = None
                    st.session_state.file_view_mode = None
                st.rerun()
    st.sidebar.subheader("External Help Links")
    for link in external_links:
        st.sidebar.markdown(f"{link['emoji']} [{link['title']}]({link['url']})", unsafe_allow_html=True)
    # Main-area rendering of the currently selected file.
    if st.session_state.current_file:
        if st.session_state.file_view_mode == 'view':
            display_file_viewer(st.session_state.current_file)
        elif st.session_state.file_view_mode == 'edit':
            display_file_editor(st.session_state.current_file)
# ============================================================================= | |
# βββββββββββββ UI FUNCTIONS βββββββββββββ | |
# ============================================================================= | |
def edit_all_documents(container, search_keyword=None):
    """Render every document in *container* (optionally keyword-filtered) as
    an expandable JSON editor with per-document save and delete buttons."""
    st.markdown("### π All Documents" + (f" (Filtered: '{search_keyword}')" if search_keyword else ""))
    documents = get_documents(container)
    if search_keyword:
        documents = [doc for doc in documents if vector_keyword_search(search_keyword, doc)]
    if not documents:
        st.info("No documents match the current filter." if search_keyword else "No documents in this container.")
        return
    # Cache of last-saved JSON per doc id so edits survive reruns.
    if 'saved_docs' not in st.session_state:
        st.session_state.saved_docs = {}
    for doc in documents:
        # _ts is Cosmos' server-side last-modified epoch timestamp.
        ts = doc.get("_ts", 0)
        dt = datetime.fromtimestamp(ts) if ts else datetime.now()
        formatted_ts = dt.strftime("%I:%M %p %m/%d/%Y")
        header = f"{doc.get('name', 'Unnamed')} - {formatted_ts}"
        with st.expander(header):
            doc_key = f"editor_{doc['id']}"
            initial_value = st.session_state.saved_docs.get(doc['id'], json.dumps(doc, indent=2))
            edited_content = st.text_area("Edit JSON", value=initial_value, height=300, key=doc_key)
            col_save, col_delete = st.columns(2)
            with col_save:
                if st.button("πΎ Save", key=f"save_{doc['id']}"):
                    try:
                        cleaned_content = sanitize_json_text(edited_content)
                        updated_doc = json.loads(cleaned_content)
                        # Preserve identity fields and strip Cosmos system
                        # fields before the upsert.
                        updated_doc['id'] = doc['id']
                        updated_doc['pk'] = doc.get('pk', doc['id'])
                        for field in ['_ts', '_rid', '_self', '_etag', '_attachments']:
                            updated_doc.pop(field, None)
                        success, message = update_record(container, updated_doc)
                        if success:
                            st.success(f"Saved {doc['id']}")
                            st.session_state.saved_docs[doc['id']] = json.dumps(updated_doc, indent=2)
                            st.rerun()
                        else:
                            st.error(message)
                    except json.JSONDecodeError as e:
                        st.error(f"Invalid JSON format: {str(e)}\nProblematic input:\n{cleaned_content}")
                    except Exception as e:
                        st.error(f"Save error: {str(e)}")
            with col_delete:
                if st.button("ποΈ Delete", key=f"delete_{doc['id']}"):
                    success, message = delete_record(container, doc)
                    if success:
                        st.success(message)
                        if doc['id'] in st.session_state.saved_docs:
                            del st.session_state.saved_docs[doc['id']]
                        st.rerun()
                    else:
                        st.error(message)
def new_item_default(container):
    """Insert a minimal starter document into *container*; rerun on success."""
    new_id = generate_unique_id()
    starter = {
        "id": new_id,
        "pk": new_id,  # partition key mirrors the id (app convention)
        "name": "New Document",
        "content": "Start editing here...",
        "timestamp": datetime.now().isoformat(),
        "type": "sample",
    }
    ok, message = insert_record(container, starter)
    if not ok:
        st.error(f"Error creating new item: {message}")
        return
    st.success("New document created! β¨")
    st.rerun()
def add_field_to_doc():
    """Add the sidebar's key/value pair to the JSON doc held in
    st.session_state.doc_editor and persist it to the current container."""
    key = st.session_state.new_field_key
    value = st.session_state.new_field_value
    try:
        doc = json.loads(st.session_state.doc_editor)
        doc[key] = value
        st.session_state.doc_editor = json.dumps(doc, indent=2)
        ok, message = update_record(st.session_state.current_container, doc)
        if ok:
            st.success(f"Added field {key} π")
        else:
            st.error(message)
    except Exception as e:
        st.error(f"Error adding field: {str(e)}")
def new_ai_record(container):
    """Insert a blank 'ai_modality' record template; rerun on success."""
    new_id = generate_unique_id()
    template = {
        "id": new_id,
        "pk": new_id,  # partition key mirrors the id (app convention)
        "name": "AI Modality Record",
        "function_url": "https://example.com/function",
        "input_text": "### Input (markdown)\n\nType your input here.",
        "output_text": "### Output (markdown)\n\nResult will appear here.",
        "timestamp": datetime.now().isoformat(),
        "type": "ai_modality",
    }
    ok, message = insert_record(container, template)
    if not ok:
        st.error(f"Error creating AI record: {message}")
        return
    st.success("New AI modality record created! π‘")
    st.rerun()
def new_links_record(container):
    """Insert an 'ai_modality' record whose input_text is a markdown bullet
    list of the external help links; rerun on success."""
    new_id = generate_unique_id()
    bullet_lines = [f"- {link['emoji']} [{link['title']}]({link['url']})" for link in external_links]
    record = {
        "id": new_id,
        "pk": new_id,  # partition key mirrors the id (app convention)
        "name": "Portal Links Record",
        "function_url": "",
        "input_text": "\n".join(bullet_lines),
        "output_text": "",
        "timestamp": datetime.now().isoformat(),
        "type": "ai_modality",
    }
    ok, message = insert_record(container, record)
    if not ok:
        st.error(f"Error creating links record: {message}")
        return
    st.success("New Portal Links record created! π")
    st.rerun()
def vector_keyword_search(keyword, doc):
    """True when *keyword* appears (case-insensitively) in any string value of *doc*."""
    needle = keyword.lower()
    return any(
        isinstance(value, str) and needle in value.lower()
        for value in doc.values()
    )
def search_documents_ui(container):
    """Sidebar search form; stores the keyword under 'active_search' in
    session state (read by main()) rather than the widget's own key, so the
    filter persists independently of the text input."""
    with st.sidebar.form("search_form"):
        keyword = st.text_input("Search Keyword", key="search_keyword")
        col1, col2 = st.columns(2)
        with col1:
            search_submitted = st.form_submit_button("π Search")
        with col2:
            clear_submitted = st.form_submit_button("ποΈ Clear")
    if search_submitted and keyword:
        st.session_state.active_search = keyword  # Use a separate key
        st.rerun()
    if clear_submitted:
        if 'active_search' in st.session_state:
            del st.session_state.active_search
        st.rerun()
def validate_and_preprocess_image(file_data, target_size=(576, 1024)):
    """Decode *file_data* (bytes or a readable file-like object) and letterbox
    it onto a white RGB canvas of *target_size*, preserving aspect ratio.
    Returns the PIL image, or None after surfacing the error in the UI."""
    try:
        if isinstance(file_data, bytes):
            img = Image.open(io.BytesIO(file_data))
        elif hasattr(file_data, 'read'):
            # Rewind streams that support it before decoding.
            if hasattr(file_data, 'seek'):
                file_data.seek(0)
            img = Image.open(file_data)
        else:
            raise ValueError(f"Unsupported input: {type(file_data)}")
        if img.mode != 'RGB':
            img = img.convert('RGB')
        src_w, src_h = img.size
        tgt_w, tgt_h = target_size
        aspect = src_w / src_h
        if aspect > tgt_w / tgt_h:
            # Wider than target: pin width, shrink height.
            fit_w = tgt_w
            fit_h = int(fit_w / aspect)
        else:
            # Taller (or equal): pin height, shrink width.
            fit_h = tgt_h
            fit_w = int(fit_h * aspect)
        # Round both dimensions down to even values — presumably a video
        # encoder requirement downstream; TODO confirm.
        fit_w -= fit_w % 2
        fit_h -= fit_h % 2
        scaled = img.resize((fit_w, fit_h), Image.Resampling.LANCZOS)
        canvas = Image.new('RGB', target_size, (255, 255, 255))
        canvas.paste(scaled, ((tgt_w - fit_w) // 2, (tgt_h - fit_h) // 2))
        return canvas
    except Exception as e:
        st.error(f"Image error: {str(e)}")
        return None
def add_video_generation_ui(container):
    """Image-to-video section: upload an image, tune motion/FPS/seed, call the
    'awacke1/stable-video-diffusion' Gradio Space, show the resulting video,
    and optionally record it in Cosmos when *container* is provided."""
    st.markdown("### π₯ Video Generation")
    col1, col2 = st.columns([2, 1])
    with col1:
        uploaded_file = st.file_uploader("Upload Image πΌοΈ", type=['png', 'jpg', 'jpeg'])
    with col2:
        st.markdown("#### Parameters")
        motion = st.slider("π Motion", 1, 255, 127)
        fps = st.slider("π¬ FPS", 1, 30, 6)
        with st.expander("Advanced"):
            use_custom = st.checkbox("Custom Seed")
            # None means "randomize" downstream.
            seed = st.number_input("Seed", value=int(time.time() * 1000)) if use_custom else None
    if uploaded_file:
        file_data = uploaded_file.read()
        preview1, preview2 = st.columns(2)
        with preview1:
            st.write("Original")
            st.image(Image.open(io.BytesIO(file_data)), use_column_width=True)
        with preview2:
            proc_img = validate_and_preprocess_image(io.BytesIO(file_data))
            if proc_img:
                st.write("Processed")
                st.image(proc_img, use_column_width=True)
            else:
                # Preprocessing failed (error already shown) — abort section.
                return
        if st.button("π₯ Generate"):
            with st.spinner("Generating video..."):
                # delete=False so the path outlives the with-block; the file
                # is removed in the finally clause below.
                with tempfile.NamedTemporaryFile(suffix='.png', delete=False) as temp_file:
                    proc_img.save(temp_file.name, format='PNG')
                try:
                    client = Client("awacke1/stable-video-diffusion", hf_token=os.environ.get("HUGGINGFACE_TOKEN"))
                    result = client.predict(
                        image=temp_file.name,
                        seed=seed if seed is not None else int(time.time() * 1000),
                        randomize_seed=seed is None,
                        motion_bucket_id=motion,
                        fps_id=fps,
                        api_name="/video"
                    )
                    video_path = result[0].get('video') if isinstance(result[0], dict) else None
                    if video_path and os.path.exists(video_path):
                        video_filename = f"generated_video_{datetime.now().strftime('%Y%m%d_%H%M%S')}.mp4"
                        shutil.copy(video_path, video_filename)
                        st.success(f"Video generated! π")
                        st.video(video_filename)
                        if container:
                            # NOTE(review): "pk" gets a *different* unique id
                            # than "id" here, unlike every other record in this
                            # app where pk == id — confirm this is intentional.
                            video_record = {
                                "id": generate_unique_id(),
                                "pk": generate_unique_id(),
                                "type": "generated_video",
                                "filename": video_filename,
                                "seed": seed if seed is not None else "random",
                                "motion": motion,
                                "fps": fps,
                                "timestamp": datetime.now().isoformat()
                            }
                            success, message = insert_record(container, video_record)
                            if success:
                                st.success("DB record saved!")
                            else:
                                st.error(f"DB error: {message}")
                except Exception as e:
                    st.error(f"Video gen error: {str(e)}")
                finally:
                    os.unlink(temp_file.name)
# ============================================================================= | |
# βββββββββββββ MAIN FUNCTION βββββββββββββ | |
# ============================================================================= | |
def main():
    """App entry point: wires the sidebar navigator (database → container →
    actions/items/search) to the main-area document editor, file manager,
    and video-generation sections."""
    st.markdown("### π GitCosmos - Cosmos & Git Hub")
    st.markdown(f"[π Portal]({CosmosDBUrl})")
    if "chat_history" not in st.session_state:
        st.session_state.chat_history = []
    if "current_container" not in st.session_state:
        st.session_state.current_container = None
    # The Cosmos account key is mandatory; bail out early without it.
    if not Key:
        st.error("Missing Cosmos Key πβ")
        return
    st.session_state.primary_key = Key
    st.session_state.logged_in = True
    # Sidebar: Hierarchical Navigation
    st.sidebar.title("π Navigator")
    # Databases Section
    st.sidebar.subheader("ποΈ Databases")
    # Create the Cosmos client once and cache it in session state.
    if "client" not in st.session_state:
        st.session_state.client = CosmosClient(ENDPOINT, credential=Key)
    databases = get_databases(st.session_state.client)
    selected_db = st.sidebar.selectbox("Select Database", databases, key="db_select")
    if selected_db != st.session_state.get("selected_database"):
        # Database changed: reset container selection and any active search.
        st.session_state.selected_database = selected_db
        st.session_state.selected_container = None
        st.session_state.current_container = None
        if 'active_search' in st.session_state:
            del st.session_state.active_search
        st.rerun()
    # Containers Section
    if st.session_state.selected_database:
        database = st.session_state.client.get_database_client(st.session_state.selected_database)
        st.sidebar.subheader("π Containers")
        # NOTE(review): a form nested inside a button click disappears on the
        # next rerun before it can be submitted — this "New Container" flow
        # likely never completes; verify in the running UI.
        if st.sidebar.button("π New Container"):
            with st.sidebar.form("new_container_form"):
                container_id = st.text_input("Container ID", "new-container")
                partition_key = st.text_input("Partition Key", "/pk")
                if st.form_submit_button("Create"):
                    container = create_new_container(database, container_id, partition_key)
                    if container:
                        st.success(f"Container '{container_id}' created!")
                        st.rerun()
        containers = get_containers(database)
        selected_container = st.sidebar.selectbox("Select Container", containers, key="container_select")
        if selected_container != st.session_state.get("selected_container"):
            # Container changed: rebind the client and clear the search filter.
            st.session_state.selected_container = selected_container
            st.session_state.current_container = database.get_container_client(selected_container)
            if 'active_search' in st.session_state:
                del st.session_state.active_search
            st.rerun()
    # Actions Section
    st.sidebar.subheader("βοΈ Actions")
    if st.session_state.current_container:
        if st.sidebar.button("π¦ Export Container"):
            download_link = archive_current_container(st.session_state.selected_database, st.session_state.selected_container, st.session_state.client)
            # archive_current_container returns an <a> tag on success, an error string otherwise.
            st.sidebar.markdown(download_link, unsafe_allow_html=True) if download_link.startswith('<a') else st.sidebar.error(download_link)
    # Items Section
    st.sidebar.subheader("π Items")
    if st.session_state.current_container:
        if st.sidebar.button("β New Item"):
            new_item_default(st.session_state.current_container)
        st.sidebar.text_input("New Field Key", key="new_field_key")
        st.sidebar.text_input("New Field Value", key="new_field_value")
        if st.sidebar.button("β Add Field") and "doc_editor" in st.session_state:
            add_field_to_doc()
        if st.sidebar.button("π€ New AI Record"):
            new_ai_record(st.session_state.current_container)
        if st.sidebar.button("π New Links Record"):
            new_links_record(st.session_state.current_container)
        search_documents_ui(st.session_state.current_container)
    # Central Area: Editable Documents with Search Filter
    if st.session_state.current_container:
        search_keyword = st.session_state.get('active_search', None)
        edit_all_documents(st.session_state.current_container, search_keyword)
    else:
        st.info("Select a database and container to view and edit documents.")
    # Additional Features
    update_file_management_section()
    add_video_generation_ui(st.session_state.current_container if st.session_state.current_container else None)
    # Logout
    if st.session_state.logged_in and st.sidebar.button("πͺ Logout"):
        # Drop all connection/selection state and restart the app cleanly.
        st.session_state.logged_in = False
        st.session_state.client = None
        st.session_state.selected_database = None
        st.session_state.selected_container = None
        st.session_state.current_container = None
        if 'active_search' in st.session_state:
            del st.session_state.active_search
        st.rerun()
# Script entry point (Streamlit executes this module top-to-bottom on each run).
if __name__ == "__main__":
    main()