|
from fastapi import FastAPI, HTTPException, Query |
|
from pydantic import BaseModel |
|
from typing import List, Optional |
|
from playwright.async_api import async_playwright |
|
import json |
|
import re |
|
from urllib.parse import urlparse |
|
|
|
# Application entry point. The metadata below is surfaced in the generated
# OpenAPI schema and the interactive docs (/docs, /redoc).
app = FastAPI(
    title="Business Contact Intelligence API",
    description="Professional business contact extraction and lead generation API. Extract phone numbers, emails, addresses, and social profiles from websites and directories.",
    version="1.0.0",
    contact={
        "name": "Business Contact Intelligence API",
        "email": "support@example.com",
    },
    license_info={
        "name": "Commercial License",
    },
)
|
|
|
class BusinessContact(BaseModel):
    """A single business lead aggregated from directory search or sample data."""

    business_name: str                        # display name of the business
    phone: Optional[str] = None               # digits only, optional leading '+' (see extract_phone_numbers)
    email: Optional[str] = None               # first extracted email, if any
    website: Optional[str] = None             # company website or search-result URL
    address: Optional[str] = None             # free-form street address when found
    industry: Optional[str] = None            # coarse category label, e.g. "Technology"
    social_profiles: Optional[dict] = None    # network name -> profile URL
    source_url: str                           # provenance: "sample_data" or the search URL
    confidence_score: Optional[float] = None  # heuristic quality: 0.8 for samples, 0.6 for scraped results
|
|
|
class ContactExtractionResult(BaseModel):
    """Contact details scraped from a single website."""

    business_name: str              # derived from page title or JSON-LD "name"
    phones: List[str] = []          # pydantic copies mutable defaults per instance, so [] is safe here
    emails: List[str] = []          # up to five unique addresses
    website: str                    # the (normalized) URL that was scraped
    social_profiles: dict = {}      # network name -> first matching profile URL
    address: Optional[str] = None   # first regex-detected street address, if any
    industry: Optional[str] = None  # keyword-classified industry, if any
|
|
|
class SearchResponse(BaseModel):
    """Envelope returned by the /search and /test-search endpoints."""

    total_found: int                # equals len(results)
    results: List[BusinessContact]  # the matched leads
    search_query: str               # echo of the caller's query
    source: str                     # echo of the requested source selector
|
|
|
def validate_url(url: str) -> str:
    """Validate a URL and normalize it to include a scheme.

    Args:
        url: Raw URL, with or without an ``http(s)://`` scheme.

    Returns:
        The URL, prefixed with ``https://`` when no scheme was given.

    Raises:
        HTTPException: 400 when the URL is empty, unparseable, or has no
            network location (host).
    """
    if not url:
        raise HTTPException(status_code=400, detail="URL is required")

    # Default to HTTPS when the caller omitted the scheme.
    if not url.startswith(('http://', 'https://')):
        url = 'https://' + url

    # Fix: the original wrapped its own HTTPException in a broad
    # `except Exception`, catching and re-raising it — correct only by
    # coincidence of an identical message. Parse first with a narrow catch,
    # then validate the host outside the try.
    try:
        parsed = urlparse(url)
    except ValueError:
        # urlparse raises ValueError only for pathological inputs.
        raise HTTPException(status_code=400, detail="Invalid URL format")

    # A URL without a host (netloc) is unusable for scraping.
    if not parsed.netloc:
        raise HTTPException(status_code=400, detail="Invalid URL format")

    return url
|
|
|
def extract_phone_numbers(text: str) -> List[str]:
    """Pull phone-number candidates out of raw text.

    Matches international (+CC ...), parenthesised US, dashed/dotted US and
    bare digit-run formats, strips all formatting characters, and returns at
    most five unique numbers of at least ten digits each.
    """
    patterns = (
        r'\+\d{1,3}[-.\s]?\(?\d{1,4}\)?[-.\s]?\d{1,4}[-.\s]?\d{1,9}',
        r'\(\d{3}\)[-.\s]?\d{3}[-.\s]?\d{4}',
        r'\d{3}[-.\s]?\d{3}[-.\s]?\d{4}',
        r'\d{10,15}',
    )

    # Collect raw matches in pattern order so dedup keeps the first form seen.
    raw_matches: List[str] = []
    for pattern in patterns:
        raw_matches += re.findall(pattern, text)

    unique: List[str] = []
    for candidate in raw_matches:
        # Drop everything except digits and any '+' characters.
        digits = re.sub(r'[^\d+]', '', candidate)
        if len(digits) >= 10 and digits not in unique:
            unique.append(digits)

    # Cap the result to keep responses compact.
    return unique[:5]
|
|
|
def extract_emails(text: str) -> List[str]:
    """Extract up to five unique, plausible email addresses from text.

    Placeholder domains (example.com, test.com, placeholder.com) are
    filtered out; order of first appearance is preserved.

    Fix: the original TLD character class was written as ``[A-Z|a-z]`` —
    inside a character class ``|`` is a *literal* pipe, so junk such as
    ``x@y.a|b`` matched. The class is now ``[A-Za-z]``.
    """
    pattern = r'\b[A-Za-z0-9._%+-]+@[A-Za-z0-9.-]+\.[A-Za-z]{2,}\b'
    emails = re.findall(pattern, text)

    # Drop obvious placeholder domains and duplicates, preserving order.
    filtered_emails: List[str] = []
    exclude_domains = ['example.com', 'test.com', 'placeholder.com']

    for email in emails:
        domain = email.split('@')[1].lower()
        if domain not in exclude_domains and email not in filtered_emails:
            filtered_emails.append(email)

    return filtered_emails[:5]
|
|
|
def generate_sample_businesses(query: str, limit: int) -> List[BusinessContact]:
    """Produce demo BusinessContact records derived from *query*.

    At most five records are returned (one per built-in template), even when
    *limit* is larger. Street numbers, ZIP codes and phone suffixes are
    randomised; everything else is derived from the query's first word.
    """
    import random

    # Each template contributes one fake company profile.
    business_templates = [
        {
            "name_suffix": "Solutions",
            "industry": "Technology",
            "phone_prefix": "555-01",
            "email_domain": "techsolutions.com"
        },
        {
            "name_suffix": "Services",
            "industry": "Consulting",
            "phone_prefix": "555-02",
            "email_domain": "services.net"
        },
        {
            "name_suffix": "Group",
            "industry": "Finance",
            "phone_prefix": "555-03",
            "email_domain": "group.org"
        },
        {
            "name_suffix": "Company",
            "industry": "Manufacturing",
            "phone_prefix": "555-04",
            "email_domain": "company.com"
        },
        {
            "name_suffix": "Associates",
            "industry": "Legal",
            "phone_prefix": "555-05",
            "email_domain": "associates.law"
        }
    ]

    # The query's first word becomes the brand; "Sample" when query is blank.
    words = query.lower().split()
    brand = words[0].title() if words else "Sample"

    results: List[BusinessContact] = []
    for idx in range(min(limit, len(business_templates))):
        tpl = business_templates[idx]

        company = f"{brand} {tpl['name_suffix']}"
        phone = f"{tpl['phone_prefix']}{random.randint(10, 99)}"
        email = f"contact@{brand.lower()}{tpl['email_domain']}"
        website = f"https://www.{brand.lower()}{tpl['name_suffix'].lower()}.com"

        # One fake street address per major metro; rebuilt every iteration so
        # street numbers and ZIP codes vary between records.
        addresses = [
            f"{random.randint(100, 9999)} Main St, New York, NY {random.randint(10001, 10999)}",
            f"{random.randint(100, 9999)} Business Ave, Los Angeles, CA {random.randint(90001, 90999)}",
            f"{random.randint(100, 9999)} Commerce Blvd, Chicago, IL {random.randint(60601, 60699)}",
            f"{random.randint(100, 9999)} Industry Dr, Houston, TX {random.randint(77001, 77099)}",
            f"{random.randint(100, 9999)} Corporate Way, Miami, FL {random.randint(33101, 33199)}"
        ]

        results.append(BusinessContact(
            business_name=company,
            phone=phone,
            email=email,
            website=website,
            address=addresses[idx % len(addresses)],
            industry=tpl['industry'],
            social_profiles={
                "linkedin": f"https://linkedin.com/company/{brand.lower()}-{tpl['name_suffix'].lower()}",
                "facebook": f"https://facebook.com/{brand.lower()}{tpl['name_suffix'].lower()}"
            },
            source_url="sample_data",
            confidence_score=0.8
        ))

    return results
|
|
|
async def search_google_businesses(page, query: str, limit: int) -> List[BusinessContact]:
    """Best-effort Google SERP scrape for business contact details.

    Any failure (navigation timeout, selector/layout change) is swallowed
    and an empty or partial list is returned rather than raising.
    """
    found: List[BusinessContact] = []

    try:
        search_url = f"https://www.google.com/search?q={query.replace(' ', '+')}+contact+phone+email"
        await page.goto(search_url, timeout=20000)
        await page.wait_for_load_state("domcontentloaded", timeout=10000)

        # Each organic result lives in a div.g container.
        for entry in (await page.query_selector_all("div.g"))[:limit]:
            try:
                heading = await entry.query_selector("h3")
                if not heading:
                    continue
                name = await heading.inner_text()

                snippet_node = await entry.query_selector(".VwiC3b, .s")
                snippet_text = await snippet_node.inner_text() if snippet_node else ""

                anchor = await entry.query_selector("a")
                link = await anchor.get_attribute("href") if anchor else None

                found_phones = extract_phone_numbers(snippet_text)
                found_emails = extract_emails(snippet_text)

                # Only keep entries that yielded at least one contact datum.
                if found_phones or found_emails:
                    found.append(BusinessContact(
                        business_name=name,
                        phone=found_phones[0] if found_phones else None,
                        email=found_emails[0] if found_emails else None,
                        website=link,
                        address=None,
                        industry=None,
                        social_profiles={},
                        source_url=search_url,
                        confidence_score=0.6
                    ))

            except Exception:
                # A malformed result entry must not abort the whole scan.
                continue

    except Exception:
        # Navigation/selector failure: return whatever was collected so far.
        pass

    return found
|
|
|
@app.get("/search",
    response_model=SearchResponse,
    summary="Search Business Directory",
    description="Search for businesses across multiple directories and extract comprehensive contact information. Perfect for lead generation and market research.",
    tags=["Search", "Lead Generation"])
async def search_businesses(
    query: str = Query(..., description="Business name, industry or location to search for"),
    limit: int = Query(10, ge=1, le=50, description="Maximum number of results (1-50)"),
    source: str = Query("auto", description="Directory source: 'auto', 'yellowpages', 'yelp', 'google'")
):
    """
    Search for businesses and extract their contact information from various directories.

    **Features:**
    - Multi-source directory search
    - Comprehensive contact extraction
    - Social media profile detection
    - Address and industry classification
    - Confidence scoring

    **Use Cases:**
    - Lead generation for sales teams
    - Market research and competitor analysis
    - Contact database building
    - Business intelligence gathering
    - Prospecting automation

    **Data Extracted:**
    - Business name and industry
    - Phone numbers (multiple formats)
    - Email addresses
    - Website URLs
    - Physical addresses
    - Social media profiles (LinkedIn, Facebook, Twitter)
    """
    # Reject trivially short queries before launching a browser.
    if not query or len(query.strip()) < 2:
        raise HTTPException(status_code=400, detail="Query must be at least 2 characters")

    async with async_playwright() as p:
        browser = await p.chromium.launch(headless=True)
        page = await browser.new_page()

        try:
            businesses = []

            # NOTE(review): sample data is generated unconditionally, so every
            # search returns demo records regardless of `source` — confirm this
            # "demonstration mode" behaviour is intentional.
            try:
                sample_businesses = generate_sample_businesses(query, limit)
                businesses.extend(sample_businesses)

                # Top up with live Google results only when the demo data did
                # not already fill the requested limit and the source allows it.
                if len(businesses) < limit and source in ["auto", "google"]:
                    try:
                        google_results = await search_google_businesses(page, query, limit - len(businesses))
                        businesses.extend(google_results)
                    except Exception as e:
                        # Best-effort: a failed live scrape leaves the sample
                        # results intact. (`e` is intentionally unused.)
                        pass

            except Exception as e:
                # Last-resort fallback: regenerate a small sample set,
                # discarding any partial results collected above.
                businesses = generate_sample_businesses(query, min(limit, 3))

            return SearchResponse(
                total_found=len(businesses),
                results=businesses,
                search_query=query,
                source=source
            )

        except Exception as e:
            # Anything escaping the fallbacks above becomes a 500.
            raise HTTPException(status_code=500, detail=f"Search failed: {str(e)}")
        finally:
            # Always release the browser, even on error paths.
            await browser.close()
|
|
|
@app.post("/extract-from-url",
    response_model=ContactExtractionResult,
    summary="Extract Contacts from Website",
    description="Extract comprehensive business contact information from any company website. Analyzes contact pages, about pages, and footer sections for maximum data extraction.",
    tags=["Extraction", "Website Analysis"])
async def extract_from_url(url: str):
    """
    Extract business contact information from a specific company website.

    Loads the page with Playwright, then derives the business name from the
    page title (or JSON-LD ``name`` when present), runs regex extractors for
    phones/emails/addresses over the rendered HTML, collects the first link
    per social network, and keyword-classifies the industry.

    Raises:
        HTTPException: 400 for an invalid URL, 500 when extraction fails.

    Fix: two bare ``except:`` clauses were narrowed to ``except Exception:``
    so KeyboardInterrupt/SystemExit are no longer swallowed.
    """
    url = validate_url(url)

    async with async_playwright() as p:
        browser = await p.chromium.launch(headless=True)
        page = await browser.new_page()

        try:
            await page.goto(url, wait_until="networkidle", timeout=30000)

            # Default the business name to the page title; refined below.
            title = await page.title()
            business_name = title

            # Prefer the organisation name from JSON-LD structured data.
            try:
                schema_script = await page.query_selector("script[type='application/ld+json']")
                if schema_script:
                    schema_text = await schema_script.inner_text()
                    schema_data = json.loads(schema_text)
                    if isinstance(schema_data, dict) and "name" in schema_data:
                        business_name = schema_data["name"]
            except Exception:
                # Malformed or absent JSON-LD: fall back to the page title.
                pass

            # Strip common "Name - tagline" / "Name | tagline" title suffixes.
            if " - " in business_name:
                business_name = business_name.split(" - ")[0]
            elif " | " in business_name:
                business_name = business_name.split(" | ")[0]

            # Run the regex extractors over the full rendered HTML.
            content = await page.content()
            phones = extract_phone_numbers(content)
            emails = extract_emails(content)

            # Collect the first link found per social network.
            social_profiles = {}
            social_selectors = [
                "a[href*='linkedin.com']",
                "a[href*='facebook.com']",
                "a[href*='twitter.com']",
                "a[href*='instagram.com']",
                "a[href*='youtube.com']"
            ]

            for selector in social_selectors:
                try:
                    links = await page.query_selector_all(selector)
                    for link in links:
                        href = await link.get_attribute("href")
                        if href:
                            if "linkedin.com" in href and "linkedin" not in social_profiles:
                                social_profiles["linkedin"] = href
                            elif "facebook.com" in href and "facebook" not in social_profiles:
                                social_profiles["facebook"] = href
                            elif "twitter.com" in href and "twitter" not in social_profiles:
                                social_profiles["twitter"] = href
                            elif "instagram.com" in href and "instagram" not in social_profiles:
                                social_profiles["instagram"] = href
                            elif "youtube.com" in href and "youtube" not in social_profiles:
                                social_profiles["youtube"] = href
                except Exception:
                    # A failing selector must not abort the other networks.
                    continue

            # Heuristic US street-address detection in the raw HTML.
            address = None
            address_patterns = [
                r'\d+\s+[A-Za-z\s]+(?:Street|St|Avenue|Ave|Road|Rd|Boulevard|Blvd|Drive|Dr|Lane|Ln|Way|Court|Ct)',
                r'\d+\s+[A-Za-z\s]+,\s*[A-Za-z\s]+,\s*[A-Z]{2}\s+\d{5}'
            ]

            for pattern in address_patterns:
                match = re.search(pattern, content, re.IGNORECASE)
                if match:
                    address = match.group(0)
                    break

            # Keyword-based industry classification; first hit wins.
            # NOTE(review): "IT" is matched against lowered content, so the
            # substring "it" appears on most English pages and biases the
            # result toward Technology — confirm this is intended.
            industry = None
            industry_keywords = {
                "technology": ["software", "tech", "IT", "development", "programming"],
                "healthcare": ["medical", "health", "hospital", "clinic", "doctor"],
                "finance": ["bank", "financial", "investment", "insurance", "accounting"],
                "retail": ["store", "shop", "retail", "commerce", "sales"],
                "consulting": ["consulting", "advisory", "strategy", "management"],
                "manufacturing": ["manufacturing", "production", "factory", "industrial"]
            }

            content_lower = content.lower()
            for industry_name, keywords in industry_keywords.items():
                if any(keyword in content_lower for keyword in keywords):
                    industry = industry_name.title()
                    break

            return ContactExtractionResult(
                business_name=business_name.strip(),
                phones=phones,
                emails=emails,
                website=url,
                social_profiles=social_profiles,
                address=address,
                industry=industry
            )

        except Exception as e:
            raise HTTPException(status_code=500, detail=f"Extraction failed: {str(e)}")
        finally:
            # Always release the browser, even on error paths.
            await browser.close()
|
|
|
|
|
class BulkExtractionRequest(BaseModel):
    """Request body for /bulk-extract."""

    urls: List[str]                # websites to process; at most 20 per request
    extract_social: bool = True    # also scan for LinkedIn/Facebook links
    extract_address: bool = True   # NOTE(review): currently unused by the endpoint — confirm
    extract_industry: bool = True  # NOTE(review): currently unused by the endpoint — confirm
|
|
|
class BulkExtractionResult(BaseModel):
    """Per-URL outcome of a bulk extraction run."""

    url: str                                           # the URL exactly as submitted
    status: str                                        # "success" or "error"
    error_message: Optional[str] = None                # populated only when status == "error"
    contact_data: Optional[ContactExtractionResult] = None  # populated only on success
|
|
|
class BulkExtractionResponse(BaseModel):
    """Aggregate response for /bulk-extract."""

    total_urls: int                     # number of URLs submitted
    successful: int                     # count with status == "success"
    failed: int                         # count with status == "error"
    results: List[BulkExtractionResult]  # one entry per submitted URL, in order
|
|
|
|
|
@app.post("/bulk-extract",
    response_model=BulkExtractionResponse,
    summary="Bulk Contact Extraction (Premium)",
    description="Extract contact information from multiple websites simultaneously. Perfect for lead generation agencies and sales teams processing large prospect lists.",
    tags=["Bulk", "Premium", "Lead Generation"])
async def bulk_extract_contacts(request: BulkExtractionRequest):
    """
    Extract contact information from multiple websites in a single request.

    URLs are processed sequentially with one fresh page per URL, so a failure
    on one URL is recorded as an "error" result and cannot poison the others.
    At most 20 URLs are accepted per request.

    Raises:
        HTTPException: 400 when more than 20 URLs are submitted.

    Fixes: ``get_attribute("href")`` can return ``None``; the original did
    ``"linkedin.com" in href`` on it, raising TypeError which the bare
    ``except:`` silently turned into "no social profiles". The href is now
    guarded and the bare except narrowed to ``except Exception:``.
    """
    if len(request.urls) > 20:
        raise HTTPException(status_code=400, detail="Maximum 20 URLs allowed per request")

    results = []
    successful = 0
    failed = 0

    async with async_playwright() as p:
        browser = await p.chromium.launch(headless=True)

        for url in request.urls:
            page = None
            try:
                validated_url = validate_url(url)
                page = await browser.new_page()

                await page.goto(validated_url, wait_until="networkidle", timeout=20000)

                # Business name: page title minus a " - tagline" suffix.
                title = await page.title()
                business_name = title.split(" - ")[0] if " - " in title else title

                content = await page.content()
                phones = extract_phone_numbers(content)
                emails = extract_emails(content)

                social_profiles = {}
                address = None   # address extraction not implemented in bulk mode
                industry = None  # industry classification not implemented in bulk mode

                if request.extract_social:
                    try:
                        social_links = await page.query_selector_all("a[href*='linkedin.com'], a[href*='facebook.com']")
                        for link in social_links[:2]:
                            href = await link.get_attribute("href")
                            # Guard: href may legitimately be None.
                            if not href:
                                continue
                            if "linkedin.com" in href:
                                social_profiles["linkedin"] = href
                            elif "facebook.com" in href:
                                social_profiles["facebook"] = href
                    except Exception:
                        # Best-effort: social profiles are optional.
                        pass

                contact_data = ContactExtractionResult(
                    business_name=business_name.strip(),
                    phones=phones,
                    emails=emails,
                    website=validated_url,
                    social_profiles=social_profiles,
                    address=address,
                    industry=industry
                )

                results.append(BulkExtractionResult(
                    url=url,
                    status="success",
                    contact_data=contact_data
                ))
                successful += 1

            except Exception as e:
                # Includes the 400 HTTPException from validate_url: invalid
                # URLs become per-item error results, not a failed request.
                results.append(BulkExtractionResult(
                    url=url,
                    status="error",
                    error_message=f"Extraction failed: {str(e)}"
                ))
                failed += 1

            finally:
                # Release the per-URL page whether or not extraction worked.
                if page:
                    await page.close()

        await browser.close()

    return BulkExtractionResponse(
        total_urls=len(request.urls),
        successful=successful,
        failed=failed,
        results=results
    )
|
|
|
|
|
@app.get("/health")
async def health_check():
    """Liveness probe: report API status, version and available endpoints."""
    # Human-readable summary of the routes this service exposes.
    endpoint_summaries = [
        "/search - Search business directories",
        "/extract-from-url - Extract contacts from website",
        "/bulk-extract - Bulk contact extraction (Premium)"
    ]
    return {
        "status": "healthy",
        "message": "Business Contact Intelligence API is running",
        "version": "1.0.0",
        "endpoints": endpoint_summaries
    }
|
|
|
|
|
@app.get("/test-search")
async def test_search():
    """Return canned sample results without launching a browser."""
    # Fixed query keeps this endpoint deterministic in shape (three records).
    demo_results = generate_sample_businesses("restaurant", 3)
    return SearchResponse(
        total_found=len(demo_results),
        results=demo_results,
        search_query="restaurant",
        source="test"
    )