Spaces:
Build error
Build error
File size: 4,452 Bytes
21c582f d9ea2c1 85eb3dd d9ea2c1 85eb3dd 9ec7bfb d9ea2c1 85eb3dd 9ec7bfb d9ea2c1 85eb3dd d9ea2c1 85eb3dd d9ea2c1 85eb3dd d9ea2c1 85eb3dd d9ea2c1 85eb3dd d9ea2c1 85eb3dd d9ea2c1 85eb3dd d9ea2c1 85eb3dd d9ea2c1 85eb3dd 22b38b8 85eb3dd d9ea2c1 8b053e4 d9ea2c1 dc07482 d9ea2c1 |
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 |
import re
from functools import lru_cache

import pandas as pd
import torch
from transformers import AutoTokenizer, AutoModelForTableQuestionAnswering
# Unsigned integer or decimal number such as "42" or "3.14".
# Raw string so that `\d` and `\.` reach the regex engine untouched instead of
# being parsed (and warned about) as invalid Python string escapes.
p = re.compile(r'\d+(\.\d+)?')
@lru_cache(maxsize=1)
def load_model_and_tokenizer(model_name="Meena/table-question-answering-tapas"):
    """Load the tokenizer and table-question-answering model for a checkpoint.

    The result is cached, so repeated calls with the same ``model_name`` reuse
    the already-loaded objects instead of reloading the checkpoint each time
    (``execute_query`` calls this once per query).

    Args:
        model_name: Hugging Face model identifier or local path passed to
            ``from_pretrained``. Defaults to the checkpoint this file was
            written for (presumably a TAPAS model fine-tuned on WikiTable
            Questions -- confirm against the model card).

    Returns:
        A ``(tokenizer, model)`` tuple.
    """
    tokenizer = AutoTokenizer.from_pretrained(model_name)
    model = AutoModelForTableQuestionAnswering.from_pretrained(model_name)
    return tokenizer, model
def prepare_inputs(table, queries, tokenizer, max_rows=100):
    """Truncate and stringify ``table``, then tokenize it with ``queries``.

    Args:
        table: pandas DataFrame holding the tabular data.
        queries: List of natural-language questions about the table.
        tokenizer: Callable tokenizer accepting ``table``, ``queries``,
            ``padding`` and ``return_tensors`` keyword arguments.
        max_rows: Maximum number of leading rows kept from ``table``.

    Returns:
        A ``(table, inputs)`` tuple: the truncated, all-string DataFrame that
        was actually tokenized, and the tokenizer output as PyTorch tensors.
    """
    # Cut down to the rows we keep *before* converting, so the string
    # conversion is not wasted on rows that are thrown away.
    table = table.head(max_rows).astype('str')
    inputs = tokenizer(table=table, queries=queries, padding='max_length', return_tensors="pt")
    return table, inputs
def generate_predictions(inputs, model, tokenizer):
    """Run the model on tokenized inputs and decode its logits.

    Args:
        inputs: Mapping of tokenizer outputs; it is unpacked as keyword
            arguments into ``model`` and also handed to the tokenizer for
            decoding.
        model: Callable model whose output exposes ``logits`` and
            ``logits_aggregation``.
        tokenizer: Tokenizer providing ``convert_logits_to_predictions``.

    Returns:
        A ``(predicted_table_cell_coords, predicted_aggregation_operators)``
        tuple as produced by ``tokenizer.convert_logits_to_predictions``.
    """
    # Inference only: disable autograd so no computation graph is built and
    # kept alive for the forward pass.
    with torch.no_grad():
        outputs = model(**inputs)

    # Turn the cell-selection and aggregation logits into cell coordinates
    # and aggregation operator indices.
    predicted_table_cell_coords, predicted_aggregation_operators = tokenizer.convert_logits_to_predictions(
        inputs,
        outputs.logits.detach(),
        outputs.logits_aggregation.detach(),
    )
    return predicted_table_cell_coords, predicted_aggregation_operators
def postprocess_predictions(predicted_aggregation_operators, predicted_table_cell_coords, table):
    """Translate raw predictions into operator names and answer strings.

    Args:
        predicted_aggregation_operators: Iterable of aggregation indices, one
            per query (0=NONE, 1=SUM, 2=AVERAGE, 3=COUNT).
        predicted_table_cell_coords: One collection of cell coordinates per
            query; each coordinate is used to index ``table.iat``.
        table: pandas DataFrame the coordinates refer to.

    Returns:
        A ``(aggregation_predictions_string, answers)`` tuple: the list of
        operator names, and one string per query holding the selected cell
        values joined by ``", "`` (empty string when no cell was selected).

    Raises:
        KeyError: If an aggregation index is not one of 0-3.
    """
    aggregation_operators = {0: "NONE", 1: "SUM", 2: "AVERAGE", 3: "COUNT"}
    aggregation_predictions_string = [aggregation_operators[x] for x in predicted_aggregation_operators]

    # str() keeps the join safe even when the table was not converted to
    # strings beforehand, and makes every answer a string regardless of how
    # many cells were selected.
    answers = [
        ", ".join(str(table.iat[coordinate]) for coordinate in coordinates)
        for coordinates in predicted_table_cell_coords
    ]
    return aggregation_predictions_string, answers
# A cell counts as numeric only if the WHOLE cell is an (optionally negative)
# integer or decimal, so values such as "2020-01-01" or "12abc" are rejected
# before they can reach float().
_NUMERIC_CELL = re.compile(r"-?\d+(\.\d+)?")


def show_answers(queries, answers, aggregation_predictions_string):
    """Format the final answer for each query, applying the predicted aggregation.

    Each query and its formatted answer are also printed to stdout.

    Args:
        queries: List of question strings.
        answers: One answer per query: the selected cell values joined by
            ``", "`` (an empty string means no cell was selected).
        aggregation_predictions_string: One operator name per query
            (``"NONE"``, ``"SUM"``, ``"AVERAGE"`` or ``"COUNT"``).

    Returns:
        The formatted answers, one line per query, joined by newlines. For a
        single query this is just ``"Predicted answer: ..."``; for no queries
        it is the empty string.

    Note:
        Cells are recovered by splitting the answer on ``", "``, so a cell
        whose own text contains ``", "`` is counted as several cells.
    """
    numeric_aggregations = {
        "SUM": sum,
        "AVERAGE": lambda values: sum(values) / len(values),
    }
    results = []
    for query, answer, predicted_agg in zip(queries, answers, aggregation_predictions_string):
        print(query)
        answer = str(answer)
        # An empty answer means zero selected cells, not one empty cell.
        cells = answer.split(", ") if answer else []
        reducer = numeric_aggregations.get(predicted_agg)

        if predicted_agg == "NONE":
            result = "Predicted answer: " + answer
        elif predicted_agg == "COUNT":
            # Counting does not depend on the cells being numeric.
            result = "Predicted answer: " + str(len(cells))
        elif reducer is not None and cells and all(_NUMERIC_CELL.fullmatch(cell) for cell in cells):
            numbers = [float(cell) for cell in cells]
            result = "Predicted answer: " + str(reducer(numbers))
        else:
            # Non-numeric (or no) cells cannot be aggregated: report the
            # operator alongside the raw cells instead of failing.
            result = "Predicted answer: " + predicted_agg + " > " + answer

        print(result)
        results.append(result)
    return "\n".join(results)
def execute_query(query, table):
    """Answer one natural-language question about ``table``.

    Runs the full pipeline for a single query: load the tokenizer and model,
    tokenize the table together with the question, run the model, turn the
    predictions into cell values and an aggregation name, and format the
    outcome.

    Args:
        query: The question, as a single string.
        table: The table to query; it is passed on to ``prepare_inputs``.

    Returns:
        Whatever ``show_answers`` returns for this one-element batch.
    """
    # Every stage of the pipeline works on batches, so wrap the lone question.
    question_batch = [query]

    tokenizer, model = load_model_and_tokenizer()
    table, encoded = prepare_inputs(table, question_batch, tokenizer)
    cell_coords, aggregation_ids = generate_predictions(encoded, model, tokenizer)
    aggregation_names, answers = postprocess_predictions(aggregation_ids, cell_coords, table)

    return show_answers(question_batch, answers, aggregation_names)
|