|
import json
|
|
import uuid
|
|
import logging
|
|
from typing import Dict, Any, List
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
# Type alias for the state dictionary accepted by plan_sprite_actions and
# build_action_nodes.
GameState = Dict[str, Any]
|
|
|
|
# Module-level logger, named after this module.
logger = logging.getLogger(__name__)
|
|
|
|
|
|
class _MockMessage(dict):
    """Dict-shaped chat message that also exposes ``content`` as an attribute.

    A reply can then be read either as ``message["content"]`` or as
    ``message.content``; it still compares equal to the plain dict form.
    """

    @property
    def content(self):
        return self["content"]


class MockAgent:
    """Stand-in for an LLM agent that answers with canned JSON payloads."""

    # Sprites the mock has canned block output for.
    _KNOWN_SPRITES = ("Sprite1", "Ball")

    def invoke(self, payload: Dict[str, Any]) -> Dict[str, Any]:
        """
        Mocks an LLM agent invocation. In a real scenario, this would call your
        actual LLM API (e.g., through LangChain, LlamaIndex, etc.).

        Args:
            payload: Mapping with a ``"messages"`` list; only the ``"content"``
                of the last message is inspected.

        Returns:
            ``{"messages": [message]}`` where ``message`` carries a JSON string
            under ``"content"``: the action plan for a planning prompt, a
            ``blocks`` dictionary for a block-generation prompt about a known
            sprite, or ``"[]"`` for anything else.
        """
        user_message = payload["messages"][-1]["content"]
        print(f"\n--- Mock Agent Received Prompt (partial) ---\n{user_message[:500]}...\n------------------------------------------")

        if "Propose a high-level action flow" in user_message:
            return self._reply(json.dumps(self._planning_payload()))

        if "You are an AI assistant generating Scratch 3.0 block JSON" in user_message:
            sprite = self._detect_sprite(user_message)
            if sprite == "Sprite1":
                return self._reply(json.dumps(self._sprite1_blocks()))
            if sprite == "Ball":
                return self._reply(json.dumps(self._ball_blocks()))

        return self._reply("[]")

    @staticmethod
    def _reply(content: str) -> Dict[str, Any]:
        """Wrap ``content`` in the single-message response envelope."""
        return {"messages": [_MockMessage(content=content)]}

    @classmethod
    def _detect_sprite(cls, user_message: str):
        """Return the known sprite a block-generation prompt is about, or None."""
        # Prefer the explicit marker: the prompt's instruction text mentions
        # 'Sprite1' as an example, so a bare substring test would also match
        # prompts written for other sprites.
        for name in cls._KNOWN_SPRITES:
            if f"The current sprite is '{name}'" in user_message:
                return name
        # Fall back to a plain mention for prompts without the marker.
        for name in cls._KNOWN_SPRITES:
            if name in user_message:
                return name
        return None

    @staticmethod
    def _planning_payload() -> Dict[str, Any]:
        """Canned high-level action plan for the two demo sprites."""
        return {
            "action_overall_flow": {
                "Sprite1": {
                    "description": "Basic movement and interaction",
                    "plans": [
                        {
                            "event": "when flag clicked",
                            "logic": "forever loop: move 10 steps, if touching Edge then turn 15 degrees"
                        },
                        {
                            "event": "when space key pressed",
                            "logic": "say Hello! for 2 seconds"
                        }
                    ]
                },
                "Ball": {
                    "description": "Simple bouncing behavior",
                    "plans": [
                        {
                            "event": "when flag clicked",
                            "logic": "move 5 steps, if on edge bounce"
                        }
                    ]
                }
            }
        }

    @staticmethod
    def _sprite1_blocks() -> Dict[str, Any]:
        """Canned blocks for Sprite1; every ID is generated once and reused."""
        # One ID per block, so the dictionary keys and the next/parent/input
        # references that point at them stay identical.
        flag_id = f"block_id_{generate_block_id()}"
        forever_id = f"block_id_{generate_block_id()}_forever"
        move_id = f"block_id_{generate_block_id()}_move"
        if_id = f"block_id_{generate_block_id()}_if"
        touching_id = f"block_id_{generate_block_id()}_touching"
        touching_menu_id = f"block_id_{generate_block_id()}_touching_menu"
        turn_id = f"block_id_{generate_block_id()}_turn"
        say_id = f"block_id_{generate_block_id()}_say"

        return {
            flag_id: {
                "opcode": "event_whenflagclicked",
                "next": forever_id,
                "parent": None,
                "inputs": {}, "fields": {}, "shadow": False, "topLevel": True, "x": 100, "y": 100
            },
            forever_id: {
                "opcode": "control_forever",
                "next": None,
                "parent": flag_id,
                "inputs": {
                    "SUBSTACK": [2, move_id]
                }, "fields": {}, "shadow": False, "topLevel": False
            },
            move_id: {
                "opcode": "motion_movesteps",
                "next": if_id,
                "parent": forever_id,
                "inputs": {
                    "STEPS": [1, [4, "10"]]
                }, "fields": {}, "shadow": False, "topLevel": False
            },
            if_id: {
                "opcode": "control_if",
                "next": None,
                # The `if` follows the move block in the stack, so its parent
                # is the block directly above it, not the enclosing C-block.
                "parent": move_id,
                "inputs": {
                    "CONDITION": [2, touching_id],
                    "SUBSTACK": [2, turn_id]
                }, "fields": {}, "shadow": False, "topLevel": False
            },
            touching_id: {
                "opcode": "sensing_touchingobject",
                "next": None,
                "parent": if_id,
                "inputs": {
                    "TOUCHINGOBJECTMENU": [1, touching_menu_id]
                }, "fields": {}, "shadow": False, "topLevel": False
            },
            touching_menu_id: {
                "opcode": "sensing_touchingobjectmenu",
                "next": None,
                "parent": touching_id,
                "inputs": {},
                "fields": {"TOUCHINGOBJECTMENU": ["_edge_", None]},
                "shadow": True, "topLevel": False
            },
            turn_id: {
                "opcode": "motion_turnright",
                "next": None,
                "parent": if_id,
                "inputs": {
                    "DEGREES": [1, [4, "15"]]
                }, "fields": {}, "shadow": False, "topLevel": False
            },
            say_id: {
                "opcode": "looks_sayforsecs",
                "next": None,
                "parent": None,
                "inputs": {
                    "MESSAGE": [1, [10, "Hello!"]],
                    "SECS": [1, [4, "2"]]
                }, "fields": {}, "shadow": False, "topLevel": True, "x": 300, "y": 100
            }
        }

    @staticmethod
    def _ball_blocks() -> Dict[str, Any]:
        """Canned blocks for Ball; every ID is generated once and reused."""
        flag_id = f"block_id_{generate_block_id()}"
        move_id = f"block_id_{generate_block_id()}_moveball"
        bounce_id = f"block_id_{generate_block_id()}_edgebounce"

        return {
            flag_id: {
                "opcode": "event_whenflagclicked",
                "next": move_id,
                "parent": None,
                "inputs": {}, "fields": {}, "shadow": False, "topLevel": True, "x": 100, "y": 100
            },
            move_id: {
                "opcode": "motion_movesteps",
                "next": bounce_id,
                "parent": flag_id,
                "inputs": {
                    "STEPS": [1, [4, "5"]]
                }, "fields": {}, "shadow": False, "topLevel": False
            },
            bounce_id: {
                "opcode": "motion_ifonedgebounce",
                "next": None,
                "parent": move_id,
                "inputs": {}, "fields": {}, "shadow": False, "topLevel": False
            }
        }
|
|
|
|
# Shared agent instance invoked by plan_sprite_actions and build_action_nodes.
agent = MockAgent()
|
|
|
|
|
|
def generate_block_id():
    """Return a random 9-character lowercase hex string for use in block IDs."""
    # The first ten characters of a UUID's text form are eight hex digits, a
    # dash and one more hex digit; without the dash that is exactly the first
    # nine digits of its hex form.
    random_hex = uuid.uuid4().hex
    return random_hex[:9]
|
|
|
|
|
|
def _json_candidates(text):
    """Yield progressively looser guesses at the JSON payload inside ``text``."""
    # 1. Drop every Markdown fence marker and parse what is left.
    yield text.replace("```json", "").replace("```", "").strip()

    # 2. Only the body of the first fenced block, ignoring prose around it.
    _, fence, after_fence = text.partition("```")
    if fence:
        fenced_body = after_fence.partition("```")[0]
        if fenced_body[:4].lower() == "json":
            fenced_body = fenced_body[4:]
        yield fenced_body.strip()

    # 3. The outermost {...} / [...] span, for replies with no fence at all.
    starts = [i for i in (text.find("{"), text.find("[")) if i != -1]
    ends = [i for i in (text.rfind("}"), text.rfind("]")) if i != -1]
    if starts and ends and min(starts) < max(ends):
        yield text[min(starts):max(ends) + 1]


def extract_json_from_llm_response(response_string):
    """Parse the JSON document contained in an LLM reply.

    The reply may be bare JSON, JSON wrapped in a Markdown code fence, or
    JSON surrounded by explanatory prose.

    Args:
        response_string: Raw text returned by the LLM.

    Returns:
        The decoded JSON value (typically a dict or list).

    Raises:
        ValueError: If ``response_string`` is not a string or no JSON
            document can be decoded from it. The first decode error is
            attached as the cause.
    """
    if not isinstance(response_string, str):
        logger.error("LLM response is not a string: %r", response_string)
        raise ValueError("Invalid JSON response from LLM")

    first_error = None
    for candidate in _json_candidates(response_string.strip()):
        try:
            return json.loads(candidate)
        except json.JSONDecodeError as e:
            # Report the error from the strictest attempt; later candidates
            # are only fallbacks.
            if first_error is None:
                first_error = e

    logger.error("Failed to decode JSON from LLM response: %s", first_error)
    logger.error("Raw response: %s", response_string)
    raise ValueError("Invalid JSON response from LLM") from first_error
|
|
|
|
|
|
|
|
|
|
|
|
def _catalog_entry(opcode, inputs=None, fields=None, shadow=False, x=10, y=10):
    """Build one catalog template: an unlinked, top-level block for ``opcode``."""
    return {
        "opcode": opcode,
        "next": None,
        "parent": None,
        "inputs": {} if inputs is None else inputs,
        "fields": {} if fields is None else fields,
        "shadow": shadow,
        "topLevel": True,
        "x": x,
        "y": y,
    }


# Template blocks keyed by opcode. Linked inputs use placeholder IDs
# ("some_id", "some_id_2").
ALL_SCRATCH_BLOCKS_CATALOG = {
    entry["opcode"]: entry
    for entry in (
        # Motion
        _catalog_entry("motion_movesteps", inputs={"STEPS": [1, [4, "10"]]}, x=464, y=-416),
        _catalog_entry("motion_turnright", inputs={"DEGREES": [1, [4, "15"]]}, x=467, y=-316),
        _catalog_entry("motion_ifonedgebounce", x=467, y=-316),
        # Events
        _catalog_entry("event_whenflagclicked"),
        _catalog_entry("event_whenkeypressed", fields={"KEY_OPTION": ["space", None]}),
        # Control
        _catalog_entry("control_forever", inputs={"SUBSTACK": [2, "some_id"]}),
        _catalog_entry(
            "control_if",
            inputs={"CONDITION": [2, "some_id"], "SUBSTACK": [2, "some_id_2"]},
        ),
        # Looks
        _catalog_entry(
            "looks_sayforsecs",
            inputs={"MESSAGE": [1, [10, "Hello!"]], "SECS": [1, [4, "2"]]},
        ),
        _catalog_entry("looks_say", inputs={"MESSAGE": [1, [10, "Hello!"]]}),
        # Sensing
        _catalog_entry("sensing_touchingobject", inputs={"TOUCHINGOBJECTMENU": [1, "some_id"]}),
        _catalog_entry(
            "sensing_touchingobjectmenu",
            fields={"TOUCHINGOBJECTMENU": ["_mouse_", None]},
            shadow=True,
        ),
    )
}
|
|
|
|
|
|
def get_relevant_blocks_for_plan(action_plan: Dict[str, Any], all_blocks_catalog: Dict[str, Any]) -> Dict[str, Any]:
    """
    Analyzes the natural language action plan and selects relevant Scratch blocks
    from the comprehensive catalog. This is a heuristic approach and might need
    to be refined based on your specific use cases and LLM capabilities.

    Args:
        action_plan: Plan whose ``"action_overall_flow"`` maps sprite names to
            dicts holding a ``"plans"`` list of ``{"event", "logic"}`` entries.
        all_blocks_catalog: Full catalog mapping opcode -> block template.

    Returns:
        The subset of ``all_blocks_catalog`` judged relevant, in catalog order
        so the result (and any prompt built from it) is stable between runs.
    """
    # Hat blocks are always offered: every script needs one to start from.
    relevant_opcodes = {"event_whenflagclicked", "event_whenkeypressed"}

    # Keyword -> opcodes it suggests. Values are tuples because one word can
    # point at several blocks ("edge" is both the bounce block and a
    # touching-menu option); a plain str -> str dict cannot hold both.
    keyword_map = {
        "move": ("motion_movesteps",),
        "steps": ("motion_movesteps",),
        "turn": ("motion_turnright",),
        "rotate": ("motion_turnright",),
        "bounce": ("motion_ifonedgebounce",),
        "edge": ("motion_ifonedgebounce", "sensing_touchingobjectmenu"),
        "forever": ("control_forever",),
        "loop": ("control_forever",),
        "if": ("control_if",),
        "condition": ("control_if",),
        "say": ("looks_say", "looks_sayforsecs"),
        "hello": ("looks_say",),
        "touching": ("sensing_touchingobject",),
        "mouse pointer": ("sensing_touchingobjectmenu",),
    }

    for sprite_actions in action_plan.get("action_overall_flow", {}).values():
        for plan in sprite_actions.get("plans", []):
            event_text = (plan.get("event") or "").lower()
            logic_text = (plan.get("logic") or "").lower()
            event_logic = f"{event_text} {logic_text}"

            # Opcodes the plan spells out literally.
            relevant_opcodes.update(
                opcode for opcode in all_blocks_catalog if opcode in event_logic
            )

            # Opcodes suggested by plain-language keywords (substring match).
            for keyword, opcodes in keyword_map.items():
                if keyword in event_logic:
                    relevant_opcodes.update(opcodes)

    # The touching block is only usable together with its dropdown shadow block.
    if "sensing_touchingobject" in relevant_opcodes:
        relevant_opcodes.add("sensing_touchingobjectmenu")

    return {
        opcode: block
        for opcode, block in all_blocks_catalog.items()
        if opcode in relevant_opcodes
    }
|
|
|
|
|
|
def plan_sprite_actions(state: GameState):
    """Ask the agent for a high-level, natural-language action plan per sprite.

    Args:
        state: Game state; reads ``state["description"]``, the targets in
            ``state["project_json"]["targets"]`` and, if present,
            ``state["sprite_initial_positions"]``.

    Returns:
        ``{"action_plan": <decoded JSON plan>}`` for merging into the state.

    Raises:
        Exception: Any error from the agent call or from JSON extraction is
            logged and re-raised unchanged.
    """
    logger.info("--- Running PlanSpriteActionsNode ---")

    # List every non-stage target; report 'None' only when there are no sprites.
    sprite_names = [
        target["name"]
        for target in state["project_json"]["targets"]
        if not target["isStage"]
    ]
    sprites_summary = ", ".join(sprite_names) if sprite_names else "None"

    planning_prompt = (
        f"You are an AI assistant tasked with planning Scratch 3.0 block code for a game. "
        f"The game description is: '{state['description']}'.\n\n"
        f"Here are the sprites currently in the project: {sprites_summary}.\n"
        f"Initial positions: {json.dumps(state.get('sprite_initial_positions', {}), indent=2)}\n\n"
        f"Consider the main actions and interactions required for each sprite. "
        f"Think step-by-step about what each sprite needs to *do*, *when* it needs to do it (events), "
        f"and if any actions need to *repeat* or depend on *conditions*.\n\n"
        f"Propose a high-level action flow for each sprite in the following JSON format. "
        f"Do NOT generate Scratch block JSON yet. Only describe the logic using natural language or simplified pseudo-code.\n\n"
        f"Example format:\n"
        f"```json\n"
        f"{{\n"
        f" \"action_overall_flow\": {{\n"
        f" \"Sprite1\": {{\n"
        f" \"description\": \"Main character actions\",\n"
        f" \"plans\": [\n"
        f" {{\n"
        f" \"event\": \"when flag clicked\",\n"
        f" \"logic\": \"forever loop: move 10 steps, if on edge bounce\"\n"
        f" }},\n"
        f" {{\n"
        f" \"event\": \"when space key pressed\",\n"
        f" \"logic\": \"change y by 10, wait 0.1 seconds, change y by -10\"\n"
        f" }}\n"
        f" ]\n"
        f" }},\n"
        f" \"Ball\": {{\n"
        f" \"description\": \"Projectile movement\",\n"
        f" \"plans\": [\n"
        f" {{\n"
        f" \"event\": \"when I start as a clone\",\n"
        f" \"logic\": \"glide 1 sec to random position, if touching Sprite1 then stop this script\"\n"
        f" }}\n"
        f" ]\n"
        f" }}\n"
        f" }}\n"
        f"}}\n"
        f"```\n\n"
        f"Return ONLY the JSON object for the action overall flow."
    )

    try:
        response = agent.invoke({"messages": [{"role": "user", "content": planning_prompt}]})
        last_message = response["messages"][-1]
        # Accept both dict-shaped messages and message objects that expose
        # ``content`` as an attribute.
        if isinstance(last_message, dict):
            raw_response = last_message["content"]
        else:
            raw_response = last_message.content
        print("Raw response from LLM [PlanSpriteActionsNode]:", raw_response)
        action_plan = extract_json_from_llm_response(raw_response)
        logger.info("Sprite action plan generated by PlanSpriteActionsNode.")
        return {"action_plan": action_plan}
    except Exception as e:
        logger.error(f"Error in PlanSpriteActionsNode: {e}")
        raise
|
|
|
|
|
|
def build_action_nodes(state: GameState):
    """Generate Scratch block JSON for every planned sprite and attach it.

    For each sprite named in ``state["action_plan"]`` that exists in the
    project, the agent is prompted with the sprite's plan, the filtered block
    catalog and the sprite's current JSON; the returned blocks are merged into
    that sprite's ``"blocks"`` dictionary in place.

    Args:
        state: Game state holding ``"action_plan"`` and ``"project_json"``.

    Returns:
        ``{"project_json": project_json}`` with the updated project (the same
        object found in ``state``, mutated in place).

    Raises:
        ValueError: If the state has no action plan, or the LLM returns a
            non-empty value that is not a blocks dictionary.
        Exception: Any other error from the agent call or JSON extraction is
            logged and re-raised.
    """
    logger.info("--- Running ActionNodeBuilder ---")

    action_plan = state.get("action_plan", {})
    if not action_plan:
        raise ValueError("No action plan found in state. Run PlanSpriteActionsNode first.")

    project_json = state["project_json"]
    targets = project_json["targets"]

    # Name -> target lookup for sprites only; the stage is never given blocks here.
    sprite_map = {target["name"]: target for target in targets if not target["isStage"]}

    relevant_scratch_blocks_catalog = get_relevant_blocks_for_plan(action_plan, ALL_SCRATCH_BLOCKS_CATALOG)
    logger.info(f"Filtered {len(relevant_scratch_blocks_catalog)} relevant blocks out of {len(ALL_SCRATCH_BLOCKS_CATALOG)} total.")

    for sprite_name, sprite_actions in action_plan.get("action_overall_flow", {}).items():
        current_sprite_target = sprite_map.get(sprite_name)
        if current_sprite_target is None:
            # The plan can mention sprites the project does not contain; skip
            # them, but leave a trace instead of dropping them silently.
            logger.warning(f"Sprite '{sprite_name}' from the action plan is not in the project; skipping.")
            continue

        current_sprite_target.setdefault("blocks", {})

        llm_block_generation_prompt = (
            f"You are an AI assistant generating Scratch 3.0 block JSON based on a provided plan. "
            f"The current sprite is '{sprite_name}'.\n"
            f"Its planned actions are:\n"
            f"```json\n{json.dumps(sprite_actions, indent=2)}\n```\n\n"
            f"Here is a **curated catalog of only the most relevant Scratch 3.0 blocks** for this plan:\n"
            f"```json\n{json.dumps(relevant_scratch_blocks_catalog, indent=2)}\n```\n\n"
            f"Current Scratch project JSON (for context, specifically this sprite's existing blocks if any):\n"
            f"```json\n{json.dumps(current_sprite_target, indent=2)}\n```\n\n"
            f"**Instructions:**\n"
            f"1. For each planned event and its associated logic, generate the corresponding Scratch 3.0 block JSON.\n"
            f"2. **Generate unique block IDs** for every new block. Use a format like 'block_id_abcdef12'.\n"
            f"3. Properly link blocks using `next` and `parent` fields to form execution stacks. Hat blocks (`topLevel: true`, `parent: null`).\n"
            f"4. Correctly fill `inputs` and `fields` based on the catalog and the plan's logic (e.g., specific values for motion, keys for events, conditions for controls).\n"
            f"5. For C-blocks (like `control_repeat`, `control_forever`, `control_if`), use the `SUBSTACK` input to link to the first block inside its loop/conditional.\n"
            f"6. If the plan involves operators (e.g., 'if touching Sprite1'), use the appropriate operator blocks from the catalog and link them correctly as `CONDITION` inputs.\n"
            f"7. Ensure that any shadow blocks (e.g., for dropdowns like `motion_goto_menu`, `sensing_touchingobjectmenu`) are generated with `shadow: true` and linked correctly as inputs to their parent block.\n"
            f"8. Return ONLY the **updated 'blocks' dictionary** for this specific sprite. Do NOT return the full project JSON. ONLY the `blocks` dictionary."
        )

        try:
            response = agent.invoke({"messages": [{"role": "user", "content": llm_block_generation_prompt}]})
            last_message = response["messages"][-1]
            # Accept both dict-shaped messages and message objects that expose
            # ``content`` as an attribute.
            if isinstance(last_message, dict):
                raw_response = last_message["content"]
            else:
                raw_response = last_message.content
            print(f"Raw response from LLM [ActionNodeBuilder - {sprite_name}]:", raw_response)
            generated_blocks = extract_json_from_llm_response(raw_response)

            if not isinstance(generated_blocks, dict):
                if generated_blocks:
                    raise ValueError(
                        f"Expected a blocks dictionary for sprite '{sprite_name}', "
                        f"got {type(generated_blocks).__name__}."
                    )
                # An empty non-dict reply (e.g. "[]") simply means no blocks.
                logger.warning(f"No blocks were generated for sprite '{sprite_name}'.")
                continue

            current_sprite_target["blocks"].update(generated_blocks)
            logger.info(f"Action blocks added for sprite '{sprite_name}' by ActionNodeBuilder.")
        except Exception as e:
            logger.error(f"Error generating blocks for sprite '{sprite_name}': {e}")
            raise

    return {"project_json": project_json}
|
|
|
|
|
|
if __name__ == "__main__":
    # Demo run: plan actions for two sprites, then build their blocks.

    # Without a handler the module's logger.info(...) progress lines are dropped.
    logging.basicConfig(level=logging.INFO)

    initial_game_state = {
        "description": "A simple game where a sprite moves and says hello.",
        "project_json": {
            "targets": [
                {"isStage": True, "name": "Stage", "blocks": {}},
                {"isStage": False, "name": "Sprite1", "blocks": {}},
                {"isStage": False, "name": "Ball", "blocks": {}}
            ]
        },
        "sprite_initial_positions": {}
    }

    # Step 1: produce the natural-language action plan.
    try:
        state_after_planning = plan_sprite_actions(initial_game_state)
        initial_game_state.update(state_after_planning)
        print("\n--- Game State After Planning ---")
        print(json.dumps(initial_game_state, indent=2))
    except Exception as e:
        print(f"Planning failed: {e}")
        # SystemExit needs no `site` helper and reports failure to the shell.
        raise SystemExit(1)

    # Step 2: turn the plan into Scratch blocks for each sprite.
    try:
        state_after_building = build_action_nodes(initial_game_state)
        initial_game_state.update(state_after_building)
        print("\n--- Game State After Building Blocks ---")

        for target in initial_game_state["project_json"]["targets"]:
            if not target["isStage"]:
                print(f"\nBlocks for {target['name']}:")
                print(json.dumps(target.get('blocks', {}), indent=2))
    except Exception as e:
        print(f"Building blocks failed: {e}")
        raise SystemExit(1)