import argparse
import asyncio
import hashlib
import io
import logging
import os
import random
import re
import shutil
import string
import subprocess
import sys
import webbrowser
import zipfile
from concurrent.futures import ThreadPoolExecutor, as_completed
from typing import Any, Dict, List

import aiohttp
import gradio as gr
from transformers import AutoTokenizer, pipeline

# Configure logging
logging.basicConfig(level=logging.INFO,
                    format='%(asctime)s - %(levelname)s - %(message)s')

# Matches `import openai` / `import openai_<suffix>` statements. The previous
# lookahead-based pattern could never match: the lookahead asserted `import`
# at the same position where the alternation then tried to match `openai`.
OPENAI_REGEX = re.compile(r'import\s+openai(?:_\w+)?')

GITIGNORE_CONTENT = '# Converted files will appear here\n'
WEBAPP_DIRNAME = 'webapp'
CACHE_DIR = './cache'


class OpenAIWrapper:
    # Implement OpenAI interaction methods here
    pass


async def download_file(url: str) -> bytes:
    """Downloads a file asynchronously with retries and returns its content."""
    logging.info(f"Downloading content from {url}...")
    async with aiohttp.ClientSession() as session:
        for attempt in range(3):
            try:
                async with session.get(url) as response:
                    if response.status == 200:
                        logging.info("Download complete!")
                        return await response.read()
                    logging.error(
                        f"Failed to download content from {url} "
                        f"(status code: {response.status})")
            except aiohttp.ClientError as e:
                logging.error(f"Error downloading {url}: {e}")
            await asyncio.sleep(2 ** attempt)  # Exponential backoff
    return None


def get_cache_path(url: str) -> str:
    """Returns a unique cache path based on the URL."""
    hash_digest = hashlib.md5(url.encode()).hexdigest()
    return os.path.join(CACHE_DIR, hash_digest)


class DownloadItemTask:
    """Class responsible for fetching remote content."""

    def __init__(self, url: str):
        self.url = url

    async def download(self) -> bytes:
        """Attempts to download the file using the download_file function."""
        cache_path = get_cache_path(self.url)
        if os.path.exists(cache_path):
            logging.info(f"Using cached file for {self.url}")
            with open(cache_path, 'rb') as f:
                return f.read()
        data = await download_file(self.url)
        if data:
            os.makedirs(CACHE_DIR, exist_ok=True)
            with open(cache_path, 'wb') as f:
                f.write(data)
        return data


class UnarchiveTask:
    """Utility class dealing with archives such as .zip or tarballs."""

    def __init__(self, data: bytes):
        self.data = data

    def unarchive(self) -> str:
        """Unpacks and returns the root directory holding the contents."""
        logging.info("Unarchiving downloaded file...")
        extracted_dir = os.path.join(
            CACHE_DIR,
            ''.join(random.choices(string.ascii_uppercase + string.digits, k=10)))
        os.makedirs(extracted_dir, exist_ok=True)
        try:
            if sys.platform == 'darwin' or sys.platform.startswith('linux'):
                archive_path = os.path.join(extracted_dir, 'archive.tar')
                with open(archive_path, 'wb') as f:
                    f.write(self.data)
                # Pass the full archive path: a bare 'archive.tar' would be
                # resolved against the current working directory rather than
                # extracted_dir.
                subprocess.run(['tar', '-xf', archive_path, '-C', extracted_dir],
                               check=True)
            else:
                with zipfile.ZipFile(io.BytesIO(self.data), 'r') as zip_ref:
                    zip_ref.extractall(extracted_dir)
            logging.info("Unarchiving complete!")
        except Exception as e:
            logging.error(f"Error unarchiving file: {e}")
            return None
        return extracted_dir
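
# Illustrative usage of the two tasks above (a sketch, not part of the
# pipeline itself): the URL is a placeholder, and this assumes no event loop
# is already running in the calling thread.
#
#   data = asyncio.run(DownloadItemTask('https://example.com/project.zip').download())
#   if data:
#       project_dir = UnarchiveTask(data).unarchive()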


class DependencyFinderTask:
    """Scans the project structure searching for specific dependencies to be swapped."""

    DEPENDENCY_REGEXES = {
        'openai': OPENAI_REGEX,
    }

    def __init__(self):
        self.found_paths = {'openai': set()}
        self.has_openai_dep = False

    def find_dependencies(self, dir_path: str):
        """Recursively searches through directories looking for dependencies."""
        logging.info("Searching for dependencies...")
        for current_root, _, filenames in os.walk(dir_path):
            for filename in filenames:
                full_path = os.path.join(current_root, filename)
                try:
                    with open(full_path, mode='rt', encoding='utf-8') as f:
                        content = f.read()
                    for dep, regex in self.DEPENDENCY_REGEXES.items():
                        if regex.search(content):
                            self.found_paths[dep].add(full_path)
                            self.has_openai_dep = True
                except Exception as e:
                    logging.error(f"Error reading file {full_path}: {e}")
        logging.info("Dependency search complete!")
        return self


class DependencyReplacerTask:
    """
    Replaces specified dependencies with their corresponding wrapper methods.
    Also provides methods to save and load the dependency mapping to improve
    efficiency.
    """

    def __init__(self, finder: DependencyFinderTask, pipeline: Any, wrapper: Any):
        self.finder = finder
        self.pipeline = pipeline
        self.wrapper = wrapper
        self.num_changed_files = 0

    def replace(self):
        """Replaces the dependencies in the specified files."""
        logging.info("Replacing dependencies...")
        for dep, paths in self.finder.found_paths.items():
            if dep != 'openai':
                continue
            for path in paths:
                try:
                    with open(path, mode='rt', encoding='utf-8') as f:
                        content = f.read()
                    replaced_content = content.replace('openai.', 'self.pipeline.')
                    with open(path, mode='wt', encoding='utf-8') as f:
                        f.write(replaced_content)
                    self.num_changed_files += 1
                except Exception as e:
                    logging.error(f"Error replacing dependency in file {path}: {e}")
        logging.info("Dependency replacement complete!")
        return self

    def save_mapping(self, mapping_file: str):
        """Saves the dependency mapping to a file for future use."""
        with open(mapping_file, 'w') as f:
            for dep, paths in self.finder.found_paths.items():
                f.write(f"{dep}: {','.join(paths)}\n")

    def load_mapping(self, mapping_file: str):
        """Loads the dependency mapping from a file for future use."""
        with open(mapping_file, 'r') as f:
            for line in f:
                dep, paths = line.strip().split(': ')
                self.finder.found_paths[dep] = {p.strip() for p in paths.split(',')}


class WebAppCreatorTask:
    """Creates a web app directory and copies converted files into it."""

    def __init__(self, webapp_dirname: str, unarchived_dir: str):
        self.webapp_dirname = webapp_dirname
        self.unarchived_dir = unarchived_dir

    def create(self) -> str:
        """Creates the web app directory and returns its path, or None on failure.

        Returning the path (rather than a bool) lets callers hand it straight
        to DeploymentTask.
        """
        logging.info("Creating web app directory...")
        webapp_dir = os.path.join(self.unarchived_dir, self.webapp_dirname)
        os.makedirs(webapp_dir, exist_ok=True)
        try:
            for root, _, files in os.walk(self.unarchived_dir):
                if root == webapp_dir:
                    continue  # Don't copy files in webapp_dir onto themselves
                for file in files:
                    if not file.endswith('.html'):
                        continue
                    shutil.copy2(os.path.join(root, file),
                                 os.path.join(webapp_dir, file))
            logging.info("Web app directory creation complete!")
        except Exception as e:
            logging.error(f"Error creating web app directory: {e}")
            return None
        return webapp_dir


class DeploymentTask:
    """Class responsible for deploying the web application."""

    def __init__(self, webapp_dir: str, api_key: str):
        self.webapp_dir = webapp_dir
        self.api_key = api_key
        self.success = False

    def deploy(self):
        """Deploys the web application using the specified API key."""
        logging.info("Deploying web application...")
        try:
            # Deployment logic here
            self.success = True
            logging.info("Deployment complete!")
        except Exception as e:
            logging.error(f"Error during deployment: {e}")
            self.success = False
        return self
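
# For reference, the mapping file consumed by load_mapping and produced by
# save_mapping is plain text, one dependency per line ("<dep>: <path>,<path>").
# The paths below are illustrative:
#
#   openai: src/app.py,src/utils/client.py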


def process_file(file_path: str, api_key: str, action: str,
                 dependency_mapping: Dict[str, List[str]]):
    logging.info(f'\nProcessing local file: {file_path}')
    with open(file_path, 'rb') as f:
        downloaded_file = f.read()
    unarchived_dir = UnarchiveTask(downloaded_file).unarchive()
    if not unarchived_dir:
        logging.error("Unarchiving failed! Skipping this file...")
        return
    # ELECTRA has no generative head, so a causal LM (gpt2 here) is needed
    # for the text-generation pipeline that stands in for OpenAI calls; the
    # original 'ELECTRA-base-discriminator' is also not a valid hub id.
    tokenizer = AutoTokenizer.from_pretrained('gpt2')
    pipe = pipeline('text-generation', model='gpt2', tokenizer=tokenizer)
    # Search the extracted tree directly instead of chdir-ing into it:
    # os.chdir is process-wide and unsafe with the thread pool below.
    finder = DependencyFinderTask().find_dependencies(unarchived_dir)
    # Merge a previously loaded dependency mapping, if one was provided.
    # (dependency_mapping is a parsed dict here, not a file path, so it is
    # merged directly rather than passed to DependencyReplacerTask.load_mapping;
    # persisting it via save_mapping is left to callers that hold the path.)
    if dependency_mapping:
        for dep, paths in dependency_mapping.items():
            finder.found_paths.setdefault(dep, set()).update(paths)
            if dep == 'openai' and paths:
                finder.has_openai_dep = True
    if finder.has_openai_dep:
        DependencyReplacerTask(finder, pipe, OpenAIWrapper()).replace()
    webapp_dir = WebAppCreatorTask(WEBAPP_DIRNAME, unarchived_dir).create()
    if action == 'upload' and webapp_dir:
        deploy_task = DeploymentTask(webapp_dir, api_key).deploy()
        if not deploy_task.success:
            logging.error("Deployment failed! Continuing to next URL...")
    logging.info(f"Successfully processed local file: {file_path}")


def process_urls(urls: List[str], api_key: str, action: str,
                 dependency_mapping: Dict[str, List[str]]):

    async def process_url_task(url: str):
        file_data = await DownloadItemTask(url).download()
        if file_data:
            # download() caches the archive at this path; hand the cached file
            # to process_file, which performs the unarchiving itself.
            process_file(get_cache_path(url), api_key, action, dependency_mapping)

    with ThreadPoolExecutor() as executor:
        # Each worker runs the coroutine in its own event loop via asyncio.run;
        # submitting the bare coroutine would never actually execute it.
        futures = {executor.submit(asyncio.run, process_url_task(url))
                   for url in urls}
        for future in as_completed(futures):
            future.result()


def main():
    parser = argparse.ArgumentParser()
    parser.add_argument('--api-key', '-a', type=str, help='Hugging Face API Key')
    parser.add_argument('--action', '-t', type=str, choices=['convert', 'upload'],
                        help='Action to perform')
    parser.add_argument('--dependency-mapping', '-d', type=str,
                        help='Dependency mapping file path')
    args = parser.parse_args()

    if not args.api_key:
        print("Please provide an API key using the --api-key flag.")
        sys.exit(1)
    if not args.action:
        print("Please provide an action to perform using the --action flag.")
        sys.exit(1)

    dependency_mapping = {}
    if args.dependency_mapping:
        if not os.path.exists(args.dependency_mapping):
            print(f"Dependency mapping file '{args.dependency_mapping}' does not exist.")
            sys.exit(1)
        with open(args.dependency_mapping, 'r') as f:
            for line in f:
                dep, paths = line.strip().split(': ')
                dependency_mapping[dep] = [path.strip() for path in paths.split(',')]

    def process_urls_and_open_browser(urls: str) -> str:
        urls_list = [url.strip() for url in urls.split(',')]
        process_urls(urls_list, args.api_key, args.action, dependency_mapping)
        webbrowser.open("http://localhost:7860")
        return f"Processed {len(urls_list)} URL(s)."

    # The handler must be passed to gr.Interface as fn; launch() takes no
    # function argument. gr.inputs.Textbox is also deprecated in favour of
    # gr.Textbox.
    iface = gr.Interface(
        fn=process_urls_and_open_browser,
        inputs=gr.Textbox(label="URLs (comma-separated)"),
        outputs="text",
        title="Project Converter and Uploader",
        description="Convert and upload projects to Hugging Face Spaces.",
    )
    iface.launch()


if __name__ == "__main__":
    main()
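
# Example invocation (illustrative; assumes this script is saved as
# converter.py and that hf_xxx stands in for a real Hugging Face token):
#
#   python converter.py --api-key hf_xxx --action convert --dependency-mapping deps.map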