| | import logging |
| | from typing import Dict, List |
| | import torch |
| | import torch.nn as nn |
| | import numpy as np |
| | import pickle |
| | from sklearn.preprocessing import StandardScaler |
| | from datetime import datetime, timedelta |
| |
|
| | |
| | logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(levelname)s - %(message)s') |
| | logger = logging.getLogger(__name__) |
| |
|
| | |
| | class DelayPredictor(nn.Module): |
| | def __init__(self, input_size, hidden_size, num_layers): |
| | super(DelayPredictor, self).__init__() |
| | self.hidden_size = hidden_size |
| | self.num_layers = num_layers |
| | self.lstm = nn.LSTM(input_size, hidden_size, num_layers, batch_first=True) |
| | self.attention = nn.Linear(hidden_size, 1) |
| | self.fc = nn.Linear(hidden_size, 1) |
| | self.sigmoid = nn.Sigmoid() |
| |
|
| | def forward(self, x): |
| | lstm_out, _ = self.lstm(x) |
| | attn_weights = torch.softmax(self.attention(lstm_out).squeeze(-1), dim=1) |
| | context = torch.bmm(attn_weights.unsqueeze(1), lstm_out).squeeze(1) |
| | out = self.fc(context) |
| | return self.sigmoid(out) * 100 |
| |
|
| | |
| | try: |
| | device = torch.device("cuda" if torch.cuda.is_available() else "cpu") |
| | model = DelayPredictor(input_size=7, hidden_size=64, num_layers=2).to(device) |
| | model.load_state_dict(torch.load("models/delay_model.pth", map_location=device)) |
| | model.eval() |
| | with open("models/scaler.pkl", "rb") as f: |
| | scaler = pickle.load(f) |
| | logger.info("LSTM model and scaler loaded successfully") |
| | except Exception as e: |
| | logger.error(f"Failed to load model or scaler: {str(e)}") |
| | model = None |
| | scaler = None |
| |
|
| | def get_weather_condition(score: int) -> str: |
| | """Map weather impact score (0-100) to descriptive weather condition.""" |
| | if score <= 10: |
| | return "Sunny" |
| | elif score <= 30: |
| | return "Partly Cloudy" |
| | elif score <= 50: |
| | return "Cloudy" |
| | elif score <= 70: |
| | return "Light Rain" |
| | elif score <= 85: |
| | return "Heavy Rain" |
| | else: |
| | return "Severe Storm" |
| |
|
| | def call_ai_model_for_insights(input_data: Dict, delay_risk: float) -> List[str]: |
| | """ |
| | Generate detailed hardcoded insights based on input data and delay risk. |
| | Includes a 2-week risk alert if weather_forecast_date is within 14 days. |
| | Returns 3-5 prioritized, phase/task-specific insights. |
| | """ |
| | logger.info("Generating detailed hardcoded AI insights") |
| | phase = input_data.get("phase", "") |
| | task = input_data.get("task", "") |
| | current_progress = input_data.get("current_progress", 0) |
| | expected_duration = input_data.get("task_expected_duration", 0) |
| | actual_duration = input_data.get("task_actual_duration", 0) |
| | workforce_gap = input_data.get("workforce_gap", 0) |
| | skill_level = input_data.get("workforce_skill_level", "").lower() |
| | shift_hours = input_data.get("workforce_shift_hours", 0) |
| | weather_score = input_data.get("weather_impact_score", 0) |
| | weather_condition = input_data.get("weather_condition", get_weather_condition(weather_score)) |
| | project_location = input_data.get("project_location", "Unknown") |
| | weather_forecast_date = input_data.get("weather_forecast_date", "") |
| |
|
| | |
| | insights = [] |
| |
|
| | |
| | def add_insight(message: str, priority: float): |
| | insights.append((message, priority)) |
| |
|
| | |
| | try: |
| | forecast_date = datetime.strptime(weather_forecast_date, "%Y-%m-%d") |
| | current_date = datetime(2025, 5, 26) |
| | two_weeks_later = current_date + timedelta(days=14) |
| | if current_date <= forecast_date <= two_weeks_later: |
| | if delay_risk > 75 or weather_score > 75: |
| | add_insight( |
| | f"⚠️ Critical 2-Week Risk Alert: High risk of delay for {phase}: {task} in {project_location} by {weather_forecast_date} due to {'severe weather' if weather_score > 75 else 'high delay probability'}. Implement contingency plans immediately.", |
| | 1.2 |
| | ) |
| | elif delay_risk > 50 or weather_score > 50: |
| | add_insight( |
| | f"⚠️ 2-Week Risk Alert: Moderate risk of delay for {phase}: {task} in {project_location} by {weather_forecast_date}. Monitor closely and prepare mitigation measures.", |
| | 1.1 |
| | ) |
| | except ValueError: |
| | logger.warning("Invalid weather_forecast_date format; skipping 2-week risk alert") |
| |
|
| | |
| | if delay_risk > 75: |
| | add_insight(f"Urgent: High delay risk detected for {phase}: {task} in {project_location}. Take immediate action.", 1.0) |
| | elif delay_risk > 50: |
| | add_insight(f"Monitor {phase}: {task} closely in {project_location} to prevent delays.", 0.9) |
| | elif delay_risk > 25: |
| | add_insight(f"Maintain steady progress for {phase}: {task} in {project_location}.", 0.7) |
| | else: |
| | add_insight(f"Optimize resources for {phase}: {task} in {project_location} to maintain schedule.", 0.6) |
| |
|
| | |
| | if weather_score > 85: |
| | add_insight(f"Critical: Severe storm forecast in {project_location} for {phase}: {task}. Consider halting outdoor activities.", 1.1) |
| | elif weather_score > 70: |
| | add_insight(f"Reschedule outdoor tasks for {phase}: {task} in {project_location} due to heavy rain forecast.", 1.0) |
| | elif weather_score > 50: |
| | add_insight(f"Shift to indoor or weather-resistant tasks for {phase}: {task} in {project_location} due to light rain.", 0.9) |
| | elif weather_score > 30: |
| | add_insight(f"Monitor cloudy conditions in {project_location} for {phase}: {task} to avoid unexpected delays.", 0.7) |
| | else: |
| | add_insight(f"Proceed with {phase}: {task} in {project_location} under favorable weather conditions.", 0.6) |
| |
|
| | |
| | task_specific = { |
| | "Planning": { |
| | "Define Scope": f"Ensure stakeholder alignment for Planning: Define Scope in {project_location}, considering weather impacts.", |
| | "Resource Allocation": f"Secure budget and resources early for Planning: Resource Allocation in {project_location}.", |
| | "Permit Acquisition": f"Expedite permits for Planning: Permit Acquisition in {project_location} to avoid weather-related delays." |
| | }, |
| | "Design": { |
| | "Architectural Drafting": f"Engage architects early for Design: Architectural Drafting in {project_location}, accounting for weather.", |
| | "Engineering Analysis": f"Hire additional engineers for Design: Engineering Analysis in {project_location} to meet deadlines.", |
| | "Design Review": f"Schedule thorough reviews for Design: Design Review in {project_location}, considering forecast." |
| | }, |
| | "Construction": { |
| | "Foundation Work": f"Optimize material delivery for Construction: Foundation Work in {project_location}, avoiding {weather_condition.lower()}.", |
| | "Structural Build": f"Ensure equipment availability for Construction: Structural Build in {project_location} under {weather_condition.lower()}.", |
| | "Utility Installation": f"Coordinate subcontractors for Construction: Utility Installation in {project_location}, monitoring weather." |
| | } |
| | } |
| | if phase in task_specific and task in task_specific[phase]: |
| | add_insight(task_specific[phase][task], 0.8) |
| |
|
| | |
| | if workforce_gap > 30: |
| | add_insight(f"Urgently hire subcontractors in {project_location} to address {workforce_gap}% workforce shortage.", 1.0) |
| | elif workforce_gap > 15: |
| | add_insight(f"Recruit additional workers in {project_location} to reduce {workforce_gap}% workforce gap.", 0.9) |
| | elif workforce_gap > 5: |
| | add_insight(f"Consider temporary staff in {project_location} to address minor workforce gap.", 0.7) |
| |
|
| | if skill_level == "low": |
| | add_insight(f"Provide training in {project_location} to improve low skill levels for {phase}: {task}.", 0.9) |
| | elif skill_level == "medium" and delay_risk > 50: |
| | add_insight(f"Upskill workforce in {project_location} for efficiency in {phase}: {task}.", 0.8) |
| | elif skill_level == "high" and delay_risk < 25: |
| | add_insight(f"Leverage high skill levels in {project_location} to maintain {phase}: {task} progress.", 0.6) |
| |
|
| | if shift_hours < 6: |
| | add_insight(f"Extend shift hours beyond {shift_hours} in {project_location} to meet {phase}: {task} deadlines.", 0.9) |
| | elif shift_hours < 8 and delay_risk > 50: |
| | add_insight(f"Increase shift hours to 8 in {project_location} for {phase}: {task}.", 0.8) |
| | elif shift_hours > 10: |
| | add_insight(f"Balance shifts in {project_location} to prevent burnout during {phase}: {task}.", 0.7) |
| |
|
| | |
| | if expected_duration > 0 and actual_duration > expected_duration: |
| | overrun_pct = ((actual_duration - expected_duration) / expected_duration) * 100 |
| | if overrun_pct > 20: |
| | add_insight(f"Address significant duration overrun ({overrun_pct:.1f}%) for {phase}: {task} in {project_location}.", 1.0) |
| | elif overrun_pct > 10: |
| | add_insight(f"Review scheduling to address {overrun_pct:.1f}% overrun in {phase}: {task} in {project_location}.", 0.8) |
| |
|
| | if expected_duration > 0: |
| | expected_progress = min((actual_duration / expected_duration) * 100, 100) |
| | if current_progress < expected_progress * 0.8: |
| | add_insight(f"Accelerate task progress for {phase}: {task} in {project_location} to align with schedule.", 0.9) |
| | elif current_progress < 50 and delay_risk > 50: |
| | add_insight(f"Increase resources to boost {current_progress}% progress in {phase}: {task} in {project_location}.", 0.8) |
| |
|
| | |
| | if workforce_gap >= 90: |
| | add_insight(f"Critical: Halt non-essential tasks in {project_location} until workforce gap for {phase}: {task} is resolved.", 1.1) |
| | if current_progress == 0 and delay_risk > 50: |
| | add_insight(f"Initiate {phase}: {task} in {project_location} immediately to avoid further delays.", 1.0) |
| | if expected_duration == 0 or actual_duration == 0: |
| | add_insight(f"Provide accurate duration estimates for {phase}: {task} in {project_location} for reliable predictions.", 0.7) |
| | if weather_score > 50 and phase == "Construction": |
| | add_insight(f"Prepare contingency plans for {phase}: {task} in {project_location} due to adverse weather forecast.", 0.95) |
| |
|
| | |
| | insights.sort(key=lambda x: x[1], reverse=True) |
| | selected_insights = [insight[0] for insight in insights[:5]] |
| |
|
| | logger.info(f"Generated insights: {selected_insights}") |
| | return selected_insights or [f"No significant delay factors detected for {phase}: {task} in {project_location}."] |
| |
|
| | def predict_delay(input_data: Dict) -> Dict: |
| | """ |
| | Predict delay probability using LSTM model. |
| | Inputs: Project task data (phase, progress, duration, workforce, weather). |
| | Outputs: Delay probability, AI insights, high-risk phases, weather condition. |
| | """ |
| | logger.info("Starting LSTM delay prediction") |
| | if model is None or scaler is None: |
| | logger.error("Model or scaler not loaded; falling back to default") |
| | return { |
| | "project": input_data.get("project_name", "Unnamed Project"), |
| | "phase": input_data.get("phase", ""), |
| | "task": input_data.get("task", ""), |
| | "delay_probability": 50.0, |
| | "ai_insights": "Model unavailable; please check deployment.", |
| | "high_risk_phases": [], |
| | "weather_condition": "Unknown" |
| | } |
| |
|
| | phase = input_data.get("phase", "") |
| | task = input_data.get("task", "") |
| | weather_condition = input_data.get("weather_condition", get_weather_condition(input_data.get("weather_impact_score", 0))) |
| |
|
| | |
| | phase_mapping = {"Planning": 0, "Design": 1, "Construction": 2} |
| | skill_mapping = {"Low": 0, "Medium": 1, "High": 2} |
| | try: |
| | features = np.array([[ |
| | input_data.get("current_progress", 0), |
| | input_data.get("task_expected_duration", 0), |
| | input_data.get("task_actual_duration", 0), |
| | input_data.get("workforce_gap", 0), |
| | input_data.get("weather_impact_score", 0), |
| | skill_mapping.get(input_data.get("workforce_skill_level", "Medium"), 1), |
| | phase_mapping.get(phase, 0) |
| | ]]) |
| | except KeyError as e: |
| | logger.error(f"Invalid input data: {str(e)}") |
| | return { |
| | "project": input_data.get("project_name", "Unnamed Project"), |
| | "phase": phase, |
| | "task": task, |
| | "delay_probability": 50.0, |
| | "ai_insights": f"Invalid input: {str(e)}", |
| | "high_risk_phases": [], |
| | "weather_condition": weather_condition |
| | } |
| |
|
| | |
| | try: |
| | features_scaled = scaler.transform(features) |
| | features_tensor = torch.tensor(features_scaled.reshape(1, 1, -1), dtype=torch.float32).to(device) |
| | except Exception as e: |
| | logger.error(f"Feature preprocessing failed: {str(e)}") |
| | return { |
| | "project": input_data.get("project_name", "Unnamed Project"), |
| | "phase": phase, |
| | "task": task, |
| | "delay_probability": 50.0, |
| | "ai_insights": f"Preprocessing error: {str(e)}", |
| | "high_risk_phases": [], |
| | "weather_condition": weather_condition |
| | } |
| |
|
| | |
| | with torch.no_grad(): |
| | delay_risk = model(features_tensor).cpu().numpy().item() |
| | delay_risk = round(max(0, min(delay_risk, 100)), 1) |
| |
|
| | |
| | task_options = { |
| | "Planning": ["Define Scope", "Resource Allocation", "Permit Acquisition"], |
| | "Design": ["Architectural Drafting", "Engineering Analysis", "Design Review"], |
| | "Construction": ["Foundation Work", "Structural Build", "Utility Installation"] |
| | } |
| | high_risk_phases = [] |
| | if phase in task_options: |
| | for t in task_options[phase]: |
| | task_risk = delay_risk |
| | if t != task: |
| | task_risk = min(max(task_risk + (hash(t) % 10 - 5), 0), 100) |
| | high_risk_phases.append({ |
| | "phase": phase, |
| | "task": t, |
| | "risk": round(task_risk, 1) |
| | }) |
| |
|
| | |
| | insights = call_ai_model_for_insights(input_data, delay_risk) |
| |
|
| | logger.info(f"Prediction completed: Delay risk = {delay_risk:.1f}%") |
| | return { |
| | "project": input_data.get("project_name", "Unnamed Project"), |
| | "phase": phase, |
| | "task": task, |
| | "delay_probability": delay_risk, |
| | "ai_insights": "; ".join(insights) if insights else "No significant delay factors detected.", |
| | "high_risk_phases": high_risk_phases, |
| | "weather_condition": weather_condition |
| | } |