# OpenLEM / app.py
import numpy as np
import tensorflow as tf
from tensorflow.keras import layers
import sentencepiece as spm
import gradio as gr
import requests
import os
# ----------------------
# ํŒŒ์ผ ๋‹ค์šด๋กœ๋“œ ์œ ํ‹ธ
# ----------------------
def download_file(url, save_path):
    r = requests.get(url, stream=True)
    r.raise_for_status()
    with open(save_path, "wb") as f:
        for chunk in r.iter_content(chunk_size=8192 * 2):
            f.write(chunk)
    print(f"✅ {save_path} saved")
MODEL_PATH = "encoder.weights.h5"
TOKENIZER_PATH = "bpe.model"
# Download the encoder weights (published as encoder_simcse.weights.h5) and
# the BPE tokenizer model from the Hub on first run.
if not os.path.exists(MODEL_PATH):
    download_file(
        "https://huggingface.co/OpenLab-NLP/openlem-prototype/resolve/main/encoder_simcse.weights.h5?download=true",
        MODEL_PATH
    )
if not os.path.exists(TOKENIZER_PATH):
    download_file(
        "https://huggingface.co/OpenLab-NLP/openlem-prototype/resolve/main/bpe.model?download=true",
        TOKENIZER_PATH
    )
MAX_LEN = 128
EMBED_DIM = 384
LATENT_DIM = 384
DROPOUT_RATE = 0.01  # unused in this inference script
# ===============================
# 1๏ธโƒฃ ํ† ํฌ๋‚˜์ด์ € ๋กœ๋”ฉ
# ===============================
sp = spm.SentencePieceProcessor(model_file=TOKENIZER_PATH)
# Use the <pad> piece id when the vocab defines one, otherwise fall back to 0.
pad_id = sp.piece_to_id("<pad>") if sp.piece_to_id("<pad>") != -1 else 0
vocab_size = sp.get_piece_size()
def encode_sentence(sentence, max_len=MAX_LEN):
    # Tokenize with SentencePiece and truncate to max_len ids.
    return sp.encode(sentence, out_type=int)[:max_len]

def pad_sentence(tokens):
    # Right-pad with pad_id up to the fixed sequence length.
    return tokens + [pad_id] * (MAX_LEN - len(tokens))
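
# Convenience helper (an added sketch, not in the original app; the name
# `encode_batch` is introduced here): batch-encode a list of sentences into
# one int32 array of shape (N, MAX_LEN) that the encoder below can consume
# directly.
def encode_batch(sentences):
    return np.array(
        [pad_sentence(encode_sentence(s)) for s in sentences], dtype=np.int32
    )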
# ===============================
# 2️⃣ Model definition
# ===============================
class EncoderBlock(tf.keras.layers.Layer):
    def __init__(self, embed_dim=EMBED_DIM, ff_dim=1152, seq_len=MAX_LEN):
        super().__init__()
        self.embed_dim = embed_dim
        self.seq_len = seq_len
        # Gated feed-forward: fc1 is split into gate/value halves in call().
        self.fc1 = layers.Dense(ff_dim)
        self.fc2 = layers.Dense(embed_dim)
        # Second, narrower feed-forward stage.
        self.fc3 = layers.Dense(ff_dim // 2)
        self.fc4 = layers.Dense(embed_dim)
        # Per-position attention scores and a token-mixing MLP over the
        # (fixed-length) sequence axis.
        self.attn = layers.Dense(1)
        self.token_mixer = layers.Dense(seq_len)
        self.token_gate = layers.Dense(seq_len, activation='sigmoid')
        self.ln = layers.LayerNormalization(epsilon=1e-5)
        self.ln1 = layers.LayerNormalization(epsilon=1e-5)
        self.ln2 = layers.LayerNormalization(epsilon=1e-5)
        self.ln3 = layers.LayerNormalization(epsilon=1e-5)
        self.ln4 = layers.LayerNormalization(epsilon=1e-5)
    def call(self, x, mask):
        # x: (B, L, D); mask: (B, L), 1.0 for real tokens, 0.0 for padding.
        x_norm = self.ln(x)
        # Gated feed-forward: SiLU(gate) * value, then project back to D.
        h = self.fc1(x_norm)
        g, v = tf.split(h, 2, axis=-1)
        h = tf.nn.silu(g) * v
        h = self.fc2(h)
        h = x + self.ln1(h)
        # Soft attention over positions; padded positions get -1e9 pre-softmax.
        scores = self.attn(h)
        scores = tf.where(tf.equal(mask[..., tf.newaxis], 0), -1e9, scores)
        scores = tf.nn.softmax(scores, axis=1)
        attn = h + self.ln2(h * scores)
        # Token mixing across the sequence axis (requires a fixed seq_len).
        v = tf.transpose(attn, [0, 2, 1])  # (B, D, L)
        v = self.token_mixer(v) * self.token_gate(v)
        v = tf.transpose(v, [0, 2, 1])  # (B, L, D)
        x_norm = attn + self.ln3(v)
        # Final feed-forward stage with a residual connection.
        x = self.fc3(x_norm)
        x = tf.nn.silu(x)
        x = self.fc4(x)
        return x_norm + self.ln4(x)
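
# Shape check (an illustrative sketch added here, not in the original script):
# a single EncoderBlock maps (B, L, D) to (B, L, D). Uncomment to verify:
# _blk = EncoderBlock()
# _out = _blk(tf.zeros((2, MAX_LEN, EMBED_DIM)), tf.ones((2, MAX_LEN)))
# assert _out.shape == (2, MAX_LEN, EMBED_DIM)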
class L2NormLayer(layers.Layer):
def __init__(self, axis=1, epsilon=1e-10, **kwargs):
super().__init__(**kwargs)
self.axis = axis
self.epsilon = epsilon
def call(self, inputs):
return tf.math.l2_normalize(inputs, axis=self.axis, epsilon=self.epsilon)
def get_config(self):
return {"axis": self.axis, "epsilon": self.epsilon, **super().get_config()}
class SentenceEncoder(tf.keras.Model):
def __init__(self, vocab_size, embed_dim=384, latent_dim=384, max_len=128, pad_id=pad_id):
super().__init__()
self.pad_id = pad_id
self.embed = layers.Embedding(vocab_size, embed_dim)
self.pos_embed = layers.Embedding(input_dim=max_len, output_dim=embed_dim)
self.blocks = [EncoderBlock() for _ in range(2)]
self.attn_pool = layers.Dense(1)
self.ln_f = layers.LayerNormalization(epsilon=1e-5, dtype=tf.float32)
        self.latent = layers.Dense(latent_dim, activation=None)  # tanh removed
        self.l2norm = L2NormLayer()  # added: unit-normalize the embedding
    def call(self, x):
        # Token embeddings plus learned positional embeddings.
        positions = tf.range(tf.shape(x)[1])[tf.newaxis, :]
        x_embed = self.embed(x) + self.pos_embed(positions)
        mask = tf.cast(tf.not_equal(x, self.pad_id), tf.float32)
        x = x_embed
        for block in self.blocks:
            x = block(x, mask)
        x = self.ln_f(x)
        # Attention pooling over non-padding positions.
        scores = self.attn_pool(x)
        scores = tf.where(tf.equal(mask[..., tf.newaxis], 0), -1e9, scores)
        scores = tf.nn.softmax(scores, axis=1)
        pooled = tf.reduce_sum(x * scores, axis=1)
        latent = self.latent(pooled)
        return self.l2norm(latent)  # return the L2-normalized embedding
# 3๏ธโƒฃ ๋ชจ๋ธ ๋กœ๋“œ
# ===============================
encoder = SentenceEncoder(vocab_size=vocab_size)
encoder(np.zeros((1, MAX_LEN), dtype=np.int32))  # build the model with a dummy forward pass
encoder.load_weights(MODEL_PATH)
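
# Optional sanity check (an illustrative addition, not in the original script):
# the encoder maps any (batch, MAX_LEN) int array to unit-length LATENT_DIM
# vectors. Uncomment to verify locally:
# _v = encoder(np.zeros((2, MAX_LEN), dtype=np.int32)).numpy()
# assert _v.shape == (2, LATENT_DIM)
# assert np.allclose(np.linalg.norm(_v, axis=1), 1.0, atol=1e-5)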
# ===============================
# 4๏ธโƒฃ ๋ฒกํ„ฐํ™” ํ•จ์ˆ˜
# ===============================
def get_sentence_vector(sentence):
    tokens = pad_sentence(encode_sentence(sentence))
    vec = encoder(np.array([tokens])).numpy()[0]
    # The encoder output is already L2-normalized; renormalizing here is a
    # harmless safeguard.
    return vec / np.linalg.norm(vec)
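
# Batched variant (an illustrative addition, not in the original app): encode
# several sentences in one forward pass via the encode_batch helper above.
def get_sentence_vectors(sentences):
    vecs = encoder(encode_batch(sentences)).numpy()
    return vecs / np.linalg.norm(vecs, axis=1, keepdims=True)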
# ===============================
# 5๏ธโƒฃ ๊ฐ€์žฅ ๋น„์Šทํ•œ ๋ฌธ์žฅ ์ฐพ๊ธฐ
# ===============================
def find_most_similar(query, s1, s2, s3):
    candidates = [s1, s2, s3]
    candidate_vectors = np.stack([get_sentence_vector(c) for c in candidates]).astype(np.float32)
    query_vector = get_sentence_vector(query)
    sims = candidate_vectors @ query_vector  # dot products of unit vectors == cosine similarity
    top_idx = np.argmax(sims)
    return {
        "most_similar_sentence": candidates[top_idx],
        "similarity": float(sims[top_idx]),
    }
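
# Ranked variant (an illustrative addition, not in the original app): score
# every candidate against the query and return (sentence, similarity) pairs,
# best first, using the batched get_sentence_vectors sketch above.
def rank_candidates(query, candidates):
    sims = get_sentence_vectors(candidates) @ get_sentence_vector(query)
    order = np.argsort(-sims)
    return [(candidates[i], float(sims[i])) for i in order]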
# ===============================
# 6๏ธโƒฃ Gradio UI
# ===============================
with gr.Blocks() as demo:
    gr.Markdown("## 🔍 Sentence Similarity Search (1 query + 3 candidates)")
    with gr.Row():
        query_input = gr.Textbox(label="Query sentence", placeholder="Type a sentence here")
    with gr.Row():
        s1_input = gr.Textbox(label="Candidate 1")
        s2_input = gr.Textbox(label="Candidate 2")
        s3_input = gr.Textbox(label="Candidate 3")
    output = gr.JSON(label="Result")
    search_btn = gr.Button("Find the most similar sentence")
    search_btn.click(
        fn=find_most_similar,
        inputs=[query_input, s1_input, s2_input, s3_input],
        outputs=output,
    )

demo.launch()