"""LilyScript — a symbolic-music AIGC app built on the LilyletNotaGen model. Left column: (1) generation parameter panel (2) streaming run log (3) .lyl file list (session outputs + built-in examples) (4) editable lyl editor Right column: sheet-music panel (placeholder for now; later a Lilylet music score renderer reusing the lilylet-live-editor pipeline). Generation streams patch-by-patch: raw decoded text (with `[r:x/y]` stream markers) goes to the run log, while the measure-segmented postprocessed text fills the editor. The backend is the int8 + two-level KV-cache ONNX generator (see lilyscript/generator.py); weights are pulled from the HF model repo `k-l-lambda/LilyNota` on first use (override with LILYSCRIPT_MODEL_DIR locally). """ import os import re import time import json import random import logging from collections import deque import gradio as gr from lilyscript.generator import StreamingLilyletGenerator from lilyscript.postprocess import postprocess from lilyscript.mask_monitor import MaskMonitor, load_blacklist HERE = os.path.dirname(os.path.abspath(__file__)) # Model weights are pulled from the HuggingFace model repo `k-l-lambda/LilyNota` # at first use (the int8 + KV-cache ONNX bundle lives under its `onnx/` dir). # For local development, point LILYSCRIPT_MODEL_DIR at a local onnx dir to skip # the download. 
HF_MODEL_REPO = os.environ.get('LILYSCRIPT_MODEL_REPO', 'k-l-lambda/LilyNota')
HF_MODEL_SUBDIR = 'onnx'   # weights + geometry + tokenizer live here in the repo
MODEL_DIR = os.environ.get('LILYSCRIPT_MODEL_DIR')   # set -> use this local dir instead of the hub

# App-local directories, all resolved relative to this file.
ASSET_DIR = os.path.join(HERE, 'assets')
EXAMPLES_DIR = os.path.join(HERE, 'examples')
OUTPUT_DIR = os.path.join(HERE, 'outputs')
WEB_DIR = os.path.join(HERE, 'web')   # vendored browser libs + score player (gitignored bundles)

# Label prefixes that tell the two kinds of file-list entries apart.
EXAMPLE_PREFIX = '\U0001F4C4 '   # 📄 examples
OUTPUT_PREFIX = '✨ '   # ✨ session outputs

# Suggested metadata values (editable — the dropdowns allow custom input), loaded
# from assets/styles.json. Drawn from the NotaGenX period/instrumentation
# vocabulary + values seen in examples.
# Read through a context manager so the file handle is closed right away
# instead of lingering until garbage collection.
with open(os.path.join(ASSET_DIR, 'styles.json'), encoding='utf-8') as _styles_file:
    _STYLES = json.load(_styles_file)
COMPOSERS = _STYLES['composers']
GENRES = _STYLES['genres']
INSTRUMENTS = _STYLES['instruments']

# Lazily-built generator singleton; populated by get_generator() on first use.
_GEN = None

# Syntax-blacklist mask: discovered 2-grams whose forbidden next-tokens are masked
# during sampling so the model can't emit those locally-illegal continuations.
# Loaded once at startup; a fresh (stateful) MaskMonitor is built per generation.
# Empty/missing file -> no masking (behavior unchanged).
BLACKLIST_PATH = os.path.join(ASSET_DIR, 'lilylet-blacklist.json')
_BLACKLIST = load_blacklist(BLACKLIST_PATH)


# ---- system log capture ----------------------------------------------------
# A ring buffer that mirrors Python logging (lifecycle messages, warnings, and
# errors) into the Logs panel, alongside the streamed generation text.
LOG = logging.getLogger('lilyscript')


class _RingBufferHandler (logging.Handler):
    '''Keep the most recent N log records as formatted strings.

    Records live in a bounded deque: once `capacity` lines are held, appending
    a new one drops the oldest.
    '''

    def __init__ (self, capacity=400):
        super().__init__()
        self.records = deque(maxlen=capacity)

    def emit (self, record):
        '''Format `record` and append the resulting line to the buffer.'''
        try:
            self.records.append(self.format(record))
        except Exception:
            # A malformed record must never break the code that logged it, but it
            # should not vanish silently either: hand it to the standard logging
            # error hook (reports to stderr when logging.raiseExceptions is set).
            self.handleError(record)

    def text (self):
        '''Return all buffered lines joined with newlines (oldest first).'''
        return '\n'.join(self.records)


_LOG_BUFFER = _RingBufferHandler()
_LOG_BUFFER.setFormatter(logging.Formatter('%(asctime)s %(levelname)s %(name)s: %(message)s', datefmt='%H:%M:%S'))


def _init_logging ():
    '''Route app + library logs and Python warnings into the ring buffer (once),
    and also echo to stderr so the terminal keeps a copy.'''
    logging.captureWarnings(True)
    root = logging.getLogger()
    # the membership test makes repeated calls a no-op for the handlers
    if _LOG_BUFFER not in root.handlers:
        root.addHandler(_LOG_BUFFER)
        stderr_h = logging.StreamHandler()
        stderr_h.setFormatter(logging.Formatter('%(asctime)s %(levelname)s %(name)s: %(message)s', datefmt='%H:%M:%S'))
        root.addHandler(stderr_h)
    root.setLevel(logging.INFO)
    LOG.setLevel(logging.INFO)


_init_logging()


def resolve_model_dir ():
    '''Where the ONNX weights live.

    If LILYSCRIPT_MODEL_DIR is set, use it as-is (local dev). Otherwise pull the
    `onnx/` bundle from the HF model repo and return its local snapshot path.
    The tokenizer is NOT pulled — it's read from the app's own assets/ dir — so
    we only fetch the weight files.'''
    if MODEL_DIR:
        return MODEL_DIR
    # imported lazily so a local-dir setup never needs huggingface_hub
    from huggingface_hub import snapshot_download
    LOG.info('downloading model weights from hf:%s (%s/) ...', HF_MODEL_REPO, HF_MODEL_SUBDIR)
    local = snapshot_download(
        repo_id=HF_MODEL_REPO,
        allow_patterns=[f'{HF_MODEL_SUBDIR}/patch_kv_int8.onnx',
                        f'{HF_MODEL_SUBDIR}/token_kv_int8.onnx',
                        f'{HF_MODEL_SUBDIR}/wte.npy',
                        f'{HF_MODEL_SUBDIR}/geometry.json'],
    )
    return os.path.join(local, HF_MODEL_SUBDIR)


def get_generator ():
    '''Lazily build the (heavy) ONNX generator on first use.

    The instance is cached in the module-level `_GEN`, so later calls return
    the same object without reloading.'''
    global _GEN
    if _GEN is None:
        model_dir = resolve_model_dir()
        LOG.info('loading ONNX generator from %s ...', model_dir)
        t0 = time.perf_counter()
        _GEN = StreamingLilyletGenerator(model_dir, ASSET_DIR)
        LOG.info('generator ready (%.1fs)', time.perf_counter() - t0)
        LOG.info('syntax blacklist: %d contexts / %d forbidden pairs from %s',
                 len(_BLACKLIST), sum(len(v) for v in _BLACKLIST.values()), BLACKLIST_PATH)
    return _GEN


def load_examples ():
    '''Read built-in example .lyl files into a {label: text} dict.

    Examples may be stored raw (with inline `[r:x/y]` stream markers and
    run-together header lines); run them through `postprocess` so the editor —
    and the score renderer it feeds — get syntactically clean Lilylet. It's a
    no-op on already-clean text.

    Labels are EXAMPLE_PREFIX + the file name (extension kept). A missing
    examples dir yields an empty dict.'''
    store = {}
    if os.path.isdir(EXAMPLES_DIR):
        for name in sorted(os.listdir(EXAMPLES_DIR)):
            if name.endswith('.lyl'):
                with open(os.path.join(EXAMPLES_DIR, name), encoding='utf-8') as f:
                    store[EXAMPLE_PREFIX + name] = postprocess(f.read())
    return store


def load_outputs ():
    '''Read previously-generated .lyl files from the outputs dir into a
    {label: text} dict, so past session outputs survive a server restart.

    Labels are OUTPUT_PREFIX + the file name without its `.lyl` extension; the
    text is stored as read (no postprocessing). A missing outputs dir yields an
    empty dict.'''
    store = {}
    if os.path.isdir(OUTPUT_DIR):
        for name in sorted(os.listdir(OUTPUT_DIR)):
            if name.endswith('.lyl'):
                with open(os.path.join(OUTPUT_DIR, name), encoding='utf-8') as f:
                    # name[:-4] strips the '.lyl' suffix checked just above
                    store[OUTPUT_PREFIX + name[:-4]] = f.read()
    return store


def load_library ():
    '''Initial file list: built-in examples + any persisted session outputs.'''
    return {**load_examples(), **load_outputs()}


# Matches one managed style line, e.g. `[composer "Bach"]` (whole line, trailing
# whitespace allowed).
_STYLE_LINE_RE = re.compile(r'^\[(composer|genre|instrument)\s+".*"\]\s*$')


def sync_prompt (composer, genre, instrument, current):
    '''Rewrite the metadata-prompt text from the three style dropdowns.

    The `[composer/genre/instrument "..."]` lines are regenerated from the
    dropdowns and placed at the top; any other lines the user typed (e.g.
    `[key "..."]`) are preserved below in their original order.

    Empty/None dropdown values produce no line, and blank lines in `current`
    are dropped. Returns the new prompt text joined with newlines.
    '''
    lines = []
    for field, value in (('composer', composer), ('genre', genre), ('instrument', instrument)):
        value = (value or '').strip()
        if value:
            lines.append(f'[{field} "{value}"]')
    # keep every non-blank line that isn't one of the three managed style lines
    for ln in (current or '').splitlines():
        stripped = ln.strip()
        if stripped and not _STYLE_LINE_RE.match(stripped):
            lines.append(ln)
    return '\n'.join(lines)


# Marker line written to the log buffer at the moment generation output begins.
# `_log_panel` replaces this marker with the live generation text, so the streamed
# output appears in true chronological position — after the "requested"/"ready"
# lines and before the later "timing"/"done" lines.
_GEN_MARKER = '__GENERATION_OUTPUT__' def _log_panel (raw=''): '''Render the Logs panel in chronological order: the captured system log, with the generation-output marker (if present) expanded to the live text.''' sys_log = _LOG_BUFFER.text() if _GEN_MARKER in sys_log: block = '--- generation output ---\n' + raw if raw else '--- generation output ---' return sys_log.replace(_GEN_MARKER, block) # no marker yet (e.g. before generation): fall back to appending raw if raw: return (sys_log + '\n' if sys_log else '') + '--- generation output ---\n' + raw return sys_log def run_generation (prompt, measures, temperature, max_patches, seed, store, top_k=0, top_p=0.9): '''Streaming generate callback. Yields updates for (log, editor, file_list, store, seed, gen_btn). store: {label: lyl_text} dict held in gr.State; the produced document is added to it under a timestamped label once generation finishes. top_k / top_p have fixed defaults (no UI controls); pass them explicitly to override. Progress is shown on the Generate button itself: its label becomes "Generating… M/N" (by completed measures when a measure count is requested, else by patches out of max_patches) during the run, and reverts to "Generate" at the end. (Gradio's native progress bar can't render on a Button, so we drive the label directly.) The output file is named with the seed used for THIS generation; on completion the seed slider is randomized (final yield) so the next click uses a fresh seed. ''' meas = int(measures) if measures and int(measures) > 0 else None store = dict(store or {}) LOG.info('generation requested: measures=%s temperature=%s max_patches=%s seed=%s', meas, temperature, max_patches, seed) raw = pretty = '' n_yields = 0 mp = int(max_patches) t0 = time.perf_counter() try: gen = get_generator() # build a fresh per-run mask monitor (stateful: tracks the running 2-gram). # None when no blacklist is loaded -> sampling path unchanged. 
monitor = MaskMonitor(gen, _BLACKLIST) if _BLACKLIST else None # drop a marker in the log timeline; _log_panel expands it to the live output, # so subsequent log lines (timing/done) land *after* the generation text. LOG.info(_GEN_MARKER) for raw, pretty, done in gen.generate_stream( prompt_text=prompt or '', max_patches=mp, temperature=float(temperature), top_k=int(top_k), top_p=float(top_p), measures=meas, seed=int(seed), monitor=monitor): if not done: n_yields += 1 # progress on the Generate button label: by measures (completed `|` # separators vs target) when a measure count was requested, else by patches. if meas: btn_label = 'Generating… %d/%d' % (min(raw.count('|'), meas), meas) else: btn_label = 'Generating… %d/%d' % (max(0, n_yields - 1), mp) # The log streams every patch (raw text). The editor, however, must stay # syntactically valid: only sync it at a measure boundary — i.e. when the # accumulated text ends with the measure separator `|` (so it never shows a # half-generated, incomplete measure). `done` forces a final sync. at_boundary = raw.rstrip().endswith('|') editor_update = pretty if (at_boundary or done) else gr.update() yield _log_panel(raw), editor_update, gr.update(), store, gr.update(), gr.update(value=btn_label) except Exception as e: LOG.exception('generation failed: %s', e) yield _log_panel(raw), pretty, gr.update(), store, gr.update(), gr.update(value='Generate') return # timing: the stream yields once for prefill + once per generated patch, so the # patch count is the non-done yields minus that initial prefill yield. 
elapsed = time.perf_counter() - t0 n_patches = max(0, n_yields - 1) per_patch = (elapsed / n_patches) if n_patches else 0.0 LOG.info('timing: %.2fs total, %d patches, %.3fs/patch (%.1f patches/s)', elapsed, n_patches, per_patch, (n_patches / elapsed if elapsed else 0.0)) # finished: persist the document, refresh the file list, select the new entry label = OUTPUT_PREFIX + time.strftime('%Y%m%d_%H%M%S') + ('_m%d' % meas if meas else '') + '_s%d' % int(seed) store[label] = pretty os.makedirs(OUTPUT_DIR, exist_ok=True) out_path = os.path.join(OUTPUT_DIR, label.replace(OUTPUT_PREFIX, '') + '.lyl') with open(out_path, 'w', encoding='utf-8') as f: f.write(pretty) LOG.info('generation done: %d chars -> %s', len(pretty), os.path.basename(out_path)) # randomize the seed slider for the next run (the file above already used the # seed this generation ran with, so naming is unaffected) next_seed = random.randint(0, 2147483647) yield _log_panel(raw), pretty, gr.update(choices=list(store.keys()), value=label), store, gr.update(value=next_seed), gr.update(value='Generate') def load_file (label, store): '''File-list selection -> load that document into the editor.''' return (store or {}).get(label, '') SHEET_PLACEHOLDER = '''