# Spaces: Running
import json
import logging

import datasets
import huggingface_hub
import pandas as pd
from transformers import pipeline
def get_labels_and_features_from_dataset(dataset_id, dataset_config, split):
    """Return the label names and the non-label column names of a dataset split.

    Args:
        dataset_id: Identifier passed to ``datasets.load_dataset``.
        dataset_config: Configuration name; ``"default"`` is used when falsy.
        split: Split name; ``"train"`` is used when falsy.

    Returns:
        A ``(labels, features)`` pair:

        * If no column name starts with ``"label"``, both items are the list
          of all column names, left for the caller to post-process.
        * Otherwise ``labels`` is the ``names`` of the first such column (or
          of its inner ``feature`` when the column itself is not a
          ``ClassLabel``) and ``features`` lists the columns whose name does
          not start with ``"label"``.
        * ``(None, None)`` when loading fails or no label names can be
          determined; a warning is logged in both cases.
    """
    if not dataset_config:
        dataset_config = "default"
    if not split:
        split = "train"
    try:
        ds = datasets.load_dataset(dataset_id, dataset_config)[split]
        dataset_features = ds.features
        column_names = list(dataset_features.keys())
        label_keys = [name for name in column_names if name.startswith("label")]
        if not label_keys:
            # No label column found: return everything for post processing.
            return column_names, list(column_names)

        label_feature = dataset_features[label_keys[0]]
        if not isinstance(label_feature, datasets.ClassLabel):
            # Presumably a container feature wrapping the real label feature;
            # its names, if any, live on the inner ``feature``.
            label_feature = getattr(label_feature, "feature", None)
        labels = getattr(label_feature, "names", None)
        if labels is None:
            # Previously this case surfaced as a swallowed UnboundLocalError /
            # AttributeError reported as a dataset loading failure.
            logging.warning(
                f"Column {label_keys[0]} of dataset {dataset_id} has no label names"
            )
            return None, None

        features = [name for name in column_names if not name.startswith("label")]
        return labels, features
    except Exception as e:
        logging.warning(
            f"Failed to load dataset {dataset_id} with config {dataset_config}: {e}"
        )
        return None, None
def check_model(model_id):
    """Build a ``transformers`` pipeline for a Hub model.

    The task is taken from the ``pipeline_tag`` reported by
    ``huggingface_hub.model_info``.

    Args:
        model_id: Model identifier on the Hugging Face Hub.

    Returns:
        The pipeline object, or ``None`` when either the model info lookup or
        the pipeline construction raises. The failure is logged as a warning
        instead of being dropped silently.
    """
    try:
        task = huggingface_hub.model_info(model_id).pipeline_tag
    except Exception as e:
        logging.warning(f"Failed to fetch model info for {model_id}: {e}")
        return None

    try:
        return pipeline(task=task, model=model_id)
    except Exception as e:
        logging.warning(f"Failed to create pipeline for {model_id}: {e}")
        return None
def text_classificaiton_match_label_case_unsensative(id2label_mapping, label):
    """Find the first model label equal to ``label`` when both are upper-cased.

    Args:
        id2label_mapping: Mapping whose keys are the model labels to search.
        label: Dataset label to look for.

    Returns:
        ``(model_label, label)`` for the first key that matches, or
        ``(None, label)`` when no key matches.
    """
    matching_keys = (
        candidate
        for candidate in id2label_mapping
        if candidate.upper() == label.upper()
    )
    return next(matching_keys, None), label
def text_classification_map_model_and_dataset_labels(id2label, dataset_features):
    """Pair each model label with a dataset label of the same name.

    Only ``datasets.ClassLabel`` features that have exactly as many names as
    the model has labels are considered. Names are matched exactly first and,
    failing that, by comparing upper-cased names.

    Args:
        id2label: Model config mapping from class id to model label.
        dataset_features: Mapping from column name to dataset feature.

    Returns:
        ``(id2label_mapping, dataset_labels)``. ``id2label_mapping`` maps every
        model label to the matched dataset label, or ``None`` when unmatched.
        ``dataset_labels`` is the ``names`` list of the last feature
        considered, or ``None`` if no feature qualified.
    """
    id2label_mapping = dict.fromkeys(id2label.values())
    dataset_labels = None

    class_label_features = (
        candidate
        for candidate in dataset_features.values()
        if isinstance(candidate, datasets.ClassLabel)
    )
    for feature in class_label_features:
        names = feature.names
        if len(names) != len(id2label_mapping):
            # Label count differs from the model's: not the column we want.
            continue

        dataset_labels = names
        for label in names:
            if label in id2label_mapping:
                model_label = label
            else:
                # Fall back to a case-insensitive comparison.
                model_label, label = text_classificaiton_match_label_case_unsensative(
                    id2label_mapping, label
                )

            if model_label is None:
                print(f"Label {label} is not found in model labels")
            else:
                id2label_mapping[model_label] = label

    return id2label_mapping, dataset_labels
""" | |
params:
    column_mapping: JSON string encoding a dict (it is parsed with json.loads)
        example: {
            "text": "sentences",
            "label": {
                "label0": "LABEL_0",
                "label1": "LABEL_1"
            }
        }
    ppl: pipeline
""" | |
def check_column_mapping_keys_validity(column_mapping, ppl):
    """Check the label pairs of a column mapping against the model's labels.

    Args:
        column_mapping: JSON string that decodes to a mapping. Its optional
            ``"data"`` entry is read as a sequence of pairs whose first item
            is the user label and whose second item is the model label.
        ppl: Pipeline exposing ``model.config.id2label``.

    Returns:
        ``True`` when the mapping has no ``"data"`` entry. Otherwise ``True``
        only if the set of user labels, the set of model labels from the
        pairs, and the set of the model's own labels are all equal.
    """
    parsed_mapping = json.loads(column_mapping)
    if "data" not in parsed_mapping.keys():
        # Nothing supplied by the user, so there is nothing to validate.
        return True

    label_pairs = parsed_mapping["data"]
    user_labels = {pair[0] for pair in label_pairs}
    model_labels = {pair[1] for pair in label_pairs}
    original_labels = set(ppl.model.config.id2label.values())
    return user_labels == model_labels == original_labels
""" | |
params:
    column_mapping: dict
    dataset_features: dict
        example: {
            'text': Value(dtype='string', id=None),
            'label': ClassLabel(names=['negative', 'neutral', 'positive'], id=None)
        }
""" | |
def infer_text_input_column(column_mapping, dataset_features):
    """Make sure ``column_mapping["text"]`` names an existing dataset column.

    If the mapping already points at a column present in
    ``dataset_features``, nothing is changed. Otherwise the first feature
    whose ``dtype`` equals ``"string"`` is written to
    ``column_mapping["text"]``.

    Args:
        column_mapping: Dict that may hold a ``"text"`` entry; updated in place.
        dataset_features: Mapping from column name to feature, e.g.
            ``{'text': Value(dtype='string'), 'label': ClassLabel(...)}``.

    Returns:
        ``(column_mapping, feature_map_df)``. ``feature_map_df`` is a one-row
        DataFrame with columns ``"Dataset Features"`` and
        ``"Model Input Features"`` when a column was inferred. It is ``None``
        when the provided column was already valid or when the dataset has no
        string column.
    """
    if "text" in column_mapping:
        dataset_text_column = column_mapping["text"]
        if dataset_text_column in dataset_features:
            # The user-provided column exists; no inference needed.
            return column_mapping, None
        logging.warning(f"Provided {dataset_text_column} is not in Dataset columns")

    # Features without a ``dtype`` attribute are skipped rather than allowed
    # to raise AttributeError.
    candidates = [
        name
        for name, feature in dataset_features.items()
        if getattr(feature, "dtype", None) == "string"
    ]
    if not candidates:
        # Indexing ``candidates[0]`` unconditionally used to raise IndexError here.
        logging.warning("No string column found to use as text input")
        return column_mapping, None

    logging.debug(f"Candidates are {candidates}")
    column_mapping["text"] = candidates[0]
    feature_map_df = pd.DataFrame(
        {"Dataset Features": [candidates[0]], "Model Input Features": ["text"]}
    )
    return column_mapping, feature_map_df
""" | |
params:
    column_mapping: dict
    id2label_mapping: dict
        example:
            id2label_mapping: {
                'negative': 'negative',
                'neutral': 'neutral',
                'positive': 'positive'
            }
""" | |
def infer_output_label_column(
    column_mapping, id2label_mapping, id2label, dataset_labels
):
    """Fill in the label part of ``column_mapping`` and build a label table.

    Args:
        column_mapping: Dict updated in place. When it has a ``"data"`` entry
            that is a list, each ``(user_label, model_label)`` pair is written
            into ``id2label_mapping``.
        id2label_mapping: Dict of label pairings, e.g.
            ``{'negative': 'negative', 'positive': 'positive'}``; updated in
            place by the user-provided pairs.
        id2label: Model config mapping from class id to label.
        dataset_labels: Sequence of dataset label names.

    Returns:
        ``(column_mapping, id2label_df)``. Without a ``"data"`` entry and with
        any ``None`` value in ``id2label_mapping``, ``column_mapping["label"]``
        is set to ``None`` for every id and ``id2label_df`` is ``None``.
        Otherwise ``id2label_df`` has the columns ``"Dataset Labels"`` and
        ``"Model Prediction Labels"``.
    """
    if "data" in column_mapping:
        user_pairs = column_mapping["data"]
        if isinstance(user_pairs, list):
            # The pairs passed by the user override the inferred pairing.
            for user_label, model_label in user_pairs:
                id2label_mapping[model_label] = user_label
    else:
        if None in id2label_mapping.values():
            # At least one label is unmatched: hand back an empty label map.
            column_mapping["label"] = dict.fromkeys(id2label)
            return column_mapping, None

        # Column mapping should contain original model labels
        label_map = {}
        for class_id, label in zip(id2label, dataset_labels):
            label_map[str(class_id)] = id2label_mapping[label]
        column_mapping["label"] = label_map

    table = {"Dataset Labels": dataset_labels}
    table["Model Prediction Labels"] = [
        id2label_mapping[label] for label in dataset_labels
    ]
    id2label_df = pd.DataFrame(table)
    return column_mapping, id2label_df
def check_dataset_features_validity(d_id, config, split):
    """Load a dataset split and return it as a DataFrame with its features.

    The dataset itself is assumed to be loadable; errors raised by
    ``datasets.load_dataset`` are not handled here.

    Args:
        d_id: Dataset identifier passed to ``datasets.load_dataset``.
        config: Dataset configuration name.
        split: Name of the split to select.

    Returns:
        ``(df, dataset_features)``, or ``(None, None)`` when reading the
        ``features`` attribute of the split raises ``AttributeError``.
    """
    dataset_split = datasets.load_dataset(d_id, config)[split]
    try:
        features = dataset_split.features
    except AttributeError:
        # No features available: the caller has to provide everything.
        return None, None
    else:
        frame = dataset_split.to_pandas()
        return frame, features
def get_example_prediction(ppl, dataset_id, dataset_config, dataset_split):
    """Run the pipeline on the first row of a dataset split.

    The ``"text"`` column is used when the dataset has one; otherwise the
    first column of the dataset is used.

    Args:
        ppl: Callable invoked as ``ppl(prediction_input, top_k=None)``; its
            result is iterated as dicts with ``"label"`` and ``"score"`` keys.
        dataset_id: Identifier passed to ``datasets.load_dataset``.
        dataset_config: Dataset configuration name.
        dataset_split: Name of the split to read the first row from.

    Returns:
        ``(prediction_input, prediction_result)``. ``prediction_result`` maps
        each predicted label to its score. It is ``None`` when loading or
        prediction raises; ``prediction_input`` is then whatever had been
        read before the failure (possibly ``None``).
    """
    prediction_input = None
    try:
        # Use the first item to test prediction
        ds = datasets.load_dataset(dataset_id, dataset_config)[dataset_split]
        if "text" in ds.features:
            prediction_input = ds[0]["text"]
        else:
            # Dataset does not have a text column. ``keys()`` returns a view
            # that cannot be indexed, so take the first key via an iterator.
            first_column = next(iter(ds.features))
            prediction_input = ds[0][first_column]

        print("prediction_input", prediction_input)

        results = ppl(prediction_input, top_k=None)
        prediction_result = {
            f'{result["label"]}': result["score"] for result in results
        }
    except Exception as e:
        # Pipeline prediction failed, need to provide labels
        logging.warning(f"Example prediction failed for {dataset_id}: {e}")
        return prediction_input, None

    return prediction_input, prediction_result
def get_sample_prediction(ppl, df, column_mapping, id2label_mapping):
    """Run the pipeline on the first row of ``df`` and label the scores.

    Args:
        ppl: Callable invoked as ``ppl({"text": value}, top_k=None)``; its
            result is iterated as dicts with ``"label"`` and ``"score"`` keys.
        df: DataFrame holding the dataset.
        column_mapping: Dict whose ``"text"`` entry names the input column.
        id2label_mapping: Dict from model label to mapped label.

    Returns:
        ``(prediction_input, prediction_result)``. Keys of
        ``prediction_result`` have the form
        ``"<label>(original) - <mapped>(mapped)"`` and values are the scores.
        A label missing from ``id2label_mapping`` is shown as ``None`` rather
        than raising ``KeyError``. ``prediction_result`` is ``None`` when
        reading the input or running the pipeline raises.
    """
    prediction_input = None
    try:
        # Use the first item to test prediction. Positional access is used so
        # this does not depend on the index containing the label 0.
        prediction_input = df[column_mapping["text"]].iloc[0]
        results = ppl({"text": prediction_input}, top_k=None)
        # Read label/score inside the try so malformed pipeline output is
        # reported as a failed prediction.
        scored = [(result["label"], result["score"]) for result in results]
    except Exception as e:
        # Pipeline prediction failed, need to provide labels
        logging.warning(f"Sample prediction failed: {e}")
        return prediction_input, None

    # Display results in original label and mapped label
    prediction_result = {
        f"{label}(original) - {id2label_mapping.get(label)}(mapped)": score
        for label, score in scored
    }
    return prediction_input, prediction_result
def text_classification_fix_column_mapping(column_mapping, ppl, d_id, config, split):
    """Complete a column mapping for a text-classification model and dataset.

    Loads the dataset split, infers the text input column, pairs model labels
    with dataset labels and runs one sample prediction.

    Args:
        column_mapping: Dict with optional ``"text"`` / ``"data"`` entries;
            updated by the helper functions.
        ppl: Pipeline exposing ``model.config.id2label`` and callable on a
            ``{"text": ...}`` input.
        d_id: Dataset identifier.
        config: Dataset configuration name.
        split: Dataset split name.

    Returns:
        A 5-tuple ``(column_mapping, prediction_input, prediction_result,
        id2label_df, feature_map_df)``:

        * all ``None`` when the dataset has no features or no text-column
          table was produced;
        * ``(column_mapping, None, None, None, feature_map_df)`` when the
          label column could not be inferred;
        * otherwise the full tuple, with ``prediction_result`` set to ``None``
          if the sample prediction failed.
    """
    # Load the dataset as a DataFrame together with its feature description.
    df, dataset_features = check_dataset_features_validity(d_id, config, split)
    if dataset_features is None:
        # Dataset does not have any features; the helpers below would fail
        # on ``None``.
        return None, None, None, None, None

    column_mapping, feature_map_df = infer_text_input_column(
        column_mapping, dataset_features
    )
    if feature_map_df is None:
        # NOTE(review): this is also the result when the caller already passed
        # a valid "text" column -- confirm that bailing out is intended there.
        return None, None, None, None, None

    # Retrieve all labels
    id2label = ppl.model.config.id2label

    # Infer labels
    id2label_mapping, dataset_labels = text_classification_map_model_and_dataset_labels(
        id2label, dataset_features
    )
    column_mapping, id2label_df = infer_output_label_column(
        column_mapping, id2label_mapping, id2label, dataset_labels
    )
    if id2label_df is None:
        # Unable to infer the output label column.
        return column_mapping, None, None, None, feature_map_df

    # Get a sample prediction. A failed prediction yields
    # ``prediction_result is None``, which is returned as is.
    prediction_input, prediction_result = get_sample_prediction(
        ppl, df, column_mapping, id2label_mapping
    )
    return (
        column_mapping,
        prediction_input,
        prediction_result,
        id2label_df,
        feature_map_df,
    )