| from fastapi import FastAPI, UploadFile, File, Depends, HTTPException, Query, Request |
| from fastapi.responses import FileResponse, StreamingResponse |
| from fastapi.staticfiles import StaticFiles |
| from fastapi.middleware.cors import CORSMiddleware |
| from starlette.middleware.sessions import SessionMiddleware |
| from pathlib import Path |
| from sqlalchemy.orm import Session |
| from sqlalchemy import func, or_ |
| import pandas as pd |
| import uuid |
| import os |
| import csv |
| import io |
| import concurrent.futures |
| from functools import partial |
| from typing import Any, Dict, List, Optional |
| import json |
| import asyncio |
| import math |
| import re |
| import logging |
| import requests |
| from urllib.parse import urlparse, unquote |
| from datetime import datetime, timedelta |
| from calendar import monthrange |
|
|
| from .database import ( |
| get_db, |
| SessionLocal, |
| Tenant, |
| TenantMembership, |
| User, |
| UploadedFile, |
| Prompt, |
| GeneratedSequence, |
| SmartleadRun, |
| Contact, |
| CrmLead, |
| CrmDeal, |
| UnipileAccount, |
| LinkedinCampaign, |
| UnipileHostedAuthState, |
| OutreachSendReceipt, |
| ) |
| from pydantic import ValidationError |
|
|
| from .models import ( |
| UploadResponse, |
| PromptSaveRequest, |
| SequenceResponse, |
| SmartleadPushRequest, |
| SmartleadRunResponse, |
| CrmLeadPatchRequest, |
| BulkLeadIdsRequest, |
| BulkContactIdsRequest, |
| CrmDealPatchRequest, |
| CrmDealCreateRequest, |
| ContactCreateRequest, |
| ContactPatchRequest, |
| WonBillingPayload, |
| UnipileConnectRequest, |
| UnipileHostedLinkRequest, |
| UnipileMailboxHostedLinkRequest, |
| UserLinkedinPrefsPatch, |
| UserMailboxPrefsPatch, |
| LinkedinCampaignCreateRequest, |
| LinkedinCampaignGenerateRequest, |
| LinkedinCampaignExecuteRequest, |
| LinkedinCampaignPatchRequest, |
| ) |
| from .gmail_invite import send_invite_email_via_gmail |
| from .gpt_service import generate_email_sequence, generate_linkedin_sequence, enrich_manual_contact_profile |
| from .smartlead_client import SmartleadClient |
| from .auth_routes import router as auth_router |
| from .tenant_deps import TenantContext, get_tenant_context |
| from .tenant_routes import router as tenant_router |
| from .outreach_routes import router as outreach_router |
| from .outreach_routes import parse_tracking_label, refresh_outreach_execution_stats_for_file, normalize_smtp_message_id |
| from .deal_revenue import build_quarterly_board |
|
|
# --- Application wiring -----------------------------------------------------
app = FastAPI()

# Signed-cookie sessions. The fallback secret is for local development only;
# production deployments must set SESSION_SECRET.
_session_secret = os.environ.get("SESSION_SECRET", "dev-session-secret-change-in-production")
# Opt-in flag: mark session cookies Secure when the app is served over HTTPS.
_is_https = os.environ.get("HTTPS_ONLY_COOKIES", "").strip() in ("1", "true", "yes")
app.add_middleware(
    SessionMiddleware,
    secret_key=_session_secret,
    same_site="lax",
    https_only=_is_https,
    max_age=14 * 24 * 60 * 60,  # sessions live 14 days
)

# CORS: comma-separated origin allowlist from env, with localhost dev defaults.
_cors_raw = os.environ.get(
    "CORS_ORIGINS",
    "http://localhost:5173,http://127.0.0.1:5173,http://localhost:3000,http://127.0.0.1:3000",
)
_cors_origins = [o.strip() for o in _cors_raw.split(",") if o.strip()]
if not _cors_origins:
    # NOTE(review): "*" together with allow_credentials=True is rejected by
    # browsers; this branch is only reachable when CORS_ORIGINS is set but
    # contains no usable origins — confirm intent.
    _cors_origins = ["*"]

app.add_middleware(
    CORSMiddleware,
    allow_origins=_cors_origins,
    allow_credentials=True,
    allow_methods=["*"],
    allow_headers=["*"],
)

# Feature routers (auth, tenant management, outreach).
app.include_router(auth_router)
app.include_router(tenant_router)
app.include_router(outreach_router)

# Persistent upload storage (presumably a mounted volume — verify on target host).
UPLOAD_DIR = Path("/data/uploads")
UPLOAD_DIR.mkdir(parents=True, exist_ok=True)

# UniPile / frontend configuration snapshotted at import time; the UniPile
# helpers below re-read the environment on every call (see _load_unipile_env).
UNIPILE_API_BASE = os.getenv("UNIPILE_API_BASE", "").rstrip("/")
UNIPILE_API_KEY = os.getenv("UNIPILE_API_KEY", "")
FRONTEND_ORIGIN = os.getenv("FRONTEND_ORIGIN", "").rstrip("/")
UNIPILE_WEBHOOK_SECRET = os.getenv("UNIPILE_WEBHOOK_SECRET", "").strip()
# Verbose webhook event logging toggle (off unless explicitly enabled).
UNIPILE_WEBHOOK_LOG_EVENTS = os.getenv("UNIPILE_WEBHOOK_LOG_EVENTS", "").strip().lower() in (
    "1",
    "true",
    "yes",
)
_unipile_webhook_log = logging.getLogger("emailout.unipile_webhook")
|
|
| |
| def _safe_str(value): |
| if value is None: |
| return "" |
| if isinstance(value, float): |
| if math.isnan(value): |
| return "" |
| return str(value).strip() |
| return str(value).strip() |
|
|
|
|
| def _json_safe(value): |
| if value is None: |
| return None |
| if isinstance(value, float): |
| if math.isnan(value): |
| return None |
| return value |
| |
| try: |
| json.dumps(value) |
| return value |
| except TypeError: |
| return _safe_str(value) |
|
|
|
|
| def _pick_field(row_dict: Dict, aliases: List[str]) -> str: |
| lowered = {str(k).strip().lower(): v for k, v in row_dict.items()} |
| for alias in aliases: |
| if alias in lowered: |
| return _safe_str(lowered[alias]) |
| return "" |
|
|
|
|
| def _pick_from_raw(raw_data: Dict, aliases: List[str]) -> str: |
| if not isinstance(raw_data, dict): |
| return "" |
| lowered = {str(k).strip().lower(): v for k, v in raw_data.items()} |
| for alias in aliases: |
| if alias in lowered: |
| return _safe_str(lowered[alias]) |
| return "" |
|
|
|
|
def _pick_linkedin_url(raw_data: Dict) -> str:
    """Return the first LinkedIn-URL-ish column value found in a raw contact row."""
    aliases = [
        "linkedin",
        "linkedin url",
        "linkedin profile",
        "person linkedin url",
        "person linkedin profile url",
        "linkedin profile url",
        "linkedin public url",
        "linkedin_url",
        "linkedin profile link",
    ]
    return _pick_from_raw(raw_data, aliases)
|
|
|
|
def _normalize_linkedin_url(raw: str) -> str:
    """Strip whitespace; upgrade http→https for stable display and parsing."""
    url = _safe_str(raw)
    if url.startswith("http://"):
        return "https://" + url.removeprefix("http://")
    return url
|
|
|
|
def _linkedin_public_identifier(raw: str) -> str:
    """
    UniPile profile lookup expects the public slug from /in/{slug}, not a full URL.
    Accepts plain slugs (e.g. satyanadella) as well.
    """
    value = _normalize_linkedin_url(raw).strip()
    if not value:
        return ""
    if "linkedin.com" not in value.lower():
        # Already a bare slug: drop any query string and surrounding slashes.
        return value.split("?")[0].strip("/").strip()
    try:
        path = urlparse(value).path or ""
    except Exception:
        path = ""
    segments = [seg for seg in unquote(path).strip("/").split("/") if seg]
    for idx, seg in enumerate(segments):
        lowered = seg.lower()
        # Regular profile URL: /in/{slug}
        if lowered == "in" and idx + 1 < len(segments):
            return segments[idx + 1].strip().split("?")[0].strip()
        # Sales Navigator URL: /sales/lead/{id}
        if lowered == "sales" and idx + 2 < len(segments) and segments[idx + 1].lower() == "lead":
            return segments[idx + 2].split("?")[0].strip()
    return ""
|
|
|
|
| def _load_unipile_env() -> tuple[str, str]: |
| """Read UniPile DSN and API key from the environment (strip whitespace / newlines from secrets).""" |
| base = (os.getenv("UNIPILE_API_BASE") or "").strip().rstrip("/") |
| key = (os.getenv("UNIPILE_API_KEY") or "").strip() |
| return base, key |
|
|
|
|
def _require_unipile_config():
    """Fail fast with a 400 when either UniPile env var is missing."""
    base, key = _load_unipile_env()
    if base and key:
        return
    raise HTTPException(
        status_code=400,
        detail="UniPile is not configured. Set UNIPILE_API_BASE and UNIPILE_API_KEY in backend env (Hugging Face: Space → Settings → Secrets).",
    )
|
|
|
|
def _unipile_headers():
    """Standard JSON request headers carrying the UniPile API key."""
    api_key = _load_unipile_env()[1]
    return {
        "X-API-KEY": api_key,
        "accept": "application/json",
        "content-type": "application/json",
    }
|
|
|
|
def _unipile_call(method: str, path: str, payload: Optional[dict] = None):
    """
    Raw UniPile HTTP call. Returns (status_code, parsed_body).
    Raises HTTPException only on transport failure (not on 4xx/5xx responses).
    """
    _require_unipile_config()
    base, _ = _load_unipile_env()
    try:
        resp = requests.request(
            method=method.upper(),
            url=f"{base}{path}",
            headers=_unipile_headers(),
            json=payload,
            timeout=45,
        )
    except requests.RequestException as e:
        # Network/DNS/timeout problems surface as a 502 to the caller.
        raise HTTPException(status_code=502, detail=f"UniPile request failed: {str(e)}")
    try:
        body = resp.json()
    except Exception:
        # Non-JSON responses are wrapped so callers always get a dict.
        body = {"raw": resp.text}
    return resp.status_code, body
|
|
|
|
def _unipile_request(method: str, path: str, payload: Optional[dict] = None):
    """UniPile call that converts 4xx/5xx responses into HTTPException.

    Returns the parsed JSON body on success. On error, surfaces the most
    specific message the response carries (message/detail/error/errors), with
    a hand-written hint for the common missing-credentials 401.
    """
    status, data = _unipile_call(method, path, payload)
    if status >= 400:
        msg = None
        if isinstance(data, dict):
            msg = (
                data.get("message")
                or data.get("detail")
                or data.get("error")
                or data.get("errors")
            )
            if isinstance(msg, (dict, list)):
                # Structured error payloads are serialized verbatim.
                msg = json.dumps(msg)
            err_type = _safe_str(data.get("type"))
            if status == 401 and err_type == "errors/missing_credentials":
                # Replace UniPile's terse 401 with actionable setup guidance.
                msg = (
                    "UniPile rejected the request (missing or invalid API credentials). "
                    "In Hugging Face Space → Settings → Secrets, set UNIPILE_API_KEY to your UniPile API access token "
                    "(Dashboard → API keys) and UNIPILE_API_BASE to your UniPile API URL (no trailing slash). "
                    "Remove accidental quotes or spaces when pasting; redeploy/restart the Space after saving."
                )
        # Mirror UniPile's status code so clients see the real failure class.
        raise HTTPException(
            status_code=status,
            detail=msg or f"UniPile error ({status}): {data if not isinstance(data, dict) else json.dumps(data)}",
        )
    return data
|
|
|
|
def _linkedin_invite_note(text: str) -> str:
    """LinkedIn personalized invite notes are short; keep a safe upper bound."""
    note = _safe_str(text).replace("\r\n", "\n").strip()
    if len(note) > 280:
        # Truncate to 277 chars + ellipsis so the total stays at 280.
        note = note[:277].rstrip() + "..."
    return note
|
|
|
|
def _send_unipile_dm(account_id: str, provider_id: str, text: str):
    """Open (or reuse) a 1:1 LinkedIn chat and send a message.

    Raises HTTPException on UniPile API errors (via _unipile_request) and
    ValueError when the chat-create response carries no chat id.
    """
    chat = _unipile_request(
        "POST",
        "/api/v1/chats",
        {"account_id": account_id, "attendees": [provider_id]},
    )
    chat_id = _safe_str(chat.get("id") if isinstance(chat, dict) else "")
    if not chat_id:
        raise ValueError("UniPile chat id not found")
    # Chat id is URL-escaped defensively before being embedded in the path.
    _unipile_request(
        "POST",
        f"/api/v1/chats/{requests.utils.quote(chat_id, safe='')}/messages",
        {"text": text or ""},
    )
|
|
|
|
def _public_origin_from_request(request: Request) -> str:
    """
    Resolve browser-facing origin behind proxies (HF Spaces, ingress).
    Falls back to request URL if forwarded headers are absent.
    """
    forwarded_proto = request.headers.get("x-forwarded-proto")
    if forwarded_proto:
        # Multiple hops produce comma-joined values; the first is the client-facing one.
        proto = forwarded_proto.split(",")[0].strip().lower()
    else:
        proto = request.url.scheme or "https"
    forwarded_host = request.headers.get("x-forwarded-host")
    if forwarded_host:
        host = forwarded_host.split(",")[0].strip()
    else:
        host = request.headers.get("host") or request.url.netloc or ""
    if host:
        return f"{proto}://{host}".rstrip("/")
    return str(request.base_url).rstrip("/")
|
|
|
|
# Canonical 8-4-4-4-12 UUID shape. UniPile account ids look like this, so a
# "name" field matching it is an id leak rather than a human display name
# (see _is_unipile_nonsense_display_name).
_UNIPILE_ID_UUID_RE = re.compile(
    r"^[0-9a-f]{8}-[0-9a-f]{4}-[0-9a-f]{4}-[0-9a-f]{4}-[0-9a-f]{12}$",
    re.IGNORECASE,
)
|
|
|
|
| def _is_unipile_nonsense_display_name(val: str, account_id_hint: str = "") -> bool: |
| """Hosted-auth notify bodies store the state token in `name`; account ids are UUID-shaped.""" |
| s = (val or "").strip() |
| if not s: |
| return True |
| if _UNIPILE_ID_UUID_RE.match(s): |
| return True |
| if account_id_hint and s == account_id_hint.strip(): |
| return True |
| return False |
|
|
|
|
def _extract_unipile_identity_from_layer(data: dict, account_id_hint: str = "") -> tuple[str, str]:
    """Read display name + avatar from one object (e.g. GET /accounts/{id} payload)."""

    def _nested(key: str, field: str):
        # Guarded nested lookup so non-dict values never raise.
        inner = data.get(key)
        return inner.get(field) if isinstance(inner, dict) else None

    name_candidates = (
        data.get("name"),
        data.get("display_name"),
        data.get("full_name"),
        _nested("user", "name"),
        _nested("profile", "name"),
        _nested("account", "name"),
    )
    avatar_candidates = (
        data.get("picture"),
        data.get("avatar"),
        data.get("photo_url"),
        data.get("image_url"),
        _nested("user", "picture"),
        _nested("profile", "picture"),
        _nested("profile", "avatar"),
    )
    name = next((_safe_str(c) for c in name_candidates if _safe_str(c)), "")
    if _is_unipile_nonsense_display_name(name, account_id_hint):
        name = ""
    avatar = next((_safe_str(c) for c in avatar_candidates if _safe_str(c)), "")
    return name, avatar
|
|
|
|
| def _extract_unipile_identity(data: Optional[dict]) -> tuple[str, str]: |
| """ |
| Best-effort extraction of profile display name and avatar URL from UniPile account payloads. |
| Prefer GET /accounts enrichment under `account_profile`; ignore hosted-auth token in top-level `name`. |
| """ |
| if not isinstance(data, dict): |
| return "", "" |
| account_id_hint = _safe_str(data.get("account_id")) |
| layers: list[dict] = [] |
| ap = data.get("account_profile") |
| if isinstance(ap, dict) and ap: |
| layers.append(ap) |
| layers.append(data) |
| name, avatar = "", "" |
| for layer in layers: |
| n, av = _extract_unipile_identity_from_layer(layer, account_id_hint=account_id_hint) |
| if av and not avatar: |
| avatar = av |
| if n and not name: |
| name = n |
| if name and avatar: |
| break |
| return name, avatar |
|
|
|
|
def _linkedin_account_display_name(row: UnipileAccount) -> str:
    """Human-facing profile name for UI (not the UniPile id or generic label)."""
    meta = row.metadata_json if isinstance(row.metadata_json, dict) else {}
    profile_name = _extract_unipile_identity(meta)[0].strip()
    if profile_name:
        return profile_name
    label = (row.label or "").strip()
    # Labels auto-generated at connect time look like "Jane's LinkedIn account";
    # strip the suffix to recover the bare name.
    suffix = "'s LinkedIn account"
    if label.endswith(suffix) and len(label) > len(suffix):
        return label[: -len(suffix)].strip()
    return label or "LinkedIn account"
|
|
|
|
| _MAILBOX_PROVIDER_SET = ("GOOGLE", "MICROSOFT", "OUTLOOK", "MAIL", "IMAP") |
|
|
|
|
def _mailbox_account_display_name(row: UnipileAccount) -> str:
    """Human-facing name for a connected mailbox (profile name, label, or generic)."""
    meta = row.metadata_json if isinstance(row.metadata_json, dict) else {}
    profile_name = _extract_unipile_identity(meta)[0].strip()
    if profile_name:
        return profile_name
    return (row.label or "").strip() or "Mailbox"
|
|
|
|
def _try_fetch_unipile_account_profile(account_id: str) -> dict:
    """
    Optional enrichment call. If it fails, we just keep existing metadata.
    """
    try:
        payload = _unipile_request("GET", f"/api/v1/accounts/{requests.utils.quote(account_id, safe='')}")
        return payload if isinstance(payload, dict) else {}
    except Exception:
        # Best-effort only — any failure means "no enrichment available".
        return {}
|
|
|
|
def _contact_value(contact: Contact, field: str):
    """
    Resolve a logical field from a Contact, falling back to common raw CSV
    column aliases when the structured column is empty.
    """
    raw_aliases = {
        "first_name": ["first name", "firstname", "first_name"],
        "last_name": ["last name", "lastname", "last_name"],
        "email": ["email", "work email", "email address", "secondary email", "tertiary email"],
        "company": ["company", "company name", "company name for emails", "organization name", "account name"],
        "title": ["title", "job title"],
    }
    if field in raw_aliases:
        # Structured column wins; alias lookup only when it is empty/None.
        direct = getattr(contact, field)
        return direct or _pick_from_raw(contact.raw_data, raw_aliases[field])
    if field == "file_id":
        return contact.file_id or ""
    if field == "created_at":
        return contact.created_at.isoformat() if contact.created_at else ""
    # Anything else is read straight from the raw row.
    return (contact.raw_data or {}).get(field)
|
|
|
|
def _to_float(val):
    """
    Parse a number that may contain thousands separators ("1,200.5").

    Returns None for empty or unparseable input instead of raising, so callers
    can fall through to date/string comparisons.

    Fix: narrowed the bare ``except Exception`` to ``ValueError`` — float() on
    a str raises only ValueError, and the broad catch could mask unrelated bugs.
    """
    s = _safe_str(val).replace(",", "")
    if not s:
        return None
    try:
        return float(s)
    except ValueError:
        return None
|
|
|
|
def _to_datetime(val):
    """Parse an ISO-8601 string ("Z" suffix tolerated); None when blank or invalid."""
    text = _safe_str(val)
    if not text:
        return None
    try:
        # fromisoformat does not accept "Z" on older Pythons; map it to +00:00.
        return datetime.fromisoformat(text.replace("Z", "+00:00"))
    except Exception:
        return None
|
|
|
|
# Allowed CRM lead pipeline statuses (validated on PATCH).
CRM_STATUS_ALLOWED = frozenset({
    "none",
    "new_lead",
    "contacted",
    "qualified",
    "unqualified",
})
# Allowed deal pipeline stages.
DEAL_STAGE_ALLOWED = frozenset({
    "new",
    "discovery",
    "proposal",
    "negotiation",
    "won",
    "lost",
})
# Allowed reasons recorded when a deal is marked lost.
DEAL_LOST_REASON_ALLOWED = frozenset({
    "price",
    "no_decision",
    "chose_another",
    "didnt_meet_expectation",
    "unknown",
})
# Allowed revenue recognition types (see _deal_to_dict for the "arr" fallback).
DEAL_REVENUE_TYPE_ALLOWED = frozenset({"arr", "qrr", "mrr", "one_time"})
# Virtual file-id buckets for contacts that did not come from an uploaded file.
SMARTLEAD_IMPORT_FILE_ID = "smartlead-imports"
MANUAL_CONTACT_FILE_ID = "manual-contacts"
DEMO_CONTACTS_FILE_ID = "demo-contacts-crm"
|
|
|
|
def _contact_matches_apollo_filter(c: Contact, rule: dict) -> bool:
    """Single dynamic-field rule (same semantics as legacy single-field query).

    Supported ops: "equals", "from" (>=), "to" (<=), "between", and the default
    "contains". Range ops try numeric comparison first, then ISO datetimes,
    then case-insensitive lexicographic strings.
    """
    field = _safe_str(rule.get("field"))
    if not field:
        # No field selected → the rule matches everything.
        return True
    op_local = (_safe_str(rule.get("op") or "contains")).lower()
    value = _safe_str(rule.get("value"))
    from_value = _safe_str(rule.get("from_value"))
    to_value = _safe_str(rule.get("to_value"))
    # Single-value ops accept either the "value" or the "from_value" slot.
    cmp_value = value or from_value
    v = _safe_str(cmp_value).lower()
    v_from = from_value
    v_to = to_value
    field_val = _contact_value(c, field)
    field_str = _safe_str(field_val).lower()
    if op_local == "equals":
        return field_str == v
    if op_local == "from":
        # Lower bound: number → datetime → string fallback.
        f_num, v_num = _to_float(field_val), _to_float(v_from or value)
        if f_num is not None and v_num is not None:
            return f_num >= v_num
        f_dt, v_dt = _to_datetime(field_val), _to_datetime(v_from or value)
        if f_dt and v_dt:
            return f_dt >= v_dt
        return field_str >= _safe_str(v_from or value).lower()
    if op_local == "to":
        # Upper bound: same fallback chain as "from".
        f_num, v_num = _to_float(field_val), _to_float(v_to or value)
        if f_num is not None and v_num is not None:
            return f_num <= v_num
        f_dt, v_dt = _to_datetime(field_val), _to_datetime(v_to or value)
        if f_dt and v_dt:
            return f_dt <= v_dt
        return field_str <= _safe_str(v_to or value).lower()
    if op_local == "between":
        # Inclusive range; both bounds must parse for the numeric/date paths.
        f_num, from_num, to_num = _to_float(field_val), _to_float(v_from), _to_float(v_to)
        if f_num is not None and from_num is not None and to_num is not None:
            return from_num <= f_num <= to_num
        f_dt, from_dt, to_dt = _to_datetime(field_val), _to_datetime(v_from), _to_datetime(v_to)
        if f_dt and from_dt and to_dt:
            return from_dt <= f_dt <= to_dt
        low, high = v_from.lower(), v_to.lower()
        return low <= field_str <= high
    # Default op: case-insensitive substring ("contains").
    return v in field_str
|
|
|
|
@app.on_event("startup")
def _migrate_lead_status_removed_option():
    """Map legacy attempted_to_contact rows to contacted once at startup."""
    # NOTE(review): @app.on_event is deprecated in newer FastAPI in favor of
    # lifespan handlers; kept as-is to avoid changing app wiring.
    db = SessionLocal()
    try:
        # Bulk UPDATE without loading rows; synchronize_session=False is fine
        # here because no ORM objects from this session are reused afterwards.
        db.query(CrmLead).filter(CrmLead.crm_status == "attempted_to_contact").update(
            {CrmLead.crm_status: "contacted"},
            synchronize_session=False,
        )
        db.commit()
    except Exception:
        # Best-effort migration: never block startup on a failed UPDATE.
        db.rollback()
    finally:
        db.close()
|
|
|
|
| def _strip_html_simple(text: str) -> str: |
| if not text: |
| return "" |
| t = re.sub(r"<[^>]+>", " ", text) |
| return re.sub(r"\s+", " ", t).strip() |
|
|
|
|
def _parse_smartlead_reply_payload(body: dict) -> Optional[dict]:
    """Normalize Smartlead webhook JSON into fields for CrmLead. Returns None if not a reply or missing identity.

    Smartlead payload shapes vary between webhook versions: fields may live at
    the top level, under ``data``, under ``lead``, or under ``reply`` — each
    lookup below tries the variants in priority order.
    """
    if not isinstance(body, dict):
        return None
    nest = body.get("data") if isinstance(body.get("data"), dict) else {}
    lead = body.get("lead") or nest.get("lead") or {}
    if not isinstance(lead, dict):
        lead = {}
    reply = body.get("reply") or nest.get("reply") or {}
    if not isinstance(reply, dict):
        reply = {}

    # Accept either an explicit *REPLIED* event type or any payload that
    # actually carries reply content.
    event = str(body.get("event") or body.get("event_type") or body.get("type") or "").upper()
    has_reply_content = bool(
        reply.get("body")
        or reply.get("html_body")
        or reply.get("text_body")
        or body.get("reply_body")
        or body.get("message")
        or nest.get("reply_body")
    )
    is_reply_event = "REPLIED" in event or event in ("EMAIL_REPLIED", "LEAD_REPLIED")
    if not is_reply_event and not has_reply_content:
        return None

    campaign_id = _safe_str(
        body.get("campaign_id")
        or nest.get("campaign_id")
        or lead.get("campaign_id")
        or body.get("campaignId")
    )
    sl_id = lead.get("lead_id") or lead.get("id") or body.get("lead_id") or nest.get("lead_id")
    sl_id_str = _safe_str(sl_id) if sl_id is not None else ""

    email = (
        lead.get("email")
        or lead.get("email_address")
        or body.get("lead_email")
        or body.get("email")
        or nest.get("email")
    )
    email = _safe_str(email).lower()
    # Need at least one stable identity: email or Smartlead lead id.
    if not email and not sl_id_str:
        return None

    reply_body = (
        reply.get("body")
        or reply.get("html_body")
        or reply.get("text_body")
        or body.get("reply_body")
        or body.get("message")
        or nest.get("reply_body")
        or ""
    )
    reply_body = _strip_html_simple(_safe_str(reply_body))
    subject = _safe_str(reply.get("subject") or body.get("subject") or body.get("reply_subject") or "")
    # Empty reply body AND empty subject → nothing worth storing.
    if not reply_body and not subject:
        return None

    received_raw = (
        reply.get("received_at")
        or reply.get("created_at")
        or body.get("received_at")
        or body.get("timestamp")
        or nest.get("received_at")
    )
    # Fall back to "now" when the webhook carries no timestamp at all; may
    # still be None if the carried timestamp does not parse.
    received_at = _to_datetime(received_raw) if received_raw else datetime.utcnow()

    fn = _safe_str(lead.get("first_name") or body.get("first_name"))
    ln = _safe_str(lead.get("last_name") or body.get("last_name"))
    company = _safe_str(
        lead.get("company_name")
        or lead.get("company")
        or body.get("company_name")
        or nest.get("company_name")
    )
    title = _safe_str(lead.get("title") or lead.get("job_title") or body.get("title"))
    campaign_name = _safe_str(body.get("campaign_name") or nest.get("campaign_name") or "")

    return {
        "smartlead_lead_id": sl_id_str,
        "campaign_id": campaign_id,
        "campaign_name": campaign_name,
        "email": email,
        "first_name": fn,
        "last_name": ln,
        "company_name": company,
        "title": title,
        "last_reply_subject": subject,
        "last_reply_body": reply_body,
        "last_reply_at": received_at,
        # Keep the full payload for debugging/auditing.
        "raw_webhook": body,
    }
|
|
|
|
def _crm_lead_to_dict(row: CrmLead) -> dict:
    """JSON-safe representation of a CRM lead for API responses."""
    return {
        "id": row.id,
        "smartlead_lead_id": row.smartlead_lead_id,
        "campaign_id": row.campaign_id,
        "campaign_name": row.campaign_name or "",
        "email": row.email or "",
        "first_name": row.first_name or "",
        "last_name": row.last_name or "",
        "company_name": row.company_name or "",
        "title": row.title or "",
        "last_reply_subject": row.last_reply_subject or "",
        "last_reply_body": row.last_reply_body or "",
        "last_reply_at": row.last_reply_at.isoformat() if row.last_reply_at else None,
        # Default for legacy rows that predate the crm_status column.
        "crm_status": row.crm_status or "new_lead",
        "contact_id": row.contact_id,
        "created_at": row.created_at.isoformat() if row.created_at else None,
        "updated_at": row.updated_at.isoformat() if row.updated_at else None,
        # Built by a helper defined elsewhere in this module.
        "company_details": _lead_company_details_dict(row),
    }
|
|
|
|
def _move_lead_to_contacts_core(db: Session, lead: CrmLead) -> dict:
    """Link a CRM lead to a Contact row, creating one when none exists.

    Returns a {"ok": bool, ...} result instead of raising. Does not commit:
    only flush() is used (to obtain the new contact id), so callers own the
    transaction.
    """
    if lead.contact_id:
        return {"ok": True, "contact_id": lead.contact_id, "message": "Already linked to Contacts"}
    email = (lead.email or "").strip()
    if not email:
        return {"ok": False, "error": "Lead has no email"}
    # Reuse an existing contact in the same tenant, matched case-insensitively by email.
    existing = (
        db.query(Contact)
        .filter(
            Contact.tenant_id == lead.tenant_id,
            func.lower(Contact.email) == email.lower(),
        )
        .first()
    )
    if existing:
        lead.contact_id = existing.id
        return {"ok": True, "contact_id": existing.id, "message": "Linked to existing contact"}
    # Snapshot reply context into raw_data so the contact keeps its provenance.
    raw = {
        "Company Name": lead.company_name or "",
        "Title": lead.title or "",
        "source": "smartlead_reply",
        "smartlead_lead_id": lead.smartlead_lead_id,
        "campaign_id": lead.campaign_id,
        "last_reply_subject": lead.last_reply_subject,
        "last_reply_body": lead.last_reply_body,
        "smartlead_webhook": lead.raw_webhook,
    }
    contact = Contact(
        tenant_id=lead.tenant_id,
        # Virtual "file" bucket for Smartlead-originated contacts.
        file_id=SMARTLEAD_IMPORT_FILE_ID,
        row_index=lead.id,
        first_name=lead.first_name or "",
        last_name=lead.last_name or "",
        email=email,
        company=lead.company_name or "",
        title=lead.title or "",
        source="smartlead",
        raw_data=raw,
    )
    db.add(contact)
    db.flush()  # assigns contact.id without committing
    lead.contact_id = contact.id
    return {"ok": True, "contact_id": contact.id, "message": "Contact created"}
|
|
|
|
# Synthetic campaign identity attached to leads created from the Contacts page
# (see _convert_contact_to_lead_core).
CONTACTS_TO_LEADS_CAMPAIGN_ID = "contacts_import"
CONTACTS_TO_LEADS_CAMPAIGN_NAME = "Contacts"
|
|
|
|
def _convert_contact_to_lead_core(db: Session, contact: Contact) -> dict:
    """
    Create a CrmLead row copied from a Contact. The contact row is kept; only deleted manually.

    Links the lead via ``contact_id`` so CRM stays consistent. Skips if a lead with the same
    email already exists. Does not commit; flush() is used so the new lead id
    is available to the caller.
    """
    email = (_contact_value(contact, "email") or "").strip()
    if not email:
        return {"ok": False, "error": "Contact has no email"}
    # Case-insensitive duplicate check within the same tenant.
    dup = (
        db.query(CrmLead)
        .filter(
            CrmLead.tenant_id == contact.tenant_id,
            func.lower(CrmLead.email) == email.lower(),
        )
        .first()
    )
    if dup:
        return {"ok": False, "error": "A lead with this email already exists"}
    fn = _contact_value(contact, "first_name") or ""
    ln = _contact_value(contact, "last_name") or ""
    co = _contact_value(contact, "company") or ""
    ti = _contact_value(contact, "title") or ""
    rd = contact.raw_data if isinstance(contact.raw_data, dict) else {}
    # Freeze the company columns we care about at conversion time
    # (CSV-style keys, matching the uploaded-file column names).
    company_snapshot = {
        "Company Name": _safe_str(co or rd.get("Company Name")),
        "Company Name for Emails": _safe_str(rd.get("Company Name for Emails")),
        "Industry": _safe_str(rd.get("Industry")),
        "# Employees": _safe_str(rd.get("# Employees")),
        "Annual Revenue": _safe_str(rd.get("Annual Revenue")),
        "Last Raised At": _safe_str(rd.get("Last Raised At")),
        "Website": _safe_str(rd.get("Website")),
        "City": _safe_str(rd.get("City")),
        "State": _safe_str(rd.get("State")),
        "Country": _safe_str(rd.get("Country")),
    }
    raw = {
        "source": "contacts_convert",
        "source_contact_id": contact.id,
        "company_details": company_snapshot,
    }
    lead = CrmLead(
        tenant_id=contact.tenant_id,
        # Synthetic id — there is no Smartlead campaign behind this lead.
        smartlead_lead_id=f"from-contact-{contact.id}",
        campaign_id=CONTACTS_TO_LEADS_CAMPAIGN_ID,
        campaign_name=CONTACTS_TO_LEADS_CAMPAIGN_NAME,
        email=email,
        first_name=fn,
        last_name=ln,
        company_name=co,
        title=ti,
        last_reply_subject="",
        last_reply_body="Added from Contacts",
        last_reply_at=datetime.utcnow(),
        crm_status="new_lead",
        contact_id=contact.id,
        raw_webhook=raw,
    )
    db.add(lead)
    db.flush()  # assigns lead.id without committing
    return {"ok": True, "lead_id": lead.id}
|
|
|
|
def _deal_name_from_lead(lead: CrmLead) -> str:
    """Readable deal name: "Company — Person", else company, person, email, or a fallback."""
    person = " ".join(part for part in [lead.first_name or "", lead.last_name or ""] if part).strip()
    company = lead.company_name
    if company and person:
        return f"{company} — {person}"
    if company:
        return company
    if person:
        return person
    return (lead.email or "").strip() or "Untitled deal"
|
|
|
|
def _user_initials_from_user(u: User | None) -> str:
    """Two-letter initials for avatars: first+last name, else name prefix, else email prefix."""
    if not u:
        return ""
    name = (u.name or "").strip()
    email = (u.email or "").strip()
    if name:
        words = name.split()
        if len(words) > 1:
            # First letter of the first and last word.
            return (words[0][:1] + words[-1][:1]).upper()[:2]
        return (words[0][:2] or "?").upper()[:2]
    if email:
        return email[:2].upper()
    return "?"
|
|
|
|
def _tenant_member_ids(db: Session, tenant_id: int) -> set[int]:
    """Ids of every user who is a member of the given tenant."""
    memberships = db.query(TenantMembership.user_id).filter(
        TenantMembership.tenant_id == int(tenant_id)
    )
    return {int(user_id) for (user_id,) in memberships.all()}
|
|
|
|
def _deal_member_can_see(row: CrmDeal, user_id: int) -> bool:
    """Unowned deals are visible to every member; owned deals only to their owner."""
    owner = row.owner_user_id
    return owner is None or int(owner) == int(user_id)
|
|
|
|
def _deal_access_or_403(row: CrmDeal, t: TenantContext) -> None:
    """Raise 403 unless the caller is a tenant admin or can see this deal."""
    if t.role == "admin" or _deal_member_can_see(row, t.user_id):
        return
    raise HTTPException(status_code=403, detail="You do not have access to this deal")
|
|
|
|
| def _dashboard_period_bounds_utc(period: str) -> Optional[tuple[datetime, datetime]]: |
| """Bounds for filtering won deals by won_at (dashboard date presets). None = all time.""" |
| p = (period or "all").strip().lower() |
| if p in ("", "all"): |
| return None |
| now = datetime.utcnow() |
| if p == "month": |
| start = datetime(now.year, now.month, 1) |
| last = monthrange(now.year, now.month)[1] |
| end = datetime(now.year, now.month, last, 23, 59, 59) |
| return start, end |
| if p == "quarter": |
| qi = (now.month - 1) // 3 |
| sm = qi * 3 + 1 |
| start = datetime(now.year, sm, 1) |
| em = sm + 2 |
| if em == 12: |
| end = datetime(now.year, 12, 31, 23, 59, 59) |
| else: |
| end = datetime(now.year, em + 1, 1) - timedelta(microseconds=1) |
| return start, end |
| if p == "year": |
| return datetime(now.year, 1, 1), datetime(now.year, 12, 31, 23, 59, 59) |
| return None |
|
|
|
|
| def _previous_period_bounds_utc(period: str) -> Optional[tuple[datetime, datetime]]: |
| """Calendar window immediately before the current preset (month → prior month, etc.).""" |
| p = (period or "all").strip().lower() |
| if p in ("", "all"): |
| return None |
| now = datetime.utcnow() |
| if p == "month": |
| if now.month == 1: |
| y, m = now.year - 1, 12 |
| else: |
| y, m = now.year, now.month - 1 |
| start = datetime(y, m, 1) |
| last = monthrange(y, m)[1] |
| end = datetime(y, m, last, 23, 59, 59) |
| return start, end |
| if p == "quarter": |
| qi = (now.month - 1) // 3 |
| if qi == 0: |
| y = now.year - 1 |
| sm = 10 |
| else: |
| y = now.year |
| sm = (qi - 1) * 3 + 1 |
| start = datetime(y, sm, 1) |
| em = sm + 2 |
| if em == 12: |
| end = datetime(y, 12, 31, 23, 59, 59) |
| else: |
| end = datetime(y, em + 1, 1) - timedelta(microseconds=1) |
| return start, end |
| if p == "year": |
| py = now.year - 1 |
| return datetime(py, 1, 1), datetime(py, 12, 31, 23, 59, 59) |
| return None |
|
|
|
|
| def _comparison_period_label(period: str) -> str: |
| p = (period or "all").strip().lower() |
| if p == "month": |
| return "Last month" |
| if p == "quarter": |
| return "Last quarter" |
| if p == "year": |
| return "Last year" |
| return "" |
|
|
|
|
| def _won_at_within_bounds(won_at: Optional[datetime], bounds: Optional[tuple[datetime, datetime]]) -> bool: |
| if bounds is None: |
| return True |
| if won_at is None: |
| return False |
| wdt = won_at.replace(tzinfo=None) if getattr(won_at, "tzinfo", None) else won_at |
| return bounds[0] <= wdt <= bounds[1] |
|
|
|
|
def _deal_to_dict(d: CrmDeal) -> dict:
    """JSON-safe representation of a deal, including derived forecast fields."""
    # Forecast value = deal value weighted by close probability (%, rounded to int).
    fv = None
    if d.deal_value is not None and d.close_probability is not None:
        fv = int(round(d.deal_value * (d.close_probability / 100.0)))
    ecd = None
    if d.expected_close_date:
        try:
            ecd = d.expected_close_date.date().isoformat()
        except Exception:
            # Fallback when the value has no .date() — presumably already a
            # date rather than a datetime; confirm the stored column type.
            ecd = d.expected_close_date.isoformat()
    # Normalize revenue type; unknown/legacy values collapse to "arr".
    rt = (getattr(d, "revenue_type", None) or "arr").strip().lower()
    if rt not in DEAL_REVENUE_TYPE_ALLOWED:
        rt = "arr"
    return {
        "id": d.id,
        "name": d.name or "",
        "stage": d.stage or "new",
        "owner_user_id": d.owner_user_id,
        "owner_initials": d.owner_initials or "",
        "deal_value": d.deal_value,
        "revenue_type": rt,
        "contact_display": d.contact_display or "",
        "account_name": d.account_name or "",
        "expected_close_date": ecd,
        # Default probability for rows that never had one set.
        "close_probability": d.close_probability if d.close_probability is not None else 10,
        "forecast_value": fv,
        "last_interaction_at": d.last_interaction_at.isoformat() if d.last_interaction_at else None,
        "country": d.country or "",
        "source_lead_id": d.source_lead_id,
        "source_campaign_name": d.source_campaign_name or "",
        "contact_id": d.contact_id,
        "created_at": d.created_at.isoformat() if d.created_at else None,
        # Lost/won detail columns are read via getattr so missing attributes
        # degrade to defaults instead of raising.
        "lost_reason": _safe_str(getattr(d, "lost_reason", None) or ""),
        "lost_price_discount_pct": getattr(d, "lost_price_discount_pct", None),
        "lost_chose_product_name": _safe_str(getattr(d, "lost_chose_product_name", None) or ""),
        "won_po_number": _safe_str(getattr(d, "won_po_number", None) or ""),
        "won_customer_legal_name": _safe_str(getattr(d, "won_customer_legal_name", None) or ""),
        "won_customer_address": _safe_str(getattr(d, "won_customer_address", None) or ""),
        "won_contact_person_name": _safe_str(getattr(d, "won_contact_person_name", None) or ""),
        "won_channel_partner_name": _safe_str(getattr(d, "won_channel_partner_name", None) or ""),
        "won_note_to_customer": _safe_str(getattr(d, "won_note_to_customer", None) or ""),
        "won_note_to_accounts": _safe_str(getattr(d, "won_note_to_accounts", None) or ""),
        "won_line_items": _won_line_items_list(d),
        "won_at": d.won_at.isoformat() if getattr(d, "won_at", None) else None,
    }
|
|
|
|
def _won_line_items_list(d: CrmDeal) -> list:
    """Normalize the stored won_line_items value to a plain list.

    The column may hold None, a list (native JSON column), or a JSON-encoded
    string; anything else — including unparseable JSON — yields [].
    """
    raw = getattr(d, "won_line_items", None)
    if isinstance(raw, list):
        return raw
    if isinstance(raw, str) and raw.strip():
        try:
            parsed = json.loads(raw)
        except Exception:
            return []
        return parsed if isinstance(parsed, list) else []
    return []
|
|
|
|
def _clear_won_billing_fields(row: CrmDeal) -> None:
    """Reset every won-deal billing column (used when a deal leaves Won)."""
    # String columns are emptied; structured columns are nulled.
    for attr in (
        "won_po_number",
        "won_customer_legal_name",
        "won_customer_address",
        "won_contact_person_name",
        "won_channel_partner_name",
        "won_note_to_customer",
        "won_note_to_accounts",
    ):
        setattr(row, attr, "")
    row.won_line_items = None
    row.won_at = None
|
|
|
|
def _won_currency_label(raw) -> str:
    """Coerce an arbitrary currency value to "CAD" or (default) "USD"."""
    code = (_safe_str(raw) or "USD").strip().upper()
    if code == "CAD":
        return "CAD"
    return "USD"
|
|
|
|
def _fmt_won_money(amount: float, currency_raw) -> str:
    """Plain-text money with symbol + ISO code for won-deal emails."""
    code = _won_currency_label(currency_raw)
    symbol = "CA$" if code == "CAD" else "$"
    # Non-numeric amounts render as 0.00 rather than raising.
    try:
        value = float(amount)
    except (TypeError, ValueError):
        value = 0.0
    return f"{symbol}{value:,.2f} {code}"
|
|
|
|
def _won_billing_interval_label(raw) -> str:
    """Human-readable billing cadence (matches win popover)."""
    key = (_safe_str(raw) or "monthly").strip().lower()
    if key in {"one_time", "onetime", "one-time", "non-recurring"}:
        return "One-time"
    if key in {"quarterly", "quarter", "every 3 months"}:
        return "Every 3 months"
    if key in {"annual", "year", "yearly", "every year"}:
        return "Every year"
    # Anything unrecognized (including the default "monthly") is monthly.
    return "Every month"
|
|
|
|
async def _notify_tenant_admins_deal_won(
    db: Session,
    tenant_id: int,
    sender: User,
    deal_id: int,
    deal_name: str,
    lines: list,
    subtotal: float,
    po: str,
    legal: str,
    addr: str,
    contact_person: str,
    channel_partner: str,
    note_customer: str,
    note_accounts: str,
) -> tuple[bool, Optional[str]]:
    """Email each workspace admin using the marking user's Gmail (same as invites).

    Returns (ok, message): ok is True when at least one admin was emailed;
    message is a partial-failure summary, or a user-facing explanation when
    nothing could be sent. Never raises — failures are reported in the tuple.
    """
    rt = (sender.google_refresh_token or "").strip()
    from_email = (sender.email or "").strip()
    # Sending goes through the sender's own Gmail; without a refresh token
    # and address we cannot send, but the Won state has already been saved.
    if not rt or not from_email:
        return (
            False,
            "Gmail is not connected for your account. The deal was saved as Won, but admins were not emailed automatically.",
        )
    tenant = db.query(Tenant).filter(Tenant.id == int(tenant_id)).first()
    workspace_name = (tenant.name or "Workspace").strip() if tenant else "Workspace"
    admins = (
        db.query(User)
        .join(TenantMembership, TenantMembership.user_id == User.id)
        .filter(TenantMembership.tenant_id == int(tenant_id), TenantMembership.role == "admin")
        .all()
    )
    # De-duplicate recipients case-insensitively while preserving order and
    # the original (non-lowercased) address for the actual send.
    recipients: list[str] = []
    seen: set[str] = set()
    for u in admins:
        em = (u.email or "").strip().lower()
        if em and em not in seen:
            seen.add(em)
            recipients.append((u.email or "").strip())
    if not recipients:
        return False, "No workspace admin email addresses were found."


    subject = f'[Won deal] {deal_name or "Deal"} — PO {po[:80]}'
    # Assemble a plain-text body, one logical line per entry.
    body_lines = [
        f"A deal was marked Won in workspace \"{workspace_name}\".",
        "",
        f"Deal ID: {deal_id}",
        f"Deal name: {deal_name}",
        f"Marked won by: {(sender.name or '').strip() or sender.email} <{sender.email}>",
        "",
        f"PO number: {po}",
        f"Customer legal name: {legal}",
        "Customer address:",
        addr,
        f"Contact person: {contact_person}",
    ]
    # Channel partner is optional — only mention it when present.
    if channel_partner:
        body_lines.append(f"Channel partner: {channel_partner}")
    body_lines += [
        "",
        "Note to customer:",
        note_customer,
        "",
        "Note to our accounts:",
        note_accounts,
        "",
        "Products / services (line items):",
    ]
    # One numbered entry per line item: qty × rate = amount, plus cadence.
    for i, li in enumerate(lines, 1):
        ps = _safe_str(li.get("product_service", ""))
        cur = _won_currency_label(li.get("currency"))
        try:
            qf = float(li.get("qty") or 0)
            rf = float(li.get("rate") or 0)
            af = float(li.get("amount") or 0)
        except (TypeError, ValueError):
            qf = rf = af = 0.0
        cadence = _won_billing_interval_label(li.get("billing_interval"))
        body_lines.append(
            f"  {i}. {ps} | {cadence} | Qty {qf:g} × {_fmt_won_money(rf, cur)} = {_fmt_won_money(af, cur)}"
        )
        desc = _safe_str(li.get("description", ""))
        if desc:
            body_lines.append(f"     Description: {desc}")
    body_lines.append("")
    # Subtotal formatting: use the currency symbol only when all line items
    # share one currency; otherwise emit a bare number with a disclaimer.
    if lines:
        distinct = {_won_currency_label(li.get("currency")) for li in lines}
        if len(distinct) == 1:
            only = distinct.pop()
            body_lines.append(f"Subtotal: {_fmt_won_money(float(subtotal), only)}")
        else:
            body_lines.append(
                "Subtotal (numeric sum of line amounts; USD and CAD lines are not converted between currencies): "
                f"{subtotal:,.2f}"
            )
    else:
        body_lines.append(f"Subtotal: {subtotal:,.2f}")
    body_text = "\n".join(body_lines)


    # Best-effort fan-out: one failed recipient does not stop the others.
    errs: list[str] = []
    ok_any = False
    for to_email in recipients:
        try:
            await send_invite_email_via_gmail(rt, from_email, to_email, subject, body_text)
            ok_any = True
        except Exception as e:
            errs.append(f"{to_email}: {str(e)}")
    if ok_any:
        # Success overall, but surface any per-recipient failures.
        return True, ("; ".join(errs) if errs else None)
    return False, "; ".join(errs) if errs else "Could not send notification email."
|
|
|
|
@app.get("/api/health")
def health():
    """Liveness probe for load balancers and uptime monitors."""
    payload = {"status": "ok"}
    return payload
|
|
@app.get("/api/hello")
def hello():
    """Trivial smoke-test endpoint."""
    payload = {"message": "Hello from FastAPI"}
    return payload
|
|
|
|
@app.post("/api/upload-csv")
async def upload_csv(file: UploadFile = File(...), t: TenantContext = Depends(get_tenant_context)):
    """Upload and parse CSV file from Apollo.

    Saves the raw upload under UPLOAD_DIR as "<file_id>.csv", records an
    UploadedFile row, and creates one Contact per CSV row (common columns
    mapped, full JSON-safe row preserved in raw_data).

    Returns {"file_id", "contact_count", "message"}; raises HTTP 500 on any
    failure, after rolling back the session and removing the temp file.
    """
    db = t.db
    tenant_id = t.tenant_id
    file_id = str(uuid.uuid4())
    file_path = UPLOAD_DIR / f"{file_id}.csv"
    try:
        # Persist the raw upload to disk so pandas can parse it from a path.
        content = await file.read()
        with open(file_path, "wb") as f:
            f.write(content)

        df = pd.read_csv(file_path)
        contact_count = len(df)

        db_file = UploadedFile(
            tenant_id=tenant_id,
            file_id=file_id,
            filename=file.filename,
            contact_count=contact_count,
            file_path=str(file_path),
        )
        db.add(db_file)

        # One Contact per CSV row; _pick_field matches common Apollo headers.
        for idx, row in df.iterrows():
            row_dict = row.to_dict()
            sanitized_raw_data = {str(k): _json_safe(v) for k, v in row_dict.items()}
            contact = Contact(
                tenant_id=tenant_id,
                file_id=file_id,
                row_index=idx + 1,  # 1-based row numbering for display
                first_name=_pick_field(row_dict, ["first name", "firstname", "first_name"]),
                last_name=_pick_field(row_dict, ["last name", "lastname", "last_name"]),
                email=_pick_field(row_dict, ["email", "work email", "email address"]),
                company=_pick_field(row_dict, ["company", "company name", "company name for emails", "organization name", "account name"]),
                title=_pick_field(row_dict, ["title", "job title"]),
                source="apollo_csv",
                raw_data=sanitized_raw_data,
            )
            db.add(contact)

        db.commit()

        return {
            "file_id": file_id,
            "contact_count": contact_count,
            "message": "File uploaded successfully"
        }
    except Exception as e:
        # Leave no partial state behind: discard pending DB rows and remove
        # the (possibly partially written) file before reporting the error.
        db.rollback()
        try:
            if file_path.exists():
                file_path.unlink()
        except OSError:
            pass
        raise HTTPException(status_code=500, detail=f"Error uploading file: {str(e)}")
|
|
|
|
@app.get("/api/unipile/linkedin/campaign-defaults")
async def unipile_linkedin_campaign_defaults(t: TenantContext = Depends(get_tenant_context)):
    """
    Combined prefs + accounts for sequence builder and campaign UI (saved LinkedIn profile name + default account).

    Returns the caller's connected LinkedIn accounts (newest first), the saved
    profile display name, and the default account id — the default is only
    exposed when it still matches one of the listed accounts.
    """
    db = t.db
    user_row = db.query(User).filter(User.id == t.user_id).first()
    # All LinkedIn accounts this user connected within the tenant, newest first.
    rows = (
        db.query(UnipileAccount)
        .filter(
            UnipileAccount.tenant_id == t.tenant_id,
            UnipileAccount.user_id == t.user_id,
            UnipileAccount.provider == "LINKEDIN",
        )
        .order_by(UnipileAccount.created_at.desc(), UnipileAccount.id.desc())
        .all()
    )
    accounts = []
    for r in rows:
        # ident[1] is the avatar URL extracted from stored UniPile metadata.
        ident = _extract_unipile_identity(r.metadata_json or {})
        accounts.append(
            {
                "id": r.id,
                "label": r.label or "LinkedIn account",
                "display_name": _linkedin_account_display_name(r),
                "avatar_url": ident[1] or "",
                "provider": r.provider,
                "unipile_account_id": r.unipile_account_id,
                "status": r.status or "unknown",
                "auth_mode": r.auth_mode or "hosted",
                "created_at": r.created_at.isoformat() if r.created_at else None,
            }
        )
    prof = (user_row.linkedin_profile_display_name or "").strip() if user_row else ""
    default_ref = getattr(user_row, "default_unipile_account_ref_id", None) if user_row else None

    # Drop a stale default that no longer points at a connected account.
    valid_default = None
    if default_ref is not None:
        if any(a["id"] == default_ref for a in accounts):
            valid_default = default_ref
    return {
        "linkedin_profile_display_name": prof,
        "default_unipile_account_ref_id": valid_default,
        "accounts": accounts,
    }
|
|
|
|
@app.patch("/api/me/linkedin-prefs")
async def patch_me_linkedin_prefs(
    body: UserLinkedinPrefsPatch,
    t: TenantContext = Depends(get_tenant_context),
):
    """Partially update the caller's LinkedIn preferences.

    Only fields present in the request body are touched. The default account,
    when set, must be an account owned by this user within this tenant.
    """
    db = t.db
    u = db.query(User).filter(User.id == t.user_id).first()
    if not u:
        raise HTTPException(status_code=404, detail="User not found")

    # exclude_unset distinguishes "field omitted" from "field set to None".
    patch = body.model_dump(exclude_unset=True)

    if "linkedin_profile_display_name" in patch:
        s = (patch["linkedin_profile_display_name"] or "").strip()
        # Empty/blank clears the saved name; otherwise cap at 200 chars.
        u.linkedin_profile_display_name = s[:200] if s else None

    if "default_unipile_account_ref_id" in patch:
        rid = patch["default_unipile_account_ref_id"]
        if rid is None or rid == 0:
            # None and 0 both mean "clear the default".
            u.default_unipile_account_ref_id = None
        else:
            # Verify the referenced account belongs to this user/tenant
            # before persisting the reference.
            acc = (
                db.query(UnipileAccount)
                .filter(
                    UnipileAccount.id == rid,
                    UnipileAccount.tenant_id == t.tenant_id,
                    UnipileAccount.user_id == t.user_id,
                )
                .first()
            )
            if not acc:
                raise HTTPException(status_code=404, detail="LinkedIn account not found for this user")
            u.default_unipile_account_ref_id = rid

    db.commit()
    return {
        "ok": True,
        "linkedin_profile_display_name": (u.linkedin_profile_display_name or ""),
        "default_unipile_account_ref_id": u.default_unipile_account_ref_id,
    }
|
|
|
|
@app.get("/api/unipile/mailbox/campaign-defaults")
async def unipile_mailbox_campaign_defaults(t: TenantContext = Depends(get_tenant_context)):
    """Mailbox analogue of the LinkedIn campaign-defaults endpoint.

    Returns the caller's connected mailbox accounts (newest first), the saved
    mailbox profile display name, and the default mailbox account id — only
    when that default still matches one of the listed accounts.
    """
    db = t.db
    user_row = db.query(User).filter(User.id == t.user_id).first()
    rows = (
        db.query(UnipileAccount)
        .filter(
            UnipileAccount.tenant_id == t.tenant_id,
            UnipileAccount.user_id == t.user_id,
            UnipileAccount.provider.in_(_MAILBOX_PROVIDER_SET),
        )
        .order_by(UnipileAccount.created_at.desc(), UnipileAccount.id.desc())
        .all()
    )
    accounts = []
    for r in rows:
        # ident[1] is the avatar URL extracted from stored UniPile metadata.
        ident = _extract_unipile_identity(r.metadata_json or {})
        accounts.append(
            {
                "id": r.id,
                "label": r.label or "Mailbox",
                "display_name": _mailbox_account_display_name(r),
                "avatar_url": ident[1] or "",
                "provider": r.provider,
                "unipile_account_id": r.unipile_account_id,
                "status": r.status or "unknown",
                "auth_mode": r.auth_mode or "hosted",
                "created_at": r.created_at.isoformat() if r.created_at else None,
            }
        )
    prof = (getattr(user_row, "mailbox_profile_display_name", None) or "").strip() if user_row else ""
    default_ref = getattr(user_row, "default_mailbox_unipile_account_ref_id", None) if user_row else None
    # Drop a stale default that no longer points at a connected account.
    valid_default = None
    if default_ref is not None and any(a["id"] == default_ref for a in accounts):
        valid_default = default_ref
    return {
        "mailbox_profile_display_name": prof,
        "default_mailbox_unipile_account_ref_id": valid_default,
        "accounts": accounts,
    }
|
|
|
|
@app.patch("/api/me/mailbox-prefs")
async def patch_me_mailbox_prefs(
    body: UserMailboxPrefsPatch,
    t: TenantContext = Depends(get_tenant_context),
):
    """Partially update the caller's mailbox preferences.

    Only fields present in the request body are touched. The default mailbox
    account, when set, must be a mailbox-provider account owned by this user.
    """
    db = t.db
    u = db.query(User).filter(User.id == t.user_id).first()
    if not u:
        raise HTTPException(status_code=404, detail="User not found")
    # exclude_unset distinguishes "field omitted" from "field set to None".
    patch = body.model_dump(exclude_unset=True)
    if "mailbox_profile_display_name" in patch:
        s = (patch["mailbox_profile_display_name"] or "").strip()
        # Empty/blank clears the saved name; otherwise cap at 200 chars.
        u.mailbox_profile_display_name = s[:200] if s else None
    if "default_mailbox_unipile_account_ref_id" in patch:
        rid = patch["default_mailbox_unipile_account_ref_id"]
        if rid is None or rid == 0:
            # None and 0 both mean "clear the default".
            u.default_mailbox_unipile_account_ref_id = None
        else:
            # Verify ownership and that the account is a mailbox provider.
            acc = (
                db.query(UnipileAccount)
                .filter(
                    UnipileAccount.id == rid,
                    UnipileAccount.tenant_id == t.tenant_id,
                    UnipileAccount.user_id == t.user_id,
                    UnipileAccount.provider.in_(_MAILBOX_PROVIDER_SET),
                )
                .first()
            )
            if not acc:
                raise HTTPException(status_code=404, detail="Mailbox account not found for this user")
            u.default_mailbox_unipile_account_ref_id = rid
    db.commit()
    return {
        "ok": True,
        "mailbox_profile_display_name": (u.mailbox_profile_display_name or ""),
        "default_mailbox_unipile_account_ref_id": u.default_mailbox_unipile_account_ref_id,
    }
|
|
|
|
@app.post("/api/unipile/mailbox/hosted-link")
async def create_unipile_mailbox_hosted_link(
    body: UnipileMailboxHostedLinkRequest,
    request: Request,
    t: TenantContext = Depends(get_tenant_context),
):
    """Create a UniPile hosted-auth URL for connecting a Google mailbox.

    Persists a one-time state token (20-minute TTL); the hosted-callback
    route uses that token (round-tripped via the payload's ``name`` field)
    to map UniPile's server-to-server notification back to this tenant/user.
    """
    origin = (FRONTEND_ORIGIN or "").strip().rstrip("/") or _public_origin_from_request(request)
    if not origin:
        raise HTTPException(status_code=400, detail="Could not determine frontend origin for hosted auth.")
    token = str(uuid.uuid4())
    expires_at = datetime.utcnow() + timedelta(minutes=20)
    u = t.db.query(User).filter(User.id == t.user_id).first()
    # Save the display name the user typed (if any) before redirecting;
    # committed together with the state row below.
    if u is not None and body.mailbox_profile_display_name is not None:
        s = (body.mailbox_profile_display_name or "").strip()
        if s:
            u.mailbox_profile_display_name = s[:200]
    state = UnipileHostedAuthState(
        tenant_id=t.tenant_id,
        user_id=t.user_id,
        token=token,
        provider="GOOGLE",
        label=(body.label or "Mailbox").strip(),
        expires_at=expires_at,
    )
    t.db.add(state)
    t.db.commit()
    # UniPile expects millisecond precision with a literal "Z" suffix.
    expires_iso = expires_at.strftime("%Y-%m-%dT%H:%M:%S.%f")[:-3] + "Z"
    api_base, _ = _load_unipile_env()
    payload = {
        "type": "create",
        "providers": ["GOOGLE"],
        "api_url": api_base,
        "expiresOn": expires_iso,
        "notify_url": f"{origin}/api/unipile/mailbox/hosted-callback",
        "success_redirect_url": f"{origin}/settings?mailbox=connected",
        "failure_redirect_url": f"{origin}/settings?mailbox=failed",
        # "name" carries our state token through the hosted flow.
        "name": token,
    }
    response = _unipile_request("POST", "/api/v1/hosted/accounts/link", payload)
    hosted_url = _safe_str(response.get("url") if isinstance(response, dict) else "")
    if not hosted_url:
        raise HTTPException(status_code=400, detail="UniPile did not return hosted auth URL")
    return {"url": hosted_url}
|
|
|
|
@app.post("/api/unipile/mailbox/hosted-callback")
async def unipile_mailbox_hosted_callback(request: Request, db: Session = Depends(get_db)):
    """UniPile hosted-auth notify callback for mailbox (GOOGLE) connections.

    Server-to-server call: no session is available, so the request is mapped
    back to a tenant/user via the one-time state token carried in ``name``.
    Unknown, already-used, or expired tokens are acknowledged (200) but
    ignored so UniPile does not retry.
    """
    try:
        body = await request.json()
    except Exception:
        raise HTTPException(status_code=400, detail="Expected JSON body")
    name_token = _safe_str((body or {}).get("name"))
    unipile_account_id = _safe_str((body or {}).get("account_id"))
    status = _safe_str((body or {}).get("status") or "connected")
    if not name_token:
        raise HTTPException(status_code=400, detail="Missing name token")
    if not unipile_account_id:
        return {"ok": True, "ignored": True, "reason": "missing_account_id"}
    # Best-effort profile fetch to enrich the stored account metadata.
    account_profile = _try_fetch_unipile_account_profile(unipile_account_id)
    merged_meta = {}
    if isinstance(body, dict):
        merged_meta.update(body)
    if isinstance(account_profile, dict) and account_profile:
        merged_meta["account_profile"] = account_profile
    state = (
        db.query(UnipileHostedAuthState)
        .filter(
            UnipileHostedAuthState.token == name_token,
            UnipileHostedAuthState.provider == "GOOGLE",
        )
        .first()
    )
    if not state:
        return {"ok": True, "ignored": True, "reason": "unknown_token"}
    if state.used_at is not None:
        return {"ok": True, "ignored": True, "reason": "token_already_used"}
    if state.expires_at and state.expires_at < datetime.utcnow():
        return {"ok": True, "ignored": True, "reason": "token_expired"}
    # Upsert on (tenant, user, mailbox provider, unipile account id).
    existing = (
        db.query(UnipileAccount)
        .filter(
            UnipileAccount.tenant_id == state.tenant_id,
            UnipileAccount.user_id == state.user_id,
            UnipileAccount.provider.in_(_MAILBOX_PROVIDER_SET),
            UnipileAccount.unipile_account_id == unipile_account_id,
        )
        .first()
    )
    user_row = db.query(User).filter(User.id == state.user_id).first()
    box_name = _extract_unipile_identity(merged_meta)[0].strip()
    if user_row is not None and box_name:
        user_row.mailbox_profile_display_name = box_name[:200]
    # Prefer a friendly "<name>'s mailbox" label when the owner name is known.
    nice_label = (state.label or "").strip() or "Mailbox"
    if box_name:
        nice_label = f"{box_name}'s mailbox"
    nice_label = nice_label[:255]
    if existing:
        existing.label = nice_label
        existing.status = status or existing.status
        existing.auth_mode = "hosted"
        existing.metadata_json = merged_meta or body
        existing.updated_at = datetime.utcnow()
        # First connected mailbox becomes the user's default.
        if user_row is not None and getattr(user_row, "default_mailbox_unipile_account_ref_id", None) is None:
            user_row.default_mailbox_unipile_account_ref_id = existing.id
    else:
        row = UnipileAccount(
            tenant_id=state.tenant_id,
            user_id=state.user_id,
            label=nice_label,
            provider="GOOGLE",
            unipile_account_id=unipile_account_id,
            status=status or "connected",
            auth_mode="hosted",
            metadata_json=merged_meta or body,
        )
        db.add(row)
        db.flush()  # flush to obtain the new row id before commit
        acc_ref_id = row.id
        if user_row is not None and getattr(user_row, "default_mailbox_unipile_account_ref_id", None) is None:
            user_row.default_mailbox_unipile_account_ref_id = acc_ref_id
    # Consume the token so replayed callbacks are ignored.
    state.used_at = datetime.utcnow()
    db.commit()
    return {"ok": True}
|
|
|
|
@app.get("/api/unipile/mailbox/accounts")
async def list_unipile_mailbox_accounts(t: TenantContext = Depends(get_tenant_context)):
    """Return the caller's connected mailbox accounts, newest first."""

    def _serialize(row):
        # Index 1 of the extracted identity tuple is the avatar URL.
        ident = _extract_unipile_identity(row.metadata_json or {})
        return {
            "id": row.id,
            "label": row.label or "Mailbox",
            "display_name": _mailbox_account_display_name(row),
            "avatar_url": ident[1] or "",
            "provider": row.provider,
            "unipile_account_id": row.unipile_account_id,
            "status": row.status or "unknown",
            "auth_mode": row.auth_mode or "hosted",
            "created_at": row.created_at.isoformat() if row.created_at else None,
        }

    query = (
        t.db.query(UnipileAccount)
        .filter(
            UnipileAccount.tenant_id == t.tenant_id,
            UnipileAccount.user_id == t.user_id,
            UnipileAccount.provider.in_(_MAILBOX_PROVIDER_SET),
        )
        .order_by(UnipileAccount.created_at.desc(), UnipileAccount.id.desc())
    )
    return {"accounts": [_serialize(row) for row in query.all()]}
|
|
|
|
@app.get("/api/unipile/linkedin/accounts")
async def list_unipile_linkedin_accounts(t: TenantContext = Depends(get_tenant_context)):
    """Return the caller's connected LinkedIn accounts, newest first."""

    def _serialize(row):
        # Index 1 of the extracted identity tuple is the avatar URL.
        ident = _extract_unipile_identity(row.metadata_json or {})
        return {
            "id": row.id,
            "label": row.label or "LinkedIn account",
            "display_name": _linkedin_account_display_name(row),
            "avatar_url": ident[1] or "",
            "provider": row.provider,
            "unipile_account_id": row.unipile_account_id,
            "status": row.status or "unknown",
            "auth_mode": row.auth_mode or "hosted",
            "created_at": row.created_at.isoformat() if row.created_at else None,
        }

    query = (
        t.db.query(UnipileAccount)
        .filter(
            UnipileAccount.tenant_id == t.tenant_id,
            UnipileAccount.user_id == t.user_id,
            UnipileAccount.provider == "LINKEDIN",
        )
        .order_by(UnipileAccount.created_at.desc(), UnipileAccount.id.desc())
    )
    return {"accounts": [_serialize(row) for row in query.all()]}
|
|
|
|
@app.post("/api/unipile/linkedin/hosted-link")
async def create_unipile_linkedin_hosted_link(
    body: UnipileHostedLinkRequest,
    request: Request,
    t: TenantContext = Depends(get_tenant_context),
):
    """Create a UniPile hosted-auth URL for connecting a LinkedIn account.

    Persists a one-time state token (20-minute TTL); the hosted-callback
    route uses that token (round-tripped via the payload's ``name`` field)
    to map UniPile's server-to-server notification back to this tenant/user.
    """
    origin = (FRONTEND_ORIGIN or "").strip().rstrip("/") or _public_origin_from_request(request)
    if not origin:
        raise HTTPException(status_code=400, detail="Could not determine frontend origin for hosted auth.")
    token = str(uuid.uuid4())
    expires_at = datetime.utcnow() + timedelta(minutes=20)
    u = t.db.query(User).filter(User.id == t.user_id).first()
    # Save the display name the user typed (if any) before redirecting;
    # committed together with the state row below.
    if u is not None and body.linkedin_profile_display_name is not None:
        s = (body.linkedin_profile_display_name or "").strip()
        if s:
            u.linkedin_profile_display_name = s[:200]
    state = UnipileHostedAuthState(
        tenant_id=t.tenant_id,
        user_id=t.user_id,
        token=token,
        provider="LINKEDIN",
        label=(body.label or "LinkedIn Account").strip(),
        expires_at=expires_at,
    )
    t.db.add(state)
    t.db.commit()


    # UniPile expects millisecond precision with a literal "Z" suffix.
    expires_iso = expires_at.strftime("%Y-%m-%dT%H:%M:%S.%f")[:-3] + "Z"
    api_base, _ = _load_unipile_env()
    payload = {
        "type": "create",
        "providers": ["LINKEDIN"],
        "api_url": api_base,
        "expiresOn": expires_iso,
        "notify_url": f"{origin}/api/unipile/linkedin/hosted-callback",
        "success_redirect_url": f"{origin}/settings?linkedin=connected",
        "failure_redirect_url": f"{origin}/settings?linkedin=failed",
        # "name" carries our state token through the hosted flow.
        "name": token,
    }
    response = _unipile_request("POST", "/api/v1/hosted/accounts/link", payload)
    hosted_url = _safe_str(response.get("url") if isinstance(response, dict) else "")
    if not hosted_url:
        raise HTTPException(status_code=400, detail="UniPile did not return hosted auth URL")
    return {"url": hosted_url}
|
|
|
|
@app.post("/api/unipile/linkedin/hosted-callback")
async def unipile_linkedin_hosted_callback(request: Request, db: Session = Depends(get_db)):
    """
    UniPile Hosted Auth notify_url callback.
    No session is available on this route (server-to-server call), so we map by stored token in `name`.
    Unknown, already-used, or expired tokens are acknowledged (200) but
    ignored so UniPile does not retry.
    """
    try:
        body = await request.json()
    except Exception:
        raise HTTPException(status_code=400, detail="Expected JSON body")


    name_token = _safe_str((body or {}).get("name"))
    unipile_account_id = _safe_str((body or {}).get("account_id"))
    status = _safe_str((body or {}).get("status") or "connected")
    if not name_token:
        raise HTTPException(status_code=400, detail="Missing name token")
    if not unipile_account_id:
        return {"ok": True, "ignored": True, "reason": "missing_account_id"}
    # Best-effort profile fetch to enrich the stored account metadata.
    account_profile = _try_fetch_unipile_account_profile(unipile_account_id)
    merged_meta = {}
    if isinstance(body, dict):
        merged_meta.update(body)
    if isinstance(account_profile, dict) and account_profile:
        merged_meta["account_profile"] = account_profile


    state = (
        db.query(UnipileHostedAuthState)
        .filter(
            UnipileHostedAuthState.token == name_token,
            UnipileHostedAuthState.provider == "LINKEDIN",
        )
        .first()
    )
    if not state:
        return {"ok": True, "ignored": True, "reason": "unknown_token"}
    if state.used_at is not None:
        return {"ok": True, "ignored": True, "reason": "token_already_used"}
    if state.expires_at and state.expires_at < datetime.utcnow():
        return {"ok": True, "ignored": True, "reason": "token_expired"}


    # Upsert on (tenant, user, LINKEDIN, unipile account id).
    existing = (
        db.query(UnipileAccount)
        .filter(
            UnipileAccount.tenant_id == state.tenant_id,
            UnipileAccount.user_id == state.user_id,
            UnipileAccount.provider == "LINKEDIN",
            UnipileAccount.unipile_account_id == unipile_account_id,
        )
        .first()
    )
    user_row = db.query(User).filter(User.id == state.user_id).first()
    li_name = _extract_unipile_identity(merged_meta)[0].strip()
    if user_row is not None and li_name:
        user_row.linkedin_profile_display_name = li_name[:200]
    # Prefer a friendly "<name>'s LinkedIn account" label when known.
    if li_name:
        nice_label = f"{li_name}'s LinkedIn account"
    else:
        nice_label = (state.label or "").strip() or "LinkedIn account"
    nice_label = nice_label[:255]


    if existing:
        existing.label = nice_label
        existing.status = status or existing.status
        existing.auth_mode = "hosted"
        existing.metadata_json = merged_meta or body
        existing.updated_at = datetime.utcnow()
        # First connected LinkedIn account becomes the user's default.
        if user_row is not None and getattr(user_row, "default_unipile_account_ref_id", None) is None:
            user_row.default_unipile_account_ref_id = existing.id
    else:
        row = UnipileAccount(
            tenant_id=state.tenant_id,
            user_id=state.user_id,
            label=nice_label,
            provider="LINKEDIN",
            unipile_account_id=unipile_account_id,
            status=status or "connected",
            auth_mode="hosted",
            metadata_json=merged_meta or body,
        )
        db.add(row)
        db.flush()  # flush to obtain the new row id before commit
        acc_ref_id = row.id
        if user_row is not None and getattr(user_row, "default_unipile_account_ref_id", None) is None:
            user_row.default_unipile_account_ref_id = acc_ref_id
    # Consume the token so replayed callbacks are ignored.
    state.used_at = datetime.utcnow()
    db.commit()
    return {"ok": True}
|
|
|
|
| def _parse_unipile_webhook_datetime(raw) -> Optional[datetime]: |
| if not raw or not isinstance(raw, str): |
| return None |
| try: |
| s = raw.strip().replace("Z", "") |
| return datetime.fromisoformat(s) |
| except Exception: |
| return None |
|
|
|
|
@app.post("/api/webhooks/unipile")
async def unipile_webhook(request: Request, db: Session = Depends(get_db)):
    """
    UniPile webhooks (same URL for multiple UniPile registrations).

    Supported ``event`` values:

    * ``new_relation`` — LinkedIn invite accepted (source: users). Updates pending contacts.
    * ``mail_opened``, ``mail_link_clicked`` — email tracking (source: ``email_tracking``).
    * ``mail_received`` — mailbox email (source: ``email`` in UniPile API, “Email” / new-mail webhook in the dashboard). Replies matched via ``in_reply_to`` / ``tracking_id`` / ``label`` / RFC ``message_id``.

    All unmatched/unsupported payloads are acknowledged with 200 so UniPile
    does not retry; only auth and malformed-body errors raise.
    """
    # Shared-secret check (both header spellings accepted); skipped when unset.
    if UNIPILE_WEBHOOK_SECRET:
        auth_h = request.headers.get("Unipile-Auth") or request.headers.get("X-Unipile-Auth") or ""
        if auth_h != UNIPILE_WEBHOOK_SECRET:
            raise HTTPException(status_code=401, detail="Invalid webhook secret")


    try:
        body = await request.json()
    except Exception:
        raise HTTPException(status_code=400, detail="Expected JSON body")


    event = _safe_str((body or {}).get("event"))
    # Optional debug logging of the payload's shape (keys only, not values).
    if UNIPILE_WEBHOOK_LOG_EVENTS and isinstance(body, dict):
        ir = body.get("in_reply_to")
        _unipile_webhook_log.info(
            "UniPile webhook event=%s role=%s keys=%s in_reply_to_keys=%s",
            event,
            body.get("role"),
            sorted(body.keys()),
            sorted(ir.keys()) if isinstance(ir, dict) else None,
        )
    now = datetime.utcnow()


    if event == "new_relation":
        # LinkedIn invite accepted: mark matching pending contacts connected.
        account_id = _safe_str((body or {}).get("account_id"))
        provider_id = _safe_str((body or {}).get("user_provider_id"))
        if not account_id or not provider_id:
            return {"ok": True, "ignored": True, "reason": "missing_ids"}


        ua = (
            db.query(UnipileAccount)
            .filter(UnipileAccount.unipile_account_id == account_id)
            .first()
        )
        if not ua:
            return {"ok": True, "ignored": True, "reason": "unknown_account"}


        contacts = (
            db.query(Contact)
            .filter(
                Contact.tenant_id == ua.tenant_id,
                Contact.unipile_provider_id == provider_id,
                Contact.source.in_(("apollo_csv", "linkedin_campaign")),
            )
            .all()
        )
        updated = 0
        for ct in contacts:
            # Keep the original acceptance timestamp if already recorded.
            if ct.linkedin_connection_accepted_at:
                continue
            ct.linkedin_invite_pending = 0
            ct.linkedin_connection_accepted_at = now
            updated += 1
        db.commit()
        return {"ok": True, "updated_contacts": updated, "event": event}


    if event in ("mail_opened", "mail_link_clicked"):
        # Email tracking: resolve the send receipt via the encoded label.
        label = _safe_str((body or {}).get("label"))
        parsed = parse_tracking_label(label)
        if not parsed:
            return {"ok": True, "ignored": True, "reason": "no_tracking_label"}
        tenant_id, file_id, contact_id, step_slot = parsed
        rec = (
            db.query(OutreachSendReceipt)
            .filter(
                OutreachSendReceipt.tenant_id == tenant_id,
                OutreachSendReceipt.file_id == file_id,
                OutreachSendReceipt.contact_id == contact_id,
                OutreachSendReceipt.step_slot == step_slot,
                OutreachSendReceipt.channel == "gmail",
            )
            .first()
        )
        if not rec:
            return {"ok": True, "ignored": True, "reason": "receipt_not_found"}
        when = _parse_unipile_webhook_datetime((body or {}).get("date")) or now
        # First event sets opened_at; every event increments the counter.
        if not rec.opened_at:
            rec.opened_at = when
        try:
            rec.open_count = int(rec.open_count or 0) + 1
        except (TypeError, ValueError):
            rec.open_count = 1
        db.commit()
        refresh_outreach_execution_stats_for_file(db, tenant_id, file_id)
        db.commit()
        return {"ok": True, "event": event, "updated_receipt_id": rec.id}


    if event == "mail_received":
        account_id = _safe_str((body or {}).get("account_id"))
        role = _safe_str((body or {}).get("role"))
        # Only inbox mail can be a reply/bounce to our outreach.
        if role != "inbox":
            return {"ok": True, "ignored": True, "reason": "not_inbox"}


        ua = (
            db.query(UnipileAccount)
            .filter(UnipileAccount.unipile_account_id == account_id)
            .first()
        )
        if not ua:
            return {"ok": True, "ignored": True, "reason": "unknown_account"}


        tenant_id = int(ua.tenant_id)
        in_reply = (body or {}).get("in_reply_to") or {}
        parent_id = _safe_str(in_reply.get("id"))
        parent_rfc = normalize_smtp_message_id(_safe_str(in_reply.get("message_id")))
        trk = _safe_str((body or {}).get("tracking_id"))
        label = _safe_str((body or {}).get("label"))


        # Receipt lookup, strongest match first:
        # 1) UniPile parent message id, 2) RFC Message-ID, 3) tracking id,
        # 4) decoded tracking label (only when it maps to the same tenant).
        rec = None
        if parent_id:
            rec = (
                db.query(OutreachSendReceipt)
                .filter(
                    OutreachSendReceipt.tenant_id == tenant_id,
                    OutreachSendReceipt.channel == "gmail",
                    OutreachSendReceipt.unipile_message_id == parent_id,
                )
                .first()
            )
        if not rec and parent_rfc:
            rec = (
                db.query(OutreachSendReceipt)
                .filter(
                    OutreachSendReceipt.tenant_id == tenant_id,
                    OutreachSendReceipt.channel == "gmail",
                    OutreachSendReceipt.smtp_message_id == parent_rfc,
                )
                .first()
            )
        if not rec and trk:
            rec = (
                db.query(OutreachSendReceipt)
                .filter(
                    OutreachSendReceipt.tenant_id == tenant_id,
                    OutreachSendReceipt.channel == "gmail",
                    or_(
                        OutreachSendReceipt.unipile_tracking_id == trk,
                        OutreachSendReceipt.unipile_message_id == trk,
                    ),
                )
                .first()
            )
        if not rec and label:
            p2 = parse_tracking_label(label)
            if p2:
                tid2, fid2, cid2, slot2 = p2
                if tid2 == tenant_id:
                    rec = (
                        db.query(OutreachSendReceipt)
                        .filter(
                            OutreachSendReceipt.tenant_id == tid2,
                            OutreachSendReceipt.file_id == fid2,
                            OutreachSendReceipt.contact_id == cid2,
                            OutreachSendReceipt.step_slot == slot2,
                            OutreachSendReceipt.channel == "gmail",
                        )
                        .first()
                    )


        # Heuristic bounce detection from folder names / subject line.
        folders = (body or {}).get("folders") or []
        subj = (_safe_str((body or {}).get("subject")) or "").lower()
        bounce_like = any("bounce" in str(f).lower() for f in folders if f) or "undeliverable" in subj
        has_reply_parent = bool(in_reply and (in_reply.get("id") or in_reply.get("message_id")))


        if rec:
            when = _parse_unipile_webhook_datetime((body or {}).get("date")) or now
            # Bounce takes precedence over reply; both are set only once.
            if bounce_like and not rec.bounced_at:
                rec.bounced_at = when
            elif has_reply_parent and not bounce_like and not rec.replied_at:
                rec.replied_at = when
            db.commit()
            refresh_outreach_execution_stats_for_file(db, rec.tenant_id, rec.file_id)
            db.commit()
            return {"ok": True, "event": event, "updated_receipt_id": rec.id}


        return {"ok": True, "ignored": True, "reason": "no_matching_receipt"}


    return {"ok": True, "ignored": True, "reason": "unsupported_event", "event": event or ""}
|
|
|
|
@app.get("/api/unipile/webhook-url-hint")
async def unipile_webhook_url_hint(request: Request):
    """Public URL your UniPile webhooks should POST to (same host as this API)."""
    origin = _public_origin_from_request(request)
    # One entry per webhook registration the operator should create in UniPile.
    registrations = [
        {
            "source": "users",
            "events": ["new_relation"],
            "doc": "https://developer.unipile.com/docs/detecting-accepted-invitations",
        },
        {
            "source": "email_tracking",
            "events": ["mail_opened", "mail_link_clicked"],
            "doc": "https://developer.unipile.com/docs/tracking-email",
        },
        {
            "source": "email",
            "events": ["mail_received", "mail_sent", "mail_moved"],
            "doc": "https://developer.unipile.com/docs/new-emails-webhook",
            "dashboard_hint": "Create an **Email** (mailbox) webhook in the UniPile UI — not the **Messaging** (chat) webhook. Use the same callback URL.",
            "note": "Reply attribution uses mail_received with role inbox and in_reply_to / tracking_id / label; enable mail_received for connected mail accounts.",
        },
        {
            "source": "messaging",
            "events": [
                "message_received",
                "message_read",
                "message_reaction",
                "message_edited",
                "message_deleted",
                "message_delivered",
            ],
            "doc": "https://developer.unipile.com/docs/new-messages-webhook",
            "note": "LinkedIn/chat channels — not used for Gmail mail_received.",
        },
    ]
    return {
        "post_url": f"{origin}/api/webhooks/unipile",
        "auth_header": "Set Unipile-Auth or X-Unipile-Auth to UNIPILE_WEBHOOK_SECRET on each webhook registration.",
        "registrations": registrations,
        "debug_logging": "Set UNIPILE_WEBHOOK_LOG_EVENTS=1 to log each webhook's event name, role, and JSON keys (no body content).",
        "latency_note": "UniPile may deliver new_relation many hours after acceptance; LinkedIn has no real-time connection API.",
    }
|
|
|
|
@app.post("/api/unipile/linkedin/connect")
async def connect_unipile_linkedin(
    body: UnipileConnectRequest,
    t: TenantContext = Depends(get_tenant_context),
):
    """Connect a LinkedIn account through UniPile and persist it for this user.

    Builds the UniPile POST /api/v1/accounts payload from either
    username/password ("credentials" mode) or a cookie access_token, then
    stores the returned account id as a UnipileAccount row for the tenant/user.

    Raises:
        HTTPException 400 if the required auth fields are missing or UniPile
        returns no account id.
    """
    payload = {"provider": "LINKEDIN"}
    if body.auth_mode == "credentials":
        if not (body.username and body.password):
            raise HTTPException(status_code=400, detail="username/password are required for credentials auth")
        payload["username"] = body.username
        payload["password"] = body.password
    else:
        # Any non-"credentials" auth_mode is treated as cookie auth.
        if not body.access_token:
            raise HTTPException(status_code=400, detail="access_token is required for cookie auth")
        payload["access_token"] = body.access_token
    # Optional browser-fingerprint hints, forwarded verbatim when provided.
    if body.user_agent:
        payload["user_agent"] = body.user_agent
    if body.country:
        payload["country"] = body.country
    if body.ip:
        payload["ip"] = body.ip

    response = _unipile_request("POST", "/api/v1/accounts", payload)
    # UniPile responses come in several shapes; probe account_id, then id,
    # then the nested account.id before giving up.
    account_id = _safe_str(response.get("account_id") if isinstance(response, dict) else "") or _safe_str(
        response.get("id") if isinstance(response, dict) else ""
    )
    if not account_id:
        account_id = _safe_str((response or {}).get("account", {}).get("id") if isinstance(response, dict) else "")
    if not account_id:
        raise HTTPException(status_code=400, detail="UniPile did not return an account_id")
    # Best-effort profile fetch; merged into the stored metadata when present.
    account_profile = _try_fetch_unipile_account_profile(account_id)
    merged_meta = response if isinstance(response, dict) else {"raw": str(response)}
    if isinstance(merged_meta, dict) and account_profile:
        merged_meta["account_profile"] = account_profile

    db_row = UnipileAccount(
        tenant_id=t.tenant_id,
        user_id=t.user_id,
        label=(body.label or "LinkedIn account").strip(),
        provider="LINKEDIN",
        unipile_account_id=account_id,
        status=_safe_str(response.get("status") if isinstance(response, dict) else "") or "connected",
        auth_mode=body.auth_mode,
        metadata_json=merged_meta,
    )
    t.db.add(db_row)
    t.db.commit()
    t.db.refresh(db_row)

    return {
        "message": "LinkedIn account connected",
        "account": {
            "id": db_row.id,
            "label": db_row.label,
            "unipile_account_id": db_row.unipile_account_id,
            "status": db_row.status,
        },
        "unipile_response": response,
    }
|
|
|
|
@app.get("/api/linkedin-campaigns")
async def list_linkedin_campaigns(t: TenantContext = Depends(get_tenant_context)):
    """List the caller's LinkedIn campaigns joined with their connected account."""
    pairs = (
        t.db.query(LinkedinCampaign, UnipileAccount)
        .join(UnipileAccount, UnipileAccount.id == LinkedinCampaign.unipile_account_ref_id)
        .filter(
            LinkedinCampaign.tenant_id == t.tenant_id,
            LinkedinCampaign.user_id == t.user_id,
            UnipileAccount.tenant_id == t.tenant_id,
            UnipileAccount.user_id == t.user_id,
        )
        .order_by(LinkedinCampaign.created_at.desc(), LinkedinCampaign.id.desc())
        .all()
    )
    payload = []
    for campaign, account in pairs:
        interval = campaign.followup_interval_hours
        payload.append(
            {
                "id": campaign.id,
                "name": campaign.name,
                "status": campaign.status,
                "file_id": campaign.file_id,
                "contact_count": campaign.contact_count or 0,
                "prompt_template": campaign.prompt_template or "",
                "unipile_account_ref_id": campaign.unipile_account_ref_id,
                "unipile_account_label": account.label if account else "",
                "created_at": campaign.created_at.isoformat() if campaign.created_at else None,
                "executed_at": campaign.executed_at.isoformat() if campaign.executed_at else None,
                "execution_result": campaign.execution_result,
                # Default the follow-up cadence to 72h when unset.
                "followup_interval_hours": interval if interval is not None else 72,
            }
        )
    return {"campaigns": payload}
|
|
|
|
@app.patch("/api/linkedin-campaigns/{campaign_id}")
async def patch_linkedin_campaign(
    campaign_id: int,
    body: LinkedinCampaignPatchRequest,
    t: TenantContext = Depends(get_tenant_context),
):
    """Update mutable campaign settings (currently only the follow-up interval)."""
    campaign = (
        t.db.query(LinkedinCampaign)
        .filter(
            LinkedinCampaign.id == campaign_id,
            LinkedinCampaign.tenant_id == t.tenant_id,
            LinkedinCampaign.user_id == t.user_id,
        )
        .first()
    )
    if campaign is None:
        raise HTTPException(status_code=404, detail="Campaign not found")
    if body.followup_interval_hours is not None:
        campaign.followup_interval_hours = body.followup_interval_hours
    campaign.updated_at = datetime.utcnow()
    t.db.commit()
    return {"message": "Campaign updated", "followup_interval_hours": campaign.followup_interval_hours}
|
|
|
|
@app.post("/api/linkedin-campaigns")
async def create_linkedin_campaign(
    body: LinkedinCampaignCreateRequest,
    t: TenantContext = Depends(get_tenant_context),
):
    """Create a draft LinkedIn campaign bound to one of the user's connected accounts."""
    db = t.db
    # The referenced UniPile account must belong to this tenant/user.
    owns_account = (
        db.query(UnipileAccount)
        .filter(
            UnipileAccount.id == body.unipile_account_ref_id,
            UnipileAccount.tenant_id == t.tenant_id,
            UnipileAccount.user_id == t.user_id,
        )
        .first()
    )
    if owns_account is None:
        raise HTTPException(status_code=404, detail="LinkedIn account not found")
    campaign_name = (body.name or "").strip()
    if not campaign_name:
        raise HTTPException(status_code=400, detail="Campaign name is required")
    campaign = LinkedinCampaign(
        tenant_id=t.tenant_id,
        user_id=t.user_id,
        name=campaign_name,
        status="draft",
        unipile_account_ref_id=body.unipile_account_ref_id,
    )
    db.add(campaign)
    db.commit()
    db.refresh(campaign)
    return {"campaign_id": campaign.id, "message": "Campaign created"}
|
|
|
|
@app.post("/api/linkedin-campaigns/{campaign_id}/upload-csv")
async def upload_linkedin_campaign_csv(
    campaign_id: int,
    file: UploadFile = File(...),
    t: TenantContext = Depends(get_tenant_context),
):
    """Attach a contacts CSV to a LinkedIn campaign.

    Saves the upload under a fresh file_id, registers an UploadedFile row,
    replaces rows derived from the campaign's previous CSV, and creates one
    Contact per CSV row (source="linkedin_campaign", row_index = CSV row + 1).

    Raises:
        HTTPException 404 if the campaign does not belong to the caller.
        HTTPException 400 if the upload cannot be parsed as CSV.
    """
    db = t.db
    campaign = (
        db.query(LinkedinCampaign)
        .filter(
            LinkedinCampaign.tenant_id == t.tenant_id,
            LinkedinCampaign.user_id == t.user_id,
            LinkedinCampaign.id == campaign_id,
        )
        .first()
    )
    if not campaign:
        raise HTTPException(status_code=404, detail="Campaign not found")

    file_id = str(uuid.uuid4())
    file_path = UPLOAD_DIR / f"{file_id}.csv"
    content = await file.read()
    with open(file_path, "wb") as f:
        f.write(content)
    try:
        df = pd.read_csv(file_path)
    except Exception as exc:
        # FIX: a non-CSV/empty upload used to bubble up as an unhandled 500.
        # Surface a client error instead, and remove the unusable file so
        # failed uploads do not accumulate on disk.
        try:
            os.remove(file_path)
        except OSError:
            pass
        raise HTTPException(status_code=400, detail=f"Could not parse uploaded CSV: {exc}")
    contact_count = len(df)

    db_file = UploadedFile(
        tenant_id=t.tenant_id,
        file_id=file_id,
        filename=file.filename,
        contact_count=contact_count,
        file_path=str(file_path),
    )
    db.add(db_file)

    # Re-uploading replaces the previous CSV's derived contacts and sequences.
    if campaign.file_id:
        db.query(Contact).filter(
            Contact.tenant_id == t.tenant_id,
            Contact.file_id == campaign.file_id,
            Contact.source == "linkedin_campaign",
        ).delete()
        db.query(GeneratedSequence).filter(
            GeneratedSequence.tenant_id == t.tenant_id,
            GeneratedSequence.file_id == campaign.file_id,
            GeneratedSequence.channel == "linkedin",
            GeneratedSequence.product == campaign.name,
        ).delete()

    linkedin_profiles_detected = 0
    for idx, row in df.iterrows():
        row_dict = row.to_dict()
        # JSON-safe copy of the raw row (NaN / Timestamp values normalized).
        sanitized_raw_data = {str(k): _json_safe(v) for k, v in row_dict.items()}
        li_raw = _pick_linkedin_url(sanitized_raw_data)
        if _linkedin_public_identifier(li_raw):
            linkedin_profiles_detected += 1
        contact = Contact(
            tenant_id=t.tenant_id,
            file_id=file_id,
            row_index=idx + 1,
            first_name=_pick_field(row_dict, ["first name", "firstname", "first_name"]),
            last_name=_pick_field(row_dict, ["last name", "lastname", "last_name"]),
            email=_pick_field(row_dict, ["email", "work email", "email address"]),
            company=_pick_field(row_dict, ["company", "company name", "organization name", "account name"]),
            title=_pick_field(row_dict, ["title", "job title"]),
            source="linkedin_campaign",
            raw_data=sanitized_raw_data,
        )
        db.add(contact)

    campaign.file_id = file_id
    campaign.contact_count = contact_count
    campaign.status = "draft"
    campaign.updated_at = datetime.utcnow()
    db.commit()
    return {
        "file_id": file_id,
        "contact_count": contact_count,
        "linkedin_profiles_detected": linkedin_profiles_detected,
        "message": "Campaign CSV uploaded",
    }
|
|
|
|
@app.post("/api/linkedin-campaigns/{campaign_id}/generate")
async def generate_linkedin_campaign_sequences(
    campaign_id: int,
    body: LinkedinCampaignGenerateRequest,
    t: TenantContext = Depends(get_tenant_context),
):
    """Generate LinkedIn message sequences for every row of the campaign CSV.

    Re-reads the uploaded CSV, wipes previously generated LinkedIn sequences
    for the file, and calls the GPT service once per contact row. Sequence ids
    are assigned 1..N in CSV row order (matching Contact.row_index from upload).

    Raises:
        HTTPException 404 if the campaign or its UploadedFile row is missing.
        HTTPException 400 if no CSV has been uploaded yet.
    """
    db = t.db
    campaign = (
        db.query(LinkedinCampaign)
        .filter(
            LinkedinCampaign.tenant_id == t.tenant_id,
            LinkedinCampaign.user_id == t.user_id,
            LinkedinCampaign.id == campaign_id,
        )
        .first()
    )
    if not campaign:
        raise HTTPException(status_code=404, detail="Campaign not found")
    if not campaign.file_id:
        raise HTTPException(status_code=400, detail="Upload a campaign CSV first")

    db_file = (
        db.query(UploadedFile)
        .filter(UploadedFile.tenant_id == t.tenant_id, UploadedFile.file_id == campaign.file_id)
        .first()
    )
    if not db_file:
        raise HTTPException(status_code=404, detail="Campaign file not found")

    df = pd.read_csv(db_file.file_path)
    # NOTE(review): this delete is not restricted by product, unlike the
    # cleanup in upload-csv which also matches product == campaign.name —
    # confirm whether a file_id can be shared across products before
    # tightening either filter.
    db.query(GeneratedSequence).filter(
        GeneratedSequence.tenant_id == t.tenant_id,
        GeneratedSequence.file_id == campaign.file_id,
        GeneratedSequence.channel == "linkedin",
    ).delete()

    # Prefer the LinkedIn display name as the sender signature, falling back
    # to the user's account name; empty string means "no signature".
    urow = db.query(User).filter(User.id == t.user_id).first()
    li_sig = ""
    if urow:
        li_sig = (getattr(urow, "linkedin_profile_display_name", None) or "").strip() or (urow.name or "").strip()

    sequence_id = 1
    generated_rows = 0
    for _, row in df.iterrows():
        contact = row.to_dict()
        li_list = generate_linkedin_sequence(
            contact, body.prompt_template, campaign.name, sender_linkedin_name=li_sig or None
        )
        # One GeneratedSequence row per step the GPT service returned.
        for seq_data in li_list:
            db.add(
                GeneratedSequence(
                    tenant_id=t.tenant_id,
                    file_id=campaign.file_id,
                    sequence_id=sequence_id,
                    email_number=seq_data["email_number"],
                    channel="linkedin",
                    first_name=seq_data["first_name"],
                    last_name=seq_data["last_name"],
                    email=seq_data["email"],
                    company=seq_data["company"],
                    title=seq_data.get("title", ""),
                    product=campaign.name,
                    subject=seq_data.get("subject") or "",
                    email_content=seq_data["email_content"],
                )
            )
            generated_rows += 1
        sequence_id += 1

    campaign.prompt_template = body.prompt_template
    campaign.status = "generated"
    campaign.updated_at = datetime.utcnow()
    db.commit()
    return {"message": "LinkedIn sequences generated", "rows": generated_rows, "contacts": len(df)}
|
|
|
|
@app.get("/api/linkedin-campaigns/{campaign_id}/sequences")
async def get_linkedin_campaign_sequences(campaign_id: int, t: TenantContext = Depends(get_tenant_context)):
    """Return generated LinkedIn steps for a campaign, ordered by contact then step."""
    db = t.db
    campaign = (
        db.query(LinkedinCampaign)
        .filter(
            LinkedinCampaign.tenant_id == t.tenant_id,
            LinkedinCampaign.user_id == t.user_id,
            LinkedinCampaign.id == campaign_id,
        )
        .first()
    )
    if campaign is None:
        raise HTTPException(status_code=404, detail="Campaign not found")
    if not campaign.file_id:
        # No CSV attached yet, so nothing could have been generated.
        return {"sequences": []}

    step_query = (
        db.query(GeneratedSequence)
        .filter(
            GeneratedSequence.tenant_id == t.tenant_id,
            GeneratedSequence.file_id == campaign.file_id,
            GeneratedSequence.channel == "linkedin",
        )
        .order_by(GeneratedSequence.sequence_id, GeneratedSequence.email_number)
    )
    sequences = []
    for step in step_query.all():
        sequences.append(
            {
                "id": step.id,
                "sequence_id": step.sequence_id,
                "email_number": step.email_number,
                "first_name": step.first_name or "",
                "last_name": step.last_name or "",
                "email": step.email or "",
                "company": step.company or "",
                "title": step.title or "",
                "subject": step.subject or "",
                "message": step.email_content or "",
            }
        )
    return {"sequences": sequences}
|
|
|
|
@app.post("/api/linkedin-campaigns/{campaign_id}/execute")
async def execute_linkedin_campaign(
    campaign_id: int,
    body: LinkedinCampaignExecuteRequest,
    t: TenantContext = Depends(get_tenant_context),
):
    """
    Execute LinkedIn campaign via UniPile.

    Sends a connection request using POST /users/invite with the first generated step as the
    invite note (visible under LinkedIn → My Network → Manage invitations → Sent).

    UniPile expects the public /in/{{slug}} identifier plus account_id on profile lookup — not a raw URL path.

    If the invite cannot be sent (e.g. already connected), falls back to a 1:1 LinkedIn DM via chats API.
    """
    db = t.db
    campaign = (
        db.query(LinkedinCampaign)
        .filter(
            LinkedinCampaign.tenant_id == t.tenant_id,
            LinkedinCampaign.user_id == t.user_id,
            LinkedinCampaign.id == campaign_id,
        )
        .first()
    )
    if not campaign:
        raise HTTPException(status_code=404, detail="Campaign not found")
    account = (
        db.query(UnipileAccount)
        .filter(
            UnipileAccount.tenant_id == t.tenant_id,
            UnipileAccount.user_id == t.user_id,
            UnipileAccount.id == campaign.unipile_account_ref_id,
        )
        .first()
    )
    if not account:
        raise HTTPException(status_code=404, detail="Connected LinkedIn account not found")
    if not campaign.file_id:
        raise HTTPException(status_code=400, detail="Campaign has no uploaded CSV")

    contacts = (
        db.query(Contact)
        .filter(
            Contact.tenant_id == t.tenant_id,
            Contact.file_id == campaign.file_id,
            Contact.source == "linkedin_campaign",
        )
        .order_by(Contact.row_index.asc(), Contact.id.asc())
        .all()
    )
    # Only step 1 (email_number == 1) is sent here; later steps go out via
    # the process-followups endpoint.
    seq_rows = (
        db.query(GeneratedSequence)
        .filter(
            GeneratedSequence.tenant_id == t.tenant_id,
            GeneratedSequence.file_id == campaign.file_id,
            GeneratedSequence.channel == "linkedin",
            GeneratedSequence.email_number == 1,
        )
        .order_by(GeneratedSequence.sequence_id.asc())
        .all()
    )
    if not seq_rows:
        raise HTTPException(
            status_code=400,
            detail="Generate LinkedIn sequences before executing the campaign.",
        )
    msg_by_seq = {r.sequence_id: r for r in seq_rows}

    invite_sent = 0
    dm_sent = 0
    dry_preview = 0
    skipped = 0
    failed = 0
    errors = []
    attempts = []

    acc_uid = account.unipile_account_id

    def _contact_label(cr: Contact) -> str:
        # Display name for attempt/error entries ("First Last", or "Contact").
        parts = [_safe_str(cr.first_name), _safe_str(cr.last_name)]
        lab = " ".join(p for p in parts if p).strip()
        return lab or "Contact"

    for c in contacts:
        # FIX: generated sequences are keyed by the contact's CSV row
        # (sequence_id == Contact.row_index — the same key process-followups
        # uses), NOT by the contact's position in this list. The previous
        # enumerate(contacts, start=1) index drifts from row_index as soon as
        # any contact row is deleted, sending the wrong person's message.
        first_msg = msg_by_seq.get(c.row_index)
        name_label = _contact_label(c)
        if not first_msg:
            skipped += 1
            attempts.append(
                {
                    "contact_id": c.id,
                    "name": name_label,
                    "status": "skipped",
                    "reason": "no_generated_message",
                }
            )
            continue

        raw_li = _pick_linkedin_url(c.raw_data or {})
        url_disp = _normalize_linkedin_url(raw_li) or _safe_str(raw_li)
        slug = _linkedin_public_identifier(raw_li)

        if not slug:
            skipped += 1
            errors.append(
                {
                    "contact_id": c.id,
                    "reason": "missing_linkedin_url",
                    "message": "No usable LinkedIn profile URL (expected Apollo-style Person Linkedin Url or /in/{{slug}}).",
                }
            )
            attempts.append(
                {
                    "contact_id": c.id,
                    "name": name_label,
                    "linkedin_url": url_disp,
                    "status": "skipped",
                    "reason": "missing_or_invalid_linkedin_url",
                }
            )
            continue

        note = _linkedin_invite_note(first_msg.email_content or "")

        if body.dry_run:
            # Dry run: record what would be sent, make no UniPile calls.
            dry_preview += 1
            attempts.append(
                {
                    "contact_id": c.id,
                    "name": name_label,
                    "linkedin_url": url_disp,
                    "public_identifier": slug,
                    "status": "dry_run",
                    "detail": "Dry run: no UniPile calls. Uncheck the Dry run checkbox to send connection invites.",
                    "invite_note_preview": note[:180],
                }
            )
            continue

        # Resolve the public /in/{slug} identifier to a provider_id.
        slug_enc = requests.utils.quote(slug, safe="")
        acc_enc = requests.utils.quote(acc_uid, safe="")
        st_prof, profile = _unipile_call("GET", f"/api/v1/users/{slug_enc}?account_id={acc_enc}")

        if st_prof >= 400 or not isinstance(profile, dict):
            failed += 1
            detail = profile if isinstance(profile, dict) else {}
            msg = (
                detail.get("message")
                or detail.get("detail")
                or detail.get("error")
                or (json.dumps(detail)[:800] if detail else "profile lookup failed")
            )
            if isinstance(msg, (dict, list)):
                msg = json.dumps(msg)
            errors.append({"contact_id": c.id, "reason": "profile_lookup_failed", "message": str(msg)})
            attempts.append(
                {
                    "contact_id": c.id,
                    "name": name_label,
                    "linkedin_url": url_disp,
                    "public_identifier": slug,
                    "status": "failed",
                    "step": "resolve_profile",
                    "detail": str(msg)[:500],
                }
            )
            continue

        provider_id = _safe_str(profile.get("provider_id"))
        # Network-distance hint, surfaced in the attempt record when present.
        rel_hint = _safe_str(
            profile.get("network_distance") or profile.get("distance") or ""
        )
        if not rel_hint and isinstance(profile.get("relation"), dict):
            rel_hint = _safe_str((profile.get("relation") or {}).get("distance"))

        if not provider_id:
            failed += 1
            errors.append(
                {
                    "contact_id": c.id,
                    "reason": "missing_provider_id",
                    "message": "UniPile profile response had no provider_id.",
                }
            )
            attempts.append(
                {
                    "contact_id": c.id,
                    "name": name_label,
                    "linkedin_url": url_disp,
                    "public_identifier": slug,
                    "status": "failed",
                    "step": "resolve_profile",
                    "detail": "provider_id missing from UniPile profile payload",
                }
            )
            continue

        st_inv, inv_res = _unipile_call(
            "POST",
            "/api/v1/users/invite",
            {"provider_id": provider_id, "account_id": acc_uid, "message": note},
        )

        if st_inv < 400:
            # Invite accepted by UniPile: mark pending and schedule step 2.
            invite_sent += 1
            ts = datetime.utcnow()
            c.unipile_provider_id = provider_id
            c.linkedin_invite_pending = 1
            c.linkedin_invite_sent_at = ts
            c.linkedin_followup_next_email_number = 2
            attempts.append(
                {
                    "contact_id": c.id,
                    "name": name_label,
                    "linkedin_url": url_disp,
                    "public_identifier": slug,
                    "provider_id": provider_id,
                    "status": "invite_sent",
                    "network_distance": rel_hint or None,
                    "connection_hint": rel_hint or None,
                }
            )
            continue

        # Invite failed (e.g. already connected): collect the error, then
        # fall back to a direct message.
        inv_err = ""
        if isinstance(inv_res, dict):
            inv_err = _safe_str(
                inv_res.get("message") or inv_res.get("detail") or inv_res.get("error") or ""
            )
            if isinstance(inv_res.get("errors"), list):
                inv_err = inv_err or json.dumps(inv_res.get("errors"))[:400]

        try:
            _send_unipile_dm(acc_uid, provider_id, first_msg.email_content or "")
            dm_sent += 1
            ts = datetime.utcnow()
            c.unipile_provider_id = provider_id
            # A successful DM implies an existing connection.
            c.linkedin_invite_pending = 0
            c.linkedin_connection_accepted_at = ts
            c.linkedin_last_followup_sent_at = ts
            c.linkedin_followup_next_email_number = 2
            attempts.append(
                {
                    "contact_id": c.id,
                    "name": name_label,
                    "linkedin_url": url_disp,
                    "public_identifier": slug,
                    "provider_id": provider_id,
                    "status": "message_sent",
                    "network_distance": rel_hint or None,
                    "connection_hint": rel_hint or None,
                    "fallback_from_invite_error": inv_err or None,
                    "detail": "Connection invite was not sent (see fallback_from_invite_error); sent LinkedIn DM instead.",
                }
            )
        except Exception as e:
            failed += 1
            errors.append(
                {
                    "contact_id": c.id,
                    "reason": "invite_and_dm_failed",
                    "message": f"invite: {inv_err}; dm: {str(e)}",
                }
            )
            attempts.append(
                {
                    "contact_id": c.id,
                    "name": name_label,
                    "linkedin_url": url_disp,
                    "public_identifier": slug,
                    "status": "failed",
                    "step": "invite_and_message",
                    "invite_error": inv_err or None,
                    "detail": str(e),
                }
            )

    total_ok = dry_preview if body.dry_run else (invite_sent + dm_sent)
    # Error/attempt lists are truncated to keep the stored JSON bounded.
    result = {
        "dry_run": body.dry_run,
        "dry_run_preview_count": dry_preview,
        "linkedin_invites_sent": invite_sent,
        "direct_messages_sent": dm_sent,
        "sent": total_ok,
        "skipped": skipped,
        "failed": failed,
        "errors": errors[:200],
        "attempts": attempts[:500],
        "execution_kind": "linkedin_connection_invite",
        "help": (
            "Invites appear under LinkedIn → My Network → Manage invitations → Sent. "
            "Follow-up steps are sent via POST /api/linkedin-campaigns/{id}/process-followups after acceptance "
            "(UniPile users webhook new_relation marks acceptance; can lag hours)."
        ),
    }
    campaign.status = "completed" if failed == 0 else "failed"
    campaign.execution_result = result
    campaign.executed_at = datetime.utcnow()
    campaign.updated_at = datetime.utcnow()
    db.commit()
    return {"message": "Campaign execution finished", **result}
|
|
|
|
@app.post("/api/linkedin-campaigns/{campaign_id}/process-followups")
async def process_linkedin_campaign_followups(
    campaign_id: int,
    t: TenantContext = Depends(get_tenant_context),
):
    """
    Deliver generated sequence steps 2+ as LinkedIn DMs when due.

    * Invite flow: waits for `linkedin_connection_accepted_at` (set by UniPile `new_relation` webhook or DM fallback).
    * First follow-up (step 2) after invite: `accepted_at + followup_interval_hours`.
    * If execute fell back to DM for step 1: step 2 waits `last_followup_sent_at + interval`.

    Run this endpoint on a schedule (e.g. hourly cron); it only sends when the interval has elapsed.
    """
    db = t.db
    campaign = (
        db.query(LinkedinCampaign)
        .filter(
            LinkedinCampaign.tenant_id == t.tenant_id,
            LinkedinCampaign.user_id == t.user_id,
            LinkedinCampaign.id == campaign_id,
        )
        .first()
    )
    if not campaign:
        raise HTTPException(status_code=404, detail="Campaign not found")
    account = (
        db.query(UnipileAccount)
        .filter(
            UnipileAccount.tenant_id == t.tenant_id,
            UnipileAccount.user_id == t.user_id,
            UnipileAccount.id == campaign.unipile_account_ref_id,
        )
        .first()
    )
    if not account:
        raise HTTPException(status_code=404, detail="Connected LinkedIn account not found")
    if not campaign.file_id:
        raise HTTPException(status_code=400, detail="Campaign has no uploaded CSV")

    # Default cadence is 72h when the campaign has no explicit interval.
    interval_h = campaign.followup_interval_hours if campaign.followup_interval_hours else 72
    now = datetime.utcnow()
    contacts = (
        db.query(Contact)
        .filter(
            Contact.tenant_id == t.tenant_id,
            Contact.file_id == campaign.file_id,
            Contact.source == "linkedin_campaign",
        )
        .order_by(Contact.row_index.asc(), Contact.id.asc())
        .all()
    )

    # FIX (perf): the highest generated step per sequence was previously
    # queried once per contact inside the loop (N+1 aggregate queries).
    # One grouped query yields the same {sequence_id: max(email_number)} map.
    max_step_by_seq = dict(
        db.query(GeneratedSequence.sequence_id, func.max(GeneratedSequence.email_number))
        .filter(
            GeneratedSequence.tenant_id == t.tenant_id,
            GeneratedSequence.file_id == campaign.file_id,
            GeneratedSequence.channel == "linkedin",
        )
        .group_by(GeneratedSequence.sequence_id)
        .all()
    )

    sent = 0
    skipped = 0
    failed = 0
    details = []

    for c in contacts:
        n = c.linkedin_followup_next_email_number
        # No pending step, or the contact was never resolved to a provider_id.
        if not n or not c.unipile_provider_id:
            skipped += 1
            continue

        # Sequences are keyed by the contact's CSV row index.
        max_step = max_step_by_seq.get(c.row_index) or 0

        # Nothing beyond step 1 was generated for this contact.
        if max_step < 2:
            skipped += 1
            continue

        # All generated steps already delivered — clear the pointer.
        if n > max_step:
            c.linkedin_followup_next_email_number = None
            skipped += 1
            continue

        # Invite still pending: wait for acceptance (new_relation webhook).
        if c.linkedin_invite_pending and not c.linkedin_connection_accepted_at:
            skipped += 1
            continue

        if n == 2:
            if c.linkedin_last_followup_sent_at is None:
                # Invite flow: first follow-up is timed from acceptance.
                if not c.linkedin_connection_accepted_at:
                    skipped += 1
                    continue
                if now < c.linkedin_connection_accepted_at + timedelta(hours=interval_h):
                    skipped += 1
                    continue
            else:
                # DM-fallback flow: step 1 went out as a DM; time from that send.
                if now < c.linkedin_last_followup_sent_at + timedelta(hours=interval_h):
                    skipped += 1
                    continue
        else:
            # Steps 3+: always timed from the previous follow-up.
            if not c.linkedin_last_followup_sent_at:
                skipped += 1
                continue
            if now < c.linkedin_last_followup_sent_at + timedelta(hours=interval_h):
                skipped += 1
                continue

        msg_row = (
            db.query(GeneratedSequence)
            .filter(
                GeneratedSequence.tenant_id == t.tenant_id,
                GeneratedSequence.file_id == campaign.file_id,
                GeneratedSequence.channel == "linkedin",
                GeneratedSequence.sequence_id == c.row_index,
                GeneratedSequence.email_number == n,
            )
            .first()
        )
        if not msg_row:
            skipped += 1
            continue

        try:
            _send_unipile_dm(
                account.unipile_account_id,
                c.unipile_provider_id,
                msg_row.email_content or "",
            )
            c.linkedin_last_followup_sent_at = now
            # Advance to the next step, or stop when this was the last one.
            c.linkedin_followup_next_email_number = n + 1 if n < max_step else None
            sent += 1
            details.append({"contact_id": c.id, "step": n, "status": "sent"})
        except Exception as e:
            failed += 1
            details.append({"contact_id": c.id, "step": n, "status": "failed", "error": str(e)})

    campaign.updated_at = datetime.utcnow()
    db.commit()
    return {
        "message": "Follow-up pass finished",
        "sent": sent,
        "skipped": skipped,
        "failed": failed,
        "followup_interval_hours": interval_h,
        "details": details[:200],
    }
|
|
|
|
@app.get("/api/contact-fields")
async def contact_fields(t: TenantContext = Depends(get_tenant_context)):
    """Return all available contact field names from uploaded Apollo rows."""
    # Start with the first-class Contact columns, then union in every key
    # observed in any stored raw_data dict.
    field_names = {"first_name", "last_name", "email", "company", "title", "file_id", "created_at"}
    for (raw,) in t.db.query(Contact.raw_data).filter(Contact.tenant_id == t.tenant_id).all():
        if isinstance(raw, dict):
            field_names.update(str(key) for key in raw)
    return {"fields": sorted(field_names)}
|
|
|
|
@app.get("/api/contacts")
async def list_contacts(
    search: str = Query("", description="Search by name/email/company/title"),
    filters: str = Query(
        "",
        description='JSON array of {field, op, value?, from_value?, to_value?}; combined with AND',
    ),
    field: str = Query("", description="Optional field name to filter (legacy single filter)"),
    value: str = Query("", description="Optional field value to filter"),
    op: str = Query("contains", description="Filter op: contains|equals|from|to|between"),
    from_value: str = Query("", description="From value for range filters"),
    to_value: str = Query("", description="To value for range filters"),
    sort_by: str = Query("created_at", description="Sort field"),
    sort_dir: str = Query("desc", description="Sort direction: asc|desc"),
    limit: int = Query(200, ge=1, le=1000),
    offset: int = Query(0, ge=0),
    t: TenantContext = Depends(get_tenant_context),
):
    """List tenant contacts with search, Apollo-field filters, sorting, paging.

    Text search runs in SQL; the structured filters and sorting run in Python
    because they can reference arbitrary keys inside the JSON raw_data column.
    NOTE(review): the whole tenant contact set is loaded into memory for that —
    presumably acceptable at current sizes, but worth revisiting for large
    tenants.
    """
    db = t.db
    query = db.query(Contact).filter(Contact.tenant_id == t.tenant_id)
    if search:
        pattern = f"%{search}%"
        query = query.filter(
            (Contact.first_name.ilike(pattern))
            | (Contact.last_name.ilike(pattern))
            | (Contact.email.ilike(pattern))
            | (Contact.company.ilike(pattern))
            | (Contact.title.ilike(pattern))
        )
    contacts_all = query.order_by(Contact.created_at.desc(), Contact.id.desc()).all()

    # Structured JSON filters take precedence over the legacy single
    # field/op/value query parameters; rules are AND-combined.
    if filters and filters.strip():
        try:
            rules = json.loads(filters)
        except json.JSONDecodeError:
            raise HTTPException(status_code=400, detail="Invalid filters JSON")
        if not isinstance(rules, list):
            raise HTTPException(status_code=400, detail="filters must be a JSON array")
        active_rules = [r for r in rules if isinstance(r, dict) and _safe_str(r.get("field"))]
        if active_rules:
            contacts_all = [
                c
                for c in contacts_all
                if all(_contact_matches_apollo_filter(c, r) for r in active_rules)
            ]
    elif field:
        rule = {
            "field": field,
            "op": op,
            "value": value,
            "from_value": from_value,
            "to_value": to_value,
        }
        contacts_all = [c for c in contacts_all if _contact_matches_apollo_filter(c, rule)]

    reverse = (sort_dir or "desc").lower() != "asc"
    sort_field = sort_by or "created_at"

    def sort_key(c: Contact):
        # Mixed-type sort: numeric values first, then datetimes, then
        # case-folded text — the leading tag keeps tuples comparable.
        val = _contact_value(c, sort_field)
        num = _to_float(val)
        if num is not None:
            return (0, num)
        dt = _to_datetime(val)
        if dt is not None:
            return (1, dt.timestamp())
        return (2, _safe_str(val).lower())

    contacts_all = sorted(contacts_all, key=sort_key, reverse=reverse)

    # Paginate after filtering/sorting so `total` reflects the filtered count.
    total = len(contacts_all)
    contacts = contacts_all[offset:offset + limit]
    return {
        "total": total,
        "contacts": [
            {
                "id": c.id,
                "file_id": c.file_id,
                "row_index": c.row_index,
                "first_name": c.first_name or _pick_from_raw(c.raw_data, ["first name", "firstname", "first_name"]),
                "last_name": c.last_name or _pick_from_raw(c.raw_data, ["last name", "lastname", "last_name"]),
                "email": c.email or _pick_from_raw(c.raw_data, ["email", "work email", "email address", "secondary email", "tertiary email"]),
                "company": c.company or _pick_from_raw(c.raw_data, ["company", "company name", "company name for emails", "organization name", "account name"]),
                "title": c.title or _pick_from_raw(c.raw_data, ["title", "job title"]),
                "source": c.source or "apollo_csv",
                "created_at": c.created_at.isoformat() if c.created_at else None,
            }
            for c in contacts
        ],
    }
|
|
|
|
@app.post("/api/contacts")
async def create_contact(body: ContactCreateRequest, t: TenantContext = Depends(get_tenant_context)):
    """Create a contact manually (inline table add). Email must be unique."""
    db = t.db
    normalized_email = _safe_str(body.email).lower()
    if not normalized_email:
        raise HTTPException(status_code=400, detail="Email is required")
    duplicate = (
        db.query(Contact)
        .filter(
            Contact.tenant_id == t.tenant_id,
            func.lower(Contact.email) == normalized_email,
        )
        .first()
    )
    if duplicate is not None:
        raise HTTPException(status_code=409, detail="A contact with this email already exists")
    first = _safe_str(body.first_name)
    last = _safe_str(body.last_name)
    company = _safe_str(body.company)
    title = _safe_str(body.title)
    # Mirror the normalized fields into Apollo-style raw_data so downstream
    # field pickers see the same columns as CSV-imported contacts.
    contact = Contact(
        tenant_id=t.tenant_id,
        file_id=MANUAL_CONTACT_FILE_ID,
        row_index=0,
        first_name=first,
        last_name=last,
        email=normalized_email,
        company=company,
        title=title,
        source="manual",
        raw_data={
            "First Name": first,
            "Last Name": last,
            "Email": normalized_email,
            "Company Name": company,
            "Title": title,
        },
    )
    db.add(contact)
    db.commit()
    db.refresh(contact)
    return {
        "id": contact.id,
        "file_id": contact.file_id,
        "row_index": contact.row_index,
        "first_name": contact.first_name,
        "last_name": contact.last_name,
        "email": contact.email,
        "company": contact.company,
        "title": contact.title,
        "source": contact.source or "manual",
        "created_at": contact.created_at.isoformat() if contact.created_at else None,
    }
|
|
|
|
@app.post("/api/contacts/bulk-delete")
async def bulk_delete_contacts(body: BulkContactIdsRequest, t: TenantContext = Depends(get_tenant_context)):
    """Delete the given contact ids (tenant-scoped); returns the number of rows removed."""
    db = t.db
    if not body.contact_ids:
        raise HTTPException(status_code=400, detail="contact_ids required")
    # De-duplicate before the IN clause.
    unique_ids = list({int(v) for v in body.contact_ids})
    removed = (
        db.query(Contact)
        .filter(Contact.tenant_id == t.tenant_id, Contact.id.in_(unique_ids))
        .delete(synchronize_session=False)
    )
    db.commit()
    return {"deleted": removed}
|
|
|
|
@app.post("/api/contacts/seed-demo")
async def seed_demo_contacts(t: TenantContext = Depends(get_tenant_context)):
    """
    Replace previous demo-seeded contacts and insert a variety of sample rows
    (rich Apollo-style raw_data) for testing filters and UI.
    """
    db = t.db
    tenant_id = t.tenant_id
    # Idempotent re-seed: drop any rows left over from a prior demo seed
    # (identified by the shared DEMO_CONTACTS_FILE_ID sentinel) first.
    removed = (
        db.query(Contact)
        .filter(
            Contact.tenant_id == tenant_id,
            Contact.file_id == DEMO_CONTACTS_FILE_ID,
        )
        .delete(synchronize_session=False)
    )
    # Fixture rows: "raw" carries the Apollo-style CSV columns that the
    # contact filters and company-detail views read from raw_data.
    specs = [
        {
            "fn": "Morgan",
            "ln": "Lee",
            "email": "morgan.lee@blueharbor.demo",
            "co": "Blue Harbor Analytics",
            "ti": "Chief Revenue Officer",
            "raw": {
                "Company Name": "Blue Harbor Analytics",
                "Company Name for Emails": "BlueHarbor",
                "Industry": "Business intelligence & analytics software",
                "# Employees": "51-200",
                "Annual Revenue": "$8,200,000",
                "Last Raised At": "2023-11",
                "Website": "https://www.blueharbor-analytics.demo",
                "City": "Seattle",
                "State": "WA",
                "Country": "United States",
            },
        },
        {
            "fn": "Priya",
            "ln": "Shah",
            "email": "priya.shah@lumengrid.demo",
            "co": "LumenGrid Energy",
            "ti": "Director of Procurement",
            "raw": {
                "Company Name": "LumenGrid Energy",
                "Industry": "Renewable energy infrastructure",
                "# Employees": "201-500",
                "Annual Revenue": "$45,000,000",
                "Website": "https://lumengrid.demo",
                "City": "Toronto",
                "State": "ON",
                "Country": "Canada",
            },
        },
        {
            "fn": "Diego",
            "ln": "Ramos",
            "email": "diego.ramos@vertexledger.demo",
            "co": "Vertex Ledger",
            "ti": "Controller",
            "raw": {
                "Company Name": "Vertex Ledger",
                "Industry": "Enterprise document and content management software",
                "# Employees": "11-50",
                "Annual Revenue": "$3,100,000",
                "Website": "https://vertexledger.demo",
                "City": "Austin",
                "State": "TX",
                "Country": "United States",
            },
        },
        {
            "fn": "Elena",
            "ln": "Volkov",
            "email": "elena.volkov@northpole.demo",
            "co": "Northpole Freight Co.",
            "ti": "Operations Manager",
            "raw": {
                "Company Name": "Northpole Freight Co.",
                "Industry": "Logistics & Supply Chain",
                "# Employees": "501-1000",
                "Annual Revenue": "$120,000,000",
                "Website": "https://northpole-freight.demo",
                "City": "Hamburg",
                "State": "",
                "Country": "Germany",
            },
        },
        {
            "fn": "James",
            "ln": "Okonkwo",
            "email": "james.okonkwo@silverfin.demo",
            "co": "Silverfin Capital",
            "ti": "Investment Analyst",
            "raw": {
                "Company Name": "Silverfin Capital",
                "Industry": "Financial services",
                "# Employees": "51-200",
                "Annual Revenue": "$22,500,000",
                "Website": "https://silverfin.demo",
                "City": "London",
                "State": "",
                "Country": "United Kingdom",
            },
        },
        {
            "fn": "Yuki",
            "ln": "Tanaka",
            "email": "yuki.tanaka@skyforge.demo",
            "co": "Skyforge Robotics",
            "ti": "Head of Engineering",
            "raw": {
                "Company Name": "Skyforge Robotics",
                "Industry": "Industrial automation",
                "# Employees": "201-500",
                "Annual Revenue": "$67,000,000",
                "Website": "https://skyforge.demo",
                "City": "Osaka",
                "State": "",
                "Country": "Japan",
            },
        },
        {
            "fn": "Amira",
            "ln": "Hassan",
            "email": "amira.hassan@desertwave.demo",
            "co": "Desert Wave Telecom",
            "ti": "VP Customer Success",
            "raw": {
                "Company Name": "Desert Wave Telecom",
                "Industry": "Telecommunications",
                "# Employees": "1001-5000",
                "Annual Revenue": "$410,000,000",
                "Website": "https://desertwave.demo",
                "City": "Dubai",
                "State": "",
                "Country": "United Arab Emirates",
            },
        },
        {
            "fn": "Chris",
            "ln": "Martinez",
            "email": "chris.martinez@cascade.demo",
            "co": "Cascade Health Systems",
            "ti": "Clinical Director",
            "raw": {
                "Company Name": "Cascade Health Systems",
                "Industry": "Healthcare IT",
                "# Employees": "501-1000",
                "Annual Revenue": "$95,000,000",
                "Website": "https://cascade-health.demo",
                "City": "Denver",
                "State": "CO",
                "Country": "United States",
            },
        },
        {
            "fn": "Sofia",
            "ln": "Andersson",
            "email": "sofia.andersson@fjord.demo",
            "co": "Fjord Maritime AS",
            "ti": "Fleet Coordinator",
            "raw": {
                "Company Name": "Fjord Maritime AS",
                "Industry": "Shipping & maritime",
                "# Employees": "51-200",
                "Annual Revenue": "$18,000,000",
                "Website": "https://fjord-maritime.demo",
                "City": "Bergen",
                "State": "",
                "Country": "Norway",
            },
        },
        {
            "fn": "Marcus",
            "ln": "Webbe",
            "email": "marcus.webbe@pixelloft.demo",
            "co": "Pixelloft Creative Agency",
            "ti": "Creative Director",
            "raw": {
                "Company Name": "Pixelloft Creative Agency",
                "Industry": "Marketing & advertising",
                "# Employees": "11-50",
                "Annual Revenue": "$1,800,000",
                "Website": "https://pixelloft.demo",
                "City": "Melbourne",
                "State": "VIC",
                "Country": "Australia",
            },
        },
    ]
    for i, s in enumerate(specs):
        # Merge identity fields into raw_data so it mirrors a full CSV row.
        rd = dict(s["raw"])
        rd["Title"] = s["ti"]
        rd["Email"] = s["email"]
        rd["First Name"] = s["fn"]
        rd["Last Name"] = s["ln"]
        row = Contact(
            tenant_id=tenant_id,
            file_id=DEMO_CONTACTS_FILE_ID,
            row_index=i,
            first_name=s["fn"],
            last_name=s["ln"],
            email=s["email"],
            company=s["co"],
            title=s["ti"],
            source="demo",
            raw_data=rd,
        )
        db.add(row)
    db.commit()
    return {
        "ok": True,
        "removed_previous_demo_rows": removed,
        "inserted": len(specs),
    }
|
|
|
|
@app.post("/api/contacts/bulk-convert-to-leads")
async def bulk_convert_contacts_to_leads(body: BulkContactIdsRequest, t: TenantContext = Depends(get_tenant_context)):
    """
    Create a CrmLead (campaign «Contacts») copied from each requested contact.

    Source contacts are kept until explicitly deleted. Rows fail individually
    when the contact is missing, has no email, or a lead with the same email
    already exists.
    """
    db = t.db
    if not body.contact_ids:
        raise HTTPException(status_code=400, detail="contact_ids required")
    errors: List[dict] = []
    converted = 0
    for cid in body.contact_ids:
        row = (
            db.query(Contact)
            .filter(Contact.tenant_id == t.tenant_id, Contact.id == int(cid))
            .first()
        )
        if row is None:
            errors.append({"contact_id": cid, "error": "not found"})
            continue
        result = _convert_contact_to_lead_core(db, row)
        if result["ok"]:
            converted += 1
        else:
            errors.append({"contact_id": cid, "error": result.get("error", "failed")})
    db.commit()
    return {"converted": converted, "errors": errors}
|
|
|
|
@app.get("/api/company-names")
async def search_company_names(
    q: str = Query("", description="Substring match on Contact.company"),
    limit: int = Query(25, ge=1, le=100),
    t: TenantContext = Depends(get_tenant_context),
):
    """Distinct company strings from contacts for deal/account linking.

    Returns up to ``limit`` company names matching substring ``q``
    (case-insensitive), de-duplicated by lowercase form while preserving
    the first-seen casing.
    """
    db = t.db
    raw_q = _safe_str(q)
    # Fix: escape LIKE wildcards in the user-supplied query so a literal
    # "%" no longer matches everything and "_" no longer matches any char.
    escaped = raw_q.replace("\\", "\\\\").replace("%", "\\%").replace("_", "\\_")
    pattern = f"%{escaped}%" if raw_q else "%"
    rows = (
        db.query(Contact.company)
        .filter(
            Contact.tenant_id == t.tenant_id,
            Contact.company.isnot(None),
            Contact.company != "",
            Contact.company.ilike(pattern, escape="\\"),
        )
        .distinct()
        .order_by(Contact.company.asc())
        # Over-fetch: DB DISTINCT is case-sensitive; we dedupe further below.
        .limit(limit * 4)
        .all()
    )
    names: List[str] = []
    seen = set()
    for (co,) in rows:
        s = _safe_str(co)
        if not s:
            continue
        lk = s.lower()
        if lk in seen:
            continue
        seen.add(lk)
        names.append(s)
        if len(names) >= limit:
            break
    return {"names": names}
|
|
|
|
@app.get("/api/contacts/{contact_id}")
async def get_contact(contact_id: int, t: TenantContext = Depends(get_tenant_context)):
    """Fetch one contact (tenant-scoped), falling back to raw CSV columns for blank fields."""
    db = t.db
    row = (
        db.query(Contact)
        .filter(Contact.tenant_id == t.tenant_id, Contact.id == contact_id)
        .first()
    )
    if row is None:
        raise HTTPException(status_code=404, detail="Contact not found")
    rd = row.raw_data
    return {
        "id": row.id,
        "file_id": row.file_id,
        "row_index": row.row_index,
        "first_name": row.first_name or _pick_from_raw(rd, ["first name", "firstname", "first_name"]),
        "last_name": row.last_name or _pick_from_raw(rd, ["last name", "lastname", "last_name"]),
        "email": row.email or _pick_from_raw(rd, ["email", "work email", "email address", "secondary email", "tertiary email"]),
        "company": row.company or _pick_from_raw(rd, ["company", "company name", "company name for emails", "organization name", "account name"]),
        "title": row.title or _pick_from_raw(rd, ["title", "job title"]),
        "source": row.source or "apollo_csv",
        "created_at": row.created_at.isoformat() if row.created_at else None,
        "raw_data": rd or {},
    }
|
|
|
|
| |
# Maps patch-request field names (snake_case keys accepted by the contact /
# deal PATCH endpoints) to the Apollo-style raw_data column headers they are
# persisted under. Used by _merge_contact_raw_company_details below.
COMPANY_DETAIL_PATCH_FIELDS = {
    "company_name_for_emails": "Company Name for Emails",
    "industry": "Industry",
    "employees": "# Employees",
    "annual_revenue": "Annual Revenue",
    "last_raised_at": "Last Raised At",
    "website": "Website",
    "city": "City",
    "state": "State",
    "country_region": "Country",
}
|
|
|
|
def _contact_company_details_dict(c: Contact) -> dict:
    """Company-detail snapshot for a contact, read from its Apollo-style raw_data."""
    rd = c.raw_data or {}
    # "Company Name" prefers the structured column; remaining fields come
    # straight from raw_data. Insertion order matches the serialized output.
    details = {"Company Name": _safe_str(c.company or rd.get("Company Name"))}
    for key in (
        "Company Name for Emails",
        "Industry",
        "# Employees",
        "Annual Revenue",
        "Last Raised At",
        "Website",
        "City",
        "State",
        "Country",
    ):
        details[key] = _safe_str(rd.get(key))
    return details
|
|
|
|
def _merge_contact_raw_company_details(contact: Contact, patch: dict) -> None:
    """Copy company-detail fields present in ``patch`` into contact.raw_data (in place)."""
    merged = dict(contact.raw_data or {})
    for field_name in COMPANY_DETAIL_PATCH_FIELDS:
        if field_name in patch:
            merged[COMPANY_DETAIL_PATCH_FIELDS[field_name]] = _safe_str(patch[field_name])
    contact.raw_data = merged
|
|
|
|
def _lead_company_details_dict(row: CrmLead) -> dict:
    """Company-detail snapshot for a CRM lead, read from raw_webhook["company_details"]."""
    rw = row.raw_webhook if isinstance(row.raw_webhook, dict) else {}
    cd = rw.get("company_details")
    if not isinstance(cd, dict):
        cd = {}
    # Structured column wins for the name; the rest mirrors the webhook payload.
    details = {"Company Name": _safe_str(row.company_name or cd.get("Company Name"))}
    for key in (
        "Company Name for Emails",
        "Industry",
        "# Employees",
        "Annual Revenue",
        "Last Raised At",
        "Website",
        "City",
        "State",
        "Country",
    ):
        details[key] = _safe_str(cd.get(key))
    return details
|
|
|
|
def _deal_fallback_company_details(d: CrmDeal) -> dict:
    """Minimal company-detail dict for a deal with no linked contact (mostly blanks)."""
    details = {"Company Name": _safe_str(d.account_name)}
    for blank_key in (
        "Company Name for Emails",
        "Industry",
        "# Employees",
        "Annual Revenue",
        "Last Raised At",
        "Website",
        "City",
        "State",
    ):
        details[blank_key] = ""
    details["Country"] = _safe_str(d.country)
    return details
|
|
|
|
def _format_contact_display(contact: Contact) -> str:
    """Human-readable label: "First Last" when available, otherwise the email."""
    name_parts = [p for p in (_safe_str(contact.first_name), _safe_str(contact.last_name)) if p]
    full_name = " ".join(name_parts).strip()
    if full_name:
        return full_name
    return _safe_str(contact.email)
|
|
|
|
def _enrich_deal_response(db: Session, row: CrmDeal) -> dict:
    """Serialize a deal plus owner display name, linked-contact summary and company details."""
    payload = _deal_to_dict(row)
    if row.owner_user_id:
        owner = db.query(User).filter(User.id == row.owner_user_id).first()
        if owner:
            payload["owner_display_name"] = (owner.name or owner.email or "").strip() or None
            # Only backfill initials when the base dict left them blank.
            if not (payload.get("owner_initials") or "").strip():
                payload["owner_initials"] = _user_initials_from_user(owner)
    else:
        payload["owner_display_name"] = None
    payload["linked_contact"] = None
    linked = None
    if row.contact_id:
        linked = (
            db.query(Contact)
            .filter(
                Contact.tenant_id == row.tenant_id,
                Contact.id == row.contact_id,
            )
            .first()
        )
    if linked is not None:
        payload["company_details"] = _contact_company_details_dict(linked)
        payload["company_details_contact_id"] = linked.id
        payload["linked_contact"] = {
            "id": linked.id,
            "first_name": linked.first_name or "",
            "last_name": linked.last_name or "",
            "email": linked.email or "",
            "title": linked.title or "",
        }
    else:
        # No (resolvable) contact link: fall back to the deal's own fields.
        payload["company_details"] = _deal_fallback_company_details(row)
    return payload
|
|
|
|
def _sync_contact_raw_from_columns(contact: Contact) -> None:
    """Mirror the structured contact columns back into the Apollo-style raw_data keys."""
    updated = dict(contact.raw_data or {})
    updated.update(
        {
            "First Name": contact.first_name or "",
            "Last Name": contact.last_name or "",
            "Email": (contact.email or "").strip(),
            "Company Name": contact.company or "",
            "Title": contact.title or "",
        }
    )
    contact.raw_data = updated
|
|
|
|
@app.patch("/api/contacts/{contact_id}")
async def patch_contact(contact_id: int, body: ContactPatchRequest, t: TenantContext = Depends(get_tenant_context)):
    """Partially update a contact; enforces email uniqueness and re-syncs raw_data."""
    db = t.db
    row = (
        db.query(Contact)
        .filter(Contact.tenant_id == t.tenant_id, Contact.id == contact_id)
        .first()
    )
    if row is None:
        raise HTTPException(status_code=404, detail="Contact not found")
    fields = body.model_dump(exclude_unset=True)
    if not fields:
        raise HTTPException(status_code=400, detail="No fields to update")
    if "email" in fields:
        new_email = _safe_str(fields["email"]).lower()
        if not new_email:
            raise HTTPException(status_code=400, detail="Email cannot be empty")
        clash = (
            db.query(Contact)
            .filter(
                Contact.tenant_id == t.tenant_id,
                Contact.id != contact_id,
                func.lower(Contact.email) == new_email,
            )
            .first()
        )
        if clash is not None:
            raise HTTPException(status_code=409, detail="Another contact already uses this email")
        row.email = new_email
    for attr in ("first_name", "last_name", "company", "title"):
        if attr in fields:
            setattr(row, attr, _safe_str(fields[attr]))
    # Keep raw_data consistent with the structured columns, then fold in any
    # company-detail fields carried on the same patch request.
    _sync_contact_raw_from_columns(row)
    _merge_contact_raw_company_details(row, fields)
    db.commit()
    db.refresh(row)
    rd = row.raw_data
    return {
        "id": row.id,
        "file_id": row.file_id,
        "row_index": row.row_index,
        "first_name": row.first_name or _pick_from_raw(rd, ["first name", "firstname", "first_name"]),
        "last_name": row.last_name or _pick_from_raw(rd, ["last name", "lastname", "last_name"]),
        "email": row.email or _pick_from_raw(rd, ["email", "work email", "email address", "secondary email", "tertiary email"]),
        "company": row.company or _pick_from_raw(rd, ["company", "company name", "company name for emails", "organization name", "account name"]),
        "title": row.title or _pick_from_raw(rd, ["title", "job title"]),
        "source": row.source or "apollo_csv",
        "created_at": row.created_at.isoformat() if row.created_at else None,
        "raw_data": rd or {},
    }
|
|
|
|
@app.post("/api/contacts/{contact_id}/enrich")
async def enrich_contact(contact_id: int, t: TenantContext = Depends(get_tenant_context)):
    """
    GPT-enrich company/contact fields for manually added contacts only.

    Enrichment (gpt_service): multi-page crawl + schema.org JSON-LD on your domain;
    Google Custom Search (GOOGLE_API_KEY + GOOGLE_CSE_ID); optional Gemini with
    Google Search grounding (GEMINI_API_KEY); DuckDuckGo fallback;
    ENRICHMENT_CONTACT_EMAIL helps Wikipedia.
    """
    db = t.db
    # Enrichment is an OpenAI-backed feature; fail fast if the key is absent.
    if not os.getenv("OPENAI_API_KEY"):
        raise HTTPException(status_code=503, detail="OpenAI API key is not configured")
    contact = (
        db.query(Contact)
        .filter(Contact.tenant_id == t.tenant_id, Contact.id == contact_id)
        .first()
    )
    if not contact:
        raise HTTPException(status_code=404, detail="Contact not found")
    src = (contact.source or "").strip().lower()
    # A contact counts as manual if EITHER its source says so OR it lives in
    # the manual-contacts file bucket; only then is enrichment allowed.
    if src != "manual" and contact.file_id != MANUAL_CONTACT_FILE_ID:
        raise HTTPException(
            status_code=400,
            detail="Enrich is only available for manually added contacts",
        )


    # Seed payload for the enricher: structured columns first, raw_data
    # fallbacks second (mirrors the read path used by the list endpoints).
    seed = {
        "first_name": contact.first_name
        or _pick_from_raw(contact.raw_data, ["first name", "firstname", "first_name"]),
        "last_name": contact.last_name
        or _pick_from_raw(contact.raw_data, ["last name", "lastname", "last_name"]),
        "email": contact.email
        or _pick_from_raw(
            contact.raw_data,
            ["email", "work email", "email address", "secondary email", "tertiary email"],
        ),
        "company": contact.company
        or _pick_from_raw(
            contact.raw_data,
            [
                "company",
                "company name",
                "company name for emails",
                "organization name",
                "account name",
            ],
        ),
        "title": contact.title
        or _pick_from_raw(contact.raw_data, ["title", "job title"]),
        "website_hint": _pick_from_raw(contact.raw_data, ["website", "Website"]),
    }
    try:
        merged, enrichment_report = enrich_manual_contact_profile(seed)
    except RuntimeError as e:
        # RuntimeError from the service maps to "dependency unavailable".
        raise HTTPException(status_code=503, detail=str(e)) from e
    except Exception as e:
        # Anything else is an upstream/enrichment failure -> 502.
        raise HTTPException(status_code=502, detail=f"Enrichment failed: {e}") from e


    # Enriched values only overwrite when non-empty; existing data is kept.
    contact.first_name = _safe_str(merged.get("first_name")) or contact.first_name
    contact.last_name = _safe_str(merged.get("last_name")) or contact.last_name
    contact.title = _safe_str(merged.get("title")) or contact.title
    contact.company = _safe_str(merged.get("company_name")) or contact.company


    # Re-sync raw_data with the (possibly updated) structured columns.
    rd = dict(contact.raw_data or {})
    rd["First Name"] = contact.first_name
    rd["Last Name"] = contact.last_name
    rd["Title"] = contact.title
    rd["Email"] = _safe_str(contact.email)
    if contact.company:
        rd["Company Name"] = contact.company
    # Optional enrichment outputs: (gpt_service key, raw_data header) pairs.
    optional_raw = [
        ("company_name_for_emails", "Company Name for Emails"),
        ("industry", "Industry"),
        ("employees", "# Employees"),
        ("annual_revenue", "Annual Revenue"),
        ("last_raised_at", "Last Raised At"),
        ("website", "Website"),
        ("city", "City"),
        ("state", "State"),
        ("country", "Country"),
    ]
    for gpt_key, raw_key in optional_raw:
        v = _safe_str(merged.get(gpt_key))
        if v:
            rd[raw_key] = v
    contact.raw_data = rd
    db.commit()
    db.refresh(contact)
    return {
        "id": contact.id,
        "file_id": contact.file_id,
        "row_index": contact.row_index,
        "first_name": contact.first_name
        or _pick_from_raw(contact.raw_data, ["first name", "firstname", "first_name"]),
        "last_name": contact.last_name
        or _pick_from_raw(contact.raw_data, ["last name", "lastname", "last_name"]),
        "email": contact.email
        or _pick_from_raw(
            contact.raw_data,
            ["email", "work email", "email address", "secondary email", "tertiary email"],
        ),
        "company": contact.company
        or _pick_from_raw(
            contact.raw_data,
            ["company", "company name", "company name for emails", "organization name", "account name"],
        ),
        "title": contact.title or _pick_from_raw(contact.raw_data, ["title", "job title"]),
        "source": contact.source or "manual",
        "created_at": contact.created_at.isoformat() if contact.created_at else None,
        "raw_data": contact.raw_data or {},
        "enrichment_report": enrichment_report,
    }
|
|
|
|
@app.get("/api/contacts/{contact_id}/sequences")
async def get_contact_sequences(contact_id: int, t: TenantContext = Depends(get_tenant_context)):
    """List generated sequence steps for a contact, matched by file_id + email."""
    db = t.db
    contact = (
        db.query(Contact)
        .filter(Contact.tenant_id == t.tenant_id, Contact.id == contact_id)
        .first()
    )
    if contact is None:
        raise HTTPException(status_code=404, detail="Contact not found")

    # Sequences are keyed by email; without one, nothing can match.
    if not contact.email:
        return {"contact_id": contact_id, "sequences": []}

    rows = (
        db.query(GeneratedSequence)
        .filter(
            GeneratedSequence.tenant_id == t.tenant_id,
            GeneratedSequence.file_id == contact.file_id,
            GeneratedSequence.email == contact.email,
        )
        .order_by(GeneratedSequence.sequence_id, GeneratedSequence.email_number)
        .all()
    )

    serialized = [
        {
            "id": s.id,
            "file_id": s.file_id,
            "sequence_id": s.sequence_id,
            "email_number": s.email_number,
            "subject": s.subject or "",
            "email_content": s.email_content or "",
            "product": s.product or "",
            "created_at": s.created_at.isoformat() if s.created_at else None,
        }
        for s in rows
    ]
    return {"contact_id": contact_id, "sequences": serialized}
|
|
|
|
@app.post("/api/save-prompts")
async def save_prompts(request: PromptSaveRequest, t: TenantContext = Depends(get_tenant_context)):
    """Replace all prompt templates (email + linkedin) for a file.

    Also persists the wizard sequence plan onto the UploadedFile when provided.
    On any persistence failure the session is rolled back (fix: the previous
    version left the session in a failed/half-pending state) and a 500 with
    the error detail is raised.
    """
    db = t.db
    try:
        # Full replace: drop every existing prompt row for this file first.
        db.query(Prompt).filter(
            Prompt.tenant_id == t.tenant_id,
            Prompt.file_id == request.file_id,
        ).delete()

        for product_name, prompt_template in request.prompts.items():
            db.add(
                Prompt(
                    tenant_id=t.tenant_id,
                    file_id=request.file_id,
                    product_name=product_name,
                    prompt_template=prompt_template,
                    prompt_kind="email",
                )
            )

        # LinkedIn prompts are optional; blank templates are skipped.
        li = request.linkedin_prompts or {}
        for product_name, prompt_template in li.items():
            if not (prompt_template or "").strip():
                continue
            db.add(
                Prompt(
                    tenant_id=t.tenant_id,
                    file_id=request.file_id,
                    product_name=product_name,
                    prompt_template=prompt_template,
                    prompt_kind="linkedin",
                )
            )

        db.commit()

        # Persist the wizard sequence plan (if any) onto the uploaded file.
        uf = (
            db.query(UploadedFile)
            .filter(UploadedFile.tenant_id == t.tenant_id, UploadedFile.file_id == request.file_id)
            .first()
        )
        if uf is not None and request.sequence_plan is not None:
            uf.sequence_plan_json = json.dumps(request.sequence_plan)
            db.commit()

        return {"message": "Prompts saved successfully"}
    except Exception as e:
        # Fix: roll back so the session is usable again after a failure.
        db.rollback()
        raise HTTPException(status_code=500, detail=f"Error saving prompts: {str(e)}")
|
|
|
|
| def _normalize_campaign_plan_steps(raw_data) -> Optional[List[Dict]]: |
| if raw_data is None: |
| return None |
| if isinstance(raw_data, dict) and isinstance(raw_data.get("steps"), list): |
| return raw_data["steps"] |
| if isinstance(raw_data, list): |
| return raw_data |
| return None |
|
|
|
|
| def _parse_campaign_plan_pack(db_file) -> Optional[Dict[str, Any]]: |
| """Build GPT kwargs + per-row step_order alignment from wizard sequence JSON on UploadedFile.""" |
| raw = getattr(db_file, "sequence_plan_json", None) |
| if not raw or not str(raw).strip(): |
| return None |
| try: |
| parsed = json.loads(raw) |
| except Exception: |
| return None |
| steps = _normalize_campaign_plan_steps(parsed) |
| if not steps: |
| return None |
| gmail_specs: List[Dict[str, Any]] = [] |
| li_wizard: List[Dict[str, Any]] = [] |
| wait_lines: List[str] = [] |
| ord_global = 0 |
| for s in steps: |
| if not isinstance(s, dict): |
| continue |
| if s.get("type") == "wait": |
| try: |
| d = int(s.get("days") or 1) |
| except (TypeError, ValueError): |
| d = 1 |
| wait_lines.append(f"Wait {d} day(s) between surrounding sequence touches.") |
| continue |
| if s.get("type") != "action": |
| continue |
| ord_global += 1 |
| ch = s.get("channel") |
| act = s.get("action") |
| title = (s.get("title") or "").strip() or ("Email" if ch == "gmail" else "LinkedIn touch") |
| if ch == "gmail": |
| gmail_specs.append({"step_order": ord_global, "title": title}) |
| elif ch == "linkedin" and act in ("linkedin_connect", "linkedin_dm"): |
| li_wizard.append({"step_order": ord_global, "action": act, "title": title}) |
| li_connects = [x for x in li_wizard if x["action"] == "linkedin_connect"] |
| li_dms = [x for x in li_wizard if x["action"] == "linkedin_dm"] |
| li_other = [x for x in li_wizard if x["action"] not in ("linkedin_connect", "linkedin_dm")] |
| li_for_gpt = li_connects + li_dms + li_other |
| total_actions = len(gmail_specs) + len(li_wizard) |
| if total_actions == 0: |
| return None |
| email_campaign_kw = None |
| if gmail_specs: |
| email_campaign_kw = { |
| "email_count": len(gmail_specs), |
| "email_titles": [g["title"] for g in gmail_specs], |
| "wait_context": "\n".join(wait_lines), |
| } |
| linkedin_campaign_kw = None |
| if li_for_gpt: |
| linkedin_campaign_kw = {"linkedin_actions": [x["action"] for x in li_for_gpt]} |
| return { |
| "gmail_specs": gmail_specs, |
| "li_wizard": li_wizard, |
| "li_for_gpt": li_for_gpt, |
| "total_actions": total_actions, |
| "email_campaign_kw": email_campaign_kw, |
| "linkedin_campaign_kw": linkedin_campaign_kw, |
| } |
|
|
|
|
@app.get("/api/generation-status")
async def generation_status(file_id: str = Query(...), t: TenantContext = Depends(get_tenant_context)):
    """Return progress for sequence generation (so frontend can resume after sleep/reconnect)."""
    db = t.db
    db_file = (
        db.query(UploadedFile)
        .filter(
            UploadedFile.tenant_id == t.tenant_id,
            UploadedFile.file_id == file_id,
        )
        .first()
    )
    if not db_file:
        raise HTTPException(status_code=404, detail="File not found")
    total_contacts = db_file.contact_count or 0
    # Count "action" steps in the wizard plan JSON (either a bare list or a
    # {"steps": [...]} wrapper); 0 means no plan / unparseable plan.
    plan_actions = 0
    if getattr(db_file, "sequence_plan_json", None):
        try:
            raw = json.loads(db_file.sequence_plan_json)
            if isinstance(raw, list):
                plan_actions = sum(1 for s in raw if isinstance(s, dict) and s.get("type") == "action")
            elif isinstance(raw, dict) and isinstance(raw.get("steps"), list):
                plan_actions = sum(1 for s in raw["steps"] if isinstance(s, dict) and s.get("type") == "action")
        except Exception:
            plan_actions = 0
    # Whether any LinkedIn prompt templates exist for this file.
    has_linkedin = (
        db.query(Prompt.id)
        .filter(
            Prompt.tenant_id == t.tenant_id,
            Prompt.file_id == file_id,
            Prompt.prompt_kind == "linkedin",
        )
        .first()
        is not None
    )
    # Distinct sequence ids (i.e. contacts) that already have email output.
    email_done = (
        db.query(GeneratedSequence.sequence_id)
        .filter(
            GeneratedSequence.tenant_id == t.tenant_id,
            GeneratedSequence.file_id == file_id,
            GeneratedSequence.channel == "email",
        )
        .distinct()
        .count()
    )
    # Same, for LinkedIn output.
    li_done = (
        db.query(GeneratedSequence.sequence_id)
        .filter(
            GeneratedSequence.tenant_id == t.tenant_id,
            GeneratedSequence.file_id == file_id,
            GeneratedSequence.channel == "linkedin",
        )
        .distinct()
        .count()
    )
    if plan_actions > 0:
        # Plan-driven run: progress is measured in generated rows, one row
        # per (contact, action) pair.
        row_count = (
            db.query(GeneratedSequence.id)
            .filter(
                GeneratedSequence.tenant_id == t.tenant_id,
                GeneratedSequence.file_id == file_id,
            )
            .count()
        )
        completed_units = row_count
        expected_units = total_contacts * plan_actions


        # Completion tolerance: allow up to one missing unit per contact
        # (presumably to absorb per-row generation failures) while still
        # requiring at least one unit overall — TODO confirm intent.
        complete_threshold = max(1, expected_units - max(0, total_contacts))
    else:
        # Legacy run without a plan: one unit per contact per channel.
        completed_units = email_done + (li_done if has_linkedin else 0)
        expected_units = total_contacts * (2 if has_linkedin else 1)
        complete_threshold = expected_units
    return {
        "file_id": file_id,
        "total_contacts": total_contacts,
        "completed_count": completed_units,
        "is_complete": total_contacts > 0 and completed_units >= complete_threshold,
        "has_linkedin_prompts": has_linkedin,
        "campaign_sequence_actions": plan_actions or None,
    }
|
|
|
|
@app.get("/api/sequences")
async def get_sequences(file_id: str = Query(...), t: TenantContext = Depends(get_tenant_context)):
    """Return all generated sequences for a file (for catch-up after reconnect)."""
    db = t.db
    rows = (
        db.query(GeneratedSequence)
        .filter(
            GeneratedSequence.tenant_id == t.tenant_id,
            GeneratedSequence.file_id == file_id,
        )
        .order_by(
            GeneratedSequence.sequence_id,
            # Rows without a step_order sort after any planned step.
            func.coalesce(GeneratedSequence.step_order, 999999),
            GeneratedSequence.email_number,
        )
        .all()
    )
    payload = [
        {
            "id": seq.sequence_id,
            "emailNumber": seq.email_number,
            "firstName": seq.first_name,
            "lastName": seq.last_name,
            "email": seq.email,
            "company": seq.company,
            "title": seq.title or "",
            "product": seq.product,
            "subject": seq.subject,
            "emailContent": seq.email_content,
            "channel": getattr(seq, "channel", None) or "email",
            "stepOrder": getattr(seq, "step_order", None),
        }
        for seq in rows
    ]
    return {"sequences": payload}
|
|
|
|
| @app.get("/api/generate-sequences") |
| async def generate_sequences( |
| file_id: str = Query(...), |
| reset: bool = Query(True), |
| t: TenantContext = Depends(get_tenant_context), |
| ): |
| """Generate email sequences using GPT with Server-Sent Events streaming. |
| Use reset=1 for a fresh run (clears existing). Use reset=0 to resume after disconnect/sleep.""" |
| db = t.db |
| tenant_id = t.tenant_id |
|
|
| async def event_generator(): |
| try: |
| db_file = ( |
| db.query(UploadedFile) |
| .filter( |
| UploadedFile.tenant_id == tenant_id, |
| UploadedFile.file_id == file_id, |
| ) |
| .first() |
| ) |
| if not db_file: |
| yield f"data: {json.dumps({'type': 'error', 'error': 'File not found'})}\n\n" |
| return |
| |
| df = pd.read_csv(db_file.file_path) |
| prompts = ( |
| db.query(Prompt) |
| .filter(Prompt.tenant_id == tenant_id, Prompt.file_id == file_id) |
| .all() |
| ) |
| email_prompt_dict = { |
| p.product_name: p.prompt_template |
| for p in prompts |
| if (getattr(p, "prompt_kind", None) or "email") == "email" |
| } |
| linkedin_prompt_dict = { |
| p.product_name: p.prompt_template |
| for p in prompts |
| if getattr(p, "prompt_kind", None) == "linkedin" |
| } |
|
|
| plan_pack = _parse_campaign_plan_pack(db_file) |
| run_email = True |
| run_linkedin = len(linkedin_prompt_dict) > 0 |
| email_kw: Optional[Dict[str, Any]] = None |
| li_kw: Optional[Dict[str, Any]] = None |
| gmail_specs: List[Dict[str, Any]] = [] |
| li_meta_for_rows: List[Dict[str, Any]] = [] |
|
|
| if plan_pack: |
| run_email = bool(plan_pack.get("email_campaign_kw")) |
| run_linkedin = bool(plan_pack.get("linkedin_campaign_kw")) |
| email_kw = plan_pack.get("email_campaign_kw") |
| li_kw = plan_pack.get("linkedin_campaign_kw") |
| gmail_specs = plan_pack.get("gmail_specs") or [] |
| li_meta_for_rows = plan_pack.get("li_for_gpt") or [] |
|
|
| if run_email and not email_prompt_dict: |
| yield f"data: {json.dumps({'type': 'error', 'error': 'No prompts found'})}\n\n" |
| return |
| if run_linkedin and not linkedin_prompt_dict: |
| yield f"data: {json.dumps({'type': 'error', 'error': 'No LinkedIn prompts found'})}\n\n" |
| return |
|
|
| products = list(email_prompt_dict.keys()) if email_prompt_dict else [] |
| li_products = list(linkedin_prompt_dict.keys()) if linkedin_prompt_dict else [] |
|
|
| if reset: |
| db.query(GeneratedSequence).filter( |
| GeneratedSequence.tenant_id == tenant_id, |
| GeneratedSequence.file_id == file_id, |
| ).delete() |
| db.commit() |
|
|
| user_row = db.query(User).filter(User.id == t.user_id).first() |
| mailbox_sig = "" |
| linkedin_sig = "" |
| if user_row: |
| mailbox_sig = ( |
| (getattr(user_row, "mailbox_profile_display_name", None) or "").strip() |
| or (user_row.name or "").strip() |
| ) |
| linkedin_sig = ( |
| (getattr(user_row, "linkedin_profile_display_name", None) or "").strip() |
| or (user_row.name or "").strip() |
| ) |
|
|
| total_contacts = len(df) |
|
|
| def progress_pct(seq_id: int, *, linkedin_phase: bool) -> float: |
| if total_contacts <= 0: |
| return 0.0 |
| if run_linkedin and not linkedin_phase: |
| return min(100.0, max(0.0, (seq_id / total_contacts) * 50.0)) |
| if run_linkedin and linkedin_phase: |
| return min(100.0, max(0.0, 50.0 + (seq_id / total_contacts) * 50.0)) |
| return min(100.0, max(0.0, (seq_id / total_contacts) * 100.0)) |
|
|
| def plan_progress_pct() -> float: |
| if not plan_pack or total_contacts <= 0: |
| return 0.0 |
| cnt = ( |
| db.query(func.count(GeneratedSequence.id)) |
| .filter( |
| GeneratedSequence.tenant_id == tenant_id, |
| GeneratedSequence.file_id == file_id, |
| ) |
| .scalar() |
| or 0 |
| ) |
| exp = total_contacts * plan_pack["total_actions"] |
| return min(100.0, max(0.0, (cnt / max(1, exp)) * 100.0)) |
|
|
| def orm_to_sequence_response(seq: GeneratedSequence) -> Dict[str, Any]: |
| ch = getattr(seq, "channel", None) or "email" |
| return { |
| "id": seq.sequence_id, |
| "emailNumber": seq.email_number, |
| "firstName": seq.first_name, |
| "lastName": seq.last_name, |
| "email": seq.email, |
| "company": seq.company, |
| "title": seq.title or "", |
| "product": seq.product, |
| "subject": (seq.subject or "") if ch == "linkedin" else seq.subject, |
| "emailContent": seq.email_content, |
| "channel": ch, |
| "stepOrder": getattr(seq, "step_order", None), |
| } |
|
|
| sequence_id = 1 |
| if run_email: |
| for idx, row in df.iterrows(): |
| if plan_pack and gmail_specs: |
| email_done_n = ( |
| db.query(func.count(GeneratedSequence.id)) |
| .filter( |
| GeneratedSequence.tenant_id == tenant_id, |
| GeneratedSequence.file_id == file_id, |
| GeneratedSequence.sequence_id == sequence_id, |
| GeneratedSequence.channel == "email", |
| ) |
| .scalar() |
| or 0 |
| ) |
| if email_done_n >= len(gmail_specs): |
| existing_em = ( |
| db.query(GeneratedSequence) |
| .filter( |
| GeneratedSequence.tenant_id == tenant_id, |
| GeneratedSequence.file_id == file_id, |
| GeneratedSequence.sequence_id == sequence_id, |
| GeneratedSequence.channel == "email", |
| ) |
| .order_by( |
| func.coalesce(GeneratedSequence.step_order, 999999), |
| GeneratedSequence.email_number, |
| ) |
| .all() |
| ) |
| for seq in existing_em: |
| yield f"data: {json.dumps({'type': 'sequence', 'sequence': orm_to_sequence_response(seq)})}\n\n" |
| prog = plan_progress_pct() if plan_pack else progress_pct(sequence_id, linkedin_phase=False) |
| yield f"data: {json.dumps({'type': 'progress', 'progress': prog})}\n\n" |
| sequence_id += 1 |
| await asyncio.sleep(0.05) |
| continue |
| else: |
| existing = ( |
| db.query(GeneratedSequence) |
| .filter( |
| GeneratedSequence.tenant_id == tenant_id, |
| GeneratedSequence.file_id == file_id, |
| GeneratedSequence.sequence_id == sequence_id, |
| GeneratedSequence.channel == "email", |
| ) |
| .order_by(GeneratedSequence.email_number) |
| .all() |
| ) |
| if existing: |
| for seq in existing: |
| yield f"data: {json.dumps({'type': 'sequence', 'sequence': orm_to_sequence_response(seq)})}\n\n" |
| prog = plan_progress_pct() if plan_pack else progress_pct(sequence_id, linkedin_phase=False) |
| yield f"data: {json.dumps({'type': 'progress', 'progress': prog})}\n\n" |
| sequence_id += 1 |
| await asyncio.sleep(0.05) |
| continue |
|
|
| contact = row.to_dict() |
| product_name = products[(sequence_id - 1) % len(products)] |
| prompt_template = email_prompt_dict[product_name] |
|
|
| loop = asyncio.get_event_loop() |
| with concurrent.futures.ThreadPoolExecutor() as executor: |
| if plan_pack and email_kw: |
| gen_fn = partial( |
| generate_email_sequence, |
| campaign_sequence=email_kw, |
| sender_mailbox_name=mailbox_sig or None, |
| ) |
| sequence_data_list = await loop.run_in_executor( |
| executor, |
| gen_fn, |
| contact, |
| prompt_template, |
| product_name, |
| ) |
| else: |
| sequence_data_list = await loop.run_in_executor( |
| executor, |
| partial(generate_email_sequence, sender_mailbox_name=mailbox_sig or None), |
| contact, |
| prompt_template, |
| product_name, |
| ) |
|
|
| for i, seq_data in enumerate(sequence_data_list): |
| step_order_v = None |
| if plan_pack and gmail_specs and i < len(gmail_specs): |
| step_order_v = gmail_specs[i]["step_order"] |
| db_sequence = GeneratedSequence( |
| tenant_id=tenant_id, |
| file_id=file_id, |
| sequence_id=sequence_id, |
| email_number=seq_data["email_number"], |
| channel="email", |
| first_name=seq_data["first_name"], |
| last_name=seq_data["last_name"], |
| email=seq_data["email"], |
| company=seq_data["company"], |
| title=seq_data.get("title", ""), |
| product=seq_data["product"], |
| subject=seq_data["subject"], |
| email_content=seq_data["email_content"], |
| step_order=step_order_v, |
| ) |
| db.add(db_sequence) |
|
|
| db.commit() |
|
|
| for i, seq_data in enumerate(sequence_data_list): |
| step_order_v = None |
| if plan_pack and gmail_specs and i < len(gmail_specs): |
| step_order_v = gmail_specs[i]["step_order"] |
| sequence_response = { |
| "id": sequence_id, |
| "emailNumber": seq_data["email_number"], |
| "firstName": seq_data["first_name"], |
| "lastName": seq_data["last_name"], |
| "email": seq_data["email"], |
| "company": seq_data["company"], |
| "title": seq_data.get("title", ""), |
| "product": seq_data["product"], |
| "subject": seq_data["subject"], |
| "emailContent": seq_data["email_content"], |
| "channel": "email", |
| "stepOrder": step_order_v, |
| } |
| yield f"data: {json.dumps({'type': 'sequence', 'sequence': sequence_response})}\n\n" |
|
|
| prog = plan_progress_pct() if plan_pack else progress_pct(sequence_id, linkedin_phase=False) |
| yield f"data: {json.dumps({'type': 'progress', 'progress': prog})}\n\n" |
| sequence_id += 1 |
| await asyncio.sleep(0.1) |
|
|
| if run_linkedin: |
| yield f"data: {json.dumps({'type': 'phase', 'phase': 'linkedin', 'message': 'Generating LinkedIn sequences'})}\n\n" |
| sequence_id = 1 |
| for idx, row in df.iterrows(): |
| if plan_pack and li_meta_for_rows: |
| li_done_n = ( |
| db.query(func.count(GeneratedSequence.id)) |
| .filter( |
| GeneratedSequence.tenant_id == tenant_id, |
| GeneratedSequence.file_id == file_id, |
| GeneratedSequence.sequence_id == sequence_id, |
| GeneratedSequence.channel == "linkedin", |
| ) |
| .scalar() |
| or 0 |
| ) |
| if li_done_n >= len(li_meta_for_rows): |
| existing_li = ( |
| db.query(GeneratedSequence) |
| .filter( |
| GeneratedSequence.tenant_id == tenant_id, |
| GeneratedSequence.file_id == file_id, |
| GeneratedSequence.sequence_id == sequence_id, |
| GeneratedSequence.channel == "linkedin", |
| ) |
| .order_by( |
| func.coalesce(GeneratedSequence.step_order, 999999), |
| GeneratedSequence.email_number, |
| ) |
| .all() |
| ) |
| for seq in existing_li: |
| yield f"data: {json.dumps({'type': 'sequence', 'sequence': orm_to_sequence_response(seq)})}\n\n" |
| prog = plan_progress_pct() if plan_pack else progress_pct(sequence_id, linkedin_phase=True) |
| yield f"data: {json.dumps({'type': 'progress', 'progress': prog})}\n\n" |
| sequence_id += 1 |
| await asyncio.sleep(0.05) |
| continue |
| else: |
| existing_li = ( |
| db.query(GeneratedSequence) |
| .filter( |
| GeneratedSequence.tenant_id == tenant_id, |
| GeneratedSequence.file_id == file_id, |
| GeneratedSequence.sequence_id == sequence_id, |
| GeneratedSequence.channel == "linkedin", |
| ) |
| .order_by(GeneratedSequence.email_number) |
| .all() |
| ) |
| if existing_li: |
| for seq in existing_li: |
| yield f"data: {json.dumps({'type': 'sequence', 'sequence': orm_to_sequence_response(seq)})}\n\n" |
| prog = plan_progress_pct() if plan_pack else progress_pct(sequence_id, linkedin_phase=True) |
| yield f"data: {json.dumps({'type': 'progress', 'progress': prog})}\n\n" |
| sequence_id += 1 |
| await asyncio.sleep(0.05) |
| continue |
|
|
| contact = row.to_dict() |
| li_product = li_products[(sequence_id - 1) % len(li_products)] |
| li_template = linkedin_prompt_dict[li_product] |
|
|
| loop = asyncio.get_event_loop() |
| with concurrent.futures.ThreadPoolExecutor() as executor: |
| if plan_pack and li_kw: |
| gen_li = partial( |
| generate_linkedin_sequence, |
| campaign_sequence=li_kw, |
| sender_linkedin_name=linkedin_sig or None, |
| ) |
| li_list = await loop.run_in_executor( |
| executor, |
| gen_li, |
| contact, |
| li_template, |
| li_product, |
| ) |
| else: |
| li_list = await loop.run_in_executor( |
| executor, |
| partial(generate_linkedin_sequence, sender_linkedin_name=linkedin_sig or None), |
| contact, |
| li_template, |
| li_product, |
| ) |
|
|
| for i, seq_data in enumerate(li_list): |
| step_order_v = None |
| if plan_pack and li_meta_for_rows and i < len(li_meta_for_rows): |
| step_order_v = li_meta_for_rows[i]["step_order"] |
| db_sequence = GeneratedSequence( |
| tenant_id=tenant_id, |
| file_id=file_id, |
| sequence_id=sequence_id, |
| email_number=seq_data["email_number"], |
| channel="linkedin", |
| first_name=seq_data["first_name"], |
| last_name=seq_data["last_name"], |
| email=seq_data["email"], |
| company=seq_data["company"], |
| title=seq_data.get("title", ""), |
| product=seq_data["product"], |
| subject=seq_data.get("subject") or "", |
| email_content=seq_data["email_content"], |
| step_order=step_order_v, |
| ) |
| db.add(db_sequence) |
|
|
| db.commit() |
|
|
| for i, seq_data in enumerate(li_list): |
| step_order_v = None |
| if plan_pack and li_meta_for_rows and i < len(li_meta_for_rows): |
| step_order_v = li_meta_for_rows[i]["step_order"] |
| sequence_response = { |
| "id": sequence_id, |
| "emailNumber": seq_data["email_number"], |
| "firstName": seq_data["first_name"], |
| "lastName": seq_data["last_name"], |
| "email": seq_data["email"], |
| "company": seq_data["company"], |
| "title": seq_data.get("title", ""), |
| "product": seq_data["product"], |
| "subject": seq_data.get("subject") or "", |
| "emailContent": seq_data["email_content"], |
| "channel": "linkedin", |
| "stepOrder": step_order_v, |
| } |
| yield f"data: {json.dumps({'type': 'sequence', 'sequence': sequence_response})}\n\n" |
|
|
| prog = plan_progress_pct() if plan_pack else progress_pct(sequence_id, linkedin_phase=True) |
| yield f"data: {json.dumps({'type': 'progress', 'progress': prog})}\n\n" |
| sequence_id += 1 |
| await asyncio.sleep(0.1) |
|
|
| yield f"data: {json.dumps({'type': 'complete'})}\n\n" |
| |
| except Exception as e: |
| yield f"data: {json.dumps({'type': 'error', 'error': str(e)})}\n\n" |
| |
| return StreamingResponse(event_generator(), media_type="text/event-stream") |
|
|
|
|
| @app.get("/api/download-sequences") |
| async def download_sequences(file_id: str = Query(...), t: TenantContext = Depends(get_tenant_context)): |
| """Download generated sequences as CSV with all subject/body fields""" |
| db = t.db |
| try: |
| |
| sequences = ( |
| db.query(GeneratedSequence) |
| .filter( |
| GeneratedSequence.tenant_id == t.tenant_id, |
| GeneratedSequence.file_id == file_id, |
| GeneratedSequence.channel == "email", |
| ) |
| .order_by(GeneratedSequence.sequence_id, GeneratedSequence.email_number) |
| .all() |
| ) |
| if not sequences: |
| raise HTTPException( |
| status_code=404, |
| detail="No email sequences found (CSV export is email-only; view LinkedIn in the app).", |
| ) |
| |
| |
| contacts = {} |
| max_email_number = 0 |
| for seq in sequences: |
| contact_key = f"{seq.sequence_id}" |
| if contact_key not in contacts: |
| contacts[contact_key] = { |
| 'first_name': seq.first_name, |
| 'last_name': seq.last_name, |
| 'email': seq.email, |
| 'company': seq.company, |
| 'title': seq.title or '', |
| 'product': seq.product, |
| 'subjects': {}, |
| 'bodies': {} |
| } |
| contacts[contact_key]['subjects'][seq.email_number] = seq.subject |
| contacts[contact_key]['bodies'][seq.email_number] = seq.email_content |
| |
| if seq.email_number > max_email_number: |
| max_email_number = seq.email_number |
| |
| |
| |
| output = io.StringIO() |
| writer = csv.writer(output, quoting=csv.QUOTE_MINIMAL) |
| |
| |
| header = ['First Name', 'Last Name', 'Email', 'Company', 'Title', 'Product'] |
| for i in range(1, max_email_number + 1): |
| header.extend([f'subject{i}', f'body{i}']) |
| writer.writerow(header) |
| |
| |
| |
| for contact in contacts.values(): |
| row = [ |
| contact['first_name'], |
| contact['last_name'], |
| contact['email'], |
| contact['company'], |
| contact['title'], |
| contact['product'] |
| ] |
| for i in range(1, max_email_number + 1): |
| subject = contact['subjects'].get(i, '') or '' |
| body = contact['bodies'].get(i, '') or '' |
| |
| |
| |
| if body: |
| |
| body_html = body.replace('\n', '<br>').replace('\r', '') |
| else: |
| body_html = '' |
| |
| row.append(subject) |
| row.append(body_html) |
| writer.writerow(row) |
| |
| output.seek(0) |
| |
| |
| return StreamingResponse( |
| iter([output.getvalue()]), |
| media_type="text/csv", |
| headers={ |
| "Content-Disposition": "attachment; filename=email_sequences_fixed.csv" |
| } |
| ) |
| |
| except Exception as e: |
| raise HTTPException(status_code=500, detail=f"Error downloading sequences: {str(e)}") |
|
|
|
|
| @app.get("/api/smartlead-campaigns") |
| async def get_smartlead_campaigns(): |
| """Get list of campaigns from Smartlead""" |
| try: |
| client = SmartleadClient() |
| campaigns = client.get_campaigns() |
| |
| |
| formatted_campaigns = [] |
| for campaign in campaigns: |
| campaign_id = campaign.get('campaign_id') or campaign.get('id') or campaign.get('campaignId') |
| campaign_name = campaign.get('name') or campaign.get('campaign_name') or campaign.get('title') or 'Unnamed Campaign' |
| |
| if campaign_id: |
| formatted_campaigns.append({ |
| "id": str(campaign_id), |
| "name": campaign_name |
| }) |
| |
| return {"campaigns": formatted_campaigns} |
| except Exception as e: |
| raise HTTPException(status_code=500, detail=f"Failed to fetch campaigns: {str(e)}") |
|
|
|
|
| @app.post("/api/push-to-smartlead") |
| async def push_to_smartlead(request: SmartleadPushRequest, t: TenantContext = Depends(get_tenant_context)): |
| """Push generated sequences to Smartlead campaign (add leads to existing campaign)""" |
| import uuid |
| |
| db = t.db |
| try: |
| |
| sequences = ( |
| db.query(GeneratedSequence) |
| .filter( |
| GeneratedSequence.tenant_id == t.tenant_id, |
| GeneratedSequence.file_id == request.file_id, |
| ) |
| .order_by(GeneratedSequence.sequence_id, GeneratedSequence.email_number) |
| .all() |
| ) |
| |
| if not sequences: |
| raise HTTPException(status_code=404, detail="No sequences found") |
| |
| |
| contacts = {} |
| for seq in sequences: |
| contact_key = f"{seq.sequence_id}" |
| if contact_key not in contacts: |
| contacts[contact_key] = { |
| 'first_name': seq.first_name, |
| 'last_name': seq.last_name, |
| 'email': seq.email, |
| 'company': seq.company, |
| 'title': seq.title or '', |
| 'subjects': {}, |
| 'bodies': {} |
| } |
| contacts[contact_key]['subjects'][seq.email_number] = seq.subject |
| contacts[contact_key]['bodies'][seq.email_number] = seq.email_content |
| |
| |
| try: |
| client = SmartleadClient() |
| except ValueError as e: |
| raise HTTPException(status_code=400, detail=str(e)) |
| |
| |
| campaigns = client.get_campaigns() |
| campaign_name = 'Unknown Campaign' |
| for campaign in campaigns: |
| campaign_id = campaign.get('campaign_id') or campaign.get('id') or campaign.get('campaignId') |
| if str(campaign_id) == str(request.campaign_id): |
| campaign_name = campaign.get('name') or campaign.get('campaign_name') or campaign.get('title') or 'Unknown Campaign' |
| break |
| |
| |
| run_id = str(uuid.uuid4()) |
| run = SmartleadRun( |
| tenant_id=t.tenant_id, |
| run_id=run_id, |
| file_id=request.file_id, |
| mode='existing', |
| campaign_id=request.campaign_id, |
| campaign_name=campaign_name, |
| steps_count=0, |
| dry_run=1 if request.dry_run else 0, |
| total_leads=len(contacts), |
| status='pending' |
| ) |
| db.add(run) |
| db.commit() |
| |
| if request.dry_run: |
| |
| return { |
| "run_id": run_id, |
| "campaign_id": request.campaign_id, |
| "campaign_name": campaign_name, |
| "total": len(contacts), |
| "added": 0, |
| "skipped": 0, |
| "failed": 0, |
| "errors": [], |
| "status": "dry_run_completed", |
| "message": "Dry run completed. No leads were sent to Smartlead." |
| } |
| |
| campaign_id = request.campaign_id |
| |
| |
| leads = [] |
| errors = [] |
| added_count = 0 |
| skipped_count = 0 |
| failed_count = 0 |
| |
| for contact in contacts.values(): |
| try: |
| |
| if not contact.get('email'): |
| errors.append({ |
| "email": contact.get('email', 'unknown'), |
| "error": "Missing email address" |
| }) |
| failed_count += 1 |
| continue |
| |
| |
| def safe_str(value): |
| if value is None: |
| return "" |
| if isinstance(value, float): |
| |
| if math.isnan(value): |
| return "" |
| return str(value).strip() |
| return str(value).strip() if value else "" |
| |
| |
| first_name = safe_str(contact.get('first_name', '')) or 'Contact' |
| last_name = safe_str(contact.get('last_name', '')) |
| company = safe_str(contact.get('company', '')) |
| title = safe_str(contact.get('title', '')) |
| email = safe_str(contact.get('email', '')) |
| |
| if not email: |
| errors.append({ |
| "email": "unknown", |
| "error": "Missing email address" |
| }) |
| failed_count += 1 |
| continue |
| |
| |
| |
| lead = { |
| "email": email, |
| "first_name": first_name, |
| "last_name": last_name |
| } |
| |
| leads.append(lead) |
| |
| except Exception as e: |
| errors.append({ |
| "email": contact.get('email', 'unknown'), |
| "error": str(e) |
| }) |
| failed_count += 1 |
| |
| |
| batch_size = 50 |
| for i in range(0, len(leads), batch_size): |
| batch = leads[i:i + batch_size] |
| try: |
| response = client.add_leads_to_campaign(campaign_id, batch) |
| added_count += len(batch) |
| except Exception as e: |
| |
| for lead in batch: |
| errors.append({ |
| "email": lead.get('email', 'unknown'), |
| "error": f"Failed to add lead: {str(e)}" |
| }) |
| failed_count += 1 |
| |
| |
| run.status = 'completed' |
| run.added_leads = added_count |
| run.skipped_leads = skipped_count |
| run.failed_leads = failed_count |
| run.error_details = json.dumps(errors) if errors else None |
| run.completed_at = datetime.utcnow() |
| db.commit() |
| |
| |
| |
| return { |
| "run_id": run_id, |
| "campaign_id": campaign_id, |
| "campaign_name": campaign_name, |
| "total": len(contacts), |
| "added": added_count, |
| "skipped": skipped_count, |
| "failed": failed_count, |
| "errors": errors[:10], |
| "status": "completed" |
| } |
| |
| except HTTPException: |
| raise |
| except Exception as e: |
| if 'run' in locals(): |
| run.status = 'failed' |
| run.error_details = str(e) |
| db.commit() |
| raise HTTPException(status_code=500, detail=f"Error pushing to Smartlead: {str(e)}") |
|
|
|
|
| @app.get("/api/smartlead-runs") |
| async def get_smartlead_runs(file_id: str = Query(None), t: TenantContext = Depends(get_tenant_context)): |
| """Get Smartlead run history""" |
| db = t.db |
| try: |
| query = db.query(SmartleadRun).filter(SmartleadRun.tenant_id == t.tenant_id) |
| if file_id: |
| query = query.filter(SmartleadRun.file_id == file_id) |
| |
| runs = query.order_by(SmartleadRun.created_at.desc()).limit(50).all() |
| |
| return [ |
| { |
| "run_id": run.run_id, |
| "file_id": run.file_id, |
| "campaign_id": run.campaign_id, |
| "campaign_name": run.campaign_name, |
| "mode": run.mode, |
| "steps_count": run.steps_count, |
| "dry_run": bool(run.dry_run), |
| "total_leads": run.total_leads, |
| "added_leads": run.added_leads, |
| "skipped_leads": run.skipped_leads, |
| "failed_leads": run.failed_leads, |
| "status": run.status, |
| "created_at": run.created_at.isoformat() if run.created_at else None, |
| "completed_at": run.completed_at.isoformat() if run.completed_at else None |
| } |
| for run in runs |
| ] |
| except Exception as e: |
| raise HTTPException(status_code=500, detail=f"Error fetching runs: {str(e)}") |
|
|
|
|
| @app.post("/api/webhooks/smartlead") |
| async def smartlead_webhook( |
| request: Request, |
| tenant_id: int = Query(..., description="Workspace ID — add ?tenant_id=<id> to the webhook URL in Smartlead"), |
| db: Session = Depends(get_db), |
| ): |
| """ |
| Smartlead webhook — configure in Smartlead to POST reply events to this URL when a lead replies. |
| Append **?tenant_id=<your workspace id>** to the webhook URL (same id as in the app URL bar after signing in). |
| Optional: set SMARTLEAD_WEBHOOK_SECRET and send the same value in header X-Webhook-Token (or Bearer). |
| """ |
| if not db.query(Tenant).filter(Tenant.id == tenant_id).first(): |
| raise HTTPException(status_code=404, detail="Unknown workspace") |
|
|
| secret = os.getenv("SMARTLEAD_WEBHOOK_SECRET") |
| if secret: |
| token = request.headers.get("X-Webhook-Token") or "" |
| if not token: |
| auth = request.headers.get("Authorization") or "" |
| if auth.lower().startswith("bearer "): |
| token = auth[7:].strip() |
| if token != secret: |
| raise HTTPException(status_code=401, detail="Invalid webhook token") |
|
|
| try: |
| body = await request.json() |
| except Exception: |
| raise HTTPException(status_code=400, detail="Expected JSON body") |
|
|
| parsed = _parse_smartlead_reply_payload(body) |
| if not parsed: |
| return {"ok": True, "ignored": True, "reason": "not_a_reply_or_missing_fields"} |
|
|
| q = db.query(CrmLead).filter(CrmLead.tenant_id == tenant_id) |
| row = None |
| if parsed["smartlead_lead_id"] and parsed["campaign_id"]: |
| row = q.filter( |
| CrmLead.smartlead_lead_id == parsed["smartlead_lead_id"], |
| CrmLead.campaign_id == parsed["campaign_id"], |
| ).first() |
| if row is None and parsed["email"] and parsed["campaign_id"]: |
| row = q.filter( |
| CrmLead.email == parsed["email"], |
| CrmLead.campaign_id == parsed["campaign_id"], |
| ).first() |
|
|
| if parsed["email"]: |
| apollo = ( |
| db.query(Contact) |
| .filter( |
| Contact.tenant_id == tenant_id, |
| func.lower(Contact.email) == parsed["email"].lower(), |
| ) |
| .first() |
| ) |
| if apollo: |
| if not parsed["company_name"] and apollo.company: |
| parsed["company_name"] = apollo.company |
| if not parsed["title"] and apollo.title: |
| parsed["title"] = apollo.title |
| if not parsed["first_name"] and apollo.first_name: |
| parsed["first_name"] = apollo.first_name |
| if not parsed["last_name"] and apollo.last_name: |
| parsed["last_name"] = apollo.last_name |
|
|
| new_ts = parsed["last_reply_at"] |
| if row: |
| old_ts = row.last_reply_at |
| if not old_ts or (new_ts and new_ts >= old_ts): |
| row.last_reply_subject = parsed["last_reply_subject"] |
| row.last_reply_body = parsed["last_reply_body"] |
| row.last_reply_at = new_ts |
| row.raw_webhook = parsed["raw_webhook"] |
| row.email = parsed["email"] or row.email |
| row.first_name = parsed["first_name"] or row.first_name |
| row.last_name = parsed["last_name"] or row.last_name |
| row.company_name = parsed["company_name"] or row.company_name |
| row.title = parsed["title"] or row.title |
| row.campaign_name = parsed["campaign_name"] or row.campaign_name |
| if parsed["smartlead_lead_id"]: |
| row.smartlead_lead_id = parsed["smartlead_lead_id"] |
| else: |
| row = CrmLead( |
| tenant_id=tenant_id, |
| smartlead_lead_id=parsed["smartlead_lead_id"] or "", |
| campaign_id=parsed["campaign_id"] or "", |
| campaign_name=parsed["campaign_name"] or "", |
| email=parsed["email"] or "", |
| first_name=parsed["first_name"] or "", |
| last_name=parsed["last_name"] or "", |
| company_name=parsed["company_name"] or "", |
| title=parsed["title"] or "", |
| last_reply_subject=parsed["last_reply_subject"] or "", |
| last_reply_body=parsed["last_reply_body"] or "", |
| last_reply_at=new_ts, |
| crm_status="new_lead", |
| raw_webhook=parsed["raw_webhook"], |
| ) |
| db.add(row) |
|
|
| db.commit() |
| db.refresh(row) |
| return {"ok": True, "lead_id": row.id} |
|
|
|
|
| @app.post("/api/leads/seed-demo") |
| async def seed_demo_leads(t: TenantContext = Depends(get_tenant_context)): |
| """ |
| Insert sample leads so the Leads UI can be previewed without a real Smartlead webhook. |
| Deletes any previous demo rows (emails like demo.lead.*@emailout.local) then inserts fresh ones. |
| """ |
| db = t.db |
| tid = t.tenant_id |
| demo_email_filter = CrmLead.email.like("demo.lead.%@emailout.local") |
| removed = ( |
| db.query(CrmLead) |
| .filter(CrmLead.tenant_id == tid, demo_email_filter) |
| .delete(synchronize_session=False) |
| ) |
|
|
| campaign_id = "88001" |
| campaign_name = "Logistics & Supply Chain Outreach" |
| now = datetime.utcnow() |
|
|
| specs = [ |
| { |
| "sl_id": "91001", |
| "key": "jane", |
| "first_name": "Jane", |
| "last_name": "Morgan", |
| "company_name": "ACME Logistics", |
| "title": "VP of Operations", |
| "crm_status": "new_lead", |
| "subject": "Re: Quick question on your freight volumes", |
| "body": ( |
| "Hi Sarah,\n\nThanks for reaching out. We're evaluating new 3PL partners in Q2. " |
| "Could you send a one-pager on pricing for mid-market shippers?\n\nBest,\nJane" |
| ), |
| "days_ago": 0, |
| }, |
| { |
| "sl_id": "91002", |
| "key": "marcus", |
| "first_name": "Marcus", |
| "last_name": "Chen", |
| "company_name": "Pacific Rail Partners", |
| "title": "Director of Procurement", |
| "crm_status": "contacted", |
| "subject": "Re: Partnership", |
| "body": ( |
| "Not interested right now — we renewed with our incumbent through 2026. " |
| "Feel free to circle back next year." |
| ), |
| "days_ago": 1, |
| }, |
| { |
| "sl_id": "91003", |
| "key": "elena", |
| "first_name": "Elena", |
| "last_name": "Vasquez", |
| "company_name": "Harbor Freight Collective", |
| "title": "Head of Supply Chain", |
| "crm_status": "contacted", |
| "subject": "Re: 15 min chat?", |
| "body": ( |
| "Tuesday 3pm PT works for me. Here's my calendar link: https://example.com/cal/elena — " |
| "please include an agenda." |
| ), |
| "days_ago": 2, |
| }, |
| { |
| "sl_id": "91004", |
| "key": "david", |
| "first_name": "David", |
| "last_name": "Okonkwo", |
| "company_name": "EZOFIS", |
| "title": "CEO", |
| "crm_status": "qualified", |
| "subject": "Re: outbound sequences", |
| "body": ( |
| "This looks promising. Loop in our COO (coo@ezofis.com) and let's do a 30-min scoping call this week." |
| ), |
| "days_ago": 3, |
| }, |
| { |
| "sl_id": "91005", |
| "key": "priya", |
| "first_name": "Priya", |
| "last_name": "Nair", |
| "company_name": "Summit Cold Storage", |
| "title": "Operations Manager", |
| "crm_status": "unqualified", |
| "subject": "Re: cold storage", |
| "body": ( |
| "We only work with vendors on our approved vendor list. Please don't follow up on this thread." |
| ), |
| "days_ago": 4, |
| }, |
| { |
| "sl_id": "91006", |
| "key": "sam", |
| "first_name": "Sam", |
| "last_name": "Rivera", |
| "company_name": "Inland Distribution Co.", |
| "title": "Logistics Coordinator", |
| "crm_status": "none", |
| "subject": "Re: intro", |
| "body": "Got it — thanks.", |
| "days_ago": 5, |
| }, |
| ] |
|
|
| for s in specs: |
| email = f"demo.lead.{s['key']}@emailout.local" |
| ts = now - timedelta(days=s["days_ago"]) |
| raw = { |
| "event": "EMAIL_REPLIED", |
| "demo_seed": True, |
| "campaign_id": int(campaign_id), |
| "campaign_name": campaign_name, |
| "lead_id": int(s["sl_id"]), |
| "lead": { |
| "email": email, |
| "first_name": s["first_name"], |
| "last_name": s["last_name"], |
| "company_name": s["company_name"], |
| }, |
| "reply": { |
| "subject": s["subject"], |
| "body": s["body"], |
| "received_at": ts.isoformat() + "Z", |
| }, |
| } |
| db.add( |
| CrmLead( |
| tenant_id=tid, |
| smartlead_lead_id=s["sl_id"], |
| campaign_id=campaign_id, |
| campaign_name=campaign_name, |
| email=email, |
| first_name=s["first_name"], |
| last_name=s["last_name"], |
| company_name=s["company_name"], |
| title=s["title"], |
| last_reply_subject=s["subject"], |
| last_reply_body=s["body"], |
| last_reply_at=ts, |
| crm_status=s["crm_status"], |
| contact_id=None, |
| raw_webhook=raw, |
| ) |
| ) |
|
|
| db.commit() |
| return { |
| "ok": True, |
| "removed_previous_demo_rows": removed, |
| "inserted": len(specs), |
| } |
|
|
|
|
| @app.get("/api/leads") |
| async def list_leads( |
| search: str = Query("", description="Search email, name, company"), |
| status: str = Query("", description="crm_status filter"), |
| sort_by: str = Query("last_reply_at"), |
| sort_dir: str = Query("desc"), |
| limit: int = Query(50, ge=1, le=200), |
| offset: int = Query(0, ge=0), |
| t: TenantContext = Depends(get_tenant_context), |
| ): |
| db = t.db |
| q = db.query(CrmLead).filter(CrmLead.tenant_id == t.tenant_id) |
| if search.strip(): |
| term = f"%{search.strip().lower()}%" |
| q = q.filter( |
| or_( |
| func.lower(CrmLead.email).like(term), |
| func.lower(func.coalesce(CrmLead.company_name, "")).like(term), |
| func.lower(func.coalesce(CrmLead.first_name, "")).like(term), |
| func.lower(func.coalesce(CrmLead.last_name, "")).like(term), |
| ) |
| ) |
| if status.strip() and status in CRM_STATUS_ALLOWED: |
| q = q.filter(CrmLead.crm_status == status) |
|
|
| total = q.count() |
|
|
| col_map = { |
| "last_reply_at": CrmLead.last_reply_at, |
| "created_at": CrmLead.created_at, |
| "email": CrmLead.email, |
| "company_name": CrmLead.company_name, |
| } |
| col = col_map.get(sort_by, CrmLead.last_reply_at) |
| if sort_dir == "asc": |
| q = q.order_by(col.asc()) |
| else: |
| q = q.order_by(col.desc()) |
|
|
| rows = q.offset(offset).limit(limit).all() |
| return { |
| "total": total, |
| "leads": [_crm_lead_to_dict(r) for r in rows], |
| } |
|
|
|
|
| @app.get("/api/leads/{lead_id}") |
| async def get_lead(lead_id: int, t: TenantContext = Depends(get_tenant_context)): |
| db = t.db |
| row = ( |
| db.query(CrmLead) |
| .filter(CrmLead.tenant_id == t.tenant_id, CrmLead.id == lead_id) |
| .first() |
| ) |
| if not row: |
| raise HTTPException(status_code=404, detail="Lead not found") |
| d = _crm_lead_to_dict(row) |
| if row.contact_id: |
| c = ( |
| db.query(Contact) |
| .filter( |
| Contact.tenant_id == t.tenant_id, |
| Contact.id == row.contact_id, |
| ) |
| .first() |
| ) |
| if c: |
| d["contact"] = { |
| "id": c.id, |
| "email": c.email, |
| "company": c.company, |
| "title": c.title, |
| } |
| d["company_details"] = _contact_company_details_dict(c) |
| return d |
|
|
|
|
@app.patch("/api/leads/{lead_id}")
async def patch_lead(lead_id: int, body: CrmLeadPatchRequest, t: TenantContext = Depends(get_tenant_context)):
    """Partially update a CRM lead.

    Only fields explicitly present in the request body are applied
    (``exclude_unset``). Validates ``crm_status`` against the allowed set and
    enforces per-tenant email uniqueness. Company-detail edits are also
    mirrored into the lead's ``raw_webhook`` JSON blob so downstream readers
    of that payload see the updated values.

    Raises:
        HTTPException: 404 if the lead does not exist in this tenant,
            400 for an empty patch or invalid crm_status,
            409 if the new email already belongs to another lead.
    """
    db = t.db
    row = (
        db.query(CrmLead)
        .filter(CrmLead.tenant_id == t.tenant_id, CrmLead.id == lead_id)
        .first()
    )
    if not row:
        raise HTTPException(status_code=404, detail="Lead not found")
    # Only touch fields the client actually sent.
    data = body.model_dump(exclude_unset=True)
    if not data:
        raise HTTPException(status_code=400, detail="No fields to update")
    if "crm_status" in data:
        if data["crm_status"] not in CRM_STATUS_ALLOWED:
            raise HTTPException(
                status_code=400,
                detail=f"crm_status must be one of: {sorted(CRM_STATUS_ALLOWED)}",
            )
        row.crm_status = data["crm_status"]
    if "first_name" in data:
        row.first_name = _safe_str(data["first_name"])
    if "last_name" in data:
        row.last_name = _safe_str(data["last_name"])
    if "company_name" in data:
        row.company_name = _safe_str(data["company_name"])
    if "title" in data:
        row.title = _safe_str(data["title"])
    if "last_reply_subject" in data:
        row.last_reply_subject = _safe_str(data["last_reply_subject"])
    if "last_reply_body" in data:
        row.last_reply_body = _safe_str(data["last_reply_body"])
    if "email" in data:
        # Emails are stored lowercased. An empty value is silently ignored
        # rather than clearing the stored email.
        email = _safe_str(data["email"]).lower()
        if email:
            # Enforce per-tenant uniqueness against all OTHER leads.
            taken = (
                db.query(CrmLead)
                .filter(
                    CrmLead.tenant_id == row.tenant_id,
                    CrmLead.id != lead_id,
                    func.lower(CrmLead.email) == email,
                )
                .first()
            )
            if taken:
                raise HTTPException(
                    status_code=409,
                    detail="Another lead already uses this email",
                )
            row.email = email
    # Mirror company fields into the raw_webhook JSON blob. This runs on
    # every patch, keeping "Company Name" in sync with the column even when
    # the client did not send company-detail fields.
    rw = dict(row.raw_webhook) if isinstance(row.raw_webhook, dict) else {}
    cd = dict(rw.get("company_details") or {}) if isinstance(rw.get("company_details"), dict) else {}
    cd["Company Name"] = row.company_name or ""
    for py_key, raw_key in COMPANY_DETAIL_PATCH_FIELDS.items():
        if py_key in data:
            cd[raw_key] = _safe_str(data[py_key])
    rw["company_details"] = cd
    # Reassign (rather than mutate in place) so SQLAlchemy detects the change.
    row.raw_webhook = rw
    row.updated_at = datetime.utcnow()
    db.commit()
    db.refresh(row)
    return _crm_lead_to_dict(row)
|
|
|
|
@app.post("/api/leads/{lead_id}/move-to-contacts")
async def move_lead_to_contacts(lead_id: int, t: TenantContext = Depends(get_tenant_context)):
    """Promote a single CRM lead into the Contacts table."""
    db = t.db
    lead = db.query(CrmLead).filter(
        CrmLead.tenant_id == t.tenant_id,
        CrmLead.id == lead_id,
    ).first()
    if lead is None:
        raise HTTPException(status_code=404, detail="Lead not found")
    result = _move_lead_to_contacts_core(db, lead)
    if not result["ok"]:
        raise HTTPException(status_code=400, detail=result.get("error", "Move failed"))
    db.commit()
    return {"contact_id": result["contact_id"], "message": result["message"]}
|
|
|
|
@app.post("/api/leads/bulk-move-to-contacts")
async def bulk_move_leads_to_contacts(body: BulkLeadIdsRequest, t: TenantContext = Depends(get_tenant_context)):
    """Promote many leads to contacts, collecting per-lead failures instead of aborting."""
    db = t.db
    if not body.lead_ids:
        raise HTTPException(status_code=400, detail="lead_ids required")
    errors: List[dict] = []
    moved = 0
    for lead_id in body.lead_ids:
        lead = db.query(CrmLead).filter(
            CrmLead.tenant_id == t.tenant_id,
            CrmLead.id == lead_id,
        ).first()
        if lead is None:
            errors.append({"lead_id": lead_id, "error": "not found"})
            continue
        result = _move_lead_to_contacts_core(db, lead)
        if result["ok"]:
            moved += 1
        else:
            errors.append({"lead_id": lead_id, "error": result.get("error", "failed")})
    # A single commit covers every successful move.
    db.commit()
    return {"moved": moved, "errors": errors}
|
|
|
|
@app.post("/api/leads/bulk-delete")
async def bulk_delete_leads(body: BulkLeadIdsRequest, t: TenantContext = Depends(get_tenant_context)):
    """Delete the given leads (tenant-scoped) in one bulk statement."""
    db = t.db
    if not body.lead_ids:
        raise HTTPException(status_code=400, detail="lead_ids required")
    removed_count = (
        db.query(CrmLead)
        .filter(
            CrmLead.tenant_id == t.tenant_id,
            CrmLead.id.in_(body.lead_ids),
        )
        .delete(synchronize_session=False)
    )
    db.commit()
    return {"deleted": removed_count}
|
|
|
|
@app.post("/api/deals/from-leads")
async def deals_from_leads(body: BulkLeadIdsRequest, t: TenantContext = Depends(get_tenant_context)):
    """Create one deal per selected lead and remove those leads from the Leads table.

    Every created deal is owned by the requesting user. Missing leads are
    reported in ``errors`` instead of failing the whole batch; all changes
    are committed atomically at the end.
    """
    db = t.db
    if not body.lead_ids:
        raise HTTPException(status_code=400, detail="lead_ids required")
    created: List[dict] = []
    errors: List[dict] = []
    # The owner is always the requesting user: resolve them ONCE instead of
    # re-running the same User query for every lead in the batch.
    inviter = db.query(User).filter(User.id == t.user_id).first()
    owner_initials = _user_initials_from_user(inviter)
    owner_display_name = (
        ((inviter.name or inviter.email or "").strip() or None) if inviter else None
    )
    for lid in body.lead_ids:
        lead = (
            db.query(CrmLead)
            .filter(CrmLead.tenant_id == t.tenant_id, CrmLead.id == lid)
            .first()
        )
        if not lead:
            errors.append({"lead_id": lid, "error": "not found"})
            continue
        person = " ".join(filter(None, [lead.first_name or "", lead.last_name or ""])).strip()
        deal = CrmDeal(
            tenant_id=lead.tenant_id,
            name=_deal_name_from_lead(lead),
            stage="new",
            owner_user_id=t.user_id,
            owner_initials=owner_initials,
            deal_value=None,
            revenue_type="arr",
            contact_display=person or (lead.email or ""),
            account_name=lead.company_name or "",
            expected_close_date=datetime.utcnow() + timedelta(days=60),
            close_probability=25,
            country="",
            last_interaction_at=lead.last_reply_at or datetime.utcnow(),
            source_lead_id=lead.id,
            source_campaign_name=lead.campaign_name or lead.campaign_id or "",
            contact_id=lead.contact_id,
        )
        db.add(deal)
        # Flush so the deal has its generated primary key before serializing.
        db.flush()
        dd = _deal_to_dict(deal)
        dd["owner_display_name"] = owner_display_name
        created.append(dd)
        db.delete(lead)
    db.commit()
    return {"created": created, "errors": errors}
|
|
|
|
@app.get("/api/deals")
async def list_deals(
    search: str = Query(""),
    sort_by: str = Query("created_at"),
    sort_dir: str = Query("desc"),
    limit: int = Query(100, ge=1, le=500),
    offset: int = Query(0, ge=0),
    t: TenantContext = Depends(get_tenant_context),
):
    """List deals for the tenant with search, sorting and pagination.

    Non-admins only see deals they own or that are unassigned. Each row is
    enriched with the owner's display name and (when absent) initials.
    """
    db = t.db
    query = db.query(CrmDeal).filter(CrmDeal.tenant_id == t.tenant_id)
    if t.role != "admin":
        query = query.filter(
            or_(CrmDeal.owner_user_id == t.user_id, CrmDeal.owner_user_id.is_(None))
        )
    needle = search.strip()
    if needle:
        term = f"%{needle.lower()}%"
        query = query.filter(
            or_(
                func.lower(CrmDeal.name).like(term),
                func.lower(func.coalesce(CrmDeal.account_name, "")).like(term),
                func.lower(func.coalesce(CrmDeal.contact_display, "")).like(term),
            )
        )
    total = query.count()
    sortable = {
        "created_at": CrmDeal.created_at,
        "name": CrmDeal.name,
        "deal_value": CrmDeal.deal_value,
        "stage": CrmDeal.stage,
        "last_interaction_at": CrmDeal.last_interaction_at,
    }
    column = sortable.get(sort_by, CrmDeal.created_at)
    query = query.order_by(column.asc() if sort_dir == "asc" else column.desc())
    rows = query.offset(offset).limit(limit).all()
    owner_ids = {r.owner_user_id for r in rows if r.owner_user_id}
    user_map: Dict[int, User] = (
        {u.id: u for u in db.query(User).filter(User.id.in_(owner_ids)).all()}
        if owner_ids
        else {}
    )
    deals_out = []
    for r in rows:
        dd = _deal_to_dict(r)
        owner = user_map.get(r.owner_user_id) if r.owner_user_id else None
        if owner is not None:
            dd["owner_display_name"] = (owner.name or owner.email or "").strip() or None
            if not (dd.get("owner_initials") or "").strip():
                dd["owner_initials"] = _user_initials_from_user(owner)
        else:
            dd["owner_display_name"] = None
        deals_out.append(dd)
    return {"total": total, "deals": deals_out}
|
|
|
|
@app.get("/api/deals/quarterly-recurring-metrics")
async def quarterly_recurring_metrics(
    owner_user_id: Optional[int] = Query(None),
    max_quarters: int = Query(12, ge=1, le=24),
    period: str = Query("all"),
    t: TenantContext = Depends(get_tenant_context),
):
    """
    Won deals → ARR (USD) roll-forward by calendar quarter from PO lines (interval per line + one-time lines).
    Optional period=all|month|quarter|year filters by won_at.
    When period is not all, includes comparison roll-forward for the prior calendar period (last month / quarter / year).
    """
    db = t.db
    q = db.query(CrmDeal).filter(
        CrmDeal.tenant_id == t.tenant_id,
        CrmDeal.stage == "won",
    )
    # Non-admins only see their own (or unassigned) deals; admins may
    # additionally narrow to a specific owner.
    if t.role != "admin":
        q = q.filter(or_(CrmDeal.owner_user_id == t.user_id, CrmDeal.owner_user_id.is_(None)))
    elif owner_user_id is not None:
        q = q.filter(CrmDeal.owner_user_id == int(owner_user_id))
    rows = q.all()

    def _board_payload(bounds) -> List[dict]:
        """One payload dict per won deal whose won_at falls within *bounds*.

        Shared by the base and comparison periods (previously duplicated).
        """
        payload: List[dict] = []
        for r in rows:
            wa = getattr(r, "won_at", None)
            if not _won_at_within_bounds(wa, bounds):
                continue
            payload.append(
                {
                    "id": r.id,
                    "name": r.name or "",
                    "revenue_type": (getattr(r, "revenue_type", None) or "arr"),
                    "won_line_items": _won_line_items_list(r),
                    "won_at": wa,
                }
            )
        return payload

    bounds = _dashboard_period_bounds_utc(period)
    base = build_quarterly_board(_board_payload(bounds), max_quarters=max_quarters)
    pnorm = (period or "all").strip().lower()
    if pnorm in ("", "all"):
        # No period filter → no prior-period comparison to compute.
        return {**base, "comparison": None}

    # Roll-forward for the immediately preceding calendar period, if one exists.
    bounds_prev = _previous_period_bounds_utc(period)
    deals_prev = _board_payload(bounds_prev) if bounds_prev else []
    comp = build_quarterly_board(deals_prev, max_quarters=max_quarters)
    return {
        **base,
        "comparison": {
            "period_label": _comparison_period_label(period),
            "quarters": comp.get("quarters", []),
        },
    }
|
|
|
|
@app.post("/api/deals")
async def create_deal(body: CrmDealCreateRequest, t: TenantContext = Depends(get_tenant_context)):
    """Create a deal owned by the caller. Won and Lost are not valid starting stages."""
    stage = (body.stage or "new").strip().lower() or "new"
    if stage not in DEAL_STAGE_ALLOWED:
        raise HTTPException(
            status_code=400,
            detail=f"stage must be one of: {sorted(DEAL_STAGE_ALLOWED)}",
        )
    if stage == "lost":
        raise HTTPException(
            status_code=400,
            detail="Cannot create a deal in Lost. Create the deal in another stage, then move it to Lost.",
        )
    if stage == "won":
        raise HTTPException(
            status_code=400,
            detail="Cannot create a deal as Won. Create the deal in another stage, then move it to Won with billing details.",
        )
    db = t.db
    title = _safe_str(body.name) if body.name is not None else ""
    creator = db.query(User).filter(User.id == t.user_id).first()
    deal = CrmDeal(
        tenant_id=t.tenant_id,
        name=title or "Untitled deal",
        stage=stage,
        owner_user_id=t.user_id,
        owner_initials=_user_initials_from_user(creator),
        deal_value=None,
        revenue_type="arr",
        contact_display="",
        account_name="",
        expected_close_date=None,
        close_probability=10,
        country="",
        last_interaction_at=None,
        source_lead_id=None,
        source_campaign_name="",
        contact_id=None,
    )
    db.add(deal)
    db.commit()
    db.refresh(deal)
    return _enrich_deal_response(db, deal)
|
|
|
|
@app.get("/api/deals/{deal_id}")
async def get_deal(deal_id: int, t: TenantContext = Depends(get_tenant_context)):
    """Fetch one deal, enforcing tenant scoping and owner access."""
    db = t.db
    deal = db.query(CrmDeal).filter(
        CrmDeal.tenant_id == t.tenant_id,
        CrmDeal.id == deal_id,
    ).first()
    if deal is None:
        raise HTTPException(status_code=404, detail="Deal not found")
    _deal_access_or_403(deal, t)
    return _enrich_deal_response(db, deal)
|
|
|
|
@app.patch("/api/deals/{deal_id}")
async def patch_deal(deal_id: int, body: CrmDealPatchRequest, t: TenantContext = Depends(get_tenant_context)):
    """Partially update a deal.

    Applies only fields explicitly present in the request (``exclude_unset``).
    Covers several concerns:

    * Ownership: admins may assign any workspace member or unassign; a
      non-admin may only claim an unassigned deal for themselves.
    * Contact linking: validates the contact belongs to the tenant and keeps
      contact_display / account_name in sync.
    * Stage state machine: moving to "lost" requires a valid lost_reason
      (plus a discount % for "price" or a product name for "chose_another");
      moving to "won" requires a validated won_billing payload whose line
      items determine deal_value; leaving won/lost clears those fields.
    * On a NEW transition into "won", tenant admins are notified by email and
      the send result is reported on the response.

    Raises:
        HTTPException: 404 missing deal/user/contact, 403 ownership rules,
            400 validation failures.
    """
    db = t.db
    row = (
        db.query(CrmDeal)
        .filter(CrmDeal.tenant_id == t.tenant_id, CrmDeal.id == deal_id)
        .first()
    )
    if not row:
        raise HTTPException(status_code=404, detail="Deal not found")
    _deal_access_or_403(row, t)
    data = body.model_dump(exclude_unset=True)
    if not data:
        raise HTTPException(status_code=400, detail="No fields to update")
    # Set only when this request transitions the deal into "won" for the
    # first time; gates the admin notification email at the end.
    did_mark_won_this_request = False
    # owner_initials is derived from the owner user; never accepted directly.
    if "owner_initials" in data:
        del data["owner_initials"]
    if "owner_user_id" in data:
        val = data.pop("owner_user_id")
        mids = _tenant_member_ids(db, t.tenant_id)
        if t.role == "admin":
            # Admins may unassign (None) or assign to any workspace member.
            if val is None:
                row.owner_user_id = None
                row.owner_initials = ""
            else:
                vid = int(val)
                if vid not in mids:
                    raise HTTPException(
                        status_code=400,
                        detail="Owner must be a member of this workspace",
                    )
                ou = db.query(User).filter(User.id == vid).first()
                if not ou:
                    raise HTTPException(status_code=404, detail="User not found")
                row.owner_user_id = vid
                row.owner_initials = _user_initials_from_user(ou)
        else:
            # Non-admins can only claim an UNASSIGNED deal for THEMSELVES.
            if val is None:
                raise HTTPException(
                    status_code=403, detail="Only admins can set a deal to unassigned"
                )
            if int(val) != int(t.user_id):
                raise HTTPException(
                    status_code=403,
                    detail="Only admins can assign deals to other users",
                )
            if row.owner_user_id is not None and int(row.owner_user_id) != int(t.user_id):
                raise HTTPException(
                    status_code=403,
                    detail="You can only take ownership of unassigned deals",
                )
            claimer = db.query(User).filter(User.id == t.user_id).first()
            row.owner_user_id = t.user_id
            row.owner_initials = _user_initials_from_user(claimer)
    if "contact_id" in data:
        cid = data["contact_id"]
        if cid is None:
            row.contact_id = None
        else:
            # The linked contact must belong to the same tenant.
            contact = (
                db.query(Contact)
                .filter(
                    Contact.tenant_id == t.tenant_id,
                    Contact.id == int(cid),
                )
                .first()
            )
            if not contact:
                raise HTTPException(status_code=404, detail="Contact not found")
            row.contact_id = contact.id
            row.contact_display = _format_contact_display(contact)
            # Adopt the contact's company unless the patch sets it explicitly.
            if "account_name" not in data and _safe_str(contact.company):
                row.account_name = _safe_str(contact.company)
    if "name" in data:
        row.name = _safe_str(data["name"])
    if "contact_display" in data:
        row.contact_display = _safe_str(data["contact_display"])
    if "account_name" in data:
        row.account_name = _safe_str(data["account_name"])
    if "stage" in data:
        new_stage = data["stage"]
        if new_stage not in DEAL_STAGE_ALLOWED:
            raise HTTPException(
                status_code=400,
                detail=f"stage must be one of: {sorted(DEAL_STAGE_ALLOWED)}",
            )
        prev_stage = (row.stage or "new").strip().lower()
        if new_stage == "lost":
            # A deal cannot be both won and lost: wipe billing fields first.
            _clear_won_billing_fields(row)
            lr = _safe_str(data.get("lost_reason", "")).strip().lower()
            if not lr or lr not in DEAL_LOST_REASON_ALLOWED:
                raise HTTPException(
                    status_code=400,
                    detail=(
                        "When marking a deal as Lost, lost_reason is required. "
                        f"Use one of: {', '.join(sorted(DEAL_LOST_REASON_ALLOWED))}."
                    ),
                )
            if lr == "price":
                # "price" requires the discount % the customer wanted (0–100).
                pct = data.get("lost_price_discount_pct")
                if pct is None:
                    raise HTTPException(
                        status_code=400,
                        detail="lost_price_discount_pct (0–100) is required when lost_reason is price.",
                    )
                try:
                    pf = float(pct)
                except (TypeError, ValueError):
                    raise HTTPException(
                        status_code=400,
                        detail="lost_price_discount_pct must be a number.",
                    )
                if pf < 0 or pf > 100:
                    raise HTTPException(
                        status_code=400,
                        detail="lost_price_discount_pct must be between 0 and 100.",
                    )
                row.lost_reason = lr
                row.lost_price_discount_pct = pf
                row.lost_chose_product_name = ""
            elif lr == "chose_another":
                # "chose_another" requires the competing product's name.
                pn = _safe_str(data.get("lost_chose_product_name") or "")
                if not pn:
                    raise HTTPException(
                        status_code=400,
                        detail="lost_chose_product_name is required when lost_reason is chose_another.",
                    )
                row.lost_reason = lr
                row.lost_price_discount_pct = None
                row.lost_chose_product_name = pn
            else:
                row.lost_reason = lr
                row.lost_price_discount_pct = None
                row.lost_chose_product_name = ""
            row.stage = "lost"
        elif new_stage == "won":
            # Winning requires a full billing payload (PO, customer, line items).
            wb_raw = data.pop("won_billing", None)
            if wb_raw is None:
                raise HTTPException(
                    status_code=400,
                    detail="won_billing is required when marking a deal as Won (PO, customer, line items, notes).",
                )
            try:
                wb = WonBillingPayload.model_validate(wb_raw)
            except ValidationError as e:
                # Truncate pydantic's potentially huge error text.
                raise HTTPException(status_code=400, detail=str(e)[:3000])
            # Normalize line items and compute the PO subtotal.
            items_out = []
            subtotal = 0.0
            for li in wb.line_items:
                amt = round(float(li.qty) * float(li.rate), 2)
                subtotal += amt
                items_out.append(
                    {
                        "product_service": _safe_str(li.product_service),
                        "description": _safe_str(li.description or ""),
                        "qty": float(li.qty),
                        "rate": float(li.rate),
                        "amount": amt,
                        "billing_interval": li.billing_interval,
                        "currency": li.currency,
                    }
                )
            row.won_po_number = _safe_str(wb.po_number).strip()
            row.won_customer_legal_name = _safe_str(wb.customer_legal_name).strip()
            row.won_customer_address = _safe_str(wb.customer_address).strip()
            row.won_contact_person_name = _safe_str(wb.contact_person_name).strip()
            row.won_channel_partner_name = _safe_str(wb.channel_partner_name or "").strip()
            row.won_note_to_customer = _safe_str(wb.note_to_customer).strip()
            row.won_note_to_accounts = _safe_str(wb.note_to_accounts).strip()
            row.won_line_items = items_out

            # The deal value now reflects the PO subtotal; lost fields are cleared.
            row.deal_value = int(round(subtotal))
            row.lost_reason = ""
            row.lost_price_discount_pct = None
            row.lost_chose_product_name = ""
            row.stage = "won"
            if prev_stage != "won":
                # Only a NEW won transition stamps won_at / triggers the email.
                row.won_at = datetime.utcnow()
                did_mark_won_this_request = True
        else:
            # Moving to any other stage clears the terminal-stage bookkeeping.
            if prev_stage == "lost":
                row.lost_reason = ""
                row.lost_price_discount_pct = None
                row.lost_chose_product_name = ""
            if prev_stage == "won":
                _clear_won_billing_fields(row)
            row.stage = new_stage
            # Discard stage-specific keys so they aren't treated as leftovers.
            data.pop("lost_reason", None)
            data.pop("lost_price_discount_pct", None)
            data.pop("lost_chose_product_name", None)
            data.pop("won_billing", None)
    else:
        # Stage unchanged: stage-specific keys are meaningless, drop them.
        data.pop("lost_reason", None)
        data.pop("lost_price_discount_pct", None)
        data.pop("lost_chose_product_name", None)
        data.pop("won_billing", None)
    if "deal_value" in data:
        row.deal_value = data["deal_value"]
    if "revenue_type" in data:
        rt = _safe_str(data["revenue_type"]).lower()
        if rt not in DEAL_REVENUE_TYPE_ALLOWED:
            raise HTTPException(
                status_code=400,
                detail=f"revenue_type must be one of: {sorted(DEAL_REVENUE_TYPE_ALLOWED)}",
            )
        row.revenue_type = rt
    if "close_probability" in data:
        # Clamp to the 0–100 percentage range.
        row.close_probability = max(0, min(100, int(data["close_probability"])))
    if "country" in data:
        row.country = _safe_str(data["country"])
    if "expected_close_date" in data:
        ts = _to_datetime(data["expected_close_date"])
        row.expected_close_date = ts
    if "last_interaction_at" in data:
        ts = _to_datetime(data["last_interaction_at"])
        row.last_interaction_at = ts

    # Keep the linked contact's record in sync with this patch.
    c = None
    if row.contact_id:
        c = (
            db.query(Contact)
            .filter(
                Contact.tenant_id == t.tenant_id,
                Contact.id == row.contact_id,
            )
            .first()
        )
    if "account_name" in data and c:
        c.company = row.account_name
    if c:
        _sync_contact_raw_from_columns(c)
        if any(k in data for k in COMPANY_DETAIL_PATCH_FIELDS):
            _merge_contact_raw_company_details(c, data)
        # country_region (contact-style field) takes precedence over country.
        if "country_region" in data:
            row.country = _safe_str(data["country_region"])
        if "country" in data and "country_region" not in data:
            rd = dict(c.raw_data or {})
            rd["Country"] = _safe_str(data["country"])
            c.raw_data = rd
        db.add(c)
    row.updated_at = datetime.utcnow()
    db.commit()
    db.refresh(row)
    out = _enrich_deal_response(db, row)
    if did_mark_won_this_request:
        # Best-effort admin notification; the outcome is surfaced on the
        # response rather than failing the request.
        sender = db.query(User).filter(User.id == int(t.user_id)).first()
        if sender:
            lines = _won_line_items_list(row)
            st = sum(float(li.get("amount") or 0) for li in lines) if lines else 0.0
            ok, err = await _notify_tenant_admins_deal_won(
                db,
                int(t.tenant_id),
                sender,
                int(row.id),
                row.name or "",
                lines,
                st,
                row.won_po_number or "",
                row.won_customer_legal_name or "",
                row.won_customer_address or "",
                row.won_contact_person_name or "",
                row.won_channel_partner_name or "",
                row.won_note_to_customer or "",
                row.won_note_to_accounts or "",
            )
            out["won_email_to_admins_sent"] = ok
            if err:
                out["won_email_error"] = err
    return out
|
|
|
|
@app.delete("/api/deals/{deal_id}")
async def delete_deal(deal_id: int, t: TenantContext = Depends(get_tenant_context)):
    """Delete a deal after tenant scoping and owner-access checks."""
    db = t.db
    deal = db.query(CrmDeal).filter(
        CrmDeal.tenant_id == t.tenant_id,
        CrmDeal.id == deal_id,
    ).first()
    if deal is None:
        raise HTTPException(status_code=404, detail="Deal not found")
    _deal_access_or_403(deal, t)
    db.delete(deal)
    db.commit()
    return {"ok": True, "id": deal_id}
|
|
|
|
@app.post("/api/deals/seed-demo")
async def seed_demo_deals(t: TenantContext = Depends(get_tenant_context)):
    """Replace any existing demo deals ("DEMO: " prefix) with a fresh sample set."""
    db = t.db
    tid = t.tenant_id
    removed = (
        db.query(CrmDeal)
        .filter(CrmDeal.tenant_id == tid, CrmDeal.name.like("DEMO: %"))
        .delete(synchronize_session=False)
    )
    now = datetime.utcnow()
    samples = [
        {
            "name": "DEMO: Ramco — Aus",
            "stage": "discovery",
            "owner_initials": "",
            "deal_value": 10000,
            "contact_display": "KENNETH WON",
            "account_name": "Quanticsit",
            "close_probability": 10,
            "close_in_days": 120,
            "country": "Australia",
        },
        {
            "name": "DEMO: HT Pilot",
            "stage": "proposal",
            "owner_initials": "",
            "deal_value": 1600,
            "contact_display": "SARAH LEE",
            "account_name": "HT Logistics",
            "close_probability": 60,
            "close_in_days": 45,
            "country": "Malaysia",
        },
        {
            "name": "DEMO: Northwind rollout",
            "stage": "negotiation",
            "owner_initials": "",
            "deal_value": 45000,
            "contact_display": "JAMES DIAZ",
            "account_name": "Northwind Trading",
            "close_probability": 40,
            "close_in_days": 60,
            "country": "USA",
        },
        {
            "name": "DEMO: Maple cold chain",
            "stage": "new",
            "owner_initials": "",
            "deal_value": None,
            "contact_display": "ALEX RUIZ",
            "account_name": "Maple Cold",
            "close_probability": 15,
            "close_in_days": 90,
            "country": "Canada",
        },
    ]
    # Columns shared by every seeded row.
    shared = {
        "tenant_id": tid,
        "owner_user_id": None,
        "last_interaction_at": now - timedelta(days=1),
        "source_lead_id": None,
        "source_campaign_name": "Demo",
    }
    for spec in samples:
        per_deal = {k: v for k, v in spec.items() if k != "close_in_days"}
        db.add(
            CrmDeal(
                expected_close_date=now + timedelta(days=spec["close_in_days"]),
                **shared,
                **per_deal,
            )
        )
    db.commit()
    return {"ok": True, "removed_previous_demo_rows": removed, "inserted": len(samples)}
|
|
|
|
@app.get("/api/leads/{lead_id}/smartlead-thread")
async def lead_smartlead_thread(lead_id: int, t: TenantContext = Depends(get_tenant_context)):
    """Fetch full thread from Smartlead API (Admin API key required)."""
    db = t.db
    lead = db.query(CrmLead).filter(
        CrmLead.tenant_id == t.tenant_id,
        CrmLead.id == lead_id,
    ).first()
    if lead is None:
        raise HTTPException(status_code=404, detail="Lead not found")
    if not lead.smartlead_lead_id or not lead.campaign_id:
        raise HTTPException(status_code=400, detail="Lead missing Smartlead campaign/lead id")
    try:
        smartlead_id = int(lead.smartlead_lead_id)
    except (TypeError, ValueError):
        raise HTTPException(status_code=400, detail="Invalid smartlead_lead_id")
    try:
        # Config problems (e.g. missing API key) surface as ValueError → 400;
        # upstream/network failures become 502.
        history = SmartleadClient().get_message_history(str(lead.campaign_id), smartlead_id)
    except ValueError as e:
        raise HTTPException(status_code=400, detail=str(e))
    except Exception as e:
        raise HTTPException(status_code=502, detail=str(e))
    return {"history": history}
|
|
|
|
| |
# Location of the compiled frontend bundle relative to this backend package
# (../../frontend/dist from this file).
FRONTEND_DIST = Path(__file__).resolve().parents[2] / "frontend" / "dist"
INDEX_FILE = FRONTEND_DIST / "index.html"
FRONTEND_ASSETS = FRONTEND_DIST / "assets"


# Headers that stop browsers/proxies from caching the SPA shell, so a fresh
# deploy's index.html (and its hashed asset URLs) is always picked up.
_SPA_HTML_HEADERS = {
    "Cache-Control": "no-store, no-cache, must-revalidate, max-age=0",
    "Pragma": "no-cache",
}
|
|
|
|
def _spa_index_response() -> FileResponse:
    """Serve the SPA shell (index.html) with the no-cache headers applied."""
    response = FileResponse(str(INDEX_FILE), headers=_SPA_HTML_HEADERS)
    return response
|
|
|
|
if FRONTEND_DIST.exists() and INDEX_FILE.is_file():
    # Serve the built SPA: hashed assets via the StaticFiles mount, and every
    # other non-API path falls through to index.html (client-side routing).
    if FRONTEND_ASSETS.is_dir():
        app.mount("/assets", StaticFiles(directory=str(FRONTEND_ASSETS)), name="frontend_assets")

    @app.get("/")
    def spa_index():
        return _spa_index_response()

    @app.get("/{full_path:path}")
    def spa_fallback(full_path: str):
        # Reserve only the /api namespace. The previous startswith("api")
        # check also 404'd unrelated SPA routes such as "/apiary" or
        # "/api-docs-page".
        if full_path == "api" or full_path.startswith("api/"):
            raise HTTPException(status_code=404, detail="Not Found")
        return _spa_index_response()
|
|