import os import uuid import fitz # PyMuPDF from markdownify import markdownify as md import pytesseract from PIL import Image import gradio as gr from transformers import AutoTokenizer, AutoModelForCausalLM import torch import json from sentence_transformers import SentenceTransformer import numpy as np from llama_cpp import Llama # GGUFモデル用のライブラリ # ----------------------------------------------------- # 設定項目 # ----------------------------------------------------- # Tesseractの実行パス (必要に応じて環境に合わせる) TESSERACT_CMD = "/usr/bin/tesseract" # 出力ディレクトリ OUTPUT_DIR = "output" # Qwenモデル (QA生成用) QWEN_MODEL_NAME = "rinna/qwen2.5-bakeneko-32b-instruct-v2-gguf" # GGUFフォーマットの日本語特化モデル # Embeddingモデル (RAG検索用) EMBEDDING_MODEL_NAME = "intfloat/multilingual-e5-base" # 高性能モデル例 (GPUメモリ要件高): "intfloat/multilingual-e5-large" # ベクトル化済みデータセットのパス VECTORIZED_DATA_PATH = "vectorized_qa_dataset.json" # QA生成時の最大トークン数 MAX_NEW_TOKENS_QA = 128 # RAG検索時の類似度閾値 (この値以下の類似度の回答は「不明」とする) # 設定しない場合は -1.0 などにする SIMILARITY_THRESHOLD = 0.6 # 例: 60%以上の類似度で回答 # ----------------------------------------------------- # 初期設定 # ----------------------------------------------------- os.makedirs(OUTPUT_DIR, exist_ok=True) # Tesseractパス設定 try: pytesseract.pytesseract.tesseract_cmd = TESSERACT_CMD # Tesseractが利用可能かバージョンを確認 print(f"Tesseract version: {pytesseract.get_tesseract_version()}") except Exception as e: print(f"警告: Tesseractの設定または起動に失敗しました: {e}") print("OCR機能が利用できない可能性があります。") # ----------------------------------------------------- # ヘルパー関数・クラス # ----------------------------------------------------- # PDF → 画像化 def pdf_to_images(pdf_file): if pdf_file is None: gr.Warning("PDFファイルがアップロードされていません。") return None output_paths = [] try: doc = fitz.open(pdf_file.name) if len(doc) == 0: gr.Warning("PDFファイルにページがありません。") doc.close() return None gr.Info(f"{len(doc)}ページの処理を開始します...") for page_index in range(len(doc)): pix = doc[page_index].get_pixmap() output_path = os.path.join(OUTPUT_DIR, f"{uuid.uuid4().hex}_page_{page_index + 1}.png") pix.save(output_path) output_paths.append(output_path) doc.close() gr.Info("画像化が完了しました。") return output_paths except Exception as e: gr.Error(f"PDFの画像化中にエラーが発生しました: {e}") return None # PDF → Markdown def pdf_to_markdown(pdf_file): if pdf_file is None: gr.Warning("PDFファイルがアップロードされていません。") return None try: doc = fitz.open(pdf_file.name) if len(doc) == 0: gr.Warning("PDFファイルにページがありません。") doc.close() return None gr.Info(f"{len(doc)}ページの処理を開始します...") full_text = "" for page in doc: full_text += page.get_text("text", sort=True) # テキスト抽出(ソートオプション追加) doc.close() if not full_text.strip(): gr.Warning("PDFからテキストを抽出できませんでした。画像ベースのPDFかもしれません。") return None # markdownifyにオプションを追加して変換品質を調整可能 markdown = md(full_text, heading_style="ATX") # ATX形式 (# 見出し) を使用 output_path = os.path.join(OUTPUT_DIR, f"{uuid.uuid4().hex}.md") with open(output_path, "w", encoding="utf-8") as f: f.write(markdown) gr.Info("Markdown変換が完了しました。") return output_path except Exception as e: gr.Error(f"PDFのMarkdown変換中にエラーが発生しました: {e}") return None # 画像OCR def image_ocr(image_file_path): if image_file_path is None: gr.Warning("画像がアップロードされていません。") return None, None try: image = Image.open(image_file_path) # Tesseractが利用可能か再確認 try: pytesseract.get_tesseract_version() except pytesseract.TesseractNotFoundError: gr.Error("Tesseractが利用できません。インストールとパス設定を確認してください。") return None, None gr.Info("OCR処理を実行中...") # tesseractにオプションを追加して精度向上を図ることも可能 (例: --psm 6) text = pytesseract.image_to_string(image, lang='eng+jpn') # 英語と日本語 output_path = os.path.join(OUTPUT_DIR, f"{uuid.uuid4().hex}.txt") with open(output_path, "w", encoding="utf-8") as f: f.write(text) gr.Info("OCR処理が完了しました。") return text, output_path except Exception as e: gr.Error(f"画像OCR処理中にエラーが発生しました: {e}") return None, None # Markdown/テキストファイル → テキスト抽出 def markdown_ocr(file_obj): # Gradio Fileコンポーネントはオブジェクトを渡す if file_obj is None: gr.Warning("ファイルがアップロードされていません。") return None, None try: # file_obj.name でファイルパスを取得 file_path = file_obj.name gr.Info(f"ファイル '{os.path.basename(file_path)}' を処理中...") with open(file_path, "r", encoding="utf-8") as f: content = f.read() # 出力パスもファイル名を元に作成 base_name = os.path.splitext(os.path.basename(file_path))[0] output_path = os.path.join(OUTPUT_DIR, f"{base_name}_extracted.txt") with open(output_path, "w", encoding="utf-8") as f: f.write(content) gr.Info("テキスト抽出が完了しました。") return content, output_path except Exception as e: gr.Error(f"ファイル処理中にエラーが発生しました: {e}") return None, None # ----------------------------------------------------- # 大規模言語モデル (Qwen) 関連 # ----------------------------------------------------- print(f"Qwenモデル '{QWEN_MODEL_NAME}' をロード中...") try: # GGUFモデルのロード model_qwen = Llama( model_path=QWEN_MODEL_NAME, n_ctx=2048, # コンテキストウィンドウサイズ n_gpu_layers=0 # 一時的にCPUのみで動作させる ) print("Qwenモデルのロード完了。") qwen_loaded = True # GGUFモデルではtokenizerは不要 tokenizer_qwen = None except Exception as e: print(f"警告: Qwenモデルのロードに失敗しました: {e}") print("QA生成機能は利用できません。") print("llama-cppがインストールされているか確認してください: pip install llama-cpp-python") qwen_loaded = False model_qwen = None tokenizer_qwen = None # テキストからQAを生成する関数 def generate_qa_from_text(text): if not qwen_loaded: return "Qwenモデルがロードされていないため、QA生成を実行できません。" if not text or not text.strip(): return "入力テキストが空です。" # プロンプトの改善例 prompt = f"""以下のテキストに基づいて、最も重要だと思われる質問とその回答のペアを1つだけ作成してください。質問は「Q:」で始め、回答は「A:」で始めてください。 --- テキスト --- {text[:2000]} --- ここまで --- Q: """ # 入力テキストが長すぎる場合を考慮して一部を切り出す try: gr.Info("QA生成を実行中...") # GGUFモデルでの生成 response = model_qwen.create_completion( prompt, max_tokens=MAX_NEW_TOKENS_QA, temperature=0.7, top_p=0.8, repeat_penalty=1.1, stop=["Q:", "\n\n"] # 生成を停止する条件 ) # 生成されたテキストを取得 result = response['choices'][0]['text'].strip() # Q: A: 形式になっているか簡易チェック if result.startswith("A:") or "A:" in result: final_qa = "Q: " + result else: # 期待した形式でない場合、調整するかそのまま返す final_qa = "Q: (不明瞭な質問)\nA: " + result # 生成されたQAをベクトル化してデータセットに追加 try: # QAを分解して質問と回答を抽出 qa_parts = final_qa.split("\nA: ") if len(qa_parts) == 2: question = qa_parts[0].replace("Q: ", "").strip() answer = qa_parts[1].strip() # vectorize_datasetの関数を呼び出してQAを追加 import vectorize_dataset if embedding_model_loaded: success = vectorize_dataset.add_and_vectorize_qa(question, answer, embedding_model) if success: gr.Info("生成されたQAをベクトル化してデータセットに追加しました。") else: gr.Warning("生成されたQAは既にデータセットに存在するか、追加に失敗しました。") else: gr.Warning("Embeddingモデルがロードされていないため、QAをベクトル化できません。") else: gr.Warning("生成されたQAの形式が不正です。") except Exception as e: gr.Warning(f"QAのベクトル化中にエラーが発生しました: {e}") gr.Info("QA生成が完了しました。") return final_qa except Exception as e: gr.Error(f"QA生成中にエラーが発生しました: {e}") return f"QA生成エラー: {e}" # ファイルを読み込んでQA生成を実行 def read_and_generate(file_obj): if file_obj is None: gr.Warning("ファイルがアップロードされていません。") return None if not qwen_loaded: gr.Error("Qwenモデルがロードされていません。QA生成は実行できません。") return None try: file_path = file_obj.name gr.Info(f"ファイル '{os.path.basename(file_path)}' を読み込んでQA生成を開始します...") with open(file_path, "r", encoding="utf-8") as f: text = f.read() if not text.strip(): gr.Warning("ファイルの内容が空です。") return None return generate_qa_from_text(text) except Exception as e: gr.Error(f"ファイル読み込みまたはQA生成中にエラー: {e}") return None # ----------------------------------------------------- # Embeddingモデル & RAG 関連 # ----------------------------------------------------- print(f"Embeddingモデル '{EMBEDDING_MODEL_NAME}' をロード中...") try: # GPUが利用可能なら自動的にGPUを使用する embedding_model = SentenceTransformer(EMBEDDING_MODEL_NAME, device='cuda' if torch.cuda.is_available() else 'cpu') print(f"Embeddingモデルをデバイス '{embedding_model.device}' にロードしました。") embedding_model_loaded = True except Exception as e: print(f"警告: Embeddingモデルのロードに失敗しました: {e}") print("チャットボット機能は利用できません。") embedding_model_loaded = False embedding_model = None # ベクトル化されたデータセットのロード vectorized_data = [] if embedding_model_loaded and os.path.exists(VECTORIZED_DATA_PATH): print(f"ベクトル化データセット '{VECTORIZED_DATA_PATH}' をロード中...") try: with open(VECTORIZED_DATA_PATH, 'r', encoding='utf-8') as f: vectorized_data = json.load(f) # ベクトルをNumpy配列に変換 (torch.Tensorにしても良い) for item in vectorized_data: item['vector'] = np.array(item['vector'], dtype=np.float32) print(f"{len(vectorized_data)} 件のベクトル化データをロードしました。") if not vectorized_data: print("警告: ベクトル化データセットは空です。") except Exception as e: print(f"警告: ベクトル化データセットのロード/解析に失敗しました: {e}") vectorized_data = [] embedding_model_loaded = False # データがないなら実質ロード失敗扱い elif embedding_model_loaded: print(f"警告: ベクトル化データセットファイルが見つかりません: {VECTORIZED_DATA_PATH}") print("チャットボットは質問に回答できません。") embedding_model_loaded = False # コサイン類似度計算 (Numpy版) def cosine_similarity_np(v1, v2): dot_product = np.dot(v1, v2) norm_v1 = np.linalg.norm(v1) norm_v2 = np.linalg.norm(v2) if norm_v1 == 0 or norm_v2 == 0: return 0.0 return dot_product / (norm_v1 * norm_v2) # チャットボット応答関数 (RAG検索のみ) def chatbot_response(message, history): if not message or not message.strip(): gr.Warning("質問を入力してください。") return "" # 空の応答を返す if not embedding_model_loaded or not vectorized_data: return "申し訳ありませんが、チャットボット機能の準備ができていません。" try: gr.Info(f"質問 '{message}' のベクトル化と検索を実行中...") # ユーザーの質問をベクトル化 # モデルによっては prefix が必要: ["query: " + message] message_vector = embedding_model.encode(message, convert_to_tensor=False) best_match_score = -1.0 best_match_index = -1 # 全データと比較して類似度を計算 for i, item in enumerate(vectorized_data): similarity = cosine_similarity_np(message_vector, item['vector']) if similarity > best_match_score: best_match_score = similarity best_match_index = i # 閾値チェックと回答の選択 if best_match_index != -1 and best_match_score >= SIMILARITY_THRESHOLD: found_answer = vectorized_data[best_match_index]['answer'] found_question = vectorized_data[best_match_index]['question'] # 参考情報 gr.Info(f"類似度 {best_match_score:.4f} で回答が見つかりました (Q: {found_question})。") return found_answer else: gr.Info(f"類似度 {best_match_score:.4f} でしたが、閾値 {SIMILARITY_THRESHOLD} 未満のため回答できません。") return "すみません、その質問に関連する情報が見つかりませんでした。" except Exception as e: gr.Error(f"チャットボット応答生成中にエラーが発生しました: {e}") return "申し訳ありません、エラーが発生しました。" # ----------------------------------------------------- # Gradio UI 定義 # ----------------------------------------------------- with gr.Blocks(theme=gr.themes.Soft()) as demo: # テーマ適用例 gr.Markdown("# 📘 ドキュメント処理・QA生成・RAGチャットデモ") gr.Markdown("PDF/画像の処理、テキストからのQA自動生成、および事前知識ベースのチャット機能を提供します。") with gr.Tab("① PDF → 画像"): with gr.Row(): with gr.Column(scale=1): pdf_input_img = gr.File(file_types=[".pdf"], label="PDFファイルをアップロード") pdf_img_button = gr.Button("🖼️ 画像化実行", variant="primary") with gr.Column(scale=3): pdf_output_img = gr.Gallery(label="画像出力", columns=4, object_fit="contain", height="auto", preview=True) pdf_img_button.click(fn=pdf_to_images, inputs=pdf_input_img, outputs=pdf_output_img) with gr.Tab("② PDF → Markdown"): with gr.Row(): with gr.Column(scale=1): pdf_input_md = gr.File(file_types=[".pdf"], label="PDFファイルをアップロード") pdf_md_button = gr.Button("📄 Markdown変換実行", variant="primary") with gr.Column(scale=3): pdf_md_output_path = gr.Textbox(label="Markdownファイル保存先", interactive=False) # Markdown内容をプレビューするコンポーネントを追加(オプション) pdf_md_preview = gr.Markdown(label="Markdownプレビュー") # pdf_to_markdown がファイルパスを返すように変更し、 # PDFをMarkdownに変換し、プレビューを表示 pdf_md_button.click( fn=pdf_to_markdown, inputs=pdf_input_md, outputs=[pdf_md_output_path, pdf_md_preview] ) with gr.Tab("③ 画像OCR"): with gr.Row(): with gr.Column(scale=1): img_input_ocr = gr.Image(type="filepath", label="画像をアップロード または ドラッグ&ドロップ") ocr_button = gr.Button("🔍 OCR実行", variant="primary") with gr.Column(scale=3): ocr_result_text = gr.Textbox(label="OCR結果", lines=10, interactive=False) ocr_output_path_text = gr.Textbox(label="テキストファイル保存先", interactive=False) ocr_button.click(fn=image_ocr, inputs=img_input_ocr, outputs=[ocr_result_text, ocr_output_path_text]) with gr.Tab("④ Markdown/テキスト → テキスト抽出"): with gr.Row(): with gr.Column(scale=1): md_input_extract = gr.File(file_types=[".md", ".txt"], label="Markdownまたはテキストファイルをアップロード") md_extract_button = gr.Button("✂️ テキスト抽出実行", variant="primary") with gr.Column(scale=3): md_text_output_extract = gr.Textbox(label="抽出されたテキスト", lines=10, interactive=False) md_output_path_extract = gr.Textbox(label="テキストファイル保存先", interactive=False) md_extract_button.click(fn=markdown_ocr, inputs=md_input_extract, outputs=[md_text_output_extract, md_output_path_extract]) with gr.Tab("⑤ QA生成 (Qwen)"): with gr.Row(): with gr.Column(scale=1): qa_input_file_gen = gr.File(file_types=[".md", ".txt"], label="テキストファイルをアップロード") qa_gen_button = gr.Button("💡 QA生成実行", variant="primary", interactive=qwen_loaded) # モデルロード失敗時は非活性 with gr.Column(scale=3): qa_gen_output_text = gr.Textbox(label="生成されたQA", lines=8, interactive=False) if not qwen_loaded: gr.Markdown("**警告:** Qwenモデルのロードに失敗したため、この機能は利用できません。") qa_gen_button.click(fn=read_and_generate, inputs=qa_input_file_gen, outputs=qa_gen_output_text) with gr.Tab("⑥ RAG チャットボット"): gr.Markdown("`qa_dataset.json` の情報に基づいて質問に答えます。(類似度検索)") if not embedding_model_loaded: gr.Markdown("**警告:** Embeddingモデルまたはベクトル化データセットのロードに失敗したため、この機能は利用できません。") # データセットからサンプル質問を取得 example_questions = [] if vectorized_data: num_examples = min(len(vectorized_data), 5) # 最大5件の例 example_questions = [vectorized_data[i]['question'] for i in range(num_examples)] chatbot_ui = gr.ChatInterface( fn=chatbot_response, title="簡易RAGチャットボット", description="下のテキストボックスに質問を入力してください。", examples=example_questions if example_questions else ["日本の首都は?", "水の化学式は?"], # ChatInterfaceを読み取り専用にする場合 (例: モデルがロードされていない場合) # interactive=embedding_model_loaded ) # モデルがロードされていない場合に備えてインタラクションを無効化 if not embedding_model_loaded: chatbot_ui.interactive = False # ----------------------------------------------------- # アプリ起動 # ----------------------------------------------------- if __name__ == "__main__": # ベクトル化処理を実行 print("データセットのベクトル化を開始します...") import vectorize_dataset vectorize_dataset.main() print("データセットのベクトル化が完了しました。") # queue() を有効にすると、複数ユーザーの同時アクセスに対応しやすくなる demo.queue().launch() # ローカルで実行する場合: demo.launch(debug=True)