Dev Goyal
feat: add fallback instruction for cases where only management commentary is present
"""
Earnings-call ingest + inference tools.

Ingest layer - fetch transcript (Financial Modeling Prep → SEC 8-K fallback),
normalize into Prepared Remarks / Q&A segments,
extract keyword counts, and embed into ChromaDB.

Inference layer - LangGraph @tool functions for retrieval,
sentiment divergence, and keyword trend analysis.

Primary API: Financial Modeling Prep (FMP) — free tier, 250 req/day.
Sign up: https://financialmodelingprep.com/developer/docs
Endpoint: GET /api/v3/earning_call_transcript/{symbol}?year=YYYY&quarter=N&apikey=KEY
"""
| import json | |
| import os | |
| import re | |
| from collections import Counter | |
| from typing import Optional | |
| import requests | |
| from langchain_chroma import Chroma | |
| from langchain_core.documents import Document | |
| from langchain_core.tools import tool | |
| from langchain_huggingface import HuggingFaceEmbeddings | |
| from langchain_text_splitters import RecursiveCharacterTextSplitter | |
| from .rag_tools import get_cached_embeddings | |
| from .sec_tools import HEADERS, get_cik_from_ticker | |
| # --------------------------------------------------------------------------- | |
| # Constants | |
| # --------------------------------------------------------------------------- | |
# Financial keywords counted once per quarter during ingest; the counts are
# stored in the per-quarter metadata JSON and surfaced by
# get_earnings_keyword_trends(). Matching is whole-word, case-insensitive.
TRACKED_KEYWORDS = [
    "ai", "artificial intelligence", "machine learning",
    "headwinds", "tailwinds", "guidance", "margin", "growth",
    "inflation", "recession", "tariff", "supply chain",
    "cloud", "capex", "capital expenditure", "free cash flow",
    "buyback", "dividend", "restructuring", "layoff",
    "regulation", "competition", "demand", "inventory",
]
# Markers used to split transcripts into sections. The transcript is
# lowercased before searching; the FIRST marker found (in list order) marks
# the start of the Q&A session.
QA_MARKERS = [
    "question-and-answer session",
    "question-and-answer",
    "q&a session",
    "q & a session",
    "operator instructions",
    "and our first question",
    "we will now begin the question",
    "we'll now begin the question",
]
# Suffix of the per-ticker directory that holds quarter metadata JSON files,
# e.g. ./chroma_db/AAPL_earnings_meta/.
METADATA_DIR_NAME = "_earnings_meta"
| # --------------------------------------------------------------------------- | |
| # Quarter helpers | |
| # --------------------------------------------------------------------------- | |
def parse_quarter(quarter_str: str) -> tuple[int, int]:
    """Parse a quarter label like 'Q1-2025' into ``(quarter, year)``.

    Accepts 'Q1-2025', 'Q1 2025', 'q1_2025', etc. — the prefix is
    case-insensitive and the separator is optional.

    Raises:
        ValueError: if the string does not match the expected pattern or
            the quarter number is outside 1-4.
    """
    m = re.match(r"[Qq](\d)\s*[-_ ]?\s*(\d{4})", quarter_str.strip())
    if not m:
        raise ValueError(
            f"Invalid quarter format '{quarter_str}'. Expected e.g. 'Q1-2025'."
        )
    q, y = int(m.group(1)), int(m.group(2))
    if not 1 <= q <= 4:
        raise ValueError(f"Quarter must be 1-4, got {q}.")
    return q, y
| def _get_quarter_month_range(q: int) -> list[int]: | |
| """ | |
| Get the month range (quarter end month + 3 months after) for a given quarter. | |
| This is used as a heuristic to find the relevant 8-K filing. | |
| """ | |
| start_month = {1: 3, 2: 6, 3: 9, 4: 12}[q] | |
| # We allow a very wide range: 2 months before the standard month to 4 months after. | |
| # This covers most fiscal year offsets (e.g. AAPL Q1 ends in Dec, reported in Jan/Feb). | |
| months = [] | |
| for i in range(-2, 5): | |
| m = start_month + i | |
| if m < 1: m += 12 | |
| if m > 12: m -= 12 | |
| months.append(m) | |
| return months | |
| # --------------------------------------------------------------------------- | |
| # Transcript fetchers | |
| # --------------------------------------------------------------------------- | |
def fetch_transcript_fmp(
    ticker: str, quarter: int, year: int, api_key: str
) -> Optional[str]:
    """
    Fetch an earnings-call transcript from Financial Modeling Prep (FMP).

    Free tier: 250 requests / day — no premium required.
    Sign up: https://financialmodelingprep.com/developer/docs

    Endpoint:
        GET https://financialmodelingprep.com/api/v3/earning_call_transcript/{symbol}
            ?year=YYYY&quarter=N&apikey=KEY

    Response schema (list, first element used):
        [{"symbol": "AAPL", "quarter": 1, "year": 2025,
          "date": "2025-01-30 00:00:00", "content": "<full transcript>"}]

    Returns the full transcript string, or None on any failure (missing
    API key, network/HTTP error, empty or malformed payload).
    """
    if not api_key:
        return None
    url = (
        f"https://financialmodelingprep.com/api/v3/earning_call_transcript/{ticker.upper()}"
        f"?year={year}&quarter={quarter}&apikey={api_key}"
    )
    try:
        print(f"[Earnings Ingest] Trying FMP for {ticker} Q{quarter}-{year}...")
        resp = requests.get(url, timeout=30)
        resp.raise_for_status()
        data = resp.json()
        # FMP returns a list; the first element holds the transcript.
        if isinstance(data, list) and data:
            # `or ""` guards against an explicit JSON null "content" field,
            # which .get("content", "") would pass through as None and crash
            # the len() check below.
            content = data[0].get("content") or ""
            if len(content) > 200:
                print(f"[Earnings Ingest] FMP returned transcript ({len(content)} chars).")
                return content
            print(f"[Earnings Ingest] FMP returned empty/short content for {ticker} Q{quarter}-{year}.")
            return None
        # Error object returned (e.g. invalid key or no data for this quarter).
        if isinstance(data, dict):
            # str() wrap: the error value is not guaranteed to be a string.
            msg = str(data.get("Error Message") or data.get("message") or data)
            print(f"[Earnings Ingest] FMP error: {msg[:120]}")
        # Any other shape (e.g. empty list) -> no transcript available.
        return None
    except Exception as e:
        # Best-effort fetch: callers fall back to SEC 8-K on None.
        print(f"[Earnings Ingest] FMP fetch failed: {e}")
        return None
def fetch_transcript_sec_8k(ticker: str, quarter: int, year: int) -> Optional[str]:
    """
    Fallback transcript source: SEC EDGAR 8-K filings.

    Scans the ticker's recent 8-Ks for one filed near the quarter end that
    reports Item 2.02 (Results of Operations and Financial Condition),
    downloads the primary document, and returns its plain text.

    NOTE: an 8-K press release is not a true call transcript — it typically
    contains only management commentary and no Q&A section.

    Returns extracted text, or None if no suitable filing is found or any
    step fails.
    """
    try:
        cik = get_cik_from_ticker(ticker)
    except ValueError:
        print(f"[Earnings Ingest] Ticker {ticker} not found in SEC database.")
        return None
    try:
        print(f"[Earnings Ingest] Trying SEC 8-K fallback for {ticker} Q{quarter}-{year}...")
        url = f"https://data.sec.gov/submissions/CIK{cik}.json"
        resp = requests.get(url, headers=HEADERS, timeout=30)
        resp.raise_for_status()
        filings = resp.json()["filings"]["recent"]
        acceptable_months = _get_quarter_month_range(quarter)
        # "items" can be missing (or shorter than "form") in the submissions
        # payload; indexing a 1-element default list raised IndexError for
        # i > 0 and aborted the whole search via the broad except below.
        items_list = filings.get("items") or []
        best_doc_url = None
        for i, form in enumerate(filings["form"]):
            if form != "8-K":
                continue
            # Item 2.02 = Results of Operations and Financial Condition.
            # Values look like '2.02' or '1.01,2.02,9.01'.
            items = str(items_list[i]) if i < len(items_list) else ""
            if "2.02" not in items:
                continue
            filed = filings["filingDate"][i]  # e.g. "2025-01-30"
            filed_year, filed_month = int(filed[:4]), int(filed[5:7])
            # Accept the target year, or the following year for Q4
            # (Q4 results are typically reported the next Jan/Feb).
            is_valid_year = (filed_year == year) or (quarter == 4 and filed_year == year + 1)
            if is_valid_year and filed_month in acceptable_months:
                accession = filings["accessionNumber"][i]
                acc_clean = accession.replace("-", "")
                primary_doc = filings["primaryDocument"][i]
                best_doc_url = (
                    f"https://www.sec.gov/Archives/edgar/data/"
                    f"{cik.lstrip('0')}/{acc_clean}/{primary_doc}"
                )
                break  # "recent" is newest-first; take the first match
        if not best_doc_url:
            print(f"[Earnings Ingest] No matching SEC 8-K found for {ticker} Q{quarter}-{year}.")
            return None
        print(f"[Earnings Ingest] Downloading 8-K from {best_doc_url}...")
        doc_resp = requests.get(best_doc_url, headers=HEADERS, timeout=30)
        doc_resp.raise_for_status()
        # Local import: bs4 is only needed on this fallback path.
        from bs4 import BeautifulSoup
        soup = BeautifulSoup(doc_resp.text, "html.parser")
        text = soup.get_text(separator=" ", strip=True)
        if len(text) > 500:
            print(f"[Earnings Ingest] SEC 8-K text extracted ({len(text)} chars).")
            return text
        print("[Earnings Ingest] SEC 8-K text too short, likely not a transcript.")
        return None
    except Exception as e:
        # Best-effort fallback: callers treat None as "no transcript".
        print(f"[Earnings Ingest] SEC 8-K fallback failed: {e}")
        return None
| # --------------------------------------------------------------------------- | |
| # Transcript normalization & segmentation | |
| # --------------------------------------------------------------------------- | |
def normalize_transcript(
    raw_text: str, ticker: str, quarter: int, year: int, source: str = ""
) -> dict:
    """
    Split a raw transcript into Prepared Remarks and a Q&A Session.

    The first QA_MARKERS phrase found (case-insensitive) marks the start
    of Q&A. SEC 8-K press releases have no Q&A section, so when no marker
    is found the entire text becomes prepared remarks and "qa_session"
    is "".

    Args:
        raw_text: Full transcript text.
        ticker, quarter, year: Identify the earnings call.
        source: Optional provenance tag, e.g. "fmp" or "sec_8k".

    Returns:
        {
            "ticker": ..., "quarter": ..., "year": ...,
            "prepared_remarks": str,
            "qa_session": str,
            "source": str,
        }
    """
    text_lower = raw_text.lower()
    split_pos = -1
    for marker in QA_MARKERS:
        idx = text_lower.find(marker)
        if idx != -1:
            split_pos = idx
            break
    # NOTE(review): a marker at position 0 is deliberately treated as "no
    # split" (> 0, not >= 0) so prepared_remarks is never empty when the
    # transcript has text.
    if split_pos > 0:
        prepared = raw_text[:split_pos].strip()
        qa = raw_text[split_pos:].strip()
    else:
        # No Q&A marker found -> treat entire text as prepared remarks.
        prepared = raw_text.strip()
        qa = ""
    return {
        "ticker": ticker.upper(),
        "quarter": quarter,
        "year": year,
        "prepared_remarks": prepared,
        "qa_session": qa,
        "source": source,
    }
| # --------------------------------------------------------------------------- | |
| # Keyword / entity extraction | |
| # --------------------------------------------------------------------------- | |
def extract_keywords(text: str) -> dict[str, int]:
    """
    Tally occurrences of each tracked financial keyword in *text*.

    Matching is case-insensitive and whole-word (regex ``\\b`` boundaries).
    Returns keyword → count, including only keywords that appear at least
    once, in TRACKED_KEYWORDS order.
    """
    haystack = text.lower()
    return {
        kw: hits
        for kw in TRACKED_KEYWORDS
        if (hits := len(re.findall(rf"\b{re.escape(kw)}\b", haystack)))
    }
| # --------------------------------------------------------------------------- | |
| # ChromaDB ingest | |
| # --------------------------------------------------------------------------- | |
def _meta_path(chroma_path: str, ticker: str) -> str:
    """Return the metadata directory for *ticker*, creating it if absent."""
    meta_dir = os.path.join(chroma_path, ticker.upper() + METADATA_DIR_NAME)
    os.makedirs(meta_dir, exist_ok=True)
    return meta_dir
def _save_metadata(
    chroma_path: str,
    ticker: str,
    quarter: int,
    year: int,
    keywords: dict[str, int],
    status: str,
) -> None:
    """Persist per-quarter ingest metadata (status + keyword counts) as JSON.

    Written to <chroma_path>/<TICKER>_earnings_meta/Q<quarter>_<year>.json.
    The file's existence doubles as the "already ingested" marker that
    ingest_earnings_call() checks before doing any work.
    """
    meta_dir = _meta_path(chroma_path, ticker)
    fname = os.path.join(meta_dir, f"Q{quarter}_{year}.json")
    payload = {
        "ticker": ticker.upper(),
        "quarter": quarter,
        "year": year,
        "status": status,
        "keywords": keywords,
    }
    # Explicit utf-8: the default text encoding is platform-dependent.
    with open(fname, "w", encoding="utf-8") as f:
        json.dump(payload, f, indent=2)
    # Fixed mojibake: the arrow in this log line was a corrupted character.
    print(f"[Earnings Ingest] Metadata saved → {fname}")
def _load_metadata(chroma_path: str, ticker: str) -> list[dict]:
    """Load every per-quarter metadata JSON for *ticker*, sorted by filename."""
    meta_dir = _meta_path(chroma_path, ticker)
    if not os.path.isdir(meta_dir):
        return []
    payloads: list[dict] = []
    for name in sorted(os.listdir(meta_dir)):
        if not name.endswith(".json"):
            continue
        with open(os.path.join(meta_dir, name)) as fh:
            payloads.append(json.load(fh))
    return payloads
def ingest_earnings_call(
    ticker: str,
    quarter: int,
    year: int,
    api_key: str = "",
    chroma_path: str = "./chroma_db",
) -> str:
    """
    Full ingest pipeline for one ticker/quarter pair.

    Pipeline: fetch transcript (FMP first, SEC 8-K fallback) -> split into
    Prepared Remarks / Q&A -> count tracked keywords -> chunk and embed into
    a per-ticker Chroma collection -> persist a metadata JSON that also acts
    as the "already ingested" marker.

    Args:
        ticker: Stock symbol (case-insensitive).
        quarter: Calendar quarter, 1-4.
        year: Four-digit year.
        api_key: FMP API key; when empty, FMP is skipped and only the SEC
            8-K fallback is tried.
        chroma_path: Root directory for Chroma collections and metadata.

    Returns a status string: 'success', 'exists', or 'failed'.
    """
    ticker = ticker.upper()
    collection_dir = os.path.join(chroma_path, f"{ticker}_earnings")
    # Check if already ingested. NOTE: metadata is written even on failure,
    # so a previously failed quarter is also skipped (no automatic retry).
    meta_dir = _meta_path(chroma_path, ticker)
    meta_file = os.path.join(meta_dir, f"Q{quarter}_{year}.json")
    if os.path.exists(meta_file):
        print(f"[Earnings Ingest] Q{quarter}-{year} for {ticker} already ingested. Skipping.")
        return "exists"
    # 1. Fetch transcript: FMP (free) first, then SEC 8-K fallback
    raw_text = fetch_transcript_fmp(ticker, quarter, year, api_key)
    source = "fmp" if raw_text else None
    if not raw_text:
        raw_text = fetch_transcript_sec_8k(ticker, quarter, year)
        source = "sec_8k" if raw_text else None
    if not raw_text:
        # Record the failure so repeated runs don't re-hit the APIs.
        _save_metadata(chroma_path, ticker, quarter, year, {}, "failed")
        return "failed"
    # 2. Normalize & segment into prepared remarks / Q&A
    segments = normalize_transcript(raw_text, ticker, quarter, year)
    # 3. Extract keyword counts from both sections combined
    all_text = segments["prepared_remarks"] + " " + segments["qa_session"]
    keywords = extract_keywords(all_text)
    # 4. Chunk & embed into ChromaDB. One Document per section so every
    #    chunk carries a "section" metadata tag for filtered retrieval.
    splitter = RecursiveCharacterTextSplitter(chunk_size=1000, chunk_overlap=200)
    docs = []
    if segments["prepared_remarks"]:
        pr_doc = Document(
            page_content=segments["prepared_remarks"],
            metadata={
                "ticker": ticker,
                "quarter": quarter,
                "year": year,
                "section": "Prepared Remarks",
                "source": source,
            },
        )
        docs.extend(splitter.split_documents([pr_doc]))
    if segments["qa_session"]:
        qa_doc = Document(
            page_content=segments["qa_session"],
            metadata={
                "ticker": ticker,
                "quarter": quarter,
                "year": year,
                "section": "Q&A Session",
                "source": source,
            },
        )
        docs.extend(splitter.split_documents([qa_doc]))
    if not docs:
        # Transcript was fetched but produced no usable text.
        _save_metadata(chroma_path, ticker, quarter, year, keywords, "failed")
        return "failed"
    print(f"[Earnings Ingest] Embedding {len(docs)} chunks into {collection_dir}...")
    embeddings = get_cached_embeddings()
    Chroma.from_documents(
        documents=docs,
        embedding=embeddings,
        persist_directory=collection_dir,
    )
    # SEC 8-K filings often lack a Q&A section -- still a successful ingest.
    status = "success"
    _save_metadata(chroma_path, ticker, quarter, year, keywords, status)
    print(f"[Earnings Ingest] {ticker} Q{quarter}-{year} ingested ({status}, source={source}).")
    return status
| # --------------------------------------------------------------------------- | |
| # Inference tools (LangGraph runtime) | |
| # --------------------------------------------------------------------------- | |
def _get_earnings_db(ticker: str, chroma_path: str = "./chroma_db") -> Chroma:
    """Open the persisted earnings-call Chroma collection for *ticker*.

    Raises FileNotFoundError (with a hint about the ingest script) when the
    collection directory does not exist yet.
    """
    symbol = ticker.upper()
    persist_directory = os.path.join(chroma_path, f"{symbol}_earnings")
    if not os.path.exists(persist_directory):
        raise FileNotFoundError(
            f"Earnings data for {symbol} not ingested. "
            f"Run: python scripts/ingest_earnings_calls.py --tickers {symbol} --quarters Q<N>-<YYYY>"
        )
    return Chroma(
        persist_directory=persist_directory,
        embedding_function=get_cached_embeddings(),
    )
def search_earnings_call(ticker: str, query: str) -> str:
    """
    Searches pre-ingested earnings-call transcripts for a given ticker.
    Use this to find specific management commentary, guidance, or discussion topics.
    CRITICAL: The ticker's earnings data must already be ingested.
    Pass the stock ticker (e.g. 'AAPL') and a natural-language query.
    """
    try:
        db = _get_earnings_db(ticker.upper())
        results = db.similarity_search(query, k=3)
        if not results:
            return f"No earnings data matched '{query}' for {ticker}. Try broadening your search terms."
        # Fixed mojibake: the header separator was a corrupted character.
        output_parts = [f"EARNINGS CALL SEARCH RESULTS FOR {ticker.upper()} — '{query}':\n"]
        total_chars = 0
        for doc in results:
            meta = doc.metadata
            label = f"[{meta.get('section', 'Unknown')} | Q{meta.get('quarter', '?')}-{meta.get('year', '?')}]"
            snippet = doc.page_content[:700]
            total_chars += len(snippet)
            output_parts.append(f"{label}\n{snippet}\n")
            # Cap the response size to keep it LLM-context friendly.
            if total_chars > 2000:
                break
        return "\n".join(output_parts)
    except Exception as e:
        # Tool-style contract: surface errors as strings instead of raising,
        # so the calling agent can read and react to them.
        return f"Error searching earnings data: {e}"
def get_earnings_sentiment_divergence(ticker: str) -> str:
    """
    Retrieves evidence from both Prepared Remarks and Q&A sections of the
    most recent earnings call for a ticker. Use this to analyze whether
    management tone differs between the scripted portion and live Q&A.
    When only prepared remarks are available (e.g. from an SEC 8-K filing),
    performs a single-section tone analysis instead.
    CRITICAL: The ticker's earnings data must already be ingested.
    """
    try:
        db = _get_earnings_db(ticker.upper())
        # Pull the top evidence chunks from each transcript section.
        prepared_hits = db.similarity_search(
            "management outlook guidance performance",
            k=3,
            filter={"section": "Prepared Remarks"},
        )
        qa_hits = db.similarity_search(
            "analyst question concern risk challenge",
            k=3,
            filter={"section": "Q&A Session"},
        )
        parts = [f"EARNINGS TONE ANALYSIS FOR {ticker.upper()}:\n\n"]
        parts.append("=== MANAGEMENT COMMENTARY ===\n")
        if not prepared_hits:
            # No section-tagged chunks; retry without the metadata filter.
            prepared_hits = db.similarity_search(
                "management outlook guidance performance", k=3
            )
        for doc in prepared_hits:
            parts.append(doc.page_content[:600] + "\n---\n")
        if qa_hits:
            parts.append("\n=== ANALYST Q&A ===\n")
            for doc in qa_hits:
                parts.append(doc.page_content[:600] + "\n---\n")
            parts.append(
                "\nINSTRUCTION: Compare the tone, confidence, and specificity between "
                "the Management Commentary and Analyst Q&A sections. Note any divergence "
                "where management was more cautious, evasive, or forthcoming under questioning."
            )
        else:
            # Single-section fallback (typical for SEC 8-K sourced data).
            parts.append(
                "\nINSTRUCTION: Analyze the tone, confidence, and specificity of the "
                "management commentary above. (Note: Only management commentary was found, typical of SEC 8-K filings). "
                "Identify forward-looking statements, hedging language, areas of emphasis, and any notable risks or opportunities mentioned."
            )
        return "".join(parts)
    except Exception as e:
        return f"Error retrieving tone analysis data: {e}"
def get_earnings_keyword_trends(ticker: str) -> str:
    """
    Returns quarter-over-quarter keyword frequency trends from ingested
    earnings calls for a given ticker. Shows how often key terms (AI, headwinds,
    growth, guidance, etc.) were mentioned across available quarters.
    CRITICAL: Multiple quarters must be ingested for trend comparison.
    """
    try:
        ticker = ticker.upper()
        all_meta = _load_metadata("./chroma_db", ticker)
        if not all_meta:
            return (
                f"No earnings metadata found for {ticker}. "
                f"Run: python scripts/ingest_earnings_calls.py --tickers {ticker} --quarters Q<N>-<YYYY>"
            )
        # Chronological order: (year, quarter)
        all_meta.sort(key=lambda m: (m["year"], m["quarter"]))
        # Build output table
        quarters = [f"Q{m['quarter']}-{m['year']}" for m in all_meta]
        header = f"KEYWORD TRENDS FOR {ticker} ({', '.join(quarters)}):\n\n"
        # Union of keywords seen in any quarter
        all_kws = set()
        for m in all_meta:
            all_kws.update(m.get("keywords", {}).keys())
        if not all_kws:
            return header + "No tracked keywords found in any ingested quarter."
        rows = []
        rows.append(f"{'Keyword':<30} " + " ".join(f"{q:>10}" for q in quarters))
        rows.append("-" * (30 + 11 * len(quarters)))
        for kw in sorted(all_kws):
            vals = []
            for m in all_meta:
                c = m.get("keywords", {}).get(kw, 0)
                vals.append(f"{c:>10}")
            rows.append(f"{kw:<30} " + " ".join(vals))
        # Trend commentary for the two most recent quarters
        if len(all_meta) >= 2:
            rows.append("")
            rows.append("NOTABLE CHANGES (latest vs prior quarter):")
            prev_kw = all_meta[-2].get("keywords", {})
            curr_kw = all_meta[-1].get("keywords", {})
            for kw in sorted(all_kws):
                p, c = prev_kw.get(kw, 0), curr_kw.get(kw, 0)
                if p != c:
                    # BUG FIX: both branches previously emitted the same
                    # mojibake character, so the direction of change was
                    # indistinguishable in the output.
                    direction = "↑" if c > p else "↓"
                    rows.append(f"  {kw}: {p} → {c} ({direction})")
        return header + "\n".join(rows)
    except Exception as e:
        return f"Error loading keyword trends: {e}"