ChatWB / api /ads.py
Levin-Aleksey's picture
fix
4dfc186
import numpy as np
import pandas as pd
from typing import Any
from sqlalchemy import text
from langchain_core.tools import tool
# Импортируем подключение к БД. Убедитесь, что путь соответствует вашей структуре проекта.
from database import engine
ALLOWED_PRODUCT_SORT = {"roas", "ad_spend_total", "orders_total", "revenue_total"}
ALLOWED_CAMPAIGN_SORT = {"ad_spend_total", "roas", "orders_total", "cpo"}
ALLOWED_CATEGORY_SORT = {"ad_spend_total", "roas", "orders_total"}
def clean_dataframe(df: pd.DataFrame) -> pd.DataFrame:
"""Заменяет inf и NaN на None для корректной JSON-сериализации агенту."""
return df.replace([np.inf, -np.inf, np.nan], None)
@tool
def get_dashboard_ads_summary() -> Any:
"""Сводка рекламы для дашборда.
Быстрая сводка: количество товаров с рекламой, расход за 7 дней,
заказы, выручка, средний ROAS, товары с низким ROAS и падающие товары.
"""
query = text("""
SELECT
COUNT(*) as products_with_ads,
SUM(ad_spend_total)::numeric(14,2) as total_spend_7d,
SUM(orders_total) as total_orders_7d,
SUM(revenue_total)::numeric(14,2) as total_revenue_7d,
ROUND(SUM(revenue_total) / NULLIF(SUM(ad_spend_total), 0), 2) as avg_roas,
COUNT(*) FILTER (WHERE roas < 1) as products_low_roas,
COUNT(*) FILTER (WHERE orders_dyn < -20) as products_falling
FROM dm_ad_spend_product_7d
""")
df = pd.read_sql(query, engine)
return clean_dataframe(df).to_dict(orient="records")[0]
@tool
def get_ads_summary() -> Any:
"""Детальная сводка рекламы.
Полная сводка за 7 дней с динамикой относительно прошлого периода:
расход, просмотры, клики, заказы, выручка, ROAS, CPO, CTR.
"""
query = text("""
SELECT
COUNT(DISTINCT nm_id) as products_count,
SUM(ad_spend_total)::numeric(14,2) as ad_spend_total,
SUM(ad_spend_prev)::numeric(14,2) as ad_spend_prev,
ROUND((SUM(ad_spend_total) - SUM(ad_spend_prev)) / NULLIF(SUM(ad_spend_prev), 0) * 100, 1) as ad_spend_dyn,
SUM(views_total) as views_total,
SUM(clicks_total) as clicks_total,
SUM(orders_total) as orders_total,
SUM(orders_prev) as orders_prev,
ROUND((SUM(orders_total) - SUM(orders_prev))::numeric / NULLIF(SUM(orders_prev), 0) * 100, 1) as orders_dyn,
SUM(revenue_total)::numeric(14,2) as revenue_total,
ROUND(SUM(revenue_total) / NULLIF(SUM(ad_spend_total), 0), 2) as roas_avg,
ROUND(SUM(ad_spend_total) / NULLIF(SUM(orders_total), 0), 2) as cpo_avg,
ROUND(SUM(clicks_total)::numeric / NULLIF(SUM(views_total), 0) * 100, 2) as ctr_avg
FROM dm_ad_spend_product_7d
""")
df = pd.read_sql(query, engine)
return clean_dataframe(df).to_dict(orient="records")[0]
@tool
def get_ads_trend(days: int = 30) -> Any:
"""Тренд рекламных расходов по дням.
Args:
days: Количество дней для графика. По умолчанию 30.
"""
query = text("""
SELECT
day::text as date,
ad_spend_total
FROM dm_ad_spend_daily
WHERE day >= CURRENT_DATE - :days
ORDER BY day ASC
""")
df = pd.read_sql(query, engine, params={"days": days})
return clean_dataframe(df).to_dict(orient="records")
@tool
def get_ads_products_top(limit: int = 20, sort_by: str = "roas") -> Any:
"""ТОП товаров по эффективности рекламы.
Args:
limit: Сколько товаров вернуть.
sort_by: Поле сортировки. Допустимые значения:
'roas', 'ad_spend_total', 'orders_total', 'revenue_total'.
"""
if sort_by not in ALLOWED_PRODUCT_SORT:
return {"error": f"Некорректный sort_by='{sort_by}'. Допустимо: {sorted(ALLOWED_PRODUCT_SORT)}"}
query = text(f"""
SELECT
nm_id, vendor_code, title, brand, category,
ad_spend_total, views_total, clicks_total, orders_total, revenue_total,
ctr, cpc, cpm, cr, roas, cpo,
ad_spend_dyn, orders_dyn, revenue_dyn
FROM dm_ad_spend_product_7d
WHERE {sort_by} IS NOT NULL
ORDER BY {sort_by} DESC
LIMIT :limit
""")
df = pd.read_sql(query, engine, params={"limit": limit})
return clean_dataframe(df).to_dict(orient="records")
@tool
def get_ads_products_losers(limit: int = 20) -> Any:
"""Убыточные товары в рекламе.
Возвращает товары с ROAS < 1 и заметным рекламным расходом (больше 100 руб).
"""
query = text("""
SELECT
nm_id, vendor_code, title, brand,
ad_spend_total, orders_total, revenue_total,
roas, cpo,
ad_spend_dyn, orders_dyn
FROM dm_ad_spend_product_7d
WHERE roas IS NOT NULL AND roas < 1 AND ad_spend_total > 100
ORDER BY ad_spend_total DESC
LIMIT :limit
""")
df = pd.read_sql(query, engine, params={"limit": limit})
return clean_dataframe(df).to_dict(orient="records")
@tool
def get_ads_products_rising(limit: int = 20) -> Any:
"""Товары с ростом эффективности рекламы.
Возвращает товары с положительной динамикой заказов.
"""
query = text("""
SELECT
nm_id, vendor_code, title, brand,
ad_spend_total, orders_total, revenue_total,
roas, cpo,
orders_dyn, revenue_dyn
FROM dm_ad_spend_product_7d
WHERE orders_dyn IS NOT NULL AND orders_dyn > 0
ORDER BY orders_dyn DESC
LIMIT :limit
""")
df = pd.read_sql(query, engine, params={"limit": limit})
return clean_dataframe(df).to_dict(orient="records")
@tool
def get_ads_products_falling(limit: int = 20) -> Any:
"""Товары с падением эффективности рекламы.
Возвращает товары с отрицательной динамикой заказов.
"""
query = text("""
SELECT
nm_id, vendor_code, title, brand,
ad_spend_total, orders_total, revenue_total,
roas, cpo,
orders_dyn, revenue_dyn
FROM dm_ad_spend_product_7d
WHERE orders_dyn IS NOT NULL AND orders_dyn < 0
ORDER BY orders_dyn ASC
LIMIT :limit
""")
df = pd.read_sql(query, engine, params={"limit": limit})
return clean_dataframe(df).to_dict(orient="records")
@tool
def get_ads_product(nm_id: int) -> Any:
"""Детали рекламы по конкретному товару.
Args:
nm_id: Артикул Wildberries.
"""
query = text("""
SELECT
nm_id, vendor_code, title, brand, category,
period_start::text, period_end::text,
ad_spend_total, ad_spend_prev, ad_spend_dyn,
views_total, views_prev, views_dyn,
clicks_total, clicks_prev, clicks_dyn,
orders_total, orders_prev, orders_dyn,
revenue_total, revenue_prev, revenue_dyn,
ctr, cpc, cpm, cr, roas, cpo
FROM dm_ad_spend_product_7d
WHERE nm_id = :nm_id
""")
df = pd.read_sql(query, engine, params={"nm_id": nm_id})
if df.empty:
return {"error": f"Товар с артикулом {nm_id} не найден в рекламных данных."}
return clean_dataframe(df).to_dict(orient="records")[0]
@tool
def get_ads_campaigns(limit: int = 20, sort_by: str = "ad_spend_total") -> Any:
"""Список рекламных кампаний с метриками.
Args:
limit: Сколько кампаний вернуть.
sort_by: Поле сортировки. Допустимые значения:
'ad_spend_total', 'roas', 'orders_total', 'cpo'.
"""
if sort_by not in ALLOWED_CAMPAIGN_SORT:
return {"error": f"Некорректный sort_by='{sort_by}'. Допустимо: {sorted(ALLOWED_CAMPAIGN_SORT)}"}
query = text(f"""
SELECT
campaign_id, campaign_name, campaign_status, campaign_type,
daily_budget, budget_total,
nm_cnt, ad_spend_total, views_total, clicks_total, orders_total, revenue_total,
ctr, cpc, cpm, cr, roas, cpo
FROM dm_ad_spend_campaign_7d
ORDER BY {sort_by} DESC
LIMIT :limit
""")
df = pd.read_sql(query, engine, params={"limit": limit})
return clean_dataframe(df).to_dict(orient="records")
@tool
def get_ads_campaigns_inefficient(limit: int = 20) -> Any:
"""Неэффективные рекламные кампании.
Возвращает кампании с низким ROAS (< 1) или высоким CPO (> 500).
"""
query = text("""
SELECT
campaign_id, campaign_name, campaign_status, campaign_type,
ad_spend_total, orders_total, revenue_total,
roas, cpo,
CASE
WHEN roas < 1 THEN 'low_roas'
WHEN cpo > 500 THEN 'high_cpo'
ELSE 'other'
END as issue_type
FROM dm_ad_spend_campaign_7d
WHERE (roas IS NOT NULL AND roas < 1) OR (cpo IS NOT NULL AND cpo > 500)
ORDER BY ad_spend_total DESC
LIMIT :limit
""")
df = pd.read_sql(query, engine, params={"limit": limit})
return clean_dataframe(df).to_dict(orient="records")
@tool
def get_ads_campaign(campaign_id: int) -> Any:
"""Детали конкретной рекламной кампании.
Args:
campaign_id: ID рекламной кампании.
"""
query = text("""
SELECT
campaign_id, campaign_name, campaign_status, campaign_type,
daily_budget, budget_total,
period_start::text, period_end::text,
nm_cnt, ad_spend_total, views_total, clicks_total,
atbs_total, orders_total, shks_total, revenue_total,
ctr, cpc, cpm, cr, roas, cpo
FROM dm_ad_spend_campaign_7d
WHERE campaign_id = :campaign_id
""")
df = pd.read_sql(query, engine, params={"campaign_id": campaign_id})
if df.empty:
return {"error": f"Кампания с ID {campaign_id} не найдена."}
return clean_dataframe(df).to_dict(orient="records")[0]
@tool
def get_ads_categories(sort_by: str = "ad_spend_total") -> Any:
"""Эффективность рекламы по категориям.
Args:
sort_by: Поле сортировки. Допустимые значения:
'ad_spend_total', 'roas', 'orders_total'.
"""
if sort_by not in ALLOWED_CATEGORY_SORT:
return {"error": f"Некорректный sort_by='{sort_by}'. Допустимо: {sorted(ALLOWED_CATEGORY_SORT)}"}
query = text(f"""
SELECT
category,
nm_cnt, campaign_cnt,
ad_spend_total, views_total, clicks_total, orders_total, revenue_total,
ctr, cpc, cpm, cr, roas, cpo
FROM dm_ad_spend_category_7d
ORDER BY {sort_by} DESC
""")
df = pd.read_sql(query, engine)
return clean_dataframe(df).to_dict(orient="records")
@tool
def get_ads_category_benchmark(nm_id: int) -> Any:
"""Сравнение товара со средними рекламными метриками по его категории.
Args:
nm_id: Артикул Wildberries.
"""
query = text("""
WITH product AS (
SELECT * FROM dm_ad_spend_product_7d WHERE nm_id = :nm_id
),
cat_avg AS (
SELECT
category,
ROUND(AVG(ctr), 2) as avg_ctr,
ROUND(AVG(cpc), 2) as avg_cpc,
ROUND(AVG(cr), 2) as avg_cr,
ROUND(AVG(roas), 2) as avg_roas,
ROUND(AVG(cpo), 2) as avg_cpo
FROM dm_ad_spend_product_7d
WHERE category = (SELECT category FROM product)
GROUP BY category
)
SELECT
p.nm_id, p.title, p.category,
p.ctr, c.avg_ctr, ROUND(p.ctr - c.avg_ctr, 2) as ctr_diff,
p.cpc, c.avg_cpc, ROUND(p.cpc - c.avg_cpc, 2) as cpc_diff,
p.cr, c.avg_cr, ROUND(p.cr - c.avg_cr, 2) as cr_diff,
p.roas, c.avg_roas, ROUND(p.roas - c.avg_roas, 2) as roas_diff,
p.cpo, c.avg_cpo, ROUND(p.cpo - c.avg_cpo, 2) as cpo_diff
FROM product p
LEFT JOIN cat_avg c ON p.category = c.category
""")
df = pd.read_sql(query, engine, params={"nm_id": nm_id})
if df.empty:
return {"error": f"Товар с артикулом {nm_id} не найден."}
return clean_dataframe(df).to_dict(orient="records")[0]
@tool
def get_ads_alerts(limit: int = 30) -> Any:
"""Все проблемы с рекламой.
Возвращает проблемные товары: низкий ROAS, высокий CPO,
сильное падение заказов и другие рекламные алерты.
"""
query = text("""
SELECT
nm_id, vendor_code, title,
ad_spend_total, orders_total, revenue_total,
roas, cpo, orders_dyn,
CASE
WHEN roas < 0.5 THEN 'critical_roas'
WHEN roas < 1 THEN 'low_roas'
WHEN orders_dyn < -50 THEN 'orders_crash'
WHEN orders_dyn < -20 THEN 'orders_falling'
WHEN cpo > 1000 THEN 'very_high_cpo'
ELSE 'other'
END as alert_type,
CASE
WHEN roas < 0.5 THEN 1
WHEN orders_dyn < -50 THEN 2
WHEN roas < 1 THEN 3
WHEN orders_dyn < -20 THEN 4
ELSE 5
END as priority
FROM dm_ad_spend_product_7d
WHERE roas < 1 OR orders_dyn < -20 OR cpo > 1000
ORDER BY priority ASC, ad_spend_total DESC
LIMIT :limit
""")
df = pd.read_sql(query, engine, params={"limit": limit})
return clean_dataframe(df).to_dict(orient="records")
@tool
def add_insight(nm_id: int, tag: str, recommendation: str, score: float) -> Any:
"""Сохранить AI-рекомендацию по рекламе для товара в базу.
Args:
nm_id: Артикул Wildberries.
tag: Тег рекомендации. Допустимые значения:
'ads_stop', 'ads_reduce', 'ads_boost', 'ads_optimize', 'ads_test'.
recommendation: Конкретное действие с цифрами.
score: Уверенность от 0.5 до 1.0.
"""
query = text("""
INSERT INTO ai_product_insights (product_nm_id, ai_strategy_tag, ai_recommendation, confidence_score)
VALUES (:nm_id, :tag, :rec, :score)
""")
try:
with engine.begin() as conn:
conn.execute(query, {
"nm_id": nm_id,
"tag": tag,
"rec": recommendation,
"score": score
})
return {"status": "success", "message": "Инсайт успешно сохранен."}
except Exception as e:
return {"error": f"Ошибка сохранения в базу данных: {str(e)}"}
# Совместимость со старым именем инструмента
get_ads_product_details = get_ads_product
ads_tools = [
get_dashboard_ads_summary,
get_ads_summary,
get_ads_trend,
get_ads_products_top,
get_ads_products_losers,
get_ads_products_rising,
get_ads_products_falling,
get_ads_product,
get_ads_campaigns,
get_ads_campaigns_inefficient,
get_ads_campaign,
get_ads_categories,
get_ads_category_benchmark,
get_ads_alerts,
add_insight,
]