Santiago Hincapie Potes committed
Commit f01bb12
1 Parent(s): bd231ba
src/deploy_utils.py ADDED
@@ -0,0 +1,98 @@
+ import typing as tp
+ import pandas as pd
+ import numpy as np
+ import faiss
+ from numpy import typing as ntp
+ import tensorflow_hub as tfhub
+ from sentence_transformers import SentenceTransformer
+ from sklearn.metrics.pairwise import cosine_distances
+
+
+ # An embedder maps a batch of texts to an array-like batch of vectors.
+ Embedder = tp.Callable[[list[str]], ntp.ArrayLike]
+
+
+ def load_model(model_name: str) -> Embedder:
+     """Load either the Universal Sentence Encoder or a SentenceTransformer."""
+     if "universal-sentence-encoder" in model_name:
+         model = tfhub.load("https://tfhub.dev/google/universal-sentence-encoder/4")
+
+         def inner_forward_fn(input_texts: list[str]):
+             return model(input_texts)
+
+     else:
+         model = SentenceTransformer(model_name)
+
+         def inner_forward_fn(input_texts: list[str]):
+             return model.encode(input_texts, convert_to_tensor=True)
+
+     return inner_forward_fn
+
+
+ def get_matching_reviews_ids(relevant_products: pd.DataFrame) -> np.ndarray:
+     """Flatten the per-product review id lists into a single 1-D array."""
+     matching_reviews_ids = np.concatenate(
+         [np.array(i).reshape(-1) for i in relevant_products.reviewID.tolist()]
+     )
+
+     return matching_reviews_ids
+
+
+ def query_relevant_documents(
+     product_model: Embedder,
+     indexer: faiss.Index,
+     products: pd.DataFrame,
+     query_text: str,
+ ) -> pd.DataFrame:
+     """Embed the query and keep the neighbours whose distance is below 1."""
+     # faiss expects a contiguous float32 numpy array.
+     embedded_query = np.asarray(product_model([query_text]), dtype=np.float32)
+     dist, idx = indexer.search(embedded_query, 64)
+
+     relevant_products = products.iloc[idx[dist < 1]]
+     return relevant_products
+
+
+ def get_relevant_reviews(
+     relevant_products: pd.DataFrame, reviews: pd.DataFrame
+ ) -> pd.DataFrame:
+     review_ids = ":".join(relevant_products.reviewID).split(":")
+     relevant_reviews = reviews.loc[review_ids].drop_duplicates("reviewText")
+
+     # Drop reviews without a summary, then drop bare "<N> Star(s)" summaries.
+     # The second mask is recomputed from the filtered frame so both masks stay
+     # aligned with the rows they index.
+     relevant_reviews = relevant_reviews[~relevant_reviews.summary.isna()]
+     relevant_reviews = relevant_reviews[
+         ~relevant_reviews.summary.str.match(r"\w+ Star(s)?")
+     ]
+
+     return relevant_reviews
+
+
+ def clusterize_reviews(
+     relevant_reviews: pd.DataFrame,
+     reviews_embedder: Embedder,
+     clusterer,
+ ) -> pd.Series:
+     """Cluster review summaries on their pairwise cosine distances."""
+     embedded_reviews = reviews_embedder(relevant_reviews.summary.tolist())
+     dist_matrix = cosine_distances(embedded_reviews).astype(np.float64)
+     clusters = clusterer.fit(dist_matrix)
+     return clusters.labels_
+
+
+ def get_key_reviews(
+     reviews_with_topics, extracted_topics, top_k_topics: int = 5
+ ) -> list[str]:
+     """Pick the best-rated review and one representative example per top topic."""
+     hist_of_topics = reviews_with_topics.topic.value_counts()
+     top_k = min(top_k_topics, len(hist_of_topics))
+     indices = hist_of_topics.iloc[:top_k].index
+
+     top_rated_reviews = set(
+         reviews_with_topics
+         .sort_values(['topic', 'overall'], ascending=False)
+         .groupby('topic')
+         .head(1)
+         .set_index('topic')
+         .loc[indices]
+         .reviewText
+         .tolist()
+     )
+     representative_reviews = {
+         extracted_topics[idx].representative_examples[0]
+         for idx in indices
+     }
+
+     return list(top_rated_reviews | representative_reviews)
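
For reference, a minimal sketch of how these helpers are meant to compose at serving time. The index file, parquet artifacts, query string, and the AgglomerativeClustering settings below are assumptions for illustration, not part of this commit.

# Hypothetical wiring of src/deploy_utils.py; artifact names and the clusterer are assumed.
import faiss
import pandas as pd
from sklearn.cluster import AgglomerativeClustering
from src import deploy_utils

product_model = deploy_utils.load_model("universal-sentence-encoder")
indexer = faiss.read_index("populated.index")            # assumed artifact
products = pd.read_parquet("products.parquet")           # assumed artifact
reviews = pd.read_parquet("reviews.parquet").set_index("reviewID")

relevant_products = deploy_utils.query_relevant_documents(
    product_model, indexer, products, "wireless headphones")
relevant_reviews = deploy_utils.get_relevant_reviews(relevant_products, reviews)

# clusterize_reviews feeds a precomputed cosine-distance matrix to the clusterer.
clusterer = AgglomerativeClustering(
    n_clusters=None, distance_threshold=0.5,
    metric="precomputed", linkage="average")
relevant_reviews["topic"] = deploy_utils.clusterize_reviews(
    relevant_reviews, product_model, clusterer)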
src/modelling/semantic_search/vectordb_utils.py ADDED
@@ -0,0 +1,73 @@
+ import typing as tp
+ import tensorflow as tf
+ import faiss
+ import numpy as np
+ from faiss.contrib.ondisk import merge_ondisk
+ import pathlib
+
+
+ # An embedding model maps a batch of texts to a dense tensor of vectors.
+ Embedding = tp.Callable[[list[str]], tf.Tensor]
+
+
+ def batch_list(
+     input_list: list[tp.Any], batch_size: int
+ ) -> tp.Iterator[list[tp.Any]]:
+     """Yield consecutive slices of `input_list` with at most `batch_size` items."""
+     pointers = [*range(0, len(input_list), batch_size), None]
+     for i, j in zip(pointers, pointers[1:]):
+         yield input_list[i:j]
+
+
+ def train_index(
+     model: Embedding,
+     faiss_index_str: str,
+     text_batch: list[str],
+     trained_index_path: pathlib.Path,
+ ):
+     """Train an empty index built from `faiss_index_str` on one batch of embeddings."""
+     # faiss expects contiguous float32 numpy arrays and string paths.
+     model_output = np.asarray(model(text_batch), dtype=np.float32)
+     index = faiss.index_factory(512, faiss_index_str)
+     index.train(model_output)
+     faiss.write_index(index, str(trained_index_path))
+
+
+ def add_sharded_embeddings(
+     model: Embedding,
+     batched_inputs: list[list[str]],
+     trained_index_path: pathlib.Path,
+     shard_root_dir: pathlib.Path,
+ ):
+     """Write one index shard per input batch, all sharing the trained quantizer."""
+     for idx, batch in enumerate(batched_inputs):
+         index = faiss.read_index(str(trained_index_path))
+         encoded_tensor = np.asarray(model(batch), dtype=np.float32)
+         index.add(encoded_tensor)
+         faiss.write_index(index, str(shard_root_dir / f"shard_{idx}.index"))
+
+
+ def merge_shards(
+     trained_index_path: pathlib.Path,
+     shard_root_dir: pathlib.Path,
+     populated_index_path: pathlib.Path,
+     merged_index_path: pathlib.Path,
+ ):
+     # NOTE: run this on the deploy env!
+     index = faiss.read_index(str(trained_index_path))
+     block_fnames = [str(path) for path in shard_root_dir.iterdir()]
+     merge_ondisk(index, block_fnames, str(merged_index_path))
+
+     faiss.write_index(index, str(populated_index_path))
+
+
+ def load_populated_index(
+     populated_index_path: pathlib.Path, nprobe: int = 16) -> faiss.Index:
+     populated_index = faiss.read_index(str(populated_index_path))
+     populated_index.nprobe = nprobe
+
+     return populated_index
+
+
+ def run_query(
+     populated_index: faiss.Index,
+     model: Embedding,
+     query: str,
+     top_k: int = 8) -> np.ndarray:
+     """Return the ids of the `top_k` nearest vectors to the encoded query."""
+     encoded_query = np.asarray(model([query]), dtype=np.float32)
+     _, idx = populated_index.search(encoded_query, top_k)
+
+     return idx[0]
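
A rough sketch of the intended offline indexing pipeline built from these helpers. It assumes the 512-d Universal Sentence Encoder; the corpus, IVF factory string, batch size, and paths are placeholders, not part of this commit.

# Hypothetical build of the on-disk index; corpus, factory string, and paths are assumed.
import pathlib
import tensorflow_hub as tfhub
from src.modelling.semantic_search import vectordb_utils

model = tfhub.load("https://tfhub.dev/google/universal-sentence-encoder/4")
corpus = [f"product title {i}" for i in range(1_000)]    # placeholder corpus

work_dir = pathlib.Path("index_artifacts")
shard_dir = work_dir / "shards"
shard_dir.mkdir(parents=True, exist_ok=True)

batches = list(vectordb_utils.batch_list(corpus, batch_size=256))
vectordb_utils.train_index(model, "IVF16,Flat", batches[0], work_dir / "trained.index")
vectordb_utils.add_sharded_embeddings(model, batches, work_dir / "trained.index", shard_dir)
vectordb_utils.merge_shards(
    work_dir / "trained.index", shard_dir,
    work_dir / "populated.index", work_dir / "merged.ivfdata")

index = vectordb_utils.load_populated_index(work_dir / "populated.index", nprobe=4)
print(vectordb_utils.run_query(index, model, "product title 42", top_k=2))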
src/modelling/topics/__init__.py ADDED
File without changes
src/modelling/topics/class_tf_idf.py ADDED
@@ -0,0 +1,68 @@
+ from sklearn.feature_extraction.text import TfidfTransformer
+ from sklearn.preprocessing import normalize
+ from sklearn.utils import check_array
+ import numpy as np
+ import scipy.sparse as sp
+
+
+ class ClassTfidfTransformer(TfidfTransformer):
+     """Class-based TF-IDF."""
+
+     def __init__(
+         self,
+         bm25_weighting: bool = False,
+         reduce_frequent_words: bool = False,
+         **kwargs
+     ):
+         self.bm25_weighting = bm25_weighting
+         self.reduce_frequent_words = reduce_frequent_words
+         super(ClassTfidfTransformer, self).__init__(**kwargs)
+
+     def fit(self, X: sp.csr_matrix, multiplier: np.ndarray = None):
+         X = check_array(X, accept_sparse=("csr", "csc"))
+         if not sp.issparse(X):
+             X = sp.csr_matrix(X)
+         dtype = np.float64
+
+         if self.use_idf:
+             _, n_features = X.shape
+
+             # Calculate the frequency of words across all classes
+             df = np.squeeze(np.asarray(X.sum(axis=0)))
+
+             # Calculate the average number of samples as regularization
+             avg_nr_samples = int(X.sum(axis=1).mean())
+
+             # BM25-inspired weighting procedure
+             if self.bm25_weighting:
+                 idf = np.log(1 + ((avg_nr_samples - df + 0.5) / (df + 0.5)))
+
+             # Divide the average number of samples by the word frequency
+             # +1 is added to force values to be positive
+             else:
+                 idf = np.log((avg_nr_samples / df) + 1)
+
+             # Multiplier to increase/decrease certain idf scores
+             if multiplier is not None:
+                 idf = idf * multiplier
+
+             self._idf_diag = sp.diags(
+                 idf,
+                 offsets=0,
+                 shape=(n_features, n_features),
+                 format="csr",
+                 dtype=dtype,
+             )
+
+         return self
+
+     def transform(self, X: sp.csr_matrix):
+         if self.use_idf:
+             X = normalize(X, axis=1, norm="l1", copy=False)
+
+             if self.reduce_frequent_words:
+                 X.data = np.sqrt(X.data)
+
+             X = X * self._idf_diag
+
+         return X
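
A minimal sketch of applying the class-based TF-IDF to per-topic documents, assuming toy review text (the documents below are placeholders); it mirrors how the transformer is fed one concatenated document per topic.

# Class-based TF-IDF on toy per-topic documents (the documents are assumptions).
from sklearn.feature_extraction.text import CountVectorizer
from src.modelling.topics.class_tf_idf import ClassTfidfTransformer

docs_per_topic = [
    "battery lasts long great battery life",     # topic 0: all of its reviews joined
    "arrived broken poor packaging broken box",   # topic 1
]

vectorizer = CountVectorizer(stop_words="english")
bow = vectorizer.fit_transform(docs_per_topic)            # one row per topic
ctfidf = ClassTfidfTransformer(reduce_frequent_words=True).fit_transform(bow)

vocab = vectorizer.get_feature_names_out()
top = ctfidf.toarray().argsort(axis=1)[:, -3:]
for topic, idxs in enumerate(top):
    print(topic, [vocab[i] for i in idxs[::-1]])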
src/modelling/topics/extraction_utils.py ADDED
@@ -0,0 +1,48 @@
+ import pandas as pd
+ import numpy as np
+ import scipy.sparse as sp
+
+
+ def group_reviews_per_topic(
+     reviews: pd.DataFrame, review_text_key: str) -> pd.DataFrame:
+     """Concatenate every review of a topic into one document per topic."""
+     return (
+         reviews
+         .groupby('topic', as_index=False)
+         .agg({review_text_key: ' '.join})
+     )
+
+
+ def mark_empty_docs(text: str) -> str:
+     if not text:
+         return "emptydoc"
+     return text
+
+
+ def check_reviews_schema(reviews: pd.DataFrame):
+     pass
+
+
+ def top_n_idx_sparse(matrix: sp.csr_matrix, n: int) -> np.ndarray:
+     """Return, per row, the column indices of the `n` largest values (padded with None)."""
+     indices = []
+     for le, ri in zip(matrix.indptr[:-1], matrix.indptr[1:]):
+         n_row_pick = min(n, ri - le)
+         values = matrix.indices[
+             le + np.argpartition(matrix.data[le:ri], -n_row_pick)[-n_row_pick:]
+         ]
+         values = [
+             values[index] if len(values) >= index + 1 else None
+             for index in range(n)
+         ]
+         indices.append(values)
+     return np.array(indices)
+
+
+ def top_n_values_sparse(
+     matrix: sp.csr_matrix, indices: np.ndarray) -> np.ndarray:
+     """Return the matrix values that correspond to `indices` for each row (0 for padding)."""
+     top_values = []
+     for row, values in enumerate(indices):
+         scores = np.array([
+             matrix[row, value] if value is not None else 0 for value in values
+         ])
+         top_values.append(scores)
+     return np.array(top_values)
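
A toy demonstration of the sparse top-n helpers; the input matrix below is an assumption and stands in for a c-TF-IDF matrix with one row per topic.

# Toy run of the sparse top-n helpers (the matrix is a placeholder).
import numpy as np
import scipy.sparse as sp
from src.modelling.topics import extraction_utils

m = sp.csr_matrix(np.array([
    [0.0, 0.3, 0.0, 0.9],
    [0.5, 0.0, 0.1, 0.0],
]))
idx = extraction_utils.top_n_idx_sparse(m, n=3)       # column ids, padded with None
vals = extraction_utils.top_n_values_sparse(m, idx)   # matching scores (0 for padding)
print(idx)
print(vals)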
src/modelling/topics/topic_extractor.py ADDED
@@ -0,0 +1,191 @@
+ import dataclasses
+ import collections
+ import typing as tp
+
+ import numpy as np
+ import pandas as pd
+ import scipy.sparse as sp
+ from sklearn.metrics.pairwise import cosine_similarity
+ from sklearn.feature_extraction.text import CountVectorizer, TfidfTransformer
+
+ from src.modelling.topics import extraction_utils
+
+
+ TopicID = tp.Union[str, int]
+
+
+ @dataclasses.dataclass
+ class Topic:
+     topic_id: TopicID
+     n_grams: list[tuple[str, float]]
+     representative_examples: list[str]
+     stats: dict[str, float]
+     sentiment: tp.Optional[str] = None  # positive | negative
+     text_label: tp.Optional[str] = None
+
+
+ @dataclasses.dataclass
+ class TopicExtractionConfig:
+     vectorizer_model: CountVectorizer
+     ctfidf_model: TfidfTransformer
+     number_of_grams_per_topic: int = 10
+     number_of_representative_documents: int = 3
+     reduce_topics: tp.Union[int, None] = None
+     review_text_key: str = "reviewText"
+
+     def get_vectorizer_model(self):
+         return self.vectorizer_model
+
+     def get_extraction_model(self):
+         return self.ctfidf_model
+
+
+ class TopicExtractor:
+     """Extract n-grams and representative reviews per topic with class-based TF-IDF."""
+
+     def __init__(self, config: TopicExtractionConfig):
+         self.config = config
+         self.vectorizer_model = self.config.get_vectorizer_model()
+         self.ctfidf_model = self.config.get_extraction_model()
+
+         self.review_text_key = self.config.review_text_key
+         self.c_tf_idf = None
+
+     def __call__(
+         self, reviews: pd.DataFrame
+     ) -> dict[TopicID, Topic]:
+         extraction_utils.check_reviews_schema(reviews)
+         topic_stats = self.compute_topic_stats(reviews)
+         self.extract_topics(reviews)
+
+         representative_examples = self.extract_representative_documents(
+             reviews)
+
+         return {
+             topic_id: Topic(
+                 topic_id=topic_id,
+                 n_grams=self.words_per_topic[topic_id],
+                 representative_examples=example,
+                 stats=topic_stats[topic_id]
+             ) for topic_id, example in representative_examples.items()
+         }
+
+     def extract_topics(self, reviews: pd.DataFrame):
+         reviews_per_topic = extraction_utils.group_reviews_per_topic(
+             reviews, self.review_text_key)
+         self.c_tf_idf, vocab = self.compute_c_tf_idf(reviews_per_topic)
+         self.words_per_topic = self.extract_words_per_topic(reviews, vocab)
+
+     def _prepare_c_tf_idf_text(self, raw_text: pd.Series) -> pd.Series:
+         clean_text = raw_text.str.replace("\n", " ")
+         clean_text = clean_text.str.replace("\t", " ")
+         clean_text = clean_text.str.replace(r"[^A-Za-z0-9 ]+", "", regex=True)
+         clean_text = clean_text.apply(extraction_utils.mark_empty_docs)
+
+         return clean_text
+
+     def compute_topic_stats(self, reviews):
+         return collections.defaultdict(dict)
+
+     def compute_c_tf_idf(
+         self,
+         reviews_per_topic: pd.DataFrame
+     ) -> tuple[sp.csr_matrix, np.ndarray]:
+         """Compute the c-TF-IDF matrix, one row per topic.
+
+         Args:
+             reviews_per_topic: A per-topic dataframe; it must be the output of
+                 `extraction_utils.group_reviews_per_topic`.
+         """
+         clean_reviews = self._prepare_c_tf_idf_text(
+             reviews_per_topic[self.review_text_key])
+
+         # update in place
+         self.vectorizer_model.fit(clean_reviews)
+         vectorized_reviews = self.vectorizer_model.transform(clean_reviews)
+
+         vocab = self.vectorizer_model.get_feature_names_out()
+         c_tf_idf = self.ctfidf_model.fit_transform(vectorized_reviews)
+
+         return c_tf_idf, vocab
+
+     def extract_words_per_topic(
+             self, reviews: pd.DataFrame, vocab: np.ndarray):
+         # Rows of the c-TF-IDF matrix follow the sorted groupby order, so the
+         # labels are sorted here to stay aligned with those rows.
+         labels = sorted(reviews.topic.unique().astype(int))
+
+         indices = extraction_utils.top_n_idx_sparse(
+             self.c_tf_idf, self.config.number_of_grams_per_topic
+         )
+         scores = extraction_utils.top_n_values_sparse(self.c_tf_idf, indices)
+         sorted_indices = np.argsort(scores, 1)
+         indices = np.take_along_axis(indices, sorted_indices, axis=1)
+         scores = np.take_along_axis(scores, sorted_indices, axis=1)
+
+         # Keep the top n-grams per topic based on their c-TF-IDF score
+         topics = {
+             label: [
+                 (vocab[word_index], score)
+                 if word_index is not None and score > 0
+                 else ("", 0.00001)
+                 for word_index, score in
+                 zip(indices[index][::-1], scores[index][::-1])
+             ]
+             for index, label in enumerate(labels)
+         }
+         topics = {
+             label: values[:self.config.number_of_grams_per_topic]
+             for label, values in topics.items()
+         }
+
+         return topics
+
+     def extract_representative_documents(self, reviews):
+         sample_reviews_per_topic = (
+             reviews.groupby('topic')
+             .sample(n=500, replace=True)
+             .drop_duplicates(subset=[self.review_text_key])
+         )
+
+         repr_docs = []
+         repr_docs_indices = []
+         repr_docs_mappings = {}
+         repr_docs_ids = []
+         labels = sorted(list(self.words_per_topic.keys()))
+
+         for index, topic in enumerate(labels):
+             # Slice data
+             selection = sample_reviews_per_topic.loc[
+                 sample_reviews_per_topic.topic == topic, :]
+             selected_docs = selection[self.review_text_key].values
+             selected_full_docs = selection['reviewText'].values
+             selected_docs_ids = selection.index.tolist()
+
+             # Calculate similarity
+             nr_repr_docs = self.config.number_of_representative_documents
+             nr_docs = min(nr_repr_docs, len(selected_docs))
+             bow = self.vectorizer_model.transform(selected_docs)
+             ctfidf = self.ctfidf_model.transform(bow)
+             sim_matrix = cosine_similarity(ctfidf, self.c_tf_idf[index])
+
+             # TODO(shpotes): add diversity
+
+             # extract top n most representative documents
+             indices = np.argpartition(
+                 sim_matrix.reshape(1, -1)[0], -nr_docs)[-nr_docs:]
+             docs = [selected_full_docs[index] for index in indices]
+
+             doc_ids = [
+                 selected_docs_ids[index]
+                 for index, doc in enumerate(selected_docs) if doc in docs
+             ]
+             repr_docs_ids.append(doc_ids)
+             repr_docs.extend(docs)
+             repr_docs_indices.append([
+                 repr_docs_indices[-1][-1] + i + 1 if index != 0 else i
+                 for i in range(nr_docs)
+             ])
+
+         repr_docs_mappings = {
+             topic: repr_docs[i[0]:i[-1]+1]
+             for topic, i in zip(self.words_per_topic.keys(), repr_docs_indices)
+         }
+
+         return repr_docs_mappings
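
A hedged end-to-end sketch of running the extractor on already-clustered reviews. Only the column names ('topic', 'reviewText') come from the code above; the toy dataframe and the config values are assumptions.

# Hypothetical call of TopicExtractor on toy clustered reviews (data is assumed).
import pandas as pd
from sklearn.feature_extraction.text import CountVectorizer

from src.modelling.topics.class_tf_idf import ClassTfidfTransformer
from src.modelling.topics.topic_extractor import TopicExtractionConfig, TopicExtractor

reviews = pd.DataFrame({
    "reviewText": [
        "battery lasts forever, great battery life",
        "the battery died after two days",
        "arrived broken, terrible packaging",
        "box was crushed and the item was broken",
    ],
    "topic": [0, 0, 1, 1],
})

config = TopicExtractionConfig(
    vectorizer_model=CountVectorizer(stop_words="english"),
    ctfidf_model=ClassTfidfTransformer(),
    number_of_grams_per_topic=5,
    number_of_representative_documents=1,
)
topics = TopicExtractor(config)(reviews)
for topic_id, topic in topics.items():
    print(topic_id, topic.n_grams[:3], topic.representative_examples)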
src/streaming_loading.py ADDED
@@ -0,0 +1,45 @@
+ import logging
+ import asyncio
+ import os
+
+ from azure.eventhub.aio import EventHubConsumerClient
+ from google.cloud import storage
+
+ os.environ["GOOGLE_APPLICATION_CREDENTIALS"] = "/Users/camivasz/almond-datathon-ffcfe3899e67.json"
+
+
+ CONNECTION_STRING = "Endpoint=sb://factored-datathon.servicebus.windows.net/;SharedAccessKeyName=datathon_group_3;SharedAccessKey=JLEggz9GNlDdLvbypDAudzTABp+WnVeIY+AEhBAupi4=;EntityPath=factored_datathon_amazon_reviews_3"
+ EVENT_HUB_LISTEN_POLICY_KEY = "sJJnyi8GGTBAa55jY89kacoT6hXAzWx2B+AEhCPEKYE="
+ CONSUMER_GROUP = 'almond'
+ EVENT_HUB_NAME = "factored_datathon_amazon_reviews_3"
+
+ logger = logging.getLogger("azure.eventhub")
+ logging.basicConfig(level=logging.INFO)
+
+ # Local staging directory for the raw event payloads.
+ os.makedirs("reads", exist_ok=True)
+
+
+ async def on_event(partition_context, event):
+     filename = f"{partition_context.partition_id}_{event.sequence_number}.json"
+     source_file_name = f"reads/{filename}"
+     destination_blob_name = f"patition_0/{filename}"
+
+     # Write the full event body (it is exposed as a generator of byte chunks).
+     with open(source_file_name, 'wb') as fp:
+         fp.write(b"".join(event.body))
+
+     if event.sequence_number > 15391:
+         client_storage = storage.Client()
+         bucket_name = "amazon-reviews-almond-3"
+         bucket = client_storage.bucket(bucket_name)
+         blob = bucket.blob(destination_blob_name)
+         blob.upload_from_filename(source_file_name)
+
+     logger.info("Received event %s from partition %s",
+                 event.sequence_number, partition_context.partition_id)
+     await partition_context.update_checkpoint(event)
+
+
+ async def receive():
+     client = EventHubConsumerClient.from_connection_string(
+         CONNECTION_STRING,
+         CONSUMER_GROUP,
+         eventhub_name=EVENT_HUB_NAME,
+     )
+     async with client:
+         await client.receive(
+             on_event=on_event,
+             starting_position="-1",  # "-1" is from the beginning of the partition.
+         )
+
+
+ if __name__ == '__main__':
+     loop = asyncio.get_event_loop()
+     loop.run_until_complete(receive())
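
The staged payloads are meant to be consumed downstream; a minimal sketch of reading one back, assuming each file holds newline-delimited JSON reviews (the file name and format are placeholders, not verified by this commit).

# Hypothetical downstream read of one staged payload.
import pandas as pd

batch = pd.read_json("reads/0_15392.json", lines=True)
print(batch.head())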