"""
FastAPI Backend Application for Document Processing and Management
This application provides a comprehensive backend service for processing and managing various types of documents,
including images, SMS data, contacts, and YouTube links. It integrates with multiple external services and APIs
to provide document processing, storage, and management capabilities.
Main Components:
1. API Setup and Configuration:
- FastAPI application with CORS middleware
- Firebase, Google Sheets, Drive, and YouTube API integrations
- Gemini AI model configuration for image processing
2. Core File Management Functions:
- move_file_to_folder: Handles file organization in Google Drive
- update_user_sheet: Manages data updates in Google Sheets with backup capabilities
3. Image Processing Functions:
- request_gpt4o_completion: Processes images using GPT-4 Vision or Gemini
- process_photo: Endpoint for handling image processing requests
- update_user_sheets_and_folders: Organizes processed image data
4. Sheet Management Functions:
- copy_sms_sheet: Creates new sheets for SMS data
- edit_spreadsheet: Manages template-based sheet creation
- check_user_doc: Handles user verification and transaction records
5. Data Synchronization Functions:
- sync-data endpoint: Manages contacts and SMS data synchronization
- update_fb_transaction: Updates Firebase transaction records
6. YouTube Integration:
- share-target-handle endpoint: Processes YouTube links
- Functions for video information extraction and summarization
7. Business Information Functions:
- search_business_info: Business information lookup in reference spreadsheet
8. Utility Functions:
- Helper functions for data conversion and formatting
- Error handling and logging utilities
Dependencies:
- FastAPI for API framework
- Firebase Admin for user management
- Google API clients for Drive and Sheets integration
- Various AI models for image and text processing
"""
from fastapi import FastAPI, HTTPException, UploadFile, File, Request, BackgroundTasks, Form
from fastapi.middleware.cors import CORSMiddleware
from pydantic import BaseModel
from httpx import Timeout
import httpx
import logging
import base64
import firebase_admin
from firebase_admin import credentials
from firebase_admin import firestore
import json
from datetime import datetime
import pytz
from typing import List, Optional, Dict, Any
from google.oauth2 import service_account
from googleapiclient.discovery import build
from io import BytesIO
import google.generativeai as genai
import os
from google.cloud import vision
from typing import Tuple
from base64 import b64encode
from openai import OpenAI
from fuzzywuzzy import fuzz
from youtube_transcript_api import YouTubeTranscriptApi
from langchain_community.document_loaders import YoutubeLoader
import re
import asyncio
# start application
app = FastAPI()
# allow all origins
app.add_middleware(
CORSMiddleware,
allow_origins=["*"], # Allows all origins
allow_credentials=True, # Allows credentials (such as cookies) to be sent with requests
allow_methods=["*"], # Allows all methods
allow_headers=["*"], # Allows all headers
)
# set up logging
logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(levelname)s - %(message)s')
logging.info("Application starting up")
request_timeout = Timeout(1000.0, connect=600.0)
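# Runtime configuration expected by the setup code below (paths and the environment variable are
# taken from the loading calls in this file):
#   /app/secrets/fb_secret.json        - Firebase service-account credentials
#   /app/secrets/openai_key.txt        - OpenAI API key (used for GPT-4o extraction)
#   /app/secrets/googleapi_secret.json - Google service account for the Sheets/Drive/YouTube scopes
#   /app/secrets/gemini_key.txt        - Gemini API key
#   YOUTUBE_API_KEY (environment)      - YouTube Data API v3 key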
# set up firebase
# Load Firebase configuration from a file
with open('/app/secrets/fb_secret.json', 'r') as f:
firebase_configuration = json.load(f)
firebase_credentials = credentials.Certificate(firebase_configuration)
firebase_admin.initialize_app(firebase_credentials, {
'projectId': firebase_configuration['project_id'],
})
firestore_db = firestore.client()
users_collection_ref = firestore_db.collection('users')
transactions_collection_ref = firestore_db.collection('transactions')
SCOPES = [
'https://www.googleapis.com/auth/spreadsheets',
'https://www.googleapis.com/auth/drive',
'https://www.googleapis.com/auth/youtube.force-ssl' # Add YouTube scope
]
with open('/app/secrets/openai_key.txt', 'r') as f:
OPENAI_KEY = f.read().strip()  # strip any trailing newline from the key file
APICredential = service_account.Credentials.from_service_account_file(
'/app/secrets/googleapi_secret.json', scopes=SCOPES)
sheet_service = build('sheets', 'v4', credentials=APICredential)
drive_service = build('drive', 'v3', credentials=APICredential)
# Add YouTube API key setup
YOUTUBE_API_KEY = os.getenv('YOUTUBE_API_KEY') # Add to your environment variables
youtube_service = build('youtube', 'v3', developerKey=YOUTUBE_API_KEY) # Use API key instead of OAuth
with open('/app/secrets/gemini_key.txt', 'r') as f:
api_key = f.read().strip()  # strip any trailing newline from the key file
genai.configure(api_key=api_key)
# Create the model
generation_config = {
"temperature": 1,
"top_p": 0.95,
"top_k": 64,
"max_output_tokens": 8192,
"response_mime_type": "application/json",
}
model = genai.GenerativeModel(
model_name="gemini-1.5-flash",
generation_config=generation_config,
# safety_settings = Adjust safety settings
# See https://ai.google.dev/gemini-api/docs/safety-settings
)
# first three are "๋“ฑ๋ก์ผ์‹œ", "์ธ๋„ค์ผ", "๋ฐ”๋กœ๊ฐ€๊ธฐ",
receipt_sheet_headers = ['๋ฐœํ–‰์ผ', '์ƒํ˜ธ', '์‚ฌ์—…์ž๋ฒˆํ˜ธ', 'ํ•ฉ๊ณ„๊ธˆ์•ก', '๋‚ด์—ญ', '์นด๋“œ๋ฒˆํ˜ธ', '์นด๋“œ์ข…๋ฅ˜', '์Šน์ธ๋ฒˆํ˜ธ', '๊ธฐํƒ€']
# at index 1 '๋“ฑ๋ก ์ผ์‹œ', at last index '๋ช…ํ•จ๋ณด๊ธฐ'
business_card_sheet_headers = ['์ด๋ฆ„', 'ํšŒ์‚ฌ๋ช…', '์†Œ์† ๋ฐ ์ง์œ„', '์ฃผ์†Œ', '์ „ํ™”' ,'ํœด๋Œ€ํฐ', 'ํŒฉ์Šค','e-mail','ํ™ˆํŽ˜์ด์ง€']
sms_sheet_headers = ['๋‚ ์งœ', '๊ณ„์ขŒ', '์ž…๊ธˆ', '์ถœ๊ธˆ', '์ž”์•ก', '๋ฌธ์ž']
contact_sheet_headers = ['์ด๋ฆ„', '์ „ํ™”๋ฒˆํ˜ธ1', '์ „ํ™”๋ฒˆํ˜ธ2', '์ „ํ™”๋ฒˆํ˜ธ3', '์ด๋ฉ”์ผ์ฃผ์†Œ1', '์ด๋ฉ”์ผ์ฃผ์†Œ2', '์ด๋ฉ”์ผ์ฃผ์†Œ3', '๊ทธ๋ฃน']
receipts_ss = 1395967745
business_cards_ss = 1733555840
contacts_ss = 1729750548
sms_ss = 1891574758
shared_links_ss = 289914391
kst = pytz.timezone('Asia/Seoul')
class SheetUpdateRequest(BaseModel):
target_sheet_id: int
data: list
def convert_dicts_to_list(dicts: List[Dict], headers: List[str], add_link: bool = False,
image_id: str = '', link_text: str = '',
link_position: int = -1) -> List[List[str]]:
rows = []
# timestamp = datetime.now().strftime("%Y-%m-%d %H:%M:%S")
timestamp = datetime.now(kst).strftime("%Y-%m-%d %H:%M:%S")
for dict_item in dicts:
# row = list(dict_item.values())
# row = [str(value) for value in dict_item.values()]
row = [str(dict_item.get(header, '')) for header in headers]
# Always add timestamp to the first index
row.insert(0, timestamp)
if add_link:
# Ensure the list length is at least 9
while len(row) < 9:
row.append('')
if link_position == -1: # Append link at the end
row.append(f'=HYPERLINK("https://drive.google.com/file/d/{image_id}/view?usp=sharing", "{link_text}")')
else:
row.insert(link_position, f'=HYPERLINK("https://drive.google.com/file/d/{image_id}/view?usp=sharing", "{link_text}")')
rows.append(row)
# print(rows)
return rows
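# Illustrative call (values are placeholders, not executed): a single parsed receipt becomes one row
# laid out as [registration timestamp, HYPERLINK formula, ๋ฐœํ–‰์ผ, ์ƒํ˜ธ, ์‚ฌ์—…์ž๋ฒˆํ˜ธ, ...]:
#   rows = convert_dicts_to_list(
#       [{"๋ฐœํ–‰์ผ": "3/6/2024 15:31:00", "์ƒํ˜ธ": "์‹œํฅ๋ฐฐ๊ณง๋™์šฐ์ฒด๊ตญ", "ํ•ฉ๊ณ„๊ธˆ์•ก": "4,300"}],
#       receipt_sheet_headers,
#       add_link=True, image_id="<drive-file-id>", link_text="๋ฐ”๋กœ๊ฐ€๊ธฐ", link_position=1,
#   )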
async def download_file_from_drive(file_id: str) -> Tuple[bytes, str]:
"""
Downloads a file from Google Drive using the file ID
and returns the file content as bytes and the MIME type.
"""
try:
url = f"https://drive.google.com/uc?export=download&id={file_id}"
async with httpx.AsyncClient(follow_redirects=True) as client: # Enable redirect following
response = await client.get(url)
if response.status_code == 200:
return response.content, response.headers.get('content-type', 'application/octet-stream')
else:
raise HTTPException(
status_code=500,
detail=f"Failed to download file, status code: {response.status_code}"
)
except Exception as e:
logging.error(f"Error downloading file from Drive: {e}")
raise HTTPException(status_code=500, detail="Failed to download file from Drive")
async def request_gpt4o_completion(image_id: str, user_credit: int, use_gpt4: bool = False):
"""
Processes images using either GPT-4 Vision or Gemini for text extraction and analysis.
Args:
image_id (str): The Google Drive ID of the image to process
user_credit (int): Available user credits for processing
use_gpt4 (bool): Whether to use GPT-4 Vision instead of Gemini
Returns:
str: JSON string containing extracted information
"""
try:
# Step 1: Download the image file from Google Drive
file_content, mime_type = await download_file_from_drive(image_id)
if use_gpt4:
# Convert image to base64 for GPT-4 Vision
base64_image = b64encode(file_content).decode('utf-8')
client = OpenAI(api_key=OPENAI_KEY)
response = client.chat.completions.create(
model="gpt-4o",
messages=[
{
"role": "system",
"content": """You are a receipt and business card data extractor. Extract information EXACTLY in the specified format.
For receipts, follow this EXACT format and order:
1. "๋ฐœํ–‰์ผ": Must be in format "M/D/YYYY HH:MM:SS"
2. "์ƒํ˜ธ": Store/business name only
3. "์‚ฌ์—…์ž๋ฒˆํ˜ธ": Must be in format "###-##-#####" (if present)
4. "ํ•ฉ๊ณ„๊ธˆ์•ก": Total amount with comma for thousands (e.g., "4,300")
5. "๋‚ด์—ญ": List items with format "item price quantity" (e.g., "๋ณดํ†ต 7g 10ํ†ต 430 4,300")
6. "์นด๋“œ๋ฒˆํ˜ธ": Must include asterisks for hidden numbers (e.g., "5286-6420-****-077")
7. "์นด๋“œ์ข…๋ฅ˜": Card type only
8. "์Šน์ธ๋ฒˆํ˜ธ": Approval number only
9. "๊ธฐํƒ€": All remaining information as key-value pairs separated by commas
Example receipt:
{
"receipts": [{
"๋ฐœํ–‰์ผ": "3/6/2024 15:31:00",
"์ƒํ˜ธ": "์‹œํฅ๋ฐฐ๊ณง๋™์šฐ์ฒด๊ตญ",
"์‚ฌ์—…์ž๋ฒˆํ˜ธ": "101-83-02925",
"ํ•ฉ๊ณ„๊ธˆ์•ก": "4,300",
"๋‚ด์—ญ": "๋ณดํ†ต 7g 10ํ†ต 430 4,300",
"์นด๋“œ๋ฒˆํ˜ธ": "5286-6420-****-077",
"์นด๋“œ์ข…๋ฅ˜": "NH๊ธฐ์—…์ฒดํฌ",
"์Šน์ธ๋ฒˆํ˜ธ": "49980444",
"๊ธฐํƒ€": "์ „ํ™”: 031-8042-2047, ์ฃผ์†Œ: ๊ฒฝ๊ธฐ๋„ ์‹œํฅ์‹œ ์ •์™•๋™ 2543, ๊ณ ๊ฐ๋ฌธ์˜ ์ „ํ™”: 1588-1300"
}],
"busi_cards": []
}
For business cards, follow this EXACT format and order:
1. "์ด๋ฆ„": Full name
2. "ํšŒ์‚ฌ๋ช…": Company name
3. "์†Œ์† ๋ฐ ์ง์œ„": Position and department
4. "์ฃผ์†Œ": Address with postal code in format "์ฃผ์†Œ (์šฐํŽธ๋ฒˆํ˜ธ) ์ฝ”๋“œ"
5. "์ „ํ™”": Office phone with hyphens
6. "ํœด๋Œ€ํฐ": Mobile with hyphens (e.g., "010-0000-0000")
7. "ํŒฉ์Šค": Fax number with hyphens
8. "e-mail": Email address
9. "ํ™ˆํŽ˜์ด์ง€": Website URL
Output must be valid JSON with "receipts" and "busi_cards" arrays."""
},
{
"role": "user",
"content": [
{"type": "text", "text": "Extract the information from this image following the exact format specified."},
{
"type": "image_url",
"image_url": {
"url": f"data:{mime_type};base64,{base64_image}"
}
}
]
}
],
max_tokens=4096,
temperature=1,
response_format={ "type": "json_object" }
)
return response.choices[0].message.content
else:
# Existing Gemini implementation
# Step 3: Get text data from the image using Google Cloud Vision
client = vision.ImageAnnotatorClient()
image = vision.Image(content=file_content)
response = client.text_detection(image=image)
texts = response.text_annotations
if response.error.message:
logging.error(f"Error in Google Cloud Vision API: {response.error.message}")
raise Exception(
"{}\nFor more info on error messages, check: "
"https://cloud.google.com/apis/design/errors".format(response.error.message)
)
# Extract all detected text
extracted_text = texts[0].description if texts else ""
# Step 4: Prepare the prompt for Gemini
prompt = f'''
#Direction
You are an employee of the finance department of a company. For each task well done, you will be rewarded a $1,000 bonus.
The boss has come to your office and given you the task of transferring the content of receipts and business cards onto a Google spreadsheet.
He hands you two things.
One is a paper containing a guideline and an example.
Two is the OCR text.
#Guideline
-strictly adhere to the given content of the OCR text.
-it could be of receipt(s) or business card(s), but not both.
-sort any other info that cannot be classified elsewhere under "๊ธฐํƒ€".
-format the extracted information as a JSON object with two main keys: 'receipts' and 'busi_cards'.
-ensure the total number of receipts and business cards combined does not exceed 500.
#Example
##Receipts
Input
์€์„ฑ๋งˆ์ผ“
์‚ฌ์—…์ž๋ฒˆํ˜ธ: 727-27-01455
๋Œ€ํ‘œ์ž:ํ—ˆ์œ ์„ฑ
์ฃผ
์†Œ: ๊ฒฝ๊ธฐ ์‹œํฅ์‹œ ๋ฐฐ๊ณง3๋กœ 96
์ „ํ™” ๋ฒˆํ˜ธ: 031-431-1544
ํŒ๋งค์ผ: 24-02-23 20:10, ๊ธˆ์š”์ผ ๊ณ„์‚ฐ๋Œ€:001
NO. ์ƒํ’ˆ๋ช…
๋‹จ๊ฐ€ ์ˆ˜๋Ÿ‰ ๊ธˆ์•ก
001 ๋Аํƒ€๋ฆฌ๋ฒ„์„ฏ
200092
1,000
1. 1,000 #
002 ์–‘๋ฐฐ์ถ”
200071
(#)๋ฉด์„ธ๋ฌผํ’ˆ:
ํ•ฉ
๊ณ„:
์‹ ์šฉ์นด๋“œ์ง€๋ถˆ:
โ˜…์นด๋“œ์‹ ์šฉ์Šน์ธ
็ซ
1,800 1 1.800 #
2,800
2,800
2,800
๊ฐ€๋งน์ ๋ช… ์€์„ฑ๋งˆ์ผ“
์นด๋“œ๋ฒˆํ˜ธ
4673-09**-****-515*
์นด๋“œ๋ช… : KB๊ตญ๋ฏผ์ฒดํฌ
์ „ํ‘œ๋งค์ž…: KB๊ตญ๋ฏผ์นด๋“œ ๋ฌด์„œ๋ช…: 00111521584
์Šน์ธ๊ธˆ์•ก: 2,800
(์ผ์‹œ๋ถˆ)
์Šน์ธ๋ฒˆํ˜ธ: 30014507, ์ „ํ‘œNo:201036
์ œ์ถœ VAN: JTNET-๋ฆฌ๋”๊ธฐ
์ •์ƒ์Šน์ธ
๊ฐ์‚ฌํ•ฉ๋‹ˆ๋‹ค.
๊ฑฐ๋ž˜NO:0223007835 ๊ณ„์‚ฐ์›: ๊ด€๋ฆฌ์ž(001)
2502230078357
Output
๋ฐœํ–‰์ผ: 2024/02/23 20:10
์ƒํ˜ธ: ์€์„ฑ๋งˆ์ผ“
์‚ฌ์—…์ž๋ฒˆํ˜ธ: 727-27-01455
ํ•ฉ๊ณ„๊ธˆ์•ก: 2,800
๋‚ด์—ญ:
๋Аํƒ€๋ฆฌ๋ฒ„์„ฏ 1,000์› 1๊ฐœ
์–‘๋ฐฐ์ถ” 1,800์› 1๊ฐœ
์นด๋“œ๋ฒˆํ˜ธ: 4673-09**-****-515*
์นด๋“œ์ข…๋ฅ˜: KB๊ตญ๋ฏผ์ฒดํฌ
์Šน์ธ๋ฒˆํ˜ธ: 30014507
๊ธฐํƒ€: ๋Œ€ํ‘œ์ž: ํ—ˆ์œ ์„ฑ, ์ฃผ์†Œ: ๊ฒฝ๊ธฐ ์‹œํฅ์‹œ ๋ฐฐ๊ณง3๋กœ 96, ์ „ํ™” ๋ฒˆํ˜ธ: 031-431-1544, ๋ฉด์„ธ๋ฌผํ’ˆ: 2,800, ํ•ฉ๊ณ„: 2,800, ์‹ ์šฉ์นด๋“œ์ง€๋ถˆ:2,800, ์ „ํ‘œNo:201036, ์ œ์ถœ VAN: JTNET-๋ฆฌ๋”๊ธฐ, ๊ฑฐ๋ž˜NO:0223007835 ๊ณ„์‚ฐ์›: ๊ด€๋ฆฌ์ž(001)
##Business Cards
Input
๋ฐ•์žฅ์›
์•„์ดํ”„๋ฆฌ๋งˆ
ํŒ€์žฅ/์ •์ฑ…์‚ฌ์—…ํŒ€
R&D์‚ฌ์—…์‹ค
Over the MIRACLE
๊ฒฝ๊ธฐ๋„ ํ™”์„ฑ์‹œ ์ •๋‚จ๋ฉด ๋งŒ๋…„๋กœ 98๋ฒˆ๊ธธ 55 10์ธต (์šฐ) 18523
Mobile 010-9582-0925
Tel (031) 000-0000
Fax (031) 000-0000
Email iprima@iprima.com
www.iprima.com
Output
์ด๋ฆ„: ๋ฐ•์žฅ์›
ํšŒ์‚ฌ๋ช…: ์•„์ดํ”„๋ฆฌ๋งˆ
์†Œ์† ๋ฐ ์ง์œ„: ํŒ€์žฅ/์ •์ฑ…์‚ฌ์—…ํŒ€ R&D์‚ฌ์—…์‹ค
์ฃผ์†Œ: ๊ฒฝ๊ธฐ๋„ ํ™”์„ฑ์‹œ ์ •๋‚จ๋ฉด ๋งŒ๋…„๋กœ 98๋ฒˆ๊ธธ 55 (์šฐํŽธ๋ฒˆํ˜ธ) 18523
์ „ํ™”: (031) 000-0000
ํœด๋Œ€ํฐ: (031) 000-0000
ํŒฉ์Šค: 010-0000-0000
e-mail: iprima@gmail.com
ํ™ˆํŽ˜์ด์ง€: www.iprima.com
๊ธฐํƒ€: Over the MIRACLE
Image content: {extracted_text}
'''
logging.info("Prompt prepared for Gemini model: " + prompt)
# Generate content using the Gemini model
response = model.generate_content(
contents=[prompt]
)
if response.text:
logging.info("Parsing Gemini model response")
json_response = json.loads(response.text)
logging.info(f"JSON response parsed: {response.text}")
return json.dumps(json_response)
else:
logging.error("Gemini model did not return a text response")
raise Exception("Gemini model did not return a text response")
except Exception as e:
logging.error(f"Error in request_gpt4o_completion: {str(e)}")
raise HTTPException(status_code=500, detail=f"Failure: {str(e)}")
async def move_file_to_folder(file_id, current_parents, new_parents):
"""
Moves a file to specified folders in Google Drive while managing parent folders.
Args:
file_id (str): The ID of the file to move
current_parents (list/str): Current parent folder(s) to remove from
new_parents (list/str): New parent folder(s) to add to
Returns:
None
"""
try:
# Ensure current_parents is a list of parent IDs
if not isinstance(current_parents, list):
current_parents = [current_parents]
if not isinstance(new_parents, list):
new_parents = [new_parents]
# Retrieve the existing parents to remove
file = drive_service.files().get(fileId=file_id, fields='parents').execute()
existing_parents = file.get('parents')
logging.info(f"Existing parents of the file {file_id}: {existing_parents}")
# Remove only the current parents
parents_to_remove = [parent for parent in current_parents if parent in existing_parents]
parents_to_add = [parent for parent in new_parents if parent not in existing_parents]
previous_parents = ",".join(parents_to_remove)
logging.info(f"Parents to remove: {parents_to_remove}")
logging.info(f"Parents to add: {parents_to_add}")
# Move the file to the new folders while keeping it in yungsoogi folder
if parents_to_add:
file = drive_service.files().update(
fileId=file_id,
addParents=",".join(parents_to_add),
removeParents=previous_parents,
fields='id, parents'
).execute()
logging.info(f"File {file_id} moved to new folders {new_parents}.")
else:
logging.info(f"File {file_id} already in the target folders {new_parents}.")
except Exception as e:
logging.error(f"Failed to move file {file_id} to new folders: {e}")
async def update_user_sheet(spreadsheet_id:str, sheet_name:str, data: List[List[str]], is_reset: Optional[bool] = False):
"""
Updates a user's Google Sheet with new data, optionally creating a backup.
Args:
spreadsheet_id (str): The ID of the target spreadsheet
sheet_name (str): Name of the sheet to update
data (List[List[str]]): The data to write to the sheet
is_reset (bool, optional): Whether to create a backup before updating
Returns:
None
"""
print(f"Spreadsheet ID: {spreadsheet_id}, Sheet Name: {sheet_name}, Data: {data}")
# Get all the sheets in the spreadsheet
sheet_metadata = sheet_service.spreadsheets().get(spreadsheetId=spreadsheet_id).execute()
sheets = sheet_metadata.get('sheets', '')
# Find the sheet by name
sheet = next((sheet for sheet in sheets if sheet['properties']['title'] == sheet_name), None)
if not sheet:
raise ValueError(f"Sheet '{sheet_name}' not found in the spreadsheet.")
sheet_id = sheet['properties']['sheetId']
# Rest of the function remains the same
no_of_rows = len(data)
no_of_cols = len(data[0])
if is_reset:
new_sheet_name = f"{sheet_name} {datetime.now(kst).strftime('%Y-%m-%d %H:%M')}"
duplicate_requests = [{
"duplicateSheet": {
"sourceSheetId": sheet_id,
"newSheetName": new_sheet_name
}
}]
duplicate_response = sheet_service.spreadsheets().batchUpdate(
spreadsheetId=spreadsheet_id,
body={
'requests': duplicate_requests
}
).execute()
sheet_id = duplicate_response['replies'][0]['duplicateSheet']['properties']['sheetId']
sheet_name = new_sheet_name
# Insert new rows
requests = [
{
"updateSheetProperties": {
"properties": {
"sheetId": sheet_id,
"hidden": False
},
"fields": "hidden"
}
},
{
"insertDimension": {
"range": {
"sheetId": sheet_id,
"dimension": "ROWS",
"startIndex": 1,
"endIndex": no_of_rows + 1
},
"inheritFromBefore": False,
}
}]
response = sheet_service.spreadsheets().batchUpdate(
spreadsheetId=spreadsheet_id,
body={
'requests': requests
}
).execute()
update_body = {
'values': data
}
start_column = 'A'
end_column = chr(ord(start_column) + no_of_cols - 1)
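# NOTE: this A1-notation arithmetic assumes the data has at most 26 columns (A-Z).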
range_to_update = f'{sheet_name}!{start_column}2:{end_column}{no_of_rows + 1}'
result = sheet_service.spreadsheets().values().update(
spreadsheetId=spreadsheet_id,
range=range_to_update,
valueInputOption='USER_ENTERED',
body=update_body
).execute()
# Format the cells
for row_index, row in enumerate(data):
for col_index, cell_value in enumerate(row):
if str(cell_value).startswith('=HYPERLINK'):
color = {"red": 0, "green": 0, "blue": 1} # Blue for hyperlinks
else:
color = {"red": 0, "green": 0, "blue": 0} # Black for other text
requests = [
{
"repeatCell": {
"range": {
"sheetId": sheet_id,
"startRowIndex": row_index + 1,
"endRowIndex": row_index + 2,
"startColumnIndex": col_index,
"endColumnIndex": col_index + 1
},
"cell": {
"userEnteredFormat": {
"textFormat": {
"foregroundColor": color
}
}
},
"fields": "userEnteredFormat.textFormat.foregroundColor"
}
}
]
response = sheet_service.spreadsheets().batchUpdate(
spreadsheetId=spreadsheet_id,
body={
'requests': requests
}
).execute()
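# Illustrative call (placeholders, not executed): prepend two contact rows to the "์—ฐ๋ฝ์ฒ˜" sheet,
# first duplicating it as a dated backup because is_reset=True:
#   await update_user_sheet("<spreadsheet-id>", "์—ฐ๋ฝ์ฒ˜",
#                           [["ํ™๊ธธ๋™", "010-0000-0000"], ["๊น€์˜ํฌ", "010-1111-1111"]], is_reset=True)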
async def copy_sms_sheet(phone_no: str, spreadsheet_id: str):
"""
Creates a new sheet for SMS data by copying a template.
Args:
phone_no (str): Phone number to create sheet for
spreadsheet_id (str): Target spreadsheet ID
Returns:
str: ID of the newly created sheet
"""
requests = [
{
"duplicateSheet": {
"sourceSheetId": sms_ss,
"newSheetName": 'SMS ' + phone_no
}
}
]
response = sheet_service.spreadsheets().batchUpdate(
spreadsheetId=spreadsheet_id,
body={
'requests': requests
}
).execute()
print(response)
new_sheet_id = response['replies'][0]['duplicateSheet']['properties']['sheetId']
unhide_request = [{
"updateSheetProperties": {
"properties": {
"sheetId": new_sheet_id,
"hidden": False
},
"fields": "hidden"
}
}]
sheet_service.spreadsheets().batchUpdate(
spreadsheetId=spreadsheet_id,
body={
'requests': unhide_request
}
).execute()
return new_sheet_id
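# Illustrative call (placeholders, not executed): duplicate the hidden SMS template sheet (sms_ss)
# inside the user's spreadsheet, producing an unhidden sheet named "SMS 01012345678":
#   new_sheet_id = await copy_sms_sheet("01012345678", "<spreadsheet-id>")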
async def update_fb_transaction(transaction_ref: firestore.DocumentReference, no_sms: Optional[int] = 0, no_contacts: Optional[int] = 0, no_receipts: Optional[int] = 0, no_business_cards: Optional[int] = 0):
try:
# Update the transaction
transaction_ref.update({
"no_sms": firestore.Increment(no_sms),
"no_contacts": firestore.Increment(no_contacts),
"no_receipts": firestore.Increment(no_receipts),
"no_business_cards": firestore.Increment(no_business_cards)
})
return {"status": "Success"}
except Exception as e:
logging.error(f"Error updating transaction: {e}")
return {"status": f"Error: {str(e)}"}
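# Illustrative call (placeholders, not executed): count one receipt and two business cards against
# the user's transaction document via atomic Firestore increments:
#   await update_fb_transaction(transactions_collection_ref.document("<uid>"), no_receipts=1, no_business_cards=2)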
async def get_youtube_transcript_url(video_id: str) -> str:
try:
headers = {
'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/91.0.4472.124 Safari/537.36',
'Accept-Language': 'ko,en;q=0.9'
}
url = f"https://www.youtube.com/watch?v={video_id}"
async with httpx.AsyncClient() as client:
response = await client.get(url, headers=headers)
page_content = response.text
# Look for timedtext URLs
timedtext_pattern = r'"baseUrl":"(https://www\.youtube\.com/api/timedtext[^"]+)"'
matches = re.finditer(timedtext_pattern, page_content)
urls = {} # Dictionary to store language:url pairs
for match in matches:
url = match.group(1).replace('\\u0026', '&')
# Extract language from URL
lang_match = re.search(r'lang=(\w+)', url)
if lang_match:
lang = lang_match.group(1)
urls[lang] = url
if urls:
# Try languages in order of preference
if 'ko' in urls:
logging.info("Found Korean transcript")
return urls['ko']
elif 'en' in urls:
logging.info("Found English transcript")
return urls['en']
else:
# Get first available language
lang = next(iter(urls))
logging.info(f"Found {lang.upper()} transcript")
return urls[lang]
else:
logging.warning("No transcript URLs found in the page content")
return ""  # empty string so the caller's falsy check treats this as a missing transcript
except Exception as e:
logging.error(f"Error in get_youtube_transcript_url: {str(e)}")
return ""  # empty string so the caller's falsy check treats this as a missing transcript
async def process_youtube_video(video_id: str, video_title: str = "") -> dict:
try:
# Get video details from YouTube Data API
video_response = youtube_service.videos().list(
part='snippet',
id=video_id
).execute()
if not video_response.get('items'):
return {
"title": video_title,
"summary": "๋น„๋””์˜ค๋ฅผ ์ฐพ์„ ์ˆ˜ ์—†์Šต๋‹ˆ๋‹ค.",
"key_points": [],
"timestamps": {}
}
video_info = video_response['items'][0]['snippet']
video_title = video_info.get('title', video_title)
description = video_info.get('description', '')
# Get transcript URL
transcript_url = await get_youtube_transcript_url(video_id)
if not transcript_url:
return {
"title": video_title,
"summary": "์ž๋ง‰์„ ์ฐพ์„ ์ˆ˜ ์—†์Šต๋‹ˆ๋‹ค.",
"key_points": [],
"timestamps": {}
}
# Get the actual captions
async with httpx.AsyncClient() as client:
response = await client.get(transcript_url)
if response.status_code != 200:
return {
"title": video_title,
"summary": "์ž๋ง‰์„ ๊ฐ€์ ธ์˜ค๋Š”๋ฐ ์‹คํŒจํ–ˆ์Šต๋‹ˆ๋‹ค.",
"key_points": [],
"timestamps": {}
}
full_transcript = response.text
# Generate summary using Gemini
prompt = """
You are a professional content summarizer. Summarize the following YouTube video transcript
in Korean. Focus on the key points and main ideas. Keep the summary concise but informative.
Video Title: {title}
Description: {description}
Transcript:
{transcript}
Please format the response as a JSON object with the following structure:
{{
"title": "{title}",
"summary": "Comprehensive summary in Korean",
"key_points": ["Key point 1", "Key point 2", "..."],
"timestamps": {{"topic": "timestamp"}} (if any significant timestamps are mentioned)
}}
"""
response = model.generate_content(
prompt.format(
title=video_title,
description=description,
transcript=full_transcript
)
)
return json.loads(response.text)
except Exception as e:
logging.error(f"Error in summary generation: {str(e)}")
return {
"title": video_title,
"summary": "๋™์˜์ƒ ์ฒ˜๋ฆฌ ์ค‘ ์˜ค๋ฅ˜๊ฐ€ ๋ฐœ์ƒํ–ˆ์Šต๋‹ˆ๋‹ค.",
"key_points": [],
"timestamps": {}
}
def extract_youtube_id(url: str) -> str:
"""
Extract YouTube video ID from various URL formats
Returns None if URL is invalid
"""
if "youtube.com/watch?v=" in url:
return url.split("watch?v=")[1].split("&")[0]
elif "youtu.be/" in url:
return url.split("youtu.be/")[1].split("?")[0]
return None
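# Illustrative behavior (the video id is a placeholder):
#   extract_youtube_id("https://www.youtube.com/watch?v=abc123XYZ_0&t=42")  -> "abc123XYZ_0"
#   extract_youtube_id("https://youtu.be/abc123XYZ_0?si=share")             -> "abc123XYZ_0"
#   extract_youtube_id("https://example.com/not-youtube")                   -> None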
@app.post("/share-target-handle")
async def share_target_handle(request: Request):
"""
Endpoint for processing shared YouTube links.
Extracts video information and creates summaries in user's spreadsheet.
Args:
request (Request): FastAPI request object containing share data
Returns:
dict: Processing status and summary data
"""
logging.info("Starting share-target-handle endpoint")
try:
# Parse request data
request_data = await request.json()
logging.info(f"Received request data: {request_data}")
shared_data = request_data.get("shared_data", {})
video_url = shared_data.get("text", "")
video_title = shared_data.get("title", "") # Get the video title
user_id = request_data.get("user_id")
logging.info(f"Extracted shared_data: {shared_data}, user_id: {user_id}")
# Validate user exists
user_ref = users_collection_ref.document(user_id)
user = user_ref.get()
if not user.exists:
logging.error(f"User not found: {user_id}")
return {"status": "error", "message": "User not found"}
user_data = user.to_dict()
logging.info(f"Found user data: {user_data}")
# Check if spreadsheet exists
if 'spreadsheet' not in user_data['gDrive_metadata']:
return {"status": "error", "message": "User's spreadsheet not found"}
# Check if shared links sheet exists
sheet_metadata = sheet_service.spreadsheets().get(
spreadsheetId=user_data['gDrive_metadata']['spreadsheet']
).execute()
shared_links_sheet_exists = any(
sheet['properties']['title'] == '๊ณต์œ ๋งํฌ'
for sheet in sheet_metadata.get('sheets', [])
)
# Create sheet if it doesn't exist
if not shared_links_sheet_exists:
try:
logging.info(f"Copying template sheet to user's spreadsheet: {user_data['gDrive_metadata']['spreadsheet']}")
# Copy template sheet using shared_links_ss
copied_sheet = sheet_service.spreadsheets().sheets().copyTo(
spreadsheetId=template_sheet_id,
sheetId=shared_links_ss,
body={"destinationSpreadsheetId": user_data['gDrive_metadata']['spreadsheet']}
).execute()
logging.info(f"Copied sheet to user's spreadsheet: {copied_sheet}")
# Update sheet name to remove "Copy of" prefix
rename_request = {
"updateSheetProperties": {
"properties": {
"sheetId": copied_sheet['sheetId'],
"title": "๊ณต์œ ๋งํฌ"
},
"fields": "title"
}
}
sheet_service.spreadsheets().batchUpdate(
spreadsheetId=user_data['gDrive_metadata']['spreadsheet'],
body={"requests": [rename_request]}
).execute()
logging.info("Renamed sheet to '๊ณต์œ ๋งํฌ'")
except Exception as e:
logging.error(f"Failed to create shared links sheet: {e}")
return {"status": "error", "message": f"Failed to create shared links sheet: {str(e)}"}
# Process YouTube URL
video_id = extract_youtube_id(video_url)
if not video_id:
return {"status": "error", "message": "Not a valid YouTube URL"}
# Generate video summary
summary_data = await process_youtube_video(video_id, video_title)
# Prepare sheet data
sheet_data = [[
f'=HYPERLINK("{video_url}", "๋ฐ”๋กœ๊ฐ€๊ธฐ")',
summary_data.get('title', video_title), # Use video_title as fallback
summary_data.get('summary', ''),
datetime.now(kst).strftime("%Y-%m-%d %H:%M:%S")
]]
# Update spreadsheet
await update_user_sheet(
spreadsheet_id=user_data['gDrive_metadata']['spreadsheet'],
sheet_name="๊ณต์œ ๋งํฌ",
data=sheet_data
)
return {
"status": "success",
"message": "Video processed and saved successfully",
"data": summary_data
}
except Exception as e:
logging.error(f"Error in share-target-handle: {str(e)}", exc_info=True)
return {"status": "error", "message": str(e)}
@app.post("/check-userdoc")
async def check_user_doc(user_id: str = Form(...)):
"""
Verifies user existence and initializes transaction records if needed.
Args:
user_id (str): ID of the user to check
Returns:
dict: User existence status
"""
user_ref = users_collection_ref.document(user_id)
user = user_ref.get()
if user.exists:
return {"dbUserExists": True}
transaction_ref = transactions_collection_ref.document(user_id)
transaction = transaction_ref.get()
if not transaction.exists:
transaction_ref.set({
"no_sms": 0,
"no_contacts": 0,
"no_receipts": 0,
"no_business_cards": 0,
"purchased_credit": {
"image_detection": 100,
"sync_data": 500,
},
"uid": user_id,
})
return {"dbUserExists": False}
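# Illustrative client call (host is a placeholder; the form field name matches the endpoint above):
#   curl -X POST https://<host>/check-userdoc -F "user_id=<firebase-uid>"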
template_sheet_id = '1i5mrmlTs5sPWtx2mBtc_f-zm1D3x21r1T_77guki8_8'
@app.post("/edit-spreadsheet")
async def edit_spreadsheet(target_sheet_id: str = Form(...)):
"""
Endpoint for creating or updating a spreadsheet from a template.
Replaces all sheets in the target spreadsheet with template sheets.
Args:
target_sheet_id (str): ID of the spreadsheet to edit
Returns:
dict: Status of the operation
"""
try:
# Retrieve the target spreadsheet's metadata
target_spreadsheet = sheet_service.spreadsheets().get(spreadsheetId=target_sheet_id).execute()
# Collect all sheet IDs in the target spreadsheet
target_sheet_ids = [sheet['properties']['sheetId'] for sheet in target_spreadsheet.get('sheets', [])]
# Prepare batch update request to delete existing sheets
requests = []
for sheet_id in target_sheet_ids:
requests.append({
"deleteSheet": {
"sheetId": sheet_id
}
})
# Retrieve the template spreadsheet's metadata
template_spreadsheet = sheet_service.spreadsheets().get(spreadsheetId=template_sheet_id).execute()
# Iterate through each sheet in the template spreadsheet
for sheet in template_spreadsheet.get('sheets', []):
sheet_id = sheet['properties']['sheetId']
# Use the copyTo method to copy the sheet to the target spreadsheet
copy_request = sheet_service.spreadsheets().sheets().copyTo(
spreadsheetId=template_sheet_id,
sheetId=sheet_id,
body={"destinationSpreadsheetId": target_sheet_id}
)
copy_response = copy_request.execute()
# Get the copied sheet ID
copied_sheet_id = copy_response['sheetId']
# Rename the copied sheet to remove "Copy of"
copied_sheet_title = sheet['properties']['title'].replace("Copy of ", "")
rename_request = {
"updateSheetProperties": {
"properties": {
"sheetId": copied_sheet_id,
"title": copied_sheet_title
},
"fields": "title"
}
}
# Execute rename request
sheet_service.spreadsheets().batchUpdate(
spreadsheetId=target_sheet_id,
body={"requests": [rename_request]}
).execute()
logging.info(f"Sheet {sheet['properties']['title']} copied and renamed to {copied_sheet_title} in {target_sheet_id}.")
if requests:
# Execute batch update to delete sheets
batch_update_request = {"requests": requests}
sheet_service.spreadsheets().batchUpdate(
spreadsheetId=target_sheet_id,
body=batch_update_request
).execute()
logging.info(f"Deleted existing sheets in target spreadsheet {target_sheet_id}.")
logging.info(f"All sheets from template {template_sheet_id} copied to {target_sheet_id}.")
return {"message": "Spreadsheet copied successfully"}
except Exception as e:
logging.error(f"Failed to copy sheets: {e}")
return {"error": str(e)}
async def create_shortcut(file_id, parent_folder_id, name):
try:
file_metadata = {
'name': name,
'mimeType': 'application/vnd.google-apps.shortcut',
'parents': [parent_folder_id],
'shortcutDetails': {
'targetId': file_id
}
}
shortcut = drive_service.files().create(body=file_metadata, fields='id').execute()
return shortcut.get('id')
except Exception as e:
logging.error(f"Failed to create shortcut: {e}")
return None
async def search_business_info(company_name: str, address: str, spreadsheet_id: str = '1YhbIaFFcz6mrMpA9yCSri8zaPL49V6v7l1YYt0AiKss') -> dict:
"""
Enhanced search for business information using fuzzy string matching and weighted scoring
"""
try:
logging.info(f"Searching business info for company: '{company_name}', address: '{address}'")
# Clean and normalize input strings
company_name = company_name.strip().lower()
address = address.strip().lower()
# Remove common company suffixes for better matching
company_suffixes = ['(์ฃผ)', '(์œ )', '์ฃผ์‹ํšŒ์‚ฌ', '์œ ํ•œํšŒ์‚ฌ']
normalized_company = company_name
for suffix in company_suffixes:
normalized_company = normalized_company.replace(suffix.lower(), '').strip()
# Get all rows from the spreadsheet
range_name = "๊ฒฝ๊ธฐ๋„!A2:P" # Extending range to include all relevant columns
result = sheet_service.spreadsheets().values().get(
spreadsheetId=spreadsheet_id,
range=range_name,
valueRenderOption='UNFORMATTED_VALUE'
).execute()
if not result.get('values'):
logging.info("No data found in spreadsheet")
return None
# Process each row and calculate match scores
matches = []
for row in result.get('values', []):
if len(row) < 15: # Skip incomplete rows
continue
db_company = str(row[1]).strip().lower() # Company name is in column B
db_address = str(row[12]).strip().lower() # Address is in column M
# Normalize database company name
normalized_db_company = db_company
for suffix in company_suffixes:
normalized_db_company = normalized_db_company.replace(suffix.lower(), '').strip()
# Calculate match scores
company_score = fuzz.token_sort_ratio(normalized_company, normalized_db_company)
address_score = fuzz.token_sort_ratio(address, db_address)
# Weight the scores (giving more weight to company name match)
total_score = (company_score * 0.7) + (address_score * 0.3)
# Only consider matches above certain thresholds
if company_score > 60 or address_score > 80:
matches.append({
'score': total_score,
'data': {
"์‚ฌ์—…์ž๋ฒˆํ˜ธ": str(row[2]) if len(row) > 2 else "",
"๋Œ€ํ‘œ์ž๋ช…": str(row[11]) if len(row) > 11 else "",
"์ „ํ™”๋ฒˆํ˜ธ": str(row[13]) if len(row) > 13 else "",
"์šฐํŽธ๋ฒˆํ˜ธ": str(row[14]) if len(row) > 14 else "",
},
'company_score': company_score,
'address_score': address_score
})
logging.info(f"Found potential match: {db_company}")
logging.info(f"Scores - Company: {company_score}, Address: {address_score}, Total: {total_score}")
if not matches:
logging.info("No matches found meeting the threshold criteria")
return None
# Sort matches by total score and return the best match
matches.sort(key=lambda x: x['score'], reverse=True)
best_match = matches[0]
logging.info(f"Best match found with score {best_match['score']}")
logging.info(f"Company match score: {best_match['company_score']}")
logging.info(f"Address match score: {best_match['address_score']}")
return best_match['data']
except Exception as e:
logging.error(f"Error searching business info: {str(e)}", exc_info=True)
return None
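# Worked example of the scoring above (numbers are illustrative): a company score of 85 and an
# address score of 40 pass the company_score > 60 gate, and the weighted total is
# 85 * 0.7 + 40 * 0.3 = 71.5; the data of the highest-scoring row is returned.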
async def update_user_sheets_and_folders(user_data, transaction_ref, transaction_data, image_id):
"""
Updates sheets with processed image data and organizes files in Google Drive.
Args:
user_data (dict): User's metadata and preferences
transaction_ref: Firebase transaction reference
transaction_data (dict): User's transaction data
image_id (str): The ID of the processed image
Returns:
dict: Status of the update operation
"""
# Resolve the upload folders before the try block so the error handler below can still move the file on failure
parent_folders = [user_data['gDrive_metadata']['yungsoogi'], user_data['gDrive_metadata']['uploaded']]
try:
dataJson = await request_gpt4o_completion(image_id, transaction_data['purchased_credit']['image_detection'], use_gpt4=True)
if dataJson is None:
return {"error": "An error occurred while processing the image"}
data = json.loads(dataJson)
# Check if 'receipts' key exists and the length
found_receipt_no = len(data['receipts']) if 'receipts' in data else 0
found_business_cards_no = len(data['busi_cards']) if 'busi_cards' in data else 0
new_folder = None
secondary_folder = None
# timestamp = datetime.now().strftime("%Y-%m-%d %H:%M:%S")
timestamp = datetime.now(kst).strftime("%Y-%m-%d %H:%M:%S")
# https://drive.google.com/file/d/1cFIA09PBqFjM8YvIz1D60p7HMGk1uBvo/view?usp=sharing
download_image_url = f"https://drive.google.com/uc?id={image_id}&export=download"
image_url = f"https://drive.google.com/file/d/{image_id}/view?usp=sharing"
# ์ด๋ฏธ์ง€๋ฅผ ๋ณด์—ฌ์ฃผ์–ด์•ผ ํ• ๋•Œ =image({image_url})๋ฅผ ์‚ฌ์šฉํ•˜๊ณ  ๊ฐˆ ์ˆ˜ ์žˆ๋Š” ๊ฒฝ๋กœ๋ฅผ ๋งŒ๋“ค๋•Œ๋Š” =HYPERLINK({image_url}, '๋ฐ”๋กœ๊ฐ€๊ธฐ')๋ฅผ ์‚ฌ์šฉํ•˜์„ธ์š”.
# Determine the primary folder to move the image to and secondary folder for shortcut
if found_receipt_no > 0:
new_folder = user_data['gDrive_metadata']['receipts']
receipts_data = convert_dicts_to_list(
data['receipts'],
receipt_sheet_headers,
add_link=True,
image_id=image_id,
link_text="๋ฐ”๋กœ๊ฐ€๊ธฐ",
link_position=1  # insert the link right after the timestamp column
)
print(receipts_data)
await update_user_sheet(user_data['gDrive_metadata']['spreadsheet'], "์˜์ˆ˜์ฆ", receipts_data)
if found_business_cards_no > 0:
# Enrich business card data with additional info
for card in data['busi_cards']:
if card.get('ํšŒ์‚ฌ๋ช…') and card.get('์ฃผ์†Œ'):
additional_info = await search_business_info(card['ํšŒ์‚ฌ๋ช…'], card['์ฃผ์†Œ'])
if additional_info:
# Update business number if empty
if not card.get('์‚ฌ์—…์ž๋ฒˆํ˜ธ') and additional_info.get('์‚ฌ์—…์ž๋ฒˆํ˜ธ'):
card['์‚ฌ์—…์ž๋ฒˆํ˜ธ'] = additional_info['์‚ฌ์—…์ž๋ฒˆํ˜ธ']
# Add additional info to ๊ธฐํƒ€
extra_info = []
if additional_info.get('๋Œ€ํ‘œ์ž๋ช…'):
extra_info.append(f"๋Œ€ํ‘œ์ž: {additional_info['๋Œ€ํ‘œ์ž๋ช…']}")
if additional_info.get('์šฐํŽธ๋ฒˆํ˜ธ'):
extra_info.append(f"์šฐํŽธ๋ฒˆํ˜ธ: {additional_info['์šฐํŽธ๋ฒˆํ˜ธ']}")
if extra_info:
if '๊ธฐํƒ€' in card:
card['๊ธฐํƒ€'] += ", " + ", ".join(extra_info)
else:
card['๊ธฐํƒ€'] = ", ".join(extra_info)
if new_folder is None:
new_folder = user_data['gDrive_metadata']['business_cards']
else:
secondary_folder = user_data['gDrive_metadata']['business_cards']
business_cards_data = convert_dicts_to_list(
data['busi_cards'],
business_card_sheet_headers,
add_link=True,
image_id=image_id,
link_text="๋ช…ํ•จ๋ณด๊ธฐ",
link_position=10
)
await update_user_sheet(user_data['gDrive_metadata']['spreadsheet'], "๋ช…ํ•จ", business_cards_data)
print(f"{found_receipt_no}, {found_business_cards_no}")
status = await update_fb_transaction(
transaction_ref,
no_receipts=found_receipt_no,
no_business_cards=found_business_cards_no
)
if status['status'] == "Success":
if new_folder:
await move_file_to_folder(image_id, parent_folders, [new_folder])
if secondary_folder:
await create_shortcut(image_id, secondary_folder, f"Shortcut to {image_id}")
return {"success": True}
else:
logging.error(f"Transaction update failed with status: {status['status']}")
return {"error": "An error has occurred while updating the transaction"}
except Exception as e:
logging.error(f"An error occurred: {e}")
# Move the image file to the error folder
await move_file_to_folder(image_id, parent_folders, [user_data['gDrive_metadata']['error']])
return {"error": str(e)}
@app.post("/process-image")
async def process_photo(background_tasks: BackgroundTasks, image_id: str = Form(...), user_id: str = Form(...)):
"""
Endpoint for processing uploaded images. Handles receipt and business card detection.
Args:
background_tasks: FastAPI background tasks handler
image_id (str): The ID of the uploaded image in Google Drive
user_id (str): The ID of the user making the request
Returns:
dict: Processing status and any errors
"""
transaction_ref = transactions_collection_ref.document(user_id)
transaction = transaction_ref.get()
if not transaction.exists:
transaction_ref.set({
"no_sms": 0,
"no_contacts": 0,
"no_receipts": 0,
"no_business_cards": 0,
"purchased_credit": {
"image_detection": 100,
"sync_data": 500,
}
})
transaction = transaction_ref.get()
transaction_data = transaction.to_dict()
if transaction_data['no_receipts'] + transaction_data['no_business_cards'] >= transaction_data['purchased_credit']['image_detection']:
return {"error": "You have reached the limit of the number of items you can process. Please upgrade your plan."}
user_ref = users_collection_ref.document(user_id)
user = user_ref.get()
user_data = user.to_dict()
background_tasks.add_task(update_user_sheets_and_folders, user_data, transaction_ref, transaction_data, image_id)
return {"success": True}
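# Illustrative client call (host is a placeholder): queue a Drive image for background processing.
#   curl -X POST https://<host>/process-image -F "image_id=<drive-file-id>" -F "user_id=<firebase-uid>"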
@app.post("/sync-data")
async def sync_data(data_json: str = Form(...), user_id: str = Form(...)):
"""
Endpoint for synchronizing contacts and SMS data.
Handles creation of new SMS sheets and updates existing ones.
Args:
data_json (str): JSON string containing sync data
user_id (str): ID of the user making the request
Returns:
dict: Sync operation status
"""
# print(data_json)
data: Dict[str, Any] = json.loads(data_json)
transaction_ref = transactions_collection_ref.document(user_id)
transaction = transaction_ref.get()
if not transaction.exists:
transaction_ref.set({
"no_sms": 0,
"no_contacts": 0,
"no_receipts": 0,
"no_business_cards": 0,
"purchased_credit": {
"image_detection": 100,
"sync_data": 500,
}
})
transaction = transaction_ref.get()
transaction_data = transaction.to_dict()
if transaction_data['no_contacts'] + transaction_data['no_sms'] >= transaction_data['purchased_credit']['sync_data']:
return {"error": "You have reached the limit of the number of items you can process. Please upgrade your plan."}
user_ref = users_collection_ref.document(user_id)
user = user_ref.get()
user_data = user.to_dict()
try:
if data['isContacts']:
await update_user_sheet(user_data['gDrive_metadata']['spreadsheet'], "์—ฐ๋ฝ์ฒ˜", data['data'], True)
transaction_data['no_contacts'] += len(data['data'])
transaction_ref.update({'no_contacts': transaction_data['no_contacts']})
else:
if 'sms_ids' not in user_data:
print('No sms_ids')
user_data['sms_ids'] = {}
existing_phones = set(user_data.get('sms_ids', {}).keys())
new_phones = [phone['address'] for phone in data['data'] if phone['address'] not in existing_phones]
print(f"Existing phones: {existing_phones}")
for phone_address in new_phones:
new_sheet_id = await copy_sms_sheet(phone_address, user_data['gDrive_metadata']['spreadsheet'])
print(f"New sheet id: {new_sheet_id}")
user_data['sms_ids'][phone_address] = {'sheet_id': new_sheet_id, 'last_updated': "1970-01-01T00:00:00.000"}
print(user_data['sms_ids'])
updated_sms_count = 0
for phone in data['data']:
last_updated = user_data['sms_ids'][phone['address']]['last_updated']
print(f"Last updated: {last_updated}")
# Convert last_updated to datetime for comparison
last_updated_dt = datetime.fromisoformat(last_updated)
update_message = [sms for sms in phone['sms'] if datetime.fromisoformat(sms[0]) > last_updated_dt]
print(f"Update message: {update_message}")
if not update_message:
continue
sheet_id = user_data['sms_ids'][phone['address']]['sheet_id']
print(f"Sheet ID: {sheet_id}")
await update_user_sheet(user_data['gDrive_metadata']['spreadsheet'], 'SMS ' + phone['address'], update_message, False)
user_data['sms_ids'][phone['address']]['last_updated'] = phone['sms'][0][0]
updated_sms_count += len(update_message)
user_ref.update({'sms_ids': user_data['sms_ids']})
transaction_data['no_sms'] += updated_sms_count
transaction_ref.update({'no_sms': transaction_data['no_sms']})
return {"success": True}
except Exception as e:
logging.error(f"An error occurred: {e}")
return {"error": str(e)}
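# Sketch of the data_json payload this endpoint expects, inferred from how the handler reads it above
# (field names come from the code; values are placeholders):
#   Contacts: {"isContacts": true,  "data": [["ํ™๊ธธ๋™", "010-0000-0000", "", "", "", "", "", "๊ฐ€์กฑ"], ...]}
#   SMS:      {"isContacts": false, "data": [{"address": "01012345678",
#               "sms": [["2024-03-06T15:31:00.000", "<๊ณ„์ขŒ>", "<์ž…๊ธˆ>", "<์ถœ๊ธˆ>", "<์ž”์•ก>", "<๋ฌธ์ž>"], ...]}]}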