File size: 17,590 Bytes
30ffb9e
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
from weaviate import Client, AuthApiKey
from dataclasses import dataclass
from openai import OpenAI
from sentence_transformers import SentenceTransformer
from typing import List, Union, Callable
from torch import cuda
from tqdm import tqdm
import time

class WeaviateClient(Client):
    '''
    A python native Weaviate Client class that encapsulates Weaviate functionalities 
    in one object. Several convenience methods are added for ease of use.

    Args
    ----
    api_key: str
        The API key for the Weaviate Cloud Service (WCS) instance.
        https://console.weaviate.cloud/dashboard

    endpoint: str
        The url endpoint for the Weaviate Cloud Service instance.

    model_name_or_path: str='sentence-transformers/all-MiniLM-L6-v2'
        The name or path of the SentenceTransformer model to use for vector search.
        Will also support OpenAI text-embedding-ada-002 model.  This param enables 
        the use of most leading models on MTEB Leaderboard: 
        https://huggingface.co/spaces/mteb/leaderboard
    openai_api_key: str=None
        The API key for the OpenAI API. Only required if using OpenAI text-embedding-ada-002 model.
    '''    
    def __init__(self, 
                 api_key: str,
                 endpoint: str,
                 model_name_or_path: str='sentence-transformers/all-MiniLM-L6-v2',
                 openai_api_key: str=None,
                 **kwargs
                ):
        auth_config = AuthApiKey(api_key=api_key)
        super().__init__(auth_client_secret=auth_config,
                         url=endpoint,
                         **kwargs)    
        self.model_name_or_path = model_name_or_path
        self.openai_model = False
        if self.model_name_or_path == 'text-embedding-ada-002':
            if not openai_api_key:
                raise ValueError(f'OpenAI API key must be provided to use this model: {self.model_name_or_path}')
            self.model = OpenAI(api_key=openai_api_key)
            self.openai_model = True
        else: 
            self.model = SentenceTransformer(self.model_name_or_path) if self.model_name_or_path else None

        self.display_properties = ['title', 'video_id', 'length', 'thumbnail_url', 'views', 'episode_url', \
                                    'doc_id', 'guest', 'content']  # 'playlist_id', 'channel_id', 'author'
        
    def show_classes(self) -> Union[List[dict], str]:
        '''
        Shows all available classes (indexes) on the Weaviate instance.
        '''
        classes = self.cluster.get_nodes_status()[0]['shards']
        if classes:
            return [d['class'] for d in classes]
        else: 
            return "No classes found on cluster."

    def show_class_info(self) -> Union[List[dict], str]:
        '''
        Shows all information related to the classes (indexes) on the Weaviate instance.
        '''
        classes = self.cluster.get_nodes_status()[0]['shards']
        if classes:
            return [d for d in classes]
        else: 
            return "No classes found on cluster."

    def show_class_properties(self, class_name: str) -> Union[dict, str]:
        '''
        Shows all properties of a class (index) on the Weaviate instance.
        '''
        classes = self.schema.get()
        if classes:
            all_classes = classes['classes']
            for d in all_classes:
                if d['class'] == class_name:
                    return d['properties']
            return f'Class "{class_name}" not found on host'
        return f'No Classes found on host'
    
    def show_class_config(self, class_name: str) -> Union[dict, str]:
        '''
        Shows all configuration of a class (index) on the Weaviate instance.
        '''
        classes = self.schema.get()
        if classes:
            all_classes = classes['classes']
            for d in all_classes:
                if d['class'] == class_name:
                    return d
            return f'Class "{class_name}" not found on host'
        return f'No Classes found on host'
    
    def delete_class(self, class_name: str) -> str:
        '''
        Deletes a class (index) on the Weaviate instance, if it exists.
        '''
        available = self._check_class_avialability(class_name)
        if isinstance(available, bool):
            if available:
                self.schema.delete_class(class_name)
                not_deleted = self._check_class_avialability(class_name)
                if isinstance(not_deleted, bool):
                    if not_deleted:
                        return f'Class "{class_name}" was not deleted. Try again.'
                    else: 
                        return f'Class "{class_name}" deleted'
                return f'Class "{class_name}" deleted and there are no longer any classes on host'
            return f'Class "{class_name}" not found on host'
        return available
    
    def _check_class_avialability(self, class_name: str) -> Union[bool, str]:
        '''
        Checks if a class (index) exists on the Weaviate instance.
        '''
        classes = self.schema.get()
        if classes:
            all_classes = classes['classes']
            for d in all_classes:
                if d['class'] == class_name:
                    return True
            return False
        else: 
            return f'No Classes found on host'
        
    def format_response(self, 
                         response: dict,
                         class_name: str
                         ) -> List[dict]:
        '''
        Formats json response from Weaviate into a list of dictionaries.
        Expands _additional fields if present into top-level dictionary.
        '''
        if response.get('errors'):
            return response['errors'][0]['message']
        results = []
        hits = response['data']['Get'][class_name]
        for d in hits:
            temp = {k:v for k,v in d.items() if k != '_additional'}
            if d.get('_additional'):
                for key in d['_additional']:
                    temp[key] = d['_additional'][key]
            results.append(temp)
        return results
    
    def update_ef_value(self, class_name: str, ef_value: int) -> str:
        '''
        Updates ef_value for a class (index) on the Weaviate instance.
        '''
        self.schema.update_config(class_name=class_name, config={'vectorIndexConfig': {'ef': ef_value}})
        print(f'ef_value updated to {ef_value} for class {class_name}')
        return self.show_class_config(class_name)['vectorIndexConfig']
        
    def keyword_search(self,
                       request: str,
                       class_name: str,
                       properties: List[str]=['content'],
                       limit: int=10,
                       where_filter: dict=None,
                       display_properties: List[str]=None,
                       return_raw: bool=False) -> Union[dict, List[dict]]:
        '''
        Executes Keyword (BM25) search. 

        Args
        ----
        query: str
            User query.
        class_name: str
            Class (index) to search.
        properties: List[str]
            List of properties to search across.
        limit: int=10
            Number of results to return.
        display_properties: List[str]=None
            List of properties to return in response.
            If None, returns all properties.
        return_raw: bool=False
            If True, returns raw response from Weaviate.
        '''
        display_properties = display_properties if display_properties else self.display_properties
        response = (self.query
                    .get(class_name, display_properties)
                    .with_bm25(query=request, properties=properties)
                    .with_additional(['score', "id"])
                    .with_limit(limit)
                    )
        response = response.with_where(where_filter).do() if where_filter else response.do()
        if return_raw:
            return response
        else: 
            return self.format_response(response, class_name)

    def vector_search(self,
                      request: str,
                      class_name: str,
                      limit: int=10,
                      where_filter: dict=None,
                      display_properties: List[str]=None,
                      return_raw: bool=False,
                      device: str='cuda:0' if cuda.is_available() else 'cpu'
                      ) -> Union[dict, List[dict]]:
        '''
        Executes vector search using embedding model defined on instantiation 
        of WeaviateClient instance.
        
        Args
        ----
        query: str
            User query.
        class_name: str
            Class (index) to search.
        limit: int=10
            Number of results to return.
        display_properties: List[str]=None
            List of properties to return in response.
            If None, returns all properties.
        return_raw: bool=False
            If True, returns raw response from Weaviate.
        '''
        display_properties = display_properties if display_properties else self.display_properties
        query_vector = self._create_query_vector(request, device=device)
        response = (
                    self.query
                    .get(class_name, display_properties)
                    .with_near_vector({"vector": query_vector})
                    .with_limit(limit)
                    .with_additional(['distance'])
                    )
        response = response.with_where(where_filter).do() if where_filter else response.do()
        if return_raw:
            return response
        else: 
            return self.format_response(response, class_name)     
    
    def _create_query_vector(self, query: str, device: str) -> List[float]:
        '''
        Creates embedding vector from text query.
        '''
        return self.get_openai_embedding(query) if self.openai_model else self.model.encode(query, device=device).tolist()
    
    def get_openai_embedding(self, query: str) -> List[float]:
        '''
        Gets embedding from OpenAI API for query.
        '''
        embedding = self.model.embeddings.create(input=query, model='text-embedding-ada-002').model_dump()
        if embedding:
            return embedding['data'][0]['embedding']
        else:
           raise ValueError(f'No embedding found for query: {query}')
        
    def hybrid_search(self,
                      request: str,
                      class_name: str,
                      properties: List[str]=['content'],
                      alpha: float=0.5,
                      limit: int=10,
                      where_filter: dict=None,
                      display_properties: List[str]=None,
                      return_raw: bool=False,
                      device: str='cuda:0' if cuda.is_available() else 'cpu'
                     ) -> Union[dict, List[dict]]:
        '''
        Executes Hybrid (BM25 + Vector) search.
        
        Args
        ----
        query: str
            User query.
        class_name: str
            Class (index) to search.
        properties: List[str]
            List of properties to search across (using BM25)
        alpha: float=0.5
            Weighting factor for BM25 and Vector search.
            alpha can be any number from 0 to 1, defaulting to 0.5:
                alpha = 0 executes a pure keyword search method (BM25)
                alpha = 0.5 weighs the BM25 and vector methods evenly
                alpha = 1 executes a pure vector search method
        limit: int=10
            Number of results to return.
        display_properties: List[str]=None
            List of properties to return in response.
            If None, returns all properties.
        return_raw: bool=False
            If True, returns raw response from Weaviate.
        '''
        display_properties = display_properties if display_properties else self.display_properties
        query_vector = self._create_query_vector(request, device=device)
        response = (
                    self.query
                    .get(class_name, display_properties)
                    .with_hybrid(query=request,
                                 alpha=alpha,
                                 vector=query_vector,
                                 properties=properties,
                                 fusion_type='relativeScoreFusion') #hard coded option for now
                    .with_additional(["score", "explainScore"])
                    .with_limit(limit)
                    )
        
        response = response.with_where(where_filter).do() if where_filter else response.do()
        if return_raw:
            return response
        else: 
            return self.format_response(response, class_name)
        
        
class WeaviateIndexer:

    def __init__(self,
                 client: WeaviateClient,
                 batch_size: int=150,
                 num_workers: int=4,
                 dynamic: bool=True,
                 creation_time: int=5,
                 timeout_retries: int=3,
                 connection_error_retries: int=3,
                 callback: Callable=None,
                 ):
        '''
        Class designed to batch index documents into Weaviate. Instantiating
        this class will automatically configure the Weaviate batch client.
        '''
        self._client = client
        self._callback = callback if callback else self._default_callback
        
        self._client.batch.configure(batch_size=batch_size,
                                     num_workers=num_workers,
                                     dynamic=dynamic,
                                     creation_time=creation_time,
                                     timeout_retries=timeout_retries,
                                     connection_error_retries=connection_error_retries,
                                     callback=self._callback
                                    )
        
    def _default_callback(self, results: dict):
        """
        Check batch results for errors.

        Parameters
        ----------
        results : dict
            The Weaviate batch creation return value.
        """

        if results is not None:
            for result in results:
                if "result" in result and "errors" in result["result"]:
                    if "error" in result["result"]["errors"]:
                        print(result["result"])

    def batch_index_data(self,
                         data: List[dict], 
                         class_name: str,
                         vector_property: str='content_embedding'
                         ) -> None:
        '''
        Batch function for fast indexing of data onto Weaviate cluster. 
        This method assumes that self._client.batch is already configured.
        '''
        start = time.perf_counter()
        with self._client.batch as batch:
            for d in tqdm(data):
                
                #define single document 
                properties = {k:v for k,v in d.items() if k != vector_property}
                try:
                    #add data object to batch
                    batch.add_data_object(
                                        data_object=properties,
                                        class_name=class_name,
                                        vector=d[vector_property]
                                        )
                except Exception as e:
                    print(e)
                    continue

        end = time.perf_counter() - start
    
        print(f'Batch job completed in {round(end/60, 2)} minutes.')
        class_info = self._client.show_class_info()
        for i, c in enumerate(class_info):
            if c['class'] == class_name:
                print(class_info[i])
        self._client.batch.shutdown()

@dataclass
class WhereFilter:

    '''
    Simplified interface for constructing a WhereFilter object.

    Args
    ----
    path: List[str]
        List of properties to filter on.
    operator: str
        Operator to use for filtering. Options: ['And', 'Or', 'Equal', 'NotEqual', 
        'GreaterThan', 'GreaterThanEqual', 'LessThan', 'LessThanEqual', 'Like', 
        'WithinGeoRange', 'IsNull', 'ContainsAny', 'ContainsAll']
    value[dataType]: Union[int, bool, str, float, datetime]
        Value to filter on. The dataType suffix must match the data type of the 
        property being filtered on. At least and only one value type must be provided. 
    '''
    path: List[str]
    operator: str
    valueInt: int=None
    valueBoolean: bool=None
    valueText: str=None
    valueNumber: float=None
    valueDate = None

    def post_init(self):
        operators = ['And', 'Or', 'Equal', 'NotEqual','GreaterThan', 'GreaterThanEqual', 'LessThan',\
                      'LessThanEqual', 'Like', 'WithinGeoRange', 'IsNull', 'ContainsAny', 'ContainsAll']
        if self.operator not in operators:
            raise ValueError(f'operator must be one of: {operators}, got {self.operator}')
        values = [self.valueInt, self.valueBoolean, self.valueText, self.valueNumber, self.valueDate]
        if not any(values):
            raise ValueError('At least one value must be provided.')
        if len(values) > 1:
            raise ValueError('At most one value can be provided.')
    
    def todict(self):
        return {k:v for k,v in self.__dict__.items() if v is not None}