Spaces:
Runtime error
Runtime error
import requests | |
import time | |
from scipy.io.wavfile import write | |
import io | |
# Common prefix of the AssemblyAI v2 REST endpoints used by this module.
_API_BASE_URL = "https://api.assemblyai.com/v2"

# URL that `upload_file()` POSTs the audio bytes to.
upload_endpoint = _API_BASE_URL + "/upload"

# URL that `request_transcript()` POSTs to; other helpers append
# "/<transcript id>" (and further path segments) to it.
transcript_endpoint = _API_BASE_URL + "/transcript"
def make_header(api_key):
    """Build the HTTP header dict passed to the request helpers.

    Args:
        api_key: Value stored unchanged under the 'authorization' key.

    Returns:
        A new dict with 'authorization' set to `api_key` and
        'content-type' set to 'application/json'.
    """
    header = {}
    header['authorization'] = api_key
    header['content-type'] = 'application/json'
    return header
def _read_file(filename, chunk_size=5242880): | |
"""Reads the file in chunks. Helper for `upload_file()`""" | |
with open(filename, "rb") as f: | |
while True: | |
data = f.read(chunk_size) | |
if not data: | |
break | |
yield data | |
def _read_array(audio, chunk_size=5242880): | |
"""Like _read_file but for array - creates temporary unsaved "file" from sample rate and audio np.array""" | |
sr, aud = audio | |
# Create temporary "file" and write data to it | |
bytes_wav = bytes() | |
temp_file = io.BytesIO(bytes_wav) | |
write(temp_file, sr, aud) | |
while True: | |
data = temp_file.read(chunk_size) | |
if not data: | |
break | |
yield data | |
def upload_file(audio_file, header, is_file=True):
    """Upload audio to AssemblyAI and return the parsed JSON response.

    Args:
        audio_file: A filename when `is_file` is true; otherwise the
            `(sample_rate, samples)` pair expected by `_read_array()`.
        header: Headers dict sent with the request (see `make_header()`).
        is_file: Selects `_read_file()` (True) or `_read_array()` (False)
            as the source of the request body.

    Returns:
        The decoded JSON body of the upload response; expected to look
        like {'upload_url': <URL>}.

    Raises:
        requests.HTTPError: Raised by `raise_for_status()` for an error
            status code.
    """
    if is_file:
        payload = _read_file(audio_file)
    else:
        payload = _read_array(audio_file)

    response = requests.post(upload_endpoint, headers=header, data=payload)

    # raise_for_status() does nothing for a 200 response, so calling it
    # unconditionally is equivalent to guarding it with a status check.
    response.raise_for_status()

    return response.json()
def request_transcript(upload_url, header):
    """Request a transcript from AssemblyAI for previously uploaded audio.

    Args:
        upload_url: Either the audio URL itself, or the mapping returned by
            `upload_file()`, from which the 'upload_url' entry is taken.
            Any `dict` subclass is accepted, not just an exact `dict`.
        header: Headers dict sent with the request (see `make_header()`).

    Returns:
        The decoded JSON body of the response. The HTTP status is not
        checked here, so an error response is returned to the caller as-is.
    """
    # Accept the dict returned by `upload_file` as well as a raw URL string.
    # isinstance() also covers dict subclasses, which `type(...) is dict`
    # would have let through unconverted.
    if isinstance(upload_url, dict):
        upload_url = upload_url['upload_url']

    transcript_response = requests.post(
        transcript_endpoint,
        json={'audio_url': upload_url},
        headers=header,
    )
    return transcript_response.json()
def wait_for_completion(transcript_id, header, poll_interval=5):
    """Poll a transcript until its status is 'completed' or 'error'.

    Args:
        transcript_id: Id of the transcript; appended to
            `transcript_endpoint` to form the polling URL.
        header: Headers dict sent with every polling request
            (see `make_header()`).
        poll_interval: Seconds to sleep between polls. Defaults to 5,
            the interval this function previously hard-coded.

    Returns:
        A `(response, error)` pair: `(polling_json, None)` once the status
        is 'completed', or `(None, "Error: <message>")` once the status is
        'error'.

    Note:
        There is no upper bound on the number of polls; the loop runs
        until one of the two terminal statuses is seen.
    """
    polling_endpoint = transcript_endpoint + "/" + transcript_id

    while True:
        status_payload = requests.get(polling_endpoint, headers=header).json()
        status = status_payload['status']

        if status == 'completed':
            return status_payload, None
        if status == 'error':
            return None, f"Error: {status_payload['error']}"

        # Any other status: wait before asking again.
        time.sleep(poll_interval)
def make_paragraphs_string(transc_id, header):
    """Fetch a transcript's paragraphs and join them into one string.

    Args:
        transc_id: Id of the transcript whose paragraphs are requested.
        header: Headers dict sent with the request (see `make_header()`).

    Returns:
        The 'text' of every entry in the response's 'paragraphs' list,
        separated by blank lines.
    """
    paragraphs_url = "/".join((transcript_endpoint, transc_id, "paragraphs"))
    response = requests.get(paragraphs_url, headers=header)
    paragraphs = response.json()['paragraphs']
    texts = [paragraph['text'] for paragraph in paragraphs]
    return '\n\n'.join(texts)