IlyasMoutawwakil committed
Commit bfab57f • 1 Parent(s): fe31e4e

Update app.py

Files changed (1)
  1. app.py +94 -121
app.py CHANGED
@@ -1,50 +1,47 @@
- from haystack.document_stores.faiss import FAISSDocumentStore
- from haystack.nodes.retriever import EmbeddingRetriever
- from haystack.nodes.ranker import BaseRanker
- from haystack.pipelines import Pipeline
+ from haystack import Document, Pipeline, component
+ from haystack_integrations.document_stores.qdrant import QdrantDocumentStore
+ from haystack_integrations.components.retrievers.qdrant import QdrantEmbeddingRetriever
 
- from haystack.document_stores.base import BaseDocumentStore
- from haystack.schema import Document
+ from typing import List
 
- from typing import Optional, List
-
- from huggingface_hub import get_inference_endpoint
+ from huggingface_hub import get_inference_endpoint, get_token
  from datasets import load_dataset
  from time import perf_counter
  import gradio as gr
- import numpy as np
+ import shutil
  import requests
  import os
 
- TOP_K = 2
- BATCH_SIZE = 16
+ RETRIEVER_TOP_K = 5
+ RANKER_TOP_K = 2
 
  HF_TOKEN = os.getenv("HF_TOKEN")
  RANKER_URL = os.getenv("RANKER_URL")
- RETRIEVER_URL = os.getenv("RETRIEVER_URL")
+ EMBEDDER_URL = os.getenv("EMBEDDER_URL")
 
- RETRIEVER_IE = get_inference_endpoint(
-     "fastrag-retriever", namespace="optimum-intel", token=HF_TOKEN
+ EMBEDDER_IE = get_inference_endpoint(
+     "fastrag-embedder", namespace="optimum-intel", token=HF_TOKEN
  )
  RANKER_IE = get_inference_endpoint(
      "fastrag-ranker", namespace="optimum-intel", token=HF_TOKEN
  )
 
+
  def check_inference_endpoints():
-     RETRIEVER_IE.update()
+     EMBEDDER_IE.update()
      RANKER_IE.update()
 
      messages = []
 
-     if RETRIEVER_IE.status in ["initializing", "pending"]:
+     if EMBEDDER_IE.status in ["initializing", "pending"]:
          messages += [
-             f"Retriever Inference Endpoint is {RETRIEVER_IE.status}. Please wait a few seconds and try again."
+             f"Embedder Inference Endpoint is {EMBEDDER_IE.status}. Please wait a few seconds and try again."
          ]
-     elif RETRIEVER_IE.status in ["paused", "scaledToZero"]:
+     elif EMBEDDER_IE.status in ["paused", "scaledToZero"]:
          messages += [
-             f"Retriever Inference Endpoint is {RETRIEVER_IE.status}. Resuming it. Please wait a few seconds and try again."
+             f"Embedder Inference Endpoint is {EMBEDDER_IE.status}. Resuming it. Please wait a few seconds and try again."
          ]
-         RETRIEVER_IE.resume()
+         EMBEDDER_IE.resume()
 
      if RANKER_IE.status in ["initializing", "pending"]:
          messages += [
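The check above resumes a paused or scaled-to-zero endpoint and asks the user to retry. For reference, a minimal sketch of the same resume-and-wait flow with `huggingface_hub`, blocking instead of retrying (endpoint name and namespace are taken from this diff; resuming requires write access to the endpoint):

```python
from huggingface_hub import get_inference_endpoint

endpoint = get_inference_endpoint("fastrag-embedder", namespace="optimum-intel")

if endpoint.status in ("paused", "scaledToZero"):
    endpoint.resume()           # needs write permissions on the endpoint
    endpoint.wait(timeout=300)  # block until the endpoint reports "running"

print(endpoint.status, endpoint.url)
```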
@@ -62,7 +59,6 @@ def check_inference_endpoints():
      return None
 
 
-
  def post(url, payload):
      response = requests.post(
          url,
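The body of the `post()` helper is cut off by the hunk context. A plausible completion, assuming a JSON API behind a bearer token (the header and `timeout` value are illustrative, not from the commit):

```python
import os
import requests

HF_TOKEN = os.getenv("HF_TOKEN")

def post(url, payload):
    # POST a JSON payload to an Inference Endpoint and decode the JSON reply
    response = requests.post(
        url,
        json=payload,
        headers={"Authorization": f"Bearer {HF_TOKEN}"},
        timeout=60,  # illustrative value
    )
    return response.json()
```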
@@ -85,81 +81,47 @@ def method_timer(method):
      return timed
 
 
- class Retriever(EmbeddingRetriever):
-     def __init__(
-         self,
-         document_store: Optional[BaseDocumentStore] = None,
-         top_k: int = 10,
-         batch_size: int = 32,
-         scale_score: bool = True,
-     ):
-         self.document_store = document_store
-         self.top_k = top_k
-         self.batch_size = batch_size
-         self.scale_score = scale_score
-
-     @method_timer
-     def embed_queries(self, queries: List[str]) -> np.ndarray:
-         payload = {"queries": queries, "inputs": ""}
-         response = post(RETRIEVER_URL, payload)
+ @component
+ class InferenceEndpointTextEmbedder:
+     @component.output_types(embedding=List[float])
+     def run(self, text: str):
+         payload = {"text": text, "inputs": ""}
+         response = post(EMBEDDER_URL, payload)
 
          if "error" in response:
              raise gr.Error(response["error"])
 
-         arrays = np.array(response)
-         return arrays
+         return {"embedding": response["embedding"]}
+
 
-     @method_timer
-     def embed_documents(self, documents: List[Document]) -> np.ndarray:
+ @component
+ class InferenceEndpointDocumentEmbedder:
+     @component.output_types(documents=List[Document])
+     def run(self, documents: List[Document]):
          documents = [d.to_dict() for d in documents]
-         for doc in documents:
-             doc["embedding"] = None
 
          payload = {"documents": documents, "inputs": ""}
-         response = post(RETRIEVER_URL, payload)
+         response = post(EMBEDDER_URL, payload)
 
          if "error" in response:
              raise gr.Error(response["error"])
 
-         arrays = np.array(response)
-         return arrays
+         return {"documents": [Document.from_dict(doc) for doc in response["documents"]]}
 
 
- class Ranker(BaseRanker):
-     @method_timer
-     def predict(
-         self, query: str, documents: List[Document], top_k: Optional[int] = None
-     ) -> List[Document]:
-         documents = [d.to_dict() for d in documents]
-         for doc in documents:
-             doc["embedding"] = None
-
-         payload = {"query": query, "documents": documents, "top_k": top_k, "inputs": ""}
-         response = post(RANKER_URL, payload)
-
-         if "error" in response:
-             raise gr.Error(response["error"])
-
-         return [Document.from_dict(d) for d in response]
-
-     @method_timer
-     def predict_batch(
-         self,
-         queries: List[str],
-         documents: List[List[Document]],
-         batch_size: Optional[int] = None,
-         top_k: Optional[int] = None,
-     ) -> List[List[Document]]:
-         documents = [[d.to_dict() for d in docs] for docs in documents]
-         for docs in documents:
-             for doc in docs:
-                 doc["embedding"] = None
+ @component
+ class InferenceEndpointRanker:
+     def __init__(self, top_k: int):
+         self.top_k = top_k
+
+     @component.output_types(documents=List[Document])
+     def run(self, query: str, documents: List[Document]):
+         documents = [d.to_dict() for d in documents]
 
          payload = {
-             "queries": queries,
+             "query": query,
              "documents": documents,
-             "batch_size": batch_size,
-             "top_k": top_k,
+             "top_k": self.top_k,
              "inputs": "",
          }
          response = post(RANKER_URL, payload)
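Haystack v2 components declared with `@component` can be exercised on their own before being wired into a pipeline, which is handy for smoke-testing the endpoints. A hypothetical standalone check of the ranker defined above (the documents are made up, and the ranker endpoint must be running for `post()` to succeed):

```python
from haystack import Document

ranker = InferenceEndpointRanker(top_k=2)
docs = [
    Document(content="The Great Pyramid of Giza is the oldest of the Seven Wonders."),
    Document(content="The Hanging Gardens of Babylon may be legendary."),
]
result = ranker.run(query="Which wonder is the oldest?", documents=docs)
print(result["documents"])  # up to top_k documents, re-scored by the endpoint
```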
@@ -167,49 +129,58 @@ class Ranker(BaseRanker):
          if "error" in response:
              raise gr.Error(response["error"])
 
-         return [[Document.from_dict(d) for d in docs] for docs in response]
+         return {"documents": [Document.from_dict(doc) for doc in response["documents"]]}
 
 
- if (
-     os.path.exists("/data/faiss_document_store.db")
-     and os.path.exists("/data/faiss_index.json")
-     and os.path.exists("/data/faiss_index")
- ):
-     document_store = FAISSDocumentStore.load("/data/faiss_index")
-     retriever = Retriever(
-         document_store=document_store, top_k=TOP_K, batch_size=BATCH_SIZE
-     )
-     document_store.save(index_path="/data/faiss_index")
- else:
-     for file in [
-         "/data/faiss_document_store.db",
-         "/data/faiss_index.json",
-         "/data/faiss_index",
-     ]:
-         try:
-             os.remove(file)
-         except FileNotFoundError:
-             pass
-
-     document_store = FAISSDocumentStore(
-         sql_url="sqlite:////data/faiss_document_store.db",
+ document_store = None
+
+ if os.path.exists("data/qdrant"):
+     try:
+         document_store = QdrantDocumentStore(
+             path="./data/qdrant",
+             return_embedding=True,
+             recreate_index=False,
+             embedding_dim=384,
+         )
+     except Exception:
+         shutil.rmtree("data/qdrant", ignore_errors=True)
+
+ if document_store is None:
+     shutil.rmtree("data/qdrant", ignore_errors=True)
+
+     document_store = QdrantDocumentStore(
+         path="./data/qdrant",
          return_embedding=True,
+         recreate_index=True,
          embedding_dim=384,
      )
-     document_store.write_documents(
-         load_dataset("bilgeyucel/seven-wonders", split="train")
-     )
-     retriever = Retriever(
-         document_store=document_store, top_k=TOP_K, batch_size=BATCH_SIZE
-     )
-     document_store.update_embeddings(retriever=retriever)
-     document_store.save(index_path="/data/faiss_index")
-
- ranker = Ranker()
+     dataset = load_dataset("bilgeyucel/seven-wonders")
+     documents = [Document(**doc) for doc in dataset["train"]]
+     documents_embedder = InferenceEndpointDocumentEmbedder()
+     documents_with_embedding = documents_embedder.run(documents)["documents"]
+     document_store.write_documents(documents_with_embedding)
+
+ print(
+     "Number of embedded documents in DocumentStore:",
+     document_store.count_documents(),
+ )
 
  pipe = Pipeline()
- pipe.add_node(component=retriever, name="Retriever", inputs=["Query"])
- pipe.add_node(component=ranker, name="Ranker", inputs=["Retriever"])
+
+ embedder = InferenceEndpointTextEmbedder()
+ ranker = InferenceEndpointRanker(top_k=RANKER_TOP_K)
+ retriever = QdrantEmbeddingRetriever(
+     document_store=document_store, top_k=RETRIEVER_TOP_K
+ )
+
+ pipe.add_component("retriever", retriever)
+ pipe.add_component("embedder", embedder)
+ pipe.add_component("ranker", ranker)
+
+ pipe.connect("retriever", "ranker.documents")
+ pipe.connect("embedder", "retriever")
+
+ print(pipe)
 
 
  def run(query: str) -> dict:
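`pipe.connect("embedder", "retriever")` relies on Haystack v2 resolving the only compatible pair of sockets between the two components. The equivalent wiring with explicit socket names would look like this (socket names follow from the `@component.output_types` declarations above and `QdrantEmbeddingRetriever`'s standard `query_embedding` input and `documents` output):

```python
pipe.connect("embedder.embedding", "retriever.query_embedding")
pipe.connect("retriever.documents", "ranker.documents")
```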
@@ -221,11 +192,12 @@ def run(query: str) -> dict:
          <p>{message}</p>
          """
 
-     pipe_output = pipe.run(query=query)
+     pipe_output = pipe.run({"embedder": {"text": query}, "ranker": {"query": query}})
 
-     output = f"""<h2>Top {TOP_K} Documents</h2>"""
+     output = """<h2>Top Ranked Documents</h2>"""
 
-     for i, doc in enumerate(pipe_output["documents"]):
+     for i, doc in enumerate(pipe_output["ranker"]["documents"]):
+         # limit content to 100 characters
          output += f"""
          <h3>Document {i + 1}</h3>
          <p><strong>ID:</strong> {doc.id}</p>
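The new `# limit content to 100 characters` comment lands just before the hunk context cuts off, so the truncation itself is not visible in this diff. A hypothetical continuation of the same f-string that would apply it:

```python
        output += f"""
        <h3>Document {i + 1}</h3>
        <p><strong>ID:</strong> {doc.id}</p>
        <p><strong>Score:</strong> {doc.score}</p>
        <p><strong>Content:</strong> {doc.content[:100]}...</p>
        """
```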
@@ -251,6 +223,7 @@ input_text = gr.components.Textbox(
  )
  output_html = gr.components.HTML(label="Documents")
 
+
  gr.Interface(
      fn=run,
      inputs=input_text,
@@ -259,9 +232,9 @@ gr.Interface(
      cache_examples=False,
      allow_flagging="never",
      title="End-to-End Retrieval & Ranking with Hugging Face Inference Endpoints and Spaces",
-     description="""## A [haystack](https://haystack.deepset.ai/) pipeline with the following components
- - <strong>Document Store</strong>: A [FAISS document store](https://github.com/facebookresearch/faiss/tree/main) containing the [`seven-wonders` dataset](https://huggingface.co/datasets/bilgeyucel/seven-wonders), created on this Space's [persistent storage](https://huggingface.co/docs/hub/en/spaces-storage).
- - <strong>Retriever</strong>: [Quantized FastRAG Retriever](https://huggingface.co/optimum-intel/fastrag-retriever) deployed on [Inference Endpoints](https://huggingface.co/docs/inference-endpoints/index) + Intel Sapphire Rapids CPU.
+     description="""## A [haystack](https://haystack.deepset.ai/) V2 pipeline with the following components
+ - <strong>Document Store</strong>: A [Qdrant document store](https://github.com/qdrant/qdrant) containing the [`seven-wonders` dataset](https://huggingface.co/datasets/bilgeyucel/seven-wonders), created on this Space's [persistent storage](https://huggingface.co/docs/hub/en/spaces-storage).
+ - <strong>Embedder</strong>: [Quantized FastRAG Embedder](https://huggingface.co/optimum-intel/fastrag-embedder) deployed on [Inference Endpoints](https://huggingface.co/docs/inference-endpoints/index) + Intel Sapphire Rapids CPU.
  - <strong>Ranker</strong>: [Quantized FastRAG Retriever](https://huggingface.co/optimum-intel/fastrag-ranker) deployed on [Inference Endpoints](https://huggingface.co/docs/inference-endpoints/index) + Intel Sapphire Rapids CPU.
 
  This Space is based on the optimizations demonstrated in the blog [CPU Optimized Embeddings with 🤗 Optimum Intel and fastRAG](https://huggingface.co/blog/intel-fast-embedding)
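Putting the new pipeline together, a minimal end-to-end smoke test of the migrated code (assumes both Inference Endpoints are running and the Qdrant store has been populated as above):

```python
question = "What is the Colossus of Rhodes?"
result = pipe.run({"embedder": {"text": question}, "ranker": {"query": question}})

# the ranker is the terminal component, so its output carries the final documents
for doc in result["ranker"]["documents"]:
    print(doc.id, doc.score, doc.content[:80])
```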
 