import gradio as gr
import pandas as pd
import logging
from io import BytesIO
import datetime
import zipfile
import tempfile
# Configure logging
logging.basicConfig(
level=logging.INFO,
format='%(asctime)s - %(levelname)s - %(message)s',
handlers=[logging.StreamHandler()]
)
logger = logging.getLogger(__name__)
class ExcelDataProcessor:
def __init__(self, excel_file=None):
self.videos_sheet = None
if excel_file:
self._initialize_videos_sheet(excel_file)
logger.info("ExcelDataProcessor initialized")
def _initialize_videos_sheet(self, excel_file):
"""Initialize videos_sheet by combining relevant sheets and normalizing data"""
try:
df_dict = pd.read_excel(excel_file, sheet_name=None)
sheet_dfs = {name: df for name, df in df_dict.items() if '.' in name}
if not sheet_dfs:
logger.warning("No sheets found with '.' in their names")
return
# Combine sheets
combined_df = pd.concat(
[df.assign(SheetName=name) for name, df in sheet_dfs.items()],
ignore_index=True
)
combined_df = combined_df.dropna(how='all')
            # Normalize 'Created At' column (shared parser handles both supported formats)
            if 'Created At' in combined_df.columns:
                combined_df['Created At'] = combined_df['Created At'].apply(self._parse_date)
self.videos_sheet = combined_df
logger.info("Initialized videos_sheet with combined data")
except Exception as e:
logger.error(f"Error initializing videos_sheet: {str(e)}")
self.videos_sheet = None
    @staticmethod
    def _extract_number(sheet_name):
        """Extract number from sheet name for sorting"""
        try:
            return int(sheet_name[:sheet_name.find('.')])
        except ValueError:
            logger.warning(f"Could not extract number from sheet name: {sheet_name}")
            return float('inf')
    @staticmethod
    def _parse_date(date_str):
        """Parse a 'Created At' value in ISO 8601 (2025-05-11T19:50:53Z) or DD-MM-YYYY (18-04-2025) format"""
        if pd.isna(date_str):
            return pd.NaT
        try:
            return pd.to_datetime(date_str, utc=True).date()
        except (ValueError, TypeError):
            try:
                return pd.to_datetime(date_str, format='%d-%m-%Y').date()
            except (ValueError, TypeError):
                logger.warning(f"Cannot parse date: {date_str}")
                return pd.NaT
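    # Illustrative helper behavior (hypothetical sheet name, example dates):
    #   _extract_number("2.videos")          -> 2
    #   _parse_date("2025-05-11T19:50:53Z")  -> datetime.date(2025, 5, 11)
    #   _parse_date("18-04-2025")            -> datetime.date(2025, 4, 18)
    #   _parse_date(float("nan"))            -> pd.NaT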
@staticmethod
def _create_output_buffer(df, base_name):
"""Create Excel file in memory"""
output = BytesIO()
with pd.ExcelWriter(output, engine='xlsxwriter') as writer:
df.to_excel(writer, sheet_name='Results')
output.seek(0)
return output, f"{base_name}.xlsx"
def _apply_date_filter(self, df, target_date, by_ref, use_filter):
"""Apply date filter based on operation type and filter choice"""
df_filtered = df.copy()
if not use_filter:
return df_filtered
if by_ref:
return df_filtered[df_filtered['Created At'] < target_date].copy()
return df_filtered[df_filtered['Created At'] >= target_date].copy()
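    # Filter semantics (illustrative): with target_date = date(2025, 4, 14),
    # by_ref=True keeps rows created strictly before 2025-04-14, while
    # by_ref=False keeps rows created on or after 2025-04-14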
def count_daily_registers_by_source_name(self, df, target_date, use_filter):
"""Count daily registers by source name"""
logger.info("Starting count_daily_registers_by_source_name")
df_filtered = self._apply_date_filter(df, target_date, False, use_filter)
df_filtered['Created At'] = pd.to_datetime(df_filtered['Created At']).dt.date
pivot_table = pd.pivot_table(
df_filtered,
index='Source Name',
columns='Created At',
values='User ID',
aggfunc='count',
fill_value=0
)
pivot_table.loc['Total'] = pivot_table.sum()
return pivot_table
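    # Illustrative output shape for the daily-register pivots (example values,
    # not real data): one row per source plus a 'Total' row, one column per date:
    #                 2025-04-14  2025-04-15
    #   direct                 3           5
    #   some_source           12           9
    #   Total                 15          14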
def count_daily_registers_by_ref(self, df, target_date, use_filter):
"""Count daily registers by reference"""
logger.info("Starting count_daily_registers_by_ref")
df_filtered = self._apply_date_filter(df, target_date, True, use_filter)
df_filtered['Created At'] = pd.to_datetime(df_filtered['Created At']).dt.date
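        # Registrations from the 'direct' source carry no referrer, so label
        # them 'direct' to give them their own row in the Ref By pivot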
df_filtered.loc[(df_filtered['Source Name'] == 'direct') & (df_filtered['Ref By'].isna()), 'Ref By'] = 'direct'
pivot_table = pd.pivot_table(
df_filtered,
index='Ref By',
columns='Created At',
values='User ID',
aggfunc='count',
fill_value=0
)
pivot_table.loc['Total'] = pivot_table.sum()
return pivot_table
def count_users_by_source_name(self, df, target_date, use_filter):
"""Count unique users by source name"""
logger.info("Starting count_users_by_source_name")
df_filtered = self._apply_date_filter(df, target_date, False, use_filter)
df_filtered = df_filtered.drop_duplicates(subset=['User ID'], keep='first')
df_filtered['Created At'] = pd.to_datetime(df_filtered['Created At']).dt.date
pivot_table = pd.pivot_table(
df_filtered,
index='Source Name',
values='User ID',
aggfunc='count',
fill_value=0
)
return pivot_table
def count_users_by_ref(self, df, target_date, use_filter):
"""Count unique users by reference"""
logger.info("Starting count_users_by_ref")
df_filtered = self._apply_date_filter(df, target_date, True, use_filter)
df_filtered = df_filtered.drop_duplicates(subset=['User ID'], keep='first')
df_filtered['Created At'] = pd.to_datetime(df_filtered['Created At']).dt.date
df_filtered.loc[(df_filtered['Source Name'] == 'direct') & (df_filtered['Ref By'].isna()), 'Ref By'] = 'direct'
pivot_table = pd.pivot_table(
df_filtered,
index='Ref By',
values='User ID',
aggfunc='count',
fill_value=0
)
return pivot_table
def count_users_each_sheet_by_source_name(self, target_date, use_filter):
"""Count users in each sheet by source name"""
logger.info("Starting count_users_each_sheet_by_source_name")
if self.videos_sheet is None:
logger.warning("No videos_sheet data available")
return "No valid sheet data available", None
        # Check required columns before filtering on them
        if not {'Source Name', 'User ID', 'SheetName'}.issubset(self.videos_sheet.columns):
            return "Required columns missing", None
        combined_df_filtered = self.videos_sheet.dropna(subset=['Source Name']).copy()
        combined_df_filtered = self._apply_date_filter(combined_df_filtered, target_date, False, use_filter)
pivot_table = pd.pivot_table(
combined_df_filtered,
index='Source Name',
columns='SheetName',
values='User ID',
aggfunc='count',
fill_value=0
)
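        # Sort sheet columns by their leading number so '2.x' precedes '10.x'
        # (plain string sorting would put '10.x' first)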
sorted_columns = sorted(pivot_table.columns, key=self._extract_number)
pivot_table = pivot_table[sorted_columns]
pivot_table.loc['Total'] = pivot_table.sum()
return "Success", pivot_table
def count_users_each_sheet_by_ref(self, target_date, use_filter):
"""Count users in each sheet by reference"""
logger.info("Starting count_users_each_sheet_by_ref")
if self.videos_sheet is None:
logger.warning("No videos_sheet data available")
return "No valid sheet data available", None
        # Check required columns before filtering on them
        if not {'Ref By', 'User ID', 'SheetName', 'Source Name'}.issubset(self.videos_sheet.columns):
            return "Required columns missing", None
        combined_df_filtered = self.videos_sheet.copy()
        combined_df_filtered = self._apply_date_filter(combined_df_filtered, target_date, True, use_filter)
        combined_df_filtered.loc[(combined_df_filtered['Source Name'] == 'direct') &
                                 (combined_df_filtered['Ref By'].isna()), 'Ref By'] = 'direct'
pivot_table = pd.pivot_table(
combined_df_filtered,
index='Ref By',
columns='SheetName',
values='User ID',
aggfunc='count',
fill_value=0
)
sorted_columns = sorted(pivot_table.columns, key=self._extract_number)
pivot_table = pivot_table[sorted_columns]
pivot_table.loc['Total'] = pivot_table.sum()
return "Success", pivot_table
def count_users_each_sheet_by_date(self, target_date, use_filter):
"""Count users in each sheet by date"""
logger.info("Starting count_users_each_sheet_by_date")
if self.videos_sheet is None:
logger.warning("No videos_sheet data available")
return "No valid sheet data available", None
        # Check required columns before filtering on them
        if not {'Created At', 'User ID', 'SheetName'}.issubset(self.videos_sheet.columns):
            return "Required columns missing", None
        combined_df_filtered = self.videos_sheet[self.videos_sheet['Created At'].notna()].copy()
        combined_df_filtered = self._apply_date_filter(combined_df_filtered, target_date, False, use_filter)
pivot_table = pd.pivot_table(
combined_df_filtered,
index='Created At',
columns='SheetName',
values='User ID',
aggfunc='count',
fill_value=0
)
sorted_columns = sorted(pivot_table.columns, key=self._extract_number)
pivot_table = pivot_table[sorted_columns]
pivot_table.loc['Total'] = pivot_table.sum()
return "Success", pivot_table
def process_file(self, excel_file, operations, target_date, use_date_filter):
"""Process file with selected operations"""
logger.info(f"Processing file with operations: {operations}")
        if not excel_file:
            logger.warning("No file uploaded")
            return "Please upload an Excel file", None, None
        # Initialize videos_sheet if not already done
        if self.videos_sheet is None:
            self._initialize_videos_sheet(excel_file)
        results = {}
        output_files = []
        result_preview = None
try:
# Process single-sheet operations
single_sheet_ops = [
"count_daily_registers_by_source_name",
"count_daily_registers_by_ref",
"count_users_by_source_name",
"count_users_by_ref"
]
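            # The four operations above read only the 'User Register' sheet;
            # the per-sheet operations further down use the combined
            # videos_sheet built from every sheet whose name contains '.'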
if any(op in operations for op in single_sheet_ops):
df = pd.read_excel(excel_file, sheet_name="User Register")
                # Normalize 'Created At' for the User Register sheet
                if 'Created At' in df.columns:
                    df['Created At'] = df['Created At'].apply(self._parse_date)
if "count_daily_registers_by_source_name" in operations:
pivot = self.count_daily_registers_by_source_name(df, target_date, use_date_filter)
results["Daily Registers by Source Name"] = pivot
buffer, filename = self._create_output_buffer(pivot, "daily_registers_source")
output_files.append((buffer, filename))
if result_preview is None:
result_preview = pivot
if "count_daily_registers_by_ref" in operations:
pivot = self.count_daily_registers_by_ref(df, target_date, use_date_filter)
results["Daily Registers by Ref"] = pivot
buffer, filename = self._create_output_buffer(pivot, "daily_registers_ref")
output_files.append((buffer, filename))
if result_preview is None:
result_preview = pivot
if "count_users_by_source_name" in operations:
pivot = self.count_users_by_source_name(df, target_date, use_date_filter)
results["Users by Source Name"] = pivot
buffer, filename = self._create_output_buffer(pivot, "users_source")
output_files.append((buffer, filename))
if result_preview is None:
result_preview = pivot
if "count_users_by_ref" in operations:
pivot = self.count_users_by_ref(df, target_date, use_date_filter)
results["Users by Ref"] = pivot
buffer, filename = self._create_output_buffer(pivot, "users_ref")
output_files.append((buffer, filename))
if result_preview is None:
result_preview = pivot
# Process multi-sheet operations
if "count_users_each_sheet_by_source_name" in operations:
status, pivot = self.count_users_each_sheet_by_source_name(target_date, use_date_filter)
if status != "Success":
return status, None, None
results["Users Each Sheet by Source Name"] = pivot
buffer, filename = self._create_output_buffer(pivot, "users_sheet_source")
output_files.append((buffer, filename))
if result_preview is None:
result_preview = pivot
if "count_users_each_sheet_by_ref" in operations:
status, pivot = self.count_users_each_sheet_by_ref(target_date, use_date_filter)
if status != "Success":
return status, None, None
results["Users Each Sheet by Ref"] = pivot
buffer, filename = self._create_output_buffer(pivot, "users_sheet_ref")
output_files.append((buffer, filename))
if result_preview is None:
result_preview = pivot
if "count_users_each_sheet_by_date" in operations:
status, pivot = self.count_users_each_sheet_by_date(target_date, use_date_filter)
if status != "Success":
return status, None, None
results["Users Each Sheet by Date"] = pivot
buffer, filename = self._create_output_buffer(pivot, "users_sheet_date")
output_files.append((buffer, filename))
if result_preview is None:
result_preview = pivot
# Create ZIP file
if output_files:
# Use temporary file for ZIP
with tempfile.NamedTemporaryFile(delete=False, suffix='.zip') as tmp_file:
with zipfile.ZipFile(tmp_file, 'w') as zip_file:
for buffer, filename in output_files:
zip_file.writestr(filename, buffer.getvalue())
tmp_file_path = tmp_file.name
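                # delete=False keeps the ZIP on disk after the with-block closes
                # so Gradio can serve it; long-running deployments may want to
                # clean these temp files up separately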
if result_preview is not None and result_preview.size > 10000:
result_preview = result_preview.head(100)
return "Processing completed successfully!", result_preview, tmp_file_path
return "No operations performed", None, None
except Exception as e:
logger.error(f"Error during file processing: {str(e)}", exc_info=True)
return f"Error: {str(e)}", None, None
def create_gradio_interface():
"""Create and configure the Gradio interface"""
processor = ExcelDataProcessor()
with gr.Blocks(title="Excel Data Processor") as app:
gr.Markdown("# Excel Data Processing Tool")
gr.Markdown("Upload an Excel file, select operations, and optionally filter by date.")
with gr.Row():
with gr.Column(scale=1):
file_input = gr.File(label="Upload Excel File")
with gr.Group():
gr.Markdown("### Date Filter")
use_date_filter = gr.Checkbox(label="Apply Date Filter", value=False)
target_date = gr.Textbox(
label="Target Date (YYYY-MM-DD)",
value="2025-04-14",
placeholder="YYYY-MM-DD"
)
operations = gr.CheckboxGroup(
choices=[
"count_daily_registers_by_source_name",
"count_daily_registers_by_ref",
"count_users_by_source_name",
"count_users_by_ref",
"count_users_each_sheet_by_source_name",
"count_users_each_sheet_by_ref",
"count_users_each_sheet_by_date"
],
label="Select Operations",
value=["count_daily_registers_by_source_name"]
)
process_btn = gr.Button("Process Excel File", variant="primary")
with gr.Column(scale=2):
status_output = gr.Textbox(label="Status")
result_output = gr.Dataframe(label="Preview Results (Limited to avoid UI freezing)")
download_btn = gr.File(label="Download Results (ZIP)")
def validate_and_process(file, ops, date_str, use_filter):
"""Validate inputs and process file"""
logger.info(f"Processing started with operations: {ops}")
try:
target_date = datetime.datetime.strptime(date_str, '%Y-%m-%d').date()
except ValueError:
return "Invalid date format. Use YYYY-MM-DD", None, None
            # Build a fresh processor per upload so stale sheet data is never reused
            processor = ExcelDataProcessor(excel_file=file)
            return processor.process_file(file, ops, target_date, use_filter)
process_btn.click(
            fn=lambda *_: ("Processing... Please wait.", None, None),
inputs=[file_input, operations, target_date, use_date_filter],
outputs=[status_output, result_output, download_btn],
queue=False
).then(
fn=validate_and_process,
inputs=[file_input, operations, target_date, use_date_filter],
outputs=[status_output, result_output, download_btn]
)
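        # Two-step click wiring: the queue=False lambda shows "Processing..."
        # immediately, then .then() runs validate_and_process and replaces all
        # three outputs when it completes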
gr.Markdown("""
## Instructions
1. Upload your Excel file
2. Optionally enable date filtering and specify a target date
3. Select desired operations
4. Click "Process Excel File"
5. View preview results and download the ZIP file containing all outputs
## Date Filter
- When enabled, operations by reference use dates < target date
- Operations by source name use dates >= target date
- When disabled, all dates are included
## Operations
- count_daily_registers_by_source_name: Daily registrations by source
- count_daily_registers_by_ref: Daily registrations by referral
- count_users_by_source_name: Unique users by source
- count_users_by_ref: Unique users by referral
- count_users_each_sheet_by_source_name: Users per sheet by source
- count_users_each_sheet_by_ref: Users per sheet by referral
- count_users_each_sheet_by_date: Users per sheet by date
""")
return app
if __name__ == "__main__":
app = create_gradio_interface()
app.launch()