# Dita Shahihah
# Network Graph Engine + standalone demo (sample data + analytics + interactive viz)
# f66e164
# Raw
# History Blame Contribute Delete
# 11.4 kB
import os
import sys
import jmespath
import pandas as pd
from typing import *
import networkx as nx
from datetime import datetime
from configparser import ConfigParser
# Resolve directories relative to this file and expose them on sys.path so the
# project-local imports that follow can be resolved.
path_this = os.path.dirname(os.path.abspath(__file__))
# NOTE(review): os.path.dirname() only strips the final path component, so
# dirname(join(path_this, "..")) evaluates to path_this itself, and
# dirname(join(path_this, "../..")) evaluates to "<path_this>/.." (one level
# up, un-normalised). If "project" and "root" were meant to be one and two
# levels above this file, each is one level short. Confirm against the
# repository layout before changing: the config.conf lookup in this module
# uses path_root, and presumably the `tools.*` imports rely on these entries.
path_project = os.path.dirname(os.path.join(path_this, ".."))
path_root = os.path.dirname(os.path.join(path_this, "../.."))
# Appended rather than inserted, so earlier sys.path entries take precedence.
sys.path.append(path_root)
sys.path.append(path_project)
from tools.base_model.base import BaseChatCampaign
from tools.utils.tool import (
DEFAULT_COLORS,
NODE_COLORS,
NODE_SHAPES,
ANALYSIS_TYPE,
ANALYSIS_TYPES_WITH_NODE_EDGE_COLOR,
)
class Network(BaseChatCampaign):
    """
    Network graph formatter and visualisation attribute builder.

    Takes a raw { nodes, edges } schema and produces a render-ready graph
    dict with positions (NetworkX spring layout), colours, shapes, degree
    centrality, and edge attributes — suitable for direct consumption by a
    front-end graph renderer (e.g. Sigma.js, Cytoscape).

    The three custom-network modules in this portfolio only use
    generate_graph_with_clusters(); the _run() / _request() methods that
    trigger full schema-building from raw ES data are not required for them.

    Attributes:
        config: Parser loaded from "config.conf" under path_root; the "s3"
            section is read when results are persisted.
        topic: Optional topic label attached to the network.
        cluster: If True, nodes are coloured by cluster; if False, by type.
        language: Output label language ("id" or "en").
        all_nodes_type: Node type names known to the network.
        analysis_type: Analysis mode (one of ANALYSIS_TYPE).
        clusterType: Clustering strategy: "default", "issue", or "sentiment".
        network_id: Timestamp-based ID stamped on generated files.
    """

    config: ClassVar[ConfigParser] = ConfigParser()
    config.read(os.path.join(path_root, "config.conf"))

    topic: str = None
    cluster: bool = True
    language: Literal["id", "en"] = "id"
    all_nodes_type: list = ["account", "hashtag", "main_issue"]
    analysis_type: ANALYSIS_TYPE = "actor"
    clusterType: Literal["default", "issue", "sentiment"] = "default"
    # NOTE(review): this default is evaluated once, when the class body runs,
    # not per instance. Unless BaseChatCampaign re-computes it, every Network
    # created in the same process shares one ID (and one S3 object name) —
    # TODO confirm and switch to a per-instance factory if needed.
    network_id: str = datetime.now().strftime("%Y%m%d_%H%M%S%f")[:-3]

    # ------------------------------------------------------------------
    # Storage helper (optional — only called when save_file=True)
    # ------------------------------------------------------------------
    async def s3_upsert(self, path: str, data: dict) -> None:
        """
        Upsert `data` at `path` using the credentials in the "s3" config section.

        Delegates to self.s3_database (not defined in this class; presumably
        provided by BaseChatCampaign).

        Args:
            path: Destination path passed through to s3_database.
            data: Payload to store.
        """
        s3_config = self.config["s3"]
        connection = {
            "url": s3_config["url"],
            "key": s3_config["access_key"],
            "secret_key": s3_config["secret_access_key"],
        }
        await self.s3_database(connection=connection, path=path, type_s3="upsert", data=data)

    # ------------------------------------------------------------------
    # Graph formatter — primary public method
    # ------------------------------------------------------------------
    async def generate_graph_with_clusters(
        self,
        member_all: dict,
        platform: str,
        save_file: bool = False,
        color: Optional[str] = None,
        cid_target: Optional[str] = None,
        use_category_colors: bool = False,
        edge_color_by: str = "sentiment",
    ) -> dict:
        """
        Convert a raw { nodes, edges } schema into a render-ready network dict.

        Processing steps:
            1. Build a NetworkX DiGraph from nodes/edges (keyed by node "id").
            2. Compute spring layout (seed=42) for (x, y) positions.
            3. Compute degree centrality → node radius.
            4. Assign node colour (by cluster, type, or category flag).
            5. Assign node shape from NODE_SHAPES lookup.
            6. For "naration"/"event"/"main_issue" nodes: attach contentNetwork
               and first_account (earliest account post edge pointing at the node).
            7. Assign edge colour (by an edge field, source-node colour, or ontology).
            8. Return combined network dict; optionally persist to S3.

        Args:
            member_all: Dict with "nodes" (list of dicts, each with "id") and
                "edges" (list of dicts, each with "source" and "target").
            platform: Platform label attached to the output dict; the value
                "ontology" switches node/edge colouring to the ontology colour.
            save_file: If True, upload result to S3.
            color: Override colour for a specific cluster.
            cid_target: Cluster ID whose nodes should get the override colour.
            use_category_colors: When True, nodes with a truthy "category"
                field are coloured from NODE_COLORS.
            edge_color_by: Edge field whose value is looked up in NODE_COLORS,
                or "node" to reuse the source node's colour.

        Returns:
            dict: { "platform": str, "nodes": [...], "edges": [...] }
                Each node has "data", "attributes" and "id".
                Each edge has "data", "attributes", "source", "target", "id".

        Raises:
            KeyError: If a node lacks "id" or an edge lacks "source"/"target".
        """
        special_types = {"main_issue", "event", "naration"}
        optional_edge_keys = ("sentiment", "edges_kind", "statement", "weight", "strength", "reason")
        fallback_edge_color = "#736e77ff"

        def _cid_prefix(row: dict) -> str:
            # Cluster ids look like "<name>@<suffix>"; only the name is used.
            return (row.get("cid") or "").split("@")[0]

        def _node_color(row: dict, status_node: str, cluster_color: tuple) -> str:
            # NOTE(review): the category branch keys the lookup on the cid
            # prefix, not on the category value itself — confirm this is intended.
            if use_category_colors and row.get("category"):
                return NODE_COLORS.get(_cid_prefix(row), "#666666")
            if self.cluster:
                if self.clusterType == "sentiment":
                    if status_node != "main_issue":
                        return NODE_COLORS.get(_cid_prefix(row), "#666666")
                    return NODE_COLORS.get(status_node)
                # assumes cluster_color is an (r, g, b) triple of 0..1 floats — TODO confirm
                red, green, blue = (int(channel * 255) for channel in cluster_color[:3])
                return "#{:02x}{:02x}{:02x}".format(red, green, blue)
            if platform == "ontology":
                return NODE_COLORS.get("ontology", "#67AB9F")
            return NODE_COLORS.get(status_node, "#67AB9F")

        def _first_account(status_node: str, node_label) -> Optional[str]:
            # Every column touched below must exist; otherwise skip the lookup
            # instead of failing on a missing "created_at" / "username".
            required_cols = {"type_source", "type_content", "created_at", "username", status_node}
            if not required_cols.issubset(df_edges.columns):
                return None
            candidates = df_edges[
                (df_edges["type_source"] == "account")
                & (df_edges["type_content"] == "post")
                & (df_edges[status_node] == node_label)
            ].sort_values("created_at").reset_index(drop=True)
            if candidates.empty:
                return None
            idx = candidates["username"].first_valid_index()
            if idx is None:
                return None
            uname = candidates.at[idx, "username"]
            if isinstance(uname, str) and uname.strip():
                return f"@{uname}"
            return None

        nodes = member_all["nodes"]
        edges = member_all["edges"]

        # 1. Build DiGraph
        G = nx.DiGraph()
        for row in nodes:
            G.add_node(row["id"], cid=row.get("cid"), created_at=row.get("created_at"))
        for row in edges:
            G.add_edge(row["source"], row["target"], cid=row.get("cid"))

        # 2. Layout + centrality
        pos = nx.spring_layout(G, seed=42)
        degree_centrality = nx.degree_centrality(G)

        # First-seen order (rather than set order) keeps the cluster → colour
        # assignment stable across runs regardless of string hash randomisation.
        unique_cids = list(dict.fromkeys(row.get("cid") for row in nodes))
        cid_color_map = {
            cid: DEFAULT_COLORS[i % len(DEFAULT_COLORS)] for i, cid in enumerate(unique_cids)
        }

        # 3. Build edge DataFrame for first_account lookup
        df_edges = pd.DataFrame(edges)
        first_accounts: set = set()
        nodes_mapping: list = []

        # 4–6. Process nodes
        for row in nodes:
            # The graph is keyed by "id", so every graph lookup must use it too;
            # the label is display text and may differ from the id.
            node_id = row["id"]
            node_label = row.get("label", node_id)
            cid = row.get("cid")
            status_node = row.get("type")

            x, y = pos[node_id]
            radius = 10 + degree_centrality.get(node_id, 0) * 1000

            node_color = _node_color(row, status_node, cid_color_map.get(cid, (0.5, 0.5, 0.5)))
            if color and cid == cid_target:
                node_color = color

            node_mapping = {
                "data": {
                    "cid": cid,
                    "clusterName": _cid_prefix(row),
                    "label": node_label,
                    "id": node_id,
                    "entity_id": row.get("entity_id"),
                    "typeNode": status_node,
                    "typeContent": row.get("typeContent"),
                    "timestamp": row.get("timestamp"),
                    "createdAt": row.get("created_at"),
                    "degreeCount": G.degree(node_id),
                    "inDegreeCount": G.in_degree(node_id),
                    "outDegreeCount": G.out_degree(node_id),
                },
                "attributes": {
                    "color": node_color,
                    "shape": NODE_SHAPES.get(status_node, "pentagon"),
                    "x": float(x * 1000),
                    "y": float(y * 1000),
                    "text": {
                        "content": node_label,
                        "position": "center",
                        "align": "center",
                        "scaling": "true",
                        "scale": 0.2,
                    },
                    "radius": float(radius),
                },
                "id": node_id,
            }

            # Attach contentNetwork and first_account for special node types
            if status_node in special_types:
                content = row.get("contentNetwork")
                if content:
                    node_mapping["data"]["contentNetwork"] = content
                account = _first_account(status_node, node_label)
                if account is not None:
                    node_mapping["data"]["first_account"] = account
                    first_accounts.add(account)

            nodes_mapping.append(node_mapping)

        # Star shape for first-account nodes
        for node in nodes_mapping:
            if node["id"] in first_accounts:
                node["attributes"]["shape"] = "star"

        # 7. Process edges
        node_color_map = {n["id"]: n["attributes"]["color"] for n in nodes_mapping}
        edges_mapping = []
        for row in edges:
            if edge_color_by == "node":
                edge_color = node_color_map.get(row["source"], fallback_edge_color)
            else:
                color_key = "ontology" if platform == "ontology" else row.get(edge_color_by)
                edge_color = NODE_COLORS.get(color_key, fallback_edge_color)

            # A present-but-null weight falls back to the default width
            # instead of raising on int(None).
            weight = row.get("weight")
            width = int(weight) if weight is not None else 2

            edge = {
                "data": {
                    "cid": row.get("cid"),
                    "postId": row.get("post_id"),
                    "typeContent": row.get("type_content"),
                    "timestamp": row.get("timestamp"),
                },
                "attributes": {
                    "color": edge_color,
                    "width": width,
                    "text": {
                        "content": row.get("edges_kind", ""),
                        "minVisibleSize": 5,
                        "size": 2,
                    },
                },
                "id": f"{row['source']}-{row['target']}",
                "source": row["source"],
                "target": row["target"],
            }
            # Copy optional descriptive fields only when they carry a truthy value.
            for key in optional_edge_keys:
                if row.get(key):
                    edge["data"][key] = row[key]
            edges_mapping.append(edge)

        # 8. Assemble result and optionally persist it
        network = {"platform": platform, "nodes": nodes_mapping, "edges": edges_mapping}

        if save_file:
            path = (
                f's3://{self.config["s3"]["bucket_name"]}'
                f"/fusion-network/network/result/network_{self.network_id}.json"
            )
            await self.s3_upsert(path=path, data=network)

        return network