|
import os |
|
import pickle |
|
import json |
|
import hashlib |
|
from datetime import datetime |
|
from typing import Any, Dict, List, Optional |
|
import re |
|
import time |
|
import random |
|
|
|
from google.oauth2.service_account import Credentials |
|
from google.auth.transport.requests import Request |
|
from google_auth_oauthlib.flow import InstalledAppFlow |
|
from googleapiclient.discovery import build |
|
from loguru import logger |
|
|
|
from .utils import timing_decorator_sync |
|
from .constants import SHEET_RANGE |
|
|
|
SCOPES = ['https://www.googleapis.com/auth/spreadsheets'] |
|
|
|
def generate_conversation_id(user_id: str, page_id: str, timestamp: str) -> str: |
|
"""Tạo ID hội thoại duy nhất.""" |
|
hash_input = f"{user_id}:{page_id}:{timestamp}" |
|
return hashlib.sha256(hash_input.encode()).hexdigest()[:32] |
|
|
|
def _flatten_and_unique_timestamps(items: Any) -> List[Any]: |
|
""" |
|
Hàm tiện ích để làm phẳng danh sách timestamp (xử lý list lồng nhau) |
|
và loại bỏ các giá trị trùng lặp, giữ nguyên thứ tự. |
|
""" |
|
if not isinstance(items, list): |
|
return [items] |
|
|
|
flat_list = [] |
|
for item in items: |
|
if isinstance(item, list): |
|
flat_list.extend(_flatten_and_unique_timestamps(item)) |
|
else: |
|
flat_list.append(item) |
|
return list(dict.fromkeys(flat_list)) |
|
|
|
def _get_start_row_from_range(range_string: str) -> int: |
|
""" |
|
Phân tích một chuỗi range (ví dụ: 'Sheet1!A2:Z') để lấy ra số của dòng bắt đầu. |
|
""" |
|
match = re.search(r"[A-Z]+([0-9]+)", range_string) |
|
if match: |
|
try: |
|
return int(match.group(1)) |
|
except (ValueError, IndexError): |
|
pass |
|
|
|
logger.warning(f"Không thể xác định dòng bắt đầu từ range '{range_string}'. Mặc định là 2.") |
|
return 2 |
|
|
|
|
|
class SheetsClient: |
|
def __init__(self, credentials_file: str, token_file: str, sheet_id: str): |
|
self.credentials_file = credentials_file |
|
self.token_file = token_file |
|
self.sheet_id = sheet_id |
|
self.creds = None |
|
self.service = None |
|
|
|
@timing_decorator_sync |
|
def authenticate(self) -> None: |
|
"""Xác thực với Google Sheets API.""" |
|
credentials_json = os.getenv("GOOGLE_SHEETS_CREDENTIALS_JSON") |
|
if credentials_json: |
|
info = json.loads(credentials_json) |
|
self.creds = Credentials.from_service_account_info(info, scopes=SCOPES) |
|
else: |
|
if os.path.exists(self.token_file): |
|
with open(self.token_file, 'rb') as token: |
|
self.creds = pickle.load(token) |
|
if not self.creds or not self.creds.valid: |
|
if self.creds and self.creds.expired and self.creds.refresh_token: |
|
self.creds.refresh(Request()) |
|
else: |
|
flow = InstalledAppFlow.from_client_secrets_file(self.credentials_file, SCOPES) |
|
self.creds = flow.run_local_server(port=0) |
|
with open(self.token_file, 'wb') as token: |
|
pickle.dump(self.creds, token) |
|
self.service = build('sheets', 'v4', credentials=self.creds) |
|
|
|
@timing_decorator_sync |
|
def get_conversation_history(self, user_id: str, page_id: str) -> List[Dict[str, Any]]: |
|
"""Lấy lịch sử hội thoại từ sheet bằng vị trí cột cố định để đảm bảo độ tin cậy.""" |
|
logger.debug(f"[get_conversation_history] Bắt đầu lấy lịch sử cho user_id: '{user_id}' và page_id: '{page_id}'") |
|
try: |
|
if not self.service: |
|
self.authenticate() |
|
range_name = SHEET_RANGE |
|
result = self.service.spreadsheets().values().get( |
|
spreadsheetId=self.sheet_id, |
|
range=range_name |
|
).execute() |
|
values = result.get('values', []) |
|
history = [] |
|
|
|
if not values: |
|
logger.warning(f"[get_conversation_history] Không tìm thấy dữ liệu trong sheet. Range: {range_name}") |
|
return [] |
|
|
|
logger.debug(f"[get_conversation_history] Đã lấy được {len(values)} dòng từ sheet. Bắt đầu xử lý...") |
|
|
|
start_row = _get_start_row_from_range(range_name) |
|
for i, row in enumerate(values, start=start_row): |
|
if len(row) < 14: |
|
row.extend([""] * (14 - len(row))) |
|
|
|
sheet_recipient_id = row[4] |
|
sheet_page_id = row[5] |
|
|
|
if str(sheet_recipient_id) == str(user_id) and str(sheet_page_id) == str(page_id): |
|
try: |
|
timestamps_raw = json.loads(row[12]) |
|
timestamps = _flatten_and_unique_timestamps(timestamps_raw) |
|
except (json.JSONDecodeError, TypeError): |
|
timestamps = [] |
|
|
|
history.append({ |
|
'conversation_id': row[0], |
|
'originalcommand': row[1], |
|
'originalcontent': row[2], |
|
'originalattachments': json.loads(row[3]) if row[3] else [], |
|
'recipient_id': row[4], |
|
'page_id': row[5], |
|
'originaltext': row[6], |
|
'originalvehicle': row[7], |
|
'originalaction': row[8], |
|
'originalpurpose': row[9], |
|
'originalquestion': row[10], |
|
'systemresponse': row[11], |
|
'timestamp': timestamps, |
|
'isdone': str(row[13]).lower() == 'true' |
|
}) |
|
|
|
logger.debug(f"[get_conversation_history] Hoàn tất xử lý. Tìm thấy {len(history)} bản ghi lịch sử.") |
|
return history |
|
except Exception as e: |
|
logger.error(f"Error getting conversation history: {e}", exc_info=True) |
|
return [] |
|
|
|
@timing_decorator_sync |
|
def log_conversation( |
|
self, |
|
**kwargs: Any |
|
) -> Optional[Dict[str, Any]]: |
|
""" |
|
Thực hiện "UPSERT" (Update hoặc Insert) một hội thoại với logic chống trùng lặp mạnh mẽ. |
|
""" |
|
try: |
|
if not self.service: |
|
self.authenticate() |
|
|
|
|
|
sheet_name_match = re.match(r"([^!]+)!", SHEET_RANGE) |
|
sheet_name = sheet_name_match.group(1) if sheet_name_match else "Sheet1" |
|
header_range = f"{sheet_name}!A1:Z1" |
|
header_result = self.service.spreadsheets().values().get(spreadsheetId=self.sheet_id, range=header_range).execute() |
|
header = header_result.get('values', [[]])[0] |
|
if not header: |
|
logger.error(f"Không thể lấy được header từ range '{header_range}'.") |
|
return None |
|
|
|
|
|
data_result = self.service.spreadsheets().values().get(spreadsheetId=self.sheet_id, range=SHEET_RANGE).execute() |
|
values = data_result.get('values', []) |
|
|
|
|
|
recipient_id = str(kwargs.get('recipient_id')).strip() |
|
page_id = str(kwargs.get('page_id')).strip() |
|
ts_list = _flatten_and_unique_timestamps(kwargs.get('timestamp', [])) |
|
event_timestamp = str(ts_list[-1]).strip() if ts_list else '' |
|
|
|
logger.debug(f"UPSERT: Bắt đầu tìm kiếm với recipient_id='{recipient_id}', page_id='{page_id}', event_timestamp='{event_timestamp}'") |
|
|
|
|
|
found_row_index = -1 |
|
found_row_data = {} |
|
start_row = _get_start_row_from_range(SHEET_RANGE) |
|
|
|
try: |
|
id_col_idx = header.index('conversation_id') |
|
recipient_col_idx = header.index('recipient_id') |
|
page_col_idx = header.index('page_id') |
|
timestamp_col_idx = header.index('timestamp') |
|
except ValueError as e: |
|
logger.error(f"Thiếu cột bắt buộc trong header: {e}") |
|
return None |
|
|
|
target_conv_id = str(kwargs.get('conversation_id') or '').strip() |
|
if target_conv_id: |
|
logger.debug(f"UPSERT: Ưu tiên tìm bằng conversation_id='{target_conv_id}'") |
|
for i, row in enumerate(values, start=start_row): |
|
if len(row) > id_col_idx: |
|
sheet_conv_id = str(row[id_col_idx]).strip() |
|
is_match = sheet_conv_id == target_conv_id |
|
|
|
if is_match: |
|
found_row_index = i |
|
found_row_data = dict(zip(header, row)) |
|
logger.success(f"Tìm thấy bằng conversation_id tại dòng {i}.") |
|
break |
|
|
|
if found_row_index == -1: |
|
logger.debug(f"UPSERT: Không tìm thấy bằng ID, chuyển sang tìm bằng (user, page, timestamp).") |
|
for i, row in enumerate(values, start=start_row): |
|
if len(row) <= max(recipient_col_idx, page_col_idx, timestamp_col_idx): |
|
continue |
|
|
|
sheet_recipient_id = str(row[recipient_col_idx]).strip() |
|
sheet_page_id = str(row[page_col_idx]).strip() |
|
|
|
id_match = (sheet_recipient_id == recipient_id) and (sheet_page_id == page_id) |
|
|
|
|
|
if id_match: |
|
try: |
|
sheet_timestamps = [str(ts).strip() for ts in _flatten_and_unique_timestamps(json.loads(row[timestamp_col_idx]))] |
|
ts_match = event_timestamp and event_timestamp in sheet_timestamps |
|
|
|
if ts_match: |
|
found_row_index = i |
|
found_row_data = dict(zip(header, row)) |
|
logger.success(f"Tìm thấy bằng (user, page, timestamp) tại dòng {i}.") |
|
break |
|
except (json.JSONDecodeError, TypeError): |
|
continue |
|
|
|
|
|
if found_row_index != -1: |
|
|
|
logger.info(f"Đang cập nhật hội thoại tại dòng {found_row_index}") |
|
|
|
updated_data = found_row_data.copy() |
|
for key, value in kwargs.items(): |
|
if value is not None and value != '' or isinstance(value, bool): |
|
updated_data[key] = value |
|
|
|
existing_ts = _flatten_and_unique_timestamps(json.loads(found_row_data.get('timestamp', '[]'))) |
|
new_ts = _flatten_and_unique_timestamps(kwargs.get('timestamp', [])) |
|
updated_data['timestamp'] = _flatten_and_unique_timestamps(existing_ts + new_ts) |
|
|
|
row_data_to_write = [] |
|
for col_name in header: |
|
value = updated_data.get(col_name, '') |
|
if col_name in ['originalattachments', 'timestamp']: |
|
row_data_to_write.append(json.dumps(value or [])) |
|
elif col_name == 'isdone': |
|
row_data_to_write.append(str(value).lower()) |
|
else: |
|
row_data_to_write.append(str(value)) |
|
|
|
range_to_update = f"{sheet_name}!A{found_row_index}" |
|
body = {'values': [row_data_to_write]} |
|
self.service.spreadsheets().values().update(spreadsheetId=self.sheet_id, range=range_to_update, valueInputOption='RAW', body=body).execute() |
|
|
|
kwargs.update(updated_data) |
|
return kwargs |
|
else: |
|
|
|
logger.info(f"Không tìm thấy dòng khớp. Tiến hành tạo bản ghi mới.") |
|
|
|
kwargs['conversation_id'] = kwargs.get('conversation_id') or generate_conversation_id(recipient_id, page_id, event_timestamp) |
|
kwargs['timestamp'] = _flatten_and_unique_timestamps(kwargs.get('timestamp', [])) |
|
|
|
row_data_to_write = [] |
|
for col_name in header: |
|
value = kwargs.get(col_name, '') |
|
if col_name in ['originalattachments', 'timestamp']: |
|
row_data_to_write.append(json.dumps(value or [])) |
|
elif col_name == 'isdone': |
|
row_data_to_write.append(str(value).lower()) |
|
else: |
|
row_data_to_write.append(str(value)) |
|
|
|
body = {'values': [row_data_to_write]} |
|
self.service.spreadsheets().values().append(spreadsheetId=self.sheet_id, range=SHEET_RANGE, valueInputOption='RAW', insertDataOption='INSERT_ROWS', body=body).execute() |
|
|
|
return kwargs |
|
|
|
except Exception as e: |
|
logger.error(f"Lỗi khi ghi/cập nhật conversation: {e}", exc_info=True) |
|
return None |
|
|