# File size: 2,795 Bytes (revision ids seen in listing: bfb5aad, dc3a007)
import requests
import time
from scipy.io.wavfile import write
import io


# Common prefix of the AssemblyAI v2 URLs used in this module.
_API_ROOT = "https://api.assemblyai.com/v2"

# Target of the POST that sends audio bytes (see `upload_file`).
upload_endpoint = _API_ROOT + "/upload"
# Target of the POST that creates a transcript (see `request_transcript`).
transcript_endpoint = _API_ROOT + "/transcript"


def make_header(api_key):
    """Build the HTTP headers used for AssemblyAI requests.

    Returns a new dict holding `api_key` under "authorization" together with
    a JSON "content-type" entry.
    """
    header = dict(authorization=api_key)
    header["content-type"] = "application/json"
    return header


def _read_file(filename, chunk_size=5242880):
    """Yield the contents of `filename` as successive binary chunks.

    Helper for `upload_file()`. The file is opened in binary mode and read
    `chunk_size` bytes at a time (5242880 by default); iteration ends when a
    read comes back empty.
    """
    with open(filename, "rb") as stream:
        # Sentinel form of iter(): keep calling read() until it returns b"".
        yield from iter(lambda: stream.read(chunk_size), b"")


def _read_array(audio, chunk_size=5242880):
    """Yield WAV-encoded bytes for an in-memory recording, chunk by chunk.

    Array counterpart of `_read_file`: instead of reading a file from disk,
    the audio is serialised into an in-memory buffer and that buffer is
    streamed out.

    Args:
        audio: A `(sample_rate, samples)` pair, unpacked and handed to
            `scipy.io.wavfile.write` as its rate and data arguments.
        chunk_size: Maximum number of bytes per yielded chunk.

    Yields:
        Consecutive `bytes` chunks of the encoded WAV data.
    """
    sample_rate, samples = audio

    # In-memory stand-in for a temporary file; nothing is written to disk.
    wav_buffer = io.BytesIO()
    write(wav_buffer, sample_rate, samples)

    # Rewind explicitly so reading starts at the first byte regardless of
    # where `write` leaves the stream position.
    wav_buffer.seek(0)

    while True:
        chunk = wav_buffer.read(chunk_size)
        if not chunk:
            break
        yield chunk


def upload_file(audio_file, header, is_file=True):
    """Send audio to the AssemblyAI upload endpoint and return its JSON reply.

    The request body is streamed from a chunk generator: `_read_file` when
    `is_file` is true, otherwise `_read_array`. `header` is passed through as
    the request headers. A response with an HTTP error status is raised via
    `raise_for_status()`.

    Returns the decoded JSON body, expected to look like
    {'upload_url': <URL>}.
    """
    chunk_source = _read_file if is_file else _read_array
    response = requests.post(
        upload_endpoint,
        headers=header,
        data=chunk_source(audio_file),
    )
    # raise_for_status() does nothing for a 200 reply, so calling it without
    # a status-code guard has the same effect as guarding on `!= 200`.
    response.raise_for_status()
    return response.json()


def request_transcript(upload_url, header):
    """Ask AssemblyAI to transcribe previously uploaded audio.

    Args:
        upload_url: Either the audio URL itself or a dict carrying it under
            the "upload_url" key (the shape returned by `upload_file`).
        header: Request headers sent with the POST.

    Returns:
        The decoded JSON body of the response. The HTTP status is not
        inspected here, so callers should check the returned payload.
    """
    # isinstance (rather than an exact type match) also accepts dict
    # subclasses such as OrderedDict.
    if isinstance(upload_url, dict):
        upload_url = upload_url["upload_url"]

    transcript_response = requests.post(
        transcript_endpoint,
        json={"audio_url": upload_url},
        headers=header,
    )
    return transcript_response.json()


def wait_for_completion(transcript_id, header, poll_interval=5):
    """Poll AssemblyAI until the given transcript completes or fails.

    Args:
        transcript_id: ID of the transcript to poll; appended to
            `transcript_endpoint` to form the polling URL.
        header: Request headers sent with every poll.
        poll_interval: Seconds to sleep between polls (default 5).

    Returns:
        `(response_json, None)` once the reported status is "completed", or
        `(None, "Error: <message>")` when the status is "error" or the reply
        carries no status at all.
    """
    polling_endpoint = f"{transcript_endpoint}/{transcript_id}"

    while True:
        payload = requests.get(polling_endpoint, headers=header).json()
        status = payload.get("status")

        if status == "completed":
            return payload, None

        # A reply without a "status" field (for instance a request the API
        # rejected outright) can never reach "completed"; report it through
        # the error slot instead of failing with KeyError.
        if status == "error" or status is None:
            reason = payload.get("error", f"unexpected response {payload!r}")
            return None, f"Error: {reason}"

        time.sleep(poll_interval)


def make_paragraphs_string(transc_id, header):
    """Fetch a transcript's paragraphs and join them into a single string.

    Sends a GET to `<transcript_endpoint>/<transc_id>/paragraphs` with
    `header` and returns the "text" of every entry under "paragraphs",
    separated by blank lines.
    """
    paragraphs_url = "/".join((transcript_endpoint, transc_id, "paragraphs"))
    response = requests.get(paragraphs_url, headers=header)
    texts = [paragraph["text"] for paragraph in response.json()["paragraphs"]]
    return "\n\n".join(texts)