# Spaces: Running
'''
Created by Lewis Kamau Kimaru
Sema translator FastAPI implementation
January 2024
Docker deployment
'''
import os
from datetime import datetime
from typing import Optional

import ctranslate2
import fasttext
import jwt
import pytz
import sentencepiece as spm
import uvicorn
from bson import ObjectId
from fastapi import FastAPI, HTTPException, Request, Depends
from fastapi.middleware.cors import CORSMiddleware
from fastapi.responses import HTMLResponse
from fastapi.security import HTTPBearer, HTTPAuthorizationCredentials
from jwt import encode as jwt_encode
from pydantic import BaseModel
from pymongo import MongoClient
# Application instance that the CORS middleware below is attached to.
app = FastAPI()

# CORS: accept any origin, method and header; credentialed requests are off.
origins = ["*"]
app.add_middleware(
    CORSMiddleware,
    allow_origins=origins,
    allow_credentials=False,
    allow_methods=["*"],
    allow_headers=["*"],
)

# Replace fastText's eprint hook with a no-op
# (presumably to silence the warning printed when a model is loaded).
fasttext.FastText.eprint = lambda x: None

# Hugging Face token: read it from the environment. Streamlit's `st.secrets`
# is not available here (`st` is never imported), so referencing it would
# raise NameError as soon as the module is imported.
# NOTE(review): assumes the deployment exposes the secret either as
# HUGGINGFACEHUB_API_TOKEN or under the `huggingface_token` key -- confirm.
_hf_token = (
    os.environ.get("HUGGINGFACEHUB_API_TOKEN")
    or os.environ.get("huggingface_token")
)
if _hf_token:
    os.environ["HUGGINGFACEHUB_API_TOKEN"] = _hf_token

# User interface: folder that holds the HTML page(s), next to this file.
templates_folder = os.path.join(os.path.dirname(__file__), "templates")
# Authentication | |
class User(BaseModel):
    """Credentials payload accepted by the login and registration handlers."""

    # Optional: typed as Optional so that both an omitted field and an
    # explicit null are accepted, matching the None default.
    username: Optional[str] = None
    email: str
    password: str
# Connect to the MongoDB database.
# The URI can be overridden with the MONGODB_URI environment variable; the
# default keeps the previous hard-coded local instance.
client = MongoClient(os.environ.get("MONGODB_URI", "mongodb://localhost:27017"))
db = client["mydatabase"]
users_collection = db["users"]

# Secret key for signing the token.
# Set the SECRET_KEY environment variable in any real deployment: the
# fallback value is public and only suitable for local development.
SECRET_KEY = os.environ.get("SECRET_KEY", "helloworld")

# Bearer-token extractor used as a dependency by get_user.
security = HTTPBearer()
#Implement the login route: | |
def login(user: User):
    """Authenticate a user by email and password and issue a token.

    Args:
        user: Submitted credentials; only ``email`` and ``password`` are used.

    Returns:
        On success, the stored user document with ``_id`` converted to a
        string, the ``password`` field removed, and a ``token`` entry added.
        On failure, ``{"message": "Invalid email or password"}``.
    """
    # NOTE(review): the password is matched exactly as stored, i.e. in
    # plaintext. Moving to salted hashes needs a coordinated change to
    # registration and a migration of existing records.
    user_data = users_collection.find_one(
        {"email": user.email, "password": user.password}
    )
    if not user_data:
        return {"message": "Invalid email or password"}

    # Convert ObjectId to string
    user_data["_id"] = str(user_data["_id"])
    # Never send the stored password back to the client.
    user_data.pop("password", None)
    user_data["token"] = generate_token(user.email)
    return user_data
#Implement the registration route: | |
def register(user: User):
    """Create a new user record and issue a token for it.

    Args:
        user: Submitted registration data.

    Returns:
        ``{"message": "User already exists"}`` when the email is already
        registered. Otherwise the inserted document with ``_id`` converted to
        a string, the ``password`` field removed, and a ``token`` entry added.
    """
    # Check if user already exists in the database
    if users_collection.find_one({"email": user.email}):
        return {"message": "User already exists"}

    # NOTE(review): the password is persisted exactly as submitted
    # (plaintext). Hashing it requires a matching change in login.
    user_dict = user.dict()
    users_collection.insert_one(user_dict)  # fills in user_dict["_id"]

    # Convert ObjectId to string
    user_dict["_id"] = str(user_dict["_id"])
    # Never send the password back to the client.
    user_dict.pop("password", None)
    user_dict["token"] = generate_token(user.email)
    return user_dict
#Implement the `/api/user` route to fetch user data based on the JWT token | |
def get_user(credentials: HTTPAuthorizationCredentials = Depends(security)):
    """Return the user identified by the bearer token.

    Args:
        credentials: Bearer credentials extracted from the Authorization
            header by the ``security`` dependency.

    Returns:
        A dict with the user's ``username`` and ``email``.

    Raises:
        HTTPException: 401 when the token cannot be verified, carries no
            ``email`` claim, or does not match a stored user.
    """
    token = credentials.credentials

    # Verify the signature with the same key and algorithm used to sign it.
    try:
        payload = jwt.decode(token, SECRET_KEY, algorithms=["HS256"])
    except jwt.InvalidTokenError:
        raise HTTPException(status_code=401, detail="Invalid token")

    email = payload.get("email")
    user_data = users_collection.find_one({"email": email}) if email else None
    if not user_data:
        raise HTTPException(status_code=401, detail="Invalid token")

    # Expose only the public profile fields.
    return {
        "username": user_data.get("username"),
        "email": user_data["email"],
    }
#Define a helper function to generate a JWT token | |
def generate_token(email: str) -> str:
    """Return a JWT signed with SECRET_KEY (HS256) whose only claim is the email."""
    return jwt_encode({"email": email}, SECRET_KEY, algorithm="HS256")
# Get time of request | |
def get_time():
    """Return the current Africa/Nairobi time for request logging.

    Returns:
        A tuple ``(full_date, curr_time)`` where ``full_date`` looks like
        ``"<weekday> | YYYY-MM-DD | HH:MM:SS"`` and ``curr_time`` is the
        ``HH:MM:SS`` part alone.
    """
    now = datetime.now(pytz.timezone('Africa/Nairobi'))
    curr_time = now.strftime('%H:%M:%S')
    # All three parts are formatted from the same timestamp.
    full_date = " | ".join((now.strftime('%A'), now.strftime('%Y-%m-%d'), curr_time))
    return full_date, curr_time
# Load the models and the tokenizer once, at import time, so that every
# request reuses the same instances.
beam_size = 1  # change to a smaller value for faster inference
device = "cpu"  # or "cuda"

# All model files are looked up next to this source file.
_MODEL_DIR = os.path.dirname(__file__)

# Language prediction model (fastText)
print("\nimporting Language Prediction model")
lang_model_file = "lid218e.bin"
lang_model_full_path = os.path.join(_MODEL_DIR, lang_model_file)
lang_model = fasttext.load_model(lang_model_full_path)

# SentencePiece model used for subword encoding and decoding
print("\nimporting SentencePiece model")
sp_model_file = "spm.model"
sp_model_full_path = os.path.join(_MODEL_DIR, sp_model_file)
sp = spm.SentencePieceProcessor()
sp.load(sp_model_full_path)

# Translator model (CTranslate2)
print("\nimporting Translator model")
ct_model_file = "sematrans-3.3B"
ct_model_full_path = os.path.join(_MODEL_DIR, ct_model_file)
translator = ctranslate2.Translator(ct_model_full_path, device)

print('\nDone importing models\n')
def translate_detect(userinput: str, target_lang: str):
    """Detect the language of ``userinput`` and translate it.

    Args:
        userinput: Text to translate; surrounding whitespace is ignored.
        target_lang: Target language token, used as the decoding prefix.

    Returns:
        A tuple ``(source_lang, translations)`` where ``source_lang`` is the
        predicted label without its ``__label__`` prefix and ``translations``
        is a one-element list holding the translated text.
    """
    source_sents = [userinput.strip()]
    target_prefix = [[target_lang]] * len(source_sents)

    # Predict the source language. fastText's predict() handles one line at
    # a time and raises on "\n", so newlines are flattened for detection
    # only; the text that gets translated is left untouched.
    detection_text = source_sents[0].replace("\n", " ")
    predictions = lang_model.predict(detection_text, k=1)
    source_lang = predictions[0][0].replace('__label__', '')

    # Subword the source sentences and wrap them as <lang> ... </s>
    source_sents_subworded = sp.encode(source_sents, out_type=str)
    source_sents_subworded = [
        [source_lang] + sent + ["</s>"] for sent in source_sents_subworded
    ]

    # Translate the source sentences
    translations = translator.translate_batch(
        source_sents_subworded,
        batch_type="tokens",
        max_batch_size=2024,
        beam_size=beam_size,
        target_prefix=target_prefix,
    )
    translations = [translation[0]['tokens'] for translation in translations]

    # Desubword the target sentences, then drop the leading target-language
    # token and the whitespace that separated it from the text.
    translations_desubword = sp.decode(translations)
    translations_desubword = [
        sent[len(target_lang):].strip() for sent in translations_desubword
    ]

    return source_lang, translations_desubword
def translate_enter(userinput: str, source_lang: str, target_lang: str):
    """Translate ``userinput`` from an explicitly given source language.

    Args:
        userinput: Text to translate; surrounding whitespace is ignored.
        source_lang: Source language token prepended to the subworded input.
        target_lang: Target language token, used as the decoding prefix.

    Returns:
        The translated text as a single string.
    """
    source_sents = [userinput.strip()]
    target_prefix = [[target_lang]] * len(source_sents)

    # Subword the source sentences and wrap them as <lang> ... </s>
    source_sents_subworded = sp.encode(source_sents, out_type=str)
    source_sents_subworded = [
        [source_lang] + sent + ["</s>"] for sent in source_sents_subworded
    ]

    # Translate the source sentences
    translations = translator.translate_batch(
        source_sents_subworded,
        batch_type="tokens",
        max_batch_size=2024,
        beam_size=beam_size,
        target_prefix=target_prefix,
    )
    translations = [translation[0]['tokens'] for translation in translations]

    # Desubword the target sentences, then drop the leading target-language
    # token and the whitespace that separated it from the text.
    translations_desubword = sp.decode(translations)
    translations_desubword = [
        sent[len(target_lang):].strip() for sent in translations_desubword
    ]

    # Only the translated text is returned (a single input sentence).
    return translations_desubword[0]
async def read_root(request: Request):
    """Serve the translator web page from the templates folder.

    Args:
        request: Incoming request (unused).

    Returns:
        An ``HTMLResponse`` with the contents of ``translator.html`` and
        status code 200.
    """
    page_path = os.path.join(templates_folder, "translator.html")
    # Use a context manager so the file handle is closed after every request,
    # and read as UTF-8 rather than the container's locale default.
    with open(page_path, "r", encoding="utf-8") as page:
        html = page.read()
    return HTMLResponse(content=html, status_code=200)
async def translate_detect_endpoint(request: Request):
    """Translate text after auto-detecting its source language.

    Expects a JSON object with the string fields ``userinput`` and
    ``target_lang``.

    Returns:
        A dict with ``source_language`` and ``translated_text``.

    Raises:
        HTTPException: 400 when the body is not valid JSON; 422 when it is
            not a JSON object or a required field is missing, empty or not
            a string.
    """
    # A malformed body is a client error, not a server failure.
    try:
        datad = await request.json()
    except ValueError:
        raise HTTPException(status_code=400, detail="Request body must be valid JSON.")
    if not isinstance(datad, dict):
        raise HTTPException(status_code=422, detail="Request body must be a JSON object.")

    userinputd = datad.get("userinput")
    target_langd = datad.get("target_lang")
    dfull_date = get_time()[0]
    print(f"\nrequest: {dfull_date}\nTarget Language; {target_langd}, User Input: {userinputd}\n")

    # Both fields must be non-empty strings; whitespace-only input has
    # nothing to detect or translate.
    fields_ok = (
        isinstance(userinputd, str)
        and isinstance(target_langd, str)
        and userinputd.strip()
        and target_langd
    )
    if not fields_ok:
        raise HTTPException(status_code=422, detail="Both 'userinput' and 'target_lang' are required.")

    source_langd, translated_text_d = translate_detect(userinputd, target_langd)
    dcurrent_time = get_time()[1]
    print(f"\nresponse: {dcurrent_time}; ... Source_language: {source_langd}, Translated Text: {translated_text_d}\n\n")
    return {
        "source_language": source_langd,
        "translated_text": translated_text_d[0],
    }
async def translate_enter_endpoint(request: Request):
    """Translate text between two explicitly specified languages.

    Expects a JSON object with the string fields ``userinput``,
    ``source_lang`` and ``target_lang``.

    Returns:
        A dict with ``translated_text``.

    Raises:
        HTTPException: 400 when the body is not valid JSON; 422 when it is
            not a JSON object or a required field is missing, empty or not
            a string.
    """
    # A malformed body is a client error, not a server failure.
    try:
        datae = await request.json()
    except ValueError:
        raise HTTPException(status_code=400, detail="Request body must be valid JSON.")
    if not isinstance(datae, dict):
        raise HTTPException(status_code=422, detail="Request body must be a JSON object.")

    userinpute = datae.get("userinput")
    source_lange = datae.get("source_lang")
    target_lange = datae.get("target_lang")
    efull_date = get_time()[0]
    print(f"\nrequest: {efull_date}\nSource_language; {source_lange}, Target Language; {target_lange}, User Input: {userinpute}\n")

    # All three fields are required: a missing source_lang would otherwise
    # reach the translator as None and fail with a server error.
    required = (userinpute, source_lange, target_lange)
    if not all(isinstance(value, str) and value for value in required):
        raise HTTPException(status_code=422, detail="'userinput', 'source_lang' and 'target_lang' are required.")

    translated_text_e = translate_enter(userinpute, source_lange, target_lange)
    ecurrent_time = get_time()[1]
    print(f"\nresponse: {ecurrent_time}; ... Translated Text: {translated_text_e}\n\n")
    return {
        "translated_text": translated_text_e,
    }
# Printed once at import time, after the models above have been loaded.
print("\nAPI starting .......\n")