| import os |
| import json |
| import sqlite3 |
| import random |
| import time |
| import requests |
| from datetime import datetime |
| from flask import Flask, render_template, request, jsonify, send_from_directory |
| from werkzeug.utils import secure_filename |
|
|
| app = Flask(__name__) |
| app.config['SECRET_KEY'] = 'forest-sentry-secret-key-2026' |
| app.config['MAX_CONTENT_LENGTH'] = 100 * 1024 * 1024 |
| app.config['UPLOAD_FOLDER'] = os.path.join(app.instance_path, 'uploads') |
|
|
| |
| os.makedirs(app.instance_path, exist_ok=True) |
| os.makedirs(app.config['UPLOAD_FOLDER'], exist_ok=True) |
|
|
| DB_PATH = os.path.join(app.instance_path, "forest_sentry.db") |
| SILICONFLOW_API_KEY = "sk-vimuseiptfbomzegyuvmebjzooncsqbyjtlddrfodzcdskgi" |
|
|
| |
| def get_db_connection(): |
| conn = sqlite3.connect(DB_PATH) |
| conn.row_factory = sqlite3.Row |
| return conn |
|
|
| def init_db(): |
| with get_db_connection() as conn: |
| conn.execute(''' |
| CREATE TABLE IF NOT EXISTS plots ( |
| id INTEGER PRIMARY KEY AUTOINCREMENT, |
| name TEXT NOT NULL, |
| location TEXT, |
| type TEXT, |
| carbon_stock REAL, |
| fire_risk INTEGER, |
| health_score INTEGER, |
| last_scan TEXT, |
| status TEXT |
| ) |
| ''') |
| conn.execute(''' |
| CREATE TABLE IF NOT EXISTS logs ( |
| id INTEGER PRIMARY KEY AUTOINCREMENT, |
| timestamp TEXT, |
| action TEXT, |
| details TEXT |
| ) |
| ''') |
| |
| |
| cur = conn.execute('SELECT count(*) FROM plots') |
| if cur.fetchone()[0] == 0: |
| seed_data(conn) |
| print("数据库初始化完成 (Database initialized).") |
|
|
| def seed_data(conn): |
| plots = [ |
| ("阿尔法林区 (Alpha)", "45.12, 122.34", "针叶林", 1250.5, 15, 92, datetime.now().strftime("%Y-%m-%d %H:%M"), "正常"), |
| ("贝塔山脊 (Beta)", "45.15, 122.38", "混合林", 890.2, 45, 78, datetime.now().strftime("%Y-%m-%d %H:%M"), "警告"), |
| ("伽马谷地 (Gamma)", "45.10, 122.30", "阔叶林", 1500.0, 5, 98, datetime.now().strftime("%Y-%m-%d %H:%M"), "正常"), |
| ("德尔塔荒地 (Delta)", "45.18, 122.40", "灌木丛", 300.5, 85, 45, datetime.now().strftime("%Y-%m-%d %H:%M"), "危急"), |
| ] |
| conn.executemany('INSERT INTO plots (name, location, type, carbon_stock, fire_risk, health_score, last_scan, status) VALUES (?, ?, ?, ?, ?, ?, ?, ?)', plots) |
| conn.commit() |
|
|
| |
| def call_ai_reasoning(context_data, prompt_type="analyze"): |
| if not SILICONFLOW_API_KEY: |
| return {"error": "No API Key"} |
| |
| headers = { |
| "Authorization": f"Bearer {SILICONFLOW_API_KEY}", |
| "Content-Type": "application/json" |
| } |
| |
| system_prompt = """你是一个“森林哨兵”AI,是林业、生态和野火预防方面的专家。 |
| 你分析传感器数据并提供可操作的见解。 |
| 输出必须仅为有效的 JSON,JSON 块之外不得有 markdown 格式。 |
| 请使用中文回复。""" |
| |
| user_content = "" |
| if prompt_type == "analyze": |
| user_content = f"""分析此林地数据: {json.dumps(context_data, ensure_ascii=False)}。 |
| 评估火灾风险 (0-100),碳汇潜力,并建议 3 个具体行动。 |
| 返回 JSON: {{ "risk_analysis": "string", "carbon_insight": "string", "actions": ["string", "string", "string"], "alert_level": "正常/警告/危急" }}""" |
| elif prompt_type == "plan": |
| user_content = f"""为以下情况创建无人机任务计划: {json.dumps(context_data, ensure_ascii=False)}。 |
| 返回 JSON: {{ "mission_name": "string", "waypoints": ["lat,lon", ...], "estimated_time": "string", "priority": "string" }}""" |
|
|
| payload = { |
| "model": "Qwen/Qwen2.5-7B-Instruct", |
| "messages": [ |
| {"role": "system", "content": system_prompt}, |
| {"role": "user", "content": user_content} |
| ], |
| "response_format": {"type": "json_object"} |
| } |
| |
| try: |
| |
| response = requests.post("https://api.siliconflow.cn/v1/chat/completions", headers=headers, json=payload, timeout=10) |
| if response.status_code == 200: |
| res_json = response.json() |
| content = res_json['choices'][0]['message']['content'] |
| |
| if "```json" in content: |
| content = content.split("```json")[1].split("```")[0] |
| elif "```" in content: |
| content = content.split("```")[1].split("```")[0] |
| return json.loads(content) |
| else: |
| print(f"AI Error: {response.text}") |
| raise Exception("API Error") |
| except Exception as e: |
| print(f"Fallback AI due to: {e}") |
| |
| if prompt_type == "analyze": |
| return { |
| "risk_analysis": "AI 服务连接超时。基于启发式规则:检测到高温和低湿度,建议加强巡逻。", |
| "carbon_insight": "预估生长稳定,碳汇潜力中等。", |
| "actions": ["人工巡逻", "检查传感器状态", "更新卫星图像"], |
| "alert_level": "警告" |
| } |
| return {"mission_name": "紧急扫描任务", "waypoints": ["45.12, 122.35", "45.13, 122.36"], "estimated_time": "15分钟", "priority": "高"} |
|
|
| |
| @app.route('/') |
| def index(): |
| return render_template('index.html') |
|
|
| @app.route('/api/plots', methods=['GET']) |
| def get_plots(): |
| with get_db_connection() as conn: |
| plots = conn.execute('SELECT * FROM plots').fetchall() |
| return jsonify([dict(row) for row in plots]) |
|
|
| @app.route('/api/analyze', methods=['POST']) |
| def analyze_plot(): |
| data = request.json |
| if not data: |
| return jsonify({"error": "No data"}), 400 |
| |
| |
| sensor_data = { |
| "temp": random.uniform(15, 35), |
| "humidity": random.uniform(20, 80), |
| "wind_speed": random.uniform(0, 15), |
| "soil_moisture": random.uniform(10, 60), |
| "plot_info": data |
| } |
| |
| |
| analysis = call_ai_reasoning(sensor_data, "analyze") |
| |
| |
| with get_db_connection() as conn: |
| if analysis.get('alert_level') in ['警告', '危急', 'High', 'Critical']: |
| |
| level = analysis['alert_level'] |
| if level == 'High': level = '警告' |
| if level == 'Critical': level = '危急' |
| |
| conn.execute('UPDATE plots SET status = ?, fire_risk = ? WHERE id = ?', |
| (level, random.randint(70, 95), data['id'])) |
| conn.commit() |
| |
| return jsonify({"sensors": sensor_data, "analysis": analysis}) |
|
|
| @app.route('/api/action', methods=['POST']) |
| def take_action(): |
| data = request.json |
| action_type = data.get('type') |
| plot_id = data.get('plot_id') |
| |
| |
| result = {"status": "success", "message": "行动已完成"} |
| |
| if action_type == 'drone_scan': |
| |
| plan = call_ai_reasoning({"plot_id": plot_id, "action": "drone_scan"}, "plan") |
| result["data"] = plan |
| result["message"] = f"无人机已派遣: {plan.get('mission_name', '扫描任务')}" |
| |
| |
| with get_db_connection() as conn: |
| conn.execute('UPDATE plots SET last_scan = ? WHERE id = ?', |
| (datetime.now().strftime("%Y-%m-%d %H:%M"), plot_id)) |
| conn.commit() |
| |
| elif action_type == 'alert_rangers': |
| result["message"] = "已通过短信/无线电通知阿尔法护林队。" |
| |
| return jsonify(result) |
|
|
| @app.route('/api/upload', methods=['POST']) |
| def upload_file(): |
| if 'file' not in request.files: |
| return jsonify({"error": "No file part"}), 400 |
| file = request.files['file'] |
| if file.filename == '': |
| return jsonify({"error": "No selected file"}), 400 |
| |
| if file: |
| filename = secure_filename(file.filename) |
| |
| file_path = os.path.join(app.config['UPLOAD_FOLDER'], filename) |
| file.save(file_path) |
| return jsonify({"status": "success", "message": f"文件 {filename} 上传成功", "path": file_path}) |
|
|
| @app.route('/api/reset', methods=['POST']) |
| def reset_db(): |
| try: |
| if os.path.exists(DB_PATH): |
| os.remove(DB_PATH) |
| init_db() |
| return jsonify({"status": "success", "message": "数据库已重置"}) |
| except Exception as e: |
| return jsonify({"error": str(e)}), 500 |
|
|
| if __name__ == '__main__': |
| init_db() |
| app.run(host='0.0.0.0', port=7860) |
|
|