import streamlit as st from azure.cosmos import CosmosClient, PartitionKey, exceptions import os import pandas as pd import traceback import requests import shutil import zipfile from github import Github from git import Repo from datetime import datetime import base64 import json st.set_page_config(layout="wide") # Cosmos DB configuration ENDPOINT = "https://acae-afd.documents.azure.com:443/" SUBSCRIPTION_ID = "003fba60-5b3f-48f4-ab36-3ed11bc40816" DATABASE_NAME = os.environ.get("COSMOS_DATABASE_NAME") CONTAINER_NAME = os.environ.get("COSMOS_CONTAINER_NAME") Key = os.environ.get("Key") # GitHub configuration def download_github_repo(url, local_path): if os.path.exists(local_path): shutil.rmtree(local_path) Repo.clone_from(url, local_path) def create_zip_file(source_dir, output_filename): shutil.make_archive(output_filename, 'zip', source_dir) def create_repo(g, repo_name): user = g.get_user() return user.create_repo(repo_name) def push_to_github(local_path, repo, github_token): repo_url = f"https://{github_token}@github.com/{repo.full_name}.git" local_repo = Repo(local_path) if 'origin' in [remote.name for remote in local_repo.remotes]: origin = local_repo.remote('origin') origin.set_url(repo_url) else: origin = local_repo.create_remote('origin', repo_url) if not local_repo.heads: local_repo.git.checkout('-b', 'main') current_branch = 'main' else: current_branch = local_repo.active_branch.name local_repo.git.add(A=True) if local_repo.is_dirty(): local_repo.git.commit('-m', 'Initial commit') origin.push(refspec=f'{current_branch}:{current_branch}') def get_base64_download_link(file_path, file_name): with open(file_path, "rb") as file: contents = file.read() base64_encoded = base64.b64encode(contents).decode() return f'Download {file_name}' # Cosmos DB functions def insert_record(record): try: response = container.create_item(body=record) return True, response except exceptions.CosmosHttpResponseError as e: return False, f"HTTP error occurred: {str(e)}. Status code: {e.status_code}" except Exception as e: return False, f"An unexpected error occurred: {str(e)}" def call_stored_procedure(record): try: response = container.scripts.execute_stored_procedure( sproc="processPrompt", params=[record], partition_key=record['id'] ) return True, response except exceptions.CosmosHttpResponseError as e: error_message = f"HTTP error occurred: {str(e)}. Status code: {e.status_code}" return False, error_message except Exception as e: error_message = f"An unexpected error occurred: {str(e)}" return False, error_message def fetch_all_records(): try: query = "SELECT * FROM c" items = list(container.query_items(query=query, enable_cross_partition_query=True)) return pd.DataFrame(items) except exceptions.CosmosHttpResponseError as e: st.error(f"HTTP error occurred while fetching records: {str(e)}. Status code: {e.status_code}") return pd.DataFrame() except Exception as e: st.error(f"An unexpected error occurred while fetching records: {str(e)}") return pd.DataFrame() def update_record(updated_record): try: container.upsert_item(body=updated_record) return True, f"Record with id {updated_record['id']} successfully updated." except exceptions.CosmosHttpResponseError as e: return False, f"HTTP error occurred: {str(e)}. Status code: {e.status_code}" except Exception as e: return False, f"An unexpected error occurred: {traceback.format_exc()}" def delete_record(name, id): try: container.delete_item(item=id, partition_key=id) return True, f"Successfully deleted record with name: {name} and id: {id}" except exceptions.CosmosResourceNotFoundError: return False, f"Record with id {id} not found. It may have been already deleted." except exceptions.CosmosHttpResponseError as e: return False, f"HTTP error occurred: {str(e)}. Status code: {e.status_code}" except Exception as e: return False, f"An unexpected error occurred: {traceback.format_exc()}" # New function to archive all databases and containers def archive_all_data(client): try: base_dir = "./cosmos_archive" if os.path.exists(base_dir): shutil.rmtree(base_dir) os.makedirs(base_dir) for database in client.list_databases(): db_name = database['id'] db_dir = os.path.join(base_dir, db_name) os.makedirs(db_dir) db_client = client.get_database_client(db_name) for container in db_client.list_containers(): container_name = container['id'] container_dir = os.path.join(db_dir, container_name) os.makedirs(container_dir) container_client = db_client.get_container_client(container_name) items = list(container_client.read_all_items()) with open(os.path.join(container_dir, f"{container_name}.json"), 'w') as f: json.dump(items, f, indent=2) archive_name = f"cosmos_archive_{datetime.now().strftime('%Y%m%d_%H%M%S')}" shutil.make_archive(archive_name, 'zip', base_dir) return get_base64_download_link(f"{archive_name}.zip", f"{archive_name}.zip") except Exception as e: return f"An error occurred while archiving data: {str(e)}" # Streamlit app st.title("🌟 Cosmos DB and GitHub Integration") # Initialize session state if 'logged_in' not in st.session_state: st.session_state.logged_in = False if 'selected_records' not in st.session_state: st.session_state.selected_records = [] # Login section if not st.session_state.logged_in: st.subheader("🔐 Login") input_key = Key if st.button("🚀 Login"): if input_key: st.session_state.primary_key = input_key st.session_state.logged_in = True st.rerun() else: st.error("Invalid key. Please check your environment variables.") else: # Initialize Cosmos DB client try: client = CosmosClient(ENDPOINT, credential=st.session_state.primary_key) database = client.get_database_client(DATABASE_NAME) container = database.get_container_client(CONTAINER_NAME) except exceptions.CosmosHttpResponseError as e: st.error(f"Failed to connect to Cosmos DB. HTTP error: {str(e)}. Status code: {e.status_code}") st.stop() except Exception as e: st.error(f"An unexpected error occurred while connecting to Cosmos DB: {str(e)}") st.stop() # GitHub section st.subheader("🐙 GitHub Operations") github_token = st.text_input("GitHub Personal Access Token", type="password") source_repo = st.text_input("Source GitHub Repository URL") new_repo_name = st.text_input("New Repository Name (for cloning)") col1, col2 = st.columns(2) with col1: if st.button("📥 Clone Repository"): if github_token and source_repo and new_repo_name: try: local_path = f"./temp_repo_{datetime.now().strftime('%Y%m%d_%H%M%S')}" download_github_repo(source_repo, local_path) zip_filename = f"{new_repo_name}.zip" create_zip_file(local_path, zip_filename[:-4]) st.markdown(get_base64_download_link(zip_filename, zip_filename), unsafe_allow_html=True) st.success("Repository cloned successfully!") except Exception as e: st.error(f"An error occurred: {str(e)}") finally: if os.path.exists(local_path): shutil.rmtree(local_path) if os.path.exists(zip_filename): os.remove(zip_filename) else: st.error("Please provide all required GitHub information.") with col2: if st.button("📤 Push to New Repository"): if github_token and source_repo and new_repo_name: try: g = Github(github_token) new_repo = create_repo(g, new_repo_name) local_path = f"./temp_repo_{datetime.now().strftime('%Y%m%d_%H%M%S')}" download_github_repo(source_repo, local_path) push_to_github(local_path, new_repo, github_token) st.success(f"Repository pushed successfully to {new_repo.html_url}") except Exception as e: st.error(f"An error occurred: {str(e)}") finally: if os.path.exists(local_path): shutil.rmtree(local_path) else: st.error("Please provide all required GitHub information.") # Cosmos DB Operations st.subheader("☁️ Cosmos DB Operations") # Archive all data if st.button("📦 Archive All Cosmos DB Data"): download_link = archive_all_data(client) st.markdown(download_link, unsafe_allow_html=True) # Fetch and display all records st.subheader("📊 All Records") df = fetch_all_records() if df.empty: st.write("No records found in the database.") else: st.write("Records:") for index, row in df.iterrows(): col1, col2, col3 = st.columns([5, 1, 1]) with col1: st.write(f"ID: {row['id']}, Name: {row['name']}, Document: {row['document']}, " f"Evaluation Text: {row['evaluationText']}, Evaluation Score: {row['evaluationScore']}") with col2: key = f"select_{row['id']}" if st.button(f"👉 Select", key=key): st.session_state.selected_record = row.to_dict() with col3: if st.button(f"🗑️ Delete", key=f"delete_{row['id']}"): success, message = delete_record(row['name'], row['id']) if success: st.success(message) st.rerun() else: st.error(message) # Display selected record for editing if 'selected_record' in st.session_state and st.session_state.selected_record: selected_record = st.session_state.selected_record st.subheader(f"Editing Record - ID: {selected_record['id']}") updated_name = st.text_input("Name", value=selected_record['name']) updated_document = st.text_area("Document", value=selected_record['document']) updated_evaluation_text = st.text_area("Evaluation Text", value=selected_record['evaluationText']) updated_evaluation_score = st.text_input("Evaluation Score", value=str(selected_record['evaluationScore'])) if st.button("💾 Save Changes"): updated_record = { "id": selected_record['id'], "name": updated_name, "document": updated_document, "evaluationText": updated_evaluation_text, "evaluationScore": updated_evaluation_score } success, message = update_record(updated_record) if success: st.success(message) st.session_state.selected_record = updated_record else: st.error(message) # Input fields for new record st.subheader("📝 Enter New Record Details") new_id = st.text_input("ID") new_name = st.text_input("Name") new_document = st.text_area("Document") new_evaluation_text = st.text_area("Evaluation Text") new_evaluation_score = st.text_input("Evaluation Score") col1, col2 = st.columns(2) with col1: if st.button("💾 Insert Record"): record = { "id": new_id, "name": new_name, "document": new_document, "evaluationText": new_evaluation_text, "evaluationScore": new_evaluation_score } success, response = insert_record(record) if success: st.success("✅ Record inserted successfully!") st.json(response) else: st.error(f"❌ Failed to insert record: {response}") st.rerun() with col2: if st.button("🔧 Call Procedure"): record = { "id": new_id, "name": new_name, "document": new_document, "evaluationText": new_evaluation_text, "evaluationScore": new_evaluation_score } success, response = call_stored_procedure(record) if success: st.success("✅ Stored procedure executed successfully!") st.json(response) else: st.error(f"❌ Failed to execute stored procedure: {response}") # Logout button if st.button("🚪 Logout"): st.session_state.logged_in = False st.session_state.selected_records.clear() st.session_state.selected_record = None st.rerun() # Display connection info st.sidebar.subheader("🔗 Connection Information") st.sidebar.text(f"Endpoint: {ENDPOINT}") st.sidebar.text(f"Subscription ID: {SUBSCRIPTION_ID}") st.sidebar.text(f"Database: {DATABASE_NAME}") st.sidebar.text(f"Container: {CONTAINER_NAME}") #if __name__ == "__main__": # main()