| |
|
| | import pandas as pd
|
| | import re
|
| | from datetime import datetime
|
| | import numpy as np
|
| | import sqlite3
|
| | import os
|
| | import streamlit as st
|
| |
|
class SalesDataProcessor:
    """Load raw sales spreadsheets, standardize their records, and persist
    them through a SQLite-backed database handle."""

    def __init__(self, db):
        """Keep the database handle and prepare lookup tables plus schema.

        Args:
            db: object exposing get_connection() that returns a sqlite3
                connection (project-provided wrapper).
        """
        self.db = db
        # The two mapping setups are independent of each other; the schema
        # setup needs self.db, which is already assigned above.
        self.setup_location_mapping()
        self.setup_product_mapping()
        self.setup_database_tables()
|
| |
|
| | def setup_database_tables(self):
|
| | """Initialize database tables if they don't exist"""
|
| | conn = self.db.get_connection()
|
| | cursor = conn.cursor()
|
| |
|
| |
|
| | cursor.execute('''
|
| | CREATE TABLE IF NOT EXISTS sales (
|
| | id INTEGER PRIMARY KEY AUTOINCREMENT,
|
| | source_sheet TEXT,
|
| | sr_no TEXT,
|
| | customer_name TEXT,
|
| | village TEXT,
|
| | taluka TEXT,
|
| | district TEXT,
|
| | invoice_no TEXT UNIQUE,
|
| | reference TEXT,
|
| | dispatch_date TEXT,
|
| | product_type TEXT,
|
| | quantity INTEGER,
|
| | rate_per_unit REAL,
|
| | amount REAL,
|
| | final_amount REAL,
|
| | total_liters REAL,
|
| | payment_date TEXT,
|
| | gpay_amount REAL,
|
| | cash_amount REAL,
|
| | cheque_amount REAL,
|
| | rrn_number TEXT,
|
| | sold_by TEXT,
|
| | sale_type TEXT,
|
| | payment_status TEXT,
|
| | payment_method TEXT,
|
| | processed_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
|
| | source_file TEXT
|
| | )
|
| | ''')
|
| |
|
| |
|
| | cursor.execute('''
|
| | CREATE TABLE IF NOT EXISTS customers (
|
| | id INTEGER PRIMARY KEY AUTOINCREMENT,
|
| | customer_name TEXT,
|
| | village TEXT,
|
| | taluka TEXT,
|
| | district TEXT,
|
| | total_purchases REAL DEFAULT 0,
|
| | total_orders INTEGER DEFAULT 0,
|
| | last_order_date TEXT,
|
| | created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP
|
| | )
|
| | ''')
|
| |
|
| | conn.commit()
|
| | conn.close()
|
| |
|
| | def setup_product_mapping(self):
|
| | """Standard product mapping for all packaging types"""
|
| | self.PRODUCT_MAPPING = {
|
| | '1 LTR PLASTIC JAR': '1L_PLASTIC_JAR',
|
| | '2 LTR PLASTIC JAR': '2L_PLASTIC_JAR',
|
| | '5 LTR PLASTIC JAR': '5L_PLASTIC_JAR',
|
| | '10 LTR PLASTIC JAR': '10L_PLASTIC_JAR',
|
| | '5 LTR STEEL BARNI': '5L_STEEL_BARNI',
|
| | '10 LTR STEEL BARNI': '10L_STEEL_BARNI',
|
| | '20 LTR STEEL BARNI': '20L_STEEL_BARNI',
|
| | '20 LTR PLASTIC CAN': '20L_PLASTIC_CAN',
|
| | '1 LTR PET BOTTLE': '1L_PET_BOTTLE',
|
| | '20 LTR CARBO': '20L_CARBO'
|
| | }
|
| |
|
| | def setup_location_mapping(self):
|
| | """Gujarati location name standardization"""
|
| | self.GUJARATI_LOCALITIES = {
|
| | 'રામપુરા': 'RAMPURA',
|
| | 'શેખડી': 'SHEKHADI',
|
| | 'સિંહોલ': 'SINHOL',
|
| | 'વનાદરા': 'VANADARA',
|
| | 'માવલી': 'MAVLI',
|
| | 'સિમરડા': 'SIMRADA',
|
| | 'બિલપડ': 'BILPAD',
|
| | 'વઘોડિયા': 'VAGHODIA',
|
| | 'સાકરિયા': 'SAKARIYA'
|
| | }
|
| |
|
| | def safe_float(self, value):
|
| | """Safely convert to float, handle errors"""
|
| | if pd.isna(value) or value in ['', 'NOT_AVAILABLE', None, '_']:
|
| | return 0.0
|
| | try:
|
| | return float(value)
|
| | except (ValueError, TypeError):
|
| | return 0.0
|
| |
|
| | def safe_int(self, value):
|
| | """Safely convert to integer"""
|
| | return int(self.safe_float(value))
|
| |
|
| | def parse_date(self, date_str):
|
| | """Handle all date formats intelligently"""
|
| | if pd.isna(date_str) or date_str in ['', 'NOT_AVAILABLE', None, '_']:
|
| | return 'NOT_AVAILABLE'
|
| |
|
| | if isinstance(date_str, (int, float)):
|
| | try:
|
| | return (datetime(1899, 12, 30) + pd.Timedelta(days=date_str)).strftime('%Y-%m-%d')
|
| | except:
|
| | return 'INVALID_DATE'
|
| |
|
| | date_str = str(date_str).strip()
|
| |
|
| | date_formats = [
|
| | '%Y-%m-%d %H:%M:%S',
|
| | '%d/%m/%Y',
|
| | '%Y-%m-%d',
|
| | '%d-%m-%Y',
|
| | '%d/%m/%Y %H:%M:%S'
|
| | ]
|
| |
|
| | for fmt in date_formats:
|
| | try:
|
| | return datetime.strptime(date_str, fmt).strftime('%Y-%m-%d')
|
| | except ValueError:
|
| | continue
|
| |
|
| | return 'INVALID_DATE'
|
| |
|
| | def clean_name(self, name):
|
| | """Handle names, duplicates, variations"""
|
| | if pd.isna(name) or name in ['', '-', '_', None]:
|
| | return 'NOT_AVAILABLE'
|
| | name = ' '.join(str(name).strip().split())
|
| | return name
|
| |
|
| | def standardize_location(self, location):
|
| | """Handle Gujarati location names"""
|
| | if pd.isna(location) or location in ['', 'NOT_AVAILABLE', None]:
|
| | return 'NOT_AVAILABLE'
|
| |
|
| | location_str = str(location).strip()
|
| |
|
| | if isinstance(location_str, str):
|
| | for guj_name, eng_name in self.GUJARATI_LOCALITIES.items():
|
| | if guj_name in location_str:
|
| | return eng_name
|
| |
|
| | return location_str.upper()
|
| |
|
| | def standardize_product(self, product_name):
|
| | """Convert any product name to standard format"""
|
| | if pd.isna(product_name) or product_name in ['', 'NOT_AVAILABLE', None]:
|
| | return 'UNKNOWN_PRODUCT'
|
| |
|
| | product_str = str(product_name).strip()
|
| | product_upper = product_str.upper()
|
| |
|
| | for key, value in self.PRODUCT_MAPPING.items():
|
| | if key in product_upper:
|
| | return value
|
| |
|
| |
|
| | if '1 LTR' in product_upper or '1L' in product_upper:
|
| | if 'PLASTIC' in product_upper or 'JAR' in product_upper:
|
| | return '1L_PLASTIC_JAR'
|
| | elif 'PET' in product_upper or 'BOTTLE' in product_upper:
|
| | return '1L_PET_BOTTLE'
|
| | elif '2 LTR' in product_upper or '2L' in product_upper:
|
| | return '2L_PLASTIC_JAR'
|
| | elif '5 LTR' in product_upper or '5L' in product_upper:
|
| | if 'STEEL' in product_upper or 'BARNI' in product_upper:
|
| | return '5L_STEEL_BARNI'
|
| | else:
|
| | return '5L_PLASTIC_JAR'
|
| | elif '10 LTR' in product_upper or '10L' in product_upper:
|
| | if 'STEEL' in product_upper or 'BARNI' in product_upper:
|
| | return '10L_STEEL_BARNI'
|
| | else:
|
| | return '10L_PLASTIC_JAR'
|
| | elif '20 LTR' in product_upper or '20L' in product_upper:
|
| | if 'STEEL' in product_upper or 'BARNI' in product_upper:
|
| | return '20L_STEEL_BARNI'
|
| | elif 'PLASTIC' in product_upper or 'CAN' in product_upper:
|
| | return '20L_PLASTIC_CAN'
|
| | elif 'CARBO' in product_upper:
|
| | return '20L_CARBO'
|
| |
|
| | return f"UNKNOWN_{product_upper.replace(' ', '_')}"
|
| |
|
| | def detect_sale_type(self, row):
|
| | """Detect if it's demo sale (single unit) or bulk sale"""
|
| | quantity = self.safe_int(row.get('QTN', 0))
|
| | reference = str(row.get('REF.', '')).upper()
|
| |
|
| | if reference == 'DEMO' or quantity == 1:
|
| | return 'DEMO_SALE'
|
| | else:
|
| | return 'BULK_SALE'
|
| |
|
| | def calculate_payment_status(self, row):
|
| | """Determine payment status intelligently"""
|
| | final_amt = self.safe_float(row.get('FINAL AMT', 0))
|
| | gpay = self.safe_float(row.get('G-PAY', 0))
|
| | cash = self.safe_float(row.get('CASH', 0))
|
| | cheque = self.safe_float(row.get('CHQ', 0))
|
| |
|
| | paid_amt = gpay + cash + cheque
|
| |
|
| | if paid_amt >= final_amt:
|
| | return 'PAID'
|
| | elif paid_amt > 0:
|
| | return 'PARTIAL_PAID'
|
| | elif self.parse_date(row.get('PAYMENT DATE')) not in ['NOT_AVAILABLE', 'INVALID_DATE']:
|
| | return 'PENDING'
|
| | else:
|
| | return 'UNPAID'
|
| |
|
| | def detect_payment_method(self, row):
|
| | """Intelligently detect payment method"""
|
| | gpay = self.safe_float(row.get('G-PAY', 0))
|
| | cash = self.safe_float(row.get('CASH', 0))
|
| | cheque = self.safe_float(row.get('CHQ', 0))
|
| |
|
| | if gpay > 0:
|
| | return 'GPAY'
|
| | elif cash > 0:
|
| | return 'CASH'
|
| | elif cheque > 0:
|
| | return 'CHEQUE'
|
| | else:
|
| | return 'NOT_PAID'
|
| |
|
| | def process_dataframe(self, df, sheet_name, source_file):
|
| | """Process entire dataframe and standardize all records"""
|
| | standardized_records = []
|
| |
|
| | for idx, row in df.iterrows():
|
| | if (pd.isna(row.get('NAME', '')) and
|
| | pd.isna(row.get('PACKING', '')) and
|
| | pd.isna(row.get('INV NO', ''))):
|
| | continue
|
| |
|
| | try:
|
| | standardized_record = self.standardize_record(row, sheet_name, source_file)
|
| | standardized_records.append(standardized_record)
|
| | except Exception as e:
|
| | st.error(f"⚠️ Error processing row {idx}: {e}")
|
| | continue
|
| |
|
| | return standardized_records
|
| |
|
| | def standardize_record(self, row, sheet_name, source_file):
|
| | """Standardize a single record"""
|
| | record = {
|
| | 'source_sheet': sheet_name,
|
| | 'sr_no': self.clean_name(row.get('SR NO.', 'NOT_AVAILABLE')),
|
| | 'customer_name': self.clean_name(row.get('NAME', 'NOT_AVAILABLE')),
|
| | 'village': self.standardize_location(row.get('VILLAGE', 'NOT_AVAILABLE')),
|
| | 'taluka': self.standardize_location(row.get('TALUKA', 'NOT_AVAILABLE')),
|
| | 'district': self.standardize_location(row.get('DISTRICT', 'NOT_AVAILABLE')),
|
| | 'invoice_no': self.clean_name(row.get('INV NO', 'NOT_AVAILABLE')),
|
| | 'reference': self.clean_name(row.get('REF.', 'NOT_AVAILABLE')),
|
| | 'dispatch_date': self.parse_date(row.get('DISPATCH DATE')),
|
| | 'product_type': self.standardize_product(row.get('PACKING', 'NOT_AVAILABLE')),
|
| | 'quantity': self.safe_int(row.get('QTN', 0)),
|
| | 'rate_per_unit': self.safe_float(row.get('RATE', 0)),
|
| | 'amount': self.safe_float(row.get('AMT', 0)),
|
| | 'final_amount': self.safe_float(row.get('FINAL AMT', 0)),
|
| | 'total_liters': self.safe_float(row.get('TOTAL LTR', 0)),
|
| | 'payment_date': self.parse_date(row.get('PAYMENT DATE')),
|
| | 'gpay_amount': self.safe_float(row.get('G-PAY', 0)),
|
| | 'cash_amount': self.safe_float(row.get('CASH', 0)),
|
| | 'cheque_amount': self.safe_float(row.get('CHQ', 0)),
|
| | 'rrn_number': self.clean_name(row.get('RRN', 'NOT_AVAILABLE')),
|
| | 'sold_by': self.clean_name(row.get('BY', 'NOT_AVAILABLE')),
|
| | 'sale_type': self.detect_sale_type(row),
|
| | 'payment_status': self.calculate_payment_status(row),
|
| | 'payment_method': self.detect_payment_method(row),
|
| | 'source_file': os.path.basename(source_file)
|
| | }
|
| |
|
| |
|
| | if record['amount'] == 0 and record['quantity'] > 0 and record['rate_per_unit'] > 0:
|
| | record['amount'] = record['quantity'] * record['rate_per_unit']
|
| |
|
| | if record['final_amount'] == 0 and record['amount'] > 0:
|
| | record['final_amount'] = record['amount']
|
| |
|
| | return record
|
| |
|
    def insert_into_database(self, records):
        """Insert processed records into the database.

        Upsert keyed on invoice_no: a row whose invoice already exists is
        UPDATEd in place, otherwise a new row is INSERTed.  Per-record errors
        are logged to the UI and skipped.  Afterwards the customers rollup
        table is rebuilt from the sales table.

        Returns:
            (inserted_count, updated_count) tuple of row counts.

        NOTE(review): every record whose invoice_no is 'NOT_AVAILABLE' shares
        the same key, so later ones overwrite the first — confirm this
        collapsing is intended.
        NOTE(review): conn is not closed if commit() or
        update_customers_table() raises; consider a try/finally.
        """
        conn = self.db.get_connection()
        cursor = conn.cursor()

        inserted_count = 0
        updated_count = 0

        for record in records:
            try:
                # Existence probe decides between UPDATE and INSERT.
                cursor.execute('SELECT id FROM sales WHERE invoice_no = ?', (record['invoice_no'],))
                existing = cursor.fetchone()

                if existing:
                    # Refresh every mutable column; invoice_no stays fixed
                    # because it is the lookup key.
                    update_query = '''
                        UPDATE sales SET
                        source_sheet=?, sr_no=?, customer_name=?, village=?, taluka=?, district=?,
                        reference=?, dispatch_date=?, product_type=?, quantity=?, rate_per_unit=?,
                        amount=?, final_amount=?, total_liters=?, payment_date=?, gpay_amount=?,
                        cash_amount=?, cheque_amount=?, rrn_number=?, sold_by=?, sale_type=?,
                        payment_status=?, payment_method=?, source_file=?
                        WHERE invoice_no=?
                    '''
                    cursor.execute(update_query, (
                        record['source_sheet'], record['sr_no'], record['customer_name'],
                        record['village'], record['taluka'], record['district'],
                        record['reference'], record['dispatch_date'], record['product_type'],
                        record['quantity'], record['rate_per_unit'], record['amount'],
                        record['final_amount'], record['total_liters'], record['payment_date'],
                        record['gpay_amount'], record['cash_amount'], record['cheque_amount'],
                        record['rrn_number'], record['sold_by'], record['sale_type'],
                        record['payment_status'], record['payment_method'], record['source_file'],
                        record['invoice_no']
                    ))
                    updated_count += 1
                else:
                    # Brand-new invoice: full insert (25 columns/placeholders).
                    insert_query = '''
                        INSERT INTO sales (
                            source_sheet, sr_no, customer_name, village, taluka, district,
                            invoice_no, reference, dispatch_date, product_type, quantity,
                            rate_per_unit, amount, final_amount, total_liters, payment_date,
                            gpay_amount, cash_amount, cheque_amount, rrn_number, sold_by,
                            sale_type, payment_status, payment_method, source_file
                        ) VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)
                    '''
                    cursor.execute(insert_query, (
                        record['source_sheet'], record['sr_no'], record['customer_name'],
                        record['village'], record['taluka'], record['district'],
                        record['invoice_no'], record['reference'], record['dispatch_date'],
                        record['product_type'], record['quantity'], record['rate_per_unit'],
                        record['amount'], record['final_amount'], record['total_liters'],
                        record['payment_date'], record['gpay_amount'], record['cash_amount'],
                        record['cheque_amount'], record['rrn_number'], record['sold_by'],
                        record['sale_type'], record['payment_status'], record['payment_method'],
                        record['source_file']
                    ))
                    inserted_count += 1

            except Exception as e:
                # Best-effort import: report the failing invoice and move on.
                st.error(f"❌ Database error for invoice {record['invoice_no']}: {e}")
                continue

        conn.commit()

        # Keep the aggregated customers table in sync with the new sales rows.
        self.update_customers_table(conn)

        conn.close()

        return inserted_count, updated_count
|
| |
|
| | def update_customers_table(self, conn):
|
| | """Update customers table from sales data"""
|
| | cursor = conn.cursor()
|
| |
|
| |
|
| | cursor.execute('DELETE FROM customers')
|
| |
|
| |
|
| | cursor.execute('''
|
| | INSERT INTO customers (customer_name, village, taluka, district, total_purchases, total_orders, last_order_date)
|
| | SELECT
|
| | customer_name,
|
| | village,
|
| | taluka,
|
| | district,
|
| | SUM(final_amount) as total_purchases,
|
| | COUNT(*) as total_orders,
|
| | MAX(dispatch_date) as last_order_date
|
| | FROM sales
|
| | WHERE customer_name != 'NOT_AVAILABLE'
|
| | GROUP BY customer_name, village, taluka, district
|
| | ''')
|
| |
|
| | conn.commit()
|
| |
|
| | def process_excel_file(self, file_path):
|
| | """Main method to process Excel file - called from Streamlit"""
|
| | try:
|
| | st.info(f"🔄 Processing: {os.path.basename(file_path)}")
|
| |
|
| |
|
| | xl = pd.ExcelFile(file_path)
|
| |
|
| |
|
| | all_records = []
|
| |
|
| | for sheet_name in xl.sheet_names:
|
| | with st.spinner(f"Processing sheet: {sheet_name}..."):
|
| |
|
| | df = pd.read_excel(file_path, sheet_name=sheet_name)
|
| |
|
| |
|
| | standardized_records = self.process_dataframe(df, sheet_name, file_path)
|
| | all_records.extend(standardized_records)
|
| |
|
| | if not all_records:
|
| | st.warning("⚠️ No valid records found in the file")
|
| | return False
|
| |
|
| |
|
| | with st.spinner("Inserting into database..."):
|
| | inserted, updated = self.insert_into_database(all_records)
|
| |
|
| |
|
| | if inserted > 0 or updated > 0:
|
| | st.success(f"✅ Processed {len(all_records)} records from {os.path.basename(file_path)}")
|
| | st.success(f"📊 New: {inserted}, Updated: {updated}")
|
| |
|
| |
|
| | self.show_import_summary(all_records)
|
| | return True
|
| | else:
|
| | st.warning("⚠️ No records were inserted or updated")
|
| | return False
|
| |
|
| | except Exception as e:
|
| | st.error(f"❌ Error processing file: {e}")
|
| | return False
|
| |
|
| | def show_import_summary(self, records):
|
| | """Show summary of imported data"""
|
| | if not records:
|
| | return
|
| |
|
| | df = pd.DataFrame(records)
|
| |
|
| | col1, col2, col3, col4 = st.columns(4)
|
| |
|
| | with col1:
|
| | st.metric("Total Records", len(records))
|
| | with col2:
|
| | demo_sales = len(df[df['sale_type'] == 'DEMO_SALE'])
|
| | st.metric("Demo Sales", demo_sales)
|
| | with col3:
|
| | bulk_sales = len(df[df['sale_type'] == 'BULK_SALE'])
|
| | st.metric("Bulk Sales", bulk_sales)
|
| | with col4:
|
| | total_amount = df['final_amount'].sum()
|
| | st.metric("Total Amount", f"₹{total_amount:,.2f}")
|
| |
|
| |
|
| | st.subheader("📦 Products Imported")
|
| | product_summary = df['product_type'].value_counts().head(5)
|
| | for product, count in product_summary.items():
|
| | st.write(f"- {product}: {count} records")
|
| |
|
| | def get_import_stats(self):
|
| | """Get import statistics for dashboard"""
|
| | conn = self.db.get_connection()
|
| |
|
| | try:
|
| |
|
| | total_records = pd.read_sql('SELECT COUNT(*) as count FROM sales', conn)['count'].iloc[0]
|
| |
|
| |
|
| | files_processed = pd.read_sql('SELECT COUNT(DISTINCT source_file) as count FROM sales', conn)['count'].iloc[0]
|
| |
|
| |
|
| | recent_imports = pd.read_sql('''
|
| | SELECT source_file, COUNT(*) as records, MAX(processed_at) as last_import
|
| | FROM sales
|
| | GROUP BY source_file
|
| | ORDER BY last_import DESC
|
| | LIMIT 5
|
| | ''', conn)
|
| |
|
| | return {
|
| | 'total_records': total_records,
|
| | 'files_processed': files_processed,
|
| | 'recent_imports': recent_imports.to_dict('records')
|
| | }
|
| | finally:
|
| | conn.close() |