|
|
|
from abc import ABC, abstractmethod |
|
from haystack.nodes import BM25Retriever, FARMReader |
|
from haystack.document_stores import ElasticsearchDocumentStore |
|
from haystack.pipelines import ExtractiveQAPipeline, DocumentSearchPipeline |
|
from haystack.document_stores import PineconeDocumentStore |
|
from haystack.nodes import EmbeddingRetriever, OpenAIAnswerGenerator |
|
from json import JSONDecodeError |
|
from pathlib import Path |
|
from typing import List, Optional |
|
|
|
import pandas as pd |
|
|
|
from haystack import BaseComponent, Document |
|
from haystack.document_stores import PineconeDocumentStore |
|
from haystack.nodes import ( |
|
EmbeddingRetriever, |
|
FARMReader |
|
) |
|
from haystack.pipelines import ExtractiveQAPipeline, Pipeline, GenerativeQAPipeline |
|
from haystack.pipelines import BaseStandardPipeline |
|
from haystack.nodes.reader import BaseReader |
|
from haystack.nodes.retriever import BaseRetriever |
|
from sentence_transformers import SentenceTransformer |
|
|
|
import certifi |
|
import datetime |
|
import requests |
|
from base64 import b64encode |
|
|
|
# Filesystem path of the CA certificate bundle shipped with certifi.
# NOTE(review): not referenced by any definition visible in this file;
# presumably intended for TLS verification of outbound connections - confirm.
ca_certs = certifi.where()
|
class QAPipeline(BaseStandardPipeline):
    """
    Extractive question-answering pipeline: a retriever node feeding a reader node.
    """

    def __init__(self, reader: BaseReader, retriever: BaseRetriever):
        """
        Wire the two nodes into a fresh pipeline graph.

        :param reader: Reader instance, registered as node "Reader"
        :param retriever: Retriever instance, registered as node "Retriever"
        """
        qa_graph = Pipeline()
        # The retriever consumes the query; the reader consumes the retriever output.
        qa_graph.add_node(component=retriever, name="Retriever", inputs=["Query"])
        qa_graph.add_node(component=reader, name="Reader", inputs=["Retriever"])
        self.pipeline = qa_graph
        self.metrics_filter = {"Retriever": ["recall_single_hit"]}

    def run(self, query: str, params: Optional[dict] = None, debug: Optional[bool] = None):
        """
        Execute the pipeline for a single query and return its raw output.

        :param query: The search query string.
        :param params: Params for the `retriever` and `reader`. For instance,
                       params={"Retriever": {"top_k": 10}, "Reader": {"top_k": 5}}
        :param debug: Whether the pipeline should instruct nodes to collect debug
                      information about their execution. By default these include
                      the input parameters they received and the output they
                      generated. All debug information can then be found in the
                      dict returned by this method under the key "_debug"
        """
        return self.pipeline.run(query=query, params=params, debug=debug)
|
|
|
|
|
class DocumentQueries(ABC):
    """Abstract interface for objects that answer a query against a document index."""

    @abstractmethod
    def search_by_query(self, query : str, retriever_top_k: int, reader_top_k: int, index_name: str = None, filters = None):
        """Search for `query`; concrete subclasses must provide the implementation.

        :param query: The search query string.
        :param retriever_top_k: Top-k value intended for the retriever stage.
        :param reader_top_k: Top-k value intended for the reader stage.
        :param index_name: Optional name of the index to search.
        :param filters: Optional filters to restrict the search.
        """
|
|
|
class PineconeProposalQueries(DocumentQueries):
    """Question answering over documents stored in a Pinecone index.

    Construction builds a `PineconeDocumentStore`, an `EmbeddingRetriever`
    on top of it and an `ExtractiveQAPipeline` using a `FARMReader`. When an
    OpenAI key is supplied, a `GenerativeQAPipeline` backed by
    `OpenAIAnswerGenerator` is built as well.
    """

    def __init__(self, index_name: str, api_key, reader_name_or_path: str, use_gpu=True,
                 embedding_dim=384, environment="us-east1-gcp", OPENAI_key=None) -> None:
        """
        :param index_name: Name of the Pinecone index to use.
        :param api_key: Pinecone API key.
        :param reader_name_or_path: Model name or path handed to `FARMReader`.
        :param use_gpu: Forwarded to `FARMReader`.
        :param embedding_dim: Embedding dimension of the document store.
        :param environment: Pinecone environment name.
        :param OPENAI_key: Optional OpenAI API key; when non-empty the
                           generative OpenAI pipeline is created up front.
        """
        reader = FARMReader(
            model_name_or_path=reader_name_or_path,
            use_gpu=use_gpu,
            num_processes=1,
            context_window_size=200,
        )
        # _initialize_pipeline owns all OpenAI bookkeeping (including
        # self.OpenAI_api_key); nothing is reset afterwards so the record of a
        # generator built here is not lost.
        self._initialize_pipeline(
            index_name,
            api_key,
            reader=reader,
            embedding_dim=embedding_dim,
            environment=environment,
            OPENAI_key=OPENAI_key,
        )

    def _initialize_pipeline(self, index_name, api_key, similarity="cosine",
                             embedding_dim=384, reader=None,
                             environment="us-east1-gcp",
                             metadata_config=None,
                             OPENAI_key=None):
        """Create the document store, retriever and pipelines.

        :param index_name: Name of the Pinecone index.
        :param api_key: Pinecone API key.
        :param similarity: Similarity function of the document store.
        :param embedding_dim: Embedding dimension of the document store.
        :param reader: Reader to use; when None the previously stored
                       `self.reader` is reused (it must already exist).
        :param environment: Pinecone environment name.
        :param metadata_config: Metadata configuration for the document store;
                                defaults to indexing "title" and "source_title".
        :param OPENAI_key: Optional OpenAI API key for the generative pipeline.
        """
        if reader is not None:
            self.reader = reader

        # A fresh dict per call avoids sharing one mutable default between
        # instances; a caller-supplied configuration is honoured.
        if metadata_config is None:
            metadata_config = {"indexed": ["title", "source_title"]}

        self.document_store = PineconeDocumentStore(
            api_key=api_key,
            environment=environment,
            index=index_name,
            similarity=similarity,
            embedding_dim=embedding_dim,
            metadata_config=metadata_config,
        )

        self.retriever = EmbeddingRetriever(
            document_store=self.document_store,
            embedding_model="sentence-transformers/multi-qa-MiniLM-L6-cos-v1",
            model_format="sentence_transformers",
        )

        self.extractive_pipe = ExtractiveQAPipeline(reader=self.reader,
                                                    retriever=self.retriever)

        # The retriever was just replaced, so any earlier generative pipeline
        # is stale: drop it together with its bookkeeping.
        self.generative_OPENAI_pipe = None
        self.OpenAI_api_key = None
        self._openai_generator_settings = None
        if OPENAI_key:
            # Construction-time generator uses a larger answer budget (60 tokens).
            self.__initialize_openAIGEnerator(OPENAI_key, max_tokens=60)

    def search_by_query(self, query: str, retriever_top_k: int, reader_top_k: int,
                        index_name: Optional[str] = None, filters=None):
        """Run the extractive pipeline and return its answers.

        :param query: The search query string.
        :param retriever_top_k: `top_k` passed to the "Retriever" node.
        :param reader_top_k: `top_k` passed to the "Reader" node.
        :param index_name: Accepted for interface compatibility; not used here
                           (the index chosen at construction is queried).
        :param filters: Filters passed to the "Retriever" node.
        :return: The "answers" entry of the pipeline output.
        """
        params = {"Retriever": {"top_k": retriever_top_k,
                                "filters": filters},
                  "Reader": {"top_k": reader_top_k}}
        # Debug collection is always enabled for extractive searches.
        prediction = self.extractive_pipe.run(query=query, params=params, debug=True)
        return prediction["answers"]

    def __initialize_openAIGEnerator(self, OPENAI_key, openai_model_name="text-davinci-003",
                                     temperature=.5, max_tokens=30):
        """(Re)build the OpenAI generative pipeline when its settings changed.

        The pipeline is rebuilt only if none exists yet or if the key, model
        name, temperature or token budget differ from the ones it was built
        with; otherwise the existing pipeline is kept.
        """
        settings = (OPENAI_key, openai_model_name, temperature, max_tokens)
        already_built = (
            getattr(self, "generative_OPENAI_pipe", None) is not None
            and getattr(self, "_openai_generator_settings", None) == settings
        )
        if already_built:
            return

        generator = OpenAIAnswerGenerator(api_key=OPENAI_key,
                                          model=openai_model_name,
                                          temperature=temperature,
                                          max_tokens=max_tokens)
        self.generative_OPENAI_pipe = GenerativeQAPipeline(generator=generator,
                                                           retriever=self.retriever)
        self.OpenAI_api_key = OPENAI_key
        self._openai_generator_settings = settings

    def genenerate_answer_OpenAI(self, query: str, retriever_top_k: int, generator_top_k: int,
                                 filters=None, OPENAI_key=None,
                                 openai_model_name="text-davinci-003",
                                 temperature=.5, max_tokens=30):
        """Generate answers with the OpenAI generative pipeline.

        :param query: The question to answer.
        :param retriever_top_k: `top_k` passed to the "Retriever" node.
        :param generator_top_k: `top_k` passed to the "Generator" node.
        :param filters: Filters passed to the "Retriever" node.
        :param OPENAI_key: OpenAI API key; when omitted, the key the current
                           generator was built with is reused.
        :param openai_model_name: OpenAI model name for the generator.
        :param temperature: Sampling temperature for the generator.
        :param max_tokens: Maximum number of tokens per generated answer.
        :return: The "answers" entry of the pipeline output, or None when no
                 OpenAI key is available.
        """
        effective_key = OPENAI_key or getattr(self, "OpenAI_api_key", None)
        if not effective_key:
            # No key now and none recorded earlier: nothing can be generated.
            return None

        # Repeated calls with an unchanged key reuse the existing pipeline
        # and still produce an answer.
        self.__initialize_openAIGEnerator(effective_key, openai_model_name,
                                          temperature, max_tokens)
        params = {"Retriever": {"top_k": retriever_top_k,
                                "filters": filters},
                  "Generator": {"top_k": generator_top_k}}
        prediction = self.generative_OPENAI_pipe.run(query=query, params=params)
        return prediction["answers"]

    def genenerate_answer_HF(self, query: str, retriever_top_k: int, reader_top_k: int,
                             es_index: Optional[str] = None, filters=None):
        """Generate answers with a Hugging Face generative pipeline.

        This class never creates `generative_HF_pipe` itself; it has to be
        assigned on the instance before this method is called.

        :param query: The question to answer.
        :param retriever_top_k: `top_k` passed to the "Retriever" node.
        :param reader_top_k: `top_k` passed to the "Generator" node.
        :param es_index: Accepted but not used by this method.
        :param filters: Filters passed to the "Retriever" node.
        :return: The "answers" entry of the pipeline output.
        :raises AttributeError: if `generative_HF_pipe` has not been configured.
        """
        hf_pipe = getattr(self, "generative_HF_pipe", None)
        if hf_pipe is None:
            # Same exception type as the bare attribute access would raise,
            # but with an actionable message.
            raise AttributeError(
                "generative_HF_pipe is not configured on this instance; "
                "assign a generative pipeline to it before calling genenerate_answer_HF"
            )
        params = {"Retriever": {"top_k": retriever_top_k,
                                "filters": filters},
                  "Generator": {"top_k": reader_top_k}}
        prediction = hf_pipe.run(query=query, params=params)
        return prediction["answers"]
|
|
|
class Log():
    """Writes log entries as documents to an Elasticsearch index over HTTPS."""

    # Upper bound (seconds) for a single log request, so an unreachable
    # Elasticsearch host cannot block the caller indefinitely.
    REQUEST_TIMEOUT_SECONDS = 10

    def __init__(self, es_host: str, es_index: str, es_user, es_password) -> None:
        """
        :param es_host: Elasticsearch host name (port 443 is used).
        :param es_index: Index that receives the log documents.
        :param es_user: User name for HTTP Basic authentication.
        :param es_password: Password for HTTP Basic authentication.
        """
        self.elastic_endpoint = f"https://{es_host}:443/{es_index}/_doc"
        # Build the Basic-auth token from the supplied credentials rather than
        # from values embedded in the source code.
        user_and_password = f"{es_user}:{es_password}".encode("utf-8")
        self.credentials = b64encode(user_and_password).decode("ascii")
        self.auth_header = {'Authorization': 'Basic %s' % self.credentials}

    def write_log(self, message: str, source: str) -> None:
        """POST one log document and print the response body.

        :param message: Log message text.
        :param source: Identifier of the component emitting the message.
        """
        # The trailing 'Z' denotes UTC, so the timestamp must be taken in UTC.
        created_date = datetime.datetime.now(datetime.timezone.utc).strftime('%Y-%m-%dT%H:%M:%SZ')
        post_data = {
            "message": message,
            "createdDate": {
                "date": created_date
            },
            "source": source
        }
        r = requests.post(
            self.elastic_endpoint,
            json=post_data,
            headers=self.auth_header,
            timeout=self.REQUEST_TIMEOUT_SECONDS,
        )
        print(r.text)