import os import re import time import dotenv import pandas as pd import requests import schedule import srsly from bs4 import BeautifulSoup from datasets import Dataset, load_dataset from huggingface_hub import create_repo, login, whoami from retry import retry from tqdm.auto import tqdm dotenv.load_dotenv() login(token=os.environ.get("HF_TOKEN")) hf_user = whoami(os.environ.get("HF_TOKEN"))["name"] HF_REPO_ID = f"{hf_user}/zotero-answer-ai-articles" ######################################################## ### GET ZOTERO ITEMS ######################################################## @retry(tries=3, delay=8) def _fetch_one_zotero_batch(url, headers, params): """ Fetch articles from Zotero API """ response = requests.get(url, headers=headers, params=params) response.raise_for_status() return response.json() def get_zotero_items(debug=False): """ fetch items from zotero library """ GROUP_ID = os.getenv("GROUP_ID") API_KEY = os.getenv("API_KEY") BASE_URL = f"https://api.zotero.org/groups/{GROUP_ID}/items" LIMIT = 100 headers = {"Zotero-API-Key": API_KEY, "Content-Type": "application/json"} items = [] start = 0 i = 1 while True: i += 1 params = {"limit": LIMIT, "start": start} page_items = _fetch_one_zotero_batch(BASE_URL, headers, params) if not page_items: break items.extend(page_items) start += LIMIT print(f"# items fetched {len(items)}") if debug: if len(items) > 300: break return items ######################################################## ### EXTRACT ARXIV LINKS AND PDFs ######################################################## def get_arxiv_items(items): visited = set() arxiv_items = [] arxiv_pattern = re.compile(r"arxiv.org/abs/(\d+\.\d+)") for item in items: data = item.get("data", {}) attachments = item.get("links", {}).get("attachment", {}) arxiv_url = None pdf_url = None if "url" in data and "arxiv.org" in data["url"]: arxiv_match = arxiv_pattern.search(data["url"]) if arxiv_match: arxiv_url = data["url"] if attachments: pdf_url = attachments["href"] if arxiv_url: arxiv_id = arxiv_url.split("/")[-1] if arxiv_id in visited: continue arxiv_items.append( { "arxiv_id": arxiv_id, "arxiv_url": arxiv_url, "pdf_url": pdf_url, "added_by": item["meta"]["createdByUser"]["username"], "date_added": data.get("dateAdded", ""), } ) visited.add(arxiv_id) return arxiv_items @retry(tries=3, delay=15, backoff=2) def fetch_arxiv_html(arxiv_id): url = f"https://ar5iv.labs.arxiv.org/html/{arxiv_id.split('v')[0]}" response = requests.get(url) return response.text if response.status_code == 200 else None def fetch_arxiv_htmls(arxiv_items): for item in tqdm(arxiv_items): html = fetch_arxiv_html(item["arxiv_id"]) if html: item["raw_html"] = html else: print(f"failed to fetch html for {item['arxiv_id']}") item["raw_html"] = "Error" return arxiv_items ######################################################## ### PARSE CONTENT FROM ARXIV HTML # ######################################################## def parse_html_content(html): """ Parse content from arxiv html """ arxiv_id_match = re.search(r"\[(\d+\.\d+(v\d+)?)\]", html) arxiv_id = arxiv_id_match.group(1) if arxiv_id_match else None soup = BeautifulSoup(html, "html.parser") result = [] # Extract paper title try: paper_title = soup.find("h1", class_="ltx_title ltx_title_document").get_text(strip=True) except Exception: paper_title = soup.find("title").get_text(strip=True) paper_title = re.sub(r"^\[\d+\.\d+(v\d+)?\]\s*", "", paper_title) for math in soup.find_all("math"): math.decompose() for cite in soup.find_all("cite"): cite.decompose() # Extract abstract abstract = soup.find("div", class_="ltx_abstract") if abstract: result.append( { "content": " ".join(p.get_text(strip=True) for p in abstract.find_all("p")).replace(")", ") "), "title": "Abstract", "paper_title": paper_title, "content_type": "abstract", } ) # Extract sections sections = soup.find_all("section", class_="ltx_section") for index, section in enumerate(sections): section_title = section.find("h2", class_="ltx_title ltx_title_section") section_title = section_title.get_text(strip=True) if section_title else f"Section {index + 1}" section_content = section.get_text(strip=True).replace(")", ") ") content_type = "body" if index == 0: content_type = "introduction" elif index == len(sections) - 1: content_type = "conclusion" result.append( { "content": section_content, "title": section_title, "paper_title": paper_title, "content_type": content_type, } ) for c in result: c["arxiv_id"] = arxiv_id return result ######################################################## ### GET TEXTS FROM PDF & PARSE ######################################################## def get_pdf_text(arxiv_id): url = "http://147.189.194.113:80/extract" # fix: currently down try: response = requests.get(url, params={"arxiv_id": arxiv_id}) response = response.json() if "text" in response: return response["text"] return None except Exception as e: print(e) return None def get_content_type(section_type, section_count): """Determine the content type based on the section type and count""" if section_type == "abstract": return "abstract" elif section_type == "introduction" or section_count == 1: return "introduction" elif section_type == "conclusion" or section_type == "references": return section_type else: return "body" def get_section_type(title): """Determine the section type based on the title""" title_lower = title.lower() if "abstract" in title_lower: return "abstract" elif "introduction" in title_lower: return "introduction" elif "conclusion" in title_lower: return "conclusion" elif "reference" in title_lower: return "references" else: return "body" def parse_markdown_content(md_content, arxiv_id): """ Parses markdown content to identify and extract sections based on headers. """ lines = md_content.split("\n") parsed = [] current_section = None content = [] paper_title = None current_title = None # identify sections based on headers for line in lines: if line.startswith("#"): if paper_title is None: paper_title = line.lstrip("#").strip() continue if content: if current_title: parsed.append( { "content": " ".join(content), "title": current_title, "paper_title": paper_title, "content_type": get_content_type(current_section, len(parsed)), "arxiv_id": arxiv_id, } ) content = [] current_title = line.lstrip("#").lstrip("#").lstrip() if "bit" not in current_title: current_title = ( current_title.lstrip("123456789") .lstrip() .lstrip(".") .lstrip() .lstrip("123456789") .lstrip() .lstrip(".") .lstrip() ) current_section = get_section_type(current_title) else: content.append(line) # Add the last section if content and current_title: parsed.append( { "content": " ".join(content).replace(")", ") "), "title": current_title, "paper_title": paper_title, "content_type": get_content_type(current_section, len(parsed)), "arxiv_id": arxiv_id, } ) return parsed ######################################################## ### HF UPLOAD ######################################################## def upload_to_hf(abstract_df, contents_df, processed_arxiv_ids): repo_id = HF_REPO_ID create_repo( repo_id=repo_id, token=os.environ.get("HF_TOKEN"), private=True, repo_type="dataset", exist_ok=True, ) # push id_to_abstract abstract_ds = Dataset.from_pandas(abstract_df) abstract_ds.push_to_hub(repo_id, "abstracts", token=os.environ.get("HF_TOKEN")) # push arxiv_items arxiv_ds = Dataset.from_pandas(contents_df) arxiv_ds.push_to_hub(repo_id, "articles", token=os.environ.get("HF_TOKEN")) # push processed_arxiv_ids processed_arxiv_ids = [{"arxiv_id": arxiv_id} for arxiv_id in processed_arxiv_ids] processed_arxiv_ids_ds = Dataset.from_list(processed_arxiv_ids) processed_arxiv_ids_ds.push_to_hub(repo_id, "processed_arxiv_ids", token=os.environ.get("HF_TOKEN")) ######################################################## ### MAIN ######################################################## def main(): items = get_zotero_items(debug=True) print(f"# of items fetched from zotero: {len(items)}") arxiv_items = get_arxiv_items(items) print(f"# of arxiv papers: {len(arxiv_items)}") # get already processed arxiv ids from HF try: existing_arxiv_ids = load_dataset(HF_REPO_ID, "processed_arxiv_ids")["train"]["arxiv_id"] except Exception as e: print(e) try: existing_arxiv_ids = srsly.read_json("data/processed_arxiv_ids.json") except Exception as e: print(e) existing_arxiv_ids = [] existing_arxiv_ids = set(existing_arxiv_ids) print(f"# of existing arxiv ids: {len(existing_arxiv_ids)}") # new arxiv items arxiv_items = [item for item in arxiv_items if item["arxiv_id"] not in existing_arxiv_ids] arxiv_items = fetch_arxiv_htmls(arxiv_items) print(f"# of new arxiv items: {len(arxiv_items)}") processed_arxiv_ids = set() for item in arxiv_items: try: item["contents"] = parse_html_content(item["raw_html"]) processed_arxiv_ids.add(item["arxiv_id"]) except Exception as e: print(f"Failed to parse html for {item['arxiv_id']}: {e}") item["contents"] = [] if len(item["contents"]) == 0: print("Extracting from pdf...") md_content = get_pdf_text(item["arxiv_id"]) # fix this if md_content: item["contents"] = parse_markdown_content(md_content, item["arxiv_id"]) processed_arxiv_ids.add(item["arxiv_id"]) else: item["contents"] = [] # save contents --- processed_arxiv_ids = list(processed_arxiv_ids) print(f"# of processed arxiv ids: {len(processed_arxiv_ids)}") # save abstracts --- id_to_abstract = {} for item in arxiv_items: for entry in item["contents"]: if entry["content_type"] == "abstract": id_to_abstract[item["arxiv_id"]] = entry["content"] break print(f"# of abstracts: {len(id_to_abstract)}") abstract_df = pd.Series(id_to_abstract).reset_index().rename(columns={"index": "arxiv_id", 0: "abstract"}) print(abstract_df.head()) # add to existing dataset try: old_abstract_df = load_dataset(HF_REPO_ID, "abstracts")["train"].to_pandas() except Exception as e: print(e) old_abstract_df = pd.DataFrame(columns=abstract_df.columns) print(old_abstract_df.head()) abstract_df = pd.concat([old_abstract_df, abstract_df]).reset_index(drop=True) abstract_df = abstract_df.drop_duplicates(subset=["arxiv_id"], keep="last").reset_index(drop=True) # contents contents_df = pd.DataFrame(arxiv_items) print(contents_df.head()) try: old_contents_df = load_dataset(HF_REPO_ID, "articles")["train"].to_pandas() except Exception as e: print(e) old_contents_df = pd.DataFrame(columns=contents_df.columns) if len(old_contents_df) > 0: print(old_contents_df.sample().T) contents_df = pd.concat([old_contents_df, contents_df]).reset_index(drop=True) contents_df = contents_df.drop_duplicates(subset=["arxiv_id"], keep="last").reset_index(drop=True) # upload to hf processed_arxiv_ids = list(set(processed_arxiv_ids + list(processed_arxiv_ids))) upload_to_hf(abstract_df, contents_df, processed_arxiv_ids) # save as local copy os.makedirs("data", exist_ok=True) abstract_df.to_parquet("data/abstracts.parquet") contents_df.to_parquet("data/contents.parquet") srsly.write_json("data/processed_arxiv_ids.json", processed_arxiv_ids) def schedule_periodic_task(): """ Schedule the main task to run at the user-defined frequency """ main() # run once initially frequency = "daily" # TODO: env if frequency == "hourly": print("Scheduling tasks to run every hour at the top of the hour") schedule.every().hour.at(":00").do(main) elif frequency == "daily": start_time = "10:00" print("Scheduling tasks to run every day at: {start_time} UTC+00") schedule.every().day.at(start_time).do(main) while True: schedule.run_pending() time.sleep(1) if __name__ == "__main__": schedule_periodic_task()