# sentiment-analysis/utils/file_processor.py
# Author: sabarish
# Commit c89f56c: Optimize app for 500+ users — add multithreaded CSV uploads
# and LRU/TTL caching for dashboards and the NLP engine.
import os
import pandas as pd
from werkzeug.utils import secure_filename
from utils.nlp_utils import preprocess_text, analyze_sentiment
from models import db, Feedback, Upload
from flask import current_app
def allowed_file(filename):
    """Return True when *filename* has an extension permitted by app config."""
    if '.' not in filename:
        return False
    extension = filename.rsplit('.', 1)[1].lower()
    return extension in current_app.config['ALLOWED_EXTENSIONS']
def identify_text_column(df):
    """
    Heuristic to find the column most likely containing the feedback text.

    First tries keyword matching against common text-column names; failing
    that, picks the string-typed column with the highest *average* string
    length (sampled over the first 50 rows for speed).

    Args:
        df: pandas DataFrame to inspect.

    Returns:
        The original column label of the best candidate, or None when the
        frame has no object/string-typed columns at all.
    """
    # Normalize names once so keyword matching is case/whitespace-insensitive.
    normalized = [str(c).lower().strip() for c in df.columns]
    # Expanded heuristic matching for academic, social media, and product reviews.
    keywords = [
        'review_text', 'feedback_text', 'text', 'feedback', 'review',
        'comment', 'description', 'message', 'body', 'content', 'tweet', 'post'
    ]
    for kw in keywords:
        for i, col in enumerate(normalized):
            # Substring match also covers exact equality, so a single
            # `in` test suffices (the original `kw == col` was redundant).
            if kw in col:
                return df.columns[i]
    # Fallback: among string-typed columns, pick the one with the largest
    # sampled average length.
    text_cols = df.select_dtypes(include=['object', 'string']).columns
    if len(text_cols) == 0:
        return None
    best_col = text_cols[0]
    best_avg = 0
    for col in text_cols:
        # Mean length over the first 50 rows only, for speed on large files.
        avg_len = df[col].astype(str).head(50).map(len).mean()
        if avg_len > best_avg:
            best_avg = avg_len
            best_col = col
    return best_col
import threading
def process_uploaded_file_async(app, filepath, upload_record_id, user_id, selected_column=None):
    """
    Background worker function that runs outside the active request thread.

    Reads the uploaded CSV/Excel file, runs sentiment analysis on each row of
    the detected (or caller-selected) text column, and persists Feedback rows
    in chunks of 50, updating the Upload record's progress counters after
    each chunk. On any exception the session is rolled back and the Upload
    is marked failed via _mark_upload_status().

    Args:
        app: The Flask application object — needed to push an app context,
            since this runs in a plain thread with no request context.
        filepath: Path to the uploaded file (.csv, .xls, or .xlsx).
        upload_record_id: Primary key of the Upload row to update.
        user_id: Owner id stored on each created Feedback row.
        selected_column: Optional explicit text-column name; falls back to
            identify_text_column() when absent or not present in the file.
    """
    with app.app_context():
        try:
            # Dispatch on file extension; anything else is rejected outright.
            if filepath.endswith('.csv'):
                df = pd.read_csv(filepath)
            elif filepath.endswith(('.xls', '.xlsx')):
                df = pd.read_excel(filepath)
            else:
                _mark_upload_status(upload_record_id, 'Failed', "Unsupported file format")
                return
            if df.empty:
                _mark_upload_status(upload_record_id, 'Failed', "The uploaded file is empty")
                return
            # Honour the caller's column choice only when it actually exists
            # in the file; otherwise fall back to the heuristic.
            text_col = selected_column if selected_column and selected_column in df.columns else identify_text_column(df)
            if not text_col:
                _mark_upload_status(upload_record_id, 'Failed', "Could not identify a text column")
                return
            # Look for a department/category column if it exists (substring
            # match on the column name; first hit wins).
            dept_col = None
            dept_keywords = [
                'department', 'category', 'product', 'dept', 'course',
                'branch', 'faculty', 'subject', 'program', 'unit'
            ]
            for col in df.columns:
                if any(kw in str(col).lower().strip() for kw in dept_keywords):
                    dept_col = col
                    break
            # Drop NA from the text column to avoid processing empties.
            df = df.dropna(subset=[text_col])
            total_rows = len(df)
            # Record the row total up front so progress can be reported
            # while the loop below is still running.
            upload_record = Upload.query.get(upload_record_id)
            if upload_record:
                upload_record.total_rows = total_rows
                db.session.commit()
            feedbacks = []
            for index, row in df.iterrows():
                orig_text = str(row[text_col])
                # Skip blank rows and literal 'nan' strings that survived dropna.
                if not orig_text.strip() or orig_text.lower() == 'nan':
                    continue
                cleaned_text = preprocess_text(orig_text)
                sentiment, score = analyze_sentiment(cleaned_text)
                department = str(row[dept_col]) if dept_col and pd.notna(row[dept_col]) else None
                feedback = Feedback(
                    user_id=user_id,
                    upload_id=upload_record_id,
                    original_text=orig_text,
                    cleaned_text=cleaned_text,
                    sentiment=sentiment,
                    sentiment_score=score,
                    department_category=department
                )
                feedbacks.append(feedback)
                # Commit in chunks of 50 to show live progress and bound memory.
                if len(feedbacks) >= 50:
                    db.session.bulk_save_objects(feedbacks)
                    # Re-fetch the Upload row each chunk in case the session
                    # expired it on the previous commit.
                    upload_record = Upload.query.get(upload_record_id)
                    if upload_record:
                        # NOTE(review): assumes processed_rows defaults to 0
                        # on the model — confirm, else this raises TypeError.
                        upload_record.processed_rows += len(feedbacks)
                    db.session.commit()
                    feedbacks = []
            # Save any remaining partial chunk.
            if feedbacks:
                db.session.bulk_save_objects(feedbacks)
                upload_record = Upload.query.get(upload_record_id)
                if upload_record:
                    upload_record.processed_rows += len(feedbacks)
                db.session.commit()
            # Mark the upload as fully completed.
            upload_record = Upload.query.get(upload_record_id)
            if upload_record:
                upload_record.status = 'Completed'
                db.session.commit()
        except Exception as e:
            # Roll back any half-applied chunk, then record the failure.
            db.session.rollback()
            _mark_upload_status(upload_record_id, 'Failed', str(e))
def _mark_upload_status(upload_id, status, error_msg=None):
    """
    Set the status of an Upload record and optionally log an error message.

    Args:
        upload_id: Primary key of the Upload row to update.
        status: New status string to persist (e.g. 'Failed').
        error_msg: Optional human-readable reason, printed for diagnostics.
    """
    upload_record = Upload.query.get(upload_id)
    if upload_record:
        # Bug fix: the status parameter was previously ignored and the
        # record was unconditionally marked 'Failed'.
        upload_record.status = status
        db.session.commit()
    if error_msg:
        print(f"Upload {upload_id} failed: {error_msg}")
def process_uploaded_file(filepath, upload_record_id, user_id, selected_column=None):
    """
    Entry point called from the web route.

    Spawns a daemon thread to process the file and returns immediately so
    the browser request is never blocked by row-level NLP work.
    """
    # Resolve the real app object now; the proxy is bound to this request
    # thread and would be unusable inside the worker.
    flask_app = current_app._get_current_object()
    worker = threading.Thread(
        target=process_uploaded_file_async,
        args=(flask_app, filepath, upload_record_id, user_id, selected_column),
        daemon=True,  # let the server shut down without waiting on workers
    )
    worker.start()
    return True, "File uploaded successfully. Neural Network is processing rows in the background!"