| """Tests for P19 microcode learning engine."""
|
|
|
| import pytest
|
| import sys, os
|
| sys.path.insert(0, os.path.join(os.path.dirname(__file__), ".."))
|
|
|
| import neurocore as nc
|
| from neurocore.microcode import (
|
| encode_instruction, decode_instruction, execute_program,
|
| LearningRule, _assemble,
|
| OP_NOP, OP_ADD, OP_SUB, OP_MUL, OP_SHR, OP_SHL,
|
| OP_MAX, OP_MIN, OP_LOADI, OP_STORE_W, OP_STORE_E,
|
| OP_SKIP_Z, OP_SKIP_NZ, OP_HALT,
|
| R_TRACE1, R_TRACE2, R_WEIGHT, R_ELIG, R_CONST,
|
| R_TEMP0, R_TEMP1, R_REWARD,
|
| LTD_START, LTD_END, LTP_START, LTP_END,
|
| MICROCODE_DEPTH,
|
| )
|
| from neurocore.constants import NEURONS_PER_CORE, WEIGHT_MAX_STDP, WEIGHT_MIN_STDP
|
|
|
|
|
| class TestEncoding:
|
| def test_encode_decode_roundtrip(self):
|
| """Encoding then decoding should return original fields."""
|
| word = encode_instruction(OP_ADD, dst=R_WEIGHT, src_a=R_TRACE1, src_b=R_TEMP0)
|
| d = decode_instruction(word)
|
| assert d["op"] == OP_ADD
|
| assert d["dst"] == R_WEIGHT
|
| assert d["src_a"] == R_TRACE1
|
| assert d["src_b"] == R_TEMP0
|
| assert d["op_name"] == "ADD"
|
|
|
| def test_all_opcodes_valid(self):
|
| """All 14 opcodes should encode to valid 32-bit words."""
|
| for op in range(14):
|
| word = encode_instruction(op)
|
| assert 0 <= word <= 0xFFFFFFFF
|
| d = decode_instruction(word)
|
| assert d["op"] == op
|
|
|
| def test_shift_encoding(self):
|
| """Shift field should roundtrip correctly."""
|
| for shift in range(8):
|
| word = encode_instruction(OP_SHR, dst=R_TEMP0, src_a=R_TRACE1, shift=shift)
|
| d = decode_instruction(word)
|
| assert d["shift"] == shift
|
|
|
| def test_immediate_encoding(self):
|
| """Signed immediate should roundtrip correctly."""
|
| for imm in [0, 1, -1, 32767, -32768, 100, -100]:
|
| word = encode_instruction(OP_LOADI, dst=R_CONST, imm=imm)
|
| d = decode_instruction(word)
|
| assert d["imm"] == imm
|
|
|
| def test_invalid_opcode_raises(self):
|
| with pytest.raises(ValueError):
|
| encode_instruction(14)
|
| with pytest.raises(ValueError):
|
| encode_instruction(-1)
|
|
|
| def test_invalid_register_raises(self):
|
| with pytest.raises(ValueError):
|
| encode_instruction(OP_ADD, dst=8)
|
|
|
|
|
| class TestExecution:
|
| def test_add(self):
|
| """ADD R5, R0, R2 with R0=10, R2=20 -> R5=30."""
|
| prog = [encode_instruction(OP_NOP)] * MICROCODE_DEPTH
|
| prog[0] = encode_instruction(OP_ADD, dst=R_TEMP0, src_a=R_TRACE1, src_b=R_WEIGHT)
|
| prog[1] = encode_instruction(OP_HALT)
|
| regs = [10, 0, 20, 0, 0, 0, 0, 0]
|
| result = execute_program(prog, 0, 16, regs)
|
| assert regs[R_TEMP0] == 30
|
|
|
| def test_sub(self):
|
| """SUB R5, R2, R0 with R2=100, R0=30 -> R5=70."""
|
| prog = [encode_instruction(OP_NOP)] * MICROCODE_DEPTH
|
| prog[0] = encode_instruction(OP_SUB, dst=R_TEMP0, src_a=R_WEIGHT, src_b=R_TRACE1)
|
| prog[1] = encode_instruction(OP_HALT)
|
| regs = [30, 0, 100, 0, 0, 0, 0, 0]
|
| execute_program(prog, 0, 16, regs)
|
| assert regs[R_TEMP0] == 70
|
|
|
| def test_shr(self):
|
| """SHR R5, R0, 3 with R0=100 -> R5=12."""
|
| prog = [encode_instruction(OP_NOP)] * MICROCODE_DEPTH
|
| prog[0] = encode_instruction(OP_SHR, dst=R_TEMP0, src_a=R_TRACE1, shift=3)
|
| prog[1] = encode_instruction(OP_HALT)
|
| regs = [100, 0, 0, 0, 0, 0, 0, 0]
|
| execute_program(prog, 0, 16, regs)
|
| assert regs[R_TEMP0] == 12
|
|
|
| def test_shl(self):
|
| """SHL R5, R0, 2 with R0=5 -> R5=20."""
|
| prog = [encode_instruction(OP_NOP)] * MICROCODE_DEPTH
|
| prog[0] = encode_instruction(OP_SHL, dst=R_TEMP0, src_a=R_TRACE1, shift=2)
|
| prog[1] = encode_instruction(OP_HALT)
|
| regs = [5, 0, 0, 0, 0, 0, 0, 0]
|
| execute_program(prog, 0, 16, regs)
|
| assert regs[R_TEMP0] == 20
|
|
|
| def test_max_min(self):
|
| """MAX and MIN opcodes."""
|
| prog = [encode_instruction(OP_NOP)] * MICROCODE_DEPTH
|
| prog[0] = encode_instruction(OP_MAX, dst=R_TEMP0, src_a=R_TRACE1, src_b=R_WEIGHT)
|
| prog[1] = encode_instruction(OP_MIN, dst=R_TEMP1, src_a=R_TRACE1, src_b=R_WEIGHT)
|
| prog[2] = encode_instruction(OP_HALT)
|
| regs = [30, 0, 100, 0, 0, 0, 0, 0]
|
| execute_program(prog, 0, 16, regs)
|
| assert regs[R_TEMP0] == 100
|
| assert regs[R_TEMP1] == 30
|
|
|
| def test_loadi(self):
|
| """LOADI R4, 42 -> R4=42."""
|
| prog = [encode_instruction(OP_NOP)] * MICROCODE_DEPTH
|
| prog[0] = encode_instruction(OP_LOADI, dst=R_CONST, imm=42)
|
| prog[1] = encode_instruction(OP_HALT)
|
| regs = [0] * 8
|
| execute_program(prog, 0, 16, regs)
|
| assert regs[R_CONST] == 42
|
|
|
| def test_skip_z(self):
|
| """SKIP_Z should skip next instruction when src_a == 0."""
|
| prog = [encode_instruction(OP_NOP)] * MICROCODE_DEPTH
|
| prog[0] = encode_instruction(OP_SKIP_Z, src_a=R_TRACE1)
|
| prog[1] = encode_instruction(OP_LOADI, dst=R_TEMP0, imm=99)
|
| prog[2] = encode_instruction(OP_LOADI, dst=R_TEMP1, imm=42)
|
| prog[3] = encode_instruction(OP_HALT)
|
| regs = [0] * 8
|
| execute_program(prog, 0, 16, regs)
|
| assert regs[R_TEMP0] == 0
|
| assert regs[R_TEMP1] == 42
|
|
|
| def test_store_w(self):
|
| """STORE_W should report weight written."""
|
| prog = [encode_instruction(OP_NOP)] * MICROCODE_DEPTH
|
| prog[0] = encode_instruction(OP_LOADI, dst=R_WEIGHT, imm=999)
|
| prog[1] = encode_instruction(OP_STORE_W, src_a=R_WEIGHT)
|
| prog[2] = encode_instruction(OP_HALT)
|
| regs = [0, 0, 500, 0, 0, 0, 0, 0]
|
| result = execute_program(prog, 0, 16, regs)
|
| assert result["weight_written"] is True
|
| assert result["weight"] == 999
|
|
|
| def test_store_e(self):
|
| """STORE_E should report eligibility written."""
|
| prog = [encode_instruction(OP_NOP)] * MICROCODE_DEPTH
|
| prog[0] = encode_instruction(OP_LOADI, dst=R_ELIG, imm=-50)
|
| prog[1] = encode_instruction(OP_STORE_E, src_a=R_ELIG)
|
| prog[2] = encode_instruction(OP_HALT)
|
| regs = [0] * 8
|
| result = execute_program(prog, 0, 16, regs)
|
| assert result["elig_written"] is True
|
| assert result["elig"] == -50
|
|
|
|
|
| class TestAssembler:
|
| def test_basic_assembly(self):
|
| """Assemble a simple LTD program."""
|
| text = """
|
| SHR R5, R0, 3
|
| SKIP_Z R5
|
| SUB R2, R2, R5
|
| STORE_W R2
|
| HALT
|
| """
|
| instrs = _assemble(text)
|
| assert len(instrs) == 5
|
| d = decode_instruction(instrs[0])
|
| assert d["op_name"] == "SHR"
|
| assert d["dst"] == R_TEMP0
|
| assert d["src_a"] == R_TRACE1
|
| assert d["shift"] == 3
|
|
|
| def test_comments_stripped(self):
|
| """Comments starting with ; or # should be ignored."""
|
| text = """
|
| ; This is a comment
|
| NOP
|
| # Another comment
|
| HALT
|
| """
|
| instrs = _assemble(text)
|
| assert len(instrs) == 2
|
|
|
| def test_loadi_assembly(self):
|
| """LOADI with hex immediate."""
|
| text = "LOADI R4, 0xFF"
|
| instrs = _assemble(text)
|
| d = decode_instruction(instrs[0])
|
| assert d["op"] == OP_LOADI
|
| assert d["imm"] == 255
|
|
|
|
|
| class TestLearningRule:
|
| def test_stdp_factory(self):
|
| """LearningRule.stdp() should produce a 64-word program."""
|
| rule = LearningRule.stdp()
|
| prog = rule.get_program()
|
| assert len(prog) == MICROCODE_DEPTH
|
|
|
| ltd = rule.get_ltd()
|
| assert any(decode_instruction(w)["op"] != OP_NOP for w in ltd)
|
|
|
| def test_three_factor_factory(self):
|
| """LearningRule.three_factor() uses STORE_E instead of STORE_W."""
|
| rule = LearningRule.three_factor()
|
| ltd = rule.get_ltd()
|
| has_store_e = any(decode_instruction(w)["op"] == OP_STORE_E for w in ltd)
|
| has_store_w = any(decode_instruction(w)["op"] == OP_STORE_W for w in ltd)
|
| assert has_store_e
|
| assert not has_store_w
|
|
|
| def test_from_instructions(self):
|
| """Build rule from raw instruction lists."""
|
| ltd = [encode_instruction(OP_HALT)]
|
| ltp = [encode_instruction(OP_HALT)]
|
| rule = LearningRule.from_instructions(ltd, ltp)
|
| prog = rule.get_program()
|
| assert decode_instruction(prog[0])["op"] == OP_HALT
|
| assert decode_instruction(prog[16])["op"] == OP_HALT
|
|
|
| def test_assemble_ltd_ltp(self):
|
| """Build rule from assembly text."""
|
| rule = LearningRule()
|
| rule.assemble_ltd("SHR R5, R0, 3\nSKIP_Z R5\nSUB R2, R2, R5\nSTORE_W R2\nHALT")
|
| rule.assemble_ltp("SHR R5, R0, 3\nSKIP_Z R5\nADD R2, R2, R5\nSTORE_W R2\nHALT")
|
| prog = rule.get_program()
|
|
|
| assert decode_instruction(prog[0])["op"] == OP_SHR
|
|
|
| assert decode_instruction(prog[16])["op"] == OP_SHR
|
|
|
|
|
| class TestMicrocodeSTDP:
|
| """Test that microcode STDP reproduces hardcoded STDP behavior."""
|
|
|
| def test_default_microcode_stdp_weight_change(self):
|
| """Default microcode STDP should produce same weight changes as hardcoded."""
|
| net = nc.Network()
|
| src = net.population(1, params={"threshold": 100, "leak": 0, "refrac": 0})
|
| tgt = net.population(1, params={"threshold": 100, "leak": 0, "refrac": 0})
|
| net.connect(src, tgt, topology="all_to_all", weight=500)
|
| net.set_learning_rule(LearningRule.stdp())
|
|
|
| sim = nc.Simulator()
|
| sim.deploy(net)
|
| sim.set_learning(learn=True)
|
|
|
|
|
| sim.inject(src, current=200)
|
| sim.run(1)
|
| sim.run(1)
|
|
|
|
|
| adj = sim._adjacency
|
| for targets in adj.values():
|
| for entry in targets:
|
| w = entry[1]
|
| assert w > 500, f"Expected LTP increase, got {w}"
|
|
|
| def test_default_microcode_three_factor(self):
|
| """Default 3-factor microcode should accumulate eligibility."""
|
| net = nc.Network()
|
| src = net.population(1, params={"threshold": 100, "leak": 0, "refrac": 0})
|
| tgt = net.population(1, params={"threshold": 100, "leak": 0, "refrac": 0})
|
| net.connect(src, tgt, topology="all_to_all", weight=500)
|
| net.set_learning_rule(LearningRule.three_factor())
|
|
|
| sim = nc.Simulator()
|
| sim.deploy(net)
|
| sim.set_learning(learn=True, three_factor=True)
|
|
|
| sim.inject(src, current=200)
|
| sim.inject(tgt, current=200)
|
| sim.run(3)
|
|
|
|
|
| assert len(sim._eligibility) > 0
|
|
|
|
|
| for targets in sim._adjacency.values():
|
| for entry in targets:
|
| assert entry[1] == 500
|
|
|
| def test_anti_stdp_custom_rule(self):
|
| """Custom anti-STDP: LTD becomes LTP and vice versa."""
|
| rule = LearningRule()
|
|
|
| rule.assemble_ltd(
|
| "SHR R5, R0, 3\n"
|
| "SKIP_Z R5\n"
|
| "ADD R2, R2, R5\n"
|
| "STORE_W R2\n"
|
| "HALT"
|
| )
|
|
|
| rule.assemble_ltp(
|
| "SHR R5, R0, 3\n"
|
| "SKIP_Z R5\n"
|
| "SUB R2, R2, R5\n"
|
| "STORE_W R2\n"
|
| "HALT"
|
| )
|
|
|
| net = nc.Network()
|
| src = net.population(1, params={"threshold": 100, "leak": 0, "refrac": 0})
|
| tgt = net.population(1, params={"threshold": 100, "leak": 0, "refrac": 0})
|
| net.connect(src, tgt, topology="all_to_all", weight=500)
|
| net.set_learning_rule(rule)
|
|
|
| sim = nc.Simulator()
|
| sim.deploy(net)
|
| sim.set_learning(learn=True)
|
|
|
|
|
|
|
| sim.inject(src, current=200)
|
| sim.run(1)
|
| sim.run(1)
|
|
|
| adj = sim._adjacency
|
| for targets in adj.values():
|
| for entry in targets:
|
| w = entry[1]
|
| assert w < 500, f"Anti-STDP should decrease weight, got {w}"
|
|
|
| def test_compiler_generates_learn_cmds(self):
|
| """Compiler should generate PROG_LEARN commands when rule is attached."""
|
| from neurocore.compiler import Compiler
|
|
|
| net = nc.Network()
|
| src = net.population(2)
|
| tgt = net.population(2)
|
| net.connect(src, tgt, topology="all_to_all", weight=200)
|
| net.set_learning_rule(LearningRule.stdp())
|
|
|
| compiled = Compiler().compile(net)
|
| assert len(compiled.prog_learn_cmds) > 0
|
|
|
| for cmd in compiled.prog_learn_cmds:
|
| assert "core" in cmd
|
| assert "addr" in cmd
|
| assert "instr" in cmd
|
|
|