# Page-header residue captured with this source: "Spaces: Sleeping"
import base64
import html
import json
import os
import shutil
import tempfile
import traceback
import zipfile
from datetime import datetime

import pandas as pd
import requests
import streamlit as st
from azure.cosmos import CosmosClient, PartitionKey, exceptions
from git import Repo
from github import Github
# Use the full browser width for the page layout.
st.set_page_config(layout="wide")

# Cosmos DB configuration
# The endpoint and subscription id are fixed; the database name, container
# name and account key are read from the process environment and are None
# when the corresponding variable is unset.
ENDPOINT = "https://acae-afd.documents.azure.com:443/"
SUBSCRIPTION_ID = "003fba60-5b3f-48f4-ab36-3ed11bc40816"
DATABASE_NAME = os.getenv("COSMOS_DATABASE_NAME")
CONTAINER_NAME = os.getenv("COSMOS_CONTAINER_NAME")
Key = os.getenv("Key")
# GitHub helper functions
def download_github_repo(url, local_path):
    """Clone the repository at *url* into *local_path*.

    Anything already present at *local_path* is removed first so the clone
    always starts from a clean location.
    """
    already_present = os.path.exists(local_path)
    if already_present:
        shutil.rmtree(local_path)
    Repo.clone_from(url, local_path)
def create_zip_file(source_dir, output_filename):
    """Zip the contents of *source_dir*.

    *output_filename* is the archive path WITHOUT the ``.zip`` suffix;
    ``shutil.make_archive`` appends the extension itself.
    """
    shutil.make_archive(
        base_name=output_filename,
        format='zip',
        root_dir=source_dir,
    )
def create_repo(g, repo_name):
    """Create a repository called *repo_name* under the user returned by
    ``g.get_user()`` and return whatever ``create_repo`` gives back.
    """
    return g.get_user().create_repo(repo_name)
def push_to_github(local_path, repo, github_token):
    """Commit everything in the working tree at *local_path* and push it.

    The ``origin`` remote is created, or re-pointed if it already exists, to
    ``github.com/<repo.full_name>`` with *github_token* embedded in the URL.
    If the repository has no branch yet a ``main`` branch is created;
    otherwise the currently active branch is pushed under the same name.

    Args:
        local_path: Path of an existing local git repository.
        repo: Object exposing ``full_name`` (``owner/name``) of the target.
        github_token: Token used as the HTTPS credential.

    Raises:
        RuntimeError: If the push fails outright or a ref is rejected.
            Without this check a failed push is silent, because the push
            call reports problems through its return value.
    """
    repo_url = f"https://{github_token}@github.com/{repo.full_name}.git"
    local_repo = Repo(local_path)

    if any(remote.name == 'origin' for remote in local_repo.remotes):
        origin = local_repo.remote('origin')
        origin.set_url(repo_url)
    else:
        origin = local_repo.create_remote('origin', repo_url)

    if not local_repo.heads:
        local_repo.git.checkout('-b', 'main')
        current_branch = 'main'
    else:
        current_branch = local_repo.active_branch.name

    local_repo.git.add(A=True)
    if local_repo.is_dirty():
        local_repo.git.commit('-m', 'Initial commit')

    push_results = origin.push(refspec=f'{current_branch}:{current_branch}')

    # An empty result list means the operation failed completely; an ERROR
    # flag on an entry means that particular ref was rejected.  The messages
    # deliberately leave out the remote URL because it contains the token.
    if not push_results:
        raise RuntimeError(f"Push of branch '{current_branch}' failed: no refs were updated.")
    for info in push_results:
        if info.flags & info.ERROR:
            raise RuntimeError(
                f"Push of branch '{current_branch}' was rejected: {str(info.summary).strip()}"
            )
def get_base64_download_link(file_path, file_name):
    """Return an HTML ``<a>`` tag that downloads *file_path* as *file_name*.

    The file is read fully into memory and embedded as a base64 ``data:``
    URI with the ``application/zip`` media type.

    Args:
        file_path: Path of the file to embed.
        file_name: Name offered to the browser and shown as the link text.

    Returns:
        The anchor element as a string.
    """
    with open(file_path, "rb") as file:
        contents = file.read()
    base64_encoded = base64.b64encode(contents).decode()
    # The result is rendered as raw HTML by the callers, and file_name can
    # originate from a text input, so it must not be able to break out of
    # the attribute or inject markup.
    safe_name = html.escape(file_name, quote=True)
    return f'<a href="data:application/zip;base64,{base64_encoded}" download="{safe_name}">Download {safe_name}</a>'
# Cosmos DB functions
def insert_record(record):
    """Create *record* as a new item in the module-level ``container``.

    Returns:
        ``(True, created_item)`` on success, otherwise ``(False, message)``
        where *message* describes the Cosmos HTTP error or other exception.
    """
    try:
        created = container.create_item(body=record)
    except exceptions.CosmosHttpResponseError as http_err:
        return False, f"HTTP error occurred: {str(http_err)}. Status code: {http_err.status_code}"
    except Exception as err:
        return False, f"An unexpected error occurred: {str(err)}"
    return True, created
def call_stored_procedure(record):
    """Run the ``processPrompt`` stored procedure with *record* as its only
    parameter, using ``record['id']`` as the partition key.

    Returns:
        ``(True, result)`` on success, otherwise ``(False, message)``.
    """
    try:
        result = container.scripts.execute_stored_procedure(
            sproc="processPrompt",
            params=[record],
            partition_key=record['id'],
        )
    except exceptions.CosmosHttpResponseError as http_err:
        return False, f"HTTP error occurred: {str(http_err)}. Status code: {http_err.status_code}"
    except Exception as err:
        return False, f"An unexpected error occurred: {str(err)}"
    return True, result
def fetch_all_records():
    """Load every item of the module-level ``container`` into a DataFrame.

    Any failure is reported through ``st.error`` and an empty DataFrame is
    returned instead of raising.
    """
    try:
        rows = container.query_items(
            query="SELECT * FROM c",
            enable_cross_partition_query=True,
        )
        # Materialise inside the try: iterating the result can itself fail.
        return pd.DataFrame(list(rows))
    except exceptions.CosmosHttpResponseError as http_err:
        st.error(f"HTTP error occurred while fetching records: {str(http_err)}. Status code: {http_err.status_code}")
    except Exception as err:
        st.error(f"An unexpected error occurred while fetching records: {str(err)}")
    return pd.DataFrame()
def update_record(updated_record):
    """Upsert *updated_record* into the module-level ``container``.

    Returns:
        ``(True, confirmation)`` on success, otherwise ``(False, message)``;
        for non-HTTP failures the message carries the full traceback.
    """
    try:
        container.upsert_item(body=updated_record)
        confirmation = f"Record with id {updated_record['id']} successfully updated."
    except exceptions.CosmosHttpResponseError as http_err:
        return False, f"HTTP error occurred: {str(http_err)}. Status code: {http_err.status_code}"
    except Exception:
        return False, f"An unexpected error occurred: {traceback.format_exc()}"
    return True, confirmation
def delete_record(name, id):
    """Delete the item whose id (also used as partition key) is *id*.

    *name* is only used in the success message.  The ``id`` parameter keeps
    its original name for caller compatibility even though it shadows the
    builtin.

    Returns:
        ``(True, confirmation)`` on success, otherwise ``(False, message)``.
    """
    try:
        container.delete_item(item=id, partition_key=id)
    except exceptions.CosmosResourceNotFoundError:
        return False, f"Record with id {id} not found. It may have been already deleted."
    except exceptions.CosmosHttpResponseError as http_err:
        return False, f"HTTP error occurred: {str(http_err)}. Status code: {http_err.status_code}"
    except Exception:
        return False, f"An unexpected error occurred: {traceback.format_exc()}"
    return True, f"Successfully deleted record with name: {name} and id: {id}"
# Archive every database and container in the account
def archive_all_data(client):
    """Dump every container of every database to JSON and return a zip link.

    For each database listed by *client* and each container in it, all
    items are written to ``<database>/<container>/<container>.json``.  The
    tree is zipped and returned as an HTML download link built by
    ``get_base64_download_link``.

    Args:
        client: Cosmos client exposing ``list_databases`` and
            ``get_database_client``.

    Returns:
        The HTML anchor string on success, or a plain error message string
        if anything fails (this function does not raise).
    """
    try:
        # Stage everything in a throwaway directory so neither the JSON tree
        # nor the zip accumulates in the working directory across calls.
        with tempfile.TemporaryDirectory() as work_dir:
            staging_dir = os.path.join(work_dir, "cosmos_archive")
            os.makedirs(staging_dir)

            for database in client.list_databases():
                db_name = database['id']
                db_dir = os.path.join(staging_dir, db_name)
                os.makedirs(db_dir)
                db_client = client.get_database_client(db_name)

                # Named container_props (not `container`) to avoid confusion
                # with the module-level `container` client.
                for container_props in db_client.list_containers():
                    container_name = container_props['id']
                    container_dir = os.path.join(db_dir, container_name)
                    os.makedirs(container_dir)
                    container_client = db_client.get_container_client(container_name)
                    items = list(container_client.read_all_items())
                    dump_path = os.path.join(container_dir, f"{container_name}.json")
                    with open(dump_path, 'w', encoding='utf-8') as f:
                        json.dump(items, f, indent=2)

            archive_name = f"cosmos_archive_{datetime.now().strftime('%Y%m%d_%H%M%S')}"
            # The zip is written next to (not inside) the staged tree so it
            # does not end up containing itself.
            zip_path = shutil.make_archive(os.path.join(work_dir, archive_name), 'zip', staging_dir)
            # The link embeds the file contents, so it must be built before
            # the temporary directory is removed.
            return get_base64_download_link(zip_path, f"{archive_name}.zip")
    except Exception as e:
        return f"An error occurred while archiving data: {str(e)}"
# Streamlit app
# NOTE(review): several labels below begin with mis-encoded glyphs (probably
# emoji damaged in transit).  They are reproduced unchanged; verify upstream.
st.title("π Cosmos DB and GitHub Integration")

# Initialize session state
if 'logged_in' not in st.session_state:
    st.session_state.logged_in = False
if 'selected_records' not in st.session_state:
    st.session_state.selected_records = []

# Login section
if not st.session_state.logged_in:
    st.subheader("π Login")
    # The key comes from the environment; the button only confirms its use.
    input_key = Key
    if st.button("π Login"):
        if input_key:
            st.session_state.primary_key = input_key
            st.session_state.logged_in = True
            st.rerun()
        else:
            st.error("Invalid key. Please check your environment variables.")
else:
    # Initialize Cosmos DB client
    try:
        client = CosmosClient(ENDPOINT, credential=st.session_state.primary_key)
        database = client.get_database_client(DATABASE_NAME)
        # `container` is the module-level handle used by the Cosmos helpers.
        container = database.get_container_client(CONTAINER_NAME)
    except exceptions.CosmosHttpResponseError as e:
        st.error(f"Failed to connect to Cosmos DB. HTTP error: {str(e)}. Status code: {e.status_code}")
        st.stop()
    except Exception as e:
        st.error(f"An unexpected error occurred while connecting to Cosmos DB: {str(e)}")
        st.stop()

    # GitHub section
    st.subheader("π GitHub Operations")
    github_token = os.environ.get("GITHUB")  # Read GitHub token from environment variable
    source_repo = st.text_input("Source GitHub Repository URL", value="https://github.com/AaronCWacker/AIExamples-8-24-Streamlit")
    new_repo_name = st.text_input("New Repository Name (for cloning)", value=f"AIExample-Clone-{datetime.now().strftime('%Y%m%d_%H%M%S')}")

    col1, col2 = st.columns(2)
    with col1:
        if st.button("π₯ Clone Repository"):
            if github_token and source_repo:
                # Both paths are computed before the try so the cleanup in
                # `finally` never hits an unbound name when an early step
                # (e.g. the clone) fails.
                local_path = f"./temp_repo_{datetime.now().strftime('%Y%m%d_%H%M%S')}"
                zip_filename = f"{new_repo_name}.zip"
                try:
                    download_github_repo(source_repo, local_path)
                    # create_zip_file expects the name without ".zip".
                    create_zip_file(local_path, zip_filename[:-4])
                    st.markdown(get_base64_download_link(zip_filename, zip_filename), unsafe_allow_html=True)
                    st.success("Repository cloned successfully!")
                except Exception as e:
                    st.error(f"An error occurred: {str(e)}")
                finally:
                    if os.path.exists(local_path):
                        shutil.rmtree(local_path)
                    if os.path.exists(zip_filename):
                        os.remove(zip_filename)
            else:
                st.error("Please ensure GitHub token is set in environment variables and source repository URL is provided.")
    with col2:
        if st.button("π€ Push to New Repository"):
            if github_token and source_repo:
                # Computed before the try so `finally` can always reference
                # it, even when creating the remote repository fails.
                local_path = f"./temp_repo_{datetime.now().strftime('%Y%m%d_%H%M%S')}"
                try:
                    g = Github(github_token)
                    new_repo = create_repo(g, new_repo_name)
                    download_github_repo(source_repo, local_path)
                    push_to_github(local_path, new_repo, github_token)
                    st.success(f"Repository pushed successfully to {new_repo.html_url}")
                except Exception as e:
                    st.error(f"An error occurred: {str(e)}")
                finally:
                    if os.path.exists(local_path):
                        shutil.rmtree(local_path)
            else:
                st.error("Please ensure GitHub token is set in environment variables and source repository URL is provided.")

    # Cosmos DB Operations
    st.subheader("βοΈ Cosmos DB Operations")

    # Archive all data
    if st.button("π¦ Archive All Cosmos DB Data"):
        download_link = archive_all_data(client)
        st.markdown(download_link, unsafe_allow_html=True)

    # Fetch and display all records
    st.subheader("π All Records")
    df = fetch_all_records()
    if df.empty:
        st.write("No records found in the database.")
    else:
        st.write("Records:")
        for index, row in df.iterrows():
            col1, col2, col3 = st.columns([5, 1, 1])
            with col1:
                st.write(f"ID: {row['id']}, Name: {row['name']}, Document: {row['document']}, "
                         f"Evaluation Text: {row['evaluationText']}, Evaluation Score: {row['evaluationScore']}")
            with col2:
                # Per-row keys keep the repeated buttons distinct.
                key = f"select_{row['id']}"
                if st.button(f"π Select", key=key):
                    st.session_state.selected_record = row.to_dict()
            with col3:
                if st.button(f"ποΈ Delete", key=f"delete_{row['id']}"):
                    success, message = delete_record(row['name'], row['id'])
                    if success:
                        st.success(message)
                        st.rerun()
                    else:
                        st.error(message)

    # Display selected record for editing
    if 'selected_record' in st.session_state and st.session_state.selected_record:
        selected_record = st.session_state.selected_record
        st.subheader(f"Editing Record - ID: {selected_record['id']}")
        updated_name = st.text_input("Name", value=selected_record['name'])
        updated_document = st.text_area("Document", value=selected_record['document'])
        updated_evaluation_text = st.text_area("Evaluation Text", value=selected_record['evaluationText'])
        updated_evaluation_score = st.text_input("Evaluation Score", value=str(selected_record['evaluationScore']))
        if st.button("πΎ Save Changes"):
            updated_record = {
                "id": selected_record['id'],
                "name": updated_name,
                "document": updated_document,
                "evaluationText": updated_evaluation_text,
                "evaluationScore": updated_evaluation_score
            }
            success, message = update_record(updated_record)
            if success:
                st.success(message)
                st.session_state.selected_record = updated_record
            else:
                st.error(message)

    # Input fields for new record
    st.subheader("π Enter New Record Details")
    new_id = st.text_input("ID")
    new_name = st.text_input("Name")
    new_document = st.text_area("Document")
    new_evaluation_text = st.text_area("Evaluation Text")
    new_evaluation_score = st.text_input("Evaluation Score")

    col1, col2 = st.columns(2)
    with col1:
        if st.button("πΎ Insert Record"):
            record = {
                "id": new_id,
                "name": new_name,
                "document": new_document,
                "evaluationText": new_evaluation_text,
                "evaluationScore": new_evaluation_score
            }
            success, response = insert_record(record)
            if success:
                st.success("β Record inserted successfully!")
                st.json(response)
                # Rerun only after a successful insert (to refresh the record
                # list), mirroring the delete handler; rerunning after a
                # failure would discard the error message below.
                st.rerun()
            else:
                st.error(f"β Failed to insert record: {response}")
    with col2:
        if st.button("π§ Call Procedure"):
            record = {
                "id": new_id,
                "name": new_name,
                "document": new_document,
                "evaluationText": new_evaluation_text,
                "evaluationScore": new_evaluation_score
            }
            success, response = call_stored_procedure(record)
            if success:
                st.success("β Stored procedure executed successfully!")
                st.json(response)
            else:
                st.error(f"β Failed to execute stored procedure: {response}")

    # Logout button
    if st.button("πͺ Logout"):
        st.session_state.logged_in = False
        st.session_state.selected_records.clear()
        st.session_state.selected_record = None
        st.rerun()

    # Display connection info
    st.sidebar.subheader("π Connection Information")
    st.sidebar.text(f"Endpoint: {ENDPOINT}")
    st.sidebar.text(f"Subscription ID: {SUBSCRIPTION_ID}")
    st.sidebar.text(f"Database: {DATABASE_NAME}")
    st.sidebar.text(f"Container: {CONTAINER_NAME}")

#if __name__ == "__main__":
#    st.write("Application is running")