last_edit / server /executor.py
Moharek
Deploy Moharek GEO Platform
a74b879
import os
import time
from server.celery_app import celery_app
from server.db import SessionLocal, Action
from datetime import datetime
@celery_app.task(name="server.executor.execute_content_task", bind=True, max_retries=3)
def execute_content_task(self, action_id: int):
"""Executes a content-related task (e.g. Generate Article)."""
db = SessionLocal()
try:
action = db.query(Action).filter(Action.id == action_id).first()
if not action:
return {"status": "error", "message": "Action not found"}
action.status = "executing"
db.commit()
# Phase 3.5: Record Snapshot BEFORE
from server import impact_engine
impact_engine.record_initial_metrics(action_id)
# Simulate execution (Here we would call OpenAI/Groq to write an article)
print(f"Executing content task: {action.task}")
time.sleep(3)
# Simulate Impact Tracking & Result
action.status = "done"
action.result = {
"message": "Content generated successfully",
"traffic_gain_est": "15%",
"words": 1200
}
# Phase 3.5: Update Snapshot AFTER
from server import impact_engine
impact_engine.update_action_impact(action_id)
db.commit()
return {"status": "success", "action_id": action_id}
except Exception as e:
db.rollback()
raise self.retry(exc=e, countdown=5)
finally:
db.close()
@celery_app.task(name="server.executor.execute_technical_task", bind=True, max_retries=3)
def execute_technical_task(self, action_id: int):
"""Executes SEO/technical/outreach task."""
db = SessionLocal()
try:
action = db.query(Action).filter(Action.id == action_id).first()
if not action:
return {"status": "error", "message": "Action not found"}
action.status = "executing"
db.commit()
# Simulate execution
print(f"Executing technical task: {action.task}")
time.sleep(2)
action.status = "done"
action.result = {
"message": "Task processed successfully",
"impact_est": "High"
}
# Phase 3.5: Update Impact for tech task as well
from server import impact_engine
impact_engine.update_action_impact(action_id)
# Handle Outreach Specifically
if action.type == 'outreach':
from server import outreach_engine
# Simplification: we mark it sent if it's an outreach action
# In a real system, we'd lookup the Lead by action context
print("Processing outreach campaign step...")
db.commit()
return {"status": "success"}
except Exception as e:
db.rollback()
raise self.retry(exc=e, countdown=5)
finally:
db.close()
def dispatch_action(action_id: int, action_type: str):
"""Dispatches the action to the right executor based on type."""
# Ensure celery is loaded
act_type = str(action_type).lower()
if act_type == 'content':
execute_content_task.delay(action_id)
else:
execute_technical_task.delay(action_id)
return True