from collections import defaultdict
import io
import random
import re

import cv2
import fitz  # PyMuPDF
import numpy as np
import pandas as pd
import pypdfium2 as pdfium
import PyPDF2
from PIL import Image
from PyPDF2.generic import TextStringObject

def convert2img(path):
    # Render the first page of a PDF to an OpenCV BGR image.
    pdf = pdfium.PdfDocument(path)
    page = pdf.get_page(0)
    pil_image = page.render().to_pil()
    pl1 = np.array(pil_image)
    img = cv2.cvtColor(pl1, cv2.COLOR_RGB2BGR)
    return img

def convert2pillow(path):
    # Render the first page of a PDF to a PIL image.
    pdf = pdfium.PdfDocument(path)
    page = pdf.get_page(0)
    pil_image = page.render().to_pil()
    return pil_image

def calculate_midpoint(x1, y1, x2, y2):
    xm = int((x1 + x2) / 2)
    ym = int((y1 + y2) / 2)
    return (xm, ym)

def read_text(input_pdf_path):
    # Extract word boxes from a PDF passed as bytes. Only the last page's
    # words survive the loop; the plans handled here are single-page.
    pdf_document = fitz.open('pdf', input_pdf_path)
    for page_num in range(pdf_document.page_count):
        page = pdf_document[page_num]
        text_instances = page.get_text("words")
    return text_instances

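# Each tuple returned by page.get_text("words") has the form
# (x0, y0, x1, y1, "word", block_no, line_no, word_no);
# find_text_in_plan() below relies on index 4 holding the word text.
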
def search_columns(df):
    # Regex patterns for the schedule headers we care about
    door_id_pattern = r'\b(?:door\s*)?(?:id|no|number)(?!-)\b'
    door_type_pattern = r'^\s*(?:\S*\s+)?door\s*[\n\s]*type\s*$|^type\s*$'
    width_pattern = r'^\s*(?:WIDTH|Width|width)\s*$'
    height_pattern = r'^\s*(?:HEIGHT|Height|height)\s*$'
    structural_opening_pattern = r'\b(?:Structural\s+opening|structural\s+opening)\b'

    # Search the column names and return matching column indices
    def find_column_indices(df, patterns):
        matches = {}
        for key, pattern in patterns.items():
            indices = [i for i, col in enumerate(df.columns)
                       if re.search(pattern, col, re.IGNORECASE)]
            if indices:
                matches[key] = indices  # store column indices if found
        return matches

    # Search the cells of the first two rows; return (row index, column index) pairs
    def find_matches_in_cells(df, patterns):
        matches = {}
        for key, pattern in patterns.items():
            found = []
            for row_idx in range(min(2, len(df))):  # limit to the first two rows
                for col_idx in range(len(df.columns)):
                    cell = df.iat[row_idx, col_idx]
                    if isinstance(cell, str) and re.search(pattern, cell, re.IGNORECASE):
                        found.append((row_idx, col_idx))
            if found:
                matches[key] = found  # store any matches
        return matches

    # Search in column names first
    patterns = {
        "door_id": door_id_pattern,
        "door_type": door_type_pattern,
        "width": width_pattern,
        "height": height_pattern,
    }
    column_matches = find_column_indices(df, patterns)

    # If door_id and door_type are NOT found in column names, search in cells
    if "door_id" not in column_matches and "door_type" not in column_matches:
        cell_matches = find_matches_in_cells(
            df, {"door_id": door_id_pattern, "door_type": door_type_pattern})
        column_matches.update(cell_matches)  # merge results

    # If width and height are NOT found in column names, search for them in cells
    if "width" not in column_matches and "height" not in column_matches:
        cell_matches = find_matches_in_cells(
            df, {"width": width_pattern, "height": height_pattern})
        column_matches.update(cell_matches)  # merge results

    # If width or height is still missing, fall back to a combined
    # "structural opening" column: first in column names, then in cells
    if "width" not in column_matches or "height" not in column_matches:
        structural_opening_match = find_column_indices(
            df, {"structural opening": structural_opening_pattern})
        column_matches.update(structural_opening_match)
        if "structural opening" not in column_matches:
            structural_opening_match = find_matches_in_cells(
                df, {"structural opening": structural_opening_pattern})
            column_matches.update(structural_opening_match)

    return column_matches

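# Hypothetical outputs of search_columns() (indices are illustrative):
#   headers in the column names -> {"door_id": [0], "door_type": [1],
#                                   "structural opening": [4]}
#   headers in the first cells  -> {"door_id": [(1, 0)], "door_type": [(1, 1)],
#                                   "structural opening": [(1, 4)]}
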
def row_clmn_indices(column_matches):
    # Split search_columns() output into (name, column index) pairs and, for
    # cell matches, the row indices where the headers were found.
    clm_idx = []
    starting_row_index = []
    for key in column_matches.keys():
        if isinstance(column_matches[key][0], tuple):  # cell match: (row, col)
            clm_idx.append((key, column_matches[key][0][1]))
            starting_row_index.append(column_matches[key][0][0])
        else:  # column-name match: plain column index
            clm_idx.append((key, column_matches[key][0]))
    return clm_idx, starting_row_index

def generate_current_table_without_cropping(clm_idx, df):
    selected_df = df.iloc[:, clm_idx]
    print("generated the selected-columns table without cropping")
    return selected_df

def column_name_index(clm_idx):
    # Unzip (name, index) pairs into two parallel lists.
    clmn_name = []
    clmn_idx = []
    for indd in clm_idx:
        cl_nm, cl_idx = indd
        clmn_name.append(cl_nm)
        clmn_idx.append(cl_idx)
    return clmn_name, clmn_idx

def crop_rename_table(indices, clmn_name, clmn_idx, df):
    crop_at = max(indices) + 1  # first row after the header rows
    df = df.iloc[crop_at:]
    df.reset_index(drop=True, inplace=True)  # reset index after cropping
    slctd_clms = df.iloc[:, clmn_idx]  # select columns by index
    slctd_clms.columns = clmn_name     # rename the selected columns
    return slctd_clms

def details_in_another_table(clmn_name, clmn_idx, current_dfs, dfs):
    # The header table and the data table were extracted separately: look for
    # a table with the same column count as the current (header-only) one.
    # Note: if several tables match, the last one wins; if none match, `df`
    # stays None and this raises.
    df = None
    for dff in dfs:
        if dff.shape[1] == current_dfs.shape[1]:
            df = dff
    # Create a new DataFrame with the selected columns (.copy() avoids
    # modifying the original df)
    new_df = df.iloc[:, clmn_idx].copy()
    # Prepend the column names as a data row: in the detached data table they
    # may hold real values rather than headers
    column_names_row = pd.DataFrame([new_df.columns], columns=new_df.columns)
    new_df = pd.concat([column_names_row, new_df], ignore_index=True)
    new_df.columns = clmn_name  # rename columns
    return new_df

def extract_tables(schedule):
    # Extract every detected table as a DataFrame. `dfs` is rebuilt per page,
    # so only the last page's tables are returned; the schedules handled here
    # are single-page.
    doc = fitz.open("pdf", schedule)
    for page in doc:
        tabs = page.find_tables()
        dfs = []
        for tab in tabs:
            df = tab.to_pandas()
            dfs.append(df)
    return dfs

def get_selected_columns(dfs):
    selected_columns = []
    for i in range(len(dfs)):
        column_matches = search_columns(dfs[i])
        clm_idx, starting_row_index = row_clmn_indices(column_matches)
        clmn_name, clmn_idx = column_name_index(clm_idx)
        if len(clm_idx) == 0 and len(starting_row_index) == 0:
            print(f"this is df {i}, SEARCH IN ANOTHER DF")
        else:
            # MIX: headers split between column names and cells
            if (len(clm_idx) != len(starting_row_index)) and len(starting_row_index) > 0:
                print(f"this is df {i}: MIX, search in another df but make sure of the length")
            # Headers found IN COLUMN NAMES
            if len(starting_row_index) == 0:
                print(f"this is df {i}: headers are in the column names; "
                      f"check the df length in case the details are in another table")
                if len(dfs[i]) < 10:  # details live in another table
                    selected_columns_new = details_in_another_table(clmn_name, clmn_idx, dfs[i], dfs)
                else:  # details live in the same table
                    selected_columns_new = generate_current_table_without_cropping(clmn_idx, dfs[i])
                selected_columns.append((selected_columns_new, dfs[i], clm_idx, clmn_name, starting_row_index))
            # Headers found IN CELLS
            if len(starting_row_index) == len(clm_idx):
                print(f"this is df {i}: headers are in the cells; "
                      f"check the df length in case the details are in another table")
                if len(dfs[i]) < 10:  # details live in another table
                    selected_columns_new = details_in_another_table(clmn_name, clmn_idx, dfs[i], dfs)
                else:  # details live in the same table
                    print(f"this is df {i}: calling crop_rename_table(indices, clmn_name, clmn_idx, df)")
                    selected_columns_new = crop_rename_table(starting_row_index, clmn_name, clmn_idx, dfs[i])
                selected_columns.append((selected_columns_new, dfs[i], clm_idx, clmn_name, starting_row_index))
    return selected_columns

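# Hypothetical shape of one selected_columns entry, as unpacked in mainRun():
#   (selected_df,            # cropped/renamed columns of interest
#    original_df,            # the table it came from
#    [("door_id", 0), ...],  # (name, column index) pairs
#    ["door_id", ...],       # column names
#    [1, 1, ...])            # header row indices (empty for column-name matches)
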
def get_st_op_pattern(clm_idx, clmn_name, starting_row_index, df):
    # Fetch the header text of the "structural opening" column (e.g. a cell
    # reading "W x H"), used later to order width and height.
    target = 'structural opening'
    clm_dict = dict(clm_idx)  # list of (name, index) tuples -> dict
    structural_opening_value = clm_dict.get(target)  # None if not found
    if target in clmn_name:
        position = clmn_name.index(target)
        kelma = df.iloc[starting_row_index[position], structural_opening_value]
        return kelma

def get_similar_colors(selected_columns_new):
    def generate_rgb():
        return (random.randint(0, 255), random.randint(0, 255), random.randint(0, 255))  # RGB tuple

    unique_keys = selected_columns_new['door_type'].unique()
    key_colors = {key: generate_rgb() for key in unique_keys}  # one RGB color per door type

    # Group door ids and structural openings per door type, with that type's color
    col_dict = defaultdict(lambda: {'values': [], 'color': None, 'widths': []})
    for _, row in selected_columns_new.iterrows():
        key = row['door_type']
        col_dict[key]['values'].append(row['door_id'])
        col_dict[key]['widths'].append(row['structural opening'])  # add structural opening
        col_dict[key]['color'] = key_colors[key]  # assign the unique RGB color
    return dict(col_dict)  # plain dict for downstream use

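# Hypothetical shape of the result for two door types (colors are random):
#   {"D1": {"values": ["D01", "D02"], "color": (12, 200, 33),
#           "widths": ["926 x 2,040mm", "926 x 2,040mm"]},
#    "D2": {"values": ["D03"], "color": (77, 5, 148),
#           "widths": ["826 x 2,040mm"]}}
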
def get_flattened_tuples_list(col_dict):
    # Flatten the per-type dict into (door_id, structural_opening, color) tuples
    tuples_list = []
    for key in col_dict.keys():
        tuples_list.append([(value, width, col_dict[key]["color"])
                            for value, width in zip(col_dict[key]['values'],
                                                    col_dict[key]['widths'])])
    flattened_list = [item for sublist in tuples_list for item in sublist]
    return flattened_list

def find_text_in_plan(label, x):
    # x is the word-tuple list from page.get_text("words")
    substring_coordinates = []
    words = []
    point_list = []
    for tpl in x:
        if tpl[4] == label:
            substring_coordinates.append(calculate_midpoint(tpl[0], tpl[1], tpl[2], tpl[3]))  # for the pdf
            point_list.append(calculate_midpoint(tpl[1], tpl[0], tpl[3], tpl[2]))  # for a rotated page
            words.append(tpl[4])
    return substring_coordinates, words, point_list

def get_word_locations_plan(flattened_list, plan_texts):
    locations = []
    not_found = []
    for lbl, w, clr in flattened_list:
        location, worz, txt_pt = find_text_in_plan(lbl, plan_texts)
        if len(location) == 0:
            not_found.append(lbl)
        locations.append((location, lbl, clr, w))
    return locations, not_found

def get_repeated_labels(locations):
    seen_labels = set()
    repeated_labels = set()
    for item in locations:
        label = item[1]
        if label in seen_labels:
            repeated_labels.add(label)
        else:
            seen_labels.add(label)
    return repeated_labels

def get_cleaned_data(locations):
    # A label that appears several times on the plan yields several
    # coordinates; distribute them round-robin across its occurrences.
    processed = defaultdict(int)
    new_data = []
    for coords, label, color, w in locations:
        if len(coords) > 1:
            index = processed[label] % len(coords)  # round-robin indexing
            new_coord = [coords[index]]             # pick the matching coordinate
            new_data.append((new_coord, label, color, w))
            processed[label] += 1  # advance to the next coordinate for this label
        elif len(coords) == 1:
            new_data.append((coords, label, color, w))
    return new_data

def get_width_info_tobeprinted(new_data):
    width_info_tobeprinted = []
    for _, _, _, w in new_data:
        width_info_tobeprinted.append(w)
    return width_info_tobeprinted

def clean_dimensions(text):
    # Strip commas and "mm" from a dimension string
    text = re.sub(r'[,\s]*mm', '', text)  # remove "mm" and any commas/spaces just before it
    text = text.replace(",", "")          # remove any remaining commas
    return text

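# Example: clean_dimensions("926 x 2,040 mm") -> "926 x 2040"
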
def get_cleaned_width(width_info_tobeprinted):
    cleaned_width = []
    for w in width_info_tobeprinted:
        cleaned_width.append(clean_dimensions(w))
    return cleaned_width

def get_widths_bb_format(cleaned_width, kelma):
    # kelma is the structural-opening header text; when it reads "W x H" the
    # first number is the width, otherwise the height comes first.
    pattern = r"\bW(?:idth)?\s*[×x]\s*H(?:eight)?\b"
    match = re.search(pattern, kelma)
    widths = []
    for widthaa in cleaned_width:
        index = max(widthaa.find("x"), widthaa.find("×"), widthaa.find("X"))
        width_name = widthaa[:index]
        height_name = widthaa[index + 1:]
        if match:
            full_text = f"{width_name}mm wide x {height_name}mm high"
        else:
            full_text = f"{height_name}mm wide x {width_name}mm high"
        widths.append(full_text)
    return widths

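# Hypothetical example: with kelma == "W x H" the schedule lists width first,
# so get_widths_bb_format(["926x2040"], "W x H")
#   -> ["926mm wide x 2040mm high"]
# whereas a kelma like "H x W" (no pattern match) flips the order:
#   -> ["2040mm wide x 926mm high"]
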
def add_bluebeam_count_annotations(pdf_bytes, locations):
    pdf_stream = io.BytesIO(pdf_bytes)                   # load PDF from bytes
    pdf_document = fitz.open("pdf", pdf_stream.read())   # open PDF in memory
    page = pdf_document[0]  # first page
    for loc in locations:
        coor, lbl, clr, w = loc
        clr = (clr[0] / 255, clr[1] / 255, clr[2] / 255)
        for cor in coor:
            # Create a circle annotation (count markup)
            annot = page.add_circle_annot(
                fitz.Rect(cor[0] - 10, cor[1] - 10, cor[0] + 10, cor[1] + 10)  # small circle
            )
            # Assign the required Bluebeam metadata
            annot.set_colors(stroke=clr, fill=(1, 1, 1))  # stroke color, white fill
            annot.set_border(width=2)  # border thickness
            annot.set_opacity(1)       # fully visible
            # Set annotation properties for Bluebeam count detection
            # (Bluebeam uses subject "Count" for count markups)
            annot.set_info(content=lbl, title=lbl, subject="Count")
            annot.update()  # apply changes
    # Save the modified PDF to a variable instead of a file
    output_stream = io.BytesIO()
    pdf_document.save(output_stream)
    pdf_document.close()
    return output_stream.getvalue()  # return the modified PDF as bytes

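# Minimal usage sketch, assuming `plan_bytes` and `new_data` already exist
# (both variable names are illustrative):
#   annotated_bytes = add_bluebeam_count_annotations(plan_bytes, new_data)
#   with open("annotated_plan.pdf", "wb") as f:
#       f.write(annotated_bytes)
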
def modify_author_in_pypdf2(pdf_bytes, new_authors):
    pdf_stream = io.BytesIO(pdf_bytes)  # load PDF from bytes
    reader = PyPDF2.PdfReader(pdf_stream)
    writer = PyPDF2.PdfWriter()
    author_index = 0  # track author assignment
    for page in reader.pages:
        if "/Annots" in page:  # check whether annotations exist
            for annot in page["/Annots"]:
                annot_obj = annot.get_object()
                # Assign each annotation its own author (the /T entry)
                if author_index < len(new_authors):
                    annot_obj.update({"/T": TextStringObject(new_authors[author_index])})
                    author_index += 1  # move to the next author
                else:
                    # Authors exhausted: reuse the last one
                    annot_obj.update({"/T": TextStringObject(new_authors[-1])})
        writer.add_page(page)
    # Save the modified PDF to a variable
    output_stream = io.BytesIO()
    writer.write(output_stream)
    output_stream.seek(0)
    return output_stream.read()  # return the modified PDF as bytes

def process_pdf(input_pdf_path, output_pdf_path, locations, new_authors):
    # input_pdf_path is the plan PDF as bytes; output_pdf_path is kept for
    # the call signature, but the result is returned rather than written.
    # Add Bluebeam-compatible count annotations
    annotated_pdf_bytes = add_bluebeam_count_annotations(input_pdf_path, locations)
    # Rewrite the author (/T) field of each annotation with PyPDF2
    final_pdf_bytes = modify_author_in_pypdf2(annotated_pdf_bytes, new_authors)
    return final_pdf_bytes

def mainRun(schedule, plan):
    # schedule and plan are PDF files passed as bytes
    dfs = extract_tables(schedule)
    selected_columns = get_selected_columns(dfs)
    selected_columns_new = selected_columns[0][0]
    df = selected_columns[0][1]
    clm_idx = selected_columns[0][2]
    clmn_name = selected_columns[0][3]
    starting_row_index = selected_columns[0][4]
    kelma = get_st_op_pattern(clm_idx, clmn_name, starting_row_index, df)
    col_dict = get_similar_colors(selected_columns_new)
    flattened_list = get_flattened_tuples_list(col_dict)
    plan_texts = read_text(plan)
    locations, not_found = get_word_locations_plan(flattened_list, plan_texts)
    new_data = get_cleaned_data(locations)
    repeated_labels = get_repeated_labels(locations)
    width_info_tobeprinted = get_width_info_tobeprinted(new_data)
    cleaned_width = get_cleaned_width(width_info_tobeprinted)
    widths = get_widths_bb_format(cleaned_width, kelma)
    final_pdf_bytes = process_pdf(plan, "final_output_width.pdf", new_data, widths)

    # Render the annotated first page to an OpenCV BGR image
    doc2 = fitz.open('pdf', final_pdf_bytes)
    page = doc2[0]
    pix = page.get_pixmap()  # render page to an image
    pl = Image.frombytes('RGB', (pix.width, pix.height), pix.samples)
    img = np.array(pl)
    annotatedimg = cv2.cvtColor(img, cv2.COLOR_RGB2BGR)

    # Collect annotation metadata (content, id, subject, color) into a table
    list1 = pd.DataFrame(columns=['content', 'id', 'subject', 'color'])
    for page in doc2:
        for annot in page.annots():
            annot_color = annot.colors  # dict with 'stroke' and 'fill' keys
            if annot_color is not None:
                stroke_color = annot_color.get('stroke')  # border color
                fill_color = annot_color.get('fill')      # fill color
                if fill_color:
                    v = 'fill'
                if stroke_color:
                    v = 'stroke'  # prefer the stroke color when both exist
                x, y, z = (int(annot_color.get(v)[0] * 255),
                           int(annot_color.get(v)[1] * 255),
                           int(annot_color.get(v)[2] * 255))
                list1.loc[len(list1)] = [annot.info['content'], annot.info['id'],
                                         annot.info['subject'], [x, y, z]]
    return annotatedimg, doc2, list1, repeated_labels, not_found

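# Minimal driver sketch, assuming a schedule PDF and a plan PDF on disk; the
# file names and the preview path below are illustrative, not part of the
# pipeline itself.
if __name__ == "__main__":
    with open("schedule.pdf", "rb") as f:
        schedule_bytes = f.read()
    with open("plan.pdf", "rb") as f:
        plan_bytes = f.read()
    annotated_img, doc2, annot_table, repeated, missing = mainRun(schedule_bytes, plan_bytes)
    cv2.imwrite("annotated_preview.png", annotated_img)  # quick visual check
    doc2.save("final_output_width.pdf")                  # annotated plan PDF
    print(annot_table)
    print("repeated labels:", repeated, "not found:", missing)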