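"""Gradio app: upload an Excel export, run counting/pivot operations over its
sheets, preview the result, and download all outputs as a ZIP of .xlsx files."""
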
import gradio as gr
import pandas as pd
import logging
from io import BytesIO
import datetime
import zipfile
import tempfile
import os

# Configure logging
logging.basicConfig(
    level=logging.INFO,
    format='%(asctime)s - %(levelname)s - %(message)s',
    handlers=[logging.StreamHandler()]
)
logger = logging.getLogger(__name__)


class ExcelDataProcessor:
    def __init__(self, excel_file=None):
        self.videos_sheet = None
        if excel_file:
            self._initialize_videos_sheet(excel_file)
        logger.info("ExcelDataProcessor initialized")

    def _initialize_videos_sheet(self, excel_file):
        """Initialize videos_sheet by combining relevant sheets and normalizing data."""
        try:
            df_dict = pd.read_excel(excel_file, sheet_name=None)
            # Per-video sheets are identified by a '.' in the sheet name
            sheet_dfs = {name: df for name, df in df_dict.items() if '.' in name}
            if not sheet_dfs:
                logger.warning("No sheets found with '.' in their names")
                return
            # Combine sheets, tagging each row with its sheet of origin
            combined_df = pd.concat(
                [df.assign(SheetName=name) for name, df in sheet_dfs.items()],
                ignore_index=True
            )
            combined_df = combined_df.dropna(how='all')
            # Normalize 'Created At' column
            if 'Created At' in combined_df.columns:
                combined_df['Created At'] = combined_df['Created At'].apply(self._parse_date)
            self.videos_sheet = combined_df
            logger.info("Initialized videos_sheet with combined data")
        except Exception as e:
            logger.error(f"Error initializing videos_sheet: {str(e)}")
            self.videos_sheet = None

    @staticmethod
    def _parse_date(date_str):
        """Parse a 'Created At' value; shared by the combined sheets and User Register."""
        if pd.isna(date_str):
            return pd.NaT
        try:
            # Try ISO 8601 format (2025-05-11T19:50:53Z)
            return pd.to_datetime(date_str, utc=True).date()
        except Exception:
            try:
                # Try DD-MM-YYYY format (18-04-2025)
                return pd.to_datetime(date_str, format='%d-%m-%Y').date()
            except Exception:
                logger.warning(f"Cannot parse date: {date_str}")
                return pd.NaT
    @staticmethod
    def _extract_number(sheet_name):
        """Extract the leading number from a sheet name for sorting."""
        try:
            return int(sheet_name[:sheet_name.find('.')])
        except ValueError:
            logger.warning(f"Could not extract number from sheet name: {sheet_name}")
            return float('inf')
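    # For illustration (hypothetical sheet names): _extract_number("3.Launch Video")
    # returns 3, so "10.x" sorts after "9.x"; names without a numeric prefix sort last.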
    @staticmethod
    def _create_output_buffer(df, base_name):
        """Create an Excel file in memory."""
        output = BytesIO()
        with pd.ExcelWriter(output, engine='xlsxwriter') as writer:
            df.to_excel(writer, sheet_name='Results')
        output.seek(0)
        return output, f"{base_name}.xlsx"
    def _apply_date_filter(self, df, target_date, by_ref, use_filter):
        """Apply date filter based on operation type and filter choice."""
        df_filtered = df.copy()
        if not use_filter:
            return df_filtered
        if by_ref:
            return df_filtered[df_filtered['Created At'] < target_date].copy()
        return df_filtered[df_filtered['Created At'] >= target_date].copy()
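    # Filter semantics (mirrors the UI help text): with the filter enabled,
    # by-ref operations keep rows created strictly BEFORE target_date, while
    # by-source operations keep rows created ON or AFTER it. For example, with
    # target_date = 2025-04-14, a row created 2025-04-13 survives only the
    # by-ref filter and a row created 2025-04-14 only the by-source one.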
    def count_daily_registers_by_source_name(self, df, target_date, use_filter):
        """Count daily registers by source name."""
        logger.info("Starting count_daily_registers_by_source_name")
        df_filtered = self._apply_date_filter(df, target_date, False, use_filter)
        df_filtered['Created At'] = pd.to_datetime(df_filtered['Created At']).dt.date
        pivot_table = pd.pivot_table(
            df_filtered,
            index='Source Name',
            columns='Created At',
            values='User ID',
            aggfunc='count',
            fill_value=0
        )
        pivot_table.loc['Total'] = pivot_table.sum()
        return pivot_table
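    # The resulting pivot (illustrative values) has one row per source, one
    # column per registration date, and a closing Total row, e.g.:
    #   Created At   2025-04-14  2025-04-15
    #   Source Name
    #   facebook             12           9
    #   direct                4           7
    #   Total                16          16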
    def count_daily_registers_by_ref(self, df, target_date, use_filter):
        """Count daily registers by reference."""
        logger.info("Starting count_daily_registers_by_ref")
        df_filtered = self._apply_date_filter(df, target_date, True, use_filter)
        df_filtered['Created At'] = pd.to_datetime(df_filtered['Created At']).dt.date
        # Rows from the 'direct' source with no referrer are labelled 'direct'
        # so they are not silently dropped from the pivot
        df_filtered.loc[(df_filtered['Source Name'] == 'direct') & (df_filtered['Ref By'].isna()), 'Ref By'] = 'direct'
        pivot_table = pd.pivot_table(
            df_filtered,
            index='Ref By',
            columns='Created At',
            values='User ID',
            aggfunc='count',
            fill_value=0
        )
        pivot_table.loc['Total'] = pivot_table.sum()
        return pivot_table
    def count_users_by_source_name(self, df, target_date, use_filter):
        """Count unique users by source name."""
        logger.info("Starting count_users_by_source_name")
        df_filtered = self._apply_date_filter(df, target_date, False, use_filter)
        # Deduplicate on User ID so each user is counted once
        df_filtered = df_filtered.drop_duplicates(subset=['User ID'], keep='first')
        df_filtered['Created At'] = pd.to_datetime(df_filtered['Created At']).dt.date
        pivot_table = pd.pivot_table(
            df_filtered,
            index='Source Name',
            values='User ID',
            aggfunc='count',
            fill_value=0
        )
        return pivot_table
    def count_users_by_ref(self, df, target_date, use_filter):
        """Count unique users by reference."""
        logger.info("Starting count_users_by_ref")
        df_filtered = self._apply_date_filter(df, target_date, True, use_filter)
        df_filtered = df_filtered.drop_duplicates(subset=['User ID'], keep='first')
        df_filtered['Created At'] = pd.to_datetime(df_filtered['Created At']).dt.date
        df_filtered.loc[(df_filtered['Source Name'] == 'direct') & (df_filtered['Ref By'].isna()), 'Ref By'] = 'direct'
        pivot_table = pd.pivot_table(
            df_filtered,
            index='Ref By',
            values='User ID',
            aggfunc='count',
            fill_value=0
        )
        return pivot_table
    def count_users_each_sheet_by_source_name(self, target_date, use_filter):
        """Count users in each sheet by source name."""
        logger.info("Starting count_users_each_sheet_by_source_name")
        if self.videos_sheet is None:
            logger.warning("No videos_sheet data available")
            return "No valid sheet data available", None
        combined_df_filtered = self.videos_sheet.dropna(subset=['Source Name']).copy()
        combined_df_filtered = self._apply_date_filter(combined_df_filtered, target_date, False, use_filter)
        if not {'Source Name', 'User ID', 'SheetName'}.issubset(combined_df_filtered.columns):
            return "Required columns missing", None
        pivot_table = pd.pivot_table(
            combined_df_filtered,
            index='Source Name',
            columns='SheetName',
            values='User ID',
            aggfunc='count',
            fill_value=0
        )
        sorted_columns = sorted(pivot_table.columns, key=self._extract_number)
        pivot_table = pivot_table[sorted_columns]
        pivot_table.loc['Total'] = pivot_table.sum()
        return "Success", pivot_table
    def count_users_each_sheet_by_ref(self, target_date, use_filter):
        """Count users in each sheet by reference."""
        logger.info("Starting count_users_each_sheet_by_ref")
        if self.videos_sheet is None:
            logger.warning("No videos_sheet data available")
            return "No valid sheet data available", None
        combined_df_filtered = self.videos_sheet.copy()
        combined_df_filtered = self._apply_date_filter(combined_df_filtered, target_date, True, use_filter)
        # Check required columns before touching 'Ref By' to avoid a KeyError
        if not {'Ref By', 'User ID', 'SheetName'}.issubset(combined_df_filtered.columns):
            return "Required columns missing", None
        combined_df_filtered.loc[(combined_df_filtered['Source Name'] == 'direct') &
                                 (combined_df_filtered['Ref By'].isna()), 'Ref By'] = 'direct'
        pivot_table = pd.pivot_table(
            combined_df_filtered,
            index='Ref By',
            columns='SheetName',
            values='User ID',
            aggfunc='count',
            fill_value=0
        )
        sorted_columns = sorted(pivot_table.columns, key=self._extract_number)
        pivot_table = pivot_table[sorted_columns]
        pivot_table.loc['Total'] = pivot_table.sum()
        return "Success", pivot_table
    def count_users_each_sheet_by_date(self, target_date, use_filter):
        """Count users in each sheet by date."""
        logger.info("Starting count_users_each_sheet_by_date")
        if self.videos_sheet is None:
            logger.warning("No videos_sheet data available")
            return "No valid sheet data available", None
        combined_df_filtered = self.videos_sheet[self.videos_sheet['Created At'].notna()].copy()
        combined_df_filtered = self._apply_date_filter(combined_df_filtered, target_date, False, use_filter)
        if not {'Created At', 'User ID', 'SheetName'}.issubset(combined_df_filtered.columns):
            return "Required columns missing", None
        pivot_table = pd.pivot_table(
            combined_df_filtered,
            index='Created At',
            columns='SheetName',
            values='User ID',
            aggfunc='count',
            fill_value=0
        )
        sorted_columns = sorted(pivot_table.columns, key=self._extract_number)
        pivot_table = pivot_table[sorted_columns]
        pivot_table.loc['Total'] = pivot_table.sum()
        return "Success", pivot_table
    def process_file(self, excel_file, operations, target_date, use_date_filter):
        """Process the file with the selected operations."""
        logger.info(f"Processing file with operations: {operations}")
        # Reject a missing upload before doing any work
        if not excel_file:
            logger.warning("No file uploaded")
            return "Please upload an Excel file", None, None
        # Initialize videos_sheet if not already done
        if self.videos_sheet is None:
            self._initialize_videos_sheet(excel_file)
        results = {}
        output_files = []
        result_preview = None
        try:
            # Process single-sheet operations
            single_sheet_ops = [
                "count_daily_registers_by_source_name",
                "count_daily_registers_by_ref",
                "count_users_by_source_name",
                "count_users_by_ref"
            ]
            if any(op in operations for op in single_sheet_ops):
                df = pd.read_excel(excel_file, sheet_name="User Register")
                # Normalize 'Created At' for the User Register sheet
                if 'Created At' in df.columns:
                    df['Created At'] = df['Created At'].apply(self._parse_date)
| if "count_daily_registers_by_source_name" in operations: | |
| pivot = self.count_daily_registers_by_source_name(df, target_date, use_date_filter) | |
| results["Daily Registers by Source Name"] = pivot | |
| buffer, filename = self._create_output_buffer(pivot, "daily_registers_source") | |
| output_files.append((buffer, filename)) | |
| if result_preview is None: | |
| result_preview = pivot | |
| if "count_daily_registers_by_ref" in operations: | |
| pivot = self.count_daily_registers_by_ref(df, target_date, use_date_filter) | |
| results["Daily Registers by Ref"] = pivot | |
| buffer, filename = self._create_output_buffer(pivot, "daily_registers_ref") | |
| output_files.append((buffer, filename)) | |
| if result_preview is None: | |
| result_preview = pivot | |
| if "count_users_by_source_name" in operations: | |
| pivot = self.count_users_by_source_name(df, target_date, use_date_filter) | |
| results["Users by Source Name"] = pivot | |
| buffer, filename = self._create_output_buffer(pivot, "users_source") | |
| output_files.append((buffer, filename)) | |
| if result_preview is None: | |
| result_preview = pivot | |
| if "count_users_by_ref" in operations: | |
| pivot = self.count_users_by_ref(df, target_date, use_date_filter) | |
| results["Users by Ref"] = pivot | |
| buffer, filename = self._create_output_buffer(pivot, "users_ref") | |
| output_files.append((buffer, filename)) | |
| if result_preview is None: | |
| result_preview = pivot | |
| # Process multi-sheet operations | |
| if "count_users_each_sheet_by_source_name" in operations: | |
| status, pivot = self.count_users_each_sheet_by_source_name(target_date, use_date_filter) | |
| if status != "Success": | |
| return status, None, None | |
| results["Users Each Sheet by Source Name"] = pivot | |
| buffer, filename = self._create_output_buffer(pivot, "users_sheet_source") | |
| output_files.append((buffer, filename)) | |
| if result_preview is None: | |
| result_preview = pivot | |
| if "count_users_each_sheet_by_ref" in operations: | |
| status, pivot = self.count_users_each_sheet_by_ref(target_date, use_date_filter) | |
| if status != "Success": | |
| return status, None, None | |
| results["Users Each Sheet by Ref"] = pivot | |
| buffer, filename = self._create_output_buffer(pivot, "users_sheet_ref") | |
| output_files.append((buffer, filename)) | |
| if result_preview is None: | |
| result_preview = pivot | |
| if "count_users_each_sheet_by_date" in operations: | |
| status, pivot = self.count_users_each_sheet_by_date(target_date, use_date_filter) | |
| if status != "Success": | |
| return status, None, None | |
| results["Users Each Sheet by Date"] = pivot | |
| buffer, filename = self._create_output_buffer(pivot, "users_sheet_date") | |
| output_files.append((buffer, filename)) | |
| if result_preview is None: | |
| result_preview = pivot | |
            # Create ZIP file
            if output_files:
                # Use a temporary file for the ZIP so Gradio can serve it for download
                with tempfile.NamedTemporaryFile(delete=False, suffix='.zip') as tmp_file:
                    with zipfile.ZipFile(tmp_file, 'w') as zip_file:
                        for buffer, filename in output_files:
                            zip_file.writestr(filename, buffer.getvalue())
                    tmp_file_path = tmp_file.name
                # Cap the preview so large pivots don't freeze the UI
                if result_preview is not None and result_preview.size > 10000:
                    result_preview = result_preview.head(100)
                return "Processing completed successfully!", result_preview, tmp_file_path
            return "No operations performed", None, None
        except Exception as e:
            logger.error(f"Error during file processing: {str(e)}", exc_info=True)
            return f"Error: {str(e)}", None, None
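
# Minimal usage sketch outside the UI (hypothetical workbook "stats.xlsx" with a
# "User Register" sheet and numbered video sheets):
#   processor = ExcelDataProcessor("stats.xlsx")
#   status, preview, zip_path = processor.process_file(
#       "stats.xlsx", ["count_users_by_source_name"],
#       datetime.date(2025, 4, 14), use_date_filter=True,
#   )
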
def create_gradio_interface():
    """Create and configure the Gradio interface."""
    with gr.Blocks(title="Excel Data Processor") as app:
        gr.Markdown("# Excel Data Processing Tool")
        gr.Markdown("Upload an Excel file, select operations, and optionally filter by date.")
        with gr.Row():
            with gr.Column(scale=1):
                file_input = gr.File(label="Upload Excel File")
                with gr.Group():
                    gr.Markdown("### Date Filter")
                    use_date_filter = gr.Checkbox(label="Apply Date Filter", value=False)
                    target_date = gr.Textbox(
                        label="Target Date (YYYY-MM-DD)",
                        value="2025-04-14",
                        placeholder="YYYY-MM-DD"
                    )
                operations = gr.CheckboxGroup(
                    choices=[
                        "count_daily_registers_by_source_name",
                        "count_daily_registers_by_ref",
                        "count_users_by_source_name",
                        "count_users_by_ref",
                        "count_users_each_sheet_by_source_name",
                        "count_users_each_sheet_by_ref",
                        "count_users_each_sheet_by_date"
                    ],
                    label="Select Operations",
                    value=["count_daily_registers_by_source_name"]
                )
                process_btn = gr.Button("Process Excel File", variant="primary")
            with gr.Column(scale=2):
                status_output = gr.Textbox(label="Status")
                result_output = gr.Dataframe(label="Preview Results (Limited to avoid UI freezing)")
                download_btn = gr.File(label="Download Results (ZIP)")
        def validate_and_process(file, ops, date_str, use_filter):
            """Validate inputs and process the file."""
            logger.info(f"Processing started with operations: {ops}")
            try:
                target_date = datetime.datetime.strptime(date_str, '%Y-%m-%d').date()
            except ValueError:
                return "Invalid date format. Use YYYY-MM-DD", None, None
            # Build a fresh processor per upload so state from a previous file
            # does not leak into this one
            processor = ExcelDataProcessor(excel_file=file)
            return processor.process_file(file, ops, target_date, use_filter)

        process_btn.click(
            fn=lambda *_: ("Processing... Please wait.", None, None),
            inputs=[file_input, operations, target_date, use_date_filter],
            outputs=[status_output, result_output, download_btn],
            queue=False
        ).then(
            fn=validate_and_process,
            inputs=[file_input, operations, target_date, use_date_filter],
            outputs=[status_output, result_output, download_btn]
        )
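        # Note: the click above is wired in two steps. An unqueued lambda posts
        # an immediate "Processing..." status, then .then() runs the real
        # handler and overwrites all three outputs when it finishes.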
| gr.Markdown(""" | |
| ## Instructions | |
| 1. Upload your Excel file | |
| 2. Optionally enable date filtering and specify a target date | |
| 3. Select desired operations | |
| 4. Click "Process Excel File" | |
| 5. View preview results and download the ZIP file containing all outputs | |
| ## Date Filter | |
| - When enabled, operations by reference use dates < target date | |
| - Operations by source name use dates >= target date | |
| - When disabled, all dates are included | |
| ## Operations | |
| - count_daily_registers_by_source_name: Daily registrations by source | |
| - count_daily_registers_by_ref: Daily registrations by referral | |
| - count_users_by_source_name: Unique users by source | |
| - count_users_by_ref: Unique users by referral | |
| - count_users_each_sheet_by_source_name: Users per sheet by source | |
| - count_users_each_sheet_by_ref: Users per sheet by referral | |
| - count_users_each_sheet_by_date: Users per sheet by date | |
| """) | |
    return app


if __name__ == "__main__":
    app = create_gradio_interface()
    app.launch()