|
import base64
import json
import os
import re
import tempfile
import zipfile

import pandas as pd
import requests
from pydub import AudioSegment
|
|
|
def remove_quotes(text: str) -> str:
    """Return *text* with every ASCII double-quote character removed.

    Only the straight double quote (U+0022) is stripped; single quotes and
    typographic quotes are left untouched.
    """
    return text.replace('"', '')
|
|
|
# Splits a script into speaker markers ("A:" / "B:") and the text after each.
_SPEAKER_MARKER = re.compile(r'(A:|B:)')

# An "&" that does not begin one of XML's predefined or numeric entities.
# Such a character makes the SSML document malformed.
_BARE_AMPERSAND = re.compile(
    r'&(?!(?:amp|lt|gt|quot|apos|#[0-9]+|#[xX][0-9A-Fa-f]+);)'
)

# Spoken-form substitutions, applied in this order before sentence breaks are
# inserted so the periods inside abbreviations do not receive a pause.
_ABBREVIATIONS = (
    ("a.m.", 'AM'),
    ("p.m.", 'PM'),
    ("U.S.", 'US'),
    ("U.K.", 'UK'),
    ("Mr.", 'Mister'),
    ("Ms.", 'MIZ'),
    ("Mrs.", 'Misiz'),
    ("Dr.", 'Doctor'),
    ("Mt.", 'Mount'),
    ("Prof.", 'Professor'),
)

# Values of the "eikenn" column for which commas become a short pause.
_COMMA_BREAK_GRADES = frozenset({"5級", "4級", "3級", "準2級", "2級", "準1級"})

_TTS_URL = "https://texttospeech.googleapis.com/v1/text:synthesize"

# Without a timeout a stalled connection would block the batch indefinitely.
_REQUEST_TIMEOUT_SECONDS = 60


def _escape_bare_ampersands(text):
    """Replace "&" with "&amp;" unless it already starts an XML entity."""
    return _BARE_AMPERSAND.sub('&amp;', text)


def _prepare_dialogue_text(raw_text, row, selected_option):
    """Turn one speaker's raw line into SSML-ready text with pause tags."""
    text = remove_quotes(raw_text.strip())

    for abbreviation, spoken in _ABBREVIATIONS:
        text = text.replace(abbreviation, spoken)

    # Escape before inserting tags so the tags themselves are not touched.
    text = _escape_bare_ampersands(text)

    text = text.replace("\n", '<break time="1s"/>')
    text = text.replace(".", '.<break time="500ms"/>')

    # "eikenn" is only read when the break-time option is selected.
    if selected_option == "ブレイクタイム有" and row["eikenn"] in _COMMA_BREAK_GRADES:
        # The comma itself is dropped and replaced by the pause.
        text = text.replace(",", '<break time="50ms"/>')

    return text


def _build_ssml(row, script, selected_option):
    """Assemble the full <speak> document for one CSV row."""
    speed = row["speed"]

    # re.split with a capture group yields [preamble, marker, text, marker,
    # text, ...]; anything before the first marker is not spoken.
    parts = _SPEAKER_MARKER.split(script)
    ssml_parts = []
    for marker, raw_text in zip(parts[1::2], parts[2::2]):
        voice_name = row["voiceA"] if marker == "A:" else row["voiceB"]
        text = _prepare_dialogue_text(raw_text, row, selected_option)
        ssml_parts.append(
            f'<voice name="{voice_name}"><prosody rate="{speed}">'
            f'<p>{text}</p></prosody></voice>'
        )

    ssml = '<speak>' + ''.join(ssml_parts)

    question = row.get('question')
    if pd.notna(question) and question != '':
        question_text = _escape_bare_ampersands(str(question))
        ssml += (
            f'<break time="1s"/><voice name="{row["voiceQuestion"]}">'
            f'<prosody rate="{speed}"><p>Question</p><break time="1s"/>'
            f'<p>{question_text}</p></prosody></voice>'
        )

    choices = row.get('choices')
    if pd.notna(choices) and choices != '':
        # Choices are "/"-separated and read out with a pause between them.
        choices_list = _escape_bare_ampersands(str(choices)).split('/')
        choices_ssml = '<break time="1s"/>'.join(choices_list)
        ssml += (
            f'<break time="1s"/><voice name="{row["voiceB"]}">'
            f'<prosody rate="{speed}"><p>{choices_ssml}</p></prosody></voice>'
        )

    return ssml + '</speak>'


def _synthesize(ssml, api_key):
    """POST *ssml* to the synthesize endpoint.

    Returns ``(audio_bytes, None)`` on success or ``(None, detail)`` when the
    API answered but no audio could be obtained. Connection-level failures
    propagate as ``requests.exceptions.RequestException``.
    """
    body = {
        "input": {"ssml": ssml},
        "voice": {"languageCode": "en-US"},
        "audioConfig": {"audioEncoding": "MP3"},
    }
    headers = {
        "X-Goog-Api-Key": api_key,
        "Content-Type": "application/json",
    }
    response = requests.post(
        _TTS_URL, headers=headers, json=body, timeout=_REQUEST_TIMEOUT_SECONDS
    )

    if response.status_code != 200:
        detail = f"APIエラー - HTTP {response.status_code}"
        try:
            error_data = response.json()
        except ValueError:
            # The error body was not JSON; report it verbatim instead.
            return None, detail + f" - レスポンス: {response.text}"
        error = error_data.get('error') if isinstance(error_data, dict) else None
        if isinstance(error, dict) and 'message' in error:
            detail += f" - {error['message']}"
        return None, detail

    response_data = response.json()
    if 'audioContent' not in response_data:
        return None, "音声データがレスポンスに含まれていません"

    return base64.b64decode(response_data['audioContent']), None


def _add_audio_to_archive(archive, file_id, audio_content):
    """Re-encode *audio_content* at 128 kbit/s and store it as ``<id>.mp3``."""
    final_file_name = f"{file_id}.mp3"

    # A private scratch directory is removed even when decoding or export
    # fails, and cannot clobber files in the current working directory.
    with tempfile.TemporaryDirectory() as work_dir:
        temp_path = os.path.join(work_dir, f"{file_id}_temp.mp3")
        final_path = os.path.join(work_dir, final_file_name)

        with open(temp_path, "wb") as out:
            out.write(audio_content)

        audio = AudioSegment.from_file(temp_path, format="mp3")
        exported = audio.export(final_path, format="mp3", bitrate="128k")
        if exported is not None:
            # export() hands back the open output file; close it explicitly.
            exported.close()

        archive.write(final_path, arcname=final_file_name)


def text_to_speech(input_file, selected_option):
    """Synthesize one MP3 per CSV row and bundle the results into a zip.

    Each row's ``script`` column is split on the ``A:`` / ``B:`` speaker
    markers; each part is voiced with ``voiceA`` / ``voiceB`` at the row's
    ``speed``. Optional ``question`` (voiced with ``voiceQuestion``) and
    ``choices`` ("/"-separated, voiced with ``voiceB``) are appended. The
    resulting audio is stored in the archive as ``<id>.mp3``.

    Rows that fail are skipped; their messages are printed and, if any
    occurred, written to ``tts_error_log.txt`` in the working directory.

    Args:
        input_file: Path or buffer accepted by ``pandas.read_csv``.
        selected_option: When equal to "ブレイクタイム有", commas are replaced
            by a 50 ms pause for rows whose ``eikenn`` is a listed grade.

    Returns:
        The path of the zip archive, ``'output_audio_files.zip'``.
    """
    # SECURITY: an API key committed to source control should be treated as
    # compromised. Prefer GOOGLE_TTS_API_KEY; the literal remains only so
    # existing deployments keep working and should be rotated and removed.
    api_key = (
        os.environ.get("GOOGLE_TTS_API_KEY")
        or 'AIzaSyBqvy0wRyZVapYyZ_GfTlRpQwrwj9YqYaw'
    )
    data = pd.read_csv(input_file)
    zip_path = 'output_audio_files.zip'
    error_log = []

    with zipfile.ZipFile(zip_path, 'w') as z:
        for idx, row in data.iterrows():
            try:
                script = row.get('script', '')
                print("原稿", script)
                if pd.isna(script):
                    error_log.append(f"行 {idx+1}: スクリプトが空です")
                    continue

                ssml = _build_ssml(row, str(script), selected_option)

                try:
                    audio_content, failure = _synthesize(ssml, api_key)
                    if failure is not None:
                        error_message = f"行 {idx+1}: {failure}"
                        print(error_message)
                        error_log.append(error_message)
                        continue

                    _add_audio_to_archive(z, row['id'], audio_content)
                    print(f"行 {idx+1}: 処理成功 - ファイルID: {row['id']}")

                except requests.exceptions.RequestException as e:
                    error_message = f"行 {idx+1}: API接続エラー - {str(e)}"
                    print(error_message)
                    error_log.append(error_message)

            except Exception as e:
                # Per-row boundary: one bad row must not abort the batch.
                error_message = f"行 {idx+1}: 予期せぬエラー - {str(e)}"
                print(error_message)
                error_log.append(error_message)

    if error_log:
        print("\n===== エラーログ =====")
        for error in error_log:
            print(error)
        print(f"合計 {len(error_log)} 件のエラーが発生しました。")

        with open('tts_error_log.txt', 'w', encoding='utf-8') as log_file:
            log_file.write("===== Google Text-to-Speech API エラーログ =====\n")
            log_file.write(f"処理日時: {pd.Timestamp.now()}\n\n")
            for error in error_log:
                log_file.write(f"{error}\n")
        print("エラーログは 'tts_error_log.txt' に保存されました。")
    else:
        print("すべての行が正常に処理されました。")

    return zip_path