ESG_API_BATCH / app.py
rdose's picture
Update app.py
3994894
raw
history blame
8.42 kB
import numpy as np
import onnxruntime
import onnx
import gradio as gr
import requests
import json
from extractnet import Extractor
import math
from transformers import AutoTokenizer
import spacy
import os
from transformers import pipeline
import itertools
import pandas as pd
OUT_HEADERS = ['E','S','G']
MODEL_TRANSFORMER_BASED = "distilbert-base-uncased"
MODEL_ONNX_FNAME = "ESG_classifier_batch.onnx"
MODEL_SENTIMENT_ANALYSIS = "ProsusAI/finbert"
#MODEL_SUMMARY_PEGASUS = "oMateos2020/pegasus-newsroom-cnn_full-adafactor-bs6"
#API_HF_SENTIMENT_URL = "https://api-inference.huggingface.co/models/cardiffnlp/twitter-roberta-base-sentiment"
def _inference_ner_spancat(text, summary, penalty=0.5, normalise=True, limit_outputs=10):
nlp = spacy.load("en_pipeline")
doc = nlp(text)
spans = doc.spans["sc"]
comp_raw_text = dict( sorted( dict(zip([str(x) for x in spans],[float(x)*penalty for x in spans.attrs['scores']])).items(), key=lambda x: x[1], reverse=True) )
doc = nlp(summary)
spans = doc.spans["sc"]
exceeds_one = 0.0
for comp_s in spans:
if str(comp_s) in comp_raw_text.keys():
comp_raw_text[str(comp_s)] = comp_raw_text[str(comp_s)] / penalty
temp_max = comp_raw_text[str(comp_s)]if comp_raw_text[str(comp_s)] > 1.0 else 0.0
exceeds_one = comp_raw_text[str(comp_s)] if temp_max > exceeds_one else exceeds_one
#This "exceeds_one" is a bit confusing. So the thing is that the penalty is reverted for each time the company appears in the summary and hence the value can exceed one when the company appears more than once. The normalisation means that all the other scores are divided by the maximum when any value exceeds one
if normalise and (exceeds_one > 1):
comp_raw_text = {k: v/exceeds_one for k, v in comp_raw_text.items()}
return dict(itertools.islice(sorted(comp_raw_text.items(), key=lambda x: x[1], reverse=True), limit_outputs))
#def _inference_summary_model_pipeline(text):
# pipe = pipeline("text2text-generation", model=MODEL_SUMMARY_PEGASUS)
# return pipe(text,truncation='longest_first')
def _inference_sentiment_model_pipeline(text):
tokenizer_kwargs = {'padding':True,'truncation':True,'max_length':512}#,'return_tensors':'pt'}
pipe = pipeline("sentiment-analysis", model=MODEL_SENTIMENT_ANALYSIS )
return pipe(text,**tokenizer_kwargs)
#def _inference_sentiment_model_via_api_query(payload):
# response = requests.post(API_HF_SENTIMENT_URL , headers={"Authorization": os.environ['hf_api_token']}, json=payload)
# return response.json()
def _lematise_text(text):
nlp = spacy.load("en_core_web_sm", disable=['ner'])
text_out = []
for doc in nlp.pipe(text): #see https://spacy.io/models#design
new_text = ""
for token in doc:
if (not token.is_punct
and not token.is_stop
and not token.like_url
and not token.is_space
and not token.like_email
#and not token.like_num
and not token.pos_ == "CONJ"):
new_text = new_text + " " + token.lemma_
text_out.append( new_text )
return text_out
def sigmoid(x):
return 1 / (1 + np.exp(-x))
def to_numpy(tensor):
return tensor.detach().cpu().numpy() if tensor.requires_grad else tensor.cpu().numpy()
def is_in_archive(url):
try:
r = requests.get('http://archive.org/wayback/available?url='+url)
archive = json.loads(r.text)
if archive['archived_snapshots'] :
archive['archived_snapshots']['closest']
return {'archived':archive['archived_snapshots']['closest']['available'], 'url':archive['archived_snapshots']['closest']['url'],'error':0}
else:
return {'archived':False, 'url':"", 'error':0}
except:
print(f"[E] Quering URL ({url}) from archive.org")
return {'archived':False, 'url':"", 'error':-1}
#def _inference_ner(text):
# return labels
def _inference_classifier(text):
tokenizer = AutoTokenizer.from_pretrained(MODEL_TRANSFORMER_BASED)
inputs = tokenizer(_lematise_text(text), return_tensors="np", padding="max_length", truncation=True) #this assumes head-only!
ort_session = onnxruntime.InferenceSession(MODEL_ONNX_FNAME)
onnx_model = onnx.load(MODEL_ONNX_FNAME)
onnx.checker.check_model(onnx_model)
# compute ONNX Runtime output prediction
ort_outs = ort_session.run(None, input_feed=dict(inputs))
return sigmoid(ort_outs[0])
def inference(input_batch,isurl,use_archive,limit_companies=10):
input_batch_content = []
print("->Input size:",len(input_batch))
print("+",input_batch)
if isurl:
for row_in in input_batch:
url = row_in[0]
if use_archive:
archive = is_in_archive(url)
if archive['archived']:
url = archive['url']
#Extract the data from url
extracted = Extractor().extract(requests.get(url).text)
input_batch_content.append(extracted['content'])
else:
for row_in in input_batch:
input_batch_content.append(row_in[0])
print("->Batch size:",len(input_batch_content))
print("+",input_batch_content)
prob_outs = _inference_classifier(input_batch_content)
#sentiment = _inference_sentiment_model_via_api_query({"inputs": extracted['content']})
#sentiment = _inference_sentiment_model_pipeline(input_batch_content )[0]
#summary = _inference_summary_model_pipeline(input_batch_content )[0]['generated_text']
#ner_labels = _inference_ner_spancat(input_batch_content ,summary, penalty = 0.8, limit_outputs=limit_companies)
return prob_outs #ner_labels, {'E':float(prob_outs[0]),"S":float(prob_outs[1]),"G":float(prob_outs[2])},{sentiment['label']:float(sentiment['score'])},"**Summary:**\n\n" + summary
title = "ESG API Demo"
description = """This is a demonstration of the full ESG pipeline backend where given a list of URL (english, news) the news contents are extracted, using extractnet, and fed to three models:
- An off-the-shelf sentiment classification model (ProsusAI/finbert)
- A custom NER for the company extraction
- A custom ESG classifier for the ESG labeling of the news (the extracted text is also lemmatised prior to be fed to this classifier)
API input parameters:
- List: list of text. Either list of Url of the news (english) or list of extracted news contents
- 'Data type': int. 0=list is of extracted news contents, 1=list is of urls.
- `use_archive`: boolean. The model will extract the archived version in archive.org of the url indicated. This is useful with old news and to bypass news behind paywall
- `limit_companies`: integer. Number of found relevant companies to report.
"""
examples = [[[['https://www.bbc.com/news/uk-62732447'],
['https://www.bbc.com/news/business-62747401'],
['https://www.bbc.com/news/technology-62744858'],
['https://www.bbc.com/news/science-environment-62758811'],
['https://www.theguardian.com/business/2022/sep/02/nord-stream-1-gazprom-announces-indefinite-shutdown-of-pipeline'],
['https://www.bbc.com/news/world-europe-62766867'],
['https://www.bbc.com/news/business-62524031'],
['https://www.bbc.com/news/business-62728621'],
['https://www.bbc.com/news/science-environment-62680423']],'url',False,5]]
demo = gr.Interface(fn=inference,
inputs=[gr.Dataframe(label='input batch', col_count=1, datatype='str', type='array', wrap=True),
gr.Dropdown(label='data type', choices=['text','url'], type='index', value='url'),
gr.Checkbox(label='if url parse cached in archive.org'),
gr.Slider(minimum=1, maximum=10, step=1, label='Limit NER output', value=5)],
outputs=[gr.Dataframe(label='output raw', col_count=1, datatype='number', type='array', wrap=True, header=OUT_HEADERS)],
#gr.Label(label='Company'),
#gr.Label(label='ESG'),
#gr.Label(label='Sentiment'),
#gr.Markdown()],
title=title,
description=description,
examples=examples)
demo.launch()