File size: 4,224 Bytes
ffd2caa
 
 
 
5a8c370
 
 
368eb36
 
 
5a8c370
14a17dc
 
 
 
 
 
 
 
 
 
 
5a8c370
 
4316137
5a8c370
 
 
 
 
368eb36
ffd2caa
5a8c370
 
 
 
 
 
 
 
14a17dc
 
40b32ea
14a17dc
 
ffd2caa
5a8c370
ffd2caa
66e2d43
 
 
 
 
 
 
 
 
 
 
 
ffd2caa
 
5a8c370
ffd2caa
 
 
 
66e2d43
ffd2caa
 
 
 
 
 
 
 
 
 
 
 
5a8c370
 
 
 
40b32ea
5a8c370
 
 
 
 
 
 
4316137
 
5a8c370
 
 
 
 
 
 
ffd2caa
5a8c370
 
 
 
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
import os
import requests
import shutil
import subprocess
from pydub import AudioSegment
import whisper
from speechbrain.pretrained.interfaces import foreign_class
import torch

# Module-wide torch device: use CUDA when torch reports it as available,
# otherwise fall back to the CPU.
if torch.cuda.is_available():
    device = torch.device("cuda")
else:
    device = torch.device("cpu")

def clear_tmp_dir(path):
    """Remove every entry inside ``path`` while keeping ``path`` itself.

    Regular files and symbolic links (including links that point at
    directories) are unlinked; real directories are removed recursively.
    A failure on one entry is printed and does not stop the remaining
    entries from being processed.
    """
    for entry_name in os.listdir(path):
        target = os.path.join(path, entry_name)
        try:
            # A symlink to a directory must be unlinked, not walked into,
            # so it is excluded from the recursive-removal branch.
            if os.path.isdir(target) and not os.path.islink(target):
                shutil.rmtree(target)
            elif os.path.islink(target) or os.path.isfile(target):
                os.unlink(target)
        except Exception as err:
            print(f'Failed to delete {target}. Reason: {err}')

class AccentAnalyzerTool:
    """Classify a speaker's English accent from a downloadable video.

    ``analyze`` downloads a media file over HTTP into a local ``tmp``
    directory, converts it to WAV, runs the accent classifier on the audio
    and transcribes it with Whisper.
    """

    # (connect, read) timeouts in seconds for the video download, so an
    # unresponsive server cannot block ``analyze`` indefinitely.
    DOWNLOAD_TIMEOUT = (10, 60)

    # Downloads smaller than this many bytes are rejected as invalid.
    MIN_VIDEO_BYTES = 1000

    def __init__(self):
        # NOTE(review): the Whisper model is loaded inside analyze() on every
        # call rather than once here; presumably to limit resident memory --
        # confirm before caching it on the instance.
        self.accent_model = foreign_class(
            source="Jzuluaga/accent-id-commonaccent_xlsr-en-english",
            pymodule_file="custom_interface.py",
            classname="CustomEncoderWav2vec2Classifier"
        )
        self.accent_model.device = torch.device(device)
        # Transcript text from the most recent successful analyze() call.
        self.last_transcript = None

    def log(self, msg):
        """Print ``msg`` to stdout prefixed with the tool name."""
        print(f"[AccentAnalyzerTool] {msg}")

    def analyze(self, url: str) -> str:
        """Download the media at ``url`` and describe the speaker's accent.

        Args:
            url: HTTP(S) address of a directly downloadable media file.

        Returns:
            A Markdown summary containing the accent label, the confidence
            as a percentage and the transcript. If any step fails, the
            string ``"Error analyzing accent: <reason>"`` is returned
            instead; no exception propagates to the caller.
        """
        try:
            # Reject unusable input before touching the tmp directory.
            if not isinstance(url, str) or not url.strip():
                raise ValueError("URL must be a non-empty string")

            self.log("Downloading video...")
            tmp_dir = self._prepare_tmp_dir()
            video_path = os.path.join(tmp_dir, "video.mp4")
            self._download(url, video_path)

            file_size = os.path.getsize(video_path)
            self.log(f"Downloaded video size: {file_size} bytes")
            if file_size < self.MIN_VIDEO_BYTES:
                raise ValueError("Downloaded video file is too small or invalid")

            self._probe(video_path)

            self.log("Extracting audio...")
            audio_path = os.path.join(tmp_dir, "audio.wav")
            AudioSegment.from_file(video_path).export(audio_path, format="wav")
            os.chmod(audio_path, 0o666)

            self.log("Classifying accent...")
            _, score, _, label = self.accent_model.classify_file(audio_path)
            # The 'us' label is shown as an acronym; others are capitalized.
            accent = label[0].upper() if label[0] == 'us' else label[0].capitalize()
            confidence = round(float(score) * 100, 2)

            self.log("Transcribing...")
            whisper_model = whisper.load_model("tiny", device=device)
            transcript = whisper_model.transcribe(audio_path)["text"]
            self.last_transcript = transcript

            return (
                f"The speaker has a **{accent} English accent** "
                f"with **{confidence}% confidence**.\n\n"
                f"**Transcript of the audio:**\n\n *{transcript.strip(' ')}*"
            )

        except Exception as e:
            # Boundary handler: callers receive a string in every case.
            return f"Error analyzing accent: {str(e)}"

    def _prepare_tmp_dir(self):
        """Create the ``tmp`` working directory, or empty it if it exists."""
        tmp_dir = "tmp"
        if not os.path.exists(tmp_dir):
            os.makedirs(tmp_dir, exist_ok=True)
            os.chmod(tmp_dir, 0o777)
        else:
            clear_tmp_dir(tmp_dir)
        return tmp_dir

    def _download(self, url, dest_path):
        """Stream ``url`` into ``dest_path``; raises on HTTP or network errors."""
        headers = {
            "User-Agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64) "
                          "AppleWebKit/537.36 (KHTML, like Gecko) "
                          "Chrome/114.0.0.0 Safari/537.36",
            "Accept": "text/html,application/xhtml+xml,application/xml;q=0.9,image/avif,image/webp,"
                      "image/apng,*/*;q=0.8,application/signed-exchange;v=b3;q=0.9",
            "Accept-Language": "en-US,en;q=0.9",
            "Referer": "https://www.youtube.com/",
            "Connection": "keep-alive",
            "DNT": "1",
        }
        # The context manager releases the streamed connection even when
        # raise_for_status() or the file write fails.
        with requests.get(
            url, headers=headers, stream=True, timeout=self.DOWNLOAD_TIMEOUT
        ) as response:
            response.raise_for_status()
            with open(dest_path, "wb") as f:
                for chunk in response.iter_content(chunk_size=8192):
                    if chunk:
                        f.write(chunk)

    def _probe(self, video_path):
        """Log ffprobe's description of ``video_path``; never raises for ffprobe failures."""
        ffprobe_cmd = ["ffprobe", "-v", "error", "-show_format", "-show_streams", video_path]
        try:
            raw = subprocess.check_output(ffprobe_cmd, stderr=subprocess.STDOUT)
        except subprocess.CalledProcessError as e:
            details = (e.output or b"").decode(errors="replace")
            self.log(f"ffprobe error:\n{details}")
        except OSError as e:
            # ffprobe is missing or not executable; this step is diagnostic
            # only, so the analysis continues.
            self.log(f"ffprobe error:\n{e}")
        else:
            details = raw.decode(errors="replace")
            self.log(f"ffprobe output:\n{details}")