import os
import sys
import re
import gc
import time
import random
import asyncio
import logging
import tempfile
import multiprocessing
from concurrent.futures import ThreadPoolExecutor, ProcessPoolExecutor

import torch
import faiss
import gensim
import numpy as np
import pandas as pd
from tqdm import tqdm
from joblib import Parallel, delayed
from fastapi import FastAPI, HTTPException, BackgroundTasks
from fastapi.middleware.cors import CORSMiddleware
from pydantic import BaseModel
from datasets import load_dataset
from huggingface_hub import login, hf_hub_download, HfApi, create_repo
from keybert import KeyBERT
from sentence_transformers import SentenceTransformer

logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)

# Shared thread pool for offloading blocking work (encoding, Hub I/O) from the
# event loop. os.cpu_count() may return None, so fall back to a small default.
thread_pool = ThreadPoolExecutor(max_workers=min(32, (os.cpu_count() or 4) * 2))


# Memory-management state.
last_gc_time = time.time()
request_count = 0
CLEANUP_INTERVAL = 100

# Serializes heavyweight initialization. Note that an asyncio.Semaphore is
# scoped to a single process, so this only guards startup tasks within one
# worker, not across uvicorn workers.
startup_semaphore = asyncio.Semaphore(1)

app = FastAPI(title="🚀 KeyBERT + Word2Vec based FAISS search API", version="1.2")


app.add_middleware(
    CORSMiddleware,
    allow_origins=["https://dev.kobay.co.kr"],
    allow_credentials=True,
    allow_methods=["*"],
    allow_headers=["*"],
)

device = "cuda" if torch.cuda.is_available() else "cpu"
logger.info(f"🚀 Running on device: {device.upper()}")


HF_API_TOKEN = os.getenv("HF_API_TOKEN")

if multiprocessing.current_process().name == "MainProcess":
    if HF_API_TOKEN and HF_API_TOKEN.startswith("hf_"):
        logger.info("🔐 Logging in to the Hugging Face API (MainProcess)...")
        login(token=HF_API_TOKEN)
    else:
        logger.warning("⚠️ HF_API_TOKEN is missing or malformed.")


# Models are loaded lazily so that each worker initializes them on first use.
word2vec_model = None
kw_model = None
embedding_model = None


async def load_models():
    """Load every required model (lazy loading)."""
    global word2vec_model, kw_model, embedding_model

    # Already initialized in this process.
    if word2vec_model is not None and kw_model is not None and embedding_model is not None:
        return True

    worker_id = os.getenv("WORKER_ID", multiprocessing.current_process().name)
    logger.info(f"🚀 Worker {worker_id}: starting model load...")

    try:
        if word2vec_model is None:
            MODEL_REPO = "aikobay/item-model"
            model_path = hf_hub_download(repo_id=MODEL_REPO, filename="item_vectors.bin", repo_type="dataset", token=HF_API_TOKEN)
            word2vec_model = gensim.models.KeyedVectors.load_word2vec_format(model_path, binary=True)
            logger.info(f"✅ Worker {worker_id}: Word2Vec model loaded. Vocabulary size: {len(word2vec_model.key_to_index)}")

        if kw_model is None:
            kw_model = KeyBERT("paraphrase-multilingual-MiniLM-L12-v2")
            logger.info(f"✅ Worker {worker_id}: KeyBERT model loaded.")

        if embedding_model is None:
            try:
                embedding_model = SentenceTransformer("jhgan/ko-sroberta-multitask")
                logger.info(f"✅ Worker {worker_id}: Korean-specialized embedding model loaded.")
            except Exception as e:
                logger.warning(f"⚠️ Worker {worker_id}: failed to load the Korean-specialized model, using the default model: {e}")
                embedding_model = SentenceTransformer("paraphrase-multilingual-MiniLM-L12-v2")
                logger.info(f"✅ Worker {worker_id}: default embedding model loaded.")

            if device == "cuda":
                try:
                    # Only a few workers keep a GPU copy; the rest stay on CPU
                    # to bound total GPU memory use.
                    if worker_id == "MainProcess" or worker_id.endswith(("0", "1", "2")):
                        embedding_model.to(device)
                        embedding_model.eval()
                        logger.info(f"✅ Worker {worker_id}: embedding model moved to the GPU.")
                    else:
                        logger.info(f"⚠️ Worker {worker_id}: using CPU mode for memory efficiency.")
                except Exception as e:
                    logger.error(f"❌ Worker {worker_id}: GPU model initialization error: {e}")

        await cleanup_memory(force=True)
        return True

    except Exception as e:
        logger.error(f"❌ Worker {worker_id}: error while loading models: {e}")
        return False


async def load_huggingface_jsonl(dataset_name, split="train"):
    """Asynchronously load a dataset from the Hugging Face Hub."""
    try:
        loop = asyncio.get_event_loop()

        def _load_dataset():
            repo_id = f"aikobay/{dataset_name}"
            dataset = load_dataset(repo_id, split=split)
            return dataset.to_pandas().dropna()

        df = await loop.run_in_executor(thread_pool, _load_dataset)
        return df
    except Exception as e:
        logger.error(f"❌ Error while loading data: {e}")
        return pd.DataFrame()


# Product catalog loaded at startup.
active_sale_items = None

# FAISS index state shared within each worker process.
faiss_index = None
indexed_items = []


async def cleanup_memory(force=False):
    """Run periodic memory cleanup."""
    global last_gc_time

    current_time = time.time()

    # Collect at most once every 15 seconds unless forced.
    if force or (current_time - last_gc_time > 15):
        gc.collect()

        if torch.cuda.is_available():
            torch.cuda.empty_cache()

        last_gc_time = current_time
        logger.debug("🧹 Memory cleanup complete")
        return True

    return False


async def encode_texts_parallel(texts, batch_size=1024):
    """Vectorize texts with GPU utilization while guarding against memory growth."""
    if embedding_model is None:
        worker_id = multiprocessing.current_process().name
        logger.warning(f"⚠️ Worker {worker_id}: models must be loaded before encoding")
        if not await load_models():
            logger.error(f"❌ Worker {worker_id}: model load failed, cannot encode")
            return np.array([]).astype("float32")

    if not texts:
        return np.array([]).astype("float32")

    try:
        # Cap the batch size: smaller batches for larger inputs to bound peak memory.
        if len(texts) > 10:
            batch_size = min(1024, batch_size)
        else:
            batch_size = min(2048, batch_size)

        loop = asyncio.get_event_loop()

        def _encode_efficiently():
            # no_grad avoids building autograd state during inference.
            with torch.no_grad():
                return embedding_model.encode(
                    texts,
                    batch_size=batch_size,
                    convert_to_numpy=True,
                    show_progress_bar=False,
                    device=device,
                    normalize_embeddings=True
                )

        embeddings = await loop.run_in_executor(thread_pool, _encode_efficiently)
        return embeddings.astype("float32")

    except Exception as e:
        logger.error(f"❌ Encoding error: {str(e)}")
        return np.array([]).astype("float32")

    finally:
        # Opportunistic cleanup every 25th request.
        global request_count
        if request_count % 25 == 0:
            await cleanup_memory(force=True)
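

# Shape note (assuming the models named above): encode_texts_parallel(["q", "kw1"])
# returns a (2, d) float32 array of L2-normalized embeddings, where d is 768 for
# jhgan/ko-sroberta-multitask and 384 for the MiniLM fallback.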


async def save_faiss_index():
    """Save the FAISS index to the Hugging Face Hub (async)."""
    global faiss_index, indexed_items

    if faiss_index is None or not indexed_items:
        logger.error("❌ No FAISS index to save.")
        return False

    try:
        repo_id = os.getenv("HF_INDEX_REPO", "aikobay/saleitem_faiss_index")
        loop = asyncio.get_event_loop()

        def _save_index():
            api = HfApi()

            # Create the dataset repository if it does not exist yet.
            try:
                api.repo_info(repo_id=repo_id, repo_type="dataset")
                logger.info(f"✅ Using existing repository: {repo_id}")
            except Exception:
                logger.info(f"📝 Repository does not exist, creating it: {repo_id}")
                create_repo(
                    repo_id=repo_id,
                    repo_type="dataset",
                    private=True,
                    exist_ok=True
                )
                logger.info(f"✅ Repository created: {repo_id}")

            with tempfile.TemporaryDirectory() as temp_dir:
                index_path = os.path.join(temp_dir, "faiss_index.bin")
                items_path = os.path.join(temp_dir, "indexed_items.txt")

                faiss.write_index(faiss_index, index_path)

                with open(items_path, "w", encoding="utf-8") as f:
                    f.write("\n".join(indexed_items))

                readme_path = os.path.join(temp_dir, "README.md")
                with open(readme_path, "w", encoding="utf-8") as f:
                    f.write(f"""# FAISS index repository

This repository contains the FAISS index and related data for product search.

- Last updated: {pd.Timestamp.now()}
- Indexed items: {len(indexed_items)}
- Models: KeyBERT + Word2Vec

This repository was generated automatically to store the vector index built from the 'aikobay/initial_saleitem_dataset' product data.
""")

                for file_path, file_name in [
                    (index_path, "faiss_index.bin"),
                    (items_path, "indexed_items.txt"),
                    (readme_path, "README.md")
                ]:
                    api.upload_file(
                        path_or_fileobj=file_path,
                        path_in_repo=file_name,
                        repo_id=repo_id,
                        repo_type="dataset"
                    )

            logger.info(f"✅ FAISS index saved to the Hugging Face Hub. Repo: {repo_id}")
            return True

        result = await loop.run_in_executor(thread_pool, _save_index)
        return result

    except Exception as e:
        logger.error(f"❌ Error while saving the FAISS index to the Hub: {e}")

        # Fall back to a local backup if the Hub upload fails.
        try:
            loop = asyncio.get_event_loop()

            def _local_backup():
                local_path = os.path.join(os.getcwd(), "faiss_index.bin")
                faiss.write_index(faiss_index, local_path)
                with open("indexed_items.txt", "w", encoding="utf-8") as f:
                    f.write("\n".join(indexed_items))
                logger.info(f"✅ FAISS index backed up locally: {local_path}")
                return True

            result = await loop.run_in_executor(thread_pool, _local_backup)
            return result
        except Exception as local_err:
            logger.error(f"❌ Local backup also failed: {local_err}")
            return False


async def load_faiss_index_safe():
    """Safely load the FAISS index in read-only fashion."""
    global faiss_index, indexed_items

    max_retries = 5
    retry_delay = 1
    worker_id = os.getenv("WORKER_ID", multiprocessing.current_process().name)

    for attempt in range(max_retries):
        try:
            repo_id = os.getenv("HF_INDEX_REPO", "aikobay/saleitem_faiss_index")

            index_path = hf_hub_download(
                repo_id=repo_id,
                filename="faiss_index.bin",
                repo_type="dataset"
            )
            items_path = hf_hub_download(
                repo_id=repo_id,
                filename="indexed_items.txt",
                repo_type="dataset"
            )

            loaded_index = faiss.read_index(index_path)

            with open(items_path, "r", encoding="utf-8") as f:
                loaded_items = f.read().splitlines()

            # Swap in the loaded state only after both files parsed cleanly.
            faiss_index = loaded_index
            indexed_items = loaded_items

            logger.info(f"✅ Worker {worker_id}: FAISS index loaded. {len(indexed_items)} items total")
            return True

        except Exception as e:
            logger.warning(f"⚠️ Worker {worker_id}: index load failed (attempt {attempt+1}/{max_retries}): {e}")
            # Exponential backoff between retries.
            await asyncio.sleep(retry_delay * (2 ** attempt))

    logger.error(f"❌ Worker {worker_id}: exceeded the maximum number of index load retries")
    return False
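

# Retry schedule note: with retry_delay = 1 and max_retries = 5, the waits
# between attempts are 1, 2, 4, 8, and 16 seconds, roughly 31 seconds in total
# before the loader gives up.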


async def extract_keywords(query: str, top_n: int = 2):
    """Performance-oriented keyword extraction with KeyBERT."""
    # Very short queries are used as-is.
    if len(query) <= 3:
        return [query]

    loop = asyncio.get_event_loop()

    def _optimized_extract():
        return kw_model.extract_keywords(
            query,
            keyphrase_ngram_range=(1, 1),
            # Common Korean particles excluded from keyword candidates.
            stop_words=["이", "그", "저", "을", "를", "은", "에서", "의", "는"],
            use_mmr=True,
            diversity=0.5,
            top_n=top_n
        )

    try:
        keywords = await loop.run_in_executor(thread_pool, _optimized_extract)
        # Keep only keywords whose relevance score exceeds 0.2.
        return [kw for kw, score in keywords if score > 0.2]
    except Exception as e:
        logger.error(f"❌ Keyword extraction error: {str(e)}")
        # Fall back to the first two whitespace tokens of the query.
        return query.split()[:2]
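

# Illustrative only: for a query such as "무선 이어폰" (wireless earphones),
# extract_keywords() returns the MMR-diverse keywords whose KeyBERT scores
# exceed 0.2, e.g. something like ["이어폰", "무선"]. Actual output depends on
# the loaded embedding model.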


async def unified_search(vectors, top_k=5):
    """Search all vectors in a single batched call for efficiency."""
    if vectors.size == 0:
        return []

    # Periodically clamp nprobe on IVF-style indexes; flat indexes have no
    # nprobe attribute, hence the hasattr guard.
    global request_count
    if request_count % 100 == 0:
        if hasattr(faiss_index, "nprobe") and faiss_index.nprobe > 8:
            faiss_index.nprobe = 8

    loop = asyncio.get_event_loop()

    def _batch_search():
        # One FAISS call covers the query vector and all keyword vectors.
        distances, indices = faiss_index.search(vectors, top_k)
        return distances, indices

    try:
        distances, indices = await loop.run_in_executor(thread_pool, _batch_search)

        # Regroup the FAISS output into one result list per input vector.
        results = []
        for i in range(len(indices)):
            items = []
            for idx, dist in zip(indices[i], distances[i]):
                if idx < len(indexed_items):
                    items.append((idx, float(dist)))
            results.append(items)

        return results
    except Exception as e:
        logger.error(f"❌ Search error: {str(e)}")
        return []
async def search_faiss_with_keywords(query: str, top_k: int = 5, keywords=None):
    """Fast keyword-based FAISS search (efficiency-optimized)."""
    global faiss_index, indexed_items, request_count

    if faiss_index is None:
        logger.warning("⚠️ FAISS index is not loaded; returning no results")
        return []

    start_time = time.time()
    request_count += 1

    # Fast path: very short queries skip keyword extraction entirely.
    if len(query) <= 2:
        vector = await encode_texts_parallel([query])
        distances, indices = faiss_index.search(vector, top_k)

        quick_results = []
        for idx, dist in zip(indices[0], distances[0]):
            if idx < len(indexed_items):
                item_name = indexed_items[idx]
                try:
                    item_seq = active_sale_items.loc[active_sale_items["ITEMNAME"] == item_name, "ITEMSEQ"].values[0]
                    quick_results.append({
                        "ITEMSEQ": item_seq,
                        "ITEMNAME": item_name,
                        "score": float(dist)
                    })
                except Exception:
                    continue

        if request_count % CLEANUP_INTERVAL == 0:
            await cleanup_memory()

        return quick_results

    if keywords is None:
        keywords = await extract_keywords(query)

    # Encode the raw query and all extracted keywords in one batch.
    search_texts = [query] + keywords

    try:
        all_vectors = await encode_texts_parallel(search_texts)

        if all_vectors.size == 0:
            logger.warning(f"⚠️ Vectorization failed: {query}")
            return []

        search_results = await unified_search(all_vectors, top_k=top_k)

        if not search_results:
            return []

        # Merge scores: the full query dominates (weight 3.0); keyword hits
        # contribute at weight 0.5 and never lower an existing score.
        all_results = {}

        for idx, score in search_results[0]:
            if idx < len(indexed_items):
                all_results[idx] = score * 3.0

        for i in range(1, len(search_results)):
            keyword_results = search_results[i]
            weight = 0.5

            for idx, score in keyword_results:
                if idx in all_results:
                    all_results[idx] = max(all_results[idx], score * weight)
                else:
                    all_results[idx] = score * weight

        sorted_items = sorted(all_results.items(), key=lambda x: x[1], reverse=True)

        # Resolve item names to ITEMSEQ values with a single DataFrame lookup.
        recommendations = []
        item_indices = [idx for idx, _ in sorted_items[:top_k]]

        if item_indices:
            item_names = [indexed_items[idx] for idx in item_indices]

            items_df = active_sale_items[active_sale_items["ITEMNAME"].isin(item_names)]
            items_map = dict(zip(items_df["ITEMNAME"], items_df["ITEMSEQ"]))

            for idx, score in sorted_items[:top_k]:
                item_name = indexed_items[idx]
                if item_name in items_map:
                    recommendations.append({
                        "ITEMSEQ": items_map[item_name],
                        "ITEMNAME": item_name,
                        "score": float(score)
                    })

        if request_count % CLEANUP_INTERVAL == 0:
            await cleanup_memory()

        elapsed = time.time() - start_time
        if elapsed > 1.0:
            logger.info(f"🔍 Search complete | elapsed: {elapsed:.2f}s | results: {len(recommendations)}")

        return recommendations[:top_k]

    except Exception as e:
        logger.error(f"❌ Search pipeline error: {str(e)}")
        return []


class RecommendRequest(BaseModel):
    search_query: str
    top_k: int = 5
    use_expansion: bool = True
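

# Example request body (illustrative values):
#   {"search_query": "espresso machine", "top_k": 5, "use_expansion": true}
# Note that use_expansion is accepted but not currently consulted by the
# /api/recommend handler below.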


def validate_models():
    """Check whether every required model has been loaded."""
    models_loaded = (
        word2vec_model is not None and
        kw_model is not None and
        embedding_model is not None
    )
    return models_loaded
@app.post("/api/recommend")
async def recommend(request: RecommendRequest, background_tasks: BackgroundTasks):
    """Fast recommendation API (memory management plus performance tuning)."""
    if not validate_models():
        # Lazy-load on the first request that reaches this worker.
        await load_models()
        if not validate_models():
            raise HTTPException(status_code=503, detail="The service is not ready yet. Please try again shortly.")

    try:
        start_time = time.time()

        search_query = request.search_query.strip()
        if not search_query:
            raise HTTPException(status_code=400, detail="Please enter a search query")

        top_k = min(max(1, request.top_k), 20)

        recommendations = await search_faiss_with_keywords(
            search_query,
            top_k
        )

        result = {
            "query": search_query,
            "recommendations": recommendations
        }

        elapsed = time.time() - start_time
        if elapsed > 1.0:
            logger.info(f"⏱️ API response time: {elapsed:.2f}s | query: '{search_query}'")

        return result

    except HTTPException:
        # Re-raise intentional HTTP errors (e.g. the 400 above) rather than
        # collapsing them into a 500.
        raise
    except Exception as e:
        logger.error(f"❌ Recommendation error: {str(e)}")
        raise HTTPException(status_code=500, detail="An error occurred while processing the recommendation")
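

# Illustrative call (endpoint and port come from this file; the query value is
# made up):
#   curl -X POST http://localhost:7860/api/recommend \
#        -H "Content-Type: application/json" \
#        -d '{"search_query": "espresso machine", "top_k": 5}'
# The handler above returns:
#   {"query": "...", "recommendations": [{"ITEMSEQ": ..., "ITEMNAME": ..., "score": ...}, ...]}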


async def periodic_memory_monitor():
    """Periodically monitor and clean up memory usage."""
    try:
        import psutil  # local import: only this background task needs it

        worker_id = multiprocessing.current_process().name
        logger.info(f"📊 Worker {worker_id}: starting periodic memory monitoring")

        while True:
            await asyncio.sleep(1800)

            await cleanup_memory(force=True)

            if device == "cuda":
                allocated = torch.cuda.memory_allocated() / (1024**3)
                reserved = torch.cuda.memory_reserved() / (1024**3)
                logger.info(f"📊 Worker {worker_id}: GPU memory - allocated: {allocated:.2f}GB, reserved: {reserved:.2f}GB")

            process = psutil.Process()
            memory_info = process.memory_info()
            logger.info(f"📊 Worker {worker_id}: system memory - RSS: {memory_info.rss/(1024**3):.2f}GB")

    except Exception as e:
        logger.error(f"❌ Error during memory monitoring: {e}")


@app.on_event("startup")
async def startup_event():
    worker_id = multiprocessing.current_process().name
    try:
        logger.warning(f"🟡 Worker {worker_id} STARTUP beginning")

        if device == "cuda":
            logger.info(f"📊 Worker {worker_id}: GPU memory status:")
            logger.info(f" - total: {torch.cuda.get_device_properties(0).total_memory/(1024**3):.2f}GB")
            logger.info(f" - currently allocated: {torch.cuda.memory_allocated()/(1024**3):.2f}GB")
            logger.info(f" - reserved: {torch.cuda.memory_reserved()/(1024**3):.2f}GB")

        logger.info(f"🚀 Worker {worker_id}: waiting on the initialization semaphore...")
        async with startup_semaphore:
            logger.info(f"✅ Worker {worker_id}: initialization started")

            # 1. Load the models.
            try:
                if not await load_models():
                    logger.error(f"❌ Worker {worker_id} model load failed")
            except Exception as model_err:
                logger.error(f"💥 Worker {worker_id} fatal error while loading models: {model_err}")

            # 2. Load the product dataset.
            global active_sale_items
            try:
                active_sale_items = await load_huggingface_jsonl("initial_saleitem_dataset")
                if active_sale_items is not None and not active_sale_items.empty:
                    logger.info(f"✅ Worker {worker_id} data loaded: {len(active_sale_items)} items")
                else:
                    logger.error(f"❌ Worker {worker_id} data load failed - empty dataset")
            except Exception as data_err:
                logger.error(f"💥 Worker {worker_id} error while loading data: {data_err}")
                active_sale_items = pd.DataFrame()

            # 3. Load the FAISS index.
            faiss_loaded = False
            try:
                if await load_faiss_index_safe():
                    logger.info(f"✅ Worker {worker_id} FAISS index loaded")
                    faiss_loaded = True
                else:
                    logger.warning(f"⚠️ Worker {worker_id} FAISS index load failed")
            except Exception as faiss_err:
                logger.error(f"💥 Worker {worker_id} error while loading the FAISS index: {faiss_err}")

            # 4. Start the background memory monitor.
            try:
                asyncio.create_task(periodic_memory_monitor())
                logger.info(f"✅ Worker {worker_id} memory monitoring started")
            except Exception as monitor_err:
                logger.error(f"⚠️ Worker {worker_id} failed to start memory monitoring: {monitor_err}")

        status = "🟢 normal" if faiss_loaded else "🟠 partial (no index)"
        logger.info(f"🚀 Worker {worker_id} STARTUP complete: {status}")
    except Exception as e:
        # logger.exception already records the full stack trace.
        logger.exception(f"💥 Worker {worker_id} STARTUP failed: {e}")


if __name__ == "__main__":
    import uvicorn

    workers = int(os.getenv("WORKERS", 3))

    if device == "cuda":
        # Cap this process at 28% of GPU memory; three workers at 0.28 each
        # use roughly 84%, leaving headroom. Note this runs in the launcher
        # process only: spawned uvicorn workers re-import the module but skip
        # this __main__ block.
        torch.cuda.set_per_process_memory_fraction(0.28)

    uvicorn.run(
        "searchWorker:app",
        host="0.0.0.0",
        port=7860,
        workers=workers,
        log_level="info",
        timeout_keep_alive=65,
        limit_concurrency=100,
        timeout_graceful_shutdown=30
    )
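

# Illustrative launch (environment variable names taken from this file; the
# token value is a placeholder):
#   HF_API_TOKEN=hf_xxx WORKERS=3 python searchWorker.py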