# NOTE: page-header residue from the source listing ("Spaces: Sleeping / Sleeping"); not part of the module.
import json
import logging
import re
from typing import Any, Dict, List, Tuple
logger = logging.getLogger(__name__)

# Dummy data for demonstration. Replace this with your actual opcode data.
# Each item must have at least an "opcode" and a "text" field. In "text",
# "[name]" and "( )" mark inputs of the block.
hat_block_data = [
    {"opcode": "event_whenflagclicked", "text": "when green flag clicked"},
    {"opcode": "event_whenkeypressed", "text": "when [key] pressed"},
    {"opcode": "event_whenbroadcastreceived", "text": "when I receive [message]"},
]

boolean_block_data = [
    {"opcode": "operator_gt", "text": "< ( ) > ( ) >"},
    {"opcode": "sensing_touchingobject", "text": "<touching [object]?>"},
    {"opcode": "operator_equals", "text": "< ( ) = ( ) >"},
]

c_block_data = [
    {"opcode": "control_forever", "text": "forever"},
    {"opcode": "control_if", "text": "if < > then"},
    {"opcode": "control_repeat", "text": "repeat ( )"},
]

cap_block_data = [
    {"opcode": "control_stop", "text": "stop [all]"},
]

reporter_block_data = [
    {"opcode": "motion_xposition", "text": "(x position)"},
    {"opcode": "motion_yposition", "text": "(y position)"},
    {"opcode": "data_variable", "text": "(variable)"},
    {"opcode": "sensing_answer", "text": "(answer)"},
]

stack_block_data = [
    {"opcode": "motion_gotoxy", "text": "go to x: ( ) y: ( )"},
    {"opcode": "motion_changeyby", "text": "change y by ( )"},
    {"opcode": "motion_setx", "text": "set x to ( )"},
    {"opcode": "motion_glidesecstoxy", "text": "glide ( ) secs to x: ( ) y: ( )"},
    {"opcode": "data_setvariableto", "text": "set [variable] to ( )"},
    {"opcode": "looks_hide", "text": "hide"},
    {"opcode": "looks_show", "text": "show"},
    {"opcode": "event_broadcast", "text": "broadcast [message]"},
]

# Flat list of every block definition, in category order, for easier lookup.
# The entries are the same dict objects as in the per-category lists above.
all_opcodes_list = [
    block_info
    for category_data in (
        hat_block_data,
        boolean_block_data,
        c_block_data,
        cap_block_data,
        reporter_block_data,
        stack_block_data,
    )
    for block_info in category_data
]
def extract_json_from_llm_response(response_text: str) -> Dict[str, Any]:
    """Extract and decode the JSON payload of an LLM response.

    The payload may be wrapped in a Markdown fence (```json ... ```) or be
    the raw response text itself.

    Args:
        response_text: Raw text returned by the LLM.

    Returns:
        The decoded JSON value.

    Raises:
        json.JSONDecodeError: If no candidate payload is valid JSON. The
            failure is logged before it is re-raised.
    """
    fence_flags = re.DOTALL | re.IGNORECASE
    # Lazy match first: with several fenced blocks in one response, a greedy
    # match would span from the first opening fence to the last closing one.
    # "\s*" (instead of a literal "\n") tolerates CRLF and padded fences.
    first_fence = re.search(r"```json\s*(.*?)\s*```", response_text, fence_flags)
    if first_fence is None:
        candidates = [response_text]  # No code block: try the text directly.
    else:
        # Fallback for a single block whose JSON itself contains backticks,
        # which the lazy match would cut short.
        widest_fence = re.search(r"```json\s*(.*)\s*```", response_text, fence_flags)
        candidates = [first_fence.group(1), widest_fence.group(1)]

    last_error = None
    for candidate in candidates:
        try:
            return json.loads(candidate)
        except json.JSONDecodeError as e:
            last_error = e

    logger.error("Failed to decode JSON: %s from response: %s", last_error, response_text)
    raise last_error
# Node 9: plan with the exact count of each opcode used per logic.

# Category keys written into every plan, in output order.
_OPCODE_CATEGORIES = (
    "motion",
    "control",
    "operator",
    "sensing",
    "looks",
    "sounds",
    "events",
    "data",
)

# Opcode prefix (the part before the first "_") -> category key.
# Scratch's own sound opcodes use the singular "sound_" prefix, so both
# spellings are accepted for the "sounds" category.
_CATEGORY_BY_PREFIX = {
    "motion": "motion",
    "control": "control",
    "operator": "operator",
    "sensing": "sensing",
    "looks": "looks",
    "sound": "sounds",
    "sounds": "sounds",
    "event": "events",
    "data": "data",
}

# An input placeholder inside a block's display text: "[anything]" or "( )".
_PLACEHOLDER_RE = re.compile(r"\[[^\]]*\]|\(\s*\)")
_IF_RE = re.compile(r"if <.+?> then", re.DOTALL)
_ELSE_RE = re.compile(r"\belse\b")
# A standalone "v" token, e.g. the one in "[space v]".
# NOTE(review): presumably the dropdown marker the original "clean the event
# string" step was meant to remove -- confirm against the planner's output.
_DROPDOWN_MARKER_RE = re.compile(r"\s+v\b")


def _block_text_to_pattern(block_text: str) -> str:
    """Turn a block's display text into a regex with placeholders as wildcards."""
    # Split on the placeholders *before* escaping: re.escape() also escapes
    # spaces, so a placeholder such as "( )" can no longer be found (and
    # replaced) in the escaped text.
    literal_parts = _PLACEHOLDER_RE.split(block_text)
    return ".*?".join(re.escape(part) for part in literal_parts)


def _compile_block_patterns(
    blocks: List[Dict[str, Any]],
) -> List[Tuple[str, "re.Pattern[str]"]]:
    """Pair every block's opcode with the compiled regex for its text."""
    return [
        (block_info["opcode"], re.compile(_block_text_to_pattern(block_info["text"])))
        for block_info in blocks
    ]


def _count_opcodes(
    logic: str,
    event: str,
    block_patterns: List[Tuple[str, "re.Pattern[str]"]],
) -> Dict[str, int]:
    """Count, per opcode, how often its block appears in ``logic``.

    The plan's ``event`` is counted once in addition to whatever is found in
    the logic text. Only opcodes with a positive count are returned.
    """
    counts: Dict[str, int] = {}

    def bump(opcode: str, amount: int = 1) -> None:
        if amount > 0:
            counts[opcode] = counts.get(opcode, 0) + amount

    # Remove only standalone "v" tokens; deleting every "v" would corrupt
    # opcodes such as "event_whenflagclicked".
    event_opcode = _DROPDOWN_MARKER_RE.sub("", event).strip()
    if event_opcode:
        bump(event_opcode)

    for opcode, pattern in block_patterns:
        if opcode == "control_if":
            # Heuristic: each "else" is paired with one "if ... then"; the
            # remaining ifs are plain "if" blocks.
            if_total = len(_IF_RE.findall(logic))
            if_else_total = min(if_total, len(_ELSE_RE.findall(logic)))
            bump("control_if_else", if_else_total)
            bump("control_if", if_total - if_else_total)
            continue
        if opcode == "control_forever":
            bump(opcode, logic.count("forever"))
            continue
        # NOTE: this is plain text matching, so block texts that overlap
        # (e.g. "set x to ( )" and "set [variable] to ( )") can both match
        # the same line of logic.
        bump(opcode, len(pattern.findall(logic)))

    return counts


def _group_by_category(counts: Dict[str, int]) -> Dict[str, List[Dict[str, Any]]]:
    """Group ``{opcode: count}`` into ``{category: [{"opcode", "count"}, ...]}``."""
    grouped: Dict[str, List[Dict[str, Any]]] = {
        category: [] for category in _OPCODE_CATEGORIES
    }
    for opcode, count in counts.items():
        prefix, separator, _ = opcode.partition("_")
        category = _CATEGORY_BY_PREFIX.get(prefix) if separator else None
        # Opcodes with an unknown prefix are left out, as before.
        if category is not None:
            grouped[category].append({"opcode": opcode, "count": count})
    return grouped


def plan_opcode_counter_node(state: Dict[str, Any]) -> Dict[str, Any]:
    """Count the opcodes used by every plan of every sprite.

    For each plan in ``state["action_plan"]["action_overall_flow"]`` the
    plan's ``logic`` text is matched against the block texts in
    ``all_opcodes_list`` (no LLM call is made). Each plan dict is updated
    in place with:

    * ``"opcode_counts"``: ``{category: [{"opcode": str, "count": int}, ...]}``
    * one key per category holding the opcode repeated ``count`` times.

    Args:
        state: Graph state; read for ``"action_plan"``.

    Returns:
        The same ``state`` object. When an action flow is present,
        ``state["temporary_node"]`` is set to
        ``{sprite: {"description": ..., "plans": [...]}}``; otherwise the
        state is returned untouched.
    """
    logger.info("=== Running OPCODE COUTER LOGIC with LLM counts ===")

    # "or {}" also covers keys that are present but set to None.
    action_flow = (state.get("action_plan") or {}).get("action_overall_flow") or {}
    if not action_flow:
        logger.warning("No action_overall_flow found; skipping.")
        return state

    # The patterns do not depend on the plan, so build them once per call.
    block_patterns = _compile_block_patterns(all_opcodes_list)

    refined_flow: Dict[str, Any] = {}
    for sprite, sprite_data in action_flow.items():
        refined_plans = []
        for plan in sprite_data.get("plans", []):
            counts = _count_opcodes(
                plan.get("logic") or "",
                plan.get("event") or "",
                block_patterns,
            )
            opcode_counts = _group_by_category(counts)
            plan["opcode_counts"] = opcode_counts

            # Legacy per-category keys: the bare opcode, once per occurrence.
            for category, entries in opcode_counts.items():
                plan[category] = [
                    entry["opcode"]
                    for entry in entries
                    for _ in range(entry["count"])
                ]
            refined_plans.append(plan)

        refined_flow[sprite] = {
            "description": sprite_data.get("description", ""),
            "plans": refined_plans,
        }

    state["temporary_node"] = refined_flow
    print(f"[OPCODE COUTER LOGIC]: {refined_flow}")
    logger.info("=== OPCODE COUTER LOGIC completed ===")
    return state