|
import requests |
|
import json |
|
import os |
|
import uuid |
|
import zipfile |
|
import io |
|
import subprocess |
|
import os |
|
import re |
|
import warnings |
|
from fastapi import FastAPI, HTTPException |
|
from fastapi.middleware.cors import CORSMiddleware |
|
from fastapi.responses import FileResponse |
|
from fastapi.staticfiles import StaticFiles |
|
from pydantic import BaseModel |
|
from typing import Any, Dict, List, Literal, Optional |
|
|
|
# Silence every warning process-wide — notably urllib3's
# InsecureRequestWarning raised by the verify=False requests below.
# NOTE(review): this is very broad; consider suppressing only the
# warning category actually being triggered.
warnings.filterwarnings("ignore")


app = FastAPI(title="3GPP Specification Splitter API",
              description="API to split and display specifications by their chapters & sub-chapters")

# Serve static assets from ./static under the /static path.
# NOTE(review): StaticFiles raises at startup if the directory is missing.
app.mount("/static", StaticFiles(directory="static"), name="static")

# CORS: allow every origin.
# NOTE(review): browsers reject a wildcard origin combined with
# allow_credentials=True — confirm whether credentials are actually needed.
origins = [
    "*",
]

app.add_middleware(
    CORSMiddleware,
    allow_origins=origins,
    allow_credentials=True,
    allow_methods=["*"],
    allow_headers=["*"],
)
|
|
|
def get_text(specification: str, version: str) -> List[str]:
    """Download a 3GPP specification archive and return its text content.

    The archive for ``specification`` (e.g. ``"23.501"``) at ``version`` is a
    ZIP fetched from the 3GPP FTP mirror.  It contains either a .doc/.docx
    file directly, or a nested ZIP that contains one.  The document is
    converted to plain text with headless LibreOffice and returned as a list
    of stripped, non-empty lines.

    Args:
        specification: spec number in "series.number" form, e.g. "23.501".
        version: 3GPP version code embedded in the archive file name.

    Raises:
        Exception: if the download fails or no .doc/.docx file is found.
    """
    doc_id = specification
    series = doc_id.split(".")[0]

    # NOTE(review): verify=False disables TLS certificate validation.
    response = requests.get(
        f"https://www.3gpp.org/ftp/Specs/archive/{series}_series/{doc_id}/{doc_id.replace('.', '')}-{version}.zip",
        verify=False,
        headers={"User-Agent": 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/91.0.4472.124 Safari/537.36'}
    )

    if response.status_code != 200:
        raise Exception(f"Téléchargement du ZIP échoué pour {specification}-{version}")

    with zipfile.ZipFile(io.BytesIO(response.content)) as zf:
        text = _find_and_convert_doc(zf)
        if text is not None:
            return text

    raise Exception(f"Aucun fichier .doc/.docx trouvé dans le ZIP pour {specification}-{version}")


def _find_and_convert_doc(zf: "zipfile.ZipFile") -> Optional[List[str]]:
    """Search *zf* for the first non-cover .doc/.docx member and convert it.

    Recurses into nested ZIP members with a fresh ZipFile object — the
    original code rebound ``zf`` in the middle of iterating the outer
    archive, which corrupted later reads from it.  Returns None when no
    suitable document is found.
    """
    for name in zf.namelist():
        if name.endswith(".zip"):
            with zipfile.ZipFile(io.BytesIO(zf.read(name))) as inner:
                text = _find_and_convert_doc(inner)
                if text is not None:
                    return text
        elif name.endswith((".doc", ".docx")):
            # Skip the separate cover-page document shipped in some archives.
            if "cover" in name.lower():
                continue
            return _convert_doc_bytes(zf.read(name), name.split(".")[-1])
    return None


def _convert_doc_bytes(doc_bytes: bytes, ext: str) -> List[str]:
    """Convert raw .doc/.docx bytes to stripped, non-empty text lines.

    Writes the bytes to a unique temp file, runs a headless LibreOffice
    conversion to .txt in /tmp, and reads the result back.  Temp files are
    removed even when the conversion fails (the original leaked them on
    error because ``check=True`` raised before the ``os.remove`` calls).
    """
    temp_id = str(uuid.uuid4())
    input_path = f"/tmp/{temp_id}.{ext}"
    output_path = f"/tmp/{temp_id}.txt"
    try:
        with open(input_path, "wb") as f:
            f.write(doc_bytes)

        subprocess.run([
            "libreoffice",
            "--headless",
            "--convert-to", "txt",
            "--outdir", "/tmp",
            input_path
        ], check=True)

        # Explicit encoding: one of the two duplicated branches in the
        # original relied on the locale default here.
        with open(output_path, "r", encoding="utf-8") as f:
            return [line.strip() for line in f if line.strip()]
    finally:
        for path in (input_path, output_path):
            if os.path.exists(path):
                os.remove(path)
|
|
|
def get_latest_version(spec: str) -> str:
    """Ask the 3gppdocfinder service for the latest version code of *spec*.

    Args:
        spec: spec number in "series.number" form, e.g. "23.501".

    Returns:
        The ``version`` field of the service's JSON response.

    Raises:
        HTTPException: 500 when the request itself fails; otherwise the
            upstream status code when it is not 200.
    """
    try:
        # ``json=`` serializes the payload and sets the Content-Type header
        # in one step (the original built both by hand with json.dumps).
        # NOTE(review): verify=False disables TLS certificate validation.
        req = requests.post(
            "https://organizedprogrammers-3gppdocfinder.hf.space/find",
            json={"doc_id": spec},
            verify=False,
        )
    except Exception as e:
        raise HTTPException(500, f"An error has occured while getting latest version: {e}")
    if req.status_code == 200:
        reqJS = req.json()
        return reqJS['version']
    else:
        raise HTTPException(req.status_code, "An error has occured while getting latest version")
|
|
|
class SpecRequest(BaseModel):
    """Request payload for the /from-search endpoint."""
    # 3GPP spec number in "series.number" form, e.g. "23.501".
    specification: str
    # Version code of the spec; when omitted, the latest version is
    # looked up via get_latest_version().
    version: Optional[str] = None
|
|
|
@app.get("/")
def main_page():
    """Serve the static landing page of the application."""
    index_path = os.path.join("templates", "index.html")
    return FileResponse(index_path)
|
|
|
# Matches one table-of-contents line: a section number of 1 to 5
# dot-separated digits, a tab, then a title starting with a capital letter.
# Replaces five near-duplicate inline patterns, two of which (levels 4-5)
# left the dot unescaped and so matched any character.
_TOC_LINE_RE = re.compile(r"^\d(?:\.\d){0,4}\t[A-Z][a-zA-Z0-9\s,;!?'.-]*$")


@app.post("/from-search")
def get_file_from_spec_id_version(req: SpecRequest) -> Dict[str, str]:
    """Download a spec and split it into sections keyed by chapter heading.

    Returns a mapping of "<section number> <title>" to the whitespace-
    normalized text of that section.

    Raises:
        HTTPException: 500 when the table of contents cannot be located
            (the original code crashed with an IndexError in that case).
    """
    spec = req.specification
    version = req.version or get_latest_version(spec)

    text = get_text(spec, version)

    # The ToC lies between the first two lines mentioning "Foreword".
    forewords = [i for i, line in enumerate(text) if "Foreword" in line][:2]
    if len(forewords) < 2:
        raise HTTPException(500, f"Could not locate the table of contents for {spec}-{version}")

    toc_brut = text[forewords[0]:forewords[1]]
    chapters = []
    for line in toc_brut:
        if _TOC_LINE_RE.fullmatch(line):
            parts = line.split("\t")
            # Keep "number<TAB>title", dropping any trailing page number.
            chapters.append(parts[0] if len(parts) == 1 else "\t".join(parts[:2]))

    # Locate each ToC entry's heading line in the document body.  Entries
    # that never reappear are skipped: the original stored -float("inf") as
    # a sentinel, which would have raised a TypeError in the slicing below.
    real_toc_indexes = {}
    for chapter in chapters:
        try:
            real_toc_indexes[chapter] = text.index(chapter)
        except ValueError:
            continue

    document = {}
    toc = list(real_toc_indexes.keys())
    index_toc = list(real_toc_indexes.values())
    curr_index = 0
    for x in range(1, len(toc)):
        # Section body: lines between this heading and the next one, with
        # runs of spaces/tabs collapsed (newlines are preserved here).
        document[toc[curr_index].replace("\t", " ")] = re.sub(
            r"[\ \t]+", " ", "\n".join(text[index_toc[curr_index] + 1:index_toc[x]])
        )
        curr_index = x

    # Last section runs to the end of the document; guard against an empty
    # ToC (the original indexed toc[0] unconditionally).
    if toc:
        document[toc[curr_index].replace("\t", " ")] = re.sub(
            r"\s+", " ", " ".join(text[index_toc[curr_index] + 1:])
        )
    return document
|
|