import argparse import uvicorn import sys import json from fastapi import FastAPI from fastapi import Request from fastapi import Body from fastapi.encoders import jsonable_encoder from fastapi.responses import JSONResponse, StreamingResponse import uuid import time import re import requests try: import langid except Exception: langid = None from pydantic import BaseModel, Field from deep_translator import GoogleTranslator from deep_translator import single_detection from fastapi.middleware.cors import CORSMiddleware class ChatAPIApp: def __init__(self): self.app = FastAPI( docs_url="/", title="Selam Translate API", swagger_ui_parameters={"defaultModelsExpandDepth": -1}, version="1.0", ) self.setup_routes() def get_available_langs(self): f = open('apis/lang_name.json', "r") self.available_models = json.loads(f.read()) return self.available_models class TranslateCompletionsPostItem(BaseModel): from_language: str = Field( default="en", description="(str) `Detect`", ) to_language: str = Field( default="am", description="(str) `en`", ) input_text: str = Field( default="Hello", description="(str) `Text for translate`", ) def translate_completions(self, item: TranslateCompletionsPostItem): f = open('apis/lang_name.json', "r") available_langs = json.loads(f.read()) to_lang = 'en' for lang_item in available_langs: if item.to_language == lang_item['code']: to_lang = item.to_language break if to_lang == 'auto': to_lang = 'en' # Advanced source detection and romanized Ethiopic handling detected_src, _conf = self._detect_language_advanced(item.input_text) processed_input = self._preprocess_text_for_translation(item.input_text, detected_src) translated_text = GoogleTranslator(source='auto', target=to_lang).translate(processed_input) item_response = { "from_language": detected_src, "to_language": to_lang, "text": item.input_text, "translate": translated_text, } json_compatible_item_data = jsonable_encoder(item_response) return JSONResponse(content=json_compatible_item_data) class DetectLanguagePostItem(BaseModel): input_text: str = Field( default="Hello, how are you?", description="(str) `Text for detection`", ) class ChatTranslateStreamItem(BaseModel): # OpenAI-style payload compatibility model: str | None = Field(default=None, description="(optional) ignored") stream: bool | None = Field(default=True, description="(optional) ignored") to_language: str = Field(default="am", description="Target language code") messages: list[dict] = Field( default_factory=list, description="OpenAI-style messages; the last user message's content is translated", ) def detect_language(self, item: DetectLanguagePostItem): lang_code, confidence = self._detect_language_advanced(item.input_text) item_response = { "lang": lang_code, "confidence": confidence, } json_compatible_item_data = jsonable_encoder(item_response) return JSONResponse(content=json_compatible_item_data) # Advanced language detection tailored for Ethiopic scripts (Amharic, Tigrinya) def _detect_language_advanced(self, text: str) -> tuple[str | None, float | None]: if not text or not text.strip(): return None, None # 1) Script detection: Ethiopic blocks ethiopic_pattern = re.compile(r"[\u1200-\u137F\u1380-\u1399\u2D80-\u2DDF\uAB00-\uAB2F]") contains_ethiopic = bool(ethiopic_pattern.search(text)) if contains_ethiopic: # 2) Heuristic keywords for Amharic vs Tigrinya amharic_keywords = { "እንዴት", "ነህ", "ነሽ", "ነው", "ሰላም", "አመሰግናለሁ", "አሁን", "ለመንገድ", "ይሄ", "ትክክል", } tigrinya_keywords = { "ከመይ", "እየ", "እዩ", "ይኹን", "ኣብ", "ኣሎ", "ሰላም", "እቲ", "እዚ", "ኣይ", } # Normalize text_norm = text.strip() # Tokenize on whitespace; Ethiopic has no case, so case-folding is unnecessary tokens = re.findall(r"\w+", text_norm) am_score = sum(1 for tok in tokens if tok in amharic_keywords) ti_score = sum(1 for tok in tokens if tok in tigrinya_keywords) if am_score > ti_score and am_score > 0: # Strong heuristic win return "am", 0.9 if (am_score - ti_score) >= 1 else 0.7 if ti_score > am_score and ti_score > 0: return "ti", 0.9 if (ti_score - am_score) >= 1 else 0.7 # 3) Fallback to statistical detector if available if langid is not None: try: code, score = langid.classify(text) # Adjust confidence for Ethiopic hits if code in ("am", "ti"): return code, float(score) except Exception: pass # 4) Fallback to Google detection via deep_translator try: code = single_detection(text) # If Google says Ethiopic langs, accept; else assume Amharic by default if code in ("am", "ti"): return code, None return "am", None except Exception: return "am", None # Not Ethiopic: use langid first, then Google fallback if langid is not None: try: code, score = langid.classify(text) return code, float(score) except Exception: pass try: code = single_detection(text) return code, None except Exception: return None, None def setup_routes(self): for prefix in ["", "/v1"]: self.app.get( prefix + "/langs", summary="Get available languages", )(self.get_available_langs) self.app.post( prefix + "/translate", summary="translate text", )(self.translate_completions) # Removed AI translation endpoint self.app.post( prefix + "/detect", summary="detect language", )(self.detect_language) self.app.post( prefix + "/translate/stream", summary="stream translated text (OpenAI-compatible SSE)", )(self.translate_stream) # Raw-text friendly streaming endpoint to avoid JSON escaping issues self.app.post( prefix + "/translate/stream/raw", summary="stream translated text (plain text body; set ?to_language=am)", openapi_extra={ "requestBody": { "required": True, "content": { "text/plain": { "schema": {"type": "string", "example": "selam, endet neh?"}, "examples": { "AmharicRomanized": { "summary": "Romanized Amharic", "value": "selam, endet neh?" }, "Paragraph": { "summary": "Multiline plain text", "value": "The Ethiopian Alphasyllabary: A Look at Amharic and Tigrinya\nThe writing systems for Amharic and Tigrinya are beautiful and complex examples of an alphasyllabary." } } } } } }, responses={ 200: { "description": "SSE stream", "content": { "text/event-stream": { "schema": {"type": "string", "example": "data: {\\\"choices\\\":[{\\\"delta\\\":{\\\"content\\\":\\\"...\\\"}}]}\\n\\n"} } } } }, )(self.translate_stream_raw) self.app.post( prefix + "/translate/chat/stream", summary="stream translated text from OpenAI-style chat payload", )(self.translate_chat_stream) # Proxy an OpenAI-style SSE stream via Pollinations, pre/post translating self.app.post( prefix + "/translate/chat/proxy/stream", summary="proxy OpenAI-style chat stream via Pollinations with translation", )(self.translate_chat_proxy_stream) def translate_stream(self, item: TranslateCompletionsPostItem): f = open('apis/lang_name.json', "r") available_langs = json.loads(f.read()) to_lang = 'en' for lang_item in available_langs: if item.to_language == lang_item['code']: to_lang = item.to_language break if to_lang == 'auto': to_lang = 'en' # Detect/prepare input (romanized Ethiopic -> Ethiopic script) detected_src, _conf = self._detect_language_advanced(item.input_text) processed_input = self._preprocess_text_for_translation(item.input_text, detected_src) try: translated_full = GoogleTranslator(source='auto', target=to_lang).translate(processed_input) except Exception as e: error_event = { "id": f"trans-{uuid.uuid4()}", "object": "chat.completion.chunk", "choices": [ { "index": 0, "delta": {"content": ""}, "finish_reason": "error", } ], "error": str(e), } def error_gen(): yield f"data: {json.dumps(error_event, ensure_ascii=False)}\n\n" yield "data: [DONE]\n\n" return StreamingResponse(error_gen(), media_type="text/event-stream") # Character-based streaming for natural flow in languages without spaces chars = list(translated_full) if translated_full else [] stream_id = f"trans-{uuid.uuid4()}" def event_generator(): for ch in chars: chunk = { "id": stream_id, "object": "chat.completion.chunk", "choices": [ { "index": 0, "delta": {"content": ch}, "finish_reason": None, } ], } yield f"data: {json.dumps(chunk, ensure_ascii=False)}\n\n" time.sleep(0.005) # Stream end yield "data: [DONE]\n\n" return StreamingResponse(event_generator(), media_type="text/event-stream") async def translate_stream_raw(self, request: Request, to_language: str = "am", text: str = Body(default=None, media_type="text/plain")): # Prefer explicit text/plain body if provided, else use raw bytes if text is not None: input_text = text else: body_bytes = await request.body() input_text = body_bytes.decode("utf-8", errors="ignore") payload = self.TranslateCompletionsPostItem( to_language=to_language, input_text=input_text, ) return self.translate_stream(payload) class ChatProxyStreamItem(BaseModel): model: str = Field(default="gpt-4.1", description="Pollinations model name") stream: bool = Field(default=True) to_language: str = Field(default="am") from_language: str | None = Field(default=None) messages: list[dict] = Field(default_factory=list) api_url: str | None = Field(default=None, description="Override Pollinations API URL") def translate_chat_proxy_stream(self, item: ChatProxyStreamItem): api_url = item.api_url or "https://text.pollinations.ai/openai" # Find last user message user_text = "" for msg in reversed(item.messages or []): if msg.get("role") == "user": user_text = msg.get("content", "") break # Pre-translate user input to English for LLM detected_src, _ = self._detect_language_advanced(user_text) pre_text = self._preprocess_text_for_translation(user_text, detected_src) try: llm_input_en = GoogleTranslator(source='auto', target='en').translate(pre_text) except Exception: llm_input_en = user_text # Build messages with replaced last user message proxied_messages = list(item.messages or []) for i in range(len(proxied_messages) - 1, -1, -1): if proxied_messages[i].get("role") == "user": proxied_messages[i] = {**proxied_messages[i], "content": llm_input_en} break req_headers = { "Content-Type": "application/json", "Accept": "text/event-stream", } req_body = { "model": item.model, "messages": proxied_messages, "stream": True, } # Make streaming request to Pollinations try: resp = requests.post(api_url, headers=req_headers, json=req_body, stream=True, timeout=60) resp.raise_for_status() except Exception as e: def err_gen(): chunk = { "id": f"proxy-{uuid.uuid4()}", "object": "chat.completion.chunk", "choices": [{"index": 0, "delta": {"content": ""}, "finish_reason": "error"}], "error": str(e), } yield f"data: {json.dumps(chunk, ensure_ascii=False)}\n\n" yield "data: [DONE]\n\n" return StreamingResponse(err_gen(), media_type="text/event-stream") stream_id = f"proxy-{uuid.uuid4()}" def gen(): buffer = "" for line in resp.iter_lines(): if not line: continue try: s = line.decode("utf-8") except Exception: continue s = s.strip() if not s.startswith("data:"): continue data = s[len("data:"):].strip() if data == "[DONE]": # Flush remaining buffer if buffer: try: translated = GoogleTranslator(source='en', target=item.to_language).translate(buffer) except Exception: translated = buffer chunk = {"id": stream_id, "object": "chat.completion.chunk", "choices": [{"index": 0, "delta": {"content": translated}, "finish_reason": None}]} yield f"data: {json.dumps(chunk, ensure_ascii=False)}\n\n" buffer = "" yield "data: [DONE]\n\n" break # Parse JSON try: obj = json.loads(data) piece = obj.get("choices", [{}])[0].get("delta", {}).get("content") except Exception: piece = None if piece: buffer += piece # Translate and flush on sentence boundary or buffer size if any(piece.endswith(x) for x in [".", "!", "?", "\n"]) or len(buffer) > 120: try: translated = GoogleTranslator(source='en', target=item.to_language).translate(buffer) except Exception: translated = buffer chunk = {"id": stream_id, "object": "chat.completion.chunk", "choices": [{"index": 0, "delta": {"content": translated}, "finish_reason": None}]} yield f"data: {json.dumps(chunk, ensure_ascii=False)}\n\n" buffer = "" # Safety end yield "data: [DONE]\n\n" return StreamingResponse(gen(), media_type="text/event-stream") def translate_chat_stream(self, item: ChatTranslateStreamItem): # Extract latest user content input_text = None for message in reversed(item.messages or []): if message.get("role") == "user": input_text = message.get("content", "") break if not input_text: # Fallback to empty stream end def empty_gen(): yield "data: [DONE]\n\n" return StreamingResponse(empty_gen(), media_type="text/event-stream") # Reuse the streaming translator payload = self.TranslateCompletionsPostItem( to_language=item.to_language, input_text=input_text, ) return self.translate_stream(payload) def _preprocess_text_for_translation(self, text: str, detected_lang: str | None) -> str: """If the text appears to be a romanized Ethiopic language, convert to Ethiopic script. Otherwise return original text. """ if not text: return text # If already Ethiopic, return as-is ethiopic_pattern = re.compile(r"[\u1200-\u137F\u1380-\u1399\u2D80-\u2DDF\uAB00-\uAB2F]") if ethiopic_pattern.search(text): return text # Romanized patterns for Amharic/Tigrinya detection and mapping roman_am_keywords = { "selam", "endet", "ende", "dehna", "dena", "amesegenallo", "amaseginalehu", "betam", "ish", "eske", "yene", "wedaj", "wedaje", "indemin", "indet", "bereket", "melkam", } roman_ti_keywords = { "kemey", "tsnuy", "selam", "aydelem", "welat", "hade", "abzi", "abey", } text_lower = text.lower() tokens = re.findall(r"[a-zA-Z]+", text_lower) am_hits = sum(1 for t in tokens if t in roman_am_keywords) ti_hits = sum(1 for t in tokens if t in roman_ti_keywords) likely_am = (detected_lang == "am") or (am_hits > ti_hits and am_hits > 0) likely_ti = (detected_lang == "ti") or (ti_hits > am_hits and ti_hits > 0) if not (likely_am or likely_ti): return text # Minimal romanized -> Ethiopic mapping (extensible) replacements = [ # Amharic common phrases (r"\bselam\b", "ሰላም"), (r"\bdehna\b", "ደህና"), (r"\bdena\b", "ደና"), (r"\bendet\b", "እንዴት"), (r"\bneh\b", "ነህ"), (r"\bnesh\b", "ነሽ"), (r"\bbetam\b", "በጣም"), (r"\bamesegenallo\b", "አመሰግናለሁ"), (r"\bamaseginalehu\b", "አመሰግናለሁ"), (r"\bindemin\b", "እንዴት"), (r"\bmelkam\b", "መልካም"), # Tigrinya common phrases (r"\bkemey\b", "ከመይ"), (r"\btsnuy\b", "ጽኑይ"), (r"\baydelem\b", "ኣይደለም"), ] def apply_replacements(s: str) -> str: out = s for pat, repl in replacements: out = re.sub(pat, repl, out, flags=re.IGNORECASE) return out converted = apply_replacements(text) if ethiopic_pattern.search(converted): return converted # 2) General transliteration (SERA-like approximation) try: transliterated = self._transliterate_latin_to_ethiopic(text) if ethiopic_pattern.search(transliterated): return transliterated except Exception: pass # Fallback to original return text def _transliterate_latin_to_ethiopic(self, text: str) -> str: """Approximate Latin -> Ethiopic (Ge'ez) transliteration for Amharic/Tigrinya. This is a pragmatic mapping sufficient for common phrases. It uses a consonant→base-codepoint table and vowel→order offsets following the 7 orders: e, u, i, a, ee, (consonant/ɨ), o. Limitations: not a full SERA implementation; can be extended. """ # Base codepoints per consonant (first order 'e'). base_map = { # simple "h": 0x1200, # ሀ "l": 0x1208, # ለ "m": 0x1218, # መ "r": 0x1228, # ረ "s": 0x1230, # ሰ "sh": 0x1238, # ሸ "q": 0x1240, # ቀ (ejective k’) "b": 0x1260, # በ "v": 0x1268, # ቨ "t": 0x1270, # ተ "ch": 0x1278, # ቸ "n": 0x1290, # ነ "k": 0x12A8, # ከ "w": 0x12C8, # ወ "z": 0x12D8, # ዘ "y": 0x12E8, # የ "d": 0x12F0, # ደ "j": 0x1300, # ጀ "g": 0x1308, # ገ "t'": 0x1320, # ጠ "ts'": 0x1338, # ጸ (often written ts') "p'": 0x1330, # ጰ "p": 0x1350, # ፐ "f": 0x1348, # ፈ } # Prefer longer graphemes first graphemes = sorted(base_map.keys(), key=len, reverse=True) # Vowel to order offset (first order 'e' has offset 0) # Map long 'ee' to 5th order, bare consonant to 6th vowel_orders = [ (re.compile(r"^ee", re.IGNORECASE), 4, 2), # consume 2 chars, +4 offset (re.compile(r"^e", re.IGNORECASE), 0, 1), # +0 (re.compile(r"^u", re.IGNORECASE), 1, 1), # +1 (re.compile(r"^i", re.IGNORECASE), 2, 1), # +2 (re.compile(r"^a", re.IGNORECASE), 3, 1), # +3 (re.compile(r"^o", re.IGNORECASE), 6, 1), # +6 ] # Initial vowel letters initial_vowel_map = { "a": "አ", "e": "እ", "i": "ኢ", "o": "ኦ", "u": "ኡ", } def transliterate_word(word: str) -> str: i = 0 out = [] w = word # Initial vowel if i < len(w) and w[i].lower() in initial_vowel_map: out.append(initial_vowel_map[w[i].lower()]) i += 1 while i < len(w): # Skip non-letters if not w[i].isalpha() and w[i] not in ["'"]: out.append(w[i]) i += 1 continue # Match grapheme cons = None for gph in graphemes: if w[i:].lower().startswith(gph): cons = gph break if cons is None: # Fallback: emit as-is out.append(w[i]) i += 1 continue i += len(cons) # Match vowel order_offset = 5 # default consonant/6th order consumed = 0 for rx, off, length in vowel_orders: m = rx.match(w[i:]) if m: order_offset = off consumed = length break i += consumed base = base_map[cons] ch = chr(base + order_offset) out.append(ch) return "".join(out) # Split text preserving spaces and punctuation parts = re.findall(r"[A-Za-z']+|\s+|[^\w\s]", text) converted_parts = [transliterate_word(p) if re.match(r"[A-Za-z']+", p) else p for p in parts] return "".join(converted_parts) class ArgParser(argparse.ArgumentParser): def __init__(self, *args, **kwargs): super(ArgParser, self).__init__(*args, **kwargs) self.add_argument( "-s", "--server", type=str, default="0.0.0.0", help="Server IP for HF LLM Chat API", ) self.add_argument( "-p", "--port", type=int, default=23333, help="Server Port for HF LLM Chat API", ) self.add_argument( "-d", "--dev", default=False, action="store_true", help="Run in dev mode", ) self.args = self.parse_args(sys.argv[1:]) app = ChatAPIApp().app app.add_middleware( CORSMiddleware, allow_origins=["*"], allow_credentials=True, allow_methods=["*"], allow_headers=["*"], ) if __name__ == "__main__": args = ArgParser().args if args.dev: uvicorn.run("__main__:app", host=args.server, port=args.port, reload=True) else: uvicorn.run("__main__:app", host=args.server, port=args.port, reload=False) # python -m apis.chat_api # [Docker] on product mode # python -m apis.chat_api -d # [Dev] on develop mode