|
import gradio as gr |
|
import gspread |
|
import torch |
|
from transformers import AutoTokenizer, AutoModelForCausalLM, pipeline |
|
from oauth2client.service_account import ServiceAccountCredentials |
|
from llama_index.core import VectorStoreIndex, Settings |
|
from llama_index.core.node_parser import SentenceSplitter |
|
from llama_index.embeddings.huggingface import HuggingFaceEmbedding |
|
from llama_index.core.schema import Document |
|
|
|
|
|
# Vector index built over the text of every sheet; assigned by initialize_index().
cached_index = None

# Formatted sheet entries keyed by worksheet name; assigned by initialize_index().
# An empty dict means the sheets have not been loaded yet.
cached_data = {}
|
|
|
def _format_target_row(row):
    """Describe one 'datatarget' row, or flag it when column 4 is not an integer."""
    try:
        jumlah = int(row[3])
    except ValueError:
        return f"[WARNING] Data tidak valid: {' | '.join(row)}"
    # A negative difference is a shortfall; zero and above is reported as a surplus.
    status = "KURANG" if jumlah < 0 else "LEBIH"
    return (
        f"[SELISIH] Mesin: {row[0]} | Kategori: {row[1]} | Bulan: {row[2]} | "
        f"Selisih: {abs(jumlah)} pcs ({status})"
    )


def _format_leave_row(row):
    """Describe one 'datacuti' row."""
    return f"{row[0]} memiliki sisa cuti {row[1]} hari pada tahun {row[2]}"


def _format_attendance_row(row):
    """Describe one 'dataabsen' row."""
    return f"Kehadiran {row[0]} adalah {row[1]} hari pada bulan {row[2]}"


def _format_overtime_row(row):
    """Describe one 'datalembur' row."""
    return f"{row[0]} telah lembur sebanyak {row[1]} jam pada bulan {row[2]}"


def _format_pkb_row(row):
    """Describe one 'pkb' row."""
    return f"Bab {row[0]}, Poin {row[1]} - Kategori: {row[2]}\nIsi: {row[3]}"


# Worksheet name -> (minimum number of columns a row needs, row formatter).
# Insertion order is the order in which the worksheets are read.
_SHEET_FORMATTERS = {
    "datatarget": (4, _format_target_row),
    "datacuti": (3, _format_leave_row),
    "dataabsen": (3, _format_attendance_row),
    "datalembur": (3, _format_overtime_row),
    "pkb": (4, _format_pkb_row),
}


def read_google_sheets_separated():
    """Read the HR worksheets and turn each data row into a text entry.

    Authorizes with the service-account key in ``credentials.json``, opens the
    spreadsheet by its key and reads the worksheets listed in
    ``_SHEET_FORMATTERS``. The first row of every worksheet is treated as a
    header and skipped; rows with fewer columns than the formatter needs are
    ignored.

    Returns:
        dict: worksheet name -> list of formatted strings. A worksheet that
        does not exist maps to a single ``"❌ ERROR: ..."`` entry. If anything
        else fails (credentials, network, ...), the result is
        ``{"error": "<exception message>"}`` instead.
    """
    try:
        scope = [
            "https://www.googleapis.com/auth/spreadsheets",
            "https://www.googleapis.com/auth/drive",
        ]
        creds = ServiceAccountCredentials.from_json_keyfile_name("credentials.json", scope)
        client = gspread.authorize(creds)

        SPREADSHEET_ID = "1ZLmz1onvPEX4TbgPJbR4LxVZjIluf6BpISTiGS5_5Rg"
        spreadsheet = client.open_by_key(SPREADSHEET_ID)

        data_map = {}
        for sheet_name, (min_columns, format_row) in _SHEET_FORMATTERS.items():
            try:
                sheet = spreadsheet.worksheet(sheet_name)
            except gspread.exceptions.WorksheetNotFound:
                data_map[sheet_name] = [f"❌ ERROR: Worksheet {sheet_name} tidak ditemukan."]
                continue

            # Slicing (rather than indexing row 0) keeps a completely empty
            # worksheet from raising and discarding the sheets already read.
            rows = sheet.get_all_values()[1:]
            data_map[sheet_name] = [
                format_row(row) for row in rows if len(row) >= min_columns
            ]

        return data_map
    except Exception as e:
        # Best-effort boundary: report the failure to the caller instead of raising.
        return {"error": str(e)}
|
|
|
def detect_intent(message):
    """Guess which HR topic a message is about.

    Every topic has a list of Indonesian keywords/phrases. The topic with the
    most phrases occurring (as substrings, case-insensitively) in the message
    wins; on a tie the topic listed first wins.

    Args:
        message: The user's message.

    Returns:
        One of "pkb", "cuti", "target", "lembur", "absensi", or "all" when no
        phrase of any topic occurs in the message.
    """
    lowered = message.lower()

    topic_phrases = (
        ("pkb", (
            "ketentuan", "aturan", "kompensasi", "hak", "berlaku", "diperbolehkan",
            "pkb", "perusahaan", "pekerja", "tenaga kerja asing", "jam kerja",
            "kerja lembur", "perjalanan dinas", "pengupahan",
            "pemutusan hubungan kerja", "jaminan sosial", "kesejahteraan",
            "fasilitas kerja", "alih tugas", "kewajiban", "disiplin kerja",
            "larangan", "sanksi", "mogok", "pesangon", "penghargaan masa kerja",
            "uang pisah",
        )),
        ("cuti", ("cuti", "sisa cuti", "jumlah cuti", "berapa hari cuti", "libur")),
        ("target", ("target", "aktual", "selisih", "produksi", "mesin", "pcs")),
        ("lembur", ("lembur", "jam lembur", "berapa jam", "jam kerja tambahan")),
        ("absensi", (
            "absensi", "hadir", "tidak hadir", "izin", "masuk", "alpha",
            "berapa hari masuk", "kehadiran",
        )),
    )

    winner = "all"
    winner_hits = 0
    for topic, phrases in topic_phrases:
        hits = 0
        for phrase in phrases:
            if phrase in lowered:
                hits += 1
        # Strictly greater: an earlier topic keeps the win on a tie.
        if hits > winner_hits:
            winner, winner_hits = topic, hits

    return winner
|
|
|
def initialize_index():
    """Reload the sheet data and rebuild the module-level vector index.

    Stores the result of ``read_google_sheets_separated()`` in ``cached_data``,
    joins all sheet entries into one document, splits it into chunks and stores
    a ``VectorStoreIndex`` over those chunks in ``cached_index``. Also sets
    ``Settings.embed_model`` to the multilingual MiniLM embedding model.

    A failed load is reported by the reader as ``{"error": "<message>"}``; that
    message is logged and the index is built without any sheet text instead of
    raising ``TypeError`` while flattening the values.
    """
    import logging  # function-scope import keeps this block self-contained

    global cached_index, cached_data
    cached_data = read_google_sheets_separated()

    load_error = cached_data.get("error")
    if isinstance(load_error, str):
        logging.getLogger(__name__).warning(
            "Failed to load Google Sheets data: %s", load_error
        )

    # Only list values are sheet entries; a string value is the error message.
    all_text = [
        line
        for entries in cached_data.values()
        if isinstance(entries, list)
        for line in entries
    ]
    document = Document(text="\n".join(all_text))
    parser = SentenceSplitter(chunk_size=256, chunk_overlap=30)
    nodes = parser.get_nodes_from_documents([document])

    embedding = HuggingFaceEmbedding("sentence-transformers/paraphrase-multilingual-MiniLM-L12-v2")
    Settings.embed_model = embedding
    cached_index = VectorStoreIndex(nodes)
|
|
|
# Detected intent -> worksheet whose entries answer it. Any other intent
# (i.e. "all") searches every worksheet.
_INTENT_TO_SHEET = {
    "pkb": "pkb",
    "cuti": "datacuti",
    "target": "datatarget",
    "absensi": "dataabsen",
    "lembur": "datalembur",
}

# Returned when nothing can be retrieved for a query.
_NO_RESULT_MESSAGE = "Maaf, saya tidak menemukan informasi yang relevan."

# Embedding model shared by all queries; created on first use.
_query_embed_model = None


def _get_query_embed_model():
    """Return the embedding model, instantiating it on the first call only."""
    global _query_embed_model
    if _query_embed_model is None:
        _query_embed_model = HuggingFaceEmbedding(
            "sentence-transformers/paraphrase-multilingual-MiniLM-L12-v2"
        )
    return _query_embed_model


def search_google_sheets_vector(query):
    """Retrieve the sheet entries most similar to *query*.

    Loads the sheet data on first use, narrows it to the worksheet matching the
    detected intent (or all worksheets when no intent is detected), builds a
    temporary vector index over that text and returns the top 3 chunks.

    Args:
        query: The user's question.

    Returns:
        str: The retrieved chunks joined by newlines; the "Maaf, ..." message
        when nothing is available or retrieved; or an ``"❌ ERROR: ..."``
        string when the sheet data failed to load.
    """
    if not cached_data:
        initialize_index()

    # A failed load is stored as {"error": "<message>"}. Report it using the
    # file's "❌ ERROR" convention instead of failing while flattening values.
    load_error = cached_data.get("error")
    if isinstance(load_error, str):
        return f"❌ ERROR: {load_error}"

    intent = detect_intent(query)
    sheet_name = _INTENT_TO_SHEET.get(intent)
    if sheet_name is not None:
        selected_data = cached_data.get(sheet_name, [])
    else:
        selected_data = [
            line
            for entries in cached_data.values()
            if isinstance(entries, list)
            for line in entries
        ]

    # Nothing to search: skip the embedding work entirely.
    if not selected_data:
        return _NO_RESULT_MESSAGE

    document = Document(text="\n".join(selected_data))
    parser = SentenceSplitter(chunk_size=256, chunk_overlap=30)
    nodes = parser.get_nodes_from_documents([document])

    # Reuse one embedding model across queries instead of reloading it each time.
    Settings.embed_model = _get_query_embed_model()
    temp_index = VectorStoreIndex(nodes)

    # NOTE(review): the former `retriever.similarity_cutoff = 1.0` assignment was
    # dropped; it appears to only set an attribute the retriever never reads.
    # If score filtering is wanted, a node postprocessor is presumably the
    # place for it -- confirm against the LlamaIndex documentation.
    retriever = temp_index.as_retriever(similarity_top_k=3)
    retrieved_nodes = retriever.retrieve(query)

    results = [node.text for node in retrieved_nodes]
    return "\n".join(results) if results else _NO_RESULT_MESSAGE
|
|
|
|
|
def load_model():
    """Build the text-generation pipeline for the Llama-2 7B chat checkpoint.

    The tokenizer and the model (float16, ``device_map="auto"``) are loaded
    from ``NousResearch/Llama-2-7b-chat-hf`` and wrapped in a sampling
    ``text-generation`` pipeline.

    Returns:
        The configured ``transformers`` pipeline.
    """
    checkpoint = "NousResearch/Llama-2-7b-chat-hf"

    # Generation defaults applied to every call of the pipeline.
    generation_settings = {
        "max_new_tokens": 512,
        "temperature": 0.7,
        "repetition_penalty": 1.2,
        "do_sample": True,
    }

    tokenizer = AutoTokenizer.from_pretrained(checkpoint)
    model = AutoModelForCausalLM.from_pretrained(
        checkpoint,
        device_map="auto",
        torch_dtype=torch.float16,
    )

    return pipeline(
        "text-generation",
        model=model,
        tokenizer=tokenizer,
        **generation_settings,
    )
|
|
|
|
|
def generate_prompt(user_message, context_data):
    """Assemble the prompt sent to the language model.

    The prompt has four sections, in this order: fixed system instructions
    (in Indonesian), the retrieved data, the user's question, and an empty
    answer header for the model to complete.

    Args:
        user_message: The question to answer.
        context_data: Retrieved text placed in the DATA section.

    Returns:
        str: The prompt, starting with "### SISTEM:" and ending with
        "### JAWABAN:".
    """
    prompt_lines = [
        "### SISTEM:",
        "Anda adalah chatbot HRD yang membantu karyawan memahami administrasi perusahaan.",
        "Jangan menjawab menggunakan Bahasa Inggris.",
        "Gunakan Bahasa Indonesia dengan gaya profesional dan ramah.",
        "Jika informasi tidak tersedia dalam dokumen, katakan dengan sopan bahwa Anda tidak tahu.",
        "Jawaban harus singkat, jelas, dan sesuai konteks.",
        "Jangan memberikan jawaban untuk pertanyaan yang tidak diajukan oleh pengguna.",
        "Jangan menyertakan rekomendasi pertanyaan lain.",
        "### DATA:",
        f"{context_data}",
        "### PERTANYAAN:",
        f"{user_message}",
        "### JAWABAN:",
    ]
    return "\n".join(prompt_lines)
|
|
|
|
|
def should_use_history(message):
    """Tell whether a message reads like a follow-up to the previous turn.

    The check is a case-insensitive substring search for a fixed list of
    Indonesian follow-up words and phrases.

    Args:
        message: The user's message.

    Returns:
        bool: True if any of the phrases occurs in the message.
    """
    follow_up_phrases = (
        "jika", "tadi", "sebelumnya", "kalau begitu", "gimana kalau",
        "lanjutkan", "terus", "bagaimana dengan", "berarti", "jadi", "oke lalu",
    )
    lowered = message.lower()
    for phrase in follow_up_phrases:
        if phrase in lowered:
            return True
    return False
|
|
|
def _last_user_message(history):
    """Return the text of the most recent user turn in *history*, or "" if none.

    Handles both pair-style history (``(user, bot)`` per turn) and
    message-style history (dicts with ``role`` / ``content`` keys).
    """
    for turn in reversed(history):
        if isinstance(turn, dict):
            if turn.get("role") == "user" and isinstance(turn.get("content"), str):
                return turn["content"]
        elif turn and isinstance(turn[0], str):
            return turn[0]
    return ""


def generate_response(message, history, pipe):
    """Answer *message* using retrieved sheet data and the language model.

    For follow-up style messages (see ``should_use_history``) the previous user
    message is prepended to the retrieval query, so the search keeps the topic
    of the conversation. The prompt itself always contains only *message*.

    Args:
        message: The user's current message.
        history: Previous chat turns, or None/empty. It is only read, never
            modified.
        pipe: Callable text-generation pipeline; ``pipe(prompt)`` must return
            a list whose first item has a ``"generated_text"`` key.

    Returns:
        str: The model's answer, or a fixed apology when no usable context
        was found.
    """
    combined_message = message
    if history and should_use_history(message):
        previous_message = _last_user_message(history)
        if previous_message:
            combined_message = previous_message + " " + message

    context = search_google_sheets_vector(combined_message)

    stripped_context = context.strip()
    no_usable_context = (
        stripped_context == ""
        or "❌ ERROR" in context
        or "tidak ditemukan" in context.lower()
        # Exact "nothing found" reply of search_google_sheets_vector. The
        # "tidak ditemukan" check above does not match it ("tidak menemukan"),
        # and without this the apology text would be given to the model as data.
        or stripped_context == "Maaf, saya tidak menemukan informasi yang relevan."
    )
    if no_usable_context:
        return "Maaf, saya tidak menemukan informasi yang relevan untuk pertanyaan tersebut."

    full_prompt = generate_prompt(message, context)
    response = pipe(full_prompt)[0]["generated_text"]
    # The generated text may echo the prompt; keep only what follows the
    # last answer header.
    cleaned = response.split("### JAWABAN:")[-1].strip()

    # The history is deliberately not appended to here: the original only did
    # so on non-empty histories, and mutating a list owned by the caller risks
    # duplicated turns if the caller records the reply itself.
    return cleaned
|
|
|
|
|
def main():
    """Load the model and sheet index, then serve the chat UI.

    The Gradio interface is launched with ``share=True``.
    """
    pipe = load_model()
    initialize_index()

    def chatbot_fn(message, history):
        # Bind the loaded pipeline so Gradio only has to pass (message, history).
        return generate_response(message, history, pipe)

    interface = gr.ChatInterface(
        fn=chatbot_fn,
        title="Chatbot HRD - Transformers",
        theme="compact",
    )
    interface.launch(share=True)
|
|
|
# Start the chatbot only when this file is run as a script, not when imported.
if __name__ == "__main__":
    main()