Spaces:
Paused
Paused
| import io | |
| import os | |
| from azure.core.credentials import AzureKeyCredential | |
| from azure.ai.formrecognizer import DocumentAnalysisClient | |
| import pandas as pd | |
| from io import BytesIO | |
# Tokens such as ":selected:" / ":unselected:" that appear inside cell text;
# they are stripped from table cells before export.
_SELECTION_MARK_PATTERN = r':+(?:selected|unselected):*'


def _table_to_dataframe(table, table_idx, pdf_name):
    """Convert one recognized table into a DataFrame, or None if it has no cells."""
    cells = list(table.cells or [])
    if not cells:
        # max() over an empty cell list would raise; an empty table carries
        # nothing worth exporting, so skip it.
        return None

    n_rows = max(cell.row_index for cell in cells) + 1
    n_cols = max(cell.column_index for cell in cells) + 1
    grid = [[""] * n_cols for _ in range(n_rows)]
    for cell in cells:
        grid[cell.row_index][cell.column_index] = cell.content

    # Scrub the selection-mark tokens from cell content only, *before* the
    # metadata columns are attached, so a file name that happens to contain
    # such a token is not altered.
    df = pd.DataFrame(grid).replace(_SELECTION_MARK_PATTERN, '', regex=True)

    regions = table.bounding_regions
    # Leave the page number blank if the table reports no bounding region.
    df["page_number"] = regions[0].page_number if regions else None
    df["table_id"] = table_idx
    df["pdf_name"] = pdf_name
    return df


def _write_tables_to_excel(tables_by_pdf, sheet_name="Tables"):
    """Write all tables into one worksheet and return the rewound BytesIO buffer."""
    excel_buffer = BytesIO()
    with pd.ExcelWriter(excel_buffer, engine="openpyxl") as writer:
        next_row = 0
        header_pending = True  # the header row is written once, for the first table only
        for _pdf_name, frames in tables_by_pdf:
            for frame in frames:
                # NOTE: because only the first table's header is written and
                # tables may differ in width, the metadata columns of later
                # tables are not guaranteed to sit under the header labels.
                frame.to_excel(
                    writer,
                    sheet_name=sheet_name,
                    index=False,
                    header=header_pending,
                    startrow=next_row,
                )
                next_row += len(frame) + (1 if header_pending else 0)
                header_pending = False
            # Two blank rows separate one PDF's block from the next.
            next_row += 2
    excel_buffer.seek(0)
    return excel_buffer


def detect_tables(pdflist, pdfnames, *, endpoint=None, key=None):
    """Extract every table from a batch of PDFs into a single Excel worksheet.

    Each PDF is sent to Azure Form Recognizer (``prebuilt-layout`` model). Every
    table found becomes a block of rows with three extra columns appended:
    ``page_number``, ``table_id`` (1-based index within its PDF) and
    ``pdf_name`` (basename of the supplied path). All blocks are written to one
    worksheet named "Tables", with two blank rows after each PDF's block.

    Args:
        pdflist: list or tuple of PDF contents (``bytes`` or ``bytearray``).
        pdfnames: list or tuple of strings; ``pdfnames[i]`` is the path or
            name for ``pdflist[i]``. Must have the same length as ``pdflist``.
        endpoint: Form Recognizer endpoint URL. Defaults to the
            ``AZURE_FORM_RECOGNIZER_ENDPOINT`` environment variable, then to
            the endpoint this function has historically used.
        key: Form Recognizer API key. Defaults to the
            ``AZURE_FORM_RECOGNIZER_KEY`` environment variable. Credentials
            are deliberately not embedded in the source.

    Returns:
        A ``BytesIO`` positioned at offset 0 containing the .xlsx file, or
        ``None`` if no usable input was given or no tables were found.

    Raises:
        ValueError: if the inputs are not lists/tuples, differ in length, or
            no API key is available when there is at least one PDF to analyze.
    """
    # 1. Validate inputs
    if not isinstance(pdflist, (list, tuple)) or not isinstance(pdfnames, (list, tuple)):
        raise ValueError("Both pdflist and pdfnames must be lists (or tuples).")
    if len(pdflist) != len(pdfnames):
        raise ValueError("pdflist and pdfnames must have the same length.")

    # 2. Keep only well-formed (bytes, str) pairs; anything else is skipped
    #    silently, as a best-effort policy. Done before touching Azure so an
    #    empty batch never needs credentials or a client.
    documents = [
        (pdf_bytes, os.path.basename(pdf_path))
        for pdf_bytes, pdf_path in zip(pdflist, pdfnames)
        if isinstance(pdf_bytes, (bytes, bytearray)) and isinstance(pdf_path, str)
    ]
    if not documents:
        return None

    # 3. Resolve configuration and set up the Form Recognizer client
    endpoint = (
        endpoint
        or os.environ.get("AZURE_FORM_RECOGNIZER_ENDPOINT")
        or "https://tabledetection2.cognitiveservices.azure.com/"
    )
    key = key or os.environ.get("AZURE_FORM_RECOGNIZER_KEY")
    if not key:
        raise ValueError(
            "No Azure Form Recognizer key available: pass key=... or set the "
            "AZURE_FORM_RECOGNIZER_KEY environment variable."
        )
    client = DocumentAnalysisClient(endpoint=endpoint, credential=AzureKeyCredential(key))

    # 4. Analyze each PDF and collect its tables as DataFrames
    tables_by_pdf = []
    for pdf_bytes, pdf_name in documents:
        poller = client.begin_analyze_document("prebuilt-layout", document=io.BytesIO(pdf_bytes))
        result = poller.result()

        per_pdf_tables = []
        # table_idx comes from the service's ordering, so ids stay stable even
        # when an empty table is skipped.
        for table_idx, table in enumerate(result.tables or [], start=1):
            df = _table_to_dataframe(table, table_idx, pdf_name)
            if df is not None:
                per_pdf_tables.append(df)

        if per_pdf_tables:
            tables_by_pdf.append((pdf_name, per_pdf_tables))

    # If no tables at all, return None
    if not tables_by_pdf:
        return None

    # 5. Write all tables into one sheet, with 2 blank rows between PDFs
    return _write_tables_to_excel(tables_by_pdf)