# Source: n8n / python / hubspot_companies.py
# Author: niwayandm — commit 31b36e7 ("Add new properties to deals, tickets and companies")
"""
HubSpot Companies → Supabase (incremental since a millisecond cursor)
Usage from orchestrator:
import hubspot_companies
hubspot_companies.main(since_ms=<int milliseconds since epoch UTC>)
Direct CLI:
# epoch ms
python hubspot_companies.py 1754025600000
# ISO-8601
python hubspot_companies.py 2025-08-01T09:30:00Z
# Back-compat date (floors to 00:00Z)
python hubspot_companies.py 2025-08-01
"""
import os
import re
import time
import logging
import datetime
from typing import List, Dict, Optional, Tuple, Union
import httpx
import hubspot
from dotenv import load_dotenv
from supabase import create_client
from hubspot.crm.companies import ApiException as CompaniesApiException
from hubspot_utils import (
try_parse_int, parse_ts, get_property_label_mapping,
)
from supabase_utils import (
update_sync_metadata, enrich_supabase_row,
batched_insert, fetch_supabase_table,
)
# -----------------------------------------------------------------------------
# Logging
# -----------------------------------------------------------------------------
# basicConfig raises FileNotFoundError if the target directory does not exist,
# so create it up-front (idempotent).
os.makedirs("logs", exist_ok=True)
logging.basicConfig(
    # One log file per day, appended across runs.
    filename=f"logs/hubspot_company_pipeline_{datetime.datetime.now().strftime('%Y-%m-%d')}.log",
    filemode="a",
    level=logging.INFO,
    format="%(asctime)s [%(levelname)s] %(message)s",
)
# -----------------------------------------------------------------------------
# Environment
# -----------------------------------------------------------------------------
load_dotenv()

HUBSPOT_TOKEN = os.getenv("HUBSPOT_TOKEN")
SUPABASE_URL = os.getenv("SUPABASE_URL")
SUPABASE_SERVICE_ROLE_KEY = os.getenv("SUPABASE_SERVICE_ROLE_KEY")
# Optional bootstrap cursor if orchestrator doesn't provide one
BOOTSTRAP_SINCE_MS_ENV = os.getenv("HUBSPOT_COMPANIES_SINCE_MS")

# Fail fast when required credentials are absent.
if not HUBSPOT_TOKEN:
    raise RuntimeError("HUBSPOT_TOKEN is not set")
if not (SUPABASE_URL and SUPABASE_SERVICE_ROLE_KEY):
    raise RuntimeError("Supabase env vars are not set")

hubspot_client = hubspot.Client.create(access_token=HUBSPOT_TOKEN)
supabase_client = create_client(SUPABASE_URL, SUPABASE_SERVICE_ROLE_KEY)
# -----------------------------------------------------------------------------
# Config
# -----------------------------------------------------------------------------
# HubSpot property names requested for every company read. The double-underscore
# names are portal-defined custom properties and must match the portal schema
# exactly.
COMPANY_PROPERTIES = [
    # Identity / contact details
    "name",
    "city",
    "company_email",
    "address",
    "address2",
    "domain",
    # Custom counters (portal-defined)
    "number_of_active__cli_s",
    "number_of__mobile___cloned_",
    # Timestamps (also used as sync-cursor candidates)
    "createdate",
    "hs_lastmodifieddate",
    "lastmodifieddate",
    "review_date_updated_by___clone",
    # Classification codes — mapped to human-readable labels downstream
    "industry",
    "macro_industry_grouping",
    "closest_review_date___clone",
    "region",
    "industry_grouping",
    # Source attribution
    "source_1",
    "source_2",
    "hs_object_source_label",
    "hs_analytics_source",
    "hs_analytics_latest_source",
    "hs_object_source_detail_1",
    "average_deal_closed_days",
]
# -----------------------------------------------------------------------------
# Time helpers
# -----------------------------------------------------------------------------
def _ensure_utc(dt: datetime.datetime) -> datetime.datetime:
if dt.tzinfo is None:
dt = dt.replace(tzinfo=datetime.timezone.utc)
return dt.astimezone(datetime.timezone.utc)
def floor_to_utc_midnight(dt: datetime.datetime) -> datetime.datetime:
    """Return midnight UTC of *dt*'s day (naive input treated as UTC)."""
    # Normalize to aware UTC first, then zero out the time-of-day fields.
    if dt.tzinfo is None:
        dt = dt.replace(tzinfo=datetime.timezone.utc)
    else:
        dt = dt.astimezone(datetime.timezone.utc)
    return dt.replace(hour=0, minute=0, second=0, microsecond=0)
def _parse_iso_like_to_dt(value: str) -> datetime.datetime:
    """Parse an ISO-8601 string (allowing a trailing 'Z') into an aware UTC datetime."""
    # fromisoformat (pre-3.11) does not accept 'Z'; rewrite it as an offset.
    if isinstance(value, str) and value.endswith("Z"):
        value = value[:-1] + "+00:00"
    parsed = datetime.datetime.fromisoformat(value)
    if parsed.tzinfo is None:
        parsed = parsed.replace(tzinfo=datetime.timezone.utc)
    return parsed.astimezone(datetime.timezone.utc)
def to_epoch_ms(dt_or_str: Union[str, datetime.datetime]) -> int:
    """Convert a datetime or ISO-8601 string to epoch milliseconds (UTC).

    Raises TypeError for any other input type.
    """
    if isinstance(dt_or_str, datetime.datetime):
        moment = _ensure_utc(dt_or_str)
    elif isinstance(dt_or_str, str):
        moment = _parse_iso_like_to_dt(dt_or_str)
    else:
        raise TypeError(f"Unsupported type for to_epoch_ms: {type(dt_or_str)}")
    return int(moment.timestamp() * 1000)
def parse_any_ts_ms(value: Optional[Union[str, int, float]]) -> Optional[int]:
if value is None:
return None
try:
v = int(str(value))
if v < 10_000_000_000_000: # seconds → ms
v *= 1000
return v
except ValueError:
pass
try:
return to_epoch_ms(str(value))
except Exception:
logging.warning("Could not parse timestamp value=%r", value)
return None
# -----------------------------------------------------------------------------
# Search IDs (ts > since_ms) with property fallback
# -----------------------------------------------------------------------------
def _search_company_ids_from(since_ms: int, prop: str) -> List[str]:
    """Collect IDs of companies whose *prop* (epoch-ms) is strictly greater
    than *since_ms*, paging through the CRM v3 search API in ascending order
    so the caller can advance its cursor monotonically.
    """
    url = "https://api.hubapi.com/crm/v3/objects/companies/search"
    headers = {
        "Authorization": f"Bearer {HUBSPOT_TOKEN}",
        "Content-Type": "application/json",
        "Accept": "application/json",
    }
    base_payload = {
        "filterGroups": [{
            "filters": [
                {"propertyName": prop, "operator": "GT",
                 "value": str(since_ms)},
            ]
        }],
        "limit": 100,
        "sorts": [{"propertyName": prop, "direction": "ASCENDING"}],
    }

    collected: List[str] = []
    cursor: Optional[str] = None
    with httpx.Client(timeout=30.0) as client:
        while True:
            request_body = {**base_payload}
            if cursor:
                request_body["after"] = cursor
            resp = client.post(url, headers=headers, json=request_body)
            if resp.status_code >= 400:
                # Log the structured error body when available, raw text otherwise.
                try:
                    detail = resp.json()
                except Exception:
                    detail = resp.text
                logging.error(
                    "Company search error for prop '%s': %s", prop, detail)
                resp.raise_for_status()
            page = resp.json()
            for obj in page.get("results", []) or []:
                collected.append(obj["id"])
            cursor = (page.get("paging") or {}).get("next", {}).get("after")
            if not cursor:
                break
            time.sleep(0.1)  # gentle pacing between pages
    return collected
def search_company_ids_after_ms(since_ms: int) -> Tuple[List[str], str]:
    """
    Try these properties in order; return (ids, prop_used) for the first successful search:
    1) hs_lastmodifieddate
    2) lastmodifieddate
    3) createdate
    """
    # Restore the documented fallback chain. The list had been narrowed to
    # just "createdate", which contradicts the docstring (and the fallback
    # return below) and misses companies that were *modified* — but not
    # created — after the cursor.
    props_to_try = ["hs_lastmodifieddate", "lastmodifieddate", "createdate"]
    last_err = None
    for prop in props_to_try:
        try:
            ids = _search_company_ids_from(since_ms, prop)
            logging.info(
                "Company search with '%s' returned %d IDs.", prop, len(ids))
            return ids, prop
        except httpx.HTTPStatusError as e:
            # Property may not exist in this portal — try the next candidate.
            last_err = e
            continue
    if last_err:
        raise last_err
    return [], "hs_lastmodifieddate"
# -----------------------------------------------------------------------------
# Read-by-ID (with associations) → enrich & track max cursor ts
# -----------------------------------------------------------------------------
def _enrich_company_data_from_record(
    record,
    industry_map: Dict[str, str],
    macro_industry_map: Dict[str, str],
    industry_grouping_map: Dict[str, str],
    region_map: Dict[str, str],
) -> Dict:
    """Flatten a HubSpot company record into a plain dict: raw properties,
    distinct contact/deal association counts, and label-mapped classification
    fields (unknown codes become None).
    """
    props = record.properties or {}
    data: Dict[str, Optional[str]] = {"id": record.id}
    data.update({name: props.get(name) for name in COMPANY_PROPERTIES})

    def _assoc_count(kind: str) -> int:
        # Count distinct association IDs of the given kind; 0 when absent.
        assoc = getattr(record, "associations", None)
        if not assoc:
            return 0
        entry = assoc.get(kind)
        if not entry or not getattr(entry, "results", None):
            return 0
        return len({r.id for r in entry.results if getattr(r, "id", None)})

    data["number_of_associated_contacts"] = _assoc_count("contacts")
    data["number_of_associated_deals"] = _assoc_count("deals")

    # Replace internal option codes with human-readable labels. dict.get
    # already yields None for unmapped codes, matching the previous
    # "value if key in map else None" behavior.
    label_maps = {
        "industry": industry_map,
        "macro_industry_grouping": macro_industry_map,
        "industry_grouping": industry_grouping_map,
        "region": region_map,
    }
    for field, mapping in label_maps.items():
        data[field] = mapping.get(data.get(field))
    return data
def read_companies_by_ids(company_ids: List[str], cursor_prop: str) -> Tuple[List[Dict], Optional[int]]:
    """Fetch full company records (with contact/deal associations) one by one.

    Returns (companies, max_ts_ms) where max_ts_ms is the largest epoch-ms
    value seen for *cursor_prop* — the caller uses it to advance the sync
    cursor. Per-record errors are logged and skipped, never fatal.
    """
    if not company_ids:
        return [], None

    # Fetch option-label maps once up-front; a failed fetch degrades to an
    # empty map (labels then resolve to None downstream).
    label_maps: Dict[str, Dict[str, str]] = {}
    for prop_name in ("industry", "macro_industry_grouping",
                      "industry_grouping", "region"):
        try:
            label_maps[prop_name] = get_property_label_mapping(
                hubspot_client, "companies", prop_name)
        except Exception as e:
            logging.warning("Failed to fetch %s map: %s", prop_name, e)
            label_maps[prop_name] = {}

    companies: List[Dict] = []
    max_ts_ms: Optional[int] = None
    for idx, company_id in enumerate(company_ids, start=1):
        try:
            record = hubspot_client.crm.companies.basic_api.get_by_id(
                company_id=company_id,
                properties=COMPANY_PROPERTIES,
                associations=["contacts", "deals"],
                archived=False,
            )
            # Track the running maximum of the chosen cursor property.
            ts_ms = parse_any_ts_ms((record.properties or {}).get(cursor_prop))
            if ts_ms is not None:
                max_ts_ms = ts_ms if max_ts_ms is None else max(max_ts_ms, ts_ms)
            companies.append(_enrich_company_data_from_record(
                record,
                label_maps["industry"],
                label_maps["macro_industry_grouping"],
                label_maps["industry_grouping"],
                label_maps["region"],
            ))
            if idx % 200 == 0:
                logging.info("Read %d companies...", idx)
            time.sleep(0.05)  # light throttle to respect API rate limits
        except httpx.HTTPStatusError as e:
            logging.error("HTTP error reading company %s: %s", company_id, e)
        except (CompaniesApiException, httpx.HTTPError) as e:
            logging.error("Error reading company %s: %s", company_id, e)
    return companies, max_ts_ms
# -----------------------------------------------------------------------------
# Map → Supabase rows and diff
# -----------------------------------------------------------------------------
def map_company_data_for_db(companies: List[Dict]) -> List[Dict]:
    """Translate enriched HubSpot company dicts into Supabase row dicts,
    parsing timestamps/integers and renaming fields to the table's columns.
    """
    rows: List[Dict] = []
    for company in companies:
        # Prefer the HubSpot-native modified date, falling back to the legacy one.
        modified_raw = company.get("hs_lastmodifieddate") or company.get("lastmodifieddate")
        row = {
            "company_id": try_parse_int(company["id"]),
            "company_name": company.get("name"),
            "company_email": company.get("company_email"),
            "city": company.get("city"),
            "domain": company.get("domain"),
            "street_address": company.get("address"),
            "street_address2": company.get("address2"),
            "hubspot_create_date": parse_ts(company.get("createdate")),
            "hubspot_modified_date": parse_ts(modified_raw),
            "review_date_updated_by": company.get("review_date_updated_by___clone") or None,
            "number_of_active_clis": try_parse_int(company.get("number_of_active__cli_s")),
            "number_of_clis": try_parse_int(company.get("number_of__mobile___cloned_")),
            "number_of_associated_contacts": company.get("number_of_associated_contacts", 0),
            "number_of_associated_deals": company.get("number_of_associated_deals", 0),
            "industry": company.get("industry"),
            "macro_industry_grouping": company.get("macro_industry_grouping"),
            "industry_grouping": company.get("industry_grouping"),
            "closest_review_date": parse_ts(company.get("closest_review_date___clone")),
            "region": company.get("region"),
            "source_1": company.get("source_1"),
            "source_2": company.get("source_2"),
            "record_source": company.get("hs_object_source_label"),
            "record_source_detail_1": company.get("hs_object_source_detail_1"),
            "original_traffic_source": company.get("hs_analytics_source"),
            "latest_traffic_source": company.get("hs_analytics_latest_source"),
            "average_deal_closed_days": try_parse_int(company.get("average_deal_closed_days")),
        }
        rows.append(enrich_supabase_row(row))
    return rows
def companies_are_different(new_row: Dict, old_row: Dict) -> bool:
    """Return True when any tracked column differs between the two rows.

    Values are compared as strings so that None-vs-missing and int-vs-numeric
    string representations of the same value do not register as changes.
    """
    compare_keys = (
        "company_name", "company_email", "city", "domain", "street_address",
        "street_address2", "hubspot_modified_date", "review_date_updated_by",
        "number_of_active_clis", "number_of_clis",
        "number_of_associated_contacts", "number_of_associated_deals",
        "industry", "macro_industry_grouping", "closest_review_date",
        "region", "source_1", "source_2", "record_source",
        "record_source_detail_1", "original_traffic_source",
        "latest_traffic_source",
    )
    return any(str(new_row.get(key)) != str(old_row.get(key))
               for key in compare_keys)
# -----------------------------------------------------------------------------
# Upsert
# -----------------------------------------------------------------------------
def upsert_companies(companies: List[Dict]) -> None:
    """Map company dicts to Supabase rows and insert only new/changed rows."""
    if not companies:
        print("No companies to upsert.")
        return

    existing = fetch_supabase_table(
        supabase_client, "hubspot_companies", "company_id")

    changed_rows: List[Dict] = []
    for row in map_company_data_for_db(companies):
        company_id = row.get("company_id")
        if not company_id:
            continue  # rows without a parseable ID cannot be keyed
        previous = existing.get(str(company_id))
        # Skip rows that already exist unchanged in Supabase.
        if previous and not companies_are_different(row, previous):
            continue
        changed_rows.append(row)

    print(f"{len(changed_rows)} companies to insert/update (out of {len(companies)} read).")
    if changed_rows:
        batched_insert(supabase_client, "hubspot_companies",
                       changed_rows, batch_size=1000)
# -----------------------------------------------------------------------------
# Main (timestamp cursor)
# -----------------------------------------------------------------------------
def main(since_ms: Optional[int] = None) -> None:
    """
    Orchestrates:
    1) Search company IDs with <cursor_prop> > since_ms (property fallback)
    2) Read full companies (track max timestamp for <cursor_prop>)
    3) Upsert into Supabase
    4) Update sync metadata with { last_sync_metadata, last_sync_time, cursor_prop }
    """
    # Resolve since_ms: explicit argument > env bootstrap > today@00:00Z default.
    if since_ms is None and BOOTSTRAP_SINCE_MS_ENV:
        try:
            since_ms = int(BOOTSTRAP_SINCE_MS_ENV)
        except ValueError:
            raise RuntimeError(
                "HUBSPOT_COMPANIES_SINCE_MS must be an integer (ms) if set.")
    if since_ms is None:
        # Default: today@00:00:00Z for first run
        today0 = floor_to_utc_midnight(
            datetime.datetime.now(datetime.timezone.utc))
        since_ms = to_epoch_ms(today0)
    print(f"Searching companies with timestamp > {since_ms} ...")
    ids, cursor_prop = search_company_ids_after_ms(since_ms)
    print(f"Search property: {cursor_prop}. Found {len(ids)} company IDs.")
    # Sync time is captured before the read so a long read doesn't skew it.
    now_iso = datetime.datetime.now(datetime.timezone.utc).isoformat()
    if not ids:
        print("No companies beyond the cursor. Updating sync metadata and exiting.")
        update_sync_metadata(supabase_client, "companies", now_iso)
        return
    print("Reading companies (with associations)...")
    companies, max_ts_ms = read_companies_by_ids(ids, cursor_prop)
    print("Upserting into Supabase...")
    upsert_companies(companies)
    # Advance cursor to max timestamp we actually ingested for the chosen property
    new_cursor_ms = max_ts_ms if max_ts_ms is not None else since_ms
    # NOTE(review): new_cursor_ms is computed and printed but never persisted —
    # update_sync_metadata only receives now_iso, which contradicts step 4 of
    # the docstring. Confirm whether update_sync_metadata should also store
    # new_cursor_ms / cursor_prop.
    update_sync_metadata(supabase_client, "companies", now_iso)
    print(
        f"Companies sync complete. Advanced cursor to {new_cursor_ms} using prop '{cursor_prop}'.")
# -----------------------------------------------------------------------------
# CLI
# -----------------------------------------------------------------------------
def _parse_cli_arg_to_ms(arg: str) -> int:
"""
Accept:
- integer epoch ms
- ISO-8601 (Z or offset)
- YYYY-MM-DD (floors to 00:00Z)
"""
# epoch ms or seconds
if re.fullmatch(r"\d{10,13}", arg):
v = int(arg)
if v < 10_000_000_000_000: # seconds -> ms
v *= 1000
return v
# YYYY-MM-DD
if re.fullmatch(r"\d{4}-\d{2}-\d{2}", arg):
d = datetime.datetime.strptime(
arg, "%Y-%m-%d").replace(tzinfo=datetime.timezone.utc)
return to_epoch_ms(floor_to_utc_midnight(d))
# ISO-8601
return to_epoch_ms(arg)
if __name__ == "__main__":
    import sys

    cli_args = sys.argv[1:]
    if not cli_args:
        # No argument: main() falls back to env bootstrap / today@00:00Z.
        main()
    else:
        try:
            since = _parse_cli_arg_to_ms(cli_args[0])
        except Exception as e:
            print(
                f"Invalid timestamp. Provide epoch ms, ISO-8601, or YYYY-MM-DD. Error: {e}"
            )
            sys.exit(1)
        main(since_ms=since)