File size: 2,975 Bytes
d5d7329
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
from __future__ import annotations

from logging import getLogger
from pathlib import Path

import keyboard
import librosa
import sounddevice as sd
import soundfile as sf
from rich.console import Console
from tqdm.rich import tqdm

LOG = getLogger(__name__)


def preprocess_classify(
    input_dir: Path | str, output_dir: Path | str, create_new: bool = True
) -> None:
    # paths
    input_dir_ = Path(input_dir)
    output_dir_ = Path(output_dir)
    speed = 1
    if not input_dir_.is_dir():
        raise ValueError(f"{input_dir} is not a directory.")
    output_dir_.mkdir(exist_ok=True)

    console = Console()
    # get audio paths and folders
    audio_paths = list(input_dir_.glob("*.*"))
    last_folders = [x for x in output_dir_.glob("*") if x.is_dir()]
    console.print("Press ↑ or ↓ to change speed. Press any other key to classify.")
    console.print(f"Folders: {[x.name for x in last_folders]}")

    pbar_description = ""

    pbar = tqdm(audio_paths)
    for audio_path in pbar:
        # read file
        audio, sr = sf.read(audio_path)

        # update description
        duration = librosa.get_duration(y=audio, sr=sr)
        pbar_description = f"{duration:.1f} {pbar_description}"
        pbar.set_description(pbar_description)

        while True:
            # start playing
            sd.play(librosa.effects.time_stretch(audio, rate=speed), sr, loop=True)

            # wait for key press
            key = str(keyboard.read_key())
            if key == "down":
                speed /= 1.1
                console.print(f"Speed: {speed:.2f}")
            elif key == "up":
                speed *= 1.1
                console.print(f"Speed: {speed:.2f}")
            else:
                break

            # stop playing
            sd.stop()

        # print if folder changed
        folders = [x for x in output_dir_.glob("*") if x.is_dir()]
        if folders != last_folders:
            console.print(f"Folders updated: {[x.name for x in folders]}")
            last_folders = folders

        # get folder
        folder_candidates = [x for x in folders if x.name.startswith(key)]
        if len(folder_candidates) == 0:
            if create_new:
                folder = output_dir_ / key
            else:
                console.print(f"No folder starts with {key}.")
                continue
        else:
            if len(folder_candidates) > 1:
                LOG.warning(
                    f"Multiple folders ({[x.name for x in folder_candidates]}) start with {key}. "
                    f"Using first one ({folder_candidates[0].name})."
                )
            folder = folder_candidates[0]
        folder.mkdir(exist_ok=True)

        # move file
        new_path = folder / audio_path.name
        audio_path.rename(new_path)

        # update description
        pbar_description = f"Last: {audio_path.name} -> {folder.name}"

        # yield result
        # yield audio_path, key, folder, new_path