"""Question-answering pipelines backed by Pinecone, plus a small Elasticsearch log writer."""

import datetime
import json
import logging
import os
import shutil
import sys
import uuid
from abc import ABC, abstractmethod
from base64 import b64encode
from json import JSONDecodeError
from pathlib import Path
from typing import List, Optional

import certifi
import pandas as pd
import requests
from haystack import BaseComponent, Document
from haystack.document_stores import ElasticsearchDocumentStore, PineconeDocumentStore
from haystack.nodes import (
    BM25Retriever,
    EmbeddingRetriever,
    FARMReader,
    OpenAIAnswerGenerator,
)
from haystack.nodes.reader import BaseReader
from haystack.nodes.retriever import BaseRetriever
from haystack.pipelines import (
    BaseStandardPipeline,
    DocumentSearchPipeline,
    ExtractiveQAPipeline,
    GenerativeQAPipeline,
    Pipeline,
)
from sentence_transformers import SentenceTransformer

ca_certs = certifi.where()

# Embedding model used by the retriever in PinecodeProposalQueries.
_EMBEDDING_MODEL = "sentence-transformers/multi-qa-MiniLM-L6-cos-v1"


class QAPipeline(BaseStandardPipeline):
    """
    Pipeline for Extractive Question Answering.
    """

    def __init__(self, reader: BaseReader, retriever: BaseRetriever):
        """
        :param reader: Reader instance
        :param retriever: Retriever instance
        """
        self.pipeline = Pipeline()
        self.pipeline.add_node(component=retriever, name="Retriever", inputs=["Query"])
        self.pipeline.add_node(component=reader, name="Reader", inputs=["Retriever"])
        self.metrics_filter = {"Retriever": ["recall_single_hit"]}

    def run(self, query: str, params: Optional[dict] = None, debug: Optional[bool] = None):
        """
        :param query: The search query string.
        :param params: Params for the `retriever` and `reader`. For instance,
                       params={"Retriever": {"top_k": 10}, "Reader": {"top_k": 5}}
        :param debug: Whether the pipeline should instruct nodes to collect debug
                      information about their execution. By default these include the
                      input parameters they received and the output they generated.
                      All debug information can then be found in the dict returned
                      by this method under the key "_debug"
        :return: The output of the underlying pipeline's `run` call, unchanged.
        """
        return self.pipeline.run(query=query, params=params, debug=debug)


class DocumentQueries(ABC):
    """Interface for classes that answer a free-text query against a document index."""

    @abstractmethod
    def search_by_query(
        self,
        query: str,
        retriever_top_k: int,
        reader_top_k: int,
        index_name: Optional[str] = None,
        filters=None,
    ):
        """
        :param query: The search query string.
        :param retriever_top_k: `top_k` forwarded to the retriever node.
        :param reader_top_k: `top_k` forwarded to the reader node.
        :param index_name: Optional index name; interpretation is up to the implementation.
        :param filters: Optional filters forwarded to the retriever node.
        """
        pass


class PinecodeProposalQueries(DocumentQueries):
    """
    Extractive and generative question answering over a Pinecone document store.

    The class name keeps its original spelling so existing imports continue to work.
    """

    def __init__(
        self,
        index_name: str,
        api_key,
        reader_name_or_path: str,
        use_gpu=True,
        embedding_dim=384,
        environment="us-east1-gcp",
        openai_api_key: Optional[str] = None,
    ) -> None:
        """
        :param index_name: Name of the Pinecone index.
        :param api_key: Pinecone API key.
        :param reader_name_or_path: Model name or path given to `FARMReader`.
        :param use_gpu: Forwarded to `FARMReader`.
        :param embedding_dim: Embedding dimension given to the document store.
        :param environment: Pinecone environment name.
        :param openai_api_key: Optional OpenAI key for the generative pipeline. When
                               omitted, the `OPENAI_API_KEY` environment variable is
                               used, falling back to an empty string.
        """
        reader = FARMReader(
            model_name_or_path=reader_name_or_path,
            use_gpu=use_gpu,
            num_processes=1,
            context_window_size=200,
        )
        self._initialize_pipeline(
            index_name,
            api_key,
            reader=reader,
            embedding_dim=embedding_dim,
            environment=environment,
            openai_api_key=openai_api_key,
        )

    def _initialize_pipeline(
        self,
        index_name,
        api_key,
        similarity="cosine",
        embedding_dim=384,
        reader=None,
        environment="us-east1-gcp",
        metadata_config=None,
        openai_api_key: Optional[str] = None,
    ):
        """
        Build the document store, retriever, generator and both QA pipelines.

        :param index_name: Name of the Pinecone index.
        :param api_key: Pinecone API key.
        :param similarity: Similarity metric given to the document store.
        :param embedding_dim: Embedding dimension given to the document store.
        :param reader: Reader to use. When None, a reader already stored on the
                       instance is reused.
        :param environment: Pinecone environment name.
        :param metadata_config: Pinecone metadata config. Defaults to indexing the
                                "title" and "source_title" fields.
        :param openai_api_key: Optional OpenAI key; see `__init__`.
        :raises ValueError: If no reader is given and none is stored on the instance.
        """
        if reader is not None:
            self.reader = reader
        # Fail before any remote resource is created rather than with an opaque
        # AttributeError once the extractive pipeline is assembled.
        if getattr(self, "reader", None) is None:
            raise ValueError(
                "A reader is required: pass `reader` or set `self.reader` "
                "before calling _initialize_pipeline()."
            )

        # A fresh dict per call avoids sharing one mutable default between instances.
        if metadata_config is None:
            metadata_config = {"indexed": ["title", "source_title"]}

        # An unset key resolves to "", which is what the original code always passed.
        if openai_api_key is None:
            openai_api_key = os.environ.get("OPENAI_API_KEY", "")

        self.OPENAI_generator = OpenAIAnswerGenerator(
            api_key=openai_api_key,
            model="text-davinci-003",
            temperature=.5,
            max_tokens=60,
        )
        self.document_store = PineconeDocumentStore(
            api_key=api_key,
            environment=environment,
            index=index_name,
            similarity=similarity,
            embedding_dim=embedding_dim,
            metadata_config=metadata_config,
        )
        self.retriever = EmbeddingRetriever(
            document_store=self.document_store,
            embedding_model=_EMBEDDING_MODEL,
            model_format="sentence_transformers",
        )
        self.extractive_pipe = ExtractiveQAPipeline(
            reader=self.reader, retriever=self.retriever
        )
        self.generative_OPENAI_pipe = GenerativeQAPipeline(
            generator=self.OPENAI_generator, retriever=self.retriever
        )

    @staticmethod
    def _build_params(second_node: str, retriever_top_k: int, second_top_k: int, filters) -> dict:
        """Return the per-node params dict for a Retriever -> `second_node` pipeline."""
        return {
            "Retriever": {"top_k": retriever_top_k, "filters": filters},
            second_node: {"top_k": second_top_k},
        }

    def search_by_query(
        self,
        query: str,
        retriever_top_k: int,
        reader_top_k: int,
        index_name: Optional[str] = None,
        filters=None,
    ):
        """
        Run the extractive pipeline and return its answers.

        :param query: The search query string.
        :param retriever_top_k: `top_k` for the retriever node.
        :param reader_top_k: `top_k` for the reader node.
        :param index_name: Accepted for interface compatibility; not used here.
        :param filters: Optional filters for the retriever node.
        :return: The value under the "answers" key of the pipeline output.
        """
        params = self._build_params("Reader", retriever_top_k, reader_top_k, filters)
        # NOTE(review): debug=True is kept from the original although only
        # "answers" is returned -- confirm whether it is still needed.
        prediction = self.extractive_pipe.run(query=query, params=params, debug=True)
        return prediction["answers"]

    def genenerate_answer_OpenAI(
        self,
        query: str,
        retriever_top_k: int,
        reader_top_k: int,
        es_index: Optional[str] = None,
        filters=None,
    ):
        """
        Run the OpenAI generative pipeline and return its full output.

        :param query: The search query string.
        :param retriever_top_k: `top_k` for the retriever node.
        :param reader_top_k: `top_k` for the generator node.
        :param es_index: Accepted for interface compatibility; not used here.
        :param filters: Optional filters for the retriever node.
        :return: The unmodified output of the generative pipeline.
        """
        params = self._build_params("Generator", retriever_top_k, reader_top_k, filters)
        return self.generative_OPENAI_pipe.run(query=query, params=params)

    def genenerate_answer_HF(
        self,
        query: str,
        retriever_top_k: int,
        reader_top_k: int,
        es_index: Optional[str] = None,
        filters=None,
    ):
        """
        Run the Hugging Face generative pipeline and return its full output.

        Nothing in this class creates `generative_HF_pipe`; it must be assigned on
        the instance before this method is called.

        :param query: The search query string.
        :param retriever_top_k: `top_k` for the retriever node.
        :param reader_top_k: `top_k` for the generator node.
        :param es_index: Accepted for interface compatibility; not used here.
        :param filters: Optional filters for the retriever node.
        :return: The unmodified output of the generative pipeline.
        :raises AttributeError: If `generative_HF_pipe` has not been configured.
        """
        hf_pipe = getattr(self, "generative_HF_pipe", None)
        if hf_pipe is None:
            # Same exception type as before, but with an actionable message.
            raise AttributeError(
                "generative_HF_pipe is not configured; assign a pipeline to "
                "`generative_HF_pipe` before calling genenerate_answer_HF()."
            )
        params = self._build_params("Generator", retriever_top_k, reader_top_k, filters)
        return hf_pipe.run(query=query, params=params)


class Log():
    """Writes log entries as documents to an Elasticsearch index over HTTPS."""

    def __init__(self, es_host: str, es_index: str, es_user, es_password) -> None:
        """
        :param es_host: Elasticsearch host name (no scheme, no port).
        :param es_index: Index that receives the log documents.
        :param es_user: User name for HTTP basic authentication.
        :param es_password: Password for HTTP basic authentication.
        """
        self.elastic_endpoint = f"https://{es_host}:443/{es_index}/_doc"
        # Credentials come from the caller instead of being embedded in source.
        self.credentials = b64encode(
            f"{es_user}:{es_password}".encode("utf-8")
        ).decode("ascii")
        self.auth_header = {'Authorization': f'Basic {self.credentials}'}

    def write_log(self, message: str, source: str, timeout: float = 10.0) -> None:
        """
        Post one log entry to the configured endpoint and print the response body.

        :param message: Log message text.
        :param source: Identifier of the component emitting the message.
        :param timeout: Seconds to wait for the HTTP request before giving up.
        """
        # The format string ends in a literal "Z", so the timestamp must be UTC.
        created_date = datetime.datetime.now(datetime.timezone.utc).strftime(
            '%Y-%m-%dT%H:%M:%SZ'
        )
        post_data = {
            "message": message,
            "createdDate": {"date": created_date},
            "source": source,
        }
        r = requests.post(
            self.elastic_endpoint,
            json=post_data,
            headers=self.auth_header,
            timeout=timeout,
        )
        print(r.text)