|
import concurrent.futures |
|
from collections import defaultdict |
|
|
|
from sklearn.feature_extraction.text import TfidfVectorizer |
|
from sklearn.metrics.pairwise import cosine_similarity |
|
|
|
from ctm.configs.ctm_config_base import ( |
|
BaseConsciousnessTuringMachineConfig, |
|
) |
|
from ctm.processors.processor_base import BaseProcessor |
|
from ctm.supervisors.supervisor_base import BaseSupervisor |
|
|
|
|
|
class BaseConsciousnessTuringMachine(object):
    """Coordinate a set of processors and a supervisor in an iterative loop.

    Each iteration of :meth:`forward` asks every registered processor for a
    ``(gist, score)`` pair, then selects a winner via
    :meth:`uptree_competition` (best score within each group, then best across
    groups). It then asks the supervisor to judge the winning gist. If the
    supervisor's score does not exceed the threshold, the winning gist is
    broadcast to the other processors, and processors with similar gists are
    merged into one group before the next iteration.
    """

    def __init__(self, ctm_name=None, *args, **kwargs):
        """Load the configuration and register its processors and supervisor.

        Args:
            ctm_name: If truthy, the config is loaded with
                ``BaseConsciousnessTuringMachineConfig.from_ctm(ctm_name)``;
                otherwise a default-constructed config is used.
            *args, **kwargs: Forwarded to ``super().__init__``.
        """
        super().__init__(*args, **kwargs)
        if ctm_name:
            self.config = BaseConsciousnessTuringMachineConfig.from_ctm(ctm_name)
        else:
            self.config = BaseConsciousnessTuringMachineConfig()
        # Entries of the form {"processor_name": ..., "processor_instance": ...}.
        self.processor_list = []
        # processor name -> group name. A plain dict on purpose: the former
        # defaultdict(list) inserted an unhashable [] for any processor that
        # had no group, which crashed set()/dict lookups in the competition.
        self.processor_group_map = {}
        self.load_ctm()

    def __call__(self, *args, **kwargs):
        """Run :meth:`forward`; makes the machine instance callable."""
        return self.forward(*args, **kwargs)

    def add_processor(self, processor_name, group_name=None):
        """Create a ``BaseProcessor`` and register it, optionally in a group.

        A processor registered without a (truthy) ``group_name`` is treated as
        a one-member group named after the processor itself; see
        :meth:`_group_of`.
        """
        processor_instance = BaseProcessor(processor_name)
        self.processor_list.append(
            {
                "processor_name": processor_name,
                "processor_instance": processor_instance,
            }
        )
        if group_name:
            self.processor_group_map[processor_name] = group_name

    def add_supervisor(self, supervisor_name):
        """Create a ``BaseSupervisor``, replacing any previously installed one."""
        supervisor_instance = BaseSupervisor(supervisor_name)
        self.supervisor = {
            "supervisor_name": supervisor_name,
            "supervisor_instance": supervisor_instance,
        }

    @staticmethod
    def ask_processor(
        processor, question, context, image_path, audio_path, video_path
    ):
        """Query one processor entry and label its ``(gist, score)`` answer.

        Returns:
            ``{"name": processor_name, "gist": gist, "score": score}``.
        """
        processor_instance = processor["processor_instance"]
        processor_name = processor["processor_name"]
        gist, score = processor_instance.ask(
            question, context, image_path, audio_path, video_path
        )
        return {"name": processor_name, "gist": gist, "score": score}

    def ask_processors(
        self, question, context, image_path, audio_path, video_path
    ):
        """Query every registered processor concurrently on a thread pool.

        Returns:
            Dict mapping processor name to ``{"gist": ..., "score": ...}``, in
            registration order.

        Raises:
            ValueError: If two registered processors share a name, since their
                outputs would overwrite each other.
        """
        with concurrent.futures.ThreadPoolExecutor() as executor:
            futures = [
                executor.submit(
                    self.ask_processor,
                    processor,
                    question,
                    context,
                    image_path,
                    audio_path,
                    video_path,
                )
                for processor in self.processor_list
            ]
            # Collect in submission order rather than completion order, so the
            # output (and tie-breaking downstream) is deterministic.
            results = [future.result() for future in futures]

        output = {
            result["name"]: {"gist": result["gist"], "score": result["score"]}
            for result in results
        }
        if len(output) != len(self.processor_list):
            raise ValueError(
                "processor names must be unique; duplicate names in processor_list"
            )
        return output

    def _group_of(self, processor_name):
        """Return the processor's group, or its own name if it has none."""
        return self.processor_group_map.get(processor_name, processor_name)

    def _group_size(self, group_name):
        """Count the processors currently mapped to ``group_name``."""
        return sum(
            1 for group in self.processor_group_map.values() if group == group_name
        )

    def uptree_competition(self, processor_output):
        """Pick the winner: best score within each group, then best overall.

        Ties are resolved in favour of the processor (and group) that appears
        first in ``processor_output``.

        Args:
            processor_output: Mapping of processor name to
                ``{"gist": ..., "score": ...}``, as built by
                :meth:`ask_processors`.

        Returns:
            ``{"name": ..., "gist": ..., "score": ...}`` of the winner.

        Raises:
            ValueError: If ``processor_output`` is empty.
        """
        if not processor_output:
            raise ValueError("processor_output is empty; no processor can win")

        # group name -> name of that group's best processor seen so far.
        leader_by_group = {}
        for name, info in processor_output.items():
            group = self._group_of(name)
            leader = leader_by_group.get(group)
            # Strict ">" keeps the first-seen processor on ties.
            if leader is None or info["score"] > processor_output[leader]["score"]:
                leader_by_group[group] = name

        # max() returns the first maximal leader, preserving first-seen ties.
        best_name = max(
            leader_by_group.values(),
            key=lambda name: processor_output[name]["score"],
        )
        best_info = processor_output[best_name]
        return {
            "name": best_name,
            "gist": best_info["gist"],
            "score": best_info["score"],
        }

    def ask_supervisor(self, question, processor_info):
        """Ask the supervisor to judge ``processor_info["gist"]``.

        Returns:
            The supervisor's ``(final_answer, score)`` pair.
        """
        final_answer, score = self.supervisor["supervisor_instance"].ask(
            question, processor_info["gist"]
        )
        return final_answer, score

    def downtree_broadcast(self, winning_output):
        """Send the winning gist to every processor except the winner itself."""
        winning_processor_name = winning_output["name"]
        winning_processor_gist = winning_output["gist"]
        for processor in self.processor_list:
            if processor["processor_name"] != winning_processor_name:
                processor["processor_instance"].update_info(winning_processor_gist)

    def calc_processor_sim(self, processor_output):
        """Return the pairwise cosine similarity of the processors' TF-IDF gists.

        Row/column ``i`` corresponds to the ``i``-th entry of
        ``processor_output``. Gists are presumably strings (TfidfVectorizer
        input); sklearn raises ValueError on an empty vocabulary, e.g. when
        every gist is empty.
        """
        processor_gists = [info["gist"] for info in processor_output.values()]
        tfidf_vectorizer = TfidfVectorizer()
        tfidf_matrix = tfidf_vectorizer.fit_transform(processor_gists)
        return cosine_similarity(tfidf_matrix, tfidf_matrix)

    def link_form(self, processor_output, link_threshold=0.5):
        """Merge the groups of processors whose gists are similar.

        For each pair whose gist similarity exceeds ``link_threshold``, both
        processors move into whichever of their two current groups has more
        members; ties go to the second processor's group. Group sizes are
        recomputed after every merge, so earlier links influence later ones.

        Args:
            processor_output: Mapping of processor name to
                ``{"gist": ..., "score": ...}``.
            link_threshold: Similarity strictly above which a pair is linked.
        """
        sim = self.calc_processor_sim(processor_output)
        # Hoisted out of the pair loop; index order matches the sim matrix.
        names = list(processor_output)
        for i in range(len(names)):
            for j in range(i + 1, len(names)):
                if sim[i][j] <= link_threshold:
                    continue
                group1 = self._group_of(names[i])
                group2 = self._group_of(names[j])
                group_name = (
                    group1
                    if self._group_size(group1) > self._group_size(group2)
                    else group2
                )
                self.processor_group_map[names[i]] = group_name
                self.processor_group_map[names[j]] = group_name

    def processor_fuse(self, infos, scores):
        """Placeholder fusion step: return ``infos`` and ``scores`` unchanged."""
        return infos, scores

    def forward(
        self,
        question=None,
        context=None,
        image_path=None,
        audio_path=None,
        video_path=None,
        *,
        answer_threshold=0.5,
        max_iter=3,
        link_threshold=0.5,
    ):
        """Run up to ``max_iter`` compete/judge/broadcast iterations.

        The loop stops early once the supervisor's score is strictly greater
        than ``answer_threshold``.

        Args:
            question, context, image_path, audio_path, video_path: Forwarded
                unchanged to every processor.
            answer_threshold: Supervisor score strictly above which the answer
                is accepted.
            max_iter: Maximum number of iterations; must be at least 1.
            link_threshold: Forwarded to :meth:`link_form`.

        Returns:
            The supervisor's ``(answer, score)`` from the last iteration run.

        Raises:
            ValueError: If ``max_iter`` is less than 1.
        """
        if max_iter < 1:
            raise ValueError("max_iter must be at least 1")

        for i in range(max_iter):
            print("start the {}-th iteration".format(i + 1))
            processor_output = self.ask_processors(
                question=question,
                context=context,
                image_path=image_path,
                audio_path=audio_path,
                video_path=video_path,
            )
            winning_output = self.uptree_competition(processor_output)
            answer, score = self.ask_supervisor(question, winning_output)
            if score > answer_threshold:
                break
            self.downtree_broadcast(winning_output)
            self.link_form(processor_output, link_threshold=link_threshold)
        return answer, score

    def load_ctm(self):
        """Register the config's processors (by group) and its supervisor.

        ``self.config.groups_of_processors`` is iterated as a mapping of group
        name to an iterable of processor names.
        """
        for group_name, processor_names in self.config.groups_of_processors.items():
            for processor_name in processor_names:
                self.add_processor(processor_name, group_name=group_name)
        self.add_supervisor(self.config.supervisor)
|
|