|
|
from flask import Flask, request, Response, jsonify, render_template, send_file |
|
|
from pydantic import BaseModel |
|
|
from reportlab.lib.pagesizes import letter |
|
|
from reportlab.pdfgen import canvas |
|
|
import base64 |
|
|
import os |
|
|
import logging |
|
|
import io |
|
|
from datetime import datetime |
|
|
from simple_salesforce import Salesforce |
|
|
|
|
|
|
|
|
logging.basicConfig(level=logging.DEBUG, format='%(asctime)s - %(levelname)s - %(message)s') |
|
|
logger = logging.getLogger(__name__) |
|
|
|
|
|
app = Flask(__name__) |
|
|
|
|
|
|
|
|
SF_USERNAME = "scores@app.com" |
|
|
SF_PASSWORD = "Internal@1" |
|
|
SF_SECURITY_TOKEN = "NbUKcTx45azba5HEdntE9YAh" |
|
|
SF_DOMAIN = "login" |
|
|
|
|
|
|
|
|
try: |
|
|
sf = Salesforce( |
|
|
username=SF_USERNAME, |
|
|
password=SF_PASSWORD, |
|
|
security_token=SF_SECURITY_TOKEN, |
|
|
domain=SF_DOMAIN |
|
|
) |
|
|
logger.info("Successfully connected to Salesforce") |
|
|
|
|
|
|
|
|
for obj in ['Subcontractor_Performance_Score__c', 'Vendor_Log__c']: |
|
|
try: |
|
|
description = getattr(sf, obj).describe() |
|
|
logger.info(f"Fields on {obj}:") |
|
|
for field in description['fields']: |
|
|
logger.info(f"Field Name: {field['name']}") |
|
|
except Exception as e: |
|
|
logger.error(f"Error describing {obj}: {str(e)}") |
|
|
except Exception as e: |
|
|
logger.error(f"Failed to connect to Salesforce: {str(e)}") |
|
|
sf = None |
|
|
|
|
|
|
|
|
class VendorLog(BaseModel): |
|
|
vendorLogId: str |
|
|
vendorId: str |
|
|
workDetails: str |
|
|
qualityReport: str |
|
|
incidentLog: str |
|
|
workCompletionDate: str |
|
|
actualCompletionDate: str |
|
|
vendorLogName: str |
|
|
delayDays: int |
|
|
project: str |
|
|
|
|
|
|
|
|
vendor_logs = [] |
|
|
|
|
|
def fetch_vendor_logs_from_salesforce(): |
|
|
"""Fetch vendor logs from Salesforce.""" |
|
|
if not sf: |
|
|
logger.error("Salesforce connection not initialized") |
|
|
return [] |
|
|
|
|
|
try: |
|
|
query = """ |
|
|
SELECT Id, Vendor_Log_Id__c, VendorId__c, Work_Details__c, Quality_Report__c, |
|
|
Incident_Log__c, Work_Completion_Date__c, Actual_Completion_Date__c, |
|
|
Vendor_Log_Name__c, Delay_Days__c, Project__c |
|
|
FROM Vendor_Log__c |
|
|
WHERE CreatedDate >= 2025-05-01T00:00:00Z AND CreatedDate <= 2025-05-31T23:59:59Z |
|
|
""" |
|
|
result = sf.query(query) |
|
|
logs = [] |
|
|
for record in result['records']: |
|
|
log = VendorLog( |
|
|
vendorLogId=record.get('Vendor_Log_Id__c', 'Unknown'), |
|
|
vendorId=record.get('VendorId__c', 'Unknown'), |
|
|
workDetails=record.get('Work_Details__c', '0% completed'), |
|
|
qualityReport=record.get('Quality_Report__c', '0% quality'), |
|
|
incidentLog=record.get('Incident_Log__c', 'None'), |
|
|
workCompletionDate=record.get('Work_Completion_Date__c', '2025-05-01'), |
|
|
actualCompletionDate=record.get('Actual_Completion_Date__c', '2025-05-01'), |
|
|
vendorLogName=record.get('Vendor_Log_Name__c', 'Unknown Vendor'), |
|
|
delayDays=record.get('Delay_Days__c', 0), |
|
|
project=record.get('Project__c', 'Unknown Project') |
|
|
) |
|
|
logs.append(log) |
|
|
logger.info(f"Fetched {len(logs)} vendor logs from Salesforce") |
|
|
return logs |
|
|
except Exception as e: |
|
|
logger.error(f"Error fetching vendor logs: {str(e)}") |
|
|
return [] |
|
|
|
|
|
def fetch_performance_scores(): |
|
|
"""Fetch performance scores from Salesforce.""" |
|
|
if not sf: |
|
|
logger.error("Salesforce connection not initialized") |
|
|
return [] |
|
|
|
|
|
try: |
|
|
query = """ |
|
|
SELECT VendorId__c, Month__c, Quality_Score__c, Timeliness_Score__c, |
|
|
Safety_Score__c, Communication_Score__c, Final_Score__c, Certification_URL__c, |
|
|
Alert_Flag__c |
|
|
FROM Subcontractor_Performance_Score__c |
|
|
WHERE Month__c = '2025-05-01' |
|
|
""" |
|
|
result = sf.query(query) |
|
|
scores = [] |
|
|
for record in result['records']: |
|
|
scores.append({ |
|
|
'vendorId': record.get('VendorId__c', 'Unknown'), |
|
|
'vendorLogName': f"Vendor {record.get('VendorId__c', 'Unknown')}", |
|
|
'scores': { |
|
|
'qualityScore': record.get('Quality_Score__c', 0.0) or 0.0, |
|
|
'timelinessScore': record.get('Timeliness_Score__c', 0.0) or 0.0, |
|
|
'safetyScore': record.get('Safety_Score__c', 0.0) or 0.0, |
|
|
'communicationScore': record.get('Communication_Score__c', 0.0) or 0.0, |
|
|
'finalScore': record.get('Final_Score__c', 0.0) or 0.0 |
|
|
}, |
|
|
'certification_url': record.get('Certification_URL__c', ''), |
|
|
'alert_flag': record.get('Alert_Flag__c', False) |
|
|
}) |
|
|
logger.info(f"Fetched {len(scores)} performance scores from Salesforce") |
|
|
return scores |
|
|
except Exception as e: |
|
|
logger.error(f"Error fetching performance scores: {str(e)}") |
|
|
return [] |
|
|
|
|
|
def calculate_scores(log: VendorLog): |
|
|
"""Calculate scores for a vendor log.""" |
|
|
try: |
|
|
scores = { |
|
|
'qualityScore': float(log.qualityReport.replace('% quality', '')), |
|
|
'timelinessScore': 100.0 if log.delayDays <= 0 else 80.0 if log.delayDays <= 3 else 60.0 if log.delayDays <= 7 else 40.0, |
|
|
'safetyScore': {'None': 100.0, 'Low': 80.0, 'Minor': 80.0, 'Medium': 50.0, 'High': 20.0}.get(log.incidentLog, 100.0), |
|
|
'communicationScore': 0.0, |
|
|
'finalScore': 0.0 |
|
|
} |
|
|
scores['communicationScore'] = (scores['qualityScore'] * 0.33 + scores['timelinessScore'] * 0.33 + scores['safetyScore'] * 0.33) |
|
|
scores['finalScore'] = (scores['qualityScore'] + scores['timelinessScore'] + scores['safetyScore'] + scores['communicationScore']) / 4 |
|
|
|
|
|
for key in scores: |
|
|
scores[key] = round(scores[key], 2) |
|
|
return scores |
|
|
except Exception as e: |
|
|
logger.error(f"Error calculating scores: {str(e)}") |
|
|
return { |
|
|
'qualityScore': 0.0, |
|
|
'timelinessScore': 0.0, |
|
|
'safetyScore': 0.0, |
|
|
'communicationScore': 0.0, |
|
|
'finalScore': 0.0 |
|
|
} |
|
|
|
|
|
def get_feedback(score: float, metric: str) -> str: |
|
|
try: |
|
|
if score >= 90: |
|
|
return "Excellent: Maintain this standard" |
|
|
elif score >= 70: |
|
|
return "Good: Keep up the good work" |
|
|
elif score >= 50: |
|
|
return f"Needs Improvement: Enhance {metric.lower()} performance" |
|
|
else: |
|
|
return f"Poor: Critical issues in {metric.lower()}" |
|
|
except Exception as e: |
|
|
logger.error(f"Error generating feedback: {str(e)}") |
|
|
return "Feedback unavailable" |
|
|
|
|
|
def generate_pdf(vendor_id: str, vendor_log_name: str, scores: dict): |
|
|
"""Generate a PDF report for a vendor.""" |
|
|
try: |
|
|
buffer = io.BytesIO() |
|
|
c = canvas.Canvas(buffer, pagesize=letter) |
|
|
c.setFont('Helvetica', 12) |
|
|
c.drawString(100, 750, 'Subcontractor Performance Report') |
|
|
c.drawString(100, 730, f'Vendor ID: {vendor_id}') |
|
|
c.drawString(100, 710, f'Vendor Log Name: {vendor_log_name}') |
|
|
c.drawString(100, 690, f'Quality Score: {scores["qualityScore"]}% ({get_feedback(scores["qualityScore"], "Quality")})') |
|
|
c.drawString(100, 670, f'Timeliness Score: {scores["timelinessScore"]}% ({get_feedback(scores["timelinessScore"], "Timeliness")})') |
|
|
c.drawString(100, 650, f'Safety Score: {scores["safetyScore"]}% ({get_feedback(scores["safetyScore"], "Safety")})') |
|
|
c.drawString(100, 630, f'Communication Score: {scores["communicationScore"]}% ({get_feedback(scores["communicationScore"], "Communication")})') |
|
|
c.drawString(100, 610, f'Final Score: {scores["finalScore"]}%') |
|
|
c.drawString(100, 590, f'Generated On: May 15, 2025, 12:01 PM IST') |
|
|
c.save() |
|
|
buffer.seek(0) |
|
|
return buffer |
|
|
except Exception as e: |
|
|
logger.error(f"Error generating PDF: {str(e)}") |
|
|
raise |
|
|
|
|
|
def determine_alert_flag(final_score: float, all_logs: list): |
|
|
"""Determine if an alert flag should be set.""" |
|
|
try: |
|
|
if not all_logs: |
|
|
return False |
|
|
if final_score < 50: |
|
|
return True |
|
|
lowest_score = min([log['scores']['finalScore'] for log in all_logs]) |
|
|
return final_score == lowest_score |
|
|
except Exception as e: |
|
|
logger.error(f"Error determining alert flag: {str(e)}") |
|
|
return False |
|
|
|
|
|
@app.route('/score', methods=['POST']) |
|
|
def score_vendor(): |
|
|
"""Score a vendor and save to Salesforce.""" |
|
|
try: |
|
|
data = request.get_json() |
|
|
if not data: |
|
|
return jsonify({'error': 'Invalid request data'}), 400 |
|
|
|
|
|
log = VendorLog(**data) |
|
|
scores = calculate_scores(log) |
|
|
|
|
|
pdf_buffer = generate_pdf(log.vendorId, log.vendorLogName, scores) |
|
|
pdf_content = pdf_buffer.getvalue() |
|
|
pdf_base64 = base64.b64encode(pdf_content).decode('utf-8') |
|
|
|
|
|
alert_flag = determine_alert_flag(scores['finalScore'], vendor_logs) |
|
|
|
|
|
vendor_logs.append({ |
|
|
'vendorLogId': log.vendorLogId, |
|
|
'vendorId': log.vendorId, |
|
|
'vendorLogName': log.vendorLogName, |
|
|
'scores': scores, |
|
|
'extracted': True |
|
|
}) |
|
|
|
|
|
if sf: |
|
|
sf.Subcontractor_Performance_Score__c.create({ |
|
|
'VendorId__c': log.vendorId, |
|
|
'Month__c': datetime.now().strftime('%Y-%m-%d'), |
|
|
'Quality_Score__c': scores['qualityScore'], |
|
|
'Timeliness_Score__c': scores['timelinessScore'], |
|
|
'Safety_Score__c': scores['safetyScore'], |
|
|
'Communication_Score__c': scores['communicationScore'], |
|
|
'Final_Score__c': scores['finalScore'], |
|
|
'Certification_URL__c': pdf_base64, |
|
|
'Alert_Flag__c': alert_flag |
|
|
}) |
|
|
logger.info(f"Saved scores to Salesforce for Vendor Log: {log.vendorLogId}") |
|
|
|
|
|
return jsonify({ |
|
|
'vendorLogId': log.vendorLogId, |
|
|
'vendorId': log.vendorId, |
|
|
'vendorLogName': log.vendorLogName, |
|
|
'scores': scores, |
|
|
'pdfContent': pdf_base64, |
|
|
'alert': alert_flag |
|
|
}), 200 |
|
|
except Exception as e: |
|
|
logger.error(f"Error in /score endpoint: {str(e)}") |
|
|
return jsonify({'error': str(e)}), 500 |
|
|
|
|
|
@app.route('/generate_report', methods=['POST']) |
|
|
def generate_report(): |
|
|
"""Generate a report for a selected vendor.""" |
|
|
try: |
|
|
data = request.form |
|
|
vendor_name = data.get('vendor_name') |
|
|
report_month = data.get('report_month', 'May, 2025') |
|
|
|
|
|
|
|
|
performance_scores = fetch_performance_scores() |
|
|
vendor_data = next((v for v in performance_scores if v['vendorLogName'] == vendor_name), None) |
|
|
|
|
|
if not vendor_data: |
|
|
return jsonify({'error': 'Vendor not found'}), 404 |
|
|
|
|
|
|
|
|
pdf_buffer = generate_pdf(vendor_data['vendorId'], vendor_name, vendor_data['scores']) |
|
|
|
|
|
|
|
|
pdf_content = pdf_buffer.getvalue() |
|
|
pdf_base64 = base64.b64encode(pdf_content).decode('utf-8') |
|
|
if sf: |
|
|
sf.Subcontractor_Performance_Score__c.update(vendor_data['vendorId'], { |
|
|
'Certification_URL__c': pdf_base64 |
|
|
}) |
|
|
|
|
|
return send_file( |
|
|
pdf_buffer, |
|
|
attachment_filename=f'report_{vendor_data["vendorId"]}_{datetime.now().strftime("%Y%m%d_%H%M%S")}.pdf', |
|
|
as_attachment=True |
|
|
) |
|
|
except Exception as e: |
|
|
logger.error(f"Error in /generate_report endpoint: {str(e)}") |
|
|
return jsonify({'error': str(e)}), 500 |
|
|
|
|
|
@app.route('/', methods=['GET']) |
|
|
def get_dashboard(): |
|
|
"""Render the dashboard with Salesforce data.""" |
|
|
try: |
|
|
|
|
|
performance_scores = fetch_performance_scores() |
|
|
vendor_logs_list = fetch_vendor_logs_from_salesforce() |
|
|
|
|
|
|
|
|
vendor_logs.clear() |
|
|
for score in performance_scores: |
|
|
vendor_logs.append({ |
|
|
'vendorLogId': score['vendorId'], |
|
|
'vendorId': score['vendorId'], |
|
|
'vendorLogName': score['vendorLogName'], |
|
|
'scores': score['scores'], |
|
|
'extracted': True |
|
|
}) |
|
|
|
|
|
total_vendors = len(vendor_logs) |
|
|
performance_alerts = sum(1 for log in vendor_logs if determine_alert_flag(log['scores']['finalScore'], vendor_logs)) |
|
|
top_performers = sum(1 for log in vendor_logs if log['scores']['finalScore'] >= 90) |
|
|
improving_vendors = sum(1 for log in vendor_logs if log['scores']['finalScore'] >= 70) |
|
|
|
|
|
sorted_logs = sorted(vendor_logs, key=lambda x: x['scores']['finalScore'], reverse=True) |
|
|
top_logs_data = sorted_logs[:5] |
|
|
top_performing_logs = sorted_logs[:4] |
|
|
alert_logs = [log for log in vendor_logs if determine_alert_flag(log['scores']['finalScore'], vendor_logs)][:3] |
|
|
|
|
|
top_logs = [] |
|
|
for idx, log in enumerate(top_logs_data, 1): |
|
|
scores = log['scores'] |
|
|
alert_flag = determine_alert_flag(scores['finalScore'], vendor_logs) |
|
|
trend = "trend-up" if scores['finalScore'] >= 90 else "trend-down" if scores['finalScore'] < 70 else "trend-flat" |
|
|
trend_symbol = "↗" if trend == "trend-up" else "↘" if trend == "trend-down" else "—" |
|
|
status_class = "status-good" if not alert_flag else "status-alert" |
|
|
status_text = "Good" if not alert_flag else "Alert" |
|
|
top_logs.append({ |
|
|
'idx': idx, |
|
|
'vendorLogName': log['vendorLogName'], |
|
|
'scores': scores, |
|
|
'trend': trend, |
|
|
'trend_symbol': trend_symbol, |
|
|
'status_class': status_class, |
|
|
'status_text': status_text |
|
|
}) |
|
|
|
|
|
template_data = { |
|
|
'total_vendors': total_vendors, |
|
|
'performance_alerts': performance_alerts, |
|
|
'percent_alerts': round(performance_alerts/total_vendors*100, 1) if total_vendors else 0, |
|
|
'top_performers': top_performers, |
|
|
'percent_top': round(top_performers/total_vendors*100, 1) if total_vendors else 0, |
|
|
'improving_vendors': improving_vendors, |
|
|
'percent_improving': round(improving_vendors/total_vendors*100, 1) if total_vendors else 0, |
|
|
'top_logs': top_logs, |
|
|
'alert_logs': alert_logs, |
|
|
'top_performing_logs': top_performing_logs, |
|
|
'vendor_logs': vendor_logs, |
|
|
'sorted_logs': sorted_logs, |
|
|
'vendor_names': [log.vendorLogName for log in vendor_logs_list], |
|
|
'report_month': 'May, 2025' |
|
|
} |
|
|
|
|
|
return render_template('dashboard.html', **template_data) |
|
|
except Exception as e: |
|
|
logger.error(f"Error in / endpoint: {str(e)}") |
|
|
template_data = { |
|
|
'total_vendors': 0, |
|
|
'performance_alerts': 0, |
|
|
'percent_alerts': 0, |
|
|
'top_performers': 0, |
|
|
'percent_top': 0, |
|
|
'improving_vendors': 0, |
|
|
'percent_improving': 0, |
|
|
'top_logs': [], |
|
|
'alert_logs': [], |
|
|
'top_performing_logs': [], |
|
|
'vendor_logs': [], |
|
|
'sorted_logs': [], |
|
|
'vendor_names': [], |
|
|
'report_month': 'May, 2025' |
|
|
} |
|
|
return render_template('dashboard.html', **template_data) |
|
|
|
|
|
@app.route('/document', methods=['GET']) |
|
|
def get_document(): |
|
|
"""Render the document page.""" |
|
|
try: |
|
|
return render_template('document.html') |
|
|
except Exception as e: |
|
|
logger.error(f"Error in /document endpoint: {str(e)}") |
|
|
return jsonify({'error': str(e)}), 500 |
|
|
|
|
|
if __name__ == "__main__": |
|
|
app.run(host="0.0.0.0", port=7860, debug=True) |