"""Flask web service exposing semantic search, with optional per-request JSON logging."""

import json
import os
from datetime import datetime
from uuid import uuid4

from flask import Flask, jsonify, request

from semantic_search import SemanticSearch

search = SemanticSearch()
app = Flask(__name__)
# Emit non-ASCII characters verbatim in JSON responses instead of \uXXXX escapes.
app.config['JSON_AS_ASCII'] = False

# Directory where per-request log files are written (one JSON file each).
LOGS_BASE_PATH = os.getenv("LOGS_BASE_PATH", "logs")
# exist_ok avoids a startup race when several workers boot at once.
os.makedirs(LOGS_BASE_PATH, exist_ok=True)

# Logging is opt-in: set ENABLE_LOGS=1 in the environment to activate it.
ENABLE_LOGS = os.getenv("ENABLE_LOGS", "0") == "1"


def log_query_result(query, top, request_id, result):
    """Write one JSON file recording a search request and its result.

    No-op unless the ENABLE_LOGS environment flag is set.
    """
    if not ENABLE_LOGS:
        return
    timestamp = datetime.now().strftime("%Y%m%d%H%M%S")
    # BUG FIX: the original used the second-resolution timestamp alone as the
    # filename, so two requests arriving in the same second silently
    # overwrote each other's log. A random suffix makes each file unique.
    log_file_path = os.path.join(LOGS_BASE_PATH, f"{timestamp}_{uuid4().hex}.json")
    log_data = {
        "timestamp": timestamp,
        "query": query,
        "top": top,
        "request_id": request_id,
        "result": result,
    }
    # ensure_ascii=False keeps logged queries human-readable (consistent with
    # the JSON_AS_ASCII=False app config); explicit utf-8 makes that portable.
    with open(log_file_path, 'w', encoding='utf-8') as log_file:
        json.dump(log_data, log_file, indent=2, ensure_ascii=False)


@app.route('/health', methods=['GET'])
def health():
    """Liveness probe."""
    return jsonify({"status": "ok"})


@app.route('/search', methods=['POST'])
def search_route():
    """Run a semantic search for the posted query and return ranked results."""
    # silent=True + fallback: a missing or malformed JSON body yields an
    # empty query instead of the original's AttributeError on None.
    data = request.get_json(silent=True) or {}
    query = data.get('query', '')
    top = data.get('top', 10)
    use_llm_for_teasers = data.get('use_llm_for_teasers', False)
    request_id = data.get('request_id', '')
    titles, docs, teasers, scores = search.search(query, top, use_llm_for_teasers)
    result = [
        {
            'title': str(title),
            'text': str(doc),
            'teaser': teaser,
            'relevance': str(score),
        }
        for title, doc, teaser, score in zip(titles, docs, teasers, scores)
    ]
    # Persist the query and result only when ENABLE_LOGS is set
    # (log_query_result itself checks the flag).
    log_query_result(query, top, request_id, result)
    return jsonify(result)


@app.route('/read_logs', methods=['GET'])
def read_logs():
    """Return the parsed contents of every log file as a JSON array."""
    logs = []
    for log_file in os.listdir(LOGS_BASE_PATH):
        if not log_file.endswith(".json"):
            continue
        file_path = os.path.join(LOGS_BASE_PATH, log_file)
        try:
            with open(file_path, 'r', encoding='utf-8') as file:
                logs.append(json.load(file))
        except (OSError, json.JSONDecodeError):
            # Skip unreadable or truncated files (e.g. a write in progress)
            # rather than failing the whole listing.
            continue
    return jsonify(logs)


@app.route('/analyze_logs', methods=['GET'])
def analyze_logs():
    """Return log entries whose (query, top) pair produced inconsistent results.

    Groups all log entries by (query, top); every entry belonging to a group
    whose stored results are not all identical is reported — i.e. the same
    request produced different answers at different times.
    """
    logs_by_query_top = {}
    for log_file in os.listdir(LOGS_BASE_PATH):
        if not log_file.endswith(".json"):
            continue
        file_path = os.path.join(LOGS_BASE_PATH, log_file)
        try:
            with open(file_path, 'r', encoding='utf-8') as file:
                log_data = json.load(file)
        except (OSError, json.JSONDecodeError):
            # Same best-effort policy as /read_logs: skip bad files.
            continue
        query = log_data.get("query", "")
        top = log_data.get("top", "")
        # BUG FIX: a tuple key replaces the original f"{query}_{top}" string,
        # which collided for e.g. ("a_1", 0) and ("a", 10).
        logs_by_query_top.setdefault((query, top), []).append(log_data)

    invalid_logs = []
    for grouped in logs_by_query_top.values():
        # sort_keys makes the comparison independent of dict key order, and
        # .get avoids a KeyError on a log entry missing "result".
        distinct = {json.dumps(entry.get('result'), sort_keys=True) for entry in grouped}
        if len(distinct) > 1:
            invalid_logs.extend(grouped)
    return jsonify(invalid_logs)


if __name__ == '__main__':
    app.run(debug=False, host='0.0.0.0')