|
import os
import re
import subprocess
import warnings
import zipfile
from io import BytesIO

import requests
from datasets import load_dataset
from dotenv import load_dotenv
from fastapi import FastAPI, HTTPException
from fastapi.middleware.cors import CORSMiddleware
from pydantic import BaseModel

# The HTTP calls below use verify=False, which triggers urllib3's
# InsecureRequestWarning; silence it.
warnings.filterwarnings("ignore")

load_dotenv()
|
|
|
app = FastAPI(
    title="3GPP Specification Splitter API",
    description="API to split and display specifications by their chapters & sub-chapters",
    docs_url="/",
)

origins = ["*"]

app.add_middleware(
    CORSMiddleware,
    allow_origins=origins,
    allow_credentials=True,
    allow_methods=["*"],
    allow_headers=["*"],
)
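
# Note: the CORS spec forbids "Access-Control-Allow-Origin: *" together with
# credentials; Starlette's CORSMiddleware works around this combination by
# echoing the request's Origin header for credentialed requests instead of "*".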
|
|
|
# Load the pre-parsed specification corpus into memory; HF_TOKEN must be set
# in the environment (e.g. via the .env file loaded above).
spec_contents = load_dataset("OrganizedProgrammers/3GPPSpecContent", token=os.environ["HF_TOKEN"])
spec_contents = spec_contents["train"].to_list()
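
# Each dataset row is expected to carry "doc_id", "section" and "content"
# fields; the helpers below rely on exactly those three keys.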
|
|
|
def is_doc_indexed(spec_id: str):
    return any(spec_id == s["doc_id"] for s in spec_contents)
|
|
|
def get_full_doc(spec_id: str):
    doc = []
    for spec in spec_contents:
        if spec["doc_id"] == spec_id:
            doc.append(f"{spec['section']}\n{spec['content']}")
    return "\n\n".join(doc)
|
|
|
def get_structured_doc(spec_id: str):
    doc = {}
    for spec in spec_contents:
        if spec["doc_id"] == spec_id:
            doc[spec["section"]] = spec["content"]
    return doc
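
# Usage sketch for the helpers above (the spec ID is hypothetical):
#   is_doc_indexed("23.501")     -> True if some row has doc_id == "23.501"
#   get_full_doc("23.501")       -> one string, sections separated by blank lines
#   get_structured_doc("23.501") -> {"<section>": "<content>", ...}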
|
|
|
|
|
class SpecRequest(BaseModel):
    spec_id: str
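
# Both endpoints take a JSON body such as {"spec_id": "23.501"} (hypothetical ID).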
|
|
|
@app.post("/get_full_text") |
|
def get_text(request: SpecRequest): |
|
specification = request.spec_id |
|
if is_doc_indexed(specification): |
|
return get_full_doc(specification) |
|
print(f"[WARNING] Document no. {specification} not indexed or is a TDoc, if it's a specification, try to reindex") |
|
total_file = [] |
|
url = requests.post( |
|
"https://organizedprogrammers-3gppdocfinder.hf.space/find", |
|
verify=False, |
|
headers={"Content-Type": "application/json"}, |
|
json={"doc_id": specification} |
|
) |
|
|
|
if url.status_code != 200: |
|
raise HTTPException(404, detail="Not found") |
|
|
|
url = url.json()['url'] |
|
response = requests.get( |
|
url, |
|
verify=False, |
|
headers={"User-Agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/138.0.0.0 Safari/537.36"} |
|
|
|
) |
|
|
|
zip_bytes = BytesIO(response.content) |
|
current_zip_file = zipfile.ZipFile(zip_bytes) |
|
for file_info in current_zip_file.infolist(): |
|
if file_info.filename.endswith(".zip") and len(current_zip_file.namelist()) == 1: |
|
nested_zip_bytes = BytesIO(current_zip_file.read(file_info.filename)) |
|
current_zip_file = zipfile.ZipFile(nested_zip_bytes) |
|
break |
|
|
|
for file_info in current_zip_file.infolist(): |
|
filename = file_info.filename |
|
if (filename.endswith('.doc') or filename.endswith('.docx')) and ("cover" not in filename.lower() and "annex" not in filename.lower()): |
|
doc_bytes = current_zip_file.read(filename) |
|
ext = filename.split(".")[-1] |
|
input_path = f"/tmp/{specification}.{ext}" |
|
output_path = f"/tmp/{specification}.txt" |
|
with open(input_path, "wb") as f: |
|
f.write(doc_bytes) |
|
|
|
subprocess.run([ |
|
"libreoffice", |
|
"--headless", |
|
"--convert-to", "txt", |
|
"--outdir", "/tmp", |
|
input_path |
|
], check=True) |
|
|
|
with open(output_path, "r") as f: |
|
txt_data = [line.strip() for line in f if line.strip()] |
|
|
|
os.remove(input_path) |
|
os.remove(output_path) |
|
total_file.extend(txt_data) |
|
if total_file == []: |
|
raise HTTPException(status_code=404, detail="Not found !") |
|
else: |
|
return total_file |
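
# Example call, assuming the API runs locally on port 8000 (the spec ID is
# hypothetical):
#   curl -X POST http://localhost:8000/get_full_text \
#        -H "Content-Type: application/json" -d '{"spec_id": "23.501"}'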
|
|
|
@app.post("/get_spec_content") |
|
def get_spec_content(request: SpecRequest): |
|
if is_doc_indexed(request.spec_id): |
|
return get_structured_doc(request.spec_id) |
|
text = get_text(request) |
|
chapters = [] |
|
chapter_regex = re.compile(r"^(\d+[a-z]?(?:\.\d+)*)\t[A-Z0-9][\ \S]+$") |
|
|
|
for i, line in enumerate(text): |
|
if chapter_regex.fullmatch(line): |
|
chapters.append((i, line)) |
|
|
|
document = {} |
|
for i in range(len(chapters)): |
|
start_index, chapter_title = chapters[i] |
|
end_index = chapters[i+1][0] if i+1 < len(chapters) else len(text) |
|
content_lines = text[start_index + 1 : end_index] |
|
document[chapter_title.replace('\t', " ")] = "\n".join(content_lines) |
|
|
|
return document |
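

# A minimal sketch for running the app locally, assuming uvicorn is installed;
# host and port are arbitrary choices.
if __name__ == "__main__":
    import uvicorn

    uvicorn.run(app, host="0.0.0.0", port=8000)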