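"""Decision Helper bot.

A single process that serves a FastAPI app (the /generate_response endpoint backed by
Hugging Face hosted chat models) and a Telegram bot that forwards user messages to that
endpoint through the public Space URL and replies with the generated answer.
"""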
from fastapi import FastAPI, Request
from fastapi.responses import JSONResponse
import httpx
from telegram import Update
from telegram.ext import (
    ApplicationBuilder,
    CommandHandler,
    ContextTypes,
    MessageHandler,
    filters,
)
import asyncio
import os
import logging

from huggingface_hub import InferenceClient, login
import langid


logging.basicConfig(format="%(asctime)s - %(levelname)s - %(message)s", level=logging.INFO)
logger = logging.getLogger(__name__)

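# Public URL of the Hugging Face Space that serves this FastAPI app.
# call_hugging_face_space() below posts user messages back to it.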
HUGGING_FACE_SPACE_URL = "https://demaking-decision-helper-bot.hf.space"


TOKEN = os.getenv("TELEGRAM_BOT_TOKEN")
if not TOKEN:
    raise ValueError("Missing Telegram Bot Token. Please set the TELEGRAM_BOT_TOKEN environment variable.")


HF_HUB_TOKEN = os.getenv("HUGGINGFACEHUB_API_TOKEN")
if not HF_HUB_TOKEN:
    raise ValueError("Missing Hugging Face API token. Please set the HUGGINGFACEHUB_API_TOKEN environment variable.")


# Authenticate with the Hugging Face Hub and create the inference client.
login(token=HF_HUB_TOKEN)
client = InferenceClient(api_key=HF_HUB_TOKEN)


app = FastAPI()

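# langid.classify() returns an (ISO 639-1 code, score) pair; anything other than
# Hebrew ("he") or English ("en") is reported as unsupported.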
def detect_language(user_input):
    try:
        lang, _ = langid.classify(user_input)
        return "hebrew" if lang == "he" else "english" if lang == "en" else "unsupported"
    except Exception as e:
        logger.error(f"Language detection error: {e}")
        return "unsupported"

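# Builds a single-turn chat request: the detected language selects both the instruction
# prefix and the model that handles the request (Phi-3.5-mini for Hebrew, Mistral-Nemo
# for English).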
def generate_response(text):
    language = detect_language(text)

    if language == "hebrew":
        # Hebrew prompt: "Answer briefly, but share your decision-making process."
        content = "תענה בקצרה אבל תשתף את תהליך קבלת ההחלטות שלך, " + text
        model = "microsoft/Phi-3.5-mini-instruct"
    elif language == "english":
        content = "keep it short but tell your decision making process, " + text
        model = "mistralai/Mistral-Nemo-Instruct-2407"
    else:
        return "Sorry, I only support Hebrew and English."

    messages = [{"role": "user", "content": content}]

    completion = client.chat.completions.create(
        model=model,
        messages=messages,
        max_tokens=2048,
        temperature=0.5,
        top_p=0.7
    )
    return completion.choices[0].message.content

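# Illustrative request/response shape for the endpoint below:
#   POST /generate_response  {"text": "Should I switch jobs?"}
#   -> {"response": "<model answer>"}, or {"error": "..."} on bad input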
@app.post("/generate_response")
async def generate_text(request: Request):
    """
    Endpoint to generate a response from the chat model.
    Expects JSON with a "text" field.
    """
    try:
        data = await request.json()
        text = data.get("text", "").strip()
        if not text:
            return {"error": "No text provided"}
        response = generate_response(text)
        return {"response": response}
    except Exception as e:
        logger.error(f"Error processing request: {e}")
        return {"error": "An unexpected error occurred."}


@app.get("/")
async def root():
    """
    Root endpoint to check that the API is running.
    """
    return {"message": "Decision Helper API is running!"}

async def call_hugging_face_space(input_data: str):
    """
    Sends a POST request to the FastAPI /generate_response endpoint with the user's input
    and returns the JSON response.
    """
    async with httpx.AsyncClient(timeout=45.0) as http_client:
        try:
            # Post to the /generate_response route defined above, which expects a JSON
            # body with a "text" field.
            response = await http_client.post(
                f"{HUGGING_FACE_SPACE_URL}/generate_response",
                json={"text": input_data},
            )
            response.raise_for_status()
            return response.json()
        except httpx.HTTPStatusError as e:
            logger.error(f"HTTP Error: {e.response.status_code} - {e.response.text}")
            return {"response": "Error: API returned an error."}
        except httpx.ConnectError as e:
            # ConnectError is a subclass of RequestError, so it must be caught first
            # to be reachable.
            logger.error(f"Connection error: {e}")
            return {"response": "Error: Could not connect to the Hugging Face Space."}
        except httpx.RequestError as e:
            logger.error(f"Request Error: {e}")
            return {"response": "Error: Request Error. Could not reach API."}
        except Exception as e:
            logger.error(f"Unexpected Error: {e}")
            return {"response": "Error: Unexpected error occurred."}

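# Telegram delivers updates to this route once a webhook pointing at
# /webhook/<TELEGRAM_BOT_TOKEN> has been registered for the bot (e.g. with the
# Bot API setWebhook method).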
@app.post("/webhook/{token}")
async def webhook(token: str, request: Request):
    if token != TOKEN:
        logger.error("Webhook token does not match the configured bot token.")
        return JSONResponse(status_code=403, content={"message": "Forbidden"})

    update = Update.de_json(await request.json(), None)
    if update is None or update.message is None or not update.message.text:
        # Ignore updates that do not carry a text message.
        return JSONResponse(content={"response": "Unsupported update."})
    message_text = update.message.text

    result = await call_hugging_face_space(message_text)

    return JSONResponse(content=result)

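# Long-polling alternative to the webhook route above. Telegram only delivers updates
# through one transport at a time, so polling here assumes no webhook is currently
# registered for this bot token.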
def start_telegram_bot():
    # run_polling() drives an asyncio event loop. Because this function runs in a
    # background thread, we create a loop for the thread ourselves and skip signal
    # handler installation (signal handlers can only be set in the main thread).
    asyncio.set_event_loop(asyncio.new_event_loop())

    application = ApplicationBuilder().token(TOKEN).build()

    async def start(update: Update, context: ContextTypes.DEFAULT_TYPE):
        await update.message.reply_text("Hello! Tell me your decision-making issue, and I'll try to help.")
        logger.info("Start command received.")

    async def handle_message(update: Update, context: ContextTypes.DEFAULT_TYPE):
        user_text = update.message.text
        logger.info(f"User message: {user_text}")

        result = await call_hugging_face_space(user_text)
        response_text = result.get("response", "Error generating response.")

        logger.info(f"API Response: {response_text}")
        await update.message.reply_text(response_text)

    application.add_handler(CommandHandler("start", start))
    application.add_handler(MessageHandler(filters.TEXT & ~filters.COMMAND, handle_message))

    application.run_polling(stop_signals=None)

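# Process layout: the Telegram polling loop runs in a background daemon thread while
# uvicorn serves the FastAPI app (including the webhook route) in the main thread.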
if __name__ == "__main__":
    import threading

    # daemon=True lets the process exit when the uvicorn server below shuts down.
    threading.Thread(target=start_telegram_bot, daemon=True).start()

    import uvicorn
    uvicorn.run(app, host="0.0.0.0", port=7860)