EMAILOUT / backend /app /main.py
Seth
update
e72db37
from fastapi import FastAPI, UploadFile, File, Depends, HTTPException, Query, Request
from fastapi.responses import FileResponse, StreamingResponse
from fastapi.staticfiles import StaticFiles
from fastapi.middleware.cors import CORSMiddleware
from starlette.middleware.sessions import SessionMiddleware
from pathlib import Path
from sqlalchemy.orm import Session
from sqlalchemy import func, or_
import pandas as pd
import uuid
import os
import csv
import io
import concurrent.futures
from functools import partial
from typing import Any, Dict, List, Optional
import json
import asyncio
import math
import re
import logging
import requests
from urllib.parse import urlparse, unquote
from datetime import datetime, timedelta
from calendar import monthrange
from .database import (
get_db,
SessionLocal,
Tenant,
TenantMembership,
User,
UploadedFile,
Prompt,
GeneratedSequence,
SmartleadRun,
Contact,
CrmLead,
CrmDeal,
UnipileAccount,
LinkedinCampaign,
UnipileHostedAuthState,
OutreachSendReceipt,
)
from pydantic import ValidationError
from .models import (
UploadResponse,
PromptSaveRequest,
SequenceResponse,
SmartleadPushRequest,
SmartleadRunResponse,
CrmLeadPatchRequest,
BulkLeadIdsRequest,
BulkContactIdsRequest,
CrmDealPatchRequest,
CrmDealCreateRequest,
ContactCreateRequest,
ContactPatchRequest,
WonBillingPayload,
UnipileConnectRequest,
UnipileHostedLinkRequest,
UnipileMailboxHostedLinkRequest,
UserLinkedinPrefsPatch,
UserMailboxPrefsPatch,
LinkedinCampaignCreateRequest,
LinkedinCampaignGenerateRequest,
LinkedinCampaignExecuteRequest,
LinkedinCampaignPatchRequest,
)
from .gmail_invite import send_invite_email_via_gmail
from .gpt_service import generate_email_sequence, generate_linkedin_sequence, enrich_manual_contact_profile
from .smartlead_client import SmartleadClient
from .auth_routes import router as auth_router
from .tenant_deps import TenantContext, get_tenant_context
from .tenant_routes import router as tenant_router
from .outreach_routes import router as outreach_router
from .outreach_routes import parse_tracking_label, refresh_outreach_execution_stats_for_file, normalize_smtp_message_id
from .deal_revenue import build_quarterly_board
app = FastAPI()
# Session cookie for Google OAuth (must be before CORS if using credentials across origins)
_session_secret = os.environ.get("SESSION_SECRET", "dev-session-secret-change-in-production")
_is_https = os.environ.get("HTTPS_ONLY_COOKIES", "").strip() in ("1", "true", "yes")
app.add_middleware(
SessionMiddleware,
secret_key=_session_secret,
same_site="lax",
https_only=_is_https,
max_age=14 * 24 * 60 * 60,
)
_cors_raw = os.environ.get(
"CORS_ORIGINS",
"http://localhost:5173,http://127.0.0.1:5173,http://localhost:3000,http://127.0.0.1:3000",
)
_cors_origins = [o.strip() for o in _cors_raw.split(",") if o.strip()]
if not _cors_origins:
_cors_origins = ["*"]
app.add_middleware(
CORSMiddleware,
allow_origins=_cors_origins,
allow_credentials=True,
allow_methods=["*"],
allow_headers=["*"],
)
app.include_router(auth_router)
app.include_router(tenant_router)
app.include_router(outreach_router)
# Create uploads directory
UPLOAD_DIR = Path("/data/uploads")
UPLOAD_DIR.mkdir(parents=True, exist_ok=True)
# UniPile credentials are read fresh on each request (see _load_unipile_env) so Spaces secret
# updates and stripping whitespace behave reliably. Module-level kept for backwards compat.
UNIPILE_API_BASE = os.getenv("UNIPILE_API_BASE", "").rstrip("/")
UNIPILE_API_KEY = os.getenv("UNIPILE_API_KEY", "")
FRONTEND_ORIGIN = os.getenv("FRONTEND_ORIGIN", "").rstrip("/")
UNIPILE_WEBHOOK_SECRET = os.getenv("UNIPILE_WEBHOOK_SECRET", "").strip()
UNIPILE_WEBHOOK_LOG_EVENTS = os.getenv("UNIPILE_WEBHOOK_LOG_EVENTS", "").strip().lower() in (
"1",
"true",
"yes",
)
_unipile_webhook_log = logging.getLogger("emailout.unipile_webhook")
# ---- API ----
def _safe_str(value):
if value is None:
return ""
if isinstance(value, float):
if math.isnan(value):
return ""
return str(value).strip()
return str(value).strip()
def _json_safe(value):
if value is None:
return None
if isinstance(value, float):
if math.isnan(value):
return None
return value
# Pandas may return numpy scalar types; use Python-native via json fallback
try:
json.dumps(value)
return value
except TypeError:
return _safe_str(value)
def _pick_field(row_dict: Dict, aliases: List[str]) -> str:
lowered = {str(k).strip().lower(): v for k, v in row_dict.items()}
for alias in aliases:
if alias in lowered:
return _safe_str(lowered[alias])
return ""
def _pick_from_raw(raw_data: Dict, aliases: List[str]) -> str:
if not isinstance(raw_data, dict):
return ""
lowered = {str(k).strip().lower(): v for k, v in raw_data.items()}
for alias in aliases:
if alias in lowered:
return _safe_str(lowered[alias])
return ""
def _pick_linkedin_url(raw_data: Dict) -> str:
return _pick_from_raw(
raw_data,
[
"linkedin",
"linkedin url",
"linkedin profile",
"person linkedin url",
"person linkedin profile url",
"linkedin profile url",
"linkedin public url",
"linkedin_url",
"linkedin profile link",
],
)
def _normalize_linkedin_url(raw: str) -> str:
"""Strip whitespace; upgrade http→https for stable display and parsing."""
s = _safe_str(raw)
if not s:
return ""
if s.startswith("http://"):
s = "https://" + s[len("http://") :]
return s
def _linkedin_public_identifier(raw: str) -> str:
"""
UniPile profile lookup expects the public slug from /in/{slug}, not a full URL.
Accepts plain slugs (e.g. satyanadella) as well.
"""
s = _normalize_linkedin_url(raw).strip()
if not s:
return ""
low = s.lower()
if "linkedin.com" in low:
try:
path = urlparse(s).path or ""
except Exception:
path = ""
path = unquote(path).strip("/")
parts = [p for p in path.split("/") if p]
for i, seg in enumerate(parts):
if seg.lower() == "in" and i + 1 < len(parts):
slug = parts[i + 1].strip()
return slug.split("?")[0].strip()
if seg.lower() == "sales" and i + 2 < len(parts) and parts[i + 1].lower() == "lead":
# /sales/lead/{slug}
return parts[i + 2].split("?")[0].strip()
return ""
# Already a slug / identifier (no URL)
return s.split("?")[0].strip("/").strip()
def _load_unipile_env() -> tuple[str, str]:
"""Read UniPile DSN and API key from the environment (strip whitespace / newlines from secrets)."""
base = (os.getenv("UNIPILE_API_BASE") or "").strip().rstrip("/")
key = (os.getenv("UNIPILE_API_KEY") or "").strip()
return base, key
def _require_unipile_config():
base, key = _load_unipile_env()
if not base or not key:
raise HTTPException(
status_code=400,
detail="UniPile is not configured. Set UNIPILE_API_BASE and UNIPILE_API_KEY in backend env (Hugging Face: Space → Settings → Secrets).",
)
def _unipile_headers():
_, key = _load_unipile_env()
return {
"X-API-KEY": key,
"accept": "application/json",
"content-type": "application/json",
}
def _unipile_call(method: str, path: str, payload: Optional[dict] = None):
"""
Raw UniPile HTTP call. Returns (status_code, parsed_body).
Raises HTTPException only on transport failure (not on 4xx/5xx responses).
"""
_require_unipile_config()
base, _ = _load_unipile_env()
url = f"{base}{path}"
try:
resp = requests.request(
method=method.upper(),
url=url,
headers=_unipile_headers(),
json=payload,
timeout=45,
)
except requests.RequestException as e:
raise HTTPException(status_code=502, detail=f"UniPile request failed: {str(e)}")
data = None
try:
data = resp.json()
except Exception:
data = {"raw": resp.text}
return resp.status_code, data
def _unipile_request(method: str, path: str, payload: Optional[dict] = None):
status, data = _unipile_call(method, path, payload)
if status >= 400:
msg = None
if isinstance(data, dict):
msg = (
data.get("message")
or data.get("detail")
or data.get("error")
or data.get("errors")
)
if isinstance(msg, (dict, list)):
msg = json.dumps(msg)
err_type = _safe_str(data.get("type"))
if status == 401 and err_type == "errors/missing_credentials":
msg = (
"UniPile rejected the request (missing or invalid API credentials). "
"In Hugging Face Space → Settings → Secrets, set UNIPILE_API_KEY to your UniPile API access token "
"(Dashboard → API keys) and UNIPILE_API_BASE to your UniPile API URL (no trailing slash). "
"Remove accidental quotes or spaces when pasting; redeploy/restart the Space after saving."
)
raise HTTPException(
status_code=status,
detail=msg or f"UniPile error ({status}): {data if not isinstance(data, dict) else json.dumps(data)}",
)
return data
def _linkedin_invite_note(text: str) -> str:
"""LinkedIn personalized invite notes are short; keep a safe upper bound."""
t = _safe_str(text).replace("\r\n", "\n").strip()
if len(t) <= 280:
return t
return t[:277].rstrip() + "..."
def _send_unipile_dm(account_id: str, provider_id: str, text: str):
"""Open (or reuse) a 1:1 LinkedIn chat and send a message."""
chat = _unipile_request(
"POST",
"/api/v1/chats",
{"account_id": account_id, "attendees": [provider_id]},
)
chat_id = _safe_str(chat.get("id") if isinstance(chat, dict) else "")
if not chat_id:
raise ValueError("UniPile chat id not found")
_unipile_request(
"POST",
f"/api/v1/chats/{requests.utils.quote(chat_id, safe='')}/messages",
{"text": text or ""},
)
def _public_origin_from_request(request: Request) -> str:
"""
Resolve browser-facing origin behind proxies (HF Spaces, ingress).
Falls back to request URL if forwarded headers are absent.
"""
fwd_proto = request.headers.get("x-forwarded-proto")
proto = (fwd_proto.split(",")[0].strip().lower() if fwd_proto else request.url.scheme or "https")
fwd_host = request.headers.get("x-forwarded-host")
host = (fwd_host.split(",")[0].strip() if fwd_host else request.headers.get("host") or request.url.netloc or "")
if host:
return f"{proto}://{host}".rstrip("/")
return str(request.base_url).rstrip("/")
_UNIPILE_ID_UUID_RE = re.compile(
r"^[0-9a-f]{8}-[0-9a-f]{4}-[0-9a-f]{4}-[0-9a-f]{4}-[0-9a-f]{12}$",
re.IGNORECASE,
)
def _is_unipile_nonsense_display_name(val: str, account_id_hint: str = "") -> bool:
"""Hosted-auth notify bodies store the state token in `name`; account ids are UUID-shaped."""
s = (val or "").strip()
if not s:
return True
if _UNIPILE_ID_UUID_RE.match(s):
return True
if account_id_hint and s == account_id_hint.strip():
return True
return False
def _extract_unipile_identity_from_layer(data: dict, account_id_hint: str = "") -> tuple[str, str]:
"""Read display name + avatar from one object (e.g. GET /accounts/{id} payload)."""
candidates_name = [
data.get("name"),
data.get("display_name"),
data.get("full_name"),
(data.get("user") or {}).get("name") if isinstance(data.get("user"), dict) else None,
(data.get("profile") or {}).get("name") if isinstance(data.get("profile"), dict) else None,
(data.get("account") or {}).get("name") if isinstance(data.get("account"), dict) else None,
]
candidates_avatar = [
data.get("picture"),
data.get("avatar"),
data.get("photo_url"),
data.get("image_url"),
(data.get("user") or {}).get("picture") if isinstance(data.get("user"), dict) else None,
(data.get("profile") or {}).get("picture") if isinstance(data.get("profile"), dict) else None,
(data.get("profile") or {}).get("avatar") if isinstance(data.get("profile"), dict) else None,
]
raw_name = next((_safe_str(x) for x in candidates_name if _safe_str(x)), "")
if _is_unipile_nonsense_display_name(raw_name, account_id_hint):
raw_name = ""
avatar = next((_safe_str(x) for x in candidates_avatar if _safe_str(x)), "")
return raw_name, avatar
def _extract_unipile_identity(data: Optional[dict]) -> tuple[str, str]:
"""
Best-effort extraction of profile display name and avatar URL from UniPile account payloads.
Prefer GET /accounts enrichment under `account_profile`; ignore hosted-auth token in top-level `name`.
"""
if not isinstance(data, dict):
return "", ""
account_id_hint = _safe_str(data.get("account_id"))
layers: list[dict] = []
ap = data.get("account_profile")
if isinstance(ap, dict) and ap:
layers.append(ap)
layers.append(data)
name, avatar = "", ""
for layer in layers:
n, av = _extract_unipile_identity_from_layer(layer, account_id_hint=account_id_hint)
if av and not avatar:
avatar = av
if n and not name:
name = n
if name and avatar:
break
return name, avatar
def _linkedin_account_display_name(row: UnipileAccount) -> str:
"""Human-facing profile name for UI (not the UniPile id or generic label)."""
meta = row.metadata_json if isinstance(row.metadata_json, dict) else {}
n = _extract_unipile_identity(meta)[0].strip()
if n:
return n
lab = (row.label or "").strip()
suffix = "'s LinkedIn account"
if lab.endswith(suffix) and len(lab) > len(suffix):
return lab[: -len(suffix)].strip()
return lab or "LinkedIn account"
_MAILBOX_PROVIDER_SET = ("GOOGLE", "MICROSOFT", "OUTLOOK", "MAIL", "IMAP")
def _mailbox_account_display_name(row: UnipileAccount) -> str:
meta = row.metadata_json if isinstance(row.metadata_json, dict) else {}
n = _extract_unipile_identity(meta)[0].strip()
if n:
return n
return (row.label or "").strip() or "Mailbox"
def _try_fetch_unipile_account_profile(account_id: str) -> dict:
"""
Optional enrichment call. If it fails, we just keep existing metadata.
"""
try:
out = _unipile_request("GET", f"/api/v1/accounts/{requests.utils.quote(account_id, safe='')}")
return out if isinstance(out, dict) else {}
except Exception:
return {}
def _contact_value(contact: Contact, field: str):
if field == "first_name":
return contact.first_name or _pick_from_raw(contact.raw_data, ["first name", "firstname", "first_name"])
if field == "last_name":
return contact.last_name or _pick_from_raw(contact.raw_data, ["last name", "lastname", "last_name"])
if field == "email":
return contact.email or _pick_from_raw(contact.raw_data, ["email", "work email", "email address", "secondary email", "tertiary email"])
if field == "company":
return contact.company or _pick_from_raw(contact.raw_data, ["company", "company name", "company name for emails", "organization name", "account name"])
if field == "title":
return contact.title or _pick_from_raw(contact.raw_data, ["title", "job title"])
if field == "file_id":
return contact.file_id or ""
if field == "created_at":
return contact.created_at.isoformat() if contact.created_at else ""
return (contact.raw_data or {}).get(field)
def _to_float(val):
s = _safe_str(val).replace(",", "")
if not s:
return None
try:
return float(s)
except Exception:
return None
def _to_datetime(val):
s = _safe_str(val)
if not s:
return None
try:
return datetime.fromisoformat(s.replace("Z", "+00:00"))
except Exception:
return None
CRM_STATUS_ALLOWED = frozenset({
"none",
"new_lead",
"contacted",
"qualified",
"unqualified",
})
DEAL_STAGE_ALLOWED = frozenset({
"new",
"discovery",
"proposal",
"negotiation",
"won",
"lost",
})
DEAL_LOST_REASON_ALLOWED = frozenset({
"price",
"no_decision",
"chose_another",
"didnt_meet_expectation",
"unknown",
})
DEAL_REVENUE_TYPE_ALLOWED = frozenset({"arr", "qrr", "mrr", "one_time"})
SMARTLEAD_IMPORT_FILE_ID = "smartlead-imports"
MANUAL_CONTACT_FILE_ID = "manual-contacts"
DEMO_CONTACTS_FILE_ID = "demo-contacts-crm"
def _contact_matches_apollo_filter(c: Contact, rule: dict) -> bool:
"""Single dynamic-field rule (same semantics as legacy single-field query)."""
field = _safe_str(rule.get("field"))
if not field:
return True
op_local = (_safe_str(rule.get("op") or "contains")).lower()
value = _safe_str(rule.get("value"))
from_value = _safe_str(rule.get("from_value"))
to_value = _safe_str(rule.get("to_value"))
cmp_value = value or from_value
v = _safe_str(cmp_value).lower()
v_from = from_value
v_to = to_value
field_val = _contact_value(c, field)
field_str = _safe_str(field_val).lower()
if op_local == "equals":
return field_str == v
if op_local == "from":
f_num, v_num = _to_float(field_val), _to_float(v_from or value)
if f_num is not None and v_num is not None:
return f_num >= v_num
f_dt, v_dt = _to_datetime(field_val), _to_datetime(v_from or value)
if f_dt and v_dt:
return f_dt >= v_dt
return field_str >= _safe_str(v_from or value).lower()
if op_local == "to":
f_num, v_num = _to_float(field_val), _to_float(v_to or value)
if f_num is not None and v_num is not None:
return f_num <= v_num
f_dt, v_dt = _to_datetime(field_val), _to_datetime(v_to or value)
if f_dt and v_dt:
return f_dt <= v_dt
return field_str <= _safe_str(v_to or value).lower()
if op_local == "between":
f_num, from_num, to_num = _to_float(field_val), _to_float(v_from), _to_float(v_to)
if f_num is not None and from_num is not None and to_num is not None:
return from_num <= f_num <= to_num
f_dt, from_dt, to_dt = _to_datetime(field_val), _to_datetime(v_from), _to_datetime(v_to)
if f_dt and from_dt and to_dt:
return from_dt <= f_dt <= to_dt
low, high = v_from.lower(), v_to.lower()
return low <= field_str <= high
return v in field_str
@app.on_event("startup")
def _migrate_lead_status_removed_option():
"""Map legacy attempted_to_contact rows to contacted once at startup."""
db = SessionLocal()
try:
db.query(CrmLead).filter(CrmLead.crm_status == "attempted_to_contact").update(
{CrmLead.crm_status: "contacted"},
synchronize_session=False,
)
db.commit()
except Exception:
db.rollback()
finally:
db.close()
def _strip_html_simple(text: str) -> str:
if not text:
return ""
t = re.sub(r"<[^>]+>", " ", text)
return re.sub(r"\s+", " ", t).strip()
def _parse_smartlead_reply_payload(body: dict) -> Optional[dict]:
"""Normalize Smartlead webhook JSON into fields for CrmLead. Returns None if not a reply or missing identity."""
if not isinstance(body, dict):
return None
nest = body.get("data") if isinstance(body.get("data"), dict) else {}
lead = body.get("lead") or nest.get("lead") or {}
if not isinstance(lead, dict):
lead = {}
reply = body.get("reply") or nest.get("reply") or {}
if not isinstance(reply, dict):
reply = {}
event = str(body.get("event") or body.get("event_type") or body.get("type") or "").upper()
has_reply_content = bool(
reply.get("body")
or reply.get("html_body")
or reply.get("text_body")
or body.get("reply_body")
or body.get("message")
or nest.get("reply_body")
)
is_reply_event = "REPLIED" in event or event in ("EMAIL_REPLIED", "LEAD_REPLIED")
if not is_reply_event and not has_reply_content:
return None
campaign_id = _safe_str(
body.get("campaign_id")
or nest.get("campaign_id")
or lead.get("campaign_id")
or body.get("campaignId")
)
sl_id = lead.get("lead_id") or lead.get("id") or body.get("lead_id") or nest.get("lead_id")
sl_id_str = _safe_str(sl_id) if sl_id is not None else ""
email = (
lead.get("email")
or lead.get("email_address")
or body.get("lead_email")
or body.get("email")
or nest.get("email")
)
email = _safe_str(email).lower()
if not email and not sl_id_str:
return None
reply_body = (
reply.get("body")
or reply.get("html_body")
or reply.get("text_body")
or body.get("reply_body")
or body.get("message")
or nest.get("reply_body")
or ""
)
reply_body = _strip_html_simple(_safe_str(reply_body))
subject = _safe_str(reply.get("subject") or body.get("subject") or body.get("reply_subject") or "")
if not reply_body and not subject:
return None
received_raw = (
reply.get("received_at")
or reply.get("created_at")
or body.get("received_at")
or body.get("timestamp")
or nest.get("received_at")
)
received_at = _to_datetime(received_raw) if received_raw else datetime.utcnow()
fn = _safe_str(lead.get("first_name") or body.get("first_name"))
ln = _safe_str(lead.get("last_name") or body.get("last_name"))
company = _safe_str(
lead.get("company_name")
or lead.get("company")
or body.get("company_name")
or nest.get("company_name")
)
title = _safe_str(lead.get("title") or lead.get("job_title") or body.get("title"))
campaign_name = _safe_str(body.get("campaign_name") or nest.get("campaign_name") or "")
return {
"smartlead_lead_id": sl_id_str,
"campaign_id": campaign_id,
"campaign_name": campaign_name,
"email": email,
"first_name": fn,
"last_name": ln,
"company_name": company,
"title": title,
"last_reply_subject": subject,
"last_reply_body": reply_body,
"last_reply_at": received_at,
"raw_webhook": body,
}
def _crm_lead_to_dict(row: CrmLead) -> dict:
return {
"id": row.id,
"smartlead_lead_id": row.smartlead_lead_id,
"campaign_id": row.campaign_id,
"campaign_name": row.campaign_name or "",
"email": row.email or "",
"first_name": row.first_name or "",
"last_name": row.last_name or "",
"company_name": row.company_name or "",
"title": row.title or "",
"last_reply_subject": row.last_reply_subject or "",
"last_reply_body": row.last_reply_body or "",
"last_reply_at": row.last_reply_at.isoformat() if row.last_reply_at else None,
"crm_status": row.crm_status or "new_lead",
"contact_id": row.contact_id,
"created_at": row.created_at.isoformat() if row.created_at else None,
"updated_at": row.updated_at.isoformat() if row.updated_at else None,
"company_details": _lead_company_details_dict(row),
}
def _move_lead_to_contacts_core(db: Session, lead: CrmLead) -> dict:
if lead.contact_id:
return {"ok": True, "contact_id": lead.contact_id, "message": "Already linked to Contacts"}
email = (lead.email or "").strip()
if not email:
return {"ok": False, "error": "Lead has no email"}
existing = (
db.query(Contact)
.filter(
Contact.tenant_id == lead.tenant_id,
func.lower(Contact.email) == email.lower(),
)
.first()
)
if existing:
lead.contact_id = existing.id
return {"ok": True, "contact_id": existing.id, "message": "Linked to existing contact"}
raw = {
"Company Name": lead.company_name or "",
"Title": lead.title or "",
"source": "smartlead_reply",
"smartlead_lead_id": lead.smartlead_lead_id,
"campaign_id": lead.campaign_id,
"last_reply_subject": lead.last_reply_subject,
"last_reply_body": lead.last_reply_body,
"smartlead_webhook": lead.raw_webhook,
}
contact = Contact(
tenant_id=lead.tenant_id,
file_id=SMARTLEAD_IMPORT_FILE_ID,
row_index=lead.id,
first_name=lead.first_name or "",
last_name=lead.last_name or "",
email=email,
company=lead.company_name or "",
title=lead.title or "",
source="smartlead",
raw_data=raw,
)
db.add(contact)
db.flush()
lead.contact_id = contact.id
return {"ok": True, "contact_id": contact.id, "message": "Contact created"}
CONTACTS_TO_LEADS_CAMPAIGN_ID = "contacts_import"
CONTACTS_TO_LEADS_CAMPAIGN_NAME = "Contacts"
def _convert_contact_to_lead_core(db: Session, contact: Contact) -> dict:
"""
Create a CrmLead row copied from a Contact. The contact row is kept; only deleted manually.
Links the lead via ``contact_id`` so CRM stays consistent. Skips if a lead with the same
email already exists.
"""
email = (_contact_value(contact, "email") or "").strip()
if not email:
return {"ok": False, "error": "Contact has no email"}
dup = (
db.query(CrmLead)
.filter(
CrmLead.tenant_id == contact.tenant_id,
func.lower(CrmLead.email) == email.lower(),
)
.first()
)
if dup:
return {"ok": False, "error": "A lead with this email already exists"}
fn = _contact_value(contact, "first_name") or ""
ln = _contact_value(contact, "last_name") or ""
co = _contact_value(contact, "company") or ""
ti = _contact_value(contact, "title") or ""
rd = contact.raw_data if isinstance(contact.raw_data, dict) else {}
company_snapshot = {
"Company Name": _safe_str(co or rd.get("Company Name")),
"Company Name for Emails": _safe_str(rd.get("Company Name for Emails")),
"Industry": _safe_str(rd.get("Industry")),
"# Employees": _safe_str(rd.get("# Employees")),
"Annual Revenue": _safe_str(rd.get("Annual Revenue")),
"Last Raised At": _safe_str(rd.get("Last Raised At")),
"Website": _safe_str(rd.get("Website")),
"City": _safe_str(rd.get("City")),
"State": _safe_str(rd.get("State")),
"Country": _safe_str(rd.get("Country")),
}
raw = {
"source": "contacts_convert",
"source_contact_id": contact.id,
"company_details": company_snapshot,
}
lead = CrmLead(
tenant_id=contact.tenant_id,
smartlead_lead_id=f"from-contact-{contact.id}",
campaign_id=CONTACTS_TO_LEADS_CAMPAIGN_ID,
campaign_name=CONTACTS_TO_LEADS_CAMPAIGN_NAME,
email=email,
first_name=fn,
last_name=ln,
company_name=co,
title=ti,
last_reply_subject="",
last_reply_body="Added from Contacts",
last_reply_at=datetime.utcnow(),
crm_status="new_lead",
contact_id=contact.id,
raw_webhook=raw,
)
db.add(lead)
db.flush()
return {"ok": True, "lead_id": lead.id}
def _deal_name_from_lead(lead: CrmLead) -> str:
person = " ".join(filter(None, [lead.first_name or "", lead.last_name or ""])).strip()
if lead.company_name and person:
return f"{lead.company_name}{person}"
if lead.company_name:
return lead.company_name or "Deal"
if person:
return person
return (lead.email or "").strip() or "Untitled deal"
def _user_initials_from_user(u: User | None) -> str:
if not u:
return ""
name = (u.name or "").strip()
em = (u.email or "").strip()
if name:
parts = name.split()
if len(parts) >= 2:
return (parts[0][:1] + parts[-1][:1]).upper()[:2]
return (parts[0][:2] or "?").upper()[:2]
if em:
return em[:2].upper()
return "?"
def _tenant_member_ids(db: Session, tenant_id: int) -> set[int]:
rows = (
db.query(TenantMembership.user_id)
.filter(TenantMembership.tenant_id == int(tenant_id))
.all()
)
return {int(r[0]) for r in rows}
def _deal_member_can_see(row: CrmDeal, user_id: int) -> bool:
if row.owner_user_id is None:
return True
return int(row.owner_user_id) == int(user_id)
def _deal_access_or_403(row: CrmDeal, t: TenantContext) -> None:
if t.role == "admin":
return
if not _deal_member_can_see(row, t.user_id):
raise HTTPException(status_code=403, detail="You do not have access to this deal")
def _dashboard_period_bounds_utc(period: str) -> Optional[tuple[datetime, datetime]]:
"""Bounds for filtering won deals by won_at (dashboard date presets). None = all time."""
p = (period or "all").strip().lower()
if p in ("", "all"):
return None
now = datetime.utcnow()
if p == "month":
start = datetime(now.year, now.month, 1)
last = monthrange(now.year, now.month)[1]
end = datetime(now.year, now.month, last, 23, 59, 59)
return start, end
if p == "quarter":
qi = (now.month - 1) // 3
sm = qi * 3 + 1
start = datetime(now.year, sm, 1)
em = sm + 2
if em == 12:
end = datetime(now.year, 12, 31, 23, 59, 59)
else:
end = datetime(now.year, em + 1, 1) - timedelta(microseconds=1)
return start, end
if p == "year":
return datetime(now.year, 1, 1), datetime(now.year, 12, 31, 23, 59, 59)
return None
def _previous_period_bounds_utc(period: str) -> Optional[tuple[datetime, datetime]]:
"""Calendar window immediately before the current preset (month → prior month, etc.)."""
p = (period or "all").strip().lower()
if p in ("", "all"):
return None
now = datetime.utcnow()
if p == "month":
if now.month == 1:
y, m = now.year - 1, 12
else:
y, m = now.year, now.month - 1
start = datetime(y, m, 1)
last = monthrange(y, m)[1]
end = datetime(y, m, last, 23, 59, 59)
return start, end
if p == "quarter":
qi = (now.month - 1) // 3
if qi == 0:
y = now.year - 1
sm = 10
else:
y = now.year
sm = (qi - 1) * 3 + 1
start = datetime(y, sm, 1)
em = sm + 2
if em == 12:
end = datetime(y, 12, 31, 23, 59, 59)
else:
end = datetime(y, em + 1, 1) - timedelta(microseconds=1)
return start, end
if p == "year":
py = now.year - 1
return datetime(py, 1, 1), datetime(py, 12, 31, 23, 59, 59)
return None
def _comparison_period_label(period: str) -> str:
p = (period or "all").strip().lower()
if p == "month":
return "Last month"
if p == "quarter":
return "Last quarter"
if p == "year":
return "Last year"
return ""
def _won_at_within_bounds(won_at: Optional[datetime], bounds: Optional[tuple[datetime, datetime]]) -> bool:
if bounds is None:
return True
if won_at is None:
return False
wdt = won_at.replace(tzinfo=None) if getattr(won_at, "tzinfo", None) else won_at
return bounds[0] <= wdt <= bounds[1]
def _deal_to_dict(d: CrmDeal) -> dict:
fv = None
if d.deal_value is not None and d.close_probability is not None:
fv = int(round(d.deal_value * (d.close_probability / 100.0)))
ecd = None
if d.expected_close_date:
try:
ecd = d.expected_close_date.date().isoformat()
except Exception:
ecd = d.expected_close_date.isoformat()
rt = (getattr(d, "revenue_type", None) or "arr").strip().lower()
if rt not in DEAL_REVENUE_TYPE_ALLOWED:
rt = "arr"
return {
"id": d.id,
"name": d.name or "",
"stage": d.stage or "new",
"owner_user_id": d.owner_user_id,
"owner_initials": d.owner_initials or "",
"deal_value": d.deal_value,
"revenue_type": rt,
"contact_display": d.contact_display or "",
"account_name": d.account_name or "",
"expected_close_date": ecd,
"close_probability": d.close_probability if d.close_probability is not None else 10,
"forecast_value": fv,
"last_interaction_at": d.last_interaction_at.isoformat() if d.last_interaction_at else None,
"country": d.country or "",
"source_lead_id": d.source_lead_id,
"source_campaign_name": d.source_campaign_name or "",
"contact_id": d.contact_id,
"created_at": d.created_at.isoformat() if d.created_at else None,
"lost_reason": _safe_str(getattr(d, "lost_reason", None) or ""),
"lost_price_discount_pct": getattr(d, "lost_price_discount_pct", None),
"lost_chose_product_name": _safe_str(getattr(d, "lost_chose_product_name", None) or ""),
"won_po_number": _safe_str(getattr(d, "won_po_number", None) or ""),
"won_customer_legal_name": _safe_str(getattr(d, "won_customer_legal_name", None) or ""),
"won_customer_address": _safe_str(getattr(d, "won_customer_address", None) or ""),
"won_contact_person_name": _safe_str(getattr(d, "won_contact_person_name", None) or ""),
"won_channel_partner_name": _safe_str(getattr(d, "won_channel_partner_name", None) or ""),
"won_note_to_customer": _safe_str(getattr(d, "won_note_to_customer", None) or ""),
"won_note_to_accounts": _safe_str(getattr(d, "won_note_to_accounts", None) or ""),
"won_line_items": _won_line_items_list(d),
"won_at": d.won_at.isoformat() if getattr(d, "won_at", None) else None,
}
def _won_line_items_list(d: CrmDeal) -> list:
raw = getattr(d, "won_line_items", None)
if raw is None:
return []
if isinstance(raw, list):
return raw
if isinstance(raw, str) and raw.strip():
try:
v = json.loads(raw)
return v if isinstance(v, list) else []
except Exception:
return []
return []
def _clear_won_billing_fields(row: CrmDeal) -> None:
row.won_po_number = ""
row.won_customer_legal_name = ""
row.won_customer_address = ""
row.won_contact_person_name = ""
row.won_channel_partner_name = ""
row.won_note_to_customer = ""
row.won_note_to_accounts = ""
row.won_line_items = None
row.won_at = None
def _won_currency_label(raw) -> str:
c = (_safe_str(raw) or "USD").strip().upper()
return "CAD" if c == "CAD" else "USD"
def _fmt_won_money(amount: float, currency_raw) -> str:
"""Plain-text money with symbol + ISO code for won-deal emails."""
lab = _won_currency_label(currency_raw)
sym = "CA$" if lab == "CAD" else "$"
try:
n = float(amount)
except (TypeError, ValueError):
n = 0.0
return f"{sym}{n:,.2f} {lab}"
def _won_billing_interval_label(raw) -> str:
"""Human-readable billing cadence (matches win popover)."""
s = (_safe_str(raw) or "monthly").strip().lower()
if s in ("one_time", "onetime", "one-time", "non-recurring"):
return "One-time"
if s in ("quarterly", "quarter", "every 3 months"):
return "Every 3 months"
if s in ("annual", "year", "yearly", "every year"):
return "Every year"
return "Every month"
async def _notify_tenant_admins_deal_won(
db: Session,
tenant_id: int,
sender: User,
deal_id: int,
deal_name: str,
lines: list,
subtotal: float,
po: str,
legal: str,
addr: str,
contact_person: str,
channel_partner: str,
note_customer: str,
note_accounts: str,
) -> tuple[bool, Optional[str]]:
"""Email each workspace admin using the marking user's Gmail (same as invites)."""
rt = (sender.google_refresh_token or "").strip()
from_email = (sender.email or "").strip()
if not rt or not from_email:
return (
False,
"Gmail is not connected for your account. The deal was saved as Won, but admins were not emailed automatically.",
)
tenant = db.query(Tenant).filter(Tenant.id == int(tenant_id)).first()
workspace_name = (tenant.name or "Workspace").strip() if tenant else "Workspace"
admins = (
db.query(User)
.join(TenantMembership, TenantMembership.user_id == User.id)
.filter(TenantMembership.tenant_id == int(tenant_id), TenantMembership.role == "admin")
.all()
)
recipients: list[str] = []
seen: set[str] = set()
for u in admins:
em = (u.email or "").strip().lower()
if em and em not in seen:
seen.add(em)
recipients.append((u.email or "").strip())
if not recipients:
return False, "No workspace admin email addresses were found."
subject = f'[Won deal] {deal_name or "Deal"} — PO {po[:80]}'
body_lines = [
f"A deal was marked Won in workspace \"{workspace_name}\".",
"",
f"Deal ID: {deal_id}",
f"Deal name: {deal_name}",
f"Marked won by: {(sender.name or '').strip() or sender.email} <{sender.email}>",
"",
f"PO number: {po}",
f"Customer legal name: {legal}",
"Customer address:",
addr,
f"Contact person: {contact_person}",
]
if channel_partner:
body_lines.append(f"Channel partner: {channel_partner}")
body_lines += [
"",
"Note to customer:",
note_customer,
"",
"Note to our accounts:",
note_accounts,
"",
"Products / services (line items):",
]
for i, li in enumerate(lines, 1):
ps = _safe_str(li.get("product_service", ""))
cur = _won_currency_label(li.get("currency"))
try:
qf = float(li.get("qty") or 0)
rf = float(li.get("rate") or 0)
af = float(li.get("amount") or 0)
except (TypeError, ValueError):
qf = rf = af = 0.0
cadence = _won_billing_interval_label(li.get("billing_interval"))
body_lines.append(
f" {i}. {ps} | {cadence} | Qty {qf:g} × {_fmt_won_money(rf, cur)} = {_fmt_won_money(af, cur)}"
)
desc = _safe_str(li.get("description", ""))
if desc:
body_lines.append(f" Description: {desc}")
body_lines.append("")
if lines:
distinct = {_won_currency_label(li.get("currency")) for li in lines}
if len(distinct) == 1:
only = distinct.pop()
body_lines.append(f"Subtotal: {_fmt_won_money(float(subtotal), only)}")
else:
body_lines.append(
"Subtotal (numeric sum of line amounts; USD and CAD lines are not converted between currencies): "
f"{subtotal:,.2f}"
)
else:
body_lines.append(f"Subtotal: {subtotal:,.2f}")
body_text = "\n".join(body_lines)
errs: list[str] = []
ok_any = False
for to_email in recipients:
try:
await send_invite_email_via_gmail(rt, from_email, to_email, subject, body_text)
ok_any = True
except Exception as e:
errs.append(f"{to_email}: {str(e)}")
if ok_any:
return True, ("; ".join(errs) if errs else None)
return False, "; ".join(errs) if errs else "Could not send notification email."
@app.get("/api/health")
def health():
return {"status": "ok"}
@app.get("/api/hello")
def hello():
return {"message": "Hello from FastAPI"}
@app.post("/api/upload-csv")
async def upload_csv(file: UploadFile = File(...), t: TenantContext = Depends(get_tenant_context)):
"""Upload and parse CSV file from Apollo"""
db = t.db
tenant_id = t.tenant_id
try:
# Generate unique file ID
file_id = str(uuid.uuid4())
file_path = UPLOAD_DIR / f"{file_id}.csv"
# Save file
content = await file.read()
with open(file_path, "wb") as f:
f.write(content)
# Parse CSV to count contacts
df = pd.read_csv(file_path)
contact_count = len(df)
# Save upload metadata
db_file = UploadedFile(
tenant_id=tenant_id,
file_id=file_id,
filename=file.filename,
contact_count=contact_count,
file_path=str(file_path)
)
db.add(db_file)
# Persist uploaded contacts for CRM/contacts views
for idx, row in df.iterrows():
row_dict = row.to_dict()
sanitized_raw_data = {str(k): _json_safe(v) for k, v in row_dict.items()}
contact = Contact(
tenant_id=tenant_id,
file_id=file_id,
row_index=idx + 1,
first_name=_pick_field(row_dict, ["first name", "firstname", "first_name"]),
last_name=_pick_field(row_dict, ["last name", "lastname", "last_name"]),
email=_pick_field(row_dict, ["email", "work email", "email address"]),
company=_pick_field(row_dict, ["company", "company name", "company name for emails", "organization name", "account name"]),
title=_pick_field(row_dict, ["title", "job title"]),
source="apollo_csv",
raw_data=sanitized_raw_data,
)
db.add(contact)
db.commit()
return {
"file_id": file_id,
"contact_count": contact_count,
"message": "File uploaded successfully"
}
except Exception as e:
raise HTTPException(status_code=500, detail=f"Error uploading file: {str(e)}")
@app.get("/api/unipile/linkedin/campaign-defaults")
async def unipile_linkedin_campaign_defaults(t: TenantContext = Depends(get_tenant_context)):
"""
Combined prefs + accounts for sequence builder and campaign UI (saved LinkedIn profile name + default account).
"""
db = t.db
user_row = db.query(User).filter(User.id == t.user_id).first()
rows = (
db.query(UnipileAccount)
.filter(
UnipileAccount.tenant_id == t.tenant_id,
UnipileAccount.user_id == t.user_id,
UnipileAccount.provider == "LINKEDIN",
)
.order_by(UnipileAccount.created_at.desc(), UnipileAccount.id.desc())
.all()
)
accounts = []
for r in rows:
ident = _extract_unipile_identity(r.metadata_json or {})
accounts.append(
{
"id": r.id,
"label": r.label or "LinkedIn account",
"display_name": _linkedin_account_display_name(r),
"avatar_url": ident[1] or "",
"provider": r.provider,
"unipile_account_id": r.unipile_account_id,
"status": r.status or "unknown",
"auth_mode": r.auth_mode or "hosted",
"created_at": r.created_at.isoformat() if r.created_at else None,
}
)
prof = (user_row.linkedin_profile_display_name or "").strip() if user_row else ""
default_ref = getattr(user_row, "default_unipile_account_ref_id", None) if user_row else None
# Ensure default still exists
valid_default = None
if default_ref is not None:
if any(a["id"] == default_ref for a in accounts):
valid_default = default_ref
return {
"linkedin_profile_display_name": prof,
"default_unipile_account_ref_id": valid_default,
"accounts": accounts,
}
@app.patch("/api/me/linkedin-prefs")
async def patch_me_linkedin_prefs(
body: UserLinkedinPrefsPatch,
t: TenantContext = Depends(get_tenant_context),
):
db = t.db
u = db.query(User).filter(User.id == t.user_id).first()
if not u:
raise HTTPException(status_code=404, detail="User not found")
patch = body.model_dump(exclude_unset=True)
if "linkedin_profile_display_name" in patch:
s = (patch["linkedin_profile_display_name"] or "").strip()
u.linkedin_profile_display_name = s[:200] if s else None
if "default_unipile_account_ref_id" in patch:
rid = patch["default_unipile_account_ref_id"]
if rid is None or rid == 0:
u.default_unipile_account_ref_id = None
else:
acc = (
db.query(UnipileAccount)
.filter(
UnipileAccount.id == rid,
UnipileAccount.tenant_id == t.tenant_id,
UnipileAccount.user_id == t.user_id,
)
.first()
)
if not acc:
raise HTTPException(status_code=404, detail="LinkedIn account not found for this user")
u.default_unipile_account_ref_id = rid
db.commit()
return {
"ok": True,
"linkedin_profile_display_name": (u.linkedin_profile_display_name or ""),
"default_unipile_account_ref_id": u.default_unipile_account_ref_id,
}
@app.get("/api/unipile/mailbox/campaign-defaults")
async def unipile_mailbox_campaign_defaults(t: TenantContext = Depends(get_tenant_context)):
db = t.db
user_row = db.query(User).filter(User.id == t.user_id).first()
rows = (
db.query(UnipileAccount)
.filter(
UnipileAccount.tenant_id == t.tenant_id,
UnipileAccount.user_id == t.user_id,
UnipileAccount.provider.in_(_MAILBOX_PROVIDER_SET),
)
.order_by(UnipileAccount.created_at.desc(), UnipileAccount.id.desc())
.all()
)
accounts = []
for r in rows:
ident = _extract_unipile_identity(r.metadata_json or {})
accounts.append(
{
"id": r.id,
"label": r.label or "Mailbox",
"display_name": _mailbox_account_display_name(r),
"avatar_url": ident[1] or "",
"provider": r.provider,
"unipile_account_id": r.unipile_account_id,
"status": r.status or "unknown",
"auth_mode": r.auth_mode or "hosted",
"created_at": r.created_at.isoformat() if r.created_at else None,
}
)
prof = (getattr(user_row, "mailbox_profile_display_name", None) or "").strip() if user_row else ""
default_ref = getattr(user_row, "default_mailbox_unipile_account_ref_id", None) if user_row else None
valid_default = None
if default_ref is not None and any(a["id"] == default_ref for a in accounts):
valid_default = default_ref
return {
"mailbox_profile_display_name": prof,
"default_mailbox_unipile_account_ref_id": valid_default,
"accounts": accounts,
}
@app.patch("/api/me/mailbox-prefs")
async def patch_me_mailbox_prefs(
body: UserMailboxPrefsPatch,
t: TenantContext = Depends(get_tenant_context),
):
db = t.db
u = db.query(User).filter(User.id == t.user_id).first()
if not u:
raise HTTPException(status_code=404, detail="User not found")
patch = body.model_dump(exclude_unset=True)
if "mailbox_profile_display_name" in patch:
s = (patch["mailbox_profile_display_name"] or "").strip()
u.mailbox_profile_display_name = s[:200] if s else None
if "default_mailbox_unipile_account_ref_id" in patch:
rid = patch["default_mailbox_unipile_account_ref_id"]
if rid is None or rid == 0:
u.default_mailbox_unipile_account_ref_id = None
else:
acc = (
db.query(UnipileAccount)
.filter(
UnipileAccount.id == rid,
UnipileAccount.tenant_id == t.tenant_id,
UnipileAccount.user_id == t.user_id,
UnipileAccount.provider.in_(_MAILBOX_PROVIDER_SET),
)
.first()
)
if not acc:
raise HTTPException(status_code=404, detail="Mailbox account not found for this user")
u.default_mailbox_unipile_account_ref_id = rid
db.commit()
return {
"ok": True,
"mailbox_profile_display_name": (u.mailbox_profile_display_name or ""),
"default_mailbox_unipile_account_ref_id": u.default_mailbox_unipile_account_ref_id,
}
@app.post("/api/unipile/mailbox/hosted-link")
async def create_unipile_mailbox_hosted_link(
body: UnipileMailboxHostedLinkRequest,
request: Request,
t: TenantContext = Depends(get_tenant_context),
):
origin = (FRONTEND_ORIGIN or "").strip().rstrip("/") or _public_origin_from_request(request)
if not origin:
raise HTTPException(status_code=400, detail="Could not determine frontend origin for hosted auth.")
token = str(uuid.uuid4())
expires_at = datetime.utcnow() + timedelta(minutes=20)
u = t.db.query(User).filter(User.id == t.user_id).first()
if u is not None and body.mailbox_profile_display_name is not None:
s = (body.mailbox_profile_display_name or "").strip()
if s:
u.mailbox_profile_display_name = s[:200]
state = UnipileHostedAuthState(
tenant_id=t.tenant_id,
user_id=t.user_id,
token=token,
provider="GOOGLE",
label=(body.label or "Mailbox").strip(),
expires_at=expires_at,
)
t.db.add(state)
t.db.commit()
expires_iso = expires_at.strftime("%Y-%m-%dT%H:%M:%S.%f")[:-3] + "Z"
api_base, _ = _load_unipile_env()
payload = {
"type": "create",
"providers": ["GOOGLE"],
"api_url": api_base,
"expiresOn": expires_iso,
"notify_url": f"{origin}/api/unipile/mailbox/hosted-callback",
"success_redirect_url": f"{origin}/settings?mailbox=connected",
"failure_redirect_url": f"{origin}/settings?mailbox=failed",
"name": token,
}
response = _unipile_request("POST", "/api/v1/hosted/accounts/link", payload)
hosted_url = _safe_str(response.get("url") if isinstance(response, dict) else "")
if not hosted_url:
raise HTTPException(status_code=400, detail="UniPile did not return hosted auth URL")
return {"url": hosted_url}
@app.post("/api/unipile/mailbox/hosted-callback")
async def unipile_mailbox_hosted_callback(request: Request, db: Session = Depends(get_db)):
try:
body = await request.json()
except Exception:
raise HTTPException(status_code=400, detail="Expected JSON body")
name_token = _safe_str((body or {}).get("name"))
unipile_account_id = _safe_str((body or {}).get("account_id"))
status = _safe_str((body or {}).get("status") or "connected")
if not name_token:
raise HTTPException(status_code=400, detail="Missing name token")
if not unipile_account_id:
return {"ok": True, "ignored": True, "reason": "missing_account_id"}
account_profile = _try_fetch_unipile_account_profile(unipile_account_id)
merged_meta = {}
if isinstance(body, dict):
merged_meta.update(body)
if isinstance(account_profile, dict) and account_profile:
merged_meta["account_profile"] = account_profile
state = (
db.query(UnipileHostedAuthState)
.filter(
UnipileHostedAuthState.token == name_token,
UnipileHostedAuthState.provider == "GOOGLE",
)
.first()
)
if not state:
return {"ok": True, "ignored": True, "reason": "unknown_token"}
if state.used_at is not None:
return {"ok": True, "ignored": True, "reason": "token_already_used"}
if state.expires_at and state.expires_at < datetime.utcnow():
return {"ok": True, "ignored": True, "reason": "token_expired"}
existing = (
db.query(UnipileAccount)
.filter(
UnipileAccount.tenant_id == state.tenant_id,
UnipileAccount.user_id == state.user_id,
UnipileAccount.provider.in_(_MAILBOX_PROVIDER_SET),
UnipileAccount.unipile_account_id == unipile_account_id,
)
.first()
)
user_row = db.query(User).filter(User.id == state.user_id).first()
box_name = _extract_unipile_identity(merged_meta)[0].strip()
if user_row is not None and box_name:
user_row.mailbox_profile_display_name = box_name[:200]
nice_label = (state.label or "").strip() or "Mailbox"
if box_name:
nice_label = f"{box_name}'s mailbox"
nice_label = nice_label[:255]
if existing:
existing.label = nice_label
existing.status = status or existing.status
existing.auth_mode = "hosted"
existing.metadata_json = merged_meta or body
existing.updated_at = datetime.utcnow()
if user_row is not None and getattr(user_row, "default_mailbox_unipile_account_ref_id", None) is None:
user_row.default_mailbox_unipile_account_ref_id = existing.id
else:
row = UnipileAccount(
tenant_id=state.tenant_id,
user_id=state.user_id,
label=nice_label,
provider="GOOGLE",
unipile_account_id=unipile_account_id,
status=status or "connected",
auth_mode="hosted",
metadata_json=merged_meta or body,
)
db.add(row)
db.flush()
acc_ref_id = row.id
if user_row is not None and getattr(user_row, "default_mailbox_unipile_account_ref_id", None) is None:
user_row.default_mailbox_unipile_account_ref_id = acc_ref_id
state.used_at = datetime.utcnow()
db.commit()
return {"ok": True}
@app.get("/api/unipile/mailbox/accounts")
async def list_unipile_mailbox_accounts(t: TenantContext = Depends(get_tenant_context)):
rows = (
t.db.query(UnipileAccount)
.filter(
UnipileAccount.tenant_id == t.tenant_id,
UnipileAccount.user_id == t.user_id,
UnipileAccount.provider.in_(_MAILBOX_PROVIDER_SET),
)
.order_by(UnipileAccount.created_at.desc(), UnipileAccount.id.desc())
.all()
)
out = []
for r in rows:
ident = _extract_unipile_identity(r.metadata_json or {})
out.append(
{
"id": r.id,
"label": r.label or "Mailbox",
"display_name": _mailbox_account_display_name(r),
"avatar_url": ident[1] or "",
"provider": r.provider,
"unipile_account_id": r.unipile_account_id,
"status": r.status or "unknown",
"auth_mode": r.auth_mode or "hosted",
"created_at": r.created_at.isoformat() if r.created_at else None,
}
)
return {"accounts": out}
@app.get("/api/unipile/linkedin/accounts")
async def list_unipile_linkedin_accounts(t: TenantContext = Depends(get_tenant_context)):
rows = (
t.db.query(UnipileAccount)
.filter(
UnipileAccount.tenant_id == t.tenant_id,
UnipileAccount.user_id == t.user_id,
UnipileAccount.provider == "LINKEDIN",
)
.order_by(UnipileAccount.created_at.desc(), UnipileAccount.id.desc())
.all()
)
acc_out = []
for r in rows:
ident = _extract_unipile_identity(r.metadata_json or {})
acc_out.append(
{
"id": r.id,
"label": r.label or "LinkedIn account",
"display_name": _linkedin_account_display_name(r),
"avatar_url": ident[1] or "",
"provider": r.provider,
"unipile_account_id": r.unipile_account_id,
"status": r.status or "unknown",
"auth_mode": r.auth_mode or "hosted",
"created_at": r.created_at.isoformat() if r.created_at else None,
}
)
return {"accounts": acc_out}
@app.post("/api/unipile/linkedin/hosted-link")
async def create_unipile_linkedin_hosted_link(
body: UnipileHostedLinkRequest,
request: Request,
t: TenantContext = Depends(get_tenant_context),
):
origin = (FRONTEND_ORIGIN or "").strip().rstrip("/") or _public_origin_from_request(request)
if not origin:
raise HTTPException(status_code=400, detail="Could not determine frontend origin for hosted auth.")
token = str(uuid.uuid4())
expires_at = datetime.utcnow() + timedelta(minutes=20)
u = t.db.query(User).filter(User.id == t.user_id).first()
if u is not None and body.linkedin_profile_display_name is not None:
s = (body.linkedin_profile_display_name or "").strip()
if s:
u.linkedin_profile_display_name = s[:200]
state = UnipileHostedAuthState(
tenant_id=t.tenant_id,
user_id=t.user_id,
token=token,
provider="LINKEDIN",
label=(body.label or "LinkedIn Account").strip(),
expires_at=expires_at,
)
t.db.add(state)
t.db.commit()
expires_iso = expires_at.strftime("%Y-%m-%dT%H:%M:%S.%f")[:-3] + "Z"
api_base, _ = _load_unipile_env()
payload = {
"type": "create",
"providers": ["LINKEDIN"],
"api_url": api_base,
"expiresOn": expires_iso,
"notify_url": f"{origin}/api/unipile/linkedin/hosted-callback",
"success_redirect_url": f"{origin}/settings?linkedin=connected",
"failure_redirect_url": f"{origin}/settings?linkedin=failed",
"name": token,
}
response = _unipile_request("POST", "/api/v1/hosted/accounts/link", payload)
hosted_url = _safe_str(response.get("url") if isinstance(response, dict) else "")
if not hosted_url:
raise HTTPException(status_code=400, detail="UniPile did not return hosted auth URL")
return {"url": hosted_url}
@app.post("/api/unipile/linkedin/hosted-callback")
async def unipile_linkedin_hosted_callback(request: Request, db: Session = Depends(get_db)):
"""
UniPile Hosted Auth notify_url callback.
No session is available on this route (server-to-server call), so we map by stored token in `name`.
"""
try:
body = await request.json()
except Exception:
raise HTTPException(status_code=400, detail="Expected JSON body")
name_token = _safe_str((body or {}).get("name"))
unipile_account_id = _safe_str((body or {}).get("account_id"))
status = _safe_str((body or {}).get("status") or "connected")
if not name_token:
raise HTTPException(status_code=400, detail="Missing name token")
if not unipile_account_id:
return {"ok": True, "ignored": True, "reason": "missing_account_id"}
account_profile = _try_fetch_unipile_account_profile(unipile_account_id)
merged_meta = {}
if isinstance(body, dict):
merged_meta.update(body)
if isinstance(account_profile, dict) and account_profile:
merged_meta["account_profile"] = account_profile
state = (
db.query(UnipileHostedAuthState)
.filter(
UnipileHostedAuthState.token == name_token,
UnipileHostedAuthState.provider == "LINKEDIN",
)
.first()
)
if not state:
return {"ok": True, "ignored": True, "reason": "unknown_token"}
if state.used_at is not None:
return {"ok": True, "ignored": True, "reason": "token_already_used"}
if state.expires_at and state.expires_at < datetime.utcnow():
return {"ok": True, "ignored": True, "reason": "token_expired"}
existing = (
db.query(UnipileAccount)
.filter(
UnipileAccount.tenant_id == state.tenant_id,
UnipileAccount.user_id == state.user_id,
UnipileAccount.provider == "LINKEDIN",
UnipileAccount.unipile_account_id == unipile_account_id,
)
.first()
)
user_row = db.query(User).filter(User.id == state.user_id).first()
li_name = _extract_unipile_identity(merged_meta)[0].strip()
if user_row is not None and li_name:
user_row.linkedin_profile_display_name = li_name[:200]
if li_name:
nice_label = f"{li_name}'s LinkedIn account"
else:
nice_label = (state.label or "").strip() or "LinkedIn account"
nice_label = nice_label[:255]
if existing:
existing.label = nice_label
existing.status = status or existing.status
existing.auth_mode = "hosted"
existing.metadata_json = merged_meta or body
existing.updated_at = datetime.utcnow()
if user_row is not None and getattr(user_row, "default_unipile_account_ref_id", None) is None:
user_row.default_unipile_account_ref_id = existing.id
else:
row = UnipileAccount(
tenant_id=state.tenant_id,
user_id=state.user_id,
label=nice_label,
provider="LINKEDIN",
unipile_account_id=unipile_account_id,
status=status or "connected",
auth_mode="hosted",
metadata_json=merged_meta or body,
)
db.add(row)
db.flush()
acc_ref_id = row.id
if user_row is not None and getattr(user_row, "default_unipile_account_ref_id", None) is None:
user_row.default_unipile_account_ref_id = acc_ref_id
state.used_at = datetime.utcnow()
db.commit()
return {"ok": True}
def _parse_unipile_webhook_datetime(raw) -> Optional[datetime]:
if not raw or not isinstance(raw, str):
return None
try:
s = raw.strip().replace("Z", "")
return datetime.fromisoformat(s)
except Exception:
return None
@app.post("/api/webhooks/unipile")
async def unipile_webhook(request: Request, db: Session = Depends(get_db)):
"""
UniPile webhooks (same URL for multiple UniPile registrations).
Supported ``event`` values:
* ``new_relation`` — LinkedIn invite accepted (source: users). Updates pending contacts.
* ``mail_opened``, ``mail_link_clicked`` — email tracking (source: ``email_tracking``).
* ``mail_received`` — mailbox email (source: ``email`` in UniPile API, “Email” / new-mail webhook in the dashboard). Replies matched via ``in_reply_to`` / ``tracking_id`` / ``label`` / RFC ``message_id``.
"""
if UNIPILE_WEBHOOK_SECRET:
auth_h = request.headers.get("Unipile-Auth") or request.headers.get("X-Unipile-Auth") or ""
if auth_h != UNIPILE_WEBHOOK_SECRET:
raise HTTPException(status_code=401, detail="Invalid webhook secret")
try:
body = await request.json()
except Exception:
raise HTTPException(status_code=400, detail="Expected JSON body")
event = _safe_str((body or {}).get("event"))
if UNIPILE_WEBHOOK_LOG_EVENTS and isinstance(body, dict):
ir = body.get("in_reply_to")
_unipile_webhook_log.info(
"UniPile webhook event=%s role=%s keys=%s in_reply_to_keys=%s",
event,
body.get("role"),
sorted(body.keys()),
sorted(ir.keys()) if isinstance(ir, dict) else None,
)
now = datetime.utcnow()
if event == "new_relation":
account_id = _safe_str((body or {}).get("account_id"))
provider_id = _safe_str((body or {}).get("user_provider_id"))
if not account_id or not provider_id:
return {"ok": True, "ignored": True, "reason": "missing_ids"}
ua = (
db.query(UnipileAccount)
.filter(UnipileAccount.unipile_account_id == account_id)
.first()
)
if not ua:
return {"ok": True, "ignored": True, "reason": "unknown_account"}
contacts = (
db.query(Contact)
.filter(
Contact.tenant_id == ua.tenant_id,
Contact.unipile_provider_id == provider_id,
Contact.source.in_(("apollo_csv", "linkedin_campaign")),
)
.all()
)
updated = 0
for ct in contacts:
if ct.linkedin_connection_accepted_at:
continue
ct.linkedin_invite_pending = 0
ct.linkedin_connection_accepted_at = now
updated += 1
db.commit()
return {"ok": True, "updated_contacts": updated, "event": event}
if event in ("mail_opened", "mail_link_clicked"):
label = _safe_str((body or {}).get("label"))
parsed = parse_tracking_label(label)
if not parsed:
return {"ok": True, "ignored": True, "reason": "no_tracking_label"}
tenant_id, file_id, contact_id, step_slot = parsed
rec = (
db.query(OutreachSendReceipt)
.filter(
OutreachSendReceipt.tenant_id == tenant_id,
OutreachSendReceipt.file_id == file_id,
OutreachSendReceipt.contact_id == contact_id,
OutreachSendReceipt.step_slot == step_slot,
OutreachSendReceipt.channel == "gmail",
)
.first()
)
if not rec:
return {"ok": True, "ignored": True, "reason": "receipt_not_found"}
when = _parse_unipile_webhook_datetime((body or {}).get("date")) or now
if not rec.opened_at:
rec.opened_at = when
try:
rec.open_count = int(rec.open_count or 0) + 1
except (TypeError, ValueError):
rec.open_count = 1
db.commit()
refresh_outreach_execution_stats_for_file(db, tenant_id, file_id)
db.commit()
return {"ok": True, "event": event, "updated_receipt_id": rec.id}
if event == "mail_received":
account_id = _safe_str((body or {}).get("account_id"))
role = _safe_str((body or {}).get("role"))
if role != "inbox":
return {"ok": True, "ignored": True, "reason": "not_inbox"}
ua = (
db.query(UnipileAccount)
.filter(UnipileAccount.unipile_account_id == account_id)
.first()
)
if not ua:
return {"ok": True, "ignored": True, "reason": "unknown_account"}
tenant_id = int(ua.tenant_id)
in_reply = (body or {}).get("in_reply_to") or {}
parent_id = _safe_str(in_reply.get("id"))
parent_rfc = normalize_smtp_message_id(_safe_str(in_reply.get("message_id")))
trk = _safe_str((body or {}).get("tracking_id"))
label = _safe_str((body or {}).get("label"))
rec = None
if parent_id:
rec = (
db.query(OutreachSendReceipt)
.filter(
OutreachSendReceipt.tenant_id == tenant_id,
OutreachSendReceipt.channel == "gmail",
OutreachSendReceipt.unipile_message_id == parent_id,
)
.first()
)
if not rec and parent_rfc:
rec = (
db.query(OutreachSendReceipt)
.filter(
OutreachSendReceipt.tenant_id == tenant_id,
OutreachSendReceipt.channel == "gmail",
OutreachSendReceipt.smtp_message_id == parent_rfc,
)
.first()
)
if not rec and trk:
rec = (
db.query(OutreachSendReceipt)
.filter(
OutreachSendReceipt.tenant_id == tenant_id,
OutreachSendReceipt.channel == "gmail",
or_(
OutreachSendReceipt.unipile_tracking_id == trk,
OutreachSendReceipt.unipile_message_id == trk,
),
)
.first()
)
if not rec and label:
p2 = parse_tracking_label(label)
if p2:
tid2, fid2, cid2, slot2 = p2
if tid2 == tenant_id:
rec = (
db.query(OutreachSendReceipt)
.filter(
OutreachSendReceipt.tenant_id == tid2,
OutreachSendReceipt.file_id == fid2,
OutreachSendReceipt.contact_id == cid2,
OutreachSendReceipt.step_slot == slot2,
OutreachSendReceipt.channel == "gmail",
)
.first()
)
folders = (body or {}).get("folders") or []
subj = (_safe_str((body or {}).get("subject")) or "").lower()
bounce_like = any("bounce" in str(f).lower() for f in folders if f) or "undeliverable" in subj
has_reply_parent = bool(in_reply and (in_reply.get("id") or in_reply.get("message_id")))
if rec:
when = _parse_unipile_webhook_datetime((body or {}).get("date")) or now
if bounce_like and not rec.bounced_at:
rec.bounced_at = when
elif has_reply_parent and not bounce_like and not rec.replied_at:
rec.replied_at = when
db.commit()
refresh_outreach_execution_stats_for_file(db, rec.tenant_id, rec.file_id)
db.commit()
return {"ok": True, "event": event, "updated_receipt_id": rec.id}
return {"ok": True, "ignored": True, "reason": "no_matching_receipt"}
return {"ok": True, "ignored": True, "reason": "unsupported_event", "event": event or ""}
@app.get("/api/unipile/webhook-url-hint")
async def unipile_webhook_url_hint(request: Request):
"""Public URL your UniPile webhooks should POST to (same host as this API)."""
root = _public_origin_from_request(request)
post_url = f"{root}/api/webhooks/unipile"
return {
"post_url": post_url,
"auth_header": "Set Unipile-Auth or X-Unipile-Auth to UNIPILE_WEBHOOK_SECRET on each webhook registration.",
"registrations": [
{
"source": "users",
"events": ["new_relation"],
"doc": "https://developer.unipile.com/docs/detecting-accepted-invitations",
},
{
"source": "email_tracking",
"events": ["mail_opened", "mail_link_clicked"],
"doc": "https://developer.unipile.com/docs/tracking-email",
},
{
"source": "email",
"events": ["mail_received", "mail_sent", "mail_moved"],
"doc": "https://developer.unipile.com/docs/new-emails-webhook",
"dashboard_hint": "Create an **Email** (mailbox) webhook in the UniPile UI — not the **Messaging** (chat) webhook. Use the same callback URL.",
"note": "Reply attribution uses mail_received with role inbox and in_reply_to / tracking_id / label; enable mail_received for connected mail accounts.",
},
{
"source": "messaging",
"events": [
"message_received",
"message_read",
"message_reaction",
"message_edited",
"message_deleted",
"message_delivered",
],
"doc": "https://developer.unipile.com/docs/new-messages-webhook",
"note": "LinkedIn/chat channels — not used for Gmail mail_received.",
},
],
"debug_logging": "Set UNIPILE_WEBHOOK_LOG_EVENTS=1 to log each webhook's event name, role, and JSON keys (no body content).",
"latency_note": "UniPile may deliver new_relation many hours after acceptance; LinkedIn has no real-time connection API.",
}
@app.post("/api/unipile/linkedin/connect")
async def connect_unipile_linkedin(
body: UnipileConnectRequest,
t: TenantContext = Depends(get_tenant_context),
):
payload = {"provider": "LINKEDIN"}
if body.auth_mode == "credentials":
if not (body.username and body.password):
raise HTTPException(status_code=400, detail="username/password are required for credentials auth")
payload["username"] = body.username
payload["password"] = body.password
else:
if not body.access_token:
raise HTTPException(status_code=400, detail="access_token is required for cookie auth")
payload["access_token"] = body.access_token
if body.user_agent:
payload["user_agent"] = body.user_agent
if body.country:
payload["country"] = body.country
if body.ip:
payload["ip"] = body.ip
response = _unipile_request("POST", "/api/v1/accounts", payload)
account_id = _safe_str(response.get("account_id") if isinstance(response, dict) else "") or _safe_str(
response.get("id") if isinstance(response, dict) else ""
)
if not account_id:
# For checkpoint or alt response structures, still persist and expose raw response.
account_id = _safe_str((response or {}).get("account", {}).get("id") if isinstance(response, dict) else "")
if not account_id:
raise HTTPException(status_code=400, detail="UniPile did not return an account_id")
account_profile = _try_fetch_unipile_account_profile(account_id)
merged_meta = response if isinstance(response, dict) else {"raw": str(response)}
if isinstance(merged_meta, dict) and account_profile:
merged_meta["account_profile"] = account_profile
db_row = UnipileAccount(
tenant_id=t.tenant_id,
user_id=t.user_id,
label=(body.label or "LinkedIn account").strip(),
provider="LINKEDIN",
unipile_account_id=account_id,
status=_safe_str(response.get("status") if isinstance(response, dict) else "") or "connected",
auth_mode=body.auth_mode,
metadata_json=merged_meta,
)
t.db.add(db_row)
t.db.commit()
t.db.refresh(db_row)
return {
"message": "LinkedIn account connected",
"account": {
"id": db_row.id,
"label": db_row.label,
"unipile_account_id": db_row.unipile_account_id,
"status": db_row.status,
},
"unipile_response": response,
}
@app.get("/api/linkedin-campaigns")
async def list_linkedin_campaigns(t: TenantContext = Depends(get_tenant_context)):
rows = (
t.db.query(LinkedinCampaign, UnipileAccount)
.join(UnipileAccount, UnipileAccount.id == LinkedinCampaign.unipile_account_ref_id)
.filter(
LinkedinCampaign.tenant_id == t.tenant_id,
LinkedinCampaign.user_id == t.user_id,
UnipileAccount.tenant_id == t.tenant_id,
UnipileAccount.user_id == t.user_id,
)
.order_by(LinkedinCampaign.created_at.desc(), LinkedinCampaign.id.desc())
.all()
)
return {
"campaigns": [
{
"id": c.id,
"name": c.name,
"status": c.status,
"file_id": c.file_id,
"contact_count": c.contact_count or 0,
"prompt_template": c.prompt_template or "",
"unipile_account_ref_id": c.unipile_account_ref_id,
"unipile_account_label": a.label if a else "",
"created_at": c.created_at.isoformat() if c.created_at else None,
"executed_at": c.executed_at.isoformat() if c.executed_at else None,
"execution_result": c.execution_result,
"followup_interval_hours": c.followup_interval_hours if c.followup_interval_hours is not None else 72,
}
for c, a in rows
]
}
@app.patch("/api/linkedin-campaigns/{campaign_id}")
async def patch_linkedin_campaign(
campaign_id: int,
body: LinkedinCampaignPatchRequest,
t: TenantContext = Depends(get_tenant_context),
):
db = t.db
row = (
db.query(LinkedinCampaign)
.filter(
LinkedinCampaign.tenant_id == t.tenant_id,
LinkedinCampaign.user_id == t.user_id,
LinkedinCampaign.id == campaign_id,
)
.first()
)
if not row:
raise HTTPException(status_code=404, detail="Campaign not found")
if body.followup_interval_hours is not None:
row.followup_interval_hours = body.followup_interval_hours
row.updated_at = datetime.utcnow()
db.commit()
return {"message": "Campaign updated", "followup_interval_hours": row.followup_interval_hours}
@app.post("/api/linkedin-campaigns")
async def create_linkedin_campaign(
body: LinkedinCampaignCreateRequest,
t: TenantContext = Depends(get_tenant_context),
):
account = (
t.db.query(UnipileAccount)
.filter(
UnipileAccount.tenant_id == t.tenant_id,
UnipileAccount.user_id == t.user_id,
UnipileAccount.id == body.unipile_account_ref_id,
)
.first()
)
if not account:
raise HTTPException(status_code=404, detail="LinkedIn account not found")
name = (body.name or "").strip()
if not name:
raise HTTPException(status_code=400, detail="Campaign name is required")
row = LinkedinCampaign(
tenant_id=t.tenant_id,
user_id=t.user_id,
name=name,
status="draft",
unipile_account_ref_id=body.unipile_account_ref_id,
)
t.db.add(row)
t.db.commit()
t.db.refresh(row)
return {"campaign_id": row.id, "message": "Campaign created"}
@app.post("/api/linkedin-campaigns/{campaign_id}/upload-csv")
async def upload_linkedin_campaign_csv(
campaign_id: int,
file: UploadFile = File(...),
t: TenantContext = Depends(get_tenant_context),
):
db = t.db
campaign = (
db.query(LinkedinCampaign)
.filter(
LinkedinCampaign.tenant_id == t.tenant_id,
LinkedinCampaign.user_id == t.user_id,
LinkedinCampaign.id == campaign_id,
)
.first()
)
if not campaign:
raise HTTPException(status_code=404, detail="Campaign not found")
file_id = str(uuid.uuid4())
file_path = UPLOAD_DIR / f"{file_id}.csv"
content = await file.read()
with open(file_path, "wb") as f:
f.write(content)
df = pd.read_csv(file_path)
contact_count = len(df)
db_file = UploadedFile(
tenant_id=t.tenant_id,
file_id=file_id,
filename=file.filename,
contact_count=contact_count,
file_path=str(file_path),
)
db.add(db_file)
# Replace prior campaign contacts from old file, if any.
if campaign.file_id:
db.query(Contact).filter(
Contact.tenant_id == t.tenant_id,
Contact.file_id == campaign.file_id,
Contact.source == "linkedin_campaign",
).delete()
db.query(GeneratedSequence).filter(
GeneratedSequence.tenant_id == t.tenant_id,
GeneratedSequence.file_id == campaign.file_id,
GeneratedSequence.channel == "linkedin",
GeneratedSequence.product == campaign.name,
).delete()
linkedin_profiles_detected = 0
for idx, row in df.iterrows():
row_dict = row.to_dict()
sanitized_raw_data = {str(k): _json_safe(v) for k, v in row_dict.items()}
li_raw = _pick_linkedin_url(sanitized_raw_data)
if _linkedin_public_identifier(li_raw):
linkedin_profiles_detected += 1
contact = Contact(
tenant_id=t.tenant_id,
file_id=file_id,
row_index=idx + 1,
first_name=_pick_field(row_dict, ["first name", "firstname", "first_name"]),
last_name=_pick_field(row_dict, ["last name", "lastname", "last_name"]),
email=_pick_field(row_dict, ["email", "work email", "email address"]),
company=_pick_field(row_dict, ["company", "company name", "organization name", "account name"]),
title=_pick_field(row_dict, ["title", "job title"]),
source="linkedin_campaign",
raw_data=sanitized_raw_data,
)
db.add(contact)
campaign.file_id = file_id
campaign.contact_count = contact_count
campaign.status = "draft"
campaign.updated_at = datetime.utcnow()
db.commit()
return {
"file_id": file_id,
"contact_count": contact_count,
"linkedin_profiles_detected": linkedin_profiles_detected,
"message": "Campaign CSV uploaded",
}
@app.post("/api/linkedin-campaigns/{campaign_id}/generate")
async def generate_linkedin_campaign_sequences(
campaign_id: int,
body: LinkedinCampaignGenerateRequest,
t: TenantContext = Depends(get_tenant_context),
):
db = t.db
campaign = (
db.query(LinkedinCampaign)
.filter(
LinkedinCampaign.tenant_id == t.tenant_id,
LinkedinCampaign.user_id == t.user_id,
LinkedinCampaign.id == campaign_id,
)
.first()
)
if not campaign:
raise HTTPException(status_code=404, detail="Campaign not found")
if not campaign.file_id:
raise HTTPException(status_code=400, detail="Upload a campaign CSV first")
db_file = (
db.query(UploadedFile)
.filter(UploadedFile.tenant_id == t.tenant_id, UploadedFile.file_id == campaign.file_id)
.first()
)
if not db_file:
raise HTTPException(status_code=404, detail="Campaign file not found")
df = pd.read_csv(db_file.file_path)
db.query(GeneratedSequence).filter(
GeneratedSequence.tenant_id == t.tenant_id,
GeneratedSequence.file_id == campaign.file_id,
GeneratedSequence.channel == "linkedin",
).delete()
urow = db.query(User).filter(User.id == t.user_id).first()
li_sig = ""
if urow:
li_sig = (getattr(urow, "linkedin_profile_display_name", None) or "").strip() or (urow.name or "").strip()
sequence_id = 1
generated_rows = 0
for _, row in df.iterrows():
contact = row.to_dict()
li_list = generate_linkedin_sequence(
contact, body.prompt_template, campaign.name, sender_linkedin_name=li_sig or None
)
for seq_data in li_list:
db.add(
GeneratedSequence(
tenant_id=t.tenant_id,
file_id=campaign.file_id,
sequence_id=sequence_id,
email_number=seq_data["email_number"],
channel="linkedin",
first_name=seq_data["first_name"],
last_name=seq_data["last_name"],
email=seq_data["email"],
company=seq_data["company"],
title=seq_data.get("title", ""),
product=campaign.name,
subject=seq_data.get("subject") or "",
email_content=seq_data["email_content"],
)
)
generated_rows += 1
sequence_id += 1
campaign.prompt_template = body.prompt_template
campaign.status = "generated"
campaign.updated_at = datetime.utcnow()
db.commit()
return {"message": "LinkedIn sequences generated", "rows": generated_rows, "contacts": len(df)}
@app.get("/api/linkedin-campaigns/{campaign_id}/sequences")
async def get_linkedin_campaign_sequences(campaign_id: int, t: TenantContext = Depends(get_tenant_context)):
db = t.db
campaign = (
db.query(LinkedinCampaign)
.filter(
LinkedinCampaign.tenant_id == t.tenant_id,
LinkedinCampaign.user_id == t.user_id,
LinkedinCampaign.id == campaign_id,
)
.first()
)
if not campaign:
raise HTTPException(status_code=404, detail="Campaign not found")
if not campaign.file_id:
return {"sequences": []}
rows = (
db.query(GeneratedSequence)
.filter(
GeneratedSequence.tenant_id == t.tenant_id,
GeneratedSequence.file_id == campaign.file_id,
GeneratedSequence.channel == "linkedin",
)
.order_by(GeneratedSequence.sequence_id, GeneratedSequence.email_number)
.all()
)
return {
"sequences": [
{
"id": r.id,
"sequence_id": r.sequence_id,
"email_number": r.email_number,
"first_name": r.first_name or "",
"last_name": r.last_name or "",
"email": r.email or "",
"company": r.company or "",
"title": r.title or "",
"subject": r.subject or "",
"message": r.email_content or "",
}
for r in rows
]
}
@app.post("/api/linkedin-campaigns/{campaign_id}/execute")
async def execute_linkedin_campaign(
campaign_id: int,
body: LinkedinCampaignExecuteRequest,
t: TenantContext = Depends(get_tenant_context),
):
"""
Execute LinkedIn campaign via UniPile.
Sends a connection request using POST /users/invite with the first generated step as the
invite note (visible under LinkedIn → My Network → Manage invitations → Sent).
UniPile expects the public /in/{{slug}} identifier plus account_id on profile lookup — not a raw URL path.
If the invite cannot be sent (e.g. already connected), falls back to a 1:1 LinkedIn DM via chats API.
"""
db = t.db
campaign = (
db.query(LinkedinCampaign)
.filter(
LinkedinCampaign.tenant_id == t.tenant_id,
LinkedinCampaign.user_id == t.user_id,
LinkedinCampaign.id == campaign_id,
)
.first()
)
if not campaign:
raise HTTPException(status_code=404, detail="Campaign not found")
account = (
db.query(UnipileAccount)
.filter(
UnipileAccount.tenant_id == t.tenant_id,
UnipileAccount.user_id == t.user_id,
UnipileAccount.id == campaign.unipile_account_ref_id,
)
.first()
)
if not account:
raise HTTPException(status_code=404, detail="Connected LinkedIn account not found")
if not campaign.file_id:
raise HTTPException(status_code=400, detail="Campaign has no uploaded CSV")
contacts = (
db.query(Contact)
.filter(
Contact.tenant_id == t.tenant_id,
Contact.file_id == campaign.file_id,
Contact.source == "linkedin_campaign",
)
.order_by(Contact.row_index.asc(), Contact.id.asc())
.all()
)
seq_rows = (
db.query(GeneratedSequence)
.filter(
GeneratedSequence.tenant_id == t.tenant_id,
GeneratedSequence.file_id == campaign.file_id,
GeneratedSequence.channel == "linkedin",
GeneratedSequence.email_number == 1,
)
.order_by(GeneratedSequence.sequence_id.asc())
.all()
)
if not seq_rows:
raise HTTPException(
status_code=400,
detail="Generate LinkedIn sequences before executing the campaign.",
)
msg_by_seq = {r.sequence_id: r for r in seq_rows}
invite_sent = 0
dm_sent = 0
dry_preview = 0
skipped = 0
failed = 0
errors = []
attempts = []
acc_uid = account.unipile_account_id
def _contact_label(cr: Contact) -> str:
parts = [_safe_str(cr.first_name), _safe_str(cr.last_name)]
lab = " ".join(p for p in parts if p).strip()
return lab or "Contact"
for idx, c in enumerate(contacts, start=1):
first_msg = msg_by_seq.get(idx)
name_label = _contact_label(c)
if not first_msg:
skipped += 1
attempts.append(
{
"contact_id": c.id,
"name": name_label,
"status": "skipped",
"reason": "no_generated_message",
}
)
continue
raw_li = _pick_linkedin_url(c.raw_data or {})
url_disp = _normalize_linkedin_url(raw_li) or _safe_str(raw_li)
slug = _linkedin_public_identifier(raw_li)
if not slug:
skipped += 1
errors.append(
{
"contact_id": c.id,
"reason": "missing_linkedin_url",
"message": "No usable LinkedIn profile URL (expected Apollo-style Person Linkedin Url or /in/{{slug}}).",
}
)
attempts.append(
{
"contact_id": c.id,
"name": name_label,
"linkedin_url": url_disp,
"status": "skipped",
"reason": "missing_or_invalid_linkedin_url",
}
)
continue
note = _linkedin_invite_note(first_msg.email_content or "")
if body.dry_run:
dry_preview += 1
attempts.append(
{
"contact_id": c.id,
"name": name_label,
"linkedin_url": url_disp,
"public_identifier": slug,
"status": "dry_run",
"detail": "Dry run: no UniPile calls. Uncheck the Dry run checkbox to send connection invites.",
"invite_note_preview": note[:180],
}
)
continue
slug_enc = requests.utils.quote(slug, safe="")
acc_enc = requests.utils.quote(acc_uid, safe="")
st_prof, profile = _unipile_call("GET", f"/api/v1/users/{slug_enc}?account_id={acc_enc}")
if st_prof >= 400 or not isinstance(profile, dict):
failed += 1
detail = profile if isinstance(profile, dict) else {}
msg = (
detail.get("message")
or detail.get("detail")
or detail.get("error")
or (json.dumps(detail)[:800] if detail else "profile lookup failed")
)
if isinstance(msg, (dict, list)):
msg = json.dumps(msg)
errors.append({"contact_id": c.id, "reason": "profile_lookup_failed", "message": str(msg)})
attempts.append(
{
"contact_id": c.id,
"name": name_label,
"linkedin_url": url_disp,
"public_identifier": slug,
"status": "failed",
"step": "resolve_profile",
"detail": str(msg)[:500],
}
)
continue
provider_id = _safe_str(profile.get("provider_id"))
rel_hint = _safe_str(
profile.get("network_distance") or profile.get("distance") or ""
)
if not rel_hint and isinstance(profile.get("relation"), dict):
rel_hint = _safe_str((profile.get("relation") or {}).get("distance"))
if not provider_id:
failed += 1
errors.append(
{
"contact_id": c.id,
"reason": "missing_provider_id",
"message": "UniPile profile response had no provider_id.",
}
)
attempts.append(
{
"contact_id": c.id,
"name": name_label,
"linkedin_url": url_disp,
"public_identifier": slug,
"status": "failed",
"step": "resolve_profile",
"detail": "provider_id missing from UniPile profile payload",
}
)
continue
st_inv, inv_res = _unipile_call(
"POST",
"/api/v1/users/invite",
{"provider_id": provider_id, "account_id": acc_uid, "message": note},
)
if st_inv < 400:
invite_sent += 1
ts = datetime.utcnow()
c.unipile_provider_id = provider_id
c.linkedin_invite_pending = 1
c.linkedin_invite_sent_at = ts
c.linkedin_followup_next_email_number = 2
attempts.append(
{
"contact_id": c.id,
"name": name_label,
"linkedin_url": url_disp,
"public_identifier": slug,
"provider_id": provider_id,
"status": "invite_sent",
"network_distance": rel_hint or None,
"connection_hint": rel_hint or None,
}
)
continue
inv_err = ""
if isinstance(inv_res, dict):
inv_err = _safe_str(
inv_res.get("message") or inv_res.get("detail") or inv_res.get("error") or ""
)
if isinstance(inv_res.get("errors"), list):
inv_err = inv_err or json.dumps(inv_res.get("errors"))[:400]
try:
_send_unipile_dm(acc_uid, provider_id, first_msg.email_content or "")
dm_sent += 1
ts = datetime.utcnow()
c.unipile_provider_id = provider_id
c.linkedin_invite_pending = 0
c.linkedin_connection_accepted_at = ts
c.linkedin_last_followup_sent_at = ts
c.linkedin_followup_next_email_number = 2
attempts.append(
{
"contact_id": c.id,
"name": name_label,
"linkedin_url": url_disp,
"public_identifier": slug,
"provider_id": provider_id,
"status": "message_sent",
"network_distance": rel_hint or None,
"connection_hint": rel_hint or None,
"fallback_from_invite_error": inv_err or None,
"detail": "Connection invite was not sent (see fallback_from_invite_error); sent LinkedIn DM instead.",
}
)
except Exception as e:
failed += 1
errors.append(
{
"contact_id": c.id,
"reason": "invite_and_dm_failed",
"message": f"invite: {inv_err}; dm: {str(e)}",
}
)
attempts.append(
{
"contact_id": c.id,
"name": name_label,
"linkedin_url": url_disp,
"public_identifier": slug,
"status": "failed",
"step": "invite_and_message",
"invite_error": inv_err or None,
"detail": str(e),
}
)
total_ok = dry_preview if body.dry_run else (invite_sent + dm_sent)
result = {
"dry_run": body.dry_run,
"dry_run_preview_count": dry_preview,
"linkedin_invites_sent": invite_sent,
"direct_messages_sent": dm_sent,
"sent": total_ok,
"skipped": skipped,
"failed": failed,
"errors": errors[:200],
"attempts": attempts[:500],
"execution_kind": "linkedin_connection_invite",
"help": (
"Invites appear under LinkedIn → My Network → Manage invitations → Sent. "
"Follow-up steps are sent via POST /api/linkedin-campaigns/{id}/process-followups after acceptance "
"(UniPile users webhook new_relation marks acceptance; can lag hours)."
),
}
campaign.status = "completed" if failed == 0 else "failed"
campaign.execution_result = result
campaign.executed_at = datetime.utcnow()
campaign.updated_at = datetime.utcnow()
db.commit()
return {"message": "Campaign execution finished", **result}
@app.post("/api/linkedin-campaigns/{campaign_id}/process-followups")
async def process_linkedin_campaign_followups(
campaign_id: int,
t: TenantContext = Depends(get_tenant_context),
):
"""
Deliver generated sequence steps 2+ as LinkedIn DMs when due.
* Invite flow: waits for `linkedin_connection_accepted_at` (set by UniPile `new_relation` webhook or DM fallback).
* First follow-up (step 2) after invite: `accepted_at + followup_interval_hours`.
* If execute fell back to DM for step 1: step 2 waits `last_followup_sent_at + interval`.
Run this endpoint on a schedule (e.g. hourly cron); it only sends when the interval has elapsed.
"""
db = t.db
campaign = (
db.query(LinkedinCampaign)
.filter(
LinkedinCampaign.tenant_id == t.tenant_id,
LinkedinCampaign.user_id == t.user_id,
LinkedinCampaign.id == campaign_id,
)
.first()
)
if not campaign:
raise HTTPException(status_code=404, detail="Campaign not found")
account = (
db.query(UnipileAccount)
.filter(
UnipileAccount.tenant_id == t.tenant_id,
UnipileAccount.user_id == t.user_id,
UnipileAccount.id == campaign.unipile_account_ref_id,
)
.first()
)
if not account:
raise HTTPException(status_code=404, detail="Connected LinkedIn account not found")
if not campaign.file_id:
raise HTTPException(status_code=400, detail="Campaign has no uploaded CSV")
interval_h = campaign.followup_interval_hours if campaign.followup_interval_hours else 72
now = datetime.utcnow()
contacts = (
db.query(Contact)
.filter(
Contact.tenant_id == t.tenant_id,
Contact.file_id == campaign.file_id,
Contact.source == "linkedin_campaign",
)
.order_by(Contact.row_index.asc(), Contact.id.asc())
.all()
)
sent = 0
skipped = 0
failed = 0
details = []
for c in contacts:
n = c.linkedin_followup_next_email_number
if not n or not c.unipile_provider_id:
skipped += 1
continue
max_step = (
db.query(func.max(GeneratedSequence.email_number))
.filter(
GeneratedSequence.tenant_id == t.tenant_id,
GeneratedSequence.file_id == campaign.file_id,
GeneratedSequence.channel == "linkedin",
GeneratedSequence.sequence_id == c.row_index,
)
.scalar()
) or 0
if max_step < 2:
skipped += 1
continue
if n > max_step:
c.linkedin_followup_next_email_number = None
skipped += 1
continue
if c.linkedin_invite_pending and not c.linkedin_connection_accepted_at:
skipped += 1
continue
if n == 2:
if c.linkedin_last_followup_sent_at is None:
if not c.linkedin_connection_accepted_at:
skipped += 1
continue
if now < c.linkedin_connection_accepted_at + timedelta(hours=interval_h):
skipped += 1
continue
else:
if now < c.linkedin_last_followup_sent_at + timedelta(hours=interval_h):
skipped += 1
continue
else:
if not c.linkedin_last_followup_sent_at:
skipped += 1
continue
if now < c.linkedin_last_followup_sent_at + timedelta(hours=interval_h):
skipped += 1
continue
msg_row = (
db.query(GeneratedSequence)
.filter(
GeneratedSequence.tenant_id == t.tenant_id,
GeneratedSequence.file_id == campaign.file_id,
GeneratedSequence.channel == "linkedin",
GeneratedSequence.sequence_id == c.row_index,
GeneratedSequence.email_number == n,
)
.first()
)
if not msg_row:
skipped += 1
continue
try:
_send_unipile_dm(
account.unipile_account_id,
c.unipile_provider_id,
msg_row.email_content or "",
)
c.linkedin_last_followup_sent_at = now
c.linkedin_followup_next_email_number = n + 1 if n < max_step else None
sent += 1
details.append({"contact_id": c.id, "step": n, "status": "sent"})
except Exception as e:
failed += 1
details.append({"contact_id": c.id, "step": n, "status": "failed", "error": str(e)})
campaign.updated_at = datetime.utcnow()
db.commit()
return {
"message": "Follow-up pass finished",
"sent": sent,
"skipped": skipped,
"failed": failed,
"followup_interval_hours": interval_h,
"details": details[:200],
}
@app.get("/api/contact-fields")
async def contact_fields(t: TenantContext = Depends(get_tenant_context)):
"""Return all available contact field names from uploaded Apollo rows."""
db = t.db
contacts = db.query(Contact.raw_data).filter(Contact.tenant_id == t.tenant_id).all()
fields = set(["first_name", "last_name", "email", "company", "title", "file_id", "created_at"])
for item in contacts:
raw = item[0] or {}
if isinstance(raw, dict):
for key in raw.keys():
fields.add(str(key))
return {"fields": sorted(fields)}
@app.get("/api/contacts")
async def list_contacts(
search: str = Query("", description="Search by name/email/company/title"),
filters: str = Query(
"",
description='JSON array of {field, op, value?, from_value?, to_value?}; combined with AND',
),
field: str = Query("", description="Optional field name to filter (legacy single filter)"),
value: str = Query("", description="Optional field value to filter"),
op: str = Query("contains", description="Filter op: contains|equals|from|to|between"),
from_value: str = Query("", description="From value for range filters"),
to_value: str = Query("", description="To value for range filters"),
sort_by: str = Query("created_at", description="Sort field"),
sort_dir: str = Query("desc", description="Sort direction: asc|desc"),
limit: int = Query(200, ge=1, le=1000),
offset: int = Query(0, ge=0),
t: TenantContext = Depends(get_tenant_context),
):
db = t.db
query = db.query(Contact).filter(Contact.tenant_id == t.tenant_id)
if search:
pattern = f"%{search}%"
query = query.filter(
(Contact.first_name.ilike(pattern))
| (Contact.last_name.ilike(pattern))
| (Contact.email.ilike(pattern))
| (Contact.company.ilike(pattern))
| (Contact.title.ilike(pattern))
)
contacts_all = query.order_by(Contact.created_at.desc(), Contact.id.desc()).all()
# Multiple Apollo-field filters (AND). Prefer over legacy single-field params when non-empty.
if filters and filters.strip():
try:
rules = json.loads(filters)
except json.JSONDecodeError:
raise HTTPException(status_code=400, detail="Invalid filters JSON")
if not isinstance(rules, list):
raise HTTPException(status_code=400, detail="filters must be a JSON array")
active_rules = [r for r in rules if isinstance(r, dict) and _safe_str(r.get("field"))]
if active_rules:
contacts_all = [
c
for c in contacts_all
if all(_contact_matches_apollo_filter(c, r) for r in active_rules)
]
elif field:
rule = {
"field": field,
"op": op,
"value": value,
"from_value": from_value,
"to_value": to_value,
}
contacts_all = [c for c in contacts_all if _contact_matches_apollo_filter(c, rule)]
# Sort by mapped or raw Apollo fields
reverse = (sort_dir or "desc").lower() != "asc"
sort_field = sort_by or "created_at"
def sort_key(c: Contact):
val = _contact_value(c, sort_field)
num = _to_float(val)
if num is not None:
return (0, num)
dt = _to_datetime(val)
if dt is not None:
return (1, dt.timestamp())
return (2, _safe_str(val).lower())
contacts_all = sorted(contacts_all, key=sort_key, reverse=reverse)
total = len(contacts_all)
contacts = contacts_all[offset:offset + limit]
return {
"total": total,
"contacts": [
{
"id": c.id,
"file_id": c.file_id,
"row_index": c.row_index,
"first_name": c.first_name or _pick_from_raw(c.raw_data, ["first name", "firstname", "first_name"]),
"last_name": c.last_name or _pick_from_raw(c.raw_data, ["last name", "lastname", "last_name"]),
"email": c.email or _pick_from_raw(c.raw_data, ["email", "work email", "email address", "secondary email", "tertiary email"]),
"company": c.company or _pick_from_raw(c.raw_data, ["company", "company name", "company name for emails", "organization name", "account name"]),
"title": c.title or _pick_from_raw(c.raw_data, ["title", "job title"]),
"source": c.source or "apollo_csv",
"created_at": c.created_at.isoformat() if c.created_at else None,
}
for c in contacts
],
}
@app.post("/api/contacts")
async def create_contact(body: ContactCreateRequest, t: TenantContext = Depends(get_tenant_context)):
"""Create a contact manually (inline table add). Email must be unique."""
db = t.db
email = _safe_str(body.email).lower()
if not email:
raise HTTPException(status_code=400, detail="Email is required")
exists = (
db.query(Contact)
.filter(
Contact.tenant_id == t.tenant_id,
func.lower(Contact.email) == email,
)
.first()
)
if exists:
raise HTTPException(status_code=409, detail="A contact with this email already exists")
fn = _safe_str(body.first_name)
ln = _safe_str(body.last_name)
co = _safe_str(body.company)
ti = _safe_str(body.title)
raw = {
"First Name": fn,
"Last Name": ln,
"Email": email,
"Company Name": co,
"Title": ti,
}
contact = Contact(
tenant_id=t.tenant_id,
file_id=MANUAL_CONTACT_FILE_ID,
row_index=0,
first_name=fn,
last_name=ln,
email=email,
company=co,
title=ti,
source="manual",
raw_data=raw,
)
db.add(contact)
db.commit()
db.refresh(contact)
return {
"id": contact.id,
"file_id": contact.file_id,
"row_index": contact.row_index,
"first_name": contact.first_name,
"last_name": contact.last_name,
"email": contact.email,
"company": contact.company,
"title": contact.title,
"source": contact.source or "manual",
"created_at": contact.created_at.isoformat() if contact.created_at else None,
}
@app.post("/api/contacts/bulk-delete")
async def bulk_delete_contacts(body: BulkContactIdsRequest, t: TenantContext = Depends(get_tenant_context)):
db = t.db
if not body.contact_ids:
raise HTTPException(status_code=400, detail="contact_ids required")
ids = list({int(x) for x in body.contact_ids})
deleted = (
db.query(Contact)
.filter(Contact.tenant_id == t.tenant_id, Contact.id.in_(ids))
.delete(synchronize_session=False)
)
db.commit()
return {"deleted": deleted}
@app.post("/api/contacts/seed-demo")
async def seed_demo_contacts(t: TenantContext = Depends(get_tenant_context)):
"""
Replace previous demo-seeded contacts and insert a variety of sample rows
(rich Apollo-style raw_data) for testing filters and UI.
"""
db = t.db
tenant_id = t.tenant_id
removed = (
db.query(Contact)
.filter(
Contact.tenant_id == tenant_id,
Contact.file_id == DEMO_CONTACTS_FILE_ID,
)
.delete(synchronize_session=False)
)
specs = [
{
"fn": "Morgan",
"ln": "Lee",
"email": "morgan.lee@blueharbor.demo",
"co": "Blue Harbor Analytics",
"ti": "Chief Revenue Officer",
"raw": {
"Company Name": "Blue Harbor Analytics",
"Company Name for Emails": "BlueHarbor",
"Industry": "Business intelligence & analytics software",
"# Employees": "51-200",
"Annual Revenue": "$8,200,000",
"Last Raised At": "2023-11",
"Website": "https://www.blueharbor-analytics.demo",
"City": "Seattle",
"State": "WA",
"Country": "United States",
},
},
{
"fn": "Priya",
"ln": "Shah",
"email": "priya.shah@lumengrid.demo",
"co": "LumenGrid Energy",
"ti": "Director of Procurement",
"raw": {
"Company Name": "LumenGrid Energy",
"Industry": "Renewable energy infrastructure",
"# Employees": "201-500",
"Annual Revenue": "$45,000,000",
"Website": "https://lumengrid.demo",
"City": "Toronto",
"State": "ON",
"Country": "Canada",
},
},
{
"fn": "Diego",
"ln": "Ramos",
"email": "diego.ramos@vertexledger.demo",
"co": "Vertex Ledger",
"ti": "Controller",
"raw": {
"Company Name": "Vertex Ledger",
"Industry": "Enterprise document and content management software",
"# Employees": "11-50",
"Annual Revenue": "$3,100,000",
"Website": "https://vertexledger.demo",
"City": "Austin",
"State": "TX",
"Country": "United States",
},
},
{
"fn": "Elena",
"ln": "Volkov",
"email": "elena.volkov@northpole.demo",
"co": "Northpole Freight Co.",
"ti": "Operations Manager",
"raw": {
"Company Name": "Northpole Freight Co.",
"Industry": "Logistics & Supply Chain",
"# Employees": "501-1000",
"Annual Revenue": "$120,000,000",
"Website": "https://northpole-freight.demo",
"City": "Hamburg",
"State": "",
"Country": "Germany",
},
},
{
"fn": "James",
"ln": "Okonkwo",
"email": "james.okonkwo@silverfin.demo",
"co": "Silverfin Capital",
"ti": "Investment Analyst",
"raw": {
"Company Name": "Silverfin Capital",
"Industry": "Financial services",
"# Employees": "51-200",
"Annual Revenue": "$22,500,000",
"Website": "https://silverfin.demo",
"City": "London",
"State": "",
"Country": "United Kingdom",
},
},
{
"fn": "Yuki",
"ln": "Tanaka",
"email": "yuki.tanaka@skyforge.demo",
"co": "Skyforge Robotics",
"ti": "Head of Engineering",
"raw": {
"Company Name": "Skyforge Robotics",
"Industry": "Industrial automation",
"# Employees": "201-500",
"Annual Revenue": "$67,000,000",
"Website": "https://skyforge.demo",
"City": "Osaka",
"State": "",
"Country": "Japan",
},
},
{
"fn": "Amira",
"ln": "Hassan",
"email": "amira.hassan@desertwave.demo",
"co": "Desert Wave Telecom",
"ti": "VP Customer Success",
"raw": {
"Company Name": "Desert Wave Telecom",
"Industry": "Telecommunications",
"# Employees": "1001-5000",
"Annual Revenue": "$410,000,000",
"Website": "https://desertwave.demo",
"City": "Dubai",
"State": "",
"Country": "United Arab Emirates",
},
},
{
"fn": "Chris",
"ln": "Martinez",
"email": "chris.martinez@cascade.demo",
"co": "Cascade Health Systems",
"ti": "Clinical Director",
"raw": {
"Company Name": "Cascade Health Systems",
"Industry": "Healthcare IT",
"# Employees": "501-1000",
"Annual Revenue": "$95,000,000",
"Website": "https://cascade-health.demo",
"City": "Denver",
"State": "CO",
"Country": "United States",
},
},
{
"fn": "Sofia",
"ln": "Andersson",
"email": "sofia.andersson@fjord.demo",
"co": "Fjord Maritime AS",
"ti": "Fleet Coordinator",
"raw": {
"Company Name": "Fjord Maritime AS",
"Industry": "Shipping & maritime",
"# Employees": "51-200",
"Annual Revenue": "$18,000,000",
"Website": "https://fjord-maritime.demo",
"City": "Bergen",
"State": "",
"Country": "Norway",
},
},
{
"fn": "Marcus",
"ln": "Webbe",
"email": "marcus.webbe@pixelloft.demo",
"co": "Pixelloft Creative Agency",
"ti": "Creative Director",
"raw": {
"Company Name": "Pixelloft Creative Agency",
"Industry": "Marketing & advertising",
"# Employees": "11-50",
"Annual Revenue": "$1,800,000",
"Website": "https://pixelloft.demo",
"City": "Melbourne",
"State": "VIC",
"Country": "Australia",
},
},
]
for i, s in enumerate(specs):
rd = dict(s["raw"])
rd["Title"] = s["ti"]
rd["Email"] = s["email"]
rd["First Name"] = s["fn"]
rd["Last Name"] = s["ln"]
row = Contact(
tenant_id=tenant_id,
file_id=DEMO_CONTACTS_FILE_ID,
row_index=i,
first_name=s["fn"],
last_name=s["ln"],
email=s["email"],
company=s["co"],
title=s["ti"],
source="demo",
raw_data=rd,
)
db.add(row)
db.commit()
return {
"ok": True,
"removed_previous_demo_rows": removed,
"inserted": len(specs),
}
@app.post("/api/contacts/bulk-convert-to-leads")
async def bulk_convert_contacts_to_leads(body: BulkContactIdsRequest, t: TenantContext = Depends(get_tenant_context)):
"""
For each contact: create a CrmLead (campaign «Contacts») copied from the contact.
Contacts are not removed; they remain until explicitly deleted. Fails per-row if the
contact has no email or a lead with the same email already exists.
"""
db = t.db
if not body.contact_ids:
raise HTTPException(status_code=400, detail="contact_ids required")
converted = 0
errors: List[dict] = []
for cid in body.contact_ids:
contact = (
db.query(Contact)
.filter(Contact.tenant_id == t.tenant_id, Contact.id == int(cid))
.first()
)
if not contact:
errors.append({"contact_id": cid, "error": "not found"})
continue
r = _convert_contact_to_lead_core(db, contact)
if not r["ok"]:
errors.append({"contact_id": cid, "error": r.get("error", "failed")})
else:
converted += 1
db.commit()
return {"converted": converted, "errors": errors}
@app.get("/api/company-names")
async def search_company_names(
q: str = Query("", description="Substring match on Contact.company"),
limit: int = Query(25, ge=1, le=100),
t: TenantContext = Depends(get_tenant_context),
):
"""Distinct company strings from contacts for deal/account linking."""
db = t.db
raw_q = _safe_str(q)
pattern = f"%{raw_q}%" if raw_q else "%"
rows = (
db.query(Contact.company)
.filter(
Contact.tenant_id == t.tenant_id,
Contact.company.isnot(None),
Contact.company != "",
Contact.company.ilike(pattern),
)
.distinct()
.order_by(Contact.company.asc())
.limit(limit * 4)
.all()
)
names: List[str] = []
seen = set()
for (co,) in rows:
s = _safe_str(co)
if not s:
continue
lk = s.lower()
if lk in seen:
continue
seen.add(lk)
names.append(s)
if len(names) >= limit:
break
return {"names": names}
@app.get("/api/contacts/{contact_id}")
async def get_contact(contact_id: int, t: TenantContext = Depends(get_tenant_context)):
db = t.db
contact = (
db.query(Contact)
.filter(Contact.tenant_id == t.tenant_id, Contact.id == contact_id)
.first()
)
if not contact:
raise HTTPException(status_code=404, detail="Contact not found")
return {
"id": contact.id,
"file_id": contact.file_id,
"row_index": contact.row_index,
"first_name": contact.first_name or _pick_from_raw(contact.raw_data, ["first name", "firstname", "first_name"]),
"last_name": contact.last_name or _pick_from_raw(contact.raw_data, ["last name", "lastname", "last_name"]),
"email": contact.email or _pick_from_raw(contact.raw_data, ["email", "work email", "email address", "secondary email", "tertiary email"]),
"company": contact.company or _pick_from_raw(contact.raw_data, ["company", "company name", "company name for emails", "organization name", "account name"]),
"title": contact.title or _pick_from_raw(contact.raw_data, ["title", "job title"]),
"source": contact.source or "apollo_csv",
"created_at": contact.created_at.isoformat() if contact.created_at else None,
"raw_data": contact.raw_data or {},
}
# Merged into Contact.raw_data["Company Name for Emails"], etc. (Apollo CSV keys)
COMPANY_DETAIL_PATCH_FIELDS = {
"company_name_for_emails": "Company Name for Emails",
"industry": "Industry",
"employees": "# Employees",
"annual_revenue": "Annual Revenue",
"last_raised_at": "Last Raised At",
"website": "Website",
"city": "City",
"state": "State",
"country_region": "Country",
}
def _contact_company_details_dict(c: Contact) -> dict:
rd = c.raw_data or {}
return {
"Company Name": _safe_str(c.company or rd.get("Company Name")),
"Company Name for Emails": _safe_str(rd.get("Company Name for Emails")),
"Industry": _safe_str(rd.get("Industry")),
"# Employees": _safe_str(rd.get("# Employees")),
"Annual Revenue": _safe_str(rd.get("Annual Revenue")),
"Last Raised At": _safe_str(rd.get("Last Raised At")),
"Website": _safe_str(rd.get("Website")),
"City": _safe_str(rd.get("City")),
"State": _safe_str(rd.get("State")),
"Country": _safe_str(rd.get("Country")),
}
def _merge_contact_raw_company_details(contact: Contact, patch: dict) -> None:
rd = dict(contact.raw_data or {})
for py_key, raw_key in COMPANY_DETAIL_PATCH_FIELDS.items():
if py_key in patch:
rd[raw_key] = _safe_str(patch[py_key])
contact.raw_data = rd
def _lead_company_details_dict(row: CrmLead) -> dict:
rw = row.raw_webhook if isinstance(row.raw_webhook, dict) else {}
cd = rw.get("company_details") if isinstance(rw.get("company_details"), dict) else {}
return {
"Company Name": _safe_str(row.company_name or cd.get("Company Name")),
"Company Name for Emails": _safe_str(cd.get("Company Name for Emails")),
"Industry": _safe_str(cd.get("Industry")),
"# Employees": _safe_str(cd.get("# Employees")),
"Annual Revenue": _safe_str(cd.get("Annual Revenue")),
"Last Raised At": _safe_str(cd.get("Last Raised At")),
"Website": _safe_str(cd.get("Website")),
"City": _safe_str(cd.get("City")),
"State": _safe_str(cd.get("State")),
"Country": _safe_str(cd.get("Country")),
}
def _deal_fallback_company_details(d: CrmDeal) -> dict:
return {
"Company Name": _safe_str(d.account_name),
"Company Name for Emails": "",
"Industry": "",
"# Employees": "",
"Annual Revenue": "",
"Last Raised At": "",
"Website": "",
"City": "",
"State": "",
"Country": _safe_str(d.country),
}
def _format_contact_display(contact: Contact) -> str:
fn = _safe_str(contact.first_name)
ln = _safe_str(contact.last_name)
person = " ".join(filter(None, [fn, ln])).strip()
return person or _safe_str(contact.email)
def _enrich_deal_response(db: Session, row: CrmDeal) -> dict:
out = _deal_to_dict(row)
if row.owner_user_id:
ou = db.query(User).filter(User.id == row.owner_user_id).first()
if ou:
out["owner_display_name"] = (ou.name or ou.email or "").strip() or None
if not (out.get("owner_initials") or "").strip():
out["owner_initials"] = _user_initials_from_user(ou)
else:
out["owner_display_name"] = None
out["linked_contact"] = None
if row.contact_id:
c = (
db.query(Contact)
.filter(
Contact.tenant_id == row.tenant_id,
Contact.id == row.contact_id,
)
.first()
)
if c:
out["company_details"] = _contact_company_details_dict(c)
out["company_details_contact_id"] = c.id
out["linked_contact"] = {
"id": c.id,
"first_name": c.first_name or "",
"last_name": c.last_name or "",
"email": c.email or "",
"title": c.title or "",
}
else:
out["company_details"] = _deal_fallback_company_details(row)
else:
out["company_details"] = _deal_fallback_company_details(row)
return out
def _sync_contact_raw_from_columns(contact: Contact) -> None:
rd = dict(contact.raw_data or {})
rd["First Name"] = contact.first_name or ""
rd["Last Name"] = contact.last_name or ""
rd["Email"] = (contact.email or "").strip()
rd["Company Name"] = contact.company or ""
rd["Title"] = contact.title or ""
contact.raw_data = rd
@app.patch("/api/contacts/{contact_id}")
async def patch_contact(contact_id: int, body: ContactPatchRequest, t: TenantContext = Depends(get_tenant_context)):
db = t.db
contact = (
db.query(Contact)
.filter(Contact.tenant_id == t.tenant_id, Contact.id == contact_id)
.first()
)
if not contact:
raise HTTPException(status_code=404, detail="Contact not found")
data = body.model_dump(exclude_unset=True)
if not data:
raise HTTPException(status_code=400, detail="No fields to update")
if "email" in data:
email = _safe_str(data["email"]).lower()
if not email:
raise HTTPException(status_code=400, detail="Email cannot be empty")
taken = (
db.query(Contact)
.filter(
Contact.tenant_id == t.tenant_id,
Contact.id != contact_id,
func.lower(Contact.email) == email,
)
.first()
)
if taken:
raise HTTPException(status_code=409, detail="Another contact already uses this email")
contact.email = email
if "first_name" in data:
contact.first_name = _safe_str(data["first_name"])
if "last_name" in data:
contact.last_name = _safe_str(data["last_name"])
if "company" in data:
contact.company = _safe_str(data["company"])
if "title" in data:
contact.title = _safe_str(data["title"])
_sync_contact_raw_from_columns(contact)
_merge_contact_raw_company_details(contact, data)
db.commit()
db.refresh(contact)
return {
"id": contact.id,
"file_id": contact.file_id,
"row_index": contact.row_index,
"first_name": contact.first_name or _pick_from_raw(contact.raw_data, ["first name", "firstname", "first_name"]),
"last_name": contact.last_name or _pick_from_raw(contact.raw_data, ["last name", "lastname", "last_name"]),
"email": contact.email or _pick_from_raw(contact.raw_data, ["email", "work email", "email address", "secondary email", "tertiary email"]),
"company": contact.company or _pick_from_raw(contact.raw_data, ["company", "company name", "company name for emails", "organization name", "account name"]),
"title": contact.title or _pick_from_raw(contact.raw_data, ["title", "job title"]),
"source": contact.source or "apollo_csv",
"created_at": contact.created_at.isoformat() if contact.created_at else None,
"raw_data": contact.raw_data or {},
}
@app.post("/api/contacts/{contact_id}/enrich")
async def enrich_contact(contact_id: int, t: TenantContext = Depends(get_tenant_context)):
"""
GPT-enrich company/contact fields for manually added contacts only.
Enrichment (gpt_service): multi-page crawl + schema.org JSON-LD on your domain;
Google Custom Search (GOOGLE_API_KEY + GOOGLE_CSE_ID); optional Gemini with
Google Search grounding (GEMINI_API_KEY); DuckDuckGo fallback;
ENRICHMENT_CONTACT_EMAIL helps Wikipedia.
"""
db = t.db
if not os.getenv("OPENAI_API_KEY"):
raise HTTPException(status_code=503, detail="OpenAI API key is not configured")
contact = (
db.query(Contact)
.filter(Contact.tenant_id == t.tenant_id, Contact.id == contact_id)
.first()
)
if not contact:
raise HTTPException(status_code=404, detail="Contact not found")
src = (contact.source or "").strip().lower()
if src != "manual" and contact.file_id != MANUAL_CONTACT_FILE_ID:
raise HTTPException(
status_code=400,
detail="Enrich is only available for manually added contacts",
)
seed = {
"first_name": contact.first_name
or _pick_from_raw(contact.raw_data, ["first name", "firstname", "first_name"]),
"last_name": contact.last_name
or _pick_from_raw(contact.raw_data, ["last name", "lastname", "last_name"]),
"email": contact.email
or _pick_from_raw(
contact.raw_data,
["email", "work email", "email address", "secondary email", "tertiary email"],
),
"company": contact.company
or _pick_from_raw(
contact.raw_data,
[
"company",
"company name",
"company name for emails",
"organization name",
"account name",
],
),
"title": contact.title
or _pick_from_raw(contact.raw_data, ["title", "job title"]),
"website_hint": _pick_from_raw(contact.raw_data, ["website", "Website"]),
}
try:
merged, enrichment_report = enrich_manual_contact_profile(seed)
except RuntimeError as e:
raise HTTPException(status_code=503, detail=str(e)) from e
except Exception as e:
raise HTTPException(status_code=502, detail=f"Enrichment failed: {e}") from e
contact.first_name = _safe_str(merged.get("first_name")) or contact.first_name
contact.last_name = _safe_str(merged.get("last_name")) or contact.last_name
contact.title = _safe_str(merged.get("title")) or contact.title
contact.company = _safe_str(merged.get("company_name")) or contact.company
rd = dict(contact.raw_data or {})
rd["First Name"] = contact.first_name
rd["Last Name"] = contact.last_name
rd["Title"] = contact.title
rd["Email"] = _safe_str(contact.email)
if contact.company:
rd["Company Name"] = contact.company
optional_raw = [
("company_name_for_emails", "Company Name for Emails"),
("industry", "Industry"),
("employees", "# Employees"),
("annual_revenue", "Annual Revenue"),
("last_raised_at", "Last Raised At"),
("website", "Website"),
("city", "City"),
("state", "State"),
("country", "Country"),
]
for gpt_key, raw_key in optional_raw:
v = _safe_str(merged.get(gpt_key))
if v:
rd[raw_key] = v
contact.raw_data = rd
db.commit()
db.refresh(contact)
return {
"id": contact.id,
"file_id": contact.file_id,
"row_index": contact.row_index,
"first_name": contact.first_name
or _pick_from_raw(contact.raw_data, ["first name", "firstname", "first_name"]),
"last_name": contact.last_name
or _pick_from_raw(contact.raw_data, ["last name", "lastname", "last_name"]),
"email": contact.email
or _pick_from_raw(
contact.raw_data,
["email", "work email", "email address", "secondary email", "tertiary email"],
),
"company": contact.company
or _pick_from_raw(
contact.raw_data,
["company", "company name", "company name for emails", "organization name", "account name"],
),
"title": contact.title or _pick_from_raw(contact.raw_data, ["title", "job title"]),
"source": contact.source or "manual",
"created_at": contact.created_at.isoformat() if contact.created_at else None,
"raw_data": contact.raw_data or {},
"enrichment_report": enrichment_report,
}
@app.get("/api/contacts/{contact_id}/sequences")
async def get_contact_sequences(contact_id: int, t: TenantContext = Depends(get_tenant_context)):
db = t.db
contact = (
db.query(Contact)
.filter(Contact.tenant_id == t.tenant_id, Contact.id == contact_id)
.first()
)
if not contact:
raise HTTPException(status_code=404, detail="Contact not found")
if not contact.email:
return {"contact_id": contact_id, "sequences": []}
sequences = (
db.query(GeneratedSequence)
.filter(
GeneratedSequence.tenant_id == t.tenant_id,
GeneratedSequence.file_id == contact.file_id,
GeneratedSequence.email == contact.email,
)
.order_by(GeneratedSequence.sequence_id, GeneratedSequence.email_number)
.all()
)
return {
"contact_id": contact_id,
"sequences": [
{
"id": s.id,
"file_id": s.file_id,
"sequence_id": s.sequence_id,
"email_number": s.email_number,
"subject": s.subject or "",
"email_content": s.email_content or "",
"product": s.product or "",
"created_at": s.created_at.isoformat() if s.created_at else None,
}
for s in sequences
],
}
@app.post("/api/save-prompts")
async def save_prompts(request: PromptSaveRequest, t: TenantContext = Depends(get_tenant_context)):
"""Save prompt templates for products"""
db = t.db
try:
# Delete existing prompts for this file
db.query(Prompt).filter(
Prompt.tenant_id == t.tenant_id,
Prompt.file_id == request.file_id,
).delete()
# Save email prompts
for product_name, prompt_template in request.prompts.items():
prompt = Prompt(
tenant_id=t.tenant_id,
file_id=request.file_id,
product_name=product_name,
prompt_template=prompt_template,
prompt_kind="email",
)
db.add(prompt)
# Optional LinkedIn prompts (same product names)
li = request.linkedin_prompts or {}
for product_name, prompt_template in li.items():
if not (prompt_template or "").strip():
continue
db.add(
Prompt(
tenant_id=t.tenant_id,
file_id=request.file_id,
product_name=product_name,
prompt_template=prompt_template,
prompt_kind="linkedin",
)
)
db.commit()
uf = (
db.query(UploadedFile)
.filter(UploadedFile.tenant_id == t.tenant_id, UploadedFile.file_id == request.file_id)
.first()
)
if uf is not None:
if request.sequence_plan is not None:
uf.sequence_plan_json = json.dumps(request.sequence_plan)
db.commit()
return {"message": "Prompts saved successfully"}
except Exception as e:
raise HTTPException(status_code=500, detail=f"Error saving prompts: {str(e)}")
def _normalize_campaign_plan_steps(raw_data) -> Optional[List[Dict]]:
if raw_data is None:
return None
if isinstance(raw_data, dict) and isinstance(raw_data.get("steps"), list):
return raw_data["steps"]
if isinstance(raw_data, list):
return raw_data
return None
def _parse_campaign_plan_pack(db_file) -> Optional[Dict[str, Any]]:
"""Build GPT kwargs + per-row step_order alignment from wizard sequence JSON on UploadedFile."""
raw = getattr(db_file, "sequence_plan_json", None)
if not raw or not str(raw).strip():
return None
try:
parsed = json.loads(raw)
except Exception:
return None
steps = _normalize_campaign_plan_steps(parsed)
if not steps:
return None
gmail_specs: List[Dict[str, Any]] = []
li_wizard: List[Dict[str, Any]] = []
wait_lines: List[str] = []
ord_global = 0
for s in steps:
if not isinstance(s, dict):
continue
if s.get("type") == "wait":
try:
d = int(s.get("days") or 1)
except (TypeError, ValueError):
d = 1
wait_lines.append(f"Wait {d} day(s) between surrounding sequence touches.")
continue
if s.get("type") != "action":
continue
ord_global += 1
ch = s.get("channel")
act = s.get("action")
title = (s.get("title") or "").strip() or ("Email" if ch == "gmail" else "LinkedIn touch")
if ch == "gmail":
gmail_specs.append({"step_order": ord_global, "title": title})
elif ch == "linkedin" and act in ("linkedin_connect", "linkedin_dm"):
li_wizard.append({"step_order": ord_global, "action": act, "title": title})
li_connects = [x for x in li_wizard if x["action"] == "linkedin_connect"]
li_dms = [x for x in li_wizard if x["action"] == "linkedin_dm"]
li_other = [x for x in li_wizard if x["action"] not in ("linkedin_connect", "linkedin_dm")]
li_for_gpt = li_connects + li_dms + li_other
total_actions = len(gmail_specs) + len(li_wizard)
if total_actions == 0:
return None
email_campaign_kw = None
if gmail_specs:
email_campaign_kw = {
"email_count": len(gmail_specs),
"email_titles": [g["title"] for g in gmail_specs],
"wait_context": "\n".join(wait_lines),
}
linkedin_campaign_kw = None
if li_for_gpt:
linkedin_campaign_kw = {"linkedin_actions": [x["action"] for x in li_for_gpt]}
return {
"gmail_specs": gmail_specs,
"li_wizard": li_wizard,
"li_for_gpt": li_for_gpt,
"total_actions": total_actions,
"email_campaign_kw": email_campaign_kw,
"linkedin_campaign_kw": linkedin_campaign_kw,
}
@app.get("/api/generation-status")
async def generation_status(file_id: str = Query(...), t: TenantContext = Depends(get_tenant_context)):
"""Return progress for sequence generation (so frontend can resume after sleep/reconnect)."""
db = t.db
db_file = (
db.query(UploadedFile)
.filter(
UploadedFile.tenant_id == t.tenant_id,
UploadedFile.file_id == file_id,
)
.first()
)
if not db_file:
raise HTTPException(status_code=404, detail="File not found")
total_contacts = db_file.contact_count or 0
plan_actions = 0
if getattr(db_file, "sequence_plan_json", None):
try:
raw = json.loads(db_file.sequence_plan_json)
if isinstance(raw, list):
plan_actions = sum(1 for s in raw if isinstance(s, dict) and s.get("type") == "action")
elif isinstance(raw, dict) and isinstance(raw.get("steps"), list):
plan_actions = sum(1 for s in raw["steps"] if isinstance(s, dict) and s.get("type") == "action")
except Exception:
plan_actions = 0
has_linkedin = (
db.query(Prompt.id)
.filter(
Prompt.tenant_id == t.tenant_id,
Prompt.file_id == file_id,
Prompt.prompt_kind == "linkedin",
)
.first()
is not None
)
email_done = (
db.query(GeneratedSequence.sequence_id)
.filter(
GeneratedSequence.tenant_id == t.tenant_id,
GeneratedSequence.file_id == file_id,
GeneratedSequence.channel == "email",
)
.distinct()
.count()
)
li_done = (
db.query(GeneratedSequence.sequence_id)
.filter(
GeneratedSequence.tenant_id == t.tenant_id,
GeneratedSequence.file_id == file_id,
GeneratedSequence.channel == "linkedin",
)
.distinct()
.count()
)
if plan_actions > 0:
row_count = (
db.query(GeneratedSequence.id)
.filter(
GeneratedSequence.tenant_id == t.tenant_id,
GeneratedSequence.file_id == file_id,
)
.count()
)
completed_units = row_count
expected_units = total_contacts * plan_actions
# GPT may return fewer rows than plan_actions × contacts; allow up to one row per contact shortfall
# so progress can reach 100% / is_complete when all contacts are effectively done.
complete_threshold = max(1, expected_units - max(0, total_contacts))
else:
completed_units = email_done + (li_done if has_linkedin else 0)
expected_units = total_contacts * (2 if has_linkedin else 1)
complete_threshold = expected_units
return {
"file_id": file_id,
"total_contacts": total_contacts,
"completed_count": completed_units,
"is_complete": total_contacts > 0 and completed_units >= complete_threshold,
"has_linkedin_prompts": has_linkedin,
"campaign_sequence_actions": plan_actions or None,
}
@app.get("/api/sequences")
async def get_sequences(file_id: str = Query(...), t: TenantContext = Depends(get_tenant_context)):
"""Return all generated sequences for a file (for catch-up after reconnect)."""
db = t.db
sequences = (
db.query(GeneratedSequence)
.filter(
GeneratedSequence.tenant_id == t.tenant_id,
GeneratedSequence.file_id == file_id,
)
.order_by(
GeneratedSequence.sequence_id,
func.coalesce(GeneratedSequence.step_order, 999999),
GeneratedSequence.email_number,
)
.all()
)
out = []
for seq in sequences:
out.append({
"id": seq.sequence_id,
"emailNumber": seq.email_number,
"firstName": seq.first_name,
"lastName": seq.last_name,
"email": seq.email,
"company": seq.company,
"title": seq.title or "",
"product": seq.product,
"subject": seq.subject,
"emailContent": seq.email_content,
"channel": getattr(seq, "channel", None) or "email",
"stepOrder": getattr(seq, "step_order", None),
})
return {"sequences": out}
@app.get("/api/generate-sequences")
async def generate_sequences(
file_id: str = Query(...),
reset: bool = Query(True),
t: TenantContext = Depends(get_tenant_context),
):
"""Generate email sequences using GPT with Server-Sent Events streaming.
Use reset=1 for a fresh run (clears existing). Use reset=0 to resume after disconnect/sleep."""
db = t.db
tenant_id = t.tenant_id
async def event_generator():
try:
db_file = (
db.query(UploadedFile)
.filter(
UploadedFile.tenant_id == tenant_id,
UploadedFile.file_id == file_id,
)
.first()
)
if not db_file:
yield f"data: {json.dumps({'type': 'error', 'error': 'File not found'})}\n\n"
return
df = pd.read_csv(db_file.file_path)
prompts = (
db.query(Prompt)
.filter(Prompt.tenant_id == tenant_id, Prompt.file_id == file_id)
.all()
)
email_prompt_dict = {
p.product_name: p.prompt_template
for p in prompts
if (getattr(p, "prompt_kind", None) or "email") == "email"
}
linkedin_prompt_dict = {
p.product_name: p.prompt_template
for p in prompts
if getattr(p, "prompt_kind", None) == "linkedin"
}
plan_pack = _parse_campaign_plan_pack(db_file)
run_email = True
run_linkedin = len(linkedin_prompt_dict) > 0
email_kw: Optional[Dict[str, Any]] = None
li_kw: Optional[Dict[str, Any]] = None
gmail_specs: List[Dict[str, Any]] = []
li_meta_for_rows: List[Dict[str, Any]] = []
if plan_pack:
run_email = bool(plan_pack.get("email_campaign_kw"))
run_linkedin = bool(plan_pack.get("linkedin_campaign_kw"))
email_kw = plan_pack.get("email_campaign_kw")
li_kw = plan_pack.get("linkedin_campaign_kw")
gmail_specs = plan_pack.get("gmail_specs") or []
li_meta_for_rows = plan_pack.get("li_for_gpt") or []
if run_email and not email_prompt_dict:
yield f"data: {json.dumps({'type': 'error', 'error': 'No prompts found'})}\n\n"
return
if run_linkedin and not linkedin_prompt_dict:
yield f"data: {json.dumps({'type': 'error', 'error': 'No LinkedIn prompts found'})}\n\n"
return
products = list(email_prompt_dict.keys()) if email_prompt_dict else []
li_products = list(linkedin_prompt_dict.keys()) if linkedin_prompt_dict else []
if reset:
db.query(GeneratedSequence).filter(
GeneratedSequence.tenant_id == tenant_id,
GeneratedSequence.file_id == file_id,
).delete()
db.commit()
user_row = db.query(User).filter(User.id == t.user_id).first()
mailbox_sig = ""
linkedin_sig = ""
if user_row:
mailbox_sig = (
(getattr(user_row, "mailbox_profile_display_name", None) or "").strip()
or (user_row.name or "").strip()
)
linkedin_sig = (
(getattr(user_row, "linkedin_profile_display_name", None) or "").strip()
or (user_row.name or "").strip()
)
total_contacts = len(df)
def progress_pct(seq_id: int, *, linkedin_phase: bool) -> float:
if total_contacts <= 0:
return 0.0
if run_linkedin and not linkedin_phase:
return min(100.0, max(0.0, (seq_id / total_contacts) * 50.0))
if run_linkedin and linkedin_phase:
return min(100.0, max(0.0, 50.0 + (seq_id / total_contacts) * 50.0))
return min(100.0, max(0.0, (seq_id / total_contacts) * 100.0))
def plan_progress_pct() -> float:
if not plan_pack or total_contacts <= 0:
return 0.0
cnt = (
db.query(func.count(GeneratedSequence.id))
.filter(
GeneratedSequence.tenant_id == tenant_id,
GeneratedSequence.file_id == file_id,
)
.scalar()
or 0
)
exp = total_contacts * plan_pack["total_actions"]
return min(100.0, max(0.0, (cnt / max(1, exp)) * 100.0))
def orm_to_sequence_response(seq: GeneratedSequence) -> Dict[str, Any]:
ch = getattr(seq, "channel", None) or "email"
return {
"id": seq.sequence_id,
"emailNumber": seq.email_number,
"firstName": seq.first_name,
"lastName": seq.last_name,
"email": seq.email,
"company": seq.company,
"title": seq.title or "",
"product": seq.product,
"subject": (seq.subject or "") if ch == "linkedin" else seq.subject,
"emailContent": seq.email_content,
"channel": ch,
"stepOrder": getattr(seq, "step_order", None),
}
sequence_id = 1
if run_email:
for idx, row in df.iterrows():
if plan_pack and gmail_specs:
email_done_n = (
db.query(func.count(GeneratedSequence.id))
.filter(
GeneratedSequence.tenant_id == tenant_id,
GeneratedSequence.file_id == file_id,
GeneratedSequence.sequence_id == sequence_id,
GeneratedSequence.channel == "email",
)
.scalar()
or 0
)
if email_done_n >= len(gmail_specs):
existing_em = (
db.query(GeneratedSequence)
.filter(
GeneratedSequence.tenant_id == tenant_id,
GeneratedSequence.file_id == file_id,
GeneratedSequence.sequence_id == sequence_id,
GeneratedSequence.channel == "email",
)
.order_by(
func.coalesce(GeneratedSequence.step_order, 999999),
GeneratedSequence.email_number,
)
.all()
)
for seq in existing_em:
yield f"data: {json.dumps({'type': 'sequence', 'sequence': orm_to_sequence_response(seq)})}\n\n"
prog = plan_progress_pct() if plan_pack else progress_pct(sequence_id, linkedin_phase=False)
yield f"data: {json.dumps({'type': 'progress', 'progress': prog})}\n\n"
sequence_id += 1
await asyncio.sleep(0.05)
continue
else:
existing = (
db.query(GeneratedSequence)
.filter(
GeneratedSequence.tenant_id == tenant_id,
GeneratedSequence.file_id == file_id,
GeneratedSequence.sequence_id == sequence_id,
GeneratedSequence.channel == "email",
)
.order_by(GeneratedSequence.email_number)
.all()
)
if existing:
for seq in existing:
yield f"data: {json.dumps({'type': 'sequence', 'sequence': orm_to_sequence_response(seq)})}\n\n"
prog = plan_progress_pct() if plan_pack else progress_pct(sequence_id, linkedin_phase=False)
yield f"data: {json.dumps({'type': 'progress', 'progress': prog})}\n\n"
sequence_id += 1
await asyncio.sleep(0.05)
continue
contact = row.to_dict()
product_name = products[(sequence_id - 1) % len(products)]
prompt_template = email_prompt_dict[product_name]
loop = asyncio.get_event_loop()
with concurrent.futures.ThreadPoolExecutor() as executor:
if plan_pack and email_kw:
gen_fn = partial(
generate_email_sequence,
campaign_sequence=email_kw,
sender_mailbox_name=mailbox_sig or None,
)
sequence_data_list = await loop.run_in_executor(
executor,
gen_fn,
contact,
prompt_template,
product_name,
)
else:
sequence_data_list = await loop.run_in_executor(
executor,
partial(generate_email_sequence, sender_mailbox_name=mailbox_sig or None),
contact,
prompt_template,
product_name,
)
for i, seq_data in enumerate(sequence_data_list):
step_order_v = None
if plan_pack and gmail_specs and i < len(gmail_specs):
step_order_v = gmail_specs[i]["step_order"]
db_sequence = GeneratedSequence(
tenant_id=tenant_id,
file_id=file_id,
sequence_id=sequence_id,
email_number=seq_data["email_number"],
channel="email",
first_name=seq_data["first_name"],
last_name=seq_data["last_name"],
email=seq_data["email"],
company=seq_data["company"],
title=seq_data.get("title", ""),
product=seq_data["product"],
subject=seq_data["subject"],
email_content=seq_data["email_content"],
step_order=step_order_v,
)
db.add(db_sequence)
db.commit()
for i, seq_data in enumerate(sequence_data_list):
step_order_v = None
if plan_pack and gmail_specs and i < len(gmail_specs):
step_order_v = gmail_specs[i]["step_order"]
sequence_response = {
"id": sequence_id,
"emailNumber": seq_data["email_number"],
"firstName": seq_data["first_name"],
"lastName": seq_data["last_name"],
"email": seq_data["email"],
"company": seq_data["company"],
"title": seq_data.get("title", ""),
"product": seq_data["product"],
"subject": seq_data["subject"],
"emailContent": seq_data["email_content"],
"channel": "email",
"stepOrder": step_order_v,
}
yield f"data: {json.dumps({'type': 'sequence', 'sequence': sequence_response})}\n\n"
prog = plan_progress_pct() if plan_pack else progress_pct(sequence_id, linkedin_phase=False)
yield f"data: {json.dumps({'type': 'progress', 'progress': prog})}\n\n"
sequence_id += 1
await asyncio.sleep(0.1)
if run_linkedin:
yield f"data: {json.dumps({'type': 'phase', 'phase': 'linkedin', 'message': 'Generating LinkedIn sequences'})}\n\n"
sequence_id = 1
for idx, row in df.iterrows():
if plan_pack and li_meta_for_rows:
li_done_n = (
db.query(func.count(GeneratedSequence.id))
.filter(
GeneratedSequence.tenant_id == tenant_id,
GeneratedSequence.file_id == file_id,
GeneratedSequence.sequence_id == sequence_id,
GeneratedSequence.channel == "linkedin",
)
.scalar()
or 0
)
if li_done_n >= len(li_meta_for_rows):
existing_li = (
db.query(GeneratedSequence)
.filter(
GeneratedSequence.tenant_id == tenant_id,
GeneratedSequence.file_id == file_id,
GeneratedSequence.sequence_id == sequence_id,
GeneratedSequence.channel == "linkedin",
)
.order_by(
func.coalesce(GeneratedSequence.step_order, 999999),
GeneratedSequence.email_number,
)
.all()
)
for seq in existing_li:
yield f"data: {json.dumps({'type': 'sequence', 'sequence': orm_to_sequence_response(seq)})}\n\n"
prog = plan_progress_pct() if plan_pack else progress_pct(sequence_id, linkedin_phase=True)
yield f"data: {json.dumps({'type': 'progress', 'progress': prog})}\n\n"
sequence_id += 1
await asyncio.sleep(0.05)
continue
else:
existing_li = (
db.query(GeneratedSequence)
.filter(
GeneratedSequence.tenant_id == tenant_id,
GeneratedSequence.file_id == file_id,
GeneratedSequence.sequence_id == sequence_id,
GeneratedSequence.channel == "linkedin",
)
.order_by(GeneratedSequence.email_number)
.all()
)
if existing_li:
for seq in existing_li:
yield f"data: {json.dumps({'type': 'sequence', 'sequence': orm_to_sequence_response(seq)})}\n\n"
prog = plan_progress_pct() if plan_pack else progress_pct(sequence_id, linkedin_phase=True)
yield f"data: {json.dumps({'type': 'progress', 'progress': prog})}\n\n"
sequence_id += 1
await asyncio.sleep(0.05)
continue
contact = row.to_dict()
li_product = li_products[(sequence_id - 1) % len(li_products)]
li_template = linkedin_prompt_dict[li_product]
loop = asyncio.get_event_loop()
with concurrent.futures.ThreadPoolExecutor() as executor:
if plan_pack and li_kw:
gen_li = partial(
generate_linkedin_sequence,
campaign_sequence=li_kw,
sender_linkedin_name=linkedin_sig or None,
)
li_list = await loop.run_in_executor(
executor,
gen_li,
contact,
li_template,
li_product,
)
else:
li_list = await loop.run_in_executor(
executor,
partial(generate_linkedin_sequence, sender_linkedin_name=linkedin_sig or None),
contact,
li_template,
li_product,
)
for i, seq_data in enumerate(li_list):
step_order_v = None
if plan_pack and li_meta_for_rows and i < len(li_meta_for_rows):
step_order_v = li_meta_for_rows[i]["step_order"]
db_sequence = GeneratedSequence(
tenant_id=tenant_id,
file_id=file_id,
sequence_id=sequence_id,
email_number=seq_data["email_number"],
channel="linkedin",
first_name=seq_data["first_name"],
last_name=seq_data["last_name"],
email=seq_data["email"],
company=seq_data["company"],
title=seq_data.get("title", ""),
product=seq_data["product"],
subject=seq_data.get("subject") or "",
email_content=seq_data["email_content"],
step_order=step_order_v,
)
db.add(db_sequence)
db.commit()
for i, seq_data in enumerate(li_list):
step_order_v = None
if plan_pack and li_meta_for_rows and i < len(li_meta_for_rows):
step_order_v = li_meta_for_rows[i]["step_order"]
sequence_response = {
"id": sequence_id,
"emailNumber": seq_data["email_number"],
"firstName": seq_data["first_name"],
"lastName": seq_data["last_name"],
"email": seq_data["email"],
"company": seq_data["company"],
"title": seq_data.get("title", ""),
"product": seq_data["product"],
"subject": seq_data.get("subject") or "",
"emailContent": seq_data["email_content"],
"channel": "linkedin",
"stepOrder": step_order_v,
}
yield f"data: {json.dumps({'type': 'sequence', 'sequence': sequence_response})}\n\n"
prog = plan_progress_pct() if plan_pack else progress_pct(sequence_id, linkedin_phase=True)
yield f"data: {json.dumps({'type': 'progress', 'progress': prog})}\n\n"
sequence_id += 1
await asyncio.sleep(0.1)
yield f"data: {json.dumps({'type': 'complete'})}\n\n"
except Exception as e:
yield f"data: {json.dumps({'type': 'error', 'error': str(e)})}\n\n"
return StreamingResponse(event_generator(), media_type="text/event-stream")
@app.get("/api/download-sequences")
async def download_sequences(file_id: str = Query(...), t: TenantContext = Depends(get_tenant_context)):
"""Download generated sequences as CSV with all subject/body fields"""
db = t.db
try:
# Get all sequences for this file, grouped by contact
sequences = (
db.query(GeneratedSequence)
.filter(
GeneratedSequence.tenant_id == t.tenant_id,
GeneratedSequence.file_id == file_id,
GeneratedSequence.channel == "email",
)
.order_by(GeneratedSequence.sequence_id, GeneratedSequence.email_number)
.all()
)
if not sequences:
raise HTTPException(
status_code=404,
detail="No email sequences found (CSV export is email-only; view LinkedIn in the app).",
)
# Group sequences by contact
contacts = {}
max_email_number = 0 # Track the maximum email number across all contacts
for seq in sequences:
contact_key = f"{seq.sequence_id}"
if contact_key not in contacts:
contacts[contact_key] = {
'first_name': seq.first_name,
'last_name': seq.last_name,
'email': seq.email,
'company': seq.company,
'title': seq.title or '',
'product': seq.product,
'subjects': {},
'bodies': {}
}
contacts[contact_key]['subjects'][seq.email_number] = seq.subject
contacts[contact_key]['bodies'][seq.email_number] = seq.email_content
# Track the maximum email number
if seq.email_number > max_email_number:
max_email_number = seq.email_number
# Create CSV in memory
# Use QUOTE_MINIMAL to properly quote fields with newlines for Smartlead
output = io.StringIO()
writer = csv.writer(output, quoting=csv.QUOTE_MINIMAL)
# Write header – fixed format: subject1, body1, subject2, body2, ... (separate columns, not merged)
header = ['First Name', 'Last Name', 'Email', 'Company', 'Title', 'Product']
for i in range(1, max_email_number + 1):
header.extend([f'subject{i}', f'body{i}'])
writer.writerow(header)
# Write rows (one row per contact with subjects/bodies up to max_email_number)
# Preserve newlines in email bodies for proper formatting in Smartlead
for contact in contacts.values():
row = [
contact['first_name'],
contact['last_name'],
contact['email'],
contact['company'],
contact['title'],
contact['product']
]
for i in range(1, max_email_number + 1):
subject = contact['subjects'].get(i, '') or ''
body = contact['bodies'].get(i, '') or ''
# Convert newlines to HTML <br> tags for Smartlead to properly render line breaks
# Smartlead's rich text editor expects HTML formatting
if body:
# Replace newlines with <br> tags for HTML rendering in Smartlead
body_html = body.replace('\n', '<br>').replace('\r', '')
else:
body_html = ''
row.append(subject)
row.append(body_html)
writer.writerow(row)
output.seek(0)
# Return as downloadable file (fixed format: one row per contact, separate subject/body columns)
return StreamingResponse(
iter([output.getvalue()]),
media_type="text/csv",
headers={
"Content-Disposition": "attachment; filename=email_sequences_fixed.csv"
}
)
except Exception as e:
raise HTTPException(status_code=500, detail=f"Error downloading sequences: {str(e)}")
@app.get("/api/smartlead-campaigns")
async def get_smartlead_campaigns():
"""Get list of campaigns from Smartlead"""
try:
client = SmartleadClient()
campaigns = client.get_campaigns()
# Format campaigns for frontend
formatted_campaigns = []
for campaign in campaigns:
campaign_id = campaign.get('campaign_id') or campaign.get('id') or campaign.get('campaignId')
campaign_name = campaign.get('name') or campaign.get('campaign_name') or campaign.get('title') or 'Unnamed Campaign'
if campaign_id:
formatted_campaigns.append({
"id": str(campaign_id),
"name": campaign_name
})
return {"campaigns": formatted_campaigns}
except Exception as e:
raise HTTPException(status_code=500, detail=f"Failed to fetch campaigns: {str(e)}")
@app.post("/api/push-to-smartlead")
async def push_to_smartlead(request: SmartleadPushRequest, t: TenantContext = Depends(get_tenant_context)):
"""Push generated sequences to Smartlead campaign (add leads to existing campaign)"""
import uuid
db = t.db
try:
# Get all sequences for this file
sequences = (
db.query(GeneratedSequence)
.filter(
GeneratedSequence.tenant_id == t.tenant_id,
GeneratedSequence.file_id == request.file_id,
)
.order_by(GeneratedSequence.sequence_id, GeneratedSequence.email_number)
.all()
)
if not sequences:
raise HTTPException(status_code=404, detail="No sequences found")
# Group sequences by contact
contacts = {}
for seq in sequences:
contact_key = f"{seq.sequence_id}"
if contact_key not in contacts:
contacts[contact_key] = {
'first_name': seq.first_name,
'last_name': seq.last_name,
'email': seq.email,
'company': seq.company,
'title': seq.title or '',
'subjects': {},
'bodies': {}
}
contacts[contact_key]['subjects'][seq.email_number] = seq.subject
contacts[contact_key]['bodies'][seq.email_number] = seq.email_content
# Initialize Smartlead client
try:
client = SmartleadClient()
except ValueError as e:
raise HTTPException(status_code=400, detail=str(e))
# Get campaign name for logging
campaigns = client.get_campaigns()
campaign_name = 'Unknown Campaign'
for campaign in campaigns:
campaign_id = campaign.get('campaign_id') or campaign.get('id') or campaign.get('campaignId')
if str(campaign_id) == str(request.campaign_id):
campaign_name = campaign.get('name') or campaign.get('campaign_name') or campaign.get('title') or 'Unknown Campaign'
break
# Create run record
run_id = str(uuid.uuid4())
run = SmartleadRun(
tenant_id=t.tenant_id,
run_id=run_id,
file_id=request.file_id,
mode='existing', # Always 'existing' now
campaign_id=request.campaign_id,
campaign_name=campaign_name,
steps_count=0, # Not needed for existing campaigns
dry_run=1 if request.dry_run else 0,
total_leads=len(contacts),
status='pending'
)
db.add(run)
db.commit()
if request.dry_run:
# Dry run - just return what would be sent
return {
"run_id": run_id,
"campaign_id": request.campaign_id,
"campaign_name": campaign_name,
"total": len(contacts),
"added": 0,
"skipped": 0,
"failed": 0,
"errors": [],
"status": "dry_run_completed",
"message": "Dry run completed. No leads were sent to Smartlead."
}
campaign_id = request.campaign_id
# Prepare leads for Smartlead
leads = []
errors = []
added_count = 0
skipped_count = 0
failed_count = 0
for contact in contacts.values():
try:
# Validate required fields
if not contact.get('email'):
errors.append({
"email": contact.get('email', 'unknown'),
"error": "Missing email address"
})
failed_count += 1
continue
# Helper function to safely convert to string and strip
def safe_str(value):
if value is None:
return ""
if isinstance(value, float):
# Handle NaN and other float values
if math.isnan(value):
return ""
return str(value).strip()
return str(value).strip() if value else ""
# Ensure first_name and last_name are not None/empty/float
first_name = safe_str(contact.get('first_name', '')) or 'Contact'
last_name = safe_str(contact.get('last_name', ''))
company = safe_str(contact.get('company', ''))
title = safe_str(contact.get('title', ''))
email = safe_str(contact.get('email', ''))
if not email:
errors.append({
"email": "unknown",
"error": "Missing email address"
})
failed_count += 1
continue
# Build lead object - Smartlead API doesn't allow "company", "title", or custom variables when adding leads
# Custom variables should be set manually in Smartlead via CSV import
lead = {
"email": email,
"first_name": first_name,
"last_name": last_name
}
leads.append(lead)
except Exception as e:
errors.append({
"email": contact.get('email', 'unknown'),
"error": str(e)
})
failed_count += 1
# Add leads to campaign (chunk in batches of 50)
batch_size = 50
for i in range(0, len(leads), batch_size):
batch = leads[i:i + batch_size]
try:
response = client.add_leads_to_campaign(campaign_id, batch)
added_count += len(batch)
except Exception as e:
# Mark batch as failed
for lead in batch:
errors.append({
"email": lead.get('email', 'unknown'),
"error": f"Failed to add lead: {str(e)}"
})
failed_count += 1
# Update run record
run.status = 'completed'
run.added_leads = added_count
run.skipped_leads = skipped_count
run.failed_leads = failed_count
run.error_details = json.dumps(errors) if errors else None
run.completed_at = datetime.utcnow()
db.commit()
# No warning needed - custom variables are now passed as direct fields
return {
"run_id": run_id,
"campaign_id": campaign_id,
"campaign_name": campaign_name,
"total": len(contacts),
"added": added_count,
"skipped": skipped_count,
"failed": failed_count,
"errors": errors[:10], # Return first 10 errors
"status": "completed"
}
except HTTPException:
raise
except Exception as e:
if 'run' in locals():
run.status = 'failed'
run.error_details = str(e)
db.commit()
raise HTTPException(status_code=500, detail=f"Error pushing to Smartlead: {str(e)}")
@app.get("/api/smartlead-runs")
async def get_smartlead_runs(file_id: str = Query(None), t: TenantContext = Depends(get_tenant_context)):
"""Get Smartlead run history"""
db = t.db
try:
query = db.query(SmartleadRun).filter(SmartleadRun.tenant_id == t.tenant_id)
if file_id:
query = query.filter(SmartleadRun.file_id == file_id)
runs = query.order_by(SmartleadRun.created_at.desc()).limit(50).all()
return [
{
"run_id": run.run_id,
"file_id": run.file_id,
"campaign_id": run.campaign_id,
"campaign_name": run.campaign_name,
"mode": run.mode,
"steps_count": run.steps_count,
"dry_run": bool(run.dry_run),
"total_leads": run.total_leads,
"added_leads": run.added_leads,
"skipped_leads": run.skipped_leads,
"failed_leads": run.failed_leads,
"status": run.status,
"created_at": run.created_at.isoformat() if run.created_at else None,
"completed_at": run.completed_at.isoformat() if run.completed_at else None
}
for run in runs
]
except Exception as e:
raise HTTPException(status_code=500, detail=f"Error fetching runs: {str(e)}")
@app.post("/api/webhooks/smartlead")
async def smartlead_webhook(
request: Request,
tenant_id: int = Query(..., description="Workspace ID — add ?tenant_id=<id> to the webhook URL in Smartlead"),
db: Session = Depends(get_db),
):
"""
Smartlead webhook — configure in Smartlead to POST reply events to this URL when a lead replies.
Append **?tenant_id=&lt;your workspace id&gt;** to the webhook URL (same id as in the app URL bar after signing in).
Optional: set SMARTLEAD_WEBHOOK_SECRET and send the same value in header X-Webhook-Token (or Bearer).
"""
if not db.query(Tenant).filter(Tenant.id == tenant_id).first():
raise HTTPException(status_code=404, detail="Unknown workspace")
secret = os.getenv("SMARTLEAD_WEBHOOK_SECRET")
if secret:
token = request.headers.get("X-Webhook-Token") or ""
if not token:
auth = request.headers.get("Authorization") or ""
if auth.lower().startswith("bearer "):
token = auth[7:].strip()
if token != secret:
raise HTTPException(status_code=401, detail="Invalid webhook token")
try:
body = await request.json()
except Exception:
raise HTTPException(status_code=400, detail="Expected JSON body")
parsed = _parse_smartlead_reply_payload(body)
if not parsed:
return {"ok": True, "ignored": True, "reason": "not_a_reply_or_missing_fields"}
q = db.query(CrmLead).filter(CrmLead.tenant_id == tenant_id)
row = None
if parsed["smartlead_lead_id"] and parsed["campaign_id"]:
row = q.filter(
CrmLead.smartlead_lead_id == parsed["smartlead_lead_id"],
CrmLead.campaign_id == parsed["campaign_id"],
).first()
if row is None and parsed["email"] and parsed["campaign_id"]:
row = q.filter(
CrmLead.email == parsed["email"],
CrmLead.campaign_id == parsed["campaign_id"],
).first()
if parsed["email"]:
apollo = (
db.query(Contact)
.filter(
Contact.tenant_id == tenant_id,
func.lower(Contact.email) == parsed["email"].lower(),
)
.first()
)
if apollo:
if not parsed["company_name"] and apollo.company:
parsed["company_name"] = apollo.company
if not parsed["title"] and apollo.title:
parsed["title"] = apollo.title
if not parsed["first_name"] and apollo.first_name:
parsed["first_name"] = apollo.first_name
if not parsed["last_name"] and apollo.last_name:
parsed["last_name"] = apollo.last_name
new_ts = parsed["last_reply_at"]
if row:
old_ts = row.last_reply_at
if not old_ts or (new_ts and new_ts >= old_ts):
row.last_reply_subject = parsed["last_reply_subject"]
row.last_reply_body = parsed["last_reply_body"]
row.last_reply_at = new_ts
row.raw_webhook = parsed["raw_webhook"]
row.email = parsed["email"] or row.email
row.first_name = parsed["first_name"] or row.first_name
row.last_name = parsed["last_name"] or row.last_name
row.company_name = parsed["company_name"] or row.company_name
row.title = parsed["title"] or row.title
row.campaign_name = parsed["campaign_name"] or row.campaign_name
if parsed["smartlead_lead_id"]:
row.smartlead_lead_id = parsed["smartlead_lead_id"]
else:
row = CrmLead(
tenant_id=tenant_id,
smartlead_lead_id=parsed["smartlead_lead_id"] or "",
campaign_id=parsed["campaign_id"] or "",
campaign_name=parsed["campaign_name"] or "",
email=parsed["email"] or "",
first_name=parsed["first_name"] or "",
last_name=parsed["last_name"] or "",
company_name=parsed["company_name"] or "",
title=parsed["title"] or "",
last_reply_subject=parsed["last_reply_subject"] or "",
last_reply_body=parsed["last_reply_body"] or "",
last_reply_at=new_ts,
crm_status="new_lead",
raw_webhook=parsed["raw_webhook"],
)
db.add(row)
db.commit()
db.refresh(row)
return {"ok": True, "lead_id": row.id}
@app.post("/api/leads/seed-demo")
async def seed_demo_leads(t: TenantContext = Depends(get_tenant_context)):
"""
Insert sample leads so the Leads UI can be previewed without a real Smartlead webhook.
Deletes any previous demo rows (emails like demo.lead.*@emailout.local) then inserts fresh ones.
"""
db = t.db
tid = t.tenant_id
demo_email_filter = CrmLead.email.like("demo.lead.%@emailout.local")
removed = (
db.query(CrmLead)
.filter(CrmLead.tenant_id == tid, demo_email_filter)
.delete(synchronize_session=False)
)
campaign_id = "88001"
campaign_name = "Logistics & Supply Chain Outreach"
now = datetime.utcnow()
specs = [
{
"sl_id": "91001",
"key": "jane",
"first_name": "Jane",
"last_name": "Morgan",
"company_name": "ACME Logistics",
"title": "VP of Operations",
"crm_status": "new_lead",
"subject": "Re: Quick question on your freight volumes",
"body": (
"Hi Sarah,\n\nThanks for reaching out. We're evaluating new 3PL partners in Q2. "
"Could you send a one-pager on pricing for mid-market shippers?\n\nBest,\nJane"
),
"days_ago": 0,
},
{
"sl_id": "91002",
"key": "marcus",
"first_name": "Marcus",
"last_name": "Chen",
"company_name": "Pacific Rail Partners",
"title": "Director of Procurement",
"crm_status": "contacted",
"subject": "Re: Partnership",
"body": (
"Not interested right now — we renewed with our incumbent through 2026. "
"Feel free to circle back next year."
),
"days_ago": 1,
},
{
"sl_id": "91003",
"key": "elena",
"first_name": "Elena",
"last_name": "Vasquez",
"company_name": "Harbor Freight Collective",
"title": "Head of Supply Chain",
"crm_status": "contacted",
"subject": "Re: 15 min chat?",
"body": (
"Tuesday 3pm PT works for me. Here's my calendar link: https://example.com/cal/elena — "
"please include an agenda."
),
"days_ago": 2,
},
{
"sl_id": "91004",
"key": "david",
"first_name": "David",
"last_name": "Okonkwo",
"company_name": "EZOFIS",
"title": "CEO",
"crm_status": "qualified",
"subject": "Re: outbound sequences",
"body": (
"This looks promising. Loop in our COO (coo@ezofis.com) and let's do a 30-min scoping call this week."
),
"days_ago": 3,
},
{
"sl_id": "91005",
"key": "priya",
"first_name": "Priya",
"last_name": "Nair",
"company_name": "Summit Cold Storage",
"title": "Operations Manager",
"crm_status": "unqualified",
"subject": "Re: cold storage",
"body": (
"We only work with vendors on our approved vendor list. Please don't follow up on this thread."
),
"days_ago": 4,
},
{
"sl_id": "91006",
"key": "sam",
"first_name": "Sam",
"last_name": "Rivera",
"company_name": "Inland Distribution Co.",
"title": "Logistics Coordinator",
"crm_status": "none",
"subject": "Re: intro",
"body": "Got it — thanks.",
"days_ago": 5,
},
]
for s in specs:
email = f"demo.lead.{s['key']}@emailout.local"
ts = now - timedelta(days=s["days_ago"])
raw = {
"event": "EMAIL_REPLIED",
"demo_seed": True,
"campaign_id": int(campaign_id),
"campaign_name": campaign_name,
"lead_id": int(s["sl_id"]),
"lead": {
"email": email,
"first_name": s["first_name"],
"last_name": s["last_name"],
"company_name": s["company_name"],
},
"reply": {
"subject": s["subject"],
"body": s["body"],
"received_at": ts.isoformat() + "Z",
},
}
db.add(
CrmLead(
tenant_id=tid,
smartlead_lead_id=s["sl_id"],
campaign_id=campaign_id,
campaign_name=campaign_name,
email=email,
first_name=s["first_name"],
last_name=s["last_name"],
company_name=s["company_name"],
title=s["title"],
last_reply_subject=s["subject"],
last_reply_body=s["body"],
last_reply_at=ts,
crm_status=s["crm_status"],
contact_id=None,
raw_webhook=raw,
)
)
db.commit()
return {
"ok": True,
"removed_previous_demo_rows": removed,
"inserted": len(specs),
}
@app.get("/api/leads")
async def list_leads(
search: str = Query("", description="Search email, name, company"),
status: str = Query("", description="crm_status filter"),
sort_by: str = Query("last_reply_at"),
sort_dir: str = Query("desc"),
limit: int = Query(50, ge=1, le=200),
offset: int = Query(0, ge=0),
t: TenantContext = Depends(get_tenant_context),
):
db = t.db
q = db.query(CrmLead).filter(CrmLead.tenant_id == t.tenant_id)
if search.strip():
term = f"%{search.strip().lower()}%"
q = q.filter(
or_(
func.lower(CrmLead.email).like(term),
func.lower(func.coalesce(CrmLead.company_name, "")).like(term),
func.lower(func.coalesce(CrmLead.first_name, "")).like(term),
func.lower(func.coalesce(CrmLead.last_name, "")).like(term),
)
)
if status.strip() and status in CRM_STATUS_ALLOWED:
q = q.filter(CrmLead.crm_status == status)
total = q.count()
col_map = {
"last_reply_at": CrmLead.last_reply_at,
"created_at": CrmLead.created_at,
"email": CrmLead.email,
"company_name": CrmLead.company_name,
}
col = col_map.get(sort_by, CrmLead.last_reply_at)
if sort_dir == "asc":
q = q.order_by(col.asc())
else:
q = q.order_by(col.desc())
rows = q.offset(offset).limit(limit).all()
return {
"total": total,
"leads": [_crm_lead_to_dict(r) for r in rows],
}
@app.get("/api/leads/{lead_id}")
async def get_lead(lead_id: int, t: TenantContext = Depends(get_tenant_context)):
db = t.db
row = (
db.query(CrmLead)
.filter(CrmLead.tenant_id == t.tenant_id, CrmLead.id == lead_id)
.first()
)
if not row:
raise HTTPException(status_code=404, detail="Lead not found")
d = _crm_lead_to_dict(row)
if row.contact_id:
c = (
db.query(Contact)
.filter(
Contact.tenant_id == t.tenant_id,
Contact.id == row.contact_id,
)
.first()
)
if c:
d["contact"] = {
"id": c.id,
"email": c.email,
"company": c.company,
"title": c.title,
}
d["company_details"] = _contact_company_details_dict(c)
return d
@app.patch("/api/leads/{lead_id}")
async def patch_lead(lead_id: int, body: CrmLeadPatchRequest, t: TenantContext = Depends(get_tenant_context)):
db = t.db
row = (
db.query(CrmLead)
.filter(CrmLead.tenant_id == t.tenant_id, CrmLead.id == lead_id)
.first()
)
if not row:
raise HTTPException(status_code=404, detail="Lead not found")
data = body.model_dump(exclude_unset=True)
if not data:
raise HTTPException(status_code=400, detail="No fields to update")
if "crm_status" in data:
if data["crm_status"] not in CRM_STATUS_ALLOWED:
raise HTTPException(
status_code=400,
detail=f"crm_status must be one of: {sorted(CRM_STATUS_ALLOWED)}",
)
row.crm_status = data["crm_status"]
if "first_name" in data:
row.first_name = _safe_str(data["first_name"])
if "last_name" in data:
row.last_name = _safe_str(data["last_name"])
if "company_name" in data:
row.company_name = _safe_str(data["company_name"])
if "title" in data:
row.title = _safe_str(data["title"])
if "last_reply_subject" in data:
row.last_reply_subject = _safe_str(data["last_reply_subject"])
if "last_reply_body" in data:
row.last_reply_body = _safe_str(data["last_reply_body"])
if "email" in data:
email = _safe_str(data["email"]).lower()
if email:
taken = (
db.query(CrmLead)
.filter(
CrmLead.tenant_id == row.tenant_id,
CrmLead.id != lead_id,
func.lower(CrmLead.email) == email,
)
.first()
)
if taken:
raise HTTPException(
status_code=409,
detail="Another lead already uses this email",
)
row.email = email
rw = dict(row.raw_webhook) if isinstance(row.raw_webhook, dict) else {}
cd = dict(rw.get("company_details") or {}) if isinstance(rw.get("company_details"), dict) else {}
cd["Company Name"] = row.company_name or ""
for py_key, raw_key in COMPANY_DETAIL_PATCH_FIELDS.items():
if py_key in data:
cd[raw_key] = _safe_str(data[py_key])
rw["company_details"] = cd
row.raw_webhook = rw
row.updated_at = datetime.utcnow()
db.commit()
db.refresh(row)
return _crm_lead_to_dict(row)
@app.post("/api/leads/{lead_id}/move-to-contacts")
async def move_lead_to_contacts(lead_id: int, t: TenantContext = Depends(get_tenant_context)):
db = t.db
lead = (
db.query(CrmLead)
.filter(CrmLead.tenant_id == t.tenant_id, CrmLead.id == lead_id)
.first()
)
if not lead:
raise HTTPException(status_code=404, detail="Lead not found")
r = _move_lead_to_contacts_core(db, lead)
if not r["ok"]:
raise HTTPException(status_code=400, detail=r.get("error", "Move failed"))
db.commit()
return {"contact_id": r["contact_id"], "message": r["message"]}
@app.post("/api/leads/bulk-move-to-contacts")
async def bulk_move_leads_to_contacts(body: BulkLeadIdsRequest, t: TenantContext = Depends(get_tenant_context)):
db = t.db
if not body.lead_ids:
raise HTTPException(status_code=400, detail="lead_ids required")
moved = 0
errors: List[dict] = []
for lid in body.lead_ids:
lead = (
db.query(CrmLead)
.filter(CrmLead.tenant_id == t.tenant_id, CrmLead.id == lid)
.first()
)
if not lead:
errors.append({"lead_id": lid, "error": "not found"})
continue
r = _move_lead_to_contacts_core(db, lead)
if not r["ok"]:
errors.append({"lead_id": lid, "error": r.get("error", "failed")})
else:
moved += 1
db.commit()
return {"moved": moved, "errors": errors}
@app.post("/api/leads/bulk-delete")
async def bulk_delete_leads(body: BulkLeadIdsRequest, t: TenantContext = Depends(get_tenant_context)):
db = t.db
if not body.lead_ids:
raise HTTPException(status_code=400, detail="lead_ids required")
deleted = (
db.query(CrmLead)
.filter(CrmLead.tenant_id == t.tenant_id, CrmLead.id.in_(body.lead_ids))
.delete(synchronize_session=False)
)
db.commit()
return {"deleted": deleted}
@app.post("/api/deals/from-leads")
async def deals_from_leads(body: BulkLeadIdsRequest, t: TenantContext = Depends(get_tenant_context)):
"""Create one deal per selected lead and remove those leads from the Leads table."""
db = t.db
if not body.lead_ids:
raise HTTPException(status_code=400, detail="lead_ids required")
created: List[dict] = []
errors: List[dict] = []
for lid in body.lead_ids:
lead = (
db.query(CrmLead)
.filter(CrmLead.tenant_id == t.tenant_id, CrmLead.id == lid)
.first()
)
if not lead:
errors.append({"lead_id": lid, "error": "not found"})
continue
person = " ".join(filter(None, [lead.first_name or "", lead.last_name or ""])).strip()
inviter = db.query(User).filter(User.id == t.user_id).first()
deal = CrmDeal(
tenant_id=lead.tenant_id,
name=_deal_name_from_lead(lead),
stage="new",
owner_user_id=t.user_id,
owner_initials=_user_initials_from_user(inviter),
deal_value=None,
revenue_type="arr",
contact_display=person or (lead.email or ""),
account_name=lead.company_name or "",
expected_close_date=datetime.utcnow() + timedelta(days=60),
close_probability=25,
country="",
last_interaction_at=lead.last_reply_at or datetime.utcnow(),
source_lead_id=lead.id,
source_campaign_name=lead.campaign_name or lead.campaign_id or "",
contact_id=lead.contact_id,
)
db.add(deal)
db.flush()
dd = _deal_to_dict(deal)
if inviter:
dd["owner_display_name"] = (inviter.name or inviter.email or "").strip() or None
else:
dd["owner_display_name"] = None
created.append(dd)
db.delete(lead)
db.commit()
return {"created": created, "errors": errors}
@app.get("/api/deals")
async def list_deals(
search: str = Query(""),
sort_by: str = Query("created_at"),
sort_dir: str = Query("desc"),
limit: int = Query(100, ge=1, le=500),
offset: int = Query(0, ge=0),
t: TenantContext = Depends(get_tenant_context),
):
db = t.db
q = db.query(CrmDeal).filter(CrmDeal.tenant_id == t.tenant_id)
if t.role != "admin":
q = q.filter(
or_(CrmDeal.owner_user_id == t.user_id, CrmDeal.owner_user_id.is_(None))
)
if search.strip():
term = f"%{search.strip().lower()}%"
q = q.filter(
or_(
func.lower(CrmDeal.name).like(term),
func.lower(func.coalesce(CrmDeal.account_name, "")).like(term),
func.lower(func.coalesce(CrmDeal.contact_display, "")).like(term),
)
)
total = q.count()
col_map = {
"created_at": CrmDeal.created_at,
"name": CrmDeal.name,
"deal_value": CrmDeal.deal_value,
"stage": CrmDeal.stage,
"last_interaction_at": CrmDeal.last_interaction_at,
}
col = col_map.get(sort_by, CrmDeal.created_at)
if sort_dir == "asc":
q = q.order_by(col.asc())
else:
q = q.order_by(col.desc())
rows = q.offset(offset).limit(limit).all()
oid_set = {r.owner_user_id for r in rows if r.owner_user_id}
user_map: Dict[int, User] = {}
if oid_set:
for u in db.query(User).filter(User.id.in_(oid_set)).all():
user_map[u.id] = u
deals_out = []
for r in rows:
dd = _deal_to_dict(r)
if r.owner_user_id and r.owner_user_id in user_map:
u = user_map[r.owner_user_id]
dd["owner_display_name"] = (u.name or u.email or "").strip() or None
if not (dd.get("owner_initials") or "").strip():
dd["owner_initials"] = _user_initials_from_user(u)
else:
dd["owner_display_name"] = None
deals_out.append(dd)
return {"total": total, "deals": deals_out}
@app.get("/api/deals/quarterly-recurring-metrics")
async def quarterly_recurring_metrics(
owner_user_id: Optional[int] = Query(None),
max_quarters: int = Query(12, ge=1, le=24),
period: str = Query("all"),
t: TenantContext = Depends(get_tenant_context),
):
"""
Won deals → ARR (USD) roll-forward by calendar quarter from PO lines (interval per line + one-time lines).
Optional period=all|month|quarter|year filters by won_at.
When period is not all, includes comparison roll-forward for the prior calendar period (last month / quarter / year).
"""
db = t.db
q = db.query(CrmDeal).filter(
CrmDeal.tenant_id == t.tenant_id,
CrmDeal.stage == "won",
)
if t.role != "admin":
q = q.filter(or_(CrmDeal.owner_user_id == t.user_id, CrmDeal.owner_user_id.is_(None)))
elif owner_user_id is not None:
q = q.filter(CrmDeal.owner_user_id == int(owner_user_id))
rows = q.all()
bounds = _dashboard_period_bounds_utc(period)
deals_payload = []
for r in rows:
wa = getattr(r, "won_at", None)
if not _won_at_within_bounds(wa, bounds):
continue
deals_payload.append(
{
"id": r.id,
"name": r.name or "",
"revenue_type": (getattr(r, "revenue_type", None) or "arr"),
"won_line_items": _won_line_items_list(r),
"won_at": wa,
}
)
base = build_quarterly_board(deals_payload, max_quarters=max_quarters)
pnorm = (period or "all").strip().lower()
if pnorm in ("", "all"):
return {**base, "comparison": None}
bounds_prev = _previous_period_bounds_utc(period)
deals_prev: list[dict] = []
if bounds_prev:
for r in rows:
wa = getattr(r, "won_at", None)
if not _won_at_within_bounds(wa, bounds_prev):
continue
deals_prev.append(
{
"id": r.id,
"name": r.name or "",
"revenue_type": (getattr(r, "revenue_type", None) or "arr"),
"won_line_items": _won_line_items_list(r),
"won_at": wa,
}
)
comp = build_quarterly_board(deals_prev, max_quarters=max_quarters)
return {
**base,
"comparison": {
"period_label": _comparison_period_label(period),
"quarters": comp.get("quarters", []),
},
}
@app.post("/api/deals")
async def create_deal(body: CrmDealCreateRequest, t: TenantContext = Depends(get_tenant_context)):
stage = (body.stage or "new").strip().lower() or "new"
if stage not in DEAL_STAGE_ALLOWED:
raise HTTPException(
status_code=400,
detail=f"stage must be one of: {sorted(DEAL_STAGE_ALLOWED)}",
)
if stage == "lost":
raise HTTPException(
status_code=400,
detail="Cannot create a deal in Lost. Create the deal in another stage, then move it to Lost.",
)
if stage == "won":
raise HTTPException(
status_code=400,
detail="Cannot create a deal as Won. Create the deal in another stage, then move it to Won with billing details.",
)
db = t.db
raw_name = _safe_str(body.name) if body.name is not None else ""
name = raw_name or "Untitled deal"
creator = db.query(User).filter(User.id == t.user_id).first()
row = CrmDeal(
tenant_id=t.tenant_id,
name=name,
stage=stage,
owner_user_id=t.user_id,
owner_initials=_user_initials_from_user(creator),
deal_value=None,
revenue_type="arr",
contact_display="",
account_name="",
expected_close_date=None,
close_probability=10,
country="",
last_interaction_at=None,
source_lead_id=None,
source_campaign_name="",
contact_id=None,
)
db.add(row)
db.commit()
db.refresh(row)
return _enrich_deal_response(db, row)
@app.get("/api/deals/{deal_id}")
async def get_deal(deal_id: int, t: TenantContext = Depends(get_tenant_context)):
db = t.db
row = (
db.query(CrmDeal)
.filter(CrmDeal.tenant_id == t.tenant_id, CrmDeal.id == deal_id)
.first()
)
if not row:
raise HTTPException(status_code=404, detail="Deal not found")
_deal_access_or_403(row, t)
return _enrich_deal_response(db, row)
@app.patch("/api/deals/{deal_id}")
async def patch_deal(deal_id: int, body: CrmDealPatchRequest, t: TenantContext = Depends(get_tenant_context)):
db = t.db
row = (
db.query(CrmDeal)
.filter(CrmDeal.tenant_id == t.tenant_id, CrmDeal.id == deal_id)
.first()
)
if not row:
raise HTTPException(status_code=404, detail="Deal not found")
_deal_access_or_403(row, t)
data = body.model_dump(exclude_unset=True)
if not data:
raise HTTPException(status_code=400, detail="No fields to update")
did_mark_won_this_request = False
if "owner_initials" in data:
del data["owner_initials"]
if "owner_user_id" in data:
val = data.pop("owner_user_id")
mids = _tenant_member_ids(db, t.tenant_id)
if t.role == "admin":
if val is None:
row.owner_user_id = None
row.owner_initials = ""
else:
vid = int(val)
if vid not in mids:
raise HTTPException(
status_code=400,
detail="Owner must be a member of this workspace",
)
ou = db.query(User).filter(User.id == vid).first()
if not ou:
raise HTTPException(status_code=404, detail="User not found")
row.owner_user_id = vid
row.owner_initials = _user_initials_from_user(ou)
else:
if val is None:
raise HTTPException(
status_code=403, detail="Only admins can set a deal to unassigned"
)
if int(val) != int(t.user_id):
raise HTTPException(
status_code=403,
detail="Only admins can assign deals to other users",
)
if row.owner_user_id is not None and int(row.owner_user_id) != int(t.user_id):
raise HTTPException(
status_code=403,
detail="You can only take ownership of unassigned deals",
)
claimer = db.query(User).filter(User.id == t.user_id).first()
row.owner_user_id = t.user_id
row.owner_initials = _user_initials_from_user(claimer)
if "contact_id" in data:
cid = data["contact_id"]
if cid is None:
row.contact_id = None
else:
contact = (
db.query(Contact)
.filter(
Contact.tenant_id == t.tenant_id,
Contact.id == int(cid),
)
.first()
)
if not contact:
raise HTTPException(status_code=404, detail="Contact not found")
row.contact_id = contact.id
row.contact_display = _format_contact_display(contact)
if "account_name" not in data and _safe_str(contact.company):
row.account_name = _safe_str(contact.company)
if "name" in data:
row.name = _safe_str(data["name"])
if "contact_display" in data:
row.contact_display = _safe_str(data["contact_display"])
if "account_name" in data:
row.account_name = _safe_str(data["account_name"])
if "stage" in data:
new_stage = data["stage"]
if new_stage not in DEAL_STAGE_ALLOWED:
raise HTTPException(
status_code=400,
detail=f"stage must be one of: {sorted(DEAL_STAGE_ALLOWED)}",
)
prev_stage = (row.stage or "new").strip().lower()
if new_stage == "lost":
_clear_won_billing_fields(row)
lr = _safe_str(data.get("lost_reason", "")).strip().lower()
if not lr or lr not in DEAL_LOST_REASON_ALLOWED:
raise HTTPException(
status_code=400,
detail=(
"When marking a deal as Lost, lost_reason is required. "
f"Use one of: {', '.join(sorted(DEAL_LOST_REASON_ALLOWED))}."
),
)
if lr == "price":
pct = data.get("lost_price_discount_pct")
if pct is None:
raise HTTPException(
status_code=400,
detail="lost_price_discount_pct (0–100) is required when lost_reason is price.",
)
try:
pf = float(pct)
except (TypeError, ValueError):
raise HTTPException(
status_code=400,
detail="lost_price_discount_pct must be a number.",
)
if pf < 0 or pf > 100:
raise HTTPException(
status_code=400,
detail="lost_price_discount_pct must be between 0 and 100.",
)
row.lost_reason = lr
row.lost_price_discount_pct = pf
row.lost_chose_product_name = ""
elif lr == "chose_another":
pn = _safe_str(data.get("lost_chose_product_name") or "")
if not pn:
raise HTTPException(
status_code=400,
detail="lost_chose_product_name is required when lost_reason is chose_another.",
)
row.lost_reason = lr
row.lost_price_discount_pct = None
row.lost_chose_product_name = pn
else:
row.lost_reason = lr
row.lost_price_discount_pct = None
row.lost_chose_product_name = ""
row.stage = "lost"
elif new_stage == "won":
wb_raw = data.pop("won_billing", None)
if wb_raw is None:
raise HTTPException(
status_code=400,
detail="won_billing is required when marking a deal as Won (PO, customer, line items, notes).",
)
try:
wb = WonBillingPayload.model_validate(wb_raw)
except ValidationError as e:
raise HTTPException(status_code=400, detail=str(e)[:3000])
items_out = []
subtotal = 0.0
for li in wb.line_items:
amt = round(float(li.qty) * float(li.rate), 2)
subtotal += amt
items_out.append(
{
"product_service": _safe_str(li.product_service),
"description": _safe_str(li.description or ""),
"qty": float(li.qty),
"rate": float(li.rate),
"amount": amt,
"billing_interval": li.billing_interval,
"currency": li.currency,
}
)
row.won_po_number = _safe_str(wb.po_number).strip()
row.won_customer_legal_name = _safe_str(wb.customer_legal_name).strip()
row.won_customer_address = _safe_str(wb.customer_address).strip()
row.won_contact_person_name = _safe_str(wb.contact_person_name).strip()
row.won_channel_partner_name = _safe_str(wb.channel_partner_name or "").strip()
row.won_note_to_customer = _safe_str(wb.note_to_customer).strip()
row.won_note_to_accounts = _safe_str(wb.note_to_accounts).strip()
row.won_line_items = items_out
# Actual closed value = invoice subtotal from the won popover (USD whole dollars).
row.deal_value = int(round(subtotal))
row.lost_reason = ""
row.lost_price_discount_pct = None
row.lost_chose_product_name = ""
row.stage = "won"
if prev_stage != "won":
row.won_at = datetime.utcnow()
did_mark_won_this_request = True
else:
if prev_stage == "lost":
row.lost_reason = ""
row.lost_price_discount_pct = None
row.lost_chose_product_name = ""
if prev_stage == "won":
_clear_won_billing_fields(row)
row.stage = new_stage
data.pop("lost_reason", None)
data.pop("lost_price_discount_pct", None)
data.pop("lost_chose_product_name", None)
data.pop("won_billing", None)
else:
data.pop("lost_reason", None)
data.pop("lost_price_discount_pct", None)
data.pop("lost_chose_product_name", None)
data.pop("won_billing", None)
if "deal_value" in data:
row.deal_value = data["deal_value"]
if "revenue_type" in data:
rt = _safe_str(data["revenue_type"]).lower()
if rt not in DEAL_REVENUE_TYPE_ALLOWED:
raise HTTPException(
status_code=400,
detail=f"revenue_type must be one of: {sorted(DEAL_REVENUE_TYPE_ALLOWED)}",
)
row.revenue_type = rt
if "close_probability" in data:
row.close_probability = max(0, min(100, int(data["close_probability"])))
if "country" in data:
row.country = _safe_str(data["country"])
if "expected_close_date" in data:
ts = _to_datetime(data["expected_close_date"])
row.expected_close_date = ts
if "last_interaction_at" in data:
ts = _to_datetime(data["last_interaction_at"])
row.last_interaction_at = ts
c = None
if row.contact_id:
c = (
db.query(Contact)
.filter(
Contact.tenant_id == t.tenant_id,
Contact.id == row.contact_id,
)
.first()
)
if "account_name" in data and c:
c.company = row.account_name
if c:
_sync_contact_raw_from_columns(c)
if any(k in data for k in COMPANY_DETAIL_PATCH_FIELDS):
_merge_contact_raw_company_details(c, data)
if "country_region" in data:
row.country = _safe_str(data["country_region"])
if "country" in data and "country_region" not in data:
rd = dict(c.raw_data or {})
rd["Country"] = _safe_str(data["country"])
c.raw_data = rd
db.add(c)
row.updated_at = datetime.utcnow()
db.commit()
db.refresh(row)
out = _enrich_deal_response(db, row)
if did_mark_won_this_request:
sender = db.query(User).filter(User.id == int(t.user_id)).first()
if sender:
lines = _won_line_items_list(row)
st = sum(float(li.get("amount") or 0) for li in lines) if lines else 0.0
ok, err = await _notify_tenant_admins_deal_won(
db,
int(t.tenant_id),
sender,
int(row.id),
row.name or "",
lines,
st,
row.won_po_number or "",
row.won_customer_legal_name or "",
row.won_customer_address or "",
row.won_contact_person_name or "",
row.won_channel_partner_name or "",
row.won_note_to_customer or "",
row.won_note_to_accounts or "",
)
out["won_email_to_admins_sent"] = ok
if err:
out["won_email_error"] = err
return out
@app.delete("/api/deals/{deal_id}")
async def delete_deal(deal_id: int, t: TenantContext = Depends(get_tenant_context)):
db = t.db
row = (
db.query(CrmDeal)
.filter(CrmDeal.tenant_id == t.tenant_id, CrmDeal.id == deal_id)
.first()
)
if not row:
raise HTTPException(status_code=404, detail="Deal not found")
_deal_access_or_403(row, t)
db.delete(row)
db.commit()
return {"ok": True, "id": deal_id}
@app.post("/api/deals/seed-demo")
async def seed_demo_deals(t: TenantContext = Depends(get_tenant_context)):
db = t.db
tid = t.tenant_id
removed = (
db.query(CrmDeal)
.filter(CrmDeal.tenant_id == tid, CrmDeal.name.like("DEMO: %"))
.delete(synchronize_session=False)
)
now = datetime.utcnow()
samples = [
{
"name": "DEMO: Ramco — Aus",
"stage": "discovery",
"owner_initials": "",
"deal_value": 10000,
"contact_display": "KENNETH WON",
"account_name": "Quanticsit",
"close_probability": 10,
"close_in_days": 120,
"country": "Australia",
},
{
"name": "DEMO: HT Pilot",
"stage": "proposal",
"owner_initials": "",
"deal_value": 1600,
"contact_display": "SARAH LEE",
"account_name": "HT Logistics",
"close_probability": 60,
"close_in_days": 45,
"country": "Malaysia",
},
{
"name": "DEMO: Northwind rollout",
"stage": "negotiation",
"owner_initials": "",
"deal_value": 45000,
"contact_display": "JAMES DIAZ",
"account_name": "Northwind Trading",
"close_probability": 40,
"close_in_days": 60,
"country": "USA",
},
{
"name": "DEMO: Maple cold chain",
"stage": "new",
"owner_initials": "",
"deal_value": None,
"contact_display": "ALEX RUIZ",
"account_name": "Maple Cold",
"close_probability": 15,
"close_in_days": 90,
"country": "Canada",
},
]
for s in samples:
db.add(
CrmDeal(
tenant_id=tid,
name=s["name"],
stage=s["stage"],
owner_user_id=None,
owner_initials=s["owner_initials"],
deal_value=s["deal_value"],
contact_display=s["contact_display"],
account_name=s["account_name"],
expected_close_date=now + timedelta(days=s["close_in_days"]),
close_probability=s["close_probability"],
country=s["country"],
last_interaction_at=now - timedelta(days=1),
source_lead_id=None,
source_campaign_name="Demo",
)
)
db.commit()
return {"ok": True, "removed_previous_demo_rows": removed, "inserted": len(samples)}
@app.get("/api/leads/{lead_id}/smartlead-thread")
async def lead_smartlead_thread(lead_id: int, t: TenantContext = Depends(get_tenant_context)):
"""Fetch full thread from Smartlead API (Admin API key required)."""
db = t.db
row = (
db.query(CrmLead)
.filter(CrmLead.tenant_id == t.tenant_id, CrmLead.id == lead_id)
.first()
)
if not row:
raise HTTPException(status_code=404, detail="Lead not found")
if not row.smartlead_lead_id or not row.campaign_id:
raise HTTPException(status_code=400, detail="Lead missing Smartlead campaign/lead id")
try:
lid = int(row.smartlead_lead_id)
except (TypeError, ValueError):
raise HTTPException(status_code=400, detail="Invalid smartlead_lead_id")
try:
client = SmartleadClient()
hist = client.get_message_history(str(row.campaign_id), lid)
except ValueError as e:
raise HTTPException(status_code=400, detail=str(e))
except Exception as e:
raise HTTPException(status_code=502, detail=str(e))
return {"history": hist}
# ---- Frontend static serving ----
FRONTEND_DIST = Path(__file__).resolve().parents[2] / "frontend" / "dist"
INDEX_FILE = FRONTEND_DIST / "index.html"
FRONTEND_ASSETS = FRONTEND_DIST / "assets"
# Prevent stale index.html in browsers/CDNs referencing old hashed bundles (→ blank page when *.js 404s).
_SPA_HTML_HEADERS = {
"Cache-Control": "no-store, no-cache, must-revalidate, max-age=0",
"Pragma": "no-cache",
}
def _spa_index_response() -> FileResponse:
return FileResponse(str(INDEX_FILE), headers=_SPA_HTML_HEADERS)
if FRONTEND_DIST.exists() and INDEX_FILE.is_file():
# Mount only /assets so client-side routes (/contacts, /deals, …) are not swallowed by
# StaticFiles (which would 404 for missing files instead of serving the SPA shell).
if FRONTEND_ASSETS.is_dir():
app.mount("/assets", StaticFiles(directory=str(FRONTEND_ASSETS)), name="frontend_assets")
@app.get("/")
def spa_index():
return _spa_index_response()
@app.get("/{full_path:path}")
def spa_fallback(full_path: str):
if full_path.startswith("api"):
raise HTTPException(status_code=404, detail="Not Found")
return _spa_index_response()