"""Flask server that streams replies from a small dilated-convolution language model.

Model weights and the SentencePiece tokenizer are downloaded from the Hugging
Face Hub at import time. The app exposes a chat endpoint (server-sent events)
and a Korean-Wikipedia search endpoint.
"""
import json
import os
import re
import time
import uuid
from collections import Counter

# huggingface_hub resolves its cache directory when it is imported, so the
# cache location has to be configured before the import below.
os.environ["HF_HOME"] = "/tmp/hf_cache"

import numpy as np
import requests
import sentencepiece as spm
import tensorflow as tf
from bs4 import BeautifulSoup
from flask import Flask, request, Response, session, jsonify
from huggingface_hub import hf_hub_download
from tensorflow.keras import layers

app = Flask(__name__, static_folder="static")
# NOTE(review): a random key per process invalidates sessions on restart and
# is not shared between worker processes.
app.secret_key = os.urandom(32)

# =====================
# Model / tokenizer download
# =====================
hf_token = os.getenv("HF_TOKEN")

CHAT_MODEL_PATH = hf_hub_download(
    repo_id="AlphaCNN/LamCo",
    filename="LamCo.weights.h5",
    repo_type="model",
    token=hf_token
)

CHAT_TOKENIZER_PATH = hf_hub_download(
    repo_id="AlphaCNN/LamCo",
    filename="ko_unigram.model",
    repo_type="model",
    token=hf_token
)
print(CHAT_MODEL_PATH)

sp = spm.SentencePieceProcessor()
sp.load(CHAT_TOKENIZER_PATH)

# NOTE(review): every piece name below is an empty string in this file. If the
# special-token names were stripped by an export step, restore them -- confirm
# against the tokenizer vocabulary. They are left untouched here.
pad_id = sp.piece_to_id("") or 0
start_id = sp.piece_to_id("") or 1
end_id = sp.piece_to_id("") or 2
unk_id = sp.piece_to_id("") or 3
sep_id = sp.piece_to_id("")
vocab_size = sp.get_piece_size()
max_len = 125


def text_to_ids(text):
    """Encode ``text`` into a list of integer token ids."""
    return sp.encode(text, out_type=int)


def ids_to_text(ids):
    """Decode a sequence of token ids back into text."""
    return sp.decode(ids)


class SwiGLU(layers.Layer):
    """Gated feed-forward layer: ``out(value * silu(gate))``.

    One bias-free projection produces both halves (value and gate), each of
    width ``int(d_model * f_d)``; the result is projected back to ``d_model``.
    """

    def __init__(self, d_model, f_d=8/3):
        super().__init__()
        hidden_dim = int(d_model * f_d)
        self.proj = layers.Dense(hidden_dim * 2, use_bias=False, dtype='float32')
        self.out = layers.Dense(d_model, use_bias=False, dtype='float32')

    def call(self, x):
        x_val, x_gate = tf.split(self.proj(x), 2, axis=-1)
        return self.out(x_val * tf.nn.silu(x_gate))


class DilatedConvLayer(layers.Layer):
    """Causal dilated Conv1D with a residual connection, LayerNorm and dropout."""

    def __init__(self, d_model, dilation_rate, dropout_rate=0.1):
        super().__init__()
        self.conv = layers.Conv1D(
            filters=d_model,
            kernel_size=3,
            dilation_rate=dilation_rate,
            padding='causal',
            use_bias=True,
            kernel_initializer='he_normal',
            dtype='float32'
        )
        self.ln = layers.LayerNormalization(epsilon=1e-5, dtype='float32')
        self.dropout = layers.Dropout(dropout_rate)

    def call(self, x, training=False):
        residual = x
        x = self.conv(x)
        x = self.ln(x + residual)
        x = self.dropout(x, training=training)
        return x


class Lamko(tf.keras.Model):
    """Language model built from dilated causal convolutions.

    Layer ``i`` uses dilation ``2 ** i``; after every third convolution a
    SwiGLU block (applied residually) and a LayerNormalization are appended.
    The output projection reuses the token-embedding matrix.

    The order in which sub-layers are created is kept exactly as before,
    because the saved weights are loaded into this structure.
    """

    def __init__(self, vocab_size, max_seq_len, d_model, n_layers, dropout_rate=0.1):
        super().__init__()
        self.token_embedding = layers.Embedding(vocab_size, d_model, dtype='float32')
        self.pos_embedding = layers.Embedding(max_seq_len, d_model, dtype='float32')

        self.blocks = []
        for i in range(n_layers):
            self.blocks.append(DilatedConvLayer(d_model, 2 ** i, dropout_rate))
            if (i + 1) % 3 == 0:
                self.blocks.append(SwiGLU(d_model))
                self.blocks.append(layers.LayerNormalization(epsilon=1e-5, dtype='float32'))

        self.ln_f = layers.LayerNormalization(epsilon=1e-5, dtype='float32')

    def call(self, x, training=False):
        """Return logits over the vocabulary for every position of ``x``."""
        seq_len = tf.shape(x)[1]
        positions = tf.range(seq_len)[tf.newaxis, :]
        # Clamp so positions past the embedding table reuse its last row.
        positions = tf.clip_by_value(positions, 0, self.pos_embedding.input_dim - 1)

        x = self.token_embedding(x) + self.pos_embedding(positions)

        for block in self.blocks:
            if isinstance(block, SwiGLU):
                x = x + block(x)
            else:
                # Blocks without a ``training`` attribute are called with
                # their default arguments.
                x = block(x, training=training) if hasattr(block, 'training') else block(x)

        x = self.ln_f(x)
        # Weight tying: project back through the token-embedding matrix.
        logits = tf.matmul(x, self.token_embedding.weights[0], transpose_b=True)
        return logits


model = Lamko(vocab_size=vocab_size, max_seq_len=max_len, d_model=384, n_layers=9)
# Run one forward pass so all variables exist before the weights are loaded.
dummy_input = tf.zeros((1, max_len), dtype=tf.int32)
_ = model(dummy_input)
model.load_weights(CHAT_MODEL_PATH)
print("모델 가중치 로드 완료!")


@tf.function(input_signature=[
    tf.TensorSpec(shape=(1, None), dtype=tf.int32),     # input_ids
    tf.TensorSpec(shape=(vocab_size,), dtype=tf.int32),  # token_counts
    tf.TensorSpec(shape=(), dtype=tf.int32),             # current_length
    tf.TensorSpec(shape=(), dtype=tf.float32),           # temperature
    tf.TensorSpec(shape=(), dtype=tf.float32),           # repetition_penalty
    tf.TensorSpec(shape=(), dtype=tf.float32),           # top_p
    tf.TensorSpec(shape=(), dtype=tf.int32),             # top_k
    tf.TensorSpec(shape=(), dtype=tf.int32),             # min_len
    tf.TensorSpec(shape=(), dtype=tf.int32),             # step
])
def generate_step(input_ids, token_counts, current_length, temperature,
                  repetition_penalty, top_p, top_k, min_len, step):
    """Sample one next token and return ``(token_id, updated_token_counts)``.

    Args:
        input_ids: int32 tensor of shape (1, seq_len) with the tokens so far.
        token_counts: int32 tensor of shape (vocab_size,), occurrences per token.
        current_length: number of valid tokens in ``input_ids``.
        temperature: divisor applied to the logits before the softmax.
        repetition_penalty: base of the per-token penalty
            ``repetition_penalty ** count``.
        top_p: nucleus (cumulative probability) cutoff.
        top_k: keep only the ``top_k`` largest logits when > 0.
        min_len: the end token is masked while ``current_length < min_len``.
        step: unused; kept for interface compatibility.
    """
    # The model is always fed exactly ``max_len`` positions. Keep only the most
    # recent ``max_len`` tokens so that a long sequence never produces a
    # negative padding amount (which makes tf.pad fail).
    input_ids = input_ids[:, -max_len:]
    window_len = tf.shape(input_ids)[1]
    pad_len = max_len - window_len
    input_padded = tf.pad(input_ids, [[0, 0], [0, pad_len]], constant_values=pad_id)
    logits = model(input_padded, training=False)
    last_pos = tf.minimum(current_length, window_len) - 1
    next_logits = logits[0, last_pos]

    # NOTE(review): dividing by the penalty lowers positive logits but raises
    # negative ones; confirm this is the intended repetition penalty.
    penalty = tf.pow(repetition_penalty, tf.cast(token_counts, tf.float32))
    next_logits = next_logits / penalty

    # Minimum length: forbid the end token early; always forbid padding.
    if current_length < min_len:
        next_logits = tf.tensor_scatter_nd_update(next_logits, [[end_id]], [-1e9])
    next_logits = tf.tensor_scatter_nd_update(next_logits, [[pad_id]], [-1e9])

    # Top-k filtering.
    if top_k > 0:
        kth_val = tf.math.top_k(next_logits, k=top_k).values[-1]
        mask = next_logits < kth_val
        next_logits = tf.where(mask, -1e9, next_logits)

    # Temperature, then top-p (nucleus) filtering.
    next_logits = next_logits / temperature
    probs = tf.nn.softmax(next_logits)
    sorted_probs, sorted_idx = tf.math.top_k(probs, k=vocab_size)
    cum_probs = tf.cumsum(sorted_probs)
    cutoff_mask = cum_probs <= top_p
    # +1 keeps the token that crosses the top_p threshold.
    cutoff_idx = tf.reduce_sum(tf.cast(cutoff_mask, tf.int32)) + 1
    cutoff_idx = tf.minimum(cutoff_idx, vocab_size)
    filtered_idx = sorted_idx[:cutoff_idx]
    filtered_probs = sorted_probs[:cutoff_idx]
    filtered_probs = filtered_probs / tf.reduce_sum(filtered_probs)

    # Mix greedy decoding and sampling: argmax when the draw is below 0.5536
    # (about 55% of steps), otherwise sample from the filtered distribution.
    rand_val = tf.random.uniform([], 0, 1)

    def sample():
        sampled_id = tf.random.categorical(tf.math.log([filtered_probs]), 1)[0, 0]
        return filtered_idx[sampled_id]

    def argmax():
        return filtered_idx[tf.argmax(filtered_probs)]

    sampled_id = tf.cond(rand_val < 0.5536, argmax, sample)
    sampled_id = tf.cast(sampled_id, tf.int32)

    # Record the chosen token for the repetition penalty.
    token_counts = tf.tensor_scatter_nd_add(token_counts, [[sampled_id]], [1])
    return sampled_id, token_counts


# =====================
# Streaming generator
# =====================
def generate_text_streaming(model, prompt, max_len=115, max_gen=100,
                            temperature=0.75, min_len=20,
                            repetition_penalty=1.2, top_p=0.9, top_k=50):
    """Yield the model's reply to ``prompt`` incrementally as text fragments.

    Args:
        model: unused here (``generate_step`` calls the module-level model);
            kept for interface compatibility.
        prompt: user text; it is wrapped as ``f" {prompt} "`` before encoding.
        max_len: maximum number of prompt tokens kept.
        max_gen: maximum number of generated tokens.
        temperature, repetition_penalty, top_p, top_k, min_len: forwarded to
            ``generate_step``.

    Yields:
        Newly decoded text since the previous yield.
    """
    model_input = text_to_ids(f" {prompt} ")
    model_input = model_input[:max_len]
    generated = list(model_input)
    start_output_idx = len(model_input)

    # Per-token occurrence counts, seeded with the prompt tokens. A plain
    # tensor is enough because generate_step returns the updated counts.
    token_counts = tf.constant(
        np.bincount(np.asarray(generated, dtype=np.int64),
                    minlength=vocab_size).astype(np.int32)
    )

    # Loop-invariant scalars are converted once.
    temperature_t = tf.constant(temperature, dtype=tf.float32)
    penalty_t = tf.constant(repetition_penalty, dtype=tf.float32)
    top_p_t = tf.constant(top_p, dtype=tf.float32)
    top_k_t = tf.constant(top_k, dtype=tf.int32)
    min_len_t = tf.constant(min_len, dtype=tf.int32)

    prev_decoded = ""

    for step in range(max_gen):
        input_tensor = tf.expand_dims(generated, axis=0)  # [1, seq_len]

        sampled_id, token_counts = generate_step(
            input_tensor,
            token_counts,
            tf.constant(len(generated), dtype=tf.int32),
            temperature_t,
            penalty_t,
            top_p_t,
            top_k_t,
            min_len_t,
            tf.constant(step, dtype=tf.int32)
        )

        sampled_id = int(sampled_id.numpy())
        generated.append(sampled_id)

        # Decode everything generated so far and emit only the new suffix.
        decoded_full = sp.decode(generated[start_output_idx:])
        decoded_full = decoded_full.replace("▁", " ").strip()
        for t in ["", "", ""]:
            decoded_full = decoded_full.replace(t, "")
        decoded_full = decoded_full.lstrip(",!?.는은 ")

        new_output = decoded_full[len(prev_decoded):]
        if new_output:
            yield new_output
            prev_decoded = decoded_full

        # Stop on the end token or sentence-final punctuation once the total
        # length (prompt included) has reached min_len.
        if len(generated) >= min_len and (
            sampled_id == end_id or decoded_full.endswith(('.', '!', '?'))
        ):
            break


token_map = {
    "하이": "안녕하세요!",
    "ㅎㅇ": "안녕하세요!",
    "하이~": "안녕하세요!",
    "안녕": "안녕하세요!",
    "안녕!": "안녕하세요!",
    "잘가": "잘가. 나중에 보자",
    "잘 가": "잘 가. 나중에 보자"
}

# Longest keys first so that e.g. "하이~" wins over its prefix "하이".
_TOKEN_MAP_PATTERN = re.compile(
    "|".join(re.escape(key) for key in sorted(token_map, key=len, reverse=True))
)


def preprocess_text(text):
    """Replace casual phrases in ``text`` using ``token_map``.

    All replacements happen in a single pass, so text inserted by one
    replacement is never matched again by another key (the previous chained
    ``str.replace`` calls turned "하이" into "안녕하세요!하세요!").
    """
    return _TOKEN_MAP_PATTERN.sub(lambda match: token_map[match.group(0)], text)


# =====================
@app.route('/')
def index():
    """Serve the static front-end page."""
    return app.send_static_file('index.html')


@app.route('/api/search')
def search_api():
    """Search Korean Wikipedia and return up to five results as JSON.

    Query parameter ``query`` holds the search text. The response is
    ``{"results": [{"title", "link", "snippet"}, ...]}``; if Wikipedia cannot
    be reached the list is empty and the status code is 502.
    """
    query = request.args.get("query", "").strip()
    if not query:
        return jsonify({"results": []})

    headers = {"User-Agent": "Mozilla/5.0"}
    try:
        # ``params`` URL-encodes the query; the timeout keeps a stalled
        # upstream from blocking the worker indefinitely.
        resp = requests.get(
            "https://ko.wikipedia.org/w/index.php",
            params={"search": query},
            headers=headers,
            timeout=10,
        )
    except requests.RequestException:
        return jsonify({"results": []}), 502

    soup = BeautifulSoup(resp.text, "html.parser")
    results = []

    # 1. The response is a search-result list.
    search_items = soup.select(".mw-search-result-heading a")
    if search_items:
        for item in search_items[:5]:
            title = item.text
            link = "https://ko.wikipedia.org" + item.get("href", "")
            # NOTE(review): this looks for the snippet inside the link's
            # direct parent; verify against the current result markup.
            snippet_tag = item.find_parent().find("div", class_="searchresult")
            snippet = snippet_tag.text.strip() if snippet_tag else ""
            results.append({"title": title, "link": link, "snippet": snippet})
    else:
        # 2. The search was redirected straight to a matching article.
        heading = soup.select_one("#firstHeading")
        if heading is not None:
            title = heading.text.strip()
            link = resp.url
            # Use the first paragraph of the article as the snippet.
            content_paragraph = soup.select_one(".mw-parser-output > p")
            snippet = content_paragraph.text.strip() if content_paragraph else ""
            results.append({"title": title, "link": link, "snippet": snippet})

    return jsonify({"results": results})


@app.before_request
def ensure_user_id():
    """Give every session a random ``user_id`` on first contact."""
    if 'user_id' not in session:
        session['user_id'] = str(uuid.uuid4())


@app.route('/api/chat', methods=['GET', 'POST'])
def chat_api():
    """Stream a model reply to the ``message`` parameter as server-sent events.

    The message is read from the JSON body on POST and from the query string
    on GET. Events carry ``{"char": ...}`` fragments followed by
    ``{"done": true}``, or ``{"error": ...}`` on failure.
    """
    if request.method == "POST":
        # silent=True: a missing or malformed JSON body is treated like an
        # empty message instead of raising.
        payload = request.get_json(silent=True)
        raw_msg = payload.get("message") if isinstance(payload, dict) else None
    else:
        raw_msg = request.args.get("message")
    user_msg = raw_msg.strip() if isinstance(raw_msg, str) else ""

    if not user_msg:
        return Response('data: {"error":"메시지를 입력해주세요."}\n\n',
                        mimetype='text/event-stream')

    user_msg = preprocess_text(user_msg)

    def gen():
        # Boundary of the stream: any failure is reported to the client as an
        # error event rather than breaking the connection.
        try:
            for token in generate_text_streaming(
                model, user_msg,
                max_len=max_len,
                max_gen=115,
                temperature=0.8,
                min_len=10,
                repetition_penalty=1.1,
                top_p=0.9,
                top_k=5
            ):
                safe_token = json.dumps(token)
                yield f'data: {{"char":{safe_token}}}\n\n'
            yield 'data: {"done":true}\n\n'
        except Exception as e:
            yield f'data: {{"error":{json.dumps(str(e))}}}\n\n'

    return Response(gen(), mimetype='text/event-stream')


if __name__ == "__main__":
    app.run(host="0.0.0.0", port=7860)