import os from azure.ai.vision.imageanalysis import ImageAnalysisClient from azure.ai.vision.imageanalysis.models import VisualFeatures from azure.core.credentials import AzureKeyCredential import numpy as np import networkx as nx from modules.utils import class_dict, proportion_inside import json from modules.utils import rescale_boxes as rescale, is_vertical from transformers import AutoTokenizer, AutoModelForSequenceClassification import torch import logging VISION_KEY = os.getenv("VISION_KEY") VISION_ENDPOINT = os.getenv("VISION_ENDPOINT") #If local execution """with open("VISION_KEY.json", "r") as json_file: json_data = json.load(json_file) VISION_KEY = json_data["VISION_KEY"] VISION_ENDPOINT = json_data["VISION_ENDPOINT"]""" # Suppress specific warnings from transformers logging.getLogger("transformers.modeling_utils").setLevel(logging.ERROR) # Function to initialize the model and tokenizer def initialize_model(): """ Initialize the tokenizer and model for sentiment analysis. """ tokenizer = AutoTokenizer.from_pretrained("cardiffnlp/twitter-roberta-base-sentiment-latest") model = AutoModelForSequenceClassification.from_pretrained("cardiffnlp/twitter-roberta-base-sentiment-latest") return tokenizer, model # Initialize model and tokenizer tokenizer, emotion_model = initialize_model() # Function to perform sentiment analysis and return the highest scoring emotion and its score between positive and negative def analyze_sentiment(sentence, tokenizer=tokenizer, model=emotion_model): """ Analyze the sentiment of a given sentence using the initialized tokenizer and model. Parameters: - sentence (str): The input sentence to analyze. - tokenizer (AutoTokenizer): The tokenizer for processing the sentence. - model (AutoModelForSequenceClassification): The model for sentiment analysis. Returns: - tuple: The highest scoring emotion ('positive' or 'negative') and its corresponding score. """ inputs = tokenizer(sentence, return_tensors="pt") outputs = model(**inputs) probs = torch.nn.functional.softmax(outputs.logits, dim=-1).squeeze().tolist() labels = ["negative", "neutral", "positive"] results = dict(zip(labels, probs)) # Filter out the neutral score and get the highest score between positive and negative relevant_results = {k: results[k] for k in ["positive", "negative"]} highest_emotion = max(relevant_results, key=relevant_results.get) highest_score = relevant_results[highest_emotion] return highest_emotion, highest_score def sample_ocr_image_file(image_data): """ Sample OCR function to analyze an image file and extract text using Azure's Computer Vision service. Parameters: - image_data (bytes): The image data in bytes. Returns: - result: The OCR result from the Computer Vision service. """ # Set the values of your computer vision endpoint and computer vision key as environment variables: try: endpoint = VISION_ENDPOINT key = VISION_KEY except KeyError: print("Missing environment variable 'VISION_ENDPOINT' or 'VISION_KEY'") print("Set them before running this sample.") exit() # Create an Image Analysis client client = ImageAnalysisClient( endpoint=endpoint, credential=AzureKeyCredential(key) ) # Extract text (OCR) from an image stream. This will be a synchronously (blocking) call. result = client.analyze( image_data=image_data, visual_features=[VisualFeatures.READ] ) return result def text_prediction(image): """ Perform OCR on an image to extract text. Parameters: - image: The image to process. Returns: - ocr_result: The OCR result. """ # Transform the image into a byte array image.save('temp.jpg') with open('temp.jpg', 'rb') as f: image_data = f.read() ocr_result = sample_ocr_image_file(image_data) # Delete the temporary image os.remove('temp.jpg') return ocr_result def filter_text(ocr_result, threshold=0.5): """ Filter and process the OCR results to remove unwanted characters and low-confidence words. Parameters: - ocr_result: The OCR result. - threshold (float): The confidence threshold for filtering words. Returns: - list_of_lines: Processed text lines and their bounding boxes. """ words_to_cancel = {"-","--","---","+",".",",","#","@","!","?","(",")","[","]","{","}","<",">","/","\\","|","-","_","=","&","^","%","$","£","€","¥","¢","¤","§","©","®","™","°","±","×","÷","¶","∆","∏","∑","∞","√","∫","≈","≠","≤","≥","≡","∼"} # Add every other one-letter word to the list of words to cancel, except 'I' and 'a' for letter in "bcdefghjklmnopqrstuvwxyz1234567890": # All lowercase letters except 'a' words_to_cancel.add(letter) words_to_cancel.add("i") words_to_cancel.add(letter.upper()) # Add the uppercase version as well characters_to_cancel = {"+", "<", ">"} # Characters to cancel list_of_lines = [] for block in ocr_result['readResult']['blocks']: for line in block['lines']: line_text = [] x_min, y_min = float('inf'), float('inf') x_max, y_max = float('-inf'), float('-inf') for word in line['words']: if word['text'] in words_to_cancel or any(disallowed_char in word['text'] for disallowed_char in characters_to_cancel): continue if word['confidence'] > threshold: if word['text']: line_text.append(word['text']) x = [point['x'] for point in word['boundingPolygon']] y = [point['y'] for point in word['boundingPolygon']] x_min = min(x_min, min(x)) y_min = min(y_min, min(y)) x_max = max(x_max, max(x)) y_max = max(y_max, max(y)) if line_text: # If there are valid words in the line list_of_lines.append({ 'text': ' '.join(line_text), 'boundingBox': [x_min,y_min,x_max,y_max] }) list_text = [] list_bbox = [] for i in range(len(list_of_lines)): list_text.append(list_of_lines[i]['text']) for i in range(len(list_of_lines)): list_bbox.append(list_of_lines[i]['boundingBox']) list_of_lines = [list_bbox, list_text] return list_of_lines def get_box_points(box): """ Returns all critical points of a box: corners and midpoints of edges. Parameters: - box (array): Bounding box coordinates [xmin, ymin, xmax, ymax]. Returns: - numpy.array: Array of critical points. """ xmin, ymin, xmax, ymax = box return np.array([ [xmin, ymin], # Bottom-left corner [xmax, ymin], # Bottom-right corner [xmin, ymax], # Top-left corner [xmax, ymax], # Top-right corner [(xmin + xmax) / 2, ymin], # Midpoint of bottom edge [(xmin + xmax) / 2, ymax], # Midpoint of top edge [xmin, (ymin + ymax) / 2], # Midpoint of left edge [xmax, (ymin + ymax) / 2] # Midpoint of right edge ]) def min_distance_between_boxes(box1, box2): """ Computes the minimum distance between two boxes considering all critical points. Parameters: - box1 (array): First bounding box coordinates. - box2 (array): Second bounding box coordinates. Returns: - float: The minimum distance between the two boxes. """ points1 = get_box_points(box1) points2 = get_box_points(box2) min_dist = float('inf') for point1 in points1: for point2 in points2: dist = np.linalg.norm(point1 - point2) if dist < min_dist: min_dist = dist return min_dist def are_close(box1, box2, threshold=50): """ Determines if boxes are close based on their corners and center points. Parameters: - box1 (array): First bounding box coordinates. - box2 (array): Second bounding box coordinates. - threshold (int): Distance threshold for determining closeness. Returns: - bool: True if boxes are close, otherwise False. """ corners1 = np.array([ [box1[0], box1[1]], [box1[0], box1[3]], [box1[2], box1[1]], [box1[2], box1[3]], [(box1[0]+box1[2])/2, box1[1]], [(box1[0]+box1[2])/2, box1[3]], [box1[0], (box1[1]+box1[3])/2], [box1[2], (box1[1]+box1[3])/2] ]) corners2 = np.array([ [box2[0], box2[1]], [box2[0], box2[3]], [box2[2], box2[1]], [box2[2], box2[3]], [(box2[0]+box2[2])/2, box2[1]], [(box2[0]+box2[2])/2, box2[3]], [box2[0], (box2[1]+box2[3])/2], [box2[2], (box2[1]+box2[3])/2] ]) for c1 in corners1: for c2 in corners2: if np.linalg.norm(c1 - c2) < threshold: return True return False def find_closest_box(text_box, all_boxes, labels, threshold, iou_threshold=0.5): """ Find the closest box to the given text box within a specified threshold. Parameters: - text_box (array): The text box coordinates. - all_boxes (list): List of all bounding boxes. - labels (list): List of labels corresponding to the boxes. - threshold (float): Distance threshold for determining closeness. - iou_threshold (float): IoU threshold for determining if a text is inside a sequenceFlow. Returns: - int or None: Index of the closest box or None if no box is close enough. """ min_distance = float('inf') closest_index = None # Check if the text is inside a sequenceFlow for j in range(len(all_boxes)): if proportion_inside(text_box, all_boxes[j]) > iou_threshold and labels[j] == list(class_dict.values()).index('sequenceFlow'): return j for i, box in enumerate(all_boxes): # Compute the center of both boxes center_text = np.array([(text_box[0] + text_box[2]) / 2, (text_box[1] + text_box[3]) / 2]) center_box = np.array([(box[0] + box[2]) / 2, (box[1] + box[3]) / 2]) # Calculate Euclidean distance between centers distance = np.linalg.norm(center_text - center_box) # Update closest box if this box is nearer if distance < min_distance: min_distance = distance closest_index = i # Check if the closest box found is within the acceptable threshold if min_distance < threshold: return closest_index return None def group_texts(task_boxes, text_boxes, texts, min_dist=50, iou_threshold=0.8, percentage_thresh=0.8): """ Maps text boxes to task boxes and groups texts within each task based on proximity. Parameters: - task_boxes (list): List of task bounding boxes. - text_boxes (list): List of text bounding boxes. - texts (list): List of texts corresponding to the text boxes. - min_dist (float): Minimum distance threshold for grouping. - iou_threshold (float): IoU threshold for determining if text is inside a task box. - percentage_thresh (float): Percentage threshold for determining if text boxes are close. Returns: - tuple: Grouped task-related texts, their bounding boxes, grouped information texts, and their bounding boxes. """ G = nx.Graph() # Map each text box to the nearest task box task_to_texts = {i: [] for i in range(len(task_boxes))} information_texts = [] # Texts not inside any task box text_to_task_mapped = [False] * len(text_boxes) for idx, text_box in enumerate(text_boxes): mapped = False for jdx, task_box in enumerate(task_boxes): if proportion_inside(text_box, task_box) > iou_threshold: task_to_texts[jdx].append(idx) text_to_task_mapped[idx] = True mapped = True break if not mapped: information_texts.append(idx) all_grouped_texts = [] sentence_boxes = [] # Store the bounding box for each sentence # Process texts for each task for task_texts in task_to_texts.values(): G.clear() for i in task_texts: G.add_node(i) for j in task_texts: if i != j and are_close(text_boxes[i], text_boxes[j]) and not is_vertical(text_boxes[i]) and not is_vertical(text_boxes[j]): G.add_edge(i, j) groups = list(nx.connected_components(G)) for group in groups: group = list(group) lines = {} for idx in group: y_center = (text_boxes[idx][1] + text_boxes[idx][3]) / 2 found_line = False for line in lines: if abs(y_center - line) < (text_boxes[idx][3] - text_boxes[idx][1]) / 2: lines[line].append(idx) found_line = True break if not found_line: lines[y_center] = [idx] sorted_lines = sorted(lines.keys()) grouped_texts = [] min_x = min_y = float('inf') max_x = max_y = -float('inf') for line in sorted_lines: sorted_indices = sorted(lines[line], key=lambda idx: text_boxes[idx][0]) line_text = ' '.join(texts[idx] for idx in sorted_indices) grouped_texts.append(line_text) for idx in sorted_indices: box = text_boxes[idx] min_x = min(min_x-5, box[0]-5) min_y = min(min_y-5, box[1]-5) max_x = max(max_x+5, box[2]+5) max_y = max(max_y+5, box[3]+5) all_grouped_texts.append(' '.join(grouped_texts)) sentence_boxes.append([min_x, min_y, max_x, max_y]) # Group information texts G.clear() info_sentence_boxes = [] for i in information_texts: G.add_node(i) for j in information_texts: if i != j and are_close(text_boxes[i], text_boxes[j], percentage_thresh * min_dist) and not is_vertical(text_boxes[i]) and not is_vertical(text_boxes[j]): G.add_edge(i, j) info_groups = list(nx.connected_components(G)) information_grouped_texts = [] for group in info_groups: group = list(group) lines = {} for idx in group: y_center = (text_boxes[idx][1] + text_boxes[idx][3]) / 2 found_line = False for line in lines: if abs(y_center - line) < (text_boxes[idx][3] - text_boxes[idx][1]) / 2: lines[line].append(idx) found_line = True break if not found_line: lines[y_center] = [idx] sorted_lines = sorted(lines.keys()) grouped_texts = [] min_x = min_y = float('inf') max_x = max_y = -float('inf') for line in sorted_lines: sorted_indices = sorted(lines[line], key=lambda idx: text_boxes[idx][0]) line_text = ' '.join(texts[idx] for idx in sorted_indices) grouped_texts.append(line_text) for idx in sorted_indices: box = text_boxes[idx] min_x = min(min_x, box[0]) min_y = min(min_y, box[1]) max_x = max(max_x, box[2]) max_y = max(max_y, box[3]) information_grouped_texts.append(' '.join(grouped_texts)) info_sentence_boxes.append([min_x, min_y, max_x, max_y]) return all_grouped_texts, sentence_boxes, information_grouped_texts, info_sentence_boxes def mapping_text(full_pred, text_pred, print_sentences=False, percentage_thresh=0.6, scale=1.0, iou_threshold=0.5): """ Map the extracted texts to the predicted bounding boxes. Parameters: - full_pred (dict): Full prediction dictionary containing boxes, labels, BPMN IDs, and pool dictionary. - text_pred (list): List containing text predictions and their bounding boxes. - print_sentences (bool): Whether to print the sentences and their bounding boxes. - percentage_thresh (float): Percentage threshold for determining closeness. - scale (float): Scale factor for rescaling bounding boxes. - iou_threshold (float): IoU threshold for determining if text is inside a bounding box. Returns: - dict: Text mapping for BPMN elements. """ boxes = rescale(scale, full_pred['boxes']) min_dist = 200 labels = full_pred['labels'] avoid = [list(class_dict.values()).index('pool'), list(class_dict.values()).index('lane'), list(class_dict.values()).index('sequenceFlow'), list(class_dict.values()).index('messageFlow'), list(class_dict.values()).index('dataAssociation')] for i in range(len(boxes)): box1 = boxes[i] if labels[i] in avoid: continue for j in range(i + 1, len(boxes)): box2 = boxes[j] if labels[j] in avoid: continue dist = min_distance_between_boxes(box1, box2) min_dist = min(min_dist, dist) # Print the minimum distance between boxes # print("Minimum distance between boxes:", min_dist) text_pred[0] = rescale(scale, text_pred[0]) task_boxes = [box for i, box in enumerate(boxes) if full_pred['labels'][i] == list(class_dict.values()).index('task')] grouped_sentences, sentence_bounding_boxes, info_texts, info_boxes = group_texts(task_boxes, text_pred[0], text_pred[1], min_dist=min_dist) BPMN_id = set(full_pred['BPMN_id']) # This ensures uniqueness of task names text_mapping = {id: '' for id in BPMN_id} if print_sentences: for sentence, box in zip(grouped_sentences, sentence_bounding_boxes): print("Task-related Text:", sentence) print("Bounding Box:", box) print("Information Texts:", info_texts) print("Information Bounding Boxes:", info_boxes) # Map the grouped sentences to the corresponding task for i in range(len(sentence_bounding_boxes)): for j in range(len(boxes)): if proportion_inside(sentence_bounding_boxes[i], boxes[j]) > iou_threshold and full_pred['labels'][j] == list(class_dict.values()).index('task'): text_mapping[full_pred['BPMN_id'][j]] = grouped_sentences[i] # Map the grouped sentences to the corresponding pool for key, elements in full_pred['pool_dict'].items(): if len(elements) > 0: continue else: for i in range(len(info_boxes)): # Find the position of the key in BPMN_id position = list(full_pred['BPMN_id']).index(key) if proportion_inside(info_boxes[i], boxes[position]) > iou_threshold: text_mapping[key] = info_texts[i] info_texts[i] = '' # Clear the text to avoid re-use for i in range(len(info_boxes)): if is_vertical(info_boxes[i]): for j in range(len(boxes)): if proportion_inside(info_boxes[i], boxes[j]) > 0 and full_pred['labels'][j] == list(class_dict.values()).index('pool'): print("Text:", info_texts[i], "associate with ", full_pred['BPMN_id'][j]) bpmn_id = full_pred['BPMN_id'][j] # Append new text or create new entry if not existing if bpmn_id in text_mapping: text_mapping[bpmn_id] += " " + info_texts[i] # Append text with a space in between else: text_mapping[bpmn_id] = info_texts[i] info_texts[i] = '' # Clear the text to avoid re-use # Map the grouped sentences to the corresponding object for i in range(len(info_boxes)): if is_vertical(info_boxes[i]): continue # Skip if the text is vertical for j in range(len(boxes)): if info_texts[i] == '': continue # Skip if there's no text if (proportion_inside(info_boxes[i], boxes[j]) > 0 or are_close(info_boxes[i], boxes[j], threshold=percentage_thresh * min_dist)) and (full_pred['labels'][j] == list(class_dict.values()).index('event') or full_pred['labels'][j] == list(class_dict.values()).index('messageEvent') or full_pred['labels'][j] == list(class_dict.values()).index('timerEvent') or full_pred['labels'][j] == list(class_dict.values()).index('dataObject')): bpmn_id = full_pred['BPMN_id'][j] # Append new text or create new entry if not existing if bpmn_id in text_mapping: text_mapping[bpmn_id] += " " + info_texts[i] # Append text with a space in between else: text_mapping[bpmn_id] = info_texts[i] info_texts[i] = '' # Clear the text to avoid re-use # Map the grouped sentences to the corresponding flow for i in range(len(info_boxes)): if info_texts[i] == '' or is_vertical(info_boxes[i]): continue # Skip if there's no text # Find the closest box within the defined threshold closest_index = find_closest_box(info_boxes[i], boxes, full_pred['labels'], threshold=4 * min_dist) if closest_index is not None and (full_pred['labels'][closest_index] == list(class_dict.values()).index('sequenceFlow') or full_pred['labels'][closest_index] == list(class_dict.values()).index('messageFlow')): bpmn_id = full_pred['BPMN_id'][closest_index] # Append new text or create new entry if not existing if bpmn_id in text_mapping: text_mapping[bpmn_id] += " " + info_texts[i] # Append text with a space in between else: text_mapping[bpmn_id] = info_texts[i] info_texts[i] = '' # Clear the text to avoid re-use if print_sentences: print("Text Mapping:", text_mapping) print("Information Texts left:", info_texts) return text_mapping