compressionenv / server /compressionenv_environment.py
muditjai's picture
Upload folder using huggingface_hub
add4140 verified
# Copyright (c) Meta Platforms, Inc. and affiliates.
# All rights reserved.
#
# This source code is licensed under the BSD-style license found in the
# LICENSE file in the root directory of this source tree.
"""
Compressionenv Environment Implementation.
Environment where the agent proposes compression/decompression algorithms for a
Paul Graham essay. The environment validates round-trip correctness and scores
compressed size relative to the agent's prior attempts and baseline compressors.
"""
import base64
import json
import os
import random
import subprocess
import sys
import tempfile
from dataclasses import dataclass
from pathlib import Path
from uuid import uuid4
import bz2
import lzma
import zlib
from openenv.core.env_server.interfaces import Environment
from openenv.core.env_server.types import State
from models import CompressionenvAction, CompressionenvObservation
@dataclass(frozen=True)
class _Essay:
essay_id: str
text: str
class CompressionenvEnvironment(Environment):
"""
Compression algorithm search environment.
- On `reset()`, selects a PG essay (from `../essays/*.txt`) and returns it.
- On `step()`, executes agent-provided Python code defining:
compress(text: str) -> bytes
decompress(data: bytes) -> str
Validates that decompress(compress(essay)) == essay.
Rewards (per spec):
- If algorithms fail or don't round-trip: -1 reward.
- If compressed size is lower than average of previous successful sizes for
this essay in the episode: +1 reward.
- Compare against baselines (zlib, bz2, lzma):
- If agent achieves smaller size than at least one baseline: +10 reward.
- If agent achieves smaller size than the best baseline: +20 reward.
"""
# Enable concurrent WebSocket sessions.
# Set to True if your environment isolates state between instances.
# When True, multiple WebSocket clients can connect simultaneously, each
# getting their own environment instance (when using factory mode in app.py).
SUPPORTS_CONCURRENT_SESSIONS: bool = True
def __init__(self):
"""Initialize the compressionenv environment."""
self._state = State(episode_id=str(uuid4()), step_count=0)
self._essay: _Essay | None = None
self._successful_sizes: list[int] = []
self._baselines: dict[str, int] = {}
def reset(self) -> CompressionenvObservation:
"""
Reset the environment.
Returns:
CompressionenvObservation containing a selected essay
"""
self._state = State(episode_id=str(uuid4()), step_count=0)
self._essay = self._pick_essay()
self._successful_sizes = []
self._baselines = self._compute_baselines(self._essay.text)
return CompressionenvObservation(
essay_id=self._essay.essay_id,
essay_text=self._essay.text,
valid=True,
error=None,
compressed_size_bytes=None,
avg_prev_compressed_size_bytes=None,
improved_over_avg=None,
baselines_size_bytes=self._baselines,
best_baseline_size_bytes=min(self._baselines.values()) if self._baselines else None,
beat_any_baseline=None,
beat_best_baseline=None,
done=False,
reward=0.0,
metadata={
"episode_id": self._state.episode_id,
"step_count": self._state.step_count,
"num_baselines": len(self._baselines),
},
)
def step(self, action: CompressionenvAction) -> CompressionenvObservation: # type: ignore[override]
"""
Execute a step: run agent algorithms, validate, score compression size.
"""
if self._essay is None:
# Defensive: ensure reset called.
self._essay = self._pick_essay()
self._baselines = self._compute_baselines(self._essay.text)
self._successful_sizes = []
self._state.step_count += 1
essay_text = self._essay.text
baselines = self._baselines
best_baseline = min(baselines.values()) if baselines else None
reward = 0.0
error: str | None = None
valid = False
compressed_size: int | None = None
improved_over_avg: bool | None = None
beat_any_baseline: bool | None = None
beat_best_baseline: bool | None = None
avg_prev: float | None = None
try:
compressed_bytes = self._run_agent_codec(
essay_text=essay_text,
compression_code=action.compression_code,
decompression_code=action.decompression_code,
)
compressed_size = len(compressed_bytes)
valid = True
except Exception as e:
error = str(e)
reward = -1.0
if valid and compressed_size is not None:
if self._successful_sizes:
avg_prev = sum(self._successful_sizes) / len(self._successful_sizes)
improved_over_avg = compressed_size < avg_prev
if improved_over_avg:
reward += 1.0
else:
avg_prev = None
improved_over_avg = None
self._successful_sizes.append(compressed_size)
if baselines:
beat_any_baseline = any(compressed_size < s for s in baselines.values())
beat_best_baseline = best_baseline is not None and compressed_size < best_baseline
if beat_best_baseline:
reward += 20.0
elif beat_any_baseline:
reward += 10.0
return CompressionenvObservation(
essay_id=self._essay.essay_id,
essay_text=essay_text,
valid=valid,
error=error,
compressed_size_bytes=compressed_size,
avg_prev_compressed_size_bytes=avg_prev,
improved_over_avg=improved_over_avg,
baselines_size_bytes=baselines,
best_baseline_size_bytes=best_baseline,
beat_any_baseline=beat_any_baseline,
beat_best_baseline=beat_best_baseline,
done=False,
reward=reward,
metadata={
"episode_id": self._state.episode_id,
"step_count": self._state.step_count,
"algo_name": action.algo_name,
"num_successful_attempts": len(self._successful_sizes),
},
)
@property
def state(self) -> State:
"""
Get the current environment **state**.
In RL terms, the State is a (Markov) description of the underlying
environment that is at least as informative as any single Observation.
Here we include all information needed to reconstruct what any call to
`reset()` or `step()` would expose in an observation for this episode.
Returns:
Current State with core fields plus extra environment details.
"""
# State allows extra fields, so we enrich it to be a superset of any
# single observation: from this State, an agent could derive the latest
# observation for the current episode.
if self._essay is not None:
self._state.essay_id = self._essay.essay_id # type: ignore[attr-defined]
self._state.essay_text = self._essay.text # type: ignore[attr-defined]
self._state.baselines_size_bytes = self._baselines # type: ignore[attr-defined]
self._state.num_successful_attempts = len(self._successful_sizes) # type: ignore[attr-defined]
if self._successful_sizes:
self._state.best_compressed_size_bytes = min(self._successful_sizes) # type: ignore[attr-defined]
self._state.last_compressed_size_bytes = self._successful_sizes[-1] # type: ignore[attr-defined]
if self._baselines:
self._state.best_baseline_size_bytes = min(self._baselines.values()) # type: ignore[attr-defined]
return self._state
def _pick_essay(self) -> _Essay:
# Expected layout:
# compression-openenv/
# essays/
# compressionenv/
# server/
# compressionenv_environment.py (this file)
essays_dir = Path(__file__).resolve().parents[2] / "essays"
if not essays_dir.exists():
# Try repo-level essays directory (if running from different cwd/layout).
essays_dir = Path(os.getcwd()).resolve() / "essays"
paths = sorted(essays_dir.glob("*.txt"))
if not paths:
raise FileNotFoundError(
f"No essays found in {essays_dir}. Expected PG essay .txt files."
)
path = random.choice(paths)
essay_id = path.stem
text = path.read_text(encoding="utf-8")
return _Essay(essay_id=essay_id, text=text)
def _compute_baselines(self, text: str) -> dict[str, int]:
data = text.encode("utf-8")
# Deterministic settings.
baselines: dict[str, bytes] = {
"zlib": zlib.compress(data, level=9),
"bz2": bz2.compress(data, compresslevel=9),
"lzma": lzma.compress(data, preset=9),
}
return {k: len(v) for k, v in baselines.items()}
def _run_agent_codec(
self,
essay_text: str,
compression_code: str,
decompression_code: str,
) -> bytes:
"""
Execute agent code in a subprocess and return compressed bytes.
Security note: this is not a hardened sandbox. It's a best-effort isolation
to avoid contaminating the server process, with a timeout.
"""
runner = r"""
import base64
import json
import sys
payload = json.loads(sys.stdin.read())
essay_text = payload["essay_text"]
compression_code = payload["compression_code"]
decompression_code = payload["decompression_code"]
ns = {}
exec(compression_code, ns, ns)
exec(decompression_code, ns, ns)
compress = ns.get("compress")
decompress = ns.get("decompress")
if compress is None or decompress is None:
raise RuntimeError("Expected functions compress(text: str)->bytes and decompress(data: bytes)->str")
compressed = compress(essay_text)
if not isinstance(compressed, (bytes, bytearray)):
raise RuntimeError(f"compress() must return bytes, got {type(compressed)}")
compressed = bytes(compressed)
round_trip = decompress(compressed)
if not isinstance(round_trip, str):
raise RuntimeError(f"decompress() must return str, got {type(round_trip)}")
if round_trip != essay_text:
raise RuntimeError("Round-trip failed: decompress(compress(essay)) != essay")
sys.stdout.write(base64.b64encode(compressed).decode("ascii"))
"""
payload = {
"essay_text": essay_text,
"compression_code": compression_code,
"decompression_code": decompression_code,
}
with tempfile.TemporaryDirectory() as td:
proc = subprocess.run(
[sys.executable, "-c", runner],
input=json.dumps(payload).encode("utf-8"),
stdout=subprocess.PIPE,
stderr=subprocess.PIPE,
cwd=td,
timeout=3.0,
env={
"PYTHONIOENCODING": "utf-8",
"PYTHONUTF8": "1",
"PYTHONDONTWRITEBYTECODE": "1",
},
)
if proc.returncode != 0:
stderr = proc.stderr.decode("utf-8", errors="replace").strip()
raise RuntimeError(stderr or f"Agent codec subprocess failed with code {proc.returncode}")
out = proc.stdout.decode("utf-8", errors="replace").strip()
try:
return base64.b64decode(out.encode("ascii"), validate=True)
except Exception as e:
raise RuntimeError(f"Failed to decode compressed output: {e}") from e