Network Graph Engine
A collection of three Python modules that automatically build social and geostrategic network graphs, from data retrieval, clustering, and node-edge construction through to persistence in object storage.
All three modules are part of the Fusion Campaign API system and share the same architectural pattern: an async pipeline, a graph schema builder, CID resolution via majority vote, and storage in S3-compatible object storage.
Table of Contents
- The Three Network Modules
- Shared Architecture: Patterns Used by All Three
- CID Resolution Algorithm
- NCW Geostrategic Network
- Kemenkeu Social Media Network
- RETS Narrative Network
- Input & Output per Module
- Edge Types Catalog
- Tech Stack
The Three Network Modules
| File | Class | Use Case |
|---|---|---|
| ncw_geostrategic.py | NetworkNcwGeostrategic | Graph of geostrategic relationships between state entities, built from static JSON data |
| kemenkeu_socmed_network.py | NetworkKemenkeu | Social media network graph of Kemenkeu accounts from Elasticsearch, grouped by speech act |
| rets_network.py | NetworkRets | Narrative graph for event tracking (RETS), built from already-processed monitoring cluster data |
Shared Architecture
All three classes inherit from BaseChatCampaign and follow the same pipeline:
```
Input Data
           │
           ▼
┌──────────────────────┐
│ create_relationships │  → Build nodes[] + edges[] from raw data
└──────────┬───────────┘
           │
┌──────────▼───────────┐
│    determine_cid     │  → Each node gets its best cid via majority vote
└──────────┬───────────┘
           │
┌──────────▼───────────────────────────┐
│ Network.generate_graph_with_clusters │  → Format the final network schema
└──────────┬───────────────────────────┘
           │
┌──────────▼───────────┐
│      s3_upsert       │  → Upload the JSON to object storage, return a public URL
└──────────┬───────────┘
           │
           ▼
{ "network_path": "https://..." }
```
Node Structure
```json
{
  "cid": "cluster_name@network_id",
  "id": "@username | #hashtag | intent_label | naration_title",
  "entity_id": "platform_user_id | null",
  "label": "display label",
  "type": "account | hashtag | intent | naration | category-X",
  "timestamp": 1700000000000,
  "created_at": "2024-01-01 10:00:00",
  "typeContent": "post | comment | retweet"
}
```
Edge Structure
```json
{
  "cid": "cluster_name@network_id",
  "post_id": "original_post_id",
  "source": "@username",
  "type_source": "account",
  "target": "@mentioned_user | #hashtag | intent",
  "type_target": "account | hashtag | intent | naration",
  "type_content": "post | comment | retweet",
  "edges_kind": "MENTION | HASHTAG | COMMENT | SPEECH_ACT | NARATION | ...",
  "timestamp": 1700000000000,
  "sentiment": null,
  "statement": "optional annotation text"
}
```
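For type checking or editor support, the node and edge structures above can be expressed as `TypedDict`s. This is a sketch, not code from the modules: the field types are inferred from the example values (for instance, `sentiment` only ever appears as `null`, so `Optional[str]` is a guess), and `total=False` assumes any field may be omitted.

```python
from typing import Optional, TypedDict


class Node(TypedDict, total=False):
    """Graph node, mirroring the documented node structure."""
    cid: str                  # "cluster_name@network_id"
    id: str                   # "@username", "#hashtag", intent label or narration title
    entity_id: Optional[str]  # platform user id, or None
    label: str
    type: str                 # "account" | "hashtag" | "intent" | "naration" | "category-X"
    timestamp: int            # epoch milliseconds
    created_at: str           # "YYYY-MM-DD HH:MM:SS"
    typeContent: str          # "post" | "comment" | "retweet"


class Edge(TypedDict, total=False):
    """Graph edge, mirroring the documented edge structure."""
    cid: str
    post_id: str
    source: str
    type_source: str
    target: str
    type_target: str
    type_content: str
    edges_kind: str           # "MENTION", "HASHTAG", "COMMENT", ...
    timestamp: int
    sentiment: Optional[str]  # always null in the examples; type assumed
    statement: str


# Example: a HASHTAG edge built with only a subset of the fields
edge: Edge = {
    "cid": "criticizing@abc123",
    "source": "@userA",
    "type_source": "account",
    "target": "#APBN2025",
    "type_target": "hashtag",
    "edges_kind": "HASHTAG",
}
```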
CID Resolution Algorithm
All modules use the same algorithm to determine each node's final cluster id (cid):
```
For each node N:
1. Collect all edges that involve N (as source or target)
2. Count how often each cid appears on those edges
3. Pick the cid with the highest count (majority vote)
4. If there is a tie (equal counts):
   → Pick the cid of the node with the most recent created_at
5. If there are no edges (isolated node):
   → Keep the initial cid
```
```python
from collections import defaultdict

# Core implementation (identical in all three modules)
node_cid_count = defaultdict(lambda: defaultdict(int))
for edge in edges:
    for node_id in (edge["source"], edge["target"]):
        node_cid_count[node_id][edge["cid"]] += 1

for node in nodes:
    possible_cids = node_cid_count.get(node["id"], {})
    if possible_cids:
        max_count = max(possible_cids.values())
        best_cids = [cid for cid, cnt in possible_cids.items() if cnt == max_count]
        # Tie-break on created_at (the iterable passed to max() is elided in this excerpt)
        best_cid = max(..., key=lambda x: x["created_at"])["cid"] if len(best_cids) > 1 else best_cids[0]
    else:
        best_cid = node["cid"]  # isolated node: keep the initial cid
```
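The excerpt above leaves the tie-break iterable elided and does not show where `best_cid` is written back. The self-contained sketch below fills both gaps under stated assumptions: on a tie it considers the nodes whose *initial* cid is one of the tied cids and takes the cid of the most recently created one, and it stores the result in `node["cid"]`. The function name `resolve_cids` is hypothetical.

```python
from collections import defaultdict
from typing import Any

Node = dict[str, Any]
Edge = dict[str, Any]


def resolve_cids(nodes: list[Node], edges: list[Edge]) -> list[Node]:
    """Majority-vote cid per node; ties broken by the newest candidate node (assumed rule)."""
    # How often each cid appears on the edges touching each node id
    node_cid_count: dict[str, dict[str, int]] = defaultdict(lambda: defaultdict(int))
    for edge in edges:
        for node_id in (edge["source"], edge["target"]):
            node_cid_count[node_id][edge["cid"]] += 1

    # Snapshot of initial cids so tie-breaking is not affected by updates made in the loop
    initial = [(n["cid"], n.get("created_at", "")) for n in nodes]

    for node in nodes:
        possible_cids = node_cid_count.get(node["id"])
        if not possible_cids:
            continue  # isolated node: keep its initial cid
        max_count = max(possible_cids.values())
        best_cids = [cid for cid, cnt in possible_cids.items() if cnt == max_count]
        if len(best_cids) == 1:
            node["cid"] = best_cids[0]
            continue
        # Tie: cid carried by the most recently created node among the tied cids
        candidates = [(created, cid) for cid, created in initial if cid in best_cids]
        node["cid"] = max(candidates)[1] if candidates else best_cids[0]
    return nodes


nodes = [
    {"id": "@a", "cid": "c1@n", "created_at": "2024-01-01 10:00:00"},
    {"id": "@b", "cid": "c2@n", "created_at": "2024-01-02 10:00:00"},
    {"id": "#x", "cid": "c1@n", "created_at": "2024-01-01 09:00:00"},
    {"id": "@z", "cid": "c9@n", "created_at": "2024-01-03 08:00:00"},  # isolated
]
edges = [
    {"source": "@a", "target": "#x", "cid": "c1@n"},
    {"source": "@b", "target": "#x", "cid": "c2@n"},
]
resolved = {n["id"]: n["cid"] for n in resolve_cids(nodes, edges)}
print(resolved)  # {'@a': 'c1@n', '@b': 'c2@n', '#x': 'c2@n', '@z': 'c9@n'}
```

Here `#x` ties between `c1@n` and `c2@n`; the tie goes to `c2@n` because `@b` is the newest node carrying one of the tied cids, while the isolated `@z` keeps its initial cid. Because `created_at` uses the `YYYY-MM-DD HH:MM:SS` format, plain string comparison orders it chronologically.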
1. NCW Geostrategic Network
File: ncw_geostrategic.py
Builds a graph of geostrategic relationships between entities (countries, organizations, individuals) from curated data stored in one JSON file per region/country category.
Pipeline Flow
```
Choose category_network (e.g. "ina", "fra", "india", ...)
    │
    ▼
Read the JSON file:
  data_network_raw_json/{category}_ncw_geostrategic.json
    │
    ▼
create_relationships()
  ├─ Each record → 1 edge (source → target)
  ├─ Nodes deduplicated by label
  └─ Node type: type_source / type_target from the raw data
    │
    ▼
determine_cid()
  └─ All nodes go into cluster_0 (single cluster, static data)
    │
    ▼
Network.generate_graph_with_clusters()
    │
    ▼
s3_upsert() → public URL
```
Supported Categories
recap | ina | fra | india | ken | lka | mar | mdg | mng |
phl | pry | tgo | npl | tur | tls | per | bgd
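A minimal sketch of how a category could be validated and mapped to its source file, assuming the `data_network_raw_json/{category}_ncw_geostrategic.json` naming shown in the pipeline. The helper `load_ncw_records` and the `base_dir` parameter are hypothetical, and the stdlib `json` module stands in for `srsly`, which the module reportedly uses.

```python
import json
from pathlib import Path

# The categories listed above
SUPPORTED_CATEGORIES = {
    "recap", "ina", "fra", "india", "ken", "lka", "mar", "mdg", "mng",
    "phl", "pry", "tgo", "npl", "tur", "tls", "per", "bgd",
}


def load_ncw_records(category: str, base_dir: Path = Path("data_network_raw_json")) -> list[dict]:
    """Hypothetical helper: validate the category, then read its source JSON records."""
    if category not in SUPPORTED_CATEGORIES:
        raise ValueError(f"unsupported category_network: {category!r}")
    path = base_dir / f"{category}_ncw_geostrategic.json"
    # stdlib json used here instead of srsly
    return json.loads(path.read_text(encoding="utf-8"))
```

Rejecting unknown categories up front gives a clear error instead of a missing-file exception deeper in the pipeline.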
Source JSON Data Format
```json
[
  {
    "source": "Indonesia",
    "type_source": "country",
    "target": "ASEAN",
    "type_target": "organization",
    "created_at": "2024-01-15",
    "source_id": "idn_001",
    "terget_id": "asean_001",
    "note": "Member state, founding member"
  }
]
```
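The `create_relationships()` step for NCW (one edge per record, nodes deduplicated by label, everything in `cluster_0`) can be sketched as follows. The function name `build_ncw_graph` is hypothetical; the edge kind follows the `{type_source}_{type_target}` pattern from the Edge Types Catalog, and mapping `note` to the edge's `statement` field is an assumption.

```python
def build_ncw_graph(records: list[dict], network_id: str) -> tuple[list[dict], list[dict]]:
    """Sketch: one edge per record, nodes deduplicated by label, single cluster."""
    cid = f"cluster_0@{network_id}"  # every node and edge lands in cluster_0
    nodes: dict[str, dict] = {}      # keyed by label for deduplication
    edges: list[dict] = []
    for rec in records:
        for label_key, type_key in (("source", "type_source"), ("target", "type_target")):
            label = rec[label_key]
            if label not in nodes:
                nodes[label] = {
                    "cid": cid,
                    "id": label,
                    "label": label,
                    "type": rec[type_key],  # node type comes straight from the raw data
                    "created_at": rec.get("created_at"),
                }
        edges.append({
            "cid": cid,
            "source": rec["source"],
            "type_source": rec["type_source"],
            "target": rec["target"],
            "type_target": rec["type_target"],
            # dynamic edge kind, e.g. "country_organization"
            "edges_kind": f"{rec['type_source']}_{rec['type_target']}",
            "statement": rec.get("note"),  # assumption: note becomes the edge annotation
        })
    return list(nodes.values()), edges
```

For two records that both point at `ASEAN`, the result has three nodes (the shared target is kept once) and two edges.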
2. Kemenkeu Social Media Network
File: kemenkeu_socmed_network.py
Social media network graph for monitoring Kemenkeu (Ministry of Finance) accounts. It fetches real-time data from Elasticsearch, groups posts by speech act, and builds interaction links between accounts.
Pipeline Flow
```
AMS API → get_map_account()
  └─ List of Kemenkeu accounts per platform (twitter, ig, fb, yt, tiktok)
    │
    ▼
pre_clustering()  [per platform, in parallel]
  ├─ ElasticDataFetcher.get_data_kemenkeu()
  │    → Posts within the start-end range
  └─ produce_profiling_accounts()
       → Kafka: send a profiling job per unique user_id
    │
    ▼
cluster_speech_act()
  └─ Group posts by slm.speech_act
     example clusters: "questioning", "supporting", "criticizing", "undefined"
    │
    ▼
create_relationships()  [per cluster]
  ├─ MENTION    : @user → @mentioned
  ├─ HASHTAG    : @user → #tag
  ├─ COMMENT    : @user → @replied (only if not a Kemenkeu account)
  └─ SPEECH_ACT : @user → intent, intent → all Kemenkeu accounts
    │
    ▼
determine_cid() + statistics_cluster()
    │
    ▼
Network.generate_graph_with_clusters()
    │
    ▼
s3_upsert() → { network_path, statistics }
```
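The grouping and per-cluster edge construction above can be sketched as follows. Assumptions, all hypothetical: posts carry `username`, `slm.speech_act`, `mention` (a list of `{"username": ...}`) and `hashtag` (a list of strings), as in the RETS input example; the reply target lives in a `reply_to` field; the intent node id is the cluster name; and the COMMENT rule is read as "skip when the replied-to account belongs to Kemenkeu".

```python
from collections import defaultdict


def cluster_by_speech_act(posts: list[dict]) -> dict[str, list[dict]]:
    """Group posts by slm.speech_act; posts without one go to "undefined"."""
    clusters: dict[str, list[dict]] = defaultdict(list)
    for post in posts:
        act = (post.get("slm") or {}).get("speech_act") or "undefined"
        clusters[act].append(post)
    return dict(clusters)


def build_edges(cluster: str, posts: list[dict], kemenkeu_accounts: set[str], network_id: str) -> list[dict]:
    """Sketch of the four Kemenkeu edge kinds for one speech-act cluster."""
    cid = f"{cluster}@{network_id}"
    edges: list[dict] = []

    def add(source: str, type_source: str, target: str, type_target: str, kind: str) -> None:
        edges.append({"cid": cid, "source": source, "type_source": type_source,
                      "target": target, "type_target": type_target, "edges_kind": kind})

    for post in posts:
        user = f"@{post['username']}"
        for m in post.get("mention") or []:
            add(user, "account", f"@{m['username']}", "account", "MENTION")
        for tag in post.get("hashtag") or []:
            add(user, "account", f"#{tag}", "hashtag", "HASHTAG")
        replied = post.get("reply_to")  # hypothetical field name
        if replied and replied not in kemenkeu_accounts:
            add(user, "account", f"@{replied}", "account", "COMMENT")
        add(user, "account", cluster, "intent", "SPEECH_ACT")  # user -> intent node
    # intent node -> every monitored Kemenkeu account (once per cluster)
    for acc in sorted(kemenkeu_accounts):
        add(cluster, "intent", f"@{acc}", "account", "SPEECH_ACT")
    return edges
```

Routing every cluster's intent node to all monitored accounts means that, in the rendered graph, each speech-act group visibly points at the accounts it is talking to, even when individual posts never mention them.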
Per-Cluster Statistics
Each cluster produces:
```json
{
  "cluster_name": "criticizing",
  "total_engagement": 15420.0,
  "total_accounts": 238,
  "total_comments": 892,
  "total_followers": 4500000,
  "first_account": { "username": "...", "created_at": "...", "text": "..." },
  "top_accounts_by_followers": [ ...5 accounts... ],
  "top_accounts_by_engagement": [ ...5 posts... ]
}
```
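A sketch of how these statistics could be computed from a cluster's posts. The function `summarize_cluster` is hypothetical and is not the module's `statistics_cluster()`; the post fields `username`, `created_at`, `engagement`, `followers`, `type` and `text` are assumptions.

```python
def summarize_cluster(cluster_name: str, posts: list[dict], top_n: int = 5) -> dict:
    """Per-cluster statistics in the shape shown above (post field names assumed)."""
    accounts: dict[str, dict] = {}
    for post in posts:
        # keep one record per account for follower totals and rankings
        accounts.setdefault(post["username"], post)
    first = min(posts, key=lambda p: p["created_at"]) if posts else None
    return {
        "cluster_name": cluster_name,
        "total_engagement": float(sum(p.get("engagement", 0) for p in posts)),
        "total_accounts": len(accounts),
        "total_comments": sum(1 for p in posts if p.get("type") == "comment"),
        "total_followers": sum(a.get("followers", 0) for a in accounts.values()),
        "first_account": (
            {"username": first["username"], "created_at": first["created_at"], "text": first.get("text", "")}
            if first else None
        ),
        # top accounts ranked by follower count
        "top_accounts_by_followers": sorted(
            accounts.values(), key=lambda a: a.get("followers", 0), reverse=True)[:top_n],
        # top individual posts ranked by engagement
        "top_accounts_by_engagement": sorted(
            posts, key=lambda p: p.get("engagement", 0), reverse=True)[:top_n],
    }
```

Counting followers once per account (rather than once per post) avoids inflating `total_followers` for accounts that post many times in the same cluster.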
Kafka Profiling Event
For every unique user_id found, the system sends a message to the profiling topic:
```json
{
  "user_id": "12345678",
  "created_at": "2024-01-15 10:30:00",
  "timestamp": 1705315800000,
  "id": "<hash_id>",
  "profiling_type": "account",
  "source": "custom network kemenkeu"
}
```
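A sketch of building one profiling event per unique `user_id` and publishing it with an already-started `aiokafka.AIOKafkaProducer`. Assumptions: `created_at` is UTC in `YYYY-MM-DD HH:MM:SS` format and `timestamp` is derived from it; `hashlib.md5` over `user_id + created_at` stands in for `BaseChatCampaign.hash_id`, whose real input and algorithm are not documented here. The function names are hypothetical.

```python
import hashlib
import json
from datetime import datetime, timezone
from typing import Any


def build_profiling_events(posts: list[dict]) -> list[dict]:
    """One profiling event per unique user_id, shaped like the message above."""
    events: list[dict] = []
    seen: set[str] = set()
    for post in posts:
        user_id = str(post["user_id"])
        if user_id in seen:
            continue  # only the first post of each user produces an event
        seen.add(user_id)
        created_at = post["created_at"]
        # assumption: created_at is UTC
        dt = datetime.strptime(created_at, "%Y-%m-%d %H:%M:%S").replace(tzinfo=timezone.utc)
        events.append({
            "user_id": user_id,
            "created_at": created_at,
            "timestamp": int(dt.timestamp() * 1000),  # epoch milliseconds
            # stand-in for BaseChatCampaign.hash_id
            "id": hashlib.md5(f"{user_id}{created_at}".encode("utf-8")).hexdigest(),
            "profiling_type": "account",
            "source": "custom network kemenkeu",
        })
    return events


async def send_profiling_events(producer: Any, topic: str, events: list[dict]) -> None:
    """Publish each event as UTF-8 JSON via an already-started AIOKafkaProducer."""
    for event in events:
        await producer.send_and_wait(topic, json.dumps(event).encode("utf-8"))
```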
3. RETS Narrative Network
File: rets_network.py
Narrative graph for the Real-time Event Tracking System. Unlike the other two modules, RETS builds a narrative structure that links social media accounts to narrative clusters already analyzed by the upstream clustering pipeline.
Pipeline Flow
```
Input: { query, unique_clusters[] }
    │
    ▼
Generate network_id = hash(start_date + end_date + wall_time)
    │
    ▼
Normalize each cluster:
  Inject into every member post:
  ├─ naration           ← cluster.title
  ├─ cluster_summary    ← cluster.core_summary
  └─ category_narration ← cluster.narrative_analysis.dominant_category
    │
    ▼
create_relationships()  [per cluster]
  ├─ MENTION  : @user → @mentioned
  ├─ HASHTAG  : @user → #tag
  ├─ QUOTED   : @user → @quoted_user
  ├─ COMMENT  : @user → @replied
  ├─ NARATION : @user → naration_node
  │    naration_node.contentNetwork = cluster_summary (for UI rendering)
  └─ TARGETED_CATEGORY_NARATION : naration_node → category_node
    │
    ▼
Merge all schemas → determine_cid()
    │
    ▼
Network.generate_graph_with_clusters()
    │
    ▼
s3_upsert() → { network_path } or { network_path, raw_network }
```
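The first two RETS steps (network id generation and cluster normalization) can be sketched as below. `hashlib.md5` is a stand-in for the module's hash function, and `make_network_id` / `normalize_clusters` are hypothetical names; the injected field names come from the pipeline above.

```python
import hashlib
import time
from copy import deepcopy


def make_network_id(start_date: str, end_date: str) -> str:
    """network_id = hash(start_date + end_date + wall_time); md5 is a stand-in hash."""
    raw = f"{start_date}{end_date}{time.time()}"
    return hashlib.md5(raw.encode("utf-8")).hexdigest()


def normalize_clusters(unique_clusters: list[dict]) -> list[dict]:
    """Copy each cluster and inject its narrative fields into every member post."""
    normalized = []
    for cluster in unique_clusters:
        cluster = deepcopy(cluster)  # leave the caller's input untouched
        category = (cluster.get("narrative_analysis") or {}).get("dominant_category")
        for member in cluster.get("members", []):
            member["naration"] = cluster["title"]
            member["cluster_summary"] = cluster.get("core_summary")
            member["category_narration"] = category
        normalized.append(cluster)
    return normalized
```

Including the wall-clock time in the hash means two runs over the same date range get different network ids, so each run writes its own network file.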
Narrative Graph Structure
```
@akun_A ──NARATION──► "Klaster: Kritik Kebijakan Fiskal"
                        │
                        └──TARGETED_CATEGORY_NARATION──► "category-kritik"
@akun_B ──MENTION──► @akun_C
@akun_B ──HASHTAG──► #APBN2025
@akun_B ──NARATION──► "Klaster: Dukungan Program Subsidi"
```
Narration Node (RETS only)
A node with type = "naration" stores an additional field, contentNetwork:
```json
{
  "id": "Klaster: Kritik Kebijakan Fiskal",
  "type": "naration",
  "contentNetwork": "Klaster ini mendominasi diskusi terkait...",
  "cid": "Klaster: Kritik Kebijakan Fiskal@abc123"
}
```
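A sketch of the RETS-specific nodes and edges produced for one normalized member post. The function `narration_edges` is hypothetical. The cid follows the `title@network_id` pattern of the example node, the category id follows `category-<dominant_category>` as in the graph diagram, and using that id as the category node's `type` (the `category-X` pattern from the node structure) is an assumption.

```python
def narration_edges(member: dict, network_id: str) -> tuple[list[dict], list[dict]]:
    """NARATION and TARGETED_CATEGORY_NARATION nodes/edges for one member post."""
    title = member["naration"]
    cid = f"{title}@{network_id}"  # "title@network_id", as in the example node
    category_id = f"category-{member['category_narration']}"
    user = f"@{member['username']}"
    nodes = [
        # narration node carries the cluster summary for UI rendering
        {"id": title, "type": "naration", "contentNetwork": member.get("cluster_summary"), "cid": cid},
        {"id": category_id, "type": category_id, "cid": cid},  # assumed "category-X" type
    ]
    edges = [
        {"cid": cid, "source": user, "type_source": "account",
         "target": title, "type_target": "naration", "edges_kind": "NARATION"},
        {"cid": cid, "source": title, "type_source": "naration",
         "target": category_id, "type_target": "category", "edges_kind": "TARGETED_CATEGORY_NARATION"},
    ]
    return nodes, edges
```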
Input & Output per Module
NCW Geostrategic
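Input:
```python
from custom_network.ncw_geostrategic import NetworkNcwGeostrategic

# No request parameters; only category_network is set
instance = NetworkNcwGeostrategic(category_network="ina")
await instance._run()  # _run() is a coroutine: call it from an async context
```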
Output:
```json
{
  "network_path": "https://storage.example.com/fusion-network/custom-network/network_abc123.json",
  "raw_data": [ ...relational records... ]
}
```
Kemenkeu Social Media Network
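Input:
```python
from custom_network.kemenkeu_socmed_network import NetworkKemenkeu

# kafka_producer: an already-started aiokafka.AIOKafkaProducer
instance = NetworkKemenkeu(
    platform=["twitter", "instagram"],
    start="2024-01-01 00:00:00",
    end="2024-01-07 23:59:59",
    batchSize=10,
    cluster=True,
    account="kemenkeu_group",
    kafprod=kafka_producer,
)
await instance._run()  # call from an async context
```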
Output:
```json
{
  "network_path": "https://storage.example.com/.../network_xyz.json",
  "statistics": {
    "criticizing": {
      "cluster_name": "criticizing",
      "member_count": 412,
      "cluster_percentage": "46.20%",
      "total_engagement": 28540.0,
      "total_accounts": 310,
      "total_followers": 8200000,
      "top_accounts_by_followers": [ ... ],
      "top_accounts_by_engagement": [ ... ]
    }
  }
}
```
RETS Narrative Network
Input:
```json
{
  "query": {
    "start_date": "2024-01-01",
    "end_date": "2024-01-07",
    "monitoring_objective": "Monitoring Kebijakan Fiskal Q1 2024"
  },
  "unique_clusters": [
    {
      "title": "Kritik Kebijakan Pajak",
      "core_summary": "Klaster ini berisi konten yang mengkritik...",
      "total_member": 245,
      "narrative_analysis": {
        "dominant_category": "kritik"
      },
      "members": [
        {
          "username": "userA",
          "platform": "twitter",
          "type": "post",
          "created_at": "2024-01-03 09:15:00",
          "hashtag": ["APBN2025", "PajakTurun"],
          "mention": [{"username": "kemenkeu_ri", "id": "987"}],
          "naration": null
        }
      ]
    }
  ]
}
```
Output (type_result="path"):
```json
{
  "network_path": "https://storage.example.com/.../network_def456.json"
}
```
Output (type_result="raw"):
```json
{
  "network_path": "https://...",
  "raw_network": { ...full graph object... }
}
```
Edge Types Catalog
| Edge Kind | Module | Source Type | Target Type | Description |
|---|---|---|---|---|
| MENTION | Kemenkeu, RETS | account | account | @user mentions another @user |
| HASHTAG | Kemenkeu, RETS | account | hashtag | @user uses #tag |
| COMMENT | Kemenkeu, RETS | account | account | @user replies to another @user |
| QUOTED | RETS | account | account | @user quotes a post by another @user |
| SPEECH_ACT | Kemenkeu | account / intent | intent / account | Links users to the intent node, and the intent node to the monitored accounts |
| NARATION | RETS | account | naration | @user is linked to a narrative cluster |
| TARGETED_CATEGORY_NARATION | RETS | naration | category | A narrative cluster is linked to its dominant category |
| `{type_source}_{type_target}` | NCW Geo | any | any | Geostrategic entity relationship (dynamic, taken from the source data) |
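The catalog can also be encoded as data so that generated edges are checked against it. This is a hypothetical consistency check, not part of the modules: each fixed kind maps to its allowed (source type, target type) pairs, SPEECH_ACT allows only account to intent and intent to account, and any unknown kind is treated as the NCW dynamic pattern (its exact casing is assumed to match the raw type strings).

```python
# Allowed (type_source, type_target) pairs per fixed edge kind, from the table above
EDGE_CATALOG: dict[str, set[tuple[str, str]]] = {
    "MENTION": {("account", "account")},
    "HASHTAG": {("account", "hashtag")},
    "COMMENT": {("account", "account")},
    "QUOTED": {("account", "account")},
    "SPEECH_ACT": {("account", "intent"), ("intent", "account")},
    "NARATION": {("account", "naration")},
    "TARGETED_CATEGORY_NARATION": {("naration", "category")},
}


def edge_matches_catalog(edge: dict) -> bool:
    """True if the edge's kind is consistent with its source/target types."""
    kind = edge["edges_kind"]
    pair = (edge["type_source"], edge["type_target"])
    if kind not in EDGE_CATALOG:
        # NCW Geo: dynamic "{type_source}_{type_target}" kind
        return kind == f"{pair[0]}_{pair[1]}"
    return pair in EDGE_CATALOG[kind]
```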
Tech Stack
| Komponen | Library |
|---|---|
| Async runtime | asyncio |
| HTTP client | aiohttp (via BaseChatCampaign) |
| Elasticsearch client | ElasticDataFetcher (internal) |
| Kafka producer | aiokafka.AIOKafkaProducer |
| Object storage | S3-compatible (via s3_database) |
| Graph formatter | Network (internal builder) |
| Logging | loguru |
| Config management | configparser |
| JSON I/O | srsly |
Internal Base Class (BaseChatCampaign)
All modules inherit from BaseChatCampaign, which provides:
- hash_id(str): deterministic hash for network/message IDs
- s3_database(...): upload to / download from S3-compatible storage
- akafka_producer(...): produce messages to a Kafka topic
- _requests(url, params, ...): async HTTP client wrapper
- parallel_processing(tasks): run a list of coroutines in parallel
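Minimal stand-ins for two of these helpers, useful for testing pipeline code outside the base class. The real implementations live in `tools/base_model/base.py` and are not shown here: md5 is assumed as the deterministic hash, and the tqdm progress bar that the real `parallel_processing` reportedly uses is omitted.

```python
import asyncio
import hashlib
from typing import Any, Awaitable


def hash_id(value: str) -> str:
    """Deterministic: the same input always yields the same id (md5 assumed)."""
    return hashlib.md5(value.encode("utf-8")).hexdigest()


async def parallel_processing(tasks: list[Awaitable[Any]]) -> list[Any]:
    """Run coroutines concurrently; results come back in the order of `tasks`."""
    return await asyncio.gather(*tasks)
```

`asyncio.gather` preserves input order in its result list regardless of which coroutine finishes first, which keeps per-platform or per-cluster results aligned with their inputs.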
Folder Structure
```
network-graph-engine/
│
├── readme-github.md
│
├── custom_network/                    ← the three main network modules
│   ├── __init__.py
│   ├── ncw_geostrategic.py            ← NCW Geostrategic
│   ├── kemenkeu_socmed_network.py     ← Kemenkeu Social Media
│   └── rets_network.py                ← RETS Narrative
│
└── tools/                             ← shared infrastructure
    ├── __init__.py                    ← exports BaseChatCampaign, Network, ElasticDataFetcher
    │
    ├── base_model/
    │   ├── __init__.py
    │   └── base.py                    ← BaseChatCampaign (S3, ES, Kafka, HTTP, concurrency)
    │
    ├── network/
    │   ├── __init__.py
    │   └── tool.py                    ← Network.generate_graph_with_clusters()
    │
    ├── get_es/
    │   ├── __init__.py
    │   └── tool.py                    ← ElasticDataFetcher (get_data, get_data_kemenkeu)
    │
    └── utils/
        └── tool.py                    ← FIELD_EDGES, NODE_COLORS, NODE_SHAPES, ESBODY_SMM, etc.
```
Dependencies
```
# Core
loguru            # Structured logging
pydantic          # Data models & validation (BaseChatCampaign extends BaseModel)
srsly             # JSON read/write (ncw_geostrategic)

# Storage
s3fs              # S3-compatible object storage client

# HTTP & async
aiohttp           # Async HTTP client

# Elasticsearch
elasticsearch     # AsyncElasticsearch client

# Graph
networkx          # DiGraph, spring layout, degree centrality
pandas            # DataFrame for edge lookups in the Network formatter

# Kafka
kafka-python      # Sync KafkaProducer (kafka_producer)
aiokafka          # Async AIOKafkaProducer (akafka_producer, kemenkeu)

# Database
psycopg2-binary   # PostgreSQL driver (insert_postgre)
pymongo           # MongoDB client (get_mongo_collection)
neo4j             # Neo4j/Memgraph driver (gb_connection, agb_connection)

# Visualisation
matplotlib        # DEFAULT_COLORS palette (tab20 colormap)

# Utilities
tqdm              # Progress bar in parallel_processing
jmespath          # JSON path queries in the Network formatter
```
Install
```bash
pip install loguru pydantic srsly s3fs aiohttp elasticsearch networkx pandas \
  kafka-python aiokafka psycopg2-binary pymongo neo4j matplotlib tqdm jmespath
```