Spaces:
Running
Running
sabarish
Optimize app for 500+ users: Add multithreaded CSV uploads and LRU/TTL caching for Dashboards and NLP engine
c89f56c | import os | |
| import pandas as pd | |
| from werkzeug.utils import secure_filename | |
| from utils.nlp_utils import preprocess_text, analyze_sentiment | |
| from models import db, Feedback, Upload | |
| from flask import current_app | |
def allowed_file(filename):
    """Return True if *filename* carries an extension listed in ALLOWED_EXTENSIONS."""
    if '.' not in filename:
        return False
    extension = filename.rsplit('.', 1)[1].lower()
    return extension in current_app.config['ALLOWED_EXTENSIONS']
def identify_text_column(df):
    """
    Heuristic to find the column most likely containing the feedback text.

    Strategy:
      1. Keyword match against lowercased, stripped column names, in keyword
         priority order (an earlier keyword wins even if a later keyword
         matches an earlier column).
      2. Fallback: among string-typed columns, pick the one with the highest
         *mean* string length over the first 50 rows (sampled for speed).

    Returns the original column label, or None if no string column exists.
    """
    normalized = [str(c).lower().strip() for c in df.columns]
    # Expanded heuristic matching for academic, social media, and product reviews.
    keywords = [
        'review_text', 'feedback_text', 'text', 'feedback', 'review',
        'comment', 'description', 'message', 'body', 'content', 'tweet', 'post'
    ]
    for kw in keywords:
        for i, col in enumerate(normalized):
            # Substring match also covers exact equality, so no separate == test.
            if kw in col:
                return df.columns[i]

    # Fallback: string-typed column with the greatest mean length.
    text_cols = df.select_dtypes(include=['object', 'string']).columns
    if len(text_cols) == 0:
        return None

    def _avg_len(col):
        # Sample the first 50 rows for speed on large files.
        return df[col].astype(str).head(50).str.len().mean()

    # max() keeps the first column on ties, matching the original strict-> scan.
    return max(text_cols, key=_avg_len)
import threading


def process_uploaded_file_async(app, filepath, upload_record_id, user_id, selected_column=None):
    """
    Background worker function that runs outside the active request thread.

    Reads the uploaded CSV/Excel file at *filepath*, runs each row's text
    through preprocess_text()/analyze_sentiment(), and bulk-inserts Feedback
    rows owned by *user_id* and tied to *upload_record_id*. Progress counters
    and the final status are written to the matching Upload row; any failure
    is recorded via _mark_upload_status().

    Parameters:
        app: the Flask application object, used to push an app context in
            this worker thread (DB access requires it).
        filepath: path to the uploaded .csv/.xls/.xlsx file.
        upload_record_id: primary key of the Upload row tracking this job.
        user_id: owner of the resulting Feedback rows.
        selected_column: optional explicit text-column name; ignored when it
            is not present in the file, falling back to identify_text_column().
    """
    with app.app_context():
        try:
            # Read file — format is dispatched by filename extension only.
            if filepath.endswith('.csv'):
                df = pd.read_csv(filepath)
            elif filepath.endswith(('.xls', '.xlsx')):
                df = pd.read_excel(filepath)
            else:
                _mark_upload_status(upload_record_id, 'Failed', "Unsupported file format")
                return
            if df.empty:
                _mark_upload_status(upload_record_id, 'Failed', "The uploaded file is empty")
                return
            # Determine target text column (explicit user choice wins when valid).
            # NOTE(review): a falsy column label (e.g. 0 or '') would be treated
            # as "not found" here — confirm column labels are non-empty strings.
            text_col = selected_column if selected_column and selected_column in df.columns else identify_text_column(df)
            if not text_col:
                _mark_upload_status(upload_record_id, 'Failed', "Could not identify a text column")
                return
            # Look for a department/category column if it exists
            dept_col = None
            dept_keywords = [
                'department', 'category', 'product', 'dept', 'course',
                'branch', 'faculty', 'subject', 'program', 'unit'
            ]
            for col in df.columns:
                if any(kw in str(col).lower().strip() for kw in dept_keywords):
                    # First matching column wins.
                    dept_col = col
                    break
            # Drop NA from the text column to avoid processing empties
            df = df.dropna(subset=[text_col])
            total_rows = len(df)
            # Update upload tally to track maximum rows early
            upload_record = Upload.query.get(upload_record_id)
            if upload_record:
                upload_record.total_rows = total_rows
                db.session.commit()
            feedbacks = []
            for index, row in df.iterrows():
                orig_text = str(row[text_col])
                # Skip very empty rows ('nan' covers stringified NaN cells)
                if not orig_text.strip() or orig_text.lower() == 'nan':
                    continue
                cleaned_text = preprocess_text(orig_text)
                sentiment, score = analyze_sentiment(cleaned_text)
                department = str(row[dept_col]) if dept_col and pd.notna(row[dept_col]) else None
                feedback = Feedback(
                    user_id=user_id,
                    upload_id=upload_record_id,
                    original_text=orig_text,
                    cleaned_text=cleaned_text,
                    sentiment=sentiment,
                    sentiment_score=score,
                    department_category=department
                )
                feedbacks.append(feedback)
                # Commit in chunks of 50 to show live progress and conserve memory
                if len(feedbacks) >= 50:
                    db.session.bulk_save_objects(feedbacks)
                    # Re-fetch the Upload row each chunk; the prior commit may
                    # have expired the session's cached instance.
                    upload_record = Upload.query.get(upload_record_id)
                    if upload_record:
                        # NOTE(review): assumes processed_rows defaults to 0 on
                        # the model — confirm, else += raises on None.
                        upload_record.processed_rows += len(feedbacks)
                    db.session.commit()
                    feedbacks = []
            # Save remaining (partial final chunk, < 50 rows)
            if feedbacks:
                db.session.bulk_save_objects(feedbacks)
                upload_record = Upload.query.get(upload_record_id)
                if upload_record:
                    upload_record.processed_rows += len(feedbacks)
                db.session.commit()
            # Mark as totally completed
            upload_record = Upload.query.get(upload_record_id)
            if upload_record:
                upload_record.status = 'Completed'
                db.session.commit()
        except Exception as e:
            # Roll back any partially-flushed chunk, then record the failure
            # on the Upload row so the UI can surface it.
            db.session.rollback()
            _mark_upload_status(upload_record_id, 'Failed', str(e))
def _mark_upload_status(upload_id, status, error_msg=None):
    """
    Persist *status* on the Upload row identified by *upload_id*.

    Fixes a bug where the *status* argument was ignored and 'Failed' was
    hard-coded, making it impossible to record any other status.

    Parameters:
        upload_id: primary key of the Upload row to update.
        status: status string to store (e.g. 'Failed', 'Completed').
        error_msg: optional human-readable reason, currently printed to stdout.
    """
    upload_record = Upload.query.get(upload_id)
    if upload_record:
        upload_record.status = status  # was hard-coded to 'Failed'
        db.session.commit()
    if error_msg:
        # NOTE(review): consider routing through the app logger instead of print.
        print(f"Upload {upload_id} failed: {error_msg}")
def process_uploaded_file(filepath, upload_record_id, user_id, selected_column=None):
    """
    Kick off background processing of an uploaded file and return immediately
    so the HTTP request is never blocked by row-by-row analysis.
    """
    # Grab the real app object: current_app is a proxy bound to this request
    # thread and cannot be used from the worker thread directly.
    flask_app = current_app._get_current_object()
    worker = threading.Thread(
        target=process_uploaded_file_async,
        args=(flask_app, filepath, upload_record_id, user_id, selected_column),
        daemon=True,  # allows server to shut down freely
    )
    worker.start()
    return True, "File uploaded successfully. Neural Network is processing rows in the background!"