Spaces:
Paused
Paused
| # unsupervised.py - Módulo para models | |
| import streamlit as st | |
| import pandas as pd | |
| import numpy as np | |
| import plotly.express as px | |
| import plotly.graph_objects as go | |
| from sklearn.preprocessing import StandardScaler | |
| from sklearn.decomposition import PCA | |
| from sklearn.cluster import KMeans, DBSCAN, AgglomerativeClustering | |
| from sklearn.manifold import TSNE | |
| from sklearn.metrics import silhouette_score, calinski_harabasz_score, davies_bouldin_score | |
| import google.generativeai as genai | |
| import umap | |
| class UnsupervisedAnalyzer: | |
| def __init__(self, data): | |
| self.data = data | |
| self.scaler = StandardScaler() | |
| def preprocess_data(self, feature_cols): | |
| """Escalar datos seleccionados""" | |
| X = self.data[feature_cols] | |
| return self.scaler.fit_transform(X) | |
| def perform_kmeans(self, X_scaled, n_clusters): | |
| """Realizar clustering K-Means""" | |
| kmeans = KMeans( | |
| n_clusters=n_clusters, | |
| random_state=42, | |
| n_init=10 | |
| ) | |
| clusters = kmeans.fit_predict(X_scaled) | |
| # Calcular métricas | |
| metrics = { | |
| 'Silhouette Score': silhouette_score(X_scaled, clusters), | |
| 'Calinski-Harabasz Score': calinski_harabasz_score(X_scaled, clusters), | |
| 'Davies-Bouldin Score': davies_bouldin_score(X_scaled, clusters) | |
| } | |
| return { | |
| 'clusters': clusters, | |
| 'metrics': metrics, | |
| 'centroids': kmeans.cluster_centers_ | |
| } | |
| def perform_dbscan(self, X_scaled, eps, min_samples): | |
| """Realizar clustering DBSCAN""" | |
| dbscan = DBSCAN(eps=eps, min_samples=min_samples) | |
| clusters = dbscan.fit_predict(X_scaled) | |
| # Calcular métricas | |
| unique_clusters = np.setdiff1d(np.unique(clusters), [-1]) | |
| metrics = { | |
| 'Noise Points': np.sum(clusters == -1), | |
| 'Number of Clusters': len(unique_clusters) | |
| } | |
| # Solo calcular métricas si hay clusters válidos | |
| if len(unique_clusters) > 0: | |
| non_noise_mask = clusters != -1 | |
| metrics.update({ | |
| 'Silhouette Score': silhouette_score(X_scaled[non_noise_mask], clusters[non_noise_mask]), | |
| 'Calinski-Harabasz Score': calinski_harabasz_score(X_scaled[non_noise_mask], clusters[non_noise_mask]), | |
| 'Davies-Bouldin Score': davies_bouldin_score(X_scaled[non_noise_mask], clusters[non_noise_mask]) | |
| }) | |
| else: | |
| metrics.update({ | |
| 'Silhouette Score': None, | |
| 'Calinski-Harabasz Score': None, | |
| 'Davies-Bouldin Score': None | |
| }) | |
| return { | |
| 'clusters': clusters, | |
| 'metrics': metrics | |
| } | |
| def perform_hierarchical_clustering(self, X_scaled, n_clusters): | |
| """Realizar clustering jerárquico""" | |
| hierarchical = AgglomerativeClustering(n_clusters=n_clusters) | |
| clusters = hierarchical.fit_predict(X_scaled) | |
| # Calcular métricas | |
| metrics = { | |
| 'Silhouette Score': silhouette_score(X_scaled, clusters), | |
| 'Calinski-Harabasz Score': calinski_harabasz_score(X_scaled, clusters), | |
| 'Davies-Bouldin Score': davies_bouldin_score(X_scaled, clusters) | |
| } | |
| return { | |
| 'clusters': clusters, | |
| 'metrics': metrics | |
| } | |
| def perform_dimensionality_reduction(self, X_scaled, method='PCA', n_components=2): | |
| """Realizar reducción de dimensionalidad""" | |
| if method == 'PCA': | |
| reducer = PCA(n_components=n_components) | |
| reduced_data = reducer.fit_transform(X_scaled) | |
| return { | |
| 'reduced_data': reduced_data, | |
| 'explained_variance': reducer.explained_variance_ratio_ | |
| } | |
| elif method == 't-SNE': | |
| reducer = TSNE(n_components=n_components, random_state=42) | |
| reduced_data = reducer.fit_transform(X_scaled) | |
| return { | |
| 'reduced_data': reduced_data | |
| } | |
| elif method == 'UMAP': | |
| reducer = umap.UMAP(n_components=n_components, random_state=42) | |
| reduced_data = reducer.fit_transform(X_scaled) | |
| return { | |
| 'reduced_data': reduced_data | |
| } | |
| def generate_method_explanation(method, parameters, metrics): | |
| """Generar explicación del método usando Gemini""" | |
| try: | |
| genai.configure(api_key=st.session_state.gemini_api_key) | |
| model = genai.GenerativeModel('gemini-1.5-flash') | |
| # Preparar prompt basado en el método | |
| prompt = f"""Explica detalladamente el método de análisis no supervisado: {method} | |
| Parámetros: | |
| {', '.join([f"{k}: {v}" for k, v in parameters.items()])} | |
| Métricas: | |
| {', '.join([f"{k}: {v}" for k, v in metrics.items()])} | |
| En tu explicación, incluye: | |
| 1. Objetivo principal del método | |
| 2. Cómo funciona el algoritmo | |
| 3. Interpretación de los parámetros | |
| 4. Significado de las métricas | |
| 5. Casos de uso recomendados""" | |
| response = model.generate_content(prompt) | |
| return response.text | |
| except Exception as e: | |
| return f"Error al generar explicación: {str(e)}" | |
| def visualize_clustering(X_scaled, clusters, method_name, n_components=2): | |
| """Visualización de clustering""" | |
| reducer = PCA(n_components=n_components) | |
| X_reduced = reducer.fit_transform(X_scaled) | |
| if n_components == 2: | |
| fig = px.scatter( | |
| x=X_reduced[:, 0], | |
| y=X_reduced[:, 1], | |
| color=clusters.astype(str), | |
| title=f'Clustering {method_name} - Visualización PCA', | |
| labels={'x': 'PCA Componente 1', 'y': 'PCA Componente 2'} | |
| ) | |
| else: | |
| fig = go.Figure(data=[ | |
| go.Scatter3d( | |
| x=X_reduced[:, 0], | |
| y=X_reduced[:, 1], | |
| z=X_reduced[:, 2], | |
| mode='markers', | |
| marker=dict( | |
| size=5, | |
| color=clusters, | |
| colorscale='Viridis', | |
| opacity=0.8 | |
| ) | |
| ) | |
| ]) | |
| fig.update_layout( | |
| title=f'Clustering {method_name} - Visualización 3D', | |
| scene=dict( | |
| xaxis_title='PCA 1', | |
| yaxis_title='PCA 2', | |
| zaxis_title='PCA 3' | |
| ) | |
| ) | |
| return fig | |
| def show_unsupervised_analysis(): | |
| st.title("Análisis No Supervisado") | |
| # Verificar datos preparados | |
| if 'prepared_data' not in st.session_state or st.session_state.prepared_data is None: | |
| st.warning("Por favor, carga y prepara tus datos primero") | |
| return | |
| # Obtener datos | |
| data = st.session_state.prepared_data | |
| # Seleccionar columnas numéricas | |
| numeric_cols = data.select_dtypes(include=['int64', 'float64']).columns.tolist() | |
| if not numeric_cols: | |
| st.error("No hay variables numéricas para análisis no supervisado") | |
| return | |
| # Selección de características | |
| feature_cols = st.multiselect( | |
| "Seleccionar Variables para Análisis", | |
| numeric_cols, | |
| default=numeric_cols[:min(5, len(numeric_cols))] | |
| ) | |
| if not feature_cols: | |
| st.warning("Selecciona al menos una variable") | |
| return | |
| # Inicializar analizador | |
| analyzer = UnsupervisedAnalyzer(data) | |
| X_scaled = analyzer.preprocess_data(feature_cols) | |
| # Selección de métodos | |
| methods = st.multiselect( | |
| "Seleccionar Métodos de Análisis", | |
| [ | |
| "K-Means", | |
| "DBSCAN", | |
| "Clustering Jerárquico", | |
| "PCA", | |
| "t-SNE", | |
| "UMAP" | |
| ] | |
| ) | |
| # Contenedor para resultados | |
| results = {} | |
| # Columnas para visualización | |
| if methods: | |
| cols = st.columns(len(methods)) | |
| for i, method in enumerate(methods): | |
| with cols[i]: | |
| st.subheader(method) | |
| # Parámetros específicos por método | |
| if method == "K-Means": | |
| n_clusters = st.slider( | |
| "Número de Clusters", | |
| min_value=2, | |
| max_value=10, | |
| value=3, | |
| key=f"kmeans_clusters_{i}" | |
| ) | |
| result = analyzer.perform_kmeans(X_scaled, n_clusters) | |
| results['K-Means'] = result | |
| # Visualización | |
| fig = visualize_clustering(X_scaled, result['clusters'], method) | |
| st.plotly_chart(fig) | |
| # Métricas | |
| st.write("Métricas:") | |
| for metric, value in result['metrics'].items(): | |
| st.metric(metric, f"{value:.4f}") | |
| # Explicación con Gemini | |
| if st.session_state.get('gemini_api_key'): | |
| explanation = generate_method_explanation( | |
| method, | |
| {'Número de Clusters': n_clusters}, | |
| result['metrics'] | |
| ) | |
| with st.expander("Explicación del Método"): | |
| st.markdown(explanation) | |
| elif method == "DBSCAN": | |
| eps = st.slider( | |
| "Epsilon", | |
| min_value=0.1, | |
| max_value=2.0, | |
| value=0.5, | |
| key=f"dbscan_eps_{i}" | |
| ) | |
| min_samples = st.slider( | |
| "Mínimo de Muestras", | |
| min_value=2, | |
| max_value=20, | |
| value=5, | |
| key=f"dbscan_min_samples_{i}" | |
| ) | |
| result = analyzer.perform_dbscan(X_scaled, eps, min_samples) | |
| results['DBSCAN'] = result | |
| # Visualización | |
| fig = visualize_clustering(X_scaled, result['clusters'], method) | |
| st.plotly_chart(fig) | |
| # Métricas | |
| st.write("Métricas:") | |
| for metric, value in result['metrics'].items(): | |
| st.metric(metric, str(value)) | |
| # Explicación con Gemini | |
| if st.session_state.get('gemini_api_key'): | |
| explanation = generate_method_explanation( | |
| method, | |
| { | |
| 'Epsilon': eps, | |
| 'Mínimo de Muestras': min_samples | |
| }, | |
| result['metrics'] | |
| ) | |
| with st.expander("Explicación del Método"): | |
| st.markdown(explanation) | |
| elif method == "Clustering Jerárquico": | |
| n_clusters = st.slider( | |
| "Número de Clusters", | |
| min_value=2, | |
| max_value=10, | |
| value=3, | |
| key=f"hierarchical_clusters_{i}" | |
| ) | |
| result = analyzer.perform_hierarchical_clustering(X_scaled, n_clusters) | |
| results['Clustering Jerárquico'] = result | |
| # Visualización | |
| fig = visualize_clustering(X_scaled, result['clusters'], method) | |
| st.plotly_chart(fig) | |
| # Métricas | |
| st.write("Métricas:") | |
| for metric, value in result['metrics'].items(): | |
| st.metric(metric, f"{value:.4f}") | |
| # Explicación con Gemini | |
| if st.session_state.get('gemini_api_key'): | |
| explanation = generate_method_explanation( | |
| method, | |
| {'Número de Clusters': n_clusters}, | |
| result['metrics'] | |
| ) | |
| with st.expander("Explicación del Método"): | |
| st.markdown(explanation) | |
| elif method in ["PCA", "t-SNE", "UMAP"]: | |
| n_components = st.slider( | |
| "Número de Componentes", | |
| min_value=2, | |
| max_value=3, | |
| value=2, | |
| key=f"{method}_components_{i}" | |
| ) | |
| result = analyzer.perform_dimensionality_reduction( | |
| X_scaled, | |
| method=method, | |
| n_components=n_components | |
| ) | |
| results[method] = result | |
| # Visualización de reducción de dimensionalidad | |
| fig = px.scatter( | |
| x=result['reduced_data'][:, 0], | |
| y=result['reduced_data'][:, 1], | |
| title=f'Reducción de Dimensionalidad - {method}' | |
| ) | |
| st.plotly_chart(fig) | |
| # Varianza explicada para PCA | |
| if method == 'PCA': | |
| st.write("Varianza Explicada:") | |
| varianza_df = pd.DataFrame({ | |
| 'Componente': range(1, len(result['explained_variance']) + 1), | |
| 'Varianza Explicada (%)': result['explained_variance'] * 100, | |
| 'Varianza Acumulada (%)': np.cumsum(result['explained_variance']) * 100 | |
| }) | |
| st.dataframe(varianza_df) | |
| # Explicación con Gemini | |
| if st.session_state.get('gemini_api_key'): | |
| explanation = generate_method_explanation( | |
| method, | |
| {'Número de Componentes': n_components}, | |
| {} | |
| ) | |
| with st.expander("Explicación del Método"): | |
| st.markdown(explanation) | |
| # Exportar resultados | |
| if st.button("Exportar Resultados"): | |
| export_data = [] | |
| for method, result in results.items(): | |
| method_data = { | |
| 'Método': method, | |
| 'Variables': ', '.join(feature_cols) | |
| } | |
| # Agregar métricas si están disponibles | |
| if 'metrics' in result: | |
| method_data.update(result['metrics']) | |
| export_data.append(method_data) | |
| export_df = pd.DataFrame(export_data) | |
| csv = export_df.to_csv(index=False).encode('utf-8') | |
| st.download_button( | |
| label="Descargar Resultados", | |
| data=csv, | |
| file_name="unsupervised_analysis_results.csv", | |
| mime="text/csv", | |
| key="download_unsupervised_results" | |
| ) | |
| def show_unsupervised(): | |
| """Función principal para mostrar la página de análisis no supervisado""" | |
| st.title("🔍 Análisis No Supervisado") | |
| # Verificar datos preparados | |
| if 'prepared_data' not in st.session_state or st.session_state.prepared_data is None: | |
| st.warning("Por favor, carga y prepara tus datos primero en la página de Preparación.") | |
| return | |
| # Obtener datos preparados | |
| data = st.session_state.prepared_data | |
| # Sección de selección de variables | |
| st.header("Configuración del Análisis") | |
| # Seleccionar columnas numéricas | |
| numeric_cols = data.select_dtypes(include=['int64', 'float64']).columns.tolist() | |
| if not numeric_cols: | |
| st.error("No hay variables numéricas disponibles para realizar análisis no supervisado.") | |
| return | |
| # Selección de características | |
| st.subheader("Selección de Variables") | |
| feature_cols = st.multiselect( | |
| "Selecciona las variables para el análisis", | |
| numeric_cols, | |
| default=numeric_cols[:min(5, len(numeric_cols))] | |
| ) | |
| if not feature_cols: | |
| st.warning("Por favor, selecciona al menos una variable.") | |
| return | |
| # Inicializar analizador | |
| analyzer = UnsupervisedAnalyzer(data) | |
| X_scaled = analyzer.preprocess_data(feature_cols) | |
| # Sección de métodos de análisis | |
| st.header("Métodos de Análisis") | |
| # Selección de métodos | |
| metodos = st.multiselect( | |
| "Elige los métodos de análisis no supervisado", | |
| [ | |
| "K-Means", | |
| "DBSCAN", | |
| "Clustering Jerárquico", | |
| "Análisis de Componentes Principales (PCA)", | |
| "t-SNE", | |
| "UMAP" | |
| ] | |
| ) | |
| # Contenedor de resultados | |
| resultados = {} | |
| # Procesamiento de métodos seleccionados | |
| if metodos: | |
| # Crear columnas dinámicamente | |
| cols = st.columns(len(metodos)) | |
| for i, metodo in enumerate(metodos): | |
| with cols[i]: | |
| st.subheader(metodo) | |
| # Parámetros específicos por método | |
| if metodo == "K-Means": | |
| n_clusters = st.slider( | |
| "Número de Clusters", | |
| min_value=2, | |
| max_value=10, | |
| value=3, | |
| key=f"kmeans_clusters_{i}" | |
| ) | |
| # Realizar K-Means | |
| resultado = analyzer.perform_kmeans(X_scaled, n_clusters) | |
| resultados[metodo] = resultado | |
| # Visualización | |
| fig = visualize_clustering(X_scaled, resultado['clusters'], metodo) | |
| st.plotly_chart(fig) | |
| # Mostrar métricas | |
| st.subheader("Métricas") | |
| for metrica, valor in resultado['metrics'].items(): | |
| st.metric(metrica, f"{valor:.4f}") | |
| # Explicación con Gemini | |
| if st.session_state.get('gemini_api_key'): | |
| explicacion = generate_method_explanation( | |
| metodo, | |
| {'Número de Clusters': n_clusters}, | |
| resultado['metrics'] | |
| ) | |
| with st.expander("Explicación del Método"): | |
| st.markdown(explicacion) | |
| elif metodo == "DBSCAN": | |
| eps = st.slider( | |
| "Epsilon", | |
| min_value=0.1, | |
| max_value=2.0, | |
| value=0.5, | |
| key=f"dbscan_eps_{i}" | |
| ) | |
| min_samples = st.slider( | |
| "Mínimo de Muestras", | |
| min_value=2, | |
| max_value=20, | |
| value=5, | |
| key=f"dbscan_min_samples_{i}" | |
| ) | |
| # Realizar DBSCAN | |
| resultado = analyzer.perform_dbscan(X_scaled, eps, min_samples) | |
| resultados[metodo] = resultado | |
| # Visualización | |
| fig = visualize_clustering(X_scaled, resultado['clusters'], metodo) | |
| st.plotly_chart(fig) | |
| # Mostrar métricas | |
| st.subheader("Métricas") | |
| for metrica, valor in resultado['metrics'].items(): | |
| st.metric(metrica, str(valor)) | |
| # Explicación con Gemini | |
| if st.session_state.get('gemini_api_key'): | |
| explicacion = generate_method_explanation( | |
| metodo, | |
| { | |
| 'Epsilon': eps, | |
| 'Mínimo de Muestras': min_samples | |
| }, | |
| resultado['metrics'] | |
| ) | |
| with st.expander("Explicación del Método"): | |
| st.markdown(explicacion) | |
| # Continuar con los demás métodos de manera similar... | |
| # Sección de exportación de resultados | |
| if st.button("Exportar Resultados del Análisis"): | |
| # Crear DataFrame con resultados | |
| datos_exportacion = [] | |
| for metodo, resultado in resultados.items(): | |
| datos_metodo = { | |
| 'Método': metodo, | |
| 'Variables': ', '.join(feature_cols) | |
| } | |
| # Agregar métricas si están disponibles | |
| if 'metrics' in resultado: | |
| datos_metodo.update(resultado['metrics']) | |
| datos_exportacion.append(datos_metodo) | |
| df_exportacion = pd.DataFrame(datos_exportacion) | |
| # Descargar CSV | |
| csv = df_exportacion.to_csv(index=False).encode('utf-8') | |
| st.download_button( | |
| label="Descargar Resultados", | |
| data=csv, | |
| file_name="analisis_no_supervisado.csv", | |
| mime="text/csv" | |
| ) | |
| # Función principal para ejecutar el análisis no supervisado | |
| def main(): | |
| show_unsupervised() | |
| if __name__ == "__main__": | |
| main() |