Spaces:
Sleeping
Sleeping
import re | |
import glob | |
import numpy as np | |
from dtw import dtw | |
from itertools import product | |
from PIL import Image, ImageDraw | |
from scipy.stats import entropy | |
from src.utils.img import make_img_sheet | |
from src.utils.mymath import jsdiv | |
from src.utils.filesys import getpath | |
''' | |
Encoding: | |
X (00) -> Solid tile | |
S (01) -> Breakable block | |
- (02) -> Empty tile | |
% (03) -> Mushroom platform | |
t (04) -> Normal tube | |
T (05) -> Flower tube | |
b (06) -> Bullet bill body | |
B (07) -> Bullet bill head | |
o (08) -> Coin | |
Q (09) -> Coin question block | |
@ (10) -> Mushroom question block | |
U (11) -> Mushroom breakable block | |
L (12) -> 1UP block | |
1 (13) -> Invisible 1UP block | |
2 (14) -> Invisible coin block | |
g (15) -> Goomba | |
k (16) -> Koopa (green) | |
r (17) -> Koopa (red) | |
K (18) -> Flying Koopa (green) | |
R (19) -> Flying Koppa (red) | |
y (20) -> Spiky | |
''' | |
class MarioLevel: | |
height = 16 | |
seg_width = 16 | |
mapping = { | |
'i-c': ( | |
'X', 'S', '-', '%', 't', 'T', 'b', 'B', 'o', 'Q', '@', | |
'U', 'L', '1', '2', 'g', 'k', 'r', 'K', 'R', 'y' | |
), | |
'c-i': { | |
'X': 0, 'S': 1, '-': 2, '%': 3, 't': 4, 'T': 5, 'b': 6, 'B': 7, 'o': 8, 'Q': 9, '@': 10, | |
'U': 11, 'L': 12, '1': 13, '2': 14, 'g': 15, 'k': 16, 'r': 17, 'K': 18, 'R': 19, 'y': 20, | |
'#': 0, '|': 2, 'F': 2, 'M': 2, 'C': 8, '!': 9, '?': 10, 'E': 15 | |
} | |
} | |
n_types = len(mapping['i-c']) | |
pipeset = {'<', '>', '[', ']'} | |
solidset = {'X', '#', 'S', 't', 'T', '%', 'Q', '@', '<', '>', '[', ']'} | |
def __init__(self, content): | |
# print(content) | |
if isinstance(content, np.ndarray): | |
self.content = content | |
else: | |
tmp = [list(line) for line in content.split('\n')] | |
while not tmp[-1]: | |
tmp.pop() | |
self.content = np.array(tmp) | |
self.h, self.w = self.content.shape | |
self.__tile_pttr_cnts = {} | |
self.attr_dict = {} | |
def to_num_arr(self): | |
res = np.zeros((self.h, self.w), int) | |
for i, j in product(range(self.h), range(self.w)): | |
char = self.content[i, j] | |
res[i, j] = MarioLevel.mapping['c-i'][char] | |
return res | |
def to_img(self, save_path=None) -> Image: | |
img = LevelRender.render(self) | |
if save_path: | |
safe_path = getpath(save_path) | |
img.save(safe_path) | |
return img | |
def to_segs(self): | |
W = MarioLevel.seg_width | |
return [self[:, s:s+W] for s in range(0, self.w, W)] | |
def save(self, fpath): | |
safe_path = getpath(fpath) | |
if safe_path[-4:] != '.lvl': | |
safe_path += '.lvl' | |
with open(safe_path, 'w') as f: | |
f.write(str(self)) | |
def tile_pattern_counts(self, w=2): | |
if not w in self.__tile_pttr_cnts.keys(): | |
counts = {} | |
for i, j in product(range(self.h - w + 1), range(self.w - w + 1)): | |
key = ''.join(self.content[i+x][j+y] for x, y in product(range(w), range(w))) | |
count = counts.setdefault(key, 0) | |
counts[key] = count + 1 | |
self.__tile_pttr_cnts[w] = counts | |
return self.__tile_pttr_cnts[w] | |
def tile_pattern_distribution(self, w=2): | |
counts = self.tile_pattern_counts(w) | |
C = (self.h - w + 1) * (self.w - w + 1) | |
return {key: val / C for key, val in counts.items()} | |
def __getattr__(self, item): | |
if item == 'shape': | |
return self.content.shape | |
elif item == 'h': | |
return self.content.shape[0] | |
elif item == 'w': | |
return self.content.shape[1] | |
elif item not in self.attr_dict.keys(): | |
if item == 'n_gaps': | |
empty_map1 = np.where(self.content[-1] in MarioLevel.empty_chars, 1, 0) | |
empty_map2 = np.where(self.content[-2] in MarioLevel.empty_chars, 1, 0) | |
res = len(np.where(empty_map1 + empty_map2 == 2)) | |
self.attr_dict['n_ground'] = res | |
elif item == 'n_enemies': | |
self.attr_dict['n_enemies'] = str(self).count('E') | |
elif item == 'n_coins': | |
self.attr_dict['n_coins'] = str(self).count('o') | |
elif item == 'n_questions': | |
self.attr_dict['n_questions'] = str(self).count('Q') | |
elif item == 'n_empties': | |
empty_map = np.where(self.content in MarioLevel.empty_chars) | |
self.attr_dict['n_questions'] = len(empty_map) | |
return self.attr_dict[item] | |
def __str__(self): | |
lines = [''.join(line) + '\n' for line in self.content] | |
return ''.join(lines) | |
def __add__(self, other): | |
concated = np.concatenate([self.content, other.content], axis=1) | |
return MarioLevel(concated) | |
def __getitem__(self, item): | |
try: | |
content = self.content[item] | |
if type(content) == np.ndarray: | |
return MarioLevel(self.content[item]) | |
else: | |
return str(content) | |
except IndexError: | |
return None | |
def copy(self): | |
return MarioLevel.from_num_arr(self.to_num_arr()) | |
def from_num_arr(num_arr): | |
h, w = num_arr.shape | |
res = np.empty((h, w), str) | |
for i, j in product(range(h), range(w)): | |
if num_arr[i, j] == 0: | |
res[i, j] = 'X' if i >= MarioLevel.height - 2 else '#' | |
else: | |
tile_id = num_arr[i, j] | |
if type(tile_id) != int: | |
tile_id = round(tile_id) | |
res[i, j] = MarioLevel.mapping['i-c'][tile_id] | |
visited = set() | |
for i, j in product(range(h), range(w)): | |
if res[i, j] == '%': | |
if f'{i}-{j}' in visited: | |
continue | |
s, e = j, j | |
while e < w and res[i, e] == '%': | |
visited.add(f'{i}-{e}') | |
e += 1 | |
if (e - s) <= 2: | |
stalk_cols = range(s, e) | |
else: | |
stalk_cols = range(s + 1, e - 1) | |
for q in stalk_cols: | |
p = i + 1 | |
while p < h and res[p, q] not in MarioLevel.solidset: | |
if res[p, q] == '-': | |
res[p, q] = '|' | |
p += 1 | |
return MarioLevel(res) | |
def from_file(fpath): | |
safe_path = getpath(fpath) | |
with open(safe_path, 'r') as f: | |
return MarioLevel(f.read()) | |
def from_one_hot_arr(one_hot_arr: np.ndarray): | |
num_lvl = one_hot_arr.argmax(axis=0) | |
return MarioLevel.from_num_arr(num_lvl) | |
class LevelRender: | |
# BG_COLOR = (109, 143, 252) | |
BG_COLOR = (138, 165, 253) | |
tubeset = {'t', 'T'} | |
tex_size = 16 | |
textures = { | |
re.split('[/\\\\]',fpath)[-1][:-4]: Image.open(fpath) | |
for fpath in glob.glob(getpath('smb/assets/*.png')) | |
} | |
def render(level): | |
ts = LevelRender.tex_size | |
img = Image.new('RGBA', (level.w * ts, level.h * ts), LevelRender.BG_COLOR) | |
# img.fill(LevelRender.BG_COLOR) | |
reconded_lvl = MarioLevel.from_num_arr(level.to_num_arr()) | |
j_t_platforms, tubes, chompers = LevelRender.__get_objects(reconded_lvl) | |
LevelRender.__render_objects(img, j_t_platforms, tubes, chompers, reconded_lvl) | |
LevelRender.__render_tiles(img, reconded_lvl) | |
return img | |
def __get_objects(level): | |
h, w = level.shape | |
visited = set() | |
j_t_platforms = [] | |
tubes = [] | |
chompers = [] | |
for i, j in product(range(h), range(w)): | |
c = level[i, j] | |
if f'{i}-{j}' in visited: | |
continue | |
if c == '%': | |
s, e = j, j | |
while level[i, e] == '%': | |
visited.add(f'{i}-{e}') | |
e += 1 | |
j_t_platforms.append({'row': i, 'col-start': s, 'col-end': e}) | |
if c == 'T' and level[i, j - 1] not in LevelRender.tubeset and level[i, j + 1] in LevelRender.tubeset: | |
chompers.append((i - 1, j)) | |
if c in LevelRender.tubeset: | |
single = level[i, j + 1] not in LevelRender.tubeset | |
start = (i, j) | |
left_height = 0 | |
right_height = None if single else 0 | |
visited.add(f'{i}-{j}') | |
if not single: | |
visited.add(f'{i}-{j+1}') | |
while level[i + left_height, j] in LevelRender.tubeset: | |
visited.add(f'{i + left_height}-{j}') | |
left_height += 1 | |
if not single: | |
while level[i + right_height, j + 1] in LevelRender.tubeset: | |
visited.add(f'{i + right_height}-{j+1}') | |
right_height += 1 | |
tubes.append({'start': start, 'left-height': left_height, 'right-height': right_height}) | |
return j_t_platforms, tubes, chompers | |
def __render_objects(img, j_t_platforms, tubes, chompers, level): | |
ts = LevelRender.tex_size | |
textures = LevelRender.textures | |
for j_t_platform in j_t_platforms: | |
row, col_start, col_end = j_t_platform['row'], j_t_platform['col-start'], j_t_platform['col-end'] | |
stalk_start, stalk_end = col_start, col_end | |
if col_end - col_start == 1: | |
img.paste(textures['MS'], (col_start * ts, row * ts), textures['MS']) | |
else: | |
img.paste(textures['ML'], (col_start * ts, row * ts), textures['ML']) | |
img.paste(textures['MR'], ((col_end - 1) * ts, row * ts), textures['MR']) | |
for j in range(col_start + 1, col_end - 1): | |
img.paste(textures['MM'], (j * ts, row * ts), textures['MM']) | |
if col_end - col_start > 2: | |
stalk_start += 1 | |
stalk_end -= 1 | |
for j in range(stalk_start, stalk_end): | |
i = row + 1 | |
while i < level.h and level[i, j] not in MarioLevel.solidset: | |
img.paste(textures['stalk'], (j * ts, i * ts), textures['stalk']) | |
i += 1 | |
for chomper in chompers: | |
i, j = chomper | |
img.paste(textures['chomper'], (int((j + 0.5) * ts), i * ts), textures['chomper']) | |
for tube in tubes: | |
(i, j), left_height, right_height = tube['start'], tube['left-height'], tube['right-height'] | |
if right_height is None: | |
img.paste(textures['TSP'], (j * ts, i * ts), textures['TSP']) | |
for k in range(1, left_height): | |
img.paste(textures['BSP'], (j * ts, (i + k) * ts), textures['BSP']) | |
else: | |
img.paste(textures['TLP'], (j * ts, i * ts), textures['TLP']) | |
img.paste(textures['TRP'], ((j + 1) * ts, i * ts), textures['TRP']) | |
for k in range(1, left_height): | |
img.paste(textures['['], (j * ts, (i + k) * ts), textures['[']) | |
for k in range(1, left_height): | |
img.paste(textures[']'], ((j + 1) * ts, (i + k) * ts), textures[']']) | |
def __render_tiles(img, level): | |
ts = LevelRender.tex_size | |
for i, j in product(range(level.h), range(level.w)): | |
target = (j * ts, i * ts) | |
tile = level[i, j] | |
if tile in {'-', 't', 'T', '%', '|', 'F', 'M'}: | |
continue | |
elif tile == 'b': | |
t = level[i - 1, j] | |
if t == 'B': | |
img.paste(LevelRender.textures['CB1'], target, LevelRender.textures['CB1']) | |
else: | |
img.paste(LevelRender.textures['CB2'], target, LevelRender.textures['CB2']) | |
elif tile == 'K': | |
img.paste(LevelRender.textures['wingk'], target, LevelRender.textures['wingk']) | |
elif tile == 'R': | |
img.paste(LevelRender.textures['wingr'], target, LevelRender.textures['wingr']) | |
else: | |
img.paste(LevelRender.textures[tile], target, LevelRender.textures[tile]) | |
def draw_trace_on(lvlimg, trace, color='black', lw=3): | |
p = 0 | |
while p < len(trace) and trace[p][0] < lvlimg.get_width(): | |
p += 1 | |
drawer = ImageDraw.Draw(lvlimg) | |
drawer.line([(x, y-8) for x, y in trace[:p]], color, lw) | |
return lvlimg | |
def trace_div(trace1, trace2, w=10, trace_size_norm=False): | |
h, ts = MarioLevel.height, LevelRender.tex_size | |
t1, t2 = np.array(trace1) / ts, np.array(trace2) / ts | |
dist_metric = (lambda x, y: np.linalg.norm(x - y)) | |
dtw_val, *_ = dtw(t1, t2, dist_metric, w=max(w, abs(len(t1) - len(t2)))) | |
norm_factor = max(len(trace1), len(trace2)) * h if trace_size_norm else MarioLevel.seg_width * h | |
return dtw_val / norm_factor | |
def tile_pattern_kl_div(seg1: MarioLevel, seg2: MarioLevel, w=2, eps=1e-3): | |
counts1 = seg1.tile_pattern_counts(w) | |
counts2 = seg2.tile_pattern_counts(w) | |
all_keys = counts1.keys().__or__(counts2.keys()) | |
p = np.array([counts1.setdefault(key, 0) for key in all_keys]) | |
q = np.array([counts2.setdefault(key, 0) for key in all_keys]) | |
p = p / p.sum() + eps | |
q = q / q.sum() + eps | |
return (entropy(p, q, base=2) + entropy(q, p, base=2)) / 2 | |
def tile_pattern_js_div(seg1: MarioLevel, seg2: MarioLevel, w=2): | |
counts1 = seg1.tile_pattern_counts(w) | |
counts2 = seg2.tile_pattern_counts(w) | |
all_keys = counts1.keys().__or__(counts2.keys()) | |
p = np.array([counts1.setdefault(key, 0) for key in all_keys]) | |
q = np.array([counts2.setdefault(key, 0) for key in all_keys]) | |
return jsdiv(p, q) | |
def lvl_js(lvl1: MarioLevel, lvl2: MarioLevel): | |
segs1 = lvl1.to_segs() | |
segs2 = lvl2.to_segs() | |
divs = [tile_pattern_js_div(s1, s2) for s1, s2 in zip(segs1, segs2)] | |
return np.mean(divs) | |
pass | |
def hamming_dis(lvl1, lvl2): | |
assert lvl1.h == lvl2.h and lvl1.w == lvl2.w | |
differences = len(np.where(lvl1.content != lvl2.content)[0]) | |
num_segs = lvl1.w / MarioLevel.seg_width | |
return differences | |
def lvl_dtw(lvl1, lvl2, w=5, distmtrc=hamming_dis): | |
segs1, segs2 = lvl1.to_segs(), lvl2.to_segs() | |
res, *_ = dtw(segs1, segs2, distmtrc, w=w) | |
return res / max(len(segs1), len(segs2)) | |
def lvlhcat(lvls) -> MarioLevel: | |
if type(lvls[0]) == MarioLevel: | |
concated_content = np.concatenate([l.content for l in lvls], axis=1) | |
else: | |
concated_content = np.concatenate([l for l in lvls], axis=1) | |
return MarioLevel(concated_content) | |
def save_batch(lvls, fname): | |
contents = [str(lvl).strip() for lvl in lvls] | |
content = '\n;\n'.join(contents) | |
if len(fname) <= 5 or fname[-5:] != '.lvls': | |
fname += '.lvls' | |
with open(getpath(fname), 'w') as f: | |
f.write(content) | |
pass | |
def load_batch(fname): | |
with open(getpath(fname), 'r') as f: | |
content = f.read() | |
return [MarioLevel(c) for c in content.split('\n;\n')] | |
def traverse_level_files(path): | |
for lvl_path in glob.glob(getpath(f'{path}/*.lvl')): | |
lvl = MarioLevel.from_file(lvl_path) | |
name = re.split('[/\\\\]', lvl_path)[-1][:-4] | |
yield lvl, name | |
def traverse_batched_level_files(path): | |
for lvl_path in glob.glob(getpath(f'{path}/*.lvls')): | |
name = re.split('[/\\\\]', lvl_path)[-1][:-5] | |
with open(lvl_path, 'r') as f: | |
txt = f.read() | |
lvls = [] | |
for lvlstr in txt.split('\n;\n'): | |
if len(lvlstr) < 10: | |
continue | |
lvls.append(MarioLevel(lvlstr)) | |
yield lvls, name | |
if __name__ == '__main__': | |
for task in ('lgp', 'fhp'): | |
for t in range(1, 6): | |
lvls = load_batch(f'test_data/sac/{task}/t{t}/samples.lvls') | |
samples = [lvl[:, :16*16] for lvl in lvls[:10]] | |
make_img_sheet([s.to_img() for s in samples], 1, y_margin=12, save_path=f'test_data/sac/{task}/t{t}/samples.png') | |
for l in ('0.1', '0.3', '0.5'): | |
lvls = load_batch(f'test_data/varpm-{task}/l{l}_m5/t{t}/samples.lvls') | |
samples = [lvl[:, :16 * 16] for lvl in lvls[:10]] | |
make_img_sheet([s.to_img() for s in samples], 1, y_margin=12, | |
save_path=f'test_data/varpm-{task}/l{l}_m5/t{t}/samples.png') | |
# for lvl, path in traverse_level_files('smb/levels'): | |
# lvl.to_img(f'smb/levels_render/{path}.png') | |
# tmp = lvl.to_num_arr() | |
# MarioLevel.from_num_arr(tmp).to_img(f'smb/levels_render/{path}-recoding.png') | |
# a = MarioLevel.from_file('smb/levels/lvl-1.lvl') | |
# b = MarioLevel.from_file('smb/levels/lvl-2.lvl') | |
# save_batch([a, b], 'smb/testbatch') | |
# levels = load_batch('smb/testbatch.lvls') | |
# print(levels[0], levels[1]) | |
pass | |