|
|
from src.core.config import settings |
|
|
import httpx |
|
|
from google.oauth2 import service_account |
|
|
import google.auth.transport.requests |
|
|
|
|
|
|
|
|
FCM_PROJECT_ID = settings.FIREBASE_PROJECT_ID |
|
|
|
|
|
|
|
|
service_account_info = { |
|
|
"type": settings.FIREBASE_TYPE, |
|
|
"project_id": settings.FIREBASE_PROJECT_ID, |
|
|
"private_key_id": settings.FIREBASE_PRIVATE_KEY_ID, |
|
|
"private_key": settings.FIREBASE_PRIVATE_KEY.replace("\\n", "\n"), |
|
|
"client_email": settings.FIREBASE_CLIENT_EMAIL, |
|
|
"client_id": settings.FIREBASE_CLIENT_ID, |
|
|
"auth_uri": settings.FIREBASE_AUTH_URI, |
|
|
"token_uri": settings.FIREBASE_TOKEN_URI, |
|
|
"auth_provider_x509_cert_url": settings.FIREBASE_AUTH_PROVIDER_X509_CERT_URL, |
|
|
"client_x509_cert_url": settings.FIREBASE_CLIENT_X509_CERT_URL, |
|
|
"universe_domain": settings.FIREBASE_UNIVERSE_DOMAIN, |
|
|
} |
|
|
|
|
|
|
|
|
def get_access_token(): |
|
|
"""Generate OAuth2 access token for FCM HTTP v1.""" |
|
|
scopes = ["https://www.googleapis.com/auth/firebase.messaging"] |
|
|
|
|
|
credentials = service_account.Credentials.from_service_account_info( |
|
|
service_account_info, scopes=scopes |
|
|
) |
|
|
|
|
|
request = google.auth.transport.requests.Request() |
|
|
credentials.refresh(request) |
|
|
|
|
|
return credentials.token |
|
|
|
|
|
|
|
|
async def send_fcm( |
|
|
tokens: list[str], |
|
|
title: str, |
|
|
body: str, |
|
|
data: dict | None = None, |
|
|
priority: str = "high", |
|
|
): |
|
|
"""Send push notifications using Firebase HTTP v1.""" |
|
|
if not tokens: |
|
|
return |
|
|
|
|
|
access_token = get_access_token() |
|
|
|
|
|
url = f"https://fcm.googleapis.com/v1/projects/{FCM_PROJECT_ID}/messages:send" |
|
|
|
|
|
headers = { |
|
|
"Authorization": f"Bearer {access_token}", |
|
|
"Content-Type": "application/json; UTF-8", |
|
|
} |
|
|
|
|
|
|
|
|
for token in tokens: |
|
|
message = { |
|
|
"message": { |
|
|
"token": token, |
|
|
"data": {"title": title, "body": body, **(data or {})}, |
|
|
"android": { |
|
|
"priority": priority, |
|
|
}, |
|
|
} |
|
|
} |
|
|
|
|
|
async with httpx.AsyncClient() as client: |
|
|
res = await client.post(url, json=message, headers=headers) |
|
|
print("FCM Response:", res.text) |
|
|
|