# GitBot / app.py
import argparse
import os
import re
import subprocess
import sys
import zipfile
import random
import string
import shutil
import io
import webbrowser
from typing import List, Any, Dict, Union
from concurrent.futures import ThreadPoolExecutor, as_completed
import logging
import aiohttp
import asyncio
import hashlib
import gradio as gr
from transformers import AutoTokenizer, pipeline
# Configure logging
logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(levelname)s - %(message)s')
# Matches imports of the openai package, e.g. "import openai" or "from openai import ..."
OPENAI_REGEX = re.compile(r'^\s*(?:import\s+openai\b|from\s+openai\b)', re.MULTILINE)
GITIGNORE_CONTENT = '# Converted files will appear here\n'
WEBAPP_DIRNAME = 'webapp'
CACHE_DIR = './cache'
class OpenAIWrapper:
# Implement OpenAI interaction methods here
pass
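    # Hypothetical sketch (an assumption, not a confirmed API of this project):
    # route an OpenAI-style completion request to a local transformers pipeline.
    def create_completion(self, pipeline_fn, prompt: str, max_new_tokens: int = 64) -> str:
        outputs = pipeline_fn(prompt, max_new_tokens=max_new_tokens, num_return_sequences=1)
        return outputs[0]["generated_text"]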
async def download_file(url: str) -> bytes:
"""Downloads a file asynchronously with retries and returns its content."""
logging.info(f"Downloading content from {url}...")
async with aiohttp.ClientSession() as session:
for attempt in range(3):
try:
async with session.get(url) as response:
if response.status == 200:
logging.info("Download complete!")
return await response.read()
else:
logging.error(f"Failed to download content from {url} (status code: {response.status})")
except aiohttp.ClientError as e:
logging.error(f"Error downloading {url}: {e}")
await asyncio.sleep(2 ** attempt) # Exponential backoff
return None
def get_cache_path(url: str) -> str:
"""Returns a unique cache path based on the URL."""
hash_digest = hashlib.md5(url.encode()).hexdigest()
return os.path.join(CACHE_DIR, hash_digest)
class DownloadItemTask:
"""Class responsible for fetching remote content"""
def __init__(self, url: str):
self.url = url
async def download(self) -> bytes:
"""Attempts to download the file using the download_file function"""
cache_path = get_cache_path(self.url)
if os.path.exists(cache_path):
logging.info(f"Using cached file for {self.url}")
with open(cache_path, 'rb') as f:
return f.read()
data = await download_file(self.url)
if data:
os.makedirs(CACHE_DIR, exist_ok=True)
with open(cache_path, 'wb') as f:
f.write(data)
return data
class UnarchiveTask:
"""Utility class dealing with archives such as .zip or tarballs"""
def __init__(self, data: bytes):
self.data = data
def unarchive(self) -> str:
"""Unpacks and returns root directory holding contents"""
logging.info("Unarchiving downloaded file...")
extracted_dir = os.path.join(CACHE_DIR, ''.join(random.choices(string.ascii_uppercase + string.digits, k=10)))
os.makedirs(extracted_dir, exist_ok=True)
try:
            if sys.platform == 'darwin' or sys.platform.startswith('linux'):
                # Write the archive inside the extraction directory and pass its
                # full path to tar, since the working directory is not extracted_dir.
                archive_path = os.path.join(extracted_dir, 'archive.tar')
                with open(archive_path, 'wb') as f:
                    f.write(self.data)
                subprocess.run(['tar', '-xf', archive_path, '-C', extracted_dir], check=True)
else:
with zipfile.ZipFile(io.BytesIO(self.data), 'r') as zip_ref:
zip_ref.extractall(extracted_dir)
logging.info("Unarchiving complete!")
except Exception as e:
logging.error(f"Error unarchiving file: {e}")
return None
return extracted_dir
class DependencyFinderTask:
"""Scans project structure searching for specific dependencies to be swapped"""
DEPENDENCY_REGEXES = {
'openai': OPENAI_REGEX,
}
def __init__(self):
self.found_paths = {'openai': set()}
self.has_openai_dep = False
def find_dependencies(self, dir_path: str):
"""Recursively searches through directories looking for dependencies"""
logging.info("Searching for dependencies...")
for current_root, _, filenames in os.walk(dir_path):
for filename in filenames:
full_path = os.path.join(current_root, filename)
try:
with open(full_path, mode='rt', encoding='utf-8') as f:
content = f.read()
for dep, regex in self.DEPENDENCY_REGEXES.items():
if regex.search(content):
self.found_paths[dep].add(full_path)
self.has_openai_dep = True
except Exception as e:
logging.error(f"Error reading file {full_path}: {e}")
logging.info("Dependency search complete!")
return self
class DependencyReplacerTask:
"""
Replaces specified dependencies with their corresponding wrapper methods
Also, provides a method to save and load the dependency mapping to improve efficiency
"""
def __init__(self, finder: DependencyFinderTask, pipeline: Any, wrapper: Any):
self.finder = finder
self.pipeline = pipeline
self.wrapper = wrapper
self.num_changed_files = 0
def replace(self):
"""Replaces the dependencies in the specified files"""
logging.info("Replacing dependencies...")
for dep, paths in self.finder.found_paths.items():
if dep == 'openai':
for path in paths:
try:
with open(path, mode='rt', encoding='utf-8') as f:
content = f.read()
replaced_content = content.replace('openai.', 'self.pipeline.')
with open(path, mode='wt', encoding='utf-8') as f:
f.write(replaced_content)
self.num_changed_files += 1
except Exception as e:
logging.error(f"Error replacing dependency in file {path}: {e}")
logging.info("Dependency replacement complete!")
return self
def save_mapping(self, mapping_file: str):
"""Saves the dependency mapping to a file for future use"""
with open(mapping_file, 'w') as f:
for dep, paths in self.finder.found_paths.items():
f.write(f"{dep}: {','.join(paths)}\n")
def load_mapping(self, mapping_file: str):
"""Loads the dependency mapping from a file for future use"""
with open(mapping_file, 'r') as f:
lines = f.readlines()
for line in lines:
dep, paths = line.strip().split(': ')
self.finder.found_paths[dep] = {path.strip() for path in paths.split(',')}
class WebAppCreatorTask:
"""Creates a web app directory and copies converted files to it"""
def __init__(self, webapp_dirname: str, unarchived_dir: str):
self.webapp_dirname = webapp_dirname
self.unarchived_dir = unarchived_dir
def create(self) -> bool:
"""Creates a web app directory and copies converted files to it"""
logging.info("Creating web app directory...")
webapp_dir = os.path.join(self.unarchived_dir, self.webapp_dirname)
os.makedirs(webapp_dir, exist_ok=True)
try:
for root, _, files in os.walk(self.unarchived_dir):
for file in files:
if not file.endswith('.html'):
continue
src_path = os.path.join(root, file)
dest_path = os.path.join(webapp_dir, file)
shutil.copy2(src_path, dest_path)
logging.info("Web app directory creation complete!")
except Exception as e:
logging.error(f"Error creating web app directory: {e}")
return False
return os.path.exists(webapp_dir)
class DeploymentTask:
"""Class responsible for deploying the web application"""
def __init__(self, webapp_dir: str, api_key: str):
self.webapp_dir = webapp_dir
self.api_key = api_key
self.success = False
def deploy(self):
"""Deploys the web application using the specified API key"""
logging.info("Deploying web application...")
try:
# Deployment logic here
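            # One possible approach (an assumption, not confirmed by this project):
            # push the generated webapp directory to a Hugging Face Space with
            # huggingface_hub, for example:
            #   from huggingface_hub import HfApi
            #   HfApi(token=self.api_key).upload_folder(
            #       folder_path=self.webapp_dir,
            #       repo_id="<username>/<space-name>",  # hypothetical Space id
            #       repo_type="space",
            #   )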
self.success = True
logging.info("Deployment complete!")
except Exception as e:
logging.error(f"Error during deployment: {e}")
self.success = False
return self
def process_file(file_path: str, api_key: str, action: str, dependency_mapping: Dict[str, List[str]]):
    logging.info(f'\nProcessing local file: {file_path}')
    with open(file_path, 'rb') as f:
        downloaded_file = f.read()
    unarchived_dir = UnarchiveTask(downloaded_file).unarchive()
    if not unarchived_dir:
        logging.error("Unarchiving failed! Skipping this file...")
        return
    original_dir = os.getcwd()
    os.chdir(unarchived_dir)
    try:
        # A text-generation pipeline needs a causal language model; gpt2 is used
        # here as a stand-in checkpoint.
        tokenizer = AutoTokenizer.from_pretrained('gpt2')
        pipe = pipeline('text-generation', model='gpt2', tokenizer=tokenizer)
        finder = DependencyFinderTask().find_dependencies(os.curdir)
        # Merge a previously computed dependency mapping, if one was provided
        if dependency_mapping:
            for dep, paths in dependency_mapping.items():
                finder.found_paths.setdefault(dep, set()).update(paths)
                if dep == 'openai' and paths:
                    finder.has_openai_dep = True
        if finder.has_openai_dep:
            DependencyReplacerTask(finder, pipe, OpenAIWrapper()).replace()
    finally:
        os.chdir(original_dir)
    webapp_created = WebAppCreatorTask(WEBAPP_DIRNAME, unarchived_dir).create()
    if action == 'upload' and webapp_created:
        webapp_dir = os.path.join(unarchived_dir, WEBAPP_DIRNAME)
        deploy_task = DeploymentTask(webapp_dir, api_key).deploy()
        if not deploy_task.success:
            logging.error("Deployment failed! Continuing...")
    logging.info(f"Successfully processed local file: {file_path}")
def process_urls(urls: List[str], api_key: str, action: str, dependency_mapping: Dict[str, List[str]]):
    def process_url_task(url: str):
        # Run the async download to completion inside this worker thread.
        file_data = asyncio.run(DownloadItemTask(url).download())
        if not file_data:
            logging.error(f"Skipping {url}: download failed.")
            return
        # DownloadItemTask caches the archive bytes on disk, so process the cached file.
        process_file(get_cache_path(url), api_key, action, dependency_mapping)
    with ThreadPoolExecutor() as executor:
        futures = {executor.submit(process_url_task, url) for url in urls}
        for future in as_completed(futures):
            future.result()
def main():
parser = argparse.ArgumentParser()
parser.add_argument('--api-key', '-a', type=str, help='Hugging Face API Key')
parser.add_argument('--action', '-t', type=str, choices=['convert', 'upload'], help='Action to perform')
parser.add_argument('--dependency-mapping', '-d', type=str, help='Dependency mapping file path')
args = parser.parse_args()
if not args.api_key:
print("Please provide an API key using --api-key flag.")
sys.exit(1)
if not args.action:
print("Please provide an action to perform using --action flag.")
sys.exit(1)
dependency_mapping = {}
if args.dependency_mapping:
if not os.path.exists(args.dependency_mapping):
print(f"Dependency mapping file '{args.dependency_mapping}' does not exist.")
sys.exit(1)
with open(args.dependency_mapping, 'r') as f:
for line in f:
dep, paths = line.strip().split(': ')
dependency_mapping[dep] = [path.strip() for path in paths.split(',')]
    def process_urls_input(urls: str) -> str:
        urls_list = [url.strip() for url in urls.split(',') if url.strip()]
        process_urls(urls_list, args.api_key, args.action, dependency_mapping)
        return f"Processed {len(urls_list)} URL(s)."
    iface = gr.Interface(
        fn=process_urls_input,
        inputs=gr.Textbox(label="URLs (comma-separated)"),
        outputs="text",
        title="Project Converter and Uploader",
        description="Convert and upload projects to Hugging Face Spaces."
    )
    # inbrowser=True opens the running app in the default browser.
    iface.launch(inbrowser=True)
if __name__ == "__main__":
main()