# Snapshot of commit 4991d5e ("Final fix"), authored by om4r932.
import requests
import json
import os
import uuid
import zipfile
import io
import subprocess
import os
import re
import warnings
from fastapi import FastAPI, HTTPException
from fastapi.middleware.cors import CORSMiddleware
from fastapi.responses import FileResponse
from fastapi.staticfiles import StaticFiles
from pydantic import BaseModel
from typing import Any, Dict, List, Literal, Optional
# Suppress every Python warning process-wide.
# NOTE(review): presumably meant to hide the warnings caused by the
# `verify=False` HTTP requests made in this module -- confirm.
warnings.filterwarnings("ignore")

app = FastAPI(
    title="3GPP Specification Splitter API",
    description="API to split and display specifications by their chapters & sub-chapters",
)

# Expose the local "static" directory under the /static URL prefix.
app.mount("/static", StaticFiles(directory="static"), name="static")

# CORS policy: every origin, method and header is accepted, with credentials.
origins = ["*"]
app.add_middleware(
    CORSMiddleware,
    allow_origins=origins,
    allow_credentials=True,
    allow_methods=["*"],
    allow_headers=["*"],
)
# Lower-case suffixes of archive members that may hold the specification body.
_DOC_SUFFIXES = (".doc", ".docx")

# Seconds to wait for the 3GPP server before giving up, so that a stalled
# connection cannot block the request handler forever.
_DOWNLOAD_TIMEOUT_SECONDS = 60


def _convert_document_to_lines(doc_bytes: bytes, suffix: str) -> List[str]:
    """Convert a Word document to text with LibreOffice and return its lines.

    Args:
        doc_bytes: Raw content of the ``.doc``/``.docx`` file.
        suffix: File suffix including the dot (for example ``".docx"``).

    Returns:
        The non-empty lines of the converted text, stripped of surrounding
        whitespace.

    Raises:
        subprocess.CalledProcessError: If LibreOffice exits with an error.
    """
    temp_id = str(uuid.uuid4())
    input_path = f"/tmp/{temp_id}{suffix}"
    output_path = f"/tmp/{temp_id}.txt"
    try:
        print("Ecriture")
        with open(input_path, "wb") as f:
            f.write(doc_bytes)
        print("Convertissement")
        subprocess.run([
            "libreoffice",
            "--headless",
            "--convert-to", "txt",
            "--outdir", "/tmp",
            input_path
        ], check=True)
        print("Ecriture TXT")
        with open(output_path, "r", encoding="utf-8") as f:
            return [line.strip() for line in f if line.strip()]
    finally:
        # Remove the temporary files even when the conversion or the read
        # fails, so failed requests do not accumulate files in /tmp.
        for path in (input_path, output_path):
            if os.path.exists(path):
                os.remove(path)


def _find_spec_document(archive: zipfile.ZipFile, search_nested: bool = True) -> Optional[tuple]:
    """Return ``(suffix, content)`` of the first non-cover Word file, or None.

    Members are visited in archive order. When ``search_nested`` is true, a
    member that is itself a ZIP is opened and searched one level deep.
    """
    for name in archive.namelist():
        lowered = name.lower()
        if search_nested and lowered.endswith(".zip"):
            print("Another ZIP !")
            # Open the nested archive under its own name and close it when
            # done; the outer archive must stay usable for later members.
            with zipfile.ZipFile(io.BytesIO(archive.read(name))) as nested:
                found = _find_spec_document(nested, search_nested=False)
            if found is not None:
                return found
        elif lowered.endswith(_DOC_SUFFIXES):
            if "cover" in lowered:
                print("COVER !")
                continue
            return os.path.splitext(lowered)[1], archive.read(name)
    return None


def get_text(specification: str, version: str):
    """Download a 3GPP specification and return its text as a list of lines.

    The ZIP for ``specification``/``version`` is fetched from the 3GPP FTP
    archive, the first Word document whose name does not contain "cover" is
    extracted (looking inside one level of nested ZIP if needed) and
    converted to plain text with LibreOffice.

    Args:
        specification: Specification identifier; the part before the first
            dot is used as the series number in the download URL.
        version: Version string appended to the archive file name.

    Returns:
        The non-empty, whitespace-stripped lines of the converted document.

    Raises:
        Exception: If the download does not answer with HTTP 200, or if the
            archive contains no usable ``.doc``/``.docx`` file.
        subprocess.CalledProcessError: If the LibreOffice conversion fails.
    """
    doc_id = specification
    series = doc_id.split(".")[0]
    # NOTE(review): TLS verification is disabled here, as in the previous
    # implementation -- confirm that it is still required.
    response = requests.get(
        f"https://www.3gpp.org/ftp/Specs/archive/{series}_series/{doc_id}/{doc_id.replace('.', '')}-{version}.zip",
        verify=False,
        headers={"User-Agent": 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/91.0.4472.124 Safari/537.36'},
        timeout=_DOWNLOAD_TIMEOUT_SECONDS,
    )
    if response.status_code != 200:
        raise Exception(f"Téléchargement du ZIP échoué pour {specification}-{version}")

    with zipfile.ZipFile(io.BytesIO(response.content)) as archive:
        found = _find_spec_document(archive)
    if found is None:
        raise Exception(f"Aucun fichier .doc/.docx trouvé dans le ZIP pour {specification}-{version}")

    suffix, doc_bytes = found
    return _convert_document_to_lines(doc_bytes, suffix)
def get_latest_version(spec: str) -> str:
    """Ask the remote document-finder service for the version of ``spec``.

    Posts ``{"doc_id": spec}`` as JSON and returns the ``version`` field of
    the JSON answer.

    Raises:
        HTTPException: With status 500 when the request itself fails, or
            with the service's own status code when it is not 200.
    """
    finder_url = "https://organizedprogrammers-3gppdocfinder.hf.space/find"
    json_headers = {"Content-Type": "application/json"}
    try:
        response = requests.post(
            finder_url,
            headers=json_headers,
            data=json.dumps({"doc_id": spec}),
            verify=False,
        )
    except Exception as e:
        raise HTTPException(500, f"An error has occured while getting latest version: {e}")

    # Guard clause: anything other than 200 is forwarded to the caller.
    if response.status_code != 200:
        raise HTTPException(response.status_code, "An error has occured while getting latest version")

    payload = response.json()
    return payload['version']
class SpecRequest(BaseModel):
    # Request body of POST /from-search.
    # (Documented with comments rather than a docstring so the generated
    # schema stays exactly as it was.)

    # Specification identifier; the part before the first "." is used as the
    # series number when the archive is downloaded.
    specification: str

    # Version to fetch. When omitted or empty, the handler looks up the
    # latest version through get_latest_version().
    version: Optional[str] = None
@app.get("/")
def main_page():
    # Serve the page stored at templates/index.html for the site root.
    index_path = os.path.join("templates", "index.html")
    return FileResponse(index_path)
# A table-of-contents entry: a clause number of one to five dot-separated
# levels (each level may have several digits), a tab, then a title starting
# with a capital letter. Used with fullmatch(), so no anchors are needed.
_TOC_ENTRY_RE = re.compile(r"\d+(?:\.\d+){0,4}\t[A-Z][a-zA-Z0-9\s,;!?'.-]*")

# Runs of spaces/tabs inside a chapter body (newlines are left alone).
_INLINE_WHITESPACE_RE = re.compile(r"[ \t]+")


def _find_toc_bounds(text: List[str]) -> Optional[tuple]:
    """Return the indexes of the first two lines containing "Foreword", or None."""
    hits = []
    for index, line in enumerate(text):
        if "Foreword" in line:
            hits.append(index)
            if len(hits) == 2:
                return hits[0], hits[1]
    return None


def _parse_toc_headings(toc_lines: List[str]) -> List[str]:
    """Return ``"<number>\\t<title>"`` for every line that is a TOC entry."""
    headings = []
    for line in toc_lines:
        if _TOC_ENTRY_RE.fullmatch(line):
            # Keep only number and title; anything after a second tab
            # (for example a page number) is dropped.
            headings.append("\t".join(line.split("\t")[:2]))
    return headings


def _locate_headings(text: List[str], headings: List[str], start: int) -> Dict[str, int]:
    """Map each heading to the index of its first exact match at/after ``start``.

    Headings that never appear are left out. Order follows ``headings``.
    """
    first_seen: Dict[str, int] = {}
    for index in range(start, len(text)):
        first_seen.setdefault(text[index], index)
    positions: Dict[str, int] = {}
    for heading in headings:
        if heading in first_seen and heading not in positions:
            positions[heading] = first_seen[heading]
    return positions


@app.post("/from-search")
def get_file_from_spec_id_version(req: SpecRequest) -> Dict[str, str]:
    """Split a specification into chapters.

    The table of contents is taken to be the lines between the first two
    lines containing "Foreword". Each numbered entry found there is then
    looked up in the rest of the document, and the text between two
    consecutive headings becomes the content of the first one.

    Returns a mapping of "<number> <title>" to the chapter text. Chapters
    listed in the table of contents but not found in the body are omitted.
    Raises HTTPException (500) when no table of contents can be located.
    """
    spec = req.specification
    version = req.version or get_latest_version(spec)
    text = get_text(spec, version)

    bounds = _find_toc_bounds(text)
    if bounds is None:
        # Previously this surfaced as an unexplained IndexError.
        raise HTTPException(
            status_code=500,
            detail=f"Could not locate the table of contents of {spec}-{version}",
        )
    toc_start, toc_end = bounds

    headings = _parse_toc_headings(text[toc_start:toc_end])
    # Search only after the table of contents so that a TOC line can never be
    # mistaken for the heading it announces.
    located = list(_locate_headings(text, headings, toc_end).items())

    document: Dict[str, str] = {}
    for position, (heading, start) in enumerate(located):
        is_last = position + 1 == len(located)
        end = len(text) if is_last else located[position + 1][1]
        body = "\n".join(text[start + 1:end])
        # Same normalisation for every chapter, the last one included.
        document[heading.replace("\t", " ")] = _INLINE_WHITESPACE_RE.sub(" ", body)
    return document