import base64 import io import ast import traceback import os from threading import Thread import dash from dash import dcc, html, Input, Output, State, callback_context import dash_bootstrap_components as dbc import pandas as pd import plotly.graph_objs as go import google.generativeai as genai # Initialize Dash app app = dash.Dash(__name__, external_stylesheets=[dbc.themes.BOOTSTRAP]) # Layout app.layout = dbc.Container([ html.H1("Data Analysis Dashboard", className="my-4"), dbc.Card([ dbc.CardBody([ dcc.Upload( id='upload-data', children=html.Div([ 'Drag and Drop or ', html.A('Select Files') ]), style={ 'width': '100%', 'height': '60px', 'lineHeight': '60px', 'borderWidth': '1px', 'borderStyle': 'dashed', 'borderRadius': '5px', 'textAlign': 'center', 'margin': '10px' }, multiple=False ), html.Div(id='upload-feedback', className="mt-2"), html.Div([ html.Span(id='filename-display', className="mr-2"), dbc.Button("Delete File", id="delete-file-button", color="danger", className="mt-2", style={'display': 'none'}) ], className="mt-2"), dbc.Input(id="instructions", placeholder="Describe the analysis you want...", type="text", className="mt-3"), dbc.Button("Generate Insights", id="submit-button", color="primary", className="mt-3"), ]) ], className="mb-4"), html.Div(id="error-message", className="text-danger mb-3"), dcc.Loading( id="loading-visualizations", type="default", children=[ dbc.Card([ dbc.CardBody([ dcc.Graph(id='visualization-1'), dcc.Graph(id='visualization-2'), dcc.Graph(id='visualization-3'), ]) ]) ] ), dcc.Store(id='uploaded-data') ], fluid=True) def parse_contents(contents, filename): content_type, content_string = contents.split(',') decoded = base64.b64decode(content_string) try: if 'csv' in filename: df = pd.read_csv(io.StringIO(decoded.decode('utf-8'))) elif 'xls' in filename: df = pd.read_excel(io.BytesIO(decoded)) else: return None return df except Exception as e: print(e) return None def process_data(df, instructions): try: # Get API key from environment variable api_key = os.getenv('GEMINI_API_KEY') if not api_key: raise ValueError("Gemini API key not found in environment variables") # Initialize Gemini with provided API key genai.configure(api_key=api_key) model = genai.GenerativeModel('gemini-2.5-pro-preview-03-25') # Generate visualization code response = model.generate_content(f""" Analyze the following dataset and instructions: Data columns: {list(df.columns)} Data shape: {df.shape} Instructions: {instructions} Based on this, create 3 appropriate visualizations that provide meaningful insights. For each visualization: 1. Choose the most suitable plot type (bar, line, scatter, hist, pie, heatmap) 2. Determine appropriate data aggregation (e.g., top 5 categories, yearly averages) 3. Select relevant columns for x-axis, y-axis, and any additional dimensions (color, size) 4. Provide a clear, concise title that explains the insight Consider data density and choose visualizations that simplify and clarify the information. Limit the number of data points displayed to ensure readability (e.g., top 5, top 10, yearly). Return your response as a Python list of dictionaries: [ {{"title": "...", "plot_type": "...", "x": "...", "y": "...", "agg_func": "...", "top_n": ..., "additional": {{"color": "...", "size": "..."}}}}, {{"title": "...", "plot_type": "...", "x": "...", "y": "...", "agg_func": "...", "top_n": ..., "additional": {{"color": "...", "size": "..."}}}}, {{"title": "...", "plot_type": "...", "x": "...", "y": "...", "agg_func": "...", "top_n": ..., "additional": {{"color": "...", "size": "..."}}}} ] """) # Extract code block safely code_block = response.text if '```python' in code_block: code_block = code_block.split('```python')[1].split('```')[0].strip() elif '```' in code_block: code_block = code_block.split('```')[1].strip() plots = ast.literal_eval(code_block) return plots except Exception as e: print(f"Error in process_data: {str(e)}") return None def generate_plot(df, plot_info): plot_df = df.copy() if plot_info['agg_func'] == 'sum': plot_df = plot_df.groupby(plot_info['x'])[plot_info['y']].sum().reset_index() elif plot_info['agg_func'] == 'mean': plot_df = plot_df.groupby(plot_info['x'])[plot_info['y']].mean().reset_index() elif plot_info['agg_func'] == 'count': plot_df = plot_df.groupby(plot_info['x']).size().reset_index(name=plot_info['y']) if 'top_n' in plot_info and plot_info['top_n']: plot_df = plot_df.nlargest(plot_info['top_n'], plot_info['y']) if plot_info['plot_type'] == 'bar': fig = go.Figure(go.Bar(x=plot_df[plot_info['x']], y=plot_df[plot_info['y']])) elif plot_info['plot_type'] == 'line': fig = go.Figure(go.Scatter(x=plot_df[plot_info['x']], y=plot_df[plot_info['y']], mode='lines')) elif plot_info['plot_type'] == 'scatter': fig = go.Figure(go.Scatter(x=plot_df[plot_info['x']], y=plot_df[plot_info['y']], mode='markers')) elif plot_info['plot_type'] == 'hist': fig = go.Figure(go.Histogram(x=plot_df[plot_info['x']])) elif plot_info['plot_type'] == 'pie': fig = go.Figure(go.Pie(labels=plot_df[plot_info['x']], values=plot_df[plot_info['y']])) elif plot_info['plot_type'] == 'heatmap': pivot_df = plot_df.pivot(index=plot_info['x'], columns=plot_info['additional']['color'], values=plot_info['y']) fig = go.Figure(go.Heatmap(z=pivot_df.values, x=pivot_df.columns, y=pivot_df.index)) fig.update_layout(title=plot_info['title'], xaxis_title=plot_info['x'], yaxis_title=plot_info['y']) return fig @app.callback( [Output('upload-feedback', 'children'), Output('filename-display', 'children'), Output('delete-file-button', 'style'), Output('uploaded-data', 'data')], [Input('upload-data', 'contents'), Input('delete-file-button', 'n_clicks')], [State('upload-data', 'filename')] ) def update_upload_feedback(contents, delete_clicks, filename): ctx = callback_context if not ctx.triggered: return dash.no_update, dash.no_update, dash.no_update, dash.no_update trigger_id = ctx.triggered[0]['prop_id'].split('.')[0] if trigger_id == 'delete-file-button': return "File deleted.", "", {'display': 'none'}, None if contents is not None: df = parse_contents(contents, filename) if df is not None: return ( dbc.Alert("File uploaded successfully!", color="success"), f"Uploaded: {filename}", {'display': 'inline-block'}, contents ) else: return ( dbc.Alert("Error parsing the file. Please upload a valid CSV or Excel file.", color="danger"), "", {'display': 'none'}, None ) return dash.no_update, dash.no_update, dash.no_update, dash.no_update @app.callback( [Output('visualization-1', 'figure'), Output('visualization-2', 'figure'), Output('visualization-3', 'figure'), Output('error-message', 'children')], [Input('submit-button', 'n_clicks')], [State('uploaded-data', 'data'), State('upload-data', 'filename'), State('instructions', 'value')] ) def update_output(n_clicks, contents, filename, instructions): if n_clicks is None or contents is None: return dash.no_update, dash.no_update, dash.no_update, "" try: df = parse_contents(contents, filename) if df is None: return dash.no_update, dash.no_update, dash.no_update, "Unable to parse the uploaded file." plots = process_data(df, instructions) if plots is None or len(plots) < 3: return dash.no_update, dash.no_update, dash.no_update, "Unable to generate visualizations. Please check your instructions and try again." figures = [generate_plot(df, plot_info) for plot_info in plots[:3]] return figures[0], figures[1], figures[2], "" except Exception as e: error_message = f"An error occurred: {str(e)}" return dash.no_update, dash.no_update, dash.no_update, error_message if __name__ == '__main__': print("Starting the Dash application...") app.run(debug=False, host='0.0.0.0', port=7860) print("Dash application has finished running.")