Devikalalitha
commited on
Commit
•
0821777
1
Parent(s):
fa7dea3
Update insert_into_milvus_db.py
Browse files- insert_into_milvus_db.py +71 -62
insert_into_milvus_db.py
CHANGED
@@ -54,73 +54,82 @@ def get_secrets():
|
|
54 |
|
55 |
|
56 |
def create_schema(uri: str, token: str, collection_name: str):
|
57 |
-
|
58 |
-
|
59 |
-
|
60 |
-
if not utility.has_collection(collection_name):
|
61 |
-
dim = 768 # embeddings dim
|
62 |
-
|
63 |
-
article_url = FieldSchema(name="article_url", dtype=DataType.VARCHAR, max_length=10000,
|
64 |
-
is_primary=True, description="url of the article")
|
65 |
-
article_title = FieldSchema(name="article_title", dtype=DataType.VARCHAR, max_length=5000,
|
66 |
-
is_primary=False, description="headline of the article")
|
67 |
-
|
68 |
-
article_src = FieldSchema(name="article_src", dtype=DataType.VARCHAR, max_length=1000,
|
69 |
-
is_primary=False, description="src of the article")
|
70 |
-
|
71 |
-
article_date = FieldSchema(name="article_date", dtype=DataType.VARCHAR, max_length=1000,
|
72 |
-
is_primary=False, description="date of the article")
|
73 |
-
|
74 |
-
article_age = FieldSchema(name="article_age", dtype=DataType.INT64,
|
75 |
-
is_primary=False, description="age of the article")
|
76 |
|
77 |
-
|
|
|
|
|
|
|
|
|
|
|
|
|
78 |
|
79 |
-
|
80 |
-
|
81 |
-
|
82 |
-
|
83 |
-
|
84 |
-
|
85 |
-
|
86 |
-
|
87 |
-
|
88 |
-
|
89 |
-
|
90 |
-
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
91 |
|
92 |
|
93 |
def prepare_docs(vectorizer):
|
94 |
-
|
95 |
-
|
96 |
-
|
97 |
-
|
98 |
-
|
99 |
-
|
100 |
-
|
101 |
-
|
102 |
-
|
103 |
-
|
104 |
-
|
105 |
-
|
106 |
-
|
107 |
-
|
108 |
-
|
|
|
|
|
|
|
109 |
|
110 |
|
111 |
def upsert_db(vectorizer, collection):
|
112 |
-
|
113 |
-
|
114 |
-
|
115 |
-
|
116 |
-
|
117 |
-
|
118 |
-
|
119 |
-
|
120 |
-
|
121 |
-
|
122 |
-
|
123 |
-
|
124 |
-
|
125 |
-
|
126 |
-
|
|
|
|
|
|
|
|
54 |
|
55 |
|
56 |
def create_schema(uri: str, token: str, collection_name: str):
|
57 |
+
try:
|
58 |
+
logger.warning('Entering create_schema()')
|
59 |
+
connections.connect("default", uri=uri, token=token)
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
60 |
|
61 |
+
if not utility.has_collection(collection_name):
|
62 |
+
dim = 768 # embeddings dim
|
63 |
+
|
64 |
+
article_url = FieldSchema(name="article_url", dtype=DataType.VARCHAR, max_length=10000,
|
65 |
+
is_primary=True, description="url of the article")
|
66 |
+
article_title = FieldSchema(name="article_title", dtype=DataType.VARCHAR, max_length=5000,
|
67 |
+
is_primary=False, description="headline of the article")
|
68 |
|
69 |
+
article_src = FieldSchema(name="article_src", dtype=DataType.VARCHAR, max_length=1000,
|
70 |
+
is_primary=False, description="src of the article")
|
71 |
+
|
72 |
+
article_date = FieldSchema(name="article_date", dtype=DataType.VARCHAR, max_length=1000,
|
73 |
+
is_primary=False, description="date of the article")
|
74 |
+
|
75 |
+
article_age = FieldSchema(name="article_age", dtype=DataType.INT64,
|
76 |
+
is_primary=False, description="age of the article")
|
77 |
+
|
78 |
+
article_embed = FieldSchema(name="article_embed", dtype=DataType.FLOAT_VECTOR, dim=dim) # description embeddings
|
79 |
+
|
80 |
+
schema = CollectionSchema(fields=[article_url, article_title, article_src,
|
81 |
+
article_date, article_age, article_embed],
|
82 |
+
auto_id=False, description="collection of news articles")
|
83 |
+
logger.warning("Creating the collection")
|
84 |
+
collection = Collection(name=collection_name, schema=schema)
|
85 |
+
# logger.warning(f"Schema: {schema}")
|
86 |
+
logger.warning("Successfully created collection")
|
87 |
+
else:
|
88 |
+
collection = Collection(name=collection_name)
|
89 |
+
logger.warning("Using existing collection")
|
90 |
+
logger.warning('Exiting create_schema()')
|
91 |
+
return collection
|
92 |
+
except:
|
93 |
+
raise
|
94 |
|
95 |
|
96 |
def prepare_docs(vectorizer):
|
97 |
+
try:
|
98 |
+
logger.warning('Entering prepare_docs()')
|
99 |
+
logger.warning('Retrieving latest news')
|
100 |
+
news_df = get_news()
|
101 |
+
if news_df is None:
|
102 |
+
raise Exception("ERROR: No latest news in retrieved")
|
103 |
+
logger.warning('Successfully retrieved latest news')
|
104 |
+
article_url = news_df['url'].tolist()
|
105 |
+
article_title = news_df['title'].tolist()
|
106 |
+
article_src = news_df['src'].tolist()
|
107 |
+
article_date = news_df['parsed_date'].tolist()
|
108 |
+
article_age = news_df['news_age'].tolist()
|
109 |
+
article_embed = vectorizer.vectorize_(article_title)
|
110 |
+
logger.warning('Exiting prepare_docs()')
|
111 |
+
return [article_url, article_title, article_src,
|
112 |
+
article_date, article_age, article_embed]
|
113 |
+
except:
|
114 |
+
raise
|
115 |
|
116 |
|
117 |
def upsert_db(vectorizer, collection):
|
118 |
+
try:
|
119 |
+
logger.warning('Entering upsert_db()')
|
120 |
+
collection_is_empty = 0
|
121 |
+
if collection.is_empty:
|
122 |
+
collection_is_empty = 1
|
123 |
+
|
124 |
+
docs_to_upsert = prepare_docs(vectorizer)
|
125 |
+
ins_resp = collection.upsert(docs_to_upsert)
|
126 |
+
if ins_resp.err_count != 0:
|
127 |
+
raise Exception(f'Milvus Insertion not successful. {ins_resp.err_count} errors reported.')
|
128 |
+
if collection_is_empty:
|
129 |
+
index_params = {"index_type": "AUTOINDEX", "metric_type": "IP", "params": {}}
|
130 |
+
collection.create_index(field_name='article_embed', index_params=index_params)
|
131 |
+
collection.load()
|
132 |
+
logger.warning('Upsert successful')
|
133 |
+
logger.warning('Exiting upsert_db()')
|
134 |
+
except:
|
135 |
+
raise
|