"""QLoRA fine-tuning of Gemma 3 (4B) on image -> JSON feature extraction.

The script builds chat-formatted training/validation samples from parquet
metadata plus a Hugging Face image dataset, pickles them, then fine-tunes
``google/gemma-3-4b-pt`` with 4-bit quantization and LoRA adapters via TRL's
``SFTTrainer``.
"""

import datetime
import gc
import getpass
import json  # logging
import os
import pickle
import shutil
import time

import huggingface_hub
import pandas as pd
# PEFT: Parameter-Efficient Fine-Tuning
import peft
import torch
# SFT: Supervised Fine-Tuning
import trl
import wandb
from datasets import load_dataset
from PIL import Image
from torch.utils.data import Dataset
from tqdm import tqdm
from transformers import AutoModelForImageTextToText, AutoProcessor, BitsAndBytesConfig

# Token id that is additionally excluded from the loss in ``collate_fn``.
# NOTE(review): presumably Gemma 3's image soft-token id -- confirm against
# the tokenizer/config of the checkpoint in use.
IMAGE_SOFT_TOKEN_ID = 262144


def process_vision_info(messages: list[dict]) -> list[Image.Image]:
    """Collect every image referenced in a conversation, converted to RGB.

    Args:
        messages: Chat messages; each message's ``"content"`` is either a
            list of content elements or a single element.

    Returns:
        The images found, in message/element order, each passed through
        ``.convert("RGB")``.
    """
    image_inputs = []
    # Iterate through each conversation turn
    for msg in messages:
        # Get content (ensure it's a list)
        content = msg.get("content", [])
        if not isinstance(content, list):
            content = [content]

        # Check each content element for images
        for element in content:
            if isinstance(element, dict) and (
                "image" in element or element.get("type") == "image"
            ):
                # Get the image and convert to RGB
                if "image" in element:
                    image = element["image"]
                else:
                    image = element
                image_inputs.append(image.convert("RGB"))
    return image_inputs


# Create a data collator to encode text and image pairs
def collate_fn(examples):
    """Turn a list of chat samples into a padded model batch with labels.

    Relies on the module-level ``processor`` created in the ``__main__``
    section.

    Args:
        examples: Samples shaped like the output of ``format_data_ft_local``
            (each has a ``"messages"`` list).

    Returns:
        The processor output with an added ``"labels"`` tensor: a copy of
        ``input_ids`` where padding and image-related tokens are set to -100
        so they are ignored by the loss.
    """
    texts = []
    images = []
    for example in examples:
        image_inputs = process_vision_info(example["messages"])
        text = processor.apply_chat_template(
            example["messages"], add_generation_prompt=False, tokenize=False
        )
        texts.append(text.strip())
        images.append(image_inputs)

    # Tokenize the texts and process the images
    batch = processor(text=texts, images=images, return_tensors="pt", padding=True)

    # The labels are the input_ids, and we mask the padding tokens and image
    # tokens in the loss computation
    labels = batch["input_ids"].clone()

    # Use the scalar id here: comparing the tensor against a one-element
    # Python list (as this code previously did) is not an element-wise
    # comparison, so the begin-of-image token was never actually masked.
    boi_token_id = processor.tokenizer.convert_tokens_to_ids(
        processor.tokenizer.special_tokens_map["boi_token"]
    )

    # Mask tokens for not being used in the loss computation
    labels[labels == processor.tokenizer.pad_token_id] = -100
    labels[labels == boi_token_id] = -100
    labels[labels == IMAGE_SOFT_TOKEN_ID] = -100

    batch["labels"] = labels
    return batch


def clear_memory():
    """Drop large module-level objects, free CUDA cache and print memory stats.

    Removes ``inputs``, ``model``, ``processor``, ``trainer``, ``peft_model``
    and ``bnb_config`` from the module globals when present, runs garbage
    collection around ``torch.cuda.empty_cache()`` (with short sleeps in
    between), then prints GPU, RAM and swap usage.
    """
    # Delete variables if they exist in the current global scope
    module_globals = globals()
    for name in ("inputs", "model", "processor", "trainer", "peft_model", "bnb_config"):
        module_globals.pop(name, None)
    time.sleep(2)

    # Garbage collection and clearing CUDA memory
    gc.collect()
    time.sleep(2)
    torch.cuda.empty_cache()
    # torch.cuda.synchronize()
    time.sleep(2)
    gc.collect()
    time.sleep(2)

    print(f"GPU allocated memory: {torch.cuda.memory_allocated() / 1024**3:.2f} GB")
    print(f"GPU reserved memory: {torch.cuda.memory_reserved() / 1024**3:.2f} GB")

    # Imported here (as in the original) so psutil is only needed when the
    # memory report is actually printed.
    import psutil

    # Get memory details
    mem = psutil.virtual_memory()
    print(f"Total RAM: {mem.total / (1024 ** 3):.2f} GB")
    print(f"Available RAM: {mem.available / (1024 ** 3):.2f} GB")
    print(f"Used RAM: {mem.used / (1024 ** 3):.2f} GB")
    print(f"Free RAM: {mem.free / (1024 ** 3):.2f} GB")

    swap = psutil.swap_memory()
    print(f"Total Swap: {swap.total / (1024 ** 3):.2f} GB")
    print(f"Used Swap: {swap.used / (1024 ** 3):.2f} GB")
    print(f"Free Swap: {swap.free / (1024 ** 3):.2f} GB")


def format_data_ft_local(sample, system_message_training_data, human_message_training_data):
    """Wrap one sample into a system/user/assistant chat conversation.

    Args:
        sample: Mapping with an ``"image"`` entry (placed in the user turn)
            and a ``"label"`` entry (used as the assistant's answer text).
        system_message_training_data: Text of the system turn.
        human_message_training_data: Instruction text of the user turn.

    Returns:
        ``{"messages": [...]}`` with the three turns.
    """
    return {
        "messages": [
            {
                "role": "system",
                "content": [{"type": "text", "text": system_message_training_data}],
            },
            {
                "role": "user",
                "content": [
                    {
                        "type": "image",
                        "image": sample["image"],
                    },
                    {
                        "type": "text",
                        "text": human_message_training_data,
                    },
                ],
            },
            {
                "role": "assistant",
                "content": [{"type": "text", "text": sample["label"]}],
            },
        ],
    }


def create_data_pkl(df, output_path):
    """Build chat-formatted samples for the rows selected by ``df`` and pickle them.

    Every row of the full training parquet whose ``filename`` also occurs in
    ``df["filename"]`` becomes one conversation: the image is taken from the
    Hugging Face dataset at the row's index label, and the assistant answer
    is a ``{key: value, ...}`` string built from the row's non-null columns
    (``label`` and ``filename`` excluded; ``product_weight`` is split into
    ``weight_number`` / ``weight_unit``).

    Args:
        df: DataFrame with a ``filename`` column selecting the rows to keep.
        output_path: Destination of the pickle file (a list of conversations).
    """
    system_message_training_data = "You are an assistant for question-answering tasks."
    # NOTE(review): the original literal used backslash line continuations, so
    # the indentation of the continued lines was part of the prompt; one space
    # is kept per continuation here -- confirm the exact whitespace if it matters.
    human_message_training_data = (
        "Do the user-provided task on the input image. "
        " The answer must be provided in JSON format. "
        " The task is: " + "Extract the features" + "."
        " If there is no information of a target, return NaN."
    )

    dataset_id = "path-to-directory-of-training-dataset-from-HF"
    train_dataset = load_dataset(dataset_id)

    in_file_data_entering_train = 'path-to-file/train.parquet'
    df_train = pd.read_parquet(in_file_data_entering_train, engine='pyarrow')

    # Build the lookup once instead of scanning the column for every row.
    selected_filenames = set(df['filename'].values)

    list_response_train = []
    start_index = 0
    rows = df_train.iloc[start_index:]
    for index, row in tqdm(rows.iterrows(), total=len(rows)):
        if row.filename not in selected_filenames:
            continue

        sample = {}

        # IMAGE
        # assumes the parquet index label equals the row position in the HF
        # dataset -- TODO confirm
        sample["image"] = train_dataset['train'][index]['image']

        # RESPONSE EXAMPLES OF ASSISTANT
        result = []
        for idx, value in row.items():
            if idx == 'label' or idx == 'filename':
                continue
            if pd.notnull(value):
                if idx == 'product_weight':
                    # Expected form "<number> <unit>"; raises IndexError otherwise.
                    parts = value.split(' ')
                    number = parts[0]
                    unit = parts[1]
                    result.append(f"weight_number: {number}")
                    result.append(f"weight_unit: {unit}")
                else:
                    result.append(f"{idx}: {value}")

        sample["label"] = "{" + ", ".join(result) + "}"

        list_response_train.append(
            format_data_ft_local(
                sample, system_message_training_data, human_message_training_data
            )
        )

    with open(output_path, 'wb') as f:
        pickle.dump(list_response_train, f)


if __name__ == "__main__":
    hf_token = getpass.getpass("Enter your Hugging Face token: ")
    huggingface_hub.login(token=hf_token)

    wandb_token = getpass.getpass("Enter your Weight and Bias token: ")
    wandb.login(key=wandb_token)

    clear_memory()

    # create .pkl files from random dataset (run only once)
    df_train = pd.read_parquet("path-to-file/df_random_train_10250.parquet", engine="pyarrow")
    df_val = pd.read_parquet("path-to-file/df_random_val_2500.parquet", engine="pyarrow")
    create_data_pkl(df=df_train, output_path='path-to-file/training_data_random_10250.pkl')
    create_data_pkl(df=df_val, output_path='path-to-file/validation_data_random_2500.pkl')

    # load .pkl files from random dataset
    # (pickle is only safe here because these files were written just above)
    with open('path-to-file/training_data_random_10250.pkl', 'rb') as f:
        train_dataset = pickle.load(f)
    with open('path-to-file/validation_data_random_2500.pkl', 'rb') as f:
        val_dataset = pickle.load(f)

    # Hugging Face model id
    model_id = "google/gemma-3-4b-pt"

    # Check if GPU benefits from bfloat16
    # NOTE(review): this runs before the torch.cuda.is_available() check
    # further down -- confirm the intended behaviour on a machine without a GPU.
    if torch.cuda.get_device_capability()[0] < 8:
        raise ValueError("GPU does not support bfloat16, please use a GPU that supports bfloat16.")

    # Define model init arguments
    model_kwargs = dict(
        attn_implementation="eager",
        torch_dtype=torch.bfloat16,
        device_map="auto",
        cache_dir="path-to-directory/tmp"
    )

    # BitsAndBytesConfig int-4 config
    model_kwargs["quantization_config"] = BitsAndBytesConfig(
        load_in_4bit=True,
        bnb_4bit_use_double_quant=True,
        bnb_4bit_quant_type="nf4",
        bnb_4bit_compute_dtype=model_kwargs["torch_dtype"],
        bnb_4bit_quant_storage=model_kwargs["torch_dtype"],
    )

    # Load model and tokenizer
    # (the processor is taken from the instruction-tuned checkpoint while the
    # weights come from the pre-trained one)
    model = AutoModelForImageTextToText.from_pretrained(model_id, **model_kwargs)
    processor = AutoProcessor.from_pretrained("google/gemma-3-4b-it")

    if torch.cuda.is_available():
        for i in range(torch.cuda.device_count()):
            print(f"GPU {i}: {torch.cuda.get_device_name(i)}")
    else:
        print("No GPU available.")

    folder_date = datetime.datetime.now().strftime("%Y-%m-%d_%H-%M-%S")
    output_path = os.path.join("path-to-result-directory", "google_gemma-3-4b-pt", "subset-random", folder_date)
    os.makedirs(output_path, exist_ok=True)

    # NOTE(review): os.path.exists() does not expand "~", so this only matches
    # a literal "~/.cache" directory relative to the working directory and is
    # normally a no-op. Expanding it would delete the user's whole cache
    # directory (which by default also holds the Hugging Face login token
    # needed for push_to_hub) -- confirm the intent before changing it.
    cache_path = "~/.cache"
    if os.path.exists(cache_path):
        shutil.rmtree(cache_path)

    peft_config = peft.LoraConfig(
        lora_alpha=16,
        lora_dropout=0.05,
        r=16,
        bias="none",
        target_modules="all-linear",
        task_type="CAUSAL_LM",
        modules_to_save=[
            "lm_head",
            "embed_tokens",
        ],
    )

    args = trl.SFTConfig(
        output_dir=output_path,                 # directory to save and repository id
        num_train_epochs=3,                     # number of training epochs
        per_device_train_batch_size=2,          # batch size per device during training
        per_device_eval_batch_size=2,           # batch size per device during evaluation
        gradient_accumulation_steps=4,          # number of steps before performing a backward/update pass
        gradient_checkpointing=True,            # use gradient checkpointing to save memory
        optim="adamw_torch_fused",              # use fused adamw optimizer
        logging_steps=5,                        # log every 5 steps
        save_strategy="epoch",                  # save checkpoint every epoch
        eval_strategy="epoch",                  # evaluate every epoch
        learning_rate=2e-4,                     # learning rate, based on QLoRA paper
        bf16=True,                              # use bfloat16 precision
        max_grad_norm=0.3,                      # max gradient norm based on QLoRA paper
        warmup_ratio=0.03,                      # warmup ratio based on QLoRA paper
        lr_scheduler_type="constant",           # use constant learning rate scheduler
        push_to_hub=True,                       # push model to hub
        report_to="wandb",
        gradient_checkpointing_kwargs={
            "use_reentrant": False
        },
        dataset_text_field="",                  # need a dummy field for collator
        dataset_kwargs={"skip_prepare_dataset": True},  # important for collator
    )
    args.remove_unused_columns = False  # important for collator

    with open(os.path.join(output_path, "args.txt"), "w") as f:
        f.write(json.dumps(args.to_dict(), indent=4))

    trainer = trl.SFTTrainer(
        model=model,
        args=args,
        train_dataset=train_dataset,
        eval_dataset=val_dataset,
        peft_config=peft_config,
        processing_class=processor,
        data_collator=collate_fn,
    )

    start_time = time.time()
    trainer.train()
    end_time = time.time()
    elapsed_time = end_time - start_time

    with open(os.path.join(output_path, "elapsed_time.txt"), "w") as file:
        file.write(f"Elapsed time: {elapsed_time:.2f} seconds\n")

    trainer.save_model(args.output_dir)

    # free the memory again
    del model
    del trainer
    torch.cuda.empty_cache()