import gradio as gr
import pandas as pd
from datetime import datetime
import logging
import re
import plotly.express as px
import plotly.graph_objects as go
from sklearn.ensemble import IsolationForest
from concurrent.futures import ThreadPoolExecutor
import os
import io
import time
# Configure logging
logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(levelname)s - %(message)s')
# Try to import reportlab
try:
from reportlab.lib.pagesizes import letter
from reportlab.platypus import SimpleDocTemplate, Paragraph, Spacer, Table, TableStyle
from reportlab.lib.styles import getSampleStyleSheet
from reportlab.lib import colors
reportlab_available = True
logging.info("reportlab module successfully imported")
except ImportError:
logging.warning("reportlab module not found. PDF generation disabled.")
reportlab_available = False
# Summarize logs
def summarize_logs(df):
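    """Return a one-line summary of device count, total usage hours, lab sites, and equipment types."""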
try:
total_devices = df["device_id"].nunique()
total_usage = df["usage_hours"].sum() if "usage_hours" in df.columns else 0
lab_sites = df["lab_site"].nunique() if "lab_site" in df.columns else 0
equipment_types = df["equipment_type"].nunique() if "equipment_type" in df.columns else 0
return f"{total_devices} devices processed with {total_usage:.2f} total usage hours across {lab_sites} lab sites and {equipment_types} equipment types."
except Exception as e:
logging.error(f"Summary generation failed: {str(e)}")
return "Failed to generate summary."
# Anomaly detection
def detect_anomalies(df):
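    """Flag outlier rows with an IsolationForest over usage_hours and downtime.

    The forest is trained on at most 50 sampled rows to keep latency low, then
    predicts on every row; contamination=0.1 assumes roughly 10% of logs are
    anomalous. Returns a short text report and the DataFrame of flagged rows.
    """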
try:
if "usage_hours" not in df.columns or "downtime" not in df.columns:
return "Anomaly detection requires 'usage_hours' and 'downtime' columns.", pd.DataFrame()
        features = df[["usage_hours", "downtime"]].fillna(0)
        # Fit on a bounded sample for speed, but predict on every row so the
        # label column matches the length of the frame
        train = features.sample(n=50, random_state=42) if len(features) > 50 else features
        iso_forest = IsolationForest(contamination=0.1, random_state=42)
        iso_forest.fit(train)
        df["anomaly"] = iso_forest.predict(features)
        report_cols = [c for c in ["device_id", "usage_hours", "downtime", "timestamp", "lab_site", "equipment_type"] if c in df.columns]
        anomalies = df[df["anomaly"] == -1][report_cols]
        if anomalies.empty:
            return "No anomalies detected.", anomalies
        return "\n".join([f"- Device ID: {row['device_id']}, Usage: {row['usage_hours']}, Downtime: {row['downtime']}, Timestamp: {row.get('timestamp', 'N/A')}, Lab Site: {row.get('lab_site', 'N/A')}, Equipment Type: {row.get('equipment_type', 'N/A')}" for _, row in anomalies.head(5).iterrows()]), anomalies
except Exception as e:
logging.error(f"Anomaly detection failed: {str(e)}")
return f"Anomaly detection failed: {str(e)}", pd.DataFrame()
# AMC reminders
def check_amc_reminders(df, current_date):
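    """List devices whose AMC (annual maintenance contract) date falls within 30 days of current_date.

    Returns a short text report and the DataFrame of matching rows.
    """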
try:
if "device_id" not in df.columns or "amc_date" not in df.columns:
return "AMC reminders require 'device_id' and 'amc_date' columns.", pd.DataFrame()
df["amc_date"] = pd.to_datetime(df["amc_date"], errors='coerce')
current_date = pd.to_datetime(current_date)
df["days_to_amc"] = (df["amc_date"] - current_date).dt.days
        report_cols = [c for c in ["device_id", "log_type", "status", "timestamp", "usage_hours", "downtime", "amc_date", "lab_site", "equipment_type"] if c in df.columns]
        reminders = df[(df["days_to_amc"] >= 0) & (df["days_to_amc"] <= 30)][report_cols]
        if reminders.empty:
            return "No AMC reminders due within the next 30 days.", reminders
        return "\n".join([f"- Device ID: {row['device_id']}, AMC Date: {row['amc_date']}, Lab Site: {row.get('lab_site', 'N/A')}, Equipment Type: {row.get('equipment_type', 'N/A')}" for _, row in reminders.head(5).iterrows()]), reminders
except Exception as e:
logging.error(f"AMC reminder generation failed: {str(e)}")
return f"AMC reminder generation failed: {str(e)}", pd.DataFrame()
# Dashboard insights
def generate_dashboard_insights(df):
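    """Return a one-line overview: device count, mean usage hours, and the distinct lab sites and equipment types."""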
try:
total_devices = df["device_id"].nunique()
avg_usage = df["usage_hours"].mean() if "usage_hours" in df.columns else 0
        lab_sites = df["lab_site"].dropna().astype(str).unique().tolist() if "lab_site" in df.columns else []
        equipment_types = df["equipment_type"].dropna().astype(str).unique().tolist() if "equipment_type" in df.columns else []
        return f"{total_devices} devices with average usage of {avg_usage:.2f} hours. Lab Sites: {', '.join(lab_sites) or 'N/A'}. Equipment Types: {', '.join(equipment_types) or 'N/A'}."
except Exception as e:
logging.error(f"Dashboard insights generation failed: {str(e)}")
return "Failed to generate insights."
# Placeholder chart for empty data
def create_placeholder_chart(title):
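    """Return an empty Plotly figure with a 'no data' annotation, used whenever a chart cannot be drawn."""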
fig = go.Figure()
fig.add_annotation(
text="No data available for this chart",
xref="paper", yref="paper",
x=0.5, y=0.5, showarrow=False,
font=dict(size=16)
)
fig.update_layout(title=title, margin=dict(l=20, r=20, t=40, b=20))
return fig
# Create usage chart
def create_usage_chart(df):
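    """Bar chart of total usage hours per device, limited to the five heaviest-used devices."""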
try:
if df.empty or "usage_hours" not in df.columns or "device_id" not in df.columns:
logging.warning("Insufficient data for usage chart")
return create_placeholder_chart("Usage Hours per Device")
usage_data = df.groupby("device_id")["usage_hours"].sum().reset_index()
if len(usage_data) > 5:
usage_data = usage_data.nlargest(5, "usage_hours")
fig = px.bar(
usage_data,
x="device_id",
y="usage_hours",
title="Usage Hours per Device",
labels={"device_id": "Device ID", "usage_hours": "Usage Hours"}
)
fig.update_layout(title_font_size=16, margin=dict(l=20, r=20, t=40, b=20))
return fig
except Exception as e:
logging.error(f"Failed to create usage chart: {str(e)}")
return create_placeholder_chart("Usage Hours per Device")
# Create downtime chart
def create_downtime_chart(df):
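    """Bar chart of total downtime hours per device, limited to the five worst devices."""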
try:
if df.empty or "downtime" not in df.columns or "device_id" not in df.columns:
logging.warning("Insufficient data for downtime chart")
return create_placeholder_chart("Downtime per Device")
downtime_data = df.groupby("device_id")["downtime"].sum().reset_index()
if len(downtime_data) > 5:
downtime_data = downtime_data.nlargest(5, "downtime")
fig = px.bar(
downtime_data,
x="device_id",
y="downtime",
title="Downtime per Device",
labels={"device_id": "Device ID", "downtime": "Downtime (Hours)"}
)
fig.update_layout(title_font_size=16, margin=dict(l=20, r=20, t=40, b=20))
return fig
except Exception as e:
logging.error(f"Failed to create downtime chart: {str(e)}")
return create_placeholder_chart("Downtime per Device")
# Create daily log trends chart
def create_daily_log_trends_chart(df):
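    """Line chart of the number of log rows recorded per calendar day."""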
try:
if df.empty or "timestamp" not in df.columns:
logging.warning("Insufficient data for daily log trends chart")
return create_placeholder_chart("Daily Log Trends")
df['date'] = pd.to_datetime(df['timestamp'], errors='coerce').dt.date
daily_logs = df.groupby('date').size().reset_index(name='log_count')
if daily_logs.empty:
return create_placeholder_chart("Daily Log Trends")
fig = px.line(
daily_logs,
x='date',
y='log_count',
title="Daily Log Trends",
labels={"date": "Date", "log_count": "Number of Logs"}
)
fig.update_layout(title_font_size=16, margin=dict(l=20, r=20, t=40, b=20))
return fig
except Exception as e:
logging.error(f"Failed to create daily log trends chart: {str(e)}")
return create_placeholder_chart("Daily Log Trends")
# Create weekly uptime chart
def create_weekly_uptime_chart(df):
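    """Bar chart of weekly uptime, computed as usage_hours / (usage_hours + downtime) per ISO year-week."""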
try:
if df.empty or "timestamp" not in df.columns or "usage_hours" not in df.columns or "downtime" not in df.columns:
logging.warning("Insufficient data for weekly uptime chart")
return create_placeholder_chart("Weekly Uptime Percentage")
        timestamps = pd.to_datetime(df['timestamp'], errors='coerce')
        df['week'] = timestamps.dt.isocalendar().week
        df['year'] = timestamps.dt.year
        weekly_data = df.groupby(['year', 'week']).agg({
            'usage_hours': 'sum',
            'downtime': 'sum'
        }).reset_index()
        if weekly_data.empty:
            return create_placeholder_chart("Weekly Uptime Percentage")
        # Guard against weeks with zero total hours to avoid division-by-zero NaNs
        total_hours = weekly_data['usage_hours'] + weekly_data['downtime']
        weekly_data['uptime_percent'] = (weekly_data['usage_hours'] / total_hours.where(total_hours > 0) * 100).fillna(0)
        weekly_data['year_week'] = weekly_data['year'].astype(str) + '-W' + weekly_data['week'].astype(str)
fig = px.bar(
weekly_data,
x='year_week',
y='uptime_percent',
title="Weekly Uptime Percentage",
labels={"year_week": "Year-Week", "uptime_percent": "Uptime %"}
)
fig.update_layout(title_font_size=16, margin=dict(l=20, r=20, t=40, b=20))
return fig
except Exception as e:
logging.error(f"Failed to create weekly uptime chart: {str(e)}")
return create_placeholder_chart("Weekly Uptime Percentage")
# Create anomaly alerts chart
def create_anomaly_alerts_chart(anomalies_df):
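    """Scatter chart of how many anomalies were flagged on each day."""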
try:
if anomalies_df is None or anomalies_df.empty or "timestamp" not in anomalies_df.columns:
logging.warning("Insufficient data for anomaly alerts chart")
return create_placeholder_chart("Anomaly Alerts Over Time")
        anomalies_df = anomalies_df.copy()  # avoid SettingWithCopyWarning on the sliced frame
        anomalies_df['date'] = pd.to_datetime(anomalies_df['timestamp'], errors='coerce').dt.date
anomaly_counts = anomalies_df.groupby('date').size().reset_index(name='anomaly_count')
if anomaly_counts.empty:
return create_placeholder_chart("Anomaly Alerts Over Time")
fig = px.scatter(
anomaly_counts,
x='date',
y='anomaly_count',
title="Anomaly Alerts Over Time",
labels={"date": "Date", "anomaly_count": "Number of Anomalies"}
)
fig.update_layout(title_font_size=16, margin=dict(l=20, r=20, t=40, b=20))
return fig
except Exception as e:
logging.error(f"Failed to create anomaly alerts chart: {str(e)}")
return create_placeholder_chart("Anomaly Alerts Over Time")
# Generate device cards
def generate_device_cards(df):
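    """Render one HTML card per device showing health (mapped from the latest status), site, type, log count, and last log time."""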
try:
if df.empty:
return '<p>No devices available to display.</p>'
        agg_spec = {'status': 'last', 'timestamp': 'max'}
        # Aggregate the optional columns only when they are present in the CSV
        for optional in ('lab_site', 'equipment_type'):
            if optional in df.columns:
                agg_spec[optional] = 'last'
        device_stats = df.groupby('device_id').agg(agg_spec).reset_index()
        device_stats['count'] = df.groupby('device_id').size().reindex(device_stats['device_id']).values
        device_stats['health'] = device_stats['status'].map({
            'Active': 'Healthy',
            'Inactive': 'Unhealthy',
            'Pending': 'Warning'
        }).fillna('Unknown')
cards_html = '<div style="display: flex; flex-wrap: wrap; gap: 20px;">'
for _, row in device_stats.iterrows():
health_color = {'Healthy': 'green', 'Unhealthy': 'red', 'Warning': 'orange', 'Unknown': 'gray'}.get(row['health'], 'gray')
timestamp_str = str(row['timestamp']) if pd.notna(row['timestamp']) else 'Unknown'
            lab_site = row.get('lab_site') if pd.notna(row.get('lab_site')) else 'Unknown'
            equipment_type = row.get('equipment_type') if pd.notna(row.get('equipment_type')) else 'Unknown'
cards_html += f"""
<div style="border: 1px solid #e0e0e0; padding: 10px; border-radius: 5px; width: 200px;">
<h4>Device: {row['device_id']}</h4>
<p><b>Health:</b> <span style="color: {health_color}">{row['health']}</span></p>
<p><b>Lab Site:</b> {lab_site}</p>
<p><b>Equipment Type:</b> {equipment_type}</p>
<p><b>Usage Count:</b> {row['count']}</p>
<p><b>Last Log:</b> {timestamp_str}</p>
</div>
"""
cards_html += '</div>'
return cards_html
except Exception as e:
logging.error(f"Failed to generate device cards: {str(e)}")
return f'<p>Error generating device cards: {str(e)}</p>'
# Generate PDF content
def generate_pdf_content(summary, preview_df, anomalies, amc_reminders, insights, device_cards_html, daily_log_chart, weekly_uptime_chart, anomaly_alerts_chart, downtime_chart):
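    """Build a PDF status report with reportlab.

    The chart arguments are accepted for signature compatibility but are not
    embedded; the PDF carries a placeholder note instead. Returns the file
    path, or None if reportlab is unavailable or the build fails.
    """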
if not reportlab_available:
return None
try:
pdf_path = f"status_report_{datetime.now().strftime('%Y%m%d_%H%M%S')}.pdf"
doc = SimpleDocTemplate(pdf_path, pagesize=letter)
styles = getSampleStyleSheet()
story = []
def safe_paragraph(text, style):
return Paragraph(str(text).replace('\n', '<br/>'), style) if text else Paragraph("", style)
story.append(Paragraph("LabOps Status Report", styles['Title']))
story.append(Paragraph(f"Generated on {datetime.now().strftime('%Y-%m-%d %H:%M:%S')}", styles['Normal']))
story.append(Spacer(1, 12))
story.append(Paragraph("Summary Report", styles['Heading2']))
story.append(safe_paragraph(summary, styles['Normal']))
story.append(Spacer(1, 12))
story.append(Paragraph("Log Preview", styles['Heading2']))
if not preview_df.empty:
data = [preview_df.columns.tolist()] + preview_df.head(5).values.tolist()
table = Table(data)
table.setStyle(TableStyle([
('BACKGROUND', (0, 0), (-1, 0), colors.grey),
('TEXTCOLOR', (0, 0), (-1, 0), colors.whitesmoke),
('ALIGN', (0, 0), (-1, -1), 'CENTER'),
('FONTNAME', (0, 0), (-1, 0), 'Helvetica-Bold'),
('FONTSIZE', (0, 0), (-1, 0), 12),
('BOTTOMPADDING', (0, 0), (-1, 0), 12),
('BACKGROUND', (0, 1), (-1, -1), colors.beige),
('TEXTCOLOR', (0, 1), (-1, -1), colors.black),
('FONTNAME', (0, 1), (-1, -1), 'Helvetica'),
('FONTSIZE', (0, 1), (-1, -1), 10),
('GRID', (0, 0), (-1, -1), 1, colors.black)
]))
story.append(table)
else:
story.append(safe_paragraph("No preview available.", styles['Normal']))
story.append(Spacer(1, 12))
story.append(Paragraph("Device Cards", styles['Heading2']))
        # Strip all HTML tags in one pass rather than enumerating them by hand
        device_cards_text = re.sub(r'<[^>]+>', '\n', device_cards_html)
story.append(safe_paragraph(device_cards_text, styles['Normal']))
story.append(Spacer(1, 12))
story.append(Paragraph("Anomaly Detection", styles['Heading2']))
story.append(safe_paragraph(anomalies, styles['Normal']))
story.append(Spacer(1, 12))
story.append(Paragraph("AMC Reminders", styles['Heading2']))
story.append(safe_paragraph(amc_reminders, styles['Normal']))
story.append(Spacer(1, 12))
story.append(Paragraph("Dashboard Insights", styles['Heading2']))
story.append(safe_paragraph(insights, styles['Normal']))
story.append(Spacer(1, 12))
story.append(Paragraph("Charts", styles['Heading2']))
story.append(Paragraph("[Chart placeholders - see dashboard for visuals]", styles['Normal']))
doc.build(story)
logging.info(f"PDF generated at {pdf_path}")
return pdf_path
except Exception as e:
logging.error(f"Failed to generate PDF: {str(e)}")
return None
# Main processing function
async def process_logs(file_obj, lab_site_filter, equipment_type_filter, date_range, cached_df_state, last_modified_state):
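    """Parse, cache, filter, and analyse the uploaded CSV.

    Returns a 14-tuple matching the Gradio outputs: summary, preview HTML,
    usage chart, device cards, daily/weekly/anomaly/downtime charts, anomaly
    and AMC texts, insights, PDF path (None here), the cached DataFrame, and
    the file's last-modified time used for cache invalidation.
    """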
start_time = time.time()
try:
if not file_obj:
return "No file uploaded.", "<p>No data available.</p>", None, '<p>No device cards available.</p>', None, None, None, None, "No anomalies detected.", "No AMC reminders.", "No insights generated.", None, cached_df_state, last_modified_state
file_path = file_obj.name
current_modified_time = os.path.getmtime(file_path)
# Read file only if it's new or modified
if cached_df_state is None or current_modified_time != last_modified_state:
logging.info(f"Processing new or modified file: {file_path}")
            if not file_path.lower().endswith(".csv"):
return "Please upload a CSV file.", "<p>Invalid file format.</p>", None, '<p>No device cards available.</p>', None, None, None, None, "", "", "", None, cached_df_state, last_modified_state
required_columns = ["device_id", "log_type", "status", "timestamp", "usage_hours", "downtime", "amc_date"]
dtypes = {
"device_id": "string",
"log_type": "string",
"status": "string",
"usage_hours": "float32",
"downtime": "float32",
"amc_date": "string",
"lab_site": "string",
"equipment_type": "string"
}
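            # Illustrative example of the expected CSV shape (values are hypothetical):
            #   device_id,log_type,status,timestamp,usage_hours,downtime,amc_date,lab_site,equipment_type
            #   DEV-001,maintenance,Active,2025-01-15 09:30:00,6.5,0.5,2025-03-01,Site-A,Centrifuge
            # lab_site and equipment_type are optional; downstream views fall back to 'Unknown'/'N/A' without them.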
df = pd.read_csv(file_path, dtype=dtypes)
missing_columns = [col for col in required_columns if col not in df.columns]
if missing_columns:
return f"Missing columns: {missing_columns}", "<p>Missing required columns.</p>", None, '<p>No device cards available.</p>', None, None, None, None, "", "", "", None, cached_df_state, last_modified_state
df["timestamp"] = pd.to_datetime(df["timestamp"], errors='coerce')
df["amc_date"] = pd.to_datetime(df["amc_date"], errors='coerce')
if df["timestamp"].dt.tz is None:
df["timestamp"] = df["timestamp"].dt.tz_localize('UTC').dt.tz_convert('Asia/Kolkata')
if df.empty:
return "No data available.", "<p>No data available.</p>", None, '<p>No device cards available.</p>', None, None, None, None, "", "", "", None, df, current_modified_time
else:
df = cached_df_state
# Apply filters
filtered_df = df.copy()
if lab_site_filter and lab_site_filter != 'All' and 'lab_site' in filtered_df.columns:
filtered_df = filtered_df[filtered_df['lab_site'] == lab_site_filter]
if equipment_type_filter and equipment_type_filter != 'All' and 'equipment_type' in filtered_df.columns:
filtered_df = filtered_df[filtered_df['equipment_type'] == equipment_type_filter]
        # gr.Slider returns a single number; accept either that or a (start, end) pair
        if date_range is not None:
            if isinstance(date_range, (list, tuple)) and len(date_range) == 2:
                days_start, days_end = date_range
            else:
                days_start, days_end = date_range, 0
            today = pd.Timestamp.now(tz='Asia/Kolkata').normalize()
            start_date = today + pd.Timedelta(days=days_start)
            end_date = today + pd.Timedelta(days=days_end, hours=23, minutes=59, seconds=59)
logging.info(f"Date range filter: start_date={start_date}, end_date={end_date}")
logging.info(f"Before date filter: {len(filtered_df)} rows")
filtered_df = filtered_df[(filtered_df['timestamp'] >= start_date) & (filtered_df['timestamp'] <= end_date)]
logging.info(f"After date filter: {len(filtered_df)} rows")
if filtered_df.empty:
return "No data after applying filters.", "<p>No data after filters.</p>", None, '<p>No device cards available.</p>', None, None, None, None, "", "", "", None, df, current_modified_time
# Generate table for preview
        preview_cols = [c for c in ['device_id', 'log_type', 'status', 'timestamp', 'usage_hours', 'downtime', 'amc_date', 'lab_site', 'equipment_type'] if c in filtered_df.columns]
        preview_df = filtered_df[preview_cols].head(5)
preview_html = preview_df.to_html(index=False, classes='table table-striped', border=0)
# Run critical tasks concurrently
        # Run the two heavier analyses concurrently, passing copies so the
        # worker threads never mutate the same DataFrame
        with ThreadPoolExecutor(max_workers=2) as executor:
            future_anomalies = executor.submit(detect_anomalies, filtered_df.copy())
            future_amc = executor.submit(check_amc_reminders, filtered_df.copy(), datetime.now())
            summary = summarize_logs(filtered_df)
            anomalies, anomalies_df = future_anomalies.result()
            amc_reminders, reminders_df = future_amc.result()
            insights = generate_dashboard_insights(filtered_df)
# Generate charts sequentially
usage_chart = create_usage_chart(filtered_df)
downtime_chart = create_downtime_chart(filtered_df)
daily_log_chart = create_daily_log_trends_chart(filtered_df)
weekly_uptime_chart = create_weekly_uptime_chart(filtered_df)
anomaly_alerts_chart = create_anomaly_alerts_chart(anomalies_df)
device_cards = generate_device_cards(filtered_df)
elapsed_time = time.time() - start_time
logging.info(f"Processing completed in {elapsed_time:.2f} seconds")
if elapsed_time > 3:
logging.warning(f"Processing time exceeded 3 seconds: {elapsed_time:.2f} seconds")
return (summary, preview_html, usage_chart, device_cards, daily_log_chart, weekly_uptime_chart, anomaly_alerts_chart, downtime_chart, anomalies, amc_reminders, insights, None, df, current_modified_time)
except Exception as e:
logging.error(f"Failed to process file: {str(e)}")
return f"Error: {str(e)}", "<p>Error processing data.</p>", None, '<p>Error processing data.</p>', None, None, None, None, "", "", "", None, cached_df_state, last_modified_state
# Generate PDF separately
async def generate_pdf(summary, preview_html, usage_chart, device_cards, daily_log_chart, weekly_uptime_chart, anomaly_alerts_chart, downtime_chart, anomalies, amc_reminders, insights):
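    """Rebuild the preview table from its rendered HTML and delegate to generate_pdf_content.

    Note: pd.read_html needs an HTML parser such as lxml installed, and the
    chart inputs are passed through untouched.
    """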
try:
        preview_df = pd.read_html(io.StringIO(preview_html))[0]
pdf_file = generate_pdf_content(summary, preview_df, anomalies, amc_reminders, insights, device_cards, daily_log_chart, weekly_uptime_chart, anomaly_alerts_chart, downtime_chart)
return pdf_file
except Exception as e:
logging.error(f"Failed to generate PDF: {str(e)}")
return None
# Update filters
def update_filters(file_obj, current_file_state):
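    """Repopulate the Lab Site and Equipment Type dropdowns from a newly uploaded CSV, skipping re-reads of the same file."""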
if not file_obj or file_obj.name == current_file_state:
return gr.update(), gr.update(), current_file_state
try:
with open(file_obj.name, 'rb') as f:
csv_content = f.read().decode('utf-8')
        df = pd.read_csv(io.StringIO(csv_content))
lab_site_options = ['All'] + [site for site in df['lab_site'].dropna().astype(str).unique().tolist() if site.strip()] if 'lab_site' in df.columns else ['All']
equipment_type_options = ['All'] + [equip for equip in df['equipment_type'].dropna().astype(str).unique().tolist() if equip.strip()] if 'equipment_type' in df.columns else ['All']
return gr.update(choices=lab_site_options, value='All'), gr.update(choices=equipment_type_options, value='All'), file_obj.name
except Exception as e:
logging.error(f"Failed to update filters: {str(e)}")
return gr.update(choices=['All'], value='All'), gr.update(choices=['All'], value='All'), current_file_state
# Gradio Interface
try:
logging.info("Initializing Gradio interface...")
with gr.Blocks(css="""
.dashboard-container {border: 1px solid #e0e0e0; padding: 10px; border-radius: 5px;}
.dashboard-title {font-size: 24px; font-weight: bold; margin-bottom: 5px;}
.dashboard-section {margin-bottom: 20px;}
.dashboard-section h3 {font-size: 18px; margin-bottom: 2px;}
.dashboard-section p {margin: 1px 0; line-height: 1.2;}
.dashboard-section ul {margin: 2px 0; padding-left: 20px;}
.table {width: 100%; border-collapse: collapse;}
.table th, .table td {border: 1px solid #ddd; padding: 8px; text-align: left;}
.table th {background-color: #f2f2f2;}
.table tr:nth-child(even) {background-color: #f9f9f9;}
""") as iface:
gr.Markdown("<h1>LabOps Log Analyzer Dashboard</h1>")
gr.Markdown("Upload a CSV file to analyze. Click 'Analyze' to refresh the dashboard. Use 'Export PDF' for report download.")
last_modified_state = gr.State(value=None)
current_file_state = gr.State(value=None)
cached_df_state = gr.State(value=None)
with gr.Row():
with gr.Column(scale=1):
file_input = gr.File(label="Upload Logs (CSV)", file_types=[".csv"])
with gr.Group():
gr.Markdown("### Filters")
lab_site_filter = gr.Dropdown(label="Lab Site", choices=['All'], value='All', interactive=True)
equipment_type_filter = gr.Dropdown(label="Equipment Type", choices=['All'], value='All', interactive=True)
                    date_range_filter = gr.Slider(label="Date Range (days back from today; -7 means the last 7 days)", minimum=-365, maximum=0, step=1, value=-7)
submit_button = gr.Button("Analyze", variant="primary")
pdf_button = gr.Button("Export PDF", variant="secondary")
with gr.Column(scale=2):
with gr.Group(elem_classes="dashboard-container"):
gr.Markdown("<div class='dashboard-title'>Analysis Results</div>")
with gr.Group(elem_classes="dashboard-section"):
gr.Markdown("### Step 1: Summary Report")
summary_output = gr.Markdown()
with gr.Group(elem_classes="dashboard-section"):
gr.Markdown("### Step 2: Log Preview")
preview_output = gr.HTML()
with gr.Group(elem_classes="dashboard-section"):
gr.Markdown("### Device Cards")
device_cards_output = gr.HTML()
with gr.Group(elem_classes="dashboard-section"):
gr.Markdown("### Charts")
with gr.Tab("Usage Hours per Device"):
usage_chart_output = gr.Plot()
with gr.Tab("Downtime per Device"):
downtime_chart_output = gr.Plot()
with gr.Tab("Daily Log Trends"):
daily_log_trends_output = gr.Plot()
with gr.Tab("Weekly Uptime Percentage"):
weekly_uptime_output = gr.Plot()
with gr.Tab("Anomaly Alerts"):
anomaly_alerts_output = gr.Plot()
with gr.Group(elem_classes="dashboard-section"):
gr.Markdown("### Step 4: Anomaly Detection")
anomaly_output = gr.Markdown()
with gr.Group(elem_classes="dashboard-section"):
gr.Markdown("### Step 5: AMC Reminders")
amc_output = gr.Markdown()
with gr.Group(elem_classes="dashboard-section"):
gr.Markdown("### Step 6: Insights")
insights_output = gr.Markdown()
with gr.Group(elem_classes="dashboard-section"):
gr.Markdown("### Export Report")
pdf_output = gr.File(label="Download Status Report as PDF")
file_input.change(
fn=update_filters,
inputs=[file_input, current_file_state],
outputs=[lab_site_filter, equipment_type_filter, current_file_state],
queue=False
)
submit_button.click(
fn=process_logs,
inputs=[file_input, lab_site_filter, equipment_type_filter, date_range_filter, cached_df_state, last_modified_state],
outputs=[summary_output, preview_output, usage_chart_output, device_cards_output, daily_log_trends_output, weekly_uptime_output, anomaly_alerts_output, downtime_chart_output, anomaly_output, amc_output, insights_output, pdf_output, cached_df_state, last_modified_state]
)
pdf_button.click(
fn=generate_pdf,
inputs=[summary_output, preview_output, usage_chart_output, device_cards_output, daily_log_trends_output, weekly_uptime_output, anomaly_alerts_output, downtime_chart_output, anomaly_output, amc_output, insights_output],
outputs=[pdf_output]
)
logging.info("Gradio interface initialized successfully")
except Exception as e:
logging.error(f"Failed to initialize Gradio interface: {str(e)}")
    raise
if __name__ == "__main__":
try:
logging.info("Launching Gradio interface...")
iface.launch(server_name="0.0.0.0", server_port=7860, debug=True, share=False)
logging.info("Gradio interface launched successfully")
except Exception as e:
logging.error(f"Failed to launch Gradio interface: {str(e)}")
print(f"Error launching app: {str(e)}")
        raise