import requests
from bs4 import BeautifulSoup
from faker import Faker
from urllib.request import urlretrieve
import urllib.request
from urllib3.util.retry import Retry
import time
import os
import wget
import json
import unicodedata
import regex as re  # third-party 'regex' module: the tokenizer pattern below uses \p{L}/\p{N}, which the stdlib 're' does not support
import nltk
from sklearn.datasets import fetch_20newsgroups
from sklearn.feature_extraction.text import TfidfVectorizer
from sklearn.linear_model import LogisticRegression
from sklearn.multiclass import OneVsRestClassifier
import warnings
from requests.adapters import HTTPAdapter
from constants import *

MAX_XDD = 5
use_google_search = True
use_20newsgroup = True

fake = Faker()

def create_retry_session():
    # Retry transient HTTP failures (rate limiting and 5xx errors) with exponential backoff.
    retry_strategy = Retry(
        total=5,
        status_forcelist=[429, 500, 502, 503, 504],
        allowed_methods=["GET"],  # named 'method_whitelist' in urllib3 < 1.26
        backoff_factor=1,
    )
    adapter = HTTPAdapter(max_retries=retry_strategy)
    http = requests.Session()
    http.mount("https://", adapter)
    http.mount("http://", adapter)
    return http

def get_google_search_results(query, retry_session):
    if not use_google_search:
        return []
    headers = {"User-Agent": fake.user_agent()}
    search_url = f"https://www.google.com/search?q={query}"
    try:
        response = retry_session.get(search_url, headers=headers, timeout=10)
        response.raise_for_status()
    except requests.exceptions.RequestException:
        return []
    soup = BeautifulSoup(response.text, "html.parser")
    search_results = []
    # Result links in the HTML are wrapped as "/url?q=<target>&..."; extract the target URLs.
    for a_tag in soup.find_all('a', href=True):
        if 'url?q=' in a_tag['href'] and not a_tag['href'].startswith("https://accounts.google.com"):
            search_results.append(a_tag['href'].split('url?q=')[1].split('&')[0])
    return search_results
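
# Usage sketch (assumes outbound network access; the query string is illustrative):
#   session = create_retry_session()
#   urls = get_google_search_results("hugging face spaces", session)
#   # -> list of result URLs, or [] if the request failed or search is disabled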

def fetch_20newsgroup_data():
    if not use_20newsgroup:
        return []
    try:
        # 'talk.trivia' is not a valid 20 Newsgroups category and would make this call
        # raise (and the bare except silently return []), so only valid categories are listed.
        newsgroups_train = fetch_20newsgroups(
            subset='train',
            categories=['rec.sport.baseball', 'sci.med', 'comp.sys.ibm.pc.hardware', 'soc.religion.christian'],
        )
        return newsgroups_train.data
    except Exception:
        return []

def download_file(url, filename, folder, retries=3):
    filepath = os.path.join(folder, filename)
    if os.path.exists(filepath):
        return True
    os.makedirs(folder, exist_ok=True)
    for attempt in range(retries):
        try:
            wget.download(url, out=filepath)
            return True
        except Exception:
            if attempt < retries - 1:
                time.sleep(2)
            else:
                return False
    return False
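
# Usage sketch (hypothetical URL, for illustration only):
#   ok = download_file("https://example.com/model.bin", "model.bin", "models/demo")
#   # -> True if the file already existed or was downloaded, False after 3 failed attempts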

def download_gpt2_files(folder, model_url, model_file, encoder_url, encoder_file, vocab_url, vocab_file):
    if not os.path.exists(folder):
        os.makedirs(folder)
    if not os.path.exists(os.path.join(folder, model_file)):
        download_file(model_url, model_file, folder)
    if not os.path.exists(os.path.join(folder, encoder_file)):
        download_file(encoder_url, encoder_file, folder)
    if not os.path.exists(os.path.join(folder, vocab_file)):
        download_file(vocab_url, vocab_file, folder)

def download_model_files(folder, model_files_urls):
    # Generic downloader: model_files_urls is an iterable of (url, filename) pairs;
    # files already present in the folder are skipped.
    if not os.path.exists(folder):
        os.makedirs(folder)
    for url, filename in model_files_urls:
        if not os.path.exists(os.path.join(folder, filename)):
            download_file(url, filename, folder)

# The per-modality downloaders all share this behavior; their names are kept as
# aliases so call sites elsewhere in the app keep working.
download_translation_files = download_model_files
download_codegen_files = download_model_files
download_summarization_files = download_model_files
download_imagegen_files = download_model_files
download_image_to_3d_files = download_model_files
download_text_to_video_files = download_model_files
download_sentiment_files = download_model_files
download_stt_files = download_model_files
download_tts_files = download_model_files
download_musicgen_files = download_model_files
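
# Usage sketch: each downloader takes (url, filename) pairs (hypothetical URLs below):
#   download_tts_files("models/tts", [
#       ("https://example.com/tts/model.bin", "model.bin"),
#       ("https://example.com/tts/config.json", "config.json"),
#   ])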

def bytes_to_unicode_gpt2():
    # GPT-2 style byte->unicode map: printable bytes map to themselves and the remaining
    # bytes are shifted into unused code points, giving a reversible mapping over all 256 bytes.
    bs = list(range(ord("!"), ord("~") + 1)) + list(range(ord("¡"), ord("¬") + 1)) + list(range(ord("®"), ord("ÿ") + 1))
    cs = bs[:]
    n = 0
    for b in range(2**8):
        if b not in bs:
            bs.append(b)
            cs.append(2**8 + n)
            n = n + 1
    cs = [chr(n) for n in cs]
    return dict(zip(bs, cs))
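
# Sanity-check sketch: the mapping covers every byte value exactly once, so it can be
# inverted losslessly (as done via byte_decoder below):
#   byte_encoder = bytes_to_unicode_gpt2()
#   assert len(byte_encoder) == 256 and len(set(byte_encoder.values())) == 256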

def get_codegen_tokenizer_pure(vocab_file, merges_file):
    vocab = json.load(open(vocab_file))
    merges = open(merges_file, 'r', encoding="utf-8").read().split('\n')[1:-1]
    bpe_ranks = dict(zip(merges, range(len(merges))))
    byte_encoder = bytes_to_unicode_gpt2()  # byte-level map defined above
    byte_decoder = {v: k for k, v in byte_encoder.items()}
    # GPT-2 / CodeGen pre-tokenization pattern; \p{L}/\p{N} require the 'regex' module imported as re above.
    tokenizer_regex = re.compile(r'''<\|endoftext\|>|'s|'t|'re|'ve|'m|'ll|'d|[\p{L}]+|[\p{N}]| ?[^\s\p{L}\p{N}]+|\s+(?!\S)|\s+''')
    tokenize = lambda text: re.findall(tokenizer_regex, text)
    # Encoder is assumed to come from the wildcard `constants` import.
    encoder_obj = Encoder(
        encoder=vocab,
        decoder={v: u for u, v in vocab.items()},
        bpe_ranks=bpe_ranks,
        byte_encoder=byte_encoder,
        byte_decoder=byte_decoder,
        tokenize=tokenize
    )
    return encoder_obj
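
# Usage sketch (assumes the CodeGen vocab/merges files were fetched beforehand, e.g. by
# download_codegen_files; the folder and file names below are illustrative):
#   tokenizer = get_codegen_tokenizer_pure("models/codegen/vocab.json", "models/codegen/merges.txt")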