Dita Shahihah
Network Graph Engine + standalone demo (sample data + analytics + interactive viz)
# Network Graph Engine
> A collection of three Python modules that automatically build social and geostrategic network graphs, covering data retrieval, clustering, node-edge construction, and persistence to object storage.
All three modules are part of the **Fusion Campaign API** system and share the same architectural pattern: an async pipeline, a graph schema builder, CID resolution by majority vote, and storage in S3-compatible object storage.
---
## Table of Contents
- [Three Network Modules](#three-network-modules)
- [Shared Architecture: The Pattern All Three Use](#shared-architecture)
- [CID Resolution Algorithm](#cid-resolution-algorithm)
- [NCW Geostrategic Network](#1-ncw-geostrategic-network)
- [Kemenkeu Social Media Network](#2-kemenkeu-social-media-network)
- [RETS Narrative Network](#3-rets-narrative-network)
- [Input & Output per Module](#input--output-per-module)
- [Edge Types Catalog](#edge-types-catalog)
- [Tech Stack](#tech-stack)
---
## Three Network Modules
| File | Class | Use Case |
|---|---|---|
| [ncw_geostrategic.py](ncw_geostrategic.py) | `NetworkNcwGeostrategic` | Graph of geostrategic relationships between country entities, built from static JSON data |
| [kemenkeu_socmed_network.py](kemenkeu_socmed_network.py) | `NetworkKemenkeu` | Social media network graph of Kemenkeu accounts, sourced from Elasticsearch and grouped by speech act |
| [rets_network.py](rets_network.py) | `NetworkRets` | Narrative graph for event tracking (RETS), built from pre-processed monitoring cluster data |
---
## Shared Architecture
All three classes inherit from `BaseChatCampaign` and follow the same pipeline:
```
      Input Data
           │
           ▼
┌──────────────────────┐
│ create_relationships │  → Build nodes[] + edges[] from the raw data
└──────────┬───────────┘
           │
           ▼
┌──────────────────────┐
│ determine_cid        │  → Assign each node its best cid by majority vote
└──────────┬───────────┘
           │
           ▼
┌──────────────────────────────────────┐
│ Network.generate_graph_with_clusters │  → Produce the final network schema
└──────────┬───────────────────────────┘
           │
           ▼
┌──────────────────────┐
│ s3_upsert            │  → Upload JSON to object storage, return the public URL
└──────────┬───────────┘
           │
           ▼
{ "network_path": "https://..." }
```
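The four stages can be sketched as one async method. This is only an illustration of the control flow, not the actual `BaseChatCampaign` code: the class name `PipelineSketch`, every stub body, and the in-memory `uploaded` dict that stands in for object storage are assumptions.

```python
import asyncio
import json


class PipelineSketch:
    """Hypothetical stand-in for a network module; every body is a stub."""

    def __init__(self):
        self.uploaded = {}  # stands in for the object storage bucket

    def create_relationships(self, records):
        # Build edges[] (one per record) and nodes[] (one per distinct endpoint).
        edges = [{"source": r["source"], "target": r["target"], "cid": "cluster_0"}
                 for r in records]
        ids = sorted({e[k] for e in edges for k in ("source", "target")})
        nodes = [{"id": i, "cid": "cluster_0"} for i in ids]
        return nodes, edges

    def determine_cid(self, nodes, edges):
        # Placeholder: the real step applies the majority vote over edges.
        return nodes

    def generate_graph(self, nodes, edges):
        # Placeholder for Network.generate_graph_with_clusters.
        return {"nodes": nodes, "edges": edges}

    async def s3_upsert(self, key, graph):
        # Placeholder upload: serialise, keep in memory, return a made-up URL.
        self.uploaded[key] = json.dumps(graph)
        return f"https://storage.example.com/{key}"

    async def run(self, records):
        nodes, edges = self.create_relationships(records)
        nodes = self.determine_cid(nodes, edges)
        graph = self.generate_graph(nodes, edges)
        url = await self.s3_upsert("network_demo.json", graph)
        return {"network_path": url}


result = asyncio.run(PipelineSketch().run([{"source": "Indonesia", "target": "ASEAN"}]))
```

Only the upload step is awaited here, since it is the only stage in the sketch that would touch the network.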
### Node Structure
```json
{
  "cid": "cluster_name@network_id",
  "id": "@username | #hashtag | intent_label | naration_title",
  "entity_id": "platform_user_id | null",
  "label": "display label",
  "type": "account | hashtag | intent | naration | category-X",
  "timestamp": 1700000000000,
  "created_at": "2024-01-01 10:00:00",
  "typeContent": "post | comment | retweet"
}
```
### Edge Structure
```json
{
  "cid": "cluster_name@network_id",
  "post_id": "original_post_id",
  "source": "@username",
  "type_source": "account",
  "target": "@mentioned_user | #hashtag | intent",
  "type_target": "account | hashtag | intent | naration",
  "type_content": "post | comment | retweet",
  "edges_kind": "MENTION | HASHTAG | COMMENT | SPEECH_ACT | NARATION | ...",
  "timestamp": 1700000000000,
  "sentiment": null,
  "statement": "optional annotation text"
}
```
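For type checking or quick validation, the two shapes can be mirrored as `TypedDict` classes. This is a sketch under assumptions: the class names, the `missing_keys` helper, and the value types (for example `sentiment` as an optional string and `timestamp` as epoch milliseconds) are inferred from the examples, not taken from the modules.

```python
from typing import Optional, TypedDict


class Node(TypedDict, total=False):
    cid: str                  # "cluster_name@network_id"
    id: str
    entity_id: Optional[str]
    label: str
    type: str
    timestamp: int            # assumed to be epoch milliseconds
    created_at: str
    typeContent: str


class Edge(TypedDict, total=False):
    cid: str
    post_id: str
    source: str
    type_source: str
    target: str
    type_target: str
    type_content: str
    edges_kind: str
    timestamp: int
    sentiment: Optional[str]  # assumed type; the example only shows null
    statement: Optional[str]


def missing_keys(record: dict, schema: type) -> list:
    """Return the schema keys that are absent from record, sorted by name."""
    return sorted(set(schema.__annotations__) - set(record))
```

Calling `missing_keys(edge, Edge)` on a partially built edge lists the fields that still need to be filled before the edge matches the structure above.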
---
## CID Resolution Algorithm
All modules use the same algorithm to determine each node's final cluster id (cid):
```
For each node N:
  1. Collect all edges that involve N (as source or target)
  2. Count how often each cid appears on those edges
  3. Pick the cid with the highest count (majority vote)
  4. On a tie (equal counts):
       → Pick the cid of the node with the most recent created_at
  5. If N has no edges (isolated node):
       → Keep its initial cid
```
```python
# Core implementation (identical in all three modules)
from collections import defaultdict

# node id -> cid -> number of incident edges carrying that cid
node_cid_count = defaultdict(lambda: defaultdict(int))
for edge in edges:
    for node_id in (edge["source"], edge["target"]):
        node_cid_count[node_id][edge["cid"]] += 1

for node in nodes:
    possible_cids = node_cid_count.get(node["id"], {})
    if possible_cids:
        max_count = max(possible_cids.values())
        best_cids = [cid for cid, cnt in possible_cids.items() if cnt == max_count]
        if len(best_cids) > 1:
            # Tie: take the cid of the most recent record. The candidate
            # collection is elided ("...") in this excerpt.
            best_cid = max(..., key=lambda x: x["created_at"])["cid"]
        else:
            best_cid = best_cids[0]
    else:
        # Isolated node: keep its initial cid.
        best_cid = node["cid"]
```
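The excerpt leaves the tie-break candidates elided. A runnable sketch follows, with one labeled assumption: on a tie, the candidates are taken to be the node records that share the node's `id` and carry one of the tied cids. The function name and the fallback for the case where no such record exists are also illustrative.

```python
from collections import defaultdict


def determine_cid(nodes, edges):
    """Return copies of nodes with cid replaced by the majority-vote winner."""
    # Count how often each cid appears on the edges touching each node id.
    cid_counts = defaultdict(lambda: defaultdict(int))
    for edge in edges:
        for node_id in (edge["source"], edge["target"]):
            cid_counts[node_id][edge["cid"]] += 1

    # Assumption: tie candidates are the node records sharing the same id.
    records_by_id = defaultdict(list)
    for node in nodes:
        records_by_id[node["id"]].append(node)

    resolved = []
    for node in nodes:
        counts = cid_counts.get(node["id"], {})
        if not counts:
            best_cid = node["cid"]  # isolated node keeps its initial cid
        else:
            top = max(counts.values())
            tied_cids = [cid for cid, n in counts.items() if n == top]
            if len(tied_cids) == 1:
                best_cid = tied_cids[0]
            else:
                tied = [r for r in records_by_id[node["id"]] if r["cid"] in tied_cids]
                # Most recent record wins; if none carries a tied cid, fall back
                # to a deterministic (alphabetical) pick.
                best_cid = (max(tied, key=lambda r: r["created_at"])["cid"]
                            if tied else sorted(tied_cids)[0])
        resolved.append({**node, "cid": best_cid})
    return resolved
```

Comparing `created_at` as strings works here because the `YYYY-MM-DD HH:MM:SS` format sorts chronologically.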
---
## 1. NCW Geostrategic Network
**File:** [ncw_geostrategic.py](ncw_geostrategic.py)
Builds a graph of geostrategic relationships between entities (countries, organizations, individuals) from curated data stored in one JSON file per region/country category.
### Pipeline Flow
```
Select category_network (e.g. "ina", "fra", "india", ...)
        │
        ▼
Read the JSON file:
  data_network_raw_json/{category}_ncw_geostrategic.json
        │
        ▼
create_relationships()
  ├─ Each record → 1 edge (source → target)
  ├─ Nodes are deduplicated by label
  └─ Node type: type_source / type_target from the raw data
        │
        ▼
determine_cid()
  └─ All nodes go into cluster_0 (single cluster, static data)
        │
        ▼
Network.generate_graph_with_clusters()
        │
        ▼
s3_upsert() → public URL
```
### Supported Categories
```
recap | ina | fra | india | ken | lka | mar | mdg | mng |
phl | pry | tgo | npl | tur | tls | per | bgd
```
### Source JSON Data Format
```json
[
  {
    "source": "Indonesia",
    "type_source": "country",
    "target": "ASEAN",
    "type_target": "organization",
    "created_at": "2024-01-15",
    "source_id": "idn_001",
    "terget_id": "asean_001",
    "note": "Member state, founding member"
  }
]
```
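Given records in this format, the NCW `create_relationships` step (one edge per record, nodes deduplicated by label) might look roughly like the sketch below. The edge kind follows the `{type_source}_{type_target}` pattern listed in the edge catalog; copying `note` into `statement` and the exact set of node fields are assumptions.

```python
def create_relationships(records, cid="cluster_0"):
    """Turn curated records into (nodes, edges) for a single static cluster."""
    nodes = {}   # label -> node, so each label yields exactly one node
    edges = []
    for rec in records:
        for side in ("source", "target"):
            label = rec[side]
            # Deduplicate by label: the first occurrence defines the node.
            nodes.setdefault(label, {
                "cid": cid,
                "id": label,
                "label": label,
                "type": rec[f"type_{side}"],
                "created_at": rec.get("created_at"),
            })
        edges.append({
            "cid": cid,
            "source": rec["source"],
            "type_source": rec["type_source"],
            "target": rec["target"],
            "type_target": rec["type_target"],
            "edges_kind": f"{rec['type_source']}_{rec['type_target']}",
            "statement": rec.get("note"),  # assumption: note becomes statement
        })
    return list(nodes.values()), edges
```

Because every record receives the same `cid`, the later majority vote has nothing to decide, which matches the single-cluster note in the pipeline.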
---
## 2. Kemenkeu Social Media Network
**File:** [kemenkeu_socmed_network.py](kemenkeu_socmed_network.py)
Social media network graph for monitoring Kemenkeu (Indonesian Ministry of Finance) accounts. It fetches real-time data from Elasticsearch, groups posts by speech act, and builds interaction links between accounts.
### Pipeline Flow
```
AMS API → get_map_account()
  └─ List of Kemenkeu accounts per platform (twitter, ig, fb, yt, tiktok)
        │
        ▼
pre_clustering() [per platform, in parallel]
  ├─ ElasticDataFetcher.get_data_kemenkeu()
  │    → Posts within the start-end range
  └─ produce_profiling_accounts()
       → Kafka: send one profiling job per unique user_id
        │
        ▼
cluster_speech_act()
  └─ Group posts by slm.speech_act
     example clusters: "questioning", "supporting", "criticizing", "undefined"
        │
        ▼
create_relationships() [per cluster]
  ├─ MENTION    : @user → @mentioned
  ├─ HASHTAG    : @user → #tag
  ├─ COMMENT    : @user → @replied (only if not a Kemenkeu account)
  └─ SPEECH_ACT : @user → intent, intent → all Kemenkeu accounts
        │
        ▼
determine_cid() + statistics_cluster()
        │
        ▼
Network.generate_graph_with_clusters()
        │
        ▼
s3_upsert() → { network_path, statistics }
```
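The grouping step and the `SPEECH_ACT` edges can be sketched as follows. The post layout (`post["slm"]["speech_act"]`, `post["username"]`), the use of `"undefined"` as the fallback for unlabeled posts, and both function signatures are assumptions made for illustration.

```python
from collections import defaultdict


def cluster_speech_act(posts):
    """Group posts by slm.speech_act; unlabeled posts go to "undefined"."""
    clusters = defaultdict(list)
    for post in posts:
        label = (post.get("slm") or {}).get("speech_act") or "undefined"
        clusters[label].append(post)
    return dict(clusters)


def speech_act_edges(cluster_name, posts, monitored_accounts, network_id):
    """Link each author to the intent node, then the intent to every monitored account."""
    base = {"cid": f"{cluster_name}@{network_id}", "edges_kind": "SPEECH_ACT"}
    edges = [
        {**base, "source": f"@{post['username']}", "type_source": "account",
         "target": cluster_name, "type_target": "intent"}
        for post in posts
    ]
    edges += [
        {**base, "source": cluster_name, "type_source": "intent",
         "target": f"@{account}", "type_target": "account"}
        for account in monitored_accounts
    ]
    return edges
```

Routing every author through a shared intent node keeps the edge count linear in the number of posts, rather than connecting each author directly to each monitored account.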
### Per-Cluster Statistics
Each cluster produces:
```json
{
  "cluster_name": "criticizing",
  "total_engagement": 15420.0,
  "total_accounts": 238,
  "total_comments": 892,
  "total_followers": 4500000,
  "first_account": { "username": "...", "created_at": "...", "text": "..." },
  "top_accounts_by_followers": [ ...5 accounts... ],
  "top_accounts_by_engagement": [ ...5 posts... ]
}
```
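One way such a summary could be computed from a cluster's posts is sketched below. The per-post field names (`followers`, `engagement`, `type`, `text`) and the aggregation rules, such as counting each account's followers once using its largest observed value, are assumptions; the real `statistics_cluster()` may differ.

```python
def statistics_cluster(cluster_name, posts, top_n=5):
    """Summarise one non-empty cluster of posts."""
    # Largest follower count seen per account, so repeat posters count once.
    followers = {}
    for post in posts:
        name = post["username"]
        followers[name] = max(followers.get(name, 0), post.get("followers", 0))

    first = min(posts, key=lambda p: p["created_at"])  # earliest post
    by_engagement = sorted(posts, key=lambda p: p.get("engagement", 0), reverse=True)
    return {
        "cluster_name": cluster_name,
        "total_engagement": float(sum(p.get("engagement", 0) for p in posts)),
        "total_accounts": len(followers),
        "total_comments": sum(1 for p in posts if p.get("type") == "comment"),
        "total_followers": sum(followers.values()),
        "first_account": {k: first.get(k) for k in ("username", "created_at", "text")},
        "top_accounts_by_followers": sorted(followers, key=followers.get, reverse=True)[:top_n],
        "top_accounts_by_engagement": by_engagement[:top_n],
    }
```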
### Kafka Profiling Event
For every unique `user_id` found, the system sends a message to the profiling topic:
```json
{
  "user_id": "12345678",
  "created_at": "2024-01-15 10:30:00",
  "timestamp": 1705315800000,
  "id": "<hash_id>",
  "profiling_type": "account",
  "source": "custom network kemenkeu"
}
```
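Building these messages, deduplicated by `user_id`, could look like the sketch below. It stops short of sending anything to Kafka. The `id` derivation (SHA-256 over `user_id` plus timestamp), the use of the event time for `created_at`, and the function names are assumptions.

```python
import hashlib
import json
from datetime import datetime, timezone


def profiling_events(posts, now, source="custom network kemenkeu"):
    """Build one profiling message per unique user_id, in first-seen order."""
    created_at = now.strftime("%Y-%m-%d %H:%M:%S")
    timestamp = int(now.timestamp() * 1000)  # epoch milliseconds
    seen, events = set(), []
    for post in posts:
        user_id = post.get("user_id")
        if not user_id or user_id in seen:
            continue  # skip posts without an id and repeat authors
        seen.add(user_id)
        events.append({
            "user_id": user_id,
            "created_at": created_at,
            "timestamp": timestamp,
            # Assumption: the id hashes user_id plus timestamp with SHA-256.
            "id": hashlib.sha256(f"{user_id}{timestamp}".encode()).hexdigest(),
            "profiling_type": "account",
            "source": source,
        })
    return events


def encode(event):
    """Serialise a message to the bytes a Kafka producer would send."""
    return json.dumps(event).encode("utf-8")


events = profiling_events(
    [{"user_id": "12345678"}, {"user_id": "12345678"}],
    now=datetime.now(timezone.utc),
)
```

With `aiokafka`, each encoded message would then be passed to `AIOKafkaProducer.send_and_wait(topic, value)`; the topic name is configuration that this README does not show.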
---
## 3. RETS Narrative Network
**File:** [rets_network.py](rets_network.py)
Narrative graph for the Real-time Event Tracking System. Unlike the other two modules, RETS builds a narrative structure that links social media accounts to **narrative clusters** already analyzed by the upstream clustering pipeline.
### Pipeline Flow
```
Input: { query, unique_clusters[] }
        │
        ▼
Generate network_id = hash(start_date + end_date + wall_time)
        │
        ▼
Normalize each cluster:
  Inject into every member post:
  ├─ naration           ← cluster.title
  ├─ cluster_summary    ← cluster.core_summary
  └─ category_narration ← cluster.narrative_analysis.dominant_category
        │
        ▼
create_relationships() [per cluster]
  ├─ MENTION  : @user → @mentioned
  ├─ HASHTAG  : @user → #tag
  ├─ QUOTED   : @user → @quoted_user
  ├─ COMMENT  : @user → @replied
  ├─ NARATION : @user → naration_node
  │    naration_node.contentNetwork = cluster_summary (for UI rendering)
  └─ TARGETED_CATEGORY_NARATION : naration_node → category_node
        │
        ▼
Merge all schemas → determine_cid()
        │
        ▼
Network.generate_graph_with_clusters()
        │
        ▼
s3_upsert() → { network_path } or { network_path, raw_network }
```
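The `network_id` and normalization steps can be sketched like this. The hash function (SHA-256, truncated to 12 hex characters) is an assumption, since the README only says `hash(...)`; the injected field names follow the pipeline above.

```python
import hashlib
import time


def make_network_id(start_date, end_date, wall_time=None):
    """Hash the query window plus the current time into a short id."""
    wall_time = time.time() if wall_time is None else wall_time
    raw = f"{start_date}{end_date}{wall_time}"
    return hashlib.sha256(raw.encode("utf-8")).hexdigest()[:12]  # assumed format


def normalize_cluster(cluster):
    """Return a copy of cluster whose member posts carry the cluster metadata."""
    category = (cluster.get("narrative_analysis") or {}).get("dominant_category")
    members = [
        {
            **post,
            "naration": cluster["title"],
            "cluster_summary": cluster.get("core_summary"),
            "category_narration": category,
        }
        for post in cluster.get("members", [])
    ]
    return {**cluster, "members": members}
```

Because `wall_time` is part of the hash input, two runs over the same date range yield different network ids, so a rerun does not overwrite an earlier upload.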
### Narrative Graph Structure
```
@akun_A ──NARATION──► "Klaster: Kritik Kebijakan Fiskal"
                              │
                              └──TARGETED_CATEGORY_NARATION──► "category-kritik"
@akun_B ──MENTION──► @akun_C
@akun_B ──HASHTAG──► #APBN2025
@akun_B ──NARATION──► "Klaster: Dukungan Program Subsidi"
```
### Narration Node (RETS Only)
Nodes with `type = "naration"` carry additional fields:
```json
{
  "id": "Klaster: Kritik Kebijakan Fiskal",
  "type": "naration",
  "contentNetwork": "Klaster ini mendominasi diskusi terkait...",
  "cid": "Klaster: Kritik Kebijakan Fiskal@abc123"
}
```
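A sketch of how one normalized post could yield the narration node and its two RETS-specific edges. The function name is hypothetical, the `category-` prefix is inferred from the `"category-kritik"` example, and the post is assumed to already carry `naration`, `cluster_summary`, and `category_narration`.

```python
def narration_elements(post, network_id):
    """Build the narration node plus the NARATION and category edges for a post."""
    title = post["naration"]
    cid = f"{title}@{network_id}"
    category_id = f"category-{post['category_narration']}"  # inferred naming

    node = {
        "id": title,
        "type": "naration",
        "contentNetwork": post["cluster_summary"],  # text shown by the UI
        "cid": cid,
    }
    edges = [
        {"cid": cid, "source": f"@{post['username']}", "type_source": "account",
         "target": title, "type_target": "naration", "edges_kind": "NARATION"},
        {"cid": cid, "source": title, "type_source": "naration",
         "target": category_id, "type_target": "category",
         "edges_kind": "TARGETED_CATEGORY_NARATION"},
    ]
    return node, edges
```

In a full build the category edge would be emitted once per cluster rather than once per post; deduplication is left out to keep the sketch short.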
---
## Input & Output per Module
### NCW Geostrategic
**Input:**
```python
# No request parameters; only category_network is set
instance = NetworkNcwGeostrategic(category_network="ina")
await instance._run()
```
**Output:**
```json
{
  "network_path": "https://storage.example.com/fusion-network/custom-network/network_abc123.json",
  "raw_data": [ ...relational records... ]
}
```
---
### Kemenkeu Social Media Network
**Input:**
```python
instance = NetworkKemenkeu(
    platform=["twitter", "instagram"],
    start="2024-01-01 00:00:00",
    end="2024-01-07 23:59:59",
    batchSize=10,
    cluster=True,
    account="kemenkeu_group",
    kafprod=kafka_producer
)
await instance._run()
```
**Output:**
```json
{
  "network_path": "https://storage.example.com/.../network_xyz.json",
  "statistics": {
    "criticizing": {
      "cluster_name": "criticizing",
      "member_count": 412,
      "cluster_percentage": "46.20%",
      "total_engagement": 28540.0,
      "total_accounts": 310,
      "total_followers": 8200000,
      "top_accounts_by_followers": [ ... ],
      "top_accounts_by_engagement": [ ... ]
    }
  }
}
```
---
### RETS Narrative Network
**Input:**
```json
{
  "query": {
    "start_date": "2024-01-01",
    "end_date": "2024-01-07",
    "monitoring_objective": "Monitoring Kebijakan Fiskal Q1 2024"
  },
  "unique_clusters": [
    {
      "title": "Kritik Kebijakan Pajak",
      "core_summary": "Klaster ini berisi konten yang mengkritik...",
      "total_member": 245,
      "narrative_analysis": {
        "dominant_category": "kritik"
      },
      "members": [
        {
          "username": "userA",
          "platform": "twitter",
          "type": "post",
          "created_at": "2024-01-03 09:15:00",
          "hashtag": ["APBN2025", "PajakTurun"],
          "mention": [{"username": "kemenkeu_ri", "id": "987"}],
          "naration": null
        }
      ]
    }
  ]
}
```
**Output (type_result="path"):**
```json
{
  "network_path": "https://storage.example.com/.../network_def456.json"
}
```
**Output (type_result="raw"):**
```json
{
  "network_path": "https://...",
  "raw_network": { ...full graph object... }
}
```
---
## Edge Types Catalog
| Edge Kind | Module | Source Type | Target Type | Description |
|---|---|---|---|---|
| `MENTION` | All | account | account | @user mentions another @user |
| `HASHTAG` | Kemenkeu, RETS | account | hashtag | @user uses #tag |
| `COMMENT` | Kemenkeu, RETS | account | account | @user replies to another @user |
| `QUOTED` | RETS | account | account | @user quotes a tweet by @user |
| `SPEECH_ACT` | Kemenkeu | account / intent | intent / account | Links a user to an intent node, and the intent to the monitored accounts |
| `NARATION` | RETS | account | naration | @user is linked to a narrative cluster |
| `TARGETED_CATEGORY_NARATION` | RETS | naration | category | Narrative cluster is linked to its dominant category |
| `{type_source}_{type_target}` | NCW Geo | any | any | Geostrategic entity relationship (derived dynamically from the source data) |
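
The catalog can also be written down as data and used to sanity-check edges before upload. This is a hypothetical helper, not part of the modules: `EDGE_CATALOG` and `is_valid_edge` are illustrative names, and edges whose kind is not a fixed name are treated as NCW edges that must match the dynamic pattern.

```python
# Allowed (source types, target types) per fixed edge kind, from the table.
EDGE_CATALOG = {
    "MENTION": ({"account"}, {"account"}),
    "HASHTAG": ({"account"}, {"hashtag"}),
    "COMMENT": ({"account"}, {"account"}),
    "QUOTED": ({"account"}, {"account"}),
    "SPEECH_ACT": ({"account", "intent"}, {"intent", "account"}),
    "NARATION": ({"account"}, {"naration"}),
    "TARGETED_CATEGORY_NARATION": ({"naration"}, {"category"}),
}


def is_valid_edge(edge):
    """Check an edge's endpoint types against the catalog."""
    kind = edge["edges_kind"]
    if kind in EDGE_CATALOG:
        sources, targets = EDGE_CATALOG[kind]
        return edge["type_source"] in sources and edge["type_target"] in targets
    # NCW edges: the kind is built from the endpoint types themselves.
    return kind == f"{edge['type_source']}_{edge['type_target']}"
```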
---
## Tech Stack
| Component | Library |
|---|---|
| Async runtime | `asyncio` |
| HTTP client | `aiohttp` (via BaseChatCampaign) |
| Elasticsearch client | `ElasticDataFetcher` (internal) |
| Kafka producer | `aiokafka.AIOKafkaProducer` |
| Object storage | S3-compatible (via `s3_database`) |
| Graph formatter | `Network` (internal builder) |
| Logging | `loguru` |
| Config management | `configparser` |
| JSON I/O | `srsly` |
### Internal Base Class (`BaseChatCampaign`)
All modules inherit from `BaseChatCampaign`, which provides:
```
hash_id(str)                → deterministic hash for network/message IDs
s3_database(...)            → upload/download to S3-compatible storage
akafka_producer(...)        → produce messages to a Kafka topic
_requests(url, params, ...) → async HTTP client wrapper
parallel_processing(tasks)  → run a list of coroutines in parallel
```
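Two of these helpers are simple enough to sketch. The versions below are assumptions about behavior, not the real implementations: the hash algorithm, the `limit` parameter, and the semaphore-based throttling are illustrative, and the progress bar mentioned under dependencies is omitted.

```python
import asyncio
import hashlib


def hash_id(text):
    """Deterministic id: the same input always yields the same hex digest."""
    return hashlib.sha256(text.encode("utf-8")).hexdigest()


async def parallel_processing(tasks, limit=10):
    """Run coroutines concurrently, at most `limit` at a time, keeping input order."""
    semaphore = asyncio.Semaphore(limit)

    async def guarded(coro):
        async with semaphore:
            return await coro

    return await asyncio.gather(*(guarded(task) for task in tasks))


async def _demo(x):
    await asyncio.sleep(0)  # stands in for an I/O call
    return x * 2


doubled = asyncio.run(parallel_processing([_demo(i) for i in range(5)], limit=2))
```

`asyncio.gather` returns results in the order the coroutines were passed, so callers can zip results back to their inputs regardless of completion order.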
---
## Folder Structure
```
network-graph-engine/
│
├── readme-github.md
│
├── custom_network/                 ← the three main network modules
│   ├── __init__.py
│   ├── ncw_geostrategic.py         ← NCW Geostrategic
│   ├── kemenkeu_socmed_network.py  ← Kemenkeu Social Media
│   └── rets_network.py             ← RETS Narrative
│
└── tools/                          ← shared infrastructure
    ├── __init__.py                 ← exports BaseChatCampaign, Network, ElasticDataFetcher
    │
    ├── base_model/
    │   ├── __init__.py
    │   └── base.py                 ← BaseChatCampaign (S3, ES, Kafka, HTTP, concurrency)
    │
    ├── network/
    │   ├── __init__.py
    │   └── tool.py                 ← Network.generate_graph_with_clusters()
    │
    ├── get_es/
    │   ├── __init__.py
    │   └── tool.py                 ← ElasticDataFetcher (get_data, get_data_kemenkeu)
    │
    └── utils/
        └── tool.py                 ← FIELD_EDGES, NODE_COLORS, NODE_SHAPES, ESBODY_SMM, etc.
```
---
## Dependencies
```
# Core
loguru # Structured logging
pydantic # Data model & validation (BaseChatCampaign extends BaseModel)
srsly # JSON read/write (ncw_geostrategic)
# Storage
s3fs # S3-compatible object storage client
# HTTP & async
aiohttp # Async HTTP client
# Elasticsearch
elasticsearch # AsyncElasticsearch client
# Graph
networkx # DiGraph, spring layout, degree centrality
pandas # DataFrame for edge lookups in the Network formatter
# Kafka
kafka-python # Sync KafkaProducer (kafka_producer)
aiokafka # Async AIOKafkaProducer (akafka_producer, kemenkeu)
# Database
psycopg2-binary # PostgreSQL driver (insert_postgre)
pymongo # MongoDB client (get_mongo_collection)
neo4j # Neo4j/Memgraph driver (gb_connection, agb_connection)
# Visualisation
matplotlib # DEFAULT_COLORS palette (tab20 colormap)
# Utilities
tqdm # Progress bar in parallel_processing
jmespath # JSON path queries in the Network formatter
```
### Install
```bash
pip install loguru pydantic srsly s3fs aiohttp elasticsearch networkx pandas \
kafka-python aiokafka psycopg2-binary pymongo neo4j matplotlib tqdm jmespath
```