File size: 5,383 Bytes
3f4c92b
 
 
 
 
 
6140a89
 
 
dec8c1d
ca925bb
c01589c
6140a89
3f4c92b
 
 
ce630e0
a056cb8
3f4c92b
6140a89
3f4c92b
 
6140a89
3f4c92b
 
a1fc15c
a0132ed
3f4c92b
6140a89
 
3f4c92b
 
6140a89
a1fc15c
6140a89
 
 
 
3f4c92b
6140a89
 
 
 
 
 
3f4c92b
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
6140a89
3f4c92b
 
 
 
a056cb8
54da3a4
3f4c92b
 
 
 
54da3a4
3f4c92b
54da3a4
 
 
3f4c92b
483353d
 
a0132ed
 
3f4c92b
49502e6
a056cb8
483353d
54da3a4
3f4c92b
634b798
a2fb9ee
 
49502e6
54da3a4
3f4c92b
 
54da3a4
3f4c92b
 
54da3a4
 
3f4c92b
 
54da3a4
 
 
 
3f4c92b
54da3a4
 
3f4c92b
54da3a4
 
 
3f4c92b
 
 
 
 
 
54da3a4
 
3f4c92b
 
54da3a4
a056cb8
3f4c92b
 
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
# apis/Bubble_API_Calls.py
"""
This module provides functions to read data from the Bubble.io API.
It is used to fetch the LinkedIn token and all pre-processed application data.
All data writing/updating functions have been removed.
"""
import os
import json
import requests
import pandas as pd
import logging
from typing import Dict, Any, Optional, List, Tuple

from config import BUBBLE_API_KEY_PRIVATE_ENV_VAR, BUBBLE_API_ENDPOINT_ENV_VAR, BUBBLE_APP_NAME_ENV_VAR

# Set up root logging at INFO level when this module is imported.
logging.basicConfig(level=logging.INFO)
# Module-level logger, named after this module, used by every function below.
logger = logging.getLogger(__name__)

def fetch_linkedin_token_from_bubble(url_user_token_str: str) -> Optional[dict]:
    """
    Fetch a LinkedIn access token from Bubble.io using a state value.

    Queries the ``Linkedin_access`` data type for records whose ``state``
    equals ``url_user_token_str`` and parses the ``Raw_text`` field of the
    first result, which is expected to hold the token as a JSON string.

    Args:
        url_user_token_str: State value used as the lookup key. An empty
            value, or one containing "not found" / "Could not access", is
            treated as "no token available" and no request is made.

    Returns:
        The parsed token dictionary (always containing an ``access_token``
        key) on success; ``None`` when the API key is not configured, the
        state value is unusable, no record matches, the record is malformed,
        or the request/decoding fails. Failures are logged, not raised.
    """
    bubble_api_key = os.environ.get(BUBBLE_API_KEY_PRIVATE_ENV_VAR)

    if not bubble_api_key:
        logger.error("Bubble API environment variables key not set.")
        return None

    if not url_user_token_str or "not found" in url_user_token_str or "Could not access" in url_user_token_str:
        logger.info(f"No valid user token provided to query Bubble: {url_user_token_str}")
        return None

    base_url = "https://app.ingaze.ai/version-test/api/1.1/obj/Linkedin_access"
    constraints = [{"key": "state", "constraint_type": "equals", "value": url_user_token_str}]
    params = {'constraints': json.dumps(constraints)}
    headers = {"Authorization": f"Bearer {bubble_api_key}"}

    logger.info(f"Attempting to fetch LinkedIn token from Bubble for state: {url_user_token_str}")
    try:
        response = requests.get(base_url, params=params, headers=headers, timeout=15)
        response.raise_for_status()
        data = response.json()
        results = data.get("response", {}).get("results", [])

        if not results:
            logger.warning(f"No token results found in Bubble for state: {url_user_token_str}")
            return None

        raw_text = results[0].get("Raw_text")
        if not raw_text or not isinstance(raw_text, str):
            # Log only the type: the field may carry credential material.
            logger.warning(
                "Token 'Raw_text' field is missing or not a string. Type: %s",
                type(raw_text).__name__,
            )
            return None

        parsed_token = json.loads(raw_text)
        if isinstance(parsed_token, dict) and "access_token" in parsed_token:
            logger.info("Successfully fetched and parsed LinkedIn token from Bubble.")
            return parsed_token

        # Never write the parsed payload to the log: even without an
        # access_token it may contain other secrets (e.g. a refresh token).
        # The type and key names are enough to diagnose a malformed record.
        token_keys = sorted(str(key) for key in parsed_token) if isinstance(parsed_token, dict) else None
        logger.error(
            "Parsed token from Bubble is not a valid dictionary with an access_token. "
            "Parsed type: %s, keys: %s",
            type(parsed_token).__name__,
            token_keys,
        )
        return None

    except requests.exceptions.RequestException as e:
        logger.error(f"Bubble API request error while fetching token: {e}", exc_info=True)
    except json.JSONDecodeError as e:
        logger.error(f"Error decoding JSON from Bubble token response: {e}", exc_info=True)
    except Exception as e:
        # Boundary catch-all: callers rely on a None result rather than an exception.
        logger.error(f"An unexpected error occurred while fetching the token from Bubble: {e}", exc_info=True)

    return None


def fetch_linkedin_posts_data_from_bubble(
    data_type: str,
    constraint_key: str,
    constraint_type: str,
    constraint_value: Any,
    additional_constraints: Optional[List[Dict[str, Any]]] = None
) -> Tuple[Optional[pd.DataFrame], Optional[str]]:
    """
    Fetch data from the Bubble.io Data API, paginating until all results are retrieved.

    Args:
        data_type: Name of the Bubble data type; appended to the API endpoint.
        constraint_key: Field name of the primary search constraint.
        constraint_type: Constraint operator of the primary constraint.
        constraint_value: Value the primary constraint compares against.
        additional_constraints: Optional extra constraint dicts (same
            ``key`` / ``constraint_type`` / ``value`` shape) appended to the
            primary one.

    Returns:
        A ``(dataframe, error_message)`` tuple. On success the DataFrame holds
        one row per record (empty if nothing matched) and the message is
        ``None``. On failure the DataFrame is ``None`` and the message
        describes the error; records fetched before the failure are discarded.
    """
    bubble_api_key = os.environ.get(BUBBLE_API_KEY_PRIVATE_ENV_VAR)

    if not bubble_api_key:
        error_msg = "Bubble API not set."
        logger.error(error_msg)
        return None, error_msg

    # The endpoint constant is named like the API-key constant, which holds an
    # environment-variable *name*, so resolve it through the environment too.
    # NOTE(review): confirm against config. If the constant already is a URL,
    # it is used as-is so existing deployments keep working.
    api_endpoint = os.environ.get(BUBBLE_API_ENDPOINT_ENV_VAR)
    if not api_endpoint and str(BUBBLE_API_ENDPOINT_ENV_VAR).startswith(("http://", "https://")):
        api_endpoint = BUBBLE_API_ENDPOINT_ENV_VAR
    if not api_endpoint:
        error_msg = "Bubble API endpoint not set."
        logger.error(error_msg)
        return None, error_msg

    base_url = f"{api_endpoint.rstrip('/')}/{data_type}"
    headers = {"Authorization": f"Bearer {bubble_api_key}"}

    constraints = [{"key": constraint_key, "constraint_type": constraint_type, "value": constraint_value}]
    if additional_constraints:
        constraints.extend(additional_constraints)
    # The constraints do not change between pages; serialize them once.
    constraints_json = json.dumps(constraints)

    all_results = []
    cursor = 0
    logger.info(f"Fetching data from Bubble type '{data_type}' where '{constraint_key}' is '{constraint_value}'...")

    while True:
        params = {'constraints': constraints_json, 'cursor': cursor, 'limit': 100}
        try:
            response = requests.get(base_url, params=params, headers=headers, timeout=30)
            response.raise_for_status()
            payload = response.json()
        except requests.exceptions.RequestException as e:
            error_msg = f"Bubble API Error fetching '{data_type}': {e}"
            logger.error(error_msg, exc_info=True)
            return None, error_msg
        except json.JSONDecodeError as e:
            error_msg = f"JSON Decode Error fetching '{data_type}': {e}. Response text: {response.text}"
            logger.error(error_msg, exc_info=True)
            return None, error_msg

        if not isinstance(payload, dict):
            # Report an unexpected payload shape through the error channel
            # instead of letting an AttributeError escape to the caller.
            error_msg = f"Unexpected response format fetching '{data_type}': expected a JSON object."
            logger.error(error_msg)
            return None, error_msg

        data = payload.get("response") or {}
        results_on_page = data.get("results") or []
        all_results.extend(results_on_page)

        if not data.get("remaining"):
            break
        if not results_on_page:
            # The API claims more records remain but returned none; the cursor
            # cannot advance, so stop rather than loop forever.
            logger.warning(
                f"Bubble reported remaining records for '{data_type}' but returned an empty page; stopping pagination."
            )
            break
        cursor += len(results_on_page)

    if not all_results:
        logger.info(f"No data found in Bubble for the given constraints in data type '{data_type}'.")
        return pd.DataFrame(), None

    logger.info(f"Successfully retrieved a total of {len(all_results)} records from '{data_type}'.")
    return pd.DataFrame(all_results), None