codeBOKER commited on
Commit
7a0a3dd
·
1 Parent(s): ae1bd8b

Add webhook secret validation

Browse files
Files changed (5) hide show
  1. ai_service.py +0 -2
  2. config.py +1 -0
  3. main.py +20 -9
  4. security.py +54 -0
  5. telegram_handlers.py +0 -3
ai_service.py CHANGED
@@ -100,8 +100,6 @@ async def get_ai_response(user_query: str, telegram_id: int):
100
  response_message = completion.choices[0].message
101
 
102
  final_response = clean_ai_response(response_message.content if response_message.content else "")
103
- print(f"--- AI Raw Response: {repr(response_message.content)} ---")
104
- print(f"--- AI Final Response: {repr(final_response)} ---")
105
 
106
  if db_manager:
107
  db_manager.save_message(telegram_id, user_query, "user")
 
100
  response_message = completion.choices[0].message
101
 
102
  final_response = clean_ai_response(response_message.content if response_message.content else "")
 
 
103
 
104
  if db_manager:
105
  db_manager.save_message(telegram_id, user_query, "user")
config.py CHANGED
@@ -10,6 +10,7 @@ load_dotenv()
10
  PINECONE_API_KEY = os.environ.get("PINECONE_API_KEY")
11
  HF_TOKEN = os.environ.get("HF_TOKEN") or os.environ.get("HF_API_KEY")
12
  TELEGRAM_TOKEN = os.environ.get("TELEGRAM_TOKEN")
 
13
  SUPABASE_URL = os.environ.get("SUPABASE_URL")
14
  SUPABASE_KEY = os.environ.get("SUPABASE_KEY")
15
  TELEGRAM_DOMAIN = os.environ.get("TELEGRAM_DOMAIN", "https://api.telegram.org").rstrip("/")
 
10
  PINECONE_API_KEY = os.environ.get("PINECONE_API_KEY")
11
  HF_TOKEN = os.environ.get("HF_TOKEN") or os.environ.get("HF_API_KEY")
12
  TELEGRAM_TOKEN = os.environ.get("TELEGRAM_TOKEN")
13
+ TELEGRAM_WEBHOOK_SECRET = os.environ.get("TELEGRAM_WEBHOOK_SECRET")
14
  SUPABASE_URL = os.environ.get("SUPABASE_URL")
15
  SUPABASE_KEY = os.environ.get("SUPABASE_KEY")
16
  TELEGRAM_DOMAIN = os.environ.get("TELEGRAM_DOMAIN", "https://api.telegram.org").rstrip("/")
main.py CHANGED
@@ -1,4 +1,5 @@
1
- from fastapi import FastAPI
 
2
  from telegram_handlers import telegram_webhook, WebhookData
3
  from utils import dns_test, test_ai_response
4
 
@@ -9,17 +10,27 @@ async def root():
9
  return {"message": "Hadhramout Bank AI Backend is Live"}
10
 
11
  @app.post("/webhook")
12
- async def webhook(data: WebhookData):
13
- return await telegram_webhook(data)
 
 
 
 
14
 
15
- # @app.post("/test")
16
- # async def test(data: WebhookData):
17
- # return await test_webhook(data)
18
 
19
  @app.get("/dns-test")
20
- async def dns():
 
 
 
 
21
  return await dns_test()
22
 
23
  @app.get("/ai-test")
24
- async def ai():
25
- return await test_ai_response()
 
 
 
 
 
1
+ from fastapi import FastAPI, Header, Request
2
+ from security import validate_webhook_secret
3
  from telegram_handlers import telegram_webhook, WebhookData
4
  from utils import dns_test, test_ai_response
5
 
 
10
  return {"message": "Hadhramout Bank AI Backend is Live"}
11
 
12
  @app.post("/webhook")
13
+ async def webhook(
14
+ request: Request,
15
+ data: WebhookData,
16
+ x_telegram_bot_api_secret_token: str | None = Header(default=None),
17
+ ):
18
+ validate_webhook_secret(request, x_telegram_bot_api_secret_token)
19
 
20
+ return await telegram_webhook(data)
 
 
21
 
22
  @app.get("/dns-test")
23
+ async def dns(
24
+ request: Request,
25
+ x_telegram_bot_api_secret_token: str | None = Header(default=None),
26
+ ):
27
+ validate_webhook_secret(request, x_telegram_bot_api_secret_token)
28
  return await dns_test()
29
 
30
  @app.get("/ai-test")
31
+ async def ai(
32
+ request: Request,
33
+ x_telegram_bot_api_secret_token: str | None = Header(default=None),
34
+ ):
35
+ validate_webhook_secret(request, x_telegram_bot_api_secret_token)
36
+ return await test_ai_response()
security.py ADDED
@@ -0,0 +1,54 @@
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
+ from collections import defaultdict, deque
2
+ from time import monotonic
3
+
4
+ from fastapi import HTTPException, Request
5
+
6
+ from config import TELEGRAM_WEBHOOK_SECRET
7
+
8
+ FAILED_ATTEMPT_WINDOW_SECONDS = 60
9
+ FAILED_ATTEMPT_LIMIT = 5
10
+ BLOCK_DURATION_SECONDS = 15 * 60
11
+
12
+ _failed_secret_attempts: dict[str, deque[float]] = defaultdict(deque)
13
+ _blocked_clients: dict[str, float] = {}
14
+
15
+
16
+ def _get_client_key(request: Request) -> str:
17
+ forwarded_for = request.headers.get("x-forwarded-for")
18
+ if forwarded_for:
19
+ return forwarded_for.split(",")[0].strip()
20
+
21
+ if request.client and request.client.host:
22
+ return request.client.host
23
+
24
+ return "unknown"
25
+
26
+
27
+ def _prune_failed_attempts(client_key: str, now: float) -> deque[float]:
28
+ attempts = _failed_secret_attempts[client_key]
29
+ cutoff = now - FAILED_ATTEMPT_WINDOW_SECONDS
30
+ while attempts and attempts[0] < cutoff:
31
+ attempts.popleft()
32
+ return attempts
33
+
34
+
35
+ def validate_webhook_secret(request: Request, secret_header: str | None) -> None:
36
+ if not TELEGRAM_WEBHOOK_SECRET:
37
+ raise HTTPException(status_code=500, detail="Webhook secret is not configured")
38
+
39
+ client_key = _get_client_key(request)
40
+ now = monotonic()
41
+ blocked_until = _blocked_clients.get(client_key)
42
+ if blocked_until and now < blocked_until:
43
+ raise HTTPException(status_code=429, detail="Too many requests")
44
+
45
+ if secret_header != TELEGRAM_WEBHOOK_SECRET:
46
+ attempts = _prune_failed_attempts(client_key, now)
47
+ attempts.append(now)
48
+ if len(attempts) >= FAILED_ATTEMPT_LIMIT:
49
+ _blocked_clients[client_key] = now + BLOCK_DURATION_SECONDS
50
+ attempts.clear()
51
+ raise HTTPException(status_code=403, detail="Forbidden")
52
+
53
+ _blocked_clients.pop(client_key, None)
54
+ _failed_secret_attempts.pop(client_key, None)
telegram_handlers.py CHANGED
@@ -48,8 +48,6 @@ async def telegram_webhook(data: WebhookData):
48
  username = data.message.chat.username
49
  first_name = data.message.chat.first_name
50
 
51
- print(f"--- Processing message from {first_name} ---")
52
-
53
  if db_manager:
54
  db_manager.create_or_update_user(telegram_id, username, first_name, data.message.chat.last_name)
55
 
@@ -75,7 +73,6 @@ async def telegram_webhook(data: WebhookData):
75
  "text": final_text if final_text.strip() else ".",
76
  }
77
 
78
- print(f"--- Sending Telegram message (chars={len(payload['text'])}) ---")
79
 
80
  try:
81
  response = await client.post(TELEGRAM_URL, data=payload)
 
48
  username = data.message.chat.username
49
  first_name = data.message.chat.first_name
50
 
 
 
51
  if db_manager:
52
  db_manager.create_or_update_user(telegram_id, username, first_name, data.message.chat.last_name)
53
 
 
73
  "text": final_text if final_text.strip() else ".",
74
  }
75
 
 
76
 
77
  try:
78
  response = await client.post(TELEGRAM_URL, data=payload)