TopicDig / streamlit_app.py
m. polinsky
Added refactored app code
d828c62 unverified
raw history blame
No virus
9.47 kB
import requests
import json
from typing import List, Set
from collections import namedtuple
from functools import lru_cache
from datetime import datetime as dt
import os, os.path
from codetiming import Timer
import streamlit as st
# local code
from digestor import Digestor
from source import Source
from scrape_sources import NPRLite, CNNText, stub
import random
# EDIT: before doing NER check time of last scrape and just read in from JSON store instead of rescraping
# can force rescrape
# This may take a config to get sources as input
def initialize(limit, rando, use_cache=True):
clusters: dict[str:List[namedtuple]] = dict()
# This is a container for the source classes.
# Make sure you handle this. Whats the deal.
sources:List[Source]= [] # Write them and import? Read a config?
# FOR NOW ONLY add this explicitly here.
# MUST read in final version though.
sources.append(NPRLite(
'npr',
'https://text.npr.org/1001',
'sshleifer/distilbart-cnn-12-6',
'dbmdz/bert-large-cased-finetuned-conll03-english'
))
sources.append(CNNText(
'cnn',
'https://lite.cnn.com',
'sshleifer/distilbart-cnn-12-6',
'dbmdz/bert-large-cased-finetuned-conll03-english'
))
# initialize list to hold cluster data namedtuples
cluster_data: List[namedtuple('article', ['link','hed','entities', 'source'])]
article_dict : dict[str:namedtuple]
# For all sources retrieve_cluster_data
# returns List[namedtuples] with empty entity lists
# TEST THIS ALL V V V
cluster_data = []
article_meta = namedtuple('article_meta',['source', 'count'])
cluster_meta : List[article_meta] = []
print("Calling data source retrieve cluster data....")
for data_source in sources:
if limit is not None:
c_data, c_meta = data_source.retrieve_cluster_data(limit//len(sources))
else:
c_data, c_meta = data_source.retrieve_cluster_data()
cluster_data.append(c_data)
cluster_meta.append(article_meta(data_source.source_name, c_meta))
print("Finished...moving on to clustering...")
cluster_data = cluster_data[0] + cluster_data[1]
# NER
# iterate the list of namedtuples,
for tup in cluster_data:
# pass each hed to the api query method, return the dict
# through the ner_results function to the 'entities' list.
# Populate stub entities list
perform_ner(tup, cache=use_cache)
generate_clusters(clusters, tup)
st.write(f"""Total number of clusters: {len(clusters)}""")
# Article stubs tracks all stubs
# If cluster is unsummarized, its hed's value is the namedtuple stub.
# Else reference digestor instance so summary can be found.
article_dict = {stub.hed: stub for stub in cluster_data}
return article_dict, clusters
# Am I going to use this for those two lines?
def perform_ner(tup:namedtuple('article',['link','hed','entities', 'source']), cache=True):
with Timer(name="ner_query_time", logger=None):
result = ner_results(ner_query(
{
"inputs":tup.hed,
"paramters":
{
"use_cache": cache,
},
}
))
for i in result:
tup.entities.append(i)
def ner_query(payload):
print("making a query....")
data = json.dumps(payload)
response = requests.request("POST", NER_API_URL, headers=headers, data=data)
return json.loads(response.content.decode("utf-8"))
def generate_clusters(
the_dict: dict,
tup : namedtuple('article_stub',[ 'link','hed','entities', 'source'])
) -> dict:
for entity in tup.entities:
# Add cluster if entity not already in dict
if entity not in the_dict:
the_dict[entity] = []
# Add this article's link to the cluster dict
the_dict[entity].append(tup)
def ner_results(ner_object, groups=True, NER_THRESHOLD=0.5) -> List[str]:
# empty lists to collect our entities
people, places, orgs, misc = [], [], [], []
# 'ent' and 'designation' handle the difference between dictionary keys
# for aggregation strategy grouped vs ungrouped
ent = 'entity' if not groups else 'entity_group'
designation = 'I-' if not groups else ''
# Define actions -- this is a switch-case dictionary.
# keys are the identifiers used inthe return dict from
# the ner_query.
# values are list.append() for each of the lists
# created at the top of the function. They hold sorted entities.
# actions is used to pass entities into the lists.
# Why I called it actions I have no idea rename it.
actions = {designation+'PER':people.append,
designation+'LOC':places.append,
designation+'ORG':orgs.append,
designation+'MISC':misc.append
} # Is this an antipattern?
# For each dictionary in the ner result list, if the entity str doesn't contain a '#'
# and the confidence is > 90%, add the entity to the list for its type.
# actions[d[ent]](d['word']) accesses the key of actions that is returned
# from d[ent] and then passes the entity name, returned by d['word'] to
# the 'list.append' waiting to be called in the dict actions.
# Note the (). We access actions to call its append...
readable = [ actions[d[ent]](d['word']) for d in ner_object if '#' not in d['word'] and d['score'] > NER_THRESHOLD ]
# create list of all entities to return
ner_list = [i for i in set(people) if len(i) > 2] + [i for i in set(places) if len(i) > 2] + [i for i in set(orgs) if len(i) > 2] + [i for i in set(misc) if len(i) > 2]
return ner_list
# These could be passed through the command line
# or read from a config file.
# One of these is needed here for NER and one in Digestor for summarization.
NER_API_URL = "https://api-inference.huggingface.co/models/dbmdz/bert-large-cased-finetuned-conll03-english"
headers = {"Authorization": f"""Bearer {st.secrets['ato']}"""}
LIMIT = None # Controls time and number of clusters.
USE_CACHE = True
if not USE_CACHE:
print("NOT USING CACHE--ARE YOU GATHERING DATA?")
if LIMIT is not None:
print(f"LIMIT: {LIMIT}")
# digest store
digests = dict() # key is cluster, value is digestor object
out_dicts = []
# list to accept user choices
# retrieve cluster data and create dict to track each article (articleStubs)
# and create topic clusters by performing ner.
print("Initializing....")
article_dict, clusters = initialize(LIMIT, USE_CACHE)
# We now have clusters and cluster data. Redundancy.
# We call a display function and get the user input.
# For this its still streamlit.
selections = []
choices = list(clusters.keys())
choices.insert(0,'None')
# Form used to take 3 menu inputs
with st.form(key='columns_in_form'):
cols = st.columns(3)
for i, col in enumerate(cols):
selections.append(col.selectbox(f'Make a Selection', choices, key=i))
submitted = st.form_submit_button('Submit')
if submitted:
selections = [i for i in selections if i is not None]
with st.spinner(text="Digesting...please wait, this will take a few moments...Maybe check some messages or start reading the latest papers on summarization with transformers...."):
found = False
# Check if we already have this digest.
for i in digests:
if set(list(answers.values())) == set(list(i)):
digestor = digests[i]
found = True
break
# If we need a new digest
if not found:
chosen = []
# Why not just use answers.values()?
for i in selections: # i is supposed to be a list of stubs, mostly one
if i != 'None':
for j in clusters[i]:
if j not in chosen:
chosen.append(j) # j is supposed to be a stub.
# Article dict contains stubs for unprocessed articles and lists of summarized chunks for processed ones.
# Here we put together a list of article stubs and/or summary chunks and let the digestor sort out what it does with them,
chosen = [i if isinstance(article_dict[i.hed], stub) else article_dict[i.hed] for i in chosen]
# Digestor uses 'chosen', passed through 'stubs' to create digest.
# 'user_choicese' is passed for reference.
# Passing list(answers.values()) includes 'None' choices.
digestor = Digestor(timer=Timer(), cache = USE_CACHE, stubs=chosen, user_choices=list(selections))
# happens internally but may be used differently so it isn't automatic upon digestor creation.
# Easily turn caching off for testing.
digestor.digest() # creates summaries and stores them associated with the digest
# Get displayable digest and digest data
digestor.build_digest()# only returns for data collection
digest = digestor.text
if len(digest) == 0:
st.write("You didn't select a topic!")
else:
st.write("Your digest is ready:\n")
st.write(digest)