Spaces:
Running
Running
import importlib | |
import sys | |
from dataclasses import dataclass | |
from logging import getLogger | |
from pathlib import Path | |
from typing import Union | |
from rich import print | |
from rich.padding import Padding | |
from rich.panel import Panel | |
from rich.syntax import Syntax | |
from rich.tree import Tree | |
from fastapi_cli.exceptions import FastAPICLIException | |
logger = getLogger(__name__) | |
try: | |
from fastapi import FastAPI | |
except ImportError: # pragma: no cover | |
FastAPI = None # type: ignore[misc, assignment] | |
def get_default_path() -> Path: | |
path = Path("main.py") | |
if path.is_file(): | |
return path | |
path = Path("app.py") | |
if path.is_file(): | |
return path | |
path = Path("api.py") | |
if path.is_file(): | |
return path | |
path = Path("app/main.py") | |
if path.is_file(): | |
return path | |
path = Path("app/app.py") | |
if path.is_file(): | |
return path | |
path = Path("app/api.py") | |
if path.is_file(): | |
return path | |
raise FastAPICLIException( | |
"Could not find a default file to run, please provide an explicit path" | |
) | |
class ModuleData: | |
module_import_str: str | |
extra_sys_path: Path | |
def get_module_data_from_path(path: Path) -> ModuleData: | |
logger.info( | |
"Searching for package file structure from directories with [blue]__init__.py[/blue] files" | |
) | |
use_path = path.resolve() | |
module_path = use_path | |
if use_path.is_file() and use_path.stem == "__init__": | |
module_path = use_path.parent | |
module_paths = [module_path] | |
extra_sys_path = module_path.parent | |
for parent in module_path.parents: | |
init_path = parent / "__init__.py" | |
if init_path.is_file(): | |
module_paths.insert(0, parent) | |
extra_sys_path = parent.parent | |
else: | |
break | |
logger.info(f"Importing from {extra_sys_path.resolve()}") | |
root = module_paths[0] | |
name = f"π {root.name}" if root.is_file() else f"π {root.name}" | |
root_tree = Tree(name) | |
if root.is_dir(): | |
root_tree.add("[dim]π __init__.py[/dim]") | |
tree = root_tree | |
for sub_path in module_paths[1:]: | |
sub_name = ( | |
f"π {sub_path.name}" if sub_path.is_file() else f"π {sub_path.name}" | |
) | |
tree = tree.add(sub_name) | |
if sub_path.is_dir(): | |
tree.add("[dim]π __init__.py[/dim]") | |
title = "[b green]Python module file[/b green]" | |
if len(module_paths) > 1 or module_path.is_dir(): | |
title = "[b green]Python package file structure[/b green]" | |
panel = Padding( | |
Panel( | |
root_tree, | |
title=title, | |
expand=False, | |
padding=(1, 2), | |
), | |
1, | |
) | |
print(panel) | |
module_str = ".".join(p.stem for p in module_paths) | |
logger.info(f"Importing module [green]{module_str}[/green]") | |
return ModuleData( | |
module_import_str=module_str, extra_sys_path=extra_sys_path.resolve() | |
) | |
def get_app_name(*, mod_data: ModuleData, app_name: Union[str, None] = None) -> str: | |
try: | |
mod = importlib.import_module(mod_data.module_import_str) | |
except (ImportError, ValueError) as e: | |
logger.error(f"Import error: {e}") | |
logger.warning( | |
"Ensure all the package directories have an [blue]__init__.py[/blue] file" | |
) | |
raise | |
if not FastAPI: # type: ignore[truthy-function] | |
raise FastAPICLIException( | |
"Could not import FastAPI, try running 'pip install fastapi'" | |
) from None | |
object_names = dir(mod) | |
object_names_set = set(object_names) | |
if app_name: | |
if app_name not in object_names_set: | |
raise FastAPICLIException( | |
f"Could not find app name {app_name} in {mod_data.module_import_str}" | |
) | |
app = getattr(mod, app_name) | |
if not isinstance(app, FastAPI): | |
raise FastAPICLIException( | |
f"The app name {app_name} in {mod_data.module_import_str} doesn't seem to be a FastAPI app" | |
) | |
return app_name | |
for preferred_name in ["app", "api"]: | |
if preferred_name in object_names_set: | |
obj = getattr(mod, preferred_name) | |
if isinstance(obj, FastAPI): | |
return preferred_name | |
for name in object_names: | |
obj = getattr(mod, name) | |
if isinstance(obj, FastAPI): | |
return name | |
raise FastAPICLIException("Could not find FastAPI app in module, try using --app") | |
def get_import_string( | |
*, path: Union[Path, None] = None, app_name: Union[str, None] = None | |
) -> str: | |
if not path: | |
path = get_default_path() | |
logger.info(f"Using path [blue]{path}[/blue]") | |
logger.info(f"Resolved absolute path {path.resolve()}") | |
if not path.exists(): | |
raise FastAPICLIException(f"Path does not exist {path}") | |
mod_data = get_module_data_from_path(path) | |
sys.path.insert(0, str(mod_data.extra_sys_path)) | |
use_app_name = get_app_name(mod_data=mod_data, app_name=app_name) | |
import_example = Syntax( | |
f"from {mod_data.module_import_str} import {use_app_name}", "python" | |
) | |
import_panel = Padding( | |
Panel( | |
import_example, | |
title="[b green]Importable FastAPI app[/b green]", | |
expand=False, | |
padding=(1, 2), | |
), | |
1, | |
) | |
logger.info("Found importable FastAPI app") | |
print(import_panel) | |
import_string = f"{mod_data.module_import_str}:{use_app_name}" | |
logger.info(f"Using import string [b green]{import_string}[/b green]") | |
return import_string | |