Spaces:
Sleeping
Sleeping
| import numpy as np | |
| import pandas as pd | |
| import cv2 | |
| import redis | |
| # insight face | |
| from insightface.app import FaceAnalysis | |
| from sklearn.metrics import pairwise | |
| # time | |
| import time | |
| from datetime import datetime | |
| import os | |
| # Connect to Redis Client | |
| hostname = 'redis-11109.c321.us-east-1-2.ec2.cloud.redislabs.com' | |
| portnumber = 11109 | |
| password = 'HI3acnzpSHJcssm7qtuuv7GmeakjIEKl' | |
| r = redis.StrictRedis(host=hostname, | |
| port=portnumber, | |
| password=password) | |
| # Retrive Data from database | |
| def retrive_data(name): | |
| retrive_dict= r.hgetall(name) | |
| retrive_series = pd.Series(retrive_dict) | |
| retrive_series = retrive_series.apply(lambda x: np.frombuffer(x,dtype=np.float32)) | |
| index = retrive_series.index | |
| index = list(map(lambda x: x.decode(), index)) | |
| retrive_series.index = index | |
| retrive_df = retrive_series.to_frame().reset_index() | |
| retrive_df.columns = ['name_role','facial_features'] | |
| retrive_df[['Name','Role']] = retrive_df['name_role'].apply(lambda x: x.split('@')).apply(pd.Series) | |
| return retrive_df[['Name','Role','facial_features']] | |
| # configure face analysis | |
| faceapp = FaceAnalysis(name='buffalo_sc',root='insightface_model', providers = ['CPUExecutionProvider']) | |
| faceapp.prepare(ctx_id = 0, det_size=(640,640), det_thresh = 0.5) | |
| # ML Search Algorithm | |
| def ml_search_algorithm(dataframe,feature_column,test_vector, | |
| name_role=['Name','Role'],thresh=0.5): | |
| """ | |
| cosine similarity base search algorithm | |
| """ | |
| # step-1: take the dataframe (collection of data) | |
| dataframe = dataframe.copy() | |
| # step-2: Index face embeding from the dataframe and convert into array | |
| X_list = dataframe[feature_column].tolist() | |
| x = np.asarray(X_list) | |
| # step-3: Cal. cosine similarity | |
| similar = pairwise.cosine_similarity(x,test_vector.reshape(1,-1)) | |
| similar_arr = np.array(similar).flatten() | |
| dataframe['cosine'] = similar_arr | |
| # step-4: filter the data | |
| data_filter = dataframe.query(f'cosine >= {thresh}') | |
| if len(data_filter) > 0: | |
| # step-5: get the person name | |
| data_filter.reset_index(drop=True,inplace=True) | |
| argmax = data_filter['cosine'].argmax() | |
| person_name, person_role = data_filter.loc[argmax][name_role] | |
| else: | |
| person_name = 'Unknown' | |
| person_role = 'Unknown' | |
| return person_name, person_role | |
| ### Real Time Prediction | |
| # we need to save logs for every 1 mins | |
| class RealTimePred: | |
| def __init__(self): | |
| self.logs = dict(name=[],role=[],current_time=[]) | |
| def reset_dict(self): | |
| self.logs = dict(name=[],role=[],current_time=[]) | |
| def saveLogs_redis(self): | |
| # step-1: create a logs dataframe | |
| dataframe = pd.DataFrame(self.logs) | |
| # step-2: drop the duplicate information (distinct name) | |
| dataframe.drop_duplicates('name',inplace=True) | |
| # step-3: push data to redis database (list) | |
| # encode the data | |
| name_list = dataframe['name'].tolist() | |
| role_list = dataframe['role'].tolist() | |
| ctime_list = dataframe['current_time'].tolist() | |
| encoded_data = [] | |
| for name, role, ctime in zip(name_list, role_list, ctime_list): | |
| if name != 'Unknown': | |
| concat_string = f"{name}@{role}@{ctime}" | |
| encoded_data.append(concat_string) | |
| if len(encoded_data) >0: | |
| r.lpush('attendance:logs',*encoded_data) | |
| self.reset_dict() | |
| def face_prediction(self,test_image, dataframe,feature_column, | |
| name_role=['Name','Role'],thresh=0.5): | |
| # step-1: find the time | |
| current_time = str(datetime.now()) | |
| # step-1: take the test image and apply to insight face | |
| results = faceapp.get(test_image) | |
| test_copy = test_image.copy() | |
| # step-2: use for loop and extract each embedding and pass to ml_search_algorithm | |
| for res in results: | |
| x1, y1, x2, y2 = res['bbox'].astype(int) | |
| embeddings = res['embedding'] | |
| person_name, person_role = ml_search_algorithm(dataframe, | |
| feature_column, | |
| test_vector=embeddings, | |
| name_role=name_role, | |
| thresh=thresh) | |
| if person_name == 'Unknown': | |
| color =(0,0,255) # bgr | |
| else: | |
| color = (0,255,0) | |
| cv2.rectangle(test_copy,(x1,y1),(x2,y2),color) | |
| text_gen = person_name | |
| cv2.putText(test_copy,text_gen,(x1,y1),cv2.FONT_HERSHEY_DUPLEX,0.7,color,2) | |
| cv2.putText(test_copy,current_time,(x1,y2+10),cv2.FONT_HERSHEY_DUPLEX,0.7,color,2) | |
| # save info in logs dict | |
| self.logs['name'].append(person_name) | |
| self.logs['role'].append(person_role) | |
| self.logs['current_time'].append(current_time) | |
| return test_copy | |
| #### Registration Form | |
| class RegistrationForm: | |
| def __init__(self): | |
| self.sample = 0 | |
| def reset(self): | |
| self.sample = 0 | |
| def get_embedding(self,frame): | |
| # get results from insightface model | |
| results = faceapp.get(frame,max_num=1) | |
| embeddings = None | |
| for res in results: | |
| self.sample += 1 | |
| x1, y1, x2, y2 = res['bbox'].astype(int) | |
| cv2.rectangle(frame, (x1,y1),(x2,y2),(0,255,0),1) | |
| # put text samples info | |
| text = f"samples = {self.sample}" | |
| cv2.putText(frame,text,(x1,y1),cv2.FONT_HERSHEY_DUPLEX,0.6,(255,255,0),2) | |
| # facial features | |
| embeddings = res['embedding'] | |
| return frame, embeddings | |
| def save_data_in_redis_db(self,name,role): | |
| # validation name | |
| if name is not None: | |
| if name.strip() != '': | |
| key = f'{name}@{role}' | |
| else: | |
| return 'name_false' | |
| else: | |
| return 'name_false' | |
| # if face_embedding.txt exists | |
| if 'face_embedding.txt' not in os.listdir(): | |
| return 'file_false' | |
| # step-1: load "face_embedding.txt" | |
| x_array = np.loadtxt('face_embedding.txt',dtype=np.float32) # flatten array | |
| # step-2: convert into array (proper shape) | |
| received_samples = int(x_array.size/512) | |
| x_array = x_array.reshape(received_samples,512) | |
| x_array = np.asarray(x_array) | |
| # step-3: cal. mean embeddings | |
| x_mean = x_array.mean(axis=0) | |
| x_mean = x_mean.astype(np.float32) | |
| x_mean_bytes = x_mean.tobytes() | |
| # step-4: save this into redis database | |
| # redis hashes | |
| r.hset(name='academy:register',key=key,value=x_mean_bytes) | |
| # | |
| os.remove('face_embedding.txt') | |
| self.reset() | |
| return True |