import kuzu import hashlib import os import logging import openpyxl import pandas as pd import uuid import json import networkx as nx import time import numpy as np class SocialNetworkAnalysis: data_dir: str k_client: kuzu.Connection def __init__(self,name:str): session_id = hashlib.sha256(name.encode("utf-8")).hexdigest() db = kuzu.Database(f"./db/{session_id}_db") self.k_client = kuzu.Connection(db) if not os.path.exists(f"./data/{session_id}"): os.makedirs(f"./data/{session_id}") self.data_dir = f"./data/{session_id}" def get_files(self): all_files = [] logging.info("Getting files") logging.info(f"Data dir: {self.data_dir}") for root, dirs, files in os.walk(self.data_dir): for file in files: all_files.append(os.path.join(root, file)) return all_files def add_file(self, file): logging.info(f"Adding file: {file}") with open(f"{self.data_dir}/{file.name}","wb") as f: f.write(file.getvalue()) def generate_data_dictionary(self, file_path): try: workbook = openpyxl.load_workbook(file_path,data_only=True) except Exception as e: return { 'status': False, 'message': str(e) } helper = {} try: for sheet_name in workbook.sheetnames: helper[sheet_name] = {} sheet = workbook[sheet_name] if "net_" in sheet_name: helper[sheet_name]["object_name"] = sheet_name.replace("net_","").split("_")[1].lower().replace(" ","_").replace("-","_").replace(".","_").replace("(","_").replace(")","_") helper[sheet_name]["isTable"] = False helper[sheet_name]["isRelationship"] = True helper[sheet_name]["source"] = "?" helper[sheet_name]["target"] = "?" else: is_header_row = True for row in sheet.iter_rows(values_only=True): if is_header_row: helper[sheet_name]["object_name"] = sheet_name.lower().replace(" ","_").replace("-","_").replace(".","_").replace("(","_").replace(")","_") helper[sheet_name]["primary_key"] = "?" helper[sheet_name]["columns"] = {} helper[sheet_name]["isTable"] = True helper[sheet_name]["isRelationship"] = False header_row = row is_header_row = False for column in header_row: if column is not None: helper[sheet_name]["columns"][column.lower().replace(" ","_").replace("-","_").replace(".","_").replace("(","_").replace(")","_")] = "STRING" break except Exception as e: return { 'status': False, 'message': str(e) } return helper def clean_data(self, df): # reads the data provided in pandas dataframe and cleans missing values. df.replace('N/A', None, inplace=True) df.replace('NA', None, inplace=True) non_none_columns = [col for col in df.columns if col is not None] # Create a new DataFrame containing only columns with non-None names df = df[non_none_columns] return df def load_data(self, file_path,helper): if not os.path.exists(self.data_dir+"/helpers.json"): with open(self.data_dir+"/helpers.json","w") as f: f.write(json.dumps({})) helpers = {} with open(self.data_dir+"/helpers.json","r") as f: helpers = json.load(f) helpers[file_path] = helper with open(self.data_dir+"/helpers.json","w") as f: f.write(json.dumps(helpers)) try: workbook = openpyxl.load_workbook(file_path,data_only=True) except Exception as e: return { 'status': False, 'message': str(e) } all_data = {} try: for sheet_name in workbook.sheetnames: sheet = workbook[sheet_name] sheet_data = [] is_header_row = True for row in sheet.iter_rows(values_only=True): if is_header_row: header_row = row is_header_row = False continue sheet_data.append(dict(zip(header_row, row))) all_data[sheet_name] = sheet_data except Exception as e: return { 'status': False, 'message': str(e) } try: for key, value in all_data.items(): if key not in helper.keys(): continue if helper[key]["isTable"]: cleaned_data = self.clean_data(pd.DataFrame(value)) create_table_response = self.create_tables(helper[key]) if create_table_response['status'] == False: return create_table_response if not os.path.exists("./tmp"): os.makedirs("./tmp") tmp_file_path = f"""./tmp/{str(uuid.uuid4()).replace("-","")}.csv""" cleaned_data.to_csv(tmp_file_path, index=False,header=False) self.ingest_data(helper[key],tmp_file_path) os.remove(tmp_file_path) continue except Exception as e: return { 'status': False, 'message': str(e) } try: for key, value in all_data.items(): if key not in helper.keys(): continue if helper[key]["isRelationship"] and key in helper.keys(): cleaned_data = self.clean_data(pd.DataFrame(value)) create_rel_response = self.create_relationships(helper[key]) if create_rel_response['status'] == False: return create_rel_response if not os.path.exists("./tmp"): os.makedirs("./tmp") tmp_file_path = f"""./tmp/{str(uuid.uuid4()).replace("-","")}.csv""" cleaned_data.to_csv(tmp_file_path, index=False,header=False) self.ingest_relationships(helper[key],tmp_file_path) os.remove(tmp_file_path) continue except Exception as e: return { 'status': False, 'message': str(e) } return { 'status': True, 'message': "Data loaded successfully" } def create_tables(self, helper): columns = "" logging.info(helper) for key, value in helper["columns"].items(): columns += f"{key} {value}," columns += f"""PRIMARY KEY ({helper["primary_key"]})""" try: self.k_client.execute(f"""CREATE NODE TABLE {helper["object_name"]}( {columns} )""") except Exception as e: if "already exists" in str(e): return { 'status': True, 'message': "Table already exists" } return { 'status': True, 'message': "Table created successfully" } def create_relationships(self, helper): # CREATE REL TABLE LivesIn(FROM User TO City) try: self.k_client.execute(f"""CREATE REL TABLE {helper["object_name"]}( FROM {helper["source"]} TO {helper["target"]} )""") except Exception as e: if "already exists" in str(e): return { 'status': True, 'message': "Relationship already exists" } return { 'status': True, 'message': "Relationship created successfully" } def ingest_relationships(self,helper,tmp_csv_path): try: self.k_client.execute(f"""COPY {helper["object_name"]} FROM "{tmp_csv_path}" """) return { 'status': True, 'message': "Relationsip ingested successfully" } except Exception as e: print(str(e)) if "COPY commands can only" in str(e): return { 'status': True, 'message': "Relationship already exists" } def ingest_data(self,helper,tmp_csv_path): try: self.k_client.execute(f"""COPY {helper["object_name"]} FROM "{tmp_csv_path}" """) return { 'status': True, 'message': "Data ingested successfully" } except Exception as e: print(str(e)) if "COPY commands can only" in str(e): return { 'status': True, 'message': "Data already exists" } def get_helper(self): with open(self.data_dir+"/helpers.json","r") as f: return json.load(f) def save_query(self,query_name,query): if not os.path.exists(self.data_dir+"/queries.json"): with open(self.data_dir+"/queries.json","w") as f: f.write(json.dumps({ "sample_query": "MATCH (n) RETURN n LIMIT 10" })) time.sleep(0.5) queries = {} with open(self.data_dir+"/queries.json","r") as f: queries = json.load(f) queries[query_name] = query with open(self.data_dir+"/queries.json","w") as f: f.write(json.dumps(queries)) def get_queries(self): if not os.path.exists(self.data_dir+"/queries.json"): with open(self.data_dir+"/queries.json","w") as f: f.write(json.dumps({ "sample_query": "MATCH (n) RETURN n LIMIT 10" })) time.sleep(1) with open(self.data_dir+"/queries.json","r") as f: return json.load(f) def execute_query(self,query): try: results = self.k_client.execute(query) return results.get_as_df() except Exception as e: return { 'status': False, 'message': str(e) } def apply_networkx_analysis(self,df:pd.DataFrame,source,target): G = nx.from_pandas_edgelist(df, source=source, target=target) # Calculate the degree of each node degree = dict(nx.degree(G)) degree_centrality = nx.degree_centrality(G) closeness_centrality = nx.closeness_centrality(G) betweenness_centrality = nx.betweenness_centrality(G) eigenvector_centrality = nx.eigenvector_centrality(G) clustering = nx.clustering(G) pagerank = nx.pagerank(G) edge_betweenness_centrality = nx.edge_betweenness_centrality(G) df['degree'] = df[source].map(degree) df['degree_centrality'] = df[source].map(degree_centrality) df['closeness_centrality'] = df[source].map(closeness_centrality) df['betweenness_centrality'] = df[source].map(betweenness_centrality) df['eigenvector_centrality'] = df[source].map(eigenvector_centrality) df['clustering'] = df[source].map(clustering) df['pagerank'] = df[source].map(pagerank) df['edge_betweenness_centrality'] = df.apply(lambda row: edge_betweenness_centrality.get((row[source], row[target]), np.nan), axis=1) return df def fit_ergm(self,G: nx.DiGraph): num_edges = len(G.edges()) # Calculate the number of reciprocated edges num_recip_edges = sum((v, u) in G.edges() for u, v in G.edges()) # Calculate the density density = num_edges / (len(G.nodes()) * (len(G.nodes()) - 1)) # Calculate the reciprocity reciprocity = num_recip_edges / num_edges if num_edges > 0 else np.nan # Calculate the degree distribution degree_dist = dict(G.degree()) # Calculate the clustering coefficient clustering_coefficient = nx.average_clustering(G) # Calculate the transitivity transitivity = nx.transitivity(G) summary = { 'Density': density, 'Reciprocity': reciprocity, 'Degree Distribution': degree_dist, 'Clustering Coefficient': clustering_coefficient, 'Transitivity': transitivity } num_params = len(summary) num_edges = len(G.edges()) n = len(G.nodes()) # Calculate the log-likelihood log_likelihood = -0.5 * (num_edges * np.log(n * (n - 1)) - num_params) # Calculate AIC and BIC aic = -2 * log_likelihood + 2 * num_params bic = -2 * log_likelihood + num_params * np.log(n * (n - 1)) return { 'density': density, 'reciprocity': reciprocity, 'degree_distribution': degree_dist, 'clustering_coefficient': clustering_coefficient, 'transitivity': transitivity, 'num_edges': num_edges, 'num_nodes': n, 'log_likelihood': log_likelihood, 'aic': aic, 'bic': bic } def apply_ergm(self,df,source,target): G = nx.from_pandas_edgelist(df, source=source, target=target,create_using=nx.DiGraph()) observed_ergm_summary = self.fit_ergm(G) return { 'ergm': observed_ergm_summary }