Spaces:
Build error
Build error
| from __future__ import annotations | |
| import random | |
| import sys | |
| from collections import defaultdict | |
| from pathlib import Path | |
# Make the script runnable from any working directory: the `apps/api`
# directory (parent of the `app` package) must be importable, so it is put at
# the front of sys.path unless it is already listed there.
_API_ROOT = Path(__file__).resolve().parents[1]
_api_root_entry = str(_API_ROOT)
if _api_root_entry not in sys.path:
    sys.path.insert(0, _api_root_entry)
| from faker import Faker | |
| from sqlmodel import Session, select | |
| from app.core.database import engine, init_db | |
| from app.core.security import hash_password | |
| from app.models.address import Address | |
| from app.models.product import Product | |
| from app.models.review import Review | |
| from app.models.suggestion import SearchSuggestion | |
| from app.models.user import User | |
| from app.models.variant import ProductVariant | |
# Module-wide Faker instance used by the seeding functions in this file.
fake = Faker()
# Hash of "password123!" computed once at import time so that bulk users can
# share it instead of paying for an expensive hash per user (bcrypt according
# to the existing note; hash_password lives in app.core.security).
_BULK_HASH = hash_password("password123!")
# Curated Unsplash sources (seed must use images.unsplash.com).
# Entries differ only in their photo id, so the host prefix is applied once.
# The laptop id is listed twice, which doubles its weight under random.choice.
_UNSPLASH_HOST = "https://images.unsplash.com"
_UNSPLASH_PHOTO_IDS = [
    "1512436991641-6745cdb1723f",  # clothes
    "1523275335684-37898b6baf30",  # watch
    "1503602642458-232111445657",  # laptop
    "1505740420928-5e560c06d30e",  # headphones
    "1511707171634-5f897ff02aa9",  # phone
    "1526170375885-4d8ecf77b99f",  # camera
    "1503602642458-232111445657",  # laptop
    "1542291026-7eec264c27ff",  # sneakers
    "1516979187457-637abb4f9353",  # headphones alt
]
UNSPLASH = [f"{_UNSPLASH_HOST}/photo-{photo_id}" for photo_id in _UNSPLASH_PHOTO_IDS]
def u(url: str, *, w: int = 800, q: int = 80) -> str:
    """Return *url* with image-transform query parameters appended.

    The parameters added are ``auto=format``, ``fit=crop``, ``w`` and ``q``.

    Args:
        url: Image URL, with or without an existing query string.
        w: Value for the ``w`` parameter.
        q: Value for the ``q`` parameter.

    Returns:
        The URL followed by the transform parameters. A URL without a query
        string gets ``?`` as the separator; one that already carries
        parameters gets ``&`` so the result stays a single well-formed query.
    """
    if "?" not in url:
        separator = "?"
    elif url.endswith(("?", "&")):
        # The URL already ends on a separator; adding another would leave an
        # empty parameter in the query.
        separator = ""
    else:
        separator = "&"
    return f"{url}{separator}auto=format&fit=crop&w={w}&q={q}"
def _exists(session: Session, model) -> bool:
    """Report whether selecting *model* through *session* yields any row."""
    first_row = session.exec(select(model)).first()
    return first_row is not None
def seed_users(session: Session, n: int = 15) -> None:
    """Insert a fixed demo user plus generated users, each with one address.

    Does nothing when a ``User`` row already exists. Otherwise the demo
    account ``james@example.com`` is always created, followed by ``n - 1``
    Faker-generated users (none when ``n <= 1``). Every user gets a single
    default address. The session is committed at the end.

    Args:
        session: Open session used for all inserts.
        n: Total number of users wanted, the demo user included.
    """
    if _exists(session, User):
        return

    # The demo account uses the same "password123!" as the bulk users, so the
    # pre-computed module-level hash is reused rather than hashing the same
    # literal a second time.
    demo = User(
        email="james@example.com",
        full_name="James Smith",
        password_hash=_BULK_HASH,
    )
    session.add(demo)
    # Flush before reading demo.id so the pending row has been sent to the
    # database and any database-generated key is populated.
    session.flush()
    session.add(
        Address(
            user_id=demo.id,
            name=demo.full_name,
            line1="410 Terry Ave N",
            line2=None,
            city="Seattle",
            state="WA",
            postal_code="98109",
            country="United States",
            is_default=True,
        )
    )

    for _ in range(max(0, n - 1)):
        full_name = fake.name()
        # `fake.unique` keeps generated e-mail addresses distinct from each other.
        email = fake.unique.email().lower()
        user = User(email=email, full_name=full_name, password_hash=_BULK_HASH)
        session.add(user)
        session.flush()
        session.add(
            Address(
                user_id=user.id,
                name=full_name,
                line1=fake.street_address(),
                # Roughly 30% of addresses get a secondary line.
                line2=None if random.random() < 0.7 else fake.secondary_address(),
                city=fake.city(),
                state=fake.state_abbr(),
                postal_code=fake.postcode(),
                country="United States",
                is_default=True,
            )
        )

    session.commit()
def seed_products(session: Session, n: int = 60) -> None:
    """Create ``n`` randomly assembled gift-card products and commit them.

    Returns without touching the database when a ``Product`` row already
    exists. Each product gets a brand, a card kind, a price range, one
    thumbnail and a gallery of four to seven images. Ratings start at zero.

    Args:
        session: Open session used for the inserts.
        n: Number of products to create.
    """
    if _exists(session, Product):
        return

    brand_pool = [
        "Amazon Basics",
        "Google",
        "Roblox",
        "Apple",
        "Netflix",
        "Spotify",
        "Uber",
        "Starbucks",
        "Best Buy",
        "GameStop",
        "Visa",
        "Outback Steakhouse",
        "Lyft",
    ]
    kind_labels = ["Gift Card", "Digital Gift Card", "E-Gift Card"]
    kind_weights = [0.35, 0.45, 0.20]
    low_prices = [10, 15, 20, 25, 50]
    high_prices = [100, 150, 200, 500, 2000]

    for _ in range(n):
        brand = random.choice(brand_pool)
        (kind,) = random.choices(kind_labels, weights=kind_weights, k=1)

        # The "Google" brand is titled as "Google Play"; the brand field itself
        # keeps the plain brand name.
        title_prefix = "Google Play" if brand == "Google" else brand
        title = f"{title_prefix} {kind}"

        low = random.choice(low_prices)
        high = random.choice(high_prices)
        if high < low:
            low, high = high, low

        thumbnail = u(random.choice(UNSPLASH), w=600)
        gallery_size = random.randint(4, 7)
        gallery = [u(random.choice(UNSPLASH), w=1200) for _ in range(gallery_size)]

        product = Product(
            title=title,
            brand=brand,
            description=fake.paragraph(nb_sentences=7),
            image_url=thumbnail,
            images=gallery,
            rating_avg=0.0,
            rating_count=0,
            price_min=low,
            price_max=high,
        )
        session.add(product)

    session.commit()
def seed_variants(session: Session) -> None:
    """Add one variant per gift-card denomination for every product.

    Skipped entirely when a ``ProductVariant`` row already exists. The
    denominations are a fixed set of amounts plus the product's own
    ``price_min`` and ``price_max``, de-duplicated and visited in ascending
    order. Rows are only added to the session; this function does not commit.

    Args:
        session: Open session used to read products and add variants.
    """
    if _exists(session, ProductVariant):
        return

    standard_amounts = (10, 15, 25, 50, 100)
    for product in session.exec(select(Product)).all():
        amounts = {*standard_amounts, product.price_min, product.price_max}
        for amount in sorted(amounts):
            # Non-positive amounts do not get a variant.
            if amount <= 0:
                continue
            face_value = int(amount)
            session.add(
                ProductVariant(
                    product_id=product.id,
                    # SKU = first eight characters of the product id + amount.
                    sku=f"{str(product.id)[:8]}-{amount}",
                    price=face_value,
                    attributes={
                        "type": "gift_card",
                        "denomination": face_value,
                        "delivery": "email",
                        "region": "US",
                    },
                )
            )
def seed_reviews(session: Session, target_reviews: int = 300) -> None:
    """Generate random reviews and refresh each product's rating summary.

    Nothing happens when a ``Review`` row already exists, or when there are
    no users or no products to pair up. Each review picks a user and a
    product at random and draws a star rating weighted towards high scores.
    About 7% of reviews carry one to three images. Afterwards
    ``rating_count`` and ``rating_avg`` (one decimal place) are written back
    on every product that received a review. This function does not commit.

    Args:
        session: Open session used for reads and inserts.
        target_reviews: Number of reviews to create.
    """
    if _exists(session, Review):
        return

    users = session.exec(select(User)).all()
    products = session.exec(select(Product)).all()
    if not (users and products):
        return

    star_values = [5, 4, 3, 2, 1]
    star_weights = [0.45, 0.35, 0.13, 0.05, 0.02]

    # Ratings collected per product id (as a string) for the summary step.
    ratings_by_product: dict[str, list[int]] = defaultdict(list)

    for _ in range(target_reviews):
        reviewer = random.choice(users)
        product = random.choice(products)
        (stars,) = random.choices(star_values, weights=star_weights, k=1)

        headline = fake.sentence(nb_words=5).rstrip(".")
        sentence_count = random.randint(2, 6)
        text = fake.paragraph(nb_sentences=sentence_count)

        photos: list[str] = []
        if random.random() < 0.07:
            photo_count = random.randint(1, 3)
            photos = [u(random.choice(UNSPLASH), w=900) for _ in range(photo_count)]

        session.add(
            Review(
                product_id=product.id,
                user_id=reviewer.id,
                display_name=reviewer.full_name,
                rating=stars,
                title=headline,
                body=text,
                verified_purchase=random.random() < 0.55,
                images=photos,
            )
        )
        ratings_by_product[str(product.id)].append(stars)

    # Update denormalized rating fields on products that were reviewed.
    for product in products:
        scores = ratings_by_product.get(str(product.id))
        if scores:
            product.rating_count = len(scores)
            product.rating_avg = round(sum(scores) / len(scores), 1)
            session.add(product)
def seed_suggestions(session: Session, n: int = 100) -> None:
    """Add search suggestions with random scores to the session.

    Skipped when a ``SearchSuggestion`` row already exists. The fixed base
    queries are always included, so at least ``len(base_queries)`` rows are
    produced even for a smaller ``n``. Further queries are built from
    brand/template combinations, sometimes with a denomination suffix, until
    ``n`` distinct queries exist or every combination has been used. This
    function does not commit.

    Args:
        session: Open session used for the inserts.
        n: Desired number of distinct queries.
    """
    if _exists(session, SearchSuggestion):
        return

    base_queries = [
        "google play",
        "google play gift card",
        "google play gift card email delivery",
        "amazon gift card",
        "roblox gift card",
        "netflix gift card",
        "spotify gift card",
        "uber gift card",
        "starbucks gift card",
        "visa gift card",
        "best buy gift card",
        "game stop gift card",
        "digital gift card",
        "gift cards",
        "e gift card",
    ]
    brands = [
        "google play",
        "amazon",
        "roblox",
        "netflix",
        "spotify",
        "uber",
        "starbucks",
        "best buy",
        "gamestop",
        "visa",
        "lyft",
    ]
    templates = [
        "{b}",
        "{b} gift card",
        "{b} gift card email delivery",
        "{b} gift card digital code",
        "{b} gift",
        "{b} store gift card",
        "{b} card",
        "{b} card email delivery",
    ]
    denominations = [10, 15, 20, 25, 50, 100]

    queries: set[str] = {q.lower() for q in base_queries}

    # Enumerate every string the sampling loop can produce. Capping the target
    # at that count guarantees the loop ends even when `n` asks for more
    # distinct queries than exist.
    reachable: set[str] = set(queries)
    for b in brands:
        for t in templates:
            plain = t.format(b=b).strip().lower()
            reachable.add(plain)
            reachable.update(f"{plain} {d}" for d in denominations)
    target = min(n, len(reachable))

    while len(queries) < target:
        b = random.choice(brands)
        t = random.choice(templates)
        q = t.format(b=b).strip().lower()
        # Add denomination variants to about a quarter of the candidates.
        if random.random() < 0.25:
            q = f"{q} {random.choice(denominations)}"
        queries.add(q)

    # Iterate in sorted order: the iteration order of a set of strings varies
    # between interpreter runs (hash randomization), which would pair the
    # seeded random scores with different queries on every run.
    for q in sorted(queries):
        session.add(SearchSuggestion(query=q, score=random.randint(1, 5000)))
def _db_is_healthy(db_path: Path) -> bool:
    """Return True when SQLite reports the file at *db_path* as intact."""
    import sqlite3

    try:
        conn = sqlite3.connect(str(db_path))
    except sqlite3.Error:
        return False
    try:
        # integrity_check reports problems as result rows; a sound database
        # yields the single row "ok". The result therefore has to be read,
        # not just executed.
        row = conn.execute("PRAGMA integrity_check").fetchone()
    except sqlite3.Error:
        return False
    finally:
        # Always release the handle so the file can be deleted afterwards.
        conn.close()
    return row is not None and row[0] == "ok"


def main() -> None:
    """Seed the application database with deterministic demo data.

    Seeds both ``random`` and Faker with 42, makes sure the ``data``
    directory exists under the API root, deletes ``data/app.db`` if SQLite
    reports it as corrupt, initialises the schema and then runs every seeding
    step inside one session, committing at the end.
    """
    random.seed(42)
    Faker.seed(42)

    data_dir = _API_ROOT / "data"
    data_dir.mkdir(parents=True, exist_ok=True)
    db_path = data_dir / "app.db"

    # Remove corrupt DB if present
    if db_path.exists() and not _db_is_healthy(db_path):
        print(f"Removing corrupt DB: {db_path}")
        db_path.unlink(missing_ok=True)

    init_db()
    with Session(engine) as session:
        seed_users(session)
        seed_products(session)
        seed_variants(session)
        seed_reviews(session)
        seed_suggestions(session)
        # seed_variants, seed_reviews and seed_suggestions only add rows;
        # this commit persists them.
        session.commit()


if __name__ == "__main__":
    main()