# RevOps / server/environment.py
# Last commit 38336e8 by Jyo-K ("partial")
import sys
import os
from typing import Any, Dict, Tuple, Optional
from uuid import uuid4
# Make sure models module can be imported
sys.path.append(os.path.dirname(os.path.dirname(os.path.abspath(__file__))))
from openenv.core.env_server.interfaces import Environment
from openenv.core.env_server.types import State
from models import RevOpsObservation, RevOpsAction, EnrichmentData
from server.data_generator import get_task_data, get_reps, get_icp_criteria
from server.graders import grader_easy, grader_medium, grader_hard
# Grader score of the most recently finished episode in this process; written
# by RevOpsEnvironment.step() when an episode completes (None until then).
# This is one process-wide value shared by every environment instance, so with
# concurrent sessions (SUPPORTS_CONCURRENT_SESSIONS = True) the last episode to
# finish overwrites it. NOTE(review): presumably read by server code outside
# this file — confirm before renaming.
global_last_grader_score: Optional[float] = None
class RevOpsEnvironment(Environment):
    """Lead-triage RevOps environment served through the OpenEnv interface.

    An episode walks the agent through the leads returned by
    ``get_task_data(task_id)`` one at a time. Actions on the current lead
    (enrich, check CRM, score, merge, flag) yield small shaped rewards, and
    ``route_to_rep`` / ``disqualify`` advance to the next lead. Once every
    lead has been handled, the task-specific grader runs over the recorded
    action history. A terminal bonus or penalty is then added to the final
    step's reward.
    """

    SUPPORTS_CONCURRENT_SESSIONS: bool = True

    # Grader score at or above which the episode earns the terminal bonus.
    _PASS_THRESHOLD: float = 0.7
    # Magnitude of the terminal bonus (score passes) / penalty (score fails).
    _TERMINAL_BONUS: float = 0.5

    def __init__(self, **kwargs):
        """Create an environment with no active episode; call ``reset()`` next."""
        # NOTE(review): Environment.__init__ is not called; confirm the openenv
        # base class does not need initialisation.
        self._state = State(episode_id=str(uuid4()), step_count=0)
        self.task_id = "task_easy"  # default until reset() selects a task
        # Empty until reset(); step() treats a missing "action_history" key as
        # "reset() was never called".
        self.state_data = {}

    def reset(self, seed: Optional[int] = None, episode_id: Optional[str] = None, **kwargs) -> RevOpsObservation:
        """Start a new episode and return the observation for its first lead.

        Args:
            seed: Accepted for interface compatibility; not used.
            episode_id: Id for the new episode (a fresh UUID when omitted).
                As a fallback, if no ``task_id`` kwarg is given and this is
                one of "task_easy"/"task_medium"/"task_hard", it also selects
                that task.
            **kwargs: ``task_id`` selects the task. When no task is given,
                the previously selected ``self.task_id`` is reused.

        Returns:
            The first observation, with ``done=False`` and ``reward=0.0``.
        """
        self._state = State(episode_id=episode_id or str(uuid4()), step_count=0)
        task_id = kwargs.get("task_id")
        if not task_id and episode_id in ("task_easy", "task_medium", "task_hard"):
            task_id = episode_id
        if task_id:
            self.task_id = task_id

        task_data = get_task_data(self.task_id)
        self.state_data = {
            "leads": task_data["leads"],
            "current_lead_index": 0,
            "enrichment_map": task_data["enrichment_map"],
            "crm": task_data["crm"],
            "reps": get_reps(),
            "icp": get_icp_criteria(),
            "action_history": [],
            "accumulated_reward": 0.0,
            "last_feedback": f"New episode started for {self.task_id}.",
            # Per-lead progress flags, keyed by lead id.
            "lead_states": {
                lead.id: {"enriched": False, "scored": False, "crm_checked": False, "merged": False, "flagged": False}
                for lead in task_data["leads"]
            },
            "grader_score": None,
        }
        obs = self._get_observation()
        obs.done = False
        obs.reward = 0.0
        return obs

    def _get_observation(self) -> RevOpsObservation:
        """Build the observation for the current lead.

        Enrichment data is hidden until ``enrich_lead`` has been applied to
        this lead. CRM ``existing_accounts``/``opportunities`` are hidden until
        ``check_crm`` has been applied. Once all leads are processed, the last
        lead is shown with empty enrichment and the unmasked CRM. Returns a
        default ``RevOpsObservation()`` when there are no leads (e.g. before
        ``reset()``).
        """
        idx = self.state_data.get("current_lead_index", 0)
        leads = self.state_data.get("leads", [])
        if not leads:
            return RevOpsObservation()

        if idx < len(leads):
            lead = leads[idx]
            lead_state = self.state_data["lead_states"][lead.id]
            if lead_state["enriched"]:
                enrichment = self.state_data["enrichment_map"][lead.id]
            else:
                enrichment = EnrichmentData(enriched=False)
            if lead_state["crm_checked"]:
                crm = self.state_data["crm"]
            else:
                crm = self.state_data["crm"].model_copy(update={"existing_accounts": [], "opportunities": []})
        else:
            lead = leads[-1]
            enrichment = EnrichmentData(enriched=False)
            crm = self.state_data["crm"]

        return RevOpsObservation(
            task_id=self.task_id,
            lead=lead,
            enrichment=enrichment,
            crm=crm,
            reps=self.state_data.get("reps", []),
            icp_criteria=self.state_data.get("icp", ""),
            sla_time_remaining_minutes=60,
            last_action_feedback=self.state_data.get("last_feedback", ""),
        )

    def _apply_action(self, action: RevOpsAction, current_lead: Any, lead_state: Dict[str, bool]) -> Tuple[float, str]:
        """Apply ``action`` to ``current_lead`` and return ``(reward, feedback)``.

        Mutates ``lead_state``. For a successful ``route_to_rep`` and for
        ``disqualify`` it also advances ``state_data["current_lead_index"]``.
        """
        action_type = action.action_type
        # task_hard's CRM-gated rules apply only to this specific lead.
        is_hard_cfo = self.task_id == "task_hard" and current_lead.id == "lead_3_cfo"

        if action_type == "enrich_lead":
            if lead_state["enriched"]:
                return 0.0, "Lead already enriched."
            lead_state["enriched"] = True
            return 0.1, "Lead enriched successfully."

        if action_type == "check_crm":
            if lead_state["crm_checked"]:
                return 0.0, "CRM already checked."
            lead_state["crm_checked"] = True
            return 0.1, "CRM checked successfully."

        if action_type == "update_lead_score":
            if self.task_id == "task_medium" and not lead_state["enriched"]:
                return -0.2, "Violation: Scored before enrichment."
            # NOTE(review): this mutates the lead object from get_task_data();
            # if that data is cached across episodes, scores carry over — confirm.
            current_lead.score = action.score
            lead_state["scored"] = True
            return 0.1, f"Lead score updated to {action.score}."

        if action_type == "merge_with_account":
            if is_hard_cfo and not lead_state["crm_checked"]:
                return -0.5, "Violation: Merged without checking CRM first."
            lead_state["merged"] = True
            return 0.1, f"Merged with account {action.account_id}."

        if action_type == "flag_reengagement":
            lead_state["flagged"] = True
            return 0.1, f"Flagged as re-engagement for opportunity {action.opportunity_id}."

        if action_type == "route_to_rep":
            if is_hard_cfo and not lead_state["crm_checked"]:
                # The lead is not advanced, so the agent may still check the CRM.
                return -0.4, "Fatal violation: Routed without checking CRM."
            self.state_data["current_lead_index"] += 1
            return 0.1, f"Lead routed to rep {action.rep_id}. Moving to next lead."

        if action_type == "disqualify":
            self.state_data["current_lead_index"] += 1
            return 0.1, f"Lead disqualified: {action.disqualification_reason}. Moving to next lead."

        # action_type may be an Enum member or a plain string; avoid assuming .value exists.
        type_name = getattr(action_type, "value", action_type)
        return 0.0, f"Action {type_name} performed."

    def _grade_episode(self) -> float:
        """Run the grader for ``self.task_id``; returns 0.0 for unknown tasks."""
        graders = {
            "task_easy": grader_easy,
            "task_medium": grader_medium,
            "task_hard": grader_hard,
        }
        grader = graders.get(self.task_id)
        if grader is None:
            return 0.0
        return grader(self.state_data["action_history"], self.state_data["leads"])

    def step(self, action: RevOpsAction, **kwargs) -> RevOpsObservation:  # type: ignore[override]
        """Apply one agent action and return the resulting observation.

        If ``reset()`` was never called, an episode is started for the current
        task first. Once all leads are processed, the episode is graded: the
        score is stored in ``state_data["grader_score"]``, in
        ``obs.metadata["grader_score"]`` and in the module-level
        ``global_last_grader_score``. The step reward then receives
        +_TERMINAL_BONUS if the score is at least _PASS_THRESHOLD, otherwise
        -_TERMINAL_BONUS. Calls made after the episode has finished return
        ``done=True`` with reward 0.0 and are not recorded in the history.
        """
        global global_last_grader_score

        if "action_history" not in self.state_data:
            self.reset(task_id=self.task_id)
        self._state.step_count += 1

        idx = self.state_data["current_lead_index"]
        if idx >= len(self.state_data["leads"]):
            obs = self._get_observation()
            obs.done = True
            obs.reward = 0.0
            obs.metadata = {"message": "Episode is already finished."}
            return obs

        # Record only actions taken while the episode is live; the grader reads this.
        self.state_data["action_history"].append(action)
        current_lead = self.state_data["leads"][idx]
        lead_state = self.state_data["lead_states"][current_lead.id]
        reward, feedback = self._apply_action(action, current_lead, lead_state)
        self.state_data["last_feedback"] = feedback

        is_done = self.state_data["current_lead_index"] >= len(self.state_data["leads"])
        obs = self._get_observation()
        info = {"message": feedback}
        if is_done:
            score = self._grade_episode()
            info["grader_score"] = score
            self.state_data["grader_score"] = score
            global_last_grader_score = score
            if score >= self._PASS_THRESHOLD:
                reward += self._TERMINAL_BONUS
            else:
                reward -= self._TERMINAL_BONUS

        # Accumulate after the terminal adjustment so the running total equals
        # the sum of rewards actually returned to the agent.
        self.state_data["accumulated_reward"] += reward
        obs.done = is_done
        obs.reward = reward
        obs.metadata = info
        return obs

    @property
    def state(self) -> State:
        """Episode bookkeeping (episode id and step count)."""
        return self._state

    def get_full_state(self):
        """Return the internal episode state dict (live reference, not a copy)."""
        return self.state_data