# CHAAT / app.py
# mscsasem3's picture
# Update app.py
# c098515
from transformers import TrOCRProcessor, VisionEncoderDecoderModel
from PIL import Image
import easyocr
import requests
import warnings
from skimage.io import imread
from skimage.color import rgb2gray
import matplotlib.pyplot as plt
from skimage.filters import sobel
import numpy as np
from heapq import *
import gradio as gr
from skimage.filters import threshold_otsu
from skimage.util import invert
import imageio
from matplotlib.dates import SU
from regex import F
from sklearn.feature_extraction.text import TfidfVectorizer
from sentence_transformers import SentenceTransformer, util
from sklearn.metrics.pairwise import cosine_similarity
import spacy
import pandas as pd
from tqdm import tqdm
import textdistance
from spacy.lang.en.stop_words import STOP_WORDS
#import psycopg2
import os
from tensorflow.keras.applications.resnet50 import ResNet50,preprocess_input, decode_predictions
from tensorflow.keras.preprocessing import image
from sklearn.feature_extraction.text import TfidfVectorizer
from sklearn.metrics.pairwise import cosine_similarity
def horizontal_projections(sobel_image):
    """Return the horizontal projection profile of an image.

    Every row of ``sobel_image`` is collapsed to a single number by summing
    across its columns (axis 1), so the result has one entry per image row.
    """
    row_totals = np.sum(sobel_image, axis=1)
    return row_totals
def find_peak_regions(hpp, divider=4):
    """Select the rows of a projection profile that fall below a cutoff.

    The cutoff is ``(max(hpp) - min(hpp)) / divider``.

    Args:
        hpp: 1-D sequence of per-row projection values.
        divider: Divides the value range to obtain the cutoff; a larger
            divider gives a lower cutoff and therefore fewer selected rows.

    Returns:
        A list of ``[row_index, value]`` pairs, in row order, for every row
        whose value is strictly below the cutoff.
    """
    cutoff = (np.max(hpp) - np.min(hpp)) / divider
    return [[row, value] for row, value in enumerate(hpp) if value < cutoff]
def heuristic(a, b):
    """Return the squared straight-line distance between points ``a`` and ``b``.

    Both arguments are indexable as ``(first, second)`` coordinates. No square
    root is taken.
    """
    delta_first = b[0] - a[0]
    delta_second = b[1] - a[1]
    return delta_first ** 2 + delta_second ** 2
def get_hpp_walking_regions(peaks_index):
    """Group row indices into runs of consecutive values.

    Args:
        peaks_index: Ordered sequence of row indices.

    Returns:
        A list of lists. A new run starts whenever the next index is more
        than 1 greater than the current one. Empty input gives ``[]``.
    """
    clusters = []
    run = []
    last_pos = len(peaks_index) - 1
    for pos, row in enumerate(peaks_index):
        run.append(row)
        # Close the run at the end of the input or when a gap follows.
        if pos == last_pos or peaks_index[pos + 1] - row > 1:
            clusters.append(run)
            run = []
    return clusters
def astar(array, start, goal):
    """Search a 2-D grid for a path of free cells from ``start`` to ``goal``.

    Cells equal to 1 are walls. Moves go to any of the eight neighbouring
    cells. ``heuristic`` is used both as the cost of a step and as the
    estimate of the remaining cost to the goal.

    Args:
        array: 2-D grid exposing ``.shape`` and ``array[row][col]`` indexing.
        start: ``(row, col)`` tuple of the starting cell.
        goal: ``(row, col)`` tuple of the target cell.

    Returns:
        A list of ``(row, col)`` tuples ordered from ``goal`` back towards
        ``start`` (``start`` itself is not included). The list is empty when
        the goal cannot be reached or when ``start == goal``.
    """
    neighbors = [(0, 1), (0, -1), (1, 0), (-1, 0), (1, 1), (1, -1), (-1, 1), (-1, -1)]
    n_rows, n_cols = array.shape[0], array.shape[1]
    close_set = set()
    came_from = {}
    gscore = {start: 0}
    oheap = []
    heappush(oheap, (heuristic(start, goal), start))
    while oheap:
        current = heappop(oheap)[1]
        if current == goal:
            # Walk the predecessor links back from the goal.
            data = []
            while current in came_from:
                data.append(current)
                current = came_from[current]
            return data
        close_set.add(current)
        for d_row, d_col in neighbors:
            neighbor = current[0] + d_row, current[1] + d_col
            # Skip cells outside the grid and wall cells.
            if not (0 <= neighbor[0] < n_rows and 0 <= neighbor[1] < n_cols):
                continue
            if array[neighbor[0]][neighbor[1]] == 1:
                continue
            tentative_g_score = gscore[current] + heuristic(current, neighbor)
            if neighbor in close_set and tentative_g_score >= gscore.get(neighbor, 0):
                continue
            # A cell that is not closed is waiting in the heap exactly when it
            # already has a g-score, so the dict lookup replaces a linear scan
            # of the heap without changing which cells get queued.
            if tentative_g_score < gscore.get(neighbor, 0) or neighbor not in gscore:
                came_from[neighbor] = current
                gscore[neighbor] = tentative_g_score
                heappush(oheap, (tentative_g_score + heuristic(neighbor, goal), neighbor))
    return []
def get_binary(img):
    """Binarise an image with Otsu's threshold.

    An image whose mean is exactly 0.0 or 1.0 is returned unchanged (the same
    object). Otherwise pixels less than or equal to the Otsu threshold become
    1 and all other pixels become 0.
    """
    if np.mean(img) in (0.0, 1.0):
        return img
    cutoff = threshold_otsu(img)
    # Multiplying the boolean mask by 1 turns it into an integer array.
    return (img <= cutoff) * 1
def path_exists(window_image):
    """Report whether a window can be crossed from its left to its right edge.

    Args:
        window_image: 2-D binary array in which 1 marks a blocked cell.

    Returns:
        ``True`` if some row of the window sums to 0, or if A* finds a path
        across the window; ``False`` otherwise.
    """
    # Cheap test first: a row whose projection is 0 is a clear corridor.
    if 0 in horizontal_projections(window_image):
        return True
    # Add one free column on each side so that the start and goal cells of
    # the search are never walls.
    free_column = np.zeros((window_image.shape[0], 1))
    world_map = np.hstack((free_column, window_image, free_column))
    mid_row = world_map.shape[0] // 2
    # The goal is the last valid column. An index of shape[1] lies outside
    # the grid and can never be reached by the search.
    last_col = world_map.shape[1] - 1
    path = astar(world_map, (mid_row, 0), (mid_row, last_col))
    return len(path) > 0
def get_road_block_regions(nmap, window_width=20):
    """Find the columns at which a sliding window cannot be crossed.

    A window starting at each column is tested with ``path_exists``. When a
    window would run past the last column it is clipped to end there, and
    scanning stops after that window.

    Args:
        nmap: 2-D binary array in which 1 marks a blocked cell.
        window_width: Number of columns in each window (default 20).

    Returns:
        The list of starting columns whose window has no path across it.
    """
    road_blocks = []
    last_col = nmap.shape[1] - 1
    for col in range(nmap.shape[1]):
        end = col + window_width
        reached_edge = end > last_col
        if reached_edge:
            end = last_col
        if not path_exists(nmap[:, col:end]):
            road_blocks.append(col)
        if reached_edge:
            break
    return road_blocks
def group_the_road_blocks(road_blocks):
    """Collapse an ordered list of columns into ``[first, last]`` ranges.

    Args:
        road_blocks: Ordered sequence of column indices.

    Returns:
        One ``[first, last]`` pair per run of consecutive columns. A run ends
        when the next column is more than 1 greater than the current one.
        Empty input gives ``[]``.
    """
    groups = []
    run_start = None
    final_pos = len(road_blocks) - 1
    for pos, col in enumerate(road_blocks):
        if run_start is None:
            run_start = col
        if pos == final_pos or road_blocks[pos + 1] - col > 1:
            groups.append([run_start, col])
            run_start = None
    return groups
def extract_line_from_image(image, lower_line, upper_line):
    """Cut the strip of ``image`` lying between two separator lines.

    Args:
        image: 2-D image array; it is copied, not modified.
        lower_line: Array whose first column holds the row at which the strip
            starts, indexed by column position.
        upper_line: Array whose first column holds the row at which the strip
            ends, indexed by column position.

    Returns:
        The rows of the masked copy from ``min(lower_line[:, 0])`` up to (not
        including) ``min(upper_line[:, 0])``. In every column except the last
        one, pixels above the start row and from the end row downwards are
        set to 0.
    """
    start_rows = lower_line[:, 0]
    end_rows = upper_line[:, 0]
    strip = np.copy(image)
    n_rows, n_cols = strip.shape
    for col in range(n_cols - 1):
        strip[:start_rows[col], col] = 0
        strip[end_rows[col]:n_rows, col] = 0
    return strip[np.min(start_rows):np.min(end_rows), :]
def _find_line_separators(img):
    """Trace the separator paths that run between consecutive text lines.

    Rows with a low Sobel projection are grouped into bands of consecutive
    rows. In every band, "doorways" are first cut through the columns where
    ink blocks the way; then an A* path is walked along the band from the
    first to the last column.

    Args:
        img: 2-D grayscale image array.

    Returns:
        A list of integer arrays of ``(row, col)`` points in image
        coordinates, one for every band in which a path was found. The list
        is empty when no low-projection rows exist.
    """
    hpp = horizontal_projections(sobel(img))
    peaks = find_peak_regions(hpp)
    if not peaks:
        return []
    peaks_index = np.array(peaks)[:, 0].astype(int)
    hpp_clusters = get_hpp_walking_regions(peaks_index)
    binary_image = get_binary(img)

    # First pass: clear the middle row of every blocked stretch so the path
    # search below can get through.
    for cluster in hpp_clusters:
        top, bottom = cluster[0], cluster[-1]
        nmap = binary_image[top:bottom, :]
        if nmap.shape[0] == 0:
            # A band of a single row gives an empty slice with no middle row.
            continue
        mid_row = top + nmap.shape[0] // 2
        road_blocks = get_road_block_regions(nmap)
        for first_col, last_col in group_the_road_blocks(road_blocks):
            binary_image[mid_row, first_col:last_col + 10] = 0

    # Second pass: walk each band from its left edge to its right edge.
    separators = []
    for cluster in hpp_clusters:
        top, bottom = cluster[0], cluster[-1]
        nmap = binary_image[top:bottom, :]
        mid_row = nmap.shape[0] // 2
        path = np.array(astar(nmap, (mid_row, 0), (mid_row, nmap.shape[1] - 1)))
        if path.shape[0] != 0:
            # Paths are relative to the band; shift rows to image coordinates.
            path[:, 0] += top
            separators.append(path)
    return separators


def extract(image1):
    """Recognise handwritten text in an image, one text line at a time.

    The image is converted to grayscale and split into line strips along the
    separator paths; every strip is then transcribed with the
    ``microsoft/trocr-base-handwritten`` model.

    Args:
        image1: Colour image accepted by ``rgb2gray`` (presumably the NumPy
            array Gradio passes for an "image" input; verify against caller).

    Returns:
        The recognised lines in top-to-bottom order, each followed by a
        single space. An empty string is returned when no line is found.
    """
    img = rgb2gray(image1)
    # NOTE: this silences warnings for the whole process, not only this call.
    warnings.filterwarnings("ignore")
    line_segments = _find_line_separators(img)

    # Close the last strip with a separator along the bottom edge of the
    # image, ordered right-to-left like the A* paths.
    height, width = img.shape[0], img.shape[1]
    last_bottom_row = np.flip(
        np.column_stack((np.ones((width,)) * height, np.arange(width))).astype(int),
        axis=0,
    )
    line_segments.append(last_bottom_row)
    if len(line_segments) < 2:
        return ""

    # Load the OCR model once per call instead of once per text line.
    processor = TrOCRProcessor.from_pretrained('microsoft/trocr-base-handwritten')
    model = VisionEncoderDecoderModel.from_pretrained('microsoft/trocr-base-handwritten')

    output = []
    for upper_path, lower_path in zip(line_segments, line_segments[1:]):
        line_image = extract_line_from_image(img, upper_path, lower_path)
        if line_image.size == 0:
            # The two separators cross, so there is nothing to read.
            continue
        # Convert in memory; a shared temporary file would be overwritten by
        # concurrent requests.
        strip = Image.fromarray(line_image * 255).convert("L").convert("RGB")
        pixel_values = processor(images=strip, return_tensors="pt").pixel_values
        generated_ids = model.generate(pixel_values)
        generated_text = processor.batch_decode(generated_ids, skip_special_tokens=True)[0]
        print(generated_text)
        output.append(generated_text)
    return "".join(text + " " for text in output)
# Shared spaCy pipeline ("en_core_web_md"), loaded once when the module is
# imported. rm_stop() and text_processing() read this global.
nlp = spacy.load("en_core_web_md")
def listToString(s):
    """Join the strings in ``s`` into one string, separated by single spaces."""
    return " ".join(s)
def rm_stop(my_doc):
    """Return the token texts of ``my_doc`` that are not stop words.

    Args:
        my_doc: Iterable of tokens exposing a ``.text`` attribute.

    Returns:
        The token texts, in order, whose entry in ``nlp.vocab`` is not
        flagged as a stop word.
    """
    words = [token.text for token in my_doc]
    return [word for word in words if not nlp.vocab[word].is_stop]
def text_processing(sentence):
    """Reduce a sentence to a list of lower-cased content lemmas.

    The sentence is parsed with the module-level ``nlp`` pipeline. Tokens
    that are not purely alphabetic, or that are stop words, are dropped.
    """
    lemmas = []
    for token in nlp(sentence):
        if not token.is_alpha or token.is_stop:
            continue
        lemmas.append(token.lemma_.lower())
    return lemmas
def jaccard_sim(sent1,sent2):
    """Return the normalised Jaccard similarity of two sentences.

    Both sentences are first reduced with ``text_processing``; the resulting
    lemma lists are compared with ``textdistance.jaccard``.
    """
    lemmas_a, lemmas_b = text_processing(sent1), text_processing(sent2)
    return textdistance.jaccard.normalized_similarity(lemmas_a, lemmas_b)
def sim(Ideal_Answer,Submitted_Answer):
    """Score how similar a submitted answer is to the ideal answer.

    Three measures are computed, each multiplied by 10 and truncated to an
    int: SBERT cosine similarity, Jaccard similarity of the content lemmas,
    and spaCy vector similarity of the texts with stop words removed.

    Args:
        Ideal_Answer: Reference answer text.
        Submitted_Answer: Answer text to be graded.

    Returns:
        A ``(report, final_score)`` tuple. ``report`` is a string listing the
        three scores and the final score; ``final_score`` is
        ``int(0.8 * sbert + 0.2 * word2vec)``.
    """
    # Quotes are stripped from both texts before any comparison.
    text1 = Ideal_Answer.replace("\"", "").replace("\'", "")
    text2 = Submitted_Answer.replace("\"", "").replace("\'", "")

    # SBERT embeddings
    model = SentenceTransformer('sentence-transformers/all-MiniLM-L6-v2')
    embedding_1 = model.encode(text1, convert_to_tensor=True)
    embedding_2 = model.encode(text2, convert_to_tensor=True)
    score = util.pytorch_cos_sim(embedding_1, embedding_2)
    # Read the scalar from the 1x1 tensor directly rather than parsing its
    # printed representation.
    sbert = int(score.item() * 10.0)

    # Jaccard
    jaccard = int(jaccard_sim(text1, text2) * 10.0)

    # spaCy average word vectors, using the module-level pipeline
    doc1 = listToString(rm_stop(nlp(text1)))
    doc2 = listToString(rm_stop(nlp(text2)))
    w2v = int(nlp(doc1).similarity(nlp(doc2)) * 10.0)

    final_score = int(0.8 * sbert + 0.2 * w2v)
    out_string = f"SBERT:{sbert}, Jaccard:{jaccard}, Word2Vec:{w2v},final_score:{final_score}"
    return out_string, final_score
def return_image_embedding(model,img_path):
    """Run an image through ``model`` and return its prediction as one row.

    Args:
        model: Object with a ``predict`` method (a Keras ResNet50 in this
            file).
        img_path: Image data handed to ``image.img_to_array``. Despite the
            name, no file is loaded here.

    Returns:
        A 2-D NumPy array with a single row holding ``model.predict``'s
        output for the image. The array is also printed.
    """
    batch = np.expand_dims(image.img_to_array(img_path), axis=0)
    preds = model.predict(preprocess_input(batch))
    embedding = pd.DataFrame(preds[0]).T.to_numpy()
    print(embedding)
    return embedding
def draw_boxes(image, bounds, color='yellow', width=2):
    """Draw the outline of every detected text box onto ``image``.

    Args:
        image: PIL image to draw on; it is modified in place.
        bounds: Iterable of detections whose first element is the four
            corner points ``(p0, p1, p2, p3)`` of a box.
        color: Outline colour.
        width: Outline width in pixels.

    Returns:
        The same ``image`` object, with the outlines drawn.
    """
    # ImageDraw is not imported at the top of the file, so bring it in here.
    from PIL import ImageDraw

    draw = ImageDraw.Draw(image)
    for bound in bounds:
        p0, p1, p2, p3 = bound[0]
        # Return to p0 at the end so the outline is closed.
        draw.line([*p0, *p1, *p2, *p3, *p0], fill=color, width=width)
    return image
def inference(img, lang):
    """Read the text found in ``img`` with EasyOCR.

    Args:
        img: Image passed straight to ``easyocr.Reader.readtext``.
        lang: List of language codes used to build the reader.

    Returns:
        Whatever ``readtext(img, detail=0)`` returns.
    """
    return easyocr.Reader(lang).readtext(img, detail=0)
def compute_tfidf_embeddings(words_list1, words_list2):
    """Build TF-IDF rows for two lists of strings over a shared vocabulary.

    One vectorizer is fitted on the concatenation of both lists, and the
    resulting matrix is split back at the length of the first list.

    Returns:
        A ``(matrix_for_list1, matrix_for_list2)`` tuple with one row per
        input string.
    """
    split_at = len(words_list1)
    tfidf_matrix = TfidfVectorizer().fit_transform(words_list1 + words_list2)
    return tfidf_matrix[:split_at], tfidf_matrix[split_at:]
def compute_cosine_similarity(tfidf_matrix_list1, tfidf_matrix_list2):
    """Return the pairwise cosine similarity between rows of two matrices."""
    return cosine_similarity(tfidf_matrix_list1, tfidf_matrix_list2)
def extract_eval(Ideal_Answer_Text,Ideal_Answer_Diagram,Submitted_Answer_Text,Submitted_Answer_Diagram):
    """Grade a submitted answer (handwritten text plus diagram) against the ideal one.

    Args:
        Ideal_Answer_Text: Image of the ideal handwritten answer.
        Ideal_Answer_Diagram: Image of the ideal diagram.
        Submitted_Answer_Text: Image of the submitted handwritten answer.
        Submitted_Answer_Diagram: Image of the submitted diagram.

    Returns:
        A multi-line report string containing the text similarity scores,
        the diagram embedding score, the diagram text score and the overall
        diagram similarity.
    """
    ideal_text = extract(Ideal_Answer_Text)
    print("Extracting Ideal Text \n")
    print(ideal_text)
    submitted_text = extract(Submitted_Answer_Text)
    print("Extracting Submitted Text \n")
    print(submitted_text)
    text_report, _text_score = sim(ideal_text, submitted_text)
    print(text_report)

    # Visual similarity of the diagrams from ResNet50 embeddings.
    model = ResNet50(include_top=False, weights='imagenet', pooling='avg')
    diagram_1_embed = return_image_embedding(model, Ideal_Answer_Diagram)
    diagram_2_embed = return_image_embedding(model, Submitted_Answer_Diagram)
    # Read the scalar from the 1x1 tensor directly rather than parsing its
    # printed representation.
    img_sim = util.pytorch_cos_sim(diagram_1_embed, diagram_2_embed).item()
    img_score = int(img_sim * 10.0)
    print("Diagram Embedding Similarity Score \n")
    print(str(img_score))

    # Similarity of the text that OCR finds inside the diagrams.
    diagram_1_text = inference(Ideal_Answer_Diagram, ['en'])
    diagram_2_text = inference(Submitted_Answer_Diagram, ['en'])
    text_sim = 0.0
    if diagram_1_text and diagram_2_text:
        try:
            tfidf_1, tfidf_2 = compute_tfidf_embeddings(diagram_1_text, diagram_2_text)
            # Only the first detected snippet of each diagram is compared.
            text_sim = compute_cosine_similarity(tfidf_1, tfidf_2)[0][0]
        except ValueError:
            # Raised when the detected text yields no usable vocabulary.
            text_sim = 0.0
    print("Diagram Text Embedding Similarity Score \n")
    print(text_sim)

    diagram_similarity = 0.7 * text_sim + 0.3 * img_sim
    print("Diagram Overall Similarity\n")
    print(diagram_similarity * 10)

    # Every value ends its own line so it does not run into the next label.
    final_string = (
        text_report + "\n"
        + "Diagram Embedding Similarity Score \n" + str(img_score) + "\n"
        + "Diagram Text Embedding Similarity Score \n" + str(text_sim) + "\n"
        + "Diagram Overall Similarity\n" + str(diagram_similarity * 10)
    )
    return final_string
# Gradio UI: four image inputs, passed to extract_eval in parameter order
# (ideal text, ideal diagram, submitted text, submitted diagram), and one
# text box showing the returned report.
# NOTE(review): gr.outputs.Textbox and launch(enable_queue=...) look like
# legacy Gradio APIs; confirm they exist in the Gradio version in use.
iface = gr.Interface(fn=extract_eval,
inputs=["image","image","image","image"],
outputs=gr.outputs.Textbox(),)
# Starts the web server when this module is executed.
iface.launch(enable_queue=True)