Dita Shahihah
Network Graph Engine + standalone demo (sample data + analytics + interactive viz)
Network Graph Engine

A collection of three Python modules for automatically building social and geostrategic network graphs: data retrieval, clustering, node-edge construction, and persistence to object storage.

All three modules are part of the Fusion Campaign API system and share the same architectural pattern: an async pipeline, a graph schema builder, CID resolution via majority vote, and persistence to S3-compatible storage.



The Three Network Modules

| File | Class | Use Case |
|---|---|---|
| ncw_geostrategic.py | NetworkNcwGeostrategic | Graph of geostrategic relationships between country entities, built from static JSON data |
| kemenkeu_socmed_network.py | NetworkKemenkeu | Social media network graph of Kemenkeu accounts from Elasticsearch, grouped by speech-act |
| rets_network.py | NetworkRets | Narrative graph for event tracking (RETS), built from already-processed monitoring cluster data |

Shared Architecture

All three classes inherit from BaseChatCampaign and follow the same pipeline:

Input Data
    │
    ▼
┌──────────────────────┐
│ create_relationships │  → Build nodes[] + edges[] from raw data
└──────────┬───────────┘
           │
    ┌──────▼────────┐
    │ determine_cid │  → Each node gets its best cid via majority vote
    └──────┬────────┘
           │
    ┌──────▼───────────────────────────────┐
    │ Network.generate_graph_with_clusters │  → Final network schema format
    └──────┬───────────────────────────────┘
           │
    ┌──────▼──────┐
    │  s3_upsert  │  → Upload JSON to object storage, return public URL
    └──────┬──────┘
           │
    { "network_path": "https://..." }

Node Structure

{
  "cid": "cluster_name@network_id",
  "id": "@username | #hashtag | intent_label | naration_title",
  "entity_id": "platform_user_id | null",
  "label": "display label",
  "type": "account | hashtag | intent | naration | category-X",
  "timestamp": 1700000000000,
  "created_at": "2024-01-01 10:00:00",
  "typeContent": "post | comment | retweet"
}

Edge Structure

{
  "cid": "cluster_name@network_id",
  "post_id": "original_post_id",
  "source": "@username",
  "type_source": "account",
  "target": "@mentioned_user | #hashtag | intent",
  "type_target": "account | hashtag | intent | naration",
  "type_content": "post | comment | retweet",
  "edges_kind": "MENTION | HASHTAG | COMMENT | SPEECH_ACT | NARATION | ...",
  "timestamp": 1700000000000,
  "sentiment": null,
  "statement": "optional annotation text"
}
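
As an illustration of the edge structure above, here is a minimal sketch of how one MENTION edge might be assembled from a post. The helper name `make_mention_edge` and the input post fields (`id`, `username`, `type`, `timestamp`) are hypothetical and are not taken from the modules themselves.

```python
from typing import Any


def make_mention_edge(post: dict[str, Any], mentioned: str, cid: str) -> dict[str, Any]:
    """Build one MENTION edge dict in the shape of the edge structure above."""
    return {
        "cid": cid,
        "post_id": post["id"],
        "source": f"@{post['username']}",
        "type_source": "account",
        "target": f"@{mentioned}",
        "type_target": "account",
        "type_content": post["type"],
        "edges_kind": "MENTION",
        "timestamp": post["timestamp"],
        "sentiment": None,
        "statement": None,
    }


# Hypothetical post record, used only for this example.
post = {"id": "p1", "username": "userA", "type": "post", "timestamp": 1700000000000}
edge = make_mention_edge(post, "kemenkeu_ri", "criticizing@abc123")
print(edge["source"], "->", edge["target"])
```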

CID Resolution Algorithm

All modules use the same algorithm to determine the final cluster id (cid) of each node:

For each node N:
  1. Collect all edges that involve N (as source or target)
  2. Count how often each cid appears on those edges
  3. Pick the cid with the highest frequency (majority vote)
  4. On a tie (equal frequency):
       → Pick the cid from the node with the most recent created_at
  5. If there are no edges (isolated node):
       → Keep the initial cid

# Core implementation (identical in all three modules)
from collections import defaultdict

# node id -> {cid -> number of incident edges carrying that cid}
node_cid_count = defaultdict(lambda: defaultdict(int))
for edge in edges:
    for node_id in (edge["source"], edge["target"]):
        node_cid_count[node_id][edge["cid"]] += 1

for node in nodes:
    possible_cids = node_cid_count.get(node["id"], {})
    if possible_cids:
        max_count = max(possible_cids.values())
        best_cids = [cid for cid, cnt in possible_cids.items() if cnt == max_count]
        # Tie-break on the most recent created_at; the candidate collection is elided ("...") in this excerpt
        best_cid = max(..., key=lambda x: x["created_at"])["cid"] if len(best_cids) > 1 else best_cids[0]
    else:
        # Isolated node: keep its initial cid
        best_cid = node["cid"]
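
The excerpt above leaves the tie-break candidates elided, so the following is only a runnable sketch under one stated assumption: the same node id may appear as several node records (one per cluster), and a tie is resolved by picking, among those records whose cid is tied, the one with the latest `created_at`. The function name, the returned mapping, and the alphabetical fallback are choices made for this illustration.

```python
from collections import defaultdict


def determine_cid(nodes, edges):
    """Return {node_id: best_cid} using a majority vote over incident edges."""
    votes = defaultdict(lambda: defaultdict(int))
    for edge in edges:
        for node_id in (edge["source"], edge["target"]):
            votes[node_id][edge["cid"]] += 1

    # Assumption: one id can have several node records, one per cluster.
    records = defaultdict(list)
    for node in nodes:
        records[node["id"]].append(node)

    result = {}
    for node_id, recs in records.items():
        counts = votes.get(node_id)
        if not counts:
            # Isolated node: keep the initial cid of its first record.
            result[node_id] = recs[0]["cid"]
            continue
        top = max(counts.values())
        tied = [cid for cid, n in counts.items() if n == top]
        if len(tied) == 1:
            result[node_id] = tied[0]
            continue
        # Tie: prefer the tied cid whose node record is the most recent.
        # "YYYY-MM-DD HH:MM:SS" strings sort chronologically as plain text.
        candidates = [r for r in recs if r["cid"] in tied]
        if candidates:
            result[node_id] = max(candidates, key=lambda r: r["created_at"])["cid"]
        else:
            result[node_id] = sorted(tied)[0]  # deterministic fallback (assumption)
    return result


nodes = [
    {"id": "@a", "cid": "c1@n", "created_at": "2024-01-01 10:00:00"},
    {"id": "@a", "cid": "c2@n", "created_at": "2024-01-02 10:00:00"},
    {"id": "@b", "cid": "c1@n", "created_at": "2024-01-01 09:00:00"},
    {"id": "#x", "cid": "c2@n", "created_at": "2024-01-01 09:00:00"},
    {"id": "@lonely", "cid": "c3@n", "created_at": "2024-01-01 08:00:00"},
]
edges = [
    {"source": "@a", "target": "@b", "cid": "c1@n"},
    {"source": "@a", "target": "#x", "cid": "c2@n"},
    {"source": "@b", "target": "#x", "cid": "c1@n"},
]
result = determine_cid(nodes, edges)
```

In this sample, `@a` is tied between `c1@n` and `c2@n` and ends up in `c2@n` because that record is newer, while `@lonely` has no edges and keeps `c3@n`.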

1. NCW Geostrategic Network

File: ncw_geostrategic.py

Builds a graph of geostrategic relationships between entities (countries, organizations, individuals) from curated data stored in one JSON file per region/country category.

Pipeline Flow

Select category_network (e.g. "ina", "fra", "india", ...)
    │
    ▼
Read the JSON file:
  data_network_raw_json/{category}_ncw_geostrategic.json
    │
    ▼
create_relationships()
  ├─ Each record → 1 edge (source → target)
  ├─ Nodes are deduplicated by label
  └─ Node type: type_source / type_target from the raw data
    │
    ▼
determine_cid()
  └─ All nodes go into cluster_0 (single cluster, static data)
    │
    ▼
Network.generate_graph_with_clusters()
    │
    ▼
s3_upsert() → public URL

Supported Categories

recap | ina | fra | india | ken | lka | mar | mdg | mng |
phl   | pry | tgo | npl   | tur | tls | per | bgd

Source JSON Data Format

[
  {
    "source": "Indonesia",
    "type_source": "country",
    "target": "ASEAN",
    "type_target": "organization",
    "created_at": "2024-01-15",
    "source_id": "idn_001",
    "terget_id": "asean_001",
    "note": "Member state, founding member"
  }
]
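
To make the record-to-graph step concrete, here is a minimal sketch of how records in this format might be turned into nodes and edges: one edge per record, nodes deduplicated by label, and the edge kind derived as `{type_source}_{type_target}`. The function body, the plain `cluster_0` cid, and the mapping of `note` to `statement` are assumptions for illustration, not the module's actual code.

```python
def create_relationships(records, cid="cluster_0"):
    """Turn relational records into (nodes, edges); nodes are deduplicated by label."""
    nodes, edges, seen = [], [], set()
    for rec in records:
        for side in ("source", "target"):
            label = rec[side]
            if label not in seen:
                seen.add(label)
                nodes.append({
                    "cid": cid,
                    "id": label,
                    "label": label,
                    "type": rec[f"type_{side}"],
                    "created_at": rec.get("created_at"),
                })
        edges.append({
            "cid": cid,
            "source": rec["source"],
            "type_source": rec["type_source"],
            "target": rec["target"],
            "type_target": rec["type_target"],
            # Edge kind is built dynamically from the two entity types.
            "edges_kind": f"{rec['type_source']}_{rec['type_target']}",
            "statement": rec.get("note"),  # assumption: note becomes the annotation
        })
    return nodes, edges


records = [
    {"source": "Indonesia", "type_source": "country", "target": "ASEAN",
     "type_target": "organization", "created_at": "2024-01-15",
     "note": "Member state, founding member"},
    {"source": "Indonesia", "type_source": "country", "target": "G20",
     "type_target": "organization", "created_at": "2024-01-15", "note": "Member"},
]
ncw_nodes, ncw_edges = create_relationships(records)
```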

2. Kemenkeu Social Media Network

File: kemenkeu_socmed_network.py

Social media network graph for monitoring Kemenkeu (Ministry of Finance) accounts. It pulls real-time data from Elasticsearch, groups posts by speech-act, and builds interaction links between accounts.

Pipeline Flow

AMS API → get_map_account()
  └─ List of Kemenkeu accounts per platform (twitter, ig, fb, yt, tiktok)
    │
    ▼
pre_clustering() [per platform, in parallel]
  ├─ ElasticDataFetcher.get_data_kemenkeu()
  │    → Posts within the start-end range
  └─ produce_profiling_accounts()
       → Kafka: send one profiling job per unique user_id
    │
    ▼
cluster_speech_act()
  └─ Group posts by slm.speech_act
     example clusters: "questioning", "supporting", "criticizing", "undefined"
    │
    ▼
create_relationships() [per cluster]
  ├─ MENTION     : @user → @mentioned
  ├─ HASHTAG     : @user → #tag
  ├─ COMMENT     : @user → @replied (only if not a Kemenkeu account)
  └─ SPEECH_ACT  : @user → intent, intent → all Kemenkeu accounts
    │
    ▼
determine_cid() + statistics_cluster()
    │
    ▼
Network.generate_graph_with_clusters()
    │
    ▼
s3_upsert() → { network_path, statistics }
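
The grouping step in the flow above can be sketched as follows. This assumes each post is a dict in which `slm.speech_act` is stored as a nested `slm` object, and that posts without a label fall into the `"undefined"` cluster; both details are assumptions made for this example.

```python
from collections import defaultdict


def cluster_speech_act(posts):
    """Group posts by their speech-act label, defaulting to "undefined"."""
    clusters = defaultdict(list)
    for post in posts:
        label = (post.get("slm") or {}).get("speech_act") or "undefined"
        clusters[label].append(post)
    return dict(clusters)


# Hypothetical posts, used only for this example.
posts = [
    {"id": "1", "slm": {"speech_act": "criticizing"}},
    {"id": "2", "slm": {"speech_act": "supporting"}},
    {"id": "3", "slm": {"speech_act": "criticizing"}},
    {"id": "4"},  # no label at all
]
clusters = cluster_speech_act(posts)
```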

Per-Cluster Statistics

Each cluster produces:

{
  "cluster_name": "criticizing",
  "total_engagement": 15420.0,
  "total_accounts": 238,
  "total_comments": 892,
  "total_followers": 4500000,
  "first_account": { "username": "...", "created_at": "...", "text": "..." },
  "top_accounts_by_followers": [ ...5 accounts... ],
  "top_accounts_by_engagement": [ ...5 posts... ]
}
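
A rough sketch of how such a summary could be computed from a cluster's posts is shown below. The post fields (`username`, `followers`, `engagement`, `type`, `created_at`, `text`) and the aggregation rules (followers counted once per account, comments counted by `type == "comment"`) are assumptions; the actual `statistics_cluster()` may differ.

```python
def statistics_cluster(name, posts, top_n=5):
    """Summarize one non-empty cluster of posts."""
    # Count followers once per account, using the largest value seen.
    followers = {}
    for p in posts:
        user = p["username"]
        followers[user] = max(followers.get(user, 0), p.get("followers", 0))

    first = min(posts, key=lambda p: p["created_at"])  # earliest post
    return {
        "cluster_name": name,
        "total_engagement": float(sum(p.get("engagement", 0) for p in posts)),
        "total_accounts": len(followers),
        "total_comments": sum(1 for p in posts if p.get("type") == "comment"),
        "total_followers": sum(followers.values()),
        "first_account": {k: first[k] for k in ("username", "created_at", "text")},
        "top_accounts_by_followers": sorted(
            followers, key=followers.get, reverse=True)[:top_n],
        "top_accounts_by_engagement": sorted(
            posts, key=lambda p: p.get("engagement", 0), reverse=True)[:top_n],
    }


# Hypothetical posts, used only for this example.
sample_posts = [
    {"username": "a", "followers": 100, "engagement": 10, "type": "post",
     "created_at": "2024-01-02 08:00:00", "text": "t1"},
    {"username": "b", "followers": 50, "engagement": 30, "type": "comment",
     "created_at": "2024-01-01 08:00:00", "text": "t2"},
    {"username": "a", "followers": 120, "engagement": 5, "type": "post",
     "created_at": "2024-01-03 08:00:00", "text": "t3"},
]
stats = statistics_cluster("criticizing", sample_posts)
```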

Kafka Profiling Event

For each unique user_id found, the system sends a message to the profiling topic:

{
  "user_id": "12345678",
  "created_at": "2024-01-15 10:30:00",
  "timestamp": 1705315800000,
  "id": "<hash_id>",
  "profiling_type": "account",
  "source": "custom network kemenkeu"
}
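
The payload above might be assembled along these lines, one message per unique `user_id`. This is a sketch under assumptions: `created_at` is parsed as UTC, MD5 stands in for the project's `hash_id` helper, and the hash input is a guess. Actually sending the message (for example via `akafka_producer`) is left out.

```python
import hashlib
from datetime import datetime, timezone


def build_profiling_message(user_id, created_at):
    """Build one profiling payload; created_at is 'YYYY-MM-DD HH:MM:SS' (assumed UTC)."""
    dt = datetime.strptime(created_at, "%Y-%m-%d %H:%M:%S").replace(tzinfo=timezone.utc)
    return {
        "user_id": user_id,
        "created_at": created_at,
        "timestamp": int(dt.timestamp() * 1000),  # epoch milliseconds
        # MD5 is only a stand-in for the internal hash_id() helper.
        "id": hashlib.md5(f"{user_id}{created_at}".encode()).hexdigest(),
        "profiling_type": "account",
        "source": "custom network kemenkeu",
    }


def build_unique_messages(posts):
    """Return one message per unique user_id, keeping the first occurrence."""
    seen, messages = set(), []
    for post in posts:
        if post["user_id"] not in seen:
            seen.add(post["user_id"])
            messages.append(build_profiling_message(post["user_id"], post["created_at"]))
    return messages


# Hypothetical posts, used only for this example.
messages = build_unique_messages([
    {"user_id": "12345678", "created_at": "2024-01-01 00:00:00"},
    {"user_id": "12345678", "created_at": "2024-01-02 00:00:00"},
    {"user_id": "999", "created_at": "2024-01-01 00:00:00"},
])
```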

3. RETS Narrative Network

File: rets_network.py

Narrative graph for the Real-time Event Tracking System. Unlike the other two modules, RETS builds a narrative structure that links social media accounts to narrative clusters that were already analyzed by the upstream clustering pipeline.

Pipeline Flow

Input: { query, unique_clusters[] }
    │
    ▼
Generate network_id = hash(start_date + end_date + wall_time)
    │
    ▼
Normalize each cluster:
  Inject into every member post:
    ├─ naration    ← cluster.title
    ├─ cluster_summary ← cluster.core_summary
    └─ category_narration ← cluster.narrative_analysis.dominant_category
    │
    ▼
create_relationships() [per cluster]
  ├─ MENTION                    : @user → @mentioned
  ├─ HASHTAG                    : @user → #tag
  ├─ QUOTED                     : @user → @quoted_user
  ├─ COMMENT                    : @user → @replied
  ├─ NARATION                   : @user → naration_node
  │     naration_node.contentNetwork = cluster_summary (for UI rendering)
  └─ TARGETED_CATEGORY_NARATION : naration_node → category_node
    │
    ▼
Merge all schemas → determine_cid()
    │
    ▼
Network.generate_graph_with_clusters()
    │
    ▼
s3_upsert() → { network_path } or { network_path, raw_network }
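
The first two steps of this flow (network id generation and cluster normalization) can be sketched as below. MD5 is used only as a stand-in for the internal `hash_id` helper, and the function names `make_network_id` and `normalize_clusters` are hypothetical.

```python
import hashlib
import time


def make_network_id(start_date, end_date):
    """Hash start_date + end_date + wall time (MD5 is an assumed stand-in)."""
    raw = f"{start_date}{end_date}{time.time()}"
    return hashlib.md5(raw.encode()).hexdigest()


def normalize_clusters(unique_clusters):
    """Copy cluster-level fields onto every member post and return the flat list."""
    posts = []
    for cluster in unique_clusters:
        analysis = cluster.get("narrative_analysis") or {}
        for member in cluster.get("members", []):
            posts.append({
                **member,
                "naration": cluster["title"],
                "cluster_summary": cluster.get("core_summary"),
                "category_narration": analysis.get("dominant_category"),
            })
    return posts


# Hypothetical input, shaped like the RETS request body.
unique_clusters = [{
    "title": "Kritik Kebijakan Pajak",
    "core_summary": "Klaster ini berisi konten yang mengkritik...",
    "narrative_analysis": {"dominant_category": "kritik"},
    "members": [{"username": "userA", "platform": "twitter", "naration": None}],
}]
network_id = make_network_id("2024-01-01", "2024-01-07")
normalized = normalize_clusters(unique_clusters)
```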

Narrative Graph Structure

@akun_A ──NARATION──► "Klaster: Kritik Kebijakan Fiskal"
                              │
                              └──TARGETED_CATEGORY_NARATION──► "category-kritik"

@akun_B ──MENTION──► @akun_C
@akun_B ──HASHTAG──► #APBN2025
@akun_B ──NARATION──► "Klaster: Dukungan Program Subsidi"

Narrative Node (RETS Only)

Nodes with type = "naration" carry additional fields:

{
  "id": "Klaster: Kritik Kebijakan Fiskal",
  "type": "naration",
  "contentNetwork": "Klaster ini mendominasi diskusi terkait...",
  "cid": "Klaster: Kritik Kebijakan Fiskal@abc123"
}
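
As a sketch of how the narrative node and its two edge kinds relate, the helper below builds them from one normalized post. The function name `build_narration_graph` and the input post shape are hypothetical; the `title@network_id` cid and the `category-` prefix follow the examples shown in this README.

```python
def build_narration_graph(post, network_id):
    """Return (narration_node, edges) for one post already tagged with its cluster."""
    title = post["naration"]
    cid = f"{title}@{network_id}"
    category_id = f"category-{post['category_narration']}"

    narration_node = {
        "id": title,
        "type": "naration",
        "contentNetwork": post["cluster_summary"],  # summary text shown in the UI
        "cid": cid,
    }
    edges = [
        {"cid": cid, "source": f"@{post['username']}", "type_source": "account",
         "target": title, "type_target": "naration", "edges_kind": "NARATION"},
        {"cid": cid, "source": title, "type_source": "naration",
         "target": category_id, "type_target": "category",
         "edges_kind": "TARGETED_CATEGORY_NARATION"},
    ]
    return narration_node, edges


# Hypothetical normalized post, used only for this example.
tagged_post = {
    "username": "akun_A",
    "naration": "Klaster: Kritik Kebijakan Fiskal",
    "cluster_summary": "Klaster ini mendominasi diskusi terkait...",
    "category_narration": "kritik",
}
narration_node, narration_edges = build_narration_graph(tagged_post, "abc123")
```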

Input & Output per Module

NCW Geostrategic

Input:

# No request parameters; only category_network is set
instance = NetworkNcwGeostrategic(category_network="ina")
await instance._run()

Output:

{
  "network_path": "https://storage.example.com/fusion-network/custom-network/network_abc123.json",
  "raw_data": [ ...relational records... ]
}

Kemenkeu Social Media Network

Input:

instance = NetworkKemenkeu(
    platform=["twitter", "instagram"],
    start="2024-01-01 00:00:00",
    end="2024-01-07 23:59:59",
    batchSize=10,
    cluster=True,
    account="kemenkeu_group",
    kafprod=kafka_producer
)
await instance._run()

Output:

{
  "network_path": "https://storage.example.com/.../network_xyz.json",
  "statistics": {
    "criticizing": {
      "cluster_name": "criticizing",
      "member_count": 412,
      "cluster_percentage": "46.20%",
      "total_engagement": 28540.0,
      "total_accounts": 310,
      "total_followers": 8200000,
      "top_accounts_by_followers": [ ... ],
      "top_accounts_by_engagement": [ ... ]
    }
  }
}

RETS Narrative Network

Input:

{
  "query": {
    "start_date": "2024-01-01",
    "end_date": "2024-01-07",
    "monitoring_objective": "Monitoring Kebijakan Fiskal Q1 2024"
  },
  "unique_clusters": [
    {
      "title": "Kritik Kebijakan Pajak",
      "core_summary": "Klaster ini berisi konten yang mengkritik...",
      "total_member": 245,
      "narrative_analysis": {
        "dominant_category": "kritik"
      },
      "members": [
        {
          "username": "userA",
          "platform": "twitter",
          "type": "post",
          "created_at": "2024-01-03 09:15:00",
          "hashtag": ["APBN2025", "PajakTurun"],
          "mention": [{"username": "kemenkeu_ri", "id": "987"}],
          "naration": null
        }
      ]
    }
  ]
}

Output (type_result="path"):

{
  "network_path": "https://storage.example.com/.../network_def456.json"
}

Output (type_result="raw"):

{
  "network_path": "https://...",
  "raw_network": { ...full graph object... }
}

Edge Types Catalog

| Edge Kind | Module | Source Type | Target Type | Description |
|---|---|---|---|---|
| MENTION | All | account | account | @user mentions another @user |
| HASHTAG | Kemenkeu, RETS | account | hashtag | @user uses a #tag |
| COMMENT | Kemenkeu, RETS | account | account | @user replies to another @user |
| QUOTED | RETS | account | account | @user quotes a tweet by @user |
| SPEECH_ACT | Kemenkeu | account / intent | intent / account | Links a user to an intent node, and the intent to the monitored accounts |
| NARATION | RETS | account | naration | @user is linked to a narrative cluster |
| TARGETED_CATEGORY_NARATION | RETS | naration | category | Narrative cluster is linked to its dominant category |
| {type_source}_{type_target} | NCW Geo | any | any | Geostrategic entity relationship (dynamic, taken from the source data) |

Tech Stack

| Component | Library |
|---|---|
| Async runtime | asyncio |
| HTTP client | aiohttp (via BaseChatCampaign) |
| Elasticsearch client | ElasticDataFetcher (internal) |
| Kafka producer | aiokafka.AIOKafkaProducer |
| Object storage | S3-compatible (via s3_database) |
| Graph formatter | Network (internal builder) |
| Logging | loguru |
| Config management | configparser |
| JSON I/O | srsly |

Internal Base Class (BaseChatCampaign)

All modules inherit from BaseChatCampaign, which provides:

hash_id(str)                 → deterministic hash for network/message IDs
s3_database(...)             → upload/download to S3-compatible storage
akafka_producer(...)         → produce messages to a Kafka topic
_requests(url, params, ...)  → async HTTP client wrapper
parallel_processing(tasks)   → run a list of coroutines in parallel
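
For readers unfamiliar with the pattern, a helper like `parallel_processing` could be built on `asyncio.gather` roughly as follows. This is a simplified sketch, not the BaseChatCampaign implementation: the `limit` parameter and the `fetch` coroutine are hypothetical, and the tqdm progress bar listed under Dependency is omitted.

```python
import asyncio


async def parallel_processing(tasks, limit=10):
    """Run coroutines concurrently, at most `limit` at a time, preserving input order."""
    semaphore = asyncio.Semaphore(limit)

    async def guarded(coro):
        async with semaphore:
            return await coro

    return await asyncio.gather(*(guarded(task) for task in tasks))


async def fetch(platform):
    """Placeholder for a per-platform fetch; yields control once and returns."""
    await asyncio.sleep(0)
    return f"{platform}:done"


results = asyncio.run(parallel_processing([fetch(p) for p in ("twitter", "instagram")]))
print(results)
```

`asyncio.gather` returns results in the same order as its inputs, so callers can zip the results back to the platform list.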

Folder Structure

network-graph-engine/
│
├── readme-github.md
│
├── custom_network/                      ← the three main network modules
│   ├── __init__.py
│   ├── ncw_geostrategic.py             ← NCW Geostrategic
│   ├── kemenkeu_socmed_network.py      ← Kemenkeu Social Media
│   └── rets_network.py                 ← RETS Narrative
│
└── tools/                              ← shared infrastructure
    ├── __init__.py                     ← exports BaseChatCampaign, Network, ElasticDataFetcher
    │
    ├── base_model/
    │   ├── __init__.py
    │   └── base.py                     ← BaseChatCampaign (S3, ES, Kafka, HTTP, concurrency)
    │
    ├── network/
    │   ├── __init__.py
    │   └── tool.py                     ← Network.generate_graph_with_clusters()
    │
    ├── get_es/
    │   ├── __init__.py
    │   └── tool.py                     ← ElasticDataFetcher (get_data, get_data_kemenkeu)
    │
    └── utils/
        └── tool.py                     ← FIELD_EDGES, NODE_COLORS, NODE_SHAPES, ESBODY_SMM, etc.

Dependencies

# Core
loguru          # Structured logging
pydantic        # Data model & validation (BaseChatCampaign extends BaseModel)
srsly           # JSON read/write (ncw_geostrategic)

# Storage
s3fs            # S3-compatible object storage client

# HTTP & async
aiohttp         # Async HTTP client

# Elasticsearch
elasticsearch   # AsyncElasticsearch client

# Graph
networkx        # DiGraph, spring layout, degree centrality
pandas          # DataFrame for edge lookup in the Network formatter

# Kafka
kafka-python    # Sync KafkaProducer (kafka_producer)
aiokafka        # Async AIOKafkaProducer (akafka_producer, kemenkeu)

# Database
psycopg2-binary # PostgreSQL driver (insert_postgre)
pymongo         # MongoDB client (get_mongo_collection)
neo4j           # Neo4j/Memgraph driver (gb_connection, agb_connection)

# Visualisation
matplotlib      # DEFAULT_COLORS palette (tab20 colormap)

# Utilities
tqdm            # Progress bar in parallel_processing
jmespath        # JSON path queries in the Network formatter

Install

pip install loguru pydantic srsly s3fs aiohttp elasticsearch networkx pandas \
            kafka-python aiokafka psycopg2-binary pymongo neo4j matplotlib tqdm jmespath