# Scratch_vlm_v1 / utils / block_builder_main.py
# WebashalarForML's picture
# Upload 175 files
# a522962 verified
import json
import copy
import re
from collections import defaultdict
import secrets
import string
from typing import Dict, Any, TypedDict
from plan_generator_10 import generate_plan,generate_blocks_from_opcodes,all_block_definitions
#################################################################################################################################################################
#--------------------------------------------------[Secure random ID generation for block, variable and broadcast keys]------------------------------------------
#################################################################################################################################################################
def generate_secure_token(length=20):
    """Return a random identifier made of ``length`` characters.

    Each character is drawn independently with ``secrets.choice`` from ASCII
    letters, digits and a fixed set of punctuation symbols.

    Args:
        length: Number of characters to generate (default 20).

    Returns:
        str: The generated token.
    """
    alphabet = string.ascii_letters + string.digits + "!@#$%^&*()[]{}=+-_~"
    picked = []
    for _ in range(length):
        picked.append(secrets.choice(alphabet))
    return ''.join(picked)
#################################################################################################################################################################
#--------------------------------------------------[Process the two skeletons as input and generate the refined skeleton JSON]-----------------------------------
#################################################################################################################################################################
# Marker returned by _convert_input when a planned input has no known encoding
# and must be left out of the processed block.
_SKIP_INPUT = object()


def _first_item(value, default=""):
    """Return the first element of a list/tuple (``default`` if empty); pass other values through."""
    if isinstance(value, (list, tuple)):
        return value[0] if value else default
    return value


def _convert_input(input_name, input_data, skeleton_inputs, broadcast_id_map):
    """Encode one planned input as a list, or return ``_SKIP_INPUT`` if its shape is unknown."""
    if input_name in ("SUBSTACK", "CONDITION"):
        # These inputs are always emitted with a leading 2.
        if isinstance(input_data, list) and len(input_data) == 2:
            return [2, input_data[1]]
        if isinstance(input_data, dict) and input_data.get("kind") == "block":
            return [2, input_data.get("block")]
        # Unexpected shape: fall back to the skeleton's value when it has one.
        return skeleton_inputs.get(input_name, [2, None])

    if isinstance(input_data, dict):
        kind = input_data.get("kind")
        if kind == "value":
            # Literal value input.
            return [1, [4, str(input_data.get("value", ""))]]
        if kind == "block":
            # Nested block input; reuse the shadow value from the skeleton
            # input when it has the shape [_, _, [_, value]].
            shadow_value = ""
            skeleton_input = skeleton_inputs.get(input_name)
            if (
                isinstance(skeleton_input, list)
                and len(skeleton_input) > 2
                and isinstance(skeleton_input[2], list)
                and len(skeleton_input[2]) > 1
            ):
                shadow_value = skeleton_input[2][1]
            return [3, input_data.get("block", ""), [10, shadow_value]]
        if kind == "menu":
            # Menu input (e.g. event_broadcast): the same option always maps
            # to the same generated id.
            menu_option = input_data.get("option", "")
            return [1, [11, menu_option, broadcast_id_map[menu_option]]]
        return _SKIP_INPUT

    if isinstance(input_data, list):
        # Already in list form (e.g. [1, "block_id"]); keep as-is.
        return input_data
    return _SKIP_INPUT


def _convert_field(field_name, field_value, planned_block, all_generated_blocks, variable_id_map):
    """Encode one planned field as the value stored under ``fields[field_name]``."""
    if field_name == "VARIABLE" and isinstance(field_value, list) and len(field_value) > 0:
        # The same variable name always maps to the same generated id.
        variable_name = field_value[0]
        return [variable_name, variable_id_map[variable_name]]

    if field_name == "STOP_OPTION":
        return [_first_item(field_value), None]

    if field_name == "TOUCHINGOBJECTMENU":
        # Prefer the value held by the menu block referenced from this block's
        # own TOUCHINGOBJECTMENU input, when that reference resolves.
        planned_inputs = planned_block.get("inputs")
        menu_input = (
            planned_inputs.get("TOUCHINGOBJECTMENU")
            if isinstance(planned_inputs, dict)
            else None
        )
        menu_block_id = None
        if isinstance(menu_input, (list, tuple)) and len(menu_input) > 1:
            menu_block_id = menu_input[1]
        if (
            menu_block_id
            and isinstance(menu_block_id, str)
            and menu_block_id in all_generated_blocks
        ):
            menu_fields = all_generated_blocks[menu_block_id].get("fields", {})
            menu_value = _first_item(menu_fields.get("TOUCHINGOBJECTMENU", ["", None]))
            return [menu_value, None]
        return [_first_item(field_value), None]

    return field_value


def process_scratch_blocks(all_generated_blocks, generated_output_json):
    """Merge planned block data with the skeleton blocks into refined block JSON.

    For every block id in ``generated_output_json`` the matching entry of
    ``all_generated_blocks`` (if any) supplies opcode, links, inputs and
    fields; the skeleton entry is used as a fallback for opcode, ``shadow``,
    ``topLevel``, ``parent`` and ``next`` and for shadow values of nested
    block inputs.

    Missing ``inputs`` mappings on either side are treated as empty, and
    ``STOP_OPTION`` / ``TOUCHINGOBJECTMENU`` fields given as a bare value
    (rather than a list) are used whole instead of being indexed.

    Args:
        all_generated_blocks: Mapping of block id -> planned block data
            (keys read here: ``op_code``, ``shadow``, ``topLevel``,
            ``parent``, ``next``, ``mutation``, ``inputs``, ``fields``).
        generated_output_json: Mapping of block id -> skeleton block data.

    Returns:
        dict: Mapping of block id -> processed block with the keys
        ``opcode``, ``inputs``, ``fields``, ``shadow``, ``topLevel``,
        ``parent``, ``next`` and, when planned, ``mutation``.
    """
    processed_blocks = {}
    # Ids are created lazily and cached per name so that every block referring
    # to the same variable/broadcast shares one id.
    variable_id_map = defaultdict(lambda: generate_secure_token(20))
    broadcast_id_map = defaultdict(lambda: generate_secure_token(20))

    for block_id, gen_block_data in generated_output_json.items():
        planned = all_generated_blocks.get(block_id, {})
        skeleton_inputs = gen_block_data.get("inputs") or {}

        processed_block = {
            "opcode": planned.get("op_code", gen_block_data.get("op_code")),
            "inputs": {},
            "fields": {},
            "shadow": planned.get("shadow", gen_block_data.get("shadow")),
            "topLevel": planned.get("topLevel", gen_block_data.get("topLevel")),
            "parent": planned.get("parent", gen_block_data.get("parent")),
            "next": planned.get("next", gen_block_data.get("next")),
        }
        if "mutation" in planned:
            processed_block["mutation"] = planned["mutation"]

        for input_name, input_data in (planned.get("inputs") or {}).items():
            converted = _convert_input(
                input_name, input_data, skeleton_inputs, broadcast_id_map
            )
            if converted is not _SKIP_INPUT:
                processed_block["inputs"][input_name] = converted

        for field_name, field_value in (planned.get("fields") or {}).items():
            processed_block["fields"][field_name] = _convert_field(
                field_name, field_value, planned, all_generated_blocks, variable_id_map
            )

        processed_blocks[block_id] = processed_block
    return processed_blocks
#################################################################################################################################################################
#--------------------------------------------------[Unique secret keys for the skeleton JSON so that blocks do not overwrite each other]-------------------------
#################################################################################################################################################################
def rename_blocks(block_json: dict, opcode_count: dict, token_factory=None) -> tuple[dict, dict]:
    """Replace every block key with a freshly generated token.

    All references to the renamed keys are rewritten as well: ``parent`` and
    ``next`` links, block ids held inside input arrays whose first element is
    1, 2 or 3 (any position after the first, so both ``[2, id]`` and
    ``[3, id, shadow]`` are covered), and the ids listed in ``opcode_count``.
    The input mappings are not modified; the returned blocks are deep copies.

    Args:
        block_json: Mapping of block_key -> block_data.
        opcode_count: Mapping of opcode -> list of block_keys.
        token_factory: Optional zero-argument callable returning a new key.
            Defaults to ``generate_secure_token``.

    Returns:
        A tuple of (new_block_json, new_opcode_count) with updated keys.
    """
    make_token = token_factory if token_factory is not None else generate_secure_token

    # Step 1: assign a unique new token to every existing block key.
    token_map = {}
    used_tokens = set()
    for old_key in block_json:
        new_key = make_token()
        # Regenerate in the unlikely event of a collision.
        while new_key in used_tokens:
            new_key = make_token()
        used_tokens.add(new_key)
        token_map[old_key] = new_key

    # Step 2: rebuild the block mapping under the new keys.
    new_block_json = {}
    for old_key, block in block_json.items():
        # Deep copy so that rewriting nested inputs never touches the caller's data.
        renamed = copy.deepcopy(block)

        for link in ("parent", "next"):
            if link in block and block[link] in token_map:
                renamed[link] = token_map[block[link]]

        inputs = renamed.get("inputs")
        if isinstance(inputs, dict):
            for inp_val in inputs.values():
                if not (isinstance(inp_val, list) and len(inp_val) >= 2):
                    continue
                if inp_val[0] not in (1, 2, 3):
                    continue
                # Literal values are nested lists, so a bare string that is a
                # known block key can only be a block reference.
                for pos in range(1, len(inp_val)):
                    ref = inp_val[pos]
                    if isinstance(ref, str) and ref in token_map:
                        inp_val[pos] = token_map[ref]

        new_block_json[token_map[old_key]] = renamed

    # Step 3: translate the ids in the opcode occurrence map; unknown ids are kept.
    new_opcode_count = {
        opcode: [token_map.get(k, k) for k in key_list]
        for opcode, key_list in opcode_count.items()
    }
    return new_block_json, new_opcode_count
#################################################################################################################################################################
#--------------------------------------------------[Helper functions to add variables and broadcasts (used in the main app file for the main project JSON)]------
#################################################################################################################################################################
# Primitive type codes whose second element is taken as a literal value
# (Scratch 3 project format: 4-8 numeric kinds, 9 colour, 10 string).
_VALUE_LITERAL_CODES = (4, 5, 6, 7, 8, 9, 10)


def _initial_value_from_input(value_input):
    """Return the literal text carried by a ``VALUE`` input array, or ``""`` if it has none."""
    if not isinstance(value_input, list) or len(value_input) < 2:
        return ""
    primary = value_input[1]
    if isinstance(primary, list) and len(primary) > 1:
        if primary[0] in _VALUE_LITERAL_CODES:
            # Direct literal such as [1, [10, "0"]] or [1, [4, "0"]].
            return str(primary[1])
        if primary[0] == 12 and len(value_input) > 2:
            # Variable reference with a literal behind it, e.g.
            # [3, [12, "score", "id"], [10, "0"]].
            fallback = value_input[2]
            if (
                isinstance(fallback, list)
                and len(fallback) > 1
                and fallback[0] in _VALUE_LITERAL_CODES
            ):
                return str(fallback[1])
        return ""
    if isinstance(primary, (int, float)):
        # Bare number placed directly in the input. A bare string in this slot
        # is a block id, not a value, so it is deliberately not used.
        return str(primary)
    return ""


def variable_intialization(project_data):
    """Populate the Stage's ``variables`` and ``broadcasts`` from the project's blocks.

    Every block of every target is walked recursively. Variables are collected
    from ``VARIABLE`` fields and from ``[12, name, id]`` arrays; broadcasts from
    ``BROADCAST_INPUT`` inputs of the form ``[_, [11, name, id]]`` and from
    ``BROADCAST_OPTION`` fields. For ``data_setvariableto`` blocks the literal
    found in the ``VALUE`` input is stored as the variable's value (the last
    such block visited for an id wins); other references default to ``""``.

    Args:
        project_data (dict): The loaded JSON data of the Scratch project.
            Modified in place.

    Returns:
        dict: The same ``project_data`` object. It is returned unchanged (after
        printing an error) when no target has a truthy ``isStage``.
    """
    stage_target = None
    for target in project_data['targets']:
        if target.get('isStage'):
            stage_target = target
            break
    if stage_target is None:
        print("Error: Stage target not found in the project data.")
        return project_data

    # Ensure both registries exist on the Stage target.
    variables = stage_target.setdefault("variables", {})
    broadcasts = stage_target.setdefault("broadcasts", {})

    def note_variable(var_name, var_id):
        """Register a referenced variable without overwriting a recorded value."""
        if var_id not in variables:
            variables[var_id] = [var_name, ""]
        elif variables[var_id][0] != var_name:
            # Same id seen under a different name: keep the value, update the name.
            variables[var_id][0] = var_name

    def process_dict(obj):
        """Recursively scan ``obj`` for variable and broadcast definitions."""
        if isinstance(obj, dict):
            if obj.get("opcode") == "data_setvariableto":
                fields = obj.get("fields")
                inputs = obj.get("inputs")
                variable_field = fields.get("VARIABLE") if isinstance(fields, dict) else None
                value_input = inputs.get("VALUE") if isinstance(inputs, dict) else None
                if isinstance(variable_field, list) and len(variable_field) == 2:
                    var_name, var_id = variable_field
                    variables[var_id] = [var_name, _initial_value_from_input(value_input)]

            for key, value in obj.items():
                if key == "VARIABLE" and isinstance(value, list) and len(value) == 2:
                    note_variable(value[0], value[1])
                elif (
                    key == "BROADCAST_INPUT"
                    and isinstance(value, list)
                    and len(value) == 2
                    and isinstance(value[1], list)
                    and len(value[1]) == 3
                    and value[1][0] == 11
                ):
                    broadcasts[value[1][2]] = value[1][1]
                elif key == "BROADCAST_OPTION" and isinstance(value, list) and len(value) == 2:
                    broadcasts[value[1]] = value[0]
                # Descend into nested dictionaries and lists.
                process_dict(value)
        elif isinstance(obj, list):
            for item in obj:
                # Variable reference inside an input, e.g. [12, "score", "id"].
                if isinstance(item, list) and len(item) == 3 and item[0] == 12:
                    note_variable(item[1], item[2])
                process_dict(item)

    # Scan the blocks of every target (sprites and Stage alike).
    for target in project_data['targets']:
        if "blocks" in target:
            for block_data in target["blocks"].values():
                process_dict(block_data)
    return project_data
def deduplicate_variables(project_data):
    """Collapse Stage variables that share a name down to a single entry.

    Entries are visited in dictionary order. A later entry with the same name
    replaces the one kept so far unless the later value is ``""`` while the
    kept value is not. Only the first two elements (name, value) of each
    entry are carried over.

    Args:
        project_data (dict): The loaded JSON data of the Scratch project.
            Modified in place.

    Returns:
        dict: The same ``project_data`` object, with the Stage's ``variables``
        rebuilt. It is returned untouched when there is no Stage target (an
        error is printed) or the Stage has no ``variables`` key.
    """
    stage_target = next(
        (candidate for candidate in project_data['targets'] if candidate.get('isStage')),
        None,
    )
    if stage_target is None:
        print("Error: Stage target not found in the project data.")
        return project_data
    if "variables" not in stage_target:
        # Nothing to deduplicate.
        return project_data

    # name -> (id, value) of the entry currently preferred for that name.
    winners = {}
    for var_id, var_info in stage_target["variables"].items():
        name = var_info[0]
        value = var_info[1]
        if name not in winners:
            winners[name] = (var_id, value)
            continue
        kept_value = winners[name][1]
        # The newcomer wins except when it is empty and the kept one is not.
        if value != "" or kept_value == "":
            winners[name] = (var_id, value)

    stage_target["variables"] = {
        kept_id: [name, kept_value]
        for name, (kept_id, kept_value) in winners.items()
    }
    return project_data
def variable_adder_main(project_data):
    """Declare and then deduplicate the Stage variables of a project.

    Runs ``variable_intialization`` followed by ``deduplicate_variables``.
    Both steps are best-effort: a failure is printed and the pipeline carries
    on with whatever project data is available at that point.

    Args:
        project_data (dict): The loaded JSON data of the Scratch project.
            Modified in place by the two steps.

    Returns:
        dict: The processed project data (the result of the last step that
        succeeded, or ``project_data`` itself if both failed).
    """
    try:
        declare_variable_json = variable_intialization(project_data)
    except Exception as e:
        print(f"Error error in the variable initialization opcodes: {e}")
        # Fall back to the input so deduplication can still run.
        declare_variable_json = project_data
    try:
        return deduplicate_variables(declare_variable_json)
    except Exception as e:
        print(f"Error in the variable deduplication step: {e}")
        return declare_variable_json
#################################################################################################################################################################
#--------------------------------------------------[Helper main function]----------------------------------------------------------------------------------------
#################################################################################################################################################################
def block_builder(opcode_count,pseudo_code):
    """Build the renamed block JSON for a script from opcode counts and pseudo code.

    Pipeline: ``generate_blocks_from_opcodes`` -> ``generate_plan`` ->
    ``process_scratch_blocks`` -> ``rename_blocks``. If any of the first three
    stages raises, the error is printed and an empty dict is returned.

    Args:
        opcode_count: Opcode counts passed to ``generate_blocks_from_opcodes``.
        pseudo_code: Pseudo code passed to ``generate_plan``.

    Returns:
        dict: The renamed blocks, or ``{}`` when a guarded stage failed.
    """
    # Stage 1: skeleton blocks plus the opcode -> block ids occurrence map.
    try:
        skeleton, occurrences = generate_blocks_from_opcodes(opcode_count, all_block_definitions)
    except Exception as err:
        print(f"Error generating blocks from opcodes: {err}")
        return {}

    # Stage 2: plan derived from the pseudo code.
    try:
        planned = generate_plan(skeleton, occurrences, pseudo_code)
    except Exception as err:
        print(f"Error generating plan from blocks: {err}")
        return {}

    # Stage 3: merge the plan and the skeleton into refined blocks.
    try:
        refined = process_scratch_blocks(planned, skeleton)
    except Exception as err:
        print(f"Error processing Scratch blocks: {err}")
        return {}

    # Stage 4: swap block keys for fresh tokens; the renamed counts are not needed here.
    final_blocks, _ = rename_blocks(refined, occurrences)
    return final_blocks
#################################################################################################################################################################
#--------------------------------------------------[Example use of the function here]----------------------------------------------------------------------------
#################################################################################################################################################################
# Sample inputs for the demo run at the bottom of this file.
initial_opcode_counts = [
    {
        "opcode": "event_whenflagclicked",
        "count": 1
    },
    {
        "opcode": "data_setvariableto",
        "count": 2
    },
    {
        "opcode": "data_showvariable",
        "count": 2
    },
    {
        "opcode": "event_broadcast",
        "count": 1
    }
]
pseudo_code="""
when green flag clicked
set [score v] to (0)
set [lives v] to (3)
show variable [score v]
show variable [lives v]
broadcast [Game Start v]
"""

# Run the demo pipeline only when this file is executed directly, so that
# importing the module for its helpers does not trigger block/plan generation
# or print to stdout.
if __name__ == "__main__":
    generated_output_json, initial_opcode_occurrences = generate_blocks_from_opcodes(initial_opcode_counts, all_block_definitions)
    all_generated_blocks = generate_plan(generated_output_json, initial_opcode_occurrences, pseudo_code)
    processed_blocks= process_scratch_blocks(all_generated_blocks, generated_output_json)
    print(all_generated_blocks)
    print("--------------\n\n")
    print(processed_blocks)
    print("--------------\n\n")
    print(initial_opcode_occurrences)