| import io |
| import json |
| import platform |
| import re |
| import sys |
| import tokenize |
| import traceback |
| from contextlib import contextmanager |
| from dataclasses import replace |
| from datetime import datetime |
| from enum import Enum |
| from json.decoder import JSONDecodeError |
| from pathlib import Path |
| from typing import ( |
| Any, |
| Dict, |
| Generator, |
| Iterator, |
| List, |
| MutableMapping, |
| Optional, |
| Pattern, |
| Sequence, |
| Set, |
| Sized, |
| Tuple, |
| Union, |
| ) |
|
|
| import click |
| from click.core import ParameterSource |
| from mypy_extensions import mypyc_attr |
| from pathspec import PathSpec |
| from pathspec.patterns.gitwildmatch import GitWildMatchPatternError |
|
|
| from _black_version import version as __version__ |
| from black.cache import Cache, get_cache_info, read_cache, write_cache |
| from black.comments import normalize_fmt_off |
| from black.const import ( |
| DEFAULT_EXCLUDES, |
| DEFAULT_INCLUDES, |
| DEFAULT_LINE_LENGTH, |
| STDIN_PLACEHOLDER, |
| ) |
| from black.files import ( |
| find_project_root, |
| find_pyproject_toml, |
| find_user_pyproject_toml, |
| gen_python_files, |
| get_gitignore, |
| normalize_path_maybe_ignore, |
| parse_pyproject_toml, |
| wrap_stream_for_windows, |
| ) |
| from black.handle_ipynb_magics import ( |
| PYTHON_CELL_MAGICS, |
| TRANSFORMED_MAGICS, |
| jupyter_dependencies_are_installed, |
| mask_cell, |
| put_trailing_semicolon_back, |
| remove_trailing_semicolon, |
| unmask_cell, |
| ) |
| from black.linegen import LN, LineGenerator, transform_line |
| from black.lines import EmptyLineTracker, LinesBlock |
| from black.mode import ( |
| FUTURE_FLAG_TO_FEATURE, |
| VERSION_TO_FEATURES, |
| Feature, |
| Mode, |
| TargetVersion, |
| supports_feature, |
| ) |
| from black.nodes import ( |
| STARS, |
| is_number_token, |
| is_simple_decorator_expression, |
| is_string_token, |
| syms, |
| ) |
| from black.output import color_diff, diff, dump_to_file, err, ipynb_diff, out |
| from black.parsing import InvalidInput |
| from black.parsing import lib2to3_parse, parse_ast, stringify_ast |
| from black.report import Changed, NothingChanged, Report |
| from black.trans import iter_fexpr_spans |
| from blib2to3.pgen2 import token |
| from blib2to3.pytree import Leaf, Node |
|
|
| COMPILED = Path(__file__).suffix in (".pyd", ".so") |
|
|
| |
| FileContent = str |
| Encoding = str |
| NewLine = str |
|
|
|
|
| class WriteBack(Enum): |
| NO = 0 |
| YES = 1 |
| DIFF = 2 |
| CHECK = 3 |
| COLOR_DIFF = 4 |
|
|
| @classmethod |
| def from_configuration( |
| cls, *, check: bool, diff: bool, color: bool = False |
| ) -> "WriteBack": |
| if check and not diff: |
| return cls.CHECK |
|
|
| if diff and color: |
| return cls.COLOR_DIFF |
|
|
| return cls.DIFF if diff else cls.YES |
|
|
|
|
| |
| FileMode = Mode |
|
|
|
|
| def read_pyproject_toml( |
| ctx: click.Context, param: click.Parameter, value: Optional[str] |
| ) -> Optional[str]: |
| """Inject Black configuration from "pyproject.toml" into defaults in `ctx`. |
| |
| Returns the path to a successfully found and read configuration file, None |
| otherwise. |
| """ |
| if not value: |
| value = find_pyproject_toml(ctx.params.get("src", ())) |
| if value is None: |
| return None |
|
|
| try: |
| config = parse_pyproject_toml(value) |
| except (OSError, ValueError) as e: |
| raise click.FileError( |
| filename=value, hint=f"Error reading configuration file: {e}" |
| ) from None |
|
|
| if not config: |
| return None |
| else: |
| |
| |
| |
| config = { |
| k: str(v) if not isinstance(v, (list, dict)) else v |
| for k, v in config.items() |
| } |
|
|
| target_version = config.get("target_version") |
| if target_version is not None and not isinstance(target_version, list): |
| raise click.BadOptionUsage( |
| "target-version", "Config key target-version must be a list" |
| ) |
|
|
| default_map: Dict[str, Any] = {} |
| if ctx.default_map: |
| default_map.update(ctx.default_map) |
| default_map.update(config) |
|
|
| ctx.default_map = default_map |
| return value |
|
|
|
|
| def target_version_option_callback( |
| c: click.Context, p: Union[click.Option, click.Parameter], v: Tuple[str, ...] |
| ) -> List[TargetVersion]: |
| """Compute the target versions from a --target-version flag. |
| |
| This is its own function because mypy couldn't infer the type correctly |
| when it was a lambda, causing mypyc trouble. |
| """ |
| return [TargetVersion[val.upper()] for val in v] |
|
|
|
|
| def re_compile_maybe_verbose(regex: str) -> Pattern[str]: |
| """Compile a regular expression string in `regex`. |
| |
| If it contains newlines, use verbose mode. |
| """ |
| if "\n" in regex: |
| regex = "(?x)" + regex |
| compiled: Pattern[str] = re.compile(regex) |
| return compiled |
|
|
|
|
| def validate_regex( |
| ctx: click.Context, |
| param: click.Parameter, |
| value: Optional[str], |
| ) -> Optional[Pattern[str]]: |
| try: |
| return re_compile_maybe_verbose(value) if value is not None else None |
| except re.error as e: |
| raise click.BadParameter(f"Not a valid regular expression: {e}") from None |
|
|
|
|
| @click.command( |
| context_settings={"help_option_names": ["-h", "--help"]}, |
| |
| |
| help="The uncompromising code formatter.", |
| ) |
| @click.option("-c", "--code", type=str, help="Format the code passed in as a string.") |
| @click.option( |
| "-l", |
| "--line-length", |
| type=int, |
| default=DEFAULT_LINE_LENGTH, |
| help="How many characters per line to allow.", |
| show_default=True, |
| ) |
| @click.option( |
| "-t", |
| "--target-version", |
| type=click.Choice([v.name.lower() for v in TargetVersion]), |
| callback=target_version_option_callback, |
| multiple=True, |
| help=( |
| "Python versions that should be supported by Black's output. By default, Black" |
| " will try to infer this from the project metadata in pyproject.toml. If this" |
| " does not yield conclusive results, Black will use per-file auto-detection." |
| ), |
| ) |
| @click.option( |
| "--pyi", |
| is_flag=True, |
| help=( |
| "Format all input files like typing stubs regardless of file extension (useful" |
| " when piping source on standard input)." |
| ), |
| ) |
| @click.option( |
| "--ipynb", |
| is_flag=True, |
| help=( |
| "Format all input files like Jupyter Notebooks regardless of file extension " |
| "(useful when piping source on standard input)." |
| ), |
| ) |
| @click.option( |
| "--python-cell-magics", |
| multiple=True, |
| help=( |
| "When processing Jupyter Notebooks, add the given magic to the list" |
| f" of known python-magics ({', '.join(sorted(PYTHON_CELL_MAGICS))})." |
| " Useful for formatting cells with custom python magics." |
| ), |
| default=[], |
| ) |
| @click.option( |
| "-x", |
| "--skip-source-first-line", |
| is_flag=True, |
| help="Skip the first line of the source code.", |
| ) |
| @click.option( |
| "-S", |
| "--skip-string-normalization", |
| is_flag=True, |
| help="Don't normalize string quotes or prefixes.", |
| ) |
| @click.option( |
| "-C", |
| "--skip-magic-trailing-comma", |
| is_flag=True, |
| help="Don't use trailing commas as a reason to split lines.", |
| ) |
| @click.option( |
| "--experimental-string-processing", |
| is_flag=True, |
| hidden=True, |
| help="(DEPRECATED and now included in --preview) Normalize string literals.", |
| ) |
| @click.option( |
| "--preview", |
| is_flag=True, |
| help=( |
| "Enable potentially disruptive style changes that may be added to Black's main" |
| " functionality in the next major release." |
| ), |
| ) |
| @click.option( |
| "--check", |
| is_flag=True, |
| help=( |
| "Don't write the files back, just return the status. Return code 0 means" |
| " nothing would change. Return code 1 means some files would be reformatted." |
| " Return code 123 means there was an internal error." |
| ), |
| ) |
| @click.option( |
| "--diff", |
| is_flag=True, |
| help="Don't write the files back, just output a diff for each file on stdout.", |
| ) |
| @click.option( |
| "--color/--no-color", |
| is_flag=True, |
| help="Show colored diff. Only applies when `--diff` is given.", |
| ) |
| @click.option( |
| "--fast/--safe", |
| is_flag=True, |
| help="If --fast given, skip temporary sanity checks. [default: --safe]", |
| ) |
| @click.option( |
| "--required-version", |
| type=str, |
| help=( |
| "Require a specific version of Black to be running (useful for unifying results" |
| " across many environments e.g. with a pyproject.toml file). It can be" |
| " either a major version number or an exact version." |
| ), |
| ) |
| @click.option( |
| "--include", |
| type=str, |
| default=DEFAULT_INCLUDES, |
| callback=validate_regex, |
| help=( |
| "A regular expression that matches files and directories that should be" |
| " included on recursive searches. An empty value means all files are included" |
| " regardless of the name. Use forward slashes for directories on all platforms" |
| " (Windows, too). Exclusions are calculated first, inclusions later." |
| ), |
| show_default=True, |
| ) |
| @click.option( |
| "--exclude", |
| type=str, |
| callback=validate_regex, |
| help=( |
| "A regular expression that matches files and directories that should be" |
| " excluded on recursive searches. An empty value means no paths are excluded." |
| " Use forward slashes for directories on all platforms (Windows, too)." |
| " Exclusions are calculated first, inclusions later. [default:" |
| f" {DEFAULT_EXCLUDES}]" |
| ), |
| show_default=False, |
| ) |
| @click.option( |
| "--extend-exclude", |
| type=str, |
| callback=validate_regex, |
| help=( |
| "Like --exclude, but adds additional files and directories on top of the" |
| " excluded ones. (Useful if you simply want to add to the default)" |
| ), |
| ) |
| @click.option( |
| "--force-exclude", |
| type=str, |
| callback=validate_regex, |
| help=( |
| "Like --exclude, but files and directories matching this regex will be " |
| "excluded even when they are passed explicitly as arguments." |
| ), |
| ) |
| @click.option( |
| "--stdin-filename", |
| type=str, |
| help=( |
| "The name of the file when passing it through stdin. Useful to make " |
| "sure Black will respect --force-exclude option on some " |
| "editors that rely on using stdin." |
| ), |
| ) |
| @click.option( |
| "-W", |
| "--workers", |
| type=click.IntRange(min=1), |
| default=None, |
| help="Number of parallel workers [default: number of CPUs in the system]", |
| ) |
| @click.option( |
| "-q", |
| "--quiet", |
| is_flag=True, |
| help=( |
| "Don't emit non-error messages to stderr. Errors are still emitted; silence" |
| " those with 2>/dev/null." |
| ), |
| ) |
| @click.option( |
| "-v", |
| "--verbose", |
| is_flag=True, |
| help=( |
| "Also emit messages to stderr about files that were not changed or were ignored" |
| " due to exclusion patterns." |
| ), |
| ) |
| @click.version_option( |
| version=__version__, |
| message=( |
| f"%(prog)s, %(version)s (compiled: {'yes' if COMPILED else 'no'})\n" |
| f"Python ({platform.python_implementation()}) {platform.python_version()}" |
| ), |
| ) |
| @click.argument( |
| "src", |
| nargs=-1, |
| type=click.Path( |
| exists=True, file_okay=True, dir_okay=True, readable=True, allow_dash=True |
| ), |
| is_eager=True, |
| metavar="SRC ...", |
| ) |
| @click.option( |
| "--config", |
| type=click.Path( |
| exists=True, |
| file_okay=True, |
| dir_okay=False, |
| readable=True, |
| allow_dash=False, |
| path_type=str, |
| ), |
| is_eager=True, |
| callback=read_pyproject_toml, |
| help="Read configuration from FILE path.", |
| ) |
| @click.pass_context |
| def main( |
| ctx: click.Context, |
| code: Optional[str], |
| line_length: int, |
| target_version: List[TargetVersion], |
| check: bool, |
| diff: bool, |
| color: bool, |
| fast: bool, |
| pyi: bool, |
| ipynb: bool, |
| python_cell_magics: Sequence[str], |
| skip_source_first_line: bool, |
| skip_string_normalization: bool, |
| skip_magic_trailing_comma: bool, |
| experimental_string_processing: bool, |
| preview: bool, |
| quiet: bool, |
| verbose: bool, |
| required_version: Optional[str], |
| include: Pattern[str], |
| exclude: Optional[Pattern[str]], |
| extend_exclude: Optional[Pattern[str]], |
| force_exclude: Optional[Pattern[str]], |
| stdin_filename: Optional[str], |
| workers: Optional[int], |
| src: Tuple[str, ...], |
| config: Optional[str], |
| ) -> None: |
| """The uncompromising code formatter.""" |
| ctx.ensure_object(dict) |
|
|
| if src and code is not None: |
| out( |
| main.get_usage(ctx) |
| + "\n\n'SRC' and 'code' cannot be passed simultaneously." |
| ) |
| ctx.exit(1) |
| if not src and code is None: |
| out(main.get_usage(ctx) + "\n\nOne of 'SRC' or 'code' is required.") |
| ctx.exit(1) |
|
|
| root, method = ( |
| find_project_root(src, stdin_filename) if code is None else (None, None) |
| ) |
| ctx.obj["root"] = root |
|
|
| if verbose: |
| if root: |
| out( |
| f"Identified `{root}` as project root containing a {method}.", |
| fg="blue", |
| ) |
|
|
| normalized = [ |
| ( |
| (source, source) |
| if source == "-" |
| else (normalize_path_maybe_ignore(Path(source), root), source) |
| ) |
| for source in src |
| ] |
| srcs_string = ", ".join( |
| [ |
| ( |
| f'"{_norm}"' |
| if _norm |
| else f'\033[31m"{source} (skipping - invalid)"\033[34m' |
| ) |
| for _norm, source in normalized |
| ] |
| ) |
| out(f"Sources to be formatted: {srcs_string}", fg="blue") |
|
|
| if config: |
| config_source = ctx.get_parameter_source("config") |
| user_level_config = str(find_user_pyproject_toml()) |
| if config == user_level_config: |
| out( |
| ( |
| "Using configuration from user-level config at " |
| f"'{user_level_config}'." |
| ), |
| fg="blue", |
| ) |
| elif config_source in ( |
| ParameterSource.DEFAULT, |
| ParameterSource.DEFAULT_MAP, |
| ): |
| out("Using configuration from project root.", fg="blue") |
| else: |
| out(f"Using configuration in '{config}'.", fg="blue") |
| if ctx.default_map: |
| for param, value in ctx.default_map.items(): |
| out(f"{param}: {value}") |
|
|
| error_msg = "Oh no! 💥 💔 💥" |
| if ( |
| required_version |
| and required_version != __version__ |
| and required_version != __version__.split(".")[0] |
| ): |
| err( |
| f"{error_msg} The required version `{required_version}` does not match" |
| f" the running version `{__version__}`!" |
| ) |
| ctx.exit(1) |
| if ipynb and pyi: |
| err("Cannot pass both `pyi` and `ipynb` flags!") |
| ctx.exit(1) |
|
|
| write_back = WriteBack.from_configuration(check=check, diff=diff, color=color) |
| if target_version: |
| versions = set(target_version) |
| else: |
| |
| versions = set() |
| mode = Mode( |
| target_versions=versions, |
| line_length=line_length, |
| is_pyi=pyi, |
| is_ipynb=ipynb, |
| skip_source_first_line=skip_source_first_line, |
| string_normalization=not skip_string_normalization, |
| magic_trailing_comma=not skip_magic_trailing_comma, |
| experimental_string_processing=experimental_string_processing, |
| preview=preview, |
| python_cell_magics=set(python_cell_magics), |
| ) |
|
|
| if code is not None: |
| |
| |
| quiet = True |
|
|
| report = Report(check=check, diff=diff, quiet=quiet, verbose=verbose) |
|
|
| if code is not None: |
| reformat_code( |
| content=code, fast=fast, write_back=write_back, mode=mode, report=report |
| ) |
| else: |
| try: |
| sources = get_sources( |
| ctx=ctx, |
| src=src, |
| quiet=quiet, |
| verbose=verbose, |
| include=include, |
| exclude=exclude, |
| extend_exclude=extend_exclude, |
| force_exclude=force_exclude, |
| report=report, |
| stdin_filename=stdin_filename, |
| ) |
| except GitWildMatchPatternError: |
| ctx.exit(1) |
|
|
| path_empty( |
| sources, |
| "No Python files are present to be formatted. Nothing to do 😴", |
| quiet, |
| verbose, |
| ctx, |
| ) |
|
|
| if len(sources) == 1: |
| reformat_one( |
| src=sources.pop(), |
| fast=fast, |
| write_back=write_back, |
| mode=mode, |
| report=report, |
| ) |
| else: |
| from black.concurrency import reformat_many |
|
|
| reformat_many( |
| sources=sources, |
| fast=fast, |
| write_back=write_back, |
| mode=mode, |
| report=report, |
| workers=workers, |
| ) |
|
|
| if verbose or not quiet: |
| if code is None and (verbose or report.change_count or report.failure_count): |
| out() |
| out(error_msg if report.return_code else "All done! ✨ 🍰 ✨") |
| if code is None: |
| click.echo(str(report), err=True) |
| ctx.exit(report.return_code) |
|
|
|
|
| def get_sources( |
| *, |
| ctx: click.Context, |
| src: Tuple[str, ...], |
| quiet: bool, |
| verbose: bool, |
| include: Pattern[str], |
| exclude: Optional[Pattern[str]], |
| extend_exclude: Optional[Pattern[str]], |
| force_exclude: Optional[Pattern[str]], |
| report: "Report", |
| stdin_filename: Optional[str], |
| ) -> Set[Path]: |
| """Compute the set of files to be formatted.""" |
| sources: Set[Path] = set() |
| root = ctx.obj["root"] |
|
|
| using_default_exclude = exclude is None |
| exclude = re_compile_maybe_verbose(DEFAULT_EXCLUDES) if exclude is None else exclude |
| gitignore: Optional[Dict[Path, PathSpec]] = None |
| root_gitignore = get_gitignore(root) |
|
|
| for s in src: |
| if s == "-" and stdin_filename: |
| p = Path(stdin_filename) |
| is_stdin = True |
| else: |
| p = Path(s) |
| is_stdin = False |
|
|
| if is_stdin or p.is_file(): |
| normalized_path = normalize_path_maybe_ignore(p, ctx.obj["root"], report) |
| if normalized_path is None: |
| continue |
|
|
| normalized_path = "/" + normalized_path |
| |
| if force_exclude: |
| force_exclude_match = force_exclude.search(normalized_path) |
| else: |
| force_exclude_match = None |
| if force_exclude_match and force_exclude_match.group(0): |
| report.path_ignored(p, "matches the --force-exclude regular expression") |
| continue |
|
|
| if is_stdin: |
| p = Path(f"{STDIN_PLACEHOLDER}{str(p)}") |
|
|
| if p.suffix == ".ipynb" and not jupyter_dependencies_are_installed( |
| verbose=verbose, quiet=quiet |
| ): |
| continue |
|
|
| sources.add(p) |
| elif p.is_dir(): |
| p = root / normalize_path_maybe_ignore(p, ctx.obj["root"], report) |
| if using_default_exclude: |
| gitignore = { |
| root: root_gitignore, |
| p: get_gitignore(p), |
| } |
| sources.update( |
| gen_python_files( |
| p.iterdir(), |
| ctx.obj["root"], |
| include, |
| exclude, |
| extend_exclude, |
| force_exclude, |
| report, |
| gitignore, |
| verbose=verbose, |
| quiet=quiet, |
| ) |
| ) |
| elif s == "-": |
| sources.add(p) |
| else: |
| err(f"invalid path: {s}") |
| return sources |
|
|
|
|
| def path_empty( |
| src: Sized, msg: str, quiet: bool, verbose: bool, ctx: click.Context |
| ) -> None: |
| """ |
| Exit if there is no `src` provided for formatting |
| """ |
| if not src: |
| if verbose or not quiet: |
| out(msg) |
| ctx.exit(0) |
|
|
|
|
| def reformat_code( |
| content: str, fast: bool, write_back: WriteBack, mode: Mode, report: Report |
| ) -> None: |
| """ |
| Reformat and print out `content` without spawning child processes. |
| Similar to `reformat_one`, but for string content. |
| |
| `fast`, `write_back`, and `mode` options are passed to |
| :func:`format_file_in_place` or :func:`format_stdin_to_stdout`. |
| """ |
| path = Path("<string>") |
| try: |
| changed = Changed.NO |
| if format_stdin_to_stdout( |
| content=content, fast=fast, write_back=write_back, mode=mode |
| ): |
| changed = Changed.YES |
| report.done(path, changed) |
| except Exception as exc: |
| if report.verbose: |
| traceback.print_exc() |
| report.failed(path, str(exc)) |
|
|
|
|
| |
| |
| @mypyc_attr(patchable=True) |
| def reformat_one( |
| src: Path, fast: bool, write_back: WriteBack, mode: Mode, report: "Report" |
| ) -> None: |
| """Reformat a single file under `src` without spawning child processes. |
| |
| `fast`, `write_back`, and `mode` options are passed to |
| :func:`format_file_in_place` or :func:`format_stdin_to_stdout`. |
| """ |
| try: |
| changed = Changed.NO |
|
|
| if str(src) == "-": |
| is_stdin = True |
| elif str(src).startswith(STDIN_PLACEHOLDER): |
| is_stdin = True |
| |
| |
| src = Path(str(src)[len(STDIN_PLACEHOLDER) :]) |
| else: |
| is_stdin = False |
|
|
| if is_stdin: |
| if src.suffix == ".pyi": |
| mode = replace(mode, is_pyi=True) |
| elif src.suffix == ".ipynb": |
| mode = replace(mode, is_ipynb=True) |
| if format_stdin_to_stdout(fast=fast, write_back=write_back, mode=mode): |
| changed = Changed.YES |
| else: |
| cache: Cache = {} |
| if write_back not in (WriteBack.DIFF, WriteBack.COLOR_DIFF): |
| cache = read_cache(mode) |
| res_src = src.resolve() |
| res_src_s = str(res_src) |
| if res_src_s in cache and cache[res_src_s] == get_cache_info(res_src): |
| changed = Changed.CACHED |
| if changed is not Changed.CACHED and format_file_in_place( |
| src, fast=fast, write_back=write_back, mode=mode |
| ): |
| changed = Changed.YES |
| if (write_back is WriteBack.YES and changed is not Changed.CACHED) or ( |
| write_back is WriteBack.CHECK and changed is Changed.NO |
| ): |
| write_cache(cache, [src], mode) |
| report.done(src, changed) |
| except Exception as exc: |
| if report.verbose: |
| traceback.print_exc() |
| report.failed(src, str(exc)) |
|
|
|
|
| def format_file_in_place( |
| src: Path, |
| fast: bool, |
| mode: Mode, |
| write_back: WriteBack = WriteBack.NO, |
| lock: Any = None, |
| ) -> bool: |
| """Format file under `src` path. Return True if changed. |
| |
| If `write_back` is DIFF, write a diff to stdout. If it is YES, write reformatted |
| code to the file. |
| `mode` and `fast` options are passed to :func:`format_file_contents`. |
| """ |
| if src.suffix == ".pyi": |
| mode = replace(mode, is_pyi=True) |
| elif src.suffix == ".ipynb": |
| mode = replace(mode, is_ipynb=True) |
|
|
| then = datetime.utcfromtimestamp(src.stat().st_mtime) |
| header = b"" |
| with open(src, "rb") as buf: |
| if mode.skip_source_first_line: |
| header = buf.readline() |
| src_contents, encoding, newline = decode_bytes(buf.read()) |
| try: |
| dst_contents = format_file_contents(src_contents, fast=fast, mode=mode) |
| except NothingChanged: |
| return False |
| except JSONDecodeError: |
| raise ValueError( |
| f"File '{src}' cannot be parsed as valid Jupyter notebook." |
| ) from None |
| src_contents = header.decode(encoding) + src_contents |
| dst_contents = header.decode(encoding) + dst_contents |
|
|
| if write_back == WriteBack.YES: |
| with open(src, "w", encoding=encoding, newline=newline) as f: |
| f.write(dst_contents) |
| elif write_back in (WriteBack.DIFF, WriteBack.COLOR_DIFF): |
| now = datetime.utcnow() |
| src_name = f"{src}\t{then} +0000" |
| dst_name = f"{src}\t{now} +0000" |
| if mode.is_ipynb: |
| diff_contents = ipynb_diff(src_contents, dst_contents, src_name, dst_name) |
| else: |
| diff_contents = diff(src_contents, dst_contents, src_name, dst_name) |
|
|
| if write_back == WriteBack.COLOR_DIFF: |
| diff_contents = color_diff(diff_contents) |
|
|
| with lock or nullcontext(): |
| f = io.TextIOWrapper( |
| sys.stdout.buffer, |
| encoding=encoding, |
| newline=newline, |
| write_through=True, |
| ) |
| f = wrap_stream_for_windows(f) |
| f.write(diff_contents) |
| f.detach() |
|
|
| return True |
|
|
|
|
| def format_stdin_to_stdout( |
| fast: bool, |
| *, |
| content: Optional[str] = None, |
| write_back: WriteBack = WriteBack.NO, |
| mode: Mode, |
| ) -> bool: |
| """Format file on stdin. Return True if changed. |
| |
| If content is None, it's read from sys.stdin. |
| |
| If `write_back` is YES, write reformatted code back to stdout. If it is DIFF, |
| write a diff to stdout. The `mode` argument is passed to |
| :func:`format_file_contents`. |
| """ |
| then = datetime.utcnow() |
|
|
| if content is None: |
| src, encoding, newline = decode_bytes(sys.stdin.buffer.read()) |
| else: |
| src, encoding, newline = content, "utf-8", "" |
|
|
| dst = src |
| try: |
| dst = format_file_contents(src, fast=fast, mode=mode) |
| return True |
|
|
| except NothingChanged: |
| return False |
|
|
| finally: |
| f = io.TextIOWrapper( |
| sys.stdout.buffer, encoding=encoding, newline=newline, write_through=True |
| ) |
| if write_back == WriteBack.YES: |
| |
| if dst and dst[-1] != "\n": |
| dst += "\n" |
| f.write(dst) |
| elif write_back in (WriteBack.DIFF, WriteBack.COLOR_DIFF): |
| now = datetime.utcnow() |
| src_name = f"STDIN\t{then} +0000" |
| dst_name = f"STDOUT\t{now} +0000" |
| d = diff(src, dst, src_name, dst_name) |
| if write_back == WriteBack.COLOR_DIFF: |
| d = color_diff(d) |
| f = wrap_stream_for_windows(f) |
| f.write(d) |
| f.detach() |
|
|
|
|
| def check_stability_and_equivalence( |
| src_contents: str, dst_contents: str, *, mode: Mode |
| ) -> None: |
| """Perform stability and equivalence checks. |
| |
| Raise AssertionError if source and destination contents are not |
| equivalent, or if a second pass of the formatter would format the |
| content differently. |
| """ |
| assert_equivalent(src_contents, dst_contents) |
| assert_stable(src_contents, dst_contents, mode=mode) |
|
|
|
|
| def format_file_contents(src_contents: str, *, fast: bool, mode: Mode) -> FileContent: |
| """Reformat contents of a file and return new contents. |
| |
| If `fast` is False, additionally confirm that the reformatted code is |
| valid by calling :func:`assert_equivalent` and :func:`assert_stable` on it. |
| `mode` is passed to :func:`format_str`. |
| """ |
| if mode.is_ipynb: |
| dst_contents = format_ipynb_string(src_contents, fast=fast, mode=mode) |
| else: |
| dst_contents = format_str(src_contents, mode=mode) |
| if src_contents == dst_contents: |
| raise NothingChanged |
|
|
| if not fast and not mode.is_ipynb: |
| |
| check_stability_and_equivalence(src_contents, dst_contents, mode=mode) |
| return dst_contents |
|
|
|
|
| def validate_cell(src: str, mode: Mode) -> None: |
| """Check that cell does not already contain TransformerManager transformations, |
| or non-Python cell magics, which might cause tokenizer_rt to break because of |
| indentations. |
| |
| If a cell contains ``!ls``, then it'll be transformed to |
| ``get_ipython().system('ls')``. However, if the cell originally contained |
| ``get_ipython().system('ls')``, then it would get transformed in the same way: |
| |
| >>> TransformerManager().transform_cell("get_ipython().system('ls')") |
| "get_ipython().system('ls')\n" |
| >>> TransformerManager().transform_cell("!ls") |
| "get_ipython().system('ls')\n" |
| |
| Due to the impossibility of safely roundtripping in such situations, cells |
| containing transformed magics will be ignored. |
| """ |
| if any(transformed_magic in src for transformed_magic in TRANSFORMED_MAGICS): |
| raise NothingChanged |
| if ( |
| src[:2] == "%%" |
| and src.split()[0][2:] not in PYTHON_CELL_MAGICS | mode.python_cell_magics |
| ): |
| raise NothingChanged |
|
|
|
|
| def format_cell(src: str, *, fast: bool, mode: Mode) -> str: |
| """Format code in given cell of Jupyter notebook. |
| |
| General idea is: |
| |
| - if cell has trailing semicolon, remove it; |
| - if cell has IPython magics, mask them; |
| - format cell; |
| - reinstate IPython magics; |
| - reinstate trailing semicolon (if originally present); |
| - strip trailing newlines. |
| |
| Cells with syntax errors will not be processed, as they |
| could potentially be automagics or multi-line magics, which |
| are currently not supported. |
| """ |
| validate_cell(src, mode) |
| src_without_trailing_semicolon, has_trailing_semicolon = remove_trailing_semicolon( |
| src |
| ) |
| try: |
| masked_src, replacements = mask_cell(src_without_trailing_semicolon) |
| except SyntaxError: |
| raise NothingChanged from None |
| masked_dst = format_str(masked_src, mode=mode) |
| if not fast: |
| check_stability_and_equivalence(masked_src, masked_dst, mode=mode) |
| dst_without_trailing_semicolon = unmask_cell(masked_dst, replacements) |
| dst = put_trailing_semicolon_back( |
| dst_without_trailing_semicolon, has_trailing_semicolon |
| ) |
| dst = dst.rstrip("\n") |
| if dst == src: |
| raise NothingChanged from None |
| return dst |
|
|
|
|
| def validate_metadata(nb: MutableMapping[str, Any]) -> None: |
| """If notebook is marked as non-Python, don't format it. |
| |
| All notebook metadata fields are optional, see |
| https://nbformat.readthedocs.io/en/latest/format_description.html. So |
| if a notebook has empty metadata, we will try to parse it anyway. |
| """ |
| language = nb.get("metadata", {}).get("language_info", {}).get("name", None) |
| if language is not None and language != "python": |
| raise NothingChanged from None |
|
|
|
|
| def format_ipynb_string(src_contents: str, *, fast: bool, mode: Mode) -> FileContent: |
| """Format Jupyter notebook. |
| |
| Operate cell-by-cell, only on code cells, only for Python notebooks. |
| If the ``.ipynb`` originally had a trailing newline, it'll be preserved. |
| """ |
| if not src_contents: |
| raise NothingChanged |
|
|
| trailing_newline = src_contents[-1] == "\n" |
| modified = False |
| nb = json.loads(src_contents) |
| validate_metadata(nb) |
| for cell in nb["cells"]: |
| if cell.get("cell_type", None) == "code": |
| try: |
| src = "".join(cell["source"]) |
| dst = format_cell(src, fast=fast, mode=mode) |
| except NothingChanged: |
| pass |
| else: |
| cell["source"] = dst.splitlines(keepends=True) |
| modified = True |
| if modified: |
| dst_contents = json.dumps(nb, indent=1, ensure_ascii=False) |
| if trailing_newline: |
| dst_contents = dst_contents + "\n" |
| return dst_contents |
| else: |
| raise NothingChanged |
|
|
|
|
| def format_str(src_contents: str, *, mode: Mode) -> str: |
| """Reformat a string and return new contents. |
| |
| `mode` determines formatting options, such as how many characters per line are |
| allowed. Example: |
| |
| >>> import black |
| >>> print(black.format_str("def f(arg:str='')->None:...", mode=black.Mode())) |
| def f(arg: str = "") -> None: |
| ... |
| |
| A more complex example: |
| |
| >>> print( |
| ... black.format_str( |
| ... "def f(arg:str='')->None: hey", |
| ... mode=black.Mode( |
| ... target_versions={black.TargetVersion.PY36}, |
| ... line_length=10, |
| ... string_normalization=False, |
| ... is_pyi=False, |
| ... ), |
| ... ), |
| ... ) |
| def f( |
| arg: str = '', |
| ) -> None: |
| hey |
| |
| """ |
| dst_contents = _format_str_once(src_contents, mode=mode) |
| |
| |
| |
| if src_contents != dst_contents: |
| return _format_str_once(dst_contents, mode=mode) |
| return dst_contents |
|
|
|
|
| def _format_str_once(src_contents: str, *, mode: Mode) -> str: |
| src_node = lib2to3_parse(src_contents.lstrip(), mode.target_versions) |
| dst_blocks: List[LinesBlock] = [] |
| if mode.target_versions: |
| versions = mode.target_versions |
| else: |
| future_imports = get_future_imports(src_node) |
| versions = detect_target_versions(src_node, future_imports=future_imports) |
|
|
| context_manager_features = { |
| feature |
| for feature in {Feature.PARENTHESIZED_CONTEXT_MANAGERS} |
| if supports_feature(versions, feature) |
| } |
| normalize_fmt_off(src_node) |
| lines = LineGenerator(mode=mode, features=context_manager_features) |
| elt = EmptyLineTracker(mode=mode) |
| split_line_features = { |
| feature |
| for feature in {Feature.TRAILING_COMMA_IN_CALL, Feature.TRAILING_COMMA_IN_DEF} |
| if supports_feature(versions, feature) |
| } |
| block: Optional[LinesBlock] = None |
| for current_line in lines.visit(src_node): |
| block = elt.maybe_empty_lines(current_line) |
| dst_blocks.append(block) |
| for line in transform_line( |
| current_line, mode=mode, features=split_line_features |
| ): |
| block.content_lines.append(str(line)) |
| if dst_blocks: |
| dst_blocks[-1].after = 0 |
| dst_contents = [] |
| for block in dst_blocks: |
| dst_contents.extend(block.all_lines()) |
| if not dst_contents: |
| |
| |
| normalized_content, _, newline = decode_bytes(src_contents.encode("utf-8")) |
| if "\n" in normalized_content: |
| return newline |
| return "" |
| return "".join(dst_contents) |
|
|
|
|
| def decode_bytes(src: bytes) -> Tuple[FileContent, Encoding, NewLine]: |
| """Return a tuple of (decoded_contents, encoding, newline). |
| |
| `newline` is either CRLF or LF but `decoded_contents` is decoded with |
| universal newlines (i.e. only contains LF). |
| """ |
| srcbuf = io.BytesIO(src) |
| encoding, lines = tokenize.detect_encoding(srcbuf.readline) |
| if not lines: |
| return "", encoding, "\n" |
|
|
| newline = "\r\n" if b"\r\n" == lines[0][-2:] else "\n" |
| srcbuf.seek(0) |
| with io.TextIOWrapper(srcbuf, encoding) as tiow: |
| return tiow.read(), encoding, newline |
|
|
|
|
| def get_features_used( |
| node: Node, *, future_imports: Optional[Set[str]] = None |
| ) -> Set[Feature]: |
| """Return a set of (relatively) new Python features used in this file. |
| |
| Currently looking for: |
| - f-strings; |
| - self-documenting expressions in f-strings (f"{x=}"); |
| - underscores in numeric literals; |
| - trailing commas after * or ** in function signatures and calls; |
| - positional only arguments in function signatures and lambdas; |
| - assignment expression; |
| - relaxed decorator syntax; |
| - usage of __future__ flags (annotations); |
| - print / exec statements; |
| - parenthesized context managers; |
| - match statements; |
| - except* clause; |
| - variadic generics; |
| """ |
| features: Set[Feature] = set() |
| if future_imports: |
| features |= { |
| FUTURE_FLAG_TO_FEATURE[future_import] |
| for future_import in future_imports |
| if future_import in FUTURE_FLAG_TO_FEATURE |
| } |
|
|
| for n in node.pre_order(): |
| if is_string_token(n): |
| value_head = n.value[:2] |
| if value_head in {'f"', 'F"', "f'", "F'", "rf", "fr", "RF", "FR"}: |
| features.add(Feature.F_STRINGS) |
| if Feature.DEBUG_F_STRINGS not in features: |
| for span_beg, span_end in iter_fexpr_spans(n.value): |
| if n.value[span_beg : span_end - 1].rstrip().endswith("="): |
| features.add(Feature.DEBUG_F_STRINGS) |
| break |
|
|
| elif is_number_token(n): |
| if "_" in n.value: |
| features.add(Feature.NUMERIC_UNDERSCORES) |
|
|
| elif n.type == token.SLASH: |
| if n.parent and n.parent.type in { |
| syms.typedargslist, |
| syms.arglist, |
| syms.varargslist, |
| }: |
| features.add(Feature.POS_ONLY_ARGUMENTS) |
|
|
| elif n.type == token.COLONEQUAL: |
| features.add(Feature.ASSIGNMENT_EXPRESSIONS) |
|
|
| elif n.type == syms.decorator: |
| if len(n.children) > 1 and not is_simple_decorator_expression( |
| n.children[1] |
| ): |
| features.add(Feature.RELAXED_DECORATORS) |
|
|
| elif ( |
| n.type in {syms.typedargslist, syms.arglist} |
| and n.children |
| and n.children[-1].type == token.COMMA |
| ): |
| if n.type == syms.typedargslist: |
| feature = Feature.TRAILING_COMMA_IN_DEF |
| else: |
| feature = Feature.TRAILING_COMMA_IN_CALL |
|
|
| for ch in n.children: |
| if ch.type in STARS: |
| features.add(feature) |
|
|
| if ch.type == syms.argument: |
| for argch in ch.children: |
| if argch.type in STARS: |
| features.add(feature) |
|
|
| elif ( |
| n.type in {syms.return_stmt, syms.yield_expr} |
| and len(n.children) >= 2 |
| and n.children[1].type == syms.testlist_star_expr |
| and any(child.type == syms.star_expr for child in n.children[1].children) |
| ): |
| features.add(Feature.UNPACKING_ON_FLOW) |
|
|
| elif ( |
| n.type == syms.annassign |
| and len(n.children) >= 4 |
| and n.children[3].type == syms.testlist_star_expr |
| ): |
| features.add(Feature.ANN_ASSIGN_EXTENDED_RHS) |
|
|
| elif ( |
| n.type == syms.with_stmt |
| and len(n.children) > 2 |
| and n.children[1].type == syms.atom |
| ): |
| atom_children = n.children[1].children |
| if ( |
| len(atom_children) == 3 |
| and atom_children[0].type == token.LPAR |
| and atom_children[1].type == syms.testlist_gexp |
| and atom_children[2].type == token.RPAR |
| ): |
| features.add(Feature.PARENTHESIZED_CONTEXT_MANAGERS) |
|
|
| elif n.type == syms.match_stmt: |
| features.add(Feature.PATTERN_MATCHING) |
|
|
| elif ( |
| n.type == syms.except_clause |
| and len(n.children) >= 2 |
| and n.children[1].type == token.STAR |
| ): |
| features.add(Feature.EXCEPT_STAR) |
|
|
| elif n.type in {syms.subscriptlist, syms.trailer} and any( |
| child.type == syms.star_expr for child in n.children |
| ): |
| features.add(Feature.VARIADIC_GENERICS) |
|
|
| elif ( |
| n.type == syms.tname_star |
| and len(n.children) == 3 |
| and n.children[2].type == syms.star_expr |
| ): |
| features.add(Feature.VARIADIC_GENERICS) |
|
|
| return features |
|
|
|
|
| def detect_target_versions( |
| node: Node, *, future_imports: Optional[Set[str]] = None |
| ) -> Set[TargetVersion]: |
| """Detect the version to target based on the nodes used.""" |
| features = get_features_used(node, future_imports=future_imports) |
| return { |
| version for version in TargetVersion if features <= VERSION_TO_FEATURES[version] |
| } |
|
|
|
|
| def get_future_imports(node: Node) -> Set[str]: |
| """Return a set of __future__ imports in the file.""" |
| imports: Set[str] = set() |
|
|
| def get_imports_from_children(children: List[LN]) -> Generator[str, None, None]: |
| for child in children: |
| if isinstance(child, Leaf): |
| if child.type == token.NAME: |
| yield child.value |
|
|
| elif child.type == syms.import_as_name: |
| orig_name = child.children[0] |
| assert isinstance(orig_name, Leaf), "Invalid syntax parsing imports" |
| assert orig_name.type == token.NAME, "Invalid syntax parsing imports" |
| yield orig_name.value |
|
|
| elif child.type == syms.import_as_names: |
| yield from get_imports_from_children(child.children) |
|
|
| else: |
| raise AssertionError("Invalid syntax parsing imports") |
|
|
| for child in node.children: |
| if child.type != syms.simple_stmt: |
| break |
|
|
| first_child = child.children[0] |
| if isinstance(first_child, Leaf): |
| |
| if ( |
| len(child.children) == 2 |
| and first_child.type == token.STRING |
| and child.children[1].type == token.NEWLINE |
| ): |
| continue |
|
|
| break |
|
|
| elif first_child.type == syms.import_from: |
| module_name = first_child.children[1] |
| if not isinstance(module_name, Leaf) or module_name.value != "__future__": |
| break |
|
|
| imports |= set(get_imports_from_children(first_child.children[3:])) |
| else: |
| break |
|
|
| return imports |
|
|
|
|
| def assert_equivalent(src: str, dst: str) -> None: |
| """Raise AssertionError if `src` and `dst` aren't equivalent.""" |
| try: |
| src_ast = parse_ast(src) |
| except Exception as exc: |
| raise AssertionError( |
| "cannot use --safe with this file; failed to parse source file AST: " |
| f"{exc}\n" |
| "This could be caused by running Black with an older Python version " |
| "that does not support new syntax used in your source file." |
| ) from exc |
|
|
| try: |
| dst_ast = parse_ast(dst) |
| except Exception as exc: |
| log = dump_to_file("".join(traceback.format_tb(exc.__traceback__)), dst) |
| raise AssertionError( |
| f"INTERNAL ERROR: Black produced invalid code: {exc}. " |
| "Please report a bug on https://github.com/psf/black/issues. " |
| f"This invalid output might be helpful: {log}" |
| ) from None |
|
|
| src_ast_str = "\n".join(stringify_ast(src_ast)) |
| dst_ast_str = "\n".join(stringify_ast(dst_ast)) |
| if src_ast_str != dst_ast_str: |
| log = dump_to_file(diff(src_ast_str, dst_ast_str, "src", "dst")) |
| raise AssertionError( |
| "INTERNAL ERROR: Black produced code that is not equivalent to the" |
| " source. Please report a bug on " |
| f"https://github.com/psf/black/issues. This diff might be helpful: {log}" |
| ) from None |
|
|
|
|
| def assert_stable(src: str, dst: str, mode: Mode) -> None: |
| """Raise AssertionError if `dst` reformats differently the second time.""" |
| |
| |
| |
| newdst = _format_str_once(dst, mode=mode) |
| if dst != newdst: |
| log = dump_to_file( |
| str(mode), |
| diff(src, dst, "source", "first pass"), |
| diff(dst, newdst, "first pass", "second pass"), |
| ) |
| raise AssertionError( |
| "INTERNAL ERROR: Black produced different code on the second pass of the" |
| " formatter. Please report a bug on https://github.com/psf/black/issues." |
| f" This diff might be helpful: {log}" |
| ) from None |
|
|
|
|
| @contextmanager |
| def nullcontext() -> Iterator[None]: |
| """Return an empty context manager. |
| |
| To be used like `nullcontext` in Python 3.7. |
| """ |
| yield |
|
|
|
|
| def patch_click() -> None: |
| """Make Click not crash on Python 3.6 with LANG=C. |
| |
| On certain misconfigured environments, Python 3 selects the ASCII encoding as the |
| default which restricts paths that it can access during the lifetime of the |
| application. Click refuses to work in this scenario by raising a RuntimeError. |
| |
| In case of Black the likelihood that non-ASCII characters are going to be used in |
| file paths is minimal since it's Python source code. Moreover, this crash was |
| spurious on Python 3.7 thanks to PEP 538 and PEP 540. |
| """ |
| modules: List[Any] = [] |
| try: |
| from click import core |
| except ImportError: |
| pass |
| else: |
| modules.append(core) |
| try: |
| |
| |
| from click import _unicodefun |
| except ImportError: |
| pass |
| else: |
| modules.append(_unicodefun) |
|
|
| for module in modules: |
| if hasattr(module, "_verify_python3_env"): |
| module._verify_python3_env = lambda: None |
| if hasattr(module, "_verify_python_env"): |
| module._verify_python_env = lambda: None |
|
|
|
|
| def patched_main() -> None: |
| |
| |
| if getattr(sys, "frozen", False): |
| from multiprocessing import freeze_support |
|
|
| freeze_support() |
|
|
| patch_click() |
| main() |
|
|
|
|
| if __name__ == "__main__": |
| patched_main() |
|
|