| import streamlit as st | |
| import google.generativeai as genai | |
| import requests | |
| import subprocess | |
| import os | |
| import pylint | |
| import pandas as pd | |
| from sklearn.model_selection import train_test_split | |
| from sklearn.ensemble import RandomForestClassifier, GradientBoostingClassifier | |
| from sklearn.metrics import accuracy_score, precision_score, recall_score, f1_score | |
| import git | |
| import spacy | |
| from spacy.lang.en import English | |
| import boto3 | |
| import unittest | |
| import docker | |
| import sympy as sp | |
| from scipy.optimize import minimize, differential_evolution | |
| import numpy as np | |
| import matplotlib.pyplot as plt | |
| import seaborn as sns | |
| from IPython.display import display | |
| from tenacity import retry, stop_after_attempt, wait_fixed | |
| import torch | |
| import torch.nn as nn | |
| import torch.optim as optim | |
| from transformers import AutoTokenizer, AutoModel | |
| import networkx as nx | |
| from sklearn.cluster import KMeans | |
| from scipy.stats import ttest_ind | |
| from statsmodels.tsa.arima.model import ARIMA | |
| import nltk | |
| from nltk.sentiment import SentimentIntensityAnalyzer | |
| import cv2 | |
| from PIL import Image | |
| import tensorflow as tf | |
| from tensorflow.keras.applications import ResNet50 | |
| from tensorflow.keras.preprocessing import image | |
| from tensorflow.keras.applications.resnet50 import preprocess_input, decode_predictions | |
# Configure the Gemini API with the key stored in Streamlit secrets.
genai.configure(api_key=st.secrets["GOOGLE_API_KEY"])

# Generation parameters handed to the model below.
generation_config = dict(
    temperature=0.4,
    top_p=0.8,
    top_k=50,
    max_output_tokens=4096,
)

# Model instance carrying the generation parameters and the system prompt.
model = genai.GenerativeModel(
    model_name="gemini-1.5-pro",
    generation_config=generation_config,
    system_instruction="""
You are Ath, an ultra-advanced AI code assistant with expertise across multiple domains including machine learning, data science, web development, cloud computing, and more. Your responses should showcase cutting-edge techniques, best practices, and innovative solutions.
""",
)

# Chat session (started with an empty history) used by generate_response.
chat_session = model.start_chat(history=[])
@retry(stop=stop_after_attempt(5), wait=wait_fixed(2), reraise=True)
def _send_with_retry(user_input):
    """Send ``user_input`` to the chat session and return the reply text.

    Exceptions propagate out of this helper so that the ``retry`` decorator
    can see them: up to 5 attempts, 2 seconds apart. ``reraise=True`` makes
    the final failure surface as the original exception.
    """
    return chat_session.send_message(user_input).text


def generate_response(user_input):
    """Return the model's reply to ``user_input`` as text.

    Args:
        user_input: Prompt text sent to the module-level ``chat_session``.

    Returns:
        The reply text, or the string ``"Error: <detail>"`` once every retry
        attempt has failed. This function itself never raises.
    """
    # The catch sits outside the retried call; catching inside it would hide
    # every failure from the retry logic and no retry would ever happen.
    try:
        return _send_with_retry(user_input)
    except Exception as e:
        return f"Error: {e}"
def optimize_code(code):
    """Run pylint over ``code`` and return ``code`` unchanged.

    The lint output is captured but not used. The code is written to a
    uniquely named temporary file, which is removed even when linting fails.

    Args:
        code: Source text to lint.

    Returns:
        The same ``code`` string that was passed in.
    """
    import tempfile

    # mkstemp gives a unique path, so concurrent calls cannot clobber each
    # other the way a fixed "temp_code.py" in the working directory could.
    fd, temp_path = tempfile.mkstemp(suffix=".py")
    try:
        # UTF-8 keeps non-ASCII source from failing on a narrow locale.
        with os.fdopen(fd, "w", encoding="utf-8") as file:
            file.write(code)
        try:
            subprocess.run(["pylint", temp_path], capture_output=True, text=True)
        except OSError:
            # pylint could not be started (e.g. not installed). The result is
            # discarded anyway, so the code is still returned.
            pass
    finally:
        os.remove(temp_path)
    return code
def fetch_from_github(query):
    """Placeholder for a GitHub lookup; not implemented and returns None."""
    # Implement GitHub API interaction here
    return None
def interact_with_api(api_url, timeout=10):
    """GET ``api_url`` and return the response body decoded as JSON.

    Args:
        api_url: URL to request.
        timeout: Seconds to wait for the server before giving up. Without a
            timeout, ``requests`` can block indefinitely on a stalled server.

    Returns:
        The decoded JSON payload.

    Raises:
        requests.exceptions.RequestException: On connection failure or timeout.
        ValueError: If the body is not valid JSON.
    """
    # The HTTP status is not checked, so an error response whose body is JSON
    # is returned like any other payload (unchanged from before).
    response = requests.get(api_url, timeout=timeout)
    return response.json()
def train_advanced_ml_model(X, y):
    """Fit two ensemble classifiers and score them on a held-out split.

    20% of the data is held out for testing. Each classifier is fitted on the
    remainder and scored with accuracy plus weighted precision, recall and F1.

    Returns:
        Dict mapping classifier name to a dict with keys ``accuracy``,
        ``precision``, ``recall`` and ``f1``.
    """
    X_train, X_test, y_train, y_test = train_test_split(X, y, test_size=0.2)

    candidates = (
        ('Random Forest', RandomForestClassifier(n_estimators=100, random_state=42)),
        ('Gradient Boosting', GradientBoostingClassifier(n_estimators=100, random_state=42)),
    )
    weighted_metrics = (
        ('precision', precision_score),
        ('recall', recall_score),
        ('f1', f1_score),
    )

    def evaluate(estimator):
        # Fit on the training split, then score predictions on the test split.
        estimator.fit(X_train, y_train)
        predicted = estimator.predict(X_test)
        scores = {'accuracy': accuracy_score(y_test, predicted)}
        for metric_name, metric in weighted_metrics:
            scores[metric_name] = metric(y_test, predicted, average='weighted')
        return scores

    return {label: evaluate(estimator) for label, estimator in candidates}
def handle_error(error):
    """Show ``error`` to the user as a Streamlit error message."""
    message = f"An error occurred: {error}"
    st.error(message)
    # Implement advanced error logging and notification system here
def initialize_git_repo(repo_path):
    """Return a ``git.Repo`` for ``repo_path``, creating what is missing.

    The directory is created if it does not exist. A new repository is
    initialised unless the directory already contains a ``.git`` entry.
    """
    if not os.path.exists(repo_path):
        os.makedirs(repo_path)

    git_dir = os.path.join(repo_path, '.git')
    if os.path.exists(git_dir):
        return git.Repo(repo_path)
    return git.Repo.init(repo_path)
def integrate_with_git(repo_path, code):
    """Write ``code`` to ``generated_code.py`` in the repository and commit it.

    The repository at ``repo_path`` is opened or created via
    ``initialize_git_repo``.
    """
    repo = initialize_git_repo(repo_path)

    target_path = os.path.join(repo_path, "generated_code.py")
    with open(target_path, "w") as handle:
        handle.write(code)

    repo.index.add(["generated_code.py"])
    repo.index.commit("Added generated code")
def process_user_input(user_input):
    """Parse ``user_input`` with spaCy's ``en_core_web_sm`` pipeline.

    Returns:
        The object produced by calling the loaded pipeline on the text.
    """
    pipeline = spacy.load("en_core_web_sm")
    return pipeline(user_input)
def interact_with_cloud_services(service_name, action, params):
    """Call one operation on a boto3 client and return its response.

    Args:
        service_name: Name passed to ``boto3.client``.
        action: Name of the client method to call.
        params: Mapping expanded into keyword arguments for that method.
    """
    client = boto3.client(service_name)
    operation = getattr(client, action)
    return operation(**params)
def run_tests():
    """Discover and run the unit tests under ``./tests``.

    The ``tests`` directory and an empty ``__init__.py`` inside it are created
    in the current working directory when they do not exist yet.

    Returns:
        The result object returned by ``unittest.TextTestRunner.run``.
    """
    tests_dir = os.path.join(os.getcwd(), 'tests')
    if not os.path.exists(tests_dir):
        os.makedirs(tests_dir)

    init_file = os.path.join(tests_dir, '__init__.py')
    if not os.path.exists(init_file):
        # Opening for write is enough to create the empty marker file.
        with open(init_file, 'w'):
            pass

    suite = unittest.TestLoader().discover(tests_dir)
    return unittest.TextTestRunner().run(suite)
def execute_code_in_docker(code):
    """Run ``code`` with ``python -c`` inside a ``python:3.9`` container.

    Args:
        code: Python source to execute.

    Returns:
        ``(logs, status_code)`` where ``logs`` is the decoded container output
        and ``status_code`` is the container's exit status. If starting,
        waiting on or reading the container fails, returns
        ``("Error: <detail>", 1)``.
    """
    client = docker.from_env()
    container = None
    try:
        container = client.containers.run(
            image="python:3.9",
            # Argument list rather than a quoted shell string: code that
            # contains single quotes would otherwise break the command line.
            command=["python", "-c", code],
            detach=True,
        )
        result = container.wait()
        logs = container.logs().decode('utf-8')
        return logs, result['StatusCode']
    except Exception as e:
        return f"Error: {e}", 1
    finally:
        # Removal is done here instead of via remove=True, which could delete
        # the container before its logs were fetched.
        if container is not None:
            try:
                container.remove(force=True)
            except docker.errors.DockerException:
                # Best-effort cleanup; the result is already determined.
                pass
def solve_complex_equation(equation):
    """Solve an equation given as a Python expression string in x, y and z.

    Args:
        equation: Expression text. It is treated as ``expression == 0``
            unless it already evaluates to a SymPy equality.

    Returns:
        Whatever ``sympy.solve`` returns for the equation.
    """
    # The symbols must stay named x, y, z: the evaluated text refers to them.
    x, y, z = sp.symbols('x y z')
    # SECURITY: eval() executes arbitrary Python. Only pass trusted text
    # here, never raw end-user input.
    expr = eval(equation)
    # Eq(expr) with the right-hand side omitted is deprecated in SymPy, so
    # the zero is spelled out.
    eq = expr if isinstance(expr, sp.Eq) else sp.Eq(expr, 0)
    return sp.solve(eq)
def advanced_optimization(function, bounds):
    """Minimise an expression in ``x`` with differential evolution.

    Args:
        function: Python expression text that reads the candidate vector as
            ``x`` (for example ``"(x[0] - 1) ** 2"``).
        bounds: Bounds passed straight to ``differential_evolution``.

    Returns:
        ``(best_x, best_value)`` taken from the optimiser result.

    Raises:
        SyntaxError: If ``function`` is not a valid expression.
    """
    # SECURITY: the expression is executed with eval(). Only pass trusted
    # text here, never raw end-user input.
    # Compiling once avoids re-parsing the text on every objective call.
    objective_code = compile(function, "<objective>", "eval")

    def objective(x):
        # `x` must keep this name: the compiled expression refers to it.
        return eval(objective_code)

    result = differential_evolution(objective, bounds)
    return result.x, result.fun
def visualize_complex_data(data):
    """Draw a 2x2 overview figure for ``data`` and return it.

    Panels: correlation heatmap, per-column density curves, box plot and
    violin plot.

    Args:
        data: Anything ``pandas.DataFrame`` accepts.

    Returns:
        The matplotlib figure holding the four panels.
    """
    df = pd.DataFrame(data)
    fig, axs = plt.subplots(2, 2, figsize=(16, 12))

    sns.heatmap(df.corr(), annot=True, cmap='coolwarm', ax=axs[0, 0])
    axs[0, 0].set_title('Correlation Heatmap')

    # sns.pairplot is figure-level and takes no `ax`, so the previous call
    # raised TypeError. An axes-level KDE of each column fills this panel
    # instead, and the title is changed to match.
    sns.kdeplot(data=df, ax=axs[0, 1])
    axs[0, 1].set_title('Distribution (KDE)')

    df.plot(kind='box', ax=axs[1, 0])
    axs[1, 0].set_title('Box Plot')

    sns.violinplot(data=df, ax=axs[1, 1])
    axs[1, 1].set_title('Violin Plot')

    plt.tight_layout()
    return fig
def analyze_complex_data(data):
    """Compute descriptive statistics for ``data``.

    Args:
        data: Anything ``pandas.DataFrame`` accepts.

    Returns:
        Dict with keys ``summary`` (``describe``), ``correlation`` (``corr``),
        ``skewness`` (``skew``) and ``kurtosis`` (``kurtosis``).
    """
    frame = pd.DataFrame(data)
    statistics = (
        ('summary', frame.describe),
        ('correlation', frame.corr),
        ('skewness', frame.skew),
        ('kurtosis', frame.kurtosis),
    )
    return {label: compute() for label, compute in statistics}
def train_deep_learning_model(X, y):
    """Train a small feed-forward binary classifier and return it.

    Args:
        X: Feature table exposing ``.values`` and ``.shape``.
        y: Targets exposing ``.values``.

    Returns:
        The network after 100 full-batch Adam steps on binary cross-entropy.
    """

    class DeepNN(nn.Module):
        """Three linear layers (input -> 64 -> 32 -> 1) with a sigmoid output."""

        def __init__(self, input_size):
            super().__init__()
            self.fc1 = nn.Linear(input_size, 64)
            self.fc2 = nn.Linear(64, 32)
            self.fc3 = nn.Linear(32, 1)

        def forward(self, x):
            hidden = torch.relu(self.fc1(x))
            hidden = torch.relu(self.fc2(hidden))
            return torch.sigmoid(self.fc3(hidden))

    features = torch.FloatTensor(X.values)
    # Targets get a trailing dimension so they line up with the (N, 1) output.
    targets = torch.FloatTensor(y.values).unsqueeze(1)

    net = DeepNN(X.shape[1])
    loss_fn = nn.BCELoss()
    adam = optim.Adam(net.parameters())

    for _ in range(100):
        adam.zero_grad()
        loss_fn(net(features), targets).backward()
        adam.step()
    return net
def perform_nlp_analysis(text):
    """Run spaCy and NLTK sentiment analysis over ``text``.

    Returns:
        Dict with ``entities`` ((text, label) pairs), ``tokens`` (token
        texts), ``pos_tags`` ((text, pos) pairs) and ``sentiment`` (the
        result of ``SentimentIntensityAnalyzer.polarity_scores``).
    """
    doc = spacy.load("en_core_web_sm")(text)

    entities = []
    for ent in doc.ents:
        entities.append((ent.text, ent.label_))

    tokens = []
    pos_tags = []
    for token in doc:
        tokens.append(token.text)
        pos_tags.append((token.text, token.pos_))

    sentiment = SentimentIntensityAnalyzer().polarity_scores(text)

    return dict(
        entities=entities,
        tokens=tokens,
        pos_tags=pos_tags,
        sentiment=sentiment,
    )
def perform_image_analysis(image_path):
    """Classify an image with ResNet50 and compute its Canny edge map.

    Args:
        image_path: Path of the image file to read with OpenCV.

    Returns:
        Dict with ``predictions`` (the top 3 decoded ImageNet predictions)
        and ``edges`` (the output of ``cv2.Canny``).

    Raises:
        ValueError: If OpenCV cannot read an image from ``image_path``.
    """
    img = cv2.imread(image_path)
    if img is None:
        # imread signals failure by returning None rather than raising.
        raise ValueError(f"Could not read image: {image_path}")
    img_rgb = cv2.cvtColor(img, cv2.COLOR_BGR2RGB)

    # Perform object detection
    model = ResNet50(weights='imagenet')
    img_resized = cv2.resize(img_rgb, (224, 224))
    # Converted with NumPy rather than `image.img_to_array` so this function
    # does not depend on the module-level name `image`, which script-level
    # code can rebind (for example to a PIL image).
    img_array = np.asarray(img_resized, dtype=np.float32)
    img_array = np.expand_dims(img_array, axis=0)
    img_array = preprocess_input(img_array)
    predictions = model.predict(img_array)
    decoded_predictions = decode_predictions(predictions, top=3)[0]

    # Perform edge detection
    edges = cv2.Canny(img, 100, 200)

    return {
        'predictions': decoded_predictions,
        'edges': edges
    }
def perform_time_series_analysis(data):
    """Fit an ARIMA(1, 1, 1) model to ``data`` and forecast 5 steps ahead.

    Returns:
        Dict with ``model_summary`` (the fitted model's ``summary()``) and
        ``forecast`` (the 5-step forecast).
    """
    series_frame = pd.DataFrame(data)
    fitted = ARIMA(series_frame, order=(1, 1, 1)).fit()
    forecast = fitted.forecast(steps=5)
    return dict(model_summary=fitted.summary(), forecast=forecast)
def perform_graph_analysis(nodes, edges):
    """Build an undirected graph and compute basic structural metrics.

    Returns:
        Dict with ``centrality`` (degree centrality), ``clustering``
        (clustering coefficients) and ``shortest_paths`` (all-pairs
        shortest path lengths as a dict).
    """
    graph = nx.Graph()
    graph.add_nodes_from(nodes)
    graph.add_edges_from(edges)

    metrics = {}
    metrics['centrality'] = nx.degree_centrality(graph)
    metrics['clustering'] = nx.clustering(graph)
    metrics['shortest_paths'] = dict(nx.all_pairs_shortest_path_length(graph))
    return metrics
# Streamlit UI setup
# NOTE(review): the icon in the page config and title had been reduced to a
# stray "π" by an encoding error. A rocket is assumed as the intended emoji;
# confirm against the original design.
st.set_page_config(page_title="Ultra AI Code Assistant", page_icon="🚀", layout="wide")
# ... (Keep the existing CSS styles)
st.markdown('<div class="main-container">', unsafe_allow_html=True)
st.title("🚀 Ultra AI Code Assistant")
st.markdown('<p class="subtitle">Powered by Advanced AI and Domain Expertise</p>', unsafe_allow_html=True)

# Task selector; the chosen value picks the branch run by "Execute Task".
task_type = st.selectbox("Select Task Type", [
    "Code Generation",
    "Machine Learning",
    "Data Analysis",
    "Natural Language Processing",
    "Image Analysis",
    "Time Series Analysis",
    "Graph Analysis"
])

# Free-text input shared by all task types.
prompt = st.text_area("Enter your task description or code:", height=120)
# The uploader is rendered outside the button branch. A widget created only
# while a button reads True disappears on the rerun triggered by the upload
# itself, so the file could never reach the analysis code.
uploaded_file = None
if task_type == "Image Analysis":
    uploaded_file = st.file_uploader("Choose an image...", type=["jpg", "png", "jpeg"])

# Run the selected task when the button is pressed. Any exception raised by a
# task is reported through handle_error instead of crashing the page.
if st.button("Execute Task"):
    if prompt.strip() == "":
        st.error("Please enter a valid prompt.")
    else:
        with st.spinner("Processing your request..."):
            try:
                if task_type == "Code Generation":
                    processed_input = process_user_input(prompt)
                    completed_text = generate_response(processed_input.text)
                    # generate_response reports failure as "Error: <detail>".
                    # A substring test would also reject valid generated code
                    # that merely mentions e.g. ValueError.
                    if completed_text.startswith("Error:"):
                        handle_error(completed_text)
                    else:
                        optimized_code = optimize_code(completed_text)
                        st.success("Code generated and optimized successfully!")
                        st.markdown('<div class="output-container">', unsafe_allow_html=True)
                        st.markdown('<div class="code-block">', unsafe_allow_html=True)
                        st.code(optimized_code)
                        st.markdown('</div>', unsafe_allow_html=True)
                        st.markdown('</div>', unsafe_allow_html=True)

                        # Commit the generated code to a local repository.
                        repo_path = "./repo"
                        integrate_with_git(repo_path, optimized_code)

                        test_result = run_tests()
                        if test_result.wasSuccessful():
                            st.success("All tests passed successfully!")
                        else:
                            st.error("Some tests failed. Please check the code.")

                        execution_result, status_code = execute_code_in_docker(optimized_code)
                        if status_code == 0:
                            st.success("Code executed successfully in Docker!")
                            st.text(execution_result)
                        else:
                            st.error(f"Code execution failed: {execution_result}")

                elif task_type == "Machine Learning":
                    # For demonstration, we'll use a sample dataset
                    from sklearn.datasets import make_classification
                    X, y = make_classification(n_samples=1000, n_features=20, n_classes=2, random_state=42)
                    results = train_advanced_ml_model(X, y)
                    st.write("Machine Learning Model Performance:")
                    st.json(results)
                    st.write("Deep Learning Model:")
                    deep_model = train_deep_learning_model(pd.DataFrame(X), pd.Series(y))
                    st.write(deep_model)

                elif task_type == "Data Analysis":
                    # For demonstration, we'll use a sample dataset
                    data = pd.DataFrame(np.random.randn(100, 5), columns=['A', 'B', 'C', 'D', 'E'])
                    analysis_results = analyze_complex_data(data)
                    st.write("Data Analysis Results:")
                    st.write(analysis_results['summary'])
                    st.write("Correlation Matrix:")
                    st.write(analysis_results['correlation'])
                    fig = visualize_complex_data(data)
                    st.pyplot(fig)

                elif task_type == "Natural Language Processing":
                    nlp_results = perform_nlp_analysis(prompt)
                    st.write("NLP Analysis Results:")
                    st.json(nlp_results)

                elif task_type == "Image Analysis":
                    if uploaded_file is not None:
                        # Not named `image`: that would rebind the module-level
                        # keras `image` import.
                        uploaded_image = Image.open(uploaded_file)
                        st.image(uploaded_image, caption='Uploaded Image', use_column_width=True)
                        # Save the uploaded image temporarily
                        with open("temp_image.jpg", "wb") as f:
                            f.write(uploaded_file.getbuffer())
                        try:
                            analysis_results = perform_image_analysis("temp_image.jpg")
                        finally:
                            # Remove the temporary image file even if analysis fails
                            os.remove("temp_image.jpg")
                        st.write("Image Analysis Results:")
                        st.write("Top 3 predictions:")
                        for i, (imagenet_id, label, score) in enumerate(analysis_results['predictions']):
                            st.write(f"{i + 1}: {label} ({score:.2f})")
                        st.write("Edge Detection:")
                        st.image(analysis_results['edges'], caption='Edge Detection', use_column_width=True)
                    else:
                        st.warning("Please upload an image before running the analysis.")

                elif task_type == "Time Series Analysis":
                    # For demonstration, we'll use a sample time series dataset
                    dates = pd.date_range(start='1/1/2020', end='1/1/2021', freq='D')
                    values = np.random.randn(len(dates)).cumsum()
                    ts_data = pd.Series(values, index=dates)
                    st.line_chart(ts_data)
                    analysis_results = perform_time_series_analysis(ts_data)
                    st.write("Time Series Analysis Results:")
                    st.write(analysis_results['model_summary'])
                    st.write("Forecast for the next 5 periods:")
                    st.write(analysis_results['forecast'])

                elif task_type == "Graph Analysis":
                    # For demonstration, we'll use a sample graph
                    nodes = range(1, 11)
                    edges = [(1, 2), (1, 3), (2, 4), (2, 5), (3, 6), (3, 7), (4, 8), (5, 9), (6, 10)]
                    analysis_results = perform_graph_analysis(nodes, edges)
                    st.write("Graph Analysis Results:")
                    st.write("Centrality:")
                    st.json(analysis_results['centrality'])
                    st.write("Clustering Coefficient:")
                    st.json(analysis_results['clustering'])
                    # Visualize the graph
                    G = nx.Graph()
                    G.add_nodes_from(nodes)
                    G.add_edges_from(edges)
                    fig, ax = plt.subplots(figsize=(10, 8))
                    nx.draw(G, with_labels=True, node_color='lightblue', node_size=500, font_size=16, font_weight='bold', ax=ax)
                    st.pyplot(fig)
            except Exception as e:
                handle_error(e)
# Page footer. The heart emoji had been garbled by an encoding error and is
# restored here. The final call emits the closing tag for the "main-container"
# div written at the top of the page.
st.markdown("""
<div style='text-align: center; margin-top: 2rem; color: #4a5568;'>
Created with ❤️ by Your Ultra AI Code Assistant
</div>
""", unsafe_allow_html=True)
st.markdown('</div>', unsafe_allow_html=True)
| # Additional helper functions | |
def explain_code(code):
    """Ask the model to explain ``code`` and return its reply text."""
    request = f"Explain the following code:\n\n{code}"
    return generate_response(request)
def generate_unit_tests(code):
    """Ask the model for unit tests covering ``code`` and return its reply text."""
    request = f"Generate unit tests for the following code:\n\n{code}"
    return generate_response(request)
def suggest_optimizations(code):
    """Ask the model for optimizations to ``code`` and return its reply text."""
    request = f"Suggest optimizations for the following code:\n\n{code}"
    return generate_response(request)
def generate_documentation(code):
    """Ask the model to document ``code`` and return its reply text."""
    request = f"Generate documentation for the following code:\n\n{code}"
    return generate_response(request)
# Sidebar tools that act on the most recently generated code.
if task_type == "Code Generation":
    st.sidebar.header("Code Analysis Tools")

    # `optimized_code` exists as a global only on the run in which "Execute
    # Task" produced it, and every sidebar click starts a fresh run. Cache it
    # in session state so the buttons below do not raise NameError.
    if "optimized_code" in globals():
        st.session_state["optimized_code"] = globals()["optimized_code"]
    generated_code = st.session_state.get("optimized_code")
    no_code_message = "Generate code with 'Execute Task' before using this tool."

    if st.sidebar.button("Explain Code"):
        if generated_code:
            explanation = explain_code(generated_code)
            st.sidebar.subheader("Code Explanation")
            st.sidebar.write(explanation)
        else:
            st.sidebar.warning(no_code_message)

    if st.sidebar.button("Generate Unit Tests"):
        if generated_code:
            unit_tests = generate_unit_tests(generated_code)
            st.sidebar.subheader("Generated Unit Tests")
            st.sidebar.code(unit_tests)
        else:
            st.sidebar.warning(no_code_message)

    if st.sidebar.button("Suggest Optimizations"):
        if generated_code:
            optimizations = suggest_optimizations(generated_code)
            st.sidebar.subheader("Suggested Optimizations")
            st.sidebar.write(optimizations)
        else:
            st.sidebar.warning(no_code_message)

    if st.sidebar.button("Generate Documentation"):
        if generated_code:
            documentation = generate_documentation(generated_code)
            st.sidebar.subheader("Generated Documentation")
            st.sidebar.write(documentation)
        else:
            st.sidebar.warning(no_code_message)
| # Add more advanced features | |
def perform_security_analysis(code):
    """Ask the model for a security review of ``code`` and return its reply text."""
    request = f"Perform a security analysis on the following code and suggest improvements:\n\n{code}"
    return generate_response(request)
def generate_api_documentation(code):
    """Ask the model for API documentation of ``code`` and return its reply text."""
    request = f"Generate API documentation for the following code:\n\n{code}"
    return generate_response(request)
def suggest_design_patterns(code):
    """Ask the model which design patterns suit ``code`` and return its reply text."""
    request = f"Suggest appropriate design patterns for the following code:\n\n{code}"
    return generate_response(request)
# Sidebar: advanced analysis tools for the most recently generated code.
if task_type == "Code Generation":
    st.sidebar.header("Advanced Code Analysis")

    # `optimized_code` exists as a global only on the run in which "Execute
    # Task" produced it, and every sidebar click starts a fresh run. Cache it
    # in session state so the buttons below do not raise NameError.
    if "optimized_code" in globals():
        st.session_state["optimized_code"] = globals()["optimized_code"]
    generated_code = st.session_state.get("optimized_code")
    no_code_message = "Generate code with 'Execute Task' before using this tool."

    if st.sidebar.button("Security Analysis"):
        if generated_code:
            security_analysis = perform_security_analysis(generated_code)
            st.sidebar.subheader("Security Analysis")
            st.sidebar.write(security_analysis)
        else:
            st.sidebar.warning(no_code_message)

    if st.sidebar.button("Generate API Documentation"):
        if generated_code:
            api_docs = generate_api_documentation(generated_code)
            st.sidebar.subheader("API Documentation")
            st.sidebar.write(api_docs)
        else:
            st.sidebar.warning(no_code_message)

    if st.sidebar.button("Suggest Design Patterns"):
        if generated_code:
            design_patterns = suggest_design_patterns(generated_code)
            st.sidebar.subheader("Suggested Design Patterns")
            st.sidebar.write(design_patterns)
        else:
            st.sidebar.warning(no_code_message)
| # Add a feature to generate a complete project structure | |
def generate_project_structure(project_description):
    """Ask the model for a project layout matching the description and return its reply text."""
    request = f"Generate a complete project structure for the following project description:\n\n{project_description}"
    return generate_response(request)
# Sidebar: project structure generation.
# The description box is rendered on every run. Created inside the button
# branch it would always be empty when read, and would vanish on the rerun
# caused by typing into it. The explicit key keeps it distinct from any other
# text area that uses the same label.
project_description = st.sidebar.text_area(
    "Enter project description:", key="project_structure_description"
)
if st.sidebar.button("Generate Project Structure"):
    if project_description:
        project_structure = generate_project_structure(project_description)
        st.sidebar.subheader("Generated Project Structure")
        st.sidebar.code(project_structure)
    else:
        st.sidebar.warning("Enter a project description first.")
| # Add a feature to suggest relevant libraries and frameworks | |
def suggest_libraries(code):
    """Ask the model which libraries and frameworks fit ``code`` and return its reply text."""
    request = f"Suggest relevant libraries and frameworks for the following code:\n\n{code}"
    return generate_response(request)
# Sidebar: library and framework suggestions for the generated code.
if task_type == "Code Generation":
    # `optimized_code` exists as a global only on the run in which "Execute
    # Task" produced it, and every sidebar click starts a fresh run. Cache it
    # in session state so the button below does not raise NameError.
    if "optimized_code" in globals():
        st.session_state["optimized_code"] = globals()["optimized_code"]
    generated_code = st.session_state.get("optimized_code")

    if st.sidebar.button("Suggest Libraries"):
        if generated_code:
            library_suggestions = suggest_libraries(generated_code)
            st.sidebar.subheader("Suggested Libraries and Frameworks")
            st.sidebar.write(library_suggestions)
        else:
            st.sidebar.warning("Generate code with 'Execute Task' before using this tool.")
| # Add a feature to generate code in multiple programming languages | |
def translate_code(code, target_language):
    """Ask the model to translate ``code`` into ``target_language`` and return its reply text."""
    request = f"Translate the following code to {target_language}:\n\n{code}"
    return generate_response(request)
# Sidebar: translate the generated code into another language.
if task_type == "Code Generation":
    # `optimized_code` exists as a global only on the run in which "Execute
    # Task" produced it, and every sidebar click starts a fresh run. Cache it
    # in session state so the button below does not raise NameError.
    if "optimized_code" in globals():
        st.session_state["optimized_code"] = globals()["optimized_code"]
    generated_code = st.session_state.get("optimized_code")

    target_language = st.sidebar.selectbox("Select target language for translation", ["Python", "JavaScript", "Java", "C++", "Go"])
    if st.sidebar.button("Translate Code"):
        if generated_code:
            translated_code = translate_code(generated_code, target_language)
            st.sidebar.subheader(f"Translated Code ({target_language})")
            st.sidebar.code(translated_code)
        else:
            st.sidebar.warning("Generate code with 'Execute Task' before using this tool.")
| # Add a feature to generate a README file for the project | |
def generate_readme(project_description, code):
    """Ask the model for a README.md built from the description and code; return its reply text."""
    request = f"Generate a README.md file for the following project:\n\nDescription: {project_description}\n\nCode:\n{code}"
    return generate_response(request)
# Sidebar: README generation for the generated code.
if task_type == "Code Generation":
    # `optimized_code` exists as a global only on the run in which "Execute
    # Task" produced it, and every sidebar click starts a fresh run. Cache it
    # in session state so the button below does not raise NameError.
    if "optimized_code" in globals():
        st.session_state["optimized_code"] = globals()["optimized_code"]
    generated_code = st.session_state.get("optimized_code")

    # Rendered on every run: created inside the button branch, the box would
    # always be empty when read. The explicit key keeps it distinct from any
    # other text area that uses the same label.
    project_description = st.sidebar.text_area(
        "Enter project description:", key="readme_project_description"
    )
    if st.sidebar.button("Generate README"):
        if not generated_code:
            st.sidebar.warning("Generate code with 'Execute Task' before using this tool.")
        elif not project_description:
            st.sidebar.warning("Enter a project description first.")
        else:
            readme_content = generate_readme(project_description, generated_code)
            st.sidebar.subheader("Generated README.md")
            st.sidebar.markdown(readme_content)
| # Add a feature to suggest code refactoring | |
def suggest_refactoring(code):
    """Ask the model for refactoring improvements to ``code`` and return its reply text."""
    request = f"Suggest code refactoring improvements for the following code:\n\n{code}"
    return generate_response(request)
# Sidebar: refactoring suggestions for the generated code.
if task_type == "Code Generation":
    # `optimized_code` exists as a global only on the run in which "Execute
    # Task" produced it, and every sidebar click starts a fresh run. Cache it
    # in session state so the button below does not raise NameError.
    if "optimized_code" in globals():
        st.session_state["optimized_code"] = globals()["optimized_code"]
    generated_code = st.session_state.get("optimized_code")

    if st.sidebar.button("Suggest Refactoring"):
        if generated_code:
            refactoring_suggestions = suggest_refactoring(generated_code)
            st.sidebar.subheader("Refactoring Suggestions")
            st.sidebar.write(refactoring_suggestions)
        else:
            st.sidebar.warning("Generate code with 'Execute Task' before using this tool.")
| # Add a feature to generate sample test data | |
def generate_test_data(code):
    """Ask the model for sample test data for ``code`` and return its reply text."""
    request = f"Generate sample test data for the following code:\n\n{code}"
    return generate_response(request)
# Sidebar: sample test data for the generated code.
if task_type == "Code Generation":
    # `optimized_code` exists as a global only on the run in which "Execute
    # Task" produced it, and every sidebar click starts a fresh run. Cache it
    # in session state so the button below does not raise NameError.
    if "optimized_code" in globals():
        st.session_state["optimized_code"] = globals()["optimized_code"]
    generated_code = st.session_state.get("optimized_code")

    if st.sidebar.button("Generate Test Data"):
        if generated_code:
            test_data = generate_test_data(generated_code)
            st.sidebar.subheader("Generated Test Data")
            st.sidebar.code(test_data)
        else:
            st.sidebar.warning("Generate code with 'Execute Task' before using this tool.")
# Main execution: "About" and "Feedback" panels in the sidebar.
if __name__ == "__main__":
    st.sidebar.header("About")
    st.sidebar.info("This Ultra AI Code Assistant is powered by advanced AI models and incorporates expertise across multiple domains including software development, machine learning, data analysis, and more.")

    st.sidebar.header("Feedback")
    feedback = st.sidebar.text_area("Please provide any feedback or suggestions:")
    feedback_submitted = st.sidebar.button("Submit Feedback")
    if feedback_submitted:
        # Here you would typically send this feedback to a database or email;
        # for now the text is only acknowledged, not stored.
        st.sidebar.success("Thank you for your feedback!")