Build error
Build error
from Models import Models | |
from ResumeSegmenter import ResumeSegmenter | |
from datetime import datetime | |
from dateutil import parser | |
import re | |
from string import punctuation | |
class ResumeParser: | |
def __init__(self, ner, ner_dates, zero_shot_classifier, tagger): | |
self.models = Models() | |
self.segmenter = ResumeSegmenter(zero_shot_classifier) | |
self.ner, self.ner_dates, self.zero_shot_classifier, self.tagger = ner, ner_dates, zero_shot_classifier, tagger | |
self.parsed_cv = {} | |
def parse(self, resume_lines): | |
resume_segments = self.segmenter.segment(resume_lines) | |
print("***************************** Parsing the Resume...***************************** ") | |
for segment_name in resume_segments: | |
if segment_name == "work_and_employment": | |
resume_segment = resume_segments[segment_name] | |
self.parse_job_history(resume_segment) | |
elif segment_name == "contact_info": | |
contact_info = resume_segments[segment_name] | |
self.parse_contact_info(contact_info) | |
elif segment_name == "education_and_training": | |
education_and_training = resume_segments[segment_name] | |
self.parse_education(education_and_training) | |
elif segment_name == "skills_header": | |
skills_header = resume_segments[segment_name] | |
self.parse_skills(skills_header) | |
print("************************************** SKILLS HEADER ***************************** <br>",skills_header) | |
return self.parsed_cv | |
def parse_education(self, education_and_training): | |
print(education_and_training) | |
self.parsed_cv['Education'] = education_and_training | |
def parse_skills(self, skills_header): | |
self.parsed_cv['Skills'] = skills_header | |
def parse_contact_info(self, contact_info): | |
contact_info_dict = {} | |
name = self.find_person_name(contact_info) | |
email = self.find_contact_email(contact_info) | |
self.parsed_cv['Name'] = name | |
contact_info_dict["Email"] = email | |
self.parsed_cv['Contact Info'] = contact_info_dict | |
def find_person_name(self, items): | |
class_score = [] | |
splitter = re.compile(r'[{}]+'.format(re.escape(punctuation.replace("&", "") ))) | |
classes = ["person name", "address", "email", "title"] | |
for item in items: | |
elements = splitter.split(item) | |
for element in elements: | |
element = ''.join(i for i in element.strip() if not i.isdigit()) | |
if not len(element.strip().split()) > 1: continue | |
out = self.zero_shot_classifier(element, classes) | |
highest = sorted(zip(out["labels"], out["scores"]), key=lambda x: x[1])[-1] | |
if highest[0] == "person name": | |
class_score.append((element, highest[1])) | |
if len(class_score): | |
return sorted(class_score, key=lambda x: x[1], reverse=True)[0][0] | |
return "" | |
def find_contact_email(self, items): | |
for item in items: | |
match ='[\w.+-]+@[\w-]+\.[\w.-]+', item) | |
if match: | |
return | |
return "" | |
def parse_job_history(self, resume_segment): | |
idx_job_title = self.get_job_titles(resume_segment) | |
current_and_below = False | |
if not len(idx_job_title): | |
self.parsed_cv["Job History"] = [] | |
return | |
if idx_job_title[0][0] == 0: current_and_below = True | |
job_history = [] | |
for ls_idx, (idx, job_title) in enumerate(idx_job_title): | |
job_info = {} | |
# print("<br> Job Title: ",job_title) | |
job_info["Job Title"] = self.filter_job_title(job_title) | |
# company | |
if current_and_below: line1, line2 = idx, idx+1 | |
else: line1, line2 = idx, idx-1 | |
job_info["Company"] = self.get_job_company(line1, line2, resume_segment) | |
if current_and_below: st_span = idx | |
else: st_span = idx-1 | |
# Dates | |
if ls_idx == len(idx_job_title) - 1: end_span = len(resume_segment) | |
else: end_span = idx_job_title[ls_idx+1][0] | |
start, end = self.get_job_dates(st_span, end_span, resume_segment) | |
job_info["Start Date"] = start | |
job_info["End Date"] = end | |
# if(start != "" and end != ""): | |
job_history.append(job_info) | |
self.parsed_cv["Job History"] = job_history | |
def get_job_titles(self, resume_segment): | |
classes = ["organization", "institution", "company", "job title", "work details"] | |
idx_line = [] | |
for idx, line in enumerate(resume_segment): | |
has_verb = False | |
line_modifed = ''.join(i for i in line if not i.isdigit()) | |
sentence = self.models.get_flair_sentence(line_modifed) | |
self.tagger.predict(sentence) | |
tags = [] | |
for entity in sentence.get_spans('pos'): | |
tags.append(entity.tag) | |
if entity.tag.startswith("V"): | |
has_verb = True | |
most_common_tag = max(set(tags), key=tags.count) | |
if (most_common_tag == "NNP") or (most_common_tag == "NN"): | |
# if most_common_tag == "NNP": | |
if not has_verb: | |
out = self.zero_shot_classifier(line, classes) | |
class_score = zip(out["labels"], out["scores"]) | |
highest = sorted(class_score, key=lambda x: x[1])[-1] | |
if (highest[0] == "job title") or (highest[0] == "organization"): | |
# if highest[0] == "job title": | |
idx_line.append((idx, line)) | |
return idx_line | |
def get_job_dates(self, st, end, resume_segment): | |
search_span = resume_segment[st:end] | |
dates = [] | |
for line in search_span: | |
for dt in self.get_ner_in_line(line, "DATE"): | |
if self.isvalidyear(dt.strip()): | |
dates.append(dt) | |
if len(dates): first = dates[0] | |
exists_second = False | |
if len(dates) > 1: | |
exists_second = True | |
second = dates[1] | |
if len(dates) > 0: | |
if self.has_two_dates(first): | |
d1, d2 = self.get_two_dates(first) | |
return self.format_date(d1), self.format_date(d2) | |
elif exists_second and self.has_two_dates(second): | |
d1, d2 = self.get_two_dates(second) | |
return self.format_date(d1), self.format_date(d2) | |
else: | |
if exists_second: | |
st = self.format_date(first) | |
end = self.format_date(second) | |
return st, end | |
else: | |
return (self.format_date(first), "") | |
else: return ("", "") | |
def filter_job_title(self, job_title): | |
job_title_splitter = re.compile(r'[{}]+'.format(re.escape(punctuation.replace("&", "") ))) | |
job_title = ''.join(i for i in job_title if not i.isdigit()) | |
tokens = job_title_splitter.split(job_title) | |
tokens = [''.join([i for i in tok.strip() if (i.isalpha() or i.strip()=="")]) for tok in tokens if tok.strip()] | |
classes = ["company", "organization", "institution", "job title", "responsibility", "details"] | |
new_title = [] | |
for token in tokens: | |
if not token: continue | |
res = self.zero_shot_classifier(token, classes) | |
class_score = zip(res["labels"], res["scores"]) | |
highest = sorted(class_score, key=lambda x: x[1])[-1] | |
if (highest[0] == "job title") or (highest[0] == "organization"): | |
# if highest[0] == "job title": | |
new_title.append(token.strip()) | |
if len(new_title): | |
return ', '.join(new_title) | |
else: return ', '.join(tokens) | |
def has_two_dates(self, date): | |
years = self.get_valid_years() | |
count = 0 | |
for year in years: | |
if year in str(date): | |
count+=1 | |
return count == 2 | |
def get_two_dates(self, date): | |
years = self.get_valid_years() | |
idxs = [] | |
for year in years: | |
if year in date: | |
idxs.append(date.index(year)) | |
min_idx = min(idxs) | |
first = date[:min_idx+4] | |
second = date[min_idx+4:] | |
return first, second | |
def get_valid_years(self): | |
current_year = | |
years = [str(i) for i in range(current_year-100, current_year)] | |
return years | |
def format_date(self, date): | |
out = self.parse_date(date) | |
if out: | |
return out | |
else: | |
date = self.clean_date(date) | |
out = self.parse_date(date) | |
if out: | |
return out | |
else: | |
return date | |
def clean_date(self, date): | |
try: | |
date = ''.join(i for i in date if i.isalnum() or i =='-' or i == '/') | |
return date | |
except: | |
return date | |
def parse_date(self, date): | |
try: | |
date = parser.parse(date) | |
return date.strftime("%m-%Y") | |
except: | |
try: | |
date = datetime(date) | |
return date.strftime("%m-%Y") | |
except: | |
return 0 | |
def isvalidyear(self, date): | |
current_year = | |
years = [str(i) for i in range(current_year-100, current_year)] | |
for year in years: | |
if year in str(date): | |
return True | |
return False | |
def get_ner_in_line(self, line, entity_type): | |
if entity_type == "DATE": ner = self.ner_dates | |
else: ner = self.ner | |
return [i['word'] for i in ner(line) if i['entity_group'] == entity_type] | |
def get_job_company(self, idx, idx1, resume_segment): | |
job_title = resume_segment[idx] | |
if not idx1 <= len(resume_segment)-1: context = "" | |
else:context = resume_segment[idx1] | |
candidate_companies = self.get_ner_in_line(job_title, "ORG") + self.get_ner_in_line(context, "ORG") | |
classes = ["organization", "company", "institution", "not organization", "not company", "not institution"] | |
scores = [] | |
for comp in candidate_companies: | |
res = self.zero_shot_classifier(comp, classes)['scores'] | |
scores.append(max(res[:3])) | |
sorted_cmps = sorted(zip(candidate_companies, scores), key=lambda x: x[1], reverse=True) | |
if len(sorted_cmps): return sorted_cmps[0][0] | |
return context |