| | import os
|
| | import cv2
|
| | import numpy as np
|
| | import pickle
|
| | from PIL import Image
|
| | import matplotlib.pyplot as plt
|
| | import tensorflow as tf
|
| | from tensorflow.keras import layers
|
| | from tensorflow.keras.models import load_model, Model
|
| | from tensorflow.keras.applications import EfficientNetV2B0
|
| | from tensorflow.keras.applications.efficientnet import preprocess_input as efficientnet_preprocess
|
| | from tensorflow.keras.preprocessing.sequence import pad_sequences
|
| | from tensorflow.keras.preprocessing.image import img_to_array
|
| | from tqdm import tqdm
|
| | import random
|
| | from tensorflow.keras.preprocessing.sequence import pad_sequences
|
| |
|
| | import tempfile
|
| | import traceback
|
| | from pathlib import Path
|
| | from huggingface_hub import hf_hub_download
|
| |
|
| | import gradio as gr
|
| | from PIL import Image
|
| | import pickle
|
| |
|
| |
|
| |
|
| |
|
| |
|
| |
|
| |
|
class ChannelAttention(layers.Layer):
    """Channel-attention gate applied along the last axis of the input.

    The input is reduced with global average pooling and global max pooling
    (1-D variants), both pooled vectors are passed through the same two-layer
    MLP, and the sigmoid of their sum is used to rescale the input channels.

    Args:
        ratio: Reduction factor for the MLP bottleneck; the hidden layer has
            ``channels // ratio`` units (at least one).
        **kwargs: Forwarded unchanged to ``layers.Layer``.
    """

    def __init__(self, ratio=8, **kwargs):
        super().__init__(**kwargs)
        self.ratio = ratio

    def build(self, input_shape):
        """Create the pooling layers and the shared MLP for ``input_shape``."""
        # Size the MLP from the actual channel dimension instead of assuming
        # 1280; fall back to 1280 only when the channel dimension is unknown.
        channels = input_shape[-1]
        channels = 1280 if channels is None else int(channels)

        # NOTE: attribute names are kept as-is because sublayer attribute
        # names may participate in how saved weights are matched on load.
        self.gap = layers.GlobalAveragePooling1D()
        self.gmp = layers.GlobalMaxPooling1D()
        self.shared_mlp = tf.keras.Sequential([
            layers.Dense(units=max(1, channels // self.ratio), activation='relu'),
            layers.Dense(units=channels),
        ])
        self.sigmoid = layers.Activation('sigmoid')
        super().build(input_shape)

    def call(self, inputs):
        """Return ``inputs`` rescaled per channel by the attention gate."""
        gap = self.gap(inputs)
        gmp = self.gmp(inputs)
        gap_mlp = self.shared_mlp(gap)
        gmp_mlp = self.shared_mlp(gmp)
        channel_attention = self.sigmoid(gap_mlp + gmp_mlp)
        # Re-insert axis 1 (removed by the pooling) so the gate broadcasts
        # against the input.
        return inputs * tf.expand_dims(channel_attention, axis=1)

    def get_config(self):
        """Return the layer config, including ``ratio``, for serialization."""
        config = super().get_config()
        config.update({'ratio': self.ratio})
        return config

    @classmethod
    def from_config(cls, config):
        """Rebuild the layer from a config produced by ``get_config``."""
        return cls(**config)
|
| |
|
| |
|
| |
|
class SpatialAttention(layers.Layer):
    """Attention gate computed by a single-filter 1-D convolution.

    A ``Conv1D`` with one filter, kernel size 3, ``'same'`` padding and a
    sigmoid activation produces a mask that is multiplied with the input.

    Args:
        **kwargs: Forwarded unchanged to ``layers.Layer``.
    """

    def __init__(self, **kwargs):
        super().__init__(**kwargs)

    def build(self, input_shape):
        """Create the convolution that produces the attention mask."""
        self.conv = layers.Conv1D(
            filters=1,
            kernel_size=3,
            padding='same',
            activation='sigmoid',
        )
        super().build(input_shape)

    def call(self, inputs):
        """Return ``inputs`` multiplied by the sigmoid mask."""
        return inputs * self.conv(inputs)

    def get_config(self):
        """Return the base layer config (this layer adds no options)."""
        return super().get_config()

    @classmethod
    def from_config(cls, config):
        """Rebuild the layer from a config produced by ``get_config``."""
        return cls(**config)
|
| |
|
| |
|
| |
|
| |
|
| |
|
| |
|
| |
|
def load_caption_model(model_path):
    """Load the saved captioning model with its custom attention layers.

    Args:
        model_path: Location of the saved model, passed to ``load_model``.

    Returns:
        The model object returned by ``load_model``.
    """
    # The custom layers must be registered by name so that load_model can
    # resolve them while deserializing.
    caption_model = load_model(
        model_path,
        custom_objects={
            'ChannelAttention': ChannelAttention,
            'SpatialAttention': SpatialAttention,
        },
    )
    print("✅ Đã load model thành công!")
    return caption_model
|
| |
|
| |
|
def load_tokenizer_and_config(tokenizer_path, config_path):
    """Read the pickled tokenizer and model configuration from disk.

    Args:
        tokenizer_path: Path of the pickle file holding the tokenizer.
        config_path: Path of the pickle file holding a mapping with the keys
            ``'max_length'`` and ``'vocab_size'``.

    Returns:
        A tuple ``(tokenizer, max_length, vocab_size)``.

    Raises:
        KeyError: If the config mapping lacks one of the two keys.
    """
    # NOTE(review): pickle.load can execute arbitrary code embedded in the
    # file; only point this at artifacts from a trusted source.
    def _unpickle(path):
        with open(path, 'rb') as handle:
            return pickle.load(handle)

    loaded_tokenizer = _unpickle(tokenizer_path)
    settings = _unpickle(config_path)
    longest = settings['max_length']
    vocabulary = settings['vocab_size']
    return loaded_tokenizer, longest, vocabulary
|
| |
|
| |
|
| |
|
| |
|
| |
|
| |
|
def load_feature_extractor():
    """Build the image feature extractor used ahead of the caption model.

    Returns:
        A Keras ``Model`` mapping the input of an ``EfficientNetV2B0``
        (ImageNet weights, no classification head, average pooling) to its
        output.
    """
    backbone = EfficientNetV2B0(
        weights='imagenet',
        include_top=False,
        pooling='avg',
    )
    feature_model = Model(inputs=backbone.input, outputs=backbone.output)
    return feature_model
|
| |
|
| |
|
def extract_features_from_image(image_path, extractor):
    """Read an image file and run it through the feature extractor.

    Args:
        image_path: Path of the image file, read with ``cv2.imread``.
        extractor: Model exposing ``predict``; it receives a batch of one
            image resized to 224x224.

    Returns:
        The output of ``extractor.predict`` for the image, or ``None`` when
        the file could not be read (a message is printed in that case).
    """
    raw = cv2.imread(image_path)
    if raw is None:
        print(f"❌ Không đọc được ảnh: {image_path}")
        return None

    # NOTE(review): cv2.imread yields BGR channel order and no conversion is
    # done here — confirm this matches how the training features were made.
    resized = cv2.resize(raw, (224, 224))
    batch = np.expand_dims(img_to_array(resized), axis=0)
    prepared = efficientnet_preprocess(batch)
    return extractor.predict(prepared, verbose=0)
|
| |
|
| |
|
| |
|
| |
|
| |
|
| |
|
def generate_caption(model, tokenizer, image_features, max_length):
    """Greedily decode a caption for precomputed image features.

    Starting from the ``startseq`` marker, the model is asked for the next
    word up to ``max_length`` times; decoding stops early when the predicted
    index has no word in ``tokenizer.index_word`` or the word is ``endseq``.

    Args:
        model: Caption model; ``model.predict`` is called with
            ``[image_features, padded_sequence]``.
        tokenizer: Tokenizer providing ``texts_to_sequences`` and
            ``index_word``.
        image_features: Features passed through to the model unchanged.
        max_length: Maximum number of decoding steps and the padded length of
            the text sequence.

    Returns:
        The generated words joined by single spaces, without the
        ``startseq``/``endseq`` markers. An empty string is returned when no
        word was generated (previously the literal ``'startseq'`` leaked out,
        because the marker was stripped by a ``'startseq '`` text replace).
    """
    words = []
    for _ in range(max_length):
        # Rebuild the prompt from the start marker plus everything decoded so far.
        in_text = ' '.join(['startseq'] + words)
        sequence = tokenizer.texts_to_sequences([in_text])[0]
        sequence = pad_sequences([sequence], maxlen=max_length)
        yhat = model.predict([image_features, sequence], verbose=0)
        # Greedy choice: take the highest-scoring vocabulary index.
        word = tokenizer.index_word.get(int(np.argmax(yhat)))
        if word is None or word == 'endseq':
            break
        words.append(word)
    return ' '.join(words)
|
| |
|
| |
|
| |
|
| |
|
| |
|
| |
|
# Hugging Face Hub repository that hosts the trained artifacts.
MODEL_REPO = "slyviee/img_cap"

# Fetch the three artifacts from the hub; this runs at import time.
model_path, tokenizer_path, config_path = (
    hf_hub_download(repo_id=MODEL_REPO, filename=artifact)
    for artifact in ("best_model.keras", "tokenizer.pkl", "model_config.pkl")
)

# Runtime state filled in by _startup(); predict() checks `ready` before use.
model = tokenizer = max_length = vocab_size = extractor = None
ready = False
startup_error = ""
|
| |
|
| |
|
def _startup():
    """Load the model, tokenizer/config and feature extractor into globals.

    On success ``ready`` is set to ``True``. If an artifact path does not
    exist, or any step raises, ``ready`` is set to ``False`` and
    ``startup_error`` receives a description (including the traceback for
    exceptions); nothing is raised to the caller.
    """
    global model, tokenizer, max_length, vocab_size, extractor, ready, startup_error
    try:
        # Collect every artifact path that is not present on disk.
        missing = []
        for artifact in (model_path, tokenizer_path, config_path):
            if not Path(artifact).exists():
                missing.append(artifact)
        if missing:
            startup_error = "Thiếu tệp: " + ", ".join(missing)
            ready = False
            return

        print("🔄 Đang tải model...")
        model = load_caption_model(model_path)
        print("✅ Model đã được tải.")

        print("🔄 Đang tải tokenizer và config...")
        loaded = load_tokenizer_and_config(tokenizer_path, config_path)
        tokenizer, max_length, vocab_size = loaded
        print("✅ Tokenizer và config đã được tải.")

        print("🔄 Đang tải feature extractor...")
        extractor = load_feature_extractor()
        print("✅ Feature extractor đã được tải.")

        ready = True
    except Exception as exc:
        # Keep the failure so predict() can report it instead of crashing.
        startup_error = f"Khởi tạo lỗi: {exc}\n{traceback.format_exc()}"
        ready = False
|
| |
|
| |
|
def predict(pil_image: Image.Image):
    """Produce a caption for an uploaded image (callback for the web UI).

    The image is written to a temporary JPEG file because
    ``extract_features_from_image`` reads its input from a path.

    Args:
        pil_image: Image supplied by the UI. ``None`` (presumably what the UI
            passes when nothing was uploaded — confirm against Gradio) is
            reported as an unreadable input.

    Returns:
        The generated caption, or a message string when the system is not
        ready, the image is missing/unreadable, or prediction raised (that
        message includes the traceback).
    """
    if not ready:
        return f"Hệ thống chưa sẵn sàng. {startup_error or 'Thiếu model/tokenizer/config.'}"

    # Report a missing image with the regular message instead of letting an
    # AttributeError traceback reach the user.
    if pil_image is None:
        return "Không đọc được ảnh đầu vào."

    try:
        tmp = tempfile.NamedTemporaryFile(suffix=".jpg", delete=False)
        try:
            with tmp:
                # Write through the already-open handle: reopening the file
                # by name while it is still open fails on Windows.
                pil_image.convert("RGB").save(tmp, format="JPEG")
            features = extract_features_from_image(tmp.name, extractor)
        finally:
            # Always remove the temp file, even when saving or feature
            # extraction raised; cleanup itself is best-effort.
            try:
                os.unlink(tmp.name)
            except OSError:
                pass

        if features is None:
            return "Không đọc được ảnh đầu vào."
        return generate_caption(model, tokenizer, features, max_length)
    except Exception as e:
        return f"Lỗi trong quá trình dự đoán: {e}\n{traceback.format_exc()}"
|
| |
|
# Text passed to the interface as its description.
DESCRIPTION = "Upload ảnh và nhận caption sinh ra bởi mô hình. "

# Single-function Gradio app: one PIL image in, one caption textbox out.
demo = gr.Interface(
    title="Image Captioning — Gradio",
    description=DESCRIPTION,
    fn=predict,
    inputs=gr.Image(type="pil", label="Ảnh vào"),
    outputs=gr.Textbox(label="Caption"),
)

if __name__ == "__main__":
    # Load the artifacts first; predict() only serves once `ready` is set.
    _startup()
    demo.launch()