# selam-translate / apis/chat_api.py
import argparse
import uvicorn
import sys
import json
from fastapi import FastAPI
from fastapi import Request
from fastapi import Body
from fastapi.encoders import jsonable_encoder
from fastapi.responses import JSONResponse, StreamingResponse
import uuid
import time
import re
import requests
try:
import langid
except Exception:
langid = None
from pydantic import BaseModel, Field
from deep_translator import GoogleTranslator
from deep_translator import single_detection
from fastapi.middleware.cors import CORSMiddleware
class ChatAPIApp:
def __init__(self):
self.app = FastAPI(
docs_url="/",
title="Selam Translate API",
swagger_ui_parameters={"defaultModelsExpandDepth": -1},
version="1.0",
)
self.setup_routes()
def get_available_langs(self):
        with open('apis/lang_name.json', 'r', encoding='utf-8') as f:
            self.available_models = json.load(f)
        return self.available_models
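    # Illustrative sketch of the structure apis/lang_name.json is assumed to have, inferred
    # from how translate_completions matches lang_item['code'] below; the "name" field is an
    # assumption based on the file name and is not required by the lookup:
    #
    #   [
    #     {"code": "auto", "name": "Detect language"},
    #     {"code": "am",   "name": "Amharic"},
    #     {"code": "ti",   "name": "Tigrinya"},
    #     {"code": "en",   "name": "English"}
    #   ]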
class TranslateCompletionsPostItem(BaseModel):
        from_language: str = Field(
            default="en",
            description="(str) Source language code; the source is auto-detected before translating",
        )
        to_language: str = Field(
            default="am",
            description="(str) Target language code, e.g. `am` or `en`",
        )
        input_text: str = Field(
            default="Hello",
            description="(str) Text to translate",
        )
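    # Example request body for POST /translate (fields and defaults as declared above):
    #
    #   {
    #     "from_language": "en",
    #     "to_language": "am",
    #     "input_text": "Hello, how are you?"
    #   }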
def translate_completions(self, item: TranslateCompletionsPostItem):
        with open('apis/lang_name.json', 'r', encoding='utf-8') as f:
            available_langs = json.load(f)
to_lang = 'en'
for lang_item in available_langs:
if item.to_language == lang_item['code']:
to_lang = item.to_language
break
if to_lang == 'auto':
to_lang = 'en'
# Advanced source detection and romanized Ethiopic handling
detected_src, _conf = self._detect_language_advanced(item.input_text)
processed_input = self._preprocess_text_for_translation(item.input_text, detected_src)
translated_text = GoogleTranslator(source='auto', target=to_lang).translate(processed_input)
item_response = {
"from_language": detected_src,
"to_language": to_lang,
"text": item.input_text,
"translate": translated_text,
}
json_compatible_item_data = jsonable_encoder(item_response)
return JSONResponse(content=json_compatible_item_data)
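    # Illustrative call, assuming the server runs on the default port 23333 defined below:
    #
    #   curl -X POST http://localhost:23333/translate \
    #     -H "Content-Type: application/json" \
    #     -d '{"to_language": "am", "input_text": "Hello"}'
    #
    # The JSON response mirrors the dict built above:
    #   {"from_language": "<detected code>", "to_language": "am", "text": "Hello", "translate": "<translated text>"}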
class DetectLanguagePostItem(BaseModel):
input_text: str = Field(
default="Hello, how are you?",
description="(str) `Text for detection`",
)
class ChatTranslateStreamItem(BaseModel):
# OpenAI-style payload compatibility
model: str | None = Field(default=None, description="(optional) ignored")
stream: bool | None = Field(default=True, description="(optional) ignored")
to_language: str = Field(default="am", description="Target language code")
messages: list[dict] = Field(
default_factory=list,
description="OpenAI-style messages; the last user message's content is translated",
)
def detect_language(self, item: DetectLanguagePostItem):
lang_code, confidence = self._detect_language_advanced(item.input_text)
item_response = {
"lang": lang_code,
"confidence": confidence,
}
json_compatible_item_data = jsonable_encoder(item_response)
return JSONResponse(content=json_compatible_item_data)
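    # Example: POST /detect with {"input_text": "ሰላም እንዴት ነህ"} returns something like
    # {"lang": "am", "confidence": 0.9}, because the text is Ethiopic and matches the
    # Amharic keyword heuristic implemented below.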
# Advanced language detection tailored for Ethiopic scripts (Amharic, Tigrinya)
def _detect_language_advanced(self, text: str) -> tuple[str | None, float | None]:
if not text or not text.strip():
return None, None
# 1) Script detection: Ethiopic blocks
ethiopic_pattern = re.compile(r"[\u1200-\u137F\u1380-\u1399\u2D80-\u2DDF\uAB00-\uAB2F]")
contains_ethiopic = bool(ethiopic_pattern.search(text))
if contains_ethiopic:
# 2) Heuristic keywords for Amharic vs Tigrinya
amharic_keywords = {
"እንዴት", "ነህ", "ነሽ", "ነው", "ሰላም", "አመሰግናለሁ", "አሁን", "ለመንገድ", "ይሄ", "ትክክል",
}
tigrinya_keywords = {
"ከመይ", "እየ", "እዩ", "ይኹን", "ኣብ", "ኣሎ", "ሰላም", "እቲ", "እዚ", "ኣይ",
}
# Normalize
text_norm = text.strip()
# Tokenize on whitespace; Ethiopic has no case, so case-folding is unnecessary
tokens = re.findall(r"\w+", text_norm)
am_score = sum(1 for tok in tokens if tok in amharic_keywords)
ti_score = sum(1 for tok in tokens if tok in tigrinya_keywords)
            if am_score > ti_score and am_score > 0:
                # Require a clear keyword margin for high confidence; a margin of one is a weak win
                return "am", 0.9 if (am_score - ti_score) >= 2 else 0.7
            if ti_score > am_score and ti_score > 0:
                return "ti", 0.9 if (ti_score - am_score) >= 2 else 0.7
# 3) Fallback to statistical detector if available
if langid is not None:
try:
code, score = langid.classify(text)
                    # langid's score is a raw log-probability, returned here as the "confidence"
if code in ("am", "ti"):
return code, float(score)
except Exception:
pass
            # 4) Fallback to deep_translator's single_detection (needs an external API key;
            #    any failure below is swallowed and we default to Amharic for Ethiopic text)
            try:
                code = single_detection(text)
                # Accept an Ethiopic language code if reported; otherwise assume Amharic
if code in ("am", "ti"):
return code, None
return "am", None
except Exception:
return "am", None
        # Not Ethiopic: try langid first, then deep_translator's single_detection as a fallback
if langid is not None:
try:
code, score = langid.classify(text)
return code, float(score)
except Exception:
pass
try:
code = single_detection(text)
return code, None
except Exception:
return None, None
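    # Worked example of the cascade above: for "ከመይ ኣሎ ሰላም" all three tokens hit the
    # Tigrinya keyword set (ሰላም is shared with the Amharic set), so the Ethiopic branch
    # returns ("ti", 0.9) without consulting langid or single_detection at all.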
def setup_routes(self):
for prefix in ["", "/v1"]:
self.app.get(
prefix + "/langs",
summary="Get available languages",
)(self.get_available_langs)
self.app.post(
prefix + "/translate",
summary="translate text",
)(self.translate_completions)
# Removed AI translation endpoint
self.app.post(
prefix + "/detect",
summary="detect language",
)(self.detect_language)
self.app.post(
prefix + "/translate/stream",
summary="stream translated text (OpenAI-compatible SSE)",
)(self.translate_stream)
# Raw-text friendly streaming endpoint to avoid JSON escaping issues
self.app.post(
prefix + "/translate/stream/raw",
summary="stream translated text (plain text body; set ?to_language=am)",
openapi_extra={
"requestBody": {
"required": True,
"content": {
"text/plain": {
"schema": {"type": "string", "example": "selam, endet neh?"},
"examples": {
"AmharicRomanized": {
"summary": "Romanized Amharic",
"value": "selam, endet neh?"
},
"Paragraph": {
"summary": "Multiline plain text",
"value": "The Ethiopian Alphasyllabary: A Look at Amharic and Tigrinya\nThe writing systems for Amharic and Tigrinya are beautiful and complex examples of an alphasyllabary."
}
}
}
}
}
},
responses={
200: {
"description": "SSE stream",
"content": {
"text/event-stream": {
"schema": {"type": "string", "example": "data: {\\\"choices\\\":[{\\\"delta\\\":{\\\"content\\\":\\\"...\\\"}}]}\\n\\n"}
}
}
}
},
)(self.translate_stream_raw)
self.app.post(
prefix + "/translate/chat/stream",
summary="stream translated text from OpenAI-style chat payload",
)(self.translate_chat_stream)
# Proxy an OpenAI-style SSE stream via Pollinations, pre/post translating
self.app.post(
prefix + "/translate/chat/proxy/stream",
summary="proxy OpenAI-style chat stream via Pollinations with translation",
)(self.translate_chat_proxy_stream)
def translate_stream(self, item: TranslateCompletionsPostItem):
        with open('apis/lang_name.json', 'r', encoding='utf-8') as f:
            available_langs = json.load(f)
to_lang = 'en'
for lang_item in available_langs:
if item.to_language == lang_item['code']:
to_lang = item.to_language
break
if to_lang == 'auto':
to_lang = 'en'
# Detect/prepare input (romanized Ethiopic -> Ethiopic script)
detected_src, _conf = self._detect_language_advanced(item.input_text)
processed_input = self._preprocess_text_for_translation(item.input_text, detected_src)
try:
translated_full = GoogleTranslator(source='auto', target=to_lang).translate(processed_input)
except Exception as e:
error_event = {
"id": f"trans-{uuid.uuid4()}",
"object": "chat.completion.chunk",
"choices": [
{
"index": 0,
"delta": {"content": ""},
"finish_reason": "error",
}
],
"error": str(e),
}
def error_gen():
yield f"data: {json.dumps(error_event, ensure_ascii=False)}\n\n"
yield "data: [DONE]\n\n"
return StreamingResponse(error_gen(), media_type="text/event-stream")
# Character-based streaming for natural flow in languages without spaces
chars = list(translated_full) if translated_full else []
stream_id = f"trans-{uuid.uuid4()}"
def event_generator():
for ch in chars:
chunk = {
"id": stream_id,
"object": "chat.completion.chunk",
"choices": [
{
"index": 0,
"delta": {"content": ch},
"finish_reason": None,
}
],
}
yield f"data: {json.dumps(chunk, ensure_ascii=False)}\n\n"
time.sleep(0.005)
# Stream end
yield "data: [DONE]\n\n"
return StreamingResponse(event_generator(), media_type="text/event-stream")
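    # Illustrative SSE output of the generator above (one chunk per character, then a terminator):
    #
    #   data: {"id": "trans-<uuid>", "object": "chat.completion.chunk", "choices": [{"index": 0, "delta": {"content": "ሰ"}, "finish_reason": null}]}
    #   ... one such frame per character ...
    #   data: [DONE]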
    async def translate_stream_raw(self, request: Request, to_language: str = "am", text: str | None = Body(default=None, media_type="text/plain")):
# Prefer explicit text/plain body if provided, else use raw bytes
if text is not None:
input_text = text
else:
body_bytes = await request.body()
input_text = body_bytes.decode("utf-8", errors="ignore")
payload = self.TranslateCompletionsPostItem(
to_language=to_language,
input_text=input_text,
)
return self.translate_stream(payload)
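    # Illustrative call matching the text/plain request body advertised in the OpenAPI
    # schema above (assumes the default port 23333):
    #
    #   curl -N -X POST "http://localhost:23333/translate/stream/raw?to_language=am" \
    #     -H "Content-Type: text/plain" \
    #     --data-binary "selam, endet neh?"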
class ChatProxyStreamItem(BaseModel):
model: str = Field(default="gpt-4.1", description="Pollinations model name")
stream: bool = Field(default=True)
to_language: str = Field(default="am")
from_language: str | None = Field(default=None)
messages: list[dict] = Field(default_factory=list)
api_url: str | None = Field(default=None, description="Override Pollinations API URL")
def translate_chat_proxy_stream(self, item: ChatProxyStreamItem):
api_url = item.api_url or "https://text.pollinations.ai/openai"
# Find last user message
user_text = ""
for msg in reversed(item.messages or []):
if msg.get("role") == "user":
user_text = msg.get("content", "")
break
# Pre-translate user input to English for LLM
detected_src, _ = self._detect_language_advanced(user_text)
pre_text = self._preprocess_text_for_translation(user_text, detected_src)
try:
llm_input_en = GoogleTranslator(source='auto', target='en').translate(pre_text)
except Exception:
llm_input_en = user_text
# Build messages with replaced last user message
proxied_messages = list(item.messages or [])
for i in range(len(proxied_messages) - 1, -1, -1):
if proxied_messages[i].get("role") == "user":
proxied_messages[i] = {**proxied_messages[i], "content": llm_input_en}
break
req_headers = {
"Content-Type": "application/json",
"Accept": "text/event-stream",
}
req_body = {
"model": item.model,
"messages": proxied_messages,
"stream": True,
}
# Make streaming request to Pollinations
try:
resp = requests.post(api_url, headers=req_headers, json=req_body, stream=True, timeout=60)
resp.raise_for_status()
except Exception as e:
def err_gen():
chunk = {
"id": f"proxy-{uuid.uuid4()}",
"object": "chat.completion.chunk",
"choices": [{"index": 0, "delta": {"content": ""}, "finish_reason": "error"}],
"error": str(e),
}
yield f"data: {json.dumps(chunk, ensure_ascii=False)}\n\n"
yield "data: [DONE]\n\n"
return StreamingResponse(err_gen(), media_type="text/event-stream")
stream_id = f"proxy-{uuid.uuid4()}"
def gen():
buffer = ""
for line in resp.iter_lines():
if not line:
continue
try:
s = line.decode("utf-8")
except Exception:
continue
s = s.strip()
if not s.startswith("data:"):
continue
data = s[len("data:"):].strip()
if data == "[DONE]":
# Flush remaining buffer
if buffer:
try:
translated = GoogleTranslator(source='en', target=item.to_language).translate(buffer)
except Exception:
translated = buffer
chunk = {"id": stream_id, "object": "chat.completion.chunk", "choices": [{"index": 0, "delta": {"content": translated}, "finish_reason": None}]}
yield f"data: {json.dumps(chunk, ensure_ascii=False)}\n\n"
buffer = ""
yield "data: [DONE]\n\n"
                    return  # upstream sent [DONE]; skip the safety terminator below
# Parse JSON
try:
obj = json.loads(data)
piece = obj.get("choices", [{}])[0].get("delta", {}).get("content")
except Exception:
piece = None
if piece:
buffer += piece
# Translate and flush on sentence boundary or buffer size
if any(piece.endswith(x) for x in [".", "!", "?", "\n"]) or len(buffer) > 120:
try:
translated = GoogleTranslator(source='en', target=item.to_language).translate(buffer)
except Exception:
translated = buffer
chunk = {"id": stream_id, "object": "chat.completion.chunk", "choices": [{"index": 0, "delta": {"content": translated}, "finish_reason": None}]}
yield f"data: {json.dumps(chunk, ensure_ascii=False)}\n\n"
buffer = ""
            # Safety end: emit [DONE] in case the upstream stream ended without one
yield "data: [DONE]\n\n"
return StreamingResponse(gen(), media_type="text/event-stream")
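    # Example payload for POST /translate/chat/proxy/stream (illustrative; "gpt-4.1" is simply
    # the default model name used above, any model Pollinations accepts should work):
    #
    #   {
    #     "model": "gpt-4.1",
    #     "to_language": "am",
    #     "messages": [{"role": "user", "content": "ሰላም እንዴት ነህ"}]
    #   }
    #
    # The last user message is translated to English before being forwarded, and the upstream
    # deltas are translated back to the target language on sentence boundaries or every ~120 characters.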
def translate_chat_stream(self, item: ChatTranslateStreamItem):
# Extract latest user content
input_text = None
for message in reversed(item.messages or []):
if message.get("role") == "user":
input_text = message.get("content", "")
break
if not input_text:
# Fallback to empty stream end
def empty_gen():
yield "data: [DONE]\n\n"
return StreamingResponse(empty_gen(), media_type="text/event-stream")
# Reuse the streaming translator
payload = self.TranslateCompletionsPostItem(
to_language=item.to_language,
input_text=input_text,
)
return self.translate_stream(payload)
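    # Example payload for POST /translate/chat/stream (OpenAI-style; only the last user
    # message is translated, all other fields are ignored):
    #
    #   {"to_language": "am", "messages": [{"role": "user", "content": "Good morning"}]}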
def _preprocess_text_for_translation(self, text: str, detected_lang: str | None) -> str:
"""If the text appears to be a romanized Ethiopic language, convert to Ethiopic script.
Otherwise return original text.
"""
if not text:
return text
# If already Ethiopic, return as-is
ethiopic_pattern = re.compile(r"[\u1200-\u137F\u1380-\u1399\u2D80-\u2DDF\uAB00-\uAB2F]")
if ethiopic_pattern.search(text):
return text
# Romanized patterns for Amharic/Tigrinya detection and mapping
roman_am_keywords = {
"selam", "endet", "ende", "dehna", "dena", "amesegenallo", "amaseginalehu", "betam",
"ish", "eske", "yene", "wedaj", "wedaje", "indemin", "indet", "bereket", "melkam",
}
roman_ti_keywords = {
"kemey", "tsnuy", "selam", "aydelem", "welat", "hade", "abzi", "abey",
}
text_lower = text.lower()
tokens = re.findall(r"[a-zA-Z]+", text_lower)
am_hits = sum(1 for t in tokens if t in roman_am_keywords)
ti_hits = sum(1 for t in tokens if t in roman_ti_keywords)
likely_am = (detected_lang == "am") or (am_hits > ti_hits and am_hits > 0)
likely_ti = (detected_lang == "ti") or (ti_hits > am_hits and ti_hits > 0)
if not (likely_am or likely_ti):
return text
# Minimal romanized -> Ethiopic mapping (extensible)
replacements = [
# Amharic common phrases
(r"\bselam\b", "ሰላም"),
(r"\bdehna\b", "ደህና"),
(r"\bdena\b", "ደና"),
(r"\bendet\b", "እንዴት"),
(r"\bneh\b", "ነህ"),
(r"\bnesh\b", "ነሽ"),
(r"\bbetam\b", "በጣም"),
(r"\bamesegenallo\b", "አመሰግናለሁ"),
(r"\bamaseginalehu\b", "አመሰግናለሁ"),
(r"\bindemin\b", "እንዴት"),
(r"\bmelkam\b", "መልካም"),
# Tigrinya common phrases
(r"\bkemey\b", "ከመይ"),
(r"\btsnuy\b", "ጽኑይ"),
(r"\baydelem\b", "ኣይደለም"),
]
def apply_replacements(s: str) -> str:
out = s
for pat, repl in replacements:
out = re.sub(pat, repl, out, flags=re.IGNORECASE)
return out
converted = apply_replacements(text)
if ethiopic_pattern.search(converted):
return converted
# 2) General transliteration (SERA-like approximation)
try:
transliterated = self._transliterate_latin_to_ethiopic(text)
if ethiopic_pattern.search(transliterated):
return transliterated
except Exception:
pass
# Fallback to original
return text
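    # Worked example of the keyword pass above: "selam, endet neh?" scores two romanized-Amharic
    # keyword hits, the regex replacements yield "ሰላም, እንዴት ነህ?", and the Ethiopic check returns
    # the converted text before the general transliterator is ever tried.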
def _transliterate_latin_to_ethiopic(self, text: str) -> str:
"""Approximate Latin -> Ethiopic (Ge'ez) transliteration for Amharic/Tigrinya.
This is a pragmatic mapping sufficient for common phrases. It uses a
consonant→base-codepoint table and vowel→order offsets following the
7 orders: e, u, i, a, ee, (consonant/ɨ), o.
Limitations: not a full SERA implementation; can be extended.
"""
# Base codepoints per consonant (first order 'e').
base_map = {
# simple
"h": 0x1200, # ሀ
"l": 0x1208, # ለ
"m": 0x1218, # መ
"r": 0x1228, # ረ
"s": 0x1230, # ሰ
"sh": 0x1238, # ሸ
"q": 0x1240, # ቀ (ejective k’)
"b": 0x1260, # በ
"v": 0x1268, # ቨ
"t": 0x1270, # ተ
"ch": 0x1278, # ቸ
"n": 0x1290, # ነ
"k": 0x12A8, # ከ
"w": 0x12C8, # ወ
"z": 0x12D8, # ዘ
"y": 0x12E8, # የ
"d": 0x12F0, # ደ
"j": 0x1300, # ጀ
"g": 0x1308, # ገ
"t'": 0x1320, # ጠ
"ts'": 0x1338, # ጸ (often written ts')
"p'": 0x1330, # ጰ
"p": 0x1350, # ፐ
"f": 0x1348, # ፈ
}
# Prefer longer graphemes first
graphemes = sorted(base_map.keys(), key=len, reverse=True)
# Vowel to order offset (first order 'e' has offset 0)
# Map long 'ee' to 5th order, bare consonant to 6th
vowel_orders = [
(re.compile(r"^ee", re.IGNORECASE), 4, 2), # consume 2 chars, +4 offset
(re.compile(r"^e", re.IGNORECASE), 0, 1), # +0
(re.compile(r"^u", re.IGNORECASE), 1, 1), # +1
(re.compile(r"^i", re.IGNORECASE), 2, 1), # +2
(re.compile(r"^a", re.IGNORECASE), 3, 1), # +3
(re.compile(r"^o", re.IGNORECASE), 6, 1), # +6
]
# Initial vowel letters
initial_vowel_map = {
"a": "አ",
"e": "እ",
"i": "ኢ",
"o": "ኦ",
"u": "ኡ",
}
def transliterate_word(word: str) -> str:
i = 0
out = []
w = word
# Initial vowel
if i < len(w) and w[i].lower() in initial_vowel_map:
out.append(initial_vowel_map[w[i].lower()])
i += 1
while i < len(w):
# Skip non-letters
if not w[i].isalpha() and w[i] not in ["'"]:
out.append(w[i])
i += 1
continue
# Match grapheme
cons = None
for gph in graphemes:
if w[i:].lower().startswith(gph):
cons = gph
break
if cons is None:
# Fallback: emit as-is
out.append(w[i])
i += 1
continue
i += len(cons)
# Match vowel
order_offset = 5 # default consonant/6th order
consumed = 0
for rx, off, length in vowel_orders:
m = rx.match(w[i:])
if m:
order_offset = off
consumed = length
break
i += consumed
base = base_map[cons]
ch = chr(base + order_offset)
out.append(ch)
return "".join(out)
        # Split text preserving whitespace, digits, and punctuation; only Latin-letter runs are transliterated
        parts = re.findall(r"[A-Za-z']+|\s+|[^A-Za-z\s]", text)
        converted_parts = [transliterate_word(p) if re.match(r"[A-Za-z']+", p) else p for p in parts]
return "".join(converted_parts)
class ArgParser(argparse.ArgumentParser):
def __init__(self, *args, **kwargs):
super(ArgParser, self).__init__(*args, **kwargs)
self.add_argument(
"-s",
"--server",
type=str,
default="0.0.0.0",
help="Server IP for HF LLM Chat API",
)
self.add_argument(
"-p",
"--port",
type=int,
default=23333,
help="Server Port for HF LLM Chat API",
)
self.add_argument(
"-d",
"--dev",
default=False,
action="store_true",
help="Run in dev mode",
)
self.args = self.parse_args(sys.argv[1:])
app = ChatAPIApp().app
app.add_middleware(
CORSMiddleware,
allow_origins=["*"],
allow_credentials=True,
allow_methods=["*"],
allow_headers=["*"],
)
if __name__ == "__main__":
args = ArgParser().args
if args.dev:
uvicorn.run("__main__:app", host=args.server, port=args.port, reload=True)
else:
uvicorn.run("__main__:app", host=args.server, port=args.port, reload=False)
# python -m apis.chat_api     # [Docker] production mode
# python -m apis.chat_api -d  # [Dev] development mode with auto-reload