#!/usr/bin/env python3
"""
Merge several causal language models into one model and upload it.

Code here was refactored from gist:
https://gist.github.com/maldevide/08829eada04ad9bd78e46c1a3787d42b

CodeLlama example:
https://huggingface.co/collections/mlabonne/codellama-6509bc68c2d4c8fc379ee87f

Hugging Face Fine-Tuning example:
https://colab.research.google.com/drive/1PEQyJO1-f6j0S_XJ8DV50NkpzasXkrzd?usp=sharing

2024-02-07 - unable to get unsloth to install.

If you want to fine-tune, here's an example Unsloth fine tuning guide for:
Alpaca + TinyLlama + RoPE Scaling full example.ipynb
https://colab.research.google.com/drive/1AZghoNBQaMDgWJpi4RbffGM1h6raLUj9?usp=sharing
"""

import logging
import os
import re
import shutil
from typing import Dict, List, Tuple

import torch
import transformers
from ddare.merge import merge_tensors
from ddare.tensor import (
    dare_ties_sparsification,
    relative_norm,
    divide_tensor_into_sets,
)
from ddare.util import get_device

logging.basicConfig(level=logging.INFO)
log = logging.getLogger(__name__)

# Matches per-block parameter names such as "model.layers.12.<suffix>".
# Compiled once at import time; the dots are escaped so they only match
# a literal ".".
_LAYER_KEY_PATTERN = re.compile(r"model\.layers\.(\d+)\.(.+)")

# Parameter names that live outside the numbered blocks.
_TOP_LEVEL_KEYS = {
    "model.norm.weight": "norm",
    "model.embed_tokens.weight": "embed",
    "lm_head.weight": "head",
}


def get_models(
    models: List[str],
    trust_remote_code: bool,
):
    """
    load every model in the list with
    ``AutoModelForCausalLM.from_pretrained`` (float16)

    :param models: model names/paths to load
    :param trust_remote_code: are you sure???
        True/False - forwarded to ``from_pretrained``
    :return: list of loaded models in the same
        order as ``models``
    """
    config = {
        "torch_dtype": torch.float16,
        "low_cpu_mem_usage": False,
        "trust_remote_code": trust_remote_code,
    }
    loaded_models = []
    num_models = len(models)
    for midx, model_path in enumerate(models):
        log.info(
            f"loading model={midx + 1}/{num_models} "
            f"model={model_path} "
        )
        loaded_models.append(
            transformers.AutoModelForCausalLM.from_pretrained(
                model_path, **config
            )
        )
    return loaded_models


def pm(
    model,
):
    """
    pretty print model - log one line per
    state dict entry (shape, dtype, device, contiguity)

    :param model: show me the model
    """
    # build the state dict once instead of once per key
    state = model.state_dict()
    log.info(f"model keys={len(state)}")
    for i, (k, tensor) in enumerate(state.items()):
        log.info(
            f"{i:3d} {k} shape={tensor.shape} "
            f"type={tensor.dtype} dev={tensor.device} "
            f"contig={tensor.is_contiguous()}"
        )


def run_text_test(
    model,
    tokenizer_path: str,
    question: str,
    device: str = "cuda",
):
    """
    run a question on the model, log the answer
    and return the tokenizer that was used

    :param model: initialized model
    :param tokenizer_path: tokenizer path/name
    :param question: what are you asking?
    :param device: where do you want to run
        e.g. "cpu"/"cuda"?
    :return: the loaded tokenizer
    """
    base_model = model.to(device)
    log.info(f"loading tokenizer={tokenizer_path}")
    tokenizer = transformers.AutoTokenizer.from_pretrained(
        tokenizer_path,
        torch_dtype=torch.float16,
    )
    inputs = tokenizer(question, return_tensors="pt").to(
        device
    )
    # restrict scaled-dot-product attention to the flash and
    # memory-efficient kernels while generating
    with torch.backends.cuda.sdp_kernel(
        enable_flash=True,
        enable_math=False,
        enable_mem_efficient=True,
    ):
        outputs = base_model.generate(
            **inputs,
            max_new_tokens=256,
        )
    answer = tokenizer.decode(
        outputs[0], skip_special_tokens=True
    )
    log.info(
        "\n"
        "----------"
        "\n"
        f"tokenizer={tokenizer}\n "
        f"question:\n{question}\n"
        f"answer:\n{answer}\n"
        "----------"
    )
    return tokenizer


def get_layer_type(key: str) -> Tuple[int, str]:
    """
    get the layer type

    :param key: name of the layer (state dict key)
    :return: layer id and name - for keys shaped like
        ``model.layers.<id>.<name>`` the numeric id and the
        remaining suffix, otherwise ``-1`` and one of
        "norm", "embed", "head" or "unknown"
    """
    m = _LAYER_KEY_PATTERN.match(key)
    if m is not None:
        return int(m.group(1)), m.group(2)
    name = _TOP_LEVEL_KEYS.get(key)
    if name is None:
        log.info(f"Unknown key {key}")
        return -1, "unknown"
    return -1, name


def merge_model_with_ties(
    models: List[str],
    model_dst: str,
    trust_remote_code: bool = True,
):
    """
    merge the list of models into one model
    called model_dst

    The first entry is the base model. Every other entry is a
    donor: each base tensor is divided into one set per donor
    and, inside set ``i``, the base values are replaced by the
    slerp merge of the base tensor with donor ``i``.

    :param models: list of models to merge
        (base model first, then at least one donor)
    :param model_dst: name of the new model
    :param trust_remote_code: are you sure?
        True/False
    :raises ValueError: if fewer than two models are given
    """
    if len(models) < 2:
        raise ValueError(
            "merge needs a base model and at least one "
            f"model to merge, got {len(models)}"
        )

    loaded = get_models(
        models=models,
        trust_remote_code=trust_remote_code,
    )
    config = {}
    result_dict: Dict[str, torch.Tensor] = {}
    device = get_device()

    # build each state dict once; calling state_dict() for
    # every key rebuilds the whole mapping every time
    state_dicts = [model.state_dict() for model in loaded]
    base_state = state_dicts[0]
    donor_states = state_dicts[1:]
    num_donors = len(donor_states)
    num_keys = len(base_state)

    for k, m0 in base_state.items():
        _, layer_type = get_layer_type(k)
        result = m0.clone()
        # one set per donor model
        sets = divide_tensor_into_sets(
            tensor=m0, n_sets=num_donors
        )

        # build a ratio
        # NOTE(review): get_layer_type returns everything after
        # the block index, so these names only match when a key
        # ends exactly there - confirm they apply to the models
        # being merged.
        ratio = {
            "to_q": 0.0,
            "to_k": 0.0,
            "to_v": 0.0,
        }.get(layer_type, 0.5)

        norm_ratio = 0.68
        log.info(
            f"model={k} {num_keys} shape={m0.shape} "
            f"dtype={m0.dtype} {m0.device} "
            f"ratio={ratio} "
            f"contig={m0.is_contiguous()} "
            f"norm={norm_ratio}"
        )

        # for all tensors
        for i, donor_state in enumerate(donor_states):
            tensor = donor_state[k]
            if layer_type == "to_k":
                # Get to_q key
                q_key = k.replace("to_k", "to_q")
                # compare the donor's own to_q with the base
                # to_q (donor i is models[i + 1], not models[i])
                scale = relative_norm(
                    donor_state[q_key], base_state[q_key]
                )
                tensor = tensor.to(device) / scale
            elif layer_type == "to_q":
                scale = relative_norm(tensor, m0)
                tensor = tensor.to(device) * scale

            slice_mask = (sets == i).bool()
            # NOTE(review): this result is overwritten by the
            # slerp merge on the next statement, so it never
            # reaches the output - confirm whether the slerp
            # should take it as input instead. Kept as-is so the
            # merged weights do not change.
            new_tensor = dare_ties_sparsification(
                model_a_param=m0,
                model_b_param=tensor,
                drop_rate=norm_ratio,
                ties="sum",
                rescale="off",
                device=device,
                **config,
            )
            new_tensor = merge_tensors(
                "slerp", m0, tensor, ratio
            )
            result = torch.where(
                slice_mask, new_tensor, result
            )
            del new_tensor, slice_mask

        result_dict[k] = result
    # end of merge

    log.info(f"done merge saving to file: {model_dst}")
    try:
        out_model = (
            transformers.AutoModelForCausalLM.from_pretrained(
                model_dst, **config
            )
        )
    except OSError:
        # model_dst does not exist yet (first run): reuse the
        # already-loaded base model as the container, the
        # weights written below come from result_dict
        log.warning(
            f"unable to load {model_dst}, saving with the "
            f"base model={models[0]} as the container"
        )
        out_model = loaded[0]
    # hand the merged weights to save_pretrained explicitly
    # instead of monkeypatching out_model.state_dict
    out_model.save_pretrained(
        model_dst, state_dict=result_dict
    )


def run():
    """
    run the merge and upload the model and tokenizer

    This requires having the Hugging Face token
    set before it will work:
    ```huggingface-cli login```
    """
    question = "why is the sky blue?"
    log.info(
        f"merging models and asking the question: {question}"
    )
    model_src = "TinyLlama/TinyLlama-1.1B-intermediate-step-1431k-3T"
    model_dst = "matlok/tinyllama-cinder-openhermes-32k"
    device = "cuda"
    config = {
        "torch_dtype": torch.float16,
        "low_cpu_mem_usage": False,
        "trust_remote_code": True,
    }
    models = [
        model_src,
        "Doctor-Shotgun/TinyLlama-1.1B-32k-Instruct",
        "Doctor-Shotgun/TinyLlama-1.1B-32k",
        "Tensoic/TinyLlama-1.1B-3T-openhermes",
        "Josephgflowers/TinyLlama-3T-Cinder-v1.3",
    ]
    merge_model_with_ties(
        models=models, model_dst=model_dst
    )
    log.info(f"loading newly-created file: {model_dst}")
    model = (
        transformers.AutoModelForCausalLM.from_pretrained(
            model_dst, **config
        )
    )
    log.info(
        f"loaded new model file: {model_dst} "
        f"asking question: {question} "
    )
    run_text_test(
        model=model,
        tokenizer_path=model_src,
        question=question,
        device=device,
    )

    # clean the temp merge dir
    # remove model dir to prevent issues with the tokenizer upload
    model_org = model_dst.split("/")[0]
    if os.path.exists(model_org):
        # best-effort removal without spawning a shell
        shutil.rmtree(model_org, ignore_errors=True)

    log.info(f"uploading model: {model_dst}")
    model.push_to_hub(model_dst)

    log.info(f"uploading src tokenizer: {model_src}")
    # reload tokenizer to save it and found on:
    # https://colab.research.google.com/drive/1PEQyJO1-f6j0S_XJ8DV50NkpzasXkrzd?usp=sharing#scrollTo=QQn30cRtAZ-P
    tokenizer = transformers.AutoTokenizer.from_pretrained(
        model_src, trust_remote_code=True
    )
    # https://huggingface.co/docs/transformers/model_sharing#use-the-pushtohub-function
    # tokenizer.push_to_hub("my-awesome-model")
    tokenizer.push_to_hub(model_dst)
    log.info(
        f"done loading new model: {model} "
        f"file: {model_dst}"
    )


if __name__ == "__main__":
    run()