# raccoon / main.py
# Source: Hugging Face Space "raccoon" by grapplerulrich,
# commit 2164d57 ("Add new cache delete buttons").
from functools import cache
import json
from os import getenv, makedirs, remove
from os.path import exists
import uuid

import streamlit as st
from dotenv import load_dotenv
from googleapiclient.discovery import build
from slugify import slugify
from transformers import pipeline

from beautiful_soup.app import get_url_content
@cache
def google_search_api_request( query, cx='05048cc2df6134a06', num=5 ):
    """Query the Google Custom Search JSON API and return the raw response.

    Memoized with functools.cache, so repeated identical calls within one
    process do not consume additional API quota.

    Args:
        query: Free-text search query; ' -filetype:pdf' is appended to
            exclude PDF documents from the results.
        cx: Custom Search Engine ID (defaults to this project's engine).
        num: Number of results to request (the API allows 1-10).

    Returns:
        The decoded JSON response dict from the CSE ``list`` endpoint.
    """
    load_dotenv()
    api_key = getenv('GOOGLE_SEARCH_API_KEY')
    service = build(
        "customsearch",
        "v1",
        developerKey=api_key,
        cache_discovery=False,
    )
    # Exclude PDFs from search results.
    return service.cse().list(
        q=query + ' -filetype:pdf',
        cx=cx,
        num=num,
    ).execute()
def search_results( query ):
    """Return Google search results for *query*, caching them on disk.

    Results are stored as JSON under ``search-results/<slug>.json`` so a
    repeated query is served from disk instead of hitting the API again.

    Args:
        query: Free-text search query.

    Returns:
        A non-empty list of result item dicts from the API.

    Raises:
        Exception: If the search produced no results.
    """
    # The cache directory may not exist on a fresh checkout.
    makedirs( 'search-results', exist_ok=True )
    file_path = 'search-results/' + slugify( query ) + '.json'
    results = []
    if exists( file_path ):
        with open( file_path, 'r' ) as results_file:
            results = json.load( results_file )
    else:
        search_result = google_search_api_request( query )
        # Only cache when the API actually found something; an empty
        # result set is re-queried on the next run.
        if int( search_result['searchInformation']['totalResults'] ) > 0:
            results = search_result['items']
            with open( file_path, 'w' ) as results_file:
                json.dump( results, results_file )
    if len( results ) == 0:
        raise Exception('No results found.')
    return results
def content_summary( url_id, content ):
    """Summarize *content*, caching the result under ``summaries/<url_id>.json``.

    Args:
        url_id: Stable identifier for the source URL (used as the cache key).
        content: Plain-text page content to summarize.

    Returns:
        The summarizer pipeline output: a list of dicts with a
        'summary_text' key (loaded from cache when available).
    """
    # The cache directory may not exist on a fresh checkout.
    makedirs( 'summaries', exist_ok=True )
    file_path = 'summaries/' + url_id + '.json'
    if exists( file_path ):
        with open( file_path, 'r' ) as file:
            summary = json.load( file )
    else:
        # Loading the model is the expensive step; it only runs on a cache miss.
        summarizer = pipeline("summarization", model="sshleifer/distilbart-cnn-12-6")
        summary = summarizer(content, max_length=130, min_length=30, do_sample=False, truncation=True)
        with open( file_path, 'w' ) as file:
            json.dump( summary, file )
    return summary
def main():
    """Streamlit entry point: search Google, summarize each result page,
    and offer per-result buttons to delete the on-disk caches."""
    st.title('Google Search')
    query = st.text_input('Search query')
    if query:
        with st.spinner('Loading search results...'):
            try:
                results = search_results( query )
            except Exception as exception:
                st.exception(exception)
                return
        number_of_results = len( results )
        st.success( 'Found {} results.'.format( number_of_results ) )
        with st.expander("Search results JSON"):
            st.json( results )
        progress_bar = st.progress(0)
        for index, result in enumerate(results):
            with st.container():
                # Stable per-URL id used for cache file names and widget keys.
                url_id = uuid.uuid5( uuid.NAMESPACE_URL, result['link'] ).hex
                st.write(result['link'])
                st.write(url_id)
                try:
                    content = get_url_content( result['link'] )
                except Exception as exception:
                    # Best effort: show the error and move on to the next result.
                    st.exception(exception)
                    progress_bar.progress( ( index + 1 ) / number_of_results )
                    continue
                summary = content_summary( url_id, content )
                for sentence in summary:
                    st.write(sentence['summary_text'])
                progress_bar.progress( ( index + 1 ) / number_of_results )
                summary_path = 'summaries/' + url_id + '.json'
                content_path = 'page-content/' + url_id + '.txt'
                col1, col2 = st.columns([.5, 1])
                with col1:
                    # Guard against a second click (or a never-written cache
                    # file) raising FileNotFoundError.
                    if st.button('Delete summary cache', key=url_id + 'summary') and exists( summary_path ):
                        remove( summary_path )
                with col2:
                    if st.button('Delete content cache', key=url_id + 'content') and exists( content_path ):
                        remove( content_path )
# Run the Streamlit app when executed as a script.
if __name__ == '__main__':
    main()