Spaces:
Sleeping
Sleeping
add extra columns for feedback functionality
Browse files
src/reporting/snowflake_connector.py
CHANGED
|
@@ -8,8 +8,11 @@ import os
|
|
| 8 |
import json
|
| 9 |
import logging
|
| 10 |
from typing import Dict, Any, Optional
|
|
|
|
|
|
|
| 11 |
from src.reporting.feedback_schema import UserFeedback
|
| 12 |
|
|
|
|
| 13 |
# Try to import snowflake connector
|
| 14 |
try:
|
| 15 |
import snowflake.connector
|
|
@@ -79,12 +82,16 @@ class SnowflakeFeedbackConnector:
|
|
| 79 |
self._connection.close()
|
| 80 |
print("✅ Disconnected from Snowflake")
|
| 81 |
|
| 82 |
-
def insert_feedback(self, feedback: UserFeedback) -> bool:
|
| 83 |
"""Insert a single feedback record into Snowflake"""
|
| 84 |
logger.info("=" * 80)
|
| 85 |
logger.info("π SNOWFLAKE INSERT: Starting feedback insertion process")
|
| 86 |
logger.info(f"π Feedback ID: {feedback.feedback_id}")
|
| 87 |
|
|
|
|
|
|
|
|
|
|
|
|
|
| 88 |
if not self._connection:
|
| 89 |
logger.error("β Not connected to Snowflake. Call connect() first.")
|
| 90 |
raise RuntimeError("Not connected to Snowflake. Call connect() first.")
|
|
@@ -131,38 +138,53 @@ class SnowflakeFeedbackConnector:
|
|
| 131 |
logger.error(f"β Could not set context: {e}")
|
| 132 |
raise
|
| 133 |
|
| 134 |
-
# Prepare data
|
| 135 |
-
logger.info("π§ DATA PREPARATION: Preparing
|
| 136 |
-
|
| 137 |
|
| 138 |
-
|
| 139 |
-
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
| 140 |
|
| 141 |
-
#
|
| 142 |
-
|
| 143 |
-
|
| 144 |
-
|
| 145 |
-
|
| 146 |
-
|
| 147 |
else:
|
| 148 |
-
|
| 149 |
-
logger.info(" -
|
| 150 |
-
retrieved_data = retrieved_data_raw
|
| 151 |
|
| 152 |
-
|
| 153 |
-
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
| 154 |
|
| 155 |
-
#
|
| 156 |
-
|
| 157 |
-
|
| 158 |
-
|
| 159 |
-
|
|
|
|
| 160 |
else:
|
| 161 |
-
logger.info(f" - Retrieved data is None, using NULL")
|
| 162 |
retrieved_data_for_db = None
|
|
|
|
| 163 |
|
| 164 |
-
# Build SQL with
|
| 165 |
-
|
|
|
|
| 166 |
feedback_id,
|
| 167 |
open_ended_feedback,
|
| 168 |
score,
|
|
@@ -172,23 +194,25 @@ class SnowflakeFeedbackConnector:
|
|
| 172 |
message_count,
|
| 173 |
has_retrievals,
|
| 174 |
retrieval_count,
|
| 175 |
-
|
| 176 |
-
|
| 177 |
-
|
| 178 |
-
retrieved_data
|
|
|
|
| 179 |
) VALUES (
|
| 180 |
%(feedback_id)s, %(open_ended_feedback)s, %(score)s, %(is_feedback_about_last_retrieval)s,
|
| 181 |
%(conversation_id)s, %(timestamp)s, %(message_count)s, %(has_retrievals)s,
|
| 182 |
-
%(retrieval_count)s, %(
|
| 183 |
-
%(retrieved_data)s
|
| 184 |
)"""
|
| 185 |
|
| 186 |
logger.info("π SQL PREPARATION: Building INSERT statement...")
|
| 187 |
-
logger.info(f" - Target table:
|
| 188 |
logger.info(f" - Database: {self.database}")
|
| 189 |
logger.info(f" - Schema: {self.schema}")
|
| 190 |
|
| 191 |
# Prepare parameters
|
|
|
|
| 192 |
params = {
|
| 193 |
'feedback_id': feedback.feedback_id,
|
| 194 |
'open_ended_feedback': feedback.open_ended_feedback,
|
|
@@ -199,10 +223,11 @@ class SnowflakeFeedbackConnector:
|
|
| 199 |
'message_count': feedback.message_count,
|
| 200 |
'has_retrievals': feedback.has_retrievals,
|
| 201 |
'retrieval_count': feedback.retrieval_count,
|
| 202 |
-
'
|
| 203 |
-
'
|
| 204 |
-
'
|
| 205 |
-
'retrieved_data': retrieved_data_for_db
|
|
|
|
| 206 |
}
|
| 207 |
|
| 208 |
# Execute insert
|
|
@@ -265,12 +290,16 @@ def get_snowflake_connector_from_env() -> Optional[SnowflakeFeedbackConnector]:
|
|
| 265 |
)
|
| 266 |
|
| 267 |
|
| 268 |
-
def save_to_snowflake(feedback: UserFeedback) -> bool:
|
| 269 |
"""Helper function to save feedback to Snowflake"""
|
| 270 |
logger.info("=" * 80)
|
| 271 |
logger.info("π΅ SNOWFLAKE SAVE: Starting save process")
|
| 272 |
logger.info(f"π Feedback ID: {feedback.feedback_id}")
|
| 273 |
|
|
|
|
|
|
|
|
|
|
|
|
|
| 274 |
connector = get_snowflake_connector_from_env()
|
| 275 |
|
| 276 |
if not connector:
|
|
@@ -285,7 +314,7 @@ def save_to_snowflake(feedback: UserFeedback) -> bool:
|
|
| 285 |
logger.info("✅ SNOWFLAKE SAVE: Connection established")
|
| 286 |
|
| 287 |
logger.info("π₯ SNOWFLAKE SAVE: Attempting to insert feedback...")
|
| 288 |
-
success = connector.insert_feedback(feedback)
|
| 289 |
|
| 290 |
logger.info("π SNOWFLAKE SAVE: Disconnecting...")
|
| 291 |
connector.disconnect()
|
|
@@ -302,4 +331,3 @@ def save_to_snowflake(feedback: UserFeedback) -> bool:
|
|
| 302 |
logger.error(f" - Error: {e}")
|
| 303 |
logger.info("=" * 80)
|
| 304 |
return False
|
| 305 |
-
|
|
|
|
| 8 |
import json
|
| 9 |
import logging
|
| 10 |
from typing import Dict, Any, Optional
|
| 11 |
+
|
| 12 |
+
|
| 13 |
from src.reporting.feedback_schema import UserFeedback
|
| 14 |
|
| 15 |
+
|
| 16 |
# Try to import snowflake connector
|
| 17 |
try:
|
| 18 |
import snowflake.connector
|
|
|
|
| 82 |
self._connection.close()
|
| 83 |
print("✅ Disconnected from Snowflake")
|
| 84 |
|
| 85 |
+
def insert_feedback(self, feedback: UserFeedback, table_name: Optional[str] = None) -> bool:
|
| 86 |
"""Insert a single feedback record into Snowflake"""
|
| 87 |
logger.info("=" * 80)
|
| 88 |
logger.info("π SNOWFLAKE INSERT: Starting feedback insertion process")
|
| 89 |
logger.info(f"π Feedback ID: {feedback.feedback_id}")
|
| 90 |
|
| 91 |
+
# Get table name from parameter, env var, or default
|
| 92 |
+
if table_name is None:
|
| 93 |
+
table_name = os.getenv("SNOWFLAKE_FEEDBACK_TABLE", "USER_FEEDBACK_V3")
|
| 94 |
+
|
| 95 |
if not self._connection:
|
| 96 |
logger.error("β Not connected to Snowflake. Call connect() first.")
|
| 97 |
raise RuntimeError("Not connected to Snowflake. Call connect() first.")
|
|
|
|
| 138 |
logger.error(f"β Could not set context: {e}")
|
| 139 |
raise
|
| 140 |
|
| 141 |
+
# Prepare data - convert to JSON strings for VARIANT columns (same approach as old retrieved_data)
|
| 142 |
+
logger.info("π§ DATA PREPARATION: Preparing VARIANT columns...")
|
| 143 |
+
feedback_dict = feedback.to_dict()
|
| 144 |
|
| 145 |
+
# Prepare transcript (ARRAY) - convert to JSON string
|
| 146 |
+
transcript_raw = feedback_dict.get('transcript', [])
|
| 147 |
+
if transcript_raw:
|
| 148 |
+
# Convert to JSON string (same approach as old retrieved_data)
|
| 149 |
+
transcript_for_db = json.dumps(transcript_raw)
|
| 150 |
+
logger.info(f" - Transcript: {len(transcript_raw)} messages, JSON length: {len(transcript_for_db)}")
|
| 151 |
+
else:
|
| 152 |
+
transcript_for_db = None
|
| 153 |
+
logger.info(" - Transcript: None")
|
| 154 |
|
| 155 |
+
# Prepare retrievals (ARRAY) - convert to JSON string
|
| 156 |
+
retrievals_raw = feedback_dict.get('retrievals', [])
|
| 157 |
+
if retrievals_raw:
|
| 158 |
+
# Convert to JSON string (same approach as old retrieved_data)
|
| 159 |
+
retrievals_for_db = json.dumps(retrievals_raw)
|
| 160 |
+
logger.info(f" - Retrievals: {len(retrievals_raw)} entries, JSON length: {len(retrievals_for_db)}")
|
| 161 |
else:
|
| 162 |
+
retrievals_for_db = None
|
| 163 |
+
logger.info(" - Retrievals: None")
|
|
|
|
| 164 |
|
| 165 |
+
# Prepare feedback_score_related_retrieval_docs (OBJECT) - convert to JSON string
|
| 166 |
+
feedback_score_related_raw = feedback_dict.get('feedback_score_related_retrieval_docs')
|
| 167 |
+
if feedback_score_related_raw:
|
| 168 |
+
# Convert to JSON string (same approach as old retrieved_data)
|
| 169 |
+
feedback_score_related_for_db = json.dumps(feedback_score_related_raw)
|
| 170 |
+
logger.info(f" - Feedback score related docs: present, JSON length: {len(feedback_score_related_for_db)}")
|
| 171 |
+
else:
|
| 172 |
+
feedback_score_related_for_db = None
|
| 173 |
+
logger.info(" - Feedback score related docs: None")
|
| 174 |
|
| 175 |
+
# Prepare retrieved_data (preserved old column) - convert to JSON string
|
| 176 |
+
retrieved_data_raw = feedback_dict.get('retrieved_data')
|
| 177 |
+
if retrieved_data_raw:
|
| 178 |
+
# Convert to JSON string (same approach as old retrieved_data)
|
| 179 |
+
retrieved_data_for_db = json.dumps(retrieved_data_raw)
|
| 180 |
+
logger.info(f" - Retrieved data (preserved): present, JSON length: {len(retrieved_data_for_db)}")
|
| 181 |
else:
|
|
|
|
| 182 |
retrieved_data_for_db = None
|
| 183 |
+
logger.info(" - Retrieved data (preserved): None")
|
| 184 |
|
| 185 |
+
# Build SQL with new column structure
|
| 186 |
+
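# NOTE(review): values are bound as JSON strings (same approach as old retrieved_data). This comment described the columns as VARCHAR, while nearby comments call them VARIANT/ARRAY/OBJECT - confirm the actual column types.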
|
| 187 |
+
sql = f"""INSERT INTO {table_name} (
|
| 188 |
feedback_id,
|
| 189 |
open_ended_feedback,
|
| 190 |
score,
|
|
|
|
| 194 |
message_count,
|
| 195 |
has_retrievals,
|
| 196 |
retrieval_count,
|
| 197 |
+
transcript,
|
| 198 |
+
retrievals,
|
| 199 |
+
feedback_score_related_retrieval_docs,
|
| 200 |
+
retrieved_data,
|
| 201 |
+
created_at
|
| 202 |
) VALUES (
|
| 203 |
%(feedback_id)s, %(open_ended_feedback)s, %(score)s, %(is_feedback_about_last_retrieval)s,
|
| 204 |
%(conversation_id)s, %(timestamp)s, %(message_count)s, %(has_retrievals)s,
|
| 205 |
+
%(retrieval_count)s, %(transcript)s, %(retrievals)s, %(feedback_score_related_retrieval_docs)s,
|
| 206 |
+
%(retrieved_data)s, %(created_at)s
|
| 207 |
)"""
|
| 208 |
|
| 209 |
logger.info("π SQL PREPARATION: Building INSERT statement...")
|
| 210 |
+
logger.info(f" - Target table: {table_name}")
|
| 211 |
logger.info(f" - Database: {self.database}")
|
| 212 |
logger.info(f" - Schema: {self.schema}")
|
| 213 |
|
| 214 |
# Prepare parameters
|
| 215 |
+
# Pass JSON strings for VARIANT columns (same approach as old retrieved_data)
|
| 216 |
params = {
|
| 217 |
'feedback_id': feedback.feedback_id,
|
| 218 |
'open_ended_feedback': feedback.open_ended_feedback,
|
|
|
|
| 223 |
'message_count': feedback.message_count,
|
| 224 |
'has_retrievals': feedback.has_retrievals,
|
| 225 |
'retrieval_count': feedback.retrieval_count,
|
| 226 |
+
'transcript': transcript_for_db, # JSON string
|
| 227 |
+
'retrievals': retrievals_for_db, # JSON string
|
| 228 |
+
'feedback_score_related_retrieval_docs': feedback_score_related_for_db, # JSON string
|
| 229 |
+
'retrieved_data': retrieved_data_for_db, # JSON string - preserved old column
|
| 230 |
+
'created_at': feedback.created_at
|
| 231 |
}
|
| 232 |
|
| 233 |
# Execute insert
|
|
|
|
| 290 |
)
|
| 291 |
|
| 292 |
|
| 293 |
+
def save_to_snowflake(feedback: UserFeedback, table_name: Optional[str] = None) -> bool:
|
| 294 |
"""Helper function to save feedback to Snowflake"""
|
| 295 |
logger.info("=" * 80)
|
| 296 |
logger.info("π΅ SNOWFLAKE SAVE: Starting save process")
|
| 297 |
logger.info(f"π Feedback ID: {feedback.feedback_id}")
|
| 298 |
|
| 299 |
+
# Get table name from parameter or env var
|
| 300 |
+
if table_name is None:
|
| 301 |
+
table_name = os.getenv("SNOWFLAKE_FEEDBACK_TABLE", "USER_FEEDBACK_V3")
|
| 302 |
+
|
| 303 |
connector = get_snowflake_connector_from_env()
|
| 304 |
|
| 305 |
if not connector:
|
|
|
|
| 314 |
logger.info("✅ SNOWFLAKE SAVE: Connection established")
|
| 315 |
|
| 316 |
logger.info("π₯ SNOWFLAKE SAVE: Attempting to insert feedback...")
|
| 317 |
+
success = connector.insert_feedback(feedback, table_name=table_name)
|
| 318 |
|
| 319 |
logger.info("π SNOWFLAKE SAVE: Disconnecting...")
|
| 320 |
connector.disconnect()
|
|
|
|
| 331 |
logger.error(f" - Error: {e}")
|
| 332 |
logger.info("=" * 80)
|
| 333 |
return False
|
|
|