Spaces:
Sleeping
Sleeping
import os | |
from oauth2client.service_account import ServiceAccountCredentials | |
import gspread | |
from dotenv import load_dotenv | |
from enviroments.convert import get_json_from_env_var | |
from typing import Optional, List | |
load_dotenv(override=True) | |
class SheetManager: | |
def __init__(self, spreadsheet_url: Optional[str] = None, | |
worksheet_name: str = "flag", | |
column_name: str = "huggingface_id"): | |
""" | |
Initialize SheetManager with Google Sheets credentials and connection. | |
Args: | |
spreadsheet_url (str, optional): URL of the Google Spreadsheet. | |
If None, takes from environment variable. | |
worksheet_name (str): Name of the worksheet to operate on. | |
Defaults to "flag". | |
column_name (str): Name of the column to operate on. | |
Defaults to "huggingface_id". | |
""" | |
self.spreadsheet_url = spreadsheet_url or os.getenv("SPREADSHEET_URL") | |
if not self.spreadsheet_url: | |
raise ValueError("Spreadsheet URL not provided and not found in environment variables") | |
self.worksheet_name = worksheet_name | |
self.column_name = column_name | |
# Initialize credentials and client | |
self._init_google_client() | |
# Initialize sheet connection | |
self.doc = None | |
self.sheet = None | |
self.col_index = None | |
self._connect_to_sheet(validate_column=True) | |
def _init_google_client(self): | |
"""Initialize Google Sheets client with credentials.""" | |
scope = ['https://spreadsheets.google.com/feeds', | |
'https://www.googleapis.com/auth/drive'] | |
json_key_dict = get_json_from_env_var("GOOGLE_CREDENTIALS") | |
credentials = ServiceAccountCredentials.from_json_keyfile_dict(json_key_dict, scope) | |
self.client = gspread.authorize(credentials) | |
def _connect_to_sheet(self, validate_column: bool = True): | |
""" | |
Connect to the specified Google Sheet and initialize necessary attributes. | |
Args: | |
validate_column (bool): Whether to validate the column name exists | |
""" | |
try: | |
self.doc = self.client.open_by_url(self.spreadsheet_url) | |
# Try to get the worksheet | |
try: | |
self.sheet = self.doc.worksheet(self.worksheet_name) | |
except gspread.exceptions.WorksheetNotFound: | |
raise ValueError(f"Worksheet '{self.worksheet_name}' not found in spreadsheet") | |
# Get headers | |
self.headers = self.sheet.row_values(1) | |
# Validate column only if requested | |
if validate_column: | |
try: | |
self.col_index = self.headers.index(self.column_name) + 1 | |
except ValueError: | |
# If column not found, use first available column | |
if self.headers: | |
self.column_name = self.headers[0] | |
self.col_index = 1 | |
print(f"Column '{self.column_name}' not found. Using first available column: '{self.headers[0]}'") | |
else: | |
raise ValueError("No columns found in worksheet") | |
except Exception as e: | |
if isinstance(e, ValueError): | |
raise e | |
raise ConnectionError(f"Failed to connect to sheet: {str(e)}") | |
def change_worksheet(self, worksheet_name: str, column_name: Optional[str] = None): | |
""" | |
Change the current worksheet and optionally the column. | |
Args: | |
worksheet_name (str): Name of the worksheet to switch to | |
column_name (str, optional): Name of the column to switch to | |
""" | |
old_worksheet = self.worksheet_name | |
old_column = self.column_name | |
try: | |
self.worksheet_name = worksheet_name | |
if column_name: | |
self.column_name = column_name | |
# First connect without column validation | |
self._connect_to_sheet(validate_column=False) | |
# Then validate the column if specified | |
if column_name: | |
self.change_column(column_name) | |
else: | |
# Validate existing column in new worksheet | |
try: | |
self.col_index = self.headers.index(self.column_name) + 1 | |
except ValueError: | |
# If column not found, use first available column | |
if self.headers: | |
self.column_name = self.headers[0] | |
self.col_index = 1 | |
print(f"Column '{old_column}' not found in new worksheet. Using first available column: '{self.headers[0]}'") | |
else: | |
raise ValueError("No columns found in worksheet") | |
print(f"Successfully switched to worksheet: {worksheet_name}, using column: {self.column_name}") | |
except Exception as e: | |
# Restore previous state on error | |
self.worksheet_name = old_worksheet | |
self.column_name = old_column | |
self._connect_to_sheet() | |
raise e | |
def change_column(self, column_name: str): | |
""" | |
Change the target column. | |
Args: | |
column_name (str): Name of the column to switch to | |
""" | |
if not self.headers: | |
self.headers = self.sheet.row_values(1) | |
try: | |
self.col_index = self.headers.index(column_name) + 1 | |
self.column_name = column_name | |
print(f"Successfully switched to column: {column_name}") | |
except ValueError: | |
raise ValueError(f"Column '{column_name}' not found in worksheet. Available columns: {', '.join(self.headers)}") | |
def get_available_worksheets(self) -> List[str]: | |
"""Get list of all available worksheets in the spreadsheet.""" | |
return [worksheet.title for worksheet in self.doc.worksheets()] | |
def get_available_columns(self) -> List[str]: | |
"""Get list of all available columns in the current worksheet.""" | |
return self.headers if self.headers else self.sheet.row_values(1) | |
def _reconnect_if_needed(self): | |
"""Reconnect to the sheet if the connection is lost.""" | |
try: | |
self.sheet.row_values(1) | |
except (gspread.exceptions.APIError, AttributeError): | |
self._init_google_client() | |
self._connect_to_sheet() | |
def _fetch_column_data(self) -> List[str]: | |
"""Fetch all data from the huggingface_id column.""" | |
values = self.sheet.col_values(self.col_index) | |
return values[1:] # Exclude header | |
def _update_sheet(self, data: List[str]): | |
"""Update the entire column with new data.""" | |
try: | |
# Prepare the range for update (excluding header) | |
start_cell = gspread.utils.rowcol_to_a1(2, self.col_index) # Start from row 2 | |
end_cell = gspread.utils.rowcol_to_a1(len(data) + 2, self.col_index) | |
range_name = f"{start_cell}:{end_cell}" | |
# Convert data to 2D array format required by gspread | |
cells = [[value] for value in data] | |
# Update the range | |
self.sheet.update(range_name, cells) | |
except Exception as e: | |
print(f"Error updating sheet: {str(e)}") | |
raise | |
def push(self, text: str) -> int: | |
""" | |
Push a text value to the next empty cell in the huggingface_id column. | |
Args: | |
text (str): Text to push to the sheet | |
Returns: | |
int: The row number where the text was pushed | |
""" | |
try: | |
self._reconnect_if_needed() | |
# Get all values in the huggingface_id column | |
column_values = self.sheet.col_values(self.col_index) | |
# Find the next empty row | |
next_row = None | |
for i in range(1, len(column_values)): | |
if not column_values[i].strip(): | |
next_row = i + 1 | |
break | |
# If no empty row found, append to the end | |
if next_row is None: | |
next_row = len(column_values) + 1 | |
# Update the cell | |
self.sheet.update_cell(next_row, self.col_index, text) | |
print(f"Successfully pushed value: {text} to row {next_row}") | |
return next_row | |
except Exception as e: | |
print(f"Error pushing to sheet: {str(e)}") | |
raise | |
def pop(self) -> Optional[str]: | |
"""Remove and return the most recent value.""" | |
try: | |
self._reconnect_if_needed() | |
data = self._fetch_column_data() | |
if not data or not data[0].strip(): | |
return None | |
value = data.pop(0) # Remove first value | |
data.append("") # Add empty string at the end to maintain sheet size | |
self._update_sheet(data) | |
print(f"Successfully popped value: {value}") | |
return value | |
except Exception as e: | |
print(f"Error popping from sheet: {str(e)}") | |
raise | |
def delete(self, value: str) -> List[int]: | |
"""Delete all occurrences of a value.""" | |
try: | |
self._reconnect_if_needed() | |
data = self._fetch_column_data() | |
# Find all indices before deletion | |
indices = [i + 1 for i, v in enumerate(data) if v.strip() == value.strip()] | |
if not indices: | |
print(f"Value '{value}' not found in sheet") | |
return [] | |
# Remove matching values and add empty strings at the end | |
data = [v for v in data if v.strip() != value.strip()] | |
data.extend([""] * len(indices)) # Add empty strings to maintain sheet size | |
self._update_sheet(data) | |
print(f"Successfully deleted value '{value}' from rows: {indices}") | |
return indices | |
except Exception as e: | |
print(f"Error deleting from sheet: {str(e)}") | |
raise | |
def update_cell_by_condition(self, condition_column: str, condition_value: str, target_column: str, target_value: str) -> Optional[int]: | |
""" | |
Update the value of a cell based on a condition in another column. | |
Args: | |
condition_column (str): The column to check the condition on. | |
condition_value (str): The value to match in the condition column. | |
target_column (str): The column where the value should be updated. | |
target_value (str): The new value to set in the target column. | |
Returns: | |
Optional[int]: The row number where the value was updated, or None if no matching row was found. | |
""" | |
try: | |
self._reconnect_if_needed() | |
# Get all column headers | |
headers = self.sheet.row_values(1) | |
# Find the indices for the condition and target columns | |
try: | |
condition_col_index = headers.index(condition_column) + 1 | |
except ValueError: | |
raise ValueError(f"์กฐ๊ฑด ์นผ๋ผ '{condition_column}'์ด(๊ฐ) ์์ต๋๋ค.") | |
try: | |
target_col_index = headers.index(target_column) + 1 | |
except ValueError: | |
raise ValueError(f"๋ชฉํ ์นผ๋ผ '{target_column}'์ด(๊ฐ) ์์ต๋๋ค.") | |
# Get all rows of data | |
data = self.sheet.get_all_records() | |
# Find the row that matches the condition | |
for i, row in enumerate(data): | |
if row.get(condition_column) == condition_value: | |
# Update the target column in the matching row | |
row_number = i + 2 # Row index starts at 2 (1 is header) | |
self.sheet.update_cell(row_number, target_col_index, target_value) | |
print(f"Updated row {row_number}: Set {target_column} to '{target_value}' where {condition_column} is '{condition_value}'") | |
return row_number | |
print(f"์กฐ๊ฑด์ ๋ง๋ ํ์ ์ฐพ์ ์ ์์ต๋๋ค: {condition_column} = '{condition_value}'") | |
return None | |
except Exception as e: | |
print(f"Error updating cell by condition: {str(e)}") | |
raise | |
def get_all_values(self) -> List[str]: | |
"""Get all values from the huggingface_id column.""" | |
self._reconnect_if_needed() | |
return [v for v in self._fetch_column_data() if v.strip()] | |
# Example usage | |
if __name__ == "__main__": | |
# Initialize sheet manager | |
sheet_manager = SheetManager() | |
# # Push some test values | |
# sheet_manager.push("test-model-1") | |
# sheet_manager.push("test-model-2") | |
# sheet_manager.push("test-model-3") | |
# print("Initial values:", sheet_manager.get_all_values()) | |
# # Pop the most recent value | |
# popped = sheet_manager.pop() | |
# print(f"Popped value: {popped}") | |
# print("After pop:", sheet_manager.get_all_values()) | |
# # Delete a specific value | |
# deleted_rows = sheet_manager.delete("test-model-2") | |
# print(f"Deleted from rows: {deleted_rows}") | |
# print("After delete:", sheet_manager.get_all_values()) | |
row_updated = sheet_manager.update_cell_by_condition( | |
condition_column="model", | |
condition_value="msr", | |
target_column="pia", | |
target_value="new_value" | |
) | |