|
import subprocess |
|
from typing import Dict, List, Any |
|
import os |
|
import json |
|
import logging |
|
import sys |
|
import tempfile |
|
import time |
|
from pathlib import Path |
|
import re |
|
import shutil |
|
import stat |
|
import subprocess |
|
|
|
import uuid |
|
from contextlib import contextmanager |
|
|
|
import requests |
|
logging.basicConfig(format="%(asctime)s - %(message)s", level=logging.INFO) |
|
|
|
# Fetch the aligner scripts when this module is imported. A failing clone is
# only reported on stdout; execution continues afterwards.
git_clone_command = "git clone https://github.com/OpenPecha/tibetan-aligner"

try:
    subprocess.run(git_clone_command, check=True, shell=True)
except subprocess.CalledProcessError as clone_error:
    print(f"Error while running Git clone command: {clone_error}")
else:
    print("Git clone successful!")
|
|
|
|
|
# Absolute location of the aligner checkout and of the shell script inside it
# that performs the alignment.
ALIGNER_SCRIPT_DIR = Path("./tibetan-aligner").resolve()
ALIGNER_SCRIPT_NAME = "align_tib_en.sh"
ALIGNER_SCRIPT_PATH = ALIGNER_SCRIPT_DIR / ALIGNER_SCRIPT_NAME

# Explicit check rather than `assert`: assertions are removed under `python -O`,
# which would let a missing script go unnoticed until the first request.
if not ALIGNER_SCRIPT_PATH.is_file():
    raise FileNotFoundError(f"Aligner script not found: {ALIGNER_SCRIPT_PATH}")
|
|
|
import requests |
|
|
|
# GitHub identity used for commits and API calls.
GITHUB_USERNAME = "pechawa"

# SECURITY: credentials must not live in source control. The token is read from
# the environment (GITHUB_ACCESS_TOKEN, falling back to GITHUB_TOKEN). Any token
# that was previously committed in this file should be treated as leaked and
# revoked.
GITHUB_ACCESS_TOKEN = os.environ.get("GITHUB_ACCESS_TOKEN") or os.environ.get(
    "GITHUB_TOKEN", ""
)
# Kept as an alias so code referring to either name sees the same value.
GITHUB_TOKEN = GITHUB_ACCESS_TOKEN

GITHUB_EMAIL = "openpecha-bot@openpecha.org"
GITHUB_ORG = "MonlamAI"
MAI_TM_PUBLISH_TODO_REPO = "MonlamAI_TMs_Publish_TODO"

# REST endpoint used to create repositories inside GITHUB_ORG.
GITHUB_API_ENDPOINT = f"https://api.github.com/orgs/{GITHUB_ORG}/repos"
|
|
|
# Module-wide debug switch.
DEBUG = False

# Extra flag spliced into the git command lines built in this module.
# NOTE(review): "-q" is only applied when DEBUG is on, which looks inverted
# for a "quiet" flag — confirm the intent before changing it.
quiet = "" if not DEBUG else "-q"
|
def make_dir_executable(dir_path: Path):
    """Add the execute bit for user, group and others to each entry in ``dir_path``.

    Only the direct children returned by ``dir_path.iterdir()`` are touched;
    their existing permission bits are preserved.
    """
    exec_bits = stat.S_IEXEC | stat.S_IXGRP | stat.S_IXOTH
    for entry in dir_path.iterdir():
        current_mode = os.stat(entry).st_mode
        os.chmod(entry, current_mode | exec_bits)
|
|
|
|
|
# Runs at import time: add execute permission to every entry of the aligner
# directory so the alignment script can later be launched as a subprocess.
make_dir_executable(ALIGNER_SCRIPT_DIR)
|
|
|
|
|
def create_github_repo(repo_path: Path, repo_name: str):
    """Commit the contents of ``repo_path`` and push them to a new private GitHub repo.

    The directory is initialised as a git repository, everything in it is
    committed as "Initial commit", a private repository named ``repo_name`` is
    created through ``GITHUB_API_ENDPOINT`` and the commit is pushed to its
    ``main`` branch.

    Args:
        repo_path: Directory whose contents become the initial commit.
        repo_name: Name of the repository to create under ``GITHUB_ORG``.

    Returns:
        The ``html_url`` field of the GitHub "create repository" response.

    Raises:
        subprocess.CalledProcessError: If any git command exits non-zero.
        requests.HTTPError: If the GitHub API rejects the create request.
    """
    logging.info("[INFO] Creating GitHub repo...")

    def run_git(*args: str, cwd=None) -> None:
        # Arguments are passed as a list (no shell, no whitespace splitting) and
        # empty ones are dropped because ``quiet`` may be "". ``check=True``
        # stops the pipeline at the first failing git step instead of carrying on.
        subprocess.run(["git", *(arg for arg in args if arg)], cwd=cwd, check=True)

    # Configure the git identity used for the commit (written to the global config).
    run_git("config", "--global", "user.name", GITHUB_USERNAME)
    run_git("config", "--global", "user.email", GITHUB_EMAIL)

    workdir = str(repo_path)

    # Initialise the repository and commit every file in the directory.
    run_git("init", quiet, cwd=workdir)
    run_git("add", ".", cwd=workdir)
    run_git("commit", quiet, "-m", "Initial commit", cwd=workdir)

    # Create the (private) repository on GitHub.
    response = requests.post(
        GITHUB_API_ENDPOINT,
        json={
            "name": repo_name,
            "private": True,
        },
        auth=(GITHUB_USERNAME, GITHUB_ACCESS_TOKEN),
        timeout=30,  # never block forever on an unresponsive API
    )
    response.raise_for_status()

    # Presumably gives GitHub time to provision the repository before the
    # push — TODO confirm whether this delay is still required.
    time.sleep(3)

    # Authenticated remote URL; passed as a single argv element so characters
    # in ``repo_name`` can never be interpreted by a shell.
    remote_url = f"https://{GITHUB_ORG}:{GITHUB_ACCESS_TOKEN}@github.com/{GITHUB_ORG}/{repo_name}.git"
    run_git("remote", "add", "origin", remote_url, cwd=workdir)

    run_git("branch", "-M", "main", cwd=workdir)
    run_git("push", quiet, "-u", "origin", "main", cwd=workdir)

    return response.json()["html_url"]
|
|
|
|
|
def _iter_segment_pairs(align_fn: Path):
    """Yield ``(bo_seg, en_seg)`` for every non-empty line of ``align_fn``.

    A line is split at its first tab; a line without a tab yields an empty
    English segment.
    """
    # The TM files are written as UTF-8 below, so read the input the same way
    # instead of relying on the platform's default encoding.
    content = align_fn.read_text(encoding="utf-8")
    for seg_pair in content.splitlines():
        if not seg_pair:
            continue
        bo_seg, _, en_seg = seg_pair.partition("\t")
        yield bo_seg, en_seg


def convert_raw_align_to_tm(align_fn: Path, tm_path: Path):
    """Split a tab-separated alignment file into parallel BO and EN text files.

    Writes ``<tm_path.name>-bo.txt`` and ``<tm_path.name>-en.txt`` inside
    ``tm_path``. Each non-empty input line contributes exactly one line to
    each file, so line *i* of one file corresponds to line *i* of the other.

    Args:
        align_fn: Alignment file with one ``bo<TAB>en`` pair per line.
        tm_path: Existing directory that receives the two output files.

    Returns:
        ``tm_path`` unchanged.
    """
    # Emitted at DEBUG level, so it is filtered out unless debug logging is enabled.
    logging.debug("[INFO] Converting raw alignment to TM repo...")

    text_bo_fn = tm_path / f"{tm_path.name}-bo.txt"
    text_en_fn = tm_path / f"{tm_path.name}-en.txt"

    with open(text_bo_fn, "w", encoding="utf-8") as bo_file, open(
        text_en_fn, "w", encoding="utf-8"
    ) as en_file:
        for bo_seg, en_seg in _iter_segment_pairs(align_fn):
            bo_file.write(bo_seg + "\n")
            # A missing translation becomes a single empty line, which keeps
            # both files at the same line count.
            en_file.write(en_seg + "\n")

    return tm_path
|
|
|
|
|
def get_github_dev_url(raw_github_url: str) -> str:
    """Convert a raw GitHub file URL into the matching https://github.dev URL.

    Everything after the first ``.com`` is kept as the path, and the first
    ``/main/`` path segment is rewritten to ``/blob/main/``.

    Args:
        raw_github_url: URL whose host ends in ``.com``, e.g.
            ``https://raw.githubusercontent.com/<org>/<repo>/main/<file>``.

    Returns:
        The github.dev URL for the same file.

    Raises:
        ValueError: If ``raw_github_url`` contains no ``.com``.
    """
    base_url = "https://github.dev"
    # Split on the first ".com" only, so file names that contain ".com"
    # themselves do not break the conversion.
    _, separator, file_path = raw_github_url.partition(".com")
    if not separator:
        raise ValueError(f"Not a GitHub URL (no '.com' found): {raw_github_url!r}")
    # Rewrite only the branch segment; a bare "main" substring inside a repo or
    # file name (e.g. "domain", "remainder") must stay untouched.
    blob_file_path = file_path.replace("/main/", "/blob/main/", 1)
    return base_url + blob_file_path
|
|
|
|
|
def add_input_in_readme(input_dict: Dict[str, str], path: Path) -> Path:
    """Write ``README.md`` in ``path`` with links to the BO and EN input files.

    Args:
        input_dict: Mapping providing ``text_id``, ``bo_file_url`` and
            ``en_file_url``; both URLs are converted with ``get_github_dev_url``.
        path: Directory in which ``README.md`` is written.

    Returns:
        ``path`` unchanged.
    """
    text_id = input_dict["text_id"]
    # Generator unpacking keeps the lookup order: BO first, then EN.
    bo_link, en_link = (
        get_github_dev_url(input_dict[url_key])
        for url_key in ("bo_file_url", "en_file_url")
    )

    readme_body = "## Input\n- [BO{}]({})\n- [EN{}]({})".format(
        text_id, bo_link, text_id, en_link
    )
    (path / "README.md").write_text(readme_body)
    return path
|
|
|
def add_to_publish_todo_repo(org, repo_name, file_path, access_token, timeout=30):
    """Create an empty file ``file_path`` in ``org/repo_name`` unless it already exists.

    Uses the GitHub contents API: a GET that answers 200 means the entry is
    already there and nothing is done; otherwise a PUT with empty content
    creates it. The outcome is reported on stdout.

    Args:
        org: GitHub organisation (or user) owning the repository.
        repo_name: Repository that holds the todo entries.
        file_path: Path of the file to create inside the repository.
        access_token: Token sent as ``Authorization: Bearer``.
        timeout: Seconds to wait for each HTTP request, so an unresponsive
            API cannot block the caller indefinitely.
    """
    base_url = f"https://api.github.com/repos/{org}/{repo_name}/contents/"
    headers = {
        "Authorization": f"Bearer {access_token}",
        "Accept": "application/vnd.github.v3+json",
    }
    url = base_url + file_path

    # Existence check: a 200 answer means the entry was added earlier.
    response = requests.get(url, headers=headers, timeout=timeout)
    if response.status_code == 200:
        print(f"[INFO] '{file_path}' already added.")
        return

    payload = {"message": f"Add {file_path}", "content": ""}
    response = requests.put(url, headers=headers, json=payload, timeout=timeout)

    if response.status_code == 201:
        print(f"[INFO] '{file_path}' added to publish todo")
    else:
        print(f"[ERROR] Failed to add '{file_path}'.")
        print(f"[ERROR] Response: {response.text}")
|
|
|
|
|
def create_tm(align_fn: Path, text_pair: Dict[str, str]):
    """Build a TM repository from an alignment file and publish it on GitHub.

    The work happens in a throw-away directory: the alignment is split into
    BO/EN files, a README describing the inputs is added, the directory is
    pushed as repository ``TM<text_id>`` and that name is registered in the
    publish-todo repository.

    Args:
        align_fn: Path (or path string) of the raw alignment file.
        text_pair: Mapping with at least ``text_id``; it is also forwarded to
            ``add_input_in_readme``.

    Returns:
        URL of the created repository, as returned by ``create_github_repo``.
    """
    alignment_path = Path(align_fn)
    repo_name = f"TM{text_pair['text_id']}"

    with tempfile.TemporaryDirectory() as scratch_dir:
        tm_path = Path(scratch_dir) / repo_name
        tm_path.mkdir(parents=True, exist_ok=True)

        convert_raw_align_to_tm(alignment_path, tm_path)
        repo_path = add_input_in_readme(text_pair, tm_path)

        repo_url = create_github_repo(repo_path, repo_name)
        logging.info(f"TM repo created: {repo_url}")

        add_to_publish_todo_repo(
            GITHUB_ORG, MAI_TM_PUBLISH_TODO_REPO, repo_name, GITHUB_ACCESS_TOKEN
        )

    return repo_url
|
|
|
|
|
|
|
|
|
@contextmanager
def TemporaryDirectory():
    """Yield a fresh working directory under ``./output`` and delete it on exit.

    The directory name is the first eight hex characters of a random UUID.
    It is removed together with its contents when the ``with`` block ends,
    whether or not an exception was raised.
    """
    scratch_root = Path("./output").resolve()
    workdir = scratch_root / uuid.uuid4().hex[:8]
    workdir.mkdir(parents=True, exist_ok=True)
    try:
        yield workdir
    finally:
        shutil.rmtree(str(workdir))
|
|
|
|
|
def download_file(s3_public_url: str, output_fn, timeout: float = 60) -> Path:
    """Download file from a public S3 bucket URL.

    The response is streamed to ``output_fn`` in 8 KiB chunks, so the file is
    never held in memory as a whole.

    Args:
        s3_public_url: URL to fetch.
        output_fn: Destination path; the file is created or overwritten.
        timeout: Seconds to wait for the connection and for each read, so a
            stalled server cannot block the caller indefinitely.

    Returns:
        ``output_fn`` as passed in.

    Raises:
        requests.HTTPError: If the server answers with an error status.
    """
    with requests.get(s3_public_url, stream=True, timeout=timeout) as r:
        r.raise_for_status()
        with open(output_fn, "wb") as f:
            for chunk in r.iter_content(chunk_size=8192):
                f.write(chunk)
    return output_fn
|
|
|
|
|
def _run_align_script(bo_fn, en_fn, output_dir):
    """Run the aligner shell script and return the path of the file it produced.

    The script is started from ``ALIGNER_SCRIPT_DIR`` with the two input
    files and the output directory as arguments. Its stdout must contain a
    line of the form ``[OUTPUT] <path>``.

    Args:
        bo_fn: Path of the Tibetan input file.
        en_fn: Path of the English input file.
        output_dir: Directory handed to the script as its third argument.

    Returns:
        The output file path as a string.

    Raises:
        subprocess.CalledProcessError: If the script exits non-zero.
        RuntimeError: If the script's stdout has no ``[OUTPUT]`` line.
    """
    start = time.time()
    cmd = [str(ALIGNER_SCRIPT_PATH), str(bo_fn), str(en_fn), str(output_dir)]
    try:
        output = subprocess.run(
            cmd,
            check=True,
            capture_output=True,
            text=True,
            cwd=str(ALIGNER_SCRIPT_DIR),
        )
    except subprocess.CalledProcessError as err:
        # Output is captured, so without this the script's own error text
        # would never reach the logs.
        logging.error(
            "Aligner script failed with exit code %s: %s", err.returncode, err.stderr
        )
        raise

    match = re.search(r"\[OUTPUT\] (.*)", output.stdout)
    if match is None:
        raise RuntimeError(
            f"Aligner script printed no '[OUTPUT] <path>' line; stdout was: {output.stdout!r}"
        )

    # Keep only what follows the last "//" and anchor it at the filesystem
    # root. NOTE(review): presumably the script prefixes the absolute path
    # with another directory — confirm against the script's output format.
    output_fn = "/" + match.group(1).split("//")[-1]

    end = time.time()
    total_time = round((end - start) / 60, 2)
    logging.info(f"Total time taken for Aligning: {total_time} mins")
    return output_fn
|
def align(text_pair):
    """Download a BO/EN text pair, align it and publish the result as a TM repo.

    Args:
        text_pair: Mapping with ``text_id``, ``bo_file_url`` and ``en_file_url``.

    Returns:
        ``{"tm_repo_url": <url>}`` where the URL comes from ``create_tm``.
    """
    logging.info(f"Running aligner for TM{text_pair['text_id']}...")

    with TemporaryDirectory() as tmpdir:
        workdir = Path(tmpdir)

        # Fetch both inputs into the working directory, BO first, then EN.
        downloads = (("bo_file_url", "bo.tx"), ("en_file_url", "en.tx"))
        bo_fn, en_fn = (
            download_file(text_pair[url_key], output_fn=workdir / local_name)
            for url_key, local_name in downloads
        )
        print("bo_fn: ", bo_fn)
        print("en_fn: ", en_fn)

        aligned_fn = _run_align_script(bo_fn, en_fn, workdir)
        print("aligned_fn: ", aligned_fn)

        # Publish while the working directory (and the files in it) still exists.
        result = {"tm_repo_url": create_tm(aligned_fn, text_pair=text_pair)}

    return result
|
|
|
class EndpointHandler:
    """Callable entry point that runs the alignment pipeline on a request payload."""

    def __init__(self, path=""):
        # Stored as given; nothing in this class reads it.
        self.path = path

    def __call__(self, data: Any) -> Dict[str, str]:
        """Align the text pair described by ``data``.

        Args:
            data (:obj:):
                request payload, passed unchanged to ``align``; it is
                expected to provide ``text_id``, ``bo_file_url`` and
                ``en_file_url``.
        Return:
            A :obj:`dict`: whatever ``align`` returns, i.e.
            ``{"tm_repo_url": <url of the created TM repository>}``.
        """
        return align(data)
|
|
|
|