MilanBandara's picture
test files added
f1017a3
# Importing libraries
from threading import Thread
from flask import Blueprint, jsonify, request
from flask_cors import CORS
import sys
import os
# Importing process pool executor
from concurrent.futures import ProcessPoolExecutor
# Fasttext for model handling
import fasttext
# Setting absolute path
sys.path.insert(0, os.path.abspath("."))
from app.config import Config
from app.helpers import *
from app.db.models import Tasks
from app.database import db
from app.threads.process_fsa_v2 import process_fsa_categories_v2
# from app.threads.process_fsa_v2 import test_function
# Blueprint that groups the FSA v2 classification endpoints; it is created
# with the URL prefix /api/v2/fsa, so the routes declared on it in this file
# are relative to that prefix
fsa = Blueprint("fsa_v2", __name__, url_prefix="/api/v2/fsa")
# Enable CORS for the blueprint. supports_credentials=True is passed to
# flask_cors (presumably so cross-origin requests may carry cookies or
# auth headers - confirm against the front-end's needs)
CORS(
fsa,
supports_credentials=True
)
# Thread class to run the batch processing in a background thread
class FSAThread_V2(Thread):
    """Thread that runs ``process_fsa_categories_v2`` on a payload.

    Args:
        data: Payload handed unchanged to ``process_fsa_categories_v2`` when
            the thread runs. When omitted, a new empty dict is created for
            this instance.
    """

    def __init__(self, data=None) -> None:
        super().__init__()
        # A literal `{}` default would be created once and shared by every
        # thread constructed without arguments, so build a fresh dict instead
        self.data = {} if data is None else data

    # Run function of the thread
    def run(self) -> None:
        process_fsa_categories_v2(self.data)
# Module-level process pool executor, created once when this module is
# imported; process_csv submits the batch jobs to it
# Upper bound on the number of worker processes in the pool
max_processes = 4
process_executor = ProcessPoolExecutor(max_workers=max_processes)
# Update the database
def update_db(table_idx, remarks=None):
    """Update the task row identified by *table_idx* inside an app context.

    Args:
        table_idx: Id forwarded to ``Tasks.update_by_id``.
        remarks: Optional remarks forwarded to ``Tasks.update_by_id``.

    Raises:
        Whatever ``Tasks.update_by_id`` raises; the session is closed first.
    """
    # Imported at call time rather than at module import
    # (presumably to avoid a circular import with app.api - confirm)
    from app.api import app

    with app.app_context():
        try:
            Tasks.update_by_id(table_idx, remarks)
        finally:
            # Release the session even when the update fails, so a failed
            # update does not leave the session open
            db.session.close()
# Prediction for single product

# Keys of the upper hierarchy levels, in prediction order; each level's model
# lives at app/models/<key>/<key>_model.bin
_UPPER_LEVELS = ("L0", "L1", "L2", "L3")
# Threshold passed to get_return_labels for every L0-L3 prediction
_UPPER_LEVEL_THRESHOLD = 0.95
# L4 model selection, keyed by the predicted L0 label:
# label -> (model path, name shown in the load-error message, threshold)
_L4_MODELS = {
    "administrative": (
        "app/models/L4/administrative/L4_Admin_model.bin",
        "Administrative",
        0.75,
    ),
    "beverage": (
        "app/models/L4/beverage/L4_beverage_model.bin",
        "Beverage",
        0.66,
    ),
    "food": (
        "app/models/L4/food/L4_food_model.bin",
        "Food",
        0.85,
    ),
    "operationals": (
        "app/models/L4/operationals/L4_operationals_model.bin",
        "Operationals",
        0.8,
    ),
}


def _load_fasttext_model(path):
    """Return the fastText model stored at *path*, or None if loading fails."""
    try:
        return fasttext.load_model(path)
    except Exception:
        # Narrower than a bare `except:` so SystemExit / KeyboardInterrupt
        # still propagate; the caller turns None into a 500 response
        return None


@fsa.route("/single-product", methods=["POST"])
def predict_categories():
    """Classify a single product name through the L0-L4 model hierarchy.

    Expects a JSON object with a ``product_name`` field. Each level's model
    is fed the previous level's label followed by the preprocessed product
    name (L0 is fed the product name alone). The L4 model and its threshold
    are chosen from the predicted L0 label.

    Returns:
        A ``(response, status)`` tuple:
        * 200 with the thresholded labels, scores and remarks plus the raw
          labels and scores of all five levels;
        * 422 when the body is not a non-empty JSON object, the product name
          is missing, the L0 label has no L4 model, or L4 yields no label;
        * 500 when a model cannot be loaded or L0-L3 yields no label.
    """
    # silent=True returns None for a wrong content type or malformed JSON
    # instead of aborting, so the 422 below is what the client receives
    body = request.get_json(silent=True)
    # Also reject JSON that is not an object (e.g. a list), which has no .get
    if not body or not isinstance(body, dict):
        return jsonify({"message": "Cannot decode JSON from the body"}), 422

    product_name = body.get("product_name")
    if not product_name:
        return jsonify({"message": "Product name is missing"}), 422

    # Preprocessing product name for input
    product_name = preprocess(product_name)
    Logger.info(message="Processing FSA categorical data for " + product_name)

    # Raw predictions and their thresholded counterparts, keyed "L0".."L4"
    labels = {}
    accuracies = {}
    return_labels = {}
    return_scores = {}
    label_statuses = {}

    # NOTE(review): every model is loaded from disk on each request; caching
    # the loaded models may be worth considering if latency matters
    model_input = product_name
    for level in _UPPER_LEVELS:
        model = _load_fasttext_model(f"app/models/{level}/{level}_model.bin")
        if model is None:
            return jsonify({"message": f"Can't load the {level} model"}), 500

        label, accuracy = get_label_and_accuracy(model, model_input)
        (
            return_labels[level],
            return_scores[level],
            label_statuses[level],
        ) = get_return_labels(label, accuracy, _UPPER_LEVEL_THRESHOLD)
        print(level, label, accuracy)
        if not label:
            return jsonify({"message": f"Error predicting {level} Category"}), 500

        labels[level] = label
        accuracies[level] = accuracy
        # The next level is conditioned on the label just predicted
        model_input = label + " " + product_name

    # Pick the L4 model that belongs to the predicted L0 category
    l4_config = _L4_MODELS.get(labels["L0"])
    if l4_config is None:
        return jsonify({"message": "Error prediction of L4 Category"}), 422
    l4_path, l4_name, l4_threshold = l4_config

    model = _load_fasttext_model(l4_path)
    if model is None:
        return jsonify({"message": f"Can't load the L4 ({l4_name}) model"}), 500

    # model_input now holds "<L3 label> <product name>"
    label, accuracy = get_label_and_accuracy(model, model_input)
    (
        return_labels["L4"],
        return_scores["L4"],
        label_statuses["L4"],
    ) = get_return_labels(label, accuracy, l4_threshold)
    print("L4", label, accuracy)
    if not label:
        return jsonify({"message": "Error predicting L4 Category"}), 422
    labels["L4"] = label
    accuracies["L4"] = accuracy

    Logger.info(message="Done processing FSA categorical data for " + product_name)

    # Returning the result as JSON; the thresholded sections use lowercase
    # keys, the raw "all_*" sections use uppercase keys
    all_levels = _UPPER_LEVELS + ("L4",)
    return jsonify({
        "classification_results": {
            level.lower(): return_labels[level] for level in all_levels
        },
        "scores": {
            level.lower(): return_scores[level] for level in all_levels
        },
        "remarks": {
            level.lower(): label_statuses[level] for level in all_levels
        },
        "all_classification_results": {
            level: labels[level] for level in all_levels
        },
        "all_scores": {
            level: accuracies[level] for level in all_levels
        },
    }), 200
# Batch processing
@fsa.route("/process-csv", methods=["POST"])
def process_csv():
    """Start batch FSA categorisation of a CSV previously uploaded to S3.

    Expects a JSON object with ``uploaded_file_name`` and, optionally,
    ``original_file_name`` (falls back to the uploaded name). The file is
    downloaded, checked for a ``product_name`` column, recorded as a task and
    handed to the process pool; the response is sent without waiting for the
    job to finish.

    Returns:
        A ``(response, status)`` tuple: 200 once the job is submitted, 422
        when the body is not a non-empty JSON object, the file name is
        missing, the download fails or the CSV has no ``product_name`` column.
    """
    # silent=True returns None for a wrong content type or malformed JSON
    # instead of aborting, so the 422 below is what the client receives
    body = request.get_json(silent=True)
    # Also reject JSON that is not an object (e.g. a list), which has no .get
    if not body or not isinstance(body, dict):
        return jsonify({"message": "Cannot decode JSON from the body"}), 422

    # The uploaded file name is expected in the uploaded_file_name JSON field
    file_name = body.get("uploaded_file_name")
    if not file_name:
        return jsonify({"message": "File name is missing"}), 422
    # Original file name, defaulting to the uploaded one
    original_file_name = body.get("original_file_name") or file_name

    files = [
        {
            "name": f"fsa_input_{file_name}",
            "path": f"FSA Categorization/input/{file_name}",
        }
    ]
    # Download the files from the AWS S3 bucket
    for file in files:
        download_status = download_file_from_s3(
            file_name=file["name"], file_path=file["path"]
        )
        # NOTE(review): botocore is not imported explicitly in the visible
        # part of this file; presumably it arrives through
        # `from app.helpers import *` - confirm
        if isinstance(download_status, botocore.exceptions.ClientError):
            # Report the S3 path rather than the repr of the whole dict
            return (
                jsonify({"message": f"Error downloading {file['path']} from s3"}),
                422,
            )

    # Load the CSV to check that the "product_name" column is available
    df = read_files(file_name=file_name)
    if "product_name" not in df.columns:
        # Discard the downloaded input before rejecting the request
        remove_files(f"fsa_input_{file_name}")
        return jsonify({"message": "Product name column is missing from the CSV"}), 422

    # Create a task
    created_task = Tasks.create(file_name=file_name, original_file_name=original_file_name)
    # Payload for the worker; the task id is read before the session is closed
    data = {
        "file_name": file_name,
        "table_idx": created_task.id,
        "update_db": update_db
    }
    db.session.close()

    # Hand the job to the process pool; the returned future is not awaited
    process_executor.submit(process_fsa_categories_v2, data)

    return jsonify({"message": f"{file_name} - File processing starting"}), 200