# Copyright 2022 Tristan Behrens. # # Licensed under the Apache License, Version 2.0 (the "License"); # you may not use this file except in compliance with the License. # You may obtain a copy of the License at # # http://www.apache.org/licenses/LICENSE-2.0 # # Unless required by applicable law or agreed to in writing, software # distributed under the License is distributed on an "AS IS" BASIS, # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. # See the License for the specific language governing permissions and # limitations under the License. # Lint as: python3 from fastapi import BackgroundTasks, FastAPI from fastapi.staticfiles import StaticFiles from fastapi.responses import FileResponse from pydantic import BaseModel from PIL import Image import os import io import random import base64 from time import time from statistics import mean from collections import OrderedDict import torch import wave from source.logging import create_logger from source.tokensequence import token_sequence_to_audio, token_sequence_to_image from source import constants from transformers import AutoTokenizer, AutoModelForCausalLM logger = create_logger(__name__) # Load the auth-token from authtoken.txt. auth_token = os.getenv("authtoken") # Loading the model and its tokenizer. logger.info("Loading tokenizer and model...") tokenizer = AutoTokenizer.from_pretrained( "ai-guru/lakhclean_mmmtrack_4bars_d-2048" ) model = AutoModelForCausalLM.from_pretrained( "ai-guru/lakhclean_mmmtrack_4bars_d-2048" ) logger.info("Done.") # Create the app logger.info("Creating app...") app = FastAPI(docs_url=None, redoc_url=None) app.mount("/static", StaticFiles(directory="static"), name="static") logger.info("Done.") class Options(BaseModel): music_style: str density: str temperature: str class NewTask(BaseModel): music_style = "synth" density = "medium" temperature = "medium" def get_place_in_queue(task_id): queued_tasks = list( task for task in tasks.values() if task["status"] == "queued" or task["status"] == "processing" ) queued_tasks.sort(key=lambda task: task["created_at"]) queued_task_ids = list(task["task_id"] for task in queued_tasks) try: return queued_task_ids.index(task_id) + 1 except: return 0 def calculate_eta(task_id): total_durations = list( task["completed_at"] - task["started_at"] for task in tasks.values() if "completed_at" in task and task["status"] == "completed" ) initial_place_in_queue = tasks[task_id]["initial_place_in_queue"] if len(total_durations): eta = initial_place_in_queue * mean(total_durations) else: eta = initial_place_in_queue * 35 return round(eta, 1) def next_task(task_id): tasks[task_id]["completed_at"] = time() queued_tasks = list(task for task in tasks.values() if task["status"] == "queued") if queued_tasks: print( f"{task_id} {tasks[task_id]['status']}. Task/s remaining: {len(queued_tasks)}" ) process_task(queued_tasks[0]["task_id"]) def process_task(task_id): if "processing" in list(task["status"] for task in tasks.values()): return if tasks[task_id]["last_poll"] and time() - tasks[task_id]["last_poll"] > 30: tasks[task_id]["status"] = "abandoned" next_task(task_id) tasks[task_id]["status"] = "processing" tasks[task_id]["started_at"] = time() print(f"Processing {task_id}") try: tasks[task_id]["output"] = compose( tasks[task_id]["music_style"], tasks[task_id]["density"], tasks[task_id]["temperature"], ) except Exception as ex: tasks[task_id]["status"] = "failed" tasks[task_id]["error"] = repr(ex) else: tasks[task_id]["status"] = "completed" finally: next_task(task_id) def compose(music_style, density, temperature): instruments = constants.get_instruments(music_style) density = constants.get_density(density) temperature = constants.get_temperature(temperature) print(f"instruments: {instruments} density: {density} temperature: {temperature}") # Generate with the given parameters. logger.info(f"Generating token sequence...") generated_sequence = generate_sequence(instruments, density, temperature) logger.info(f"Generated token sequence: {generated_sequence}") # Get the audio data as a array of int16. logger.info("Generating audio...") sample_rate, audio_data = token_sequence_to_audio(generated_sequence) logger.info(f"Done. Audio data: {len(audio_data)}") # Encode the audio-data as wave file in memory. Use the wave module. audio_data_bytes = io.BytesIO() wave_file = wave.open(audio_data_bytes, "wb") wave_file.setframerate(sample_rate) wave_file.setnchannels(1) wave_file.setsampwidth(2) wave_file.writeframes(audio_data) wave_file.close() # Return the audio-data as a base64-encoded string. audio_data_bytes.seek(0) audio_data_base64 = base64.b64encode(audio_data_bytes.read()).decode("utf-8") audio_data_bytes.close() # Convert the audio data to an PIL image. image = token_sequence_to_image(generated_sequence) # Save PIL image to harddrive as PNG. logger.debug(f"Saving image to harddrive... {type(image)}") image_file_name = "compose.png" image.save(image_file_name, "PNG") # Save image to virtual file. img_io = io.BytesIO() image.save(img_io, "PNG", quality=70) img_io.seek(0) # Return the image as a base64-encoded string. image_data_base64 = base64.b64encode(img_io.read()).decode("utf-8") img_io.close() # Return. return { "tokens": generated_sequence, "audio": "data:audio/wav;base64," + audio_data_base64, "image": "data:image/png;base64," + image_data_base64, "status": "OK", } def generate_sequence(instruments, density, temperature): instruments = instruments[::] random.shuffle(instruments) generated_ids = tokenizer.encode("PIECE_START", return_tensors="pt")[0] for instrument in instruments: more_ids = tokenizer.encode( f"TRACK_START INST={instrument} DENSITY={density}", return_tensors="pt" )[0] generated_ids = torch.cat((generated_ids, more_ids)) generated_ids = generated_ids.unsqueeze(0) generated_ids = model.generate( generated_ids, max_length=2048, do_sample=True, temperature=temperature, eos_token_id=tokenizer.encode("TRACK_END")[0], )[0] generated_sequence = tokenizer.decode(generated_ids) print("GENERATING COMPLETE") print(generate_sequence) return generated_sequence tasks = OrderedDict() # Route for the loading page. @app.head("/") @app.route("/") def index(request): return FileResponse(path="static/index.html", media_type="text/html") @app.post("/task/create") def create_task(background_tasks: BackgroundTasks, new_task: NewTask): created_at = time() task_id = f"{str(created_at)}_{new_task.music_style}" tasks[task_id] = OrderedDict( { "task_id": task_id, "status": "queued", "eta": None, "created_at": created_at, "started_at": None, "completed_at": None, "last_poll": None, "poll_count": 0, "initial_place_in_queue": None, "place_in_queue": None, "music_style": new_task.music_style, "density": new_task.density, "temperature": new_task.temperature, "output": None, } ) tasks[task_id]["initial_place_in_queue"] = get_place_in_queue(task_id) tasks[task_id]["eta"] = calculate_eta(task_id) background_tasks.add_task(process_task, task_id) return tasks[task_id] @app.get("/task/poll") def poll_task(task_id: str): tasks[task_id]["place_in_queue"] = get_place_in_queue(task_id) tasks[task_id]["eta"] = calculate_eta(task_id) tasks[task_id]["last_poll"] = time() tasks[task_id]["poll_count"] += 1 return tasks[task_id]