"""Gradio app that scores a submitted handwritten answer against an ideal one.

Pipeline:
  1. Split each handwritten text image into lines (horizontal projection
     profile + A* path planning between lines) and run TrOCR on every line.
  2. Compare the two transcriptions (SBERT, Jaccard, spaCy vectors).
  3. Compare the two diagram images (ResNet50 embeddings + TF-IDF over the
     text EasyOCR finds in the diagrams).
"""
import os
import warnings
from functools import lru_cache
from heapq import *  # noqa: F401,F403 - star import kept for backward compatibility
from heapq import heappop, heappush

from transformers import TrOCRProcessor, VisionEncoderDecoderModel
from PIL import Image, ImageDraw
import easyocr
import requests
from skimage.io import imread
from skimage.color import rgb2gray
import matplotlib.pyplot as plt
from skimage.filters import sobel
import numpy as np
import gradio as gr
from skimage.filters import threshold_otsu
from skimage.util import invert
import imageio
from matplotlib.dates import SU
from regex import F
from sklearn.feature_extraction.text import TfidfVectorizer
from sentence_transformers import SentenceTransformer, util
from sklearn.metrics.pairwise import cosine_similarity
import spacy
import pandas as pd
from tqdm import tqdm
import textdistance
from spacy.lang.en.stop_words import STOP_WORDS
#import psycopg2
from tensorflow.keras.applications.resnet50 import ResNet50, preprocess_input, decode_predictions
from tensorflow.keras.preprocessing import image

TROCR_CHECKPOINT = 'microsoft/trocr-base-handwritten'
SBERT_CHECKPOINT = 'sentence-transformers/all-MiniLM-L6-v2'


# ---------------------------------------------------------------------------
# Lazily loaded, cached models (loading them per call/per line is very slow)
# ---------------------------------------------------------------------------

@lru_cache(maxsize=1)
def _load_trocr():
    """Return the (processor, model) pair for handwritten-text recognition."""
    processor = TrOCRProcessor.from_pretrained(TROCR_CHECKPOINT)
    model = VisionEncoderDecoderModel.from_pretrained(TROCR_CHECKPOINT)
    return processor, model


@lru_cache(maxsize=1)
def _load_sbert():
    """Return the sentence-embedding model used by ``sim``."""
    return SentenceTransformer(SBERT_CHECKPOINT)


@lru_cache(maxsize=1)
def _load_resnet():
    """Return the headless, average-pooled ResNet50 used for diagram embeddings."""
    return ResNet50(include_top=False, weights='imagenet', pooling='avg')


@lru_cache(maxsize=4)
def _get_ocr_reader(languages):
    """Return an EasyOCR reader for a tuple of language codes."""
    return easyocr.Reader(list(languages))


# ---------------------------------------------------------------------------
# Line segmentation
# ---------------------------------------------------------------------------

def horizontal_projections(sobel_image):
    """Return the sum of every row of ``sobel_image`` (one value per row)."""
    return np.sum(sobel_image, axis=1)


def find_peak_regions(hpp, divider=4):
    """Return ``[row_index, value]`` pairs for rows with a low projection.

    A row is selected when its projection value is below
    ``(max(hpp) - min(hpp)) / divider``.
    """
    threshold = (np.max(hpp) - np.min(hpp)) / divider
    return [[i, hppv] for i, hppv in enumerate(hpp) if hppv < threshold]


def heuristic(a, b):
    """Return the squared Euclidean distance between points ``a`` and ``b``."""
    return (b[0] - a[0]) ** 2 + (b[1] - a[1]) ** 2


def get_hpp_walking_regions(peaks_index):
    """Group row indices into clusters of consecutive values.

    A new cluster starts whenever the next index is more than 1 away from
    the current one. Returns a list of lists.
    """
    hpp_clusters = []
    cluster = []
    last = len(peaks_index) - 1
    for index, value in enumerate(peaks_index):
        cluster.append(value)
        if index == last or peaks_index[index + 1] - value > 1:
            hpp_clusters.append(cluster)
            cluster = []
    return hpp_clusters


def astar(array, start, goal):
    """Search an 8-connected path from ``start`` to ``goal`` through ``array``.

    Cells equal to 1 are walls. Both the step cost and the estimate to the
    goal use ``heuristic`` (squared distance), so the path found is not
    guaranteed to be the shortest one.

    Returns:
        A list of ``(row, col)`` tuples ordered from ``goal`` back towards
        ``start`` (``start`` itself is not included), or ``[]`` when the goal
        cannot be reached.
    """
    neighbors = ((0, 1), (0, -1), (1, 0), (-1, 0),
                 (1, 1), (1, -1), (-1, 1), (-1, -1))
    n_rows, n_cols = array.shape[0], array.shape[1]
    unseen = float("inf")
    came_from = {}
    gscore = {start: 0}
    oheap = [(heuristic(start, goal), start)]

    while oheap:
        current = heappop(oheap)[1]
        if current == goal:
            data = []
            while current in came_from:
                data.append(current)
                current = came_from[current]
            return data

        for d_row, d_col in neighbors:
            neighbor = (current[0] + d_row, current[1] + d_col)
            if not (0 <= neighbor[0] < n_rows and 0 <= neighbor[1] < n_cols):
                continue  # outside the map
            if array[neighbor[0]][neighbor[1]] == 1:
                continue  # wall
            tentative_g_score = gscore[current] + heuristic(current, neighbor)
            # Every node that was ever queued has a gscore entry, so this one
            # dictionary lookup replaces scanning the heap for membership.
            if tentative_g_score >= gscore.get(neighbor, unseen):
                continue
            came_from[neighbor] = current
            gscore[neighbor] = tentative_g_score
            heappush(oheap, (tentative_g_score + heuristic(neighbor, goal), neighbor))

    return []


def get_binary(img):
    """Binarise ``img`` with Otsu's threshold (1 where ``img <= threshold``).

    An image whose mean is exactly 0.0 or 1.0 is returned unchanged.
    """
    mean = np.mean(img)
    if mean == 0.0 or mean == 1.0:
        return img
    thresh = threshold_otsu(img)
    return (img <= thresh) * 1


def path_exists(window_image):
    """Return True when the window can be crossed from left to right.

    A row that sums to 0 is accepted immediately; otherwise the window is
    padded with one empty column on each side and A* is run from the middle
    row of the left padding to the middle row of the right padding.
    """
    # Very basic check first, then proceed to the A* check.
    if 0 in horizontal_projections(window_image):
        return True

    padded_window = np.zeros((window_image.shape[0], 1))
    world_map = np.hstack((padded_window, window_image, padded_window))
    mid_row = int(world_map.shape[0] / 2)
    # The goal must be the last valid column; an index of shape[1] lies
    # outside the map and could never be reached.
    path = astar(world_map, (mid_row, 0), (mid_row, world_map.shape[1] - 1))
    return len(path) > 0


def get_road_block_regions(nmap, window_width=20):
    """Return the start columns of sliding windows that cannot be crossed.

    A window of ``window_width`` columns is moved one column at a time; the
    scan stops after the first window that has to be clipped to the map.
    """
    road_blocks = []
    last_col = nmap.shape[1] - 1
    for col in range(nmap.shape[1]):
        end = col + window_width
        is_last_window = end > last_col
        if is_last_window:
            end = last_col
        if not path_exists(nmap[:, col:end]):
            road_blocks.append(col)
        if is_last_window:
            break
    return road_blocks


def group_the_road_blocks(road_blocks):
    """Collapse runs of consecutive columns into ``[first, last]`` pairs."""
    road_blocks_cluster_groups = []
    first = None
    last_index = len(road_blocks) - 1
    for index, value in enumerate(road_blocks):
        if first is None:
            first = value
        if index == last_index or road_blocks[index + 1] - value > 1:
            road_blocks_cluster_groups.append([first, value])
            first = None
    return road_blocks_cluster_groups


def extract_line_from_image(image, lower_line, upper_line):
    """Cut the band between two separator lines out of ``image``.

    ``lower_line`` and ``upper_line`` are arrays of ``(row, col)`` points.
    Pixels outside the band are zeroed on a copy of the image and the rows
    between the minimum row of each line are returned.
    """
    lower_boundary = np.min(lower_line[:, 0])
    upper_boundary = np.min(upper_line[:, 0])
    img_copy = np.copy(image)
    r, c = img_copy.shape
    # NOTE(review): the i-th point of each line is applied to image column i,
    # while astar() returns its path goal-first - confirm that the point
    # order really lines up with the column order.
    for index in range(c - 1):
        img_copy[0:lower_line[index, 0], index] = 0
        img_copy[upper_line[index, 0]:r, index] = 0
    return img_copy[lower_boundary:upper_boundary, :]


def _cluster_window(binary_image, cluster):
    """Return the view of ``binary_image`` spanned by a row cluster."""
    return binary_image[cluster[0]:cluster[-1], :]


def _open_doorways(binary_image, hpp_clusters):
    """Clear the middle row of every blocked region so A* can pass (in place)."""
    for cluster in hpp_clusters:
        nmap = _cluster_window(binary_image, cluster)
        if nmap.shape[0] == 0:
            continue  # a one-row cluster yields an empty window
        mid_row = int(nmap.shape[0] / 2)
        blocked_groups = group_the_road_blocks(get_road_block_regions(nmap))
        for first_col, last_col in blocked_groups:
            # nmap is a view, so this writes through to binary_image.
            nmap[mid_row, first_col:last_col + 10] = 0


def _trace_line_separators(binary_image, hpp_clusters):
    """Run A* across every cluster and return the paths in image coordinates."""
    line_segments = []
    for cluster in hpp_clusters:
        nmap = _cluster_window(binary_image, cluster)
        if nmap.shape[0] == 0:
            continue
        mid_row = int(nmap.shape[0] / 2)
        path = np.array(astar(nmap, (mid_row, 0), (mid_row, nmap.shape[1] - 1)))
        if path.shape[0] != 0:
            path[:, 0] += cluster[0]  # offset from the top of the image
            line_segments.append(path)
    return line_segments


def _recognize_line(line_image, processor, model):
    """Run TrOCR on one grayscale line image and return the decoded text."""
    # Convert in memory; writing to a shared file on disk is lossy and
    # unsafe when several requests are processed at once.
    line_rgb = Image.fromarray(line_image * 255).convert("L").convert("RGB")
    pixel_values = processor(images=line_rgb, return_tensors="pt").pixel_values
    generated_ids = model.generate(pixel_values)
    return processor.batch_decode(generated_ids, skip_special_tokens=True)[0]


def extract(image1):
    """Transcribe the handwritten text in ``image1``.

    The image is converted to grayscale, split into text lines and every
    line is recognised with TrOCR.

    Args:
        image1: RGB image accepted by ``skimage.color.rgb2gray``
            (presumably the numpy array Gradio passes in - TODO confirm).

    Returns:
        The recognised lines, each followed by a single space, concatenated
        into one string ("" when no line could be extracted).
    """
    img = rgb2gray(image1)
    hpp = horizontal_projections(sobel(img))
    warnings.filterwarnings("ignore")

    # Rows with little edge energy are the candidate gaps between lines;
    # group them into the windows the path planner walks through.
    peaks = find_peak_regions(hpp)
    if peaks:
        peaks_index = np.array(peaks)[:, 0].astype(int)
        hpp_clusters = get_hpp_walking_regions(peaks_index)
    else:
        hpp_clusters = []

    binary_image = get_binary(img)
    _open_doorways(binary_image, hpp_clusters)

    # Now that the gaps are passable, trace one separator per gap with A*.
    line_segments = _trace_line_separators(binary_image, hpp_clusters)

    # Add an extra separator that represents the bottom row of the image.
    n_rows, n_cols = img.shape
    last_bottom_row = np.flip(
        np.column_stack((np.ones((n_cols,)) * n_rows, np.arange(n_cols))).astype(int),
        axis=0,
    )
    line_segments.append(last_bottom_row)

    output = []
    if len(line_segments) > 1:
        processor, model = _load_trocr()
        for lower_line, upper_line in zip(line_segments, line_segments[1:]):
            line_image = extract_line_from_image(img, lower_line, upper_line)
            if line_image.size == 0:
                continue  # separators crossed; nothing to recognise
            generated_text = _recognize_line(line_image, processor, model)
            print(generated_text)
            output.append(generated_text)

    return "".join(f"{text} " for text in output)


# ---------------------------------------------------------------------------
# Text similarity
# ---------------------------------------------------------------------------

nlp = spacy.load("en_core_web_md")


def listToString(s):
    """Join the strings in ``s`` with single spaces."""
    return " ".join(s)


def rm_stop(my_doc):
    """Return the token texts of ``my_doc`` that are not spaCy stop words."""
    return [
        token.text
        for token in my_doc
        if not nlp.vocab[token.text].is_stop
    ]


def text_processing(sentence):
    """Return lower-cased lemmas of the alphabetic, non-stop-word tokens."""
    return [
        token.lemma_.lower()
        for token in nlp(sentence)
        if token.is_alpha and not token.is_stop
    ]


def jaccard_sim(sent1, sent2):
    """Return the normalised Jaccard similarity of the two processed sentences."""
    sentence1 = text_processing(sent1)
    sentence2 = text_processing(sent2)
    return textdistance.jaccard.normalized_similarity(sentence1, sentence2)


def sim(Ideal_Answer, Submitted_Answer):
    """Score the similarity of two answers on a 0-10 integer scale.

    Three scores are computed - SBERT cosine similarity, Jaccard similarity
    and spaCy document-vector similarity - each truncated to an int after
    multiplying by 10. The final score is ``int(0.8 * sbert + 0.2 * w2v)``.

    Returns:
        ``(report, final_score)`` where ``report`` is the string
        ``"SBERT:<n>, Jaccard:<n>, Word2Vec:<n>,final_score:<n>"``.
    """
    text1 = Ideal_Answer.replace("\"", "").replace("\'", "")
    text2 = Submitted_Answer.replace("\"", "").replace("\'", "")

    # SBERT embeddings
    model = _load_sbert()
    embedding_1 = model.encode(text1, convert_to_tensor=True)
    embedding_2 = model.encode(text2, convert_to_tensor=True)
    # Read the scalar directly instead of parsing the tensor's repr.
    sbert = int(util.pytorch_cos_sim(embedding_1, embedding_2).item() * 10.0)

    # Jaccard
    jaccard = int(jaccard_sim(text1, text2) * 10.0)

    # spaCy average word vectors, stop words removed
    doc1 = listToString(rm_stop(nlp(text1)))
    doc2 = listToString(rm_stop(nlp(text2)))
    w2v = int(nlp(doc1).similarity(nlp(doc2)) * 10.0)

    final_score = int(0.8 * sbert + 0.2 * w2v)
    output = [
        "SBERT:" + str(sbert) + ",",
        "Jaccard:" + str(jaccard) + ",",
        "Word2Vec:" + str(w2v) + ",final_score:" + str(final_score),
    ]
    return listToString(output), final_score


# ---------------------------------------------------------------------------
# Diagram similarity
# ---------------------------------------------------------------------------

def return_image_embedding(model, img_path):
    """Return the ``model`` prediction for one image as a ``(1, n)`` array.

    Despite its name, ``img_path`` is passed straight to ``img_to_array``,
    so it must already be image data rather than a file path.
    """
    x = image.img_to_array(img_path)
    x = np.expand_dims(x, axis=0)
    x = preprocess_input(x)
    preds = model.predict(x)
    embedding = pd.DataFrame(preds[0]).T.to_numpy()
    print(embedding)
    return embedding


def draw_boxes(image, bounds, color='yellow', width=2):
    """Draw the quadrilateral in ``bound[0]`` of every bound onto ``image``.

    The image is modified in place and also returned.
    """
    draw = ImageDraw.Draw(image)
    for bound in bounds:
        p0, p1, p2, p3 = bound[0]
        draw.line([*p0, *p1, *p2, *p3, *p0], fill=color, width=width)
    return image


def inference(img, lang):
    """Run EasyOCR on ``img`` and return the recognised text fragments.

    ``lang`` is a list of language codes. ``detail=0`` makes EasyOCR return
    only the text strings.
    """
    reader = _get_ocr_reader(tuple(lang))
    return reader.readtext(img, detail=0)


def compute_tfidf_embeddings(words_list1, words_list2):
    """Fit one TF-IDF vocabulary on both lists and return a matrix per list.

    Raises:
        ValueError: propagated from ``TfidfVectorizer`` when the combined
            input yields an empty vocabulary.
    """
    combined_words = words_list1 + words_list2
    vectorizer = TfidfVectorizer()
    tfidf_matrix = vectorizer.fit_transform(combined_words)
    # Split the matrix back into the rows that belong to each list.
    tfidf_matrix_list1 = tfidf_matrix[:len(words_list1)]
    tfidf_matrix_list2 = tfidf_matrix[len(words_list1):]
    return tfidf_matrix_list1, tfidf_matrix_list2


def compute_cosine_similarity(tfidf_matrix_list1, tfidf_matrix_list2):
    """Return the pairwise cosine-similarity matrix of two TF-IDF matrices."""
    return cosine_similarity(tfidf_matrix_list1, tfidf_matrix_list2)


def _diagram_text_similarity(words_list1, words_list2):
    """Return the TF-IDF cosine similarity of all text found in two diagrams.

    Every diagram's fragments are joined into one document so the whole
    text is compared, not just the first fragment. Returns 0.0 when either
    diagram has no text or no usable vocabulary can be built.
    """
    doc1 = " ".join(words_list1)
    doc2 = " ".join(words_list2)
    if not doc1.strip() or not doc2.strip():
        return 0.0
    try:
        tfidf_1, tfidf_2 = compute_tfidf_embeddings([doc1], [doc2])
    except ValueError:
        # Empty vocabulary (e.g. only one-character tokens were detected).
        return 0.0
    return float(compute_cosine_similarity(tfidf_1, tfidf_2)[0][0])


def extract_eval(Ideal_Answer_Text, Ideal_Answer_Diagram, Submitted_Answer_Text, Submitted_Answer_Diagram):
    """Grade a submitted answer (text image + diagram) against the ideal one.

    Returns:
        A multi-line report string with the text similarity scores, the
        diagram embedding score (0-10), the diagram text similarity (0-1)
        and the overall diagram similarity
        ``(0.7 * text_sim + 0.3 * img_sim) * 10``.
    """
    ideal_text = extract(Ideal_Answer_Text)
    print("Extracting Ideal Text \n")
    print(ideal_text)
    submitted_text = extract(Submitted_Answer_Text)
    print("Extracting Submitted Text \n")
    print(submitted_text)

    text_report, _ = sim(ideal_text, submitted_text)
    print(text_report)

    model = _load_resnet()
    diagram_1_embed = return_image_embedding(model, Ideal_Answer_Diagram)
    diagram_2_embed = return_image_embedding(model, Submitted_Answer_Diagram)
    img_sim = float(util.pytorch_cos_sim(diagram_1_embed, diagram_2_embed).item())
    img_score = int(img_sim * 10.0)
    print("Diagram Embedding Similarity Score \n")
    print(str(img_score))

    diagram_1_text = inference(Ideal_Answer_Diagram, ['en'])
    diagram_2_text = inference(Submitted_Answer_Diagram, ['en'])
    text_sim = _diagram_text_similarity(diagram_1_text, diagram_2_text)
    print("Diagram Text Embedding Similarity Score \n")
    print(text_sim)

    diagram_similarity = 0.7 * text_sim + 0.3 * img_sim
    print("Diagram Overall Similarity\n")
    print(diagram_similarity * 10)

    final_string = (
        text_report + "\n"
        + "Diagram Embedding Similarity Score \n" + str(img_score) + "\n"
        + "Diagram Text Embedding Similarity Score \n" + str(text_sim) + "\n"
        + "Diagram Overall Similarity\n" + str(diagram_similarity * 10)
    )
    return final_string


# NOTE(review): gr.outputs.Textbox and enable_queue look like legacy Gradio
# APIs - confirm they match the Gradio version this app is pinned to.
iface = gr.Interface(fn=extract_eval,
                     inputs=["image", "image", "image", "image"],
                     outputs=gr.outputs.Textbox(),)
iface.launch(enable_queue=True)