import streamlit as st from azure.cosmos import CosmosClient, PartitionKey, exceptions import os import pandas as pd import traceback import requests import shutil import zipfile from github import Github from git import Repo from datetime import datetime import base64 import json st.set_page_config(layout="wide") # Cosmos DB configuration ENDPOINT = "https://acae-afd.documents.azure.com:443/" SUBSCRIPTION_ID = "003fba60-5b3f-48f4-ab36-3ed11bc40816" DATABASE_NAME = os.environ.get("COSMOS_DATABASE_NAME") CONTAINER_NAME = os.environ.get("COSMOS_CONTAINER_NAME") Key = os.environ.get("Key") # GitHub configuration def download_github_repo(url, local_path): if os.path.exists(local_path): shutil.rmtree(local_path) Repo.clone_from(url, local_path) def create_zip_file(source_dir, output_filename): shutil.make_archive(output_filename, 'zip', source_dir) def create_repo(g, repo_name): user = g.get_user() return user.create_repo(repo_name) def push_to_github(local_path, repo, github_token): repo_url = f"https://{github_token}@github.com/{repo.full_name}.git" local_repo = Repo(local_path) if 'origin' in [remote.name for remote in local_repo.remotes]: origin = local_repo.remote('origin') origin.set_url(repo_url) else: origin = local_repo.create_remote('origin', repo_url) if not local_repo.heads: local_repo.git.checkout('-b', 'main') current_branch = 'main' else: current_branch = local_repo.active_branch.name local_repo.git.add(A=True) if local_repo.is_dirty(): local_repo.git.commit('-m', 'Initial commit') origin.push(refspec=f'{current_branch}:{current_branch}') def get_base64_download_link(file_path, file_name): with open(file_path, "rb") as file: contents = file.read() base64_encoded = base64.b64encode(contents).decode() return f'Download {file_name}' # New functions for dynamic sidebar def get_databases(client): return [db['id'] for db in client.list_databases()] def get_containers(database): return [container['id'] for container in database.list_containers()] def get_documents(container, limit=1000): query = "SELECT * FROM c" items = list(container.query_items(query=query, enable_cross_partition_query=True, max_item_count=limit)) return items # Cosmos DB functions def insert_record(record): try: response = container.create_item(body=record) return True, response except exceptions.CosmosHttpResponseError as e: return False, f"HTTP error occurred: {str(e)}. Status code: {e.status_code}" except Exception as e: return False, f"An unexpected error occurred: {str(e)}" def call_stored_procedure(record): try: response = container.scripts.execute_stored_procedure( sproc="processPrompt", params=[record], partition_key=record['id'] ) return True, response except exceptions.CosmosHttpResponseError as e: error_message = f"HTTP error occurred: {str(e)}. Status code: {e.status_code}" return False, error_message except Exception as e: error_message = f"An unexpected error occurred: {str(e)}" return False, error_message def fetch_all_records(): try: query = "SELECT * FROM c" items = list(container.query_items(query=query, enable_cross_partition_query=True)) return pd.DataFrame(items) except exceptions.CosmosHttpResponseError as e: st.error(f"HTTP error occurred while fetching records: {str(e)}. Status code: {e.status_code}") return pd.DataFrame() except Exception as e: st.error(f"An unexpected error occurred while fetching records: {str(e)}") return pd.DataFrame() def update_record(updated_record): try: container.upsert_item(body=updated_record) return True, f"Record with id {updated_record['id']} successfully updated." except exceptions.CosmosHttpResponseError as e: return False, f"HTTP error occurred: {str(e)}. Status code: {e.status_code}" except Exception as e: return False, f"An unexpected error occurred: {traceback.format_exc()}" def delete_record(name, id): try: container.delete_item(item=id, partition_key=id) return True, f"Successfully deleted record with name: {name} and id: {id}" except exceptions.CosmosResourceNotFoundError: return False, f"Record with id {id} not found. It may have been already deleted." except exceptions.CosmosHttpResponseError as e: return False, f"HTTP error occurred: {str(e)}. Status code: {e.status_code}" except Exception as e: return False, f"An unexpected error occurred: {traceback.format_exc()}" # New function to archive all databases and containers def archive_all_data(client): try: base_dir = "./cosmos_archive" if os.path.exists(base_dir): shutil.rmtree(base_dir) os.makedirs(base_dir) for database in client.list_databases(): db_name = database['id'] db_dir = os.path.join(base_dir, db_name) os.makedirs(db_dir) db_client = client.get_database_client(db_name) for container in db_client.list_containers(): container_name = container['id'] container_dir = os.path.join(db_dir, container_name) os.makedirs(container_dir) container_client = db_client.get_container_client(container_name) items = list(container_client.read_all_items()) with open(os.path.join(container_dir, f"{container_name}.json"), 'w') as f: json.dump(items, f, indent=2) archive_name = f"cosmos_archive_{datetime.now().strftime('%Y%m%d_%H%M%S')}" shutil.make_archive(archive_name, 'zip', base_dir) return get_base64_download_link(f"{archive_name}.zip", f"{archive_name}.zip") except Exception as e: return f"An error occurred while archiving data: {str(e)}" # Modify the main app def main(): st.title("🌟 Cosmos DB and GitHub Integration") # Initialize session state if 'logged_in' not in st.session_state: st.session_state.logged_in = False if 'selected_records' not in st.session_state: st.session_state.selected_records = [] if 'client' not in st.session_state: st.session_state.client = None if 'selected_database' not in st.session_state: st.session_state.selected_database = None if 'selected_container' not in st.session_state: st.session_state.selected_container = None # Login section if not st.session_state.logged_in: st.subheader("🔐 Login") #input_key = st.text_input("Enter your Cosmos DB key", type="password") input_key=Key # Cosmos DB configuration if st.button("🚀 Login"): if input_key: st.session_state.primary_key = input_key st.session_state.logged_in = True st.rerun() else: st.error("Invalid key. Please check your input.") else: # Initialize Cosmos DB client try: if st.session_state.client is None: st.session_state.client = CosmosClient(ENDPOINT, credential=st.session_state.primary_key) # Sidebar for database, container, and document selection st.sidebar.title("🗄️ Cosmos DB Navigator") databases = get_databases(st.session_state.client) selected_db = st.sidebar.selectbox("🗃️ Select Database", databases) if selected_db != st.session_state.selected_database: st.session_state.selected_database = selected_db st.session_state.selected_container = None st.rerun() if st.session_state.selected_database: database = st.session_state.client.get_database_client(st.session_state.selected_database) containers = get_containers(database) selected_container = st.sidebar.selectbox("📁 Select Container", containers) if selected_container != st.session_state.selected_container: st.session_state.selected_container = selected_container st.rerun() if st.session_state.selected_container: container = database.get_container_client(st.session_state.selected_container) limit_to_1000 = st.sidebar.checkbox("🔢 Limit to top 1000 documents", value=True) documents = get_documents(container, limit=1000 if limit_to_1000 else None) if documents: document_ids = [doc.get('id', 'Unknown') for doc in documents] selected_document = st.sidebar.selectbox("📄 Select Document", document_ids) if selected_document: st.subheader(f"📄 Document Details: {selected_document}") selected_doc = next((doc for doc in documents if doc.get('id') == selected_document), None) if selected_doc: st.json(selected_doc) else: st.sidebar.info("No documents found in this container.") # Main content area st.subheader(f"📊 Container: {st.session_state.selected_container}") if st.session_state.selected_container: df = pd.DataFrame(documents) st.dataframe(df) # GitHub section st.subheader("🐙 GitHub Operations") github_token = os.environ.get("GITHUB") # Read GitHub token from environment variable source_repo = st.text_input("Source GitHub Repository URL", value="https://github.com/AaronCWacker/AIExamples-8-24-Streamlit") new_repo_name = st.text_input("New Repository Name (for cloning)", value=f"AIExample-Clone-{datetime.now().strftime('%Y%m%d_%H%M%S')}") col1, col2 = st.columns(2) with col1: if st.button("📥 Clone Repository"): if github_token and source_repo: try: local_path = f"./temp_repo_{datetime.now().strftime('%Y%m%d_%H%M%S')}" download_github_repo(source_repo, local_path) zip_filename = f"{new_repo_name}.zip" create_zip_file(local_path, zip_filename[:-4]) st.markdown(get_base64_download_link(zip_filename, zip_filename), unsafe_allow_html=True) st.success("Repository cloned successfully!") except Exception as e: st.error(f"An error occurred: {str(e)}") finally: if os.path.exists(local_path): shutil.rmtree(local_path) if os.path.exists(zip_filename): os.remove(zip_filename) else: st.error("Please ensure GitHub token is set in environment variables and source repository URL is provided.") with col2: if st.button("📤 Push to New Repository"): if github_token and source_repo: try: g = Github(github_token) new_repo = create_repo(g, new_repo_name) local_path = f"./temp_repo_{datetime.now().strftime('%Y%m%d_%H%M%S')}" download_github_repo(source_repo, local_path) push_to_github(local_path, new_repo, github_token) st.success(f"Repository pushed successfully to {new_repo.html_url}") except Exception as e: st.error(f"An error occurred: {str(e)}") finally: if os.path.exists(local_path): shutil.rmtree(local_path) else: st.error("Please ensure GitHub token is set in environment variables and source repository URL is provided.") except exceptions.CosmosHttpResponseError as e: st.error(f"Failed to connect to Cosmos DB. HTTP error: {str(e)}. Status code: {e.status_code}") except Exception as e: st.error(f"An unexpected error occurred: {str(e)}") # Logout button if st.session_state.logged_in and st.sidebar.button("🚪 Logout"): st.session_state.logged_in = False st.session_state.selected_records.clear() st.session_state.client = None st.session_state.selected_database = None st.session_state.selected_container = None st.rerun() if __name__ == "__main__": main()