Spaces:
Sleeping
Sleeping
from datetime import datetime | |
import os | |
import warnings | |
import traceback | |
import gradio as gr | |
import subprocess | |
from huggingface_hub import Repository | |
from git import Repo | |
import requests | |
warnings.filterwarnings('ignore') | |
DOC_INDEXER = "indexer_multi.py" | |
SPEC_INDEXER = "spec_indexer_multi.py" | |
SPEC_DOC_INDEXER = "spec_doc_indexer_multi.py" | |
BM25_INDEXER = "bm25_maker.py" | |
DOC_INDEX_FILE = "indexed_docs.json" | |
SPEC_INDEX_FILE = "indexed_specifications.json" | |
SPEC_DOC_INDEX_FILE = "indexed_docs_content.zip" | |
BM25_INDEX_FILE = "bm25s.zip" | |
HF_SEARCH_REPO = "OrganizedProgrammers/3GPPDocFinder" | |
REPO_DIR = os.path.dirname(os.path.abspath(__file__)) | |
def get_docs_stats(): | |
if os.path.exists(DOC_INDEX_FILE): | |
import json | |
with open(DOC_INDEX_FILE, 'r', encoding='utf-8') as f: | |
data = json.load(f) | |
return len(data["docs"]) | |
return 0 | |
def get_specs_stats(): | |
if os.path.exists(SPEC_INDEX_FILE): | |
import json | |
with open(SPEC_INDEX_FILE, 'r', encoding='utf-8') as f: | |
data = json.load(f) | |
return len(data["specs"]) | |
return 0 | |
def get_scopes_stats(): | |
if os.path.exists(SPEC_INDEX_FILE): | |
import json | |
with open(SPEC_INDEX_FILE, 'r', encoding="utf-8") as f: | |
data = json.load(f) | |
return len(data['scopes']) | |
return 0 | |
def check_permissions(user: str, token: str): | |
try: | |
req = requests.get("https://huggingface.co/api/whoami-v2", verify=False, headers={"Accept": "application/json", "Authorization": f"Bearer {token}"}) | |
if req.status_code != 200: | |
return False | |
reqJson: dict = req.json() | |
if not reqJson.get("name") or reqJson['name'] != user: | |
return False | |
if not reqJson.get("orgs") or len(reqJson['orgs']) == 0: | |
return False | |
for org in reqJson['orgs']: | |
if "645cfa1b5ebf379fd6d8a339" == org['id']: | |
return True | |
if not reqJson.get('auth') or reqJson['auth'] == {}: | |
return False | |
if reqJson['auth']['accessToken']['role'] != "fineGrained": | |
return False | |
for scope in reqJson['auth']['accessToken']['fineGrained']['scoped']: | |
if scope['entity']['type'] == "org" and scope['entity']['_id'] == "645cfa1b5ebf379fd6d8a339" and all(perm in scope['permissions'] for perm in ['repo.write', 'repo.content.read']): | |
return True | |
return False | |
except Exception as e: | |
traceback.print_exception(e) | |
return False | |
def update_logged(user: str, token: str): | |
if check_permissions(user, token): | |
return gr.update(visible=False), gr.update(visible=True), gr.update(visible=True), gr.update(visible=True) | |
else: | |
return gr.update(visible=True), gr.update(visible=False), gr.update(visible=False), gr.update(visible=False) | |
def commit_and_push_3gppindexers(user, token, files, message, current_log=""): | |
log = current_log + "\n" | |
repo = Repo(REPO_DIR) | |
origin = repo.remotes.origin | |
repo.config_writer().set_value("user", "name", "3GPP Indexer Automatic Git Tool").release() | |
repo.config_writer().set_value("user", "email", "example@mail.org").release() | |
origin.pull() | |
log += "Git pull succeed !\n" | |
yield log | |
repo.git.add(files) | |
repo.index.commit(message) | |
try: | |
repo.git.push(f"https://{user}:{token}@huggingface.co/spaces/OrganizedProgrammers/3GPPIndexers") | |
log += "Git push succeed !\n" | |
yield log | |
log += "Wait for Huggingface to restart the Space\n" | |
yield log | |
except Exception as e: | |
log += f"Git push failed: {e}\n" | |
yield log | |
def commit_and_push_3gppdocfinder(token, files, message, current_log=""): | |
log = current_log + "\n" | |
if not token: | |
log += "No token provided. Skipping HuggingFace push.\n" | |
yield log | |
return | |
hf_repo_dir = os.path.join(REPO_DIR, "hf_spaces") | |
repo = None | |
if not os.path.exists(hf_repo_dir): | |
repo = Repository( | |
local_dir=hf_repo_dir, | |
repo_type="space", | |
clone_from=HF_SEARCH_REPO, | |
git_user="3GPP Indexer Automatic Git Tool", | |
git_email="example@mail.org", | |
token=token, | |
skip_lfs_files=True | |
) | |
else: | |
repo = Repository( | |
local_dir=hf_repo_dir, | |
repo_type="space", | |
git_user="3GPP Indexer Automatic Git Tool", | |
git_email="example@mail.org", | |
token=token, | |
skip_lfs_files=True | |
) | |
repo.git_pull() | |
# Copy artifact files to huggingface space | |
for f in files: | |
import shutil | |
shutil.copy2(f, os.path.join(hf_repo_dir, f)) | |
repo.git_add(auto_lfs_track=True) | |
repo.git_commit(message) | |
repo.git_push() | |
log += "Pushed to HuggingFace.\n" | |
yield log | |
def refresh_stats(): | |
return str(get_docs_stats()), str(get_specs_stats()), str(get_scopes_stats()) | |
def stream_script_output(script_path, current_log=""): | |
accumulated_output = current_log | |
process = subprocess.Popen( | |
["python", script_path], | |
stdout=subprocess.PIPE, | |
stderr=subprocess.STDOUT, | |
bufsize=1, | |
universal_newlines=True, | |
) | |
for line in process.stdout: | |
accumulated_output += line | |
yield accumulated_output | |
process.stdout.close() | |
process.wait() | |
yield accumulated_output | |
def index_documents(user, token): | |
log_output = "⏳ Indexation en cours...\n" | |
# Désactiver tous les boutons | |
yield gr.update(interactive=False), gr.update(interactive=False), gr.update(interactive=False), log_output | |
# Lancer l'indexation | |
if not check_permissions(user, token): | |
log_output += "❌ Identifiants invalides\n" | |
yield gr.update(interactive=True), gr.update(interactive=True), gr.update(interactive=True), log_output | |
return | |
for log in stream_script_output(DOC_INDEXER, log_output): | |
yield gr.update(interactive=False), gr.update(interactive=False), gr.update(interactive=False), log | |
log_output = log | |
d = datetime.today().strftime("%d/%m/%Y-%H:%M:%S") | |
for log in commit_and_push_3gppdocfinder(token, [DOC_INDEX_FILE], f"Update documents indexer via Indexer: {d}", log_output): | |
yield gr.update(interactive=False), gr.update(interactive=False), gr.update(interactive=False), log | |
log_output = log | |
for log in commit_and_push_3gppindexers(user, token, [DOC_INDEX_FILE], f"Update documents indexer via Indexer: {d}", log_output): | |
yield gr.update(interactive=False), gr.update(interactive=False), gr.update(interactive=False), log | |
log_output = log | |
# Réactiver les boutons à la fin | |
log_output += "✅ Terminé.\n" | |
yield gr.update(interactive=True), gr.update(interactive=True), gr.update(interactive=True), log_output | |
def index_specifications(user, token): | |
log_output = "⏳ Indexation en cours...\n" | |
# Désactiver tous les boutons | |
yield gr.update(interactive=False), gr.update(interactive=False), gr.update(interactive=False), log_output | |
# Lancer l'indexation | |
if not check_permissions(user, token): | |
log_output += "❌ Identifiants invalides\n" | |
yield gr.update(interactive=True), gr.update(interactive=True), gr.update(interactive=True), log_output | |
return | |
for log in stream_script_output(SPEC_INDEXER, log_output): | |
yield gr.update(interactive=False), gr.update(interactive=False), gr.update(interactive=False), log | |
log_output = log | |
for log in stream_script_output(SPEC_DOC_INDEXER, log_output): | |
yield gr.update(interactive=False), gr.update(interactive=False), gr.update(interactive=False), log | |
log_output = log | |
for log in stream_script_output(BM25_INDEXER, log_output): | |
yield gr.update(interactive=False), gr.update(interactive=False), gr.update(interactive=False), log | |
log_output = log | |
d = datetime.today().strftime("%d/%m/%Y-%H:%M:%S") | |
for log in commit_and_push_3gppdocfinder(token, [SPEC_DOC_INDEX_FILE, BM25_INDEX_FILE, SPEC_INDEX_FILE], f"Update specifications indexer via Indexer: {d}", log_output): | |
yield gr.update(interactive=False), gr.update(interactive=False), gr.update(interactive=False), log | |
log_output = log | |
for log in commit_and_push_3gppindexers(user, token, [SPEC_DOC_INDEX_FILE, BM25_INDEX_FILE, SPEC_INDEX_FILE], f"Update specifications indexer via Indexer: {d}", log_output): | |
yield gr.update(interactive=False), gr.update(interactive=False), gr.update(interactive=False), log | |
log_output = log | |
# Réactiver les boutons à la fin | |
log_output += "✅ Terminé.\n" | |
yield gr.update(interactive=True), gr.update(interactive=True), gr.update(interactive=True), log_output | |
with gr.Blocks(theme=gr.themes.Soft()) as demo: | |
gr.Markdown("## 📄 3GPP Indexers") | |
with gr.Row() as r1: | |
with gr.Column(): | |
git_user = gr.Textbox(label="Git user (for push/pull indexes)") | |
git_pass = gr.Textbox(label="Git Token", type="password") | |
btn_login = gr.Button("Login", variant="primary") | |
with gr.Row(visible=False) as r2: | |
with gr.Column(): | |
doc_count = gr.Textbox(label="Docs Indexed", value=str(get_docs_stats()), interactive=False) | |
btn_docs = gr.Button("Re-index Documents", variant="primary") | |
with gr.Column(): | |
spec_count = gr.Textbox(label="Specs Indexed", value=str(get_specs_stats()), interactive=False) | |
btn_specs = gr.Button("Re-index Specifications", variant="primary") | |
with gr.Column(): | |
scope_count = gr.Textbox(label="Scopes Indexed", value=str(get_scopes_stats()), interactive=False) | |
out = gr.Textbox(label="Output/Log", lines=13, autoscroll=True, visible=False) | |
refresh = gr.Button(value="🔄 Refresh Stats", visible=False) | |
btn_login.click(update_logged, inputs=[git_user, git_pass], outputs=[r1, r2, out, refresh]) | |
btn_docs.click(index_documents, inputs=[git_user, git_pass], outputs=[btn_docs, btn_specs, refresh, out]) | |
btn_specs.click(index_specifications, inputs=[git_user, git_pass], outputs=[btn_docs, btn_specs, refresh, out]) | |
refresh.click(refresh_stats, outputs=[doc_count, spec_count, scope_count]) | |
demo.launch() | |