vaitor2 / sheet_service.py
youngtsai's picture
def style
2ab288e
import google.oauth2.credentials
import googleapiclient.discovery
from google.oauth2 import service_account
import json
from urllib.parse import urlparse, parse_qs
import logging # 建議使用 logging 而非 print
import string # 導入 string 模組用於轉換欄位索引
import re # 導入 re 模組
# 設定基本的 logging
logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(levelname)s - %(message)s')
class SheetService:
"""
一個用於與 Google Sheets API 互動的服務類別。
"""
SCOPES = ['https://www.googleapis.com/auth/spreadsheets']
def __init__(self, service_account_key_string, api_service_name='sheets', api_version='v4'):
"""
初始化 SheetService。
Args:
service_account_key_string (str): 包含 Google 服務帳戶憑證資訊的 JSON 字串。
通常是從 JSON 金鑰檔案讀取的內容。
api_service_name (str): 要使用的 Google API 服務名稱。預設為 'sheets'。
api_version (str): 要使用的 Google API 版本。預設為 'v4'。
"""
try:
credentials_info = json.loads(service_account_key_string)
self.credentials = service_account.Credentials.from_service_account_info(
credentials_info, scopes=self.SCOPES
)
self.service = googleapiclient.discovery.build(
api_service_name, api_version, credentials=self.credentials
)
self.sheet = self.service.spreadsheets()
logging.info("成功連接 Google Sheets API")
except json.JSONDecodeError as e:
logging.error(f"解析憑證 JSON 字串時發生錯誤: {e}")
self.service = None
self.sheet = None
except Exception as e:
logging.error(f"連接 Google Sheets API 時發生錯誤: {e}")
self.service = None
self.sheet = None
def _extract_ids_from_url(self, sheet_url):
"""
(私有方法) 從 Google Sheets URL 中提取 spreadsheetId 和 gid。
"""
spreadsheet_id = None
gid = None
# 嘗試從 URL 中提取 spreadsheetId (通常在 /d/ 和 /edit 之間)
match_id = re.search(r'/d/([a-zA-Z0-9-_]+)', sheet_url)
if match_id:
spreadsheet_id = match_id.group(1)
# 嘗試從 URL 中提取 gid (通常在 gid= 後面)
match_gid = re.search(r'[#&]gid=(\d+)', sheet_url)
if match_gid:
gid = int(match_gid.group(1)) # gid 是數字
else:
# 如果 URL 沒有 gid,通常指向第一個工作表,gid 為 0
# logging.info(f"URL 中未找到 gid,假設為第一個工作表 (gid=0): {sheet_url}")
gid = 0
return spreadsheet_id, gid
def get_sheet_id_by_url(self, sheet_url):
"""
從 Google Sheets URL 中提取試算表 ID。
"""
spreadsheet_id, _ = self._extract_ids_from_url(sheet_url)
if spreadsheet_id:
# 可以添加一些基本檢查,確保它看起來像一個 ID
if len(spreadsheet_id) > 30: # Google Sheet ID 通常很長
return spreadsheet_id
else:
logging.warning(f"從 URL 提取的 ID '{spreadsheet_id}' 看起來太短,可能無效: {sheet_url}")
return None
else:
logging.warning(f"無法從 URL 中提取有效的 Spreadsheet ID: {sheet_url}")
return None
def get_sheet_gid_by_url(self, sheet_url):
"""
從 Google Sheets URL 中提取 gid (工作表分頁 ID)。
返回整數型別的 gid 或 None (如果未指定則返回 0)。
"""
_, gid = self._extract_ids_from_url(sheet_url)
# 即使 gid 是 0 (預設),也將其視為有效的 gid
return gid
def get_sheet_name_by_gid(self, spreadsheet_id, gid):
"""
使用 spreadsheetId 和 gid 獲取工作表名稱 (title)。
如果 gid 為 None,則返回第一個工作表的名稱 (gid=0)。
"""
if not self.service:
logging.error("Sheet API 服務未成功初始化。")
return None
# 如果傳入的 gid 是 None,我們將其視為 0
target_gid = gid if gid is not None else 0
try:
# 使用 spreadsheets.get 獲取試算表的中繼資料
# fields 參數限制只返回我們需要的 sheets.properties (包含 title 和 sheetId)
sheet_metadata = self.service.spreadsheets().get(
spreadsheetId=spreadsheet_id,
fields='sheets(properties(sheetId,title))'
).execute()
sheets = sheet_metadata.get('sheets', [])
if not sheets:
logging.warning(f"試算表 {spreadsheet_id} 中沒有找到任何工作表。")
return None
for sheet in sheets:
properties = sheet.get('properties', {})
if properties.get('sheetId') == target_gid:
sheet_title = properties.get('title')
logging.info(f"找到 gid={target_gid} 對應的工作表名稱: '{sheet_title}'")
return sheet_title
# 如果循環結束還沒找到
logging.warning(f"在試算表 {spreadsheet_id} 中未找到 gid={target_gid} 對應的工作表。")
# 如果 gid 是 0 但找不到,可能試算表是空的或有問題,返回第一個工作表的名稱作為備選
if target_gid == 0 and sheets:
first_sheet_title = sheets[0].get('properties', {}).get('title')
logging.warning(f"返回第一個工作表的名稱 '{first_sheet_title}' 作為 gid=0 的備選。")
return first_sheet_title
return None
except Exception as e:
logging.error(f"獲取工作表名稱時發生 API 錯誤 (ID: {spreadsheet_id}, GID: {target_gid}): {e}")
return None
def get_sheet_name_by_url(self, sheet_url):
"""
從 Google Sheets URL 中提取 gid 並獲取對應的工作表名稱。
"""
spreadsheet_id, gid = self._extract_ids_from_url(sheet_url)
if not spreadsheet_id:
logging.error(f"無法從 URL {sheet_url} 中提取 Spreadsheet ID。")
return None
# 直接調用已有的 get_sheet_name_by_gid 方法
return self.get_sheet_name_by_gid(spreadsheet_id, gid)
def get_sheet_data_by_url(self, sheet_url, read_range=None):
"""
通過 Google Sheets URL 自動獲取 Spreadsheet ID 和工作表名稱,並讀取數據。
如果 URL 中包含 gid,則讀取對應的工作表;否則讀取第一個工作表。
默認讀取整個工作表的數據。
Args:
sheet_url (str): Google 試算表的完整 URL。
read_range (str | None): 可選。指定要讀取的儲存格範圍 (例如 'A1:C10')。
如果提供,則只讀取此範圍;否則讀取整個工作表。
Returns:
list | None: 包含讀取到的資料的列表 (list of lists),如果發生錯誤則返回 None。
"""
spreadsheet_id = self.get_sheet_id_by_url(sheet_url)
if not spreadsheet_id:
logging.error("無法從 URL 獲取 Spreadsheet ID。")
return None
gid = self.get_sheet_gid_by_url(sheet_url)
# 無論 gid 是否為 None,都嘗試獲取工作表名稱
sheet_name = self.get_sheet_name_by_gid(spreadsheet_id, gid)
if not sheet_name:
logging.error(f"無法根據 URL ({sheet_url}) 確定要讀取的工作表名稱。")
return None
# 組合 range_name
if read_range:
# 如果使用者指定了範圍,將其與工作表名稱結合
# 需要確保工作表名稱不包含特殊字符,或者正確引用
# 簡單起見,如果名稱包含空格或特殊符號,用單引號括起來
if ' ' in sheet_name or '!' in sheet_name or ':' in sheet_name:
range_name = f"'{sheet_name}'!{read_range}"
else:
range_name = f"{sheet_name}!{read_range}"
else:
# 如果未指定範圍,則讀取整個工作表
# 只需要提供工作表名稱即可
if ' ' in sheet_name or '!' in sheet_name or ':' in sheet_name:
range_name = f"'{sheet_name}'"
else:
range_name = sheet_name
logging.info(f"準備從試算表 '{spreadsheet_id}' 的 '{range_name}' 範圍讀取數據。")
# 使用現有的 get_sheet_value 方法讀取數據
return self.get_sheet_value(spreadsheet_id, range_name)
def get_sheet_value(self, spreadsheet_id, range_name):
"""
從指定的試算表和範圍讀取資料。
Args:
spreadsheet_id (str): Google 試算表的 ID。
range_name (str): 要讀取的範圍,例如 'Sheet1!A1:B2' 或僅 'Sheet1' (讀取整個工作表)。
Returns:
list | None: 包含讀取到的資料的列表 (list of lists),如果發生錯誤則返回 None。
"""
if not self.sheet:
logging.error("Sheet API 服務未成功初始化。")
return None
try:
logging.info(f"正在讀取 Spreadsheet ID: {spreadsheet_id}, Range: {range_name}")
result = self.sheet.values().get(
spreadsheetId=spreadsheet_id,
range=range_name
).execute()
values = result.get('values', [])
logging.info(f"成功從 {spreadsheet_id}{range_name} 讀取 {len(values)} 列資料。")
# 如果 values 是 None 或空列表,直接返回
if not values:
logging.warning(f"在 {spreadsheet_id}{range_name} 範圍內未找到任何資料。")
return [] # 返回空列表而不是 None,以便後續處理
return values
except googleapiclient.errors.HttpError as error:
# 更詳細地記錄錯誤信息
error_details = error.resp.get('content', '{}')
try:
error_json = json.loads(error_details)
error_message = error_json.get('error', {}).get('message', str(error))
except json.JSONDecodeError:
error_message = str(error)
logging.error(f"讀取試算表時發生 API 錯誤 (ID: {spreadsheet_id}, Range: {range_name}): {error_message}")
return None
except Exception as e:
logging.error(f"讀取試算表時發生未知錯誤 (ID: {spreadsheet_id}, Range: {range_name}): {e}")
return None
@staticmethod
def flatten_column_data(data):
"""
將從 Google Sheets API 獲取的單欄數據(列表的列表)扁平化為單一列表。
例如,將 [['A'], ['B'], ['C']] 轉換為 ['A', 'B', 'C']。
此方法會跳過空的內部列表,並假設每個非空內部列表只取第一個元素。
Args:
data (list[list[str]]): 從 API 獲取的原始數據,通常是 list of lists。
Returns:
list[str]: 包含所有第一欄元素的單一列表。如果輸入為 None 或空列表,
則返回空列表。
"""
if not data:
return []
# 使用列表推導式,提取每個子列表的第一個元素
# 添加 if sublist and sublist[0] is not None 確保子列表非空且第一個元素存在
# 並將其轉換為字串 str() 以確保類型一致性
flattened = [str(sublist[0]) for sublist in data if sublist and sublist[0] is not None]
return flattened
def update_sheet_data_by_url(self, sheet_url, data):
spreadsheet_id = self.get_sheet_id_by_url(sheet_url)
self.sheet.values().update(
spreadsheetId=spreadsheet_id,
range='A1:Z1000',
valueInputOption='USER_ENTERED',
body={'values': data}
).execute()
return True
@staticmethod
def _col_index_to_letter(col_index):
"""將 0-based 的欄位索引轉換為 A1 表示法的字母。"""
if col_index < 0:
raise ValueError("Column index must be non-negative")
letters = ''
while col_index >= 0:
col_index, remainder = divmod(col_index, 26)
letters = string.ascii_uppercase[remainder] + letters
col_index -= 1 # 因為 divmod 是 0-based,但 A=1, Z=26, AA=27 的轉換需要調整
if col_index < -1: # 修正邊界條件
break
# 修正:上面的邏輯有點複雜且可能有誤,改用更簡單的方式
letters = ''
dividend = col_index + 1 # 轉為 1-based
while dividend > 0:
module = (dividend - 1) % 26
letters = string.ascii_uppercase[module] + letters
dividend = (dividend - module) // 26
return letters if letters else "A" # 確保至少返回 "A"
@staticmethod
def _col_index_to_letter_simple(col_index):
"""將 0-based 的欄位索引轉換為 A1 表示法的字母 (簡化版,適用於 A-ZZ)。"""
if col_index < 0:
raise ValueError("Column index must be non-negative")
letters = ""
while col_index >= 0:
letters = string.ascii_uppercase[col_index % 26] + letters
col_index = col_index // 26 - 1
return letters
# SHEET_SERVICE.update_sheet_cell(sheet_url, target_row_index, qa_col_index, qa_result)
def update_sheet_cell(self, sheet_url, target_row_index_in_data, qa_col_index, qa_result):
"""
更新指定 URL 的 Google Sheet 中的單一儲存格。
Args:
sheet_url (str): Google 試算表的完整 URL。
target_row_index_in_data (int): 目標列在 get_sheet_data_by_url 返回的列表中的索引
(從 1 開始算,因為 0 是標頭)。
qa_col_index (int): 目標欄位的索引 (從 0 開始算)。
qa_result (str): 要寫入儲存格的值。
Returns:
bool: 更新是否成功。
"""
if not self.sheet:
logging.error("Sheet API 服務未成功初始化。")
return False
spreadsheet_id = self.get_sheet_id_by_url(sheet_url)
if not spreadsheet_id:
logging.error(f"無法從 URL 獲取 Spreadsheet ID: {sheet_url}")
return False
gid = self.get_sheet_gid_by_url(sheet_url)
sheet_name = self.get_sheet_name_by_gid(spreadsheet_id, gid)
if not sheet_name:
logging.error(f"無法根據 URL ({sheet_url}) 確定要更新的工作表名稱。")
return False
# 將 0-based 的欄位索引轉換為字母
try:
col_letter = self._col_index_to_letter_simple(qa_col_index)
except ValueError as e:
logging.error(f"無效的欄位索引 {qa_col_index}: {e}")
return False
# 計算實際的工作表列號
# target_row_index_in_data 是 sheet_data 中的索引 (1-based)
# 實際列號是 target_row_index_in_data + 1 (因為工作表通常從第 1 列開始)
actual_sheet_row = target_row_index_in_data + 1
# 構建 A1 表示法的範圍,例如 '測試'!F3
# 需要正確處理工作表名稱中的特殊字符(例如空格)
if ' ' in sheet_name or '!' in sheet_name or ':' in sheet_name or "'" in sheet_name:
# 修正:使用三引號定義 f-string 以避免引號衝突
safe_sheet_name = f"""'{sheet_name.replace("'", "''")}'""" # 單引號用兩個單引號轉義
else:
safe_sheet_name = sheet_name
range_name = f"{safe_sheet_name}!{col_letter}{actual_sheet_row}"
logging.info(f"準備更新 Spreadsheet ID: {spreadsheet_id}, Range: {range_name}, Value: {qa_result}")
try:
body = {
'values': [[qa_result]] # 更新單一儲存格的值需要是 list of lists
}
result = self.sheet.values().update(
spreadsheetId=spreadsheet_id,
range=range_name,
valueInputOption='USER_ENTERED', # 或者 'RAW' 如果你不需要 Google Sheets 解釋輸入
body=body
).execute()
logging.info(f"成功更新儲存格 {range_name}。 更新了 {result.get('updatedCells')} 個儲存格。")
return True
except googleapiclient.errors.HttpError as error:
# 記錄更詳細的錯誤
error_details = error.resp.get('content', '{}')
try:
error_json = json.loads(error_details)
error_message = error_json.get('error', {}).get('message', str(error))
except json.JSONDecodeError:
error_message = str(error)
logging.error(f"更新儲存格時發生 API 錯誤 (ID: {spreadsheet_id}, Range: {range_name}): {error_message}")
# 檢查是否是權限錯誤
if error.resp.status == 403:
logging.error("錯誤 403:權限不足。請檢查服務帳戶是否有目標工作表的編輯權限,以及 API 金鑰是否啟用了正確的範圍 (scopes)。")
return False
except Exception as e:
logging.error(f"更新儲存格時發生未知錯誤 (ID: {spreadsheet_id}, Range: {range_name}): {e}")
return False
def batch_update_cells(self, sheet_url, update_data):
"""
批次更新 Google Sheet 中的多個儲存格。
Args:
sheet_url (str): Google Sheet 的 URL。
update_data (list[dict]): 一個字典列表,每個字典應包含:
'range' (str): 要更新的範圍 (例如 '工作表名!A1')
'values' (list[list[str]]): 要寫入的值 (二維列表, 例如 [['value1']])
Returns:
dict | None: Google API 的返回結果,如果成功。否則返回 None。
"""
spreadsheet_id, _ = self._extract_ids_from_url(sheet_url)
if not spreadsheet_id:
logging.error(f"錯誤:無法從 URL {sheet_url} 中提取 Spreadsheet ID 進行批次更新。")
return None
if not self.service:
logging.error("Sheet API 服務未成功初始化。")
return None
if not update_data:
logging.warning("沒有需要批次更新的資料。")
return None
body = {
'valueInputOption': 'USER_ENTERED', # 或者 'RAW'
'data': []
}
valid_updates_count = 0 # 計算實際加入請求的數量
for item in update_data:
# 修正 1: 檢查 'values' 而不是 'value',並檢查 'values' 是否為列表
if 'range' not in item or 'values' not in item or not isinstance(item['values'], list):
logging.warning(f"跳過格式錯誤的更新項目 (缺少 'range' 或 'values' 非列表): {item}")
continue
# 修正 2: 稍微改進範圍檢查,確保 '!' 後面有內容
range_parts = item['range'].split('!', 1) # 最多分割一次
if len(range_parts) != 2 or not range_parts[0] or not range_parts[1]:
logging.warning(f"跳過範圍格式錯誤的更新項目 (格式應為 'SheetName!A1'): {item['range']}")
continue
# 修正 3: 直接使用 item['values'],它本身就應該是二維列表
body['data'].append({
'range': item['range'],
'values': item['values'] # 直接使用傳入的二維列表
})
valid_updates_count += 1
# 修正 4: 根據實際加入請求的數量判斷是否繼續
if not body['data']: # 或者 if valid_updates_count == 0:
logging.warning("經過驗證後,沒有有效的資料可以進行批次更新。")
# 即使沒有資料,也返回一個表示「沒有執行」的狀態,而不是 None,避免呼叫方誤判
# 或者可以返回一個特定的物件或空字典來表示未執行
return {"totalUpdatedCells": 0, "replies": [], "message": "No valid data to update after validation."} # 返回模擬的回應
try:
# logging.info(f"準備批次更新 Spreadsheet ID: {spreadsheet_id},共 {len(body['data'])} 個儲存格...")
# 使用 valid_updates_count 進行日誌記錄更準確
logging.info(f"準備批次更新 Spreadsheet ID: {spreadsheet_id},共 {valid_updates_count} 個有效儲存格...")
result = self.service.spreadsheets().values().batchUpdate(
spreadsheetId=spreadsheet_id,
body=body
).execute()
total_updated = result.get('totalUpdatedCells', 0)
logging.info(f"批次更新完成。API 回報共更新了 {total_updated} 個儲存格。")
# 可以在這裡比較 total_updated 和 valid_updates_count
if total_updated != valid_updates_count:
logging.warning(f"預期更新 {valid_updates_count} 個儲存格,但 API 回報更新了 {total_updated} 個。")
return result # 返回實際的 API 回應
except googleapiclient.errors.HttpError as error: # 更具體地捕獲 HttpError
error_details = error.resp.get('content', '{}')
try:
error_json = json.loads(error_details)
error_message = error_json.get('error', {}).get('message', str(error))
except json.JSONDecodeError:
error_message = str(error)
logging.error(f"批次更新儲存格時發生 API 錯誤 (ID: {spreadsheet_id}): {error_message}")
logging.error(f"錯誤詳情: {error.content.decode()}") # 打印詳細錯誤內容
return None # API 錯誤返回 None
except Exception as e:
logging.error(f"批次更新儲存格時發生未知錯誤: {e}")
import traceback
traceback.print_exc()
return None # 其他錯誤返回 None