data_clean_env / inference.py
anugrah55's picture
Upload folder using huggingface_hub
7b49766 verified
import asyncio
import os
import textwrap
from typing import List, Optional
import json
from openai import OpenAI
from client import get_client
from models import DataCleanAction
API_BASE_URL = os.getenv("API_BASE_URL", "https://api.openai.com/v1")
MODEL_NAME = os.getenv("MODEL_NAME", "gpt-4.1-mini")
HF_TOKEN = os.getenv("HF_TOKEN")
BENCHMARK = "data_clean_env"
MAX_STEPS = 10
TEMPERATURE = 0.0
SYSTEM_PROMPT = textwrap.dedent(
"""
You are an AI agent tasked with cleaning a pandas DataFrame.
You will be given the current DataFrame schema, missing values count per column, and the first 5 rows.
You must output a JSON string representing exactly one action to take.
Allowed actions:
{"action_type": "fill_na", "column_name": "col", "value": "0"}
{"action_type": "drop_na", "column_name": "col"}
{"action_type": "drop_column", "column_name": "col"}
{"action_type": "rename_column", "column_name": "old_col", "value": "new_col"}
{"action_type": "change_type", "column_name": "col", "value": "int"} (value can be int, float, or str)
{"action_type": "submit"}
Your goal:
- easy_clean: Fill missing values in 'age' with '0'.
- medium_clean: Drop rows with missing values in 'name' and 'age'. Drop column 'ignore_me'.
- hard_clean: Rename 'EmployeeID' to 'emp_id'. Drop 'Dept' column. Make 'Salary' valid (fill NaN with '0' and convert to float/int). Fill NaN in 'JoinDate' with '2000-01-01'.
When you are done cleaning according to the goal, output {"action_type": "submit"}.
Reply ONLY with valid JSON.
"""
).strip()
def log_start(task: str, env: str, model: str) -> None:
print(f"[START] task={task} env={env} model={model}", flush=True)
def log_step(step: int, action: str, reward: float, done: bool, error: Optional[str]) -> None:
error_val = error if error else "null"
done_val = str(done).lower()
print(
f"[STEP] step={step} action={action} reward={reward:.2f} done={done_val} error={error_val}",
flush=True,
)
def log_end(success: bool, steps: int, score: float, rewards: List[float]) -> None:
score = max(0.01, min(0.99, float(score)))
rewards_str = ",".join(f"{r:.2f}" for r in rewards)
print(f"[END] success={str(success).lower()} steps={steps} score={score:.2f} rewards={rewards_str}", flush=True)
def get_model_action(client: OpenAI, obs_dict: dict) -> dict:
user_prompt = f"Observation:\n{json.dumps(obs_dict, indent=2)}\nWhat is your next action?"
try:
completion = client.chat.completions.create(
model=MODEL_NAME,
messages=[
{"role": "system", "content": SYSTEM_PROMPT},
{"role": "user", "content": user_prompt},
],
temperature=TEMPERATURE,
stream=False,
)
text = completion.choices[0].message.content.strip()
if text.startswith("```json"):
text = text[7:]
if text.endswith("```"):
text = text[:-3]
return json.loads(text.strip())
except Exception as exc:
print(f"[DEBUG] Model request failed: {exc}", flush=True)
return {"action_type": "fill_na", "column_name": "invalid", "value": "invalid"}
async def run_task(task_name: str, client: OpenAI, env_client) -> None:
log_start(task=task_name, env=BENCHMARK, model=MODEL_NAME)
rewards = []
steps_taken = 0
score = 0.0
success = False
try:
result = await env_client.reset(task=task_name)
for step in range(1, MAX_STEPS + 1):
if result.done:
break
obs = result.observation
obs_dict = {
"schema": obs.df_schema,
"missing": obs.missing_values,
"head": obs.head,
"feedback": obs.feedback,
"error": obs.last_error
}
action_dict = get_model_action(client, obs_dict)
action_str = json.dumps(action_dict)
action = DataCleanAction(**action_dict)
result = await env_client.step(action)
reward = result.reward or 0.0
done = result.done
error = result.observation.last_error
rewards.append(reward)
steps_taken = step
if action.action_type == "submit":
score = reward # grader sets final reward to score
log_step(step=step, action=action_str, reward=reward, done=done, error=error)
if done:
break
success = score >= 0.5
except Exception as e:
print(f"[DEBUG] Error running task {task_name}: {e}", flush=True)
finally:
log_end(success=success, steps=steps_taken, score=score, rewards=rewards)
async def main() -> None:
task_name_env = os.getenv("DATA_CLEAN_ENV_TASK")
tasks_to_run = [task_name_env] if task_name_env else ["easy_clean", "medium_clean", "hard_clean"]
if HF_TOKEN is None:
print("[DEBUG] HF_TOKEN environment variable is required", flush=True)
for task in tasks_to_run:
log_start(task=task, env=BENCHMARK, model=MODEL_NAME)
log_end(success=False, steps=0, score=0.0, rewards=[])
return
client = OpenAI(base_url=API_BASE_URL, api_key=HF_TOKEN)
image_name = os.getenv("LOCAL_IMAGE_NAME") or os.getenv("IMAGE_NAME")
try:
env_client = await get_client(image_name)
except Exception as e:
print(f"[DEBUG] Failed to start env_client: {e}", flush=True)
for task in tasks_to_run:
log_start(task=task, env=BENCHMARK, model=MODEL_NAME)
log_end(success=False, steps=0, score=0.0, rewards=[])
return
try:
for task in tasks_to_run:
await run_task(task, client, env_client)
finally:
try:
await env_client.close()
except Exception as e:
print(f"[DEBUG] env.close() error: {e}", flush=True)
if __name__ == "__main__":
asyncio.run(main())