| """ | |
| Cogni-Engine v1 — Mathematical & Utility Functions | |
| Pure math operations, vectorization, tokenization, clustering. | |
| Every computational module depends on this file. | |
| """ | |
| import math | |
| import time | |
| import hashlib | |
| import random | |
| import re | |
| from collections import Counter | |
| from typing import List, Tuple, Dict, Optional, Any | |
| import numpy as np | |
| import config | |
# ═══════════════════════════════════════════════════════════
# VECTOR OPERATIONS
# ═══════════════════════════════════════════════════════════

def dot_product(a: np.ndarray, b: np.ndarray) -> float:
    """Dot product of two vectors."""
    return float(np.dot(a, b))


def magnitude(v: np.ndarray) -> float:
    """Euclidean magnitude (L2 norm) of a vector."""
    return float(np.linalg.norm(v))


def normalize(v: np.ndarray) -> np.ndarray:
    """Normalize vector to unit length. Returns zero vector if magnitude is 0."""
    mag = magnitude(v)
    if mag < 1e-10:
        return np.zeros_like(v)
    return v / mag


def cosine_similarity(a: np.ndarray, b: np.ndarray) -> float:
    """
    Cosine similarity between two vectors.
    Returns value in [-1, 1]. Higher = more similar.
    Returns 0 if either vector is zero.
    """
    mag_a = magnitude(a)
    mag_b = magnitude(b)
    if mag_a < 1e-10 or mag_b < 1e-10:
        return 0.0
    return float(np.dot(a, b) / (mag_a * mag_b))


def euclidean_distance(a: np.ndarray, b: np.ndarray) -> float:
    """Euclidean distance between two vectors."""
    return float(np.linalg.norm(a - b))


def vector_add(a: np.ndarray, b: np.ndarray) -> np.ndarray:
    """Element-wise addition."""
    return a + b


def vector_subtract(a: np.ndarray, b: np.ndarray) -> np.ndarray:
    """Element-wise subtraction."""
    return a - b


def vector_scale(v: np.ndarray, scalar: float) -> np.ndarray:
    """Multiply vector by scalar."""
    return v * scalar


def vector_mean(vectors: List[np.ndarray]) -> np.ndarray:
    """Compute mean vector from a list of vectors."""
    if not vectors:
        return np.zeros(config.VECTOR_DIM)
    return np.mean(vectors, axis=0)


def vector_weighted_mean(vectors: List[np.ndarray], weights: List[float]) -> np.ndarray:
    """Compute weighted mean vector."""
    if not vectors or not weights:
        return np.zeros(config.VECTOR_DIM)
    weights_arr = np.array(weights)
    weight_sum = np.sum(weights_arr)
    if weight_sum < 1e-10:
        return vector_mean(vectors)
    weighted = sum(v * w for v, w in zip(vectors, weights_arr))
    return weighted / weight_sum


def batch_cosine_similarity(query: np.ndarray, matrix: np.ndarray) -> np.ndarray:
    """
    Compute cosine similarity between a query vector and each row of a matrix.
    Returns an array of similarities.
    matrix shape: (N, dim)
    """
    if matrix.shape[0] == 0:
        return np.array([])
    query_norm = normalize(query)
    norms = np.linalg.norm(matrix, axis=1, keepdims=True)
    norms = np.where(norms < 1e-10, 1.0, norms)  # Avoid division by zero
    matrix_norm = matrix / norms
    similarities = matrix_norm @ query_norm
    return similarities


def vector_to_list(v: np.ndarray) -> List[float]:
    """Convert numpy vector to Python list for JSON serialization."""
    return [round(float(x), 6) for x in v]


def list_to_vector(lst: List[float]) -> np.ndarray:
    """Convert Python list back to numpy vector."""
    return np.array(lst, dtype=np.float32)
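# Minimal usage sketch (hypothetical helper, not referenced by the engine):
# exercises the vector helpers on toy 3-d vectors; real embeddings are
# config.VECTOR_DIM-dimensional.
def _example_vector_ops() -> None:
    a = np.array([1.0, 0.0, 0.0], dtype=np.float32)
    b = np.array([1.0, 1.0, 0.0], dtype=np.float32)
    # Angle between a and b is 45°, so cosine similarity is 1/sqrt(2).
    assert abs(cosine_similarity(a, b) - 1.0 / math.sqrt(2.0)) < 1e-6
    # normalize() always yields a unit-length vector (unless the input is zero).
    assert abs(magnitude(normalize(b)) - 1.0) < 1e-6
    # Row 0 of the matrix is a itself, so its similarity to the query is 1.
    sims = batch_cosine_similarity(a, np.stack([a, b]))
    assert abs(sims[0] - 1.0) < 1e-6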
# ═══════════════════════════════════════════════════════════
# SOFTMAX & PROBABILITY
# ═══════════════════════════════════════════════════════════

def softmax(x: np.ndarray, temperature: float = 1.0) -> np.ndarray:
    """
    Softmax function with temperature.
    Higher temperature = more uniform distribution (more random).
    Lower temperature = more peaked (more deterministic).
    """
    if temperature < 1e-10:
        # Near-zero temperature: argmax (deterministic)
        result = np.zeros_like(x, dtype=np.float64)
        result[np.argmax(x)] = 1.0
        return result
    scaled = x / temperature
    # Numerical stability: subtract max
    shifted = scaled - np.max(scaled)
    exp_vals = np.exp(shifted)
    total = np.sum(exp_vals)
    if total < 1e-10:
        return np.ones_like(x, dtype=np.float64) / len(x)
    return exp_vals / total


def weighted_choice(items: list, weights: list, temperature: float = 1.0) -> Any:
    """
    Select one item from the list based on weights.
    Temperature controls randomness.
    """
    if not items:
        return None
    if len(items) == 1:
        return items[0]
    w = np.array(weights, dtype=np.float64)
    probs = softmax(w, temperature)
    cumulative = np.cumsum(probs)
    r = random.random()
    for i, c in enumerate(cumulative):
        if r <= c:
            return items[i]
    return items[-1]


def weighted_sample(items: list, weights: list, k: int, temperature: float = 1.0) -> list:
    """
    Select k items without replacement based on weights.
    """
    if not items or k <= 0:
        return []
    k = min(k, len(items))
    remaining_items = list(items)
    remaining_weights = list(weights)
    selected = []
    for _ in range(k):
        if not remaining_items:
            break
        # Choose by index so duplicate items keep their own weights
        idx = weighted_choice(list(range(len(remaining_items))), remaining_weights, temperature)
        selected.append(remaining_items[idx])
        remaining_items.pop(idx)
        remaining_weights.pop(idx)
    return selected


def top_k_indices(scores: np.ndarray, k: int) -> List[int]:
    """Return indices of the top-k highest scores."""
    if len(scores) == 0:
        return []
    k = min(k, len(scores))
    return [int(i) for i in np.argsort(scores)[-k:][::-1]]
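# Minimal usage sketch (hypothetical helper): shows how temperature shifts
# weighted_choice from near-argmax (low T) toward near-uniform (high T).
def _example_weighted_choice() -> None:
    items = ["a", "b", "c"]
    weights = [1.0, 2.0, 10.0]
    # Low temperature: the distribution is sharply peaked on the highest weight,
    # so "c" is selected essentially every time.
    picks_cold = [weighted_choice(items, weights, temperature=0.1) for _ in range(100)]
    assert picks_cold.count("c") > 90
    # High temperature: probabilities flatten toward uniform.
    probs_hot = softmax(np.array(weights), temperature=100.0)
    assert max(probs_hot) - min(probs_hot) < 0.05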
# ═══════════════════════════════════════════════════════════
# TEXT PROCESSING & TOKENIZER
# ═══════════════════════════════════════════════════════════

# Indonesian and English stopwords (common words that don't carry meaning)
STOPWORDS_ID = {
    "dan", "atau", "yang", "di", "ke", "dari", "untuk", "pada",
    "dengan", "adalah", "ini", "itu", "akan", "telah", "sudah",
    "tidak", "bukan", "juga", "saja", "hanya", "dapat", "bisa",
    "oleh", "karena", "jika", "maka", "saat", "ketika", "dalam",
    "luar", "atas", "bawah", "antara", "setelah", "sebelum",
    "sedang", "masih", "belum", "sangat", "lebih", "paling",
    "seperti", "sebagai", "secara", "mereka", "kami", "kita",
    "saya", "aku", "kamu", "dia", "ia", "nya", "pun", "lah",
    "kah", "tah", "per", "pernah", "bahwa", "agar", "supaya",
    "serta", "maupun", "namun", "tetapi", "tapi", "lagi", "lalu",
    "kemudian", "meski", "meskipun", "walau", "walaupun", "bila",
    "the", "a", "an", "is", "are", "was", "were", "be", "been",
    "being", "have", "has", "had", "do", "does", "did", "will",
    "would", "could", "should", "may", "might", "shall", "can",
    "of", "in", "to", "for", "with", "on", "at", "from", "by",
    "about", "as", "into", "through", "during", "before", "after",
    "and", "but", "or", "nor", "not", "so", "yet", "both",
    "this", "that", "these", "those", "it", "its", "they", "them",
    "he", "she", "we", "you", "i", "me", "my", "your", "his", "her"
}

# Indonesian affixes for stemming-lite
ID_PREFIXES = ["meng", "mem", "men", "meny", "me", "peng", "pem",
               "pen", "peny", "pe", "ber", "di", "ke", "se", "ter"]
ID_SUFFIXES = ["kan", "an", "nya", "lah", "kah", "pun", "i"]


def normalize_text(text: str) -> str:
    """Normalize text: lowercase, clean whitespace, basic cleanup."""
    text = text.lower().strip()
    # Normalize unicode whitespace
    text = re.sub(r'\s+', ' ', text)
    # Remove excessive punctuation but keep basic ones
    text = re.sub(r'[^\w\s\.\,\?\!\-\/\(\)]', ' ', text)
    text = re.sub(r'\s+', ' ', text).strip()
    return text


def tokenize(text: str, remove_stopwords: bool = False) -> List[str]:
    """
    Tokenize text into words.
    Handles Indonesian and English.
    """
    normalized = normalize_text(text)
    # Split on whitespace and punctuation boundaries
    tokens = re.findall(r'[a-zA-Z0-9\u00C0-\u024F\u1E00-\u1EFF]+', normalized)
    tokens = [t for t in tokens if len(t) > 1]  # Remove single chars
    if remove_stopwords:
        tokens = [t for t in tokens if t not in STOPWORDS_ID]
    return tokens


def stem_indonesian_lite(word: str) -> str:
    """
    Lightweight Indonesian stemming.
    Not perfect, but sufficient for similarity matching.
    Removes common prefixes and suffixes.
    """
    original = word.lower()
    if len(original) <= 4:
        return original
    result = original
    # Remove suffixes first
    for suffix in sorted(ID_SUFFIXES, key=len, reverse=True):
        if result.endswith(suffix) and len(result) - len(suffix) >= 3:
            result = result[:-len(suffix)]
            break
    # Remove prefixes
    for prefix in sorted(ID_PREFIXES, key=len, reverse=True):
        if result.startswith(prefix) and len(result) - len(prefix) >= 3:
            result = result[len(prefix):]
            break
    return result


def extract_keywords(text: str, max_keywords: int = 20) -> List[str]:
    """Extract important keywords from text."""
    tokens = tokenize(text, remove_stopwords=True)
    # Deduplicate by stem, keeping the first surface form seen
    stemmed_map = {}
    for token in tokens:
        stem = stem_indonesian_lite(token)
        if stem not in stemmed_map:
            stemmed_map[stem] = token  # Keep original form
    # Return unique keywords, limited
    keywords = list(stemmed_map.values())[:max_keywords]
    return keywords


def extract_entities_simple(text: str) -> List[str]:
    """
    Simple entity extraction based on capitalization and patterns.
    Not NER — just heuristic extraction.
    """
    entities = []
    # Find capitalized words (potential proper nouns),
    # but not at sentence start
    sentences = re.split(r'[.!?]', text)
    for sentence in sentences:
        words = sentence.strip().split()
        for i, word in enumerate(words):
            clean = re.sub(r'[^\w]', '', word)
            if not clean:
                continue
            # Capitalized and not the first word of the sentence
            if i > 0 and clean[0].isupper() and len(clean) > 1:
                entities.append(clean)
    # Find quoted terms
    quoted = re.findall(r'"([^"]+)"', text)
    entities.extend(quoted)
    quoted2 = re.findall(r"'([^']+)'", text)
    entities.extend(quoted2)
    # Deduplicate while preserving order
    seen = set()
    unique = []
    for e in entities:
        lower = e.lower()
        if lower not in seen:
            seen.add(lower)
            unique.append(e)
    return unique


def char_ngrams(text: str, n: int) -> List[str]:
    """Generate character n-grams from text."""
    text = text.lower().strip()
    padded = f"#{text}#"  # Boundary markers
    grams = []
    for i in range(len(padded) - n + 1):
        grams.append(padded[i:i+n])
    return grams
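# Minimal usage sketch (hypothetical helper): tokenization with stopword removal,
# light Indonesian stemming, and character n-grams on short inputs.
def _example_text_processing() -> None:
    tokens = tokenize("Fotosintesis adalah proses pada tumbuhan", remove_stopwords=True)
    # "adalah" and "pada" are stopwords; content words survive.
    assert "adalah" not in tokens and "fotosintesis" in tokens
    # "pembelajaran" → strip suffix "-an", then prefix "pem-" → "belajar".
    assert stem_indonesian_lite("pembelajaran") == "belajar"
    # n-grams are taken over the "#"-padded string.
    assert char_ngrams("ab", 3) == ["#ab", "ab#"]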
# ═══════════════════════════════════════════════════════════
# TEXT VECTORIZATION (No ML model — pure math)
# ═══════════════════════════════════════════════════════════

# Random projection matrix (generated once, deterministic)
_projection_matrix = None


def _get_projection_matrix() -> np.ndarray:
    """
    Generate or return the cached random projection matrix.
    Maps from HASH_BUCKETS dimensions to VECTOR_DIM dimensions.
    Deterministic via seed.
    """
    global _projection_matrix
    if _projection_matrix is None:
        rng = np.random.RandomState(config.RANDOM_PROJECTION_SEED)
        # Gaussian random projection (preserves distances)
        _projection_matrix = rng.randn(
            config.HASH_BUCKETS, config.VECTOR_DIM
        ).astype(np.float32)
        # Scale for unit variance
        _projection_matrix /= np.sqrt(config.HASH_BUCKETS)
    return _projection_matrix


def _hash_to_bucket(text: str, num_buckets: int) -> int:
    """Deterministic hash of text to a bucket index."""
    h = hashlib.md5(text.encode('utf-8')).hexdigest()
    return int(h, 16) % num_buckets


def text_to_sparse_vector(text: str) -> np.ndarray:
    """
    Convert text to a sparse high-dimensional vector using character n-gram hashing.
    Output: vector of size HASH_BUCKETS.
    """
    sparse = np.zeros(config.HASH_BUCKETS, dtype=np.float32)
    for n in config.NGRAM_SIZES:
        grams = char_ngrams(text, n)
        for gram in grams:
            bucket = _hash_to_bucket(gram, config.HASH_BUCKETS)
            sparse[bucket] += 1.0
    # Also hash whole words for word-level signal
    tokens = tokenize(text, remove_stopwords=True)
    for token in tokens:
        bucket = _hash_to_bucket(f"w_{token}", config.HASH_BUCKETS)
        sparse[bucket] += 2.0  # Words weighted more than char n-grams
    # Normalize
    norm = np.linalg.norm(sparse)
    if norm > 1e-10:
        sparse /= norm
    return sparse


def text_to_vector(text: str) -> np.ndarray:
    """
    Full pipeline: text → sparse vector → random projection → dense 128-dim vector.
    This is the main embedding function used throughout the system.
    """
    sparse = text_to_sparse_vector(text)
    proj_matrix = _get_projection_matrix()
    dense = sparse @ proj_matrix  # (HASH_BUCKETS,) @ (HASH_BUCKETS, VECTOR_DIM) → (VECTOR_DIM,)
    return normalize(dense)


def texts_to_vectors(texts: List[str]) -> np.ndarray:
    """Batch vectorize multiple texts. Returns matrix (N, VECTOR_DIM)."""
    if not texts:
        return np.zeros((0, config.VECTOR_DIM), dtype=np.float32)
    vectors = [text_to_vector(t) for t in texts]
    return np.array(vectors, dtype=np.float32)
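# Minimal usage sketch (hypothetical helper): hashing-based embeddings place
# texts with shared words/n-grams closer together than unrelated ones. Assumes
# config defines VECTOR_DIM, HASH_BUCKETS, NGRAM_SIZES and RANDOM_PROJECTION_SEED
# as used above.
def _example_vectorization() -> None:
    v1 = text_to_vector("belajar pemrograman python")
    v2 = text_to_vector("belajar bahasa pemrograman")
    v3 = text_to_vector("resep masakan rendang")
    assert v1.shape == (config.VECTOR_DIM,)
    sim_related = cosine_similarity(v1, v2)
    sim_unrelated = cosine_similarity(v1, v3)
    # With shared words and n-grams, sim_related is expected to exceed sim_unrelated.
    _ = (sim_related, sim_unrelated)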
# ═══════════════════════════════════════════════════════════
# TF-IDF (Corpus-aware weighting)
# ═══════════════════════════════════════════════════════════

class TFIDFCalculator:
    """
    Maintains corpus statistics for TF-IDF weighting.
    Used to boost the importance of rare terms in vectors.
    """

    def __init__(self):
        self.document_count = 0
        self.document_frequency = Counter()  # term → number of docs containing it
        self._dirty = True
        self._idf_cache = {}

    def add_document(self, tokens: List[str]):
        """Register a document's tokens for IDF calculation."""
        self.document_count += 1
        unique_tokens = set(tokens)
        for token in unique_tokens:
            self.document_frequency[token] += 1
        self._dirty = True

    def get_idf(self, token: str) -> float:
        """Get the inverse document frequency for a token."""
        if self._dirty:
            self._rebuild_idf_cache()
        return self._idf_cache.get(token, self._default_idf())

    def _rebuild_idf_cache(self):
        """Rebuild IDF cache."""
        self._idf_cache = {}
        for token, df in self.document_frequency.items():
            # Smooth IDF: log((N + 1) / (df + 1)) + 1
            self._idf_cache[token] = math.log(
                (self.document_count + 1) / (df + 1)
            ) + 1.0
        self._dirty = False

    def _default_idf(self) -> float:
        """IDF for unknown tokens (maximum importance)."""
        if self.document_count == 0:
            return 1.0
        return math.log(self.document_count + 1) + 1.0

    def compute_tfidf_vector(self, text: str) -> np.ndarray:
        """
        Compute a TF-IDF weighted sparse vector for text.
        Then project to a dense vector.
        """
        tokens = tokenize(text, remove_stopwords=True)
        if not tokens:
            return np.zeros(config.VECTOR_DIM, dtype=np.float32)
        # Term frequency
        tf = Counter(tokens)
        max_tf = max(tf.values()) if tf else 1
        # Build sparse vector with TF-IDF weights
        sparse = np.zeros(config.HASH_BUCKETS, dtype=np.float32)
        for token, count in tf.items():
            # Augmented TF: 0.5 + 0.5 * (count / max_count)
            tf_score = 0.5 + 0.5 * (count / max_tf)
            idf_score = self.get_idf(token)
            tfidf = tf_score * idf_score
            # Hash token to bucket
            bucket = _hash_to_bucket(f"w_{token}", config.HASH_BUCKETS)
            sparse[bucket] += tfidf
            # Also add character n-grams with reduced weight
            for n in config.NGRAM_SIZES:
                for gram in char_ngrams(token, n):
                    bucket = _hash_to_bucket(gram, config.HASH_BUCKETS)
                    sparse[bucket] += tfidf * 0.3
        # Normalize and project
        norm = np.linalg.norm(sparse)
        if norm > 1e-10:
            sparse /= norm
        proj_matrix = _get_projection_matrix()
        dense = sparse @ proj_matrix
        return normalize(dense)

    def get_stats(self) -> dict:
        """Return corpus statistics."""
        return {
            "document_count": self.document_count,
            "vocabulary_size": len(self.document_frequency),
            "avg_df": (
                sum(self.document_frequency.values()) / len(self.document_frequency)
                if self.document_frequency else 0
            )
        }


# Global TF-IDF calculator instance (shared across the system)
tfidf = TFIDFCalculator()


def text_to_vector_tfidf(text: str) -> np.ndarray:
    """
    Enhanced vectorization using TF-IDF weights.
    Falls back to basic vectorization if the corpus is too small.
    """
    if tfidf.document_count < 10:
        # Not enough corpus data for meaningful IDF
        return text_to_vector(text)
    return tfidf.compute_tfidf_vector(text)
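# Minimal usage sketch (hypothetical helper): feed a few documents into a local
# TFIDFCalculator (rather than the shared `tfidf` instance) so rare terms get a
# higher IDF than common ones.
def _example_tfidf() -> None:
    corpus = TFIDFCalculator()
    corpus.add_document(tokenize("kucing adalah hewan peliharaan", remove_stopwords=True))
    corpus.add_document(tokenize("anjing adalah hewan peliharaan", remove_stopwords=True))
    corpus.add_document(tokenize("fotosintesis terjadi pada tumbuhan", remove_stopwords=True))
    # "hewan" appears in 2 of 3 documents, "fotosintesis" in only 1,
    # so "fotosintesis" receives the larger IDF.
    assert corpus.get_idf("fotosintesis") > corpus.get_idf("hewan")
    vec = corpus.compute_tfidf_vector("hewan peliharaan kucing")
    assert vec.shape == (config.VECTOR_DIM,)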
# ═══════════════════════════════════════════════════════════
# CLUSTERING (for Abstraction)
# ═══════════════════════════════════════════════════════════

def kmeans(
    vectors: np.ndarray,
    k: int,
    max_iterations: Optional[int] = None,
    min_cluster_size: Optional[int] = None
) -> List[List[int]]:
    """
    Simple K-means clustering.

    Args:
        vectors: matrix (N, dim)
        k: number of clusters
        max_iterations: iteration limit
        min_cluster_size: minimum members per valid cluster

    Returns:
        List of clusters; each cluster is a list of indices.
    """
    if max_iterations is None:
        max_iterations = config.CLUSTER_ITERATIONS
    if min_cluster_size is None:
        min_cluster_size = config.CLUSTER_MIN_SIZE
    n = vectors.shape[0]
    if n == 0 or k <= 0:
        return []
    k = min(k, n)
    # Initialize centroids: random selection from data
    rng = np.random.RandomState(int(time.time()) % 2**31)
    centroid_indices = rng.choice(n, size=k, replace=False)
    centroids = vectors[centroid_indices].copy()
    assignments = np.zeros(n, dtype=int)
    for iteration in range(max_iterations):
        # Assign each point to the nearest centroid
        new_assignments = np.zeros(n, dtype=int)
        for i in range(n):
            similarities = np.array([
                cosine_similarity(vectors[i], centroids[j])
                for j in range(k)
            ])
            new_assignments[i] = np.argmax(similarities)
        # Check convergence
        if np.array_equal(assignments, new_assignments):
            break
        assignments = new_assignments
        # Update centroids
        for j in range(k):
            members = vectors[assignments == j]
            if len(members) > 0:
                centroids[j] = normalize(np.mean(members, axis=0))
    # Build cluster lists
    clusters = []
    for j in range(k):
        member_indices = list(np.where(assignments == j)[0])
        if len(member_indices) >= min_cluster_size:
            clusters.append(member_indices)
    return clusters


def find_natural_clusters(
    vectors: np.ndarray,
    similarity_threshold: Optional[float] = None
) -> List[List[int]]:
    """
    Find natural clusters using an agglomerative approach.
    Groups vectors that are mutually similar above the threshold.
    Better than k-means when k is unknown.
    """
    if similarity_threshold is None:
        similarity_threshold = config.CLUSTER_SIMILARITY_INTRA
    n = vectors.shape[0]
    if n == 0:
        return []
    # Start: each point is its own cluster
    cluster_map = {i: i for i in range(n)}  # point → cluster_id
    cluster_members = {i: [i] for i in range(n)}
    # Compute pairwise similarities
    for i in range(n):
        for j in range(i + 1, n):
            sim = cosine_similarity(vectors[i], vectors[j])
            if sim >= similarity_threshold:
                ci = cluster_map[i]
                cj = cluster_map[j]
                if ci != cj:
                    # Merge smaller into larger
                    if len(cluster_members[ci]) < len(cluster_members[cj]):
                        ci, cj = cj, ci
                    # Merge cj into ci
                    for member in cluster_members[cj]:
                        cluster_map[member] = ci
                    cluster_members[ci].extend(cluster_members[cj])
                    del cluster_members[cj]
    # Filter by minimum size
    clusters = [
        members for members in cluster_members.values()
        if len(members) >= config.CLUSTER_MIN_SIZE
    ]
    # Cap cluster size
    capped = []
    for cluster in clusters:
        if len(cluster) > config.CLUSTER_MAX_SIZE:
            # Keep only the most central members
            cluster_vectors = vectors[cluster]
            centroid = normalize(np.mean(cluster_vectors, axis=0))
            sims = [cosine_similarity(vectors[idx], centroid) for idx in cluster]
            sorted_pairs = sorted(zip(sims, cluster), reverse=True)
            cluster = [idx for _, idx in sorted_pairs[:config.CLUSTER_MAX_SIZE]]
        capped.append(cluster)
    return capped
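# Minimal usage sketch (hypothetical helper): cluster a handful of toy texts.
# Assumes config.CLUSTER_MIN_SIZE is small enough (e.g. 2) for these tiny groups
# to survive the size filter; with stricter settings the result may be empty.
def _example_clustering() -> None:
    vecs = texts_to_vectors([
        "kucing hewan peliharaan",
        "anjing hewan peliharaan",
        "fotosintesis pada tumbuhan",
        "klorofil daun tumbuhan",
    ])
    clusters = find_natural_clusters(vecs, similarity_threshold=0.2)
    # Each returned cluster is a list of row indices into `vecs`.
    for cluster in clusters:
        assert all(0 <= idx < vecs.shape[0] for idx in cluster)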
# ═══════════════════════════════════════════════════════════
# VARIATION & RANDOMNESS
# ═══════════════════════════════════════════════════════════

def variation_seed() -> int:
    """
    Generate a variation seed from the current timestamp.
    Used to make responses non-deterministic.
    Changes every 100ms for fine-grained variation.
    """
    return int(time.time() * 10) % 2**31


def seeded_random(seed: int) -> random.Random:
    """Create a seeded random instance for reproducible-within-request variation."""
    return random.Random(seed)


def add_noise(vector: np.ndarray, noise_level: float = 0.01) -> np.ndarray:
    """Add small random noise to a vector for variation."""
    noise = np.random.randn(*vector.shape).astype(np.float32) * noise_level
    return normalize(vector + noise)
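# Minimal usage sketch (hypothetical helper): small noise keeps a vector close
# to the original while making repeated calls slightly different.
def _example_variation() -> None:
    v = normalize(np.ones(16, dtype=np.float32))
    noisy = add_noise(v, noise_level=0.01)
    # With noise_level 0.01 on a 16-d unit vector, the perturbation is tiny.
    assert cosine_similarity(v, noisy) > 0.9
    rng = seeded_random(variation_seed())
    assert 0.0 <= rng.random() < 1.0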
# ═══════════════════════════════════════════════════════════
# INTENT DETECTION (Rule-based, no ML)
# ═══════════════════════════════════════════════════════════

# Intent patterns: (regex_pattern, intent_type, confidence)
INTENT_PATTERNS = [
    # Indonesian
    (r'\b(apa\s+itu|apakah|jelaskan|ceritakan)\b', 'explain', 0.85),
    (r'\b(hubungan|kaitannya|relasi|kaitan)\b', 'relation', 0.85),
    (r'\b(bagaimana\s+cara|caranya|gimana|langkah)\b', 'how_to', 0.85),
    (r'\b(bandingkan|perbedaan|persamaan|beda|mirip)\b', 'compare', 0.85),
    (r'\b(definisi|arti|makna|maksud)\b', 'define', 0.90),
    (r'\b(sebutkan|daftar|list|apa\s+saja)\b', 'list', 0.85),
    (r'\b(mengapa|kenapa|sebab|alasan)\b', 'cause', 0.85),
    (r'\b(pendapat|menurut|opini|pandangan)\b', 'opinion', 0.80),
    (r'\b(halo|hai|hey|hi|selamat\s+pagi|selamat\s+siang|selamat\s+malam)\b', 'greeting', 0.90),
    # English
    (r'\b(what\s+is|explain|describe|tell\s+me\s+about)\b', 'explain', 0.85),
    (r'\b(relationship|connection|relate|linked)\b', 'relation', 0.85),
    (r'\b(how\s+to|how\s+do|how\s+can|steps)\b', 'how_to', 0.85),
    (r'\b(compare|difference|similar|versus|vs)\b', 'compare', 0.85),
    (r'\b(define|definition|meaning)\b', 'define', 0.90),
    (r'\b(list|enumerate|name\s+all|what\s+are)\b', 'list', 0.85),
    (r'\b(why|reason|cause)\b', 'cause', 0.85),
    (r'\b(opinion|think\s+about|view|perspective)\b', 'opinion', 0.80),
    (r'\b(hello|hi|hey|greetings|good\s+morning)\b', 'greeting', 0.90),
]


def detect_intent(text: str) -> Tuple[str, float]:
    """
    Detect user intent from text.
    Returns (intent_type, confidence).
    """
    text_lower = text.lower().strip()
    best_intent = 'general'
    best_confidence = 0.3  # Default confidence for general
    for pattern, intent, conf in INTENT_PATTERNS:
        if re.search(pattern, text_lower):
            if conf > best_confidence:
                best_intent = intent
                best_confidence = conf
    return best_intent, best_confidence
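# Minimal usage sketch (hypothetical helper): the rule-based detector returns an
# (intent, confidence) pair and falls back to 'general' when nothing matches.
def _example_detect_intent() -> None:
    assert detect_intent("Apa itu fotosintesis?") == ("explain", 0.85)
    assert detect_intent("How to configure the engine?")[0] == "how_to"
    assert detect_intent("xyzzy")[0] == "general"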
# ═══════════════════════════════════════════════════════════
# RELATION EXTRACTION (from data entries)
# ═══════════════════════════════════════════════════════════

# Maps data type → likely edge relations to create
DATA_TYPE_RELATIONS = {
    "fact": ["related_to"],
    "definition": ["defined_as"],
    "explanation": ["related_to", "is_a"],
    "description": ["has", "related_to"],
    "property": ["has"],
    "statistic": ["has", "related_to"],
    "relation": [],  # Explicit relation, handled separately
    "cause_effect": ["causes"],
    "comparison": ["related_to"],
    "hierarchy": ["is_a", "part_of"],
    "composition": ["contains", "part_of"],
    "dependency": ["requires"],
    "contradiction": ["opposite_of"],
    "process": ["follows"],
    "procedure": ["follows"],
    "event": ["related_to"],
    "history": ["follows", "related_to"],
    "qa": ["defined_as", "related_to"],
    "synonym": ["synonym_of"],
    "antonym": ["opposite_of"],
    "analogy": ["analogous_to"],
    "example": ["example_of"],
    "quote": ["related_to"],
    "term": ["defined_as"],
}


def get_relations_for_type(data_type: str) -> List[str]:
    """Get default edge relation types for a data type."""
    # Check core types
    if data_type in DATA_TYPE_RELATIONS:
        return DATA_TYPE_RELATIONS[data_type]
    # Custom types default to related_to
    if data_type.startswith("custom_"):
        return ["related_to"]
    return ["related_to"]
# ═══════════════════════════════════════════════════════════
# SYSTEM PROMPT PARSER
# ═══════════════════════════════════════════════════════════

def parse_system_prompt(system_prompt: str) -> dict:
    """
    Parse a system prompt to extract personality parameters.
    Returns a dict with the personality configuration.
    """
    if not system_prompt:
        return {
            "name": None,
            "formality": config.DEFAULT_FORMALITY,
            "tone_warmth": 0.5,
            "use_emoji": False,
            "language": config.DEFAULT_LANGUAGE,
            "style_markers": [],
            "constraints": [],
            "raw": ""
        }
    text_lower = system_prompt.lower()
    result = {
        "name": None,
        "formality": config.DEFAULT_FORMALITY,
        "tone_warmth": 0.5,
        "use_emoji": False,
        "language": config.DEFAULT_LANGUAGE,
        "style_markers": [],
        "constraints": [],
        "raw": system_prompt
    }
    # Extract name
    name_patterns = [
        r'(?:kamu\s+adalah|nama\s*(?:mu|kamu)\s+adalah?|you\s+are|your\s+name\s+is)\s+([A-Z][a-zA-Z]+)',
        r'(?:namamu|namaku)\s+([A-Z][a-zA-Z]+)',
    ]
    for pattern in name_patterns:
        match = re.search(pattern, system_prompt, re.IGNORECASE)
        if match:
            result["name"] = match.group(1)
            break
    # Detect formality
    casual_markers = ["santai", "casual", "informal", "gaul", "friendly", "fun"]
    formal_markers = ["formal", "academic", "professional", "resmi", "sopan"]
    casual_count = sum(1 for m in casual_markers if m in text_lower)
    formal_count = sum(1 for m in formal_markers if m in text_lower)
    if casual_count > formal_count:
        result["formality"] = 0.2
    elif formal_count > casual_count:
        result["formality"] = 0.8
    # Detect warmth
    warm_markers = ["ramah", "hangat", "warm", "kind", "friendly", "baik"]
    cold_markers = ["tegas", "strict", "cold", "direct", "blunt"]
    warm_count = sum(1 for m in warm_markers if m in text_lower)
    cold_count = sum(1 for m in cold_markers if m in text_lower)
    if warm_count > cold_count:
        result["tone_warmth"] = 0.8
    elif cold_count > warm_count:
        result["tone_warmth"] = 0.2
    # Detect emoji
    if any(m in text_lower for m in ["emoji", "emoticon", "emotikon"]):
        result["use_emoji"] = True
    # Detect language
    if any(m in text_lower for m in ["english", "inggris", "respond in english"]):
        result["language"] = "en"
    elif any(m in text_lower for m in ["indonesia", "bahasa indonesia"]):
        result["language"] = "id"
    return result
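# Minimal usage sketch (hypothetical helper): a short casual Indonesian system
# prompt lowers formality, raises warmth, enables emoji, and picks up the name.
def _example_parse_system_prompt() -> None:
    persona = parse_system_prompt(
        "Kamu adalah Rani, asisten yang ramah dan santai. Gunakan emoji."
    )
    assert persona["name"] == "Rani"
    assert persona["formality"] < 0.5 and persona["tone_warmth"] > 0.5
    assert persona["use_emoji"] is True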
# ═══════════════════════════════════════════════════════════
# GENERAL UTILITIES
# ═══════════════════════════════════════════════════════════

def clamp(value: float, min_val: float, max_val: float) -> float:
    """Clamp value between min and max."""
    return max(min_val, min(max_val, value))


def safe_log(x: float) -> float:
    """Safe logarithm that handles zero and negative inputs."""
    if x <= 0:
        return 0.0
    return math.log(x)


def timestamp_now() -> str:
    """ISO format timestamp."""
    return time.strftime("%Y-%m-%dT%H:%M:%SZ", time.gmtime())


def hash_file_content(content: str) -> str:
    """SHA256 hash of file content for change detection."""
    return hashlib.sha256(content.encode('utf-8')).hexdigest()


def chunk_list(lst: list, chunk_size: int) -> List[list]:
    """Split a list into chunks of the given size."""
    return [lst[i:i + chunk_size] for i in range(0, len(lst), chunk_size)]


def merge_dicts(base: dict, override: dict) -> dict:
    """Merge two dicts; override takes precedence."""
    result = base.copy()
    result.update(override)
    return result


def truncate_text(text: str, max_length: int = 200) -> str:
    """Truncate text with an ellipsis."""
    if len(text) <= max_length:
        return text
    return text[:max_length - 3] + "..."


def calculate_intelligence_score(metrics: dict) -> float:
    """
    Calculate a composite intelligence score from graph metrics.
    Higher = more knowledgeable and better connected.
    """
    weights = config.INTELLIGENCE_WEIGHTS
    score = 0.0
    score += safe_log(metrics.get("total_nodes", 0) + 1) * weights["log_nodes"]
    score += safe_log(metrics.get("total_edges", 0) + 1) * weights["log_edges"]
    score += clamp(
        metrics.get("avg_connections", 0), 0, 50
    ) / 50.0 * 10.0 * weights["avg_connections"]
    score += clamp(
        metrics.get("max_abstraction_depth", 0), 0, config.MAX_ABSTRACTION_DEPTH
    ) / config.MAX_ABSTRACTION_DEPTH * 10.0 * weights["max_abstraction_depth"]
    score += clamp(
        metrics.get("avg_chain_length", 0), 0, 20
    ) / 20.0 * 10.0 * weights["avg_chain_length"]
    score += clamp(
        metrics.get("inference_ratio", 0), 0, 1
    ) * 10.0 * weights["inference_ratio"]
    score += clamp(
        metrics.get("avg_confidence", 0), 0, 1
    ) * 10.0 * weights["avg_confidence"]
    return round(score, 2)


def format_duration(seconds: float) -> str:
    """Format seconds into a human-readable duration."""
    if seconds < 60:
        return f"{seconds:.0f}s"
    if seconds < 3600:
        return f"{seconds/60:.0f}m"
    if seconds < 86400:
        return f"{seconds/3600:.1f}h"
    return f"{seconds/86400:.1f}d"