import json import logging import os.path from collections import defaultdict from functools import partial from typing import Any, Dict, List, Optional, Tuple import gradio as gr import pandas as pd from pie_modules.document.processing import tokenize_document from pie_modules.documents import TokenDocumentWithLabeledSpansBinaryRelationsAndLabeledPartitions from pie_modules.models import * # noqa: F403 from pie_modules.taskmodules import * # noqa: F403 from pytorch_ie import Pipeline from pytorch_ie.annotations import LabeledSpan from pytorch_ie.auto import AutoPipeline from pytorch_ie.documents import TextDocumentWithLabeledSpansBinaryRelationsAndLabeledPartitions from pytorch_ie.models import * # noqa: F403 from pytorch_ie.taskmodules import * # noqa: F403 from rendering_utils import render_displacy, render_pretty_table from transformers import AutoModel, AutoTokenizer, PreTrainedModel, PreTrainedTokenizer from vector_store import SimpleVectorStore logger = logging.getLogger(__name__) RENDER_WITH_DISPLACY = "displaCy + highlighted arguments" RENDER_WITH_PRETTY_TABLE = "Pretty Table" DEFAULT_MODEL_NAME = "ArneBinder/sam-pointer-bart-base-v0.3" DEFAULT_MODEL_REVISION = "76300f8e534e2fcf695f00cb49bba166739b8d8a" # local path # DEFAULT_MODEL_NAME = "models/dataset-sciarg/task-ner_re/v0.3/2024-05-28_23-33-46" # DEFAULT_MODEL_REVISION = None DEFAULT_EMBEDDING_MODEL_NAME = "allenai/scibert_scivocab_uncased" def embed_text_annotations( document: TextDocumentWithLabeledSpansBinaryRelationsAndLabeledPartitions, model: PreTrainedModel, tokenizer: PreTrainedTokenizer, text_layer_name: str, ) -> dict: # to not modify the original document document = document.copy() # tokenize_document does not yet consider predictions, so we need to add them manually document[text_layer_name].extend(document[text_layer_name].predictions.clear()) added_annotations = [] tokenizer_kwargs = { "max_length": 512, "stride": 64, "truncation": True, "return_overflowing_tokens": True, } tokenized_documents = tokenize_document( document, tokenizer=tokenizer, result_document_type=TokenDocumentWithLabeledSpansBinaryRelationsAndLabeledPartitions, partition_layer="labeled_partitions", added_annotations=added_annotations, strict_span_conversion=False, **tokenizer_kwargs, ) # just tokenize again to get tensors in the correct format for the model # TODO: fix for A34.txt from sciarg corpus model_inputs = tokenizer(document.text, return_tensors="pt", **tokenizer_kwargs) # this is added when using return_overflowing_tokens=True, but the model does not accept it model_inputs.pop("overflow_to_sample_mapping", None) assert len(model_inputs.encodings) == len(tokenized_documents) model_output = model(**model_inputs) # get embeddings for all text annotations embeddings = {} for batch_idx in range(len(model_output.last_hidden_state)): text2tok_ann = added_annotations[batch_idx][text_layer_name] tok2text_ann = {v: k for k, v in text2tok_ann.items()} for tok_ann in tokenized_documents[batch_idx].labeled_spans: # skip "empty" annotations if tok_ann.start == tok_ann.end: continue # use the max pooling strategy to get a single embedding for the annotation text embedding = model_output.last_hidden_state[batch_idx, tok_ann.start : tok_ann.end].max( dim=0 )[0] text_ann = tok2text_ann[tok_ann] if text_ann in embeddings: logger.warning( f"Overwriting embedding for annotation '{text_ann}' (do you use striding?)" ) embeddings[text_ann] = embedding return embeddings def annotate( document: TextDocumentWithLabeledSpansBinaryRelationsAndLabeledPartitions, pipeline: Pipeline, embedding_model: Optional[PreTrainedModel] = None, embedding_tokenizer: Optional[PreTrainedTokenizer] = None, ) -> None: # execute prediction pipeline pipeline(document) if embedding_model is not None and embedding_tokenizer is not None: adu_embeddings = embed_text_annotations( document=document, model=embedding_model, tokenizer=embedding_tokenizer, text_layer_name="labeled_spans", ) # convert keys to str because JSON keys must be strings adu_embeddings_dict = {str(k._id): v.detach().tolist() for k, v in adu_embeddings.items()} document.metadata["embeddings"] = adu_embeddings_dict else: gr.Warning( "No embedding model provided. Skipping embedding extraction. You can load an embedding " "model in the 'Model Configuration' section." ) def render_annotated_document( document: TextDocumentWithLabeledSpansBinaryRelationsAndLabeledPartitions, render_with: str, render_kwargs_json: str, ) -> str: render_kwargs = json.loads(render_kwargs_json) if render_with == RENDER_WITH_PRETTY_TABLE: html = render_pretty_table(document, **render_kwargs) elif render_with == RENDER_WITH_DISPLACY: html = render_displacy(document, **render_kwargs) else: raise ValueError(f"Unknown render_with value: {render_with}") return html def add_to_index( document: TextDocumentWithLabeledSpansBinaryRelationsAndLabeledPartitions, processed_documents: dict, vector_store: SimpleVectorStore, ) -> None: try: if document.id in processed_documents: gr.Warning(f"Document '{document.id}' already in index. Overwriting.") # save the processed document to the index processed_documents[document.id] = document # save the embeddings to the vector store for adu_id, embedding in document.metadata["embeddings"].items(): vector_store.save((document.id, adu_id), embedding) gr.Info( f"Added document {document.id} to index (index contains {len(processed_documents)} " f"documents and {len(vector_store)} embeddings)." ) except Exception as e: raise gr.Error(f"Failed to add document {document.id} to index: {e}") def process_text( text: str, doc_id: str, models: Tuple[Pipeline, Optional[PreTrainedModel], Optional[PreTrainedTokenizer]], processed_documents: dict[ str, TextDocumentWithLabeledSpansBinaryRelationsAndLabeledPartitions ], vector_store: SimpleVectorStore, ) -> TextDocumentWithLabeledSpansBinaryRelationsAndLabeledPartitions: try: document = TextDocumentWithLabeledSpansBinaryRelationsAndLabeledPartitions( id=doc_id, text=text, metadata={} ) # add single partition from the whole text (the model only considers text in partitions) document.labeled_partitions.append(LabeledSpan(start=0, end=len(text), label="text")) # annotate the document annotate( document=document, pipeline=models[0], embedding_model=models[1], embedding_tokenizer=models[2], ) # add the document to the index add_to_index(document, processed_documents, vector_store) return document except Exception as e: raise gr.Error(f"Failed to process text: {e}") def wrapped_process_text( text: str, doc_id: str, models: Tuple[Pipeline, Optional[PreTrainedModel], Optional[PreTrainedTokenizer]], processed_documents: dict[ str, TextDocumentWithLabeledSpansBinaryRelationsAndLabeledPartitions ], vector_store: SimpleVectorStore, ) -> Tuple[dict, TextDocumentWithLabeledSpansBinaryRelationsAndLabeledPartitions]: document = process_text( text=text, doc_id=doc_id, models=models, processed_documents=processed_documents, vector_store=vector_store, ) # Return as dict and document to avoid serialization issues return document.asdict(), document def process_uploaded_file( file_names: List[str], models: Tuple[Pipeline, Optional[PreTrainedModel], Optional[PreTrainedTokenizer]], processed_documents: dict[ str, TextDocumentWithLabeledSpansBinaryRelationsAndLabeledPartitions ], vector_store: SimpleVectorStore, ) -> None: try: for file_name in file_names: if file_name.lower().endswith(".txt"): # read the file content with open(file_name, "r", encoding="utf-8") as f: text = f.read() base_file_name = os.path.basename(file_name) gr.Info(f"Processing file '{base_file_name}' ...") process_text(text, base_file_name, models, processed_documents, vector_store) else: raise gr.Error(f"Unsupported file format: {file_name}") except Exception as e: raise gr.Error(f"Failed to process uploaded files: {e}") def _get_annotation_from_document( document: TextDocumentWithLabeledSpansBinaryRelationsAndLabeledPartitions, annotation_id: str, annotation_layer: str, ) -> LabeledSpan: # use predictions annotations = document[annotation_layer].predictions id2annotation = {str(annotation._id): annotation for annotation in annotations} annotation = id2annotation.get(annotation_id) if annotation is None: raise gr.Error( f"annotation '{annotation_id}' not found in document '{document.id}'. Available " f"annotations: {id2annotation}" ) return annotation def _get_annotation( doc_id: str, annotation_id: str, annotation_layer: str, processed_documents: dict[ str, TextDocumentWithLabeledSpansBinaryRelationsAndLabeledPartitions ], ) -> LabeledSpan: document = processed_documents.get(doc_id) if document is None: raise gr.Error( f"Document '{doc_id}' not found in index. Available documents: {list(processed_documents)}" ) return _get_annotation_from_document(document, annotation_id, annotation_layer) def _get_similar_entries_from_vector_store( ref_annotation_id: str, ref_document: TextDocumentWithLabeledSpansBinaryRelationsAndLabeledPartitions, vector_store: SimpleVectorStore[Tuple[str, str]], **retrieval_kwargs, ) -> List[Tuple[Tuple[str, str], float]]: embeddings = ref_document.metadata["embeddings"] ref_embedding = embeddings.get(ref_annotation_id) if ref_embedding is None: raise gr.Error( f"Embedding for annotation '{ref_annotation_id}' not found in metadata of " f"document '{ref_document.id}'. Annotations with embeddings: {list(embeddings)}" ) try: similar_entries = vector_store.retrieve_similar( ref_id=(ref_document.id, ref_annotation_id), **retrieval_kwargs ) except Exception as e: raise gr.Error(f"Failed to retrieve similar ADUs: {e}") return similar_entries def get_similar_adus( ref_annotation_id: str, ref_document: TextDocumentWithLabeledSpansBinaryRelationsAndLabeledPartitions, vector_store: SimpleVectorStore, processed_documents: dict[ str, TextDocumentWithLabeledSpansBinaryRelationsAndLabeledPartitions ], min_similarity: float, ) -> pd.DataFrame: similar_entries = _get_similar_entries_from_vector_store( ref_annotation_id=ref_annotation_id, ref_document=ref_document, vector_store=vector_store, min_similarity=min_similarity, ) similar_annotations = [ _get_annotation( doc_id=doc_id, annotation_id=annotation_id, annotation_layer="labeled_spans", processed_documents=processed_documents, ) for (doc_id, annotation_id), _ in similar_entries ] df = pd.DataFrame( [ # unpack the tuple (doc_id, annotation_id) to separate columns # and add the similarity score and the text of the annotation (doc_id, annotation_id, score, str(annotation)) for ((doc_id, annotation_id), score), annotation in zip( similar_entries, similar_annotations ) ], columns=["doc_id", "adu_id", "sim_score", "text"], ) return df def get_relevant_adus( ref_annotation_id: str, ref_document: TextDocumentWithLabeledSpansBinaryRelationsAndLabeledPartitions, vector_store: SimpleVectorStore, processed_documents: dict[ str, TextDocumentWithLabeledSpansBinaryRelationsAndLabeledPartitions ], min_similarity: float, ) -> pd.DataFrame: similar_entries = _get_similar_entries_from_vector_store( ref_annotation_id=ref_annotation_id, ref_document=ref_document, vector_store=vector_store, min_similarity=min_similarity, ) ref_annotation = _get_annotation( doc_id=ref_document.id, annotation_id=ref_annotation_id, annotation_layer="labeled_spans", processed_documents=processed_documents, ) result = [] for (doc_id, annotation_id), score in similar_entries: # skip entries from the same document if doc_id == ref_document.id: continue document = processed_documents[doc_id] tail2rels = defaultdict(list) head2rels = defaultdict(list) for rel in document.binary_relations.predictions: # skip non-argumentative relations if rel.label in ["parts_of_same", "semantically_same"]: continue head2rels[rel.head].append(rel) tail2rels[rel.tail].append(rel) id2annotation = { str(annotation._id): annotation for annotation in document.labeled_spans.predictions } annotation = id2annotation.get(annotation_id) # note: we do not need to check if the annotation is different from the reference annotation, # because they com from different documents and we already skip entries from the same document for rel in head2rels.get(annotation, []): result.append( { "doc_id": doc_id, "reference_adu": str(annotation), "sim_score": score, "rel_score": rel.score, "relation": rel.label, "text": str(rel.tail), } ) # define column order df = pd.DataFrame( result, columns=["text", "relation", "doc_id", "reference_adu", "sim_score", "rel_score"] ) return df def open_accordion(): return gr.Accordion(open=True) def close_accordion(): return gr.Accordion(open=False) def load_argumentation_model(model_name: str, revision: Optional[str] = None) -> Pipeline: try: model = AutoPipeline.from_pretrained( model_name, device=-1, num_workers=0, taskmodule_kwargs=dict(revision=revision), model_kwargs=dict(revision=revision), ) except Exception as e: raise gr.Error(f"Failed to load argumentation model: {e}") gr.Info(f"Loaded argumentation model: model_name={model_name}, revision={revision})") return model def load_embedding_model(model_name: str) -> Tuple[PreTrainedModel, PreTrainedTokenizer]: try: embedding_model = AutoModel.from_pretrained(model_name) embedding_tokenizer = AutoTokenizer.from_pretrained(model_name) except Exception as e: raise gr.Error(f"Failed to load embedding model: {e}") gr.Info(f"Loaded embedding model: model_name={model_name})") return embedding_model, embedding_tokenizer def load_models( model_name: str, revision: Optional[str] = None, embedding_model_name: Optional[str] = None ) -> Tuple[Pipeline, Optional[PreTrainedModel], Optional[PreTrainedTokenizer]]: argumentation_model = load_argumentation_model(model_name, revision) embedding_model = None embedding_tokenizer = None if embedding_model_name is not None and embedding_model_name.strip(): embedding_model, embedding_tokenizer = load_embedding_model(embedding_model_name) return argumentation_model, embedding_model, embedding_tokenizer def update_processed_documents_df( processed_documents: dict[str, TextDocumentWithLabeledSpansBinaryRelationsAndLabeledPartitions] ) -> pd.DataFrame: df = pd.DataFrame( [ ( doc_id, len(document.labeled_spans.predictions), len(document.binary_relations.predictions), ) for doc_id, document in processed_documents.items() ], columns=["doc_id", "num_adus", "num_relations"], ) return df def select_processed_document( evt: gr.SelectData, processed_documents_df: pd.DataFrame, processed_documents: Dict[ str, TextDocumentWithLabeledSpansBinaryRelationsAndLabeledPartitions ], ) -> TextDocumentWithLabeledSpansBinaryRelationsAndLabeledPartitions: row_idx, col_idx = evt.index doc_id = processed_documents_df.iloc[row_idx]["doc_id"] gr.Info(f"Select document: {doc_id}") doc = processed_documents[doc_id] return doc def main(): example_text = "Scholarly Argumentation Mining (SAM) has recently gained attention due to its potential to help scholars with the rapid growth of published scientific literature. It comprises two subtasks: argumentative discourse unit recognition (ADUR) and argumentative relation extraction (ARE), both of which are challenging since they require e.g. the integration of domain knowledge, the detection of implicit statements, and the disambiguation of argument structure. While previous work focused on dataset construction and baseline methods for specific document sections, such as abstract or results, full-text scholarly argumentation mining has seen little progress. In this work, we introduce a sequential pipeline model combining ADUR and ARE for full-text SAM, and provide a first analysis of the performance of pretrained language models (PLMs) on both subtasks. We establish a new SotA for ADUR on the Sci-Arg corpus, outperforming the previous best reported result by a large margin (+7% F1). We also present the first results for ARE, and thus for the full AM pipeline, on this benchmark dataset. Our detailed error analysis reveals that non-contiguous ADUs as well as the interpretation of discourse connectors pose major challenges and that data annotation needs to be more consistent." print("Loading models ...") argumentation_model, embedding_model, embedding_tokenizer = load_models( model_name=DEFAULT_MODEL_NAME, revision=DEFAULT_MODEL_REVISION, embedding_model_name=DEFAULT_EMBEDDING_MODEL_NAME, ) default_render_kwargs = { "entity_options": { # we need to convert the keys to uppercase because the spacy rendering function expects them in uppercase "colors": { "own_claim".upper(): "#009933", "background_claim".upper(): "#99ccff", "data".upper(): "#993399", } }, "colors_hover": { "selected": "#ffa", # "tail": "#aff", "tail": { # green "supports": "#9f9", # red "contradicts": "#f99", # do not highlight "parts_of_same": None, }, "head": None, # "#faf", "other": None, }, } with gr.Blocks() as demo: processed_documents_state = gr.State(dict()) vector_store_state = gr.State(SimpleVectorStore()) # wrap the pipeline and the embedding model/tokenizer in a tuple to avoid that it gets called models_state = gr.State((argumentation_model, embedding_model, embedding_tokenizer)) with gr.Row(): with gr.Column(scale=1): doc_id = gr.Textbox( label="Document ID", value="user_input", ) doc_text = gr.Textbox( label="Text", lines=20, value=example_text, ) with gr.Accordion("Model Configuration", open=False): model_name = gr.Textbox( label="Model Name", value=DEFAULT_MODEL_NAME, ) model_revision = gr.Textbox( label="Model Revision", value=DEFAULT_MODEL_REVISION, ) embedding_model_name = gr.Textbox( label=f"Embedding Model Name (e.g. {DEFAULT_EMBEDDING_MODEL_NAME})", value="", ) load_models_btn = gr.Button("Load Models") load_models_btn.click( fn=load_models, inputs=[model_name, model_revision, embedding_model_name], outputs=models_state, ) predict_btn = gr.Button("Analyse") document_state = gr.State() with gr.Column(scale=1): with gr.Accordion("See plain result ...", open=False) as output_accordion: document_json = gr.JSON(label="Model Output") with gr.Accordion("Render Options", open=False): render_as = gr.Dropdown( label="Render with", choices=[RENDER_WITH_PRETTY_TABLE, RENDER_WITH_DISPLACY], value=RENDER_WITH_DISPLACY, ) render_kwargs = gr.Textbox( label="Render Arguments", lines=5, value=json.dumps(default_render_kwargs, indent=2), ) render_btn = gr.Button("Re-render") rendered_output = gr.HTML(label="Rendered Output") # add_to_index_btn = gr.Button("Add current result to Index") upload_btn = gr.UploadButton( "Upload & Analyse Documents", file_types=["text"], file_count="multiple" ) with gr.Column(scale=1): with gr.Accordion("Indexed Documents", open=False): processed_documents_df = gr.DataFrame( headers=["id", "num_adus", "num_relations"], interactive=False, ) with gr.Accordion("Reference ADU", open=False): reference_adu_id = gr.Textbox(label="ID", elem_id="reference_adu_id") reference_adu_text = gr.Textbox(label="Text") with gr.Accordion("Retrieval Configuration", open=False): min_similarity = gr.Slider( label="Minimum Similarity", minimum=0.0, maximum=1.0, step=0.01, value=0.8, ) retrieve_similar_adus_btn = gr.Button("Retrieve similar ADUs") similar_adus = gr.DataFrame(headers=["doc_id", "adu_id", "score", "text"]) # retrieve_relevant_adus_btn = gr.Button("Retrieve relevant ADUs") relevant_adus = gr.DataFrame( label="Relevant ADUs from other documents", headers=[ "text", "relation", "doc_id", "reference_adu", "sim_score", "rel_score", ], ) render_event_kwargs = dict( fn=render_annotated_document, inputs=[document_state, render_as, render_kwargs], outputs=rendered_output, ) predict_btn.click(fn=open_accordion, inputs=[], outputs=[output_accordion]).then( fn=wrapped_process_text, inputs=[doc_text, doc_id, models_state, processed_documents_state, vector_store_state], outputs=[document_json, document_state], api_name="predict", ).success( fn=update_processed_documents_df, inputs=[processed_documents_state], outputs=[processed_documents_df], ) render_btn.click(**render_event_kwargs, api_name="render") document_state.change( fn=lambda doc: doc.asdict(), inputs=[document_state], outputs=[document_json], ).success(close_accordion, inputs=[], outputs=[output_accordion]).then( **render_event_kwargs ) upload_btn.upload( fn=process_uploaded_file, inputs=[upload_btn, models_state, processed_documents_state, vector_store_state], outputs=[], ).success( fn=update_processed_documents_df, inputs=[processed_documents_state], outputs=[processed_documents_df], ) processed_documents_df.select( select_processed_document, inputs=[processed_documents_df, processed_documents_state], outputs=[document_state], ) retrieve_relevant_adus_event_kwargs = dict( fn=get_relevant_adus, inputs=[ reference_adu_id, document_state, vector_store_state, processed_documents_state, min_similarity, ], outputs=[relevant_adus], ) reference_adu_id.change( fn=partial(_get_annotation_from_document, annotation_layer="labeled_spans"), inputs=[document_state, reference_adu_id], outputs=[reference_adu_text], ).success(**retrieve_relevant_adus_event_kwargs) retrieve_similar_adus_btn.click( fn=get_similar_adus, inputs=[ reference_adu_id, document_state, vector_store_state, processed_documents_state, min_similarity, ], outputs=[similar_adus], ) # retrieve_relevant_adus_btn.click( # **retrieve_relevant_adus_event_kwargs # ) js = """ () => { function maybeSetColor(entity, colorAttributeKey, colorDictKey) { var color = entity.getAttribute('data-color-' + colorAttributeKey); // if color is a json string, parse it and use the value at colorDictKey try { const colors = JSON.parse(color); color = colors[colorDictKey]; } catch (e) {} if (color) { entity.style.backgroundColor = color; entity.style.color = '#000'; } } function highlightRelationArguments(entityId) { const entities = document.querySelectorAll('.entity'); // reset all entities entities.forEach(entity => { const color = entity.getAttribute('data-color-original'); entity.style.backgroundColor = color; entity.style.color = ''; }); if (entityId !== null) { var visitedEntities = new Set(); // highlight selected entity const selectedEntity = document.getElementById(entityId); if (selectedEntity) { const label = selectedEntity.getAttribute('data-label'); maybeSetColor(selectedEntity, 'selected', label); visitedEntities.add(selectedEntity); } // highlight tails const relationTailsAndLabels = JSON.parse(selectedEntity.getAttribute('data-relation-tails')); relationTailsAndLabels.forEach(relationTail => { const tailEntity = document.getElementById(relationTail['entity-id']); if (tailEntity) { const label = relationTail['label']; maybeSetColor(tailEntity, 'tail', label); visitedEntities.add(tailEntity); } }); // highlight heads const relationHeadsAndLabels = JSON.parse(selectedEntity.getAttribute('data-relation-heads')); relationHeadsAndLabels.forEach(relationHead => { const headEntity = document.getElementById(relationHead['entity-id']); if (headEntity) { const label = relationHead['label']; maybeSetColor(headEntity, 'head', label); visitedEntities.add(headEntity); } }); // highlight other entities entities.forEach(entity => { if (!visitedEntities.has(entity)) { const label = entity.getAttribute('data-label'); maybeSetColor(entity, 'other', label); } }); } } function setReferenceAduId(entityId) { // get the textarea element that holds the reference adu id let referenceAduIdDiv = document.querySelector('#reference_adu_id textarea'); // set the value of the input field referenceAduIdDiv.value = entityId; // trigger an input event to update the state var event = new Event('input'); referenceAduIdDiv.dispatchEvent(event); } const entities = document.querySelectorAll('.entity'); entities.forEach(entity => { const alreadyHasListener = entity.getAttribute('data-has-listener'); if (alreadyHasListener) { return; } entity.addEventListener('mouseover', () => { highlightRelationArguments(entity.id); setReferenceAduId(entity.id); }); entity.addEventListener('mouseout', () => { highlightRelationArguments(null); }); entity.setAttribute('data-has-listener', 'true'); }); } """ rendered_output.change(fn=None, js=js, inputs=[], outputs=[]) demo.launch() if __name__ == "__main__": # configure logging logging.basicConfig() main()