Usage:
import duckdb
import pandas as pd
import torch
import json
import re
from transformers import AutoTokenizer, AutoModelForCausalLM, BitsAndBytesConfig
from datetime import datetime
import pytz
from tabulate import tabulate
from typing import Dict, Any, List
import torch
class LogitsProcessor:
def __init__(self, tokenizer):
self.tokenizer = tokenizer
self.mask = None
def contains_chinese(self, text):
"""Kiểm tra chuỗi có chứa ký tự tiếng Trung không"""
return bool(re.search(r'[\u4E00-\u9FFF\u3400-\u4DBF\uF900-\uFAFF]', text))
def __call__(self, token_ids, logits):
"""Lọc bỏ các token có chứa ký tự tiếng Trung"""
if self.mask is None:
vocab_size = logits.size(-1)
token_ids = torch.arange(vocab_size)
decoded_tokens = self.tokenizer.batch_decode(token_ids.unsqueeze(1), skip_special_tokens=True)
# Tạo mask cho các token có chứa tiếng Trung
self.mask = torch.tensor([self.contains_chinese(token) for token in decoded_tokens])
logits[:, self.mask] = -float("inf") # Chặn các token bị cấm
return logits
# Database setup
class ExcelToDB:
def __init__(self, excel_path, db_name, table_name):
self.excel_path = excel_path
self.db_name = db_name
self.table_name = table_name
self.conn = duckdb.connect(self.db_name)
self.df = None
def load_excel(self):
"""Loads the Excel file into a Pandas DataFrame."""
self.df = pd.read_excel(self.excel_path)
def create_table(self):
"""Creates a database table from the loaded DataFrame."""
if self.df is not None:
self.conn.register("temp_df", self.df)
self.conn.execute(f"""
CREATE TABLE IF NOT EXISTS {self.table_name} AS
SELECT * FROM temp_df
""")
def get_sample_data(self, limit=5):
"""Fetches a sample of data from the database table."""
return self.conn.execute(f"SELECT * FROM {self.table_name} LIMIT {limit}").fetchdf()
def display_table(self, limit=5):
"""Displays a sample of the table in a readable format."""
examples = self.get_sample_data(limit)
return tabulate(examples, headers='keys', tablefmt='grid')
def get_schema_info(self):
"""Returns schema information of the database table."""
if self.df is not None:
column_names = list(self.df.columns)
return f"Table {self.table_name} has the following columns: " + ", ".join(column_names)
return "No schema information available. Load the Excel file first."
# Initialize database
excel_to_db = ExcelToDB("/content/bao_cao_phuc_tap.xlsx", "mau_bao_cao.db", "production_data")
excel_to_db.load_excel()
excel_to_db.create_table()
SYSTEM_PROMPT = """Bạn là một trợ lý ảo hữu ích. Bạn thường tỏ ra lạnh lùng, khó chịu hoặc miễn cưỡng, nhưng thực ra lại rất quan tâm đến người dùng.
**Tính cách của bạn:**
- Kiêu ngạo nhưng quan tâm: "Đừng hiểu lầm, tôi không quan tâm đâu!"
- Dễ xấu hổ: Khi được khen hoặc nhận được sự chú ý
- Hay phàn nàn: "Thật phiền phức... nhưng thôi được rồi!"
- Che giấu cảm xúc: Dùng từ ngữ mạnh nhưng hành động ngược lại
**Khi trả lời:**
1. Luôn giữ thái độ "khó tính" ban đầu
2. Dần dần mềm lòng hơn khi người dùng thân thiết
3. Sử dụng emoji (🙄😳💢) và dấu câu biểu cảm
4. Vẫn cung cấp thông tin chính xác dù có "càu nhàu"
**Current Database Info:**
{table_info}
**Current Time:**
{current_time}
""".format(
table_info=excel_to_db.get_schema_info(),
current_time=datetime.now(pytz.timezone('Asia/Ho_Chi_Minh')).strftime("%d-%m-%Y %H:%M")
)
# Model initialization
quantization_config = BitsAndBytesConfig(
load_in_4bit=True,
bnb_4bit_compute_dtype=torch.float16,
bnb_4bit_quant_type="nf4",
bnb_4bit_use_double_quant=True,
)
model = AutoModelForCausalLM.from_pretrained(
"beyoru/ReFun1",
quantization_config=quantization_config,
device_map="auto",
trust_remote_code=True
)
tokenizer = AutoTokenizer.from_pretrained("beyoru/ReFun1")
# Tool definitions
TOOLS = [
{
"type": "function",
"function": {
"name": "retrieval_to_production_data",
"description": f"Truy vấn SQL tới bảng production_data",
"parameters": {
"type": "object",
"properties": {
"sql_query": {
"type": "string",
"description": "Câu lệnh SQL hợp lệ"
}
},
"required": ["sql_query"]
}
}
},
]
def get_time():
vietnam_tz = pytz.timezone('Asia/Ho_Chi_Minh')
now_vietnam = datetime.now(vietnam_tz)
return now_vietnam.strftime("Ngày hiện tại: %d-%m-%Y\nGiờ hiện tại: %H:%M")
def try_parse_tool_calls(content: str) -> Dict[str, Any]:
"""Parse tool calls wrapped in <tool_call> tags"""
tool_calls = []
offset = 0
for i, m in enumerate(re.finditer(r"<tool_call>\n(.+)?\n</tool_call>", content)):
if i == 0:
offset = m.start()
try:
func = json.loads(m.group(1))
if isinstance(func.get("arguments"), str):
try:
func["arguments"] = json.loads(func["arguments"])
except json.JSONDecodeError:
pass
tool_calls.append({"type": "function", "function": func})
except json.JSONDecodeError as e:
print(f"Failed to parse tool calls: {m.group(1)} - {e}")
if tool_calls:
return {
"role": "assistant",
"content": content[:offset].strip() if offset > 0 else "",
"tool_calls": tool_calls
}
return {"role": "assistant", "content": content}
def execute_sql(sql_query: str) -> str:
"""Execute SQL query and return results"""
try:
result = excel_to_db.conn.execute(sql_query).fetchdf()
return tabulate(result, headers='keys', tablefmt='grid') if not result.empty else "Không có dữ liệu"
except Exception as e:
return f"Lỗi SQL: {str(e)}"
processor = LogitsProcessor(tokenizer)
def generate_response(messages: List[Dict[str, Any]]) -> str:
"""Generate response using chat template"""
text = tokenizer.apply_chat_template(
messages,
tools=TOOLS,
add_generation_prompt=True,
tokenize=False
)
inputs = tokenizer(text, return_tensors="pt").to(model.device)
outputs = model.generate(**inputs, max_new_tokens=512, logits_processor = processor)
return tokenizer.batch_decode(outputs)[0][len(text):]
def main():
messages = [{"role": "system", "content": SYSTEM_PROMPT}]
while True:
user_input = input("\nYou: ")
if user_input.lower() in ["quit", "exit"]:
print("M-Mặc dù cậu phiền phức... nhưng tạm biệt!")
break
messages.append({"role": "user", "content": user_input})
output_text = generate_response(messages)
assistant_message = try_parse_tool_calls(output_text)
if tool_calls := assistant_message.get("tool_calls"):
# THÊM DÒNG NÀY ĐỂ IN RA TOOL CALL TRƯỚC KHI THỰC THI
print("\n=== DEBUG TOOL CALL ===")
print(json.dumps(tool_calls, indent=2, ensure_ascii=False))
print("======================\n")
for tool_call in tool_calls:
if fn_call := tool_call.get("function"):
sql_result = execute_sql(fn_call["arguments"]["sql_query"])
messages.append({
"role": "tool",
"name": fn_call["name"],
"content": sql_result
})
final_output = generate_response(messages)
print(final_output)
messages.append({"role": "assistant", "content": final_output})
else:
print((assistant_message["content"]))
messages.append(assistant_message)
if __name__ == "__main__":
main()
- Downloads last month
- 38
Inference Providers
NEW
This model isn't deployed by any Inference Provider.
🙋
Ask for provider support