dorogan
Update: changes in search API (teasers and docs texts were separated)
ac7cbfc
raw
history blame contribute delete
No virus
3.15 kB
import json
import os
from flask import Flask, jsonify, request
from semantic_search import SemanticSearch
from datetime import datetime
# Search engine and Flask application setup.
search = SemanticSearch()
app = Flask(__name__)
# Emit non-ASCII characters verbatim in JSON responses.
app.config['JSON_AS_ASCII'] = False

# Directory where query logs are stored; override via LOGS_BASE_PATH env var.
LOGS_BASE_PATH = os.getenv("LOGS_BASE_PATH", "logs")
# exist_ok avoids the race between an existence check and the creation call.
os.makedirs(LOGS_BASE_PATH, exist_ok=True)

# Logging is opt-in: set ENABLE_LOGS=1 to record every query/result pair.
ENABLE_LOGS = os.getenv("ENABLE_LOGS", "0") == "1"
def log_query_result(query, top, request_id, result):
    """Persist one search request/response pair as a JSON file.

    No-op unless ENABLE_LOGS is set. Each call writes a separate file in
    LOGS_BASE_PATH. The filename carries microsecond resolution so two
    requests arriving within the same second no longer overwrite each
    other (the old second-resolution name silently clobbered logs).

    Args:
        query: The search query string.
        top: Requested number of results.
        request_id: Caller-supplied correlation id (may be empty).
        result: JSON-serializable list of result dicts.
    """
    if not ENABLE_LOGS:
        return
    now = datetime.now()
    # Second-resolution stamp kept inside the payload for backward
    # compatibility with existing log consumers.
    timestamp = now.strftime("%Y%m%d%H%M%S")
    log_file_path = os.path.join(
        LOGS_BASE_PATH, f"{now.strftime('%Y%m%d%H%M%S%f')}.json")
    log_data = {
        "timestamp": timestamp,
        "query": query,
        "top": top,
        "request_id": request_id,
        "result": result,
    }
    # ensure_ascii=False mirrors JSON_AS_ASCII=False; explicit utf-8 keeps
    # log files readable regardless of the platform default encoding.
    with open(log_file_path, 'w', encoding='utf-8') as log_file:
        json.dump(log_data, log_file, indent=2, ensure_ascii=False)
@app.route('/health', methods=['GET'])
def health():
    """Liveness probe: always reports the service as up."""
    status_payload = {"status": "ok"}
    return jsonify(status_payload)
@app.route('/search', methods=['POST'])
def search_route():
    """Run a semantic search and return scored results as JSON.

    Expects a JSON body with optional keys:
        query: search string (default "").
        top: number of results to return (default 10).
        use_llm_for_teasers: whether teasers are LLM-generated (default False).
        request_id: opaque id echoed into the logs (default "").

    Returns a JSON list of {title, text, teaser, relevance} dicts.
    """
    # silent=True returns None (instead of aborting with 400) on a missing
    # or malformed body; fall back to {} so the defaults below still apply.
    data = request.get_json(silent=True) or {}
    query = data.get('query', '')
    top = data.get('top', 10)
    use_llm_for_teasers = data.get('use_llm_for_teasers', False)
    request_id = data.get('request_id', '')
    titles, docs, teasers, scores = search.search(query, top, use_llm_for_teasers)
    # Teasers pass through unchanged (the search API now returns teasers and
    # doc texts separately); titles, texts and scores are stringified.
    result = [
        {'title': str(title), 'text': str(text),
         'teaser': teaser, 'relevance': str(score)}
        for title, text, teaser, score in zip(titles, docs, teasers, scores)
    ]
    # Recorded only when ENABLE_LOGS is set (handled inside the helper).
    log_query_result(query, top, request_id, result)
    return jsonify(result)
@app.route('/read_logs', methods=['GET'])
def read_logs():
    """Return every stored log entry as a JSON array.

    Files are read in sorted name order — chronological, since filenames
    are timestamps — so the response is deterministic across calls
    (os.listdir order is otherwise arbitrary).
    """
    logs = []
    for log_file in sorted(os.listdir(LOGS_BASE_PATH)):
        if log_file.endswith(".json"):
            # utf-8 matches the encoding the log writer uses.
            with open(os.path.join(LOGS_BASE_PATH, log_file), 'r',
                      encoding='utf-8') as file:
                logs.append(json.load(file))
    return jsonify(logs)
@app.route('/analyze_logs', methods=['GET'])
def analyze_logs():
    """Report log entries whose results diverge for the same (query, top).

    Groups all stored logs by their query/top pair and returns, as a JSON
    array, every entry belonging to a group whose serialized results are
    not all identical — i.e. the same request produced different answers
    over time.
    """
    # A tuple key avoids the ambiguity of string concatenation, where
    # ("a_b", 1) and ("a", "b_1") would both map to "a_b_1".
    logs_by_query_top = {}
    for log_file in os.listdir(LOGS_BASE_PATH):
        if log_file.endswith(".json"):
            with open(os.path.join(LOGS_BASE_PATH, log_file), 'r',
                      encoding='utf-8') as file:
                log_data = json.load(file)
            key = (log_data.get("query", ""), log_data.get("top", ""))
            logs_by_query_top.setdefault(key, []).append(log_data)
    invalid_logs = []
    for group in logs_by_query_top.values():
        # sort_keys makes the equality check robust to key insertion order
        # in the stored result dicts.
        if len({json.dumps(log['result'], sort_keys=True) for log in group}) > 1:
            invalid_logs.extend(group)
    return jsonify(invalid_logs)
# Development entry point: serves on all interfaces (0.0.0.0) with debug off.
# NOTE(review): binding 0.0.0.0 exposes the service network-wide — presumably
# intended to run behind a reverse proxy or in a container; confirm.
if __name__ == '__main__':
    app.run(debug=False, host='0.0.0.0')