auto_ner / app.py
bokey's picture
reload
7c8cfcc
raw
history blame contribute delete
No virus
14.2 kB
'''
Creator: Sudhir Arvind Deshmukh
Run command: streamlit run app.py
This is an end to end app for all you Entity ectraction needs
'''
import streamlit as st
import spacy
from spacy.tokens import Doc
from spacy.training.example import Example
import datetime
import os
import random
import pandas as pd
from sklearn.model_selection import train_test_split
import matplotlib.pyplot as plt
import datetime
from transformers import AutoTokenizer, T5ForConditionalGeneration
from spacy import displacy
# fuction to load the csv file and extract sentences and tags
def load_data_from_csv(file):
df = pd.read_csv(file, encoding="latin-1")
df = df.dropna()
#df.loc[:, "Sentence #"] = df["Sentence #"].fillna(method="ffill")
df.loc[:, "Sentence #"] = df["Sentence #"].ffill()
sentences = df.groupby("Sentence #")["Word"].apply(list).values
tags = df.groupby("Sentence #")["Tag"].apply(list).values
return sentences, tags
# Streamlit UI for Online Inference
def online_inference(nlp_models):
st.title("Online Inference")
selected_model = st.selectbox("Select base Model for finetunning", nlp_models)
# Load the selected spaCy model
# model_path = os.path.join(saved_models_dir, f"{selected_model}")
nlp = spacy.load(selected_model)
text_input = st.text_input("Enter Text for Inference")
if text_input:
doc = nlp(text_input)
# Filter out 'O' entities and get unique entity types
filtered_entities = [ent for ent in doc.ents if ent.label_ != 'O']
unique_entity_types = list(set(ent.label_ for ent in filtered_entities))
if filtered_entities:
# Define Google-themed colors for each entity type
color_dict = {
'B-geo': '#4285F4', # Blue
'B-gpe': '#EA4335', # Red
'B-per': '#FBBC05', # Yellow
'I-geo': '#0F9D58', # Green
'B-org': '#34A853', # Green
'I-org': '#FF9800', # Orange
'B-tim': '#AA66CC', # Purple
'B-art': '#FFC107', # Amber
'I-art': '#9C27B0', # Purple
'I-per': '#03A9F4', # Blue
'I-gpe': '#009688', # Teal
'I-tim': '#FF5722', # Deep Orange
'B-nat': '#7B1FA2', # Deep Purple
'B-eve': '#8BC34A', # Light Green
'I-eve': '#FDD835', # Yellow
'I-nat': '#616161' # Gray
}
# Render the visualization with custom colors
options = {"ents": unique_entity_types, "colors": color_dict}
html = spacy.displacy.render(doc, style="ent", options=options)
st.components.v1.html(html, height=400)
else:
st.write("No named entities found in the text.")
# Streamlit UI for Model Training
def model_training(saved_models_dir):
st.title("Model Training")
base_model = ["en_core_web_sm", "en_core_web_md", "en_core_web_lg"]
selected_model = st.selectbox("Select base Model to Train", base_model)
# Define hyperparameters
learning_rate = st.slider("Learning Rate", min_value=0.001, max_value=0.1, step=0.001, value=0.01)
n_iter = st.slider("Number of Iterations", min_value=1, max_value=10, value=2)
dropout = st.slider("Dropout", min_value=0.1, max_value=0.9, step=0.1, value=0.5)
uploaded_file = st.file_uploader("Upload Training Data (CSV)", type="csv")
model_name_uniq = st.text_input("Enter Model Name")
if st.button("Train & Evaluate Model"):
if uploaded_file is not None:
# Load training data from the uploaded CSV file
sentences, tags = load_data_from_csv(uploaded_file)
# Split data into training, validation, and test sets
train_sentences, test_sentences, train_tags, test_tags = train_test_split(sentences, tags, test_size=0.2, random_state=42)
train_sentences, val_sentences, train_tags, val_tags = train_test_split(train_sentences, train_tags, test_size=0.2, random_state=42)
print(f"Experimenting with model: {selected_model}")
# Load the pre-trained model
nlp = spacy.load(selected_model)
# Add or modify the NER component in the pipeline
if "ner" not in nlp.pipe_names:
ner = nlp.add_pipe("ner")
else:
ner = nlp.get_pipe("ner")
# Function to convert input format to spaCy format
def convert_to_spacy_format(sentences, tags):
examples = []
for sent, tag_list in zip(sentences, tags):
words = sent
spaces = [True] * len(words)
doc = Doc(nlp.vocab, words=words, spaces=spaces)
gold_entities = []
for token, tag in zip(doc, tag_list):
start = token.idx
end = start + len(token.text)
gold_entities.append((start, end, tag))
example = Example.from_dict(doc, {"entities": gold_entities})
examples.append(example)
return examples
# Add entity labels to the ner component
for label in set(tag for tag_list in tags for tag in tag_list):
ner.add_label(label)
# Create spaCy examples for training
train_examples = convert_to_spacy_format(train_sentences, train_tags)
val_examples = convert_to_spacy_format(val_sentences, val_tags)
# Lists to store learning curve data
train_losses = []
train_api_metrics = []
val_precisions = []
val_recalls = []
total_batches = len(train_examples) / 8
ner_metrics = []
# Train the NER model
for epoch in range(n_iter):
random.shuffle(train_examples)
st.write("this is iteration number:", epoch)
losses = {}
progress_bar = st.progress(0)
for batch_index, batch in enumerate(spacy.util.minibatch(train_examples, size=8), start=1):
nlp.update(batch, drop=dropout, losses=losses)
# Calculate progress percentage
progress_percentage = batch_index / (total_batches + 1)
progress_bar.progress(progress_percentage) # Display progress in Streamlit
train_losses.append(losses["ner"])
train_api_metrics.append(losses)
# Evaluate the model on the validation set
metrics = nlp.evaluate(val_examples)
val_precisions.append(metrics["ents_p"])
val_recalls.append(metrics["ents_r"])
# Append metrics to the ner_metrics list
ner_metrics.append(metrics)
print(val_precisions)
print(val_recalls)
current_time = datetime.datetime.now().strftime("%Y-%m-%d_%H-%M-%S")
save_model_name = f"{model_name_uniq}_ner_model_{current_time}"
# Plot learning curve
plt.figure(figsize=(12, 4))
plt.plot(range(n_iter), train_losses, label="Training Loss")
plt.xlabel("Epoch")
plt.ylabel("Loss")
plt.title(f"Learning Curve for Model: {save_model_name}")
plt.legend()
learning_curve_plot_path = f"images/learning_curve_{save_model_name}.png"
plt.savefig(learning_curve_plot_path)
st.image(learning_curve_plot_path)
# # Plot Precision-Recall curve (Not straight forward with spacy therefore lets do Brert implementation)
# plt.figure(figsize=(12, 4))
# plt.plot(val_recalls, val_precisions, label="Precision-Recall Curve")
# plt.xlabel("Recall")
# plt.ylabel("Precision")
# plt.title(f"Precision-Recall Curve for Model: {save_model_name}")
# plt.legend()
# pr_curve_plot_path = f"images/precision_recall_curve_{save_model_name}.png"
# plt.savefig(pr_curve_plot_path)
# st.image(pr_curve_plot_path)
# Save the trained model to disk with timestamp
nlp.to_disk(os.path.join(saved_models_dir, str(save_model_name)))
st.success(f"Trained model saved as: {save_model_name}")
# Print important NER performance metrics
ner_performance_metrics = ["ents_p", "ents_r", "ents_f",
#"ents_per_type"
]
# Print model performance metrics
st.write("---")
st.subheader("Evaluation Metrics on validation data (calculated during last epoch)")
for model_name, metrics in zip([selected_model], ner_metrics):
st.write(f"Model: {model_name}")
for metric_name in ner_performance_metrics:
metric_value = metrics.get(metric_name, 0.0)
st.write(f"{metric_name}: {metric_value}")
st.write("") # Add an empty line for spacing
st.write("---")
st.subheader("Performance Metrics on test data")
test_examples = convert_to_spacy_format(test_sentences, test_tags)
# Evaluate the model on the validation set
test_metrics = nlp.evaluate(test_examples)
# Print important NER performance metrics
# ner_performance_metrics = ["ents_p", "ents_r", "ents_f"]
# Print model performance metrics
for metric_name in ner_performance_metrics:
metric_value = test_metrics.get(metric_name, 0.0)
st.write(f"{metric_name}: {metric_value}")
st.write("---")
st.write(train_api_metrics)
st.write("training metric list of dicts")
st.write(ner_metrics)
st.write("training metric list of dicts")
st.write(test_metrics)
else:
st.warning("Please upload training data in CSV format.")
def gen_ai():
# Streamlit app layout
st.title("Few-Shot Named Entity Recognition with Flan")
# Load the Flan model
model_name = st.selectbox("Select Flan Model", ["google/flan-t5-base", "google/flan-t5-large", "google/flan-t5-xl", "google/flan-t5-xxl"])
model = T5ForConditionalGeneration.from_pretrained(model_name)
# Load a pre-trained tokenizer that's compatible with T5
tokenizer = AutoTokenizer.from_pretrained(model_name)
st.write("---")
# User input for few-shot examples
st.subheader("Few-Shot Examples")
examples = []
num_examples = st.number_input("Number of Examples", min_value=1, value=2)
for _ in range(num_examples):
col1, col2 = st.columns([3, 1])
with col1:
example_text = st.text_input(f"Example {_+1} (Text)")
with col2:
example_label = st.text_input(f"Example {_+1} (Label)")
if example_text and example_label:
examples.append((example_text, example_label))
st.write("---")
# User input for query text
st.subheader("Query Text")
query = st.text_input("Enter Query Text")
# Detect Entities button
detect_button = st.button("Detect Entities")
# Generate named entities
if detect_button:
if not examples or not query:
st.warning('Need both examples and query as user input', icon="⚠️")
prompt = "\n".join([f"NER: {example[0]} Labels: {example[1]}" for example in examples])
prompt += f"\n{query} Labels:"
input_ids = tokenizer.encode(prompt, return_tensors="pt")
outputs = model.generate(input_ids, max_length=100, num_return_sequences=1)
generated_text = tokenizer.decode(outputs[0], skip_special_tokens=True)
# Process the generated output for displacy
entities = generated_text.split("Labels:")
entities = [e.strip().split(":")[0].strip() for e in entities if e.strip()]
st.write("---")
# Display identified named entities
st.subheader("Identified Named Entities:")
doc = {"text": query, "ents": [{"start": query.find(entity), "end": query.find(entity) + len(entity), "label": "Custom Entity"} for entity in entities], "title": None}
html = displacy.render(doc, style="ent", manual=True, minify=True)
st.components.v1.html(html)
st.write("---")
st.write(doc)
def ensure_folders_exist(script_dir):
images_path = os.path.join(script_dir, "images")
saved_model_path = os.path.join(script_dir, "saved_models")
# Create the 'images' directory if it doesn't exist
if not os.path.exists(images_path):
os.makedirs(images_path)
# Create the 'saved_model' directory if it doesn't exist
if not os.path.exists(saved_model_path):
os.makedirs(saved_model_path)
def main():
## Load spaCy models from saved_models directory
# Get absolute path to the current script's directory
script_dir = os.path.dirname(os.path.abspath(__file__))
# Ensure that required folders exist
ensure_folders_exist(script_dir)
saved_models_dir = os.path.join(script_dir, "saved_models")
nlp_models = ["en_core_web_sm", "en_core_web_md", "en_core_web_lg"] + [os.path.join(saved_models_dir, str(model_name)) for model_name in os.listdir(saved_models_dir)]
# Streamlit App
st.set_page_config(page_title="NER Model Experimentation")
st.sidebar.title("Navigation")
page = st.sidebar.radio("Go to", ["Online Inference", "Model Training",
#"Evaluation Metrics",
"GEN AI"])
if page == "Online Inference":
online_inference(nlp_models)
elif page == "Model Training":
model_training(saved_models_dir)
elif page == "GEN AI":
gen_ai()
# call main fuction
if __name__=="__main__":
main()