Spaces:
Sleeping
Sleeping
File size: 22,600 Bytes
cf3160b 01d9916 4efcc29 cf3160b 01d9916 cf3160b 2ab288e cf3160b 4efcc29 2ab288e cf3160b 4efcc29 cf3160b 2ab288e cf3160b 4efcc29 cf3160b 4efcc29 cf3160b 2ab288e cf3160b 4efcc29 cf3160b 4efcc29 cf3160b 4efcc29 cf3160b 4efcc29 cf3160b 4efcc29 2ab288e 4efcc29 cf3160b 4efcc29 cf3160b 2ab288e cf3160b 2ab288e cf3160b 2ab288e cf3160b 01d9916 2ab288e 01d9916 2ab288e 01d9916 2ab288e 01d9916 4efcc29 2ab288e 4120bc6 |
|
import google.oauth2.credentials
import googleapiclient.discovery
from google.oauth2 import service_account
import json
from urllib.parse import urlparse, parse_qs
import logging # 建議使用 logging 而非 print
import string # 導入 string 模組用於轉換欄位索引
import re # 導入 re 模組
# 設定基本的 logging
logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(levelname)s - %(message)s')
class SheetService:
"""
一個用於與 Google Sheets API 互動的服務類別。
"""
SCOPES = ['https://www.googleapis.com/auth/spreadsheets']
def __init__(self, service_account_key_string, api_service_name='sheets', api_version='v4'):
"""
初始化 SheetService。
Args:
service_account_key_string (str): 包含 Google 服務帳戶憑證資訊的 JSON 字串。
通常是從 JSON 金鑰檔案讀取的內容。
api_service_name (str): 要使用的 Google API 服務名稱。預設為 'sheets'。
api_version (str): 要使用的 Google API 版本。預設為 'v4'。
"""
try:
credentials_info = json.loads(service_account_key_string)
self.credentials = service_account.Credentials.from_service_account_info(
credentials_info, scopes=self.SCOPES
)
self.service = googleapiclient.discovery.build(
api_service_name, api_version, credentials=self.credentials
)
self.sheet = self.service.spreadsheets()
logging.info("成功連接 Google Sheets API")
except json.JSONDecodeError as e:
logging.error(f"解析憑證 JSON 字串時發生錯誤: {e}")
self.service = None
self.sheet = None
except Exception as e:
logging.error(f"連接 Google Sheets API 時發生錯誤: {e}")
self.service = None
self.sheet = None
def _extract_ids_from_url(self, sheet_url):
"""
(私有方法) 從 Google Sheets URL 中提取 spreadsheetId 和 gid。
"""
spreadsheet_id = None
gid = None
# 嘗試從 URL 中提取 spreadsheetId (通常在 /d/ 和 /edit 之間)
match_id = re.search(r'/d/([a-zA-Z0-9-_]+)', sheet_url)
if match_id:
spreadsheet_id = match_id.group(1)
# 嘗試從 URL 中提取 gid (通常在 gid= 後面)
match_gid = re.search(r'[#&]gid=(\d+)', sheet_url)
if match_gid:
gid = int(match_gid.group(1)) # gid 是數字
else:
# 如果 URL 沒有 gid,通常指向第一個工作表,gid 為 0
# logging.info(f"URL 中未找到 gid,假設為第一個工作表 (gid=0): {sheet_url}")
gid = 0
return spreadsheet_id, gid
def get_sheet_id_by_url(self, sheet_url):
"""
從 Google Sheets URL 中提取試算表 ID。
"""
spreadsheet_id, _ = self._extract_ids_from_url(sheet_url)
if spreadsheet_id:
# 可以添加一些基本檢查,確保它看起來像一個 ID
if len(spreadsheet_id) > 30: # Google Sheet ID 通常很長
return spreadsheet_id
else:
logging.warning(f"從 URL 提取的 ID '{spreadsheet_id}' 看起來太短,可能無效: {sheet_url}")
return None
else:
logging.warning(f"無法從 URL 中提取有效的 Spreadsheet ID: {sheet_url}")
return None
def get_sheet_gid_by_url(self, sheet_url):
"""
從 Google Sheets URL 中提取 gid (工作表分頁 ID)。
返回整數型別的 gid 或 None (如果未指定則返回 0)。
"""
_, gid = self._extract_ids_from_url(sheet_url)
# 即使 gid 是 0 (預設),也將其視為有效的 gid
return gid
def get_sheet_name_by_gid(self, spreadsheet_id, gid):
"""
使用 spreadsheetId 和 gid 獲取工作表名稱 (title)。
如果 gid 為 None,則返回第一個工作表的名稱 (gid=0)。
"""
if not self.service:
logging.error("Sheet API 服務未成功初始化。")
return None
# 如果傳入的 gid 是 None,我們將其視為 0
target_gid = gid if gid is not None else 0
try:
# 使用 spreadsheets.get 獲取試算表的中繼資料
# fields 參數限制只返回我們需要的 sheets.properties (包含 title 和 sheetId)
sheet_metadata = self.service.spreadsheets().get(
spreadsheetId=spreadsheet_id,
fields='sheets(properties(sheetId,title))'
).execute()
sheets = sheet_metadata.get('sheets', [])
if not sheets:
logging.warning(f"試算表 {spreadsheet_id} 中沒有找到任何工作表。")
return None
for sheet in sheets:
properties = sheet.get('properties', {})
if properties.get('sheetId') == target_gid:
sheet_title = properties.get('title')
logging.info(f"找到 gid={target_gid} 對應的工作表名稱: '{sheet_title}'")
return sheet_title
# 如果循環結束還沒找到
logging.warning(f"在試算表 {spreadsheet_id} 中未找到 gid={target_gid} 對應的工作表。")
# 如果 gid 是 0 但找不到,可能試算表是空的或有問題,返回第一個工作表的名稱作為備選
if target_gid == 0 and sheets:
first_sheet_title = sheets[0].get('properties', {}).get('title')
logging.warning(f"返回第一個工作表的名稱 '{first_sheet_title}' 作為 gid=0 的備選。")
return first_sheet_title
return None
except Exception as e:
logging.error(f"獲取工作表名稱時發生 API 錯誤 (ID: {spreadsheet_id}, GID: {target_gid}): {e}")
return None
def get_sheet_name_by_url(self, sheet_url):
"""
從 Google Sheets URL 中提取 gid 並獲取對應的工作表名稱。
"""
spreadsheet_id, gid = self._extract_ids_from_url(sheet_url)
if not spreadsheet_id:
logging.error(f"無法從 URL {sheet_url} 中提取 Spreadsheet ID。")
return None
# 直接調用已有的 get_sheet_name_by_gid 方法
return self.get_sheet_name_by_gid(spreadsheet_id, gid)
def get_sheet_data_by_url(self, sheet_url, read_range=None):
"""
通過 Google Sheets URL 自動獲取 Spreadsheet ID 和工作表名稱,並讀取數據。
如果 URL 中包含 gid,則讀取對應的工作表;否則讀取第一個工作表。
默認讀取整個工作表的數據。
Args:
sheet_url (str): Google 試算表的完整 URL。
read_range (str | None): 可選。指定要讀取的儲存格範圍 (例如 'A1:C10')。
如果提供,則只讀取此範圍;否則讀取整個工作表。
Returns:
list | None: 包含讀取到的資料的列表 (list of lists),如果發生錯誤則返回 None。
"""
spreadsheet_id = self.get_sheet_id_by_url(sheet_url)
if not spreadsheet_id:
logging.error("無法從 URL 獲取 Spreadsheet ID。")
return None
gid = self.get_sheet_gid_by_url(sheet_url)
# 無論 gid 是否為 None,都嘗試獲取工作表名稱
sheet_name = self.get_sheet_name_by_gid(spreadsheet_id, gid)
if not sheet_name:
logging.error(f"無法根據 URL ({sheet_url}) 確定要讀取的工作表名稱。")
return None
# 組合 range_name
if read_range:
# 如果使用者指定了範圍,將其與工作表名稱結合
# 需要確保工作表名稱不包含特殊字符,或者正確引用
# 簡單起見,如果名稱包含空格或特殊符號,用單引號括起來
if ' ' in sheet_name or '!' in sheet_name or ':' in sheet_name:
range_name = f"'{sheet_name}'!{read_range}"
else:
range_name = f"{sheet_name}!{read_range}"
else:
# 如果未指定範圍,則讀取整個工作表
# 只需要提供工作表名稱即可
if ' ' in sheet_name or '!' in sheet_name or ':' in sheet_name:
range_name = f"'{sheet_name}'"
else:
range_name = sheet_name
logging.info(f"準備從試算表 '{spreadsheet_id}' 的 '{range_name}' 範圍讀取數據。")
# 使用現有的 get_sheet_value 方法讀取數據
return self.get_sheet_value(spreadsheet_id, range_name)
def get_sheet_value(self, spreadsheet_id, range_name):
"""
從指定的試算表和範圍讀取資料。
Args:
spreadsheet_id (str): Google 試算表的 ID。
range_name (str): 要讀取的範圍,例如 'Sheet1!A1:B2' 或僅 'Sheet1' (讀取整個工作表)。
Returns:
list | None: 包含讀取到的資料的列表 (list of lists),如果發生錯誤則返回 None。
"""
if not self.sheet:
logging.error("Sheet API 服務未成功初始化。")
return None
try:
logging.info(f"正在讀取 Spreadsheet ID: {spreadsheet_id}, Range: {range_name}")
result = self.sheet.values().get(
spreadsheetId=spreadsheet_id,
range=range_name
).execute()
values = result.get('values', [])
logging.info(f"成功從 {spreadsheet_id} 的 {range_name} 讀取 {len(values)} 列資料。")
# 如果 values 是 None 或空列表,直接返回
if not values:
logging.warning(f"在 {spreadsheet_id} 的 {range_name} 範圍內未找到任何資料。")
return [] # 返回空列表而不是 None,以便後續處理
return values
except googleapiclient.errors.HttpError as error:
# 更詳細地記錄錯誤信息
error_details = error.resp.get('content', '{}')
try:
error_json = json.loads(error_details)
error_message = error_json.get('error', {}).get('message', str(error))
except json.JSONDecodeError:
error_message = str(error)
logging.error(f"讀取試算表時發生 API 錯誤 (ID: {spreadsheet_id}, Range: {range_name}): {error_message}")
return None
except Exception as e:
logging.error(f"讀取試算表時發生未知錯誤 (ID: {spreadsheet_id}, Range: {range_name}): {e}")
return None
@staticmethod
def flatten_column_data(data):
"""
將從 Google Sheets API 獲取的單欄數據(列表的列表)扁平化為單一列表。
例如,將 [['A'], ['B'], ['C']] 轉換為 ['A', 'B', 'C']。
此方法會跳過空的內部列表,並假設每個非空內部列表只取第一個元素。
Args:
data (list[list[str]]): 從 API 獲取的原始數據,通常是 list of lists。
Returns:
list[str]: 包含所有第一欄元素的單一列表。如果輸入為 None 或空列表,
則返回空列表。
"""
if not data:
return []
# 使用列表推導式,提取每個子列表的第一個元素
# 添加 if sublist and sublist[0] is not None 確保子列表非空且第一個元素存在
# 並將其轉換為字串 str() 以確保類型一致性
flattened = [str(sublist[0]) for sublist in data if sublist and sublist[0] is not None]
return flattened
def update_sheet_data_by_url(self, sheet_url, data):
spreadsheet_id = self.get_sheet_id_by_url(sheet_url)
self.sheet.values().update(
spreadsheetId=spreadsheet_id,
range='A1:Z1000',
valueInputOption='USER_ENTERED',
body={'values': data}
).execute()
return True
@staticmethod
def _col_index_to_letter(col_index):
"""將 0-based 的欄位索引轉換為 A1 表示法的字母。"""
if col_index < 0:
raise ValueError("Column index must be non-negative")
letters = ''
while col_index >= 0:
col_index, remainder = divmod(col_index, 26)
letters = string.ascii_uppercase[remainder] + letters
col_index -= 1 # 因為 divmod 是 0-based,但 A=1, Z=26, AA=27 的轉換需要調整
if col_index < -1: # 修正邊界條件
break
# 修正:上面的邏輯有點複雜且可能有誤,改用更簡單的方式
letters = ''
dividend = col_index + 1 # 轉為 1-based
while dividend > 0:
module = (dividend - 1) % 26
letters = string.ascii_uppercase[module] + letters
dividend = (dividend - module) // 26
return letters if letters else "A" # 確保至少返回 "A"
@staticmethod
def _col_index_to_letter_simple(col_index):
"""將 0-based 的欄位索引轉換為 A1 表示法的字母 (簡化版,適用於 A-ZZ)。"""
if col_index < 0:
raise ValueError("Column index must be non-negative")
letters = ""
while col_index >= 0:
letters = string.ascii_uppercase[col_index % 26] + letters
col_index = col_index // 26 - 1
return letters
# SHEET_SERVICE.update_sheet_cell(sheet_url, target_row_index, qa_col_index, qa_result)
def update_sheet_cell(self, sheet_url, target_row_index_in_data, qa_col_index, qa_result):
"""
更新指定 URL 的 Google Sheet 中的單一儲存格。
Args:
sheet_url (str): Google 試算表的完整 URL。
target_row_index_in_data (int): 目標列在 get_sheet_data_by_url 返回的列表中的索引
(從 1 開始算,因為 0 是標頭)。
qa_col_index (int): 目標欄位的索引 (從 0 開始算)。
qa_result (str): 要寫入儲存格的值。
Returns:
bool: 更新是否成功。
"""
if not self.sheet:
logging.error("Sheet API 服務未成功初始化。")
return False
spreadsheet_id = self.get_sheet_id_by_url(sheet_url)
if not spreadsheet_id:
logging.error(f"無法從 URL 獲取 Spreadsheet ID: {sheet_url}")
return False
gid = self.get_sheet_gid_by_url(sheet_url)
sheet_name = self.get_sheet_name_by_gid(spreadsheet_id, gid)
if not sheet_name:
logging.error(f"無法根據 URL ({sheet_url}) 確定要更新的工作表名稱。")
return False
# 將 0-based 的欄位索引轉換為字母
try:
col_letter = self._col_index_to_letter_simple(qa_col_index)
except ValueError as e:
logging.error(f"無效的欄位索引 {qa_col_index}: {e}")
return False
# 計算實際的工作表列號
# target_row_index_in_data 是 sheet_data 中的索引 (1-based)
# 實際列號是 target_row_index_in_data + 1 (因為工作表通常從第 1 列開始)
actual_sheet_row = target_row_index_in_data + 1
# 構建 A1 表示法的範圍,例如 '測試'!F3
# 需要正確處理工作表名稱中的特殊字符(例如空格)
if ' ' in sheet_name or '!' in sheet_name or ':' in sheet_name or "'" in sheet_name:
# 修正:使用三引號定義 f-string 以避免引號衝突
safe_sheet_name = f"""'{sheet_name.replace("'", "''")}'""" # 單引號用兩個單引號轉義
else:
safe_sheet_name = sheet_name
range_name = f"{safe_sheet_name}!{col_letter}{actual_sheet_row}"
logging.info(f"準備更新 Spreadsheet ID: {spreadsheet_id}, Range: {range_name}, Value: {qa_result}")
try:
body = {
'values': [[qa_result]] # 更新單一儲存格的值需要是 list of lists
}
result = self.sheet.values().update(
spreadsheetId=spreadsheet_id,
range=range_name,
valueInputOption='USER_ENTERED', # 或者 'RAW' 如果你不需要 Google Sheets 解釋輸入
body=body
).execute()
logging.info(f"成功更新儲存格 {range_name}。 更新了 {result.get('updatedCells')} 個儲存格。")
return True
except googleapiclient.errors.HttpError as error:
# 記錄更詳細的錯誤
error_details = error.resp.get('content', '{}')
try:
error_json = json.loads(error_details)
error_message = error_json.get('error', {}).get('message', str(error))
except json.JSONDecodeError:
error_message = str(error)
logging.error(f"更新儲存格時發生 API 錯誤 (ID: {spreadsheet_id}, Range: {range_name}): {error_message}")
# 檢查是否是權限錯誤
if error.resp.status == 403:
logging.error("錯誤 403:權限不足。請檢查服務帳戶是否有目標工作表的編輯權限,以及 API 金鑰是否啟用了正確的範圍 (scopes)。")
return False
except Exception as e:
logging.error(f"更新儲存格時發生未知錯誤 (ID: {spreadsheet_id}, Range: {range_name}): {e}")
return False
def batch_update_cells(self, sheet_url, update_data):
"""
批次更新 Google Sheet 中的多個儲存格。
Args:
sheet_url (str): Google Sheet 的 URL。
update_data (list[dict]): 一個字典列表,每個字典應包含:
'range' (str): 要更新的範圍 (例如 '工作表名!A1')
'values' (list[list[str]]): 要寫入的值 (二維列表, 例如 [['value1']])
Returns:
dict | None: Google API 的返回結果,如果成功。否則返回 None。
"""
spreadsheet_id, _ = self._extract_ids_from_url(sheet_url)
if not spreadsheet_id:
logging.error(f"錯誤:無法從 URL {sheet_url} 中提取 Spreadsheet ID 進行批次更新。")
return None
if not self.service:
logging.error("Sheet API 服務未成功初始化。")
return None
if not update_data:
logging.warning("沒有需要批次更新的資料。")
return None
body = {
'valueInputOption': 'USER_ENTERED', # 或者 'RAW'
'data': []
}
valid_updates_count = 0 # 計算實際加入請求的數量
for item in update_data:
# 修正 1: 檢查 'values' 而不是 'value',並檢查 'values' 是否為列表
if 'range' not in item or 'values' not in item or not isinstance(item['values'], list):
logging.warning(f"跳過格式錯誤的更新項目 (缺少 'range' 或 'values' 非列表): {item}")
continue
# 修正 2: 稍微改進範圍檢查,確保 '!' 後面有內容
range_parts = item['range'].split('!', 1) # 最多分割一次
if len(range_parts) != 2 or not range_parts[0] or not range_parts[1]:
logging.warning(f"跳過範圍格式錯誤的更新項目 (格式應為 'SheetName!A1'): {item['range']}")
continue
# 修正 3: 直接使用 item['values'],它本身就應該是二維列表
body['data'].append({
'range': item['range'],
'values': item['values'] # 直接使用傳入的二維列表
})
valid_updates_count += 1
# 修正 4: 根據實際加入請求的數量判斷是否繼續
if not body['data']: # 或者 if valid_updates_count == 0:
logging.warning("經過驗證後,沒有有效的資料可以進行批次更新。")
# 即使沒有資料,也返回一個表示「沒有執行」的狀態,而不是 None,避免呼叫方誤判
# 或者可以返回一個特定的物件或空字典來表示未執行
return {"totalUpdatedCells": 0, "replies": [], "message": "No valid data to update after validation."} # 返回模擬的回應
try:
# logging.info(f"準備批次更新 Spreadsheet ID: {spreadsheet_id},共 {len(body['data'])} 個儲存格...")
# 使用 valid_updates_count 進行日誌記錄更準確
logging.info(f"準備批次更新 Spreadsheet ID: {spreadsheet_id},共 {valid_updates_count} 個有效儲存格...")
result = self.service.spreadsheets().values().batchUpdate(
spreadsheetId=spreadsheet_id,
body=body
).execute()
total_updated = result.get('totalUpdatedCells', 0)
logging.info(f"批次更新完成。API 回報共更新了 {total_updated} 個儲存格。")
# 可以在這裡比較 total_updated 和 valid_updates_count
if total_updated != valid_updates_count:
logging.warning(f"預期更新 {valid_updates_count} 個儲存格,但 API 回報更新了 {total_updated} 個。")
return result # 返回實際的 API 回應
except googleapiclient.errors.HttpError as error: # 更具體地捕獲 HttpError
error_details = error.resp.get('content', '{}')
try:
error_json = json.loads(error_details)
error_message = error_json.get('error', {}).get('message', str(error))
except json.JSONDecodeError:
error_message = str(error)
logging.error(f"批次更新儲存格時發生 API 錯誤 (ID: {spreadsheet_id}): {error_message}")
logging.error(f"錯誤詳情: {error.content.decode()}") # 打印詳細錯誤內容
return None # API 錯誤返回 None
except Exception as e:
logging.error(f"批次更新儲存格時發生未知錯誤: {e}")
import traceback
traceback.print_exc()
return None # 其他錯誤返回 None |