"""Download helpers: resolve a download link from a page, extract text from
PDF/Excel/CSV/DOCX/PPTX files, and pass the result to the summarizer."""

import io
import json
import re
from io import BytesIO, StringIO

import chardet
import pandas as pd
import requests
from bs4 import BeautifulSoup
from docx import Document
from pdfminer.converter import TextConverter
from pdfminer.layout import LAParams
from pdfminer.pdfinterp import PDFPageInterpreter, PDFResourceManager
from pdfminer.pdfpage import PDFPage
from pptx import Presentation

from sumarize import summarize


def trim_input_words(input_str, max_new_tokens=512, max_total_tokens=32768):
    """Trim the input so it fits the model context, counting words as a rough
    proxy for tokens and reserving room for the generated output."""
    words = input_str.split()
    max_input_tokens = max_total_tokens - max_new_tokens

    # Truncate only when the input exceeds the budget minus a 100-word margin.
    if len(words) > max_input_tokens - 100:
        words = words[:max_input_tokens]

    return ' '.join(words)


def select_words_until_char_limit(s, char_limit):
    """Strip punctuation, then keep whole words until char_limit characters
    (including joining spaces) would be exceeded."""
    s_no_punct = re.sub(r'[^\w\s]', '', s)
    words = s_no_punct.split()
    selected_words = []
    total_chars = 0
    for word in words:
        # The +1 accounts for the joining space.
        if total_chars + len(word) + 1 <= char_limit:
            selected_words.append(word)
            total_chars += len(word) + 1
        else:
            break
    # Also enforce the word budget expected by the model.
    return trim_input_words(' '.join(selected_words))


def downl(url):
    """Scrape the page at `url` and return the href of the last entry in the
    last dropdown menu, which holds the download link on the source site."""
    try:
        rq = requests.get(url)
        if rq.status_code != 200:
            return ""
        bs = BeautifulSoup(rq.text, features='lxml')
        lis = bs.find_all('ul', class_='dropdown-menu')[-1].find_all('li')
        link = lis[-1].find('a').get('href')
        print(link)
        return link
    except Exception:
        return ""


def pdf(url):
    """Download a PDF and extract its text with pdfminer."""
    response = requests.get(url)
    response.raise_for_status()
    pdf_file = BytesIO(response.content)

    resource_manager = PDFResourceManager()
    fake_file_handle = StringIO()
    converter = TextConverter(resource_manager, fake_file_handle, laparams=LAParams())
    page_interpreter = PDFPageInterpreter(resource_manager, converter)

    # Render each page's text into the in-memory handle.
    for page in PDFPage.get_pages(pdf_file):
        page_interpreter.process_page(page)

    text = fake_file_handle.getvalue()
    result = select_words_until_char_limit(text, 30000)
    converter.close()
    fake_file_handle.close()
    return result


def excel(link: str) -> str:
    """Download an Excel file and return up to 50 sampled rows as JSON text."""
    try:
        response = requests.get(link)
        if response.status_code == 200:
            df = pd.read_excel(BytesIO(response.content))
            # Sample at most 50 rows so the serialized output stays small.
            if df.shape[0] > 50:
                sample_df = df.sample(n=50, random_state=42)
            else:
                sample_df = df
            js = json.loads(sample_df.to_json(orient='records'))
            return select_words_until_char_limit(f"{js}", 32000)
        else:
            print("Failed to download file")
            return "No data available"
    except Exception as e:
        print(e)
        return "No data available"


def csv(link: str) -> str:
    """Download a semicolon-separated CSV and return up to 50 sampled rows as JSON text."""
    try:
        response = requests.get(link)
        if response.status_code != 200:
            return 'No data available'

        file_content = response.content
        # Detect the encoding first; the source files use ';' as the separator.
        detected_encoding = chardet.detect(file_content)['encoding']
        df = pd.read_csv(io.BytesIO(file_content), encoding=detected_encoding, sep=';')
        if df.empty:
            print("The DataFrame is empty.")
            return 'The data frame is empty'

        if df.shape[0] > 50:
            sample_df = df.sample(n=50, random_state=42)
        else:
            sample_df = df

        js = json.loads(sample_df.to_json(orient='records'))
        return select_words_until_char_limit(f"{js}", 32000)
    except Exception:
        return 'No data available'


def docx(url: str) -> str:
    """Download a .docx file and return its paragraph text."""
    try:
        response = requests.get(url)
        response.raise_for_status()

        doc = Document(io.BytesIO(response.content))
        full_text = [para.text for para in doc.paragraphs]
        return select_words_until_char_limit("\n".join(full_text), 32000)
    except Exception as e:
        print(f"An error occurred: {e}")
        return 'No data available'


def pptx(url: str) -> str:
    """Download a .pptx file and return the text of every shape on every slide."""
    try:
        response = requests.get(url)
        response.raise_for_status()

        presentation = Presentation(io.BytesIO(response.content))
        full_text = []
        for slide in presentation.slides:
            for shape in slide.shapes:
                if hasattr(shape, "text"):
                    full_text.append(shape.text)
        return select_words_until_char_limit("\n".join(full_text), 32000)
    except Exception as e:
        print(f"An error occurred: {e}")
        return 'No data available'


def get_data(url):
    """Resolve the download link behind `url`, dispatch on the file extension,
    and return a summary of the extracted content."""
    # Strip the agent's trailing "Observation" marker; handle the quoted
    # variant first so the quote is removed along with it.
    ki = url.replace('"\nObservation', '').replace('\nObservation', '')
    jo = downl(ki)
    ext = jo.split(".")[-1]
    if ext in ('xlsx', 'xls', 'xlsm'):
        rs = excel(jo)
        return summarize.invoke({"input": rs})
    elif ext == 'pdf':
        rs = pdf(jo)
        return summarize.invoke({"input": rs})
    elif ext == 'docx':
        rs = docx(jo)
        return summarize.invoke({"input": rs})
    elif ext == 'csv':
        rs = csv(jo)
        return summarize.invoke({"input": rs})
    elif ext in ('pptx', 'ppt'):
        rs = pptx(jo)
        return summarize.invoke({"input": rs})
    elif ext == 'doc':
        return "The .doc extension is not supported."
    return "No data returned"
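

# Minimal usage sketch, assuming the page at this (hypothetical) URL exposes
# a dropdown menu whose last entry links to a supported file type, and that
# the local `sumarize` module provides the `summarize` runnable used above.
if __name__ == "__main__":
    print(get_data("https://example.com/dataset-page"))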