"""Delay-risk prediction for project tasks.

Loads a trained LSTM (``DelayPredictor``) and its feature scaler at import
time, and exposes ``predict_delay`` which returns a delay probability plus
rule-based, prioritized insights for a single project task.
"""

import logging
import pickle
import zlib
from datetime import datetime, timedelta
from typing import Dict, List, Optional

import numpy as np
import torch
import torch.nn as nn
# Not referenced directly in this section; presumably kept because the pickled
# scaler is a StandardScaler -- TODO confirm before removing.
from sklearn.preprocessing import StandardScaler

# Configure logging
logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(levelname)s - %(message)s')
logger = logging.getLogger(__name__)

# "Today" used for the 2-week risk window. The system pins this date; callers
# that need a different clock pass ``reference_date`` explicitly.
DEFAULT_REFERENCE_DATE = datetime(2025, 5, 26)

# Known tasks per phase, used to build the per-task risk list.
_TASK_OPTIONS = {
    "Planning": ["Define Scope", "Resource Allocation", "Permit Acquisition"],
    "Design": ["Architectural Drafting", "Engineering Analysis", "Design Review"],
    "Construction": ["Foundation Work", "Structural Build", "Utility Installation"],
}

# Categorical encodings fed to the model. Skill keys are lower-case because the
# incoming value is normalized before lookup.
_PHASE_CODES = {"Planning": 0, "Design": 1, "Construction": 2}
_SKILL_CODES = {"low": 0, "medium": 1, "high": 2}


# LSTM Model Definition (must match training script)
class DelayPredictor(nn.Module):
    """LSTM with attention pooling that outputs a delay score in (0, 100)."""

    def __init__(self, input_size, hidden_size, num_layers):
        super().__init__()
        self.hidden_size = hidden_size
        self.num_layers = num_layers
        self.lstm = nn.LSTM(input_size, hidden_size, num_layers, batch_first=True)
        self.attention = nn.Linear(hidden_size, 1)
        self.fc = nn.Linear(hidden_size, 1)
        self.sigmoid = nn.Sigmoid()

    def forward(self, x):
        """Map ``x`` of shape (batch, seq, input_size) to scores of shape (batch, 1)."""
        lstm_out, _ = self.lstm(x)  # (batch, seq, hidden)
        # One attention weight per time step, normalized over the sequence axis.
        attn_weights = torch.softmax(self.attention(lstm_out).squeeze(-1), dim=1)
        # Weighted sum of the LSTM outputs -> (batch, hidden).
        context = torch.bmm(attn_weights.unsqueeze(1), lstm_out).squeeze(1)
        out = self.fc(context)
        # Sigmoid gives (0, 1); scale to a percentage.
        return self.sigmoid(out) * 100


# Load model and scaler
# Any failure here is logged and leaves ``model``/``scaler`` as None so that
# ``predict_delay`` can return its fallback response instead of crashing.
try:
    device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
    model = DelayPredictor(input_size=7, hidden_size=64, num_layers=2).to(device)
    # SECURITY NOTE: torch.load and pickle.load can execute arbitrary code from
    # the file being read; these artifact paths must only contain trusted files.
    model.load_state_dict(torch.load("models/delay_model.pth", map_location=device))
    model.eval()
    with open("models/scaler.pkl", "rb") as f:
        scaler = pickle.load(f)
    logger.info("LSTM model and scaler loaded successfully")
except Exception as e:
    logger.error("Failed to load model or scaler: %s", e)
    model = None
    scaler = None


def get_weather_condition(score: int) -> str:
    """Map weather impact score (0-100) to descriptive weather condition."""
    if score <= 10:
        return "Sunny"
    elif score <= 30:
        return "Partly Cloudy"
    elif score <= 50:
        return "Cloudy"
    elif score <= 70:
        return "Light Rain"
    elif score <= 85:
        return "Heavy Rain"
    else:
        return "Severe Storm"


def _value_or_zero(data: Dict, key: str):
    """Return ``data[key]``, treating a missing key or an explicit None as 0."""
    value = data.get(key)
    return 0 if value is None else value


def _normalized_skill(data: Dict, default: str = "") -> str:
    """Return the workforce skill level stripped and lower-cased."""
    return str(data.get("workforce_skill_level") or default).strip().lower()


def _stable_task_offset(task_name: str) -> int:
    """Return a per-task offset in [-5, 4] that is identical across processes.

    The built-in ``hash`` of a string is salted per interpreter run, so it
    cannot be used for a value that should be reproducible.
    """
    return zlib.crc32(task_name.encode("utf-8")) % 10 - 5


def _build_response(input_data: Dict, phase, task, delay_probability: float,
                    ai_insights: str, high_risk_phases: List[Dict],
                    weather_condition) -> Dict:
    """Assemble the response dict returned by ``predict_delay``."""
    return {
        "project": input_data.get("project_name", "Unnamed Project"),
        "phase": phase,
        "task": task,
        "delay_probability": delay_probability,
        "ai_insights": ai_insights,
        "high_risk_phases": high_risk_phases,
        "weather_condition": weather_condition,
    }


def call_ai_model_for_insights(input_data: Dict, delay_risk: float,
                               reference_date: Optional[datetime] = None) -> List[str]:
    """
    Generate detailed hardcoded insights based on input data and delay risk.

    Includes a 2-week risk alert if weather_forecast_date falls within 14 days
    of the reference date. Every rule that fires contributes a message with a
    priority; the five highest-priority messages are returned, highest first.

    Args:
        input_data: Task description. Keys read: phase, task, current_progress,
            task_expected_duration, task_actual_duration, workforce_gap,
            workforce_skill_level, workforce_shift_hours, weather_impact_score,
            weather_condition, project_location, weather_forecast_date.
            Missing or None numeric fields are treated as 0.
        delay_risk: Predicted delay probability, compared against 25/50/75.
        reference_date: Naive datetime used as "today" for the 2-week window.
            Defaults to ``DEFAULT_REFERENCE_DATE`` (2025-05-26).

    Returns:
        Up to five insight messages. The delay-risk and weather rules always
        contribute one message each, so the list is never empty in practice;
        the fallback message is kept as a defensive default.
    """
    logger.info("Generating detailed hardcoded AI insights")

    phase = input_data.get("phase", "")
    task = input_data.get("task", "")
    current_progress = _value_or_zero(input_data, "current_progress")
    expected_duration = _value_or_zero(input_data, "task_expected_duration")
    actual_duration = _value_or_zero(input_data, "task_actual_duration")
    workforce_gap = _value_or_zero(input_data, "workforce_gap")
    skill_level = _normalized_skill(input_data)
    shift_hours = _value_or_zero(input_data, "workforce_shift_hours")
    weather_score = _value_or_zero(input_data, "weather_impact_score")
    # Fall back to the score-derived label when no usable label was supplied.
    weather_condition = str(
        input_data.get("weather_condition") or get_weather_condition(weather_score)
    )
    project_location = input_data.get("project_location", "Unknown")
    weather_forecast_date = input_data.get("weather_forecast_date", "")

    # Initialize insights with scores for prioritization
    insights = []

    # Helper function to add insight with priority score
    def add_insight(message: str, priority: float):
        insights.append((message, priority))

    # 2-week risk alert
    try:
        forecast_date = datetime.strptime(weather_forecast_date, "%Y-%m-%d")
    except (TypeError, ValueError):
        # TypeError covers a non-string value such as None.
        logger.warning("Invalid weather_forecast_date format; skipping 2-week risk alert")
        forecast_date = None

    if forecast_date is not None:
        current_date = DEFAULT_REFERENCE_DATE if reference_date is None else reference_date
        two_weeks_later = current_date + timedelta(days=14)
        if current_date <= forecast_date <= two_weeks_later:
            if delay_risk > 75 or weather_score > 75:
                cause = 'severe weather' if weather_score > 75 else 'high delay probability'
                add_insight(
                    f"⚠️ Critical 2-Week Risk Alert: High risk of delay for {phase}: {task} "
                    f"in {project_location} by {weather_forecast_date} due to {cause}. "
                    f"Implement contingency plans immediately.",
                    1.2
                )
            elif delay_risk > 50 or weather_score > 50:
                add_insight(
                    f"⚠️ 2-Week Risk Alert: Moderate risk of delay for {phase}: {task} "
                    f"in {project_location} by {weather_forecast_date}. "
                    f"Monitor closely and prepare mitigation measures.",
                    1.1
                )

    # Delay risk-based insights
    if delay_risk > 75:
        add_insight(f"Urgent: High delay risk detected for {phase}: {task} in {project_location}. Take immediate action.", 1.0)
    elif delay_risk > 50:
        add_insight(f"Monitor {phase}: {task} closely in {project_location} to prevent delays.", 0.9)
    elif delay_risk > 25:
        add_insight(f"Maintain steady progress for {phase}: {task} in {project_location}.", 0.7)
    else:
        add_insight(f"Optimize resources for {phase}: {task} in {project_location} to maintain schedule.", 0.6)

    # Weather-specific insights
    if weather_score > 85:
        add_insight(f"Critical: Severe storm forecast in {project_location} for {phase}: {task}. Consider halting outdoor activities.", 1.1)
    elif weather_score > 70:
        add_insight(f"Reschedule outdoor tasks for {phase}: {task} in {project_location} due to heavy rain forecast.", 1.0)
    elif weather_score > 50:
        add_insight(f"Shift to indoor or weather-resistant tasks for {phase}: {task} in {project_location} due to light rain.", 0.9)
    elif weather_score > 30:
        add_insight(f"Monitor cloudy conditions in {project_location} for {phase}: {task} to avoid unexpected delays.", 0.7)
    else:
        add_insight(f"Proceed with {phase}: {task} in {project_location} under favorable weather conditions.", 0.6)

    # Phase/task-specific insights
    task_specific = {
        "Planning": {
            "Define Scope": f"Ensure stakeholder alignment for Planning: Define Scope in {project_location}, considering weather impacts.",
            "Resource Allocation": f"Secure budget and resources early for Planning: Resource Allocation in {project_location}.",
            "Permit Acquisition": f"Expedite permits for Planning: Permit Acquisition in {project_location} to avoid weather-related delays.",
        },
        "Design": {
            "Architectural Drafting": f"Engage architects early for Design: Architectural Drafting in {project_location}, accounting for weather.",
            "Engineering Analysis": f"Hire additional engineers for Design: Engineering Analysis in {project_location} to meet deadlines.",
            "Design Review": f"Schedule thorough reviews for Design: Design Review in {project_location}, considering forecast.",
        },
        "Construction": {
            "Foundation Work": f"Optimize material delivery for Construction: Foundation Work in {project_location}, avoiding {weather_condition.lower()}.",
            "Structural Build": f"Ensure equipment availability for Construction: Structural Build in {project_location} under {weather_condition.lower()}.",
            "Utility Installation": f"Coordinate subcontractors for Construction: Utility Installation in {project_location}, monitoring weather.",
        },
    }
    if phase in task_specific and task in task_specific[phase]:
        add_insight(task_specific[phase][task], 0.8)

    # Workforce-based insights
    if workforce_gap > 30:
        add_insight(f"Urgently hire subcontractors in {project_location} to address {workforce_gap}% workforce shortage.", 1.0)
    elif workforce_gap > 15:
        add_insight(f"Recruit additional workers in {project_location} to reduce {workforce_gap}% workforce gap.", 0.9)
    elif workforce_gap > 5:
        add_insight(f"Consider temporary staff in {project_location} to address minor workforce gap.", 0.7)

    if skill_level == "low":
        add_insight(f"Provide training in {project_location} to improve low skill levels for {phase}: {task}.", 0.9)
    elif skill_level == "medium" and delay_risk > 50:
        add_insight(f"Upskill workforce in {project_location} for efficiency in {phase}: {task}.", 0.8)
    elif skill_level == "high" and delay_risk < 25:
        add_insight(f"Leverage high skill levels in {project_location} to maintain {phase}: {task} progress.", 0.6)

    # NOTE: a missing workforce_shift_hours counts as 0 and so triggers the
    # first branch; kept as-is because callers may rely on it.
    if shift_hours < 6:
        add_insight(f"Extend shift hours beyond {shift_hours} in {project_location} to meet {phase}: {task} deadlines.", 0.9)
    elif shift_hours < 8 and delay_risk > 50:
        add_insight(f"Increase shift hours to 8 in {project_location} for {phase}: {task}.", 0.8)
    elif shift_hours > 10:
        add_insight(f"Balance shifts in {project_location} to prevent burnout during {phase}: {task}.", 0.7)

    # Progress and duration-based insights
    if expected_duration > 0 and actual_duration > expected_duration:
        overrun_pct = ((actual_duration - expected_duration) / expected_duration) * 100
        if overrun_pct > 20:
            add_insight(f"Address significant duration overrun ({overrun_pct:.1f}%) for {phase}: {task} in {project_location}.", 1.0)
        elif overrun_pct > 10:
            add_insight(f"Review scheduling to address {overrun_pct:.1f}% overrun in {phase}: {task} in {project_location}.", 0.8)

    if expected_duration > 0:
        # Share of the planned duration already spent, capped at 100%.
        expected_progress = min((actual_duration / expected_duration) * 100, 100)
        if current_progress < expected_progress * 0.8:
            add_insight(f"Accelerate task progress for {phase}: {task} in {project_location} to align with schedule.", 0.9)
        elif current_progress < 50 and delay_risk > 50:
            add_insight(f"Increase resources to boost {current_progress}% progress in {phase}: {task} in {project_location}.", 0.8)

    # Edge cases
    if workforce_gap >= 90:
        add_insight(f"Critical: Halt non-essential tasks in {project_location} until workforce gap for {phase}: {task} is resolved.", 1.1)
    if current_progress == 0 and delay_risk > 50:
        add_insight(f"Initiate {phase}: {task} in {project_location} immediately to avoid further delays.", 1.0)
    if expected_duration == 0 or actual_duration == 0:
        add_insight(f"Provide accurate duration estimates for {phase}: {task} in {project_location} for reliable predictions.", 0.7)
    if weather_score > 50 and phase == "Construction":
        add_insight(f"Prepare contingency plans for {phase}: {task} in {project_location} due to adverse weather forecast.", 0.95)

    # Sort insights by priority (stable, so ties keep insertion order) and keep the top 5
    insights.sort(key=lambda x: x[1], reverse=True)
    selected_insights = [message for message, _ in insights[:5]]

    logger.info("Generated insights: %s", selected_insights)
    return selected_insights or [f"No significant delay factors detected for {phase}: {task} in {project_location}."]


def predict_delay(input_data: Dict) -> Dict:
    """
    Predict delay probability using LSTM model.

    Inputs: Project task data (phase, progress, duration, workforce, weather).
    Outputs: Delay probability, AI insights, high-risk phases, weather condition.

    Returns a dict with keys ``project``, ``phase``, ``task``,
    ``delay_probability``, ``ai_insights`` (a "; "-joined string),
    ``high_risk_phases`` and ``weather_condition``. If the model or scaler is
    unavailable, or the input cannot be turned into features, a fallback
    response with ``delay_probability`` 50.0 and an explanatory ``ai_insights``
    message is returned instead of raising.
    """
    logger.info("Starting LSTM delay prediction")

    if model is None or scaler is None:
        logger.error("Model or scaler not loaded; falling back to default")
        return _build_response(
            input_data,
            phase=input_data.get("phase", ""),
            task=input_data.get("task", ""),
            delay_probability=50.0,
            ai_insights="Model unavailable; please check deployment.",
            high_risk_phases=[],
            weather_condition="Unknown",
        )

    phase = input_data.get("phase", "")
    task = input_data.get("task", "")
    weather_score = _value_or_zero(input_data, "weather_impact_score")
    weather_condition = input_data.get("weather_condition") or get_weather_condition(weather_score)

    # Prepare input features. Unknown phases encode as 0 and unknown or missing
    # skill levels as 1 ("Medium"), matching the original defaults.
    try:
        features = np.array([[
            float(_value_or_zero(input_data, "current_progress")),
            float(_value_or_zero(input_data, "task_expected_duration")),
            float(_value_or_zero(input_data, "task_actual_duration")),
            float(_value_or_zero(input_data, "workforce_gap")),
            float(weather_score),
            float(_SKILL_CODES.get(_normalized_skill(input_data, "Medium"), 1)),
            float(_PHASE_CODES.get(phase, 0)),
        ]])
    except (TypeError, ValueError) as e:
        logger.error("Invalid input data: %s", e)
        return _build_response(
            input_data, phase, task, 50.0, f"Invalid input: {str(e)}", [], weather_condition
        )

    # Standardize and reshape to (batch=1, seq=1, features)
    try:
        features_scaled = scaler.transform(features)
        features_tensor = torch.tensor(
            features_scaled.reshape(1, 1, -1), dtype=torch.float32
        ).to(device)
    except Exception as e:
        logger.error("Feature preprocessing failed: %s", e)
        return _build_response(
            input_data, phase, task, 50.0, f"Preprocessing error: {str(e)}", [], weather_condition
        )

    # Predict
    with torch.no_grad():
        delay_risk = model(features_tensor).cpu().numpy().item()
    delay_risk = round(max(0, min(delay_risk, 100)), 1)

    # Generate high_risk_phases: one entry per known task of the phase. The
    # requested task carries the predicted risk; sibling tasks get that risk
    # shifted by a small synthetic, reproducible offset.
    high_risk_phases = []
    for t in _TASK_OPTIONS.get(phase, ()):
        task_risk = delay_risk
        if t != task:
            task_risk = min(max(task_risk + _stable_task_offset(t), 0), 100)
        high_risk_phases.append({
            "phase": phase,
            "task": t,
            "risk": round(task_risk, 1)
        })

    # Generate insights
    insights = call_ai_model_for_insights(input_data, delay_risk)

    logger.info("Prediction completed: Delay risk = %.1f%%", delay_risk)
    return _build_response(
        input_data,
        phase,
        task,
        delay_risk,
        "; ".join(insights) if insights else "No significant delay factors detected.",
        high_risk_phases,
        weather_condition,
    )