import json
import os
import time
import uuid
from collections import Counter

# HF_HOME must be set before huggingface_hub is imported: the library
# resolves its cache directory at import time.
os.environ["HF_HOME"] = "/tmp/hf_cache"

import numpy as np
import requests
import sentencepiece as spm
import tensorflow as tf
from bs4 import BeautifulSoup
from flask import Flask, request, Response, session, jsonify
from huggingface_hub import hf_hub_download
from tensorflow.keras import layers

app = Flask(__name__, static_folder="static")
app.secret_key = os.urandom(32)  # random per process; sessions do not survive restarts
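
# Model weights and the SentencePiece tokenizer are fetched from the Hugging
# Face Hub; HF_TOKEN only needs to be set if the repo is private or gated.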
hf_token = os.getenv("HF_TOKEN")

CHAT_MODEL_PATH = hf_hub_download(
    repo_id="AlphaCNN/LamCo",
    filename="LamCo.weights.h5",
    repo_type="model",
    token=hf_token,
)
CHAT_TOKENIZER_PATH = hf_hub_download(
    repo_id="AlphaCNN/LamCo",
    filename="ko_unigram.model",
    repo_type="model",
    token=hf_token,
)

print(CHAT_MODEL_PATH)

sp = spm.SentencePieceProcessor()
sp.load(CHAT_TOKENIZER_PATH)

# piece_to_id returns the unk id for unknown pieces; the `or` fallbacks pin
# each special token to its conventional position if the piece is missing.
pad_id = sp.piece_to_id("<pad>") or 0
start_id = sp.piece_to_id("<start>") or 1
end_id = sp.piece_to_id("<end>") or 2
unk_id = sp.piece_to_id("<unk>") or 3
sep_id = sp.piece_to_id("<sep>")
vocab_size = sp.get_piece_size()
max_len = 125


def text_to_ids(text):
    return sp.encode(text, out_type=int)


def ids_to_text(ids):
    return sp.decode(ids)
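

# Architecture: a decoder-only stack of causal dilated 1-D convolutions with
# residual connections, interleaved with SwiGLU feed-forward blocks every
# third layer. Causality comes from 'causal' padding; the receptive field
# grows exponentially with the dilation schedule. Input and output
# embeddings are tied.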
class SwiGLU(layers.Layer):
    def __init__(self, d_model, f_d=8/3):
        super().__init__()
        hidden_dim = int(d_model * f_d)
        self.proj = layers.Dense(hidden_dim * 2, use_bias=False, dtype='float32')
        self.out = layers.Dense(d_model, use_bias=False, dtype='float32')

    def call(self, x):
        x_val, x_gate = tf.split(self.proj(x), 2, axis=-1)
        return self.out(x_val * tf.nn.silu(x_gate))
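

# Shape check (sketch): the block preserves the model width, e.g.
#   SwiGLU(384)(tf.zeros((1, 8, 384))).shape == (1, 8, 384)
# The 8/3 expansion factor is the usual choice that keeps a gated FFN's
# parameter count comparable to a plain 4x feed-forward.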


class DilatedConvLayer(layers.Layer):
    def __init__(self, d_model, dilation_rate, dropout_rate=0.1):
        super().__init__()
        self.conv = layers.Conv1D(
            filters=d_model,
            kernel_size=3,
            dilation_rate=dilation_rate,
            padding='causal',
            use_bias=True,
            kernel_initializer='he_normal',
            dtype='float32'
        )
        self.ln = layers.LayerNormalization(epsilon=1e-5, dtype='float32')
        self.dropout = layers.Dropout(dropout_rate)

    def call(self, x, training=False):
        residual = x
        x = self.conv(x)
        x = self.ln(x + residual)
        x = self.dropout(x, training=training)
        return x
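

# Receptive field of the conv stack: each kernel_size=3 layer adds
# 2 * dilation_rate positions, so dilations 1, 2, ..., 256 over nine layers
# give 1 + 2 * (2**9 - 1) = 1023 positions, well beyond max_len = 125.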


class Lamko(tf.keras.Model):
    def __init__(self, vocab_size, max_seq_len, d_model, n_layers, dropout_rate=0.1):
        super().__init__()
        self.token_embedding = layers.Embedding(vocab_size, d_model, dtype='float32')
        self.pos_embedding = layers.Embedding(max_seq_len, d_model, dtype='float32')

        self.blocks = []
        for i in range(n_layers):
            self.blocks.append(DilatedConvLayer(d_model, 2 ** i, dropout_rate))
            if (i + 1) % 3 == 0:
                self.blocks.append(SwiGLU(d_model))
                self.blocks.append(layers.LayerNormalization(epsilon=1e-5, dtype='float32'))

        self.ln_f = layers.LayerNormalization(epsilon=1e-5, dtype='float32')

    def call(self, x, training=False):
        seq_len = tf.shape(x)[1]
        positions = tf.range(seq_len)[tf.newaxis, :]
        positions = tf.clip_by_value(positions, 0, self.pos_embedding.input_dim - 1)

        x = self.token_embedding(x) + self.pos_embedding(positions)

        for block in self.blocks:
            if isinstance(block, SwiGLU):
                x = x + block(x)  # residual connection around the feed-forward block
            elif isinstance(block, DilatedConvLayer):
                x = block(x, training=training)  # only the conv blocks use dropout
            else:
                x = block(x)

        x = self.ln_f(x)
        # Tied embeddings: reuse the input embedding matrix as the output head.
        logits = tf.matmul(x, self.token_embedding.weights[0], transpose_b=True)
        return logits


model = Lamko(vocab_size=vocab_size, max_seq_len=max_len, d_model=384, n_layers=9)
dummy_input = tf.zeros((1, max_len), dtype=tf.int32)
_ = model(dummy_input)  # build the variables so the weights can be loaded
model.load_weights(CHAT_MODEL_PATH)
print("Model weights loaded!")
@tf.function(input_signature=[
    tf.TensorSpec(shape=(1, None), dtype=tf.int32),      # input_ids
    tf.TensorSpec(shape=(vocab_size,), dtype=tf.int32),  # token_counts
    tf.TensorSpec(shape=(), dtype=tf.int32),             # current_length
    tf.TensorSpec(shape=(), dtype=tf.float32),           # temperature
    tf.TensorSpec(shape=(), dtype=tf.float32),           # repetition_penalty
    tf.TensorSpec(shape=(), dtype=tf.float32),           # top_p
    tf.TensorSpec(shape=(), dtype=tf.int32),             # top_k
    tf.TensorSpec(shape=(), dtype=tf.int32),             # min_len
    tf.TensorSpec(shape=(), dtype=tf.int32),             # step (currently unused)
])
def generate_step(input_ids, token_counts, current_length, temperature, repetition_penalty, top_p, top_k, min_len, step):
    # Pad out to the fixed max_len; the convolutions are causal, so the
    # padding cannot leak into the position we read the logits from.
    pad_len = max_len - tf.shape(input_ids)[1]
    input_padded = tf.pad(input_ids, [[0, 0], [0, pad_len]], constant_values=pad_id)
    logits = model(input_padded, training=False)
    next_logits = logits[0, current_length - 1]

    # Repetition penalty: divide each token's logit by penalty**count.
    # (This assumes mostly positive logits; dividing a negative logit makes
    # that token slightly *more* likely.)
    penalty = tf.pow(repetition_penalty, tf.cast(token_counts, tf.float32))
    next_logits = next_logits / penalty

    # Block <end> until the minimum length is reached; never emit <pad>.
    if current_length < min_len:
        next_logits = tf.tensor_scatter_nd_update(next_logits, [[end_id]], [-1e9])
    next_logits = tf.tensor_scatter_nd_update(next_logits, [[pad_id]], [-1e9])

    # Top-k: drop everything below the k-th largest logit.
    if top_k > 0:
        kth_val = tf.math.top_k(next_logits, k=top_k).values[-1]
        mask = next_logits < kth_val
        next_logits = tf.where(mask, -1e9, next_logits)

    # Temperature, then top-p (nucleus) truncation over sorted probabilities.
    next_logits = next_logits / temperature
    probs = tf.nn.softmax(next_logits)
    sorted_probs, sorted_idx = tf.math.top_k(probs, k=vocab_size)
    cum_probs = tf.cumsum(sorted_probs)
    cutoff_mask = cum_probs <= top_p
    cutoff_idx = tf.reduce_sum(tf.cast(cutoff_mask, tf.int32)) + 1
    cutoff_idx = tf.minimum(cutoff_idx, vocab_size)
    filtered_idx = sorted_idx[:cutoff_idx]
    filtered_probs = sorted_probs[:cutoff_idx]
    filtered_probs = filtered_probs / tf.reduce_sum(filtered_probs)

    # Mixed decoding: with a fixed probability take the argmax, otherwise
    # sample from the truncated distribution.
    rand_val = tf.random.uniform([], 0, 1)

    def sample():
        sampled = tf.random.categorical(tf.math.log([filtered_probs]), 1)[0, 0]
        return filtered_idx[sampled]

    def argmax():
        return filtered_idx[tf.argmax(filtered_probs)]

    sampled_id = tf.cond(rand_val < 0.5536, argmax, sample)
    sampled_id = tf.cast(sampled_id, tf.int32)

    token_counts = tf.tensor_scatter_nd_add(token_counts, [[sampled_id]], [1])
    return sampled_id, token_counts
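

# The input_signature above means generate_step is traced exactly once, and
# padding every call out to max_len keeps the runtime shapes identical from
# step to step, so the per-step Python cost is just packing the tensors.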


def generate_text_streaming(model, prompt, max_len=115, max_gen=100,
                            temperature=0.75, min_len=20,
                            repetition_penalty=1.2, top_p=0.9, top_k=50):
    model_input = text_to_ids(f"<start> {prompt} <sep>")
    model_input = model_input[:max_len]
    generated = list(model_input)
    start_output_idx = len(model_input)

    # Seed the repetition counts with the prompt tokens.
    token_counts_np = np.zeros(vocab_size, dtype=np.int32)
    for t in generated:
        token_counts_np[t] += 1
    token_counts = tf.constant(token_counts_np, dtype=tf.int32)

    prev_decoded = ""
    decoded_full = ""

    for step in range(max_gen):
        # Keep only the newest max_len tokens as context (assumes this
        # function's max_len is <= the module-level max_len generate_step
        # pads to; otherwise the fixed-size pad would go negative).
        window = generated[-max_len:]
        input_tensor = tf.expand_dims(window, axis=0)

        sampled_id, token_counts = generate_step(
            input_tensor,
            token_counts,
            tf.constant(len(window), dtype=tf.int32),
            tf.constant(temperature, dtype=tf.float32),
            tf.constant(repetition_penalty, dtype=tf.float32),
            tf.constant(top_p, dtype=tf.float32),
            tf.constant(top_k, dtype=tf.int32),
            tf.constant(min_len, dtype=tf.int32),
            tf.constant(step, dtype=tf.int32)
        )

        sampled_id = int(sampled_id.numpy())
        generated.append(sampled_id)

        # Re-decode the whole continuation and yield only the new suffix, so
        # multi-token pieces are emitted once they become stable text.
        if len(generated) > start_output_idx:
            decoded_full = sp.decode(generated[start_output_idx:])
            decoded_full = decoded_full.replace("▁", " ").strip()  # SentencePiece word marker
            for t in ["<start>", "<sep>", "<end>"]:
                decoded_full = decoded_full.replace(t, "")
            # NOTE: the original also stripped two leading Korean characters
            # here that are unrecoverably garbled in the source.
            decoded_full = decoded_full.lstrip(",!?. ")

            new_output = decoded_full[len(prev_decoded):]
            if new_output:
                yield new_output
            prev_decoded = decoded_full

        if len(generated) >= min_len and (sampled_id == end_id or decoded_full.endswith(('.', '!', '?'))):
            break
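

# Usage sketch (not executed): stream a reply directly in Python.
#   for chunk in generate_text_streaming(model, "안녕"):
#       print(chunk, end="", flush=True)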


# Canonicalize common greetings/farewells before they reach the model.
# NOTE: these Korean literals were garbled in the source and are restored on
# a best-effort basis; English glosses in the comments.
token_map = {
    "하이": "안녕하세요!",        # "hi" -> "hello!"
    "ㅎㅇ": "안녕하세요!",        # "hi" (slang) -> "hello!"
    "하이~": "안녕하세요!",
    "안녕": "안녕하세요!",
    "안녕!": "안녕하세요!",
    "잘가": "잘가. 나중에 보자",   # "bye" -> "bye, see you later"
    "잘 가": "잘 가. 나중에 보자"
}


def preprocess_text(text):
    for key, val in token_map.items():
        text = text.replace(key, val)
    return text


@app.route('/')
def index():
    return app.send_static_file('index.html')


@app.route('/api/search')
def search_api():
    query = request.args.get("query", "").strip()
    if not query:
        return jsonify({"results": []})

    # Scrape the Korean Wikipedia search page. It either returns a result
    # list or redirects straight to an article on an exact title match.
    headers = {"User-Agent": "Mozilla/5.0"}
    resp = requests.get("https://ko.wikipedia.org/w/index.php",
                        params={"search": query}, headers=headers, timeout=10)
    soup = BeautifulSoup(resp.text, "html.parser")

    results = []

    # Case 1: a search-results page.
    search_items = soup.select(".mw-search-result-heading a")
    if search_items:
        for item in search_items[:5]:
            title = item.text
            link = "https://ko.wikipedia.org" + item.get("href")
            snippet_tag = item.find_parent().find("div", class_="searchresult")
            snippet = snippet_tag.text.strip() if snippet_tag else ""
            results.append({"title": title, "link": link, "snippet": snippet})

    # Case 2: an exact match redirected to the article itself.
    elif soup.select("#firstHeading"):
        title = soup.select_one("#firstHeading").text.strip()
        link = resp.url

        content_paragraph = soup.select_one(".mw-parser-output > p")
        snippet = content_paragraph.text.strip() if content_paragraph else ""
        results.append({"title": title, "link": link, "snippet": snippet})

    return jsonify({"results": results})
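
# Alternative (sketch): MediaWiki also exposes a JSON search endpoint, which
# would avoid the HTML scraping above, e.g.:
#   requests.get("https://ko.wikipedia.org/w/api.php",
#                params={"action": "opensearch", "search": query,
#                        "limit": 5, "format": "json"}, timeout=10)
# returning [query, [titles...], [descriptions...], [urls...]].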


@app.before_request
def ensure_user_id():
    if 'user_id' not in session:
        session['user_id'] = str(uuid.uuid4())


@app.route('/api/chat', methods=['GET', 'POST'])
def chat_api():
    # Accept the message from a JSON body (POST) or the query string (GET).
    if request.method == "POST":
        user_msg = ((request.get_json(silent=True) or {}).get("message") or "").strip()
    else:
        user_msg = (request.args.get("message") or "").strip()
    if not user_msg:
        return Response('data: {"error":"Please enter a message."}\n\n',
                        mimetype='text/event-stream')
    user_id = session['user_id']

    user_msg = preprocess_text(user_msg)

    def gen():
        try:
            search_result = ""  # placeholder for optional search context (unused)
            for token in generate_text_streaming(
                model, user_msg,
                max_len=max_len,
                max_gen=115,
                temperature=0.8,
                min_len=10,
                repetition_penalty=1.1,
                top_p=0.9,
                top_k=5
            ):
                # One server-sent event per decoded fragment, JSON-escaped.
                safe_token = json.dumps(token)
                yield f'data: {{"char":{safe_token}}}\n\n'

            yield 'data: {"done":true}\n\n'
        except Exception as e:
            yield f'data: {{"error":{json.dumps(str(e))}}}\n\n'

    return Response(gen(), mimetype='text/event-stream')
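

# Example client (sketch; assumes the server is reachable on localhost:7860):
#   import requests
#   with requests.get("http://localhost:7860/api/chat",
#                     params={"message": "안녕"}, stream=True) as r:
#       for line in r.iter_lines(decode_unicode=True):
#           if line:
#               print(line)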


if __name__ == "__main__":
    app.run(host="0.0.0.0", port=7860)