import base64
import json
import logging
import os
import re
import shutil
import subprocess

import cv2
import faiss
import gradio as gr
import numpy as np
from openai import OpenAI
from sentence_transformers import SentenceTransformer

# DeepSeek is reached through OpenRouter's OpenAI-compatible endpoint.
deepseek_api_key = os.environ.get("DEEPSEEK_API_KEY", "YOUR_API_KEY")
client = OpenAI(
    base_url="https://openrouter.ai/api/v1",
    api_key=deepseek_api_key,
)
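# Example setup (the key value is a placeholder; OpenRouter keys start with "sk-or-"):
#   export DEEPSEEK_API_KEY="sk-or-..."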
|
|
DATASET_PATH = "data"
JSON_PATH = f"{DATASET_PATH}/sign_language_data.json"

if os.path.exists(JSON_PATH):
    with open(JSON_PATH, "r") as f:
        dataset = json.load(f)

    # Rewrite the stored paths to match the local layout: clips live under
    # data/clips/<category>/ and frames under data/all_signs/.
    for item in dataset:
        category = item["category"].lower().replace(" ", "_")

        video_filename = os.path.basename(item["video_clip_path"])
        item["video_clip_path"] = f"{DATASET_PATH}/clips/{category}/{video_filename}"

        frame_filename = os.path.basename(item["frame_path"])
        item["frame_path"] = f"{DATASET_PATH}/all_signs/{frame_filename}"
else:
    dataset = []
    print(f"Warning: {JSON_PATH} does not exist. Using empty dataset.")
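# Each dataset record is expected to provide at least the fields read above
# (values here are illustrative, not from the real dataset):
#   {"text": "hello", "category": "Greetings", "semantic_meaning": ["hi"],
#    "video_clip_path": "clips/hello.mp4", "frame_path": "frames/hello.jpg"}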
|
|
# Quiet the sentence-transformers logger.
logging.getLogger("sentence_transformers").setLevel(logging.ERROR)

print("Loading sentence transformer model...")
embed_model = SentenceTransformer("all-MiniLM-L6-v2")

# Build a flat L2 index over every phrase (each sign's text plus any listed
# semantic meanings) and keep a phrase -> clip lookup alongside it.
dimension = 384  # embedding width of all-MiniLM-L6-v2
index = faiss.IndexFlatL2(dimension)
text_to_video = {}
idx_to_text = []

for item in dataset:
    phrases = [item["text"]] + item.get("semantic_meaning", [])
    for phrase in phrases:
        embedding = embed_model.encode(phrase).astype(np.float32)
        index.add(np.array([embedding]))
        text_to_video[phrase] = item["video_clip_path"]
        idx_to_text.append(phrase)

print(f"Indexed {len(idx_to_text)} phrases")
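# Optional sanity check (illustrative query; assumes the index is non-empty):
# q = embed_model.encode("hello").astype(np.float32)
# D, I = index.search(np.array([q]), 1)
# print(idx_to_text[I[0][0]], D[0][0])  # nearest phrase and its squared L2 distance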
|
|
def list_available_phrases():
    print("Available phrases in dataset:")
    for idx, phrase in enumerate(text_to_video.keys()):
        print(f"{idx+1}. '{phrase}'")
    print(f"Total: {len(text_to_video)} phrases")
|
|
def preprocess_text(text):
    """Strip emoji and most punctuation, then collapse whitespace."""
    emoji_pattern = re.compile(
        "["
        "\U0001F600-\U0001F64F"  # emoticons
        "\U0001F300-\U0001F5FF"  # symbols & pictographs
        "\U0001F680-\U0001F6FF"  # transport & map symbols
        "\U0001F700-\U0001F77F"  # alchemical symbols
        "\U0001F780-\U0001F7FF"  # geometric shapes extended
        "\U0001F800-\U0001F8FF"  # supplemental arrows-C
        "\U0001F900-\U0001F9FF"  # supplemental symbols & pictographs
        "\U0001FA00-\U0001FA6F"  # chess symbols
        "\U0001FA70-\U0001FAFF"  # symbols & pictographs extended-A
        "\U00002702-\U000027B0"  # dingbats
        "\U000024C2-\U0001F251"  # enclosed characters
        "]+",
        flags=re.UNICODE,
    )

    text = emoji_pattern.sub('', text)
    text = re.sub(r'[^\w\s\?\/]', '', text)  # keep word chars, spaces, ? and /
    text = re.sub(r'\s+', ' ', text).strip()
    return text
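# e.g. preprocess_text("Hello!! 👋  How are you?") -> "Hello How are you?"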
|
|
def refine_sentence_with_deepseek(text):
    """Ask DeepSeek to rewrite a sentence into sign-language-friendly word order."""
    text = preprocess_text(text)

    prompt = f"""
Convert the following sentence into a sign-language-friendly version:
- Remove unnecessary words like articles (a, an, the).
- Keep essential words like pronouns (I, you, we, they).
- Maintain question words (what, where, when, why, how).
- Ensure verbs and key actions are included.
- Reorder words to match sign language grammar.
- IMPORTANT: Format your response with "SIGN_LANGUAGE_VERSION: [your simplified phrase]" at the beginning.
- Sign language often places topic first, then comment (e.g., "READY YOU?" instead of "YOU READY?").

Sentence: "{text}"
"""

    try:
        completion = client.chat.completions.create(
            model="deepseek/deepseek-r1:free",
            messages=[{"role": "user", "content": prompt}],
            temperature=0.3,
        )
        full_response = completion.choices[0].message.content.strip()

        # The model does not always honor the requested format, so try
        # progressively looser patterns before falling back to the first line.
        patterns = [
            r"SIGN_LANGUAGE_VERSION:\s*(.+?)(?:\n|$)",
            r"\*\*Signs?\*\*:?\s*(.+?)(?:\n|$)",
            r"\*\*Sign-language-friendly version:\*\*\s*(.+?)(?:\n|$)",
            r"(?:^|\n)([A-Z\s\?\!]+)(?:\n|$)",
        ]
        for pattern in patterns:
            match = re.search(pattern, full_response, re.MULTILINE)
            if match:
                return match.group(1).strip()

        return full_response.split('\n')[0].strip()

    except Exception as e:
        print(f"Error with DeepSeek API: {str(e)}")
        # Offline fallback: drop articles and simple be-verbs.
        words = text.split()
        filtered_words = [w for w in words if w.lower() not in ['a', 'an', 'the', 'is', 'are', 'am']]
        return ' '.join(filtered_words)
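# Illustrative round trip (the model output shown is hypothetical, not recorded):
#   refine_sentence_with_deepseek("Are you ready to go?") might yield
#   "SIGN_LANGUAGE_VERSION: READY GO YOU?" which parses down to "READY GO YOU?".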
|
|
def retrieve_video(text, debug=False, similarity_threshold=0.9):
    """Return the clip path of the indexed phrase nearest to `text`, or None.

    IndexFlatL2 reports squared L2 distances, so lower scores mean closer
    matches and `similarity_threshold` is an upper bound on distance.
    """
    if not text or text.isspace():
        return None

    text = preprocess_text(text)

    if debug:
        print(f"Creating embedding for '{text}'")

    # Special case: map a bare "i" onto the dataset's "I/me" entry if present.
    if text.lower() == "i":
        if "I/me" in text_to_video:
            if debug:
                print(f" Direct mapping found: '{text}' → 'I/me'")
            return text_to_video["I/me"]

    if index.ntotal == 0:
        if debug:
            print("No items in the index")
        return None

    query_embedding = embed_model.encode(text).astype(np.float32)
    distances, closest_idx = index.search(np.array([query_embedding]), min(3, index.ntotal))

    closest_texts = [idx_to_text[idx] for idx in closest_idx[0]]
    similarity_scores = distances[0]

    if debug:
        print(f"Top matches for '{text}':")
        for i, (phrase, score) in enumerate(zip(closest_texts, similarity_scores)):
            print(f" {i+1}. '{phrase}' (score: {score:.4f})")

    if len(similarity_scores) > 0 and similarity_scores[0] < similarity_threshold:
        closest_text = closest_texts[0]
        query_word_count = len(text.split())
        match_word_count = len(closest_text.split())

        # A single-word phrase is a poor stand-in for a multi-word query.
        if query_word_count > 1 and match_word_count == 1:
            if debug:
                print(f"Rejecting single-word match '{closest_text}' for multi-word query '{text}'")
            return None

        if debug:
            print(f" Found match: '{closest_text}' with score {similarity_scores[0]:.4f}")
        return text_to_video.get(closest_text, None)
    else:
        if debug:
            print(f"No match found with similarity below threshold {similarity_threshold}")
        return None
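# Usage sketch: retrieve_video("thank you", debug=True) prints the nearest
# phrases (up to 3) with their distances, then returns the clip path or None.
# The query phrase here is illustrative; it must exist in the dataset to match.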
|
|
def merge_videos(video_list, output_path="temp/output.mp4"):
    """Concatenate clips with ffmpeg's concat demuxer; return the merged path or None."""
    os.makedirs("temp", exist_ok=True)

    if not video_list:
        return None

    # A single clip needs no merging; just copy it into place.
    if len(video_list) == 1:
        try:
            shutil.copy(video_list[0], output_path)
            return output_path
        except Exception as e:
            print(f"Error copying single video: {e}")
            return None

    verified_paths = []
    for path in video_list:
        if os.path.exists(path):
            verified_paths.append(path)
        else:
            print(f"Warning: Video path does not exist: {path}")

    if not verified_paths:
        print("No valid video paths found")
        return None

    # The concat demuxer reads a text file of "file '<path>'" lines.
    list_path = "temp/video_list.txt"
    with open(list_path, "w") as f:
        for path in verified_paths:
            abs_path = os.path.abspath(path)
            f.write(f"file '{abs_path}'\n")

    abs_output = os.path.abspath(output_path)
    abs_list = os.path.abspath(list_path)

    # Argument list instead of a shell string avoids quoting problems in paths;
    # -c copy skips re-encoding and assumes all clips share codec/resolution.
    command = [
        "ffmpeg", "-y", "-f", "concat", "-safe", "0",
        "-i", abs_list, "-c", "copy", abs_output,
    ]
    print(f"Running command: {' '.join(command)}")
    process = subprocess.run(command, stdout=subprocess.PIPE, stderr=subprocess.PIPE)

    if process.returncode != 0:
        print(f"FFmpeg error: {process.stderr.decode()}")
        return None

    return output_path
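# The generated temp/video_list.txt follows the concat demuxer format, e.g.
# (paths illustrative):
#   file '/abs/path/data/clips/greetings/hello.mp4'
#   file '/abs/path/data/clips/pronouns/you.mp4'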
|
|
def save_video(video_path, output_path="temp/display_output.mp4"):
    """Copy a clip to a stable path for the Gradio video component."""
    os.makedirs("temp", exist_ok=True)

    if not video_path or not os.path.exists(video_path):
        return None

    if video_path != output_path:
        shutil.copy(video_path, output_path)  # portable replacement for `cp`
    return output_path
|
|
def text_to_sign_pipeline(user_input, debug=False):
    """Text -> sign video: try a direct match, then an LLM-refined sentence
    match, then word-by-word retrieval with clip merging."""
    user_input = preprocess_text(user_input)

    if debug:
        print(f"Processing input: '{user_input}'")

    has_multiple_words = len(user_input.split()) > 1

    # Single words can skip the LLM refinement step entirely.
    if not has_multiple_words:
        direct_video = retrieve_video(user_input, debug=debug)
        if direct_video:
            if debug:
                print(f"Single word match found for '{user_input}'")
            return save_video(direct_video)

    sign_friendly_sentence = refine_sentence_with_deepseek(user_input)
    if debug:
        print(f"DeepSeek refined input to: '{sign_friendly_sentence}'")

    full_sentence_video = retrieve_video(sign_friendly_sentence, debug=debug)
    if full_sentence_video:
        if debug:
            print(f"Found full sentence match for '{sign_friendly_sentence}'")
        return save_video(full_sentence_video)

    # Fall back to retrieving one clip per word and stitching them together.
    words = sign_friendly_sentence.split()
    video_paths = []

    if debug:
        print(f"No full sentence match. Trying word-by-word approach for: {words}")

    for word in words:
        clean_word = preprocess_text(word).replace('?', '')
        if not clean_word or clean_word.isspace():
            continue

        word_video = retrieve_video(clean_word, debug=debug)
        if word_video:
            print(f" Found video for word: '{clean_word}'")
            video_paths.append(word_video)
        else:
            print(f" No video found for word: '{clean_word}'")

    if not video_paths:
        print(" No videos found for any words in the sentence")
        return None

    if debug:
        print(f"Found videos for {len(video_paths)} words, merging...")

    merged_video = merge_videos(video_paths)
    return save_video(merged_video)
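# Illustrative walk-through for "Are you ready?" (refined phrase is hypothetical):
#   1. multi-word input, so the direct single-word lookup is skipped
#   2. DeepSeek refines it to something like "READY YOU?"
#   3. no full-sentence clip matches, so each word is retrieved separately
#   4. the per-word clips are merged with ffmpeg and the merged path returned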
|
|
def encode_image_to_base64(image_path):
    with open(image_path, "rb") as image_file:
        return base64.b64encode(image_file.read()).decode('utf-8')
|
|
def preprocess_image(image_path):
    """Crop to the right half of the image, where the text label is expected,
    and save the crop; return its path, or None if the image is unreadable."""
    img = cv2.imread(image_path)
    if img is None:
        return None

    height, width = img.shape[:2]
    right_side = img[:, width // 2:width]

    os.makedirs("temp", exist_ok=True)
    cropped_path = "temp/cropped_image.jpg"
    cv2.imwrite(cropped_path, right_side)
    return cropped_path
|
|
def detect_text_in_image(image_path, debug=False):
    """Ask the vision model whether the image contains a prominent text label."""
    base64_image = encode_image_to_base64(image_path)

    prompt = """
Is there any prominent text label or sign language text in this image?
Answer with ONLY "YES" or "NO".
"""

    try:
        completion = client.chat.completions.create(
            model="qwen/qwen2.5-vl-3b-instruct:free",
            messages=[
                {
                    "role": "user",
                    "content": [
                        {"type": "text", "text": prompt},
                        {"type": "image_url", "image_url": {"url": f"data:image/jpeg;base64,{base64_image}"}},
                    ],
                }
            ],
            temperature=0.3,
        )
        response = completion.choices[0].message.content.strip().upper()

        if debug:
            print(f"Text detection response: {response}")

        return "YES" in response

    except Exception as e:
        if debug:
            print(f"Error in text detection: {str(e)}")
        return False
|
|
def image_to_text_with_qwen(image_path, debug=False):
    """Extract a text label from the image if one exists, otherwise caption it.

    Returns (text, source_type) where source_type is "text", "caption", or "error".
    """
    base64_image = encode_image_to_base64(image_path)

    has_text = detect_text_in_image(image_path, debug)

    if has_text:
        # Crop to the label region first so the model sees less distraction.
        cropped_image_path = preprocess_image(image_path)
        if cropped_image_path:
            cropped_base64 = encode_image_to_base64(cropped_image_path)

            prompt = """
Extract ONLY the main text label from this image. I'm looking for a single word or short phrase
that appears as the main text (like "AFTERNOON"). Ignore any numbers, categories, or other text.

Provide ONLY the extracted text without any other explanation or context.
"""

            try:
                completion = client.chat.completions.create(
                    model="qwen/qwen2.5-vl-3b-instruct:free",
                    messages=[
                        {
                            "role": "user",
                            "content": [
                                {"type": "text", "text": prompt},
                                {"type": "image_url", "image_url": {"url": f"data:image/jpeg;base64,{cropped_base64}"}},
                            ],
                        }
                    ],
                    temperature=0.3,
                )
                response = completion.choices[0].message.content.strip()

                if debug:
                    print(f"Qwen VL text extraction response: {response}")

                # Strip filler like "The main text label is:" plus quotes/parens.
                cleaned_text = re.sub(r"^(the|main|text|label|is|:|\.|\s)+", "", response, flags=re.IGNORECASE)
                cleaned_text = re.sub(r'["\'\(\)]', '', cleaned_text)
                cleaned_text = cleaned_text.strip().upper()

                if cleaned_text:
                    return cleaned_text, "text"

            except Exception as e:
                if debug:
                    print(f"Error using Qwen VL for text extraction: {str(e)}")

    # No usable label: fall back to a one-word caption of the whole image.
    prompt = """
Describe this image in a SINGLE WORD only.
Focus on the main subject (like "MAN", "WOMAN", "HOUSE", "HAPPY", "SAD", etc.).
Provide ONLY this single word without any punctuation or explanation.
"""

    try:
        completion = client.chat.completions.create(
            model="qwen/qwen2.5-vl-3b-instruct:free",
            messages=[
                {
                    "role": "user",
                    "content": [
                        {"type": "text", "text": prompt},
                        {"type": "image_url", "image_url": {"url": f"data:image/jpeg;base64,{base64_image}"}},
                    ],
                }
            ],
            temperature=0.3,
        )
        response = completion.choices[0].message.content.strip()

        if debug:
            print(f"Qwen VL caption response: {response}")

        words = re.sub(r'[^\w\s]', '', response).strip().split()
        if not words:  # guard: an empty or punctuation-only reply would crash split()[0]
            return "ERROR", "error"
        return words[0].upper(), "caption"

    except Exception as e:
        if debug:
            print(f"Error using Qwen VL for captioning: {str(e)}")
        return "ERROR", "error"
|
|
def process_text(input_text):
    if not input_text or input_text.isspace():
        # A gr.Video output cannot render a message string, so raise instead.
        raise gr.Error("Please enter some text to convert.")

    final_video = text_to_sign_pipeline(input_text, debug=True)
    if final_video:
        return final_video
    raise gr.Error("Sorry, no matching sign language video found.")
|
|
def process_image(input_image):
    os.makedirs("temp", exist_ok=True)

    # Gradio hands us a PIL image; save it so the vision model can read a file.
    image_path = "temp/uploaded_image.jpg"
    input_image.save(image_path)

    extracted_text, source_type = image_to_text_with_qwen(image_path, debug=True)

    if extracted_text == "ERROR":
        return "Error processing image", None

    sign_video = text_to_sign_pipeline(extracted_text, debug=True)

    if source_type == "text":
        result_text = f"Extracted text: {extracted_text}"
    else:
        result_text = f"Generated caption: {extracted_text}"

    # Return None (not a message string) to the Video component when no clip exists.
    if sign_video:
        return result_text, sign_video
    return f"{result_text} (no matching sign language video found)", None
|
|
with gr.Blocks() as app:
    gr.Markdown("# Sign Language Conversion")

    with gr.Tabs():
        with gr.Tab("Text to Sign"):
            text_input = gr.Textbox(label="Enter text to convert to sign language")
            text_button = gr.Button("Convert Text to Sign")
            text_output = gr.Video(label="Sign Language Output")
            text_button.click(process_text, inputs=text_input, outputs=text_output)

        with gr.Tab("Image to Text/Caption and Sign"):
            image_input = gr.Image(type="pil", label="Upload image")
            image_button = gr.Button("Process Image and Convert to Sign")
            extracted_text_output = gr.Textbox(label="Extracted Text/Caption")
            image_output = gr.Video(label="Sign Language Output")

            image_button.click(
                process_image,
                inputs=image_input,
                outputs=[extracted_text_output, image_output],
            )

app.launch()