shaliz-kong commited on
Commit ·
883efb8
1
Parent(s): 6fd7659
analytics worker refactor
Browse files
app/tasks/analytics_worker.py
CHANGED
|
@@ -202,15 +202,71 @@ class AnalyticsWorker:
|
|
| 202 |
|
| 203 |
# app/tasks/analytics_worker.py - Replace _sync_load_dataframe
|
| 204 |
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
| 205 |
def _sync_load_dataframe(self, entity_type: str) -> pd.DataFrame:
|
| 206 |
"""
|
| 207 |
-
|
|
|
|
| 208 |
"""
|
| 209 |
try:
|
| 210 |
conn = get_conn(self.org_id)
|
| 211 |
table_name = f"main.{entity_type}_canonical"
|
| 212 |
|
| 213 |
-
# Verify table exists
|
| 214 |
table_exists = conn.execute(
|
| 215 |
"SELECT COUNT(*) FROM information_schema.tables WHERE table_schema = 'main' AND table_name = ?",
|
| 216 |
[entity_type + "_canonical"]
|
|
@@ -243,7 +299,7 @@ class AnalyticsWorker:
|
|
| 243 |
except Exception as e:
|
| 244 |
logger.error(f"[LOAD] Fatal error: {e}")
|
| 245 |
return pd.DataFrame()
|
| 246 |
-
|
| 247 |
async def _load_entity_from_redis(self) -> dict:
|
| 248 |
"""Instantly load entity/industry from Redis (source of truth)"""
|
| 249 |
try:
|
|
|
|
| 202 |
|
| 203 |
# app/tasks/analytics_worker.py - Replace _sync_load_dataframe
|
| 204 |
|
| 205 |
+
# def _sync_load_dataframe(self, entity_type: str) -> pd.DataFrame:
|
| 206 |
+
# """
|
| 207 |
+
# Load data with entity context (receives entity_type from STEP 2)
|
| 208 |
+
# """
|
| 209 |
+
# try:
|
| 210 |
+
# conn = get_conn(self.org_id)
|
| 211 |
+
# table_name = f"main.{entity_type}_canonical"
|
| 212 |
+
|
| 213 |
+
# # Verify table exists first
|
| 214 |
+
# table_exists = conn.execute(
|
| 215 |
+
# "SELECT COUNT(*) FROM information_schema.tables WHERE table_schema = 'main' AND table_name = ?",
|
| 216 |
+
# [entity_type + "_canonical"]
|
| 217 |
+
# ).fetchone()[0] > 0
|
| 218 |
+
|
| 219 |
+
# if not table_exists:
|
| 220 |
+
# logger.error(f"[LOAD] Table {table_name} does not exist")
|
| 221 |
+
# return pd.DataFrame()
|
| 222 |
+
|
| 223 |
+
# # Load with time window
|
| 224 |
+
# cutoff = datetime.now() - timedelta(hours=self.hours_window)
|
| 225 |
+
# df = conn.execute(
|
| 226 |
+
# f"SELECT * FROM {table_name} WHERE timestamp >= ? ORDER BY timestamp DESC LIMIT 10000",
|
| 227 |
+
# [cutoff]
|
| 228 |
+
# ).df()
|
| 229 |
+
|
| 230 |
+
# if not df.empty:
|
| 231 |
+
# logger.info(f"[LOAD] Success: {len(df)} rows × {len(df.columns)} cols (time-filtered)")
|
| 232 |
+
# return df
|
| 233 |
+
|
| 234 |
+
# # Fallback to recent data
|
| 235 |
+
# logger.warning(f"[LOAD] No data in {self.hours_window}h window, returning recent rows")
|
| 236 |
+
# df = conn.execute(f"SELECT * FROM {table_name} ORDER BY timestamp DESC LIMIT 1000").df()
|
| 237 |
+
|
| 238 |
+
# if df.empty:
|
| 239 |
+
# logger.error(f"[LOAD] Table exists but contains no rows")
|
| 240 |
+
|
| 241 |
+
# return df
|
| 242 |
+
|
| 243 |
+
# except Exception as e:
|
| 244 |
+
# logger.error(f"[LOAD] Fatal error: {e}")
|
| 245 |
+
# return pd.DataFrame()
|
| 246 |
+
|
| 247 |
+
# # app/tasks/analytics_worker.py - Add these inside AnalyticsWorker class
|
| 248 |
+
|
| 249 |
+
async def _load_dataframe(self) -> pd.DataFrame:
|
| 250 |
+
"""
|
| 251 |
+
Load data asynchronously (non-blocking)
|
| 252 |
+
Requires: self._entity_type must be set from Redis first
|
| 253 |
+
"""
|
| 254 |
+
if not hasattr(self, '_entity_type') or not self._entity_type:
|
| 255 |
+
raise ValueError("entity_type must be loaded from Redis first")
|
| 256 |
+
|
| 257 |
+
# Run sync DB operation in thread pool
|
| 258 |
+
return await asyncio.to_thread(self._sync_load_dataframe, self._entity_type)
|
| 259 |
+
|
| 260 |
def _sync_load_dataframe(self, entity_type: str) -> pd.DataFrame:
|
| 261 |
"""
|
| 262 |
+
Synchronous data loader (runs in thread pool)
|
| 263 |
+
Receives entity_type from STEP 2 (_load_entity_from_redis)
|
| 264 |
"""
|
| 265 |
try:
|
| 266 |
conn = get_conn(self.org_id)
|
| 267 |
table_name = f"main.{entity_type}_canonical"
|
| 268 |
|
| 269 |
+
# Verify table exists
|
| 270 |
table_exists = conn.execute(
|
| 271 |
"SELECT COUNT(*) FROM information_schema.tables WHERE table_schema = 'main' AND table_name = ?",
|
| 272 |
[entity_type + "_canonical"]
|
|
|
|
| 299 |
except Exception as e:
|
| 300 |
logger.error(f"[LOAD] Fatal error: {e}")
|
| 301 |
return pd.DataFrame()
|
| 302 |
+
|
| 303 |
async def _load_entity_from_redis(self) -> dict:
|
| 304 |
"""Instantly load entity/industry from Redis (source of truth)"""
|
| 305 |
try:
|