"""ESPeak - AI Grammar & Speech Assistant.

Speak or type a sentence; the app corrects its grammar, scores the
correction (0-100), explains the changes, and can read the result aloud.
The OpenAI API is used when OPENAI_API_KEY is set, with a local
grammar-correction model as the offline fallback.
"""

import json
import os
import threading
import time
from typing import Tuple

import gradio as gr
import pyttsx3
import speech_recognition as sr

# Route to the OpenAI backend only when an API key is present.
USE_OPENAI = bool(os.getenv("OPENAI_API_KEY", "").strip())
|
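# The openai SDK reads OPENAI_API_KEY from the environment on import, so no
# explicit openai.api_key assignment is needed here.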
if USE_OPENAI:
    import openai
|
# Optional local grammar-correction stack; the app degrades gracefully
# when these heavyweight dependencies are not installed.
try:
    import torch
    from transformers import AutoTokenizer, AutoModelForSeq2SeqLM
    from happytransformer import HappyTextToText, TTSettings
    LOCAL_MODEL_AVAILABLE = True
except Exception:
    LOCAL_MODEL_AVAILABLE = False
|
# python-Levenshtein is optional; a pure-Python fallback is defined below.
try:
    import Levenshtein
    _have_lev = True
except Exception:
    _have_lev = False
|
APP_TITLE = "ESPeak - AI Grammar & Speech Assistant"
|
def levenshtein_distance(a: str, b: str) -> int:
    """Edit distance between a and b (library version when available)."""
    if _have_lev:
        return Levenshtein.distance(a, b)

    # Pure-Python dynamic-programming fallback.
    la, lb = len(a), len(b)
    if la == 0:
        return lb
    if lb == 0:
        return la
    dp = [[0] * (lb + 1) for _ in range(la + 1)]
    for i in range(la + 1):
        dp[i][0] = i
    for j in range(lb + 1):
        dp[0][j] = j
    for i in range(1, la + 1):
        for j in range(1, lb + 1):
            cost = 0 if a[i - 1] == b[j - 1] else 1
            dp[i][j] = min(dp[i - 1][j] + 1, dp[i][j - 1] + 1, dp[i - 1][j - 1] + cost)
    return dp[la][lb]
|
def score_from_edit(orig: str, corrected: str) -> int:
    """Score a correction 0-100, where 100 means no edits were needed."""
    if not orig.strip():
        return 0
    dist = levenshtein_distance(orig, corrected)
    norm = max(len(orig), 1)
    ratio = max(0.0, 1.0 - dist / norm)
    return int(round(ratio * 100))
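# Worked example: score_from_edit("she go to school", "she goes to school")
# has edit distance 2 over 16 original characters, so the score is
# int(round((1 - 2/16) * 100)) == 88.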
|
# Local model handles, populated only when OpenAI is not being used.
tokenizer = model = happy_tt = None
if not USE_OPENAI and LOCAL_MODEL_AVAILABLE:
    def load_local_models():
        global tokenizer, model, happy_tt
        model_name = "prithivida/grammar_error_correcter_v1"
        try:
            tokenizer = AutoTokenizer.from_pretrained(model_name)
            model = AutoModelForSeq2SeqLM.from_pretrained(model_name)
            happy_tt = HappyTextToText("T5", model_name)
        except Exception as e:
            print("Local model load failed:", e)
            raise

    load_local_models()
|
def transcribe_audio_file(audio_filepath: str) -> str:
    """Transcribe an audio file, returning "" when speech is unintelligible."""
    r = sr.Recognizer()
    try:
        with sr.AudioFile(audio_filepath) as source:
            audio_data = r.record(source)
        return r.recognize_google(audio_data)
    except sr.UnknownValueError:
        return ""
    except Exception as e:
        return f"[transcription_error]: {e}"
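# Note: recognize_google() sends the audio to Google's free Web Speech API,
# so transcription needs network access; failures surface as the
# "[transcription_error]" sentinel handled in process_input().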
|
OPENAI_PROMPT_SYSTEM = (
    "You are ESPeak Assistant, an expert grammar corrector. "
    "Return JSON only with keys: corrected_text (string), score (0-100 integer), explanation (short string)."
)

OPENAI_USER_TEMPLATE = (
    "Correct this sentence for grammar, punctuation, and clarity while preserving tone:\n\n"
    "### INPUT\n{input_text}\n\n"
    "Return only JSON with corrected_text, score, and explanation."
)
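# The prompt above asks the model for a reply shaped like this
# (illustrative values only):
#   {"corrected_text": "She goes to school.", "score": 85,
#    "explanation": "Fixed subject-verb agreement."}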
|
def call_openai_correct(text: str) -> Tuple[str, int, str]:
    """Correct text via the OpenAI chat API; returns (corrected, score, explanation)."""
    messages = [
        {"role": "system", "content": OPENAI_PROMPT_SYSTEM},
        {"role": "user", "content": OPENAI_USER_TEMPLATE.format(input_text=text)},
    ]
    # Prefer the cheaper model when the account can actually see it.
    available = {m["id"] for m in openai.Model.list()["data"]}
    resp = openai.ChatCompletion.create(
        model="gpt-4o-mini" if "gpt-4o-mini" in available else "gpt-4",
        messages=messages,
        temperature=0.0,
        max_tokens=300,
    )
    content = resp["choices"][0]["message"]["content"].strip()

    # Ideal case: the model returned clean JSON.
    try:
        parsed = json.loads(content)
        corrected = parsed.get("corrected_text", "")
        score = int(parsed.get("score", score_from_edit(text, corrected)))
        explanation = parsed.get("explanation", "")
        return corrected, score, explanation
    except Exception:
        pass

    # Second chance: extract the first {...} block from a chatty reply.
    try:
        start = content.index("{")
        end = content.rindex("}") + 1
        data = json.loads(content[start:end])
        corrected = data.get("corrected_text", "")
        score = int(data.get("score", score_from_edit(text, corrected)))
        explanation = data.get("explanation", "")
        return corrected, score, explanation
    except Exception:
        # Last resort: treat the whole reply as the correction.
        corrected = content
        score = score_from_edit(text, corrected)
        explanation = "Auto-correction from OpenAI; parsing fallback used."
        return corrected, score, explanation
|
def call_local_correct(text: str) -> Tuple[str, int, str]:
    """Correct text with the local GEC model; returns (corrected, score, explanation)."""
    # The prithivida GEC checkpoint expects a "gec: " task prefix.
    prefix = "gec: " + text

    try:
        inputs = tokenizer.encode(prefix, return_tensors="pt", max_length=256, truncation=True)
        with torch.no_grad():
            outputs = model.generate(inputs, max_length=256, num_beams=4)
        corrected = tokenizer.decode(outputs[0], skip_special_tokens=True)
    except Exception:
        corrected = text

    # happytransformer wraps the same model; use its output when it succeeds.
    try:
        args = TTSettings(num_beams=4, min_length=1)
        happy_out = happy_tt.generate_text(prefix, args=args).text
        alt_correction = happy_out or corrected
    except Exception:
        alt_correction = corrected

    score = score_from_edit(text, alt_correction)
    if text.strip() == alt_correction.strip():
        explanation = "No change needed."
    else:
        explanation = "Adjusted grammar/punctuation; minor wording edits to improve clarity."
    return alt_correction, score, explanation
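# Illustrative call (exact output depends on the checkpoint):
#   call_local_correct("he go to school")
#   -> ("He goes to school.", <score>, "Adjusted grammar/punctuation; ...")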
|
def process_input(audio, typed_text, use_tts=False, prefer_openai=False):
    """
    audio: filepath from Gradio (or None)
    typed_text: str
    use_tts: bool -> read corrected text aloud with local pyttsx3
    prefer_openai: prefer the OpenAI backend when a key is available
    """
    if audio:
        transcribed = transcribe_audio_file(audio)
        if transcribed.startswith("[transcription_error]"):
            # Transcription failed; fall back to whatever was typed.
            source_text = typed_text or ""
            trans_msg = transcribed
        else:
            source_text = transcribed
            trans_msg = f"Transcribed: {transcribed}"
    else:
        source_text = typed_text or ""
        trans_msg = "Typed input"
|
    if not source_text.strip():
        return "No input detected.", 0, "No correction (empty input).", trans_msg, json.dumps({})
|
    # Pick a backend: honor the user's preference when both are available,
    # otherwise use whichever backend exists.
    if USE_OPENAI and LOCAL_MODEL_AVAILABLE:
        use_openai_backend = prefer_openai
    elif USE_OPENAI:
        use_openai_backend = True
    else:
        use_openai_backend = False
|
    try:
        if use_openai_backend:
            corrected, score, explanation = call_openai_correct(source_text)
        else:
            corrected, score, explanation = call_local_correct(source_text)
    except Exception as e:
        # Never crash the UI; return the input unchanged with a note.
        corrected = source_text
        score = 0
        explanation = f"Model error: {e}"
|
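    # pyttsx3's runAndWait() blocks until speech finishes, so speak on a
    # daemon thread with a fresh engine per utterance to keep the UI responsive.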
    tts_msg = ""
    if use_tts:
        try:
            def speak(text):
                engine = pyttsx3.init()
                engine.say(text)
                engine.runAndWait()

            threading.Thread(target=speak, args=(corrected,), daemon=True).start()
            tts_msg = "Speaking corrected text..."
        except Exception as e:
            tts_msg = f"TTS failed: {e}"
|
    meta = {
        "original": source_text,
        "corrected": corrected,
        "score": score,
        "explanation": explanation,
        "backend": "openai" if use_openai_backend else "local",
        "transcription_note": trans_msg,
        "timestamp": int(time.time()),
    }

    info = trans_msg + (" • " + tts_msg if tts_msg else "")
    return corrected, score, explanation, info, json.dumps(meta, ensure_ascii=False, indent=2)
|
def build_ui():
    with gr.Blocks(title=APP_TITLE, css="""
    .header {background: linear-gradient(90deg,#ff8fa3,#ff6aa3); padding: 18px; border-radius: 12px; color:white}
    .muted {color: #6b7280}
    """) as demo:
        with gr.Row(elem_id="top-row"):
            with gr.Column(scale=3):
                gr.Markdown("## <div class='header'>ESPeak - AI Grammar & Speech Assistant</div>")
                gr.Markdown("Speak or type a sentence; ESPeak will correct the grammar, score it, and explain the changes. The OpenAI backend is used if `OPENAI_API_KEY` is set in the environment.")
            with gr.Column(scale=1):
                gr.Markdown("**Quick tips**\n- Speak clearly (short sentences work best)\n- Toggle TTS to hear the corrected sentence\n- Use `Prefer OpenAI` to route to ChatGPT if available")
        gr.Markdown("---")

        with gr.Row():
            with gr.Column(scale=1):
                audio = gr.Audio(sources=["microphone"], type="filepath", label="Record (microphone)")
                typed = gr.Textbox(lines=3, placeholder="Or type your sentence here...", label="Text input")
                with gr.Row():
                    tts_checkbox = gr.Checkbox(label="Play corrected (TTS)", value=False)
                    prefer_openai = gr.Checkbox(label="Prefer OpenAI backend (if available)", value=True)
                run_btn = gr.Button("Check Grammar", variant="primary")
            with gr.Column(scale=2):
                corrected_out = gr.Textbox(label="Corrected Text", interactive=False)
                score_out = gr.Number(label="Grammar Score (0-100)", interactive=False)
                explanation_out = gr.Textbox(label="Explanation (what I changed)", interactive=False)
                trans_note = gr.Textbox(label="Transcription / Info", interactive=False)
                meta_out = gr.Code(label="JSON metadata (copyable)", language="json")

        def on_submit(audio_file, typed_text, use_tts, use_openai):
            return process_input(audio_file, typed_text, use_tts, use_openai)

        run_btn.click(on_submit, inputs=[audio, typed, tts_checkbox, prefer_openai],
                      outputs=[corrected_out, score_out, explanation_out, trans_note, meta_out])

        gr.Markdown("---")
        gr.Markdown("**ESPeak** · Built for quick grammar checking of spoken and typed English. Designed for demos and interview projects.")
    return demo
|
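# Running this file directly launches the UI; share=False keeps it local and
# inbrowser=True opens a browser tab automatically.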
if __name__ == "__main__":
    demo = build_ui()
    demo.launch(share=False, inbrowser=True)