import gradio as gr
import pandas as pd
import logging
from io import BytesIO
import datetime
import zipfile
import tempfile
import os

# Configure logging
logging.basicConfig(
    level=logging.INFO,
    format='%(asctime)s - %(levelname)s - %(message)s',
    handlers=[logging.StreamHandler()]
)
logger = logging.getLogger(__name__)
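
# Expected workbook layout (inferred from the code below): a "User Register"
# sheet with 'Created At', 'Source Name', 'Ref By', and 'User ID' columns,
# plus per-video sheets whose names contain a dot after a leading number
# (e.g. "1.Intro" -- the exact sheet names here are illustrative).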

class ExcelDataProcessor:
    def __init__(self, excel_file=None):
        self.videos_sheet = None
        if excel_file:
            self._initialize_videos_sheet(excel_file)
        logger.info("ExcelDataProcessor initialized")

    @staticmethod
    def _parse_date(date_str):
        """Parse a 'Created At' value; shared by all sheet loaders."""
        if pd.isna(date_str):
            return pd.NaT
        try:
            # Try ISO 8601 format (e.g. 2025-05-11T19:50:53Z)
            return pd.to_datetime(date_str, utc=True).date()
        except (ValueError, TypeError):
            try:
                # Try DD-MM-YYYY format (e.g. 18-04-2025)
                return pd.to_datetime(date_str, format='%d-%m-%Y').date()
            except (ValueError, TypeError):
                logger.warning(f"Cannot parse date: {date_str}")
                return pd.NaT

    def _initialize_videos_sheet(self, excel_file):
        """Initialize videos_sheet by combining relevant sheets and normalizing data"""
        try:
            df_dict = pd.read_excel(excel_file, sheet_name=None)
            sheet_dfs = {name: df for name, df in df_dict.items() if '.' in name}
            if not sheet_dfs:
                logger.warning("No sheets found with '.' in their names")
                return
            # Combine sheets, tagging each row with its sheet of origin
            combined_df = pd.concat(
                [df.assign(SheetName=name) for name, df in sheet_dfs.items()],
                ignore_index=True
            )
            combined_df = combined_df.dropna(how='all')
            # Normalize 'Created At' column
            if 'Created At' in combined_df.columns:
                combined_df['Created At'] = combined_df['Created At'].apply(self._parse_date)
            self.videos_sheet = combined_df
            logger.info("Initialized videos_sheet with combined data")
        except Exception as e:
            logger.error(f"Error initializing videos_sheet: {str(e)}")
            self.videos_sheet = None

    @staticmethod
    def _extract_number(sheet_name):
        """Extract the leading number from a sheet name for sorting"""
        try:
            return int(sheet_name[:sheet_name.find('.')])
        except ValueError:
            logger.warning(f"Could not extract number from sheet name: {sheet_name}")
            return float('inf')
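
    # e.g. _extract_number("3.Some Sheet") -> 3 (the name is illustrative);
    # names without a leading integer before the dot sort last via float('inf').
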
    @staticmethod
    def _create_output_buffer(df, base_name):
        """Create an Excel file in memory"""
        output = BytesIO()
        with pd.ExcelWriter(output, engine='xlsxwriter') as writer:
            df.to_excel(writer, sheet_name='Results')
        output.seek(0)
        return output, f"{base_name}.xlsx"
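
    # Note: the 'xlsxwriter' engine requires the xlsxwriter package to be
    # installed (e.g. listed in requirements.txt on a Hugging Face Space).
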
    def _apply_date_filter(self, df, target_date, by_ref, use_filter):
        """Apply date filter based on operation type and filter choice"""
        df_filtered = df.copy()
        if not use_filter:
            return df_filtered
        if by_ref:
            return df_filtered[df_filtered['Created At'] < target_date].copy()
        return df_filtered[df_filtered['Created At'] >= target_date].copy()
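
    # Filter semantics, e.g. with target_date = datetime.date(2025, 4, 14):
    #   by_ref=True  -> keep rows with 'Created At' strictly before 2025-04-14
    #   by_ref=False -> keep rows with 'Created At' on or after 2025-04-14
    # With use_filter=False the frame is returned unfiltered.
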
    def count_daily_registers_by_source_name(self, df, target_date, use_filter):
        """Count daily registers by source name"""
        logger.info("Starting count_daily_registers_by_source_name")
        df_filtered = self._apply_date_filter(df, target_date, False, use_filter)
        df_filtered['Created At'] = pd.to_datetime(df_filtered['Created At']).dt.date
        pivot_table = pd.pivot_table(
            df_filtered,
            index='Source Name',
            columns='Created At',
            values='User ID',
            aggfunc='count',
            fill_value=0
        )
        pivot_table.loc['Total'] = pivot_table.sum()
        return pivot_table
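
    # Resulting shape: one row per 'Source Name', one column per date, plus a
    # 'Total' row summing each column (values below are illustrative):
    #   Source Name   2025-04-14  2025-04-15
    #   direct                12           8
    #   Total                 12           8
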
    def count_daily_registers_by_ref(self, df, target_date, use_filter):
        """Count daily registers by reference"""
        logger.info("Starting count_daily_registers_by_ref")
        df_filtered = self._apply_date_filter(df, target_date, True, use_filter)
        df_filtered['Created At'] = pd.to_datetime(df_filtered['Created At']).dt.date
        df_filtered.loc[(df_filtered['Source Name'] == 'direct') & (df_filtered['Ref By'].isna()), 'Ref By'] = 'direct'
        pivot_table = pd.pivot_table(
            df_filtered,
            index='Ref By',
            columns='Created At',
            values='User ID',
            aggfunc='count',
            fill_value=0
        )
        pivot_table.loc['Total'] = pivot_table.sum()
        return pivot_table

    def count_users_by_source_name(self, df, target_date, use_filter):
        """Count unique users by source name"""
        logger.info("Starting count_users_by_source_name")
        df_filtered = self._apply_date_filter(df, target_date, False, use_filter)
        df_filtered = df_filtered.drop_duplicates(subset=['User ID'], keep='first')
        df_filtered['Created At'] = pd.to_datetime(df_filtered['Created At']).dt.date
        pivot_table = pd.pivot_table(
            df_filtered,
            index='Source Name',
            values='User ID',
            aggfunc='count',
            fill_value=0
        )
        return pivot_table

    def count_users_by_ref(self, df, target_date, use_filter):
        """Count unique users by reference"""
        logger.info("Starting count_users_by_ref")
        df_filtered = self._apply_date_filter(df, target_date, True, use_filter)
        df_filtered = df_filtered.drop_duplicates(subset=['User ID'], keep='first')
        df_filtered['Created At'] = pd.to_datetime(df_filtered['Created At']).dt.date
        df_filtered.loc[(df_filtered['Source Name'] == 'direct') & (df_filtered['Ref By'].isna()), 'Ref By'] = 'direct'
        pivot_table = pd.pivot_table(
            df_filtered,
            index='Ref By',
            values='User ID',
            aggfunc='count',
            fill_value=0
        )
        return pivot_table

    def count_users_each_sheet_by_source_name(self, target_date, use_filter):
        """Count users in each sheet by source name"""
        logger.info("Starting count_users_each_sheet_by_source_name")
        if self.videos_sheet is None:
            logger.warning("No videos_sheet data available")
            return "No valid sheet data available", None
        combined_df_filtered = self.videos_sheet.dropna(subset=['Source Name']).copy()
        combined_df_filtered = self._apply_date_filter(combined_df_filtered, target_date, False, use_filter)
        if not {'Source Name', 'User ID', 'SheetName'}.issubset(combined_df_filtered.columns):
            return "Required columns missing", None
        pivot_table = pd.pivot_table(
            combined_df_filtered,
            index='Source Name',
            columns='SheetName',
            values='User ID',
            aggfunc='count',
            fill_value=0
        )
        sorted_columns = sorted(pivot_table.columns, key=self._extract_number)
        pivot_table = pivot_table[sorted_columns]
        pivot_table.loc['Total'] = pivot_table.sum()
        return "Success", pivot_table

    def count_users_each_sheet_by_ref(self, target_date, use_filter):
        """Count users in each sheet by reference"""
        logger.info("Starting count_users_each_sheet_by_ref")
        if self.videos_sheet is None:
            logger.warning("No videos_sheet data available")
            return "No valid sheet data available", None
        combined_df_filtered = self.videos_sheet.copy()
        combined_df_filtered = self._apply_date_filter(combined_df_filtered, target_date, True, use_filter)
        combined_df_filtered.loc[(combined_df_filtered['Source Name'] == 'direct') &
                                 (combined_df_filtered['Ref By'].isna()), 'Ref By'] = 'direct'
        if not {'Ref By', 'User ID', 'SheetName'}.issubset(combined_df_filtered.columns):
            return "Required columns missing", None
        pivot_table = pd.pivot_table(
            combined_df_filtered,
            index='Ref By',
            columns='SheetName',
            values='User ID',
            aggfunc='count',
            fill_value=0
        )
        sorted_columns = sorted(pivot_table.columns, key=self._extract_number)
        pivot_table = pivot_table[sorted_columns]
        pivot_table.loc['Total'] = pivot_table.sum()
        return "Success", pivot_table

    def count_users_each_sheet_by_date(self, target_date, use_filter):
        """Count users in each sheet by date"""
        logger.info("Starting count_users_each_sheet_by_date")
        if self.videos_sheet is None:
            logger.warning("No videos_sheet data available")
            return "No valid sheet data available", None
        combined_df_filtered = self.videos_sheet[self.videos_sheet['Created At'].notna()].copy()
        combined_df_filtered = self._apply_date_filter(combined_df_filtered, target_date, False, use_filter)
        if not {'Created At', 'User ID', 'SheetName'}.issubset(combined_df_filtered.columns):
            return "Required columns missing", None
        pivot_table = pd.pivot_table(
            combined_df_filtered,
            index='Created At',
            columns='SheetName',
            values='User ID',
            aggfunc='count',
            fill_value=0
        )
        sorted_columns = sorted(pivot_table.columns, key=self._extract_number)
        pivot_table = pivot_table[sorted_columns]
        pivot_table.loc['Total'] = pivot_table.sum()
        return "Success", pivot_table

    def process_file(self, excel_file, operations, target_date, use_date_filter):
        """Process file with selected operations"""
        logger.info(f"Processing file with operations: {operations}")
        if not excel_file:
            logger.warning("No file uploaded")
            return "Please upload an Excel file", None, None
        # Initialize videos_sheet if not already done
        if self.videos_sheet is None:
            self._initialize_videos_sheet(excel_file)
        results = {}
        output_files = []
        result_preview = None
        try:
            # Process single-sheet operations
            single_sheet_ops = [
                "count_daily_registers_by_source_name",
                "count_daily_registers_by_ref",
                "count_users_by_source_name",
                "count_users_by_ref"
            ]
            if any(op in operations for op in single_sheet_ops):
                df = pd.read_excel(excel_file, sheet_name="User Register")
                # Normalize 'Created At' for the User Register sheet
                if 'Created At' in df.columns:
                    df['Created At'] = df['Created At'].apply(self._parse_date)
                if "count_daily_registers_by_source_name" in operations:
                    pivot = self.count_daily_registers_by_source_name(df, target_date, use_date_filter)
                    results["Daily Registers by Source Name"] = pivot
                    buffer, filename = self._create_output_buffer(pivot, "daily_registers_source")
                    output_files.append((buffer, filename))
                    if result_preview is None:
                        result_preview = pivot
                if "count_daily_registers_by_ref" in operations:
                    pivot = self.count_daily_registers_by_ref(df, target_date, use_date_filter)
                    results["Daily Registers by Ref"] = pivot
                    buffer, filename = self._create_output_buffer(pivot, "daily_registers_ref")
                    output_files.append((buffer, filename))
                    if result_preview is None:
                        result_preview = pivot
                if "count_users_by_source_name" in operations:
                    pivot = self.count_users_by_source_name(df, target_date, use_date_filter)
                    results["Users by Source Name"] = pivot
                    buffer, filename = self._create_output_buffer(pivot, "users_source")
                    output_files.append((buffer, filename))
                    if result_preview is None:
                        result_preview = pivot
                if "count_users_by_ref" in operations:
                    pivot = self.count_users_by_ref(df, target_date, use_date_filter)
                    results["Users by Ref"] = pivot
                    buffer, filename = self._create_output_buffer(pivot, "users_ref")
                    output_files.append((buffer, filename))
                    if result_preview is None:
                        result_preview = pivot
            # Process multi-sheet operations
            if "count_users_each_sheet_by_source_name" in operations:
                status, pivot = self.count_users_each_sheet_by_source_name(target_date, use_date_filter)
                if status != "Success":
                    return status, None, None
                results["Users Each Sheet by Source Name"] = pivot
                buffer, filename = self._create_output_buffer(pivot, "users_sheet_source")
                output_files.append((buffer, filename))
                if result_preview is None:
                    result_preview = pivot
            if "count_users_each_sheet_by_ref" in operations:
                status, pivot = self.count_users_each_sheet_by_ref(target_date, use_date_filter)
                if status != "Success":
                    return status, None, None
                results["Users Each Sheet by Ref"] = pivot
                buffer, filename = self._create_output_buffer(pivot, "users_sheet_ref")
                output_files.append((buffer, filename))
                if result_preview is None:
                    result_preview = pivot
            if "count_users_each_sheet_by_date" in operations:
                status, pivot = self.count_users_each_sheet_by_date(target_date, use_date_filter)
                if status != "Success":
                    return status, None, None
                results["Users Each Sheet by Date"] = pivot
                buffer, filename = self._create_output_buffer(pivot, "users_sheet_date")
                output_files.append((buffer, filename))
                if result_preview is None:
                    result_preview = pivot
            # Create ZIP file
            if output_files:
                # Use a temporary file for the ZIP
                with tempfile.NamedTemporaryFile(delete=False, suffix='.zip') as tmp_file:
                    with zipfile.ZipFile(tmp_file, 'w') as zip_file:
                        for buffer, filename in output_files:
                            zip_file.writestr(filename, buffer.getvalue())
                    tmp_file_path = tmp_file.name
                if result_preview is not None and result_preview.size > 10000:
                    result_preview = result_preview.head(100)
                return "Processing completed successfully!", result_preview, tmp_file_path
            return "No operations performed", None, None
        except Exception as e:
            logger.error(f"Error during file processing: {str(e)}", exc_info=True)
            return f"Error: {str(e)}", None, None

def create_gradio_interface():
    """Create and configure the Gradio interface"""
    with gr.Blocks(title="Excel Data Processor") as app:
        gr.Markdown("# Excel Data Processing Tool")
        gr.Markdown("Upload an Excel file, select operations, and optionally filter by date.")
        with gr.Row():
            with gr.Column(scale=1):
                file_input = gr.File(label="Upload Excel File")
                with gr.Group():
                    gr.Markdown("### Date Filter")
                    use_date_filter = gr.Checkbox(label="Apply Date Filter", value=False)
                    target_date = gr.Textbox(
                        label="Target Date (YYYY-MM-DD)",
                        value="2025-04-14",
                        placeholder="YYYY-MM-DD"
                    )
                operations = gr.CheckboxGroup(
                    choices=[
                        "count_daily_registers_by_source_name",
                        "count_daily_registers_by_ref",
                        "count_users_by_source_name",
                        "count_users_by_ref",
                        "count_users_each_sheet_by_source_name",
                        "count_users_each_sheet_by_ref",
                        "count_users_each_sheet_by_date"
                    ],
                    label="Select Operations",
                    value=["count_daily_registers_by_source_name"]
                )
                process_btn = gr.Button("Process Excel File", variant="primary")
            with gr.Column(scale=2):
                status_output = gr.Textbox(label="Status")
                result_output = gr.Dataframe(label="Preview Results (limited to avoid UI freezing)")
                download_btn = gr.File(label="Download Results (ZIP)")

        def validate_and_process(file, ops, date_str, use_filter):
            """Validate inputs and process the file"""
            logger.info(f"Processing started with operations: {ops}")
            try:
                target_date = datetime.datetime.strptime(date_str, '%Y-%m-%d').date()
            except ValueError:
                return "Invalid date format. Use YYYY-MM-DD", None, None
            # Build a fresh processor per request so data from a previous
            # upload is never reused
            processor = ExcelDataProcessor(excel_file=file)
            return processor.process_file(file, ops, target_date, use_filter)
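
        # The click handler is chained: an unqueued first step posts an
        # immediate "Processing..." status, then .then() runs the real work,
        # so the UI does not appear frozen during long operations.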
        process_btn.click(
            fn=lambda file, ops, date, use_filter: ("Processing... Please wait.", None, None),
            inputs=[file_input, operations, target_date, use_date_filter],
            outputs=[status_output, result_output, download_btn],
            queue=False
        ).then(
            fn=validate_and_process,
            inputs=[file_input, operations, target_date, use_date_filter],
            outputs=[status_output, result_output, download_btn]
        )

gr.Markdown(""" | |
## Instructions | |
1. Upload your Excel file | |
2. Optionally enable date filtering and specify a target date | |
3. Select desired operations | |
4. Click "Process Excel File" | |
5. View preview results and download the ZIP file containing all outputs | |
## Date Filter | |
- When enabled, operations by reference use dates < target date | |
- Operations by source name use dates >= target date | |
- When disabled, all dates are included | |
## Operations | |
- count_daily_registers_by_source_name: Daily registrations by source | |
- count_daily_registers_by_ref: Daily registrations by referral | |
- count_users_by_source_name: Unique users by source | |
- count_users_by_ref: Unique users by referral | |
- count_users_each_sheet_by_source_name: Users per sheet by source | |
- count_users_each_sheet_by_ref: Users per sheet by referral | |
- count_users_each_sheet_by_date: Users per sheet by date | |
""") | |
return app | |
if __name__ == "__main__":
    app = create_gradio_interface()
    app.launch()
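    # On Hugging Face Spaces the default launch() is sufficient; for local
    # testing you could pass standard Gradio options instead, e.g.:
    #   app.launch(server_name="0.0.0.0", server_port=7860)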