from transformers import TrOCRProcessor, VisionEncoderDecoderModel
from PIL import Image, ImageDraw
import warnings

from skimage.color import rgb2gray
from skimage.filters import sobel, threshold_otsu
import numpy as np
from heapq import heappush, heappop

import gradio as gr

from sklearn.feature_extraction.text import TfidfVectorizer
from sklearn.metrics.pairwise import cosine_similarity
from sentence_transformers import SentenceTransformer, util
import spacy
import pandas as pd
import textdistance

from tensorflow.keras.applications.resnet50 import ResNet50, preprocess_input, decode_predictions
from tensorflow.keras.preprocessing import image
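

# --- Handwritten text line segmentation --------------------------------------
# The helpers below follow a projection-profile approach: a Sobel edge image is
# summed row-wise (horizontal projection), low-energy rows are treated as gaps
# between text lines, and an A* search walks through each gap to find a
# separating path even where ascenders and descenders touch.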
def horizontal_projections(sobel_image):
    # Sum edge intensities along each row: text rows give high values, gaps low ones.
    return np.sum(sobel_image, axis=1)


def find_peak_regions(hpp, divider=4):
    # Rows whose projection falls below a fraction of the dynamic range are
    # candidate gaps between text lines.
    threshold = (np.max(hpp) - np.min(hpp)) / divider
    peaks = []
    for i, hppv in enumerate(hpp):
        if hppv < threshold:
            peaks.append([i, hppv])
    return peaks


def heuristic(a, b):
    # Squared Euclidean distance used by the A* search below.
    return (b[0] - a[0]) ** 2 + (b[1] - a[1]) ** 2


def get_hpp_walking_regions(peaks_index):
    # Group consecutive gap-row indices into clusters, one per inter-line region.
    hpp_clusters = []
    cluster = []
    for index, value in enumerate(peaks_index):
        cluster.append(value)

        if index < len(peaks_index) - 1 and peaks_index[index + 1] - value > 1:
            hpp_clusters.append(cluster)
            cluster = []

        if index == len(peaks_index) - 1:
            hpp_clusters.append(cluster)
            cluster = []

    return hpp_clusters
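

# A* pathfinding over a binary obstacle map (1 = ink/obstacle, 0 = free).
# Returns the path from the goal back towards the start as a list of
# (row, col) tuples, or [] if no path exists.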
def astar(array, start, goal):
    neighbors = [(0, 1), (0, -1), (1, 0), (-1, 0), (1, 1), (1, -1), (-1, 1), (-1, -1)]
    close_set = set()
    came_from = {}
    gscore = {start: 0}
    fscore = {start: heuristic(start, goal)}
    oheap = []

    heappush(oheap, (fscore[start], start))

    while oheap:
        current = heappop(oheap)[1]

        if current == goal:
            data = []
            while current in came_from:
                data.append(current)
                current = came_from[current]
            return data

        close_set.add(current)
        for i, j in neighbors:
            neighbor = current[0] + i, current[1] + j
            tentative_g_score = gscore[current] + heuristic(current, neighbor)

            # Skip neighbours that fall outside the map or land on an obstacle.
            if not (0 <= neighbor[0] < array.shape[0] and 0 <= neighbor[1] < array.shape[1]):
                continue
            if array[neighbor[0]][neighbor[1]] == 1:
                continue

            if neighbor in close_set and tentative_g_score >= gscore.get(neighbor, 0):
                continue

            if tentative_g_score < gscore.get(neighbor, 0) or neighbor not in [item[1] for item in oheap]:
                came_from[neighbor] = current
                gscore[neighbor] = tentative_g_score
                fscore[neighbor] = tentative_g_score + heuristic(neighbor, goal)
                heappush(oheap, (fscore[neighbor], neighbor))

    return []
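

# Binarisation and "road block" detection: a window of a gap region is
# considered blocked when no left-to-right A* path can cross it, which happens
# where characters from adjacent lines touch.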
def get_binary(img):
    mean = np.mean(img)
    if mean == 0.0 or mean == 1.0:
        return img

    thresh = threshold_otsu(img)
    binary = img <= thresh
    binary = binary * 1
    return binary


def path_exists(window_image):
    # A fully blank row means a straight path exists.
    if 0 in horizontal_projections(window_image):
        return True

    # Pad a free column on each side so the search can start and end off the ink.
    padded_window = np.zeros((window_image.shape[0], 1))
    world_map = np.hstack((padded_window, np.hstack((window_image, padded_window))))
    path = np.array(astar(world_map, (int(world_map.shape[0] / 2), 0),
                          (int(world_map.shape[0] / 2), world_map.shape[1] - 1)))
    if len(path) > 0:
        return True

    return False


def get_road_block_regions(nmap):
    road_blocks = []
    needtobreak = False

    for col in range(nmap.shape[1]):
        start = col
        end = col + 20
        if end > nmap.shape[1] - 1:
            end = nmap.shape[1] - 1
            needtobreak = True

        if not path_exists(nmap[:, start:end]):
            road_blocks.append(col)

        if needtobreak:
            break

    return road_blocks


def group_the_road_blocks(road_blocks):
    # Group consecutive blocked columns into [first, last] intervals.
    road_blocks_cluster_groups = []
    road_blocks_cluster = []
    size = len(road_blocks)
    for index, value in enumerate(road_blocks):
        road_blocks_cluster.append(value)
        if index < size - 1 and (road_blocks[index + 1] - road_blocks[index]) > 1:
            road_blocks_cluster_groups.append([road_blocks_cluster[0], road_blocks_cluster[-1]])
            road_blocks_cluster = []

        if index == size - 1 and len(road_blocks_cluster) > 0:
            road_blocks_cluster_groups.append([road_blocks_cluster[0], road_blocks_cluster[-1]])
            road_blocks_cluster = []

    return road_blocks_cluster_groups
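

# Crop one text line out of the page image using the separating paths found
# above (lower_line/upper_line are arrays of (row, col) points).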
def extract_line_from_image(image, lower_line, upper_line):
    lower_boundary = np.min(lower_line[:, 0])
    upper_boundary = np.min(upper_line[:, 0])
    img_copy = np.copy(image)
    r, c = img_copy.shape
    for index in range(c - 1):
        img_copy[0:lower_line[index, 0], index] = 0
        img_copy[upper_line[index, 0]:r, index] = 0

    return img_copy[lower_boundary:upper_boundary, :]
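

# Full handwriting-to-text pipeline for one page image: segment it into text
# lines, save each line as a temporary image, and transcribe it with the TrOCR
# handwritten model. Returns the concatenated transcription.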
def extract(image1):
    warnings.filterwarnings("ignore")

    # Grey-scale page image in [0, 1].
    img = rgb2gray(image1)

    sobel_image = sobel(img)
    hpp = horizontal_projections(sobel_image)

    peaks = find_peak_regions(hpp)
    peaks_index = np.array(peaks)[:, 0].astype(int)

    # Visual aid: black out the candidate gap rows (not used further).
    segmented_img = np.copy(img)
    r = segmented_img.shape
    for ri in range(r[0]):
        if ri in peaks_index:
            segmented_img[ri, :] = 0

    hpp_clusters = get_hpp_walking_regions(peaks_index)

    binary_image = get_binary(img)

    # Break through "road blocks" (touching characters) so every gap region has
    # a clear path from the left edge to the right edge.
    for cluster_of_interest in hpp_clusters:
        nmap = binary_image[cluster_of_interest[0]:cluster_of_interest[-1], :]
        road_blocks = get_road_block_regions(nmap)
        road_blocks_cluster_groups = group_the_road_blocks(road_blocks)

        for road_blocks in road_blocks_cluster_groups:
            window_image = nmap[:, road_blocks[0]:road_blocks[1] + 10]
            binary_image[cluster_of_interest[0]:cluster_of_interest[-1], :][:, road_blocks[0]:road_blocks[1] + 10][int(window_image.shape[0] / 2), :] *= 0

    # One separating path per gap region.
    line_segments = []
    for cluster_of_interest in hpp_clusters:
        nmap = binary_image[cluster_of_interest[0]:cluster_of_interest[-1], :]
        path = np.array(astar(nmap, (int(nmap.shape[0] / 2), 0), (int(nmap.shape[0] / 2), nmap.shape[1] - 1)))

        if path.shape[0] != 0:
            offset_from_top = cluster_of_interest[0]
            path[:, 0] += offset_from_top
            line_segments.append(path)

    # The bottom edge of the page acts as the final separator.
    last_bottom_row = np.flip(np.column_stack(((np.ones((img.shape[1],)) * img.shape[0]), np.arange(img.shape[1]))).astype(int), axis=0)
    line_segments.append(last_bottom_row)

    # Load the TrOCR handwritten model once and reuse it for every line.
    processor = TrOCRProcessor.from_pretrained('microsoft/trocr-base-handwritten')
    model = VisionEncoderDecoderModel.from_pretrained('microsoft/trocr-base-handwritten')

    line_images = []
    output = []
    line_count = len(line_segments)

    for line_index in range(line_count - 1):
        line_image = extract_line_from_image(img, line_segments[line_index], line_segments[line_index + 1])
        line_images.append(line_image)

        # rgb2gray returns floats in [0, 1]; rescale to 8-bit grey before saving.
        im = Image.fromarray((line_image * 255).astype(np.uint8))
        im = im.convert("L")
        im.save("demo.jpeg")

        line_pil = Image.open("demo.jpeg").convert("RGB")

        pixel_values = processor(images=line_pil, return_tensors="pt").pixel_values
        generated_ids = model.generate(pixel_values)
        generated_text = processor.batch_decode(generated_ids, skip_special_tokens=True)[0]
        print(generated_text)
        output.append(generated_text)

    result = ""
    for o in output:
        result = result + o
        result = result + " "
    return result
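

# --- Answer similarity scoring ------------------------------------------------
# The transcribed answers are compared with Sentence-BERT embeddings, Jaccard
# similarity over lemmatised tokens, and spaCy word-vector similarity.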
nlp = spacy.load("en_core_web_md")


def listToString(s):
    str1 = " "
    return str1.join(s)


def rm_stop(my_doc):
    token_list = []
    for token in my_doc:
        token_list.append(token.text)

    filtered_sentence = []
    for word in token_list:
        lexeme = nlp.vocab[word]
        if not lexeme.is_stop:
            filtered_sentence.append(word)

    return filtered_sentence


def text_processing(sentence):
    sentence = [token.lemma_.lower()
                for token in nlp(sentence)
                if token.is_alpha and not token.is_stop]

    return sentence


def jaccard_sim(sent1, sent2):
    sentence1 = text_processing(sent1)
    sentence2 = text_processing(sent2)

    return textdistance.jaccard.normalized_similarity(sentence1, sentence2)
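

# Combine the similarity signals into a 0-10 score: SBERT cosine similarity is
# weighted 0.8 and spaCy word-vector similarity 0.2; the Jaccard score is
# reported in the output string for reference only.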
def sim(Ideal_Answer, Submitted_Answer):
    text1 = Ideal_Answer.replace("\"", "").replace("\'", "")
    text2 = Submitted_Answer.replace("\"", "").replace("\'", "")
    output = []

    model = SentenceTransformer('sentence-transformers/all-MiniLM-L6-v2')
    embedding_1 = model.encode(text1, convert_to_tensor=True)
    embedding_2 = model.encode(text2, convert_to_tensor=True)

    score = util.pytorch_cos_sim(embedding_1, embedding_2)
    sbert = int(float(score.item()) * 10.0)
    output.append("SBERT:" + str(sbert) + ",")

    output.append("Jaccard:" + str(int(jaccard_sim(text1, text2) * 10.0)) + ",")

    doc1 = listToString(rm_stop(nlp(text1)))
    doc2 = listToString(rm_stop(nlp(text2)))

    w2v = int(nlp(doc1).similarity(nlp(doc2)) * 10.0)
    final_score = int(0.8 * sbert + 0.2 * w2v)
    output.append("Word2Vec:" + str(w2v) + ",final_score:" + str(final_score))
    out_string = listToString(output)

    return str(out_string), final_score
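

# Embed a diagram image with the given Keras CNN (224x224 input) and return the
# feature vector as a single-row DataFrame.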
def return_image_embedding(model, img_input):
    # Accept either a file path or an in-memory RGB array (as supplied by the
    # Gradio image inputs).
    if isinstance(img_input, np.ndarray):
        img = Image.fromarray(img_input.astype(np.uint8)).convert("RGB").resize((224, 224))
    else:
        img = image.load_img(img_input, target_size=(224, 224))
    x = image.img_to_array(img)
    x = np.expand_dims(x, axis=0)
    x = preprocess_input(x)
    preds = model.predict(x)
    curr_df = pd.DataFrame(preds[0]).T
    return curr_df
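

# EasyOCR helpers for visualising detected text boxes. They are standalone
# utilities and are not called by the Gradio app below.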
def draw_boxes(image, bounds, color='yellow', width=2):
    # Draw the quadrilateral bounding boxes returned by EasyOCR.
    draw = ImageDraw.Draw(image)
    for bound in bounds:
        p0, p1, p2, p3 = bound[0]
        draw.line([*p0, *p1, *p2, *p3, *p0], fill=color, width=width)
    return image


def inference(img, lang):
    import easyocr  # imported lazily: this helper is optional for the main app
    reader = easyocr.Reader(lang)
    bounds = reader.readtext(img.name)
    im = Image.open(img.name)
    draw_boxes(im, bounds)
    im.save('result.jpg')
    return ['result.jpg', pd.DataFrame(bounds).iloc[:, 1:]]
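

# Fit a single TF-IDF vocabulary over both document sets so their vectors live
# in the same space, then transform each set separately.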
def compute_tfidf_embeddings(documents1, documents2):
    combined_documents = documents1 + documents2

    vectorizer = TfidfVectorizer()
    vectorizer.fit(combined_documents)

    embeddings1 = vectorizer.transform(documents1)
    embeddings2 = vectorizer.transform(documents2)

    return embeddings1, embeddings2
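

# End-to-end evaluation used by the Gradio app: transcribe the ideal and
# submitted answer pages, score their textual similarity, and compare the two
# diagram images with ResNet50 embeddings.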
def extract_eval(image1, image2, image3, image4):
    print("Extracting Ideal Text \n")
    ideal_text = extract(image1)
    print(ideal_text)

    print("Extracting Submitted Text \n")
    submitted_text = extract(image3)
    print(submitted_text)

    a, b = sim(ideal_text, submitted_text)
    print(a)
    text_sim_score = b
    # Compare the two diagram images via cosine similarity of ResNet50 features.
    model = ResNet50(include_top=False, weights='imagenet', pooling='avg')
    diagram_1_embed = return_image_embedding(model, image2)
    diagram_2_embed = return_image_embedding(model, image4)
    diagram_embed_sim_score = cosine_similarity(diagram_1_embed.values, diagram_2_embed.values)[0][0]
    print("Diagram Embedding Similarity Score \n")
    print(diagram_embed_sim_score)

    # The Gradio Textbox output expects a single string: report the text-similarity
    # breakdown together with the diagram similarity.
    return a + " diagram_similarity:" + str(round(float(diagram_embed_sim_score), 2))
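

# Gradio UI: four image inputs (ideal answer page, ideal diagram, submitted
# answer page, submitted diagram) and one text output with the scores.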
iface = gr.Interface(fn=extract_eval,
                     inputs=["image", "image", "image", "image"],
                     outputs=gr.outputs.Textbox())

iface.launch(enable_queue=True)