import datetime
from abc import ABC, abstractmethod
from base64 import b64encode
from json import JSONDecodeError
from pathlib import Path
from typing import List, Optional

import certifi
import pandas as pd
import requests
from haystack import BaseComponent, Document
from haystack.document_stores import ElasticsearchDocumentStore, PineconeDocumentStore
from haystack.nodes import (
    BM25Retriever,
    EmbeddingRetriever,
    FARMReader,
    OpenAIAnswerGenerator,
)
from haystack.nodes.reader import BaseReader
from haystack.nodes.retriever import BaseRetriever
from haystack.pipelines import (
    BaseStandardPipeline,
    DocumentSearchPipeline,
    ExtractiveQAPipeline,
    GenerativeQAPipeline,
    Pipeline,
)
from sentence_transformers import SentenceTransformer

ca_certs = certifi.where()


class QAPipeline(BaseStandardPipeline):
    """
    Pipeline for Extractive Question Answering.
    """

    def __init__(self, reader: BaseReader, retriever: BaseRetriever):
        """
        :param reader: Reader instance
        :param retriever: Retriever instance
        """
        self.pipeline = Pipeline()
        self.pipeline.add_node(component=retriever, name="Retriever", inputs=["Query"])
        self.pipeline.add_node(component=reader, name="Reader", inputs=["Retriever"])
        self.metrics_filter = {"Retriever": ["recall_single_hit"]}

    def run(self, query: str, params: Optional[dict] = None, debug: Optional[bool] = None):
        """
        :param query: The search query string.
        :param params: Params for the `retriever` and `reader`. For instance,
                       params={"Retriever": {"top_k": 10}, "Reader": {"top_k": 5}}
        :param debug: Whether the pipeline should instruct nodes to collect debug
                      information about their execution. By default these include
                      the input parameters they received and the output they
                      generated. All debug information can then be found in the
                      dict returned by this method under the key "_debug"
        :return: The dict produced by the underlying pipeline run.
        """
        return self.pipeline.run(query=query, params=params, debug=debug)


class DocumentQueries(ABC):
    """Interface for classes that answer a query against a document index."""

    @abstractmethod
    def search_by_query(
        self,
        query: str,
        retriever_top_k: int,
        reader_top_k: int,
        index_name: Optional[str] = None,
        filters=None,
    ):
        """
        :param query: The search query string.
        :param retriever_top_k: Number of documents requested from the retriever.
        :param reader_top_k: Number of answers requested from the reader.
        :param index_name: Optional index name; its use is up to the implementation.
        :param filters: Optional filters; their use is up to the implementation.
        """


class PineconeProposalQueries(DocumentQueries):
    """
    Question answering over a Pinecone index: extractive answers through a
    FARMReader and, when an OpenAI key is available, generative answers.
    """

    def __init__(
        self,
        index_name: str,
        api_key,
        reader_name_or_path: str,
        use_gpu=True,
        embedding_dim=384,
        environment="us-east1-gcp",
        OPENAI_key=None,
    ) -> None:
        """
        :param index_name: Name of the Pinecone index.
        :param api_key: Pinecone API key.
        :param reader_name_or_path: Model name or path handed to `FARMReader`.
        :param use_gpu: Forwarded to `FARMReader`.
        :param embedding_dim: Embedding dimension of the Pinecone index.
        :param environment: Pinecone environment.
        :param OPENAI_key: Optional OpenAI API key. When given, the generative
                           pipeline is built right away.
        """
        reader = FARMReader(
            model_name_or_path=reader_name_or_path,
            use_gpu=use_gpu,
            num_processes=1,
            context_window_size=200,
        )
        # `_initialize_pipeline` records the OpenAI key itself; it must not be
        # reset afterwards, otherwise the constructor's key is forgotten.
        self._initialize_pipeline(
            index_name,
            api_key,
            reader=reader,
            embedding_dim=embedding_dim,
            environment=environment,
            OPENAI_key=OPENAI_key,
        )

    def _initialize_pipeline(
        self,
        index_name,
        api_key,
        similarity="cosine",
        embedding_dim=384,
        reader=None,
        environment="us-east1-gcp",
        metadata_config=None,
        OPENAI_key=None,
    ):
        """
        Build the document store, the retriever and the QA pipelines.

        :param index_name: Name of the Pinecone index.
        :param api_key: Pinecone API key.
        :param similarity: Similarity function of the document store.
        :param embedding_dim: Embedding dimension of the Pinecone index.
        :param reader: Reader to use; when None the reader already stored on
                       the instance is kept.
        :param environment: Pinecone environment.
        :param metadata_config: Pinecone metadata configuration. Defaults to
                                indexing "title" and "source_title".
        :param OPENAI_key: Optional OpenAI API key for the generative pipeline.
        """
        if reader is not None:
            self.reader = reader
        if metadata_config is None:
            # Built per call so that no dict is shared between invocations.
            metadata_config = {"indexed": ["title", "source_title"]}

        self.document_store = PineconeDocumentStore(
            api_key=api_key,
            environment=environment,
            index=index_name,
            similarity=similarity,
            embedding_dim=embedding_dim,
            metadata_config=metadata_config,
        )
        self.retriever = EmbeddingRetriever(
            document_store=self.document_store,
            embedding_model="sentence-transformers/multi-qa-MiniLM-L6-cos-v1",
            model_format="sentence_transformers",
        )
        self.extractive_pipe = ExtractiveQAPipeline(reader=self.reader, retriever=self.retriever)

        # No Hugging Face generative pipeline is configured by this class;
        # `genenerate_answer_HF` checks for this explicitly.
        self.generative_HF_pipe = None

        # The retriever was just replaced, so any earlier generative pipeline
        # is stale and has to be rebuilt.
        self.generative_OPENAI_pipe = None
        self.OpenAI_api_key = None
        self._openai_settings = None
        if OPENAI_key:
            self.__initialize_openAIGEnerator(OPENAI_key, max_tokens=60)

    def search_by_query(
        self,
        query: str,
        retriever_top_k: int,
        reader_top_k: int,
        index_name: Optional[str] = None,
        filters=None,
    ):
        """
        Run the extractive pipeline and return its answers.

        :param query: The search query string.
        :param retriever_top_k: `top_k` passed to the retriever.
        :param reader_top_k: `top_k` passed to the reader.
        :param index_name: Unused; the index is fixed when the store is built.
        :param filters: Filters passed to the retriever.
        :return: The "answers" entry of the pipeline output.
        """
        params = {
            "Retriever": {"top_k": retriever_top_k, "filters": filters},
            "Reader": {"top_k": reader_top_k},
        }
        prediction = self.extractive_pipe.run(query=query, params=params, debug=True)
        return prediction["answers"]

    def __initialize_openAIGEnerator(
        self,
        OPENAI_key,
        openai_model_name="text-davinci-003",
        temperature=.5,
        max_tokens=30,
    ):
        """
        (Re)build the OpenAI generative pipeline when the requested settings
        differ from the ones the current pipeline was built with.

        :param OPENAI_key: OpenAI API key.
        :param openai_model_name: Model name passed to `OpenAIAnswerGenerator`.
        :param temperature: Sampling temperature passed to the generator.
        :param max_tokens: Token limit passed to the generator.
        """
        settings = (OPENAI_key, openai_model_name, temperature, max_tokens)
        # Compare every setting, not only the key: a changed model,
        # temperature or token limit also requires a fresh generator.
        if self.generative_OPENAI_pipe is not None and settings == self._openai_settings:
            return
        generator = OpenAIAnswerGenerator(
            api_key=OPENAI_key,
            model=openai_model_name,
            temperature=temperature,
            max_tokens=max_tokens,
        )
        self.generative_OPENAI_pipe = GenerativeQAPipeline(
            generator=generator, retriever=self.retriever
        )
        self.OpenAI_api_key = OPENAI_key
        self._openai_settings = settings

    def genenerate_answer_OpenAI(
        self,
        query: str,
        retriever_top_k: int,
        generator_top_k: int,
        filters=None,
        OPENAI_key=None,
        openai_model_name="text-davinci-003",
        temperature=.5,
        max_tokens=30,
    ):
        """
        Answer `query` with the OpenAI generative pipeline.

        :param query: The search query string.
        :param retriever_top_k: `top_k` passed to the retriever.
        :param generator_top_k: `top_k` passed to the generator.
        :param filters: Filters passed to the retriever.
        :param OPENAI_key: OpenAI API key. When omitted, the key recorded by an
                           earlier call or by the constructor is reused.
        :param openai_model_name: Model name for the generator.
        :param temperature: Sampling temperature for the generator.
        :param max_tokens: Token limit for the generator.
        :return: The "answers" entry of the pipeline output, or None when no
                 OpenAI key is available.
        """
        key = OPENAI_key or self.OpenAI_api_key
        if not key:
            return None
        # Repeated calls with the same key must still produce answers; only
        # the generator construction is skipped when nothing changed.
        self.__initialize_openAIGEnerator(key, openai_model_name, temperature, max_tokens)
        params = {
            "Retriever": {"top_k": retriever_top_k, "filters": filters},
            "Generator": {"top_k": generator_top_k},
        }
        prediction = self.generative_OPENAI_pipe.run(query=query, params=params)
        return prediction["answers"]

    def genenerate_answer_HF(
        self,
        query: str,
        retriever_top_k: int,
        reader_top_k: int,
        es_index: Optional[str] = None,
        filters=None,
    ):
        """
        Answer `query` with the pipeline stored in `self.generative_HF_pipe`.

        :param query: The search query string.
        :param retriever_top_k: `top_k` passed to the retriever.
        :param reader_top_k: `top_k` passed to the generator.
        :param es_index: Unused.
        :param filters: Filters passed to the retriever.
        :return: The "answers" entry of the pipeline output.
        :raises RuntimeError: When `self.generative_HF_pipe` has not been set.
        """
        pipe = getattr(self, "generative_HF_pipe", None)
        if pipe is None:
            raise RuntimeError(
                "generative_HF_pipe is not configured; assign a generative "
                "pipeline to it before calling genenerate_answer_HF()"
            )
        params = {
            "Retriever": {"top_k": retriever_top_k, "filters": filters},
            "Generator": {"top_k": reader_top_k},
        }
        prediction = pipe.run(query=query, params=params)
        return prediction["answers"]


class Log:
    """Writes log records to an Elasticsearch index over HTTPS."""

    def __init__(self, es_host: str, es_index: str, es_user, es_password) -> None:
        """
        :param es_host: Elasticsearch host name (port 443 is used).
        :param es_index: Index the log documents are posted to.
        :param es_user: User name for HTTP Basic authentication.
        :param es_password: Password for HTTP Basic authentication.
        """
        self.elastic_endpoint = f"https://{es_host}:443/{es_index}/_doc"
        # Credentials come from the caller; secrets must never be embedded in
        # source. NOTE(review): credentials that used to be hard-coded here
        # should be treated as leaked and rotated.
        self.credentials = b64encode(f"{es_user}:{es_password}".encode("utf-8")).decode("ascii")
        self.auth_header = {"Authorization": "Basic %s" % self.credentials}

    def write_log(self, message: str, source: str, timeout: float = 10.0) -> None:
        """
        Post one log document and print the response body.

        :param message: Log message.
        :param source: Origin of the message.
        :param timeout: Seconds to wait for the HTTP request before giving up.
        """
        # The format string ends in "Z", so the timestamp has to be UTC.
        created_date = datetime.datetime.now(datetime.timezone.utc).strftime("%Y-%m-%dT%H:%M:%SZ")
        post_data = {
            "message": message,
            "createdDate": {"date": created_date},
            "source": source,
        }
        r = requests.post(
            self.elastic_endpoint,
            json=post_data,
            headers=self.auth_header,
            timeout=timeout,
        )
        print(r.text)