# Network Graph Engine

> A collection of three Python modules that automatically build social and geostrategic network graphs: data retrieval, clustering, node/edge construction, and persistence to object storage.

All three modules are part of the **Fusion Campaign API** system and share the same architectural pattern: an async pipeline, a graph schema builder, CID resolution via majority vote, and storage in S3-compatible object storage.

---

## Table of Contents

- [Three Network Modules](#three-network-modules)
- [Shared Architecture: The Pattern All Three Use](#shared-architecture)
- [CID Resolution Algorithm](#cid-resolution-algorithm)
- [NCW Geostrategic Network](#1-ncw-geostrategic-network)
- [Kemenkeu Social Media Network](#2-kemenkeu-social-media-network)
- [RETS Narrative Network](#3-rets-narrative-network)
- [Input & Output per Module](#input--output-per-module)
- [Edge Types Catalog](#edge-types-catalog)
- [Tech Stack](#tech-stack)

---

## Three Network Modules

| File | Class | Use Case |
|---|---|---|
| [ncw_geostrategic.py](custom_network/ncw_geostrategic.py) | `NetworkNcwGeostrategic` | Graph of geostrategic relationships between country-level entities, built from static JSON data |
| [kemenkeu_socmed_network.py](custom_network/kemenkeu_socmed_network.py) | `NetworkKemenkeu` | Social media network graph of Kemenkeu accounts from Elasticsearch, grouped by speech act |
| [rets_network.py](custom_network/rets_network.py) | `NetworkRets` | Narrative graph for event tracking (RETS), built from already-processed monitoring cluster data |

---

## Shared Architecture

All three classes inherit from `BaseChatCampaign` and follow the same pipeline:

```
      Input Data
           │
           ▼
┌──────────────────────┐
│ create_relationships │ → Build nodes[] + edges[] from the raw data
└──────────┬───────────┘
           │
    ┌──────▼────────┐
    │ determine_cid │ → Each node gets its best cid via majority vote
    └──────┬────────┘
           │
    ┌──────▼───────────────────────────────┐
    │ Network.generate_graph_with_clusters │ → Format the final network schema
    └──────┬───────────────────────────────┘
           │
    ┌──────▼──────┐
    │  s3_upsert  │ → Upload the JSON to object storage, return the public URL
    └──────┬──────┘
           │
{ "network_path": "https://..." }
```

### Node Structure

```json
{
  "cid": "cluster_name@network_id",
  "id": "@username | #hashtag | intent_label | naration_title",
  "entity_id": "platform_user_id | null",
  "label": "display label",
  "type": "account | hashtag | intent | naration | category-X",
  "timestamp": 1700000000000,
  "created_at": "2024-01-01 10:00:00",
  "typeContent": "post | comment | retweet"
}
```

### Edge Structure

```json
{
  "cid": "cluster_name@network_id",
  "post_id": "original_post_id",
  "source": "@username",
  "type_source": "account",
  "target": "@mentioned_user | #hashtag | intent",
  "type_target": "account | hashtag | intent | naration",
  "type_content": "post | comment | retweet",
  "edges_kind": "MENTION | HASHTAG | COMMENT | SPEECH_ACT | NARATION | ...",
  "timestamp": 1700000000000,
  "sentiment": null,
  "statement": "optional annotation text"
}
```

---

## CID Resolution Algorithm

All modules use the same algorithm to decide the final cluster id (cid) of each node:

```
For each node N:
  1. Collect every edge that involves N (as source or target)
  2. Count how often each cid appears on those edges
  3. Pick the cid with the highest count (majority vote)
  4. On a tie (equal counts):
     → Pick the cid of the node with the most recent created_at
  5. If there are no edges (isolated node):
     → Keep the initial cid
```

```python
# Core implementation (identical in all three modules)
from collections import defaultdict

# node_id -> {cid -> number of edges touching the node that carry this cid}
node_cid_count = defaultdict(lambda: defaultdict(int))
for edge in edges:
    for node_id in (edge["source"], edge["target"]):
        node_cid_count[node_id][edge["cid"]] += 1

for node in nodes:
    possible_cids = node_cid_count.get(node["id"], {})
    if possible_cids:
        # Majority vote: keep every cid that reaches the highest count
        max_count = max(possible_cids.values())
        best_cids = [cid for cid, cnt in possible_cids.items() if cnt == max_count]
        # Tie-break on the most recent created_at (candidate list elided in the source)
        best_cid = max(..., key=lambda x: x["created_at"])["cid"] if len(best_cids) > 1 else best_cids[0]
    else:
        # Isolated node: keep the initial cid
        best_cid = node["cid"]
```

---

## 1. NCW Geostrategic Network

**File:** [ncw_geostrategic.py](custom_network/ncw_geostrategic.py)

Builds a graph of geostrategic relationships between entities (countries, organizations, individuals). The input is curated data stored in one JSON file per region/country category.

### Pipeline Flow

```
Select category_network (e.g. "ina", "fra", "india", ...)
        │
        ▼
Read JSON file: data_network_raw_json/{category}_ncw_geostrategic.json
        │
        ▼
create_relationships()
  ├─ Each record → 1 edge (source → target)
  ├─ Nodes are deduplicated by label
  └─ Node type: type_source / type_target from the raw data
        │
        ▼
determine_cid()
  └─ All nodes go into cluster_0 (single cluster, static data)
        │
        ▼
Network.generate_graph_with_clusters()
        │
        ▼
s3_upsert() → public URL
```

### Supported Categories

```
recap | ina | fra | india | ken | lka | mar | mdg | mng | phl | pry | tgo | npl | tur | tls | per | bgd
```

### Source JSON Data Format

```json
[
  {
    "source": "Indonesia",
    "type_source": "country",
    "target": "ASEAN",
    "type_target": "organization",
    "created_at": "2024-01-15",
    "source_id": "idn_001",
    "terget_id": "asean_001",
    "note": "Member state, founding member"
  }
]
```

---

## 2. Kemenkeu Social Media Network

**File:** [kemenkeu_socmed_network.py](custom_network/kemenkeu_socmed_network.py)

A social media network graph for monitoring Kemenkeu (Kementerian Keuangan, the Indonesian Ministry of Finance) accounts.
It fetches real-time data from Elasticsearch, groups posts by speech act, and builds interaction links between accounts.

### Pipeline Flow

```
AMS API → get_map_account()
  └─ List of Kemenkeu accounts per platform (twitter, ig, fb, yt, tiktok)
        │
        ▼
pre_clustering()  [per platform, in parallel]
  ├─ ElasticDataFetcher.get_data_kemenkeu()
  │     → Posts between start and end
  └─ produce_profiling_accounts()
        → Kafka: send one profiling job per unique user_id
        │
        ▼
cluster_speech_act()
  └─ Group posts by slm.speech_act
     example clusters: "questioning", "supporting", "criticizing", "undefined"
        │
        ▼
create_relationships()  [per cluster]
  ├─ MENTION    : @user → @mentioned
  ├─ HASHTAG    : @user → #tag
  ├─ COMMENT    : @user → @replied (only if not a Kemenkeu account)
  └─ SPEECH_ACT : @user → intent, intent → every Kemenkeu account
        │
        ▼
determine_cid() + statistics_cluster()
        │
        ▼
Network.generate_graph_with_clusters()
        │
        ▼
s3_upsert() → { network_path, statistics }
```

### Per-Cluster Statistics

Each cluster produces:

```json
{
  "cluster_name": "criticizing",
  "total_engagement": 15420.0,
  "total_accounts": 238,
  "total_comments": 892,
  "total_followers": 4500000,
  "first_account": { "username": "...", "created_at": "...", "text": "..." },
  "top_accounts_by_followers": [ ...5 accounts... ],
  "top_accounts_by_engagement": [ ...5 posts... ]
}
```

### Kafka Profiling Event

For every unique `user_id` found, the system sends a message to the profiling topic:

```json
{
  "user_id": "12345678",
  "created_at": "2024-01-15 10:30:00",
  "timestamp": 1705315800000,
  "id": "",
  "profiling_type": "account",
  "source": "custom network kemenkeu"
}
```

---

## 3. RETS Narrative Network

**File:** [rets_network.py](custom_network/rets_network.py)

A narrative graph for the Real-time Event Tracking System (RETS). Unlike the other two modules, RETS builds a narrative structure that links social media accounts to **narrative clusters**, which an upstream clustering pipeline has already analyzed.
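The account-to-narrative linking described above can be sketched in a few lines of Python. This is a minimal, hypothetical sketch, not the actual `NetworkRets.create_relationships()`. The function name `build_narrative_graph` is made up. The sketch assumes the cluster shape used in the RETS input example later in this README (`title`, `core_summary`, `narrative_analysis.dominant_category`, `members[]`). It covers only the NARATION, TARGETED_CATEGORY_NARATION, MENTION, and HASHTAG edges, so QUOTED and COMMENT are left out. Node ids follow the `@username` / `#hashtag` / `category-X` conventions from the node schema and the narrative graph example.

```python
from typing import Any, Dict, List, Optional, Tuple


def build_narrative_graph(cluster: Dict[str, Any], network_id: str) -> Tuple[List[dict], List[dict]]:
    """Hypothetical sketch: turn one RETS cluster into nodes and edges.

    Covers NARATION, TARGETED_CATEGORY_NARATION, MENTION and HASHTAG only.
    """
    title = cluster["title"]
    cid = f"{title}@{network_id}"  # cid format: cluster_name@network_id
    nodes: Dict[str, dict] = {}  # keyed by node id, so repeated accounts collapse
    edges: List[dict] = []

    def add_node(node_id: str, node_type: str, **extra: Any) -> None:
        # Keep only the first occurrence of each node id.
        nodes.setdefault(node_id, {"cid": cid, "id": node_id, "label": node_id,
                                   "type": node_type, **extra})

    def add_edge(source: str, type_source: str, target: str, type_target: str,
                 kind: str, type_content: Optional[str]) -> None:
        edges.append({"cid": cid, "source": source, "type_source": type_source,
                      "target": target, "type_target": type_target,
                      "type_content": type_content, "edges_kind": kind})

    # The narrative node carries the cluster summary for UI rendering.
    add_node(title, "naration", contentNetwork=cluster.get("core_summary", ""))

    category = (cluster.get("narrative_analysis") or {}).get("dominant_category")
    if category:
        category_id = f"category-{category}"
        add_node(category_id, category_id)
        add_edge(title, "naration", category_id, "category",
                 "TARGETED_CATEGORY_NARATION", None)

    for member in cluster.get("members", []):
        user = f"@{member['username']}"
        content = member.get("type")
        add_node(user, "account")
        add_edge(user, "account", title, "naration", "NARATION", content)
        for tag in member.get("hashtag") or []:
            add_node(f"#{tag}", "hashtag")
            add_edge(user, "account", f"#{tag}", "hashtag", "HASHTAG", content)
        for mentioned in member.get("mention") or []:
            target = f"@{mentioned['username']}"
            add_node(target, "account")
            add_edge(user, "account", target, "account", "MENTION", content)

    return list(nodes.values()), edges


# Usage with a trimmed-down cluster in the documented input shape
cluster = {
    "title": "Kritik Kebijakan Pajak",
    "core_summary": "Klaster ini berisi konten yang mengkritik...",
    "narrative_analysis": {"dominant_category": "kritik"},
    "members": [{"username": "userA", "type": "post", "hashtag": ["APBN2025"],
                 "mention": [{"username": "kemenkeu_ri", "id": "987"}]}],
}
nodes, edges = build_narrative_graph(cluster, network_id="abc123")
print([e["edges_kind"] for e in edges])
# ['TARGETED_CATEGORY_NARATION', 'NARATION', 'HASHTAG', 'MENTION']
```

Keying nodes by id in a dict is one simple way to avoid duplicate nodes when the same account posts several times within a cluster. The real module may deduplicate differently.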
### Pipeline Flow

```
Input: { query, unique_clusters[] }
        │
        ▼
Generate network_id = hash(start_date + end_date + wall_time)
        │
        ▼
Normalize each cluster:
  Inject into every member post:
    ├─ naration           ← cluster.title
    ├─ cluster_summary    ← cluster.core_summary
    └─ category_narration ← cluster.narrative_analysis.dominant_category
        │
        ▼
create_relationships()  [per cluster]
  ├─ MENTION                    : @user → @mentioned
  ├─ HASHTAG                    : @user → #tag
  ├─ QUOTED                     : @user → @quoted_user
  ├─ COMMENT                    : @user → @replied
  ├─ NARATION                   : @user → naration_node
  │    naration_node.contentNetwork = cluster_summary (for UI rendering)
  └─ TARGETED_CATEGORY_NARATION : naration_node → category_node
        │
        ▼
Merge all schemas → determine_cid()
        │
        ▼
Network.generate_graph_with_clusters()
        │
        ▼
s3_upsert() → { network_path } or { network_path, raw_network }
```

### Narrative Graph Structure

```
@account_A ──NARATION──► "Klaster: Kritik Kebijakan Fiskal"
                                  │
                                  └──TARGETED_CATEGORY_NARATION──► "category-kritik"

@account_B ──MENTION──► @account_C
@account_B ──HASHTAG──► #APBN2025
@account_B ──NARATION──► "Klaster: Dukungan Program Subsidi"
```

### Narrative Node (RETS only)

Nodes with `type = "naration"` carry an extra `contentNetwork` field:

```json
{
  "id": "Klaster: Kritik Kebijakan Fiskal",
  "type": "naration",
  "contentNetwork": "Klaster ini mendominasi diskusi terkait...",
  "cid": "Klaster: Kritik Kebijakan Fiskal@abc123"
}
```

---

## Input & Output per Module

### NCW Geostrategic

**Input:**

```python
from custom_network.ncw_geostrategic import NetworkNcwGeostrategic

# No request parameters; only category_network is set
instance = NetworkNcwGeostrategic(category_network="ina")
await instance._run()  # run inside an async context
```

**Output:**

```json
{
  "network_path": "https://storage.example.com/fusion-network/custom-network/network_abc123.json",
  "raw_data": [
    ...relational records...
  ]
}
```

---

### Kemenkeu Social Media Network

**Input:**

```python
from custom_network.kemenkeu_socmed_network import NetworkKemenkeu

# kafka_producer: an aiokafka.AIOKafkaProducer instance provided by the caller
instance = NetworkKemenkeu(
    platform=["twitter", "instagram"],
    start="2024-01-01 00:00:00",
    end="2024-01-07 23:59:59",
    batchSize=10,
    cluster=True,
    account="kemenkeu_group",
    kafprod=kafka_producer
)
await instance._run()  # run inside an async context
```

**Output:**

```json
{
  "network_path": "https://storage.example.com/.../network_xyz.json",
  "statistics": {
    "criticizing": {
      "cluster_name": "criticizing",
      "member_count": 412,
      "cluster_percentage": "46.20%",
      "total_engagement": 28540.0,
      "total_accounts": 310,
      "total_followers": 8200000,
      "top_accounts_by_followers": [ ... ],
      "top_accounts_by_engagement": [ ... ]
    }
  }
}
```

---

### RETS Narrative Network

**Input:**

```json
{
  "query": {
    "start_date": "2024-01-01",
    "end_date": "2024-01-07",
    "monitoring_objective": "Monitoring Kebijakan Fiskal Q1 2024"
  },
  "unique_clusters": [
    {
      "title": "Kritik Kebijakan Pajak",
      "core_summary": "Klaster ini berisi konten yang mengkritik...",
      "total_member": 245,
      "narrative_analysis": {
        "dominant_category": "kritik"
      },
      "members": [
        {
          "username": "userA",
          "platform": "twitter",
          "type": "post",
          "created_at": "2024-01-03 09:15:00",
          "hashtag": ["APBN2025", "PajakTurun"],
          "mention": [{"username": "kemenkeu_ri", "id": "987"}],
          "naration": null
        }
      ]
    }
  ]
}
```

**Output (`type_result="path"`):**

```json
{
  "network_path": "https://storage.example.com/.../network_def456.json"
}
```

**Output (`type_result="raw"`):**

```json
{
  "network_path": "https://...",
  "raw_network": {
    ...full graph object...
  }
}
```

---

## Edge Types Catalog

| Edge Kind | Module | Source Type | Target Type | Description |
|---|---|---|---|---|
| `MENTION` | Kemenkeu, RETS | account | account | @user mentions another @user |
| `HASHTAG` | Kemenkeu, RETS | account | hashtag | @user uses #tag |
| `COMMENT` | Kemenkeu, RETS | account | account | @user replies to another @user |
| `QUOTED` | RETS | account | account | @user quotes another @user's tweet |
| `SPEECH_ACT` | Kemenkeu | account / intent | intent / account | Links a user to an intent node, and an intent node to the monitored accounts |
| `NARATION` | RETS | account | naration | @user is linked to a narrative cluster |
| `TARGETED_CATEGORY_NARATION` | RETS | naration | category | A narrative cluster is linked to its dominant category |
| `{type_source}_{type_target}` | NCW Geo | any | any | Geostrategic entity relationship (derived dynamically from the source data) |

---

## Tech Stack

| Component | Library |
|---|---|
| Async runtime | `asyncio` |
| HTTP client | `aiohttp` (via BaseChatCampaign) |
| Elasticsearch client | `ElasticDataFetcher` (internal) |
| Kafka producer | `aiokafka.AIOKafkaProducer` |
| Object storage | S3-compatible (via `s3_database`) |
| Graph formatter | `Network` (internal builder) |
| Logging | `loguru` |
| Config management | `configparser` |
| JSON I/O | `srsly` |

### Internal Base Class (`BaseChatCampaign`)

All modules inherit from `BaseChatCampaign`, which provides:

```
hash_id(str)                → deterministic hash for network/message IDs
s3_database(...)            → upload/download to S3-compatible storage
akafka_producer(...)        → produce messages to a Kafka topic
_requests(url, params, ...) → async HTTP client wrapper
parallel_processing(tasks)  → run a list of coroutines in parallel
```

---

## Folder Structure

```
network-graph-engine/
│
├── readme-github.md
│
├── custom_network/                  ← the three main network modules
│   ├── __init__.py
│   ├── ncw_geostrategic.py          ← NCW Geostrategic
│   ├── kemenkeu_socmed_network.py   ← Kemenkeu Social Media
│   └── rets_network.py              ← RETS Narrative
│
└── tools/                           ← shared infrastructure
    ├── __init__.py                  ← exports BaseChatCampaign, Network, ElasticDataFetcher
    │
    ├── base_model/
    │   ├── __init__.py
    │   └── base.py                  ← BaseChatCampaign (S3, ES, Kafka, HTTP, concurrency)
    │
    ├── network/
    │   ├── __init__.py
    │   └── tool.py                  ← Network.generate_graph_with_clusters()
    │
    ├── get_es/
    │   ├── __init__.py
    │   └── tool.py                  ← ElasticDataFetcher (get_data, get_data_kemenkeu)
    │
    └── utils/
        └── tool.py                  ← FIELD_EDGES, NODE_COLORS, NODE_SHAPES, ESBODY_SMM, etc.
```

---

## Dependencies

```
# Core
loguru           # Structured logging
pydantic         # Data model & validation (BaseChatCampaign extends BaseModel)
srsly            # JSON read/write (ncw_geostrategic)

# Storage
s3fs             # S3-compatible object storage client

# HTTP & async
aiohttp          # Async HTTP client

# Elasticsearch
elasticsearch    # AsyncElasticsearch client

# Graph
networkx         # DiGraph, spring layout, degree centrality
pandas           # DataFrame for edge lookups in the Network formatter

# Kafka
kafka-python     # Sync KafkaProducer (kafka_producer)
aiokafka         # Async AIOKafkaProducer (akafka_producer, kemenkeu)

# Database
psycopg2-binary  # PostgreSQL driver (insert_postgre)
pymongo          # MongoDB client (get_mongo_collection)
neo4j            # Neo4j/Memgraph driver (gb_connection, agb_connection)

# Visualisation
matplotlib       # DEFAULT_COLORS palette (tab20 colormap)

# Utilities
tqdm             # Progress bar in parallel_processing
jmespath         # JSON path queries in the Network formatter
```

### Install

```bash
pip install loguru pydantic srsly s3fs aiohttp elasticsearch networkx pandas \
            kafka-python aiokafka psycopg2-binary pymongo neo4j matplotlib tqdm jmespath
```
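The `parallel_processing(tasks)` helper on `BaseChatCampaign` is described only as running a list of coroutines in parallel, with tqdm providing a progress bar. A rough stand-in could use `asyncio.gather`, which returns results in input order. This is a hypothetical sketch, not the actual implementation. The `limit` parameter and its `asyncio.Semaphore` cap on concurrency are assumptions, `fetch_platform` is a made-up placeholder for per-platform work, and the progress bar is omitted.

```python
import asyncio
from typing import Any, Awaitable, Iterable, List


async def parallel_processing(tasks: Iterable[Awaitable[Any]], limit: int = 10) -> List[Any]:
    """Hypothetical stand-in: await coroutines concurrently, at most `limit` at a time."""
    semaphore = asyncio.Semaphore(limit)

    async def run(task: Awaitable[Any]) -> Any:
        # A coroutine object does not start running until it is awaited here,
        # so the semaphore caps how many are in flight at once.
        async with semaphore:
            return await task

    # gather() returns results in the same order as the input tasks.
    return await asyncio.gather(*(run(task) for task in tasks))


async def fetch_platform(platform: str) -> str:
    # Placeholder for real per-platform work (e.g. an Elasticsearch query).
    await asyncio.sleep(0.01)
    return platform.upper()


results = asyncio.run(
    parallel_processing([fetch_platform(p) for p in ["twitter", "instagram", "tiktok"]], limit=2)
)
print(results)  # ['TWITTER', 'INSTAGRAM', 'TIKTOK']
```

Because `gather` preserves input order, callers can zip the results back onto their inputs (for example, one result per platform in `pre_clustering()`) without extra bookkeeping.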