Spaces:
Sleeping
Sleeping
| """Round 1 task generation and distribution script.""" | |
| import csv | |
| import json | |
| from datetime import datetime | |
| import httpx | |
| from instructor.database import Database | |
| from instructor.task_templates import TaskTemplateManager | |
| from shared.config import settings | |
| from shared.logger import setup_logger | |
| from shared.models import Attachment, TaskRequest | |
| from shared.utils import generate_nonce | |
| logger = setup_logger(__name__) | |
class Round1TaskGenerator:
    """Generate and send round 1 tasks to students.

    The workflow driven by ``run`` is: load submissions from the CSV file at
    ``settings.submissions_csv``, build one ``TaskRequest`` per submission,
    POST it to the submission's endpoint, and store a record of the attempt
    (including the HTTP status code) through ``self.db.add_task``.
    """

    # Columns that every submissions CSV must provide; also the keys of each
    # submission dictionary, in this order.
    _REQUIRED_COLUMNS = ("timestamp", "email", "endpoint", "secret")

    def __init__(self) -> None:
        """Initialize task generator."""
        self.db = Database()
        self.template_manager = TaskTemplateManager()
        self.db.create_tables()

    def load_submissions(self) -> list[dict]:
        """Load submissions from CSV file.

        Returns:
            List of submission dictionaries with the keys ``timestamp``,
            ``email``, ``endpoint`` and ``secret``. The list is empty when
            the file does not exist or its header lacks one of those
            columns; an error is logged in both cases.
        """
        submissions: list[dict] = []
        csv_path = settings.submissions_csv
        if not csv_path.exists():
            logger.error(f"Submissions file not found: {csv_path}")
            return submissions

        # newline="" is what the csv module requires of its file objects
        # (quoted fields may contain line breaks). "utf-8-sig" reads plain
        # UTF-8 and also drops a leading BOM, which would otherwise become
        # part of the first column name and make row["timestamp"] fail.
        with open(csv_path, "r", newline="", encoding="utf-8-sig") as f:
            reader = csv.DictReader(f)
            header = reader.fieldnames or []
            missing = [col for col in self._REQUIRED_COLUMNS if col not in header]
            if missing:
                # Report the problem once instead of raising KeyError on the
                # first row and aborting the whole run with a bare traceback.
                logger.error(
                    f"Submissions file {csv_path} is missing required "
                    f"columns: {', '.join(missing)}"
                )
                return submissions
            for row in reader:
                submissions.append(
                    {col: row[col] for col in self._REQUIRED_COLUMNS}
                )

        logger.info(f"Loaded {len(submissions)} submissions")
        return submissions

    def generate_task_request(
        self, submission: dict
    ) -> "tuple[TaskRequest, dict] | None":
        """Generate task request for a submission.

        Args:
            submission: Submission data; ``email`` and ``secret`` are read.

        Returns:
            ``(task_request, submission)`` for a submission that still needs
            a round 1 task, or ``None`` when the database already holds a
            round 1 task for this email with ``statuscode`` 200.
        """
        email = submission["email"]

        # Only a task recorded with statuscode 200 counts as delivered, so
        # earlier failed attempts do not prevent a retry.
        session = self.db.get_session()
        try:
            already_sent = (
                session.query(self.db.Task)
                .filter_by(email=email, round=1, statuscode=200)
                .first()
                is not None
            )
        finally:
            session.close()

        if already_sent:
            logger.info(f"Round 1 task already exists for {email}, skipping")
            return None

        # Ask the template manager for this student's round 1 task.
        task_data = self.template_manager.generate_task(email, round_num=1)

        # Create task request
        nonce = generate_nonce()
        attachments = [Attachment(**att) for att in task_data["attachments"]]
        task_request = TaskRequest(
            email=email,
            secret=submission["secret"],
            task=task_data["task_id"],
            round=1,
            nonce=nonce,
            brief=task_data["brief"],
            checks=task_data["checks"],
            evaluation_url=settings.evaluation_api_url,
            attachments=attachments,
        )
        logger.info(f"Generated task request for {email}: {task_request.task}")
        return task_request, submission

    async def send_task_request(
        self, task_request: "TaskRequest", submission: dict
    ) -> int:
        """Send task request to student endpoint.

        Args:
            task_request: Task request to send (serialized with
                ``model_dump`` and posted as JSON).
            submission: Submission data; ``endpoint`` is the target URL.

        Returns:
            HTTP status code of the response, or ``0`` when the request
            could not be completed (the exception is logged, not raised).
        """
        endpoint = submission["endpoint"]
        logger.info(f"Sending task to {endpoint}")
        try:
            async with httpx.AsyncClient(timeout=30.0) as client:
                response = await client.post(
                    endpoint,
                    json=task_request.model_dump(),
                    headers={"Content-Type": "application/json"},
                )
            status_code = response.status_code
            logger.info(
                f"Task sent to {endpoint}: "
                f"status {status_code}, response: {response.text[:200]}"
            )
            return status_code
        except Exception as e:
            # Deliberately broad: any failure to reach one student's endpoint
            # is recorded as status 0 so the remaining submissions still run.
            logger.error(f"Failed to send task to {endpoint}: {e}")
            return 0  # Use 0 to indicate failure

    def save_task_record(
        self, task_request: "TaskRequest", submission: dict, status_code: int
    ) -> None:
        """Save task record to database.

        Args:
            task_request: Task request that was sent.
            submission: Submission data; ``endpoint`` and ``secret`` are
                stored alongside the task.
            status_code: HTTP status code from response (``0`` when the
                request failed before a response was received).
        """
        from datetime import timezone

        # datetime.utcnow() is deprecated since Python 3.12. Take the current
        # UTC time explicitly and drop tzinfo so the stored value stays a
        # naive UTC datetime, exactly as before.
        now_utc = datetime.now(timezone.utc).replace(tzinfo=None)

        task_data = {
            "timestamp": now_utc,
            "email": task_request.email,
            "task": task_request.task,
            "round": task_request.round,
            "nonce": task_request.nonce,
            "brief": task_request.brief,
            # Attachments and checks are stored as JSON text.
            "attachments": json.dumps(
                [att.model_dump() for att in task_request.attachments]
            ),
            "checks": json.dumps(task_request.checks),
            "evaluation_url": task_request.evaluation_url,
            "endpoint": submission["endpoint"],
            "statuscode": status_code,
            "secret": submission["secret"],
        }
        self.db.add_task(task_data)
        logger.info(f"Saved task record: {task_request.task}")

    async def process_submission(self, submission: dict) -> None:
        """Process a single submission.

        Generates the task request, sends it, and saves a record of the
        attempt whatever the resulting status code. Any exception is logged
        with its traceback and not re-raised, so one bad submission does not
        stop the caller's loop.

        Args:
            submission: Submission data
        """
        try:
            # Generate task
            result = self.generate_task_request(submission)
            if result is None:
                return  # Already processed
            task_request, submission = result

            # Send task
            status_code = await self.send_task_request(task_request, submission)

            # Save record (failed attempts are recorded too)
            self.save_task_record(task_request, submission, status_code)

            if status_code == 200:
                logger.info(
                    f"Successfully sent round 1 task to {submission['email']}"
                )
            else:
                logger.warning(
                    f"Failed to send round 1 task to {submission['email']}: "
                    f"status {status_code}"
                )
        except Exception as e:
            # Use .get() here: indexing a missing key inside the handler
            # would raise a second exception and hide the original one.
            who = submission.get("email", "<unknown>")
            logger.error(f"Error processing submission {who}: {e}", exc_info=True)

    async def run(self) -> None:
        """Run round 1 task generation.

        Loads all submissions and processes them one after another; logs an
        error and returns early when there is nothing to process.
        """
        logger.info("Starting round 1 task generation")

        # Load submissions
        submissions = self.load_submissions()
        if not submissions:
            logger.error("No submissions to process")
            return

        # Process each submission sequentially
        for submission in submissions:
            await self.process_submission(submission)

        logger.info("Round 1 task generation complete")
async def main():
    """Build a ``Round1TaskGenerator`` and await its ``run`` coroutine."""
    await Round1TaskGenerator().run()
if __name__ == "__main__":
    # Only the script entry path needs the event loop runner, so it is
    # imported here rather than at module import time.
    from asyncio import run

    run(main())