import argparse
import os
import re
import subprocess
import sys
import zipfile
import random
import string
import shutil
import io
import webbrowser
from typing import List, Any, Dict, Union
from concurrent.futures import ThreadPoolExecutor, as_completed
import logging
import aiohttp
import asyncio
import hashlib
import gradio as gr
from transformers import AutoTokenizer, pipeline
# Configure logging
logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(levelname)s - %(message)s')
# Regex matching import statements that pull in the openai package (the
# previous lookahead form could never match, since the position cannot start
# with both "import" and "openai" at once)
OPENAI_REGEX = re.compile(r'^\s*(?:import|from)\s+openai(?:_\w+)?\b', re.MULTILINE)
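# e.g. this matches "import openai", "from openai import OpenAI", and
# "import openai_helpers" (module names other than "openai" are illustrative)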
GITIGNORE_CONTENT = '# Converted files will appear here\n'
WEBAPP_DIRNAME = 'webapp'
CACHE_DIR = './cache'
class OpenAIWrapper:
    # Implement OpenAI interaction methods here
    pass
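    # A minimal sketch of what this wrapper might provide (an assumption, not
    # the original design): delegate completion-style calls to a local
    # transformers pipeline instead of the OpenAI API.
    def __init__(self, pipe: Any = None):
        self.pipe = pipe
    def complete(self, prompt: str) -> str:
        """Hypothetical helper: generate text locally via the attached pipeline."""
        if self.pipe is None:
            raise RuntimeError("No pipeline attached to OpenAIWrapper")
        return self.pipe(prompt, max_new_tokens=50)[0]['generated_text']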
async def download_file(url: str) -> Union[bytes, None]:
    """Downloads a file asynchronously with retries; returns its content, or None on failure."""
    logging.info(f"Downloading content from {url}...")
    async with aiohttp.ClientSession() as session:
        for attempt in range(3):
            try:
                async with session.get(url) as response:
                    if response.status == 200:
                        logging.info("Download complete!")
                        return await response.read()
                    logging.error(f"Failed to download content from {url} (status code: {response.status})")
            except aiohttp.ClientError as e:
                logging.error(f"Error downloading {url}: {e}")
            # Back off after any failed attempt (not just network errors)
            await asyncio.sleep(2 ** attempt)  # Exponential backoff
    return None
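# Usage example (hypothetical URL): from synchronous code this coroutine can
# be driven with asyncio, e.g.
#   data = asyncio.run(download_file("https://example.com/project.zip"))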
def get_cache_path(url: str) -> str:
    """Returns a unique cache path based on the URL."""
    hash_digest = hashlib.md5(url.encode()).hexdigest()
    return os.path.join(CACHE_DIR, hash_digest)
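# e.g. get_cache_path("https://example.com/project.zip") yields
# "./cache/<32-char md5 hex digest>", so repeated downloads of the same URL
# hit the cache instead of the network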
class DownloadItemTask:
    """Class responsible for fetching remote content"""
    def __init__(self, url: str):
        self.url = url
    async def download(self) -> Union[bytes, None]:
        """Downloads the file via download_file, consulting the on-disk cache first"""
        cache_path = get_cache_path(self.url)
        if os.path.exists(cache_path):
            logging.info(f"Using cached file for {self.url}")
            with open(cache_path, 'rb') as f:
                return f.read()
        data = await download_file(self.url)
        if data:
            os.makedirs(CACHE_DIR, exist_ok=True)
            with open(cache_path, 'wb') as f:
                f.write(data)
        return data
class UnarchiveTask:
    """Utility class dealing with archives such as .zip or tarballs"""
    def __init__(self, data: bytes):
        self.data = data
    def unarchive(self) -> Union[str, None]:
        """Unpacks the archive and returns the absolute path of the directory holding its contents"""
        logging.info("Unarchiving downloaded file...")
        extracted_dir = os.path.join(CACHE_DIR, ''.join(random.choices(string.ascii_uppercase + string.digits, k=10)))
        os.makedirs(extracted_dir, exist_ok=True)
        try:
            if sys.platform == 'darwin' or sys.platform.startswith('linux'):
                archive_path = os.path.join(extracted_dir, 'archive.tar')
                with open(archive_path, 'wb') as f:
                    f.write(self.data)
                # Reference the archive by its full path; the bare filename
                # would be resolved against the current working directory
                subprocess.run(['tar', '-xf', archive_path, '-C', extracted_dir], check=True)
            else:
                with zipfile.ZipFile(io.BytesIO(self.data), 'r') as zip_ref:
                    zip_ref.extractall(extracted_dir)
            logging.info("Unarchiving complete!")
        except Exception as e:
            logging.error(f"Error unarchiving file: {e}")
            return None
        # Absolute path keeps later steps independent of the working directory
        return os.path.abspath(extracted_dir)
class DependencyFinderTask:
    """Scans project structure searching for specific dependencies to be swapped"""
    DEPENDENCY_REGEXES = {
        'openai': OPENAI_REGEX,
    }
    def __init__(self):
        self.found_paths = {'openai': set()}
        self.has_openai_dep = False
    def find_dependencies(self, dir_path: str):
        """Recursively searches through directories looking for dependencies"""
        logging.info("Searching for dependencies...")
        for current_root, _, filenames in os.walk(dir_path):
            for filename in filenames:
                # Only Python sources can contain the import statements we
                # look for; skipping binaries avoids spurious decode errors
                if not filename.endswith('.py'):
                    continue
                full_path = os.path.join(current_root, filename)
                try:
                    with open(full_path, mode='rt', encoding='utf-8') as f:
                        content = f.read()
                    for dep, regex in self.DEPENDENCY_REGEXES.items():
                        if regex.search(content):
                            self.found_paths[dep].add(full_path)
                            self.has_openai_dep = True
                except Exception as e:
                    logging.error(f"Error reading file {full_path}: {e}")
        logging.info("Dependency search complete!")
        return self
class DependencyReplacerTask:
    """
    Replaces specified dependencies with their corresponding wrapper methods.
    Also provides methods to save and load the dependency mapping so repeat runs can skip re-scanning.
    """
    def __init__(self, finder: DependencyFinderTask, pipeline: Any, wrapper: Any):
        self.finder = finder
        self.pipeline = pipeline
        self.wrapper = wrapper
        self.num_changed_files = 0
    def replace(self):
        """Replaces the dependencies in the specified files"""
        logging.info("Replacing dependencies...")
        for dep, paths in self.finder.found_paths.items():
            if dep == 'openai':
                for path in paths:
                    try:
                        with open(path, mode='rt', encoding='utf-8') as f:
                            content = f.read()
                        replaced_content = content.replace('openai.', 'self.pipeline.')
                        with open(path, mode='wt', encoding='utf-8') as f:
                            f.write(replaced_content)
                        self.num_changed_files += 1
                    except Exception as e:
                        logging.error(f"Error replacing dependency in file {path}: {e}")
        logging.info("Dependency replacement complete!")
        return self
    def save_mapping(self, mapping_file: str):
        """Saves the dependency mapping to a file so a later run can reuse it"""
        with open(mapping_file, 'w') as f:
            for dep, paths in self.finder.found_paths.items():
                f.write(f"{dep}: {','.join(paths)}\n")
    def load_mapping(self, mapping_file: str):
        """Loads a dependency mapping saved by a previous run"""
        with open(mapping_file, 'r') as f:
            for line in f:
                dep, paths = line.strip().split(': ')
                self.finder.found_paths[dep] = {path.strip() for path in paths.split(',')}
                # Keep the finder's flag consistent with the loaded mapping
                if dep == 'openai' and self.finder.found_paths[dep]:
                    self.finder.has_openai_dep = True
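# The mapping file written by save_mapping is plain text, one dependency per
# line, e.g.:
#   openai: /tmp/proj/app.py,/tmp/proj/utils.py
# (the paths shown are illustrative)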
class WebAppCreatorTask:
    """Creates a web app directory and copies converted files to it"""
    def __init__(self, webapp_dirname: str, unarchived_dir: str):
        self.webapp_dirname = webapp_dirname
        self.unarchived_dir = unarchived_dir
    def create(self) -> Union[str, None]:
        """Creates the web app directory, copies .html files into it, and returns its path"""
        logging.info("Creating web app directory...")
        webapp_dir = os.path.join(self.unarchived_dir, self.webapp_dirname)
        os.makedirs(webapp_dir, exist_ok=True)
        try:
            for root, dirs, files in os.walk(self.unarchived_dir):
                # Don't descend into the destination directory itself
                dirs[:] = [d for d in dirs if os.path.join(root, d) != webapp_dir]
                for file in files:
                    if not file.endswith('.html'):
                        continue
                    src_path = os.path.join(root, file)
                    dest_path = os.path.join(webapp_dir, file)
                    shutil.copy2(src_path, dest_path)
            logging.info("Web app directory creation complete!")
        except Exception as e:
            logging.error(f"Error creating web app directory: {e}")
            return None
        # Return the path (rather than a bool) so callers can deploy from it
        return webapp_dir
class DeploymentTask:
    """Class responsible for deploying the web application"""
    def __init__(self, webapp_dir: str, api_key: str):
        self.webapp_dir = webapp_dir
        self.api_key = api_key
        self.success = False
    def deploy(self):
        """Deploys the web application using the specified API key"""
        logging.info("Deploying web application...")
        try:
            # Deployment logic here
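            # A possible implementation sketch (an assumption, not present in
            # the original; repo_id is hypothetical) using huggingface_hub:
            #   from huggingface_hub import HfApi
            #   HfApi(token=self.api_key).upload_folder(
            #       folder_path=self.webapp_dir,
            #       repo_id="<username>/<space-name>",
            #       repo_type="space",
            #   )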
            self.success = True
            logging.info("Deployment complete!")
        except Exception as e:
            logging.error(f"Error during deployment: {e}")
            self.success = False
        return self
def process_directory(unarchived_dir: str, api_key: str, action: str, mapping_file: str = None):
    """Converts an already unpacked project directory and optionally deploys it."""
    # Note: ELECTRA is a discriminator and cannot generate text, so a
    # generative model (GPT-2 here) is used for the text-generation pipeline
    tokenizer = AutoTokenizer.from_pretrained('gpt2')
    pipe = pipeline('text-generation', model='gpt2', tokenizer=tokenizer)
    finder = DependencyFinderTask().find_dependencies(unarchived_dir)
    replacer = DependencyReplacerTask(finder, pipe, OpenAIWrapper())
    # Reuse a mapping from a previous run, if one was provided
    if mapping_file and os.path.exists(mapping_file):
        replacer.load_mapping(mapping_file)
    if finder.has_openai_dep:
        replacer.replace()
        if mapping_file:
            replacer.save_mapping(mapping_file)
    webapp_dir = WebAppCreatorTask(WEBAPP_DIRNAME, unarchived_dir).create()
    if action == 'upload' and webapp_dir:
        deploy_task = DeploymentTask(webapp_dir, api_key).deploy()
        if not deploy_task.success:
            logging.error("Deployment failed! Continuing to next URL...")
def process_file(file_path: str, api_key: str, action: str, mapping_file: str = None):
    """Unpacks a local archive and runs the conversion on its contents."""
    logging.info(f'Processing local file: {file_path}')
    with open(file_path, 'rb') as f:
        downloaded_file = f.read()
    unarchived_dir = UnarchiveTask(downloaded_file).unarchive()
    if not unarchived_dir:
        logging.error("Unarchiving failed! Skipping this file...")
        return
    process_directory(unarchived_dir, api_key, action, mapping_file)
    logging.info(f"Successfully processed local file: {file_path}")
def process_urls(urls: List[str], api_key: str, action: str, mapping_file: str = None):
    async def process_url_task(url: str):
        file_data = await DownloadItemTask(url).download()
        if file_data:
            unarchived_dir = UnarchiveTask(file_data).unarchive()
            if unarchived_dir:
                process_directory(unarchived_dir, api_key, action, mapping_file)
    with ThreadPoolExecutor() as executor:
        # asyncio.run drives each coroutine to completion on its worker
        # thread; submitting the bare coroutine would never execute it
        futures = {executor.submit(asyncio.run, process_url_task(url)) for url in urls}
        for future in as_completed(futures):
            future.result()
def main():
    parser = argparse.ArgumentParser()
    parser.add_argument('--api-key', '-a', type=str, help='Hugging Face API Key')
    parser.add_argument('--action', '-t', type=str, choices=['convert', 'upload'], help='Action to perform')
    parser.add_argument('--dependency-mapping', '-d', type=str, help='Dependency mapping file path')
    args = parser.parse_args()
    if not args.api_key:
        print("Please provide an API key using the --api-key flag.")
        sys.exit(1)
    if not args.action:
        print("Please provide an action to perform using the --action flag.")
        sys.exit(1)
    # The mapping file itself is parsed by DependencyReplacerTask.load_mapping,
    # so only its existence is checked here and the path is passed along
    if args.dependency_mapping and not os.path.exists(args.dependency_mapping):
        print(f"Dependency mapping file '{args.dependency_mapping}' does not exist.")
        sys.exit(1)
    def process_urls_and_open_browser(urls: str) -> str:
        urls_list = [url.strip() for url in urls.split(',') if url.strip()]
        process_urls(urls_list, args.api_key, args.action, args.dependency_mapping)
        return f"Processed {len(urls_list)} URL(s)."
    iface = gr.Interface(
        fn=process_urls_and_open_browser,
        inputs=gr.Textbox(label="URLs (comma-separated)"),
        outputs="text",
        title="Project Converter and Uploader",
        description="Convert and upload projects to Hugging Face Spaces."
    )
    # inbrowser=True opens http://localhost:7860 automatically on launch
    iface.launch(inbrowser=True)
if __name__ == "__main__":
    main()
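# Example session (assuming this file is saved as app.py):
#   python app.py --api-key <HF_API_KEY> --action convert
# then paste comma-separated archive URLs into the Gradio textbox.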