|
from src.services.utils import tech_to_dict, stem |
|
import requests as r |
|
import json |
|
import nltk |
|
import itertools |
|
import numpy as np |
|
|
|
from sentence_transformers import * |
|
# Sentence-embedding model used by get_contrastive_similarities.
# Instantiated once, when this module is imported.
model = SentenceTransformer("sentence-transformers/all-MiniLM-L6-v2")
|
|
|
def _extract_first_json_object(text):
    """Return the first JSON object that can be decoded from *text*.

    Every ``{`` in *text* is tried in order as a possible start of an object;
    the decoder stops at the matching closing brace, so nested objects and
    braces inside string values are handled, and any prose surrounding the
    object is ignored.

    Raises:
        json.JSONDecodeError: if no ``{`` in *text* starts a valid JSON object.
    """
    decoder = json.JSONDecoder()
    start = text.find("{")
    while start != -1:
        try:
            parsed, _ = decoder.raw_decode(text, start)
        except json.JSONDecodeError:
            # This brace did not open a valid object; try the next one.
            start = text.find("{", start + 1)
        else:
            return parsed
    raise json.JSONDecodeError("No JSON object found in LLM response", text, 0)


def retrieve_constraints(prompt):
    """Send *prompt* to the chat endpoint and return the JSON object it answers with.

    The endpoint's response body is decoded as JSON and its ``"content"``
    field is taken as the model's text answer. The first JSON object embedded
    in that text is parsed and returned.

    Args:
        prompt: text sent as the single user message.

    Returns:
        The decoded JSON object (a dict) found in the model's answer.

    Raises:
        json.JSONDecodeError: if the response body is not JSON or the answer
            contains no decodable JSON object.
        KeyError: if the response body has no ``"content"`` field.
    """
    request_input = {
        "models": ["meta-llama/llama-4-scout-17b-16e-instruct"],
        "messages": [{"role": "user", "content": prompt}],
    }
    # NOTE(review): no timeout is set and the HTTP status is not checked;
    # confirm with callers before tightening either.
    response = r.post(
        "https://organizedprogrammers-bettergroqinterface.hf.space/chat",
        json=request_input,
    )
    print(f"response : {response}")

    decoded_content = json.loads(response.content.decode())
    llm_response = decoded_content["content"]

    # Slicing between the first '{' and the first '}' truncates nested
    # objects, so let the JSON decoder find the end of the object itself.
    return _extract_first_json_object(llm_response)
|
|
|
|
|
def preprocess_tech_data(_df):
    """Build stemmed technology records from the ``description`` column of *_df*.

    Args:
        _df: table-like object exposing ``columns`` and a ``"description"``
            column with ``to_list()`` (presumably a pandas DataFrame — confirm
            against callers), or None.

    Returns:
        A 3-tuple ``(processed, keys, originals)``:
        - ``processed``: the records returned by ``stem``, with
          ``"key_components"`` rewritten (see below);
        - ``keys``: the keys of the first processed record, or ``[]``;
        - ``originals``: the filtered records, truncated to the number of
          processed records.
        All three are empty lists when there is no usable input, so callers
        can always unpack three values.
    """
    if _df is None or "description" not in _df.columns:
        return [], [], []

    technologies_list = _df["description"].to_list()
    tech_dict_raw = tech_to_dict(technologies_list)

    # Keep only records whose title, advantages and key components all have
    # at least five characters.
    tech_dict_filtered = [
        t for t in tech_dict_raw if (
            len(t.get("title", "")) >= 5 and
            len(t.get("advantages", "")) >= 5 and
            len(t.get("key_components", "")) >= 5
        )
    ]

    if not tech_dict_filtered:
        return [], [], []

    processed_tech_wt = stem(tech_dict_filtered, "technologies")

    for t_item_wt in processed_tech_wt:
        kc = t_item_wt.get("key_components")
        if isinstance(kc, str):
            # Sentences are re-joined with no separator between them.
            t_item_wt["key_components"] = ''.join(nltk.sent_tokenize(kc))
        else:
            t_item_wt["key_components"] = ""

    original_tech_for_display = tech_dict_filtered[:len(processed_tech_wt)]

    _keys = list(processed_tech_wt[0].keys()) if processed_tech_wt else []
    return processed_tech_wt, _keys, original_tech_for_display
|
|
|
|
|
def remove_over_repeated_technologies(result):
    """Drop technologies that recur across too many entries of *result*.

    Each entry of *result* is a mapping whose ``'technologies'`` value is a
    list of items; ``item[0]['title']`` identifies the technology. A title's
    repeat count starts at 0 on its first sighting and grows by one for each
    later sighting. Titles whose repeat count exceeds 30% of the number of
    entries are removed from every entry.

    *result* is modified in place and also returned.
    """
    repeat_counts = {}
    for entry in result:
        for candidate in entry['technologies']:
            title = candidate[0]['title']
            if title in repeat_counts:
                repeat_counts[title] += 1
            else:
                repeat_counts[title] = 0

    threshold = len(result) * 0.3
    print(threshold)
    print(repeat_counts)

    over_repeated = set()
    for title, repeats in repeat_counts.items():
        if repeats > threshold:
            print("This technology have been found over repeated : " + title)
            over_repeated.add(title)

    for entry in result:
        entry['technologies'] = [
            candidate
            for candidate in entry['technologies']
            if candidate[0]['title'] not in over_repeated
        ]

    return result
|
|
|
def get_contrastive_similarities(constraints, pre_encoded_tech_data, pre_encoded_tech_embeddings):
    """Score every constraint against every pre-encoded technology.

    Args:
        constraints: sequence of mappings, each with a ``"description"`` that
            is encoded with the module-level ``model``.
        pre_encoded_tech_data: sequence of mappings, each with an ``"id"``.
        pre_encoded_tech_embeddings: embeddings indexable by the same
            positions as ``pre_encoded_tech_data``.

    Returns:
        ``(selected_pairs, matrix)`` where ``selected_pairs`` is a flat list
        of ``{"constraint", "id2", "similarity"}`` dicts in constraint-major
        order and ``matrix[i][j]`` is the similarity of constraint ``i`` to
        technology ``j``. A similarity is whatever ``model.similarity``
        returns, replaced by ``0.0`` when ``np.isnan`` reports it as NaN.
    """
    descriptions = [entry["description"] for entry in constraints]
    constraint_embeddings = model.encode(descriptions, show_progress_bar=False)

    selected_pairs = []
    matrix = []
    for row_idx, constraint in enumerate(constraints):
        query_embedding = constraint_embeddings[row_idx]
        row = []
        for col_idx, tech in enumerate(pre_encoded_tech_data):
            score = model.similarity(query_embedding, pre_encoded_tech_embeddings[col_idx])
            if np.isnan(score):
                score = 0.0

            selected_pairs.append(
                {"constraint": constraint, "id2": tech["id"], "similarity": score}
            )
            row.append(score)
        matrix.append(row)

    return selected_pairs, matrix
|
|
|
def find_best_list_combinations(list1: list[str], list2: list[str], matrix) -> list[dict]:
    """Pick, for each item of *list1*, its best-scoring items of *list2*.

    ``matrix[i][j]`` is the score of ``list1[i]`` against ``list2[j]``. Only
    scores within ``[0.3, 0.8]`` are kept. For each ``list1`` item with at
    least one kept score, the up-to-five highest-scoring ``(list2 item,
    score)`` pairs are recorded. Items with no kept score are reported on
    stdout and skipped; despite the wording of that message, the function
    carries on with the remaining items.

    The collected entries are finally passed through
    ``remove_over_repeated_technologies``, which reads ``item[0]['title']``,
    so ``list2`` items are presumably mappings with a ``'title'`` key.

    Returns:
        A list of ``{'technologies': [...], 'problem': list1_item}`` dicts,
        or ``[]`` when either input list is empty.
    """
    if not (list1 and list2):
        print("Warning: One or both input lists are empty. Returning an empty list.")
        return []

    MIN_SIMILARITY = 0.3
    MAX_SIMILARITY = 0.8

    def score_as_float(match):
        # Scores may be tensor-like (exposing .item()) or plain numbers.
        score = match[1]
        if hasattr(score, 'item'):
            return score.item()
        return float(score)

    result = []
    for row_idx, problem in enumerate(list1):
        in_range = []
        for col_idx, candidate in enumerate(list2):
            score = matrix[row_idx][col_idx]
            if not (MIN_SIMILARITY <= score <= MAX_SIMILARITY):
                continue
            in_range.append((candidate, score))

        if not in_range:
            print(
                f"No valid matches found in list2 for '{problem}' from list1 "
                f"(score between {MIN_SIMILARITY} and {MAX_SIMILARITY}). "
                "Returning an empty list as no complete combinations can be formed."
            )
            continue

        # Stable descending sort, then keep the five best.
        in_range.sort(key=score_as_float, reverse=True)
        result.append({'technologies': in_range[:5], 'problem': problem})

    return remove_over_repeated_technologies(result)
|
|
|
|
|
def _average_best_similarity(combo, candidate_map):
    """Return the mean over all problems of the best similarity *combo* offers.

    Returns None as soon as one problem has no technology of *combo* among
    its candidates, i.e. when *combo* does not cover every problem.
    """
    total_sim = 0.0
    for candidates in candidate_map:
        max_sim = -1.0
        found = False
        for tech in combo:
            if tech in candidates:
                found = True
                if candidates[tech] > max_sim:
                    max_sim = candidates[tech]
        if not found:
            return None
        total_sim += max_sim
    return total_sim / len(candidate_map)


def select_technologies(problem_technology_list):
    """Choose the smallest set of technology ids that covers every problem.

    Args:
        problem_technology_list: sequence of mappings whose ``'technologies'``
            value is a list of ``(tech_info, similarity)`` pairs, with
            ``tech_info['id']`` identifying the technology.

    Returns:
        A set of technology ids such that every problem has at least one of
        them among its candidates. Among the sets of minimal size, the one
        with the highest average per-problem best similarity wins; on equal
        averages the first combination in sorted-id order is kept. Returns an
        empty set when there are no problems or when some problem has no
        candidate at all.
    """
    distinct_techs = set()
    candidate_map = []  # one {tech_id: similarity} dict per problem
    for problem_data in problem_technology_list:
        cand_dict = {}
        for tech_info, sim in problem_data['technologies']:
            tech_id = tech_info['id']
            distinct_techs.add(tech_id)
            cand_dict[tech_id] = float(sim)
        candidate_map.append(cand_dict)

    distinct_techs = sorted(distinct_techs)
    n = len(problem_technology_list)
    if n == 0:
        return set()

    print(f"Distinct technologies: {distinct_techs}")
    print(f"Candidate map: {candidate_map}")
    print(f"Number of problems: {n}")

    # A problem without any candidate can never be covered, so skip the
    # search instead of enumerating every combination only to return empty.
    if not all(candidate_map):
        return set()

    best_set = None
    best_avg = -1
    for k in range(1, len(distinct_techs) + 1):
        for combo in itertools.combinations(distinct_techs, k):
            print(f"Trying combination: {combo}")
            avg_sim = _average_best_similarity(combo, candidate_map)
            if avg_sim is None:
                continue
            # The first covering combination is always taken; later ones of
            # the same size must strictly beat its average.
            if best_set is None or avg_sim > best_avg:
                best_set = combo
                best_avg = avg_sim

        # Sizes grow monotonically, so the first size with a cover is minimal.
        if best_set is not None:
            break

    if best_set is None:
        return set()
    return set(best_set)