# Importing libraries
from threading import Thread
from flask import Blueprint, jsonify, request
from flask_cors import CORS
import sys
import os

# Importing process pool executor
from concurrent.futures import ProcessPoolExecutor

# Fasttext for model handling
import fasttext

# Setting absolute path (must run before the `app.*` imports below)
sys.path.insert(0, os.path.abspath("."))

from app.config import Config
from app.helpers import *
from app.db.models import Tasks
from app.database import db
from app.threads.process_fsa_v2 import process_fsa_categories_v2

# Create a Blueprint of classification
fsa = Blueprint("fsa_v2", __name__, url_prefix="/api/v2/fsa")

# Enabling CORS for the blueprint
CORS(fsa, supports_credentials=True)


# Thread class to run the batch processing in a thread
class FSAThread_V2(Thread):
    """Thread that runs `process_fsa_categories_v2` on the given payload."""

    def __init__(self, data=None) -> None:
        """Store the payload handed to `process_fsa_categories_v2`.

        Args:
            data: Payload passed through unchanged to the processing function.
                Defaults to a fresh empty dict per instance (a `{}` default
                in the signature would be shared between all instances).
        """
        super().__init__()
        self.data = {} if data is None else data

    # Run function of the thread
    def run(self) -> None:
        """Process the stored payload."""
        process_fsa_categories_v2(self.data)


# Creating a process pool executor
# Set maximum processes
max_processes = 4
process_executor = ProcessPoolExecutor(max_workers=max_processes)

# (level name, model path, threshold passed to get_return_labels) for the
# levels that every product goes through, in prediction order.
_HIERARCHY_MODELS = (
    ("L0", "app/models/L0/L0_model.bin", 0.95),
    ("L1", "app/models/L1/L1_model.bin", 0.95),
    ("L2", "app/models/L2/L2_model.bin", 0.95),
    ("L3", "app/models/L3/L3_model.bin", 0.95),
)

# L0 label -> (model path, name used in the error message, threshold) for the
# L4 model, which is selected by the predicted L0 label.
_L4_MODELS = {
    "administrative": (
        "app/models/L4/administrative/L4_Admin_model.bin",
        "Administrative",
        0.75,
    ),
    "beverage": (
        "app/models/L4/beverage/L4_beverage_model.bin",
        "Beverage",
        0.66,
    ),
    "food": (
        "app/models/L4/food/L4_food_model.bin",
        "Food",
        0.85,
    ),
    "operationals": (
        "app/models/L4/operationals/L4_operationals_model.bin",
        "Operationals",
        0.8,
    ),
}

_ALL_LEVELS = ("L0", "L1", "L2", "L3", "L4")


# Update the database
def update_db(table_idx, remarks=None):
    """Update the task row `table_idx` with `remarks` inside an app context.

    Args:
        table_idx: Identifier forwarded to `Tasks.update_by_id`.
        remarks: Value forwarded to `Tasks.update_by_id`; defaults to None.
    """
    # Imported inside the function rather than at module level
    # (presumably to avoid a circular import with app.api - confirm).
    from app.api import app

    with app.app_context():
        try:
            Tasks.update_by_id(table_idx, remarks)
        finally:
            # Release the session even when the update raises.
            db.session.close()


def _load_model(model_path):
    """Return the fastText model at `model_path`, or None if loading fails."""
    try:
        return fasttext.load_model(model_path)
    except Exception:
        # Was a bare `except:`, which also swallowed KeyboardInterrupt and
        # SystemExit; the caller turns None into an error response.
        return None


def _score_level(level, model, text, threshold):
    """Run one hierarchy level and collect everything the response needs.

    Args:
        level: Level name ("L0" .. "L4"), used for the debug trace.
        model: Loaded fastText model for this level.
        text: Input passed to `get_label_and_accuracy`.
        threshold: Threshold forwarded to `get_return_labels`.

    Returns:
        dict with keys "label", "accuracy", "return_label", "return_score"
        and "status".
    """
    label, accuracy = get_label_and_accuracy(model, text)
    return_label, return_score, status = get_return_labels(
        label, accuracy, threshold
    )
    print(level, label, accuracy)
    return {
        "label": label,
        "accuracy": accuracy,
        "return_label": return_label,
        "return_score": return_score,
        "status": status,
    }


# Prediction for single product
@fsa.route("/single-product", methods=["POST"])
def predict_categories():
    """Predict the L0-L4 categories for one product name.

    Expects a JSON object with a "product_name" field. Each level's input is
    the previous level's label followed by the preprocessed product name; the
    L4 model is chosen from the predicted L0 label.

    Returns:
        A (JSON response, status code) tuple: 200 with the results, 422 for
        an invalid request or an L4 prediction failure, 500 when a model
        cannot be loaded or an L0-L3 prediction yields no label.
    """
    # silent=True returns None instead of raising on a missing or invalid
    # JSON body, so the 422 below is actually reachable.
    body = request.get_json(silent=True)

    # If there is no usable JSON object in the request send error message
    if not body or not isinstance(body, dict):
        return jsonify({"message": "Cannot decode JSON from the body"}), 422

    # Get the product name from the JSON
    product_name = body.get("product_name")

    # Check whether product name is missing
    if not product_name:
        return jsonify({"message": "Product name is missing"}), 422

    # Preprocessing product names for input
    product_name = preprocess(product_name)

    # Logging processing
    Logger.info(message="Processing FSA categorical data for " + product_name)

    results = {}
    model_input = product_name

    # L0 - L3: same procedure for every level
    for level, model_path, threshold in _HIERARCHY_MODELS:
        model = _load_model(model_path)
        if model is None:
            return jsonify({"message": f"Can't load the {level} model"}), 500

        outcome = _score_level(level, model, model_input, threshold)
        if not outcome["label"]:
            return (
                jsonify({"message": f"Error predicting {level} Category"}),
                500,
            )

        results[level] = outcome
        # The next level is fed "<this level's label> <product name>"
        model_input = outcome["label"] + " " + product_name

    # L4: the model depends on the predicted L0 label
    l4_config = _L4_MODELS.get(results["L0"]["label"])
    if l4_config is None:
        # Error prediction on L4 Category (unexpected L0 label)
        return jsonify({"message": "Error prediction of L4 Category"}), 422

    model_path, display_name, threshold = l4_config
    model = _load_model(model_path)
    if model is None:
        return (
            jsonify({"message": f"Can't load the L4 ({display_name}) model"}),
            500,
        )

    outcome = _score_level("L4", model, model_input, threshold)
    if not outcome["label"]:
        return jsonify({"message": "Error predicting L4 Category"}), 422
    results["L4"] = outcome

    # Logging the task
    Logger.info(
        message="Done processing FSA categorical data for " + product_name
    )

    # Returning the result as JSON
    return jsonify({
        "classification_results": {
            level.lower(): results[level]["return_label"]
            for level in _ALL_LEVELS
        },
        "scores": {
            level.lower(): results[level]["return_score"]
            for level in _ALL_LEVELS
        },
        "remarks": {
            level.lower(): results[level]["status"]
            for level in _ALL_LEVELS
        },
        "all_classification_results": {
            level: results[level]["label"] for level in _ALL_LEVELS
        },
        "all_scores": {
            level: results[level]["accuracy"] for level in _ALL_LEVELS
        },
    }), 200


# Batch processing
@fsa.route("/process-csv", methods=["POST"])
def process_csv():
    """Validate an uploaded CSV and queue it for batch categorisation.

    Expects a JSON object with "uploaded_file_name" and, optionally,
    "original_file_name". The file is downloaded from S3, checked for a
    "product_name" column, registered as a task and submitted to the process
    pool.

    Returns:
        A (JSON response, status code) tuple: 200 once the job is queued,
        422 for an invalid request, a failed download or a missing column.
    """
    # silent=True returns None instead of raising on a missing or invalid
    # JSON body, so the 422 below is actually reachable.
    body = request.get_json(silent=True)

    # Error passing for missing body
    if not body or not isinstance(body, dict):
        return jsonify({"message": "Cannot decode JSON from the body"}), 422

    # The uploaded file name is expected in the "uploaded_file_name" field
    file_name = body.get("uploaded_file_name")

    # Original file name (falls back to the uploaded name)
    original_file_name = body.get("original_file_name") or file_name

    # Missing file name
    if not file_name:
        return jsonify({"message": "File name is missing"}), 422

    # NOTE(review): file_name comes straight from the request and is used to
    # build the S3 key and local file names - confirm it is validated upstream.
    files = [
        {
            "name": f"fsa_input_{file_name}",
            "path": f"FSA Categorization/input/{file_name}",
        }
    ]

    # Download files from S3 bucket of AWS
    for file in files:
        download_status = download_file_from_s3(
            file_name=file["name"], file_path=file["path"]
        )

        if isinstance(download_status, botocore.exceptions.ClientError):
            # Report the file name, not the repr of the whole dict.
            return (
                jsonify(
                    {"message": f"Error downloading {file['name']} from s3"}
                ),
                422,
            )

    # Get the dataframe of the csv to check the "product_name" column
    df = read_files(file_name=file_name)

    # Check for product_name in columns
    if "product_name" not in df.columns:
        remove_files(f"fsa_input_{file_name}")
        return (
            jsonify(
                {"message": "Product name column is missing from the CSV"}
            ),
            422,
        )

    # Create a task
    created_task = Tasks.create(
        file_name=file_name, original_file_name=original_file_name
    )

    # Create a json object of data to pass the process
    data = {
        "file_name": file_name,
        "table_idx": created_task.id,
        "update_db": update_db,
    }

    db.session.close()

    # Add the process to process pool executor
    process_executor.submit(process_fsa_categories_v2, data)

    return jsonify({"message": f"{file_name} - File processing starting"}), 200