Hydra-Bolt commited on
Commit
b8392c6
·
1 Parent(s): 31eedc3
Dockerfile CHANGED
@@ -18,6 +18,11 @@ RUN pip install --no-cache-dir -r requirements.txt
18
  # Copy application code
19
  COPY . .
20
 
 
 
21
 
22
- # Run the automation script
23
- CMD ["python", "main.py"]
 
 
 
 
18
  # Copy application code
19
  COPY . .
20
 
21
+ # Expose port (Hugging Face Spaces expects 7860, but FastAPI default is 3000)
22
+ EXPOSE 7860
23
 
24
+ # Set environment variable for Hugging Face Spaces
25
+ ENV PORT=7860
26
+
27
+ # Start FastAPI server on the expected port
28
+ CMD ["uvicorn", "app.main:app", "--host", "0.0.0.0", "--port", "7860"]
app/__pycache__/config.cpython-313.pyc ADDED
Binary file (1.04 kB). View file
 
app/__pycache__/main.cpython-313.pyc ADDED
Binary file (1.06 kB). View file
 
app/config.py ADDED
@@ -0,0 +1,13 @@
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
+ import os
2
+ from dotenv import load_dotenv
3
+
4
+ load_dotenv()
5
+
6
+ LOG_LEVEL = os.environ.get("LOG_LEVEL", "INFO").upper()
7
+ LOG_FILE = os.environ.get("LOG_FILE", "send_notifications.log")
8
+ LOG_MAX_BYTES = int(os.environ.get("LOG_MAX_BYTES", 10 * 1024 * 1024))
9
+ LOG_BACKUP_COUNT = int(os.environ.get("LOG_BACKUP_COUNT", 5))
10
+
11
+ SUPABASE_URL = os.environ.get("SUPABASE_URL")
12
+ SUPABASE_KEY = os.environ.get("SUPABASE_KEY")
13
+ FIREBASE_SERVICE_ACCOUNT_JSON = os.environ.get("FIREBASE_SERVICE_ACCOUNT_JSON")
app/main.py ADDED
@@ -0,0 +1,22 @@
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
+
2
+ from fastapi import FastAPI
3
+ import threading
4
+ import uvicorn
5
+ from app.services.notification_service import notification_service
6
+ from app.services.firebase import * # Ensure Firebase is initialized
7
+
8
+ app = FastAPI()
9
+
10
+ @app.get("/")
11
+ def read_root():
12
+ return {"message": "Notification Service is running"}
13
+
14
+ def start_notification_service():
15
+ notification_service()
16
+
17
+ if __name__ == "__main__":
18
+ # Start notification service in a background thread
19
+ notif_thread = threading.Thread(target=start_notification_service, daemon=True)
20
+ notif_thread.start()
21
+ # Run FastAPI server in the main thread
22
+ uvicorn.run("app.main:app", host="0.0.0.0", port=8000, reload=False)
app/models/notification_models.py ADDED
@@ -0,0 +1,16 @@
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
+ from typing import List, Optional, Dict, Any
2
+ from pydantic import BaseModel
3
+
4
+ class Notification(BaseModel):
5
+ title: str
6
+ body: str
7
+ type: str
8
+ priority: Optional[str] = "normal"
9
+ event_id: int
10
+ user_ids: List[int]
11
+ ids: List[int]
12
+ data: Optional[List[Dict[str, Any]]] = None
13
+
14
+ class FCMToken(BaseModel):
15
+ user_id: int
16
+ fcm_token: str
app/services/__pycache__/database.cpython-313.pyc ADDED
Binary file (570 Bytes). View file
 
app/services/__pycache__/firebase.cpython-313.pyc ADDED
Binary file (1.06 kB). View file
 
app/services/__pycache__/notification_service.cpython-313.pyc ADDED
Binary file (13.2 kB). View file
 
app/services/database.py ADDED
@@ -0,0 +1,9 @@
 
 
 
 
 
 
 
 
 
 
1
+ from supabase import create_client
2
+ from app.config import SUPABASE_URL, SUPABASE_KEY
3
+ from app.utils.logging_config import log
4
+
5
+ if not SUPABASE_URL or not SUPABASE_KEY:
6
+ log.critical("SUPABASE_URL and SUPABASE_KEY must be set in environment variables")
7
+ raise EnvironmentError("SUPABASE_URL and SUPABASE_KEY must be set in environment variables")
8
+
9
+ supabase = create_client(SUPABASE_URL, SUPABASE_KEY)
app/services/firebase.py ADDED
@@ -0,0 +1,18 @@
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
+ import json
2
+ import firebase_admin
3
+ from firebase_admin import credentials
4
+ from app.config import FIREBASE_SERVICE_ACCOUNT_JSON
5
+ from app.utils.logging_config import log
6
+
7
+ if not FIREBASE_SERVICE_ACCOUNT_JSON:
8
+ log.critical("FIREBASE_SERVICE_ACCOUNT_JSON environment variable not set")
9
+ raise EnvironmentError("FIREBASE_SERVICE_ACCOUNT_JSON environment variable not set")
10
+
11
+ service_account = json.loads(FIREBASE_SERVICE_ACCOUNT_JSON)
12
+ cred = credentials.Certificate(service_account)
13
+ try:
14
+ firebase_admin.initialize_app(cred)
15
+ log.info("Initialized Firebase app with provided service account")
16
+ except Exception as e:
17
+ log.exception("Failed to initialize Firebase admin: %s", e)
18
+ raise
main.py → app/services/notification_service.py RENAMED
@@ -1,81 +1,12 @@
1
-
2
- import os
3
- import json
4
  import time
5
- import logging
6
- from logging.handlers import RotatingFileHandler
7
- from typing import List, Optional, Dict, Any
8
- from dotenv import load_dotenv
9
- from supabase import create_client
10
- import firebase_admin
11
- from firebase_admin import credentials, messaging
12
- import requests
13
- from fastapi import FastAPI, HTTPException
14
- from pydantic import BaseModel
15
-
16
-
17
- # Load environment variables from .env file
18
- load_dotenv()
19
-
20
- # --- Logging configuration -------------------------------------------------
21
- LOG_LEVEL = os.environ.get("LOG_LEVEL", "INFO").upper()
22
- LOG_FILE = os.environ.get("LOG_FILE", "send_notifications.log")
23
- LOG_MAX_BYTES = int(os.environ.get("LOG_MAX_BYTES", 10 * 1024 * 1024)) # 10MB
24
- LOG_BACKUP_COUNT = int(os.environ.get("LOG_BACKUP_COUNT", 5))
25
-
26
- logger = logging.getLogger("send_notifications")
27
- if not logger.handlers:
28
- logger.setLevel(LOG_LEVEL)
29
- # Console handler
30
- ch = logging.StreamHandler()
31
- ch.setLevel(LOG_LEVEL)
32
- ch_formatter = logging.Formatter(
33
- "%(asctime)s.%(msecs)03d %(levelname)s [%(name)s] %(message)s",
34
- datefmt="%Y-%m-%d %H:%M:%S",
35
- )
36
- ch.setFormatter(ch_formatter)
37
- logger.addHandler(ch)
38
-
39
- # Rotating file handler
40
- try:
41
- fh = RotatingFileHandler(LOG_FILE, maxBytes=LOG_MAX_BYTES, backupCount=LOG_BACKUP_COUNT)
42
- fh.setLevel(LOG_LEVEL)
43
- fh.setFormatter(ch_formatter)
44
- logger.addHandler(fh)
45
- except Exception as e:
46
- logger.warning("Failed to create file handler for logging: %s", e)
47
-
48
- # Convenience alias for root logger
49
- log = logger
50
-
51
- # Supabase config
52
- SUPABASE_URL = os.environ.get("SUPABASE_URL")
53
- SUPABASE_KEY = os.environ.get("SUPABASE_KEY")
54
- if not SUPABASE_URL or not SUPABASE_KEY:
55
- log.critical("SUPABASE_URL and SUPABASE_KEY must be set in environment variables")
56
- raise EnvironmentError("SUPABASE_URL and SUPABASE_KEY must be set in environment variables")
57
-
58
- # Initialize supabase client
59
- supabase = create_client(SUPABASE_URL, SUPABASE_KEY)
60
-
61
- # Firebase config
62
- service_account_json = os.environ.get("FIREBASE_SERVICE_ACCOUNT_JSON")
63
- if not service_account_json:
64
- log.critical("FIREBASE_SERVICE_ACCOUNT_JSON environment variable not set")
65
- raise EnvironmentError("FIREBASE_SERVICE_ACCOUNT_JSON environment variable not set")
66
- service_account = json.loads(service_account_json)
67
- cred = credentials.Certificate(service_account)
68
- try:
69
- firebase_admin.initialize_app(cred)
70
- log.info("Initialized Firebase app with provided service account")
71
- except Exception as e:
72
- log.exception("Failed to initialize Firebase admin: %s", e)
73
- raise
74
-
75
 
76
- # --- Efficient Notification Service ---
77
  def fetch_user_notifications():
78
- """Fetch all pending user_notifications efficiently via Supabase client."""
79
  log.debug("Fetching user_notifications from Supabase")
80
  start = time.time()
81
  resp = supabase.table("user_notifications").select("*").execute()
@@ -89,15 +20,11 @@ def fetch_user_notifications():
89
  return data or []
90
 
91
  def fetch_event_attendance_for_events(event_ids: List[int]) -> List[Dict[str, Any]]:
92
- """Fetch attendance rows for multiple event_ids in one query."""
93
  if not event_ids:
94
  return []
95
  log.debug("Fetching event_attendance for event_ids: %s", event_ids)
96
  start = time.time()
97
- resp = supabase.table("event_attendance").select("*").eq("event_id", 201).execute()
98
- print(resp.data)
99
-
100
- print(resp)
101
  data = getattr(resp, "data", None) or (resp.get("data") if isinstance(resp, dict) else None)
102
  error = getattr(resp, "error", None) or (resp.get("error") if isinstance(resp, dict) else None)
103
  if error:
@@ -108,7 +35,6 @@ def fetch_event_attendance_for_events(event_ids: List[int]) -> List[Dict[str, An
108
  return data or []
109
 
110
  def fetch_fcm_tokens_for_users(user_ids: List[int]) -> List[Dict[str, Any]]:
111
- """Fetch fcm tokens for multiple users in one query."""
112
  if not user_ids:
113
  return []
114
  log.debug("Fetching FCM tokens for user_ids (count=%d)", len(user_ids))
@@ -185,14 +111,11 @@ def send_fcm(tokens, title, body, data):
185
  if not tokens:
186
  log.debug("No tokens provided to send_fcm")
187
  return None
188
- # Firebase FCM requires all values in the `data` dict to be strings.
189
- # Ensure we coerce non-string values to strings and handle None.
190
  def _sanitize_data(d):
191
  if not d:
192
  return {}
193
  out = {}
194
  for k, v in d.items():
195
- # keys must be strings as well
196
  key = str(k)
197
  if v is None:
198
  out[key] = ""
@@ -204,9 +127,7 @@ def send_fcm(tokens, title, body, data):
204
  except Exception:
205
  out[key] = str(v)
206
  return out
207
-
208
  safe_data = _sanitize_data(data)
209
-
210
  message = messaging.MulticastMessage(
211
  notification=messaging.Notification(title=title, body=body),
212
  data=safe_data,
@@ -224,19 +145,14 @@ def send_fcm(tokens, title, body, data):
224
  def notification_service():
225
  while True:
226
  try:
227
- # 1. Query user_notifications
228
  notifications = fetch_user_notifications()
229
  if not notifications:
230
  log.debug("No notifications to process. Sleeping for 10 minutes.")
231
  time.sleep(600)
232
  continue
233
-
234
- # 2. Aggregate by event_id
235
  agg = aggregate_notifications(notifications)
236
- # Batch fetch attendance for all events we need
237
  event_ids = list(agg.keys())
238
  attendance_rows = fetch_event_attendance_for_events(event_ids)
239
- # build attendance_map: event_id -> [user_id]
240
  attendance_map: Dict[int, List[int]] = {}
241
  for r in attendance_rows:
242
  eid = r.get("event_id")
@@ -244,30 +160,21 @@ def notification_service():
244
  if eid is None or uid is None:
245
  continue
246
  attendance_map.setdefault(eid, []).append(uid)
247
-
248
- # Collect distinct user ids across all events and also include user_ids from notifications
249
  all_user_ids = set()
250
  for v in agg.values():
251
  all_user_ids.update(v["user_ids"])
252
  for ulist in attendance_map.values():
253
  all_user_ids.update(ulist)
254
  all_user_ids = list(all_user_ids)
255
-
256
- # Batch fetch tokens
257
  token_rows = fetch_fcm_tokens_for_users(all_user_ids)
258
  token_map: Dict[int, str] = {r["user_id"]: r["fcm_token"] for r in token_rows}
259
-
260
  for event_id, notif in agg.items():
261
- # 3. Get attendees for event (from pre-fetched attendance_map)
262
  user_ids = attendance_map.get(event_id, [])
263
- # 4. Get FCM tokens for users (from pre-fetched token_map)
264
  tokens = [token_map.get(uid) for uid in user_ids]
265
  tokens = [t for t in tokens if t]
266
- # 5. Send notification
267
  data = {"event_id": event_id, "type": notif["type"], "priority": notif["priority"]}
268
  response = send_fcm(tokens, notif["title"], notif["body"], data)
269
  log.info("Sent notification for event %s to %d users. fcm_response=%s", event_id, len(tokens), getattr(response, 'success_count', response))
270
- # 6. Upsert notification record
271
  upsert_payload = [{
272
  "title": notif["title"],
273
  "message": notif["body"],
@@ -278,13 +185,8 @@ def notification_service():
278
  "metadata": data
279
  }]
280
  upsert_notifications(upsert_payload)
281
- # 7. Delete processed notifications in bulk
282
  delete_user_notifications_by_ids(notif["ids"])
283
  log.debug("Deleted processed notifications for event %s.", event_id)
284
  except Exception as e:
285
  log.exception("Error in notification service main loop: %s", e)
286
- time.sleep(600) # 10 minutes
287
-
288
- # --- Run service if main ---
289
- if __name__ == "__main__":
290
- notification_service()
 
 
 
 
1
  import time
2
+ import json
3
+ from typing import List, Dict, Any
4
+ from firebase_admin import messaging
5
+ from app.services.database import supabase
6
+ from app.utils.logging_config import log
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
7
 
8
+ # --- Data Fetching Functions ---
9
  def fetch_user_notifications():
 
10
  log.debug("Fetching user_notifications from Supabase")
11
  start = time.time()
12
  resp = supabase.table("user_notifications").select("*").execute()
 
20
  return data or []
21
 
22
  def fetch_event_attendance_for_events(event_ids: List[int]) -> List[Dict[str, Any]]:
 
23
  if not event_ids:
24
  return []
25
  log.debug("Fetching event_attendance for event_ids: %s", event_ids)
26
  start = time.time()
27
+ resp = supabase.table("event_attendance").select("*").in_("event_id", event_ids).execute()
 
 
 
28
  data = getattr(resp, "data", None) or (resp.get("data") if isinstance(resp, dict) else None)
29
  error = getattr(resp, "error", None) or (resp.get("error") if isinstance(resp, dict) else None)
30
  if error:
 
35
  return data or []
36
 
37
  def fetch_fcm_tokens_for_users(user_ids: List[int]) -> List[Dict[str, Any]]:
 
38
  if not user_ids:
39
  return []
40
  log.debug("Fetching FCM tokens for user_ids (count=%d)", len(user_ids))
 
111
  if not tokens:
112
  log.debug("No tokens provided to send_fcm")
113
  return None
 
 
114
  def _sanitize_data(d):
115
  if not d:
116
  return {}
117
  out = {}
118
  for k, v in d.items():
 
119
  key = str(k)
120
  if v is None:
121
  out[key] = ""
 
127
  except Exception:
128
  out[key] = str(v)
129
  return out
 
130
  safe_data = _sanitize_data(data)
 
131
  message = messaging.MulticastMessage(
132
  notification=messaging.Notification(title=title, body=body),
133
  data=safe_data,
 
145
  def notification_service():
146
  while True:
147
  try:
 
148
  notifications = fetch_user_notifications()
149
  if not notifications:
150
  log.debug("No notifications to process. Sleeping for 10 minutes.")
151
  time.sleep(600)
152
  continue
 
 
153
  agg = aggregate_notifications(notifications)
 
154
  event_ids = list(agg.keys())
155
  attendance_rows = fetch_event_attendance_for_events(event_ids)
 
156
  attendance_map: Dict[int, List[int]] = {}
157
  for r in attendance_rows:
158
  eid = r.get("event_id")
 
160
  if eid is None or uid is None:
161
  continue
162
  attendance_map.setdefault(eid, []).append(uid)
 
 
163
  all_user_ids = set()
164
  for v in agg.values():
165
  all_user_ids.update(v["user_ids"])
166
  for ulist in attendance_map.values():
167
  all_user_ids.update(ulist)
168
  all_user_ids = list(all_user_ids)
 
 
169
  token_rows = fetch_fcm_tokens_for_users(all_user_ids)
170
  token_map: Dict[int, str] = {r["user_id"]: r["fcm_token"] for r in token_rows}
 
171
  for event_id, notif in agg.items():
 
172
  user_ids = attendance_map.get(event_id, [])
 
173
  tokens = [token_map.get(uid) for uid in user_ids]
174
  tokens = [t for t in tokens if t]
 
175
  data = {"event_id": event_id, "type": notif["type"], "priority": notif["priority"]}
176
  response = send_fcm(tokens, notif["title"], notif["body"], data)
177
  log.info("Sent notification for event %s to %d users. fcm_response=%s", event_id, len(tokens), getattr(response, 'success_count', response))
 
178
  upsert_payload = [{
179
  "title": notif["title"],
180
  "message": notif["body"],
 
185
  "metadata": data
186
  }]
187
  upsert_notifications(upsert_payload)
 
188
  delete_user_notifications_by_ids(notif["ids"])
189
  log.debug("Deleted processed notifications for event %s.", event_id)
190
  except Exception as e:
191
  log.exception("Error in notification service main loop: %s", e)
192
+ time.sleep(600)
 
 
 
 
app/utils/__pycache__/logging_config.cpython-313.pyc ADDED
Binary file (1.42 kB). View file
 
app/utils/logging_config.py ADDED
@@ -0,0 +1,24 @@
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
+ import logging
2
+ from logging.handlers import RotatingFileHandler
3
+ from app.config import LOG_LEVEL, LOG_FILE, LOG_MAX_BYTES, LOG_BACKUP_COUNT
4
+
5
+ logger = logging.getLogger("send_notifications")
6
+ if not logger.handlers:
7
+ logger.setLevel(LOG_LEVEL)
8
+ ch = logging.StreamHandler()
9
+ ch.setLevel(LOG_LEVEL)
10
+ ch_formatter = logging.Formatter(
11
+ "%(asctime)s.%(msecs)03d %(levelname)s [%(name)s] %(message)s",
12
+ datefmt="%Y-%m-%d %H:%M:%S",
13
+ )
14
+ ch.setFormatter(ch_formatter)
15
+ logger.addHandler(ch)
16
+ try:
17
+ fh = RotatingFileHandler(LOG_FILE, maxBytes=LOG_MAX_BYTES, backupCount=LOG_BACKUP_COUNT)
18
+ fh.setLevel(LOG_LEVEL)
19
+ fh.setFormatter(ch_formatter)
20
+ logger.addHandler(fh)
21
+ except Exception as e:
22
+ logger.warning("Failed to create file handler for logging: %s", e)
23
+
24
+ log = logger
send_notifications.log CHANGED
@@ -162,3 +162,12 @@ ValueError: Message.data must not contain non-string values.
162
  2025-09-29 21:10:50.094 DEBUG [send_notifications] Deleting 3 user_notifications
163
  2025-09-29 21:10:50.331 INFO [send_notifications] Deleted 3 user_notifications in 0.238s
164
  2025-09-29 21:10:50.332 DEBUG [send_notifications] Deleted processed notifications for event 201.
 
 
 
 
 
 
 
 
 
 
162
  2025-09-29 21:10:50.094 DEBUG [send_notifications] Deleting 3 user_notifications
163
  2025-09-29 21:10:50.331 INFO [send_notifications] Deleted 3 user_notifications in 0.238s
164
  2025-09-29 21:10:50.332 DEBUG [send_notifications] Deleted processed notifications for event 201.
165
+ 2025-09-30 20:42:12.112 INFO [send_notifications] Initialized Firebase app with provided service account
166
+ 2025-09-30 20:42:12.113 DEBUG [send_notifications] Fetching user_notifications from Supabase
167
+ 2025-09-30 20:42:13.065 INFO [send_notifications] Fetched 0 user_notifications in 0.952s
168
+ 2025-09-30 20:42:13.065 DEBUG [send_notifications] No notifications to process. Sleeping for 10 minutes.
169
+ 2025-09-30 20:51:10.751 INFO [send_notifications] Initialized Firebase app with provided service account
170
+ 2025-09-30 20:51:45.015 INFO [send_notifications] Initialized Firebase app with provided service account
171
+ 2025-09-30 20:51:45.015 DEBUG [send_notifications] Fetching user_notifications from Supabase
172
+ 2025-09-30 20:51:46.065 INFO [send_notifications] Fetched 0 user_notifications in 1.049s
173
+ 2025-09-30 20:51:46.065 DEBUG [send_notifications] No notifications to process. Sleeping for 10 minutes.