DocUA's picture
d
77e02ed
raw
history blame
26.1 kB
import os
import re
import gradio as gr
import pandas as pd
import requests
import json
import faiss
import nest_asyncio
import sys
import boto3
from pathlib import Path
from bs4 import BeautifulSoup
from typing import Union, List
import asyncio
from anthropic import Anthropic
from openai import OpenAI
from llama_index.core import (
StorageContext,
ServiceContext,
VectorStoreIndex,
Settings,
load_index_from_storage
)
from llama_index.llms.openai import OpenAI
from llama_index.core.llms import ChatMessage
from llama_index.core.schema import IndexNode
from llama_index.core.storage.docstore import SimpleDocumentStore
from llama_index.retrievers.bm25 import BM25Retriever
from llama_index.embeddings.openai import OpenAIEmbedding
# from llama_index.vector_stores.faiss import FaissVectorStore
from llama_index.core.retrievers import QueryFusionRetriever
from llama_index.core.workflow import Event, Context, Workflow, StartEvent, StopEvent, step
from llama_index.core.schema import NodeWithScore
from llama_index.core.prompts import PromptTemplate
from llama_index.core.response_synthesizers import ResponseMode, get_response_synthesizer
from prompts import SYSTEM_PROMPT, LEGAL_POSITION_PROMPT, PRECEDENT_ANALYSIS_TEMPLATE
from dotenv import load_dotenv
load_dotenv()
aws_access_key_id = os.getenv("AWS_ACCESS_KEY_ID")
aws_secret_access_key = os.getenv("AWS_SECRET_ACCESS_KEY")
openai_api_key = os.getenv("OPENAI_API_KEY")
anthropic_api_key=os.getenv("ANTHROPIC_API_KEY")
embed_model = OpenAIEmbedding(model_name="text-embedding-3-small")
Settings.embed_model = embed_model
Settings.context_window = 20000
Settings.chunk_size = 2048
Settings.similarity_top_k = 20
# Параметри S3
BUCKET_NAME = "legal-position"
PREFIX_RETRIEVER = "Save_Index/" # Префікс для всього вмісту, який потрібно завантажити
LOCAL_DIR = Path("Save_Index_Local") # Локальна директорія для збереження даних з S3
# Ініціалізація клієнта S3
s3_client = boto3.client(
"s3",
aws_access_key_id=aws_access_key_id,
aws_secret_access_key=aws_secret_access_key,
region_name="eu-north-1"
)
# # Ініціалізація клієнта S3
# s3_client = boto3.client(
# "s3",
# aws_access_key_id=os.getenv("AWS_ACCESS_KEY_ID"),
# aws_secret_access_key=os.getenv("AWS_SECRET_ACCESS_KEY"),
# region_name="eu-north-1"
# )
# Створюємо локальну директорію, якщо вона не існує
LOCAL_DIR.mkdir(parents=True, exist_ok=True)
# Функція для завантаження файлу з S3
def download_s3_file(bucket_name, s3_key, local_path):
s3_client.download_file(bucket_name, s3_key, str(local_path))
print(f"Завантажено: {s3_key} -> {local_path}")
# Функція для завантаження всієї папки з S3 у локальну директорію
def download_s3_folder(bucket_name, prefix, local_dir):
response = s3_client.list_objects_v2(Bucket=bucket_name, Prefix=prefix)
if 'Contents' in response:
for obj in response['Contents']:
s3_key = obj['Key']
# Пропускаємо "папку" (кореневий префікс) у S3
if s3_key.endswith('/'):
continue
# Визначаємо локальний шлях, де буде збережений файл
local_file_path = local_dir / Path(s3_key).relative_to(prefix)
local_file_path.parent.mkdir(parents=True, exist_ok=True) # створення підкаталогів, якщо потрібно
# Завантажуємо файл
s3_client.download_file(bucket_name, s3_key, str(local_file_path))
print(f"Завантажено: {s3_key} -> {local_file_path}")
# Завантаження всього вмісту папки `Save_Index` з S3 у локальну директорію `Save_Index_Local`
download_s3_folder(BUCKET_NAME, PREFIX_RETRIEVER, LOCAL_DIR) #
# PERSIST_DIR = "/home/docsa/Legal_Position/Save_index"
# Apply nest_asyncio to handle nested async calls
nest_asyncio.apply()
class RetrieverEvent(Event):
nodes: list[NodeWithScore]
state_lp_json = gr.State()
state_nodes = gr.State()
from enum import Enum
class ModelProvider(str, Enum):
OPENAI = "openai"
ANTHROPIC = "anthropic"
class ModelName(str, Enum):
# OpenAI models
GPT4o = "gpt-4o"
GPT4o_MINI = "gpt-4o-mini"
# Anthropic models
CLAUDE3_5_SONNET = "claude-3-5-sonnet-latest"
CLAUDE3_5_HAIKU = "claude-3-5-haiku-latest"
class LLMAnalyzer:
def __init__(self, provider: ModelProvider, model_name: ModelName):
self.provider = provider
self.model_name = model_name
if provider == ModelProvider.OPENAI:
self.client = OpenAI(model=model_name)
elif provider == ModelProvider.ANTHROPIC:
# Додаємо API ключ при ініціалізації
self.client = Anthropic(api_key=anthropic_api_key)
else:
raise ValueError(f"Unsupported provider: {provider}")
async def analyze(self, prompt: str, response_schema: dict) -> str:
if self.provider == ModelProvider.OPENAI:
return await self._analyze_with_openai(prompt, response_schema)
else:
return await self._analyze_with_anthropic(prompt, response_schema)
async def _analyze_with_openai(self, prompt: str, response_schema: dict) -> str:
messages = [
ChatMessage(role="system",
content="Ти - кваліфікований юрист-аналітик, експерт з правових позицій Верховного Суду."),
ChatMessage(role="user", content=prompt)
]
# Правильний формат для response_format
response_format = {
"type": "json_schema",
"json_schema": {
"name": "relevant_positions_schema", # Додаємо обов'язкове поле name
"schema": response_schema
}
}
response = self.client.chat(
messages=messages,
response_format=response_format,
temperature=0
)
return response.message.content
async def _analyze_with_anthropic(self, prompt: str, response_schema: dict) -> str:
response = self.client.messages.create( # Прибрали await
model=self.model_name,
max_tokens=2000,
messages=[
{
"role": "assistant",
"content": "Ти - кваліфікований юрист-аналітик, експерт з правових позицій Верховного Суду."
},
{
"role": "user",
"content": prompt
}
]
)
return response.content[0].text
class PrecedentAnalysisWorkflow(Workflow):
def __init__(self, provider: ModelProvider = ModelProvider.OPENAI,
model_name: ModelName = ModelName.GPT4o_MINI):
super().__init__()
self.analyzer = LLMAnalyzer(provider, model_name)
@step
async def analyze(self, ctx: Context, ev: StartEvent) -> StopEvent:
try:
# Отримуємо параметри з події з дефолтними значеннями
query = ev.get("query", "")
question = ev.get("question", "")
nodes = ev.get("nodes", [])
# Перевірка на пусті значення
if not query:
return StopEvent(result="Помилка: Не надано текст нового рішення (query)")
if not nodes:
return StopEvent(result="Помилка: Не надано правові позиції для аналізу (nodes)")
# Підготовка контексту
context_parts = []
for i, node in enumerate(nodes, 1):
node_text = node.node.text if hasattr(node, 'node') else node.text
metadata = node.node.metadata if hasattr(node, 'node') else node.metadata
lp_id = metadata.get('lp_id', f'unknown_{i}')
context_parts.append(f"Source {i} (ID: {lp_id}):\n{node_text}")
context_str = "\n\n".join(context_parts)
# Схема відповіді
response_schema = {
"type": "object",
"properties": {
"relevant_positions": {
"type": "array",
"items": {
"type": "object",
"properties": {
"lp_id": {"type": "string"},
"source_index": {"type": "string"},
"description": {"type": "string"}
},
"required": ["lp_id", "source_index", "description"]
}
}
},
"required": ["relevant_positions"]
}
# Формування промпту
prompt = PRECEDENT_ANALYSIS_TEMPLATE.format(
query=query,
question=question if question else "Загальний аналіз релевантності",
context_str=context_str
)
# Отримання відповіді від моделі
response_content = await self.analyzer.analyze(prompt, response_schema)
try:
parsed_response = json.loads(response_content)
if "relevant_positions" in parsed_response:
response_lines = []
for position in parsed_response["relevant_positions"]:
position_text = (
f"* [{position['source_index']}] {position['description']} "
)
response_lines.append(position_text)
response_text = "\n".join(response_lines)
return StopEvent(result=response_text)
else:
return StopEvent(result="Не знайдено релевантних правових позицій")
except json.JSONDecodeError:
return StopEvent(result="Помилка обробки відповіді від AI")
except Exception as e:
return StopEvent(result=f"Error during analysis: {str(e)}")
# Формування промпту та отримання відповіді
prompt = PRECEDENT_ANALYSIS_TEMPLATE.format(
query=query,
question=question if question else "Загальний аналіз релевантності",
context_str=context_str
)
messages = [
ChatMessage(role="system", content="Ти - кваліфікований юрист-аналітик."),
ChatMessage(role="user", content=prompt)
]
response = llm_analyse.chat(
messages=messages,
response_format=response_format
)
try:
parsed_response = json.loads(response.message.content)
if "relevant_positions" in parsed_response:
# Форматуємо результат
response_lines = []
for position in parsed_response["relevant_positions"]:
position_text = (
f"* [{position['source_index']}]: {position['description']} "
)
response_lines.append(position_text)
response_text = "\n".join(response_lines)
return StopEvent(result=response_text)
else:
return StopEvent(result="Помилка: відповідь не містить аналізу правових позицій")
except json.JSONDecodeError:
return StopEvent(result="Помилка обробки відповіді від AI")
def parse_doc_ids(doc_ids):
if doc_ids is None:
return []
if isinstance(doc_ids, list):
return [str(id).strip('[]') for id in doc_ids]
if isinstance(doc_ids, str):
cleaned = doc_ids.strip('[]').replace(' ', '')
if cleaned:
return [id.strip() for id in cleaned.split(',')]
return []
def get_links_html(doc_ids):
parsed_ids = parse_doc_ids(doc_ids)
if not parsed_ids:
return ""
links = [f"[Рішення ВС: {doc_id}](https://reyestr.court.gov.ua/Review/{doc_id})"
for doc_id in parsed_ids]
return ", ".join(links)
def parse_lp_ids(lp_ids):
if lp_ids is None:
return []
if isinstance(lp_ids, (str, int)):
cleaned = str(lp_ids).strip('[]').replace(' ', '')
if cleaned:
return [cleaned]
return []
def get_links_html_lp(lp_ids):
parsed_ids = parse_lp_ids(lp_ids)
if not parsed_ids:
return ""
links = [f"[ПП ВС: {lp_id}](https://lpd.court.gov.ua/home/search/{lp_id})" for lp_id in parsed_ids]
return ", ".join(links)
def initialize_components():
try:
# Використовуємо папку `Save_Index_Local`, куди завантажено файли з S3
persist_path = Path("Save_Index_Local")
# Перевірка існування локальної директорії
if not persist_path.exists():
raise FileNotFoundError(f"Directory not found: {persist_path}")
# Перевірка наявності необхідних файлів і папок
required_files = ['docstore_es_filter.json', 'bm25_retriever_es']
missing_files = [f for f in required_files if not (persist_path / f).exists()]
if missing_files:
raise FileNotFoundError(f"Missing required files: {', '.join(missing_files)}")
# Ініціалізація компонентів
global retriever_bm25
# Ініціалізація `SimpleDocumentStore` з `docstore_es_filter.json`
docstore = SimpleDocumentStore.from_persist_path(str(persist_path / "docstore_es_filter.json"))
# Ініціалізація `BM25Retriever` з папки `bm25_retriever_es`
bm25_retriever = BM25Retriever.from_persist_dir(str(persist_path / "bm25_retriever_es"))
# Ініціалізація `QueryFusionRetriever` з налаштуваннями
retriever_bm25 = QueryFusionRetriever(
[
bm25_retriever,
],
similarity_top_k=Settings.similarity_top_k,
num_queries=1,
use_async=True,
)
return True
except Exception as e:
print(f"Error initializing components: {str(e)}", file=sys.stderr)
return False
def extract_court_decision_text(url):
response = requests.get(url)
soup = BeautifulSoup(response.content, 'html.parser')
unwanted_texts = [
"Доступ до Реєстру здійснюється в тестовому (обмеженому) режимі.",
"З метою упередження перешкоджанню стабільній роботі Реєстру"
]
decision_text = ""
for paragraph in soup.find_all('p'):
text = paragraph.get_text(separator="\n").strip()
if not any(unwanted_text in text for unwanted_text in unwanted_texts):
decision_text += text + "\n"
return decision_text.strip()
# Constants for JSON schema
LEGAL_POSITION_SCHEMA = {
"type": "json_schema",
"json_schema": {
"name": "lp_schema",
"schema": {
"type": "object",
"properties": {
"title": {"type": "string", "description": "Title of the legal position"},
"text": {"type": "string", "description": "Text of the legal position"},
"proceeding": {"type": "string", "description": "Type of court proceedings"},
"category": {"type": "string", "description": "Category of the legal position"},
},
"required": ["title", "text", "proceeding", "category"],
"additionalProperties": False
},
"strict": True
}
}
def generate_legal_position(court_decision_text, comment_input):
"""
Генерує правову позицію на основі тексту судового рішення.
Args:
court_decision_text (str): Текст судового рішення для аналізу
user_question (str): Питання користувача (наразі не використовується)
Returns:
dict: Словник з правовою позицією або повідомленням про помилку
"""
try:
# Ініціалізація моделі
llm_lp = OpenAI(
model="ft:gpt-4o-mini-2024-07-18:personal:legal-position-400:AT3wvKsU",
temperature=0
)
# Формування повідомлень для чату
# Формуємо контент з урахуванням коментаря
content = LEGAL_POSITION_PROMPT.format(
court_decision_text=court_decision_text,
comment=comment_input if comment_input else "Коментар відсутній"
)
# Формування повідомлень для чату
messages = [
ChatMessage(role="system", content=SYSTEM_PROMPT),
ChatMessage(role="user", content=content),
]
# Отримання відповіді від моделі
response = llm_lp.chat(messages, response_format=LEGAL_POSITION_SCHEMA)
# Обробка відповіді
parsed_response = json.loads(response.message.content)
# Перевірка наявності обов'язкових полів
if all(field in parsed_response for field in ["title", "text", "proceeding", "category"]):
return parsed_response
return {
"title": "Error: Missing required fields in response",
"text": response.message.content,
"proceeding": "Unknown",
"category": "Error"
}
except json.JSONDecodeError:
return {
"title": "Error parsing response",
"text": response.message.content,
"proceeding": "Unknown",
"category": "Error"
}
except Exception as e:
return {
"title": "Unexpected error",
"text": str(e),
"proceeding": "Unknown",
"category": "Error"
}
def create_gradio_interface():
async def generate_position_action(url):
try:
court_decision_text = extract_court_decision_text(url)
legal_position_json = generate_legal_position(court_decision_text, comment_input)
position_output_content = f"**Короткий зміст позиції суду за введеним рішенням:**\n *{legal_position_json['title']}*: \n{legal_position_json['text']} **Категорія:** \n{legal_position_json['category']} ({legal_position_json['proceeding']})\n\n"
return position_output_content, legal_position_json
except Exception as e:
return f"Error during position generation: {str(e)}", None
async def search_with_ai_action(legal_position_json):
try:
query_text = legal_position_json["title"] + ': ' + legal_position_json["text"] + ': ' + legal_position_json["proceeding"] + ': ' + legal_position_json["category"]
nodes = await retriever_bm25.aretrieve(query_text)
sources_output = "\n **Результати пошуку (наявні правові позиції ВСУ):** \n\n"
for index, node in enumerate(nodes, start=1):
source_title = node.node.metadata.get('title')
doc_ids = node.node.metadata.get('doc_id')
lp_ids = node.node.metadata.get('lp_id')
links = get_links_html(doc_ids)
links_lp = get_links_html_lp(lp_ids)
sources_output += f"\n[{index}] *{source_title}* {links_lp} 👉 Score: {node.score} {links}\n"
return sources_output, nodes
except Exception as e:
return f"Error during search: {str(e)}", None
async def analyze_action(legal_position_json, question, nodes, provider, model_name):
try:
workflow = PrecedentAnalysisWorkflow(
provider=ModelProvider(provider),
model_name=ModelName(model_name)
)
query = (
f"{legal_position_json['title']}: "
f"{legal_position_json['text']}: "
f"{legal_position_json['proceeding']}: "
f"{legal_position_json['category']}"
)
response_text = await workflow.run(
query=query,
question=question,
nodes=nodes
)
output = f"**Аналіз ШІ (модель: {model_name}):**\n{response_text}\n\n"
output += "**Наявні в базі Правові Позицій Верховного Суду:**\n\n"
analysis_lines = response_text.split('\n')
for line in analysis_lines:
if line.startswith('* ['):
index = line[3:line.index(']')]
node = nodes[int(index) - 1]
source_node = node.node
source_title = source_node.metadata.get('title', 'Невідомий заголовок')
source_text_lp = node.text
doc_ids = source_node.metadata.get('doc_id')
lp_id = source_node.metadata.get('lp_id')
links = get_links_html(doc_ids)
links_lp = get_links_html_lp(lp_id)
output += f"[{index}]: *{source_title}* | {source_text_lp} | {links_lp} | {links}\n\n"
return output
except Exception as e:
return f"Error during analysis: {str(e)}"
def update_model_choices(provider):
if provider == ModelProvider.OPENAI.value:
return gr.Dropdown(choices=[m.value for m in ModelName if m.value.startswith("gpt")])
else:
return gr.Dropdown(choices=[m.value for m in ModelName if m.value.startswith("claude")])
with gr.Blocks() as app:
# Далі ваш код інтерфейсу...
gr.Markdown("# Аналізатор релевантних Правових Позицій Верховного Суду для нового судового рішення")
with gr.Row():
comment_input = gr.Textbox(label="Коментар до формування короткого змісту судового рішення:")
url_input = gr.Textbox(label="URL судового рішення:")
question_input = gr.Textbox(label="Уточнююче питання для аналізу:")
with gr.Row():
provider_dropdown = gr.Dropdown(
choices=[p.value for p in ModelProvider],
value=ModelProvider.OPENAI.value,
label="Провайдер AI",
)
model_dropdown = gr.Dropdown(
choices=[m.value for m in ModelName if m.value.startswith("gpt")],
value=ModelName.GPT4o_MINI.value,
label="Модель",
)
with gr.Row():
generate_position_button = gr.Button("Генерувати короткий зміст позиції суду")
search_with_ai_button = gr.Button("Пошук із ШІ", interactive=False)
analyze_button = gr.Button("Аналіз", interactive=False)
position_output = gr.Markdown(label="Короткий зміст позиції суду за введеним рішенням")
search_output = gr.Markdown(label="Результат пошуку")
analysis_output = gr.Markdown(label="Результат аналізу")
state_lp_json = gr.State()
state_nodes = gr.State()
# Підключення функцій до кнопок
generate_position_button.click(
fn=generate_position_action,
inputs=url_input,
outputs=[position_output, state_lp_json]
).then(
fn=lambda: gr.update(interactive=True),
inputs=None,
outputs=search_with_ai_button
)
search_with_ai_button.click(
fn=search_with_ai_action,
inputs=state_lp_json,
outputs=[search_output, state_nodes]
).then(
fn=lambda: gr.update(interactive=True),
inputs=None,
outputs=analyze_button
)
analyze_button.click(
fn=analyze_action,
inputs=[state_lp_json, question_input, state_nodes, provider_dropdown, model_dropdown],
outputs=analysis_output
)
provider_dropdown.change(
fn=update_model_choices,
inputs=provider_dropdown,
outputs=model_dropdown
)
return app
if __name__ == "__main__":
if initialize_components():
print("Components initialized successfully!")
app = create_gradio_interface()
app.launch(share=True)
else:
print("Failed to initialize components. Please check the paths and try again.", file=sys.stderr)
sys.exit(1)