Spaces:
Runtime error
Runtime error
import asyncio | |
import os | |
import re | |
import sys | |
from pathlib import Path | |
import boto3 | |
import gradio as gr | |
import nest_asyncio | |
import requests | |
from bs4 import BeautifulSoup | |
from dotenv import load_dotenv | |
from llama_index.core import Settings | |
from llama_index.core.retrievers import QueryFusionRetriever | |
from llama_index.retrievers.bm25 import BM25Retriever | |
load_dotenv() | |
Settings.similarity_top_k = 20 # type: ignore | |
# Параметри S3 | |
BUCKET_NAME = "legal-position" | |
PREFIX_RETRIEVER = "Save_Index/" # Префікс для всього вмісту, який потрібно завантажити | |
LOCAL_DIR = Path("Save_Index_Local") # Локальна директорія для збереження даних з S3 | |
# Параметри індексу | |
PERSIST_PATH = Path("Save_Index_Local") | |
INDEX_NAME = "bm25_retriever" | |
# INDEX_NAME = "bm25_retriever_meta" | |
# Створюємо локальну директорію, якщо вона не існує | |
LOCAL_DIR.mkdir(parents=True, exist_ok=True) | |
# Ініціалізація клієнта S3 | |
s3_client = boto3.client( | |
"s3", | |
aws_access_key_id=os.getenv("AWS_ACCESS_KEY_ID"), | |
aws_secret_access_key=os.getenv("AWS_SECRET_ACCESS_KEY"), | |
region_name="eu-north-1" | |
) | |
# Функція для завантаження файлу з S3 | |
def download_s3_file(bucket_name, s3_key, local_path): | |
s3_client.download_file(bucket_name, s3_key, str(local_path)) | |
print(f"Завантажено: {s3_key} -> {local_path}") | |
# Функція для завантаження всієї папки з S3 у локальну директорію | |
def download_s3_folder(bucket_name, prefix, local_dir): | |
response = s3_client.list_objects_v2(Bucket=bucket_name, Prefix=prefix) | |
if 'Contents' in response: | |
for obj in response['Contents']: | |
s3_key = obj['Key'] | |
# Пропускаємо "папку" (кореневий префікс) у S3 | |
if s3_key.endswith('/'): | |
continue | |
# Визначаємо локальний шлях, де буде збережений файл | |
local_file_path = local_dir / Path(s3_key).relative_to(prefix) | |
local_file_path.parent.mkdir(parents=True, exist_ok=True) # створення підкаталогів, якщо потрібно | |
# Завантажуємо файл | |
s3_client.download_file(bucket_name, s3_key, str(local_file_path)) | |
print(f"Завантажено: {s3_key} -> {local_file_path}") | |
# Завантаження всього вмісту папки `Save_Index` з S3 у локальну директорію `Save_Index_Local` | |
download_s3_folder(BUCKET_NAME, PREFIX_RETRIEVER, LOCAL_DIR) | |
nest_asyncio.apply() | |
state_nodes = gr.State() | |
def parse_doc_ids(doc_ids): | |
if doc_ids is None: | |
return [] | |
if isinstance(doc_ids, list): | |
return [str(id).strip("[]") for id in doc_ids] | |
if isinstance(doc_ids, str): | |
cleaned = doc_ids.strip("[]").replace(" ", "") | |
if cleaned: | |
return [id.strip() for id in cleaned.split(",")] | |
return [] | |
def get_links_html(doc_ids): | |
parsed_ids = parse_doc_ids(doc_ids) | |
if not parsed_ids: | |
return "" | |
links = [ | |
f"[Рішення ВС: {doc_id}](https://reyestr.court.gov.ua/Review/{doc_id})" | |
for doc_id in parsed_ids | |
] | |
return ", ".join(links) | |
def parse_lp_ids(lp_ids): | |
if lp_ids is None: | |
return [] | |
if isinstance(lp_ids, (str, int)): | |
cleaned = str(lp_ids).strip("[]").replace(" ", "") | |
if cleaned: | |
return [cleaned] | |
return [] | |
def get_links_html_lp(lp_ids): | |
parsed_ids = parse_lp_ids(lp_ids) | |
if not parsed_ids: | |
return "" | |
links = [ | |
f"[Правова позиція ВС: {lp_id}](https://lpd.court.gov.ua/home/search/{lp_id})" | |
for lp_id in parsed_ids | |
] | |
return ", ".join(links) | |
def initialize_components(): | |
try: | |
if not PERSIST_PATH.exists(): | |
raise FileNotFoundError(f"Directory not found: {PERSIST_PATH}") | |
required_files = [INDEX_NAME] | |
missing_files = [f for f in required_files if not (PERSIST_PATH / f).exists()] | |
if missing_files: | |
raise FileNotFoundError(f"Missing required files: {', '.join(missing_files)}") | |
global retriever_bm25 | |
bm25_retriever = BM25Retriever.from_persist_dir(str(PERSIST_PATH / INDEX_NAME)) | |
retriever_bm25 = QueryFusionRetriever( | |
[ | |
bm25_retriever, | |
], | |
similarity_top_k=Settings.similarity_top_k, # type: ignore | |
num_queries=1, | |
use_async=True, | |
) | |
return True | |
except Exception as e: | |
print(f"Error initializing components: {str(e)}", file=sys.stderr) | |
return False | |
def extract_court_decision_text(url): | |
response = requests.get(url) | |
soup = BeautifulSoup(response.content, "html.parser") | |
unwanted_texts = [ | |
"Доступ до Реєстру здійснюється в тестовому (обмеженому) режимі.", | |
"З метою упередження перешкоджанню стабільній роботі Реєстру", | |
] | |
decision_text = "" | |
for paragraph in soup.find_all("p"): | |
text = paragraph.get_text(separator="\n").strip() | |
if not any(unwanted_text in text for unwanted_text in unwanted_texts): | |
decision_text += text + "\n" | |
return decision_text.strip() | |
async def search_without_ai_action(url): | |
try: | |
court_decision_text = extract_court_decision_text(url) | |
nodes = await retriever_bm25.aretrieve(court_decision_text) | |
search_output_content = ( | |
f"**Результати пошуку (наявні правові позиції ВС) за посиланням:** \n\n" | |
) | |
for index, node in enumerate(nodes, start=1): | |
source_title = node.node.metadata.get("title", "Невідомий заголовок") | |
doc_ids = node.node.metadata.get("doc_id") | |
lp_ids = node.node.metadata.get("lp_id") | |
links = get_links_html(doc_ids) | |
links_lp = get_links_html_lp(lp_ids) | |
search_output_content += f"\n[{index}] *{source_title}* ⚖️ {links_lp} | {links} 👉 Score: {node.score} \n" | |
return search_output_content, nodes | |
except Exception as e: | |
return f"Error during search: {str(e)}", None | |
async def search_without_ai_action_text(question_input): | |
try: | |
nodes = await retriever_bm25.aretrieve(question_input) | |
search_output_content = f"**Результати пошуку (наявні правові позиції ВС) за текстовим запитом:** \n\n" | |
for index, node in enumerate(nodes, start=1): | |
source_title = node.node.metadata.get("title", "Невідомий заголовок") | |
doc_ids = node.node.metadata.get("doc_id") | |
lp_ids = node.node.metadata.get("lp_id") | |
links = get_links_html(doc_ids) | |
links_lp = get_links_html_lp(lp_ids) | |
search_output_content += f"\n[{index}] *{source_title}* ⚖️ {links_lp} | {links} 👉 Score: {node.score} \n" | |
return search_output_content, nodes | |
except Exception as e: | |
return f"Error during search: {str(e)}", None | |
def create_gradio_interface(): | |
with gr.Blocks() as app: | |
gr.Markdown("# Знаходьте правові позиції Верховного Суду") | |
input_field = gr.Textbox( | |
label="Введіть текст або посилання на судове рішення", lines=1 | |
) | |
search_button = gr.Button("Пошук", interactive=False) | |
warning_message = gr.Markdown(visible=False) | |
search_output = gr.Markdown(label="Результат пошуку") | |
state_nodes = gr.State() | |
async def search_action(input_text): | |
if re.match( | |
r"^https://reyestr\.court\.gov\.ua/Review/\d+$", input_text.strip() | |
): | |
return await search_without_ai_action(input_text) | |
else: | |
return await search_without_ai_action_text(input_text) | |
def update_button_state(text): | |
text = text.strip() | |
if not text: | |
return gr.update(value="Пошук", interactive=False), gr.update( | |
visible=False | |
) | |
elif re.match(r"^https://reyestr\.court\.gov\.ua/Review/\d+$", text): | |
return gr.update(value="Пошук за URL", interactive=True), gr.update( | |
visible=False | |
) | |
elif text.startswith("http"): | |
return gr.update(value="Пошук", interactive=False), gr.update( | |
value="Неправильний формат URL. Використовуйте посилання формату https://reyestr.court.gov.ua/Review/{doc_id}", | |
visible=True, | |
) | |
else: | |
return gr.update(value="Пошук за текстом", interactive=True), gr.update( | |
visible=False | |
) | |
search_button.click( | |
fn=search_action, inputs=input_field, outputs=[search_output, state_nodes] | |
) | |
input_field.change( | |
fn=update_button_state, | |
inputs=input_field, | |
outputs=[search_button, warning_message], | |
) | |
return app | |
if __name__ == "__main__": | |
if initialize_components(): | |
print("Components initialized successfully!") | |
app = create_gradio_interface() | |
app.launch(share=True) | |
else: | |
print( | |
"Failed to initialize components. Please check the paths and try again.", | |
file=sys.stderr, | |
) | |
sys.exit(1) | |