Spaces:
Sleeping
Sleeping
import os | |
import json | |
import stripe | |
from typing import List, Tuple | |
def parse_product_quantities(product_quantities_str: str) -> List[Tuple[str, int]]:
    """Parse a product quantities string into a list of (product_id, quantity) tuples.

    Whitespace around pairs, product ids and quantities is ignored, so
    "(a, 1), (b, 2)" parses the same as "(a,1),(b,2)".

    Args:
        product_quantities_str: String with format
            "(product_1,k_1),(product_2,k_2),...,(product_n,k_n)"
            where product_i is a product id (expected to match a key in
            products.json; that lookup is not performed here) and k_i is a
            positive integer quantity.
            Example: "(finasteride,1),(NP_consultation,1)"

    Returns:
        List of (product_id, quantity) tuples in input order. An empty or
        whitespace-only string yields an empty list.

    Raises:
        ValueError: If a pair does not contain exactly two comma-separated
            fields, the product id is empty, or the quantity is not a
            positive integer.
    """
    import re  # local import so the function does not depend on file-level imports

    product_quantities: List[Tuple[str, int]] = []

    # Split on the "),(" separator while tolerating whitespace around the comma.
    pairs = re.split(r"\)\s*,\s*\(", product_quantities_str.strip())
    for pair in pairs:
        # Drop the remaining outer parentheses and surrounding whitespace.
        pair = pair.replace("(", "").replace(")", "").strip()
        if not pair:
            continue

        parts = pair.split(",")
        if len(parts) != 2:
            raise ValueError(f"Invalid format in pair: {pair}. Expected 'product,quantity'")

        product_id, quantity_str = (part.strip() for part in parts)
        if not product_id:
            raise ValueError(f"Missing product id in pair: {pair}")

        try:
            quantity = int(quantity_str)
        except ValueError as err:
            raise ValueError(f"Quantity must be an integer: {quantity_str}") from err

        # A zero or negative quantity can never form a valid line item.
        if quantity < 1:
            raise ValueError(f"Quantity must be a positive integer: {quantity_str}")

        product_quantities.append((product_id, quantity))

    return product_quantities
def generate_payment_link(product_quantities_str: str) -> str:
    """Generate a Stripe payment link for specified products.

    The product ids are looked up in the "products.json" file located next to
    this module; each catalog entry must provide a "price_id". Quantities for
    products that share a price id are summed into a single line item.

    Args:
        product_quantities_str: String describing products and quantities
            Format: "(product_1,k_1),(product_2,k_2),...,(product_n,k_n)"
            Example: "(finasteride,1),(NP_consultation,1)"

    Returns:
        Stripe payment link URL

    Raises:
        ValueError: If the input is malformed or names no products, the
            catalog cannot be loaded, a product is missing from the catalog
            or has no "price_id", or the Stripe API call fails (the original
            Stripe error is attached as the exception's cause).
    """
    # Parse the product quantities
    product_quantities = parse_product_quantities(product_quantities_str)
    if not product_quantities:
        # Fail early instead of sending an empty line_items list to Stripe.
        raise ValueError("No products specified")

    # Load the product catalog
    script_dir = os.path.dirname(os.path.abspath(__file__))
    catalog_path = os.path.join(script_dir, "products.json")
    try:
        with open(catalog_path, "r", encoding="utf-8") as f:
            catalog = json.load(f)
    except (OSError, json.JSONDecodeError) as e:
        # OSError also covers unreadable files, not only missing ones.
        raise ValueError(f"Error loading product catalog: {str(e)}") from e

    # Consolidate quantities per Stripe price id
    price_quantities = {}
    for product_id, quantity in product_quantities:
        if product_id not in catalog:
            raise ValueError(f"Product '{product_id}' not found in catalog")
        try:
            price_id = catalog[product_id]["price_id"]
        except (KeyError, TypeError) as e:
            raise ValueError(f"Product '{product_id}' has no price_id in catalog") from e
        price_quantities[price_id] = price_quantities.get(price_id, 0) + quantity

    # Create line items from the consolidated price_quantities
    line_items = [
        {"price": price_id, "quantity": quantity}
        for price_id, quantity in price_quantities.items()
    ]

    # Create the payment link; only this call can raise a Stripe error.
    try:
        payment_link = stripe.PaymentLink.create(
            line_items=line_items,
            phone_number_collection={"enabled": True},
            after_completion={
                "type": "hosted_confirmation",
                "hosted_confirmation": {
                    # NOTE(review): the "π" below looks like a mis-encoded emoji - confirm the intended character.
                    "custom_message": "Thank you for your order! π\nA nurse practitioner will review your order and contact you shortly."
                }
            }
        )
    except stripe.error.StripeError as e:
        raise ValueError(f"Stripe API error: {str(e)}") from e

    return payment_link.url
def create_non_recurring_item_with_price(product_name: str, price: int, description: str = "Default description") -> stripe.Product:
    """Create a new Stripe product with a non-recurring price.

    The product is created with a default price in USD and the ids of the
    created product and price are printed to stdout.

    Args:
        product_name: Name of the product
        price: Price in cents; must be a non-negative integer
        description: Product description

    Returns:
        Stripe Product object with its default price expanded

    Raises:
        ValueError: If price is not a non-negative integer, or if the Stripe
            API call fails (the original Stripe error is attached as the
            exception's cause).
    """
    # Validate locally so an obviously bad amount never reaches the API.
    # bool is excluded explicitly because it is a subclass of int.
    if isinstance(price, bool) or not isinstance(price, int) or price < 0:
        raise ValueError(f"Price must be a non-negative integer number of cents: {price!r}")

    try:
        item = stripe.Product.create(
            name=product_name,
            description=description,
            default_price_data={
                "unit_amount": price,
                "currency": "usd",
            },
            # Expand so item.default_price is an object rather than just an id.
            expand=["default_price"]
        )
    except stripe.error.StripeError as e:
        raise ValueError(f"Error creating Stripe product: {str(e)}") from e

    print(f"Created product with id: {item.id}")
    print(f"Created price with id: {item.default_price.id}")
    return item