#
# Copyright (c) Microsoft. All rights reserved.
# Licensed under the MIT license. See LICENSE.md file in the project root for full license information.
#
# Notes:
# - Install the Speech SDK. Run:
#   pip install azure-cognitiveservices-speech
# - The Python Speech SDK on Windows requires the Microsoft Visual C++ Redistributable for Visual Studio 2015, 2017, 2019, or 2022 on the system. See:
#   https://docs.microsoft.com/azure/cognitive-services/speech-service/quickstarts/setup-platform
# - To use compressed audio input formats, install GStreamer. See:
#   https://docs.microsoft.com/azure/cognitive-services/speech-service/how-to-use-codec-compressed-audio-input-streams
from datetime import time, timedelta
from itertools import pairwise  # pairwise requires Python 3.10 or later
from os import linesep, remove, environ
from os.path import exists
from time import sleep
from typing import List, Optional
import wave

import azure.cognitiveservices.speech as speechsdk  # type: ignore

from . import caption_helper
from . import helper
from . import user_config_helper
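
# The USAGE text below documents the command-line interface of the original console sample.
# Example invocations (illustrative; note that in this variant the command-line parsing call
# in Captioning.__init__ is commented out, so the configuration is set in code instead):
#   python captioning.py --input audio.wav --srt --output captions.srt
#   python captioning.py --realTime --maxLineLength 40 --lines 2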
USAGE = """Usage: python captioning.py [...]

  HELP
    --help                        Show this help and stop.

  CONNECTION
    --key KEY                     Your Azure Speech service resource key.
                                  Overrides the SPEECH_KEY environment variable. You must set the environment variable (recommended) or use the --key option.
    --region REGION               Your Azure Speech service region.
                                  Overrides the SPEECH_REGION environment variable. You must set the environment variable (recommended) or use the --region option.
                                  Examples: westus, eastus

  LANGUAGE
    --language LANG               Specify language. This is used when breaking captions into lines.
                                  Default value is en-US.
                                  Examples: en-US, ja-JP

  INPUT
    --input FILE                  Input audio from file (default input is the microphone.)
    --format FORMAT               Use compressed audio format.
                                  If this is not present, uncompressed format (wav) is assumed.
                                  Valid only with --input.
                                  Valid values: alaw, any, flac, mp3, mulaw, ogg_opus

  MODE
    --offline                     Output offline results.
                                  Overrides --realTime.
    --realTime                    Output real-time results.
                                  Default output mode is offline.

  ACCURACY
    --phrases "PHRASE1;PHRASE2"   Example: "Contoso;Jessie;Rehaan"

  OUTPUT
    --output FILE                 Output captions to FILE.
    --srt                         Output captions in SubRip Text format (default format is WebVTT.)
    --maxLineLength LENGTH        Set the maximum number of characters per line for a caption to LENGTH.
                                  Minimum is 20. Default is 37 (30 for Chinese).
    --lines LINES                 Set the number of lines for a caption to LINES.
                                  Minimum is 1. Default is 2.
    --delay MILLISECONDS          How many MILLISECONDS to delay the appearance of each caption.
                                  Minimum is 0. Default is 1000.
    --remainTime MILLISECONDS     How many MILLISECONDS a caption should remain on screen if it is not replaced by another.
                                  Minimum is 0. Default is 1000.
    --quiet                       Suppress console output, except errors.
    --profanity OPTION            Valid values: raw, remove, mask
                                  Default is mask.
    --threshold NUMBER            Set stable partial result threshold.
                                  Default is 3.
"""

class Captioning(object) :
    def __init__(self, language, input_audio, output) :
        # self._user_config = user_config_helper.user_config_from_args(USAGE)
        self._user_config = {
            "language" : language,
            "captioning_mode" : user_config_helper.CaptioningMode.OFFLINE,  # or REALTIME for real-time captions
            "input_file" : input_audio,
            "output_file" : output,
            "use_sub_rip_text_caption_format" : True,
            "use_compressed_audio" : False,
            "compressed_audio_format" : speechsdk.AudioStreamContainerFormat.ANY,
            "subscription_key" : environ.get("SPEECH_KEY"),
            "region" : environ.get("SPEECH_REGION"),
            "profanity_option" : speechsdk.ProfanityOption.Masked,
            # A list of phrases to bias recognition toward (see speech_recognizer_from_user_config).
            "phrases" : ["Contoso", "Jessie", "Rehaan"],
            "suppress_console_output" : True,
            "remain_time" : timedelta(milliseconds=1000),
            "delay" : timedelta(milliseconds=1000),
            "max_line_length" : helper.DEFAULT_MAX_LINE_LENGTH_SBCS,
            "lines" : 2,
            "stable_partial_result_threshold" : "3",
        }
        self._srt_sequence_number = 1
        self._previous_caption : Optional[caption_helper.Caption] = None
        self._previous_end_time : Optional[time] = None
        self._previous_result_is_recognized = False
        self._recognized_lines : List[str] = []
        self._offline_results : List[speechsdk.SpeechRecognitionResult] = []

    def get_timestamp(self, start : time, end : time) -> str :
        time_format = ""
        if self._user_config["use_sub_rip_text_caption_format"] :
            # SRT format requires ',' as decimal separator rather than '.'.
            time_format = "%H:%M:%S,%f"
        else :
            time_format = "%H:%M:%S.%f"
        # Truncate microseconds to milliseconds.
        return "{} --> {}".format(start.strftime(time_format)[:-3], end.strftime(time_format)[:-3])
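    # For example (values illustrative, not from the source), start = time(0, 0, 1, 234000)
    # and end = time(0, 0, 3, 456000) produce:
    #   SRT:    00:00:01,234 --> 00:00:03,456
    #   WebVTT: 00:00:01.234 --> 00:00:03.456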

    def string_from_caption(self, caption : caption_helper.Caption) -> str :
        retval = ""
        if self._user_config["use_sub_rip_text_caption_format"] :
            retval += str(caption.sequence) + linesep
        retval += self.get_timestamp(caption.begin, caption.end) + linesep
        retval += caption.text + linesep + linesep
        return retval
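    # A complete SRT cue rendered by string_from_caption looks like this (illustrative):
    #   1
    #   00:00:01,234 --> 00:00:03,456
    #   Hello world.
    # WebVTT cues are the same except there is no sequence number and the decimal separator is '.'.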

    def adjust_real_time_caption_text(self, text : str, is_recognized_result : bool) -> str :
        # Split the caption text into multiple lines based on max_line_length and lines.
        temp_caption_helper = caption_helper.CaptionHelper(self._user_config["language"], self._user_config["max_line_length"], self._user_config["lines"], [])
        lines = temp_caption_helper.lines_from_text(text)
        # Recognizing results can change with each new result, so we do not save previous Recognizing results.
        # Recognized results are final, so we save them in a member value.
        recognizing_lines : List[str] = []
        if is_recognized_result :
            self._recognized_lines = self._recognized_lines + lines
        else :
            recognizing_lines = lines
        caption_lines = self._recognized_lines + recognizing_lines
        return '\n'.join(caption_lines[-self._user_config["lines"]:])
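    # Illustration (assuming lines = 2): if _recognized_lines holds ["We hold these truths",
    # "to be self-evident,"] and a Recognizing result contributes ["that all people"], the
    # caption shown is the last two lines: "to be self-evident,\nthat all people".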

    def caption_from_real_time_result(self, result : speechsdk.SpeechRecognitionResult, is_recognized_result : bool) -> Optional[str] :
        retval : Optional[str] = None
        start_time = helper.time_from_ticks(result.offset)
        end_time = helper.time_from_ticks(result.offset + result.duration)
        # If the end timestamp for the previous result is later
        # than the end timestamp for this result, drop the result.
        # This sometimes happens when we receive a lot of Recognizing results close together.
        if self._previous_end_time is not None and self._previous_end_time > end_time :
            pass
        else :
            # Record the end timestamp for this result.
            self._previous_end_time = end_time
            # Convert the SpeechRecognitionResult to a caption.
            # We are not ready to set the text for this caption.
            # First we need to determine whether to clear _recognized_lines.
            caption = caption_helper.Caption(self._user_config["language"], self._srt_sequence_number, helper.add_time_and_timedelta(start_time, self._user_config["delay"]), helper.add_time_and_timedelta(end_time, self._user_config["delay"]), "")
            # Increment the sequence number.
            self._srt_sequence_number += 1
            # If we have a previous caption...
            if self._previous_caption is not None :
                # If the previous result was type Recognized...
                if self._previous_result_is_recognized :
                    # Set the end timestamp for the previous caption to the earlier of:
                    # - The end timestamp for the previous caption plus the remain time.
                    # - The start timestamp for the current caption.
                    previous_end = helper.add_time_and_timedelta(self._previous_caption.end, self._user_config["remain_time"])
                    self._previous_caption.end = previous_end if previous_end < caption.begin else caption.begin
                    # If the gap between the original end timestamp for the previous caption
                    # and the start timestamp for the current caption is larger than remain_time,
                    # clear the cached recognized lines.
                    # Note this needs to be done before we call adjust_real_time_caption_text
                    # for the current caption, because it uses _recognized_lines.
                    if previous_end < caption.begin :
                        self._recognized_lines.clear()
                # If the previous result was type Recognizing, simply set the start timestamp
                # for the current caption to the end timestamp for the previous caption.
                # Note this presumes there will not be a large gap between Recognizing results,
                # because such a gap would cause the previous Recognizing result to be succeeded
                # by a Recognized result.
                else :
                    caption.begin = self._previous_caption.end
                retval = self.string_from_caption(self._previous_caption)
            # Break the caption text into lines if needed.
            caption.text = self.adjust_real_time_caption_text(result.text, is_recognized_result)
            # Save the current caption as the previous caption.
            self._previous_caption = caption
            # Save the result type as the previous result type.
            self._previous_result_is_recognized = is_recognized_result
        return retval
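    # Timing sketch (assuming delay = remain_time = 1000 ms; values illustrative): a Recognized
    # result spanning 2.0s-4.0s becomes a caption at 3.0s-5.0s. If the next result begins at
    # 5.5s (so caption.begin = 6.5s), the previous caption's end is capped at 5.0s + 1.0s = 6.0s,
    # and because 6.0s < 6.5s the cached recognized lines are cleared.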

    def captions_from_offline_results(self) -> List[caption_helper.Caption] :
        captions = caption_helper.get_captions(self._user_config["language"], self._user_config["max_line_length"], self._user_config["lines"], list(self._offline_results))
        # Guard against empty results (for example, if no speech was recognized).
        if not captions :
            return captions
        # Save the last caption.
        last_caption = captions[-1]
        last_caption.end = helper.add_time_and_timedelta(last_caption.end, self._user_config["remain_time"])
        # In offline mode, all captions come from RecognitionResults of type Recognized.
        # Set the end timestamp for each caption to the earlier of:
        # - The end timestamp for this caption plus the remain time.
        # - The start timestamp for the next caption.
        captions_2 : List[caption_helper.Caption] = []
        for (caption_1, caption_2) in pairwise(captions) :
            end = helper.add_time_and_timedelta(caption_1.end, self._user_config["remain_time"])
            caption_1.end = end if end < caption_2.begin else caption_2.begin
            captions_2.append(caption_1)
        # Re-add the last caption.
        captions_2.append(last_caption)
        return captions_2
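    # itertools.pairwise yields overlapping neighbor pairs, e.g. pairwise([c1, c2, c3]) gives
    # (c1, c2), (c2, c3), so every caption except the last can have its end time capped by the
    # start of the caption that follows it; the last caption is handled separately above.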

    def finish(self) -> None :
        if user_config_helper.CaptioningMode.OFFLINE == self._user_config["captioning_mode"] :
            for caption in self.captions_from_offline_results() :
                helper.write_to_console_or_file(text=self.string_from_caption(caption), user_config=self._user_config)
        elif user_config_helper.CaptioningMode.REALTIME == self._user_config["captioning_mode"] :
            # Show the last "previous" caption, which is actually the last caption.
            if self._previous_caption is not None :
                self._previous_caption.end = helper.add_time_and_timedelta(self._previous_caption.end, self._user_config["remain_time"])
                helper.write_to_console_or_file(text=self.string_from_caption(self._previous_caption), user_config=self._user_config)

    def initialize(self) :
        if self._user_config["output_file"] is not None and exists(self._user_config["output_file"]) :
            remove(self._user_config["output_file"])
        if not self._user_config["use_sub_rip_text_caption_format"] :
            helper.write_to_console_or_file(text="WEBVTT{}{}".format(linesep, linesep), user_config=self._user_config)
        return

    def audio_config_from_user_config(self) -> helper.Read_Only_Dict :
        if self._user_config["input_file"] is None :
            return helper.Read_Only_Dict({
                "audio_config" : speechsdk.audio.AudioConfig(use_default_microphone=True),
                "audio_stream_format" : None,
                "pull_input_audio_stream_callback" : None,
                "pull_input_audio_stream" : None
            })
        else :
            audio_stream_format = None
            if not self._user_config["use_compressed_audio"] :
                # Read the sample rate, sample width, and channel count from the wav header.
                reader = wave.open(self._user_config["input_file"], mode="rb")
                audio_stream_format = speechsdk.audio.AudioStreamFormat(samples_per_second=reader.getframerate(), bits_per_sample=reader.getsampwidth() * 8, channels=reader.getnchannels())
                reader.close()
            else :
                audio_stream_format = speechsdk.audio.AudioStreamFormat(compressed_stream_format=self._user_config["compressed_audio_format"])
            callback = helper.BinaryFileReaderCallback(filename=self._user_config["input_file"])
            stream = speechsdk.audio.PullAudioInputStream(pull_stream_callback=callback, stream_format=audio_stream_format)
            # We return the BinaryFileReaderCallback, AudioStreamFormat, and PullAudioInputStream
            # because we need to keep them in scope until they are actually used.
            return helper.Read_Only_Dict({
                "audio_config" : speechsdk.audio.AudioConfig(stream=stream),
                "audio_stream_format" : audio_stream_format,
                "pull_input_audio_stream_callback" : callback,
                "pull_input_audio_stream" : stream,
            })
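    # For a typical 16 kHz, 16-bit, mono wav file (illustrative values), the wave module
    # reports getframerate() = 16000, getsampwidth() = 2 (hence 16 bits per sample), and
    # getnchannels() = 1.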

    def speech_config_from_user_config(self) -> speechsdk.SpeechConfig :
        speech_config = speechsdk.SpeechConfig(subscription=self._user_config["subscription_key"], region=self._user_config["region"])
        speech_config.set_profanity(self._user_config["profanity_option"])
        if self._user_config["stable_partial_result_threshold"] is not None :
            speech_config.set_property(property_id=speechsdk.PropertyId.SpeechServiceResponse_StablePartialResultThreshold, value=self._user_config["stable_partial_result_threshold"])
        speech_config.set_property(property_id=speechsdk.PropertyId.SpeechServiceResponse_PostProcessingOption, value="TrueText")
        speech_config.speech_recognition_language = self._user_config["language"]
        return speech_config

    def speech_recognizer_from_user_config(self) -> helper.Read_Only_Dict :
        audio_config_data = self.audio_config_from_user_config()
        speech_config = self.speech_config_from_user_config()
        speech_recognizer = speechsdk.SpeechRecognizer(speech_config=speech_config, audio_config=audio_config_data["audio_config"])
        if len(self._user_config["phrases"]) > 0 :
            grammar = speechsdk.PhraseListGrammar.from_recognizer(recognizer=speech_recognizer)
            for phrase in self._user_config["phrases"] :
                grammar.addPhrase(phrase)
        return helper.Read_Only_Dict({
            "speech_recognizer" : speech_recognizer,
            "audio_stream_format" : audio_config_data["audio_stream_format"],
            "pull_input_audio_stream_callback" : audio_config_data["pull_input_audio_stream_callback"],
            "pull_input_audio_stream" : audio_config_data["pull_input_audio_stream"],
        })

    def recognize_continuous(self, speech_recognizer : speechsdk.SpeechRecognizer, format : speechsdk.audio.AudioStreamFormat, callback : helper.BinaryFileReaderCallback, stream : speechsdk.audio.PullAudioInputStream) :
        # The format, callback, and stream parameters are not used directly; they are passed in
        # only to keep them in scope (and alive) for the duration of recognition.
        done = False

        def recognizing_handler(e : speechsdk.SpeechRecognitionEventArgs) :
            if speechsdk.ResultReason.RecognizingSpeech == e.result.reason and len(e.result.text) > 0 :
                # This seems to be the only way we can get information about
                # exceptions raised inside an event handler.
                try :
                    caption = self.caption_from_real_time_result(e.result, False)
                    if caption is not None :
                        helper.write_to_console_or_file(text=caption, user_config=self._user_config)
                except Exception as ex :
                    print('Exception in recognizing_handler: {}'.format(ex))
            elif speechsdk.ResultReason.NoMatch == e.result.reason :
                helper.write_to_console(text="NOMATCH: Speech could not be recognized.{}".format(linesep), user_config=self._user_config)

        def recognized_handler(e : speechsdk.SpeechRecognitionEventArgs) :
            if speechsdk.ResultReason.RecognizedSpeech == e.result.reason and len(e.result.text) > 0 :
                try :
                    if user_config_helper.CaptioningMode.OFFLINE == self._user_config["captioning_mode"] :
                        self._offline_results.append(e.result)
                    else :
                        caption = self.caption_from_real_time_result(e.result, True)
                        if caption is not None :
                            helper.write_to_console_or_file(text=caption, user_config=self._user_config)
                except Exception as ex :
                    print('Exception in recognized_handler: {}'.format(ex))
            elif speechsdk.ResultReason.NoMatch == e.result.reason :
                helper.write_to_console(text="NOMATCH: Speech could not be recognized.{}".format(linesep), user_config=self._user_config)

        def canceled_handler(e : speechsdk.SpeechRecognitionCanceledEventArgs) :
            nonlocal done
            # Notes:
            # SpeechRecognitionCanceledEventArgs inherits the result property from SpeechRecognitionEventArgs. See:
            # https://docs.microsoft.com/python/api/azure-cognitiveservices-speech/azure.cognitiveservices.speech.speechrecognitioncanceledeventargs
            # https://docs.microsoft.com/python/api/azure-cognitiveservices-speech/azure.cognitiveservices.speech.speechrecognitioneventargs
            # result is type SpeechRecognitionResult, which inherits the reason property from RecognitionResult. See:
            # https://docs.microsoft.com/python/api/azure-cognitiveservices-speech/azure.cognitiveservices.speech.speechrecognitionresult
            # https://docs.microsoft.com/python/api/azure-cognitiveservices-speech/azure.cognitiveservices.speech.recognitionresult
            # e.result.reason is ResultReason.Canceled. To get the cancellation reason, see e.cancellation_details.reason.
            if speechsdk.CancellationReason.EndOfStream == e.cancellation_details.reason :
                helper.write_to_console(text="End of stream reached.{}".format(linesep), user_config=self._user_config)
                done = True
            elif speechsdk.CancellationReason.CancelledByUser == e.cancellation_details.reason :
                helper.write_to_console(text="User canceled request.{}".format(linesep), user_config=self._user_config)
                done = True
            elif speechsdk.CancellationReason.Error == e.cancellation_details.reason :
                # Error output should not be suppressed, even if the suppress output flag is set.
                print("Encountered error. Cancellation details: {}{}".format(e.cancellation_details, linesep))
                done = True
            else :
                print("Request was cancelled for an unrecognized reason. Cancellation details: {}{}".format(e.cancellation_details, linesep))
                done = True

        def stopped_handler(e : speechsdk.SessionEventArgs) :
            nonlocal done
            helper.write_to_console(text="Session stopped.{}".format(linesep), user_config=self._user_config)
            done = True

        # We only use Recognizing results in real-time mode.
        if user_config_helper.CaptioningMode.REALTIME == self._user_config["captioning_mode"] :
            speech_recognizer.recognizing.connect(recognizing_handler)
        speech_recognizer.recognized.connect(recognized_handler)
        speech_recognizer.session_stopped.connect(stopped_handler)
        speech_recognizer.canceled.connect(canceled_handler)

        speech_recognizer.start_continuous_recognition()
        while not done :
            sleep(5)
        speech_recognizer.stop_continuous_recognition()
        return

def generate_sub(language, input_file, output_file) :
    captioning = Captioning(language=language, input_audio=input_file, output=output_file)
    captioning.initialize()
    speech_recognizer_data = captioning.speech_recognizer_from_user_config()
    captioning.recognize_continuous(speech_recognizer=speech_recognizer_data["speech_recognizer"], format=speech_recognizer_data["audio_stream_format"], callback=speech_recognizer_data["pull_input_audio_stream_callback"], stream=speech_recognizer_data["pull_input_audio_stream"])
    captioning.finish()
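
# Minimal usage sketch (file names are hypothetical; assumes SPEECH_KEY and SPEECH_REGION are
# set in the environment). Because this module uses relative imports, run it as part of its
# package, for example: python -m <package>.captioning
if __name__ == "__main__" :
    generate_sub(language="en-US", input_file="meeting.wav", output_file="meeting.srt")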