Spaces:
Sleeping
Sleeping
import os | |
from typing import Dict, Optional | |
from whisper import load_model # Import directly from whisper package | |
import librosa | |
import soundfile as sf | |
from datetime import datetime | |
from rich.console import Console | |
from rich.progress import Progress | |
from content_generator import ContentGenerator | |
import config | |
class AudioProcessor: | |
def __init__(self): | |
self.console = Console() | |
try: | |
# Use tiny model instead of base for faster processing | |
self.model = load_model("tiny") | |
self.console.print("[green]Successfully loaded Whisper model (tiny)[/green]") | |
except Exception as e: | |
self.console.print(f"[red]Error loading Whisper model:[/red] {str(e)}") | |
raise | |
self.content_generator = ContentGenerator() | |
def process_audio( | |
self, | |
audio_path: str, | |
language: str = config.DEFAULT_LANGUAGE, | |
content_type: str = "news", # "news" or "blog" | |
generate_content: bool = True | |
) -> Dict: | |
""" | |
Process audio file: transcribe and optionally generate content. | |
Args: | |
audio_path (str): Path to the audio file | |
language (str): Language code for transcription and content generation | |
content_type (str): Type of content to generate ("news" or "blog") | |
generate_content (bool): Whether to generate content from transcript | |
Returns: | |
Dict: Contains transcript and optionally generated content | |
""" | |
self.console.print(f"[yellow]Processing audio file:[/yellow] {audio_path}") | |
try: | |
# Transcribe audio with highly optimized settings | |
self.console.print("[yellow]Transcribing audio...[/yellow]") | |
result = self.model.transcribe( | |
audio_path, | |
language=language if language != "tr" else "turkish", | |
fp16=False, | |
beam_size=1, # Minimum beam size for fastest processing | |
best_of=1, # Single candidate for fastest processing | |
condition_on_previous_text=False, | |
compression_ratio_threshold=2.4, | |
logprob_threshold=-1.0, | |
no_speech_threshold=0.6, | |
initial_prompt="Bu bir haber metnidir." # Add context for better transcription | |
) | |
transcript = result["text"] | |
# Generate content if requested | |
generated_content = None | |
if generate_content: | |
self.console.print(f"[yellow]Generating {content_type} content from transcript...[/yellow]") | |
if content_type == "news": | |
generated_content = self._generate_news_from_transcript(transcript, language) | |
else: | |
generated_content = self._generate_blog_from_transcript(transcript, language) | |
output = { | |
"transcript": transcript, | |
"language": language, | |
"date": datetime.now().strftime("%Y-%m-%d"), | |
"audio_file": os.path.basename(audio_path), | |
"content_type": content_type | |
} | |
if generated_content: | |
output["generated_content"] = generated_content | |
return output | |
except Exception as e: | |
self.console.print(f"[red]Error processing audio:[/red] {str(e)}") | |
raise | |
def _generate_news_from_transcript( | |
self, | |
transcript: str, | |
language: str | |
) -> Optional[Dict]: | |
"""Generate a news article from the transcript.""" | |
try: | |
news_content = self.content_generator.generate_content( | |
topic=transcript, | |
keywords=["news", "professional", "factual"], | |
language=language | |
) | |
# Validate the generated content | |
if news_content and "title" in news_content: | |
if len(news_content["content"].split('\n')) < 3: # Minimum 3 paragraphs | |
return None | |
return news_content | |
except Exception as e: | |
self.console.print(f"[red]Error generating news article:[/red] {str(e)}") | |
return None | |
def _generate_blog_from_transcript( | |
self, | |
transcript: str, | |
language: str | |
) -> Optional[Dict]: | |
"""Generate a blog post from the transcript.""" | |
try: | |
blog_content = self.content_generator.generate_content( | |
topic=transcript, | |
keywords=["blog", "engaging", "informative"], | |
language=language | |
) | |
return blog_content | |
except Exception as e: | |
self.console.print(f"[red]Error generating blog post:[/red] {str(e)}") | |
return None | |
def save_results( | |
self, | |
results: Dict, | |
output_dir: str = "data/transcripts" | |
) -> None: | |
""" | |
Save transcription and generated content results. | |
Args: | |
results (Dict): Processing results including transcript and content | |
output_dir (str): Directory to save the output files | |
""" | |
os.makedirs(output_dir, exist_ok=True) | |
# Create base filename from audio file | |
base_name = os.path.splitext(results["audio_file"])[0] | |
date_prefix = results["date"] | |
# Save transcript | |
transcript_file = os.path.join( | |
output_dir, | |
f"{date_prefix}-{base_name}-transcript.txt" | |
) | |
with open(transcript_file, "w", encoding="utf-8") as f: | |
f.write(results["transcript"]) | |
# Save generated content if available and valid | |
if "generated_content" in results and results["generated_content"]: | |
content_type = results["content_type"] | |
content_file = os.path.join( | |
output_dir, | |
f"{date_prefix}-{base_name}-{content_type}.md" | |
) | |
try: | |
with open(content_file, "w", encoding="utf-8") as f: | |
if content_type == "news": | |
# Add metadata and format for news articles | |
f.write(f"# {results['generated_content']['title']}\n\n") | |
# Extract subtitle if it exists (first non-empty line after title) | |
content_lines = results['generated_content']['content'].split('\n') | |
first_line = next((line for line in content_lines if line.strip()), '') | |
if first_line and not first_line.startswith('*') and not first_line.startswith('#'): | |
f.write(f"*{first_line}*\n\n") | |
content = '\n'.join(content_lines[content_lines.index(first_line) + 1:]) | |
else: | |
content = results['generated_content']['content'] | |
# Add metadata | |
f.write(f"**Tarih:** {date_prefix}\n\n") | |
f.write("---\n\n") # Separator line | |
# Write main content with proper formatting | |
f.write(content) | |
else: | |
# Blog format | |
f.write(f"# {results['generated_content']['title']}\n\n") | |
f.write(f"*Yazar: Mete*\n") | |
f.write(f"*Tarih: {date_prefix}*\n\n") | |
f.write(results['generated_content']['content']) | |
self.console.print(f"[green]{results['content_type'].title()} content saved to:[/green] {content_file}") | |
except Exception as e: | |
self.console.print(f"[red]Error saving content:[/red] {str(e)}") | |
else: | |
if results.get("content_type") == "news": | |
self.console.print("[yellow]Warning:[/yellow] Could not generate news article from this audio content.") | |
else: | |
self.console.print("[yellow]Warning:[/yellow] Could not generate blog post from this audio content.") | |
self.console.print(f"[green]Transcript saved to:[/green] {transcript_file}") |