# aws_rl_env/tests_tasks/test_expert_tasks.py
"""Tests for expert-tier tasks β€” verifies SRE incident resolution and security audit grading.
Expert tasks require setup commands to provision initial (broken/vulnerable) state,
then the agent must diagnose and fix issues via multi-step AWS CLI commands.
The grader uses state_checks as ground truth for task completion.
Each test resets MiniStack, provisions the setup state, executes the solution
command sequence, and asserts the grader returns task_achieved=True with reward=1.0.
Run inside Docker:
docker exec -w /app/env aws-rl-env python -m pytest tests/test_expert_tasks.py -v
"""
import json
import re
import pytest
import yaml
from pathlib import Path
from models import SuccessCriteria, Task, TaskID, TaskDifficulty, SetupCommand
from server.services.simulator_strategy import SimulatorStrategy
from server.services.task_grader import TaskGrader
from server.services.episode_tracker import EpisodeTracker
# Absolute path to the expert-tier task catalog (YAML), resolved relative to
# this test file so the suite works regardless of the working directory.
TASKS_FILE = (
    Path(__file__).resolve().parent.parent
    / "server"
    / "services"
    / "tasks"
    / "expert.yaml"
)
# ---------------------------------------------------------------------------
# Solution commands for each expert task — ordered list of AWS CLI commands
# that resolve the SRE incident or pass the security audit.
# Diagnostic commands (list/describe) are included where needed to satisfy
# the services requirement in grading.
# Tasks whose IDs appear in _DYNAMIC_TASK_IDS (defined below) have extra
# commands built at runtime by _resolve_dynamic_commands, because those
# commands embed IDs that only exist after setup runs.
# ---------------------------------------------------------------------------
EXPERT_COMMANDS: dict[int, list[str]] = {
    # -- Task 18: SRE — Lambda missing SQS permissions + event source mapping --
    18: [
        "aws sqs get-queue-url --queue-name incoming-orders",
        (
            "aws iam attach-role-policy --role-name broken-lambda-role "
            "--policy-arn arn:aws:iam::aws:policy/AmazonSQSFullAccess"
        ),
        (
            "aws lambda create-event-source-mapping "
            "--function-name order-processor "
            "--event-source-arn arn:aws:sqs:us-east-1:000000000000:incoming-orders "
            "--batch-size 10"
        ),
    ],
    # -- Task 19: SRE — S3 versioning + lifecycle rule -------------------------
    19: [
        (
            "aws s3api put-bucket-versioning --bucket app-config-store "
            "--versioning-configuration Status=Enabled"
        ),
        (
            "aws s3api put-bucket-lifecycle-configuration --bucket app-config-store "
            "--lifecycle-configuration "
            '\'{"Rules":[{"ID":"cleanup-old-versions","Status":"Enabled",'
            '"NoncurrentVersionExpiration":{"NoncurrentDays":30},'
            '"Filter":{"Prefix":""}}]}\''
        ),
    ],
    # -- Task 20: SRE — DynamoDB throughput + SNS subscription -----------------
    20: [
        (
            "aws dynamodb update-table --table-name session-store "
            "--provisioned-throughput ReadCapacityUnits=50,WriteCapacityUnits=50"
        ),
        "aws sqs create-queue --queue-name ops-alert-inbox",
        (
            "aws sns subscribe "
            "--topic-arn arn:aws:sns:us-east-1:000000000000:ops-alerts "
            "--protocol sqs "
            "--notification-endpoint arn:aws:sqs:us-east-1:000000000000:ops-alert-inbox"
        ),
    ],
    # -- Task 21: Security — Replace overly permissive S3 bucket policy --------
    21: [
        "aws s3api get-bucket-policy --bucket public-assets",
        (
            "aws s3api put-bucket-policy --bucket public-assets "
            "--policy "
            '\'{"Version":"2012-10-17","Statement":[{"Effect":"Allow",'
            '"Principal":{"AWS":"arn:aws:iam::000000000000:role/app-role"},'
            '"Action":"s3:GetObject",'
            '"Resource":"arn:aws:s3:::public-assets/*"}]}\''
        ),
    ],
    # -- Task 22: Security — Replace overly broad IAM inline policy ------------
    22: [
        "aws iam get-role-policy --role-name app-role --policy-name app-access",
        (
            "aws iam put-role-policy --role-name app-role "
            "--policy-name app-access "
            "--policy-document "
            '\'{"Version":"2012-10-17","Statement":[{"Effect":"Allow",'
            '"Action":["dynamodb:GetItem","dynamodb:PutItem"],'
            '"Resource":"arn:aws:dynamodb:us-east-1:000000000000:table/users"}]}\''
        ),
    ],
    # -- Task 23: Security — Move plaintext password to Secrets Manager --------
    23: [
        (
            "aws secretsmanager create-secret "
            "--name data-processor/db-password "
            "--secret-string hunter2"
        ),
        (
            "aws lambda update-function-configuration "
            "--function-name data-processor "
            "--environment "
            "Variables={SECRET_ARN=arn:aws:secretsmanager:us-east-1:000000000000:secret:data-processor/db-password}"
        ),
    ],
    # -- Task 109: SRE — Lambda timeout + CloudWatch alarm ---------------------
    109: [
        (
            "aws lambda update-function-configuration "
            "--function-name payment-webhook --timeout 30"
        ),
        (
            "aws cloudwatch put-metric-alarm --alarm-name payment-webhook-errors "
            "--metric-name Errors --namespace AWS/Lambda --statistic Sum "
            "--period 60 --evaluation-periods 1 --threshold 5 "
            "--comparison-operator GreaterThanThreshold "
            "--dimensions Name=FunctionName,Value=payment-webhook"
        ),
    ],
    # -- Task 110: SRE — ECS service role policy + desired count ---------------
    110: [
        (
            "aws iam attach-role-policy --role-name ecs-service-role "
            "--policy-arn arn:aws:iam::aws:policy/AmazonECS_FullAccess"
        ),
        (
            "aws ecs update-service --cluster prod-cluster "
            "--service api-service --desired-count 3"
        ),
    ],
    # -- Task 111: SRE — Start RDS + fix security group -----------------------
    111: [
        "aws rds start-db-instance --db-instance-identifier analytics-db",
        (
            "aws ec2 create-security-group --group-name analytics-db-sg-fixed "
            '--description "Restricted MySQL access"'
        ),
        # authorize-security-group-ingress resolved dynamically (needs group-id)
        (
            "aws rds modify-db-instance --db-instance-identifier analytics-db "
            "--vpc-security-group-ids analytics-db-sg-fixed"
        ),
    ],
    # -- Task 113: SRE — SQS visibility timeout (redrive resolved dynamically) -
    113: [
        (
            "aws sqs set-queue-attributes "
            "--queue-url http://localhost:4566/000000000000/order-processing "
            "--attributes VisibilityTimeout=120"
        ),
        # RedrivePolicy resolved dynamically (JSON format issue with shorthand)
    ],
    # -- Task 114: SRE — Route53 DNS record update (zone-id from setup) --------
    114: [
        # change-resource-record-sets resolved dynamically (needs zone ID)
    ],
    # -- Task 115: SRE — ALB target group health check fix (DYNAMIC) -----------
    115: [
        # Resolved dynamically after setup — needs target group ARN
    ],
    # -- Task 116: Security — Lambda resource policy fix -----------------------
    116: [
        "aws iam list-roles",
        (
            "aws lambda remove-permission "
            "--function-name public-api-handler "
            "--statement-id open-access"
        ),
        (
            "aws lambda add-permission "
            "--function-name public-api-handler "
            "--statement-id restricted-access "
            "--action lambda:InvokeFunction "
            "--principal apigateway.amazonaws.com "
            "--source-arn arn:aws:execute-api:us-east-1:000000000000:*"
        ),
    ],
    # -- Task 117: Security — S3 encryption + deny unencrypted uploads ---------
    117: [
        (
            "aws s3api put-bucket-encryption --bucket data-lake-raw "
            "--server-side-encryption-configuration "
            '\'{"Rules":[{"ApplyServerSideEncryptionByDefault":'
            '{"SSEAlgorithm":"AES256"}}]}\''
        ),
        (
            "aws s3api put-bucket-policy --bucket data-lake-raw "
            "--policy "
            '\'{"Version":"2012-10-17","Statement":[{"Effect":"Deny",'
            '"Principal":"*","Action":"s3:PutObject",'
            '"Resource":"arn:aws:s3:::data-lake-raw/*",'
            '"Condition":{"StringNotEquals":'
            '{"s3:x-amz-server-side-encryption":"AES256"}}}]}\''
        ),
    ],
    # -- Task 118: Security — DynamoDB PITR + TTL ------------------------------
    118: [
        (
            "aws dynamodb update-continuous-backups "
            "--table-name financial-transactions "
            "--point-in-time-recovery-specification PointInTimeRecoveryEnabled=true"
        ),
        (
            "aws dynamodb update-time-to-live "
            "--table-name financial-transactions "
            "--time-to-live-specification Enabled=true,AttributeName=expiry_timestamp"
        ),
    ],
    # -- Task 119: Security — SSM SecureString + Secrets Manager ---------------
    119: [
        (
            "aws ssm put-parameter --name /app/database/password-secure "
            "--value SuperSecret123 --type SecureString"
        ),
        (
            "aws secretsmanager create-secret "
            "--name app/database-credentials "
            "--secret-string "
            '\'{"username":"admin","password":"SuperSecret123"}\''
        ),
    ],
    # -- Task 120: Security — IAM user managed + inline policy fix ------------
    120: [
        (
            "aws iam detach-user-policy --user-name deploy-bot "
            "--policy-arn arn:aws:iam::aws:policy/IAMFullAccess"
        ),
        (
            "aws iam delete-user-policy --user-name deploy-bot "
            "--policy-name admin-access"
        ),
        (
            "aws iam put-user-policy --user-name deploy-bot "
            "--policy-name deploy-only "
            "--policy-document "
            '\'{"Version":"2012-10-17","Statement":[{"Effect":"Allow",'
            '"Action":["s3:PutObject","codedeploy:*"],'
            '"Resource":"*"}]}\''
        ),
    ],
    # -- Task 121: SRE — EventBridge rule enable + Lambda target ---------------
    121: [
        "aws lambda get-function --function-name etl-runner",
        (
            "aws events put-rule --name nightly-etl-trigger "
            '--schedule-expression "cron(0 2 * * ? *)" '
            "--state ENABLED"
        ),
        (
            "aws events put-targets --rule nightly-etl-trigger "
            "--targets Id=1,Arn=arn:aws:lambda:us-east-1:000000000000:function:etl-runner"
        ),
    ],
    # -- Task 122: SRE — Firehose delivery stream prefix fix -------------------
    122: [
        "aws s3api head-bucket --bucket clickstream-archive",
        (
            "aws firehose delete-delivery-stream "
            "--delivery-stream-name clickstream-delivery"
        ),
        (
            "aws firehose create-delivery-stream "
            "--delivery-stream-name clickstream-delivery "
            "--s3-destination-configuration "
            '\'{"RoleARN":"arn:aws:iam::000000000000:role/firehose-role",'
            '"BucketARN":"arn:aws:s3:::clickstream-archive",'
            '"Prefix":"clickstream/year=!{timestamp:yyyy}/month=!{timestamp:MM}/"}\''
        ),
    ],
    # -- Task 123: SRE — SNS subscription DLQ + retention (DYNAMIC) ------------
    123: [
        "aws sqs create-queue --queue-name order-notifications-dlq",
        (
            "aws sqs set-queue-attributes "
            "--queue-url http://localhost:4566/000000000000/order-notifications-dlq "
            "--attributes MessageRetentionPeriod=1209600"
        ),
        # Dynamic: set-subscription-attributes resolved after setup
    ],
    # -- Task 124: Security — Encrypted EFS + NFS security group ---------------
    124: [
        (
            "aws efs create-file-system --creation-token shared-data-encrypted "
            "--encrypted --tags Key=Name,Value=shared-data-encrypted"
        ),
        (
            "aws ec2 create-security-group --group-name efs-mount-sg "
            '--description "NFS access for EFS"'
        ),
        # authorize-security-group-ingress resolved dynamically (needs group-id)
    ],
    # -- Task 125: SRE — Glue job script location fix --------------------------
    125: [
        (
            "aws s3api head-object --bucket glue-scripts-bucket "
            "--key scripts/daily-transform.py"
        ),
        (
            "aws glue update-job --job-name daily-transform "
            "--job-update "
            '\'{"Role":"arn:aws:iam::000000000000:role/glue-role",'
            '"Command":{"Name":"glueetl",'
            '"ScriptLocation":"s3://glue-scripts-bucket/scripts/daily-transform.py",'
            '"PythonVersion":"3"}}\''
        ),
    ],
    # -- Task 126: Security — Cognito password policy fix (pool-id dynamic) ----
    126: [
        # update-user-pool resolved dynamically (needs pool ID from setup)
    ],
    # -- Task 127: SRE — CloudFormation stack recovery -------------------------
    127: [
        "aws s3api create-bucket --bucket legacy-data-backup",
        "aws cloudformation delete-stack --stack-name legacy-infra",
        (
            "aws cloudformation create-stack --stack-name legacy-infra-v2 "
            "--template-body "
            '\'{"AWSTemplateFormatVersion":"2010-09-09","Resources":{"Table":'
            '{"Type":"AWS::DynamoDB::Table","Properties":{"TableName":"legacy-config",'
            '"AttributeDefinitions":[{"AttributeName":"id","AttributeType":"S"}],'
            '"KeySchema":[{"AttributeName":"id","KeyType":"HASH"}],'
            '"BillingMode":"PAY_PER_REQUEST"}}}}\''
        ),
    ],
}
# Tasks that need dynamic command resolution from setup state — their
# EXPERT_COMMANDS entries are incomplete on their own; see
# _resolve_dynamic_commands for the runtime-built remainder.
_DYNAMIC_TASK_IDS = {111, 113, 114, 115, 123, 124, 126}
# ---------------------------------------------------------------------------
# MiniStack Compatibility β€” patching setup commands
# ---------------------------------------------------------------------------
def _patch_setup_command(cmd: str, state: dict[str, str]) -> str:
"""Patch setup commands for MiniStack compatibility."""
# Replace hardcoded Route53 zone-001 with tracked zone ID
if "zone-001" in cmd and "route53_zone_id" in state:
cmd = cmd.replace("zone-001", state["route53_zone_id"])
# Replace --group-name with --group-id for authorize-security-group-ingress
if "authorize-security-group-ingress" in cmd:
for key, val in state.items():
if key.startswith("sg_"):
group_name = key[3:]
if f"--group-name {group_name}" in cmd:
cmd = cmd.replace(
f"--group-name {group_name}",
f"--group-id {val}",
)
return cmd
def _track_state(cmd: str, stdout: str, state: dict[str, str]) -> None:
"""Track dynamic IDs from command outputs for subsequent commands."""
try:
data = json.loads(stdout) if stdout.strip() else {}
except json.JSONDecodeError:
return
# Track Route53 hosted zone ID
if "create-hosted-zone" in cmd and isinstance(data, dict):
hz = data.get("HostedZone", {})
zone_id = hz.get("Id", "")
if "/" in zone_id:
zone_id = zone_id.split("/")[-1]
if zone_id:
state["route53_zone_id"] = zone_id
# Track security group IDs
if "create-security-group" in cmd and isinstance(data, dict):
group_id = data.get("GroupId", "")
if group_id:
match = re.search(r"--group-name\s+(\S+)", cmd)
if match:
state[f"sg_{match.group(1)}"] = group_id
# Track Cognito user pool ID
if "create-user-pool" in cmd and isinstance(data, dict):
pool = data.get("UserPool", {})
pool_id = pool.get("Id", "")
if pool_id:
state["cognito_pool_id"] = pool_id
# ---------------------------------------------------------------------------
# Helpers
# ---------------------------------------------------------------------------
def _execute_setup(
    task_entry: dict, backend: SimulatorStrategy
) -> tuple[list[tuple[str, bool, str, str]], dict[str, str]]:
    """Run every setup command (after MiniStack patching) on *backend*.

    Returns the per-command results as ``(cmd, success, stdout, stderr)``
    tuples, plus a state dict of dynamic resource IDs harvested from the
    output of each successful command.
    """
    state: dict[str, str] = {}
    results: list[tuple[str, bool, str, str]] = []
    for raw_cmd in task_entry.get("setup_commands", []):
        patched = _patch_setup_command(raw_cmd, state)
        ok, out, err = backend.execute_command(patched)
        results.append((patched, ok, out, err))
        if ok:
            _track_state(patched, out, state)
    return results, state
def _resolve_dynamic_commands(
    task_id: int, backend: SimulatorStrategy, state: dict[str, str]
) -> list[str]:
    """Generate commands that depend on dynamic IDs from setup state.

    A handful of expert tasks need solution commands that embed resource
    identifiers (security-group IDs, target-group/subscription ARNs,
    Route53 zone IDs, Cognito pool IDs) which only exist after the setup
    commands have run. Those commands are built here from the tracked
    *state*, falling back to describe/list queries against *backend* when
    an ID was not captured. Returns an empty list for task IDs that have
    no dynamic commands.
    """
    if task_id == 111:
        # authorize-security-group-ingress needs group-id
        sg_id = state.get("sg_analytics-db-sg-fixed", "")
        if not sg_id:
            # Try to get it from the create output
            _, stdout, _ = backend.execute_command(
                "aws ec2 describe-security-groups --group-names analytics-db-sg-fixed"
            )
            try:
                data = json.loads(stdout)
                sg_id = data["SecurityGroups"][0]["GroupId"]
            except (json.JSONDecodeError, KeyError, IndexError):
                sg_id = ""  # command below then fails loudly with an empty id
        return [
            f"aws ec2 authorize-security-group-ingress "
            f"--group-id {sg_id} "
            f"--protocol tcp --port 3306 --cidr 10.0.1.0/24"
        ]
    if task_id == 113:
        # RedrivePolicy needs JSON format to avoid shorthand parsing issues
        redrive = json.dumps(
            {
                "deadLetterTargetArn": "arn:aws:sqs:us-east-1:000000000000:order-processing-dlq",
                # NOTE(review): maxReceiveCount is serialized as a string —
                # confirm the grader/simulator accepts the string form.
                "maxReceiveCount": "5",
            }
        )
        # RedrivePolicy itself must be a JSON-encoded string inside the
        # attribute map, hence the double encoding.
        attrs = json.dumps({"RedrivePolicy": redrive})
        return [
            f"aws sqs set-queue-attributes "
            f"--queue-url http://localhost:4566/000000000000/order-processing "
            f"--attributes '{attrs}'"
        ]
    if task_id == 114:
        # Route53 zone-id from setup
        zone_id = state.get("route53_zone_id", "zone-001")
        change_batch = json.dumps(
            {
                "Changes": [
                    {
                        "Action": "UPSERT",
                        "ResourceRecordSet": {
                            "Name": "api.example.com",
                            "Type": "A",
                            "TTL": 300,
                            "ResourceRecords": [{"Value": "10.0.1.50"}],
                        },
                    }
                ]
            }
        )
        return [
            f"aws route53 change-resource-record-sets "
            f"--hosted-zone-id {zone_id} "
            f"--change-batch '{change_batch}'"
        ]
    if task_id == 115:
        # Need target group ARN for modify-target-group
        success, stdout, _ = backend.execute_command(
            "aws elbv2 describe-target-groups --names web-targets"
        )
        try:
            data = json.loads(stdout)
            tg_arn = data["TargetGroups"][0]["TargetGroupArn"]
        except (json.JSONDecodeError, KeyError, IndexError):
            tg_arn = "unknown"  # placeholder keeps the command well-formed
        return [
            f"aws elbv2 modify-target-group --target-group-arn {tg_arn} "
            f"--health-check-path /health --health-check-port 80 "
            f"--health-check-interval-seconds 15 --healthy-threshold-count 2"
        ]
    if task_id == 123:
        # Need subscription ARN for set-subscription-attributes
        success, stdout, _ = backend.execute_command(
            "aws sns list-subscriptions-by-topic "
            "--topic-arn arn:aws:sns:us-east-1:000000000000:order-notifications"
        )
        try:
            data = json.loads(stdout)
            sub_arn = data["Subscriptions"][0]["SubscriptionArn"]
        except (json.JSONDecodeError, KeyError, IndexError):
            sub_arn = "unknown"  # placeholder keeps the command well-formed
        redrive = json.dumps(
            {
                "deadLetterTargetArn": "arn:aws:sqs:us-east-1:000000000000:order-notifications-dlq"
            }
        )
        return [
            f"aws sns set-subscription-attributes --subscription-arn {sub_arn} "
            f"--attribute-name RedrivePolicy "
            f"--attribute-value '{redrive}'"
        ]
    if task_id == 124:
        # authorize-security-group-ingress needs group-id
        sg_id = state.get("sg_efs-mount-sg", "")
        if not sg_id:
            _, stdout, _ = backend.execute_command(
                "aws ec2 describe-security-groups --group-names efs-mount-sg"
            )
            try:
                data = json.loads(stdout)
                sg_id = data["SecurityGroups"][0]["GroupId"]
            except (json.JSONDecodeError, KeyError, IndexError):
                sg_id = ""
        return [
            f"aws ec2 authorize-security-group-ingress "
            f"--group-id {sg_id} "
            f"--protocol tcp --port 2049 --cidr 10.0.2.0/24"
        ]
    if task_id == 126:
        # Cognito user-pool-id from setup
        pool_id = state.get("cognito_pool_id", "us-east-1_customer-auth")
        policies = json.dumps(
            {
                "PasswordPolicy": {
                    "MinimumLength": 12,
                    "RequireUppercase": True,
                    "RequireLowercase": True,
                    "RequireNumbers": True,
                    "RequireSymbols": True,
                    "TemporaryPasswordValidityDays": 1,
                }
            }
        )
        return [
            f"aws cognito-idp update-user-pool "
            f"--user-pool-id {pool_id} "
            f"--policies '{policies}'"
        ]
    return []
def _execute_all_commands(
    task_id: int, backend: SimulatorStrategy, state: dict[str, str] | None = None
) -> list[tuple[str, bool, str, str]]:
    """Run the static solution commands, then any dynamically resolved ones.

    Returns every execution as ``(cmd, success, stdout, stderr)`` tuples.
    IDs created by successful static commands (e.g. security groups) are
    tracked into *state* so the dynamic commands can reference them.
    """
    state = {} if state is None else state
    results: list[tuple[str, bool, str, str]] = []
    for cmd in EXPERT_COMMANDS[task_id]:
        ok, out, err = backend.execute_command(cmd)
        results.append((cmd, ok, out, err))
        if ok:
            # Solution commands can create resources whose IDs later
            # dynamic commands need.
            _track_state(cmd, out, state)
    if task_id in _DYNAMIC_TASK_IDS:
        for cmd in _resolve_dynamic_commands(task_id, backend, state):
            ok, out, err = backend.execute_command(cmd)
            results.append((cmd, ok, out, err))
    return results
# ---------------------------------------------------------------------------
# Fixtures
# ---------------------------------------------------------------------------
@pytest.fixture(scope="module")
def backend() -> SimulatorStrategy:
return SimulatorStrategy()
@pytest.fixture(scope="module")
def grader(backend: SimulatorStrategy) -> TaskGrader:
return TaskGrader(backend)
@pytest.fixture(scope="module")
def expert_tasks() -> list[dict]:
with open(TASKS_FILE) as f:
return yaml.safe_load(f)
def _build_task(entry: dict, state: dict[str, str] | None = None) -> Task:
    """Build a Task model from a YAML entry, patching state_check commands.

    When *state* carries dynamic IDs harvested during setup (Route53 zone
    id, Cognito pool id), the corresponding placeholders in the task's
    state_check commands are replaced so grading targets real resources.
    """
    setup_cmds = []
    for raw in entry.get("setup_commands", []):
        if isinstance(raw, str):
            setup_cmds.append(SetupCommand(command=raw))
        else:
            setup_cmds.append(SetupCommand(**raw))
    task = Task(
        task_id=TaskID(entry["task_id"]),
        difficulty=TaskDifficulty.EXPERT,
        description=entry["description"],
        success_criteria=SuccessCriteria(**entry.get("success_criteria", {})),
        setup_commands=setup_cmds,
    )
    if not state:
        return task
    for check in task.success_criteria.state_checks:
        if "route53_zone_id" in state and "zone-001" in check.command:
            check.command = check.command.replace(
                "zone-001", state["route53_zone_id"]
            )
        if "cognito_pool_id" in state:
            check.command = check.command.replace(
                "us-east-1_customer-auth", state["cognito_pool_id"]
            )
    return task
# ---------------------------------------------------------------------------
# Tests
# ---------------------------------------------------------------------------
def test_all_expert_tasks_have_commands(expert_tasks: list[dict]) -> None:
    """Every expert task in the YAML must have a corresponding test command sequence."""
    missing = [
        entry["task_id"]
        for entry in expert_tasks
        if entry["task_id"] not in EXPERT_COMMANDS
    ]
    assert not missing, f"No test commands mapped for task_ids: {missing}"
@pytest.mark.parametrize(
    "task_id",
    sorted(EXPERT_COMMANDS.keys()),
    ids=[f"task_{tid}" for tid in sorted(EXPERT_COMMANDS.keys())],
)
def test_expert_task_setup_executes(
    task_id: int,
    expert_tasks: list[dict],
    backend: SimulatorStrategy,
) -> None:
    """All setup commands must execute successfully to provision initial state."""
    matching = [t for t in expert_tasks if t["task_id"] == task_id]
    entry = matching[0] if matching else None
    assert entry is not None, f"task_id {task_id} not found in expert.yaml"
    backend.reset_environment()
    results, _ = _execute_setup(entry, backend)
    total = len(results)
    for i, (cmd, success, _stdout, stderr) in enumerate(results):
        assert success, (
            f"Setup command {i + 1}/{total} failed for task {task_id}.\n"
            f" Command: {cmd}\n"
            f" Stderr: {stderr}"
        )
@pytest.mark.parametrize(
    "task_id",
    sorted(EXPERT_COMMANDS.keys()),
    ids=[f"task_{tid}" for tid in sorted(EXPERT_COMMANDS.keys())],
)
def test_expert_task_commands_execute(
    task_id: int,
    expert_tasks: list[dict],
    backend: SimulatorStrategy,
) -> None:
    """All solution commands must execute successfully after setup."""
    matching = [t for t in expert_tasks if t["task_id"] == task_id]
    entry = matching[0] if matching else None
    assert entry is not None
    backend.reset_environment()
    _, state = _execute_setup(entry, backend)
    results = _execute_all_commands(task_id, backend, state)
    total = len(results)
    for i, (cmd, success, _stdout, stderr) in enumerate(results):
        assert success, (
            f"Command {i + 1}/{total} failed for task {task_id}.\n"
            f" Command: {cmd}\n"
            f" Stderr: {stderr}"
        )
@pytest.mark.parametrize(
    "task_id",
    sorted(EXPERT_COMMANDS.keys()),
    ids=[f"task_{tid}" for tid in sorted(EXPERT_COMMANDS.keys())],
)
def test_expert_task_grading(
    task_id: int,
    expert_tasks: list[dict],
    backend: SimulatorStrategy,
    grader: TaskGrader,
) -> None:
    """Execute setup + full solution and verify the grader marks the task as achieved."""
    entry = next((t for t in expert_tasks if t["task_id"] == task_id), None)
    assert entry is not None, f"task_id {task_id} not found in expert.yaml"
    backend.reset_environment()
    _, state = _execute_setup(entry, backend)
    task = _build_task(entry, state)
    results = _execute_all_commands(task_id, backend, state)
    # Replay every executed command through the episode tracker, grading
    # after each step — the assertions below inspect the final grade.
    tracker = EpisodeTracker()
    for cmd, ok, out, err in results:
        step = tracker.record_step(cmd, ok, out, err)
        result = grader.grade(task, tracker, step)
    all_cmds = [executed[0] for executed in results]
    assert result.task_achieved, (
        f"Task {task_id} not achieved.\n"
        f" Description: {entry['description']}\n"
        f" Commands: {all_cmds}\n"
        f" Reason: {result.reason}\n"
        f" Reward: {result.reward}"
    )
    assert result.reward == 1.0, f"Expected reward=1.0, got {result.reward}"
@pytest.mark.parametrize(
    "task_id",
    sorted(EXPERT_COMMANDS.keys()),
    ids=[f"task_{tid}_setup_only" for tid in sorted(EXPERT_COMMANDS.keys())],
)
def test_expert_task_setup_only_gives_no_completion(
    task_id: int,
    expert_tasks: list[dict],
    backend: SimulatorStrategy,
    grader: TaskGrader,
) -> None:
    """Running only setup commands (no agent fix actions) should not achieve the task."""
    entry = next((t for t in expert_tasks if t["task_id"] == task_id), None)
    assert entry is not None
    backend.reset_environment()
    _, state = _execute_setup(entry, backend)
    task = _build_task(entry, state)
    # The "agent" only runs a harmless identity check so the grader has a
    # StepRecord to evaluate — none of the fix commands are executed.
    noop_cmd = "aws sts get-caller-identity"
    ok, out, err = backend.execute_command(noop_cmd)
    tracker = EpisodeTracker()
    step = tracker.record_step(noop_cmd, ok, out, err)
    result = grader.grade(task, tracker, step)
    assert not result.task_achieved, (
        f"Task {task_id} should NOT be achieved with only setup + no-op.\n"
        f" Reason: {result.reason}"
    )
    assert result.reward < 1.0
@pytest.mark.parametrize(
    "task_id",
    sorted(EXPERT_COMMANDS.keys()),
    ids=[f"task_{tid}_partial" for tid in sorted(EXPERT_COMMANDS.keys())],
)
def test_expert_task_partial_gives_no_completion(
    task_id: int,
    expert_tasks: list[dict],
    backend: SimulatorStrategy,
    grader: TaskGrader,
) -> None:
    """Executing only the first solution command should not achieve a multi-step task."""
    entry = next((t for t in expert_tasks if t["task_id"] == task_id), None)
    assert entry is not None
    # Single-check tasks could legitimately pass after one command, and
    # dynamic-only tasks have no static command to run — skip both.
    state_checks = entry.get("success_criteria", {}).get("state_checks", [])
    if len(state_checks) < 2:
        pytest.skip("Single state-check task — partial test not applicable")
    static_cmds = EXPERT_COMMANDS[task_id]
    if len(static_cmds) < 1:
        pytest.skip("No static commands — dynamic-only task")
    backend.reset_environment()
    _, state = _execute_setup(entry, backend)
    task = _build_task(entry, state)
    first_cmd = static_cmds[0]
    ok, out, err = backend.execute_command(first_cmd)
    tracker = EpisodeTracker()
    step = tracker.record_step(first_cmd, ok, out, err)
    result = grader.grade(task, tracker, step)
    assert not result.task_achieved, (
        f"Task {task_id} should NOT be achieved with only the first command.\n"
        f" Command: {first_cmd}\n"
        f" Reason: {result.reason}"
    )
    assert result.reward < 1.0