# extractor / extractor.py
# mrfirdauss's picture
# Update extractor.py
# cd14bf0 verified
from transformers import RobertaTokenizerFast, AutoModelForTokenClassification
import re
import torch
from itertools import cycle
# Identifier passed to ``from_pretrained`` for both the tokenizer and the
# token-classification model, so the two always come from the same checkpoint.
_CHECKPOINT = "mrfirdauss/robert-base-finetuned-cv"

# Loaded once at import time and shared by every call to ``predict``.
tokenizer = RobertaTokenizerFast.from_pretrained(_CHECKPOINT)
model = AutoModelForTokenClassification.from_pretrained(_CHECKPOINT)
# Entity types in class-id order: the entity at position ``i`` owns the
# odd id ``2*i + 1`` for its ``B-`` (begin) tag and the even id ``2*i + 2``
# for its ``I-`` (inside) tag. Id 0 is the "outside" tag ``O``.
_ENTITY_TYPES = (
    'NAME',
    'NATION',
    'EMAIL',
    'URL',
    'CAMPUS',
    'MAJOR',
    'COMPANY',
    'DESIGNATION',
    'GPA',
    'PHONE NUMBER',
    'ACHIEVEMENT',
    'EXPERIENCES DESC',
    'SKILLS',
    'PROJECTS',
)

# Maps a predicted class id to its BIO tag string.
id2label = {0: 'O'}
id2label.update(
    {2 * pos + 1: 'B-' + entity for pos, entity in enumerate(_ENTITY_TYPES)}
)
id2label.update(
    {2 * pos + 2: 'I-' + entity for pos, entity in enumerate(_ENTITY_TYPES)}
)
def merge_subwords(tokens, labels):
    """Merge sub-word tokens back into whole words, one label per word.

    A token that starts with "Ġ" (the marker byte-level BPE tokenizers put
    on a token that begins a new word) opens a new word; every other token
    is glued onto the word currently being built. Each merged word keeps the
    label of its first sub-word token.

    Args:
        tokens: sequence of sub-word token strings.
        labels: sequence of labels aligned one-to-one with ``tokens``.

    Returns:
        A ``(merged_tokens, merged_labels)`` tuple of two equally long lists.
        Words that end up empty (e.g. a lone "Ġ") are dropped.
    """
    merged_tokens = []
    merged_labels = []
    current_token = ""
    # ``None`` means "no word has been opened yet"; this lets a leading token
    # without the "Ġ" marker contribute its own label instead of "".
    current_label = None
    for token, label in zip(tokens, labels):
        if token.startswith("Ġ"):
            if current_token:
                # Flush the word accumulated so far.
                merged_tokens.append(current_token)
                merged_labels.append(current_label)
            # Open a new word, dropping the marker character.
            current_token = token[1:]
            current_label = label
        else:
            if current_label is None:
                # Sequence starts mid-word: label the word by this token.
                current_label = label
            current_token += token
    # Flush the trailing word, if any.
    if current_token:
        merged_tokens.append(current_token)
        merged_labels.append(current_label)
    return merged_tokens, merged_labels
def chunked_inference(text, tokenizer, model, max_length=512):
    """Tag ``text`` with the token-classification model, chunk by chunk.

    The text is split into words and punctuation marks, tokenized into
    sub-words, and fed to the model in windows of at most ``max_length``
    ids (special tokens included). Per-token predictions are mapped through
    the module-level ``id2label`` table and merged back to word level with
    ``merge_subwords``.

    Args:
        text: raw input string.
        tokenizer: tokenizer exposing ``tokenize``, ``convert_tokens_to_ids``
            and ``build_inputs_with_special_tokens``.
        model: callable returning either a tuple whose first item is the
            logits or an object with a ``.logits`` attribute.
        max_length: maximum number of ids per model call, special tokens
            included. Must be greater than 2.

    Returns:
        A ``(words, labels)`` tuple of two equally long lists; both are
        empty when ``text`` contains no words or punctuation.

    Raises:
        ValueError: if ``max_length`` leaves no room for real tokens.
    """
    if max_length <= 2:
        raise ValueError(
            "max_length must be greater than 2 to leave room for special tokens"
        )

    # Pre-split into word characters runs and single punctuation marks.
    words = re.findall(r'\w+|[^\w\s]', text, re.UNICODE)
    if not words:
        # Nothing to tag; torch.cat() below would fail on an empty list.
        return [], []

    tokens = tokenizer.tokenize(words, is_split_into_words=True)
    if not tokens:
        return [], []

    # Two slots per window are reserved for the special tokens that
    # build_inputs_with_special_tokens adds around each chunk.
    window = max_length - 2
    predictions = []
    # Inference only: skip autograd bookkeeping.
    with torch.no_grad():
        for start in range(0, len(tokens), window):
            chunk_ids = tokenizer.convert_tokens_to_ids(tokens[start:start + window])
            chunk_ids = tokenizer.build_inputs_with_special_tokens(chunk_ids)
            input_ids = torch.tensor(chunk_ids).unsqueeze(0)
            attention_mask = torch.ones_like(input_ids)
            output = model(input_ids, attention_mask=attention_mask)
            logits = output[0] if isinstance(output, tuple) else output.logits
            chunk_predictions = torch.argmax(logits, dim=-1).squeeze(0)
            # Drop the first and last positions; this assumes exactly one
            # special token was added at each end of the chunk.
            predictions.append(chunk_predictions[1:-1])

    # Stitch the per-chunk predictions together and map ids to tag strings.
    predicted_ids = torch.cat(predictions, dim=0).tolist()
    predicted_labels = [id2label[class_id] for class_id in predicted_ids]
    return merge_subwords(tokens, predicted_labels)
def process_tokens(tokens, tag_prefix):
    """Collect the entities of one type from a BIO-tagged token sequence.

    Despite its name, ``tag_prefix`` is the entity type that follows the
    ``B-``/``I-`` marker (e.g. ``'SKILLS'``). Tags are compared exactly, so
    ``'NATION'`` does not pick up ``'B-DESIGNATION'``.

    A ``B-<type>`` tag starts a new entity. An ``I-<type>`` tag extends the
    most recently started entity, even if tokens with other tags occurred in
    between; ``I-`` tags seen before any ``B-`` tag are ignored.

    Args:
        tokens: iterable of ``(token, tag)`` pairs.
        tag_prefix: entity type to extract.

    Returns:
        A list of ``{'text': str, 'type': str}`` dicts in order of
        appearance, where ``type`` is the entity's ``B-`` tag.
    """
    begin_tag = 'B-' + tag_prefix
    inside_tag = 'I-' + tag_prefix
    # GPA and URL pieces are glued back together without spaces (e.g. the
    # pieces "3", ".", "9" become "3.9"); other entity types are
    # space-separated.
    separator = '' if tag_prefix in ('GPA', 'URL') else ' '

    entities = []
    current_entity = None
    for token, tag in tokens:
        if tag == begin_tag:
            # Close the entity in progress before opening a new one.
            if current_entity is not None:
                entities.append(current_entity)
            current_entity = {'text': token, 'type': tag}
        elif tag == inside_tag and current_entity is not None:
            current_entity['text'] += separator + token
    # Flush the last entity, if any.
    if current_entity is not None:
        entities.append(current_entity)
    return entities
def _entity_text(entities, index, wrap):
    """Return the text of ``entities[index]``, or "" when it is missing.

    With ``wrap`` true a non-empty list is indexed cyclically, so a shorter
    list repeats to cover a longer one.
    """
    if not entities:
        return ""
    if wrap:
        return entities[index % len(entities)]['text']
    return entities[index]['text'] if index < len(entities) else ""


def predict(text):
    """Extract a structured profile from raw CV text.

    Runs ``chunked_inference`` with the module-level tokenizer and model,
    then groups the tagged words into profile fields with ``process_tokens``.

    Args:
        text: raw CV text.

    Returns:
        A dict with the keys ``name`` (str), ``links`` (list of str),
        ``skills`` (list of str), ``experiences`` and ``educations`` (lists
        of dicts). ``start``/``end`` are always ``None``. A field for which
        no entity was found is the empty string.
    """
    tokens, predictions = chunked_inference(text, tokenizer, model)
    data = list(zip(tokens, predictions))

    profile = {
        "name": ' '.join(word for word, tag in data if tag.endswith('NAME')),
        "links": [entity['text'] for entity in process_tokens(data, 'URL')],
        "skills": [entity['text'] for entity in process_tokens(data, 'SKILLS')],
        "experiences": [],
        "educations": []
    }

    # Experiences: one entry per item of the longest list; shorter lists are
    # repeated cyclically. An empty list contributes "" instead of wiping out
    # every entry (cycling over an empty list yields nothing).
    designations = process_tokens(data, 'DESIGNATION')
    companies = process_tokens(data, 'COMPANY')
    descriptions = process_tokens(data, 'EXPERIENCES DESC')
    experience_count = max(len(designations), len(companies), len(descriptions))
    for i in range(experience_count):
        profile['experiences'].append({
            "start": None,
            "end": None,
            "designation": _entity_text(designations, i, wrap=True),
            "company": _entity_text(companies, i, wrap=True),
            "experience_description": _entity_text(descriptions, i, wrap=True)
        })

    # Educations: entries are paired by position; a missing counterpart
    # (e.g. a CV without a GPA) becomes "" rather than dropping the entry.
    majors = process_tokens(data, 'MAJOR')
    gpas = process_tokens(data, 'GPA')
    campuses = process_tokens(data, 'CAMPUS')
    education_count = max(len(majors), len(gpas), len(campuses))
    for i in range(education_count):
        profile['educations'].append({
            "start": None,
            "end": None,
            "major": _entity_text(majors, i, wrap=False),
            "campus": _entity_text(campuses, i, wrap=False),
            "GPA": _entity_text(gpas, i, wrap=False)
        })
    return profile