CodeKnightDebjit committed on
Commit
c0db505
·
verified ·
1 Parent(s): 73f852c

Upload folder using huggingface_hub

Files changed (2)
  1. README.md +747 -255
  2. inference.py +254 -430
README.md CHANGED
@@ -1,255 +1,747 @@
1
- ---
2
- title: Data Cleaning Env Environment Server
3
- emoji: 🎹
4
- colorFrom: indigo
5
- colorTo: red
6
- sdk: docker
7
- pinned: false
8
- app_port: 8000
9
- base_path: /web
10
- tags:
11
- - openenv
12
- ---
13
-
14
- # Data Cleaning Env Environment
15
-
16
- A simple test environment that echoes back messages. Perfect for testing the env APIs as well as demonstrating environment usage patterns.
17
-
18
- ## Quick Start
19
-
20
- The simplest way to use the Data Cleaning Env environment is through the `DataCleaningEnv` class:
21
-
22
- ```python
23
- from data_cleaning_env import CleanAction, DataCleaningEnv
24
-
25
- try:
26
- # Create environment from Docker image
27
- data_cleaning_envenv = DataCleaningEnv.from_docker_image("data_cleaning_env-env:latest")
28
-
29
- # Reset
30
- result = data_cleaning_envenv.reset()
31
- print(f"Reset: {result.observation.echoed_message}")
32
-
33
- # Send multiple messages
34
- messages = ["Hello, World!", "Testing echo", "Final message"]
35
-
36
- for msg in messages:
37
- result = data_cleaning_envenv.step(CleanAction(message=msg))
38
- print(f"Sent: '{msg}'")
39
- print(f" β†’ Echoed: '{result.observation.echoed_message}'")
40
- print(f" β†’ Length: {result.observation.message_length}")
41
- print(f" β†’ Reward: {result.reward}")
42
-
43
- finally:
44
- # Always clean up
45
- data_cleaning_envenv.close()
46
- ```
47
-
48
- That's it! The `DataCleaningEnv.from_docker_image()` method handles:
49
- - Starting the Docker container
50
- - Waiting for the server to be ready
51
- - Connecting to the environment
52
- - Container cleanup when you call `close()`
53
-
54
- ## Building the Docker Image
55
-
56
- Before using the environment, you need to build the Docker image:
57
-
58
- ```bash
59
- # From project root
60
- docker build -t data_cleaning_env-env:latest -f server/Dockerfile .
61
- ```
62
-
63
- ## Deploying to Hugging Face Spaces
64
-
65
- You can easily deploy your OpenEnv environment to Hugging Face Spaces using the `openenv push` command:
66
-
67
- ```bash
68
- # From the environment directory (where openenv.yaml is located)
69
- openenv push
70
-
71
- # Or specify options
72
- openenv push --namespace my-org --private
73
- ```
74
-
75
- The `openenv push` command will:
76
- 1. Validate that the directory is an OpenEnv environment (checks for `openenv.yaml`)
77
- 2. Prepare a custom build for Hugging Face Docker space (enables web interface)
78
- 3. Upload to Hugging Face (ensuring you're logged in)
79
-
80
- ### Prerequisites
81
-
82
- - Authenticate with Hugging Face: The command will prompt for login if not already authenticated
83
-
84
- ### Options
85
-
86
- - `--directory`, `-d`: Directory containing the OpenEnv environment (defaults to current directory)
87
- - `--repo-id`, `-r`: Repository ID in format 'username/repo-name' (defaults to 'username/env-name' from openenv.yaml)
88
- - `--base-image`, `-b`: Base Docker image to use (overrides Dockerfile FROM)
89
- - `--private`: Deploy the space as private (default: public)
90
-
91
- ### Examples
92
-
93
- ```bash
94
- # Push to your personal namespace (defaults to username/env-name from openenv.yaml)
95
- openenv push
96
-
97
- # Push to a specific repository
98
- openenv push --repo-id my-org/my-env
99
-
100
- # Push with a custom base image
101
- openenv push --base-image ghcr.io/meta-pytorch/openenv-base:latest
102
-
103
- # Push as a private space
104
- openenv push --private
105
-
106
- # Combine options
107
- openenv push --repo-id my-org/my-env --base-image custom-base:latest --private
108
- ```
109
-
110
- After deployment, your space will be available at:
111
- `https://huggingface.co/spaces/<repo-id>`
112
-
113
- The deployed space includes:
114
- - **Web Interface** at `/web` - Interactive UI for exploring the environment
115
- - **API Documentation** at `/docs` - Full OpenAPI/Swagger interface
116
- - **Health Check** at `/health` - Container health monitoring
117
- - **WebSocket** at `/ws` - Persistent session endpoint for low-latency interactions
118
-
119
- ## Environment Details
120
-
121
- ### Action
122
- **CleanAction**: Contains a single field
123
- - `message` (str) - The message to echo back
124
-
125
- ### Observation
126
- **CleanObservation**: Contains the echo response and metadata
127
- - `echoed_message` (str) - The message echoed back
128
- - `message_length` (int) - Length of the message
129
- - `reward` (float) - Reward based on message length (length Γ— 0.1)
130
- - `done` (bool) - Always False for echo environment
131
- - `metadata` (dict) - Additional info like step count
132
-
133
- ### Reward
134
- The reward is calculated as: `message_length Γ— 0.1`
135
- - "Hi" β†’ reward: 0.2
136
- - "Hello, World!" β†’ reward: 1.3
137
- - Empty message β†’ reward: 0.0
138
-
139
- ## Advanced Usage
140
-
141
- ### Connecting to an Existing Server
142
-
143
- If you already have a Data Cleaning Env environment server running, you can connect directly:
144
-
145
- ```python
146
- from data_cleaning_env import DataCleaningEnv
147
-
148
- # Connect to existing server
149
- data_cleaning_envenv = DataCleaningEnv(base_url="<ENV_HTTP_URL_HERE>")
150
-
151
- # Use as normal
152
- result = data_cleaning_envenv.reset()
153
- result = data_cleaning_envenv.step(CleanAction(message="Hello!"))
154
- ```
155
-
156
- Note: When connecting to an existing server, `data_cleaning_envenv.close()` will NOT stop the server.
157
-
158
- ### Using the Context Manager
159
-
160
- The client supports context manager usage for automatic connection management:
161
-
162
- ```python
163
- from data_cleaning_env import CleanAction, DataCleaningEnv
164
-
165
- # Connect with context manager (auto-connects and closes)
166
- with DataCleaningEnv(base_url="http://localhost:8000") as env:
167
- result = env.reset()
168
- print(f"Reset: {result.observation.echoed_message}")
169
- # Multiple steps with low latency
170
- for msg in ["Hello", "World", "!"]:
171
- result = env.step(CleanAction(message=msg))
172
- print(f"Echoed: {result.observation.echoed_message}")
173
- ```
174
-
175
- The client uses WebSocket connections for:
176
- - **Lower latency**: No HTTP connection overhead per request
177
- - **Persistent session**: Server maintains your environment state
178
- - **Efficient for episodes**: Better for many sequential steps
179
-
180
- ### Concurrent WebSocket Sessions
181
-
182
- The server supports multiple concurrent WebSocket connections. To enable this,
183
- modify `server/app.py` to use factory mode:
184
-
185
- ```python
186
- # In server/app.py - use factory mode for concurrent sessions
187
- app = create_app(
188
- DataCleaningEnvironment, # Pass class, not instance
189
- CleanAction,
190
- CleanAction,
191
- max_concurrent_envs=4, # Allow 4 concurrent sessions
192
- )
193
- ```
194
-
195
- Then multiple clients can connect simultaneously:
196
-
197
- ```python
198
- from data_cleaning_env import CleanAction, DataCleaningEnv
199
- from concurrent.futures import ThreadPoolExecutor
200
-
201
- def run_episode(client_id: int):
202
- with DataCleaningEnv(base_url="http://localhost:8000") as env:
203
- result = env.reset()
204
- for i in range(10):
205
- result = env.step(CleanAction(message=f"Client {client_id}, step {i}"))
206
- return client_id, result.observation.message_length
207
-
208
- # Run 4 episodes concurrently
209
- with ThreadPoolExecutor(max_workers=4) as executor:
210
- results = list(executor.map(run_episode, range(4)))
211
- ```
212
-
213
- ## Development & Testing
214
-
215
- ### Direct Environment Testing
216
-
217
- Test the environment logic directly without starting the HTTP server:
218
-
219
- ```bash
220
- # From the server directory
221
- python3 server/data_cleaning_env_environment.py
222
- ```
223
-
224
- This verifies that:
225
- - Environment resets correctly
226
- - Step executes actions properly
227
- - State tracking works
228
- - Rewards are calculated correctly
229
-
230
- ### Running Locally
231
-
232
- Run the server locally for development:
233
-
234
- ```bash
235
- uvicorn server.app:app --reload
236
- ```
237
-
238
- ## Project Structure
239
-
240
- ```
241
- data_cleaning_env/
242
- β”œβ”€β”€ .dockerignore # Docker build exclusions
243
- β”œβ”€β”€ __init__.py # Module exports
244
- β”œβ”€β”€ README.md # This file
245
- β”œβ”€β”€ openenv.yaml # OpenEnv manifest
246
- β”œβ”€β”€ pyproject.toml # Project metadata and dependencies
247
- β”œβ”€β”€ uv.lock # Locked dependencies (generated)
248
- β”œβ”€β”€ client.py # DataCleaningEnv client
249
- β”œβ”€β”€ models.py # Action and Observation models
250
- └── server/
251
- β”œβ”€β”€ __init__.py # Server module exports
252
- β”œβ”€β”€ data_cleaning_env_environment.py # Core environment logic
253
- β”œβ”€β”€ app.py # FastAPI application (HTTP + WebSocket endpoints)
254
- └── Dockerfile # Container image definition
255
- ```
1
+ ---
2
+ title: Data Cleaning Environment
3
+ emoji: 🧹
4
+ colorFrom: blue
5
+ colorTo: purple
6
+ sdk: docker
7
+ app_port: 7860
8
+ base_path: /web
9
+ ---
10
+ <div align="center">
11
+
12
+ # 🧹 Data Cleaning Environment
13
+
14
+ ### A Reinforcement Learning Benchmark for Autonomous Data Cleaning Agents
15
+
16
+ [![Python](https://img.shields.io/badge/Python-3.12+-3776AB?style=for-the-badge&logo=python&logoColor=white)](https://www.python.org/)
17
+ [![OpenEnv](https://img.shields.io/badge/OpenEnv-Compatible-FF6B35?style=for-the-badge)](https://github.com/meta-pytorch/OpenEnv)
18
+ [![Pydantic](https://img.shields.io/badge/Pydantic-v2-E92063?style=for-the-badge&logo=pydantic&logoColor=white)](https://docs.pydantic.dev/)
19
+ [![FastAPI](https://img.shields.io/badge/FastAPI-WebSocket-009688?style=for-the-badge&logo=fastapi&logoColor=white)](https://fastapi.tiangolo.com/)
20
+ [![Docker](https://img.shields.io/badge/Docker-Ready-2496ED?style=for-the-badge&logo=docker&logoColor=white)](https://www.docker.com/)
21
+ [![HuggingFace](https://img.shields.io/badge/HuggingFace-Deployable-FFD21E?style=for-the-badge&logo=huggingface&logoColor=black)](https://huggingface.co/)
22
+ [![License](https://img.shields.io/badge/License-MIT-green?style=for-the-badge)](LICENSE)
23
+
24
+ <br/>
25
+
26
+ > **An OpenEnv-compatible reinforcement learning environment where an LLM agent receives a dirty CSV dataset and must autonomously fix type errors, outliers, missing values, and schema inconsistencies to match a hidden ground truth — step by step.**
27
+
28
+ <br/>
29
+
30
+ ```
31
+ β”Œβ”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”
32
+ β”‚ Dirty CSV β†’ Agent Observes β†’ Issues CleanAction β†’ Reward β”‚
33
+ β”‚ β”‚
34
+ β”‚ "N/A" β†’ FILL_MISSING(median) β†’ Score ↑ β†’ +0.12 reward β”‚
35
+ β”‚ "2099" β†’ SET_VALUE(row=3,"2024-01-15") β†’ Score ↑ β†’ +0.08 β”‚
36
+ β”‚ " bob" β†’ STANDARDIZE_COL("name") β†’ Score ↑ β†’ +0.05 β”‚
37
+ β””β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”˜
38
+ ```
39
+
40
+ </div>
41
+
42
+ ---
43
+
44
+ ## πŸ“‘ Table of Contents
45
+
46
+ - [Overview](#-overview)
47
+ - [Architecture](#-architecture)
48
+ - [Project Structure](#-project-structure)
49
+ - [Tasks](#-tasks)
50
+ - [Action Space](#-action-space)
51
+ - [Observation Space](#-observation-space)
52
+ - [Reward Function](#-reward-function)
53
+ - [Quick Start](#-quick-start)
54
+ - [Running Inference](#-running-inference)
55
+ - [Environment API](#-environment-api)
56
+ - [Configuration](#-configuration)
57
+ - [Deployment](#-deployment)
58
+ - [Development & Testing](#-development--testing)
59
+ - [Troubleshooting](#-troubleshooting)
60
+
61
+ ---
62
+
63
+ ## 🌟 Overview
64
+
65
+ The **Data Cleaning Environment** is a structured RL benchmark where an LLM-powered agent must clean tabular datasets. The environment wraps a FastAPI WebSocket server following the [OpenEnv](https://github.com/meta-pytorch/OpenEnv) protocol, making it compatible with any OpenEnv-based training or evaluation framework.
66
+
67
+ ### Why This Matters
68
+
69
+ By common estimates, 60–80% of the effort in real-world data pipelines goes into cleaning. This environment trains agents to:
70
+
71
+ - **Detect** type errors, outliers, missing values, and schema inconsistencies
72
+ - **Reason** about which fix is most impactful at each step
73
+ - **Self-correct** from informative error feedback
74
+ - **Terminate** efficiently without over-cleaning
75
+
76
+ ### Key Properties
77
+
78
+ | Property | Value |
79
+ |---|---|
80
+ | Protocol | OpenEnv (WebSocket + HTTP) |
81
+ | Action Space | Discrete (5 command types) |
82
+ | Observation | Full CSV state + grader feedback |
83
+ | Episode Structure | Reset → N × Step → Done |
84
+ | Concurrency | ✅ Multiple simultaneous sessions |
85
+ | State Management | Server-side, fully isolated per session |
86
+
87
+ ---
88
+
89
+ ## πŸ—οΈ Architecture
90
+
91
+ ```
92
+ β”Œβ”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”
93
+ β”‚ Agent (LLM / RL Policy) β”‚
94
+ β”‚ Qwen2.5-72B / Mistral / Custom Model β”‚
95
+ β””β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”¬β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”¬β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”˜
96
+ β”‚ CleanAction (JSON) β”‚ CleanObservation
97
+ β–Ό β”‚
98
+ β”Œβ”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”΄β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”
99
+ β”‚ DataCleaningEnv (client.py) β”‚
100
+ β”‚ OpenEnv EnvClient[CleanAction, CleanObservation, dict] β”‚
101
+ β”‚ WebSocket persistent connection β”‚
102
+ β””β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”¬β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”˜
103
+ β”‚ WebSocket /ws
104
+ β–Ό
105
+ β”Œβ”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”
106
+ β”‚ FastAPI Server (server/app.py) β”‚
107
+ β”‚ HTTP + WebSocket endpoints, sessions β”‚
108
+ β””β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”¬β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”˜
109
+ β”‚
110
+ β–Ό
111
+ β”Œβ”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”
112
+ β”‚ DataCleaningEnvironment (server/data_cleaning_env.py) β”‚
113
+ β”‚ β”‚
114
+ β”‚ β”Œβ”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β” β”Œβ”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β” β”Œβ”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β” β”Œβ”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β” β”‚
115
+ β”‚ β”‚ dataset_ β”‚ β”‚ Action β”‚ β”‚ Grader β”‚ β”‚ Reward β”‚ β”‚
116
+ β”‚ β”‚ factory.py β”‚ β”‚ Dispatcher β”‚ β”‚ Engine β”‚ β”‚ Computer β”‚ β”‚
117
+ β”‚ β”‚ β”‚ β”‚ SET_VALUE β”‚ β”‚ grade() β”‚ β”‚ β”‚ β”‚
118
+ β”‚ β”‚ easy/medium β”‚ β”‚ DROP_ROW β”‚ β”‚ score β”‚ β”‚ progress β”‚ β”‚
119
+ β”‚ β”‚ /hard CSVs β”‚ β”‚ STANDARD. β”‚ β”‚ delta β”‚ β”‚ efficiencyβ”‚ β”‚
120
+ β”‚ β”‚ β”‚ β”‚ FILL_MISS. β”‚ β”‚ β”‚ β”‚ penalties β”‚ β”‚
121
+ β”‚ β””β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”˜ β””β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”˜ β””β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”˜ β””β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”˜ β”‚
122
+ β””β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”˜
123
+ ```
124
+
125
+ ---
126
+
127
+ ## πŸ“ Project Structure
128
+
129
+ ```
130
+ data_cleaning_env/
131
+ β”‚
132
+ β”œβ”€β”€ πŸ“„ client.py # DataCleaningEnv β€” OpenEnv client
133
+ β”œβ”€β”€ πŸ“„ models.py # CleanAction, CleanObservation, CleanState (Pydantic)
134
+ β”œβ”€β”€ πŸ“„ inference.py # Official evaluation entry point
135
+ β”œβ”€β”€ πŸ“„ dataset_factory.py # Generates easy/medium/hard dirty↔clean CSV pairs
136
+ β”œβ”€β”€ πŸ“„ graders.py # Scoring engine β€” grade(agent_df vs clean_df)
137
+ β”œβ”€β”€ πŸ“„ openenv.yaml # OpenEnv manifest (HuggingFace Spaces config)
138
+ β”œβ”€β”€ πŸ“„ pyproject.toml # Project metadata and dependencies
139
+ β”‚
140
+ └── server/
141
+ β”œβ”€β”€ πŸ“„ app.py # FastAPI application (HTTP + WebSocket)
142
+ β”œβ”€β”€ πŸ“„ data_cleaning_env.py # Core environment logic (reset/step/state)
143
+ β”œβ”€β”€ πŸ“„ __init__.py
144
+ └── πŸ“„ Dockerfile # Container image definition
145
+ ```
146
+
147
+ ---
148
+
149
+ ## 🎯 Tasks
150
+
151
+ The environment ships three progressively harder tasks, each with fixed-seed deterministic datasets:
152
+
153
+ ### 🟒 Easy β€” Sales Orders
154
+
155
+ | Property | Value |
156
+ |---|---|
157
+ | Dataset | ~100-row sales orders CSV |
158
+ | Dirty Issues | Cell-level type errors, a few missing values |
159
+ | Step Budget | **40 steps** |
160
+ | Success Threshold | **Score ≥ 0.95** |
161
+ | Primary Skills | `SET_VALUE`, `FILL_MISSING` |
162
+
163
+ **What the agent needs to fix:** Individual cells with wrong types (e.g., `"N/A"` in a price column, `"abc"` in a numeric field). Straightforward injected errors with clear ground truth.
164
+
165
+ ---
166
+
167
+ ### 🟑 Medium β€” Financial Transactions
168
+
169
+ | Property | Value |
170
+ |---|---|
171
+ | Dataset | ~200-row transaction log |
172
+ | Dirty Issues | Outlier rows, mixed date formats, missing amounts |
173
+ | Step Budget | **80 steps** |
174
+ | Success Threshold | **Score ≥ 0.85** |
175
+ | Primary Skills | `DROP_ROW`, `STANDARDIZE_COL`, `FILL_MISSING` |
176
+
177
+ **What the agent needs to fix:** Statistical outliers disguised as data, inconsistent date formats, missing numeric values. Crucially, some extreme values are **valid** β€” dropping them costs a false-positive penalty.
178
+
179
+ ---
180
+
181
+ ### πŸ”΄ Hard β€” Multi-Schema Dataset
182
+
183
+ | Property | Value |
184
+ |---|---|
185
+ | Dataset | ~400-row multi-domain CSV |
186
+ | Dirty Issues | Cross-column inconsistencies, future-year dates, bulk missing data |
187
+ | Step Budget | **150 steps** |
188
+ | Success Threshold | **Score ≥ 0.80** |
189
+ | Primary Skills | All commands |
190
+
191
+ **What the agent needs to fix:** Everything from easy + medium, plus cascading schema issues across columns. Requires strategic planning about fix order.
192
+
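+ Tasks are selected at reset time; the full client API is shown under Environment API below. For example:
+
+ ```python
+ # task_id is one of "easy" | "medium" | "hard"
+ result = await env.reset(task_id="hard")
+ print(result.observation.max_steps)      # 150 for the hard task
+ ```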
193
+ ---
194
+
195
+ ## πŸ•ΉοΈ Action Space
196
+
197
+ Every step the agent sends exactly one `CleanAction`:
198
+
199
+ ```python
200
+ from models import CleanAction
201
+
202
+ # Fix a specific cell
203
+ CleanAction(command="SET_VALUE", row_index=3, column="price", value="29.99")
204
+
205
+ # Remove an entire row (use carefully — false positives are penalised)
206
+ CleanAction(command="DROP_ROW", row_index=17)
207
+
208
+ # Normalise a column's format (dates → YYYY-MM-DD, numbers → float, strings → stripped)
209
+ CleanAction(command="STANDARDIZE_COL", column="order_date")
210
+
211
+ # Fill all NaN values in a column using a strategy
212
+ CleanAction(command="FILL_MISSING", column="quantity", fill_strategy="median")
213
+
214
+ # Signal episode completion (only accepted when score ≥ task threshold)
215
+ CleanAction(command="DONE")
216
+ ```
217
+
218
+ ### Command Reference
219
+
220
+ | Command | `row_index` | `column` | `value` | `fill_strategy` |
221
+ |---|---|---|---|---|
222
+ | `SET_VALUE` | ✅ required | ✅ required | ✅ required | — |
223
+ | `DROP_ROW` | ✅ required | — | — | — |
224
+ | `STANDARDIZE_COL` | — | ✅ required | — | — |
225
+ | `FILL_MISSING` | — | ✅ required | — | ✅ required |
226
+ | `DONE` | — | — | — | — |
227
+
228
+ ### `FILL_MISSING` Strategies
229
+
230
+ | Strategy | Behaviour |
231
+ |---|---|
232
+ | `"mean"` | Replace NaN with column mean (numeric columns only) |
233
+ | `"median"` | Replace NaN with column median (numeric columns only) |
234
+ | `"mode"` | Replace NaN with most frequent value (any column) |
235
+ | `"drop"` | Remove rows where this column is NaN |
236
+
237
+ > ⚠️ **Important:** `DROP_ROW` removes by **positional row index** (the `row_index` value refers to the row's current position in the CSV), not by a row ID field. Row indices shift after each drop.
238
+
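+ For intuition, here is a minimal sketch of how these commands could map onto pandas operations. The helper names and exact semantics are illustrative assumptions; the authoritative behaviour lives in the server's action dispatcher.
+
+ ```python
+ import pandas as pd
+
+ def fill_missing(df: pd.DataFrame, column: str, strategy: str) -> pd.DataFrame:
+     # Illustrative mapping of the strategy table above onto pandas.
+     col = df[column]
+     if strategy == "mean":
+         return df.assign(**{column: col.fillna(col.mean())})
+     if strategy == "median":
+         return df.assign(**{column: col.fillna(col.median())})
+     if strategy == "mode":
+         return df.assign(**{column: col.fillna(col.mode().iloc[0])})
+     if strategy == "drop":
+         return df.dropna(subset=[column]).reset_index(drop=True)
+     raise ValueError(f"unknown fill strategy: {strategy}")
+
+ def drop_row(df: pd.DataFrame, row_index: int) -> pd.DataFrame:
+     # DROP_ROW is positional: indices shift after every drop (see the note above).
+     return df.drop(df.index[row_index]).reset_index(drop=True)
+ ```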
239
+ ---
240
+
241
+ ## πŸ‘οΈ Observation Space
242
+
243
+ After every `reset()` and `step()`, the agent receives a `CleanObservation`:
244
+
245
+ ```python
246
+ @dataclass
247
+ class CleanObservation:
248
+ # ── Task context (constant per episode) ──────────────────────
249
+ task_id: str # "easy" | "medium" | "hard"
250
+ schema_hint: str # Plain-English description of clean schema
251
+ initial_dirty_cells: int # Total dirty cells at episode start
252
+
253
+ # ── Per-step state ───────────────────────────────────────────
254
+ dirty_csv: str # Full current CSV as string (all edits applied)
255
+ current_score: float # 0.0 β†’ 1.0 (grader score vs ground truth)
256
+ issues_remaining: int # Approximate dirty cells still to fix
257
+ step_number: int # Steps taken so far
258
+ max_steps: int # Budget for this task
259
+
260
+ # ── Last-action feedback ─────────────────────────────────────
261
+ last_action_success: bool # Whether previous action applied cleanly
262
+ last_action_error: str # Error message if success=False (else None)
263
+
264
+ # ── Inherited ────────────────────────────────────────────────
265
+ done: bool # True = episode ended
266
+ reward: float | None # Per-step reward (None after reset)
267
+ ```
268
+
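+ `dirty_csv` is a plain CSV string, so it can be loaded straight into pandas for inspection (the environment itself works on DataFrames server-side):
+
+ ```python
+ import io
+ import pandas as pd
+
+ df = pd.read_csv(io.StringIO(obs.dirty_csv))
+ print(df.isna().sum())       # missing values per column
+ print(df.dtypes)             # spot columns parsed as object because of dirty cells
+ ```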
269
+ ### Score Computation
270
+
271
+ The grader compares the agent's working DataFrame to the hidden ground-truth DataFrame:
272
+
273
+ ```
274
+ score = (initial_dirty_cells - remaining_dirty_cells) / initial_dirty_cells
275
+ ```
276
+
277
+ A score of `1.0` means perfect agreement with ground truth.
278
+
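+ As a rough sketch, a cell-by-cell comparison is enough to reproduce this formula; the real `graders.py` may weight columns or handle dropped rows differently:
+
+ ```python
+ import pandas as pd
+
+ def grade(agent_df: pd.DataFrame, clean_df: pd.DataFrame, initial_dirty_cells: int) -> float:
+     # Count cells that still disagree with the ground truth, then report
+     # progress relative to how many cells were dirty at reset time.
+     aligned = agent_df.reindex_like(clean_df).astype(str)
+     remaining_dirty = int((aligned != clean_df.astype(str)).to_numpy().sum())
+     if initial_dirty_cells <= 0:
+         return 1.0
+     return max(0.0, (initial_dirty_cells - remaining_dirty) / initial_dirty_cells)
+ ```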
279
+ ---
280
+
281
+ ## πŸ’° Reward Function
282
+
283
+ The reward is dense and shaped to guide efficient, precise cleaning:
284
+
285
+ ```
286
+ reward = progress_term
287
+ + efficiency_bonus
288
+ + false_positive_penalty
289
+ + early_done_penalty
290
+ + step_cost
291
+ ```
292
+
293
+ | Component | Value | When |
294
+ |---|---|---|
295
+ | **Progress** | `current_score − previous_score` | Every step |
296
+ | **Efficiency bonus** | `+0.10 × (1 − steps_used/max_steps)` | Only when task is solved this step |
297
+ | **False-positive penalty** | `−0.15` | `DROP_ROW` removes a valid-extreme row (medium task) |
298
+ | **Early DONE penalty** | `−0.20` | `DONE` called with score < 0.60 |
299
+ | **Step cost** | `−0.005` | Every step (discourages padding) |
300
+ | **Premature DONE block** | `−1.00` | `DONE` below task threshold — episode *continues* |
301
+
302
+ **Reward range:** `[−0.5, +1.0]` (clipped)
303
+
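+ Putting the table together, a single step's reward can be sketched as follows (values taken from the table and the Configuration section below; the DONE-related penalties are applied by the DONE handler itself, and the server's actual ordering of checks may differ):
+
+ ```python
+ def compute_reward(prev_score: float, score: float, steps_used: int, max_steps: int,
+                    *, solved: bool, dropped_valid_row: bool) -> float:
+     reward = (score - prev_score) - 0.005                # progress + per-step cost
+     if dropped_valid_row:
+         reward -= 0.15                                   # false-positive penalty
+     if solved:
+         reward += 0.10 * (1.0 - steps_used / max_steps)  # efficiency bonus
+     return max(-0.5, min(1.0, reward))                   # clip to [-0.5, +1.0]
+ ```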
304
+ ### Termination Logic
305
+
306
+ The episode terminates when **any** of these is true:
307
+
308
+ 1. ✅ `current_score >= task_threshold` (auto-terminated, efficiency bonus awarded)
309
+ 2. ✅ Agent sends `DONE` and `current_score >= task_threshold` (accepted)
310
+ 3. ⏱️ `step_count >= max_steps` (budget exhausted)
311
+
312
+ `DONE` is **refused** if the score is below threshold — the episode continues with a `−1.0` reward signal.
313
+
314
+ ---
315
+
316
+ ## πŸš€ Quick Start
317
+
318
+ ### Prerequisites
319
+
320
+ - Python 3.12+
321
+ - Docker Desktop (for containerised server)
322
+ - A free [HuggingFace token](https://huggingface.co/settings/tokens) (for the inference LLM)
323
+
324
+ ### 1. Clone & Install
325
+
326
+ ```bash
327
+ git clone https://github.com/Code-Knight-Debjit/Data-Cleaning-Environment.git
328
+ cd Data-Cleaning-Environment
329
+
330
+ # Create virtual environment
331
+ python -m venv .venv
332
+
333
+ # Activate (Windows PowerShell)
334
+ .venv\Scripts\Activate.ps1
335
+
336
+ # Activate (macOS/Linux)
337
+ source .venv/bin/activate
338
+
339
+ # Install dependencies
340
+ pip install -e .
341
+ ```
342
+
343
+ ### 2. Build the Docker Image
344
+
345
+ ```bash
346
+ docker build -t openenv-data_cleaning:latest -f server/Dockerfile .
347
+ ```
348
+
349
+ ### 3. Set Your HuggingFace Token
350
+
351
+ ```powershell
352
+ # Windows PowerShell
353
+ $env:HF_TOKEN = "hf_your_token_here"
354
+
355
+ # macOS / Linux
356
+ export HF_TOKEN="hf_your_token_here"
357
+ ```
358
+
359
+ ### 4. Run Inference
360
+
361
+ ```bash
362
+ python inference.py
363
+ ```
364
+
365
+ That's it! The script auto-starts the Docker container, runs the LLM agent through all three tasks (easy β†’ medium β†’ hard), and prints structured evaluation logs.
366
+
367
+ ---
368
+
369
+ ## πŸ€– Running Inference
370
+
371
+ ### Environment Variables
372
+
373
+ | Variable | Default | Description |
374
+ |---|---|---|
375
+ | `HF_TOKEN` | *(required)* | Your HuggingFace token for LLM API access |
376
+ | `API_BASE_URL` | `https://router.huggingface.co/v1` | LLM API endpoint |
377
+ | `MODEL_NAME` | `Qwen/Qwen2.5-72B-Instruct` | Model to use for inference |
378
+ | `LOCAL_IMAGE_NAME` | `openenv-data_cleaning:latest` | Docker image to launch |
379
+ | `ENV_BASE_URL` | `http://localhost:8000` | Direct server URL (if not using Docker) |
380
+
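+ `inference.py` reads these near the top of the file, roughly as follows (only `HF_TOKEN` has no default):
+
+ ```python
+ import os
+
+ LOCAL_IMAGE_NAME = os.getenv("LOCAL_IMAGE_NAME", "openenv-data_cleaning:latest")
+ API_KEY = os.getenv("HF_TOKEN") or os.getenv("API_KEY")
+ API_BASE_URL = os.getenv("API_BASE_URL", "https://router.huggingface.co/v1")
+ MODEL_NAME = os.getenv("MODEL_NAME", "Qwen/Qwen2.5-72B-Instruct")
+ ```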
381
+ ### Switching Models
382
+
383
+ ```powershell
384
+ # Use Mistral (smaller, faster)
385
+ $env:MODEL_NAME = "mistralai/Mistral-7B-Instruct-v0.3"
386
+
387
+ # Use Llama
388
+ $env:MODEL_NAME = "meta-llama/Llama-3.1-8B-Instruct"
389
+ ```
390
+
391
+ ### Connecting to a Running Server (skip Docker)
392
+
393
+ ```powershell
394
+ $env:LOCAL_IMAGE_NAME = "" # must be empty string
395
+ $env:ENV_BASE_URL = "http://localhost:8000"
396
+ python inference.py
397
+ ```
398
+
399
+ ### Expected Output
400
+
401
+ ```
402
+ API_BASE_URL : https://router.huggingface.co/v1
403
+ MODEL_NAME : Qwen/Qwen2.5-72B-Instruct
404
+ LOCAL_IMAGE_NAME : openenv-data_cleaning:latest
405
+ ENV_BASE_URL : http://localhost:8000
406
+
407
+ [START] task=easy env=data_cleaning_env model=Qwen/Qwen2.5-72B-Instruct
408
+ [STEP] step=1 action=FILL_MISSING reward=0.12 done=false error=null
409
+ [STEP] step=2 action=SET_VALUE reward=0.08 done=false error=null
410
+ [STEP] step=3 action=STANDARDIZE_COL reward=0.05 done=false error=null
411
+ ...
412
+ [END] success=true steps=18 score=0.97 rewards=0.12,0.08,...
413
+
414
+ [START] task=medium env=data_cleaning_env ...
415
+ ...
416
+
417
+ ════════════════════════════════════════════════════════
418
+ Task Score Reward Steps Pass
419
+ ────────────────────────────────────────────────────────
420
+ easy 0.9712 1.3400 18 YES
421
+ medium 0.8823 2.1100 47 YES
422
+ hard 0.7640 1.8500 98 NO
423
+ ════════════════════════════════════════════════════════
424
+ ```
425
+
426
+ ---
427
+
428
+ ## πŸ”Œ Environment API
429
+
430
+ ### Using the Python Client Directly
431
+
432
+ ```python
433
+ import asyncio
434
+ from client import DataCleaningEnv
435
+ from models import CleanAction
436
+
437
+ async def run():
438
+ # Option A: Auto-start Docker container
439
+ env = await DataCleaningEnv.from_docker_image("openenv-data_cleaning:latest")
440
+
441
+ # Option B: Connect to an already-running server
442
+ # env = DataCleaningEnv(base_url="http://localhost:8000")
443
+ # await env.connect()
444
+
445
+ try:
446
+ # Reset for a specific task
447
+ result = await env.reset(task_id="easy")
448
+ obs = result.observation
449
+
450
+ print(f"Score: {obs.current_score:.4f}")
451
+ print(f"Issues: {obs.issues_remaining}")
452
+ print(f"Schema: {obs.schema_hint}")
453
+
454
+ # Take a step
455
+ action = CleanAction(
456
+ command="FILL_MISSING",
457
+ column="price",
458
+ fill_strategy="median"
459
+ )
460
+ result = await env.step(action)
461
+ obs = result.observation
462
+
463
+ print(f"Reward: {result.reward:.4f}")
464
+ print(f"New score: {obs.current_score:.4f}")
465
+ print(f"Action OK: {obs.last_action_success}")
466
+
467
+ # Signal completion
468
+ result = await env.step(CleanAction(command="DONE"))
469
+
470
+ finally:
471
+ await env.close()
472
+
473
+ asyncio.run(run())
474
+ ```
475
+
476
+ ### Using the Sync Wrapper
477
+
478
+ ```python
479
+ from client import DataCleaningEnv
480
+ from models import CleanAction
481
+
482
+ env = DataCleaningEnv(base_url="http://localhost:8000").sync()
483
+
484
+ with env:
485
+ result = env.reset(task_id="easy")
486
+ result = env.step(CleanAction(command="STANDARDIZE_COL", column="order_date"))
487
+ print(f"Score: {result.observation.current_score:.4f}")
488
+ ```
489
+
490
+ ### HTTP Endpoints
491
+
492
+ When the server is running, the following HTTP endpoints are available:
493
+
494
+ | Endpoint | Method | Description |
495
+ |---|---|---|
496
+ | `/health` | GET | Server health check |
497
+ | `/docs` | GET | Swagger / OpenAPI documentation |
498
+ | `/web` | GET | Interactive web UI |
499
+ | `/ws` | WebSocket | Persistent session endpoint |
500
+
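+ A quick liveness check using only the standard library (the exact body of the `/health` response is server-defined):
+
+ ```python
+ import urllib.request
+
+ with urllib.request.urlopen("http://localhost:8000/health", timeout=5) as resp:
+     print(resp.status, resp.read().decode())
+ ```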
501
+ ---
502
+
503
+ ## βš™οΈ Configuration
504
+
505
+ ### Step Budgets
506
+
507
+ ```python
508
+ MAX_STEPS = {
509
+ "easy": 40,
510
+ "medium": 80,
511
+ "hard": 150,
512
+ }
513
+ ```
514
+
515
+ ### Success Thresholds
516
+
517
+ ```python
518
+ DONE_THRESHOLD = {
519
+ "easy": 0.95,
520
+ "medium": 0.85,
521
+ "hard": 0.80,
522
+ }
523
+ ```
524
+
525
+ ### Reward Constants
526
+
527
+ | Constant | Value | Purpose |
528
+ |---|---|---|
529
+ | `STEP_COST` | `-0.005` | Per-step penalty to discourage padding |
530
+ | `EARLY_DONE_PENALTY` | `-0.20` | Penalty for `DONE` below score 0.60 |
531
+ | `EARLY_DONE_THRESHOLD` | `0.60` | Score floor for DONE without penalty |
532
+ | `FALSE_POSITIVE_PENALTY` | `-0.15` | Penalty for wrongly dropping a valid row |
533
+ | `EFFICIENCY_BONUS_WEIGHT` | `0.10` | Multiplier for early-completion bonus |
534
+
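+ Expressed as Python, mirroring the two blocks above (where exactly these constants live in the codebase is an implementation detail):
+
+ ```python
+ STEP_COST = -0.005
+ EARLY_DONE_PENALTY = -0.20
+ EARLY_DONE_THRESHOLD = 0.60
+ FALSE_POSITIVE_PENALTY = -0.15
+ EFFICIENCY_BONUS_WEIGHT = 0.10
+ ```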
535
+ ---
536
+
537
+ ## ☁️ Deployment
538
+
539
+ ### Deploy to HuggingFace Spaces
540
+
541
+ ```bash
542
+ # Install the OpenEnv CLI
543
+ pip install openenv
544
+
545
+ # Authenticate with HuggingFace
546
+ huggingface-cli login
547
+
548
+ # Deploy (from the repo root where openenv.yaml lives)
549
+ openenv push
550
+
551
+ # Or deploy privately to a specific repo
552
+ openenv push --repo-id your-username/data-cleaning-env --private
553
+ ```
554
+
555
+ After deployment, your environment will be live at:
556
+ ```
557
+ https://huggingface.co/spaces/your-username/data-cleaning-env
558
+ ```
559
+
560
+ With endpoints:
561
+ - **Web UI:** `/web`
562
+ - **API Docs:** `/docs`
563
+ - **Health:** `/health`
564
+ - **WebSocket:** `/ws`
565
+
566
+ ### Connect to a HuggingFace Space
567
+
568
+ ```python
569
+ env = await DataCleaningEnv.from_env("your-username/data-cleaning-env")
570
+ # or run locally with UV (no Docker needed)
571
+ env = await DataCleaningEnv.from_env("your-username/data-cleaning-env", use_docker=False)
572
+ ```
573
+
574
+ ### Run the Server Locally (Without Docker)
575
+
576
+ ```bash
577
+ uvicorn server.app:app --reload --port 8000
578
+ ```
579
+
580
+ ---
581
+
582
+ ## πŸ§ͺ Development & Testing
583
+
584
+ ### Test the Environment Logic (No Server Needed)
585
+
586
+ ```bash
587
+ # Runs a smoke test across all three tasks
588
+ python server/data_cleaning_env.py
589
+ ```
590
+
591
+ Expected output:
592
+ ```
593
+ ────────────────────────────────────────────────────────────────
594
+ TASK: EASY
595
+ ────────────────────────────────────────────────────────────────
596
+ reset() β†’ score=0.0000 issues=29 done=False
597
+ CSV: 101 rows, 5 cols
598
+ Hint: Sales orders dataset. price must be float...
599
+ step (bad col) β†’ success=False error='Column 'DOES_NOT_EXIST' not found...'
600
+ step (fix row=3 col='price') β†’ success=True score=0.0345 reward=0.0295
601
+ step (DONE, blocked) β†’ done=False reward=-1.0 score=0.0345
602
+ ...
603
+ All smoke tests passed.
604
+ ```
605
+
606
+ ### Test Pydantic Models
607
+
608
+ ```bash
609
+ python models.py
610
+ ```
611
+
612
+ ### Test the Client Parser
613
+
614
+ ```bash
615
+ python test_parse.py
616
+ ```
617
+
618
+ ### Run the Full Server Locally
619
+
620
+ ```bash
621
+ uvicorn server.app:app --reload
622
+ # Open http://localhost:8000/docs for interactive API explorer
623
+ ```
624
+
625
+ ---
626
+
627
+ ## πŸ”§ Troubleshooting
628
+
629
+ ### `TypeError: Too few arguments for EnvClient`
630
+
631
+ **Cause:** Your `client.py` subclasses `EnvClient` with only 2 type parameters, but OpenEnv requires 3 (`ActT`, `ObsT`, `StateT`).
632
+
633
+ **Fix:**
634
+ ```python
635
+ # ❌ Wrong
636
+ class DataCleaningEnv(EnvClient[CleanAction, CleanObservation]):
637
+
638
+ # βœ… Correct
639
+ class DataCleaningEnv(EnvClient[CleanAction, CleanObservation, dict]):
640
+ ```
641
+
642
+ Also ensure `_parse_state` is implemented:
643
+ ```python
644
+ def _parse_state(self, payload: dict) -> dict:
645
+ return payload
646
+ ```
647
+
648
+ ---
649
+
650
+ ### `ValidationError: Input should be 'SET_VALUE', 'DROP_ROW', ...`
651
+
652
+ **Cause:** Passing an invalid command string to `CleanAction`.
653
+
654
+ **Fix:** Only these 5 commands are valid:
655
+ ```python
656
+ "SET_VALUE" | "DROP_ROW" | "STANDARDIZE_COL" | "FILL_MISSING" | "DONE"
657
+ ```
658
+ There is no `"drop_column"` β€” columns cannot be dropped, only rows.
659
+
660
+ ---
661
+
662
+ ### `UnboundLocalError: cannot access local variable 'env'`
663
+
664
+ **Cause 1:** Docker image doesn't exist yet.
665
+ ```bash
666
+ docker build -t openenv-data_cleaning:latest -f server/Dockerfile .
667
+ ```
668
+
669
+ **Cause 2:** Stray test lines in `inference.py` referencing `env` before it's assigned.
670
+
671
+ **Fix:** Remove any manually added lines like `action = CleanAction(...)` or `result = await env.step(action)` from inside `main()`. The `main()` function should only call `run_episode()` β€” all action logic belongs inside that function.
672
+
673
+ ---
674
+
675
+ ### `DONE rejected: score X < required Y`
676
+
677
+ **This is expected behaviour, not a bug.** The environment refuses premature termination. The agent should continue cleaning until the score meets the task threshold.
678
+
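+ In an agent loop the refusal is easy to detect and recover from; `choose_action` below is a placeholder for your own policy or LLM call:
+
+ ```python
+ result = await env.reset(task_id="medium")
+ obs = result.observation
+ while not obs.done:
+     action = choose_action(obs)            # placeholder policy
+     result = await env.step(action)
+     obs = result.observation
+     if action.command == "DONE" and not obs.done:
+         # DONE was refused: the score is still below the task threshold,
+         # so the loop simply keeps cleaning on the next iteration.
+         continue
+ ```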
679
+ ---
680
+
681
+ ### HuggingFace Router returns 401
682
+
683
+ Ensure your token is set:
684
+ ```powershell
685
+ $env:HF_TOKEN = "hf_your_token_here"
686
+ ```
687
+ Get a free token at [huggingface.co/settings/tokens](https://huggingface.co/settings/tokens).
688
+
689
+ ---
690
+
691
+ ## πŸ“ Data Flow Diagram
692
+
693
+ ```
694
+ β”Œβ”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”
695
+ β”‚ inference.py / custom agent β”‚
696
+ β”‚ β”‚
697
+ β”‚ 1. await env.reset(task_id=…) β”‚
698
+ β”‚ 2. obs = result.observation β”‚
699
+ β”‚ 3. build_prompt(obs) β†’ LLM β”‚
700
+ β”‚ 4. parse_action(llm_output) β”‚
701
+ β”‚ 5. await env.step(action) β”‚
702
+ β”‚ 6. GOTO 2 until done β”‚
703
+ β””β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”¬β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”˜
704
+ β”‚
705
+ CleanAction (JSON over WebSocket)
706
+ β”‚
707
+ β–Ό
708
+ β”Œβ”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”
709
+ β”‚ DataCleaningEnvironment β”‚
710
+ β”‚ β”‚
711
+ β”‚ _apply_action() β”‚
712
+ β”‚ β†’ mutates _dirty_df in-place β”‚
713
+ β”‚ β”‚
714
+ β”‚ grade(agent_df vs clean_df) β”‚
715
+ β”‚ β†’ score ∈ [0.0, 1.0] β”‚
716
+ β”‚ β”‚
717
+ β”‚ _compute_reward() β”‚
718
+ β”‚ β†’ progress + bonuses β”‚
719
+ β”‚ β”‚
720
+ β”‚ _build_observation() β”‚
721
+ β”‚ β†’ CleanObservation β”‚
722
+ β””β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”˜
723
+ ```
724
+
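+ The same loop in code, condensed; `build_prompt`, `call_llm`, and `parse_action` stand in for the prompt-building, LLM-call, and JSON-parsing helpers in `inference.py`:
+
+ ```python
+ async def run_episode(env, task_id: str) -> float:
+     result = await env.reset(task_id=task_id)      # 1. reset
+     obs = result.observation                       # 2. observe
+     while not obs.done:
+         prompt = build_prompt(obs)                 # 3. observation -> prompt
+         action = parse_action(call_llm(prompt))    # 4. LLM output -> CleanAction
+         result = await env.step(action)            # 5. apply the action
+         obs = result.observation                   # 6. repeat until done
+     return obs.current_score
+ ```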
725
+ ---
726
+
727
+ ## 🀝 Contributing
728
+
729
+ 1. Fork the repository
730
+ 2. Create a feature branch: `git checkout -b feature/my-improvement`
731
+ 3. Run the smoke tests: `python server/data_cleaning_env.py`
732
+ 4. Commit your changes: `git commit -m "feat: add my improvement"`
733
+ 5. Push and open a Pull Request
734
+
735
+ ---
736
+
737
+ ## πŸ“„ License
738
+
739
+ This project is licensed under the MIT License. See [LICENSE](LICENSE) for details.
740
+
741
+ ---
742
+
743
+ <div align="center">
744
+
745
+ Built with ❀️ using [OpenEnv](https://github.com/meta-pytorch/OpenEnv) · [FastAPI](https://fastapi.tiangolo.com/) · [Pydantic](https://docs.pydantic.dev/) · [HuggingFace](https://huggingface.co/)
746
+
747
+ </div>
inference.py CHANGED
@@ -1,508 +1,332 @@
1
  """
2
- inference.py
3
- ------------
4
- Data Cleaning Pipeline β€” submission inference script.
5
-
6
- Supports:
7
- β€’ Ollama local llama3 (DEFAULT β€” no API key needed)
8
- β€’ Groq free cloud API
9
- β€’ Any OpenAI-compatible endpoint
10
-
11
- Environment variables:
12
- API_BASE_URL LLM endpoint. Default: http://localhost:11434/v1 (Ollama)
13
- MODEL_NAME Model name. Default: llama3
14
- HF_TOKEN API key. Default: "ollama" (ignored by Ollama)
15
- LOCAL_IMAGE_NAME Docker image (leave unset to use ENV_BASE_URL)
16
- ENV_BASE_URL Env server URL. Default: http://localhost:8000
17
-
18
- To switch to Groq instead of Ollama:
19
- $env:API_BASE_URL = "https://api.groq.com/openai/v1"
20
- $env:MODEL_NAME = "llama-3.3-70b-versatile"
21
- $env:HF_TOKEN = "gsk_xxxxxxxxxxxx"
22
-
23
- STDOUT FORMAT (evaluator parses exactly β€” do not modify):
24
- [START] task=<n> env=<benchmark> model=<model>
25
- [STEP] step=<n> action=<str> reward=<0.00> done=<true|false> error=<msg|null>
26
- [END] success=<true|false> steps=<n> score=<0.00> rewards=<r1,r2,...>
27
  """
28
 
29
  import asyncio
30
  import json
31
  import os
32
  import re
33
- import sys
34
- from typing import Any, Dict, List, Optional
35
 
36
  from openai import OpenAI
37
 
38
- try:
39
- from client import DataCleaningEnv
40
- from models import CleanAction, MAX_STEPS, DONE_THRESHOLD
41
- except ImportError:
42
- sys.path.insert(0, os.path.dirname(__file__))
43
- from client import DataCleaningEnv
44
- from models import CleanAction, MAX_STEPS, DONE_THRESHOLD
45
-
46
-
47
- # ── Configuration ─────────────────────────────────────────────────────────────
48
-
49
- API_BASE_URL = os.getenv("API_BASE_URL", "http://localhost:11434/v1")
50
- MODEL_NAME = os.getenv("MODEL_NAME", "llama3")
51
- HF_TOKEN = os.getenv("HF_TOKEN", "ollama")
52
- LOCAL_IMAGE_NAME = os.getenv("LOCAL_IMAGE_NAME", "")
53
- ENV_BASE_URL = os.getenv("ENV_BASE_URL", "http://localhost:8000")
54
-
55
- BENCHMARK = "data_cleaning_env"
56
- TASK_IDS = ["easy", "medium", "hard"]
57
- STEP_LIMITS = {"easy": 40, "medium": 100, "hard": 150}
58
-
59
-
60
- # ── System prompt (deterministic agent) ──────────────────────────────────────
61
-
62
- SYSTEM_PROMPT = """You are a deterministic data cleaning agent.
63
- Your task is to clean a dataset step-by-step using valid actions.
64
- You are operating inside an environment with strict rules.
65
- --------------------------------------------------
66
- ## INPUT PROVIDED EACH STEP
67
- You will receive:
68
- 1. Column schema (LIST OF VALID COLUMN NAMES - CASE SENSITIVE)
69
- 2. Column status:
70
- - missing values count
71
- - whether standardized (true/false)
72
- 3. Remaining issues (global state)
73
- 4. Previous actions taken
74
- --------------------------------------------------
75
- ## OBJECTIVE
76
- Fully clean the dataset with MINIMUM steps.
77
- A dataset is CLEAN only if:
78
- - No missing values remain
79
- - All columns are standardized
80
- - No invalid formats exist
81
- --------------------------------------------------
82
- ## STRICT RULES (MUST FOLLOW)
83
-
84
- ### 1. NEVER TERMINATE EARLY
85
- You MUST NOT output DONE unless:
86
- - ALL columns have missing = 0
87
- - ALL columns have standardized = true
88
- - remaining_issues is empty
89
- If ANY issue remains -> DO NOT output DONE.
90
-
91
- ### 2. USE ONLY VALID COLUMNS
92
- - You MUST use EXACT column names from the schema list
93
- - Column names are CASE SENSITIVE
94
- - NEVER invent new column names
95
-
96
- ### 3. PRIORITIZE COLUMN-LEVEL ACTIONS
97
- Preferred actions (in order):
98
- 1. FILL_MISSING - fixes entire column missing values
99
- 2. STANDARDIZE_COL - fixes formatting for entire column
100
- 3. SET_VALUE - only for a single isolated bad cell
101
- 4. DROP_ROW - only for truly corrupt/outlier rows
102
- NEVER fix a full column using repeated SET_VALUE.
103
-
104
- ### 4. DO NOT REPEAT ACTIONS
105
- - Do NOT apply the same action to the same column twice
106
- - Do NOT standardize an already standardized column
107
- - Do NOT fill missing if missing = 0
108
-
109
- ### 5. CHOOSE THE CORRECT FILL STRATEGY
110
- - Numeric columns (float/int): use "median" or "mean"
111
- - Categorical/string columns: use "mode"
112
- - NEVER use "mean" or "median" on a categorical column
113
-
114
- ### 6. ALWAYS THINK GLOBALLY
115
- Before choosing an action:
116
- - Review ALL columns in column_status
117
- - Pick the single action that fixes the largest remaining issue
118
- --------------------------------------------------
119
- ## DECISION PROCESS (MANDATORY)
120
- At each step:
121
- 1. Read column_status carefully
122
- 2. Find columns where missing > 0 OR standardized = false
123
- 3. If none exist AND remaining_issues is empty -> output DONE
124
- 4. Otherwise, pick the ONE most impactful action
125
- --------------------------------------------------
126
- ## OUTPUT FORMAT - STRICT JSON ONLY
127
- Return ONLY a single JSON object. No explanation. No markdown. No backticks.
128
-
129
- Fill missing values:
130
- {"action": "FILL_MISSING", "column": "<exact_col_name>", "strategy": "<mean|median|mode>"}
131
-
132
- Standardize a column:
133
- {"action": "STANDARDIZE_COL", "column": "<exact_col_name>"}
134
-
135
- Fix one cell:
136
- {"action": "SET_VALUE", "column": "<exact_col_name>", "row": <int>, "value": "<str>"}
137
-
138
- Drop a bad row:
139
- {"action": "DROP_ROW", "row": <int>}
140
-
141
- Signal completion:
142
- {"action": "DONE"}
143
-
144
- --------------------------------------------------
145
- ## FAILURE CONDITIONS (YOU WILL BE PENALIZED FOR):
146
- - Outputting DONE when issues remain
147
- - Using a column name not in the schema
148
- - Repeating the same action on the same column
149
- - Using SET_VALUE to fix an entire column
150
- - Using mean/median on a categorical column
151
- - Using mode on a numeric column
152
- --------------------------------------------------
153
- ## FINAL GOAL
154
- Be efficient, precise, and minimal.
155
- Every step must move the dataset closer to a fully clean state."""
156
-
157
-
158
- # ── Official log helpers ──────────────────────────────────────────────────────
159
 
160
  def log_start(task: str, env: str, model: str) -> None:
161
  print(f"[START] task={task} env={env} model={model}", flush=True)
162
 
163
 
164
- def log_step(step: int, action: str, reward: float,
165
- done: bool, error: Optional[str]) -> None:
 
166
  print(
167
- f"[STEP] step={step} action={action[:80].replace(chr(10), ' ')} "
168
- f"reward={reward:.2f} done={str(done).lower()} "
169
- f"error={error if error else 'null'}",
170
  flush=True,
171
  )
172
 
173
 
174
- def log_end(success: bool, steps: int, score: float,
175
- rewards: List[float]) -> None:
176
  print(
177
- f"[END] success={str(success).lower()} steps={steps} "
178
- f"score={score:.4f} rewards={','.join(f'{r:.2f}' for r in rewards)}",
179
  flush=True,
180
  )
181
 
 
182
 
183
- # ── Column type hints (used to suggest fill strategies) ──────────────────────
184
-
185
- _COL_TYPES: Dict[str, Dict[str, str]] = {
186
- "easy": {
187
- "order_id": "numeric",
188
- "customer": "categorical",
189
- "product": "categorical",
190
- "category": "categorical",
191
- "price": "numeric",
192
- "quantity": "numeric",
193
- "order_date": "datetime",
194
- "region": "categorical",
195
- },
196
- "medium": {
197
- "tx_id": "numeric",
198
- "customer_id": "numeric",
199
- "amount": "numeric",
200
- "tx_date": "datetime",
201
- "category": "categorical",
202
- "country": "categorical",
203
- "status": "categorical",
204
- },
205
- "hard": {
206
- "record_id": "numeric", "id": "numeric", "RecordID": "numeric",
207
- "customer_id": "numeric", "cust_id": "numeric", "CustomerID": "numeric",
208
- "full_name": "categorical","name": "categorical","CustomerName":"categorical",
209
- "email": "categorical","email_address": "categorical","Email": "categorical",
210
- "amount": "numeric", "sale_amount": "numeric", "Amount": "numeric",
211
- "currency": "categorical","ccy": "categorical","Currency": "categorical",
212
- "purchase_date": "datetime", "date": "datetime", "PurchaseDate":"datetime",
213
- "product_name": "categorical","item": "categorical","ProductName": "categorical",
214
- "region": "categorical","territory": "categorical","area": "categorical",
215
- "contact_email": "categorical","value": "numeric", "product": "categorical",
216
- },
217
- }
218
-
219
-
220
- def _strategy_hint(task_id: str, col: str) -> str:
221
- col_type = _COL_TYPES.get(task_id, {}).get(col, "unknown")
222
- if col_type == "numeric":
223
- return "median"
224
- if col_type in ("categorical", "datetime"):
225
- return "mode"
226
- return "median"
227
 
 
228
 
229
- # ── Prompt builder ────────────────────────────────────────────────────────────
 
 
 
 
230
 
231
- def _column_status_block(obs, task_id: str) -> str:
232
- col_status: Dict[str, Any] = getattr(obs, "column_status", {}) or {}
233
-
234
- if col_status:
235
- lines = []
236
- for col, status in col_status.items():
237
- missing = status.get("missing", 0)
238
- standardized = status.get("standardized", True)
239
- hint = _strategy_hint(task_id, col)
240
- flag = "OK" if (missing == 0 and standardized) else "NEEDS_FIX"
241
- lines.append(
242
- f" {col:<22} missing={missing:<4} "
243
- f"standardized={str(standardized).lower():<5} "
244
- f"fill_strategy={hint:<7} [{flag}]"
245
- )
246
- return "\n".join(lines)
247
-
248
- # Fallback: derive columns from CSV header
249
- rows = obs.dirty_csv.strip().split("\n")
250
- header = rows[0] if rows else ""
251
- cols = [c.strip() for c in header.split(",")]
252
- return "\n".join(
253
- f" {col:<22} (status unknown) fill_strategy={_strategy_hint(task_id, col)}"
254
- for col in cols
255
- )
256
 
257
 
258
  def build_user_prompt(obs, history: List[str]) -> str:
259
- rows = obs.dirty_csv.strip().split("\n")
260
- header = rows[0] if rows else ""
261
- data_rows = rows[1:]
262
- preview = "\n".join([header] + data_rows[:10])
263
- truncated = len(data_rows) > 10
264
-
265
- col_status: Dict[str, Any] = getattr(obs, "column_status", {}) or {}
266
- broken = [
267
- c for c, s in col_status.items()
268
- if s.get("missing", 0) > 0 or not s.get("standardized", True)
269
- ]
270
-
271
- history_block = (
272
- "\n".join(f" {h}" for h in history[-6:])
273
- if history else " (none yet)"
274
- )
275
-
276
- return (
277
- f"--------------------------------------------------\n"
278
- f"## STEP {obs.step_number}/{obs.max_steps}\n"
279
- f"Score: {obs.current_score:.4f} "
280
- f"(need >= {DONE_THRESHOLD[obs.task_id]:.2f} to pass)\n"
281
- f"Issues remaining: {obs.issues_remaining}\n"
282
- f"Broken columns: {len(broken)} -> {broken[:10] if broken else 'NONE β€” consider DONE'}\n"
283
- f"\n## SCHEMA HINT\n{obs.schema_hint}\n"
284
- f"\n## VALID COLUMN NAMES (CASE SENSITIVE οΏ½οΏ½οΏ½ copy exactly)\n{header}\n"
285
- f"\n## COLUMN STATUS (read carefully before acting)\n"
286
- f"{_column_status_block(obs, obs.task_id)}\n"
287
- f"\n## CSV PREVIEW"
288
- f"{' (first 10 of ' + str(len(data_rows)) + ' rows)' if truncated else ''}\n"
289
- f"{preview}\n"
290
- f"\n## PREVIOUS ACTIONS (last 6)\n{history_block}\n"
291
- f"\n--------------------------------------------------\n"
292
- f"## DECISION CHECKLIST\n"
293
- f"1. Any column with missing > 0? -> FILL_MISSING (use strategy from column status)\n"
294
- f"2. Any column with standardized=false? -> STANDARDIZE_COL\n"
295
- f"3. Isolated bad cell visible in CSV? -> SET_VALUE\n"
296
- f"4. Clearly corrupt/outlier row? -> DROP_ROW\n"
297
- f"5. ALL missing=0, ALL standardized=true, issues=0? -> DONE\n"
298
- f"\nOutput ONE JSON action (no markdown, no explanation):"
299
- )
300
-
301
-
302
- # ── Action parser ─────────────────────────────────────────────────────────────
303
- # Bridges {action, column, strategy, row, value} -> CleanAction
304
-
305
- _COMMAND_MAP = {
306
- "FILL_MISSING": "FILL_MISSING",
307
- "STANDARDIZE_COL": "STANDARDIZE_COL",
308
- "STANDARDIZE": "STANDARDIZE_COL",
309
- "SET_VALUE": "SET_VALUE",
310
- "DROP_ROW": "DROP_ROW",
311
- "DROP": "DROP_ROW",
312
- "DONE": "DONE",
313
- }
314
-
315
-
316
- def parse_action(raw: str) -> CleanAction:
317
- text = raw.strip()
318
-
319
- # Strip markdown fences
320
- if text.startswith("```"):
321
- lines = text.split("\n")
322
- inner = lines[1:-1] if lines[-1].strip().startswith("```") else lines[1:]
323
- text = "\n".join(inner).strip()
324
-
325
- m = re.search(r"\{[^{}]*\}", text, re.DOTALL)
326
- if not m:
327
- return CleanAction(command="DONE")
328
-
329
- try:
330
- data: Dict[str, Any] = json.loads(m.group())
331
- except json.JSONDecodeError:
332
- return CleanAction(command="DONE")
333
-
334
- raw_cmd = str(data.get("action", "DONE")).upper().strip().replace(" ", "_")
335
- command = _COMMAND_MAP.get(raw_cmd)
336
- if not command:
337
- return CleanAction(command="DONE")
338
- if command == "DONE":
339
- return CleanAction(command="DONE")
340
-
341
- column = data.get("column")
342
- fill_strategy = data.get("strategy") or data.get("fill_strategy")
343
- row_raw = data.get("row") if data.get("row") is not None else data.get("row_index")
344
- value = data.get("value")
345
-
346
- try:
347
  return CleanAction(
348
- command = command,
349
- column = column,
350
- fill_strategy = fill_strategy,
351
- row_index = int(row_raw) if row_raw is not None else None,
352
- value = str(value) if value is not None else None,
353
  )
354
- except Exception:
355
  return CleanAction(command="DONE")
356
 
357
 
358
- # ── LLM call (async β€” keeps WebSocket keepalive alive) ───────────────────────
359
-
360
- async def call_llm_async(client: OpenAI, messages: list) -> str:
361
- loop = asyncio.get_event_loop()
362
- response = await loop.run_in_executor(
363
- None,
364
- lambda: client.chat.completions.create(
365
- model = MODEL_NAME,
366
- messages = messages,
367
- max_tokens = 120,
368
- temperature = 0.0,
369
- ),
370
- )
371
- return (response.choices[0].message.content or "").strip()
372
-
373
-
374
- # ── Episode loop ───────────────────────────────────────────────────────────────
375
-
376
- async def run_episode(env, client: OpenAI, task_id: str) -> dict:
377
- max_steps = STEP_LIMITS[task_id]
378
- threshold = DONE_THRESHOLD[task_id]
379
- rewards: List[float] = []
380
- steps_taken = 0
381
- score = 0.0
382
- success = False
383
- history: List[str] = []
 
384
 
385
  log_start(task=task_id, env=BENCHMARK, model=MODEL_NAME)
386
 
387
  try:
388
- result = await env.reset(task_id=task_id)
389
- obs = result.observation
390
- messages = [{"role": "system", "content": SYSTEM_PROMPT}]
391
 
392
  for step in range(1, max_steps + 1):
393
- if obs.done:
394
  break
395
 
396
- steps_taken = step
397
- messages.append({"role": "user", "content": build_user_prompt(obs, history)})
398
-
399
- try:
400
- raw = await call_llm_async(client, messages)
401
- action = parse_action(raw)
402
- messages.append({"role": "assistant", "content": raw})
403
- except Exception as exc:
404
- log_step(step, "DONE", 0.00, True, str(exc)[:120])
405
- rewards.append(0.0)
406
- break
407
-
408
- # Keep system + last 3 exchanges to avoid context overflow
409
- if len(messages) > 7:
410
- messages = [messages[0]] + messages[-6:]
411
 
412
  result = await env.step(action)
413
  obs = result.observation
414
- reward = result.reward or 0.0
 
 
 
 
 
 
415
  rewards.append(reward)
416
- score = obs.current_score
417
 
418
- log_step(
419
- step = step,
420
- action = action.command,
421
- reward = reward,
422
- done = obs.done,
423
- error = obs.last_action_error,
 
 
424
  )
425
 
426
- err_note = f" [ERR: {obs.last_action_error[:40]}]" if obs.last_action_error else ""
427
- history.append(
428
- f"step {step}: {action.command}"
429
- + (f"({action.column}"
430
- + (f", {action.fill_strategy})" if action.fill_strategy else ")")
431
- if action.column else "")
432
- + f" -> score={score:.4f}{err_note}"
433
  )
434
 
435
- if obs.done or score >= threshold:
436
  break
437
 
 
438
  success = score >= threshold
439
 
440
- except Exception as e:
441
- print(f"[EPISODE ERROR] task={task_id} error={str(e)[:120]}", flush=True)
442
-
443
  finally:
 
 
444
  log_end(success=success, steps=steps_taken, score=score, rewards=rewards)
445
 
446
  return {
447
- "task_id": task_id,
448
  "score": score,
449
  "reward": sum(rewards),
450
  "steps": steps_taken,
451
  "success": success,
452
  }
453
 
454
-
455
- # ── Entry point ────────────────────────────────────────────────────────────────
456
 
457
  async def main() -> None:
458
- is_ollama = "11434" in API_BASE_URL or "ollama" in API_BASE_URL.lower()
459
-
460
- if not is_ollama and (not HF_TOKEN or HF_TOKEN == "ollama"):
461
- print(
462
- "ERROR: HF_TOKEN not set for remote API.\n"
463
- "For Groq: $env:HF_TOKEN='gsk_xxxxxxxxxxxx'\n"
464
- "For Ollama (local): no token needed β€” defaults already set.",
465
- file=sys.stderr,
466
- )
467
- sys.exit(1)
468
 
469
- print(f"API_BASE_URL : {API_BASE_URL}", flush=True)
470
- print(f"MODEL_NAME : {MODEL_NAME}", flush=True)
471
- print(f"BACKEND : {'Ollama (local)' if is_ollama else 'Remote API'}", flush=True)
472
- print(f"ENV SERVER : {LOCAL_IMAGE_NAME or ENV_BASE_URL}", flush=True)
473
- print("", flush=True)
474
 
475
- llm = OpenAI(base_url=API_BASE_URL, api_key=HF_TOKEN)
476
 
477
  results = []
478
- for task_id in TASK_IDS:
479
- # Fresh connection per task β€” prevents WebSocket keepalive timeout carryover
480
- if LOCAL_IMAGE_NAME:
481
- env = await DataCleaningEnv.from_docker_image(LOCAL_IMAGE_NAME)
482
- else:
483
- env = DataCleaningEnv(base_url=ENV_BASE_URL)
484
- await env.connect()
485
-
486
- try:
487
- summary = await run_episode(env, llm, task_id)
488
  results.append(summary)
489
- finally:
490
- try:
491
- await env.close()
492
- except Exception:
493
- pass
494
- print("", flush=True)
495
-
496
- print("=" * 56, flush=True)
497
- print(f"{'Task':<10} {'Score':>7} {'Reward':>9} {'Steps':>6} {'Pass':>5}")
498
- print("-" * 56, flush=True)
 
499
  for r in results:
500
- print(
501
- f"{r['task_id']:<10} {r['score']:>7.4f} {r['reward']:>9.4f} "
502
- f"{r['steps']:>6} {'YES' if r['success'] else 'NO':>4}",
503
- flush=True,
504
- )
505
- print("=" * 56, flush=True)
506
 
507
 
508
  if __name__ == "__main__":
 
  """
+ Inference Script — Data Cleaning Environment
+ =============================================
+ MANDATORY environment variables:
+     API_BASE_URL      The API endpoint for the LLM.
+     MODEL_NAME        The model identifier to use for inference.
+     HF_TOKEN          Your Hugging Face / API key.
+     LOCAL_IMAGE_NAME  Docker image name (when using from_docker_image()).
+
+ Defaults are provided for API_BASE_URL, MODEL_NAME and LOCAL_IMAGE_NAME; HF_TOKEN has no default.
+
+ STDOUT FORMAT
+ - [START] task=<task_name> env=<benchmark> model=<model_name>
+ - [STEP] step=<n> action=<action_str> reward=<0.00> done=<true|false> error=<msg|null>
+ - [END] success=<true|false> steps=<n> score=<score> rewards=<r1,r2,...,rn>
  """

  import asyncio
  import json
  import os
  import re
+ import textwrap
+ from typing import List, Optional

  from openai import OpenAI

+ from client import DataCleaningEnv
+ from models import CleanAction
+
+ # ── Environment variables ────────────────────────────────────────────────────
+ LOCAL_IMAGE_NAME = os.getenv("LOCAL_IMAGE_NAME", "openenv-data_cleaning:latest")
+ API_KEY = os.getenv("HF_TOKEN") or os.getenv("API_KEY")
+ API_BASE_URL = os.getenv("API_BASE_URL", "https://router.huggingface.co/v1")
+ MODEL_NAME = os.getenv("MODEL_NAME", "Qwen/Qwen2.5-72B-Instruct")
+ BENCHMARK = "data_cleaning_env"
+
+ # ── Per-task config (mirrors server constants) ────────────────────────────────
+ TASK_CONFIG = {
+     "easy": {"max_steps": 40, "threshold": 0.95},
+     "medium": {"max_steps": 80, "threshold": 0.85},
+     "hard": {"max_steps": 150, "threshold": 0.80},
+ }
+
+ TEMPERATURE = 0.2  # low temp → more deterministic action parsing
+ MAX_TOKENS = 256
+
+ # ── Logging helpers (strict stdout format) ───────────────────────────────────

  def log_start(task: str, env: str, model: str) -> None:
      print(f"[START] task={task} env={env} model={model}", flush=True)


+ def log_step(step: int, action: str, reward: float, done: bool, error: Optional[str]) -> None:
+     error_val = error if error else "null"
+     done_val = str(done).lower()
      print(
+         f"[STEP] step={step} action={action} reward={reward:.2f} done={done_val} error={error_val}",
          flush=True,
      )


+ def log_end(success: bool, steps: int, score: float, rewards: List[float]) -> None:
+     rewards_str = ",".join(f"{r:.2f}" for r in rewards)
      print(
+         f"[END] success={str(success).lower()} steps={steps} score={score:.3f} rewards={rewards_str}",
          flush=True,
      )
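
As an illustrative aside (not part of the committed file), calling the three helpers above with made-up values produces exactly the documented stdout lines:

```python
# Illustrative only: the stdout lines a short episode would emit.
log_start(task="easy", env="data_cleaning_env", model="Qwen/Qwen2.5-72B-Instruct")
# [START] task=easy env=data_cleaning_env model=Qwen/Qwen2.5-72B-Instruct

log_step(step=1, action="(FILL_MISSING,col=quantity,strategy=median)",
         reward=0.25, done=False, error=None)
# [STEP] step=1 action=(FILL_MISSING,col=quantity,strategy=median) reward=0.25 done=false error=null

log_end(success=True, steps=1, score=0.96, rewards=[0.25])
# [END] success=true steps=1 score=0.960 rewards=0.25
```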

+ # ── Prompt builders ───────────────────────────────────────────────────────────

+ SYSTEM_PROMPT = textwrap.dedent("""
+     You are an expert data cleaning agent. You receive a dirty CSV dataset and must
+     fix it step by step to match a hidden clean ground truth.
+
+     Available commands (respond with EXACTLY one JSON object, no extra text):
+
+     {"command": "SET_VALUE", "row_index": <int>, "column": "<col>", "value": "<val>"}
+     {"command": "DROP_ROW", "row_index": <int>}
+     {"command": "STANDARDIZE_COL", "column": "<col>"}
+     {"command": "FILL_MISSING", "column": "<col>", "fill_strategy": "mean|median|mode|drop"}
+     {"command": "DONE"}
+
+     Rules:
+     - Output ONLY the JSON object — no explanation, no markdown, no backticks.
+     - Use DONE only when you are confident the score meets the task threshold.
+     - SET_VALUE fixes a single bad cell.
+     - STANDARDIZE_COL normalises an entire column's format.
+     - FILL_MISSING fills NaN values in a column.
+     - DROP_ROW removes a row; use carefully — false positives are penalised.
+     - Row indices are 0-based positional indices (they shift after each DROP_ROW).
+ """).strip()


  def build_user_prompt(obs, history: List[str]) -> str:
+     history_block = "\n".join(history[-15:]) if history else "None yet."
+     return textwrap.dedent(f"""
+         Task: {obs.task_id}
+         Schema hint: {obs.schema_hint}
+         Step: {obs.step_number} / {obs.max_steps}
+         Current score: {obs.current_score:.4f}
+         Issues remaining: {obs.issues_remaining}
+         Initial dirty cells: {obs.initial_dirty_cells}
+         Last action success: {obs.last_action_success}
+         Last action error: {obs.last_action_error or 'none'}
+
+         === ACTION HISTORY (most recent 15) ===
+         {history_block}
+
+         IMPORTANT RULES:
+         - Do NOT repeat any action that already appears in the history with score_delta=0.0000.
+         - Do NOT repeat STANDARDIZE_COL or FILL_MISSING on the same column twice.
+         - If score is not improving after 2 steps, switch strategy entirely.
+         - Use SET_VALUE to fix specific bad cells (wrong types, "N/A" strings, outliers, future dates).
+         - Inspect the CSV carefully before choosing your action.
+
+         Current CSV (first 80 rows shown if large):
+         {_truncate_csv(obs.dirty_csv, max_rows=80)}
+
+         Output your next CleanAction as a single JSON object.
+     """).strip()
+
+
+ def _truncate_csv(csv_text: str, max_rows: int = 80) -> str:
+     lines = csv_text.splitlines()
+     if len(lines) <= max_rows + 1:  # +1 for header
+         return csv_text
+     header = lines[0]
+     body = lines[1: max_rows + 1]
+     omitted = len(lines) - 1 - max_rows
+     return "\n".join([header] + body + [f"... ({omitted} more rows omitted)"])
+
+ # ── Action parsing ────────────────────────────────────────────────────────────
+
+ VALID_COMMANDS = {"SET_VALUE", "DROP_ROW", "STANDARDIZE_COL", "FILL_MISSING", "DONE"}
+ VALID_STRATEGIES = {"mean", "median", "mode", "drop"}
+
+
+ def parse_action(llm_output: str) -> CleanAction:
+     """
+     Parse the LLM's JSON output into a CleanAction.
+     Raises ValueError when no valid action can be extracted; the caller falls back to a safe default.
+     """
+     text = llm_output.strip()
+
+     # Strip accidental markdown fences
+     text = re.sub(r"^```(?:json)?", "", text, flags=re.IGNORECASE).strip()
+     text = re.sub(r"```$", "", text).strip()
+
+     # Extract first JSON object
+     match = re.search(r"\{.*?\}", text, re.DOTALL)
+     if not match:
+         raise ValueError(f"No JSON object found in LLM output: {text!r}")
+
+     data = json.loads(match.group())
+     command = data.get("command", "").upper()
+
+     if command not in VALID_COMMANDS:
+         raise ValueError(f"Unknown command: {command!r}")
+
+     if command == "SET_VALUE":
+         return CleanAction(
+             command="SET_VALUE",
+             row_index=int(data["row_index"]),
+             column=str(data["column"]),
+             value=str(data["value"]),
+         )
+     elif command == "DROP_ROW":
+         return CleanAction(command="DROP_ROW", row_index=int(data["row_index"]))
+     elif command == "STANDARDIZE_COL":
+         return CleanAction(command="STANDARDIZE_COL", column=str(data["column"]))
+     elif command == "FILL_MISSING":
+         strategy = str(data.get("fill_strategy", "median")).lower()
+         if strategy not in VALID_STRATEGIES:
+             strategy = "median"
          return CleanAction(
+             command="FILL_MISSING",
+             column=str(data["column"]),
+             fill_strategy=strategy,
          )
+     else:  # DONE
          return CleanAction(command="DONE")
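
For example (illustrative, not part of the commit), a fenced model reply parses into a typed action, while free text raises and triggers the caller's fallback:

```python
# Illustrative only.
raw = '```json\n{"command": "FILL_MISSING", "column": "price", "fill_strategy": "mean"}\n```'
action = parse_action(raw)
# -> CleanAction(command="FILL_MISSING", column="price", fill_strategy="mean")

parse_action("let's just drop row 3")  # no JSON object -> ValueError
```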


+ def _action_to_str(action: CleanAction) -> str:
+     """Compact single-line string for [STEP] log."""
+     parts = [action.command]
+     if action.row_index is not None:
+         parts.append(f"row={action.row_index}")
+     if action.column:
+         parts.append(f"col={action.column}")
+     if action.value is not None:
+         val_repr = str(action.value)[:30]
+         parts.append(f"val={val_repr!r}")
+     if action.fill_strategy:
+         parts.append(f"strategy={action.fill_strategy}")
+     return "(" + ",".join(parts) + ")"
+
+ # ── LLM call ──────────────────────────────────────────────────────────────────
+
+ def get_model_action(client: OpenAI, obs, history: List[str]) -> CleanAction:
+     user_prompt = build_user_prompt(obs, history)
+     try:
+         completion = client.chat.completions.create(
+             model=MODEL_NAME,
+             messages=[
+                 {"role": "system", "content": SYSTEM_PROMPT},
+                 {"role": "user", "content": user_prompt},
+             ],
+             temperature=TEMPERATURE,
+             max_tokens=MAX_TOKENS,
+             stream=False,
+         )
+         text = (completion.choices[0].message.content or "").strip()
+         return parse_action(text)
+     except Exception as exc:
+         print(f"[DEBUG] Model/parse error: {exc}", flush=True)
+         return CleanAction(command="FILL_MISSING", column="quantity", fill_strategy="median")
+
+ # ── Episode runner ────────────────────────────────────────────────────────────
+
+ async def run_episode(env: DataCleaningEnv, client: OpenAI, task_id: str) -> dict:
+     """
+     Run a single episode for task_id. Returns a summary dict.
+     """
+     cfg = TASK_CONFIG[task_id]
+     max_steps = cfg["max_steps"]
+     threshold = cfg["threshold"]
+
+     rewards: List[float] = []
+     history: List[str] = []  # action history fed back to LLM each step
+     steps_taken: int = 0
+     score: float = 0.0
+     prev_score: float = 0.0
+     success: bool = False

      log_start(task=task_id, env=BENCHMARK, model=MODEL_NAME)

      try:
+         result = await env.reset(task_id=task_id)
+         obs = result.observation
+         prev_score = obs.current_score

          for step in range(1, max_steps + 1):
+             if result.done:
                  break

+             action = get_model_action(client, obs, history)

              result = await env.step(action)
              obs = result.observation
+
+             reward = result.reward or 0.0
+             done = result.done
+             error = obs.last_action_error if not obs.last_action_success else None
+             score_delta = obs.current_score - prev_score
+             prev_score = obs.current_score
+
              rewards.append(reward)
+             steps_taken = step

+             # Build a rich history entry the LLM can learn from
+             action_desc = _action_to_str(action)
+             status = "✓" if obs.last_action_success else "✗"
+             delta_str = f"+{score_delta:.4f}" if score_delta > 0 else f"{score_delta:.4f}"
+             history.append(
+                 f"step={step} {status} {action_desc} reward={reward:+.2f} "
+                 f"score_delta={delta_str} score={obs.current_score:.4f}"
+                 + (f" ERROR={error}" if error else "")
              )

+             log_step(
+                 step=step,
+                 action=action_desc,
+                 reward=reward,
+                 done=done,
+                 error=error,
              )

+             if done:
                  break

+         score = obs.current_score if obs else 0.0
          success = score >= threshold

      finally:
+         score = score if score else 0.0
+         success = success if success else False
          log_end(success=success, steps=steps_taken, score=score, rewards=rewards)

      return {
+         "task": task_id,
          "score": score,
          "reward": sum(rewards),
          "steps": steps_taken,
          "success": success,
      }
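
Each call therefore returns a small summary record; for an "easy" run that clears its 0.95 threshold it might look like this (values illustrative):

```python
# Illustrative only: shape of the per-task summary consumed by main().
summary = {"task": "easy", "score": 0.97, "reward": 3.25, "steps": 12, "success": True}
```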

+ # ── Main ──────────────────────────────────────────────────────────────────────

  async def main() -> None:
+     print(f"API_BASE_URL : {API_BASE_URL}")
+     print(f"MODEL_NAME : {MODEL_NAME}")
+     print(f"LOCAL_IMAGE_NAME : {LOCAL_IMAGE_NAME}")
+     print()

+     client = OpenAI(base_url=API_BASE_URL, api_key=API_KEY)

+     env = await DataCleaningEnv.from_docker_image(LOCAL_IMAGE_NAME)

      results = []
+     try:
+         for task_id in ("easy", "medium", "hard"):
+             summary = await run_episode(env, client, task_id)
              results.append(summary)
+             print()  # blank line between tasks
+     finally:
+         try:
+             await env.close()
+         except Exception as e:
+             print(f"[DEBUG] env.close() error: {e}", flush=True)
+
+     # ── Summary table ────────────────────────────────────────────────────────
+     print("═" * 56)
+     print(f"{'Task':<12} {'Score':>7} {'Reward':>7} {'Steps':>5} {'Pass'}")
+     print("─" * 56)
      for r in results:
+         flag = "YES" if r["success"] else " NO"
+         print(f"{r['task']:<12} {r['score']:>7.4f} {r['reward']:>7.4f} {r['steps']:>5} {flag}")
+     print("═" * 56)


  if __name__ == "__main__":