File size: 3,616 Bytes
fd45282 75ecc06 fd45282 abc9568 f5f891b c0ba1b5 75ecc06 14c753e c80b246 14c753e 1c0fdf3 abc9568 1c0fdf3 abc9568 c80b246 1c0fdf3 c80b246 1c0fdf3 c80b246 1c0fdf3 c80b246 1c0fdf3 c80b246 1c0fdf3 abc9568 75ecc06 c80b246 75ecc06 c80b246 75ecc06 5141e4c |
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 |
import gradio as gr
from transformers import AutoTokenizer, AutoModelForCausalLM
from peft import PeftModel
import torch
import requests
import json
# Hugging Face identifiers: the base checkpoint and the LoRA adapter that is
# applied on top of it.
model_id = "deepseek-ai/deepseek-coder-1.3b-base"
lora_id = "Seunggg/lora-plant"

# Half precision when CUDA is available, full precision otherwise. Computed
# once so the base model and the adapter are loaded with the same dtype.
_model_dtype = torch.float16 if torch.cuda.is_available() else torch.float32

# Folder handed to both loaders as `offload_folder`.
_offload_dir = "offload/"

# Load the tokenizer.
tokenizer = AutoTokenizer.from_pretrained(model_id, trust_remote_code=True)

# Load the base model with automatic device placement and offloading enabled.
base = AutoModelForCausalLM.from_pretrained(
    model_id,
    trust_remote_code=True,
    torch_dtype=_model_dtype,
    device_map="auto",
    offload_folder=_offload_dir,
)

# Load the LoRA adapter on top of the base model, with offloading enabled too.
model = PeftModel.from_pretrained(
    base,
    lora_id,
    torch_dtype=_model_dtype,
    offload_folder=_offload_dir,
)
model.eval()

# Text-generation pipeline used by the request handler.
from transformers import pipeline

pipe = pipeline(
    "text-generation",
    model=model,
    tokenizer=tokenizer,
    max_new_tokens=256,
    device_map="auto",
)
from ask_api import ask_with_sensor # 引入调用函数
# Endpoint that serves the latest sensor reading, and the request timeout.
_SENSOR_API_URL = "https://arduino-realtime.onrender.com/api/data"
_SENSOR_TIMEOUT_SECONDS = 5


def _fetch_sensor_data():
    """Request the current sensor reading once.

    Returns:
        A ``(sensor_data, error)`` tuple. On success ``sensor_data`` is the
        value stored under the ``"sensorData"`` key of the JSON response
        (``None`` when the key is absent) and ``error`` is ``None``. On any
        failure ``sensor_data`` is ``None`` and ``error`` is the exception.
    """
    try:
        response = requests.get(_SENSOR_API_URL, timeout=_SENSOR_TIMEOUT_SECONDS)
        # Surface 4xx/5xx responses as failures instead of parsing an error
        # body and reporting it as "no data".
        response.raise_for_status()
        return response.json().get("sensorData", None), None
    except Exception as exc:
        # Deliberately broad: this feeds a UI callback, so network, decoding
        # and payload-shape errors are all reported rather than raised.
        return None, exc


def _format_sensor_display(sensor_data, error):
    """Turn a fetch result into the text shown in the sensor box."""
    if error is not None:
        return "⚠️ 获取失败:" + str(error)
    if not sensor_data:
        return "暂无传感器数据"
    return json.dumps(sensor_data, ensure_ascii=False, indent=2)


def _build_prompt(user_input, sensor_data):
    """Assemble the generation prompt; sensor data is included only if present."""
    parts = [f"用户提问:{user_input}\n"]
    if sensor_data:
        parts.append(f"当前传感器数据:{json.dumps(sensor_data, ensure_ascii=False)}\n")
    parts.append("请用更人性化的语言生成建议,并推荐相关植物文献或资料。\n回答:")
    return "".join(parts)


def get_sensor_data():
    """Return the current sensor reading as display text.

    Returns:
        Pretty-printed JSON of the reading, a "no data" message when the
        reading is missing or empty, or a failure message containing the
        error text when the request fails. Never raises.
    """
    return _format_sensor_display(*_fetch_sensor_data())


def show_sensor_data(_=None):
    """Callback wrapper around ``get_sensor_data``; the argument is ignored."""
    return get_sensor_data()


def respond(user_input):
    """Answer a plant question, using the current sensor reading if available.

    The sensor API is queried exactly once, so the reading shown to the user
    is the same one placed in the prompt. A failed sensor request does not
    prevent an answer: the failure is shown in the sensor text and the prompt
    is built without sensor data.

    Args:
        user_input: The question typed by the user. ``None`` and blank
            strings are treated as "no question".

    Returns:
        A ``(sensor_display, answer)`` tuple of strings. Errors raised while
        generating are reported inside ``answer`` rather than propagated.
    """
    sensor_data, fetch_error = _fetch_sensor_data()
    sensor_display = _format_sensor_display(sensor_data, fetch_error)

    if not (user_input or "").strip():
        return sensor_display, "请输入植物相关的问题 😊"

    prompt = _build_prompt(user_input, sensor_data)
    try:
        result = pipe(prompt)
        full_output = result[0]["generated_text"]
        # Drop the echoed prompt so only the answer part remains.
        answer = full_output.replace(prompt, "").strip()
    except Exception as e:
        answer = f"生成建议时出错:{str(e)}"
    return sensor_display, answer
# Gradio UI
with gr.Blocks() as demo:
    # NOTE(review): the original indentation was lost, so which components
    # sit inside the Row is an assumption — confirm against the intended layout.
    with gr.Row():
        # Live polling: a callable `value` combined with `every` makes Gradio
        # call it again on that interval (in seconds) while the page is open.
        # The previous `sensor_box.change(...).every(2)` called `.every` on the
        # object returned by an event listener, which is not a Gradio API.
        sensor_box = gr.Textbox(
            label="🧪 当前传感器数据",
            lines=6,
            interactive=False,
            value=show_sensor_data,
            every=2,
        )
        question_box = gr.Textbox(label="🌿 植物问题", lines=4)
    answer_box = gr.Textbox(label="🤖 回答建议", lines=8, interactive=False)

    # When the user submits a question, refresh the sensor box and the answer.
    question_box.submit(fn=respond, inputs=question_box, outputs=[sensor_box, answer_box])

# Periodic `every` updates need the queue on older Gradio releases; enabling
# it explicitly is harmless where it is already on by default.
demo.queue()
demo.launch()
|