import os import re import gradio as gr import pandas as pd import requests import json import faiss import nest_asyncio import sys import boto3 from pathlib import Path from bs4 import BeautifulSoup from typing import Union, List import asyncio from llama_index.core import ( StorageContext, ServiceContext, VectorStoreIndex, Settings, load_index_from_storage ) from llama_index.llms.openai import OpenAI from llama_index.core.llms import ChatMessage from llama_index.core.schema import IndexNode from llama_index.core.storage.docstore import SimpleDocumentStore from llama_index.retrievers.bm25 import BM25Retriever from llama_index.embeddings.openai import OpenAIEmbedding # from llama_index.vector_stores.faiss import FaissVectorStore from llama_index.core.retrievers import QueryFusionRetriever from llama_index.core.workflow import Event, Context, Workflow, StartEvent, StopEvent, step from llama_index.core.schema import NodeWithScore from llama_index.core.prompts import PromptTemplate from llama_index.core.response_synthesizers import ResponseMode, get_response_synthesizer from prompts import SYSTEM_PROMPT, LEGAL_POSITION_PROMPT, PRECEDENT_ANALYSIS_TEMPLATE from dotenv import load_dotenv load_dotenv() aws_access_key_id = os.getenv("AWS_ACCESS_KEY_ID") aws_secret_access_key = os.getenv("AWS_SECRET_ACCESS_KEY") openai_api_key = os.getenv("OPENAI_API_KEY") embed_model = OpenAIEmbedding(model_name="text-embedding-3-small") Settings.embed_model = embed_model Settings.context_window = 20000 Settings.chunk_size = 2048 Settings.similarity_top_k = 20 # Параметри S3 BUCKET_NAME = "legal-position" PREFIX_RETRIEVER = "Save_Index/" # Префікс для всього вмісту, який потрібно завантажити LOCAL_DIR = Path("Save_Index_Local") # Локальна директорія для збереження даних з S3 # Ініціалізація клієнта S3 s3_client = boto3.client( "s3", aws_access_key_id=aws_access_key_id, aws_secret_access_key=aws_secret_access_key, region_name="eu-north-1" ) # # Ініціалізація клієнта S3 # s3_client = boto3.client( # "s3", # aws_access_key_id=os.getenv("AWS_ACCESS_KEY_ID"), # aws_secret_access_key=os.getenv("AWS_SECRET_ACCESS_KEY"), # region_name="eu-north-1" # ) # Створюємо локальну директорію, якщо вона не існує LOCAL_DIR.mkdir(parents=True, exist_ok=True) # Функція для завантаження файлу з S3 def download_s3_file(bucket_name, s3_key, local_path): s3_client.download_file(bucket_name, s3_key, str(local_path)) print(f"Завантажено: {s3_key} -> {local_path}") # Функція для завантаження всієї папки з S3 у локальну директорію def download_s3_folder(bucket_name, prefix, local_dir): response = s3_client.list_objects_v2(Bucket=bucket_name, Prefix=prefix) if 'Contents' in response: for obj in response['Contents']: s3_key = obj['Key'] # Пропускаємо "папку" (кореневий префікс) у S3 if s3_key.endswith('/'): continue # Визначаємо локальний шлях, де буде збережений файл local_file_path = local_dir / Path(s3_key).relative_to(prefix) local_file_path.parent.mkdir(parents=True, exist_ok=True) # створення підкаталогів, якщо потрібно # Завантажуємо файл s3_client.download_file(bucket_name, s3_key, str(local_file_path)) print(f"Завантажено: {s3_key} -> {local_file_path}") # Завантаження всього вмісту папки `Save_Index` з S3 у локальну директорію `Save_Index_Local` # download_s3_folder(BUCKET_NAME, PREFIX_RETRIEVER, LOCAL_DIR) # !!! тимчасово відключено # PERSIST_DIR = "/home/docsa/Legal_Position/Save_index" # Apply nest_asyncio to handle nested async calls nest_asyncio.apply() class RetrieverEvent(Event): nodes: list[NodeWithScore] state_lp_json = gr.State() state_nodes = gr.State() class PrecedentAnalysisWorkflow(Workflow): @step async def analyze(self, ctx: Context, ev: StartEvent) -> StopEvent: query = ev.get("query") # нове рішення question = ev.get("question") # уточнююче питання nodes = ev.get("nodes") # знайдені правові позиції if not all([query, nodes]): return StopEvent(result="Недостатньо даних для аналізу. Необхідні нове рішення та правові позиції.") llm_analyse = OpenAI(model="gpt-4o", temperature=0) # llm_analyse = OpenAI(model="gpt-4o-mini", temperature=0) # Підготовка контексту та збір ID правових позицій context_parts = [] for i, node in enumerate(nodes, 1): # Отримуємо текст з node.node якщо це NodeWithScore node_text = node.node.text if hasattr(node, 'node') else node.text # Отримуємо metadata з node.node якщо це NodeWithScore metadata = node.node.metadata if hasattr(node, 'node') else node.metadata lp_id = metadata.get('lp_id', f'unknown_{i}') source_index = str(i) context_parts.append(f"Source {source_index} (ID: {lp_id}):\n{node_text}") context_str = "\n\n".join(context_parts) response_format = { "type": "json_schema", "json_schema": { "name": "relevant_positions_schema", "schema": { "type": "object", "properties": { "relevant_positions": { "type": "array", "items": { "type": "object", "properties": { "lp_id": {"type": "string"}, "source_index": {"type": "string"}, "description": {"type": "string"} }, "required": ["lp_id", "source_index", "description"] } } }, "required": ["relevant_positions"] } } } # Формування промпту та отримання відповіді prompt = PRECEDENT_ANALYSIS_TEMPLATE.format( query=query, question=question if question else "Загальний аналіз релевантності", context_str=context_str ) messages = [ ChatMessage(role="system", content="Ти - кваліфікований юрист-аналітик."), ChatMessage(role="user", content=prompt) ] response = llm_analyse.chat( messages=messages, response_format=response_format ) try: parsed_response = json.loads(response.message.content) if "relevant_positions" in parsed_response: # Форматуємо результат response_lines = [] for position in parsed_response["relevant_positions"]: position_text = ( f"* [{position['source_index']}]: {position['description']} " ) response_lines.append(position_text) response_text = "\n".join(response_lines) return StopEvent(result=response_text) else: return StopEvent(result="Помилка: відповідь не містить аналізу правових позицій") except json.JSONDecodeError: return StopEvent(result="Помилка обробки відповіді від AI") def parse_doc_ids(doc_ids): if doc_ids is None: return [] if isinstance(doc_ids, list): return [str(id).strip('[]') for id in doc_ids] if isinstance(doc_ids, str): cleaned = doc_ids.strip('[]').replace(' ', '') if cleaned: return [id.strip() for id in cleaned.split(',')] return [] def get_links_html(doc_ids): parsed_ids = parse_doc_ids(doc_ids) if not parsed_ids: return "" links = [f"[Рішення ВС: {doc_id}](https://reyestr.court.gov.ua/Review/{doc_id})" for doc_id in parsed_ids] return ", ".join(links) def parse_lp_ids(lp_ids): if lp_ids is None: return [] if isinstance(lp_ids, (str, int)): cleaned = str(lp_ids).strip('[]').replace(' ', '') if cleaned: return [cleaned] return [] def get_links_html_lp(lp_ids): parsed_ids = parse_lp_ids(lp_ids) if not parsed_ids: return "" links = [f"[ПП ВС: {lp_id}](https://lpd.court.gov.ua/home/search/{lp_id})" for lp_id in parsed_ids] return ", ".join(links) def initialize_components(): try: # Використовуємо папку `Save_Index_Local`, куди завантажено файли з S3 persist_path = Path("Save_Index_Local") # Перевірка існування локальної директорії if not persist_path.exists(): raise FileNotFoundError(f"Directory not found: {persist_path}") # Перевірка наявності необхідних файлів і папок required_files = ['docstore_es_filter.json', 'bm25_retriever_es'] missing_files = [f for f in required_files if not (persist_path / f).exists()] if missing_files: raise FileNotFoundError(f"Missing required files: {', '.join(missing_files)}") # Ініціалізація компонентів global retriever_bm25 # Ініціалізація `SimpleDocumentStore` з `docstore_es_filter.json` docstore = SimpleDocumentStore.from_persist_path(str(persist_path / "docstore_es_filter.json")) # Ініціалізація `BM25Retriever` з папки `bm25_retriever_es` bm25_retriever = BM25Retriever.from_persist_dir(str(persist_path / "bm25_retriever_es")) # Ініціалізація `QueryFusionRetriever` з налаштуваннями retriever_bm25 = QueryFusionRetriever( [ bm25_retriever, ], similarity_top_k=Settings.similarity_top_k, num_queries=1, use_async=True, ) return True except Exception as e: print(f"Error initializing components: {str(e)}", file=sys.stderr) return False def extract_court_decision_text(url): response = requests.get(url) soup = BeautifulSoup(response.content, 'html.parser') unwanted_texts = [ "Доступ до Реєстру здійснюється в тестовому (обмеженому) режимі.", "З метою упередження перешкоджанню стабільній роботі Реєстру" ] decision_text = "" for paragraph in soup.find_all('p'): text = paragraph.get_text(separator="\n").strip() if not any(unwanted_text in text for unwanted_text in unwanted_texts): decision_text += text + "\n" return decision_text.strip() # Constants for JSON schema LEGAL_POSITION_SCHEMA = { "type": "json_schema", "json_schema": { "name": "lp_schema", "schema": { "type": "object", "properties": { "title": {"type": "string", "description": "Title of the legal position"}, "text": {"type": "string", "description": "Text of the legal position"}, "proceeding": {"type": "string", "description": "Type of court proceedings"}, "category": {"type": "string", "description": "Category of the legal position"}, }, "required": ["title", "text", "proceeding", "category"], "additionalProperties": False }, "strict": True } } def generate_legal_position(court_decision_text, user_question): """ Генерує правову позицію на основі тексту судового рішення. Args: court_decision_text (str): Текст судового рішення для аналізу user_question (str): Питання користувача (наразі не використовується) Returns: dict: Словник з правовою позицією або повідомленням про помилку """ try: # Ініціалізація моделі llm_lp = OpenAI( model="ft:gpt-4o-mini-2024-07-18:personal:legal-position-400:AT3wvKsU", temperature=0 ) # Формування повідомлень для чату messages = [ ChatMessage(role="system", content=SYSTEM_PROMPT), ChatMessage( role="user", content=LEGAL_POSITION_PROMPT.format(court_decision_text=court_decision_text) ), ] # Отримання відповіді від моделі response = llm_lp.chat(messages, response_format=LEGAL_POSITION_SCHEMA) # Обробка відповіді parsed_response = json.loads(response.message.content) # Перевірка наявності обов'язкових полів if all(field in parsed_response for field in ["title", "text", "proceeding", "category"]): return parsed_response return { "title": "Error: Missing required fields in response", "text": response.message.content, "proceeding": "Unknown", "category": "Error" } except json.JSONDecodeError: return { "title": "Error parsing response", "text": response.message.content, "proceeding": "Unknown", "category": "Error" } except Exception as e: return { "title": "Unexpected error", "text": str(e), "proceeding": "Unknown", "category": "Error" } def create_gradio_interface(): with gr.Blocks() as app: gr.Markdown("# Аналізатор релевантних Правових Позицій Верховного Суду для нового судового рішення") with gr.Row(): url_input = gr.Textbox(label="URL судового рішення:") question_input = gr.Textbox(label="Уточнююче питання для аналізу:") with gr.Row(): generate_position_button = gr.Button("Генерувати короткий зміст позиції суду") search_with_ai_button = gr.Button("Пошук із ШІ", interactive=False) # search_without_ai_button = gr.Button("Пошук без ШІ") analyze_button = gr.Button("Аналіз", interactive=False) position_output = gr.Markdown(label="Короткий зміст позиції суду за введеним рішенням") search_output = gr.Markdown(label="Результат пошуку") analysis_output = gr.Markdown(label="Результат аналізу") # Два об'єкти стану для зберігання legal_position_json та nodes state_lp_json = gr.State() state_nodes = gr.State() async def generate_position_action(url): try: court_decision_text = extract_court_decision_text(url) legal_position_json = generate_legal_position(court_decision_text, "") position_output_content = f"**Короткий зміст позиції суду за введеним рішенням:**\n *{legal_position_json['title']}*: \n{legal_position_json['text']} **Категорія:** \n{legal_position_json['category']} ({legal_position_json['proceeding']})\n\n" return position_output_content, legal_position_json except Exception as e: return f"Error during position generation: {str(e)}", None async def search_with_ai_action(legal_position_json): try: query_text = legal_position_json["title"] + ': ' + legal_position_json["text"] + ': ' + legal_position_json["proceeding"] + ': ' + legal_position_json["category"] nodes = await retriever_bm25.aretrieve(query_text) sources_output = "\n **Результати пошуку (наявні правові позиції ВСУ):** \n\n" for index, node in enumerate(nodes, start=1): source_title = node.node.metadata.get('title') doc_ids = node.node.metadata.get('doc_id') lp_ids = node.node.metadata.get('lp_id') links = get_links_html(doc_ids) links_lp = get_links_html_lp(lp_ids) sources_output += f"\n[{index}] *{source_title}* {links_lp} 👉 Score: {node.score} {links}\n" return sources_output, nodes except Exception as e: return f"Error during search: {str(e)}", None async def search_without_ai_action(url): try: court_decision_text = extract_court_decision_text(url) nodes = await retriever_bm25.aretrieve(court_decision_text) search_output_content = f"**Результати пошуку (наявні правові позиції ВСУ):** \n\n" for index, node in enumerate(nodes, start=1): source_title = node.node.metadata.get('title', 'Невідомий заголовок') doc_ids = node.node.metadata.get('doc_id') links = get_links_html(doc_ids) search_output_content += f"\n[{index}] *{source_title}* 👉 Score: {node.score} {links}\n" return search_output_content, nodes except Exception as e: return f"Error during search: {str(e)}", None async def analyze_action(legal_position_json, question, nodes): try: workflow = PrecedentAnalysisWorkflow(timeout=600) # Формуємо єдиний текст запиту з legal_position_json query = ( f"{legal_position_json['title']}: " f"{legal_position_json['text']}: " f"{legal_position_json['proceeding']}: " f"{legal_position_json['category']}" ) # Запускаємо workflow і отримуємо текст аналізу response_text = await workflow.run( query=query, question=question, nodes=nodes ) # Формуємо вивід output = f"**Аналіз ШІ:**\n{response_text}\n\n" output += "**Наявні в базі Правові Позицій Верховного Суду:**\n\n" # Розбиваємо текст відповіді на рядки analysis_lines = response_text.split('\n') # Проходимо по кожному рядку аналізу for line in analysis_lines: if line.startswith('* ['): # З кожного рядка отримуємо індекс index = line[3:line.index(']')] # Витягуємо індекс з "* [X]" # Знаходимо відповідний node за індексом node = nodes[int(index) - 1] source_node = node.node source_title = source_node.metadata.get('title', 'Невідомий заголовок') source_text_lp = node.text doc_ids = source_node.metadata.get('doc_id') lp_id = source_node.metadata.get('lp_id') links = get_links_html(doc_ids) links_lp = get_links_html_lp(lp_id) output += f"[{index}]: *{source_title}* | {source_text_lp} | {links_lp} | {links}\n\n" return output except Exception as e: return f"Error during analysis: {str(e)}" # Підключаємо функції до кнопок з оновленими входами та виходами generate_position_button.click( fn=generate_position_action, inputs=url_input, outputs=[position_output, state_lp_json] ) generate_position_button.click( fn=lambda: gr.update(interactive=True), inputs=None, outputs=search_with_ai_button ) search_with_ai_button.click( fn=search_with_ai_action, inputs=state_lp_json, outputs=[search_output, state_nodes] ) search_with_ai_button.click( fn=lambda: gr.update(interactive=True), inputs=None, outputs=analyze_button ) # search_without_ai_button.click( # fn=search_without_ai_action, # inputs=url_input, # outputs=[search_output, state_nodes] # ) # search_without_ai_button.click( # fn=lambda: gr.update(interactive=True), # inputs=None, # outputs=analyze_button # ) analyze_button.click( fn=analyze_action, inputs=[state_lp_json, question_input, state_nodes], outputs=analysis_output ) return app if __name__ == "__main__": if initialize_components(): print("Components initialized successfully!") app = create_gradio_interface() app.launch(share=True) else: print("Failed to initialize components. Please check the paths and try again.", file=sys.stderr) sys.exit(1)