|
import os |
|
import requests |
|
from bs4 import BeautifulSoup |
|
from urllib.parse import urljoin |
|
import pandas as pd |
|
import numpy as np |
|
import zipfile |
|
import textract |
|
|
|
def scrape(url, excel_file, folder_name,progress=gr.Progress()): |
|
filenames = [] |
|
|
|
if excel_file and os.path.exists(excel_file): |
|
try: |
|
df = pd.read_excel(excel_file) |
|
|
|
|
|
if 'Actions' in df.columns: |
|
df = df[df['Actions'] == 'x'] |
|
|
|
elif 'File' in df.columns: |
|
filenames = [f"{url}/{row['File']}.zip" for index, row in df.iterrows()] |
|
elif 'URL' in df.columns: |
|
filenames = df['URL'].tolist() |
|
except Exception as e: |
|
print(f"Error reading Excel file: {e}") |
|
|
|
|
|
|
|
download_directory = folder_name |
|
if not os.path.exists(download_directory): |
|
os.makedirs(download_directory) |
|
|
|
|
|
if not filenames: |
|
print("No Excel file provided, or no valid URLs found in the file.") |
|
|
|
response = requests.get(url) |
|
|
|
|
|
soup = BeautifulSoup(response.content, "html.parser") |
|
|
|
|
|
links = soup.find_all("a", href=True) |
|
|
|
|
|
zip_links = [link['href'] for link in links if link['href'].endswith('.zip')] |
|
download_num = 0 |
|
pourcentss = 0.1 |
|
|
|
for zip_link in zip_links: |
|
if download_num%10 == 0: |
|
pourcentss = pourcentss + download_num/500 |
|
progress(pourcentss,desc='Telechargement') |
|
download_num = 0 |
|
download_num+=1 |
|
|
|
absolute_url = urljoin(url, zip_link) |
|
|
|
|
|
filename = os.path.basename(absolute_url) |
|
|
|
|
|
save_path = os.path.join(download_directory, filename) |
|
|
|
|
|
with requests.get(absolute_url, stream=True) as r: |
|
r.raise_for_status() |
|
with open(save_path, 'wb') as f: |
|
for chunk in r.iter_content(chunk_size=8192): |
|
f.write(chunk) |
|
|
|
|
|
|
|
else: |
|
|
|
for file_url in filenames: |
|
filename = os.path.basename(file_url) |
|
save_path = os.path.join(download_directory, filename) |
|
|
|
try: |
|
with requests.get(file_url, stream=True) as r: |
|
r.raise_for_status() |
|
with open(save_path, 'wb') as f: |
|
for chunk in r.iter_content(chunk_size=8192): |
|
f.write(chunk) |
|
except requests.exceptions.HTTPError as e: |
|
print(f"HTTP error occurred: {file_url}: {e}") |
|
return False, "Il n'y a pas de colonne action ou alors celle ci n'est pas bien écrite, format attendu: 'Actions'" |
|
|
|
return True, "Téléchargement terminé !" |
|
|
|
|
|
|
|
def extractZip(folder_name): |
|
|
|
download_directory = folder_name |
|
extract_directory = folder_name + " extraction" |
|
|
|
|
|
for zip_file in os.listdir(download_directory): |
|
zip_path = os.path.join(download_directory, zip_file) |
|
|
|
if zip_file.endswith(".zip"): |
|
extract_dir = os.path.join(extract_directory, os.path.splitext(zip_file)[0]) |
|
|
|
|
|
if os.path.exists(zip_path): |
|
|
|
if not os.path.exists(extract_dir): |
|
os.makedirs(extract_dir) |
|
|
|
|
|
print(f"Extraction en cours pour {zip_file}") |
|
with zipfile.ZipFile(zip_path, 'r') as zip_ref: |
|
zip_ref.extractall(extract_dir) |
|
|
|
print(f"Extraction terminée pour {zip_file}") |
|
else: |
|
print(f"Fichier zip {zip_file} introuvable") |
|
|
|
print("Toutes les extractions sont terminées !") |
|
|
|
def excel3gpp(url): |
|
response = requests.get(url) |
|
response.raise_for_status() |
|
|
|
|
|
soup = BeautifulSoup(response.text, 'html.parser') |
|
|
|
|
|
excel_links = [a['href'] for a in soup.find_all('a', href=True) if a['href'].endswith(('.xlsx', '.xls'))] |
|
|
|
|
|
if excel_links: |
|
excel_url = excel_links[0] |
|
if not excel_url.startswith('http'): |
|
excel_url = os.path.join(url, excel_url) |
|
|
|
|
|
excel_response = requests.get(excel_url) |
|
excel_response.raise_for_status() |
|
|
|
|
|
filename = excel_url.split('/')[-1] |
|
filepath = os.path.join('path_to_save_directory', filename) |
|
|
|
|
|
|
|
filepath = 'guide.xlsx' |
|
|
|
with open(filepath, 'wb') as f: |
|
f.write(excel_response.content) |
|
print(f'Excel file downloaded and saved as: {filepath}') |
|
|
|
|
|
|
|
def replace_line_breaks(text): |
|
return text.replace("\n", "/n") |
|
|
|
def remod_text(text): |
|
return text.replace("/n", "\n") |
|
|
|
def extractionPrincipale(url, excel_file=None,progress=gr.Progress()): |
|
folder_name = url.split("/")[-2] |
|
progress(0.1,desc='Telechargement') |
|
result, message = scrape(url, excel_file, folder_name) |
|
if result: |
|
print("Success:", message) |
|
else: |
|
return(None, message) |
|
|
|
progress(0.4,desc='Extraction') |
|
extractZip(folder_name) |
|
progress(0.5,desc='Extraction 2') |
|
excel3gpp(url) |
|
progress(0.6,desc='Mise en forme Excel') |
|
|
|
extract_directory = folder_name +" extraction" |
|
categories = { |
|
"Other": ["URL", "File", "Type", "Title", "Source", "Content"], |
|
"CR": ["URL", "File", "Type", "Title", "Source", "Content"], |
|
"pCR":["URL", "File", "Type", "Title", "Source", "Content"], |
|
"LS": ["URL", "File", "Type", "Title", "Source", "Content"], |
|
"WID": ["URL", "File", "Type", "Title", "Source", "Content"], |
|
"SID": ["URL", "File", "Type", "Title", "Source", "Content"], |
|
"DISCUSSION": ["URL", "File", "Type", "Title", "Source", "Content"], |
|
"pdf": ["URL", "File", "Type", "Title", "Source", "Content"], |
|
"ppt": ["URL", "File", "Type", "Title", "Source", "Content"], |
|
"pptx": ["URL", "File", "Type", "Title", "Source", "Content"] |
|
} |
|
nouv=0 |
|
num=0.6 |
|
data = [] |
|
errors_count = 0 |
|
pre_title_section = None |
|
for folder in os.listdir(extract_directory): |
|
folder_path = os.path.join(extract_directory, folder) |
|
if os.path.isdir(folder_path): |
|
for file in os.listdir(folder_path): |
|
num=num + nouv/400 |
|
progress(num,desc='Mise en forme Excel') |
|
nouv+=1 |
|
if file == "__MACOSX": |
|
continue |
|
file_path = os.path.join(folder_path, file) |
|
if file.endswith((".pptx", ".ppt", ".pdf", ".docx", ".doc", ".DOCX")): |
|
try: |
|
text = textract.process(file_path).decode('utf-8') |
|
except Exception as e: |
|
print(f"Error processing {file_path}: {e}") |
|
errors_count += 1 |
|
continue |
|
|
|
cleaned_text_lines = text.split('\n') |
|
cleaned_text = '\n'.join([line.strip('|').strip() for line in cleaned_text_lines if line.strip()]) |
|
|
|
title = "" |
|
debut = "" |
|
sections = cleaned_text.split("Title:") |
|
if len(sections) > 1: |
|
pre_title_section = sections[0].strip().split() |
|
title = sections[1].strip().split("\n")[0].strip() |
|
debut = sections[0].strip() |
|
|
|
category = "Other" |
|
if file.endswith(".pdf"): |
|
category = "pdf" |
|
elif file.endswith((".ppt", ".pptx")): |
|
category = "ppt" |
|
elif "CHANGE REQUEST" in debut: |
|
category = "CR" |
|
elif "Discussion" in title: |
|
category = "DISCUSSION" |
|
elif "WID" in title: |
|
category = "WID" |
|
elif "SID" in title: |
|
category = "SID" |
|
elif "LS" in title: |
|
category = "LS" |
|
elif pre_title_section and pre_title_section[-1] == 'pCR': |
|
category = "pCR" |
|
elif "Pseudo-CR" in title: |
|
category = "pCR" |
|
|
|
|
|
contenu = "" |
|
if category in categories: |
|
columns = categories[category] |
|
extracted_content = [] |
|
if category == "CR": |
|
reason_for_change = "" |
|
summary_of_change = "" |
|
if len(sections) > 1: |
|
reason_for_change = sections[1].split("Reason for change", 1)[-1].split("Summary of change")[0].strip() |
|
summary_of_change = sections[1].split("Summary of change", 1)[-1].split("Consequences if not")[0].strip() |
|
extracted_content.append(f"Reason for change: {reason_for_change}") |
|
extracted_content.append(f"Summary of change: {summary_of_change}") |
|
elif category == "pCR": |
|
if len(sections) > 1: |
|
pcr_specific_content = sections[1].split("Introduction", 1)[-1].split("First Change")[0].strip() |
|
extracted_content.append(f"Introduction: {pcr_specific_content}") |
|
elif category == "LS": |
|
overall_review = "" |
|
if len(sections) > 1: |
|
overall_review = sections[1].split("Overall description", 1)[-1].strip() |
|
extracted_content.append(f"Overall review: {overall_review}") |
|
elif category in ["WID", "SID"]: |
|
objective = "" |
|
start_index = cleaned_text.find("Objective") |
|
end_index = cleaned_text.find("Expected Output and Time scale") |
|
if start_index != -1 and end_index != -1: |
|
objective = cleaned_text[start_index + len("Objective"):end_index].strip() |
|
extracted_content.append(f"Objective: {objective}") |
|
elif category == "DISCUSSION": |
|
Discussion = "" |
|
extracted_text = replace_line_breaks(cleaned_text) |
|
start_index_doc_for = extracted_text.find("Document for:") |
|
if start_index_doc_for != -1: |
|
start_index_word_after_doc_for = start_index_doc_for + len("Document for:") |
|
end_index_word_after_doc_for = start_index_word_after_doc_for + extracted_text[start_index_word_after_doc_for:].find("/n") |
|
word_after_doc_for = extracted_text[start_index_word_after_doc_for:end_index_word_after_doc_for].strip() |
|
result_intro = '' |
|
result_conclusion = '' |
|
result_info = '' |
|
if word_after_doc_for.lower() == "discussion": |
|
start_index_intro = extracted_text.find("Introduction") |
|
end_index_intro = extracted_text.find("Discussion", start_index_intro) |
|
|
|
intro_text = "" |
|
if start_index_intro != -1 and end_index_intro != -1: |
|
intro_text = extracted_text[start_index_intro + len("Introduction"):end_index_intro].strip() |
|
result_intro = remod_text(intro_text) |
|
else: |
|
result_intro = "Introduction section not found." |
|
|
|
|
|
start_index_conclusion = extracted_text.find("Conclusion", end_index_intro) |
|
end_index_conclusion = extracted_text.find("Proposal", start_index_conclusion if start_index_conclusion != -1 else end_index_intro) |
|
|
|
conclusion_text = "" |
|
if start_index_conclusion != -1 and end_index_conclusion != -1: |
|
conclusion_text = extracted_text[start_index_conclusion + len("Conclusion"):end_index_conclusion].strip() |
|
result_conclusion = remod_text(conclusion_text) |
|
elif start_index_conclusion == -1: |
|
start_index_proposal = extracted_text.find("Proposal", end_index_intro) |
|
if start_index_proposal != -1: |
|
end_index_proposal = len(extracted_text) |
|
proposal_text = extracted_text[start_index_proposal + len("Proposal"):end_index_proposal].strip() |
|
result_conclusion = remod_text(proposal_text) |
|
else: |
|
result_conclusion = "Conclusion/Proposal section not found." |
|
else: |
|
|
|
conclusion_text = extracted_text[start_index_conclusion + len("Conclusion"):].strip() |
|
result_conclusion = remod_text(conclusion_text) |
|
Discussion=f"Introduction: {result_intro}\nConclusion/Proposal: {result_conclusion}" |
|
elif word_after_doc_for.lower() == "information": |
|
start_index_info = extracted_text.find(word_after_doc_for) |
|
if start_index_info != -1: |
|
info_to_end = extracted_text[start_index_info + len("Information"):].strip() |
|
result_info = remod_text(info_to_end) |
|
Discussion = f"Discussion:{result_info}" |
|
else: |
|
Discussion = "The word after 'Document for:' is not 'Discussion', 'DISCUSSION', 'Information', or 'INFORMATION'." |
|
else: |
|
Discussion = "The phrase 'Document for:' was not found." |
|
|
|
|
|
discussion_details = Discussion |
|
extracted_content.append(discussion_details) |
|
|
|
contenu = "\n".join(extracted_content) |
|
|
|
|
|
|
|
source = "" |
|
status = "" |
|
data.append([url+ "/" + folder + '.zip', folder , category, title, source,status, contenu]) |
|
|
|
|
|
|
|
guide_df = pd.read_excel('guide.xlsx', usecols=['Source', 'TDoc','TDoc Status']) |
|
tdoc_source_map = {row['TDoc']: row['Source'] for index, row in guide_df.iterrows()} |
|
tdoc_status_map = {row['TDoc']: row['TDoc Status'] for index, row in guide_df.iterrows()} |
|
|
|
for item in data: |
|
nom_du_fichier = item[1] |
|
if nom_du_fichier in tdoc_source_map: |
|
item[4] = tdoc_source_map[nom_du_fichier] |
|
item[5] = tdoc_status_map[nom_du_fichier] |
|
|
|
|
|
|
|
|
|
new_df_columns = ["URL", "File", "Type", "Title", "Source", "Status", "Content"] |
|
new_df = pd.DataFrame(data, columns=new_df_columns) |
|
try: |
|
old_df = pd.read_excel(excel_file) |
|
|
|
|
|
if 'Actions' in old_df.columns: |
|
|
|
for index, new_row in new_df.iterrows(): |
|
|
|
match_indices = old_df[old_df['TDoc'] == new_row['File']].index |
|
|
|
for i in match_indices: |
|
old_df.at[i, 'Content'] = new_row['Content'] |
|
old_df.at[i, 'URL'] = new_row['URL'] |
|
|
|
df = old_df |
|
|
|
|
|
|
|
|
|
|
|
|
|
else: |
|
|
|
df = pd.concat([old_df, new_df], axis=0, ignore_index=True) |
|
except Exception as e: |
|
print("The provided excel file seems invalid:", e) |
|
df = new_df |
|
|
|
file_name = url.split("/")[-2] + ".xlsx" |
|
|
|
df.to_excel(file_name, index=False) |
|
return file_name, "Téléchargement réussi" |