# Spaces:
# Running
# Running
# Dita Shahihah
# Network Graph Engine + standalone demo (sample data + analytics + interactive viz)
# --- Standard library ---------------------------------------------------
import os
import sys
from configparser import ConfigParser
from datetime import datetime
from typing import *

# --- Third party --------------------------------------------------------
import jmespath
import networkx as nx
import pandas as pd

# --- Import-path bootstrap ----------------------------------------------
# The directories below are appended to sys.path before the ``tools.*``
# imports further down, so those imports must stay after this section.
#
# NOTE: os.path.dirname() only strips the trailing ".." segment from the
# joined path, so ``path_project`` ends up equal to ``path_this`` and
# ``path_root`` is "<path_this>/.." (the parent directory, un-normalised).
path_this = os.path.dirname(os.path.abspath(__file__))
path_project = os.path.dirname(os.path.join(path_this, ".."))
path_root = os.path.dirname(os.path.join(path_this, "../.."))
sys.path.append(path_root)
sys.path.append(path_project)

# --- Project-local ------------------------------------------------------
from tools.base_model.base import BaseChatCampaign
from tools.utils.tool import (
    DEFAULT_COLORS,
    NODE_COLORS,
    NODE_SHAPES,
    ANALYSIS_TYPE,
    ANALYSIS_TYPES_WITH_NODE_EDGE_COLOR,
)
class Network(BaseChatCampaign):
    """
    Network graph formatter and visualisation attribute builder.

    Takes a raw { nodes, edges } schema and produces a render-ready graph
    dict with positions (NetworkX spring layout), colours, shapes, degree
    centrality, and edge attributes — suitable for direct consumption by a
    front-end graph renderer (e.g. Sigma.js, Cytoscape).

    The three custom-network modules in this portfolio only use
    generate_graph_with_clusters(); the _run() / _request() methods that
    trigger full schema-building from raw ES data are not required for them.

    Attributes:
        config: Class-level ConfigParser loaded from "config.conf" under
            ``path_root``; the "s3" section is read by the storage code.
        topic: Optional topic label attached to the network.
        cluster: If True, nodes are coloured by cluster; if False, by type.
        language: Output label language ("id" or "en").
        all_nodes_type: Node type names (not read by the methods below).
        analysis_type: Analysis mode (not read by the methods below).
        clusterType: Clustering strategy: "default", "issue", or "sentiment".
        network_id: ID stamped on generated files (timestamp string).
    """

    config: ClassVar[ConfigParser] = ConfigParser()
    config.read(os.path.join(path_root, "config.conf"))

    topic: str = None
    cluster: bool = True
    language: Literal["id", "en"] = "id"
    all_nodes_type: list = ["account", "hashtag", "main_issue"]
    analysis_type: ANALYSIS_TYPE = "actor"
    clusterType: Literal["default", "issue", "sentiment"] = "default"
    # NOTE(review): this expression runs once, when the class body is
    # executed, so the default is the import-time timestamp rather than a
    # per-instance one. Pass network_id explicitly if uniqueness matters.
    network_id: str = datetime.now().strftime("%Y%m%d_%H%M%S%f")[:-3]

    # ------------------------------------------------------------------
    # Storage helper (optional — only called when save_file=True)
    # ------------------------------------------------------------------
    async def s3_upsert(self, path: str, data: dict) -> None:
        """
        Upsert ``data`` at ``path`` through ``self.s3_database``.

        Connection settings come from the "s3" section of ``config``.
        ``s3_database`` is not defined in this class (presumably inherited
        from BaseChatCampaign — confirm).

        Args:
            path: Destination path handed to ``s3_database``.
            data: Payload to store.
        """
        s3_conf = self.config["s3"]
        connection = {
            "url": s3_conf["url"],
            "key": s3_conf["access_key"],
            "secret_key": s3_conf["secret_access_key"],
        }
        await self.s3_database(connection=connection, path=path, type_s3="upsert", data=data)

    # ------------------------------------------------------------------
    # Graph formatter — primary public method
    # ------------------------------------------------------------------
    async def generate_graph_with_clusters(
        self,
        member_all: dict,
        platform: str,
        save_file: bool = False,
        color: str = None,
        cid_target: str = None,
        use_category_colors: bool = False,
        edge_color_by: str = "sentiment",
    ) -> dict:
        """
        Convert a raw { nodes, edges } schema into a render-ready network dict.

        Processing steps:
            1. Build a NetworkX DiGraph from nodes/edges.
            2. Compute spring layout for (x, y) positions.
            3. Compute degree centrality → node radius.
            4. Assign node colour (by cluster, type, or category).
            5. Assign node shape from NODE_SHAPES lookup.
            6. For naration/event/main_issue nodes: attach first_account.
            7. Assign edge colour (by sentiment, node colour, or ontology).
            8. Return combined network dict; optionally persist to S3.

        Args:
            member_all: Dict with "nodes" (list) and "edges" (list). Each node
                needs an "id" ("label" falls back to the id when absent);
                each edge needs "source" and "target".
            platform: Platform label attached to the output dict.
            save_file: If True, upload result to S3.
            color: Override colour for a specific cluster.
            cid_target: Cluster ID whose nodes should get the override colour.
            use_category_colors: Colour nodes that carry a "category" field
                via the NODE_COLORS lookup.
            edge_color_by: "node" colours an edge like its source node; any
                other value names the edge field looked up in NODE_COLORS.

        Returns:
            dict: { "platform": str, "nodes": [...], "edges": [...] }
            Each node has "data", "attributes" and "id".
            Each edge has "data", "attributes", "source", "target", "id".

        Raises:
            KeyError: If a node lacks "id" or an edge lacks "source"/"target".
        """
        special_node_types = {"main_issue", "event", "naration"}
        fallback_edge_color = "#736e77ff"

        nodes = member_all["nodes"]
        edges = member_all["edges"]

        def _node_color(row: dict, status_node: str, cluster_color: tuple) -> str:
            """Pick the display colour of one node."""
            cluster_name = (row.get("cid") or "").split("@")[0]
            # NOTE(review): the lookup key here is the cid prefix, not the
            # value of row["category"] — confirm that this is intended.
            if use_category_colors and row.get("category"):
                return NODE_COLORS.get(cluster_name, "#666666")
            if self.cluster:
                if self.clusterType == "sentiment":
                    if status_node != "main_issue":
                        return NODE_COLORS.get(cluster_name, "#666666")
                    return NODE_COLORS.get(status_node)
                # cluster_color is indexed as three 0..1 floats and
                # rendered as "#rrggbb".
                return "#{:02x}{:02x}{:02x}".format(
                    int(cluster_color[0] * 255),
                    int(cluster_color[1] * 255),
                    int(cluster_color[2] * 255),
                )
            if platform == "ontology":
                return NODE_COLORS.get("ontology", "#67AB9F")
            return NODE_COLORS.get(status_node, "#67AB9F")

        # 1. Build DiGraph (keyed by node id; edges reference the same ids)
        G = nx.DiGraph()
        for row in nodes:
            G.add_node(row["id"], cid=row.get("cid"), created_at=row.get("created_at"))
        for row in edges:
            G.add_edge(row["source"], row["target"], cid=row.get("cid"))

        # 2. Layout + centrality (fixed seed keeps positions reproducible)
        pos = nx.spring_layout(G, seed=42)
        degree_centrality = nx.degree_centrality(G)

        # First-seen order (instead of set order) keeps the cid → colour
        # assignment stable between runs.
        unique_cids = list(dict.fromkeys(n.get("cid") for n in nodes))
        cid_color_map = {
            cid: DEFAULT_COLORS[i % len(DEFAULT_COLORS)]
            for i, cid in enumerate(unique_cids)
        }

        # 3. Build edge DataFrame for first_account lookup
        df_edges = pd.DataFrame(edges)
        lookup_cols = {"type_source", "type_content", "created_at", "username"}
        if lookup_cols.issubset(df_edges.columns):
            # Loop-invariant part of the filter: posts authored by accounts.
            account_post_mask = (df_edges["type_source"] == "account") & (
                df_edges["type_content"] == "post"
            )
        else:
            account_post_mask = None

        def _first_account(status_node: str, node_label: str) -> Optional[str]:
            """Return "@username" of the earliest account post on the node, if any."""
            if account_post_mask is None or status_node not in df_edges.columns:
                return None
            candidates = (
                df_edges[account_post_mask & (df_edges[status_node] == node_label)]
                .sort_values("created_at")
                .reset_index(drop=True)
            )
            if candidates.empty:
                return None
            idx = candidates["username"].first_valid_index()
            if idx is None:
                return None
            uname = candidates.at[idx, "username"]
            if isinstance(uname, str) and uname.strip():
                return f"@{uname}"
            return None

        first_accounts: set = set()
        nodes_mapping: list = []

        # 4–6. Process nodes
        for row in nodes:
            node_id = row["id"]
            node_label = row.get("label", node_id)
            cid = row.get("cid")
            status_node = row.get("type")

            # Graph lookups use the id the graph was built with.
            x, y = pos[node_id]
            radius = 10 + degree_centrality.get(node_id, 0) * 1000

            node_color = _node_color(row, status_node, cid_color_map.get(cid, (0.5, 0.5, 0.5)))
            if color and cid == cid_target:
                node_color = color

            node_mapping = {
                "data": {
                    "cid": cid,
                    "clusterName": (cid or "").split("@")[0],
                    "label": node_label,
                    "id": node_id,
                    "entity_id": row.get("entity_id"),
                    "typeNode": status_node,
                    "typeContent": row.get("typeContent"),
                    "timestamp": row.get("timestamp"),
                    "createdAt": row.get("created_at"),
                    "degreeCount": G.degree(node_id),
                    "inDegreeCount": G.in_degree(node_id),
                    "outDegreeCount": G.out_degree(node_id),
                },
                "attributes": {
                    "color": node_color,
                    "shape": NODE_SHAPES.get(status_node, "pentagon"),
                    "x": float(x * 1000),
                    "y": float(y * 1000),
                    "text": {
                        "content": node_label,
                        "position": "center",
                        "align": "center",
                        "scaling": "true",
                        "scale": 0.2,
                    },
                    "radius": float(radius),
                },
                "id": node_id,
            }

            # Attach contentNetwork and first_account for special node types
            if status_node in special_node_types:
                content = row.get("contentNetwork")
                if content:
                    node_mapping["data"]["contentNetwork"] = content
                account = _first_account(status_node, node_label)
                if account is not None:
                    node_mapping["data"]["first_account"] = account
                    first_accounts.add(account)

            nodes_mapping.append(node_mapping)

        # Star shape for first-account nodes
        for node in nodes_mapping:
            if node["id"] in first_accounts:
                node["attributes"]["shape"] = "star"

        # 7. Process edges
        node_color_map = {n["id"]: n["attributes"]["color"] for n in nodes_mapping}
        edges_mapping = []
        for row in edges:
            if edge_color_by == "node":
                edge_color = node_color_map.get(row["source"], fallback_edge_color)
            else:
                color_key = "ontology" if platform == "ontology" else row.get(edge_color_by)
                edge_color = NODE_COLORS.get(color_key, fallback_edge_color)

            # A weight that is present but None falls back to the default.
            weight = row.get("weight")
            width = int(weight) if weight is not None else 2

            edge = {
                "data": {
                    "cid": row.get("cid"),
                    "postId": row.get("post_id"),
                    "typeContent": row.get("type_content"),
                    "timestamp": row.get("timestamp"),
                },
                "attributes": {
                    "color": edge_color,
                    "width": width,
                    "text": {
                        "content": row.get("edges_kind", ""),
                        "minVisibleSize": 5,
                        "size": 2,
                    },
                },
                "id": f"{row['source']}-{row['target']}",
                "source": row["source"],
                "target": row["target"],
            }
            # Optional fields are copied only when present and truthy.
            for key in ["sentiment", "edges_kind", "statement", "weight", "strength", "reason"]:
                if row.get(key):
                    edge["data"][key] = row[key]
            edges_mapping.append(edge)

        network = {"platform": platform, "nodes": nodes_mapping, "edges": edges_mapping}

        # 8. Optionally persist the result
        if save_file:
            path = (
                f's3://{self.config["s3"]["bucket_name"]}'
                f"/fusion-network/network/result/network_{self.network_id}.json"
            )
            await self.s3_upsert(path=path, data=network)

        return network