temp / instructor /round1.py
CheeksTheGeek's picture
Initial commit: LLM Code Deployment System
c5292d8 unverified
"""Round 1 task generation and distribution script."""
import csv
import json
from datetime import datetime, timezone

import httpx

from instructor.database import Database
from instructor.task_templates import TaskTemplateManager
from shared.config import settings
from shared.logger import setup_logger
from shared.models import Attachment, TaskRequest
from shared.utils import generate_nonce
# Module-level logger, created by the shared ``setup_logger`` helper from this module's name.
logger = setup_logger(__name__)
class Round1TaskGenerator:
    """Generate and send round 1 tasks to students.

    For every submission read from the submissions CSV, the generator builds a
    ``TaskRequest`` from a task template, POSTs it to the submission's
    endpoint and stores a record of the attempt (including the HTTP status
    code) in the database.
    """

    # Columns that every row of the submissions CSV must provide.
    _SUBMISSION_FIELDS = ("timestamp", "email", "endpoint", "secret")

    def __init__(self) -> None:
        """Initialize task generator.

        Creates the database handle and the template manager, then asks the
        database to create its tables.
        """
        self.db = Database()
        self.template_manager = TaskTemplateManager()
        self.db.create_tables()

    def load_submissions(self) -> list[dict]:
        """Load submissions from the CSV file at ``settings.submissions_csv``.

        Returns:
            List of submission dictionaries with the keys ``timestamp``,
            ``email``, ``endpoint`` and ``secret``. The list is empty when the
            file does not exist or its header lacks a required column; both
            situations are logged as errors.
        """
        if not settings.submissions_csv.exists():
            logger.error(f"Submissions file not found: {settings.submissions_csv}")
            return []

        # newline="" is what the csv module asks for so that quoted fields
        # containing line breaks are parsed correctly. "utf-8-sig" reads plain
        # UTF-8 as well and strips a leading BOM, which would otherwise end up
        # glued to the first header name.
        with open(
            settings.submissions_csv, "r", newline="", encoding="utf-8-sig"
        ) as f:
            reader = csv.DictReader(f)
            header = reader.fieldnames or ()
            missing = [
                field for field in self._SUBMISSION_FIELDS if field not in header
            ]
            if missing:
                # Report the problem explicitly instead of failing with a bare
                # KeyError on the first row.
                logger.error(
                    f"Submissions file {settings.submissions_csv} is missing "
                    f"required columns: {', '.join(missing)}"
                )
                return []
            submissions = [
                {field: row[field] for field in self._SUBMISSION_FIELDS}
                for row in reader
            ]

        logger.info(f"Loaded {len(submissions)} submissions")
        return submissions

    def generate_task_request(
        self, submission: dict
    ) -> "tuple[TaskRequest, dict] | None":
        """Generate task request for a submission.

        Args:
            submission: Submission data; the ``email`` and ``secret`` keys are
                read here.

        Returns:
            A ``(task_request, submission)`` tuple, or ``None`` when the
            database already holds a round 1 task for this email that was
            delivered with status code 200.
        """
        email = submission["email"]

        # Check if round 1 task already exists.
        # Only a successful task (statuscode 200) counts, so earlier failed
        # deliveries are retried on the next run.
        session = self.db.get_session()
        try:
            existing = (
                session.query(self.db.Task)
                .filter_by(email=email, round=1, statuscode=200)
                .first()
            )
            if existing:
                logger.info(f"Round 1 task already exists for {email}, skipping")
                return None
        finally:
            session.close()

        # Generate task from the template manager.
        task_data = self.template_manager.generate_task(email, round_num=1)

        # Create task request
        nonce = generate_nonce()
        attachments = [Attachment(**att) for att in task_data["attachments"]]
        task_request = TaskRequest(
            email=email,
            secret=submission["secret"],
            task=task_data["task_id"],
            round=1,
            nonce=nonce,
            brief=task_data["brief"],
            checks=task_data["checks"],
            evaluation_url=settings.evaluation_api_url,
            attachments=attachments,
        )
        logger.info(f"Generated task request for {email}: {task_request.task}")
        return task_request, submission

    async def send_task_request(
        self, task_request: TaskRequest, submission: dict
    ) -> int:
        """Send task request to student endpoint.

        Args:
            task_request: Task request to send
            submission: Submission data with endpoint

        Returns:
            HTTP status code of the response, or 0 when the request could not
            be completed (any exception raised while sending is logged).
        """
        endpoint = submission["endpoint"]
        logger.info(f"Sending task to {endpoint}")
        try:
            async with httpx.AsyncClient(timeout=30.0) as client:
                response = await client.post(
                    endpoint,
                    # mode="json" makes the dump contain only JSON-compatible
                    # values, so fields that are not plain primitives cannot
                    # break the request encoding.
                    json=task_request.model_dump(mode="json"),
                    headers={"Content-Type": "application/json"},
                )
            status_code = response.status_code
            logger.info(
                f"Task sent to {endpoint}: "
                f"status {status_code}, response: {response.text[:200]}"
            )
            return status_code
        except Exception as e:
            # Best effort: a failing endpoint must not abort the whole run.
            logger.error(f"Failed to send task to {endpoint}: {e}")
            return 0  # Use 0 to indicate failure

    def save_task_record(
        self, task_request: TaskRequest, submission: dict, status_code: int
    ) -> None:
        """Save task record to database.

        Attachments and checks are stored as JSON strings.

        Args:
            task_request: Task request
            submission: Submission data
            status_code: HTTP status code from response
        """
        task_data = {
            # Naive UTC timestamp, the same value datetime.utcnow() produced;
            # utcnow() itself is deprecated since Python 3.12.
            "timestamp": datetime.now(timezone.utc).replace(tzinfo=None),
            "email": task_request.email,
            "task": task_request.task,
            "round": task_request.round,
            "nonce": task_request.nonce,
            "brief": task_request.brief,
            "attachments": json.dumps(
                [att.model_dump(mode="json") for att in task_request.attachments]
            ),
            "checks": json.dumps(task_request.checks),
            "evaluation_url": task_request.evaluation_url,
            "endpoint": submission["endpoint"],
            "statuscode": status_code,
            "secret": submission["secret"],
        }
        self.db.add_task(task_data)
        logger.info(f"Saved task record: {task_request.task}")

    async def process_submission(self, submission: dict) -> None:
        """Process a single submission.

        Generates the task, sends it and records the outcome. Any exception
        raised along the way is logged and swallowed so that one bad
        submission does not stop the remaining ones.

        Args:
            submission: Submission data
        """
        try:
            # Generate task
            result = self.generate_task_request(submission)
            if result is None:
                return  # Already processed
            task_request, submission = result

            # Send task
            status_code = await self.send_task_request(task_request, submission)

            # Save record
            self.save_task_record(task_request, submission, status_code)

            if status_code == 200:
                logger.info(f"Successfully sent round 1 task to {submission['email']}")
            else:
                logger.warning(
                    f"Failed to send round 1 task to {submission['email']}: "
                    f"status {status_code}"
                )
        except Exception as e:
            # .get() keeps the handler itself from raising KeyError when the
            # failure was caused by a submission without an "email" key.
            email = submission.get("email", "<unknown>")
            logger.error(f"Error processing submission {email}: {e}", exc_info=True)

    async def run(self) -> None:
        """Run round 1 task generation.

        Loads all submissions and processes them one after another.
        """
        logger.info("Starting round 1 task generation")

        # Load submissions
        submissions = self.load_submissions()
        if not submissions:
            logger.error("No submissions to process")
            return

        # Process each submission
        for submission in submissions:
            await self.process_submission(submission)

        logger.info("Round 1 task generation complete")
async def main():
    """Build a round 1 task generator and run it to completion."""
    await Round1TaskGenerator().run()
if __name__ == "__main__":
    # The event loop runner is only needed when this file is executed as a
    # script, so it is imported here rather than at module level.
    from asyncio import run

    run(main())