import argparse
import json
import re
import sys
import time
import uuid

import requests
import uvicorn
from fastapi import Body, FastAPI, Request
from fastapi.encoders import jsonable_encoder
from fastapi.middleware.cors import CORSMiddleware
from fastapi.responses import JSONResponse, StreamingResponse
from pydantic import BaseModel, Field
from deep_translator import GoogleTranslator, single_detection

try:
    import langid
except Exception:
    langid = None

class ChatAPIApp:
    def __init__(self):
        self.app = FastAPI(
            docs_url="/",
            title="Selam Translate API",
            swagger_ui_parameters={"defaultModelsExpandDepth": -1},
            version="1.0",
        )
        self.setup_routes()

    def get_available_langs(self):
        # Load the supported-language list from disk on each request
        with open("apis/lang_name.json", "r", encoding="utf-8") as f:
            self.available_models = json.load(f)
        return self.available_models
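
    # A sketch of the expected shape of apis/lang_name.json, inferred from the
    # 'code' lookups below (the 'name' key is an assumption for illustration):
    #   [{"code": "en", "name": "English"}, {"code": "am", "name": "Amharic"}, ...]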

    class TranslateCompletionsPostItem(BaseModel):
        from_language: str = Field(
            default="en",
            description="(str) Source language code; detection is automatic",
        )
        to_language: str = Field(
            default="am",
            description="(str) Target language code, e.g. `en`",
        )
        input_text: str = Field(
            default="Hello",
            description="(str) Text to translate",
        )

    def translate_completions(self, item: TranslateCompletionsPostItem):
        with open("apis/lang_name.json", "r", encoding="utf-8") as f:
            available_langs = json.load(f)
        # Validate the requested target against the supported list; fall back to English
        to_lang = "en"
        for lang_item in available_langs:
            if item.to_language == lang_item["code"]:
                to_lang = item.to_language
                break
        if to_lang == "auto":
            to_lang = "en"
        # Advanced source detection and romanized Ethiopic handling
        detected_src, _conf = self._detect_language_advanced(item.input_text)
        processed_input = self._preprocess_text_for_translation(item.input_text, detected_src)
        translated_text = GoogleTranslator(source="auto", target=to_lang).translate(processed_input)
        item_response = {
            "from_language": detected_src,
            "to_language": to_lang,
            "text": item.input_text,
            "translate": translated_text,
        }
        json_compatible_item_data = jsonable_encoder(item_response)
        return JSONResponse(content=json_compatible_item_data)

    class DetectLanguagePostItem(BaseModel):
        input_text: str = Field(
            default="Hello, how are you?",
            description="(str) Text to run language detection on",
        )

    class ChatTranslateStreamItem(BaseModel):
        # OpenAI-style payload compatibility
        model: str | None = Field(default=None, description="(optional) ignored")
        stream: bool | None = Field(default=True, description="(optional) ignored")
        to_language: str = Field(default="am", description="Target language code")
        messages: list[dict] = Field(
            default_factory=list,
            description="OpenAI-style messages; the last user message's content is translated",
        )

    def detect_language(self, item: DetectLanguagePostItem):
        lang_code, confidence = self._detect_language_advanced(item.input_text)
        item_response = {
            "lang": lang_code,
            "confidence": confidence,
        }
        json_compatible_item_data = jsonable_encoder(item_response)
        return JSONResponse(content=json_compatible_item_data)

    # Advanced language detection tailored for Ethiopic scripts (Amharic, Tigrinya)
    def _detect_language_advanced(self, text: str) -> tuple[str | None, float | None]:
        if not text or not text.strip():
            return None, None
        # 1) Script detection: Ethiopic blocks (incl. Supplement and Extended ranges)
        ethiopic_pattern = re.compile(r"[\u1200-\u137F\u1380-\u1399\u2D80-\u2DDF\uAB00-\uAB2F]")
        contains_ethiopic = bool(ethiopic_pattern.search(text))
        if contains_ethiopic:
            # 2) Heuristic keywords for Amharic vs Tigrinya
            amharic_keywords = {
                "እንዴት", "ነህ", "ነሽ", "ነው", "ሰላም", "አመሰግናለሁ", "አሁን", "ለመንገድ", "ይሄ", "ትክክል",
            }
            tigrinya_keywords = {
                "ከመይ", "እየ", "እዩ", "ይኹን", "ኣብ", "ኣሎ", "ሰላም", "እቲ", "እዚ", "ኣይ",
            }
            text_norm = text.strip()
            # Tokenize on whitespace; Ethiopic has no case, so case-folding is unnecessary
            tokens = re.findall(r"\w+", text_norm)
            am_score = sum(1 for tok in tokens if tok in amharic_keywords)
            ti_score = sum(1 for tok in tokens if tok in tigrinya_keywords)
            if am_score > ti_score and am_score > 0:
                # A margin of two or more keyword hits counts as a strong heuristic win
                return "am", 0.9 if (am_score - ti_score) >= 2 else 0.7
            if ti_score > am_score and ti_score > 0:
                return "ti", 0.9 if (ti_score - am_score) >= 2 else 0.7
            # 3) Fall back to the statistical detector if available
            # (note: langid's default score is a log-probability, not a 0-1 confidence)
            if langid is not None:
                try:
                    code, score = langid.classify(text)
                    if code in ("am", "ti"):
                        return code, float(score)
                except Exception:
                    pass
            # 4) Fall back to Google detection via deep_translator
            try:
                code = single_detection(text)
                # If Google reports an Ethiopic language, accept it; otherwise default to Amharic
                if code in ("am", "ti"):
                    return code, None
                return "am", None
            except Exception:
                return "am", None
        # Not Ethiopic: use langid first, then Google fallback
        if langid is not None:
            try:
                code, score = langid.classify(text)
                return code, float(score)
            except Exception:
                pass
        try:
            code = single_detection(text)
            return code, None
        except Exception:
            return None, None
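
    # A minimal usage sketch (hypothetical session; actual scores depend on the
    # installed langid model and on whether deep_translator detection succeeds):
    #   app = ChatAPIApp()
    #   app._detect_language_advanced("ሰላም እንዴት ነህ")  # -> ("am", 0.9): Ethiopic script + 3 Amharic keyword hits
    #   app._detect_language_advanced("")               # -> (None, None)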

    def setup_routes(self):
        for prefix in ["", "/v1"]:
            self.app.get(
                prefix + "/langs",
                summary="Get available languages",
            )(self.get_available_langs)
            self.app.post(
                prefix + "/translate",
                summary="Translate text",
            )(self.translate_completions)
            # Removed AI translation endpoint
            self.app.post(
                prefix + "/detect",
                summary="Detect language",
            )(self.detect_language)
            self.app.post(
                prefix + "/translate/stream",
                summary="Stream translated text (OpenAI-compatible SSE)",
            )(self.translate_stream)
            # Raw-text friendly streaming endpoint to avoid JSON escaping issues
            self.app.post(
                prefix + "/translate/stream/raw",
                summary="Stream translated text (plain text body; set ?to_language=am)",
                openapi_extra={
                    "requestBody": {
                        "required": True,
                        "content": {
                            "text/plain": {
                                "schema": {"type": "string", "example": "selam, endet neh?"},
                                "examples": {
                                    "AmharicRomanized": {
                                        "summary": "Romanized Amharic",
                                        "value": "selam, endet neh?",
                                    },
                                    "Paragraph": {
                                        "summary": "Multiline plain text",
                                        "value": "The Ethiopian Alphasyllabary: A Look at Amharic and Tigrinya\nThe writing systems for Amharic and Tigrinya are beautiful and complex examples of an alphasyllabary.",
                                    },
                                },
                            }
                        },
                    }
                },
                responses={
                    200: {
                        "description": "SSE stream",
                        "content": {
                            "text/event-stream": {
                                "schema": {"type": "string", "example": 'data: {"choices":[{"delta":{"content":"..."}}]}\\n\\n'}
                            }
                        },
                    }
                },
            )(self.translate_stream_raw)
            self.app.post(
                prefix + "/translate/chat/stream",
                summary="Stream translated text from an OpenAI-style chat payload",
            )(self.translate_chat_stream)
            # Proxy an OpenAI-style SSE stream via Pollinations, pre/post translating
            self.app.post(
                prefix + "/translate/chat/proxy/stream",
                summary="Proxy an OpenAI-style chat stream via Pollinations with translation",
            )(self.translate_chat_proxy_stream)

    def translate_stream(self, item: TranslateCompletionsPostItem):
        with open("apis/lang_name.json", "r", encoding="utf-8") as f:
            available_langs = json.load(f)
        to_lang = "en"
        for lang_item in available_langs:
            if item.to_language == lang_item["code"]:
                to_lang = item.to_language
                break
        if to_lang == "auto":
            to_lang = "en"
        # Detect/prepare input (romanized Ethiopic -> Ethiopic script)
        detected_src, _conf = self._detect_language_advanced(item.input_text)
        processed_input = self._preprocess_text_for_translation(item.input_text, detected_src)
        try:
            translated_full = GoogleTranslator(source="auto", target=to_lang).translate(processed_input)
        except Exception as e:
            error_event = {
                "id": f"trans-{uuid.uuid4()}",
                "object": "chat.completion.chunk",
                "choices": [
                    {
                        "index": 0,
                        "delta": {"content": ""},
                        "finish_reason": "error",
                    }
                ],
                "error": str(e),
            }

            def error_gen():
                yield f"data: {json.dumps(error_event, ensure_ascii=False)}\n\n"
                yield "data: [DONE]\n\n"

            return StreamingResponse(error_gen(), media_type="text/event-stream")
        # Character-based streaming for natural flow in languages written without spaces
        chars = list(translated_full) if translated_full else []
        stream_id = f"trans-{uuid.uuid4()}"

        def event_generator():
            for ch in chars:
                chunk = {
                    "id": stream_id,
                    "object": "chat.completion.chunk",
                    "choices": [
                        {
                            "index": 0,
                            "delta": {"content": ch},
                            "finish_reason": None,
                        }
                    ],
                }
                yield f"data: {json.dumps(chunk, ensure_ascii=False)}\n\n"
                time.sleep(0.005)  # small delay so clients render a steady stream
            # Stream end
            yield "data: [DONE]\n\n"

        return StreamingResponse(event_generator(), media_type="text/event-stream")
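
    # The wire format the generator above produces, sketched for a translation
    # whose result is "ሰላም" (ids shortened; actual output depends on Google):
    #   data: {"id": "trans-…", "object": "chat.completion.chunk", "choices": [{"index": 0, "delta": {"content": "ሰ"}, "finish_reason": null}]}
    #   data: {"id": "trans-…", "object": "chat.completion.chunk", "choices": [{"index": 0, "delta": {"content": "ላ"}, "finish_reason": null}]}
    #   data: {"id": "trans-…", "object": "chat.completion.chunk", "choices": [{"index": 0, "delta": {"content": "ም"}, "finish_reason": null}]}
    #   data: [DONE]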

    async def translate_stream_raw(
        self,
        request: Request,
        to_language: str = "am",
        text: str | None = Body(default=None, media_type="text/plain"),
    ):
        # Prefer an explicit text/plain body if provided, else fall back to the raw bytes
        if text is not None:
            input_text = text
        else:
            body_bytes = await request.body()
            input_text = body_bytes.decode("utf-8", errors="ignore")
        payload = self.TranslateCompletionsPostItem(
            to_language=to_language,
            input_text=input_text,
        )
        return self.translate_stream(payload)
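
    # Example request, assuming the default bind address and port configured in
    # ArgParser below (0.0.0.0:23333); -N disables curl's buffering for SSE:
    #   curl -N -X POST 'http://localhost:23333/translate/stream/raw?to_language=am' \
    #        -H 'Content-Type: text/plain' --data 'selam, endet neh?'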

    class ChatProxyStreamItem(BaseModel):
        model: str = Field(default="gpt-4.1", description="Pollinations model name")
        stream: bool = Field(default=True)
        to_language: str = Field(default="am")
        from_language: str | None = Field(default=None)
        messages: list[dict] = Field(default_factory=list)
        api_url: str | None = Field(default=None, description="Override Pollinations API URL")

    def translate_chat_proxy_stream(self, item: ChatProxyStreamItem):
        api_url = item.api_url or "https://text.pollinations.ai/openai"
        # Find the last user message
        user_text = ""
        for msg in reversed(item.messages or []):
            if msg.get("role") == "user":
                user_text = msg.get("content", "")
                break
        # Pre-translate the user input to English for the LLM
        detected_src, _ = self._detect_language_advanced(user_text)
        pre_text = self._preprocess_text_for_translation(user_text, detected_src)
        try:
            llm_input_en = GoogleTranslator(source="auto", target="en").translate(pre_text)
        except Exception:
            llm_input_en = user_text
        # Rebuild the message list with the last user message replaced
        proxied_messages = list(item.messages or [])
        for i in range(len(proxied_messages) - 1, -1, -1):
            if proxied_messages[i].get("role") == "user":
                proxied_messages[i] = {**proxied_messages[i], "content": llm_input_en}
                break
        req_headers = {
            "Content-Type": "application/json",
            "Accept": "text/event-stream",
        }
        req_body = {
            "model": item.model,
            "messages": proxied_messages,
            "stream": True,
        }
        # Make the streaming request to Pollinations
        try:
            resp = requests.post(api_url, headers=req_headers, json=req_body, stream=True, timeout=60)
            resp.raise_for_status()
        except Exception as e:

            def err_gen():
                chunk = {
                    "id": f"proxy-{uuid.uuid4()}",
                    "object": "chat.completion.chunk",
                    "choices": [{"index": 0, "delta": {"content": ""}, "finish_reason": "error"}],
                    "error": str(e),
                }
                yield f"data: {json.dumps(chunk, ensure_ascii=False)}\n\n"
                yield "data: [DONE]\n\n"

            return StreamingResponse(err_gen(), media_type="text/event-stream")
stream_id = f"proxy-{uuid.uuid4()}" | |
def gen(): | |
buffer = "" | |
for line in resp.iter_lines(): | |
if not line: | |
continue | |
try: | |
s = line.decode("utf-8") | |
except Exception: | |
continue | |
s = s.strip() | |
if not s.startswith("data:"): | |
continue | |
data = s[len("data:"):].strip() | |
if data == "[DONE]": | |
# Flush remaining buffer | |
if buffer: | |
try: | |
translated = GoogleTranslator(source='en', target=item.to_language).translate(buffer) | |
except Exception: | |
translated = buffer | |
chunk = {"id": stream_id, "object": "chat.completion.chunk", "choices": [{"index": 0, "delta": {"content": translated}, "finish_reason": None}]} | |
yield f"data: {json.dumps(chunk, ensure_ascii=False)}\n\n" | |
buffer = "" | |
yield "data: [DONE]\n\n" | |
break | |
# Parse JSON | |
try: | |
obj = json.loads(data) | |
piece = obj.get("choices", [{}])[0].get("delta", {}).get("content") | |
except Exception: | |
piece = None | |
if piece: | |
buffer += piece | |
# Translate and flush on sentence boundary or buffer size | |
if any(piece.endswith(x) for x in [".", "!", "?", "\n"]) or len(buffer) > 120: | |
try: | |
translated = GoogleTranslator(source='en', target=item.to_language).translate(buffer) | |
except Exception: | |
translated = buffer | |
chunk = {"id": stream_id, "object": "chat.completion.chunk", "choices": [{"index": 0, "delta": {"content": translated}, "finish_reason": None}]} | |
yield f"data: {json.dumps(chunk, ensure_ascii=False)}\n\n" | |
buffer = "" | |
# Safety end | |
yield "data: [DONE]\n\n" | |
return StreamingResponse(gen(), media_type="text/event-stream") | |
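
    # Example request, assuming the default port; the last user message is
    # translated to English before being proxied, and the reply streams back
    # in the requested target language:
    #   curl -N -X POST http://localhost:23333/translate/chat/proxy/stream \
    #        -H 'Content-Type: application/json' \
    #        -d '{"to_language": "am", "messages": [{"role": "user", "content": "selam, endet neh?"}]}'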

    def translate_chat_stream(self, item: ChatTranslateStreamItem):
        # Extract the latest user message content
        input_text = None
        for message in reversed(item.messages or []):
            if message.get("role") == "user":
                input_text = message.get("content", "")
                break
        if not input_text:
            # No user content: end the stream immediately
            def empty_gen():
                yield "data: [DONE]\n\n"

            return StreamingResponse(empty_gen(), media_type="text/event-stream")
        # Reuse the streaming translator
        payload = self.TranslateCompletionsPostItem(
            to_language=item.to_language,
            input_text=input_text,
        )
        return self.translate_stream(payload)
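
    # The accepted payload mirrors an OpenAI chat request; only `to_language`
    # and the last user message matter here, e.g.:
    #   {"to_language": "am",
    #    "messages": [{"role": "system", "content": "You are helpful."},
    #                 {"role": "user", "content": "Hello, how are you?"}]}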

    def _preprocess_text_for_translation(self, text: str, detected_lang: str | None) -> str:
        """If the text appears to be a romanized Ethiopic language, convert it to
        Ethiopic script; otherwise return the original text.
        """
        if not text:
            return text
        # If the text is already in Ethiopic script, return it as-is
        ethiopic_pattern = re.compile(r"[\u1200-\u137F\u1380-\u1399\u2D80-\u2DDF\uAB00-\uAB2F]")
        if ethiopic_pattern.search(text):
            return text
        # Romanized keyword sets for Amharic/Tigrinya detection
        roman_am_keywords = {
            "selam", "endet", "ende", "dehna", "dena", "amesegenallo", "amaseginalehu", "betam",
            "ish", "eske", "yene", "wedaj", "wedaje", "indemin", "indet", "bereket", "melkam",
        }
        roman_ti_keywords = {
            "kemey", "tsnuy", "selam", "aydelem", "welat", "hade", "abzi", "abey",
        }
        text_lower = text.lower()
        tokens = re.findall(r"[a-zA-Z]+", text_lower)
        am_hits = sum(1 for t in tokens if t in roman_am_keywords)
        ti_hits = sum(1 for t in tokens if t in roman_ti_keywords)
        likely_am = (detected_lang == "am") or (am_hits > ti_hits and am_hits > 0)
        likely_ti = (detected_lang == "ti") or (ti_hits > am_hits and ti_hits > 0)
        if not (likely_am or likely_ti):
            return text
        # 1) Minimal phrase-level romanized -> Ethiopic mapping (extensible)
        replacements = [
            # Amharic common phrases
            (r"\bselam\b", "ሰላም"),
            (r"\bdehna\b", "ደህና"),
            (r"\bdena\b", "ደና"),
            (r"\bendet\b", "እንዴት"),
            (r"\bneh\b", "ነህ"),
            (r"\bnesh\b", "ነሽ"),
            (r"\bbetam\b", "በጣም"),
            (r"\bamesegenallo\b", "አመሰግናለሁ"),
            (r"\bamaseginalehu\b", "አመሰግናለሁ"),
            (r"\bindemin\b", "እንዴት"),
            (r"\bmelkam\b", "መልካም"),
            # Tigrinya common phrases
            (r"\bkemey\b", "ከመይ"),
            (r"\btsnuy\b", "ጽኑይ"),
            (r"\baydelem\b", "ኣይደለም"),
        ]

        def apply_replacements(s: str) -> str:
            out = s
            for pat, repl in replacements:
                out = re.sub(pat, repl, out, flags=re.IGNORECASE)
            return out

        converted = apply_replacements(text)
        if ethiopic_pattern.search(converted):
            return converted
        # 2) General transliteration (SERA-like approximation)
        try:
            transliterated = self._transliterate_latin_to_ethiopic(text)
            if ethiopic_pattern.search(transliterated):
                return transliterated
        except Exception:
            pass
        # Fall back to the original text
        return text
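
    # A sketch of the two-stage behaviour on romanized input:
    #   app._preprocess_text_for_translation("selam, endet neh?", "am")
    #   # -> "ሰላም, እንዴት ነህ?" via the phrase table above
    #   app._preprocess_text_for_translation("Good morning", "en")
    #   # -> "Good morning" (no Ethiopic signal, returned unchanged)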

    def _transliterate_latin_to_ethiopic(self, text: str) -> str:
        """Approximate Latin -> Ethiopic (Ge'ez) transliteration for Amharic/Tigrinya.

        This is a pragmatic mapping sufficient for common phrases. It uses a
        consonant -> base-codepoint table and vowel -> order offsets following
        the seven orders: e, u, i, a, ee, (bare consonant/ɨ), o.
        Limitations: not a full SERA implementation; can be extended.
        """
        # Base codepoints per consonant (first order, 'e')
        base_map = {
            # simple consonants
            "h": 0x1200,   # ሀ
            "l": 0x1208,   # ለ
            "m": 0x1218,   # መ
            "r": 0x1228,   # ረ
            "s": 0x1230,   # ሰ
            "sh": 0x1238,  # ሸ
            "q": 0x1240,   # ቀ (ejective k')
            "b": 0x1260,   # በ
            "v": 0x1268,   # ቨ
            "t": 0x1270,   # ተ
            "ch": 0x1278,  # ቸ
            "n": 0x1290,   # ነ
            "k": 0x12A8,   # ከ
            "w": 0x12C8,   # ወ
            "z": 0x12D8,   # ዘ
            "y": 0x12E8,   # የ
            "d": 0x12F0,   # ደ
            "j": 0x1300,   # ጀ
            "g": 0x1308,   # ገ
            # ejectives (written with an apostrophe)
            "t'": 0x1320,  # ጠ
            "p'": 0x1330,  # ጰ
            "ts'": 0x1338, # ጸ
            "f": 0x1348,   # ፈ
            "p": 0x1350,   # ፐ
        }
        # Prefer longer graphemes first so "sh" wins over "s"
        graphemes = sorted(base_map.keys(), key=len, reverse=True)
        # Vowel -> (order offset, chars consumed); first order 'e' has offset 0.
        # Long 'ee' maps to the 5th order; a bare consonant defaults to the 6th.
        vowel_orders = [
            (re.compile(r"^ee", re.IGNORECASE), 4, 2),  # 5th order, consumes 2 chars
            (re.compile(r"^e", re.IGNORECASE), 0, 1),   # 1st order
            (re.compile(r"^u", re.IGNORECASE), 1, 1),   # 2nd order
            (re.compile(r"^i", re.IGNORECASE), 2, 1),   # 3rd order
            (re.compile(r"^a", re.IGNORECASE), 3, 1),   # 4th order
            (re.compile(r"^o", re.IGNORECASE), 6, 1),   # 7th order
        ]
        # Independent vowel letters at the start of a word
        initial_vowel_map = {
            "a": "አ",
            "e": "እ",
            "i": "ኢ",
            "o": "ኦ",
            "u": "ኡ",
        }

        def transliterate_word(word: str) -> str:
            i = 0
            out = []
            w = word
            # Word-initial vowel
            if i < len(w) and w[i].lower() in initial_vowel_map:
                out.append(initial_vowel_map[w[i].lower()])
                i += 1
            while i < len(w):
                # Pass through non-letters (apostrophes belong to ejective graphemes)
                if not w[i].isalpha() and w[i] not in ["'"]:
                    out.append(w[i])
                    i += 1
                    continue
                # Match the longest consonant grapheme
                cons = None
                for gph in graphemes:
                    if w[i:].lower().startswith(gph):
                        cons = gph
                        break
                if cons is None:
                    # Fallback: emit the character as-is
                    out.append(w[i])
                    i += 1
                    continue
                i += len(cons)
                # Match the following vowel to pick the order
                order_offset = 5  # default: bare consonant, 6th order
                consumed = 0
                for rx, off, length in vowel_orders:
                    if rx.match(w[i:]):
                        order_offset = off
                        consumed = length
                        break
                i += consumed
                out.append(chr(base_map[cons] + order_offset))
            return "".join(out)

        # Split the text preserving digits, spaces, and punctuation
        # (the original word pattern silently dropped digit runs)
        parts = re.findall(r"[A-Za-z']+|\d+|\s+|[^\w\s]", text)
        converted_parts = [transliterate_word(p) if re.match(r"[A-Za-z']+", p) else p for p in parts]
        return "".join(converted_parts)

class ArgParser(argparse.ArgumentParser):
    def __init__(self, *args, **kwargs):
        super(ArgParser, self).__init__(*args, **kwargs)
        self.add_argument(
            "-s",
            "--server",
            type=str,
            default="0.0.0.0",
            help="Server IP for the Selam Translate API",
        )
        self.add_argument(
            "-p",
            "--port",
            type=int,
            default=23333,
            help="Server port for the Selam Translate API",
        )
        self.add_argument(
            "-d",
            "--dev",
            default=False,
            action="store_true",
            help="Run in dev mode (auto-reload)",
        )
        self.args = self.parse_args(sys.argv[1:])

app = ChatAPIApp().app
app.add_middleware(
    CORSMiddleware,
    allow_origins=["*"],
    allow_credentials=True,
    allow_methods=["*"],
    allow_headers=["*"],
)

if __name__ == "__main__":
    args = ArgParser().args
    if args.dev:
        uvicorn.run("__main__:app", host=args.server, port=args.port, reload=True)
    else:
        uvicorn.run("__main__:app", host=args.server, port=args.port, reload=False)
    # python -m apis.chat_api     # [Docker] production mode
    # python -m apis.chat_api -d  # [Dev] development mode with auto-reload
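
# Example requests once the server is up (default port 23333; routes are
# registered both with and without the /v1 prefix):
#   curl -X POST http://localhost:23333/translate \
#        -H 'Content-Type: application/json' \
#        -d '{"from_language": "auto", "to_language": "am", "input_text": "Hello"}'
#   curl -X POST http://localhost:23333/v1/detect \
#        -H 'Content-Type: application/json' \
#        -d '{"input_text": "ሰላም እንዴት ነህ"}'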