Spaces:
Runtime error
Runtime error
from abc import ABC, abstractmethod | |
from haystack.nodes import BM25Retriever, FARMReader | |
from haystack.document_stores import ElasticsearchDocumentStore | |
from haystack.pipelines import ExtractiveQAPipeline, DocumentSearchPipeline | |
from haystack.document_stores import PineconeDocumentStore | |
from haystack.nodes import EmbeddingRetriever, OpenAIAnswerGenerator | |
from json import JSONDecodeError | |
from pathlib import Path | |
from typing import List, Optional | |
import pandas as pd | |
from haystack import BaseComponent, Document | |
from haystack.document_stores import PineconeDocumentStore | |
from haystack.nodes import ( | |
EmbeddingRetriever, | |
FARMReader | |
) | |
from haystack.pipelines import ExtractiveQAPipeline, Pipeline, GenerativeQAPipeline | |
from haystack.pipelines import BaseStandardPipeline | |
from haystack.nodes.reader import BaseReader | |
from haystack.nodes.retriever import BaseRetriever | |
from sentence_transformers import SentenceTransformer | |
import certifi | |
import datetime | |
import requests | |
from base64 import b64encode | |
ca_certs = certifi.where() | |
class QAPipeline(BaseStandardPipeline):
    """
    Pipeline for Extractive Question Answering.

    Chains a retriever node (fed by the query) into a reader node.
    """

    def __init__(self, reader: BaseReader, retriever: BaseRetriever):
        """
        :param reader: Reader instance
        :param retriever: Retriever instance
        """
        self.pipeline = Pipeline()
        # (node name, component, name of the upstream node feeding it)
        node_specs = (
            ("Retriever", retriever, "Query"),
            ("Reader", reader, "Retriever"),
        )
        for node_name, component, upstream in node_specs:
            self.pipeline.add_node(component=component, name=node_name, inputs=[upstream])
        # NOTE(review): presumably read by the base class when reporting
        # evaluation metrics -- confirm against BaseStandardPipeline.
        self.metrics_filter = {"Retriever": ["recall_single_hit"]}

    def run(self, query: str, params: Optional[dict] = None, debug: Optional[bool] = None):
        """
        Forward the query to the underlying pipeline and return its output unchanged.

        :param query: The search query string.
        :param params: Params for the `retriever` and `reader`. For instance,
                       params={"Retriever": {"top_k": 10}, "Reader": {"top_k": 5}}
        :param debug: Whether the pipeline should instruct nodes to collect debug information
                      about their execution. By default these include the input parameters
                      they received and the output they generated.
                      All debug information can then be found in the dict returned
                      by this method under the key "_debug"
        """
        return self.pipeline.run(query=query, params=params, debug=debug)
class DocumentQueries(ABC):
    """Common base for classes that answer a query against a document index."""

    def search_by_query(self, query: str, retriever_top_k: int, reader_top_k: int,
                        index_name: str = None, filters=None):
        """
        Placeholder search hook, meant to be overridden by subclasses.

        This base implementation ignores every argument and returns None.
        """
        return None
class PineconeProposalQueries(DocumentQueries):
    """
    Question answering over documents held in a Pinecone index.

    Builds a Pinecone document store and an embedding retriever, wires them
    with a FARMReader into an extractive pipeline and, when an OpenAI key is
    available, into a generative pipeline as well.
    """

    def __init__(self, index_name: str, api_key, reader_name_or_path: str, use_gpu=True,
                 embedding_dim=384, environment="us-east1-gcp", OPENAI_key=None) -> None:
        """
        :param index_name: Name of the Pinecone index to query.
        :param api_key: Pinecone API key.
        :param reader_name_or_path: Model name or path handed to FARMReader.
        :param use_gpu: Forwarded to FARMReader.
        :param embedding_dim: Embedding dimension of the Pinecone index.
        :param environment: Pinecone environment.
        :param OPENAI_key: Optional OpenAI key; when given (and non-empty) the
                           generative pipeline is built right away.
        """
        reader = FARMReader(model_name_or_path=reader_name_or_path,
                            use_gpu=use_gpu, num_processes=1,
                            context_window_size=200)
        # No HF generative pipeline is built by this class; keep the attribute
        # defined so genenerate_answer_HF can detect that instead of crashing.
        self.generative_HF_pipe = None
        # _initialize_pipeline records the OpenAI key it used (if any), so the
        # cached key must not be reset after this call.
        self._initialize_pipeline(index_name, api_key, reader=reader,
                                  embedding_dim=embedding_dim,
                                  environment=environment, OPENAI_key=OPENAI_key)
        #self.log = Log(es_host= es_host, es_index="log", es_user = es_user, es_password= es_password)

    def _initialize_pipeline(self, index_name, api_key, similarity="cosine",
                             embedding_dim=384, reader=None,
                             environment="us-east1-gcp",
                             metadata_config=None,
                             OPENAI_key=None):
        """
        (Re)build the document store, retriever and pipelines.

        :param index_name: Name of the Pinecone index.
        :param api_key: Pinecone API key.
        :param similarity: Similarity function passed to the document store.
        :param embedding_dim: Embedding dimension of the index.
        :param reader: Reader to use; when None the previously stored
                       ``self.reader`` is reused.
        :param environment: Pinecone environment.
        :param metadata_config: Metadata config for the document store;
                                defaults to indexing "title" and "source_title".
        :param OPENAI_key: Optional OpenAI key; when non-empty the generative
                           pipeline is created and the key is remembered.
        """
        if metadata_config is None:
            # Fresh dict per call instead of a shared mutable default.
            metadata_config = {"indexed": ["title", "source_title"]}
        if reader is not None:
            self.reader = reader
        #pinecone.init(api_key=es_password, environment="us-east1-gcp")
        self.document_store = PineconeDocumentStore(
            api_key=api_key,
            environment=environment,
            index=index_name,
            similarity=similarity,
            embedding_dim=embedding_dim,
            metadata_config=metadata_config,
        )
        self.retriever = EmbeddingRetriever(
            document_store=self.document_store,
            embedding_model="sentence-transformers/multi-qa-MiniLM-L6-cos-v1",
            model_format="sentence_transformers",
        )
        self.extractive_pipe = ExtractiveQAPipeline(reader=self.reader,
                                                    retriever=self.retriever)
        # Pipeline and cached key always change together, so the "has the key
        # changed?" check in the OpenAI methods stays truthful.
        self.generative_OPENAI_pipe = None
        self.OpenAI_api_key = None
        if OPENAI_key:
            OPENAI_generator = OpenAIAnswerGenerator(api_key=OPENAI_key,
                                                     model="text-davinci-003",
                                                     temperature=.5, max_tokens=60)
            self.generative_OPENAI_pipe = GenerativeQAPipeline(generator=OPENAI_generator,
                                                               retriever=self.retriever)
            self.OpenAI_api_key = OPENAI_key

    def search_by_query(self, query: str, retriever_top_k: int, reader_top_k: int,
                        index_name: str = None, filters=None):
        """
        Run the extractive pipeline and return its answers.

        :param query: The question to answer.
        :param retriever_top_k: ``top_k`` passed to the Retriever node.
        :param reader_top_k: ``top_k`` passed to the Reader node.
        :param index_name: Unused here; kept for signature compatibility with
                           DocumentQueries.search_by_query.
        :param filters: Filters passed to the Retriever node.
        :return: The ``"answers"`` entry of the pipeline output.
        """
        #self.document_store.update_embeddings(self.retriever, update_existing_embeddings=False)
        params = {"Retriever": {"top_k": retriever_top_k,
                                "filters": filters},
                  "Reader": {"top_k": reader_top_k}}
        prediction = self.extractive_pipe.run(query=query, params=params, debug=True)
        return prediction["answers"]

    def __initialize_openAIGEnerator(self, OPENAI_key, openai_model_name="text-davinci-003",
                                     temperature=.5, max_tokens=30):
        """
        Rebuild the OpenAI generative pipeline when the key differs from the cached one.

        NOTE: only a key change triggers a rebuild; a different model name,
        temperature or max_tokens with the same key keeps the existing generator.
        """
        if OPENAI_key != self.OpenAI_api_key:
            OPENAI_generator = OpenAIAnswerGenerator(api_key=OPENAI_key,
                                                     model=openai_model_name,
                                                     temperature=temperature,
                                                     max_tokens=max_tokens)
            self.generative_OPENAI_pipe = GenerativeQAPipeline(generator=OPENAI_generator,
                                                               retriever=self.retriever)
            self.OpenAI_api_key = OPENAI_key

    def genenerate_answer_OpenAI(self, query: str, retriever_top_k: int, generator_top_k: int, filters=None,
                                 OPENAI_key=None, openai_model_name="text-davinci-003",
                                 temperature=.5, max_tokens=30):
        """
        Answer a query with the OpenAI generative pipeline.

        :param query: The question to answer.
        :param retriever_top_k: ``top_k`` passed to the Retriever node.
        :param generator_top_k: ``top_k`` passed to the Generator node.
        :param filters: Filters passed to the Retriever node.
        :param OPENAI_key: OpenAI key. When omitted or empty, the pipeline built
                           earlier (if any) is reused.
        :param openai_model_name: Model used if the generator has to be rebuilt.
        :param temperature: Temperature used if the generator has to be rebuilt.
        :param max_tokens: Max tokens used if the generator has to be rebuilt.
        :return: The ``"answers"`` entry of the pipeline output, or None when no
                 OpenAI pipeline is available (no key was ever supplied).
        """
        # Rebuild only for a new, non-empty key; repeated calls with the same
        # key must reuse the pipeline rather than bail out.
        if OPENAI_key and OPENAI_key != self.OpenAI_api_key:
            self.__initialize_openAIGEnerator(OPENAI_key, openai_model_name, temperature, max_tokens)
        if self.generative_OPENAI_pipe is None:
            return None
        params = {"Retriever": {"top_k": retriever_top_k,
                                "filters": filters},
                  "Generator": {"top_k": generator_top_k}}
        prediction = self.generative_OPENAI_pipe.run(query=query, params=params)
        return prediction["answers"]

    def genenerate_answer_HF(self, query: str, retriever_top_k: int, reader_top_k: int,
                             es_index: str = None, filters=None):
        """
        Answer a query with ``self.generative_HF_pipe``.

        This class never builds that pipeline itself; it has to be assigned to
        ``self.generative_HF_pipe`` by the caller beforehand.

        :param query: The question to answer.
        :param retriever_top_k: ``top_k`` passed to the Retriever node.
        :param reader_top_k: ``top_k`` passed to the Generator node.
        :param es_index: Unused.
        :param filters: Filters passed to the Retriever node.
        :return: The ``"answers"`` entry of the pipeline output, or None when no
                 HF generative pipeline has been configured (mirrors
                 genenerate_answer_OpenAI).
        """
        if self.generative_HF_pipe is None:
            return None
        params = {"Retriever": {"top_k": retriever_top_k,
                                "filters": filters},
                  "Generator": {"top_k": reader_top_k}}
        prediction = self.generative_HF_pipe.run(query=query, params=params)
        return prediction["answers"]
class Log():
    """Posts log messages as JSON documents to an Elasticsearch-style ``_doc`` endpoint."""

    def __init__(self, es_host: str, es_index: str, es_user, es_password) -> None:
        """
        :param es_host: Host name of the server (port 443, HTTPS is used).
        :param es_index: Index the log documents are written to.
        :param es_user: User name for HTTP Basic authentication.
        :param es_password: Password for HTTP Basic authentication.
        """
        self.elastic_endpoint = f"https://{es_host}:443/{es_index}/_doc"
        # Build the Basic-auth token from the caller-supplied credentials;
        # secrets must never be embedded in the source.
        user_pass = f"{es_user}:{es_password}".encode("utf-8")
        self.credentials = b64encode(user_pass).decode("ascii")
        self.auth_header = {'Authorization': 'Basic %s' % self.credentials}

    def write_log(self, message: str, source: str) -> None:
        """
        Post one log entry and print the server's response body.

        :param message: Log text.
        :param source: Identifier of the component that produced the message.
        """
        # The format string ends in a literal "Z" (UTC designator), so the
        # timestamp has to be taken in UTC rather than local time.
        created_date = datetime.datetime.now(datetime.timezone.utc).strftime('%Y-%m-%dT%H:%M:%SZ')
        post_data = {
            "message": message,
            "createdDate": {
                "date": created_date
            },
            "source": source
        }
        r = requests.post(self.elastic_endpoint, json=post_data, headers=self.auth_header)
        print(r.text)